This is an automated email from the ASF dual-hosted git repository.

sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ada091156d Add generic type of AbstractInseparablePipelineJob (#29340)
8ada091156d is described below

commit 8ada091156d815cb6917223f7715a38b9411d471
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Dec 9 18:08:33 2023 +0800

    Add generic type of AbstractInseparablePipelineJob (#29340)
---
 .../core/job/AbstractInseparablePipelineJob.java   | 30 +++++++-------
 .../shardingsphere/data/pipeline/cdc/CDCJob.java   | 47 ++++++++++------------
 2 files changed, 38 insertions(+), 39 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
index 157fd259845..9dcde50e8e2 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
@@ -36,9 +36,11 @@ import java.util.concurrent.CompletableFuture;
 
 /**
  * Abstract inseparable pipeline job.
+ * 
+ * @param <T> type of pipeline job item context
  */
 @Slf4j
-public abstract class AbstractInseparablePipelineJob extends 
AbstractPipelineJob {
+public abstract class AbstractInseparablePipelineJob<T extends 
PipelineJobItemContext> extends AbstractPipelineJob {
     
     private final PipelineJobItemManager<TransmissionJobItemProgress> 
jobItemManager;
     
@@ -52,13 +54,13 @@ public abstract class AbstractInseparablePipelineJob extends AbstractPipelineJob
         String jobId = shardingContext.getJobName();
         log.info("Execute job {}", jobId);
         PipelineJobConfiguration jobConfig = 
getJobType().getYamlJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
-        Collection<PipelineJobItemContext> jobItemContexts = new 
LinkedList<>();
+        Collection<T> jobItemContexts = new LinkedList<>();
         for (int shardingItem = 0; shardingItem < 
jobConfig.getJobShardingCount(); shardingItem++) {
             if (isStopping()) {
                 log.info("Stopping true, ignore");
                 return;
             }
-            PipelineJobItemContext jobItemContext = 
buildJobItemContext(jobConfig, shardingItem);
+            T jobItemContext = buildJobItemContext(jobConfig, shardingItem);
             if (!addTasksRunner(shardingItem, 
buildTasksRunner(jobItemContext))) {
                 continue;
             }
@@ -75,11 +77,11 @@ public abstract class AbstractInseparablePipelineJob extends AbstractPipelineJob
         executeIncrementalTasks(jobItemContexts);
     }
     
-    protected abstract PipelineJobItemContext 
buildJobItemContext(PipelineJobConfiguration jobConfig, int shardingItem);
+    protected abstract T buildJobItemContext(PipelineJobConfiguration 
jobConfig, int shardingItem);
     
-    protected abstract PipelineTasksRunner 
buildTasksRunner(PipelineJobItemContext jobItemContext);
+    protected abstract PipelineTasksRunner buildTasksRunner(T jobItemContext);
     
-    private void prepare(final Collection<PipelineJobItemContext> 
jobItemContexts) {
+    private void prepare(final Collection<T> jobItemContexts) {
         try {
             doPrepare(jobItemContexts);
             // CHECKSTYLE:OFF
@@ -92,7 +94,7 @@ public abstract class AbstractInseparablePipelineJob extends AbstractPipelineJob
         }
     }
     
-    protected abstract void doPrepare(Collection<PipelineJobItemContext> 
jobItemContexts);
+    protected abstract void doPrepare(Collection<T> jobItemContexts);
     
     private void processFailed(final String jobId, final int shardingItem, 
final Exception ex) {
         log.error("Job execution failed, {}-{}", jobId, shardingItem, ex);
@@ -103,9 +105,9 @@ public abstract class AbstractInseparablePipelineJob extends AbstractPipelineJob
     
     protected abstract void processFailed(String jobId);
     
-    private void executeInventoryTasks(final 
Collection<PipelineJobItemContext> jobItemContexts) {
+    private void executeInventoryTasks(final Collection<T> jobItemContexts) {
         Collection<CompletableFuture<?>> futures = new LinkedList<>();
-        for (PipelineJobItemContext each : jobItemContexts) {
+        for (T each : jobItemContexts) {
             updateJobItemStatus(each, JobStatus.EXECUTE_INVENTORY_TASK);
             for (PipelineTask task : ((TransmissionJobItemContext) 
each).getInventoryTasks()) {
                 if (task.getTaskProgress().getPosition() instanceof 
FinishedPosition) {
@@ -120,17 +122,17 @@ public abstract class AbstractInseparablePipelineJob extends AbstractPipelineJob
         executeInventoryTasks(futures, jobItemContexts);
     }
     
-    protected abstract void 
executeInventoryTasks(Collection<CompletableFuture<?>> futures, 
Collection<PipelineJobItemContext> jobItemContexts);
+    protected abstract void 
executeInventoryTasks(Collection<CompletableFuture<?>> futures, Collection<T> 
jobItemContexts);
     
-    private void updateJobItemStatus(final PipelineJobItemContext 
jobItemContext, final JobStatus jobStatus) {
+    private void updateJobItemStatus(final T jobItemContext, final JobStatus 
jobStatus) {
         jobItemContext.setStatus(jobStatus);
         jobItemManager.updateStatus(jobItemContext.getJobId(), 
jobItemContext.getShardingItem(), jobStatus);
     }
     
-    private void executeIncrementalTasks(final 
Collection<PipelineJobItemContext> jobItemContexts) {
+    private void executeIncrementalTasks(final Collection<T> jobItemContexts) {
         log.info("Execute incremental tasks, jobId={}", getJobId());
         Collection<CompletableFuture<?>> futures = new LinkedList<>();
-        for (PipelineJobItemContext each : jobItemContexts) {
+        for (T each : jobItemContexts) {
             if (JobStatus.EXECUTE_INCREMENTAL_TASK == each.getStatus()) {
                 log.info("job status already EXECUTE_INCREMENTAL_TASK, 
ignore");
                 return;
@@ -146,5 +148,5 @@ public abstract class AbstractInseparablePipelineJob extends AbstractPipelineJob
         executeIncrementalTasks(futures, jobItemContexts);
     }
     
-    protected abstract void 
executeIncrementalTasks(Collection<CompletableFuture<?>> futures, 
Collection<PipelineJobItemContext> jobItemContexts);
+    protected abstract void 
executeIncrementalTasks(Collection<CompletableFuture<?>> futures, Collection<T> 
jobItemContexts);
 }
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
index 81cf62b827e..009ea5df9f6 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
@@ -30,7 +30,6 @@ import org.apache.shardingsphere.data.pipeline.cdc.core.importer.sink.CDCSocketS
 import org.apache.shardingsphere.data.pipeline.cdc.core.prepare.CDCJobPreparer;
 import org.apache.shardingsphere.data.pipeline.cdc.core.task.CDCTasksRunner;
 import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseUtils;
-import 
org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
 import 
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
 import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLine;
 import 
org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLineConvertUtils;
@@ -74,7 +73,7 @@ import java.util.stream.Collectors;
  * CDC job.
  */
 @Slf4j
-public final class CDCJob extends AbstractInseparablePipelineJob implements 
SimpleJob {
+public final class CDCJob extends 
AbstractInseparablePipelineJob<CDCJobItemContext> implements SimpleJob {
     
     @Getter
     private final PipelineSink sink;
@@ -100,51 +99,49 @@ public final class CDCJob extends AbstractInseparablePipelineJob implements Simp
     }
     
     @Override
-    protected PipelineJobItemContext buildJobItemContext(final 
PipelineJobConfiguration jobConfig, final int shardingItem) {
+    protected CDCJobItemContext buildJobItemContext(final 
PipelineJobConfiguration jobConfig, final int shardingItem) {
         Optional<TransmissionJobItemProgress> initProgress = 
jobItemManager.getProgress(jobConfig.getJobId(), shardingItem);
         PipelineProcessConfiguration processConfig = 
PipelineProcessConfigurationUtils.convertWithDefaultValue(
                 
processConfigPersistService.load(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()),
 getJobType().getType()));
         TransmissionProcessContext jobProcessContext = new 
TransmissionProcessContext(jobConfig.getJobId(), processConfig);
         CDCTaskConfiguration taskConfig = 
buildTaskConfiguration((CDCJobConfiguration) jobConfig, shardingItem, 
jobProcessContext.getPipelineProcessConfig());
-        log.debug("buildTaskConfiguration, result={}", taskConfig);
         return new CDCJobItemContext((CDCJobConfiguration) jobConfig, 
shardingItem, initProgress.orElse(null), jobProcessContext, taskConfig, 
dataSourceManager, sink);
     }
     
     private CDCTaskConfiguration buildTaskConfiguration(final 
CDCJobConfiguration jobConfig, final int jobShardingItem, final 
PipelineProcessConfiguration processConfig) {
-        TableAndSchemaNameMapper tableAndSchemaNameMapper = new 
TableAndSchemaNameMapper(jobConfig.getSchemaTableNames());
-        IncrementalDumperContext dumperContext = buildDumperContext(jobConfig, 
jobShardingItem, tableAndSchemaNameMapper);
-        ImporterConfiguration importerConfig = 
buildImporterConfiguration(jobConfig, processConfig, 
jobConfig.getSchemaTableNames(), tableAndSchemaNameMapper);
+        TableAndSchemaNameMapper mapper = new 
TableAndSchemaNameMapper(jobConfig.getSchemaTableNames());
+        IncrementalDumperContext dumperContext = buildDumperContext(jobConfig, 
jobShardingItem, mapper);
+        ImporterConfiguration importerConfig = 
buildImporterConfiguration(jobConfig, processConfig, 
jobConfig.getSchemaTableNames(), mapper);
         return new CDCTaskConfiguration(dumperContext, importerConfig);
     }
     
-    private IncrementalDumperContext buildDumperContext(final 
CDCJobConfiguration jobConfig, final int jobShardingItem, final 
TableAndSchemaNameMapper tableAndSchemaNameMapper) {
+    private IncrementalDumperContext buildDumperContext(final 
CDCJobConfiguration jobConfig, final int jobShardingItem, final 
TableAndSchemaNameMapper mapper) {
         JobDataNodeLine dataNodeLine = 
jobConfig.getJobShardingDataNodes().get(jobShardingItem);
         String dataSourceName = 
dataNodeLine.getEntries().iterator().next().getDataNodes().iterator().next().getDataSourceName();
         StandardPipelineDataSourceConfiguration actualDataSourceConfig = 
jobConfig.getDataSourceConfig().getActualDataSourceConfiguration(dataSourceName);
-        return new IncrementalDumperContext(
-                new DumperCommonContext(dataSourceName, 
actualDataSourceConfig, 
JobDataNodeLineConvertUtils.buildTableNameMapper(dataNodeLine), 
tableAndSchemaNameMapper),
-                jobConfig.getJobId(), jobConfig.isDecodeWithTX());
+        DumperCommonContext dumperCommonContext = new 
DumperCommonContext(dataSourceName, actualDataSourceConfig, 
JobDataNodeLineConvertUtils.buildTableNameMapper(dataNodeLine), mapper);
+        return new IncrementalDumperContext(dumperCommonContext, 
jobConfig.getJobId(), jobConfig.isDecodeWithTX());
     }
     
     private ImporterConfiguration buildImporterConfiguration(final 
CDCJobConfiguration jobConfig, final PipelineProcessConfiguration 
pipelineProcessConfig, final Collection<String> schemaTableNames,
-                                                             final 
TableAndSchemaNameMapper tableAndSchemaNameMapper) {
-        PipelineDataSourceConfiguration dataSourceConfig = 
PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getDataSourceConfig().getType(),
-                jobConfig.getDataSourceConfig().getParameter());
-        JobRateLimitAlgorithm writeRateLimitAlgorithm = new 
TransmissionProcessContext(jobConfig.getJobId(), 
pipelineProcessConfig).getWriteRateLimitAlgorithm();
-        int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
+                                                             final 
TableAndSchemaNameMapper mapper) {
+        PipelineDataSourceConfiguration dataSourceConfig = 
PipelineDataSourceConfigurationFactory.newInstance(
+                jobConfig.getDataSourceConfig().getType(), 
jobConfig.getDataSourceConfig().getParameter());
         Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap = new 
ShardingColumnsExtractor()
                 
.getShardingColumnsMap(jobConfig.getDataSourceConfig().getRootConfig().getRules(),
 
schemaTableNames.stream().map(CaseInsensitiveIdentifier::new).collect(Collectors.toSet()));
-        return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap, 
tableAndSchemaNameMapper, batchSize, writeRateLimitAlgorithm, 0, 1);
+        int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
+        JobRateLimitAlgorithm writeRateLimitAlgorithm = new 
TransmissionProcessContext(jobConfig.getJobId(), 
pipelineProcessConfig).getWriteRateLimitAlgorithm();
+        return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap, 
mapper, batchSize, writeRateLimitAlgorithm, 0, 1);
     }
     
     @Override
-    protected PipelineTasksRunner buildTasksRunner(final 
PipelineJobItemContext jobItemContext) {
-        return new CDCTasksRunner((CDCJobItemContext) jobItemContext);
+    protected PipelineTasksRunner buildTasksRunner(final CDCJobItemContext 
jobItemContext) {
+        return new CDCTasksRunner(jobItemContext);
     }
     
     @Override
-    protected void doPrepare(final Collection<PipelineJobItemContext> 
jobItemContexts) {
-        jobPreparer.initTasks(jobItemContexts.stream().map(each -> 
(CDCJobItemContext) each).collect(Collectors.toList()));
+    protected void doPrepare(final Collection<CDCJobItemContext> 
jobItemContexts) {
+        jobPreparer.initTasks(jobItemContexts);
     }
     
     @Override
@@ -153,13 +150,13 @@ public final class CDCJob extends AbstractInseparablePipelineJob implements Simp
     }
     
     @Override
-    protected void executeInventoryTasks(final 
Collection<CompletableFuture<?>> futures, final 
Collection<PipelineJobItemContext> jobItemContexts) {
-        ExecuteEngine.trigger(futures, new CDCExecuteCallback("inventory", 
(CDCJobItemContext) jobItemContexts.iterator().next()));
+    protected void executeInventoryTasks(final 
Collection<CompletableFuture<?>> futures, final Collection<CDCJobItemContext> 
jobItemContexts) {
+        ExecuteEngine.trigger(futures, new CDCExecuteCallback("inventory", 
jobItemContexts.iterator().next()));
     }
     
     @Override
-    protected void executeIncrementalTasks(final 
Collection<CompletableFuture<?>> futures, final 
Collection<PipelineJobItemContext> jobItemContexts) {
-        ExecuteEngine.trigger(futures, new CDCExecuteCallback("incremental", 
(CDCJobItemContext) jobItemContexts.iterator().next()));
+    protected void executeIncrementalTasks(final 
Collection<CompletableFuture<?>> futures, final Collection<CDCJobItemContext> 
jobItemContexts) {
+        ExecuteEngine.trigger(futures, new CDCExecuteCallback("incremental", 
jobItemContexts.iterator().next()));
     }
     
     @Override

Reply via email to