nizhikov commented on code in PR #11044:
URL: https://github.com/apache/ignite/pull/11044#discussion_r1400015882
##########
modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java:
##########
@@ -530,55 +561,126 @@ private void consumeSegment(Path segment) {
builder.from(walState.get1());
}
+ if (cdcModeState == CdcMode.IGNITE_NODE_ACTIVE) {
+ if (consumeSegmentPassively(builder))
+ return true;
+ }
+ else
+ consumeSegmentActively(builder);
+
+ processedSegments.add(segment);
+
+ return false;
+ }
+
+ /**
+ * Consumes CDC events in {@link CdcMode#CDC_UTILITY_ACTIVE} mode.
+ */
+ private void
consumeSegmentActively(IgniteWalIteratorFactory.IteratorParametersBuilder
builder) {
+ builder.addFilter((type, ptr) -> type == DATA_RECORD_V2 || type ==
CDC_DATA_RECORD);
+
try (DataEntryIterator iter = new DataEntryIterator(new
IgniteWalIteratorFactory(log).iterator(builder))) {
if (walState != null) {
iter.init(walState.get2());
walState = null;
Review Comment:
Let's nullify `walState` in the caller, right after the if/else, instead of inside `consumeSegmentActively`:
```java
if (cdcModeState == CdcMode.IGNITE_NODE_ACTIVE) {
if (consumeSegmentPassively(builder))
return true;
}
else
consumeSegmentActively(builder);
walState = null;
```
##########
modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java:
##########
@@ -530,55 +561,126 @@ private void consumeSegment(Path segment) {
builder.from(walState.get1());
}
+ if (cdcModeState == CdcMode.IGNITE_NODE_ACTIVE) {
+ if (consumeSegmentPassively(builder))
+ return true;
+ }
+ else
+ consumeSegmentActively(builder);
+
+ processedSegments.add(segment);
+
+ return false;
+ }
+
+ /**
+ * Consumes CDC events in {@link CdcMode#CDC_UTILITY_ACTIVE} mode.
+ */
+ private void
consumeSegmentActively(IgniteWalIteratorFactory.IteratorParametersBuilder
builder) {
+ builder.addFilter((type, ptr) -> type == DATA_RECORD_V2 || type ==
CDC_DATA_RECORD);
+
try (DataEntryIterator iter = new DataEntryIterator(new
IgniteWalIteratorFactory(log).iterator(builder))) {
if (walState != null) {
iter.init(walState.get2());
walState = null;
Review Comment:
Let's nullify `walState` in the caller, right after the if/else, instead of inside `consumeSegmentActively`:
```java
if (cdcModeState == CdcMode.IGNITE_NODE_ACTIVE) {
if (consumeSegmentPassively(builder))
return true;
}
else
consumeSegmentActively(builder);
walState = null;
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]