This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new f93d8bfdc7b Fix CDC processedRecordCount statistics (#29048)
f93d8bfdc7b is described below

commit f93d8bfdc7bdad468a5bff3f99a3df071619ee8b
Author: Xinze Guo <[email protected]>
AuthorDate: Thu Nov 16 11:12:22 2023 +0800

    Fix CDC processedRecordCount statistics (#29048)
---
 .../data/pipeline/cdc/context/CDCJobItemContext.java  | 19 ++++++++++++++++---
 1 file changed, 16 insertions(+), 3 deletions(-)

diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java
index 11f5dc38b00..16577032877 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java
@@ -18,12 +18,10 @@
 package org.apache.shardingsphere.data.pipeline.cdc.context;
 
 import lombok.Getter;
-import lombok.RequiredArgsConstructor;
 import lombok.Setter;
 import lombok.SneakyThrows;
 import org.apache.commons.lang3.concurrent.ConcurrentException;
 import org.apache.commons.lang3.concurrent.LazyInitializer;
-import 
org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.context.InventoryIncrementalJobItemContext;
@@ -32,6 +30,7 @@ import 
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSou
 import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.listener.PipelineJobProgressUpdatedParameter;
+import 
org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.common.metadata.loader.StandardPipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
@@ -44,7 +43,6 @@ import java.util.concurrent.atomic.AtomicLong;
 /**
  * CDC job item context.
  */
-@RequiredArgsConstructor
 @Getter
 public final class CDCJobItemContext implements 
InventoryIncrementalJobItemContext {
     
@@ -92,6 +90,21 @@ public final class CDCJobItemContext implements 
InventoryIncrementalJobItemConte
         }
     };
     
+    public CDCJobItemContext(final CDCJobConfiguration jobConfig, final int 
shardingItem, final InventoryIncrementalJobItemProgress initProgress, final 
CDCProcessContext jobProcessContext,
+                             final CDCTaskConfiguration taskConfig, final 
PipelineDataSourceManager dataSourceManager, final PipelineSink sink) {
+        this.jobConfig = jobConfig;
+        this.shardingItem = shardingItem;
+        this.initProgress = initProgress;
+        if (null != initProgress) {
+            processedRecordsCount.set(initProgress.getProcessedRecordsCount());
+            inventoryRecordsCount.set(initProgress.getInventoryRecordsCount());
+        }
+        this.jobProcessContext = jobProcessContext;
+        this.taskConfig = taskConfig;
+        this.dataSourceManager = dataSourceManager;
+        this.sink = sink;
+    }
+    
     @Override
     public String getJobId() {
         return jobConfig.getJobId();

Reply via email to