This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 720a2504a5d Refactor PipelineJobAPI (#29007)
720a2504a5d is described below

commit 720a2504a5dbd82ba73af2c25ce3ea6de16501df
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Nov 11 13:09:41 2023 +0800

    Refactor PipelineJobAPI (#29007)
---
 .../data/pipeline/core/job/service/PipelineJobAPI.java      | 13 ++++---------
 .../core/job/service/impl/AbstractPipelineJobAPIImpl.java   |  5 -----
 .../data/pipeline/cdc/api/impl/CDCJobAPI.java               |  6 ++----
 .../scenario/consistencycheck/ConsistencyCheckJobType.java  |  4 +---
 .../consistencycheck/api/impl/ConsistencyCheckJobAPI.java   |  9 +++------
 .../data/pipeline/scenario/migration/MigrationJobType.java  |  4 +---
 .../scenario/migration/api/impl/MigrationJobAPI.java        |  7 ++-----
 7 files changed, 13 insertions(+), 35 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
index 94e5c5e34dc..5a2bfefcbc5 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.data.pipeline.core.job.service;
 
-import org.apache.shardingsphere.data.pipeline.core.task.config.PipelineTaskConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
@@ -27,8 +26,8 @@ import org.apache.shardingsphere.data.pipeline.common.context.PipelineProcessCon
 import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId;
 import org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
 import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
+import org.apache.shardingsphere.data.pipeline.core.task.config.PipelineTaskConfiguration;
 import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI;
 
@@ -41,13 +40,6 @@ import java.util.Optional;
 @SingletonSPI
 public interface PipelineJobAPI extends TypedSPI {
     
-    /**
-     * Get job type.
-     *
-     * @return job type
-     */
-    JobType getJobType();
-    
     /**
      * Marshal pipeline job id.
      *
@@ -177,4 +169,7 @@ public interface PipelineJobAPI extends TypedSPI {
      * @param shardingItem sharding item
      */
     void cleanJobItemErrorMessage(String jobId, int shardingItem);
+    
+    @Override
+    String getType();
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
index 6c353f52694..9a0cf65b82c 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
@@ -168,11 +168,6 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
         return result;
     }
     
-    @Override
-    public String getType() {
-        return getJobType().getType();
-    }
-    
     @Override
     public String getJobItemErrorMessage(final String jobId, final int shardingItem) {
         return Optional.ofNullable(PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemErrorMessage(jobId, shardingItem)).orElse("");
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 09ceb2debd5..e203e1a2533 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -24,7 +24,6 @@ import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.cdc.api.job.type.CDCJobType;
 import org.apache.shardingsphere.data.pipeline.cdc.api.pojo.StreamDataParameter;
 import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
@@ -52,7 +51,6 @@ import org.apache.shardingsphere.data.pipeline.common.datasource.yaml.YamlPipeli
 import org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId;
 import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.common.job.progress.JobItemIncrementalTasksProgress;
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
 import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
 import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
 import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData;
@@ -368,7 +366,7 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
     }
     
     @Override
-    public JobType getJobType() {
-        return new CDCJobType();
+    public String getType() {
+        return "STREAMING";
     }
 }
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java
index ce87dac4dfa..87e71d934de 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java
@@ -24,11 +24,9 @@ import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
  */
 public final class ConsistencyCheckJobType implements JobType {
     
-    public static final String TYPE_CODE = "02";
-    
     @Override
     public String getCode() {
-        return TYPE_CODE;
+        return "02";
     }
     
     @Override
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index b9d1a11426f..087ee683394 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -19,7 +19,6 @@ package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.im
 
 import com.google.common.base.Strings;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.core.task.config.PipelineTaskConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
@@ -31,8 +30,6 @@ import org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId;
 import org.apache.shardingsphere.data.pipeline.common.job.progress.ConsistencyCheckJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlConsistencyCheckJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlConsistencyCheckJobItemProgressSwapper;
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobCodeRegistry;
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
 import org.apache.shardingsphere.data.pipeline.common.pojo.ConsistencyCheckJobItemInfo;
 import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
 import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
@@ -47,9 +44,9 @@ import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncreme
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
 import org.apache.shardingsphere.data.pipeline.core.job.service.impl.AbstractPipelineJobAPIImpl;
+import org.apache.shardingsphere.data.pipeline.core.task.config.PipelineTaskConfiguration;
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJob;
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobId;
-import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobType;
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.pojo.CreateConsistencyCheckJobParameter;
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.yaml.YamlConsistencyCheckJobConfiguration;
@@ -405,7 +402,7 @@ public final class ConsistencyCheckJobAPI extends AbstractPipelineJobAPIImpl {
     }
     
     @Override
-    public JobType getJobType() {
-        return JobCodeRegistry.getJobType(ConsistencyCheckJobType.TYPE_CODE);
+    public String getType() {
+        return "CONSISTENCY_CHECK";
     }
 }
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java
index 92d305bca0f..ab7c82ebd90 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java
@@ -24,11 +24,9 @@ import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
  */
 public final class MigrationJobType implements JobType {
     
-    public static final String TYPE_CODE = "01";
-    
     @Override
     public String getCode() {
-        return TYPE_CODE;
+        return "01";
     }
     
     @Override
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index 1c310065bc5..2e273e15c40 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -38,8 +38,6 @@ import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSou
 import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.common.datasource.yaml.YamlPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId;
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobCodeRegistry;
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
 import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
 import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveQualifiedTable;
 import org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineSchemaUtils;
@@ -63,7 +61,6 @@ import org.apache.shardingsphere.data.pipeline.core.job.service.impl.AbstractInv
 import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJob;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.check.consistency.MigrationDataConsistencyChecker;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
@@ -490,8 +487,8 @@ public final class MigrationJobAPI extends AbstractInventoryIncrementalJobAPIImp
     }
     
     @Override
-    public JobType getJobType() {
-        return JobCodeRegistry.getJobType(MigrationJobType.TYPE_CODE);
+    public String getType() {
+        return "MIGRATION";
     }
     
     @Override

Reply via email to