This is an automated email from the ASF dual-hosted git repository.

sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 1928e739ba2 Refactor PipelineJobAPI.getJobConfiguration() (#29059)
1928e739ba2 is described below

commit 1928e739ba2328f16356c74c23e1196002675bae
Author: Liang Zhang <[email protected]>
AuthorDate: Thu Nov 16 23:44:30 2023 +0800

    Refactor PipelineJobAPI.getJobConfiguration() (#29059)
---
 .../data/pipeline/core/job/service/PipelineJobAPI.java           | 8 --------
 .../distsql/handler/update/CheckMigrationJobUpdater.java         | 3 ++-
 .../shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java     | 5 -----
 .../data/pipeline/cdc/handler/CDCBackendHandler.java             | 5 +++--
 .../consistencycheck/api/impl/ConsistencyCheckJobAPI.java        | 7 +------
 .../consistencycheck/task/ConsistencyCheckTasksRunner.java       | 2 +-
 .../pipeline/scenario/migration/api/impl/MigrationJobAPI.java    | 9 ++-------
 .../consistencycheck/api/impl/ConsistencyCheckJobAPITest.java    | 2 +-
 .../scenario/migration/api/impl/MigrationJobAPITest.java         | 6 +++---
 9 files changed, 13 insertions(+), 34 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
index b823bba76bf..4ac59eefcf9 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
@@ -34,14 +34,6 @@ import java.util.Optional;
 @SingletonSPI
 public interface PipelineJobAPI extends TypedSPI {
     
-    /**
-     * Get job configuration.
-     *
-     * @param jobId job id
-     * @return job configuration
-     */
-    PipelineJobConfiguration getJobConfiguration(String jobId);
-    
     /**
      * Get job configuration.
      *
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java
index a875439f4da..547d6255f4b 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.migration.distsql.handler.update;
 
 import 
org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl.ConsistencyCheckJobAPI;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.pojo.CreateConsistencyCheckJobParameter;
@@ -46,7 +47,7 @@ public final class CheckMigrationJobUpdater implements 
RALUpdater<CheckMigration
         String algorithmTypeName = null == typeStrategy ? null : 
typeStrategy.getName();
         Properties algorithmProps = null == typeStrategy ? null : 
typeStrategy.getProps();
         String jobId = sqlStatement.getJobId();
-        MigrationJobConfiguration jobConfig = 
migrationJobAPI.getJobConfiguration(jobId);
+        MigrationJobConfiguration jobConfig = 
migrationJobAPI.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
         verifyInventoryFinished(jobConfig);
         checkJobAPI.createJobAndStart(new 
CreateConsistencyCheckJobParameter(jobId, algorithmTypeName, algorithmProps, 
jobConfig.getSourceDatabaseType(), jobConfig.getTargetDatabaseType()));
     }
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 1993cde6e7b..4d6d31a3cfb 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -278,11 +278,6 @@ public final class CDCJobAPI extends 
AbstractInventoryIncrementalJobAPIImpl {
         return new CDCProcessContext(pipelineJobConfig.getJobId(), 
showProcessConfiguration(PipelineJobIdUtils.parseContextKey(pipelineJobConfig.getJobId())));
     }
     
-    @Override
-    public CDCJobConfiguration getJobConfiguration(final String jobId) {
-        return 
getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
-    }
-    
     @Override
     public CDCJobConfiguration getJobConfiguration(final JobConfigurationPOJO 
jobConfigPOJO) {
         return new 
YamlCDCJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter());
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
index ad7f7b77410..2ef78db661e 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
@@ -47,6 +47,7 @@ import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextMan
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
 import 
org.apache.shardingsphere.infra.database.opengauss.type.OpenGaussDatabaseType;
@@ -78,7 +79,7 @@ public final class CDCBackendHandler {
      * @return database
      */
     public String getDatabaseNameByJobId(final String jobId) {
-        return jobAPI.getJobConfiguration(jobId).getDatabaseName();
+        return 
jobAPI.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId)).getDatabaseName();
     }
     
     /**
@@ -126,7 +127,7 @@ public final class CDCBackendHandler {
      * @param connectionContext connection context
      */
     public void startStreaming(final String jobId, final CDCConnectionContext 
connectionContext, final Channel channel) {
-        CDCJobConfiguration cdcJobConfig = jobAPI.getJobConfiguration(jobId);
+        CDCJobConfiguration cdcJobConfig = 
jobAPI.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
         ShardingSpherePreconditions.checkNotNull(cdcJobConfig, () -> new 
PipelineJobNotFoundException(jobId));
         if (PipelineJobCenter.isJobExisting(jobId)) {
             PipelineJobCenter.stop(jobId);
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index ac99a1917c8..88e39ee70d3 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -312,7 +312,7 @@ public final class ConsistencyCheckJobAPI implements 
PipelineJobAPI {
     }
     
     private void fillInJobItemInfoWithCheckAlgorithm(final 
ConsistencyCheckJobItemInfo result, final String checkJobId) {
-        ConsistencyCheckJobConfiguration jobConfig = 
getJobConfiguration(checkJobId);
+        ConsistencyCheckJobConfiguration jobConfig = 
getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(checkJobId));
         result.setAlgorithmType(jobConfig.getAlgorithmTypeName());
         if (null != jobConfig.getAlgorithmProps()) {
             
result.setAlgorithmProps(jobConfig.getAlgorithmProps().entrySet().stream().map(entry
 -> String.format("'%s'='%s'", entry.getKey(), 
entry.getValue())).collect(Collectors.joining(",")));
@@ -329,11 +329,6 @@ public final class ConsistencyCheckJobAPI implements 
PipelineJobAPI {
         }
     }
     
-    @Override
-    public ConsistencyCheckJobConfiguration getJobConfiguration(final String 
jobId) {
-        return 
getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
-    }
-    
     @Override
     public ConsistencyCheckJobConfiguration getJobConfiguration(final 
JobConfigurationPOJO jobConfigPOJO) {
         return new 
YamlConsistencyCheckJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter());
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
index 3ae18d58129..9c8b1f0f949 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
@@ -98,7 +98,7 @@ public final class ConsistencyCheckTasksRunner implements 
PipelineTasksRunner {
             checkJobAPI.persistJobItemProgress(jobItemContext);
             JobType jobType = PipelineJobIdUtils.parseJobType(parentJobId);
             InventoryIncrementalJobAPI jobAPI = (InventoryIncrementalJobAPI) 
TypedSPILoader.getService(PipelineJobAPI.class, jobType.getType());
-            PipelineJobConfiguration parentJobConfig = 
jobAPI.getJobConfiguration(parentJobId);
+            PipelineJobConfiguration parentJobConfig = 
jobAPI.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(parentJobId));
             try {
                 PipelineDataConsistencyChecker checker = 
jobAPI.buildPipelineDataConsistencyChecker(
                         parentJobConfig, 
jobAPI.buildPipelineProcessContext(parentJobConfig), 
jobItemContext.getProgressContext());
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index 1fe00789a82..52527c61901 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -226,11 +226,6 @@ public final class MigrationJobAPI extends 
AbstractInventoryIncrementalJobAPIImp
         }
     }
     
-    @Override
-    public MigrationJobConfiguration getJobConfiguration(final String jobId) {
-        return 
getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
-    }
-    
     @Override
     public MigrationJobConfiguration getJobConfiguration(final 
JobConfigurationPOJO jobConfigPOJO) {
         return new 
YamlMigrationJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter());
@@ -328,7 +323,7 @@ public final class MigrationJobAPI extends 
AbstractInventoryIncrementalJobAPIImp
     }
     
     private void cleanTempTableOnRollback(final String jobId) throws 
SQLException {
-        MigrationJobConfiguration jobConfig = getJobConfiguration(jobId);
+        MigrationJobConfiguration jobConfig = 
getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
         PipelineCommonSQLBuilder pipelineSQLBuilder = new 
PipelineCommonSQLBuilder(jobConfig.getTargetDatabaseType());
         TableAndSchemaNameMapper mapping = new 
TableAndSchemaNameMapper(jobConfig.getTargetTableSchemaMap());
         try (
@@ -352,7 +347,7 @@ public final class MigrationJobAPI extends 
AbstractInventoryIncrementalJobAPIImp
         PipelineJobManager jobManager = new PipelineJobManager(this);
         jobManager.stop(jobId);
         dropCheckJobs(jobId);
-        MigrationJobConfiguration jobConfig = getJobConfiguration(jobId);
+        MigrationJobConfiguration jobConfig = 
getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
         refreshTableMetadata(jobId, jobConfig.getTargetDatabaseName());
         jobManager.drop(jobId);
         log.info("Commit cost {} ms", System.currentTimeMillis() - 
startTimeMillis);
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
index 6e90b0b2224..169f76ab6d5 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
@@ -64,7 +64,7 @@ class ConsistencyCheckJobAPITest {
         String parentJobId = parentJobConfig.getJobId();
         String checkJobId = checkJobAPI.createJobAndStart(new 
CreateConsistencyCheckJobParameter(parentJobId, null, null,
                 parentJobConfig.getSourceDatabaseType(), 
parentJobConfig.getTargetDatabaseType()));
-        ConsistencyCheckJobConfiguration checkJobConfig = 
checkJobAPI.getJobConfiguration(checkJobId);
+        ConsistencyCheckJobConfiguration checkJobConfig = 
checkJobAPI.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(checkJobId));
         int expectedSequence = ConsistencyCheckSequence.MIN_SEQUENCE;
         String expectCheckJobId = new 
ConsistencyCheckJobId(PipelineJobIdUtils.parseContextKey(parentJobId), 
parentJobId, expectedSequence).marshal();
         assertThat(checkJobConfig.getJobId(), is(expectCheckJobId));
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
index e206c202875..bbd98ed6fbd 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
@@ -142,7 +142,7 @@ class MigrationJobAPITest {
     void assertRollback() throws SQLException {
         Optional<String> jobId = 
jobManager.start(JobConfigurationBuilder.createJobConfiguration());
         assertTrue(jobId.isPresent());
-        MigrationJobConfiguration jobConfig = 
jobAPI.getJobConfiguration(jobId.get());
+        MigrationJobConfiguration jobConfig = 
jobAPI.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId.get()));
         initTableData(jobConfig);
         PipelineDistributedBarrier mockBarrier = 
mock(PipelineDistributedBarrier.class);
         
when(PipelineDistributedBarrier.getInstance(any())).thenReturn(mockBarrier);
@@ -154,7 +154,7 @@ class MigrationJobAPITest {
     void assertCommit() {
         Optional<String> jobId = 
jobManager.start(JobConfigurationBuilder.createJobConfiguration());
         assertTrue(jobId.isPresent());
-        MigrationJobConfiguration jobConfig = 
jobAPI.getJobConfiguration(jobId.get());
+        MigrationJobConfiguration jobConfig = 
jobAPI.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId.get()));
         initTableData(jobConfig);
         PipelineDistributedBarrier mockBarrier = 
mock(PipelineDistributedBarrier.class);
         
when(PipelineDistributedBarrier.getInstance(any())).thenReturn(mockBarrier);
@@ -277,7 +277,7 @@ class MigrationJobAPITest {
         initIntPrimaryEnvironment();
         SourceTargetEntry sourceTargetEntry = new 
SourceTargetEntry("logic_db", new DataNode("ds_0", "t_order"), "t_order");
         String jobId = 
jobAPI.createJobAndStart(PipelineContextUtils.getContextKey(), new 
MigrateTableStatement(Collections.singletonList(sourceTargetEntry), 
"logic_db"));
-        MigrationJobConfiguration actual = jobAPI.getJobConfiguration(jobId);
+        MigrationJobConfiguration actual = 
jobAPI.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
         assertThat(actual.getTargetDatabaseName(), is("logic_db"));
         List<JobDataNodeLine> dataNodeLines = actual.getJobShardingDataNodes();
         assertThat(dataNodeLines.size(), is(1));

Reply via email to