This is an automated email from the ASF dual-hosted git repository.

sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 51834fc05ff Refactor TransmissionJobManager.showProcessConfiguration 
(#29262)
51834fc05ff is described below

commit 51834fc05ff37b5b21d0dcb3a831d46290f588f5
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Dec 2 13:29:56 2023 +0800

    Refactor TransmissionJobManager.showProcessConfiguration (#29262)
---
 .../pipeline/core/job/service/TransmissionJobManager.java  | 14 ++++++++++++--
 .../apache/shardingsphere/data/pipeline/cdc/CDCJob.java    |  2 +-
 .../consistencycheck/task/ConsistencyCheckTasksRunner.java |  2 +-
 .../data/pipeline/scenario/migration/MigrationJob.java     |  3 +--
 .../scenario/migration/api/impl/MigrationJobAPITest.java   |  2 +-
 5 files changed, 16 insertions(+), 7 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
index 0bf66b41e5c..5107bf48695 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
@@ -62,10 +62,20 @@ public final class TransmissionJobManager {
     }
     
     /**
-     * Show process configuration.
+     * Show pipeline process configuration.
+     *
+     * @param jobId job id
+     * @return pipeline process configuration
+     */
+    public PipelineProcessConfiguration showProcessConfiguration(final String 
jobId) {
+        return 
showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobId));
+    }
+    
+    /**
+     * Show pipeline process configuration.
      *
      * @param contextKey context key
-     * @return process configuration, non-null
+     * @return pipeline process configuration
      */
     public PipelineProcessConfiguration showProcessConfiguration(final 
PipelineContextKey contextKey) {
         return 
PipelineProcessConfigurationUtils.convertWithDefaultValue(processConfigPersistService.load(contextKey,
 jobOption.getType()));
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
index 9f1976a72ff..e98dd3c8a35 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
@@ -132,7 +132,7 @@ public final class CDCJob extends AbstractPipelineJob 
implements SimpleJob {
     
     private CDCJobItemContext buildCDCJobItemContext(final CDCJobConfiguration 
jobConfig, final int shardingItem) {
         Optional<TransmissionJobItemProgress> initProgress = 
jobItemManager.getProgress(jobConfig.getJobId(), shardingItem);
-        PipelineProcessConfiguration processConfig = 
transmissionJobManager.showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()));
+        PipelineProcessConfiguration processConfig = 
transmissionJobManager.showProcessConfiguration(jobConfig.getJobId());
         TransmissionProcessContext jobProcessContext = new 
TransmissionProcessContext(jobConfig.getJobId(), processConfig);
         CDCTaskConfiguration taskConfig = buildTaskConfiguration(jobConfig, 
shardingItem, jobProcessContext.getPipelineProcessConfig());
         return new CDCJobItemContext(jobConfig, shardingItem, 
initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager, 
sink);
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
index a93dcf5aed1..ed74f702edd 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
@@ -108,7 +108,7 @@ public final class ConsistencyCheckTasksRunner implements 
PipelineTasksRunner {
             TransmissionJobOption jobOption = (TransmissionJobOption) 
TypedSPILoader.getService(PipelineJobType.class, jobType.getType()).getOption();
             PipelineJobConfiguration parentJobConfig = new 
PipelineJobConfigurationManager(jobOption).getJobConfiguration(parentJobId);
             try {
-                PipelineProcessConfiguration processConfig = new 
TransmissionJobManager(jobOption).showProcessConfiguration(PipelineJobIdUtils.parseContextKey(parentJobConfig.getJobId()));
+                PipelineProcessConfiguration processConfig = new 
TransmissionJobManager(jobOption).showProcessConfiguration(parentJobConfig.getJobId());
                 PipelineDataConsistencyChecker checker = 
jobOption.buildDataConsistencyChecker(
                         parentJobConfig, new 
TransmissionProcessContext(parentJobConfig.getJobId(), processConfig), 
jobItemContext.getProgressContext());
                 consistencyChecker.set(checker);
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index 6dedfc50e04..0075da4b1e1 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -37,7 +37,6 @@ import 
org.apache.shardingsphere.data.pipeline.common.util.ShardingColumnsExtrac
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
 import 
org.apache.shardingsphere.data.pipeline.core.job.AbstractSimplePipelineJob;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
 import 
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
@@ -87,7 +86,7 @@ public final class MigrationJob extends 
AbstractSimplePipelineJob {
         int shardingItem = shardingContext.getShardingItem();
         MigrationJobConfiguration jobConfig = new 
YamlMigrationJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
         Optional<TransmissionJobItemProgress> initProgress = 
jobItemManager.getProgress(shardingContext.getJobName(), shardingItem);
-        PipelineProcessConfiguration processConfig = 
transmissionJobManager.showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()));
+        PipelineProcessConfiguration processConfig = 
transmissionJobManager.showProcessConfiguration(jobConfig.getJobId());
         TransmissionProcessContext jobProcessContext = new 
TransmissionProcessContext(jobConfig.getJobId(), processConfig);
         MigrationTaskConfiguration taskConfig = 
buildTaskConfiguration(jobConfig, shardingItem, 
jobProcessContext.getPipelineProcessConfig());
         return new MigrationJobItemContext(jobConfig, shardingItem, 
initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager);
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
index 992f68154a4..86e40c727b0 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
@@ -192,7 +192,7 @@ class MigrationJobAPITest {
         MigrationJobConfiguration jobConfig = 
JobConfigurationBuilder.createJobConfiguration();
         initTableData(jobConfig);
         jobManager.start(jobConfig);
-        PipelineProcessConfiguration processConfig = new 
TransmissionJobManager(jobOption).showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()));
+        PipelineProcessConfiguration processConfig = new 
TransmissionJobManager(jobOption).showProcessConfiguration(jobConfig.getJobId());
         TransmissionProcessContext processContext = new 
TransmissionProcessContext(jobConfig.getJobId(), processConfig);
         Map<String, TableDataConsistencyCheckResult> checkResultMap = 
jobOption.buildDataConsistencyChecker(
                 jobConfig, processContext, new 
ConsistencyCheckJobItemProgressContext(jobConfig.getJobId(), 0, 
"H2")).check("FIXTURE", null);

Reply via email to