This is an automated email from the ASF dual-hosted git repository.
yx9o pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 7206d6b0f12 Remove TransmissionJobOption.buildProcessContext (#29261)
7206d6b0f12 is described below
commit 7206d6b0f1252a56985803f81a3e7b3930f408eb
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Dec 2 13:03:10 2023 +0800
Remove TransmissionJobOption.buildProcessContext (#29261)
---
.../data/pipeline/core/job/option/TransmissionJobOption.java | 8 --------
.../java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java | 6 +++++-
.../apache/shardingsphere/data/pipeline/cdc/CDCJobOption.java | 8 --------
.../consistencycheck/task/ConsistencyCheckTasksRunner.java | 9 +++++++--
.../data/pipeline/scenario/migration/MigrationJob.java | 7 ++++++-
.../data/pipeline/scenario/migration/MigrationJobOption.java | 8 --------
.../scenario/migration/api/impl/MigrationJobAPITest.java | 6 +++++-
7 files changed, 23 insertions(+), 29 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/TransmissionJobOption.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/TransmissionJobOption.java
index 3582b71ebe6..587a2ddb5df 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/TransmissionJobOption.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/TransmissionJobOption.java
@@ -43,14 +43,6 @@ public interface TransmissionJobOption extends
PipelineJobOption {
*/
PipelineJobInfo getJobInfo(String jobId);
- /**
- * Build transmission process context.
- *
- * @param jobConfig pipeline job configuration
- * @return transmission process context
- */
- TransmissionProcessContext buildProcessContext(PipelineJobConfiguration
jobConfig);
-
/**
* Build pipeline data consistency checker.
*
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
index 2daeb29de80..9f1976a72ff 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
@@ -58,6 +58,7 @@ import
org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
@@ -89,6 +90,8 @@ public final class CDCJob extends AbstractPipelineJob
implements SimpleJob {
private final PipelineJobItemManager<TransmissionJobItemProgress>
jobItemManager = new
PipelineJobItemManager<>(jobOption.getYamlJobItemProgressSwapper());
+ private final TransmissionJobManager transmissionJobManager = new
TransmissionJobManager(jobOption);
+
private final CDCJobPreparer jobPreparer = new CDCJobPreparer();
private final PipelineDataSourceManager dataSourceManager = new
DefaultPipelineDataSourceManager();
@@ -129,7 +132,8 @@ public final class CDCJob extends AbstractPipelineJob
implements SimpleJob {
private CDCJobItemContext buildCDCJobItemContext(final CDCJobConfiguration
jobConfig, final int shardingItem) {
Optional<TransmissionJobItemProgress> initProgress =
jobItemManager.getProgress(jobConfig.getJobId(), shardingItem);
- TransmissionProcessContext jobProcessContext =
jobOption.buildProcessContext(jobConfig);
+ PipelineProcessConfiguration processConfig =
transmissionJobManager.showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()));
+ TransmissionProcessContext jobProcessContext = new
TransmissionProcessContext(jobConfig.getJobId(), processConfig);
CDCTaskConfiguration taskConfig = buildTaskConfiguration(jobConfig,
shardingItem, jobProcessContext.getPipelineProcessConfig());
return new CDCJobItemContext(jobConfig, shardingItem,
initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager,
sink);
}
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobOption.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobOption.java
index fe6a30238d6..e37492fd1de 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobOption.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobOption.java
@@ -21,7 +21,6 @@ import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.cdc.config.yaml.YamlCDCJobConfigurationSwapper;
import
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
-import
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
import
org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData;
@@ -30,7 +29,6 @@ import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDat
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
-import
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
/**
* CDC job option.
@@ -61,12 +59,6 @@ public final class CDCJobOption implements
TransmissionJobOption {
return new PipelineJobInfo(jobMetaData, jobConfig.getDatabaseName(),
String.join(", ", jobConfig.getSchemaTableNames()));
}
- @Override
- public TransmissionProcessContext buildProcessContext(final
PipelineJobConfiguration jobConfig) {
- PipelineProcessConfiguration processConfig = new
TransmissionJobManager(this).showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()));
- return new TransmissionProcessContext(jobConfig.getJobId(),
processConfig);
- }
-
@Override
public PipelineDataConsistencyChecker buildDataConsistencyChecker(final
PipelineJobConfiguration jobConfig, final TransmissionProcessContext
processContext,
final
ConsistencyCheckJobItemProgressContext progressContext) {
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
index 298a0a7706a..a93dcf5aed1 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
@@ -20,6 +20,8 @@ package
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.task;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
+import
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
+import
org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
import
org.apache.shardingsphere.data.pipeline.common.execute.AbstractPipelineLifecycleRunnable;
import org.apache.shardingsphere.data.pipeline.common.execute.ExecuteCallback;
import org.apache.shardingsphere.data.pipeline.common.execute.ExecuteEngine;
@@ -30,11 +32,12 @@ import
org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
-import
org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
+import
org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
import
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobOption;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration;
@@ -105,7 +108,9 @@ public final class ConsistencyCheckTasksRunner implements
PipelineTasksRunner {
TransmissionJobOption jobOption = (TransmissionJobOption)
TypedSPILoader.getService(PipelineJobType.class, jobType.getType()).getOption();
PipelineJobConfiguration parentJobConfig = new
PipelineJobConfigurationManager(jobOption).getJobConfiguration(parentJobId);
try {
- PipelineDataConsistencyChecker checker =
jobOption.buildDataConsistencyChecker(parentJobConfig,
jobOption.buildProcessContext(parentJobConfig),
jobItemContext.getProgressContext());
+ PipelineProcessConfiguration processConfig = new
TransmissionJobManager(jobOption).showProcessConfiguration(PipelineJobIdUtils.parseContextKey(parentJobConfig.getJobId()));
+ PipelineDataConsistencyChecker checker =
jobOption.buildDataConsistencyChecker(
+ parentJobConfig, new
TransmissionProcessContext(parentJobConfig.getJobId(), processConfig),
jobItemContext.getProgressContext());
consistencyChecker.set(checker);
Map<String, TableDataConsistencyCheckResult> checkResultMap =
checker.check(checkJobConfig.getAlgorithmTypeName(),
checkJobConfig.getAlgorithmProps());
log.info("job {} with check algorithm '{}' data consistency
checker result: {}, stopping: {}",
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index b92fc6eb22f..6dedfc50e04 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -37,7 +37,9 @@ import
org.apache.shardingsphere.data.pipeline.common.util.ShardingColumnsExtrac
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
import
org.apache.shardingsphere.data.pipeline.core.job.AbstractSimplePipelineJob;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
import
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
import
org.apache.shardingsphere.data.pipeline.core.task.runner.TransmissionTasksRunner;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
@@ -67,6 +69,8 @@ public final class MigrationJob extends
AbstractSimplePipelineJob {
private final MigrationJobOption jobOption = new MigrationJobOption();
+ private final TransmissionJobManager transmissionJobManager = new
TransmissionJobManager(jobOption);
+
private final PipelineJobItemManager<TransmissionJobItemProgress>
jobItemManager = new
PipelineJobItemManager<>(jobOption.getYamlJobItemProgressSwapper());
private final PipelineDataSourceManager dataSourceManager = new
DefaultPipelineDataSourceManager();
@@ -83,7 +87,8 @@ public final class MigrationJob extends
AbstractSimplePipelineJob {
int shardingItem = shardingContext.getShardingItem();
MigrationJobConfiguration jobConfig = new
YamlMigrationJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
Optional<TransmissionJobItemProgress> initProgress =
jobItemManager.getProgress(shardingContext.getJobName(), shardingItem);
- TransmissionProcessContext jobProcessContext =
jobOption.buildProcessContext(jobConfig);
+ PipelineProcessConfiguration processConfig =
transmissionJobManager.showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()));
+ TransmissionProcessContext jobProcessContext = new
TransmissionProcessContext(jobConfig.getJobId(), processConfig);
MigrationTaskConfiguration taskConfig =
buildTaskConfiguration(jobConfig, shardingItem,
jobProcessContext.getPipelineProcessConfig());
return new MigrationJobItemContext(jobConfig, shardingItem,
initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager);
}
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobOption.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobOption.java
index 5e70e0a5360..378b8df3d5d 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobOption.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobOption.java
@@ -19,7 +19,6 @@ package
org.apache.shardingsphere.data.pipeline.scenario.migration;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
-import
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
import
org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
import org.apache.shardingsphere.data.pipeline.common.datanode.DataNodeUtils;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
@@ -29,7 +28,6 @@ import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDat
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
-import
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.check.consistency.MigrationDataConsistencyChecker;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.YamlMigrationJobConfigurationSwapper;
@@ -74,12 +72,6 @@ public final class MigrationJobOption implements
TransmissionJobOption {
return new PipelineJobInfo(jobMetaData, null, String.join(",",
sourceTables));
}
- @Override
- public TransmissionProcessContext buildProcessContext(final
PipelineJobConfiguration jobConfig) {
- PipelineProcessConfiguration processConfig = new
TransmissionJobManager(this).showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()));
- return new TransmissionProcessContext(jobConfig.getJobId(),
processConfig);
- }
-
@Override
public PipelineDataConsistencyChecker buildDataConsistencyChecker(final
PipelineJobConfiguration jobConfig, final TransmissionProcessContext
processContext,
final
ConsistencyCheckJobItemProgressContext progressContext) {
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
index 612f482f987..992f68154a4 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
@@ -20,6 +20,8 @@ package
org.apache.shardingsphere.test.it.data.pipeline.scenario.migration.api.i
import lombok.SneakyThrows;
import
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
+import
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
+import
org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
import
org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeEntry;
import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceConfigurationFactory;
@@ -190,8 +192,10 @@ class MigrationJobAPITest {
MigrationJobConfiguration jobConfig =
JobConfigurationBuilder.createJobConfiguration();
initTableData(jobConfig);
jobManager.start(jobConfig);
+ PipelineProcessConfiguration processConfig = new
TransmissionJobManager(jobOption).showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()));
+ TransmissionProcessContext processContext = new
TransmissionProcessContext(jobConfig.getJobId(), processConfig);
Map<String, TableDataConsistencyCheckResult> checkResultMap =
jobOption.buildDataConsistencyChecker(
- jobConfig, jobOption.buildProcessContext(jobConfig), new
ConsistencyCheckJobItemProgressContext(jobConfig.getJobId(), 0,
"H2")).check("FIXTURE", null);
+ jobConfig, processContext, new
ConsistencyCheckJobItemProgressContext(jobConfig.getJobId(), 0,
"H2")).check("FIXTURE", null);
assertThat(checkResultMap.size(), is(1));
String checkKey = "t_order";
assertTrue(checkResultMap.get(checkKey).isMatched());