This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new e905a381f2d Persist job progress when start incremental task of CDC (#26768)
e905a381f2d is described below
commit e905a381f2d038ce4ce8383818c89c11afa9469b
Author: Xinze Guo <[email protected]>
AuthorDate: Wed Jul 5 15:34:13 2023 +0800
Persist job progress when start incremental task of CDC (#26768)
---
.../apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java | 6 +++++-
1 file changed, 5 insertions(+), 1 deletion(-)
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
index 1d7ee243f9a..b93aa324d3b 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
@@ -23,8 +23,8 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
import
org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
-import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext;
import org.apache.shardingsphere.data.pipeline.cdc.context.CDCJobItemContext;
+import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext;
import org.apache.shardingsphere.data.pipeline.cdc.core.prepare.CDCJobPreparer;
import org.apache.shardingsphere.data.pipeline.cdc.core.task.CDCTasksRunner;
import
org.apache.shardingsphere.data.pipeline.cdc.yaml.swapper.YamlCDCJobConfigurationSwapper;
@@ -36,6 +36,7 @@ import
org.apache.shardingsphere.data.pipeline.common.execute.ExecuteEngine;
import
org.apache.shardingsphere.data.pipeline.common.ingest.position.FinishedPosition;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
+import
org.apache.shardingsphere.data.pipeline.common.job.progress.listener.PipelineJobProgressUpdatedParameter;
import org.apache.shardingsphere.data.pipeline.common.util.CloseUtils;
import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
@@ -181,6 +182,9 @@ public final class CDCJob extends AbstractPipelineJob implements SimpleJob {
futures.addAll(task.start());
}
}
+ for (CDCJobItemContext each : jobItemContexts) {
+ each.onProgressUpdated(new PipelineJobProgressUpdatedParameter(0));
+ }
ExecuteEngine.trigger(futures, new CDCExecuteCallback("incremental",
jobItemContexts.get(0)));
}