This is an automated email from the ASF dual-hosted git repository.

yx9o pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 7206d6b0f12 Remove TransmissionJobOption.buildProcessContext (#29261)
7206d6b0f12 is described below

commit 7206d6b0f1252a56985803f81a3e7b3930f408eb
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Dec 2 13:03:10 2023 +0800

    Remove TransmissionJobOption.buildProcessContext (#29261)
---
 .../data/pipeline/core/job/option/TransmissionJobOption.java     | 8 --------
 .../java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java | 6 +++++-
 .../apache/shardingsphere/data/pipeline/cdc/CDCJobOption.java    | 8 --------
 .../consistencycheck/task/ConsistencyCheckTasksRunner.java       | 9 +++++++--
 .../data/pipeline/scenario/migration/MigrationJob.java           | 7 ++++++-
 .../data/pipeline/scenario/migration/MigrationJobOption.java     | 8 --------
 .../scenario/migration/api/impl/MigrationJobAPITest.java         | 6 +++++-
 7 files changed, 23 insertions(+), 29 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/TransmissionJobOption.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/TransmissionJobOption.java
index 3582b71ebe6..587a2ddb5df 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/TransmissionJobOption.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/TransmissionJobOption.java
@@ -43,14 +43,6 @@ public interface TransmissionJobOption extends PipelineJobOption {
      */
     PipelineJobInfo getJobInfo(String jobId);
     
-    /**
-     * Build transmission process context.
-     *
-     * @param jobConfig pipeline job configuration
-     * @return transmission process context
-     */
-    TransmissionProcessContext buildProcessContext(PipelineJobConfiguration jobConfig);
-    
     /**
      * Build pipeline data consistency checker.
      *
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
index 2daeb29de80..9f1976a72ff 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
@@ -58,6 +58,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
 import 
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
@@ -89,6 +90,8 @@ public final class CDCJob extends AbstractPipelineJob 
implements SimpleJob {
     
     private final PipelineJobItemManager<TransmissionJobItemProgress> 
jobItemManager = new 
PipelineJobItemManager<>(jobOption.getYamlJobItemProgressSwapper());
     
+    private final TransmissionJobManager transmissionJobManager = new 
TransmissionJobManager(jobOption);
+    
     private final CDCJobPreparer jobPreparer = new CDCJobPreparer();
     
     private final PipelineDataSourceManager dataSourceManager = new 
DefaultPipelineDataSourceManager();
@@ -129,7 +132,8 @@ public final class CDCJob extends AbstractPipelineJob 
implements SimpleJob {
     
     private CDCJobItemContext buildCDCJobItemContext(final CDCJobConfiguration 
jobConfig, final int shardingItem) {
         Optional<TransmissionJobItemProgress> initProgress = 
jobItemManager.getProgress(jobConfig.getJobId(), shardingItem);
-        TransmissionProcessContext jobProcessContext = jobOption.buildProcessContext(jobConfig);
+        PipelineProcessConfiguration processConfig = transmissionJobManager.showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()));
+        TransmissionProcessContext jobProcessContext = new TransmissionProcessContext(jobConfig.getJobId(), processConfig);
         CDCTaskConfiguration taskConfig = buildTaskConfiguration(jobConfig, 
shardingItem, jobProcessContext.getPipelineProcessConfig());
         return new CDCJobItemContext(jobConfig, shardingItem, 
initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager, 
sink);
     }
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobOption.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobOption.java
index fe6a30238d6..e37492fd1de 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobOption.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobOption.java
@@ -21,7 +21,6 @@ import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.cdc.config.yaml.YamlCDCJobConfigurationSwapper;
 import 
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
 import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
 import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData;
@@ -30,7 +29,6 @@ import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDat
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
-import 
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
 
 /**
  * CDC job option.
@@ -61,12 +59,6 @@ public final class CDCJobOption implements 
TransmissionJobOption {
         return new PipelineJobInfo(jobMetaData, jobConfig.getDatabaseName(), 
String.join(", ", jobConfig.getSchemaTableNames()));
     }
     
-    @Override
-    public TransmissionProcessContext buildProcessContext(final 
PipelineJobConfiguration jobConfig) {
-        PipelineProcessConfiguration processConfig = new 
TransmissionJobManager(this).showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()));
-        return new TransmissionProcessContext(jobConfig.getJobId(), 
processConfig);
-    }
-    
     @Override
     public PipelineDataConsistencyChecker buildDataConsistencyChecker(final 
PipelineJobConfiguration jobConfig, final TransmissionProcessContext 
processContext,
                                                                       final 
ConsistencyCheckJobItemProgressContext progressContext) {
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
index 298a0a7706a..a93dcf5aed1 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
@@ -20,6 +20,8 @@ package 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.task;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
 import 
org.apache.shardingsphere.data.pipeline.common.execute.AbstractPipelineLifecycleRunnable;
 import org.apache.shardingsphere.data.pipeline.common.execute.ExecuteCallback;
 import org.apache.shardingsphere.data.pipeline.common.execute.ExecuteEngine;
@@ -30,11 +32,12 @@ import 
org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
-import 
org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
 import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
+import 
org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
 import 
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobOption;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration;
@@ -105,7 +108,9 @@ public final class ConsistencyCheckTasksRunner implements 
PipelineTasksRunner {
             TransmissionJobOption jobOption = (TransmissionJobOption) 
TypedSPILoader.getService(PipelineJobType.class, jobType.getType()).getOption();
             PipelineJobConfiguration parentJobConfig = new 
PipelineJobConfigurationManager(jobOption).getJobConfiguration(parentJobId);
             try {
-                PipelineDataConsistencyChecker checker = 
jobOption.buildDataConsistencyChecker(parentJobConfig, 
jobOption.buildProcessContext(parentJobConfig), 
jobItemContext.getProgressContext());
+                PipelineProcessConfiguration processConfig = new 
TransmissionJobManager(jobOption).showProcessConfiguration(PipelineJobIdUtils.parseContextKey(parentJobConfig.getJobId()));
+                PipelineDataConsistencyChecker checker = 
jobOption.buildDataConsistencyChecker(
+                        parentJobConfig, new 
TransmissionProcessContext(parentJobConfig.getJobId(), processConfig), 
jobItemContext.getProgressContext());
                 consistencyChecker.set(checker);
                 Map<String, TableDataConsistencyCheckResult> checkResultMap = 
checker.check(checkJobConfig.getAlgorithmTypeName(), 
checkJobConfig.getAlgorithmProps());
                 log.info("job {} with check algorithm '{}' data consistency 
checker result: {}, stopping: {}",
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index b92fc6eb22f..6dedfc50e04 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -37,7 +37,9 @@ import 
org.apache.shardingsphere.data.pipeline.common.util.ShardingColumnsExtrac
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
 import 
org.apache.shardingsphere.data.pipeline.core.job.AbstractSimplePipelineJob;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
 import 
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
 import 
org.apache.shardingsphere.data.pipeline.core.task.runner.TransmissionTasksRunner;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
@@ -67,6 +69,8 @@ public final class MigrationJob extends 
AbstractSimplePipelineJob {
     
     private final MigrationJobOption jobOption = new MigrationJobOption();
     
+    private final TransmissionJobManager transmissionJobManager = new 
TransmissionJobManager(jobOption);
+    
     private final PipelineJobItemManager<TransmissionJobItemProgress> 
jobItemManager = new 
PipelineJobItemManager<>(jobOption.getYamlJobItemProgressSwapper());
     
     private final PipelineDataSourceManager dataSourceManager = new 
DefaultPipelineDataSourceManager();
@@ -83,7 +87,8 @@ public final class MigrationJob extends 
AbstractSimplePipelineJob {
         int shardingItem = shardingContext.getShardingItem();
         MigrationJobConfiguration jobConfig = new 
YamlMigrationJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
         Optional<TransmissionJobItemProgress> initProgress = 
jobItemManager.getProgress(shardingContext.getJobName(), shardingItem);
-        TransmissionProcessContext jobProcessContext = 
jobOption.buildProcessContext(jobConfig);
+        PipelineProcessConfiguration processConfig = 
transmissionJobManager.showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()));
+        TransmissionProcessContext jobProcessContext = new 
TransmissionProcessContext(jobConfig.getJobId(), processConfig);
         MigrationTaskConfiguration taskConfig = 
buildTaskConfiguration(jobConfig, shardingItem, 
jobProcessContext.getPipelineProcessConfig());
         return new MigrationJobItemContext(jobConfig, shardingItem, 
initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager);
     }
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobOption.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobOption.java
index 5e70e0a5360..378b8df3d5d 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobOption.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobOption.java
@@ -19,7 +19,6 @@ package 
org.apache.shardingsphere.data.pipeline.scenario.migration;
 
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
 import org.apache.shardingsphere.data.pipeline.common.datanode.DataNodeUtils;
 import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
@@ -29,7 +28,6 @@ import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDat
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
-import 
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.check.consistency.MigrationDataConsistencyChecker;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.YamlMigrationJobConfigurationSwapper;
@@ -74,12 +72,6 @@ public final class MigrationJobOption implements 
TransmissionJobOption {
         return new PipelineJobInfo(jobMetaData, null, String.join(",", 
sourceTables));
     }
     
-    @Override
-    public TransmissionProcessContext buildProcessContext(final 
PipelineJobConfiguration jobConfig) {
-        PipelineProcessConfiguration processConfig = new 
TransmissionJobManager(this).showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()));
-        return new TransmissionProcessContext(jobConfig.getJobId(), 
processConfig);
-    }
-    
     @Override
     public PipelineDataConsistencyChecker buildDataConsistencyChecker(final 
PipelineJobConfiguration jobConfig, final TransmissionProcessContext 
processContext,
                                                                       final 
ConsistencyCheckJobItemProgressContext progressContext) {
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
index 612f482f987..992f68154a4 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
@@ -20,6 +20,8 @@ package 
org.apache.shardingsphere.test.it.data.pipeline.scenario.migration.api.i
 import lombok.SneakyThrows;
 import 
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
 import 
org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeEntry;
 import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
 import 
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceConfigurationFactory;
@@ -190,8 +192,10 @@ class MigrationJobAPITest {
         MigrationJobConfiguration jobConfig = 
JobConfigurationBuilder.createJobConfiguration();
         initTableData(jobConfig);
         jobManager.start(jobConfig);
+        PipelineProcessConfiguration processConfig = new 
TransmissionJobManager(jobOption).showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()));
+        TransmissionProcessContext processContext = new 
TransmissionProcessContext(jobConfig.getJobId(), processConfig);
         Map<String, TableDataConsistencyCheckResult> checkResultMap = 
jobOption.buildDataConsistencyChecker(
-                jobConfig, jobOption.buildProcessContext(jobConfig), new 
ConsistencyCheckJobItemProgressContext(jobConfig.getJobId(), 0, 
"H2")).check("FIXTURE", null);
+                jobConfig, processContext, new 
ConsistencyCheckJobItemProgressContext(jobConfig.getJobId(), 0, 
"H2")).check("FIXTURE", null);
         assertThat(checkResultMap.size(), is(1));
         String checkKey = "t_order";
         assertTrue(checkResultMap.get(checkKey).isMatched());

Reply via email to