This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 9b3798753ae Refactor JobType to extend TypedSPI (#26838)
9b3798753ae is described below
commit 9b3798753aea7fb3274513cc0b38b1e5728a2e5a
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Jul 8 17:09:25 2023 +0800
Refactor JobType to extend TypedSPI (#26838)
---
.../pipeline/common/job/type/AbstractJobType.java | 12 +++++------
.../{JobTypeFactory.java => JobCodeRegistry.java} | 24 ++++++++--------------
.../data/pipeline/common/job/type/JobType.java | 16 +++++----------
.../common/metadata/node/PipelineMetaDataNode.java | 2 +-
.../AbstractChangedJobConfigurationProcessor.java | 2 +-
.../impl/ConfigMetaDataChangedEventHandler.java | 2 +-
.../pipeline/core/job/AbstractPipelineJob.java | 2 +-
.../data/pipeline/core/job/PipelineJobIdUtils.java | 9 ++++----
.../persist/PipelineJobProgressPersistService.java | 2 +-
.../service/impl/AbstractPipelineJobAPIImpl.java | 4 ++--
.../runner/InventoryIncrementalTasksRunner.java | 4 ++--
.../query/ShowStreamingJobStatusExecutor.java | 2 +-
.../api/impl/ConsistencyCheckJobAPI.java | 6 +++---
.../task/ConsistencyCheckTasksRunner.java | 4 ++--
.../migration/api/impl/MigrationJobAPI.java | 4 ++--
.../pipeline/cases/PipelineContainerComposer.java | 2 +-
...peFactoryTest.java => JobCodeRegistryTest.java} | 21 ++++++-------------
17 files changed, 49 insertions(+), 69 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/AbstractJobType.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/AbstractJobType.java
index 020f1f743cc..8a6610bb34a 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/AbstractJobType.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/AbstractJobType.java
@@ -26,13 +26,13 @@ import lombok.Getter;
@Getter
public abstract class AbstractJobType implements JobType {
- private final String typeName;
+ private final String type;
- private final String typeCode;
+ private final String code;
- protected AbstractJobType(final String typeName, final String typeCode) {
- this.typeName = typeName;
- Preconditions.checkArgument(2 == typeCode.length(), "code length is
not 2");
- this.typeCode = typeCode;
+ protected AbstractJobType(final String type, final String code) {
+ this.type = type;
+ Preconditions.checkArgument(2 == code.length(), "code length is not
2");
+ this.code = code;
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/JobTypeFactory.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/JobCodeRegistry.java
similarity index 63%
rename from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/JobTypeFactory.java
rename to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/JobCodeRegistry.java
index e92422fa408..f21716d17f3 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/JobTypeFactory.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/JobCodeRegistry.java
@@ -23,37 +23,31 @@ import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
+import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
/**
- * Job type factory.
+ * Job code registry.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
@Slf4j
-public final class JobTypeFactory {
+public final class JobCodeRegistry {
- private static final Map<String, JobType> CODE_JOB_TYPE_MAP = new
ConcurrentHashMap<>();
+ private static final Map<String, String> JOB_CODE_AND_TYPE_MAP = new
HashMap<>();
static {
- for (JobType each :
ShardingSphereServiceLoader.getServiceInstances(JobType.class)) {
- String typeCode = each.getTypeCode();
- JobType replaced = CODE_JOB_TYPE_MAP.put(typeCode, each);
- if (null != replaced) {
- log.error("Type code already exists, typeCode={}, replaced={},
current={}", typeCode, replaced, each, new Exception());
- }
- }
+
ShardingSphereServiceLoader.getServiceInstances(JobType.class).forEach(each ->
JOB_CODE_AND_TYPE_MAP.put(each.getCode(), each.getType()));
}
/**
- * Get job type instance.
+ * Get job type.
*
* @param jobTypeCode job type code
* @return job type
*/
- public static JobType getInstance(final String jobTypeCode) {
- JobType result = CODE_JOB_TYPE_MAP.get(jobTypeCode);
- Preconditions.checkNotNull(result, "Can not get job type by `%s`",
jobTypeCode);
+ public static String getJobType(final String jobTypeCode) {
+ String result = JOB_CODE_AND_TYPE_MAP.get(jobTypeCode);
+ Preconditions.checkNotNull(result, "Can not get job type by `%s`.",
jobTypeCode);
return result;
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/JobType.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/JobType.java
index 2f6a51951f0..36b47bdaf63 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/JobType.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/JobType.java
@@ -18,24 +18,18 @@
package org.apache.shardingsphere.data.pipeline.common.job.type;
import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
+import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
/**
* Job type.
*/
@SingletonSPI
-public interface JobType {
+public interface JobType extends TypedSPI {
/**
- * Get type name.
+ * Get job type code.
*
- * @return type name
+ * @return job type code
*/
- String getTypeName();
-
- /**
- * Get type code.
- *
- * @return type code
- */
- String getTypeCode();
+ String getCode();
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNode.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNode.java
index fd5cb83f824..45034bebfbe 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNode.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNode.java
@@ -49,7 +49,7 @@ public final class PipelineMetaDataNode {
private static String getMetaDataRootPath(final JobType jobType) {
return null == jobType
? String.join("/", DataPipelineConstants.DATA_PIPELINE_ROOT,
"metadata")
- : String.join("/", DataPipelineConstants.DATA_PIPELINE_ROOT,
jobType.getTypeName().toLowerCase(), "metadata");
+ : String.join("/", DataPipelineConstants.DATA_PIPELINE_ROOT,
jobType.getType().toLowerCase(), "metadata");
}
/**
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/config/processor/impl/AbstractChangedJobConfigurationProcessor.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/config/processor/impl/AbstractChangedJobConfigurationProcessor.java
index cd818c22ffa..6685cdb2de8 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/config/processor/impl/AbstractChangedJobConfigurationProcessor.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/config/processor/impl/AbstractChangedJobConfigurationProcessor.java
@@ -93,6 +93,6 @@ public abstract class
AbstractChangedJobConfigurationProcessor implements Change
@Override
public String getType() {
- return getJobType().getTypeName();
+ return getJobType().getType();
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/event/handler/impl/ConfigMetaDataChangedEventHandler.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/event/handler/impl/ConfigMetaDataChangedEventHandler.java
index 8a0c862c041..3373d60091c 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/event/handler/impl/ConfigMetaDataChangedEventHandler.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/event/handler/impl/ConfigMetaDataChangedEventHandler.java
@@ -53,7 +53,7 @@ public final class ConfigMetaDataChangedEventHandler
implements PipelineMetaData
return;
}
log.info("{} job configuration: {}, disabled={}", event.getType(),
event.getKey(), jobConfig.isDisabled());
- TypedSPILoader.findService(ChangedJobConfigurationProcessor.class,
PipelineJobIdUtils.parseJobType(jobConfig.getJobName()).getTypeName())
+ TypedSPILoader.findService(ChangedJobConfigurationProcessor.class,
PipelineJobIdUtils.parseJobType(jobConfig.getJobName()).getType())
.ifPresent(optional -> optional.process(event.getType(),
jobConfig));
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
index d0ff42f4373..a74f5aa1bf8 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
@@ -65,7 +65,7 @@ public abstract class AbstractPipelineJob implements
PipelineJob {
protected AbstractPipelineJob(final String jobId) {
this.jobId = jobId;
- jobAPI = TypedSPILoader.getService(PipelineJobAPI.class,
PipelineJobIdUtils.parseJobType(jobId).getTypeName());
+ jobAPI = TypedSPILoader.getService(PipelineJobAPI.class,
PipelineJobIdUtils.parseJobType(jobId).getType());
}
/**
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java
index cb170464e0b..ecc842a4900 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java
@@ -26,10 +26,11 @@ import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.binary.Hex;
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId;
+import org.apache.shardingsphere.data.pipeline.common.job.type.JobCodeRegistry;
import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobTypeFactory;
import org.apache.shardingsphere.data.pipeline.common.util.InstanceTypeUtils;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
+import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
import java.nio.charset.StandardCharsets;
@@ -51,7 +52,7 @@ public final class PipelineJobIdUtils {
String databaseNameHex =
Hex.encodeHexString(databaseName.getBytes(StandardCharsets.UTF_8), true);
String databaseNameLengthHex =
Hex.encodeHexString(Shorts.toByteArray((short) databaseNameHex.length()), true);
char encodedInstanceType = InstanceTypeUtils.encode(instanceType);
- return 'j' + pipelineJobId.getJobType().getTypeCode() +
pipelineJobId.getFormatVersion() + encodedInstanceType + databaseNameLengthHex
+ databaseNameHex;
+ return 'j' + pipelineJobId.getJobType().getCode() +
pipelineJobId.getFormatVersion() + encodedInstanceType + databaseNameLengthHex
+ databaseNameHex;
}
/**
@@ -62,8 +63,8 @@ public final class PipelineJobIdUtils {
*/
public static JobType parseJobType(final String jobId) {
verifyJobId(jobId);
- String typeCode = jobId.substring(1, 3);
- return JobTypeFactory.getInstance(typeCode);
+ String jobTypeCode = jobId.substring(1, 3);
+ return TypedSPILoader.getService(JobType.class,
JobCodeRegistry.getJobType(jobTypeCode));
}
private static void verifyJobId(final String jobId) {
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
index d2f2570d816..74ec319f1a7 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
@@ -111,7 +111,7 @@ public final class PipelineJobProgressPersistService {
}
persistContext.getHasNewEvents().set(false);
long startTimeMillis = System.currentTimeMillis();
- TypedSPILoader.getService(PipelineJobAPI.class,
PipelineJobIdUtils.parseJobType(jobId).getTypeName()).updateJobItemProgress(jobItemContext.get());
+ TypedSPILoader.getService(PipelineJobAPI.class,
PipelineJobIdUtils.parseJobType(jobId).getType()).updateJobItemProgress(jobItemContext.get());
persistContext.getBeforePersistingProgressMillis().set(null);
if (6 == ThreadLocalRandom.current().nextInt(100)) {
log.info("persist, jobId={}, shardingItem={}, cost {} ms",
jobId, shardingItem, System.currentTimeMillis() - startTimeMillis);
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
index 3a1fab60254..8a73cacab1c 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
@@ -71,7 +71,7 @@ public abstract class AbstractPipelineJobAPIImpl implements
PipelineJobAPI {
private Stream<JobBriefInfo> getJobBriefInfos(final PipelineContextKey
contextKey) {
return
PipelineAPIFactory.getJobStatisticsAPI(contextKey).getAllJobsBriefInfo().stream().filter(each
-> !each.getJobName().startsWith("_"))
- .filter(each ->
PipelineJobIdUtils.parseJobType(each.getJobName()).getTypeCode().equals(getJobType().getTypeCode()));
+ .filter(each ->
PipelineJobIdUtils.parseJobType(each.getJobName()).getCode().equals(getJobType().getCode()));
}
// TODO Add getJobInfo
@@ -162,7 +162,7 @@ public abstract class AbstractPipelineJobAPIImpl implements
PipelineJobAPI {
@Override
public String getType() {
- return getJobType().getTypeName();
+ return getJobType().getType();
}
@Override
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
index c496fe693fc..19505abdcf4 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
@@ -56,7 +56,7 @@ public class InventoryIncrementalTasksRunner implements
PipelineTasksRunner {
this.jobItemContext = jobItemContext;
inventoryTasks = jobItemContext.getInventoryTasks();
incrementalTasks = jobItemContext.getIncrementalTasks();
- jobAPI = TypedSPILoader.getService(PipelineJobAPI.class,
PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getTypeName());
+ jobAPI = TypedSPILoader.getService(PipelineJobAPI.class,
PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType());
}
@Override
@@ -77,7 +77,7 @@ public class InventoryIncrementalTasksRunner implements
PipelineTasksRunner {
if (jobItemContext.isStopping()) {
return;
}
- TypedSPILoader.getService(PipelineJobAPI.class,
PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getTypeName()).persistJobItemProgress(jobItemContext);
+ TypedSPILoader.getService(PipelineJobAPI.class,
PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType()).persistJobItemProgress(jobItemContext);
if
(PipelineJobProgressDetector.allInventoryTasksFinished(inventoryTasks)) {
log.info("All inventory tasks finished.");
executeIncrementalTask();
diff --git
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java
index bc49c4fc431..064cea4e7fa 100644
---
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java
+++
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java
@@ -40,7 +40,7 @@ public final class ShowStreamingJobStatusExecutor implements
QueryableRALExecuto
@Override
public Collection<LocalDataQueryResultRow> getRows(final
ShowStreamingStatusStatement sqlStatement) {
- InventoryIncrementalJobAPI jobAPI = (InventoryIncrementalJobAPI)
TypedSPILoader.getService(PipelineJobAPI.class, new CDCJobType().getTypeName());
+ InventoryIncrementalJobAPI jobAPI = (InventoryIncrementalJobAPI)
TypedSPILoader.getService(PipelineJobAPI.class, new CDCJobType().getType());
List<InventoryIncrementalJobItemInfo> jobItemInfos =
jobAPI.getJobItemInfos(sqlStatement.getJobId());
long currentTimeMillis = System.currentTimeMillis();
return jobItemInfos.stream().map(each -> generateResultRow(each,
currentTimeMillis)).collect(Collectors.toList());
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index 538285694e7..88ae8106d06 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -32,7 +32,7 @@ import
org.apache.shardingsphere.data.pipeline.common.job.progress.ConsistencyCh
import
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlConsistencyCheckJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlConsistencyCheckJobItemProgressSwapper;
import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobTypeFactory;
+import org.apache.shardingsphere.data.pipeline.common.job.type.JobCodeRegistry;
import
org.apache.shardingsphere.data.pipeline.common.pojo.ConsistencyCheckJobItemInfo;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
import
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
@@ -328,7 +328,7 @@ public final class ConsistencyCheckJobAPI extends
AbstractPipelineJobAPIImpl {
result.setCheckSuccess(null);
} else {
InventoryIncrementalJobAPI inventoryIncrementalJobAPI =
(InventoryIncrementalJobAPI) TypedSPILoader.getService(
- PipelineJobAPI.class,
PipelineJobIdUtils.parseJobType(parentJobId).getTypeName());
+ PipelineJobAPI.class,
PipelineJobIdUtils.parseJobType(parentJobId).getType());
result.setCheckSuccess(inventoryIncrementalJobAPI.aggregateDataConsistencyCheckResults(parentJobId,
checkJobResult));
}
result.setCheckFailedTableNames(checkJobResult.entrySet().stream().filter(each
-> !each.getValue().isIgnored() && !each.getValue().isMatched())
@@ -377,6 +377,6 @@ public final class ConsistencyCheckJobAPI extends
AbstractPipelineJobAPIImpl {
@Override
public JobType getJobType() {
- return JobTypeFactory.getInstance(ConsistencyCheckJobType.TYPE_CODE);
+ return TypedSPILoader.getService(JobType.class,
JobCodeRegistry.getJobType(ConsistencyCheckJobType.TYPE_CODE));
}
}
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
index 8a0afdfe2df..82f5c2fd790 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
@@ -78,7 +78,7 @@ public final class ConsistencyCheckTasksRunner implements
PipelineTasksRunner {
if (jobItemContext.isStopping()) {
return;
}
- TypedSPILoader.getService(PipelineJobAPI.class,
PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getTypeName()).persistJobItemProgress(jobItemContext);
+ TypedSPILoader.getService(PipelineJobAPI.class,
PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType()).persistJobItemProgress(jobItemContext);
CompletableFuture<?> future =
jobItemContext.getProcessContext().getConsistencyCheckExecuteEngine().submit(checkExecutor);
ExecuteEngine.trigger(Collections.singletonList(future), new
CheckExecuteCallback());
}
@@ -95,7 +95,7 @@ public final class ConsistencyCheckTasksRunner implements
PipelineTasksRunner {
protected void runBlocking() {
checkJobAPI.persistJobItemProgress(jobItemContext);
JobType jobType = PipelineJobIdUtils.parseJobType(parentJobId);
- InventoryIncrementalJobAPI jobAPI = (InventoryIncrementalJobAPI)
TypedSPILoader.getService(PipelineJobAPI.class, jobType.getTypeName());
+ InventoryIncrementalJobAPI jobAPI = (InventoryIncrementalJobAPI)
TypedSPILoader.getService(PipelineJobAPI.class, jobType.getType());
PipelineJobConfiguration parentJobConfig =
jobAPI.getJobConfiguration(parentJobId);
DataConsistencyCalculateAlgorithm calculateAlgorithm =
jobAPI.buildDataConsistencyCalculateAlgorithm(checkJobConfig.getAlgorithmTypeName(),
checkJobConfig.getAlgorithmProps());
ConsistencyCheckTasksRunner.this.calculateAlgorithm.set(calculateAlgorithm);
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index e9e3f4a007a..de2dc1b6c04 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -48,7 +48,7 @@ import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSou
import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId;
import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobTypeFactory;
+import org.apache.shardingsphere.data.pipeline.common.job.type.JobCodeRegistry;
import
org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineSchemaUtils;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData;
import
org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo;
@@ -517,7 +517,7 @@ public final class MigrationJobAPI extends
AbstractInventoryIncrementalJobAPIImp
@Override
public JobType getJobType() {
- return JobTypeFactory.getInstance(MigrationJobType.TYPE_CODE);
+ return TypedSPILoader.getService(JobType.class,
JobCodeRegistry.getJobType(MigrationJobType.TYPE_CODE));
}
@Override
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
index 8e214531c1b..dcd6bdcfc40 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
@@ -148,7 +148,7 @@ public final class PipelineContainerComposer implements
AutoCloseable {
if (PipelineEnvTypeEnum.NATIVE !=
PipelineE2EEnvironment.getInstance().getItEnvType()) {
return;
}
- String jobTypeName = jobType.getTypeName();
+ String jobTypeName = jobType.getType();
for (Map<String, Object> each : queryJobs(connection, jobTypeName)) {
String jobId = each.get("id").toString();
Map<String, Object> jobInfo =
queryForListWithLog(String.format("SHOW %s STATUS '%s'", jobTypeName,
jobId)).get(0);
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/spi/job/JobTypeFactoryTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/spi/job/JobCodeRegistryTest.java
similarity index 61%
rename from
test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/spi/job/JobTypeFactoryTest.java
rename to
test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/spi/job/JobCodeRegistryTest.java
index 39dad303804..cc0112e8443 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/spi/job/JobTypeFactoryTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/spi/job/JobCodeRegistryTest.java
@@ -17,28 +17,19 @@
package org.apache.shardingsphere.test.it.data.pipeline.spi.job;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobTypeFactory;
+import org.apache.shardingsphere.data.pipeline.common.job.type.JobCodeRegistry;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobType;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import org.junit.jupiter.api.Test;
-import java.util.Arrays;
-import java.util.Collection;
-
-import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
-class JobTypeFactoryTest {
+class JobCodeRegistryTest {
@Test
- void assertGetInstance() {
- Collection<Pair<String, Class<? extends JobType>>> paramResult =
Arrays.asList(
- Pair.of(MigrationJobType.TYPE_CODE, MigrationJobType.class),
Pair.of(ConsistencyCheckJobType.TYPE_CODE, ConsistencyCheckJobType.class));
- for (Pair<String, Class<? extends JobType>> each : paramResult) {
- JobType actual = JobTypeFactory.getInstance(each.getKey());
- assertThat(actual, instanceOf(each.getValue()));
- }
+ void assertGetJobType() {
+ assertThat(JobCodeRegistry.getJobType(MigrationJobType.TYPE_CODE),
is("MIGRATION"));
+
assertThat(JobCodeRegistry.getJobType(ConsistencyCheckJobType.TYPE_CODE),
is("CONSISTENCY_CHECK"));
}
}