nizhikov commented on code in PR #11044:
URL: https://github.com/apache/ignite/pull/11044#discussion_r1400019549


##########
modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java:
##########
@@ -491,8 +523,7 @@ private void consumeSegment(Path segment) {
                 .marshallerMappingFileStoreDir(marshaller)
                 .igniteConfigurationModifier((cfg) -> 
cfg.setPluginProviders(igniteCfg.getPluginProviders()))
                 .keepBinary(cdcCfg.isKeepBinary())
-                .filesOrDirs(segment.toFile())
-                .addFilter((type, ptr) -> type == DATA_RECORD_V2 || type == 
CDC_DATA_RECORD);
+                .filesOrDirs(segment.toFile());

Review Comment:
   Ca we, please extract lines from 538 to 559 into method.
   This will simplify `consumeSegment` logic a lot.
   And place this method below the `updateCaches`.
   ```java
       /**
        * Remove segment file if it already processed.
        * @return {@code True} if segment file was deleted, {@code false} 
otherwise.
        */
       private boolean removeProcessedSegmentOnFailover(Path segment, long 
segmentIdx) {
           if (segmentIdx > walState.get1().index()) {
               throw new IgniteException("Found segment greater then saved 
state. Some events are missed. Exiting! " +
                   "[state=" + walState + ", segment=" + segmentIdx + ']');
           }
   
           if (segmentIdx < walState.get1().index()) {
               if (log.isInfoEnabled()) {
                   log.info("Already processed segment found. Skipping and 
deleting the file [segment=" +
                       segmentIdx + ", state=" + walState.get1().index() + ']');
               }
   
               // WAL segment is a hard link to a segment file in the special 
Change Data Capture folder.
               // So, we can safely delete it after processing.
               try {
                   Files.delete(segment);
   
                   return true;
               }
               catch (IOException e) {
                   throw new IgniteException(e);
               }
           }
           return false;
       }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to