This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new ee50359dbd8 Refactor CDCJob (#29338)
ee50359dbd8 is described below

commit ee50359dbd8bee8b5a8d767ae4b55dc080eb5aba
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Dec 9 16:21:52 2023 +0800

    Refactor CDCJob (#29338)
    
    * Refactor AbstractInseparablePipelineJob
    
    * Refactor CDCJob
---
 .../core/job/AbstractInseparablePipelineJob.java   | 32 ++++++------
 .../shardingsphere/data/pipeline/cdc/CDCJob.java   | 57 ++++++++++------------
 2 files changed, 40 insertions(+), 49 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
index 845d3605f0a..157fd259845 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
@@ -51,23 +51,23 @@ public abstract class AbstractInseparablePipelineJob extends AbstractPipelineJob
     public final void execute(final ShardingContext shardingContext) {
         String jobId = shardingContext.getJobName();
         log.info("Execute job {}", jobId);
-        PipelineJobConfiguration jobConfig = 
getJobConfiguration(shardingContext);
+        PipelineJobConfiguration jobConfig = 
getJobType().getYamlJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
         Collection<PipelineJobItemContext> jobItemContexts = new 
LinkedList<>();
         for (int shardingItem = 0; shardingItem < 
jobConfig.getJobShardingCount(); shardingItem++) {
             if (isStopping()) {
-                log.info("stopping true, ignore");
+                log.info("Stopping true, ignore");
                 return;
             }
-            PipelineJobItemContext jobItemContext = 
buildPipelineJobItemContext(jobConfig, shardingItem);
-            if (!addTasksRunner(shardingItem, 
buildPipelineTasksRunner(jobItemContext))) {
+            PipelineJobItemContext jobItemContext = 
buildJobItemContext(jobConfig, shardingItem);
+            if (!addTasksRunner(shardingItem, 
buildTasksRunner(jobItemContext))) {
                 continue;
             }
             jobItemContexts.add(jobItemContext);
             
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().clean(jobId,
 shardingItem);
-            log.info("start tasks runner, jobId={}, shardingItem={}", jobId, 
shardingItem);
+            log.info("Start tasks runner, jobId={}, shardingItem={}", jobId, 
shardingItem);
         }
         if (jobItemContexts.isEmpty()) {
-            log.warn("job item contexts empty, ignore");
+            log.warn("Job item contexts is empty, ignore");
             return;
         }
         prepare(jobItemContexts);
@@ -75,15 +75,13 @@ public abstract class AbstractInseparablePipelineJob extends AbstractPipelineJob
         executeIncrementalTasks(jobItemContexts);
     }
     
-    protected abstract PipelineJobConfiguration 
getJobConfiguration(ShardingContext shardingContext);
+    protected abstract PipelineJobItemContext 
buildJobItemContext(PipelineJobConfiguration jobConfig, int shardingItem);
     
-    protected abstract PipelineJobItemContext 
buildPipelineJobItemContext(PipelineJobConfiguration jobConfig, int 
shardingItem);
-    
-    protected abstract PipelineTasksRunner 
buildPipelineTasksRunner(PipelineJobItemContext pipelineJobItemContext);
+    protected abstract PipelineTasksRunner 
buildTasksRunner(PipelineJobItemContext jobItemContext);
     
     private void prepare(final Collection<PipelineJobItemContext> 
jobItemContexts) {
         try {
-            doPrepare0(jobItemContexts);
+            doPrepare(jobItemContexts);
             // CHECKSTYLE:OFF
         } catch (final RuntimeException ex) {
             // CHECKSTYLE:ON
@@ -94,10 +92,10 @@ public abstract class AbstractInseparablePipelineJob extends AbstractPipelineJob
         }
     }
     
-    protected abstract void doPrepare0(Collection<PipelineJobItemContext> 
jobItemContexts);
+    protected abstract void doPrepare(Collection<PipelineJobItemContext> 
jobItemContexts);
     
     private void processFailed(final String jobId, final int shardingItem, 
final Exception ex) {
-        log.error("job execution failed, {}-{}", jobId, shardingItem, ex);
+        log.error("Job execution failed, {}-{}", jobId, shardingItem, ex);
         
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().update(jobId,
 shardingItem, ex);
         PipelineJobRegistry.stop(jobId);
         processFailed(jobId);
@@ -108,7 +106,7 @@ public abstract class AbstractInseparablePipelineJob extends AbstractPipelineJob
     private void executeInventoryTasks(final 
Collection<PipelineJobItemContext> jobItemContexts) {
         Collection<CompletableFuture<?>> futures = new LinkedList<>();
         for (PipelineJobItemContext each : jobItemContexts) {
-            updateLocalAndRemoteJobItemStatus(each, 
JobStatus.EXECUTE_INVENTORY_TASK);
+            updateJobItemStatus(each, JobStatus.EXECUTE_INVENTORY_TASK);
             for (PipelineTask task : ((TransmissionJobItemContext) 
each).getInventoryTasks()) {
                 if (task.getTaskProgress().getPosition() instanceof 
FinishedPosition) {
                     continue;
@@ -124,20 +122,20 @@ public abstract class AbstractInseparablePipelineJob extends AbstractPipelineJob
     
     protected abstract void 
executeInventoryTasks(Collection<CompletableFuture<?>> futures, 
Collection<PipelineJobItemContext> jobItemContexts);
     
-    private void updateLocalAndRemoteJobItemStatus(final 
PipelineJobItemContext jobItemContext, final JobStatus jobStatus) {
+    private void updateJobItemStatus(final PipelineJobItemContext 
jobItemContext, final JobStatus jobStatus) {
         jobItemContext.setStatus(jobStatus);
         jobItemManager.updateStatus(jobItemContext.getJobId(), 
jobItemContext.getShardingItem(), jobStatus);
     }
     
     private void executeIncrementalTasks(final 
Collection<PipelineJobItemContext> jobItemContexts) {
-        log.info("execute incremental tasks, jobId={}", getJobId());
+        log.info("Execute incremental tasks, jobId={}", getJobId());
         Collection<CompletableFuture<?>> futures = new LinkedList<>();
         for (PipelineJobItemContext each : jobItemContexts) {
             if (JobStatus.EXECUTE_INCREMENTAL_TASK == each.getStatus()) {
                 log.info("job status already EXECUTE_INCREMENTAL_TASK, 
ignore");
                 return;
             }
-            updateLocalAndRemoteJobItemStatus(each, 
JobStatus.EXECUTE_INCREMENTAL_TASK);
+            updateJobItemStatus(each, JobStatus.EXECUTE_INCREMENTAL_TASK);
             for (PipelineTask task : ((TransmissionJobItemContext) 
each).getIncrementalTasks()) {
                 if (task.getTaskProgress().getPosition() instanceof 
FinishedPosition) {
                     continue;
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
index 3150895b804..81cf62b827e 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
@@ -25,7 +25,6 @@ import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSour
 import org.apache.shardingsphere.data.pipeline.cdc.api.CDCJobAPI;
 import 
org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.cdc.config.yaml.swapper.YamlCDCJobConfigurationSwapper;
 import org.apache.shardingsphere.data.pipeline.cdc.context.CDCJobItemContext;
 import 
org.apache.shardingsphere.data.pipeline.cdc.core.importer.sink.CDCSocketSink;
 import org.apache.shardingsphere.data.pipeline.cdc.core.prepare.CDCJobPreparer;
@@ -55,13 +54,11 @@ import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJob
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfigurationUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
-import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveIdentifier;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
 import 
org.apache.shardingsphere.data.pipeline.core.spi.algorithm.JobRateLimitAlgorithm;
 import 
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
 import 
org.apache.shardingsphere.data.pipeline.core.util.ShardingColumnsExtractor;
-import org.apache.shardingsphere.elasticjob.api.ShardingContext;
 import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
@@ -82,55 +79,42 @@ public final class CDCJob extends AbstractInseparablePipelineJob implements Simp
     @Getter
     private final PipelineSink sink;
     
-    private final PipelineJobType jobType = 
TypedSPILoader.getService(PipelineJobType.class, "STREAMING");
+    private final CDCJobAPI jobAPI;
     
-    private final CDCJobAPI jobAPI = (CDCJobAPI) 
TypedSPILoader.getService(TransmissionJobAPI.class, "STREAMING");
+    private final PipelineJobItemManager<TransmissionJobItemProgress> 
jobItemManager;
     
-    private final PipelineJobItemManager<TransmissionJobItemProgress> 
jobItemManager = new 
PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
+    private final PipelineProcessConfigurationPersistService 
processConfigPersistService;
     
-    private final PipelineProcessConfigurationPersistService 
processConfigPersistService = new PipelineProcessConfigurationPersistService();
+    private final PipelineDataSourceManager dataSourceManager;
     
-    private final CDCJobPreparer jobPreparer = new CDCJobPreparer();
-    
-    private final PipelineDataSourceManager dataSourceManager = new 
DefaultPipelineDataSourceManager();
+    private final CDCJobPreparer jobPreparer;
     
     public CDCJob(final String jobId, final PipelineSink sink) {
         super(jobId);
         this.sink = sink;
+        jobAPI = (CDCJobAPI) 
TypedSPILoader.getService(TransmissionJobAPI.class, "STREAMING");
+        jobItemManager = new 
PipelineJobItemManager<>(getJobType().getYamlJobItemProgressSwapper());
+        processConfigPersistService = new 
PipelineProcessConfigurationPersistService();
+        dataSourceManager = new DefaultPipelineDataSourceManager();
+        jobPreparer = new CDCJobPreparer();
     }
     
     @Override
-    protected PipelineJobConfiguration getJobConfiguration(final 
ShardingContext shardingContext) {
-        return new 
YamlCDCJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
-    }
-    
-    @Override
-    protected PipelineJobItemContext buildPipelineJobItemContext(final 
PipelineJobConfiguration jobConfig, final int shardingItem) {
+    protected PipelineJobItemContext buildJobItemContext(final 
PipelineJobConfiguration jobConfig, final int shardingItem) {
         Optional<TransmissionJobItemProgress> initProgress = 
jobItemManager.getProgress(jobConfig.getJobId(), shardingItem);
         PipelineProcessConfiguration processConfig = 
PipelineProcessConfigurationUtils.convertWithDefaultValue(
-                
processConfigPersistService.load(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()),
 jobType.getType()));
+                
processConfigPersistService.load(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()),
 getJobType().getType()));
         TransmissionProcessContext jobProcessContext = new 
TransmissionProcessContext(jobConfig.getJobId(), processConfig);
         CDCTaskConfiguration taskConfig = 
buildTaskConfiguration((CDCJobConfiguration) jobConfig, shardingItem, 
jobProcessContext.getPipelineProcessConfig());
+        log.debug("buildTaskConfiguration, result={}", taskConfig);
         return new CDCJobItemContext((CDCJobConfiguration) jobConfig, 
shardingItem, initProgress.orElse(null), jobProcessContext, taskConfig, 
dataSourceManager, sink);
     }
     
-    @Override
-    protected PipelineTasksRunner buildPipelineTasksRunner(final 
PipelineJobItemContext jobItemContext) {
-        return new CDCTasksRunner((CDCJobItemContext) jobItemContext);
-    }
-    
-    @Override
-    protected void doPrepare0(final Collection<PipelineJobItemContext> 
jobItemContexts) {
-        jobPreparer.initTasks(jobItemContexts.stream().map(each -> 
(CDCJobItemContext) each).collect(Collectors.toList()));
-    }
-    
     private CDCTaskConfiguration buildTaskConfiguration(final 
CDCJobConfiguration jobConfig, final int jobShardingItem, final 
PipelineProcessConfiguration processConfig) {
         TableAndSchemaNameMapper tableAndSchemaNameMapper = new 
TableAndSchemaNameMapper(jobConfig.getSchemaTableNames());
         IncrementalDumperContext dumperContext = buildDumperContext(jobConfig, 
jobShardingItem, tableAndSchemaNameMapper);
         ImporterConfiguration importerConfig = 
buildImporterConfiguration(jobConfig, processConfig, 
jobConfig.getSchemaTableNames(), tableAndSchemaNameMapper);
-        CDCTaskConfiguration result = new CDCTaskConfiguration(dumperContext, 
importerConfig);
-        log.debug("buildTaskConfiguration, result={}", result);
-        return result;
+        return new CDCTaskConfiguration(dumperContext, importerConfig);
     }
     
     private IncrementalDumperContext buildDumperContext(final 
CDCJobConfiguration jobConfig, final int jobShardingItem, final 
TableAndSchemaNameMapper tableAndSchemaNameMapper) {
@@ -146,14 +130,23 @@ public final class CDCJob extends AbstractInseparablePipelineJob implements Simp
                                                              final 
TableAndSchemaNameMapper tableAndSchemaNameMapper) {
         PipelineDataSourceConfiguration dataSourceConfig = 
PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getDataSourceConfig().getType(),
                 jobConfig.getDataSourceConfig().getParameter());
-        TransmissionProcessContext processContext = new 
TransmissionProcessContext(jobConfig.getJobId(), pipelineProcessConfig);
-        JobRateLimitAlgorithm writeRateLimitAlgorithm = 
processContext.getWriteRateLimitAlgorithm();
+        JobRateLimitAlgorithm writeRateLimitAlgorithm = new 
TransmissionProcessContext(jobConfig.getJobId(), 
pipelineProcessConfig).getWriteRateLimitAlgorithm();
         int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
         Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap = new 
ShardingColumnsExtractor()
                 
.getShardingColumnsMap(jobConfig.getDataSourceConfig().getRootConfig().getRules(),
 
schemaTableNames.stream().map(CaseInsensitiveIdentifier::new).collect(Collectors.toSet()));
         return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap, 
tableAndSchemaNameMapper, batchSize, writeRateLimitAlgorithm, 0, 1);
     }
     
+    @Override
+    protected PipelineTasksRunner buildTasksRunner(final 
PipelineJobItemContext jobItemContext) {
+        return new CDCTasksRunner((CDCJobItemContext) jobItemContext);
+    }
+    
+    @Override
+    protected void doPrepare(final Collection<PipelineJobItemContext> 
jobItemContexts) {
+        jobPreparer.initTasks(jobItemContexts.stream().map(each -> 
(CDCJobItemContext) each).collect(Collectors.toList()));
+    }
+    
     @Override
     protected void processFailed(final String jobId) {
         jobAPI.disable(jobId);

Reply via email to