This is an automated email from the ASF dual-hosted git repository.

zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 750aa707f3d Refactor AbstractInseparablePipelineJob (#32746)
750aa707f3d is described below

commit 750aa707f3d342f4ba96ec7aa2cfc0c85d53b39d
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Aug 31 15:24:55 2024 +0800

    Refactor AbstractInseparablePipelineJob (#32746)
---
 .../core/job/AbstractInseparablePipelineJob.java   | 26 +++++++++++++---------
 .../shardingsphere/data/pipeline/cdc/CDCJob.java   | 10 +++------
 2 files changed, 18 insertions(+), 18 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
index 48aec69d15e..d1949182d9b 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
@@ -29,6 +29,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.job.engine.PipelineJobRunnerManager;
 import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
+import 
org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
 import 
org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.PipelineGovernanceFacade;
@@ -45,11 +46,12 @@ import java.util.concurrent.CompletableFuture;
  *
  * @param <T> type of pipeline job configuration
  * @param <I> type of pipeline job item context
+ * @param <P> type of pipeline job item progress
  */
 @RequiredArgsConstructor
 @Getter
 @Slf4j
-public abstract class AbstractInseparablePipelineJob<T extends 
PipelineJobConfiguration, I extends PipelineJobItemContext> implements 
PipelineJob {
+public abstract class AbstractInseparablePipelineJob<T extends 
PipelineJobConfiguration, I extends PipelineJobItemContext, P extends 
PipelineJobItemProgress> implements PipelineJob {
     
     private final PipelineJobRunnerManager jobRunnerManager;
     
@@ -65,9 +67,11 @@ public abstract class AbstractInseparablePipelineJob<T 
extends PipelineJobConfig
         PipelineJobType jobType = PipelineJobIdUtils.parseJobType(jobId);
         T jobConfig = (T) 
jobType.getYamlJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
         Collection<I> jobItemContexts = new LinkedList<>();
+        PipelineJobItemManager<P> jobItemManager = new 
PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
         PipelineGovernanceFacade governanceFacade = 
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId));
         for (int shardingItem = 0; shardingItem < 
jobConfig.getJobShardingCount(); shardingItem++) {
-            I jobItemContext = buildJobItemContext(jobConfig, shardingItem);
+            P jobItemProgress = 
jobItemManager.getProgress(shardingContext.getJobName(), 
shardingItem).orElse(null);
+            I jobItemContext = buildJobItemContext(jobConfig, shardingItem, 
jobItemProgress);
             if (!jobRunnerManager.addTasksRunner(shardingItem, 
buildTasksRunner(jobItemContext))) {
                 continue;
             }
@@ -80,11 +84,11 @@ public abstract class AbstractInseparablePipelineJob<T 
extends PipelineJobConfig
             return;
         }
         prepare(jobItemContexts, governanceFacade);
-        executeInventoryTasks(jobType, jobItemContexts);
-        executeIncrementalTasks(jobType, jobItemContexts);
+        executeInventoryTasks(jobItemContexts, jobItemManager);
+        executeIncrementalTasks(jobItemContexts, jobItemManager);
     }
     
-    protected abstract I buildJobItemContext(T jobConfig, int shardingItem);
+    protected abstract I buildJobItemContext(T jobConfig, int shardingItem, P 
jobItemProgress);
     
     protected abstract PipelineTasksRunner buildTasksRunner(I jobItemContext);
     
@@ -112,10 +116,10 @@ public abstract class AbstractInseparablePipelineJob<T 
extends PipelineJobConfig
     
     protected abstract void processFailed(String jobId);
     
-    private void executeInventoryTasks(final PipelineJobType jobType, final 
Collection<I> jobItemContexts) {
+    private void executeInventoryTasks(final Collection<I> jobItemContexts, 
final PipelineJobItemManager<P> jobItemManager) {
         Collection<CompletableFuture<?>> futures = new LinkedList<>();
         for (I each : jobItemContexts) {
-            updateJobItemStatus(each, jobType, 
JobStatus.EXECUTE_INVENTORY_TASK);
+            updateJobItemStatus(each, JobStatus.EXECUTE_INVENTORY_TASK, 
jobItemManager);
             for (PipelineTask task : ((TransmissionJobItemContext) 
each).getInventoryTasks()) {
                 if (task.getTaskProgress().getPosition() instanceof 
IngestFinishedPosition) {
                     continue;
@@ -129,14 +133,14 @@ public abstract class AbstractInseparablePipelineJob<T 
extends PipelineJobConfig
         ExecuteEngine.trigger(futures, buildExecuteCallback("inventory", 
jobItemContexts.iterator().next()));
     }
     
-    private void executeIncrementalTasks(final PipelineJobType jobType, final 
Collection<I> jobItemContexts) {
+    private void executeIncrementalTasks(final Collection<I> jobItemContexts, 
final PipelineJobItemManager<P> jobItemManager) {
         Collection<CompletableFuture<?>> futures = new LinkedList<>();
         for (I each : jobItemContexts) {
             if (JobStatus.EXECUTE_INCREMENTAL_TASK == each.getStatus()) {
                 log.info("Job status has already EXECUTE_INCREMENTAL_TASK, 
ignore.");
                 return;
             }
-            updateJobItemStatus(each, jobType, 
JobStatus.EXECUTE_INCREMENTAL_TASK);
+            updateJobItemStatus(each, JobStatus.EXECUTE_INCREMENTAL_TASK, 
jobItemManager);
             for (PipelineTask task : ((TransmissionJobItemContext) 
each).getIncrementalTasks()) {
                 if (task.getTaskProgress().getPosition() instanceof 
IngestFinishedPosition) {
                     continue;
@@ -147,9 +151,9 @@ public abstract class AbstractInseparablePipelineJob<T 
extends PipelineJobConfig
         ExecuteEngine.trigger(futures, buildExecuteCallback("incremental", 
jobItemContexts.iterator().next()));
     }
     
-    private void updateJobItemStatus(final I jobItemContext, final 
PipelineJobType jobType, final JobStatus jobStatus) {
+    private void updateJobItemStatus(final I jobItemContext, final JobStatus 
jobStatus, final PipelineJobItemManager<P> jobItemManager) {
         jobItemContext.setStatus(jobStatus);
-        new 
PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper()).updateStatus(jobItemContext.getJobId(),
 jobItemContext.getShardingItem(), jobStatus);
+        jobItemManager.updateStatus(jobItemContext.getJobId(), 
jobItemContext.getShardingItem(), jobStatus);
     }
     
     protected abstract ExecuteCallback buildExecuteCallback(String identifier, 
I jobItemContext);
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
index 64c6c8952b8..68ef97f5cae 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
@@ -51,7 +51,6 @@ import 
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJob
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfigurationUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineWriteConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
 import 
org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
 import 
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
@@ -69,12 +68,10 @@ import java.util.stream.Collectors;
  * CDC job.
  */
 @Slf4j
-public final class CDCJob extends 
AbstractInseparablePipelineJob<CDCJobConfiguration, CDCJobItemContext> {
+public final class CDCJob extends 
AbstractInseparablePipelineJob<CDCJobConfiguration, CDCJobItemContext, 
TransmissionJobItemProgress> {
     
     private final CDCJobAPI jobAPI = (CDCJobAPI) 
TypedSPILoader.getService(TransmissionJobAPI.class, "STREAMING");
     
-    private final PipelineJobItemManager<TransmissionJobItemProgress> 
jobItemManager = new PipelineJobItemManager<>(new 
CDCJobType().getYamlJobItemProgressSwapper());
-    
     private final PipelineProcessConfigurationPersistService 
processConfigPersistService = new PipelineProcessConfigurationPersistService();
     
     private final CDCJobPreparer jobPreparer = new CDCJobPreparer();
@@ -88,13 +85,12 @@ public final class CDCJob extends 
AbstractInseparablePipelineJob<CDCJobConfigura
     }
     
     @Override
-    protected CDCJobItemContext buildJobItemContext(final CDCJobConfiguration 
jobConfig, final int shardingItem) {
-        Optional<TransmissionJobItemProgress> initProgress = 
jobItemManager.getProgress(jobConfig.getJobId(), shardingItem);
+    protected CDCJobItemContext buildJobItemContext(final CDCJobConfiguration 
jobConfig, final int shardingItem, final TransmissionJobItemProgress 
jobItemProgress) {
         PipelineProcessConfiguration processConfig = 
PipelineProcessConfigurationUtils.fillInDefaultValue(
                 
processConfigPersistService.load(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()),
 "STREAMING"));
         TransmissionProcessContext jobProcessContext = new 
TransmissionProcessContext(jobConfig.getJobId(), processConfig);
         CDCTaskConfiguration taskConfig = buildTaskConfiguration(jobConfig, 
shardingItem, jobProcessContext.getProcessConfiguration());
-        return new CDCJobItemContext(jobConfig, shardingItem, 
initProgress.orElse(null), jobProcessContext, taskConfig, 
getJobRunnerManager().getDataSourceManager(), sink);
+        return new CDCJobItemContext(jobConfig, shardingItem, jobItemProgress, 
jobProcessContext, taskConfig, getJobRunnerManager().getDataSourceManager(), 
sink);
     }
     
     private CDCTaskConfiguration buildTaskConfiguration(final 
CDCJobConfiguration jobConfig, final int jobShardingItem, final 
PipelineProcessConfiguration processConfig) {

Reply via email to