This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b3798753ae Refactor JobType to extends TypedSPI (#26838)
9b3798753ae is described below

commit 9b3798753aea7fb3274513cc0b38b1e5728a2e5a
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Jul 8 17:09:25 2023 +0800

    Refactor JobType to extends TypedSPI (#26838)
---
 .../pipeline/common/job/type/AbstractJobType.java  | 12 +++++------
 .../{JobTypeFactory.java => JobCodeRegistry.java}  | 24 ++++++++--------------
 .../data/pipeline/common/job/type/JobType.java     | 16 +++++----------
 .../common/metadata/node/PipelineMetaDataNode.java |  2 +-
 .../AbstractChangedJobConfigurationProcessor.java  |  2 +-
 .../impl/ConfigMetaDataChangedEventHandler.java    |  2 +-
 .../pipeline/core/job/AbstractPipelineJob.java     |  2 +-
 .../data/pipeline/core/job/PipelineJobIdUtils.java |  9 ++++----
 .../persist/PipelineJobProgressPersistService.java |  2 +-
 .../service/impl/AbstractPipelineJobAPIImpl.java   |  4 ++--
 .../runner/InventoryIncrementalTasksRunner.java    |  4 ++--
 .../query/ShowStreamingJobStatusExecutor.java      |  2 +-
 .../api/impl/ConsistencyCheckJobAPI.java           |  6 +++---
 .../task/ConsistencyCheckTasksRunner.java          |  4 ++--
 .../migration/api/impl/MigrationJobAPI.java        |  4 ++--
 .../pipeline/cases/PipelineContainerComposer.java  |  2 +-
 ...peFactoryTest.java => JobCodeRegistryTest.java} | 21 ++++++-------------
 17 files changed, 49 insertions(+), 69 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/AbstractJobType.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/AbstractJobType.java
index 020f1f743cc..8a6610bb34a 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/AbstractJobType.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/AbstractJobType.java
@@ -26,13 +26,13 @@ import lombok.Getter;
 @Getter
 public abstract class AbstractJobType implements JobType {
     
-    private final String typeName;
+    private final String type;
     
-    private final String typeCode;
+    private final String code;
     
-    protected AbstractJobType(final String typeName, final String typeCode) {
-        this.typeName = typeName;
-        Preconditions.checkArgument(2 == typeCode.length(), "code length is not 2");
-        this.typeCode = typeCode;
+    protected AbstractJobType(final String type, final String code) {
+        this.type = type;
+        Preconditions.checkArgument(2 == code.length(), "code length is not 2");
+        this.code = code;
     }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/JobTypeFactory.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/JobCodeRegistry.java
similarity index 63%
rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/JobTypeFactory.java
rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/JobCodeRegistry.java
index e92422fa408..f21716d17f3 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/JobTypeFactory.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/JobCodeRegistry.java
@@ -23,37 +23,31 @@ import lombok.NoArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
 
+import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 
 /**
- * Job type factory.
+ * Job code registry.
  */
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
 @Slf4j
-public final class JobTypeFactory {
+public final class JobCodeRegistry {
     
-    private static final Map<String, JobType> CODE_JOB_TYPE_MAP = new ConcurrentHashMap<>();
+    private static final Map<String, String> JOB_CODE_AND_TYPE_MAP = new HashMap<>();
     
     static {
-        for (JobType each : ShardingSphereServiceLoader.getServiceInstances(JobType.class)) {
-            String typeCode = each.getTypeCode();
-            JobType replaced = CODE_JOB_TYPE_MAP.put(typeCode, each);
-            if (null != replaced) {
-                log.error("Type code already exists, typeCode={}, replaced={}, current={}", typeCode, replaced, each, new Exception());
-            }
-        }
+        ShardingSphereServiceLoader.getServiceInstances(JobType.class).forEach(each -> JOB_CODE_AND_TYPE_MAP.put(each.getCode(), each.getType()));
     }
     
     /**
-     * Get job type instance.
+     * Get job type.
      *
      * @param jobTypeCode job type code
      * @return job type
      */
-    public static JobType getInstance(final String jobTypeCode) {
-        JobType result = CODE_JOB_TYPE_MAP.get(jobTypeCode);
-        Preconditions.checkNotNull(result, "Can not get job type by `%s`", jobTypeCode);
+    public static String getJobType(final String jobTypeCode) {
+        String result = JOB_CODE_AND_TYPE_MAP.get(jobTypeCode);
+        Preconditions.checkNotNull(result, "Can not get job type by `%s`.", jobTypeCode);
         return result;
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/JobType.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/JobType.java
index 2f6a51951f0..36b47bdaf63 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/JobType.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/JobType.java
@@ -18,24 +18,18 @@
 package org.apache.shardingsphere.data.pipeline.common.job.type;
 
 import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
+import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
 
 /**
  * Job type.
  */
 @SingletonSPI
-public interface JobType {
+public interface JobType extends TypedSPI {
     
     /**
-     * Get type name.
+     * Get job type code.
      *
-     * @return type name
+     * @return job type code
      */
-    String getTypeName();
-    
-    /**
-     * Get type code.
-     *
-     * @return type code
-     */
-    String getTypeCode();
+    String getCode();
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNode.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNode.java
index fd5cb83f824..45034bebfbe 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNode.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNode.java
@@ -49,7 +49,7 @@ public final class PipelineMetaDataNode {
     private static String getMetaDataRootPath(final JobType jobType) {
         return null == jobType
                 ? String.join("/", DataPipelineConstants.DATA_PIPELINE_ROOT, 
"metadata")
-                : String.join("/", DataPipelineConstants.DATA_PIPELINE_ROOT, 
jobType.getTypeName().toLowerCase(), "metadata");
+                : String.join("/", DataPipelineConstants.DATA_PIPELINE_ROOT, 
jobType.getType().toLowerCase(), "metadata");
     }
     
     /**
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/config/processor/impl/AbstractChangedJobConfigurationProcessor.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/config/processor/impl/AbstractChangedJobConfigurationProcessor.java
index cd818c22ffa..6685cdb2de8 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/config/processor/impl/AbstractChangedJobConfigurationProcessor.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/config/processor/impl/AbstractChangedJobConfigurationProcessor.java
@@ -93,6 +93,6 @@ public abstract class 
AbstractChangedJobConfigurationProcessor implements Change
     
     @Override
     public String getType() {
-        return getJobType().getTypeName();
+        return getJobType().getType();
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/event/handler/impl/ConfigMetaDataChangedEventHandler.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/event/handler/impl/ConfigMetaDataChangedEventHandler.java
index 8a0c862c041..3373d60091c 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/event/handler/impl/ConfigMetaDataChangedEventHandler.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/event/handler/impl/ConfigMetaDataChangedEventHandler.java
@@ -53,7 +53,7 @@ public final class ConfigMetaDataChangedEventHandler 
implements PipelineMetaData
             return;
         }
         log.info("{} job configuration: {}, disabled={}", event.getType(), 
event.getKey(), jobConfig.isDisabled());
-        TypedSPILoader.findService(ChangedJobConfigurationProcessor.class, 
PipelineJobIdUtils.parseJobType(jobConfig.getJobName()).getTypeName())
+        TypedSPILoader.findService(ChangedJobConfigurationProcessor.class, 
PipelineJobIdUtils.parseJobType(jobConfig.getJobName()).getType())
                 .ifPresent(optional -> optional.process(event.getType(), 
jobConfig));
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
index d0ff42f4373..a74f5aa1bf8 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
@@ -65,7 +65,7 @@ public abstract class AbstractPipelineJob implements 
PipelineJob {
     
     protected AbstractPipelineJob(final String jobId) {
         this.jobId = jobId;
-        jobAPI = TypedSPILoader.getService(PipelineJobAPI.class, 
PipelineJobIdUtils.parseJobType(jobId).getTypeName());
+        jobAPI = TypedSPILoader.getService(PipelineJobAPI.class, 
PipelineJobIdUtils.parseJobType(jobId).getType());
     }
     
     /**
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java
index cb170464e0b..ecc842a4900 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java
@@ -26,10 +26,11 @@ import org.apache.commons.codec.DecoderException;
 import org.apache.commons.codec.binary.Hex;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
 import org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId;
+import org.apache.shardingsphere.data.pipeline.common.job.type.JobCodeRegistry;
 import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobTypeFactory;
 import org.apache.shardingsphere.data.pipeline.common.util.InstanceTypeUtils;
 import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
+import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
 
 import java.nio.charset.StandardCharsets;
 
@@ -51,7 +52,7 @@ public final class PipelineJobIdUtils {
         String databaseNameHex = 
Hex.encodeHexString(databaseName.getBytes(StandardCharsets.UTF_8), true);
         String databaseNameLengthHex = 
Hex.encodeHexString(Shorts.toByteArray((short) databaseNameHex.length()), true);
         char encodedInstanceType = InstanceTypeUtils.encode(instanceType);
-        return 'j' + pipelineJobId.getJobType().getTypeCode() + 
pipelineJobId.getFormatVersion() + encodedInstanceType + databaseNameLengthHex 
+ databaseNameHex;
+        return 'j' + pipelineJobId.getJobType().getCode() + 
pipelineJobId.getFormatVersion() + encodedInstanceType + databaseNameLengthHex 
+ databaseNameHex;
     }
     
     /**
@@ -62,8 +63,8 @@ public final class PipelineJobIdUtils {
      */
     public static JobType parseJobType(final String jobId) {
         verifyJobId(jobId);
-        String typeCode = jobId.substring(1, 3);
-        return JobTypeFactory.getInstance(typeCode);
+        String jobTypeCode = jobId.substring(1, 3);
+        return TypedSPILoader.getService(JobType.class, 
JobCodeRegistry.getJobType(jobTypeCode));
     }
     
     private static void verifyJobId(final String jobId) {
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
index d2f2570d816..74ec319f1a7 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
@@ -111,7 +111,7 @@ public final class PipelineJobProgressPersistService {
             }
             persistContext.getHasNewEvents().set(false);
             long startTimeMillis = System.currentTimeMillis();
-            TypedSPILoader.getService(PipelineJobAPI.class, 
PipelineJobIdUtils.parseJobType(jobId).getTypeName()).updateJobItemProgress(jobItemContext.get());
+            TypedSPILoader.getService(PipelineJobAPI.class, 
PipelineJobIdUtils.parseJobType(jobId).getType()).updateJobItemProgress(jobItemContext.get());
             persistContext.getBeforePersistingProgressMillis().set(null);
             if (6 == ThreadLocalRandom.current().nextInt(100)) {
                 log.info("persist, jobId={}, shardingItem={}, cost {} ms", 
jobId, shardingItem, System.currentTimeMillis() - startTimeMillis);
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
index 3a1fab60254..8a73cacab1c 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
@@ -71,7 +71,7 @@ public abstract class AbstractPipelineJobAPIImpl implements 
PipelineJobAPI {
     
     private Stream<JobBriefInfo> getJobBriefInfos(final PipelineContextKey 
contextKey) {
         return 
PipelineAPIFactory.getJobStatisticsAPI(contextKey).getAllJobsBriefInfo().stream().filter(each
 -> !each.getJobName().startsWith("_"))
-                .filter(each -> 
PipelineJobIdUtils.parseJobType(each.getJobName()).getTypeCode().equals(getJobType().getTypeCode()));
+                .filter(each -> 
PipelineJobIdUtils.parseJobType(each.getJobName()).getCode().equals(getJobType().getCode()));
     }
     
     // TODO Add getJobInfo
@@ -162,7 +162,7 @@ public abstract class AbstractPipelineJobAPIImpl implements 
PipelineJobAPI {
     
     @Override
     public String getType() {
-        return getJobType().getTypeName();
+        return getJobType().getType();
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
index c496fe693fc..19505abdcf4 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
@@ -56,7 +56,7 @@ public class InventoryIncrementalTasksRunner implements 
PipelineTasksRunner {
         this.jobItemContext = jobItemContext;
         inventoryTasks = jobItemContext.getInventoryTasks();
         incrementalTasks = jobItemContext.getIncrementalTasks();
-        jobAPI = TypedSPILoader.getService(PipelineJobAPI.class, 
PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getTypeName());
+        jobAPI = TypedSPILoader.getService(PipelineJobAPI.class, 
PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType());
     }
     
     @Override
@@ -77,7 +77,7 @@ public class InventoryIncrementalTasksRunner implements 
PipelineTasksRunner {
         if (jobItemContext.isStopping()) {
             return;
         }
-        TypedSPILoader.getService(PipelineJobAPI.class, 
PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getTypeName()).persistJobItemProgress(jobItemContext);
+        TypedSPILoader.getService(PipelineJobAPI.class, 
PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType()).persistJobItemProgress(jobItemContext);
         if 
(PipelineJobProgressDetector.allInventoryTasksFinished(inventoryTasks)) {
             log.info("All inventory tasks finished.");
             executeIncrementalTask();
diff --git 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java
 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java
index bc49c4fc431..064cea4e7fa 100644
--- 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java
+++ 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java
@@ -40,7 +40,7 @@ public final class ShowStreamingJobStatusExecutor implements 
QueryableRALExecuto
     
     @Override
     public Collection<LocalDataQueryResultRow> getRows(final 
ShowStreamingStatusStatement sqlStatement) {
-        InventoryIncrementalJobAPI jobAPI = (InventoryIncrementalJobAPI) 
TypedSPILoader.getService(PipelineJobAPI.class, new CDCJobType().getTypeName());
+        InventoryIncrementalJobAPI jobAPI = (InventoryIncrementalJobAPI) 
TypedSPILoader.getService(PipelineJobAPI.class, new CDCJobType().getType());
         List<InventoryIncrementalJobItemInfo> jobItemInfos = 
jobAPI.getJobItemInfos(sqlStatement.getJobId());
         long currentTimeMillis = System.currentTimeMillis();
         return jobItemInfos.stream().map(each -> generateResultRow(each, 
currentTimeMillis)).collect(Collectors.toList());
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index 538285694e7..88ae8106d06 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -32,7 +32,7 @@ import 
org.apache.shardingsphere.data.pipeline.common.job.progress.ConsistencyCh
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlConsistencyCheckJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlConsistencyCheckJobItemProgressSwapper;
 import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobTypeFactory;
+import org.apache.shardingsphere.data.pipeline.common.job.type.JobCodeRegistry;
 import 
org.apache.shardingsphere.data.pipeline.common.pojo.ConsistencyCheckJobItemInfo;
 import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
 import 
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
@@ -328,7 +328,7 @@ public final class ConsistencyCheckJobAPI extends 
AbstractPipelineJobAPIImpl {
             result.setCheckSuccess(null);
         } else {
             InventoryIncrementalJobAPI inventoryIncrementalJobAPI = 
(InventoryIncrementalJobAPI) TypedSPILoader.getService(
-                    PipelineJobAPI.class, 
PipelineJobIdUtils.parseJobType(parentJobId).getTypeName());
+                    PipelineJobAPI.class, 
PipelineJobIdUtils.parseJobType(parentJobId).getType());
             
result.setCheckSuccess(inventoryIncrementalJobAPI.aggregateDataConsistencyCheckResults(parentJobId,
 checkJobResult));
         }
         
result.setCheckFailedTableNames(checkJobResult.entrySet().stream().filter(each 
-> !each.getValue().isIgnored() && !each.getValue().isMatched())
@@ -377,6 +377,6 @@ public final class ConsistencyCheckJobAPI extends 
AbstractPipelineJobAPIImpl {
     
     @Override
     public JobType getJobType() {
-        return JobTypeFactory.getInstance(ConsistencyCheckJobType.TYPE_CODE);
+        return TypedSPILoader.getService(JobType.class, 
JobCodeRegistry.getJobType(ConsistencyCheckJobType.TYPE_CODE));
     }
 }
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
index 8a0afdfe2df..82f5c2fd790 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
@@ -78,7 +78,7 @@ public final class ConsistencyCheckTasksRunner implements 
PipelineTasksRunner {
         if (jobItemContext.isStopping()) {
             return;
         }
-        TypedSPILoader.getService(PipelineJobAPI.class, 
PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getTypeName()).persistJobItemProgress(jobItemContext);
+        TypedSPILoader.getService(PipelineJobAPI.class, 
PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType()).persistJobItemProgress(jobItemContext);
         CompletableFuture<?> future = 
jobItemContext.getProcessContext().getConsistencyCheckExecuteEngine().submit(checkExecutor);
         ExecuteEngine.trigger(Collections.singletonList(future), new 
CheckExecuteCallback());
     }
@@ -95,7 +95,7 @@ public final class ConsistencyCheckTasksRunner implements 
PipelineTasksRunner {
         protected void runBlocking() {
             checkJobAPI.persistJobItemProgress(jobItemContext);
             JobType jobType = PipelineJobIdUtils.parseJobType(parentJobId);
-            InventoryIncrementalJobAPI jobAPI = (InventoryIncrementalJobAPI) 
TypedSPILoader.getService(PipelineJobAPI.class, jobType.getTypeName());
+            InventoryIncrementalJobAPI jobAPI = (InventoryIncrementalJobAPI) 
TypedSPILoader.getService(PipelineJobAPI.class, jobType.getType());
             PipelineJobConfiguration parentJobConfig = 
jobAPI.getJobConfiguration(parentJobId);
             DataConsistencyCalculateAlgorithm calculateAlgorithm = 
jobAPI.buildDataConsistencyCalculateAlgorithm(checkJobConfig.getAlgorithmTypeName(),
 checkJobConfig.getAlgorithmProps());
             
ConsistencyCheckTasksRunner.this.calculateAlgorithm.set(calculateAlgorithm);
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index e9e3f4a007a..de2dc1b6c04 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -48,7 +48,7 @@ import 
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSou
 import 
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId;
 import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobTypeFactory;
+import org.apache.shardingsphere.data.pipeline.common.job.type.JobCodeRegistry;
 import 
org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineSchemaUtils;
 import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData;
 import 
org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo;
@@ -517,7 +517,7 @@ public final class MigrationJobAPI extends 
AbstractInventoryIncrementalJobAPIImp
     
     @Override
     public JobType getJobType() {
-        return JobTypeFactory.getInstance(MigrationJobType.TYPE_CODE);
+        return TypedSPILoader.getService(JobType.class, 
JobCodeRegistry.getJobType(MigrationJobType.TYPE_CODE));
     }
     
     @Override
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
index 8e214531c1b..dcd6bdcfc40 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
@@ -148,7 +148,7 @@ public final class PipelineContainerComposer implements 
AutoCloseable {
         if (PipelineEnvTypeEnum.NATIVE != 
PipelineE2EEnvironment.getInstance().getItEnvType()) {
             return;
         }
-        String jobTypeName = jobType.getTypeName();
+        String jobTypeName = jobType.getType();
         for (Map<String, Object> each : queryJobs(connection, jobTypeName)) {
             String jobId = each.get("id").toString();
             Map<String, Object> jobInfo = 
queryForListWithLog(String.format("SHOW %s STATUS '%s'", jobTypeName, 
jobId)).get(0);
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/spi/job/JobTypeFactoryTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/spi/job/JobCodeRegistryTest.java
similarity index 61%
rename from 
test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/spi/job/JobTypeFactoryTest.java
rename to 
test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/spi/job/JobCodeRegistryTest.java
index 39dad303804..cc0112e8443 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/spi/job/JobTypeFactoryTest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/spi/job/JobCodeRegistryTest.java
@@ -17,28 +17,19 @@
 
 package org.apache.shardingsphere.test.it.data.pipeline.spi.job;
 
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobTypeFactory;
+import org.apache.shardingsphere.data.pipeline.common.job.type.JobCodeRegistry;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobType;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
 import org.junit.jupiter.api.Test;
 
-import java.util.Arrays;
-import java.util.Collection;
-
-import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 
-class JobTypeFactoryTest {
+class JobCodeRegistryTest {
     
     @Test
-    void assertGetInstance() {
-        Collection<Pair<String, Class<? extends JobType>>> paramResult = 
Arrays.asList(
-                Pair.of(MigrationJobType.TYPE_CODE, MigrationJobType.class), 
Pair.of(ConsistencyCheckJobType.TYPE_CODE, ConsistencyCheckJobType.class));
-        for (Pair<String, Class<? extends JobType>> each : paramResult) {
-            JobType actual = JobTypeFactory.getInstance(each.getKey());
-            assertThat(actual, instanceOf(each.getValue()));
-        }
+    void assertGetJobType() {
+        assertThat(JobCodeRegistry.getJobType(MigrationJobType.TYPE_CODE), 
is("MIGRATION"));
+        
assertThat(JobCodeRegistry.getJobType(ConsistencyCheckJobType.TYPE_CODE), 
is("CONSISTENCY_CHECK"));
     }
 }

Reply via email to