This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new e905a381f2d Persist job progress when start incremental task of CDC 
(#26768)
e905a381f2d is described below

commit e905a381f2d038ce4ce8383818c89c11afa9469b
Author: Xinze Guo <[email protected]>
AuthorDate: Wed Jul 5 15:34:13 2023 +0800

    Persist job progress when start incremental task of CDC (#26768)
---
 .../apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java    | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
index 1d7ee243f9a..b93aa324d3b 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
@@ -23,8 +23,8 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
 import 
org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
-import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext;
 import org.apache.shardingsphere.data.pipeline.cdc.context.CDCJobItemContext;
+import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext;
 import org.apache.shardingsphere.data.pipeline.cdc.core.prepare.CDCJobPreparer;
 import org.apache.shardingsphere.data.pipeline.cdc.core.task.CDCTasksRunner;
 import 
org.apache.shardingsphere.data.pipeline.cdc.yaml.swapper.YamlCDCJobConfigurationSwapper;
@@ -36,6 +36,7 @@ import 
org.apache.shardingsphere.data.pipeline.common.execute.ExecuteEngine;
 import 
org.apache.shardingsphere.data.pipeline.common.ingest.position.FinishedPosition;
 import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
+import 
org.apache.shardingsphere.data.pipeline.common.job.progress.listener.PipelineJobProgressUpdatedParameter;
 import org.apache.shardingsphere.data.pipeline.common.util.CloseUtils;
 import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
 import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
@@ -181,6 +182,9 @@ public final class CDCJob extends AbstractPipelineJob 
implements SimpleJob {
                 futures.addAll(task.start());
             }
         }
+        for (CDCJobItemContext each : jobItemContexts) {
+            each.onProgressUpdated(new PipelineJobProgressUpdatedParameter(0));
+        }
         ExecuteEngine.trigger(futures, new CDCExecuteCallback("incremental", 
jobItemContexts.get(0)));
     }
     

Reply via email to