This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new ab67665e6c8 Fix incorrect count under concurrency of CDC job (#24561)
ab67665e6c8 is described below

commit ab67665e6c8d6048250ee4921985642d9aa6729b
Author: Xinze Guo <[email protected]>
AuthorDate: Mon Mar 13 09:21:02 2023 +0800

    Fix incorrect count under concurrency of CDC job (#24561)
    
    * Fix date record count not correctly at CDC ack position
    
    * Remove set status from init progress at CDC job item
---
 .../cdc/context/job/CDCJobItemContext.java         |  3 --
 .../data/pipeline/cdc/core/ack/CDCAckPosition.java | 34 ++++++++++++++++++----
 .../data/pipeline/cdc/util/CDCDataRecordUtil.java  |  2 +-
 .../pipeline/cdc/core/ack/CDCAckHolderTest.java    |  2 +-
 4 files changed, 31 insertions(+), 10 deletions(-)

diff --git 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/job/CDCJobItemContext.java
 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/job/CDCJobItemContext.java
index f59fd9eb76b..e5111de8626 100644
--- 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/job/CDCJobItemContext.java
+++ 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/job/CDCJobItemContext.java
@@ -101,9 +101,6 @@ public final class CDCJobItemContext implements 
InventoryIncrementalJobItemConte
         this.taskConfig = taskConfig;
         this.dataSourceManager = dataSourceManager;
         this.importerConnector = importerConnector;
-        if (null != initProgress) {
-            status = initProgress.getStatus();
-        }
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckPosition.java
 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckPosition.java
index 469aa3870bb..a62ad348b9d 100644
--- 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckPosition.java
+++ 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckPosition.java
@@ -17,27 +17,51 @@
 
 package org.apache.shardingsphere.data.pipeline.cdc.core.ack;
 
-import lombok.AllArgsConstructor;
 import lombok.Getter;
 import lombok.Setter;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 
+import java.util.concurrent.atomic.AtomicInteger;
+
 /**
  * CDC ack position.
  */
 @Getter
-@AllArgsConstructor
 public final class CDCAckPosition {
     
     @Setter
     private Record lastRecord;
     
-    @Setter
-    private int dataRecordCount;
+    private final AtomicInteger dataRecordCount = new AtomicInteger();
     
     private final long createTimeMills;
     
     public CDCAckPosition(final Record lastRecord, final int dataRecordCount) {
-        this(lastRecord, dataRecordCount, System.currentTimeMillis());
+        this.lastRecord = lastRecord;
+        this.dataRecordCount.set(dataRecordCount);
+        createTimeMills = System.currentTimeMillis();
+    }
+    
+    public CDCAckPosition(final Record lastRecord, final long createTimeMills) 
{
+        this.lastRecord = lastRecord;
+        this.createTimeMills = createTimeMills;
+    }
+    
+    /**
+     * Add data record count.
+     *
+     * @param count count.
+     */
+    public void addDataRecordCount(final int count) {
+        dataRecordCount.addAndGet(count);
+    }
+    
+    /**
+     * Get data record count.
+     *
+     * @return data record count.
+     */
+    public int getDataRecordCount() {
+        return dataRecordCount.get();
     }
 }
diff --git 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataRecordUtil.java
 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataRecordUtil.java
index bd3edd8cb10..1781fbd4aba 100644
--- 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataRecordUtil.java
+++ 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCDataRecordUtil.java
@@ -69,7 +69,7 @@ public final class CDCDataRecordUtil {
             cdcAckPositionMap.put(socketSinkImporter, new 
CDCAckPosition(record, 1));
         } else {
             cdcAckPosition.setLastRecord(record);
-            
cdcAckPosition.setDataRecordCount(cdcAckPosition.getDataRecordCount());
+            
cdcAckPosition.addDataRecordCount(cdcAckPosition.getDataRecordCount());
         }
     }
     
diff --git 
a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckHolderTest.java
 
b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckHolderTest.java
index c5ac4e0f8b9..da67301a881 100644
--- 
a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckHolderTest.java
+++ 
b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/ack/CDCAckHolderTest.java
@@ -54,7 +54,7 @@ public final class CDCAckHolderTest {
         CDCAckHolder cdcAckHolder = CDCAckHolder.getInstance();
         final Map<SocketSinkImporter, CDCAckPosition> importerDataRecordMap = 
new HashMap<>();
         SocketSinkImporter socketSinkImporter = mock(SocketSinkImporter.class);
-        importerDataRecordMap.put(socketSinkImporter, new CDCAckPosition(new 
FinishedRecord(new FinishedPosition()), 0, System.currentTimeMillis() - 60 * 
1000 * 10));
+        importerDataRecordMap.put(socketSinkImporter, new CDCAckPosition(new 
FinishedRecord(new FinishedPosition()), System.currentTimeMillis() - 60 * 1000 
* 10));
         cdcAckHolder.bindAckIdWithPosition(importerDataRecordMap);
         cdcAckHolder.cleanUp(socketSinkImporter);
         Optional<Map<String, Map<SocketSinkImporter, CDCAckPosition>>> 
ackIdPositionMap = ReflectionUtil.getFieldValue(cdcAckHolder, 
"ackIdPositionMap");

Reply via email to