nizhikov commented on code in PR #11044:
URL: https://github.com/apache/ignite/pull/11044#discussion_r1400015610


##########
modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java:
##########
@@ -530,55 +561,126 @@ private void consumeSegment(Path segment) {
             builder.from(walState.get1());
         }
 
+        if (cdcModeState == CdcMode.IGNITE_NODE_ACTIVE) {
+            if (consumeSegmentPassively(builder))
+                return true;
+        }
+        else
+            consumeSegmentActively(builder);
+
+        processedSegments.add(segment);
+
+        return false;
+    }
+
+    /**
+     * Consumes CDC events in {@link CdcMode#CDC_UTILITY_ACTIVE} mode.
+     */
+    private void 
consumeSegmentActively(IgniteWalIteratorFactory.IteratorParametersBuilder 
builder) {
+        builder.addFilter((type, ptr) -> type == DATA_RECORD_V2 || type == 
CDC_DATA_RECORD);
+
         try (DataEntryIterator iter = new DataEntryIterator(new 
IgniteWalIteratorFactory(log).iterator(builder))) {
             if (walState != null) {
                 iter.init(walState.get2());
 
                 walState = null;
             }
 
-            boolean interrupted = false;
+            boolean interrupted;
 
             do {
                 boolean commit = consumer.onRecords(iter);
 
-                if (commit) {
-                    T2<WALPointer, Integer> curState = iter.state();
+                if (commit)
+                    saveState(iter.state());
 
-                    if (curState == null)
-                        continue;
+                interrupted = Thread.interrupted();
+            } while (iter.hasNext() && !interrupted);
 
-                    if (log.isDebugEnabled())
-                        log.debug("Saving state [curState=" + curState + ']');
+            if (interrupted)
+                throw new IgniteException("Change Data Capture Application 
interrupted");
+        }
+        catch (IgniteCheckedException | IOException e) {
+            throw new IgniteException(e);
+        }
+    }
 
-                    state.saveWal(curState);
+    /** Saves WAL state. */
+    private void saveState(T2<WALPointer, Integer> curState) throws 
IOException {
+        if (curState == null)
+            return;
 
-                    committedSegmentIdx.value(curState.get1().index());
-                    committedSegmentOffset.value(curState.get1().fileOffset());
+        if (log.isDebugEnabled())
+            log.debug("Saving state [curState=" + curState + ']');
 
-                    // Can delete after new file state save.
-                    if (!processedSegments.isEmpty()) {
-                        // WAL segment is a hard link to a segment file in a 
specifal Change Data Capture folder.
-                        // So we can safely delete it after success processing.
-                        for (Path processedSegment : processedSegments) {
-                            // Can't delete current segment, because state 
points to it.
-                            if (processedSegment.equals(segment))
-                                continue;
+        state.saveWal(curState);
 
-                            Files.delete(processedSegment);
-                        }
+        committedSegmentIdx.value(curState.get1().index());
+        committedSegmentOffset.value(curState.get1().fileOffset());
 
-                        processedSegments.clear();
-                    }
+        // Can delete after new file state save.
+        if (!processedSegments.isEmpty()) {
+            Set<Path> processingSegments = new HashSet<>(processedSegments);
+
+            // WAL segment is a hard link to a segment file in a specifal 
Change Data Capture folder.
+            // So we can safely delete it after success processing.
+            for (Path processedSegment : processedSegments) {
+                // Can't delete current segment, because state points to it.
+                if (segmentIndex(processedSegment) >= curState.get1().index())
+                    continue;
+
+                Files.delete(processedSegment);
+
+                processingSegments.remove(processedSegment);
+            }
+
+            processedSegments.clear();
+            processedSegments.addAll(processingSegments);
+        }
+    }
+
+    /**
+     * Consumes CDC events in {@link CdcMode#IGNITE_NODE_ACTIVE} mode.
+     *
+     * @return {@code true} if mode switched.
+     */
+    private boolean 
consumeSegmentPassively(IgniteWalIteratorFactory.IteratorParametersBuilder 
builder) {
+        builder.addFilter((type, ptr) -> type == CDC_MANAGER_STOP_RECORD || 
type == CDC_MANAGER_RECORD);
+
+        try (WALIterator iter = new 
IgniteWalIteratorFactory(log).iterator(builder)) {
+            walState = null;

Review Comment:
   Let's nullify `walState` in the caller - 
   ```
           if (cdcModeState == CdcMode.IGNITE_NODE_ACTIVE) {
               if (consumeSegmentPassively(builder))
                   return true;
           }
           else
               consumeSegmentActively(builder);
   
           walState = null;



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to