This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 51834fc05ff Refactor TransmissionJobManager.showProcessConfiguration (#29262)
51834fc05ff is described below
commit 51834fc05ff37b5b21d0dcb3a831d46290f588f5
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Dec 2 13:29:56 2023 +0800
Refactor TransmissionJobManager.showProcessConfiguration (#29262)
---
.../pipeline/core/job/service/TransmissionJobManager.java | 14 ++++++++++++--
.../apache/shardingsphere/data/pipeline/cdc/CDCJob.java | 2 +-
.../consistencycheck/task/ConsistencyCheckTasksRunner.java | 2 +-
.../data/pipeline/scenario/migration/MigrationJob.java | 3 +--
.../scenario/migration/api/impl/MigrationJobAPITest.java | 2 +-
5 files changed, 16 insertions(+), 7 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
index 0bf66b41e5c..5107bf48695 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
@@ -62,10 +62,20 @@ public final class TransmissionJobManager {
}
/**
- * Show process configuration.
+ * Show pipeline process configuration.
+ *
+ * @param jobId job id
+ * @return pipeline process configuration
+ */
+ public PipelineProcessConfiguration showProcessConfiguration(final String jobId) {
+ return showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobId));
+ }
+
+ /**
+ * Show pipeline process configuration.
*
* @param contextKey context key
- * @return process configuration, non-null
+ * @return pipeline process configuration
*/
public PipelineProcessConfiguration showProcessConfiguration(final PipelineContextKey contextKey) {
return PipelineProcessConfigurationUtils.convertWithDefaultValue(processConfigPersistService.load(contextKey, jobOption.getType()));
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
index 9f1976a72ff..e98dd3c8a35 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
@@ -132,7 +132,7 @@ public final class CDCJob extends AbstractPipelineJob
implements SimpleJob {
private CDCJobItemContext buildCDCJobItemContext(final CDCJobConfiguration
jobConfig, final int shardingItem) {
Optional<TransmissionJobItemProgress> initProgress =
jobItemManager.getProgress(jobConfig.getJobId(), shardingItem);
- PipelineProcessConfiguration processConfig =
transmissionJobManager.showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()));
+ PipelineProcessConfiguration processConfig =
transmissionJobManager.showProcessConfiguration(jobConfig.getJobId());
TransmissionProcessContext jobProcessContext = new
TransmissionProcessContext(jobConfig.getJobId(), processConfig);
CDCTaskConfiguration taskConfig = buildTaskConfiguration(jobConfig,
shardingItem, jobProcessContext.getPipelineProcessConfig());
return new CDCJobItemContext(jobConfig, shardingItem,
initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager,
sink);
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
index a93dcf5aed1..ed74f702edd 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
@@ -108,7 +108,7 @@ public final class ConsistencyCheckTasksRunner implements
PipelineTasksRunner {
TransmissionJobOption jobOption = (TransmissionJobOption)
TypedSPILoader.getService(PipelineJobType.class, jobType.getType()).getOption();
PipelineJobConfiguration parentJobConfig = new
PipelineJobConfigurationManager(jobOption).getJobConfiguration(parentJobId);
try {
- PipelineProcessConfiguration processConfig = new
TransmissionJobManager(jobOption).showProcessConfiguration(PipelineJobIdUtils.parseContextKey(parentJobConfig.getJobId()));
+ PipelineProcessConfiguration processConfig = new
TransmissionJobManager(jobOption).showProcessConfiguration(parentJobConfig.getJobId());
PipelineDataConsistencyChecker checker =
jobOption.buildDataConsistencyChecker(
parentJobConfig, new
TransmissionProcessContext(parentJobConfig.getJobId(), processConfig),
jobItemContext.getProgressContext());
consistencyChecker.set(checker);
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index 6dedfc50e04..0075da4b1e1 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -37,7 +37,6 @@ import
org.apache.shardingsphere.data.pipeline.common.util.ShardingColumnsExtrac
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
import
org.apache.shardingsphere.data.pipeline.core.job.AbstractSimplePipelineJob;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
import
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
@@ -87,7 +86,7 @@ public final class MigrationJob extends
AbstractSimplePipelineJob {
int shardingItem = shardingContext.getShardingItem();
MigrationJobConfiguration jobConfig = new
YamlMigrationJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
Optional<TransmissionJobItemProgress> initProgress =
jobItemManager.getProgress(shardingContext.getJobName(), shardingItem);
- PipelineProcessConfiguration processConfig =
transmissionJobManager.showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()));
+ PipelineProcessConfiguration processConfig =
transmissionJobManager.showProcessConfiguration(jobConfig.getJobId());
TransmissionProcessContext jobProcessContext = new
TransmissionProcessContext(jobConfig.getJobId(), processConfig);
MigrationTaskConfiguration taskConfig =
buildTaskConfiguration(jobConfig, shardingItem,
jobProcessContext.getPipelineProcessConfig());
return new MigrationJobItemContext(jobConfig, shardingItem,
initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager);
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
index 992f68154a4..86e40c727b0 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
@@ -192,7 +192,7 @@ class MigrationJobAPITest {
MigrationJobConfiguration jobConfig =
JobConfigurationBuilder.createJobConfiguration();
initTableData(jobConfig);
jobManager.start(jobConfig);
- PipelineProcessConfiguration processConfig = new
TransmissionJobManager(jobOption).showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()));
+ PipelineProcessConfiguration processConfig = new
TransmissionJobManager(jobOption).showProcessConfiguration(jobConfig.getJobId());
TransmissionProcessContext processContext = new
TransmissionProcessContext(jobConfig.getJobId(), processConfig);
Map<String, TableDataConsistencyCheckResult> checkResultMap =
jobOption.buildDataConsistencyChecker(
jobConfig, processContext, new
ConsistencyCheckJobItemProgressContext(jobConfig.getJobId(), 0,
"H2")).check("FIXTURE", null);