This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 4442e3d2d1d Refactor AbstractInseparablePipelineJob (#32748)
4442e3d2d1d is described below

commit 4442e3d2d1d66700b689df143a56176132499961
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Aug 31 20:22:53 2024 +0800

    Refactor AbstractInseparablePipelineJob (#32748)
    
    * Refactor AbstractInseparablePipelineJob
    
    * Refactor AbstractInseparablePipelineJob
---
 .../core/job/AbstractInseparablePipelineJob.java    | 21 +++++++++++++++++++--
 .../core/job/AbstractSeparablePipelineJob.java      |  4 +---
 .../shardingsphere/data/pipeline/cdc/CDCJob.java    | 14 ++++----------
 .../data/pipeline/cdc/api/CDCJobAPI.java            |  2 +-
 4 files changed, 25 insertions(+), 16 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
index bf320c580ac..b6d4c28eac5 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
@@ -22,6 +22,7 @@ import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
 import org.apache.shardingsphere.data.pipeline.core.context.TransmissionJobItemContext;
+import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
 import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.finished.IngestFinishedPosition;
@@ -30,8 +31,11 @@ import org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfig
 import org.apache.shardingsphere.data.pipeline.core.job.engine.PipelineJobRunnerManager;
 import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
+import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfigurationUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
+import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
 import org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.PipelineGovernanceFacade;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
 import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
@@ -55,6 +59,19 @@ public abstract class AbstractInseparablePipelineJob<T extends PipelineJobConfig
     
     private final PipelineJobRunnerManager jobRunnerManager;
     
+    private final TransmissionProcessContext jobProcessContext;
+    
+    protected AbstractInseparablePipelineJob(final String jobId, final PipelineJobRunnerManager jobRunnerManager) {
+        this.jobRunnerManager = jobRunnerManager;
+        jobProcessContext = createTransmissionProcessContext(jobId);
+    }
+    
+    private TransmissionProcessContext createTransmissionProcessContext(final String jobId) {
+        PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.fillInDefaultValue(
+                new PipelineProcessConfigurationPersistService().load(PipelineJobIdUtils.parseContextKey(jobId), PipelineJobIdUtils.parseJobType(jobId).getType()));
+        return new TransmissionProcessContext(jobId, processConfig);
+    }
+    
     @SuppressWarnings("unchecked")
     @Override
     public final void execute(final ShardingContext shardingContext) {
@@ -71,7 +88,7 @@ public abstract class AbstractInseparablePipelineJob<T extends PipelineJobConfig
                 return;
             }
             P jobItemProgress = jobItemManager.getProgress(shardingContext.getJobName(), shardingItem).orElse(null);
-            I jobItemContext = buildJobItemContext(jobConfig, shardingItem, jobItemProgress);
+            I jobItemContext = buildJobItemContext(jobConfig, shardingItem, jobItemProgress, jobProcessContext);
             if (!jobRunnerManager.addTasksRunner(shardingItem, buildTasksRunner(jobItemContext))) {
                 continue;
             }
@@ -88,7 +105,7 @@ public abstract class AbstractInseparablePipelineJob<T extends PipelineJobConfig
         executeIncrementalTasks(jobItemContexts, jobItemManager);
     }
     
-    protected abstract I buildJobItemContext(T jobConfig, int shardingItem, P jobItemProgress);
+    protected abstract I buildJobItemContext(T jobConfig, int shardingItem, P jobItemProgress, TransmissionProcessContext jobProcessContext);
     
     protected abstract PipelineTasksRunner buildTasksRunner(I jobItemContext);
     
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
index 9ba500ebd93..5afdc66cb8b 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
@@ -54,8 +54,6 @@ public abstract class AbstractSeparablePipelineJob<T extends PipelineJobConfigur
     
     private final TransmissionProcessContext jobProcessContext;
     
-    private final PipelineProcessConfigurationPersistService processConfigPersistService = new PipelineProcessConfigurationPersistService();
-    
     protected AbstractSeparablePipelineJob(final String jobId) {
         this(jobId, true);
     }
@@ -67,7 +65,7 @@ public abstract class AbstractSeparablePipelineJob<T extends PipelineJobConfigur
     
     private TransmissionProcessContext createTransmissionProcessContext(final String jobId) {
         PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.fillInDefaultValue(
-                processConfigPersistService.load(PipelineJobIdUtils.parseContextKey(jobId), PipelineJobIdUtils.parseJobType(jobId).getType()));
+                new PipelineProcessConfigurationPersistService().load(PipelineJobIdUtils.parseContextKey(jobId), PipelineJobIdUtils.parseJobType(jobId).getType()));
         return new TransmissionProcessContext(jobId, processConfig);
     }
     
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
index 68ef97f5cae..c210fffc0e0 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
@@ -49,9 +49,7 @@ import org.apache.shardingsphere.data.pipeline.core.job.engine.PipelineJobRunner
 import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
-import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfigurationUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineWriteConfiguration;
-import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
 import org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
 import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
 import org.apache.shardingsphere.data.pipeline.core.util.ShardingColumnsExtractor;
@@ -72,23 +70,19 @@ public final class CDCJob extends AbstractInseparablePipelineJob<CDCJobConfigura
     
     private final CDCJobAPI jobAPI = (CDCJobAPI) TypedSPILoader.getService(TransmissionJobAPI.class, "STREAMING");
     
-    private final PipelineProcessConfigurationPersistService processConfigPersistService = new PipelineProcessConfigurationPersistService();
-    
     private final CDCJobPreparer jobPreparer = new CDCJobPreparer();
     
     @Getter
     private final PipelineSink sink;
     
-    public CDCJob(final PipelineSink sink) {
-        super(new PipelineJobRunnerManager(new CDCJobRunnerCleaner(sink)));
+    public CDCJob(final String jobId, final PipelineSink sink) {
+        super(jobId, new PipelineJobRunnerManager(new CDCJobRunnerCleaner(sink)));
         this.sink = sink;
     }
     
     @Override
-    protected CDCJobItemContext buildJobItemContext(final CDCJobConfiguration jobConfig, final int shardingItem, final TransmissionJobItemProgress jobItemProgress) {
-        PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.fillInDefaultValue(
-                processConfigPersistService.load(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()), "STREAMING"));
-        TransmissionProcessContext jobProcessContext = new TransmissionProcessContext(jobConfig.getJobId(), processConfig);
+    protected CDCJobItemContext buildJobItemContext(final CDCJobConfiguration jobConfig,
+                                                    final int shardingItem, final TransmissionJobItemProgress jobItemProgress, final TransmissionProcessContext jobProcessContext) {
         CDCTaskConfiguration taskConfig = buildTaskConfiguration(jobConfig, shardingItem, jobProcessContext.getProcessConfiguration());
         return new CDCJobItemContext(jobConfig, shardingItem, jobItemProgress, jobProcessContext, taskConfig, getJobRunnerManager().getDataSourceManager(), sink);
     }
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
index 2e9d6187cf8..805a15c1666 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
@@ -223,7 +223,7 @@ public final class CDCJobAPI implements TransmissionJobAPI {
      * @param sink sink
      */
     public void start(final String jobId, final PipelineSink sink) {
-        CDCJob job = new CDCJob(sink);
+        CDCJob job = new CDCJob(jobId, sink);
         PipelineJobRegistry.add(jobId, job);
         enable(jobId);
         JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);

Reply via email to