This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new f93d8bfdc7b Fix CDC processedRecordCount statistics (#29048)
f93d8bfdc7b is described below
commit f93d8bfdc7bdad468a5bff3f99a3df071619ee8b
Author: Xinze Guo <[email protected]>
AuthorDate: Thu Nov 16 11:12:22 2023 +0800
Fix CDC processedRecordCount statistics (#29048)
---
.../data/pipeline/cdc/context/CDCJobItemContext.java | 19 ++++++++++++++++---
1 file changed, 16 insertions(+), 3 deletions(-)
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java
index 11f5dc38b00..16577032877 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/context/CDCJobItemContext.java
@@ -18,12 +18,10 @@
package org.apache.shardingsphere.data.pipeline.cdc.context;
import lombok.Getter;
-import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.SneakyThrows;
import org.apache.commons.lang3.concurrent.ConcurrentException;
import org.apache.commons.lang3.concurrent.LazyInitializer;
-import
org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineTableMetaDataLoader;
import
org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
import
org.apache.shardingsphere.data.pipeline.common.context.InventoryIncrementalJobItemContext;
@@ -32,6 +30,7 @@ import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSou
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.listener.PipelineJobProgressUpdatedParameter;
+import
org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineTableMetaDataLoader;
import
org.apache.shardingsphere.data.pipeline.common.metadata.loader.StandardPipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
@@ -44,7 +43,6 @@ import java.util.concurrent.atomic.AtomicLong;
/**
* CDC job item context.
*/
-@RequiredArgsConstructor
@Getter
public final class CDCJobItemContext implements
InventoryIncrementalJobItemContext {
@@ -92,6 +90,21 @@ public final class CDCJobItemContext implements
InventoryIncrementalJobItemConte
}
};
+ public CDCJobItemContext(final CDCJobConfiguration jobConfig, final int shardingItem, final InventoryIncrementalJobItemProgress initProgress, final CDCProcessContext jobProcessContext,
+ final CDCTaskConfiguration taskConfig, final PipelineDataSourceManager dataSourceManager, final PipelineSink sink) {
+ this.jobConfig = jobConfig;
+ this.shardingItem = shardingItem;
+ this.initProgress = initProgress;
+ if (null != initProgress) {
+ processedRecordsCount.set(initProgress.getProcessedRecordsCount());
+ inventoryRecordsCount.set(initProgress.getInventoryRecordsCount());
+ }
+ this.jobProcessContext = jobProcessContext;
+ this.taskConfig = taskConfig;
+ this.dataSourceManager = dataSourceManager;
+ this.sink = sink;
+ }
+
@Override
public String getJobId() {
return jobConfig.getJobId();