This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new ab67665e6c8 Fix incorrect count under concurrency of CDC job (#24561)
ab67665e6c8 is described below
commit ab67665e6c8d6048250ee4921985642d9aa6729b
Author: Xinze Guo <[email protected]>
AuthorDate: Mon Mar 13 09:21:02 2023 +0800
Fix incorrect count under concurrency of CDC job (#24561)
* Fix date record count not correctly at CDC ack position
* Remove set status from init progress at CDC job item
---
.../cdc/context/job/CDCJobItemContext.java | 3 --
.../data/pipeline/cdc/core/ack/CDCAckPosition.java | 34 ++++++++++++++++++----
.../data/pipeline/cdc/util/CDCDataRecordUtil.java | 2 +-
.../pipeline/cdc/core/ack/CDCAckHolderTest.java | 2 +-
4 files changed, 31 insertions(+), 10 deletions(-)
diff --git
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/job/CDCJobItemContext.java
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/job/CDCJobItemContext.java
index f59fd9eb76b..e5111de8626 100644
---
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/job/CDCJobItemContext.java
+++
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/job/CDCJobItemContext.java
@@ -101,9 +101,6 @@ public final class CDCJobItemContext implements
InventoryIncrementalJobItemConte
this.taskConfig = taskConfig;
this.dataSourceManager = dataSourceManager;
this.importerConnector = importerConnector;
- if (null != initProgress) {
- status = initProgress.getStatus();
- }
}
@Override
diff --git
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckPosition.java
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckPosition.java
index 469aa3870bb..a62ad348b9d 100644
---
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckPosition.java
+++
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckPosition.java
@@ -17,27 +17,51 @@
package org.apache.shardingsphere.data.pipeline.cdc.core.ack;
-import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
+import java.util.concurrent.atomic.AtomicInteger;
+
/**
* CDC ack position.
*/
@Getter
-@AllArgsConstructor
public final class CDCAckPosition {
@Setter
private Record lastRecord;
- @Setter
- private int dataRecordCount;
+ private final AtomicInteger dataRecordCount = new AtomicInteger();
private final long createTimeMills;
public CDCAckPosition(final Record lastRecord, final int dataRecordCount) {
- this(lastRecord, dataRecordCount, System.currentTimeMillis());
+ this.lastRecord = lastRecord;
+ this.dataRecordCount.set(dataRecordCount);
+ createTimeMills = System.currentTimeMillis();
+ }
+
+ public CDCAckPosition(final Record lastRecord, final long createTimeMills)
{
+ this.lastRecord = lastRecord;
+ this.createTimeMills = createTimeMills;
+ }
+
+ /**
+ * Add data record count.
+ *
+ * @param count count.
+ */
+ public void addDataRecordCount(final int count) {
+ dataRecordCount.addAndGet(count);
+ }
+
+ /**
+ * Get data record count.
+ *
+ * @return data record count.
+ */
+ public int getDataRecordCount() {
+ return dataRecordCount.get();
}
}
diff --git
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataRecordUtil.java
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataRecordUtil.java
index bd3edd8cb10..1781fbd4aba 100644
---
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataRecordUtil.java
+++
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataRecordUtil.java
@@ -69,7 +69,7 @@ public final class CDCDataRecordUtil {
cdcAckPositionMap.put(socketSinkImporter, new
CDCAckPosition(record, 1));
} else {
cdcAckPosition.setLastRecord(record);
-
cdcAckPosition.setDataRecordCount(cdcAckPosition.getDataRecordCount());
+
cdcAckPosition.addDataRecordCount(cdcAckPosition.getDataRecordCount());
}
}
diff --git
a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckHolderTest.java
b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckHolderTest.java
index c5ac4e0f8b9..da67301a881 100644
---
a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckHolderTest.java
+++
b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckHolderTest.java
@@ -54,7 +54,7 @@ public final class CDCAckHolderTest {
CDCAckHolder cdcAckHolder = CDCAckHolder.getInstance();
final Map<SocketSinkImporter, CDCAckPosition> importerDataRecordMap =
new HashMap<>();
SocketSinkImporter socketSinkImporter = mock(SocketSinkImporter.class);
- importerDataRecordMap.put(socketSinkImporter, new CDCAckPosition(new
FinishedRecord(new FinishedPosition()), 0, System.currentTimeMillis() - 60 *
1000 * 10));
+ importerDataRecordMap.put(socketSinkImporter, new CDCAckPosition(new
FinishedRecord(new FinishedPosition()), System.currentTimeMillis() - 60 * 1000
* 10));
cdcAckHolder.bindAckIdWithPosition(importerDataRecordMap);
cdcAckHolder.cleanUp(socketSinkImporter);
Optional<Map<String, Map<SocketSinkImporter, CDCAckPosition>>>
ackIdPositionMap = ReflectionUtil.getFieldValue(cdcAckHolder,
"ackIdPositionMap");