This is an automated email from the ASF dual-hosted git repository.

menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new bad450099fc Refactor PipelineJobAPI as SPI, extract PipelineJobPublicAPI for isolation and common usage (#20112)
bad450099fc is described below

commit bad450099fc8de3a6ddf12330e64d9f77864e397
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Fri Aug 12 16:18:05 2022 +0800

    Refactor PipelineJobAPI as SPI, extract PipelineJobPublicAPI for isolation and common usage (#20112)
---
 .../query/CheckMigrationQueryResultSet.java        |  10 +-
 ...ShowMigrationCheckAlgorithmsQueryResultSet.java |   8 +-
 .../ShowMigrationJobStatusQueryResultSet.java      |   8 +-
 .../query/ShowMigrationListQueryResultSet.java     |   8 +-
 .../handler/update/ApplyMigrationUpdater.java      |   8 +-
 .../handler/update/DropMigrationUpdater.java       |   8 +-
 .../handler/update/ResetMigrationUpdater.java      |   8 +-
 .../RestoreMigrationSourceWritingUpdater.java      |   8 +-
 .../handler/update/StartMigrationUpdater.java      |   8 +-
 .../update/StopMigrationSourceWritingUpdater.java  |   8 +-
 .../handler/update/StopMigrationUpdater.java       |   8 +-
 ...teredJobAPI.java => MigrationJobPublicAPI.java} | 106 +-----------------
 ...pelineJobAPI.java => PipelineJobPublicAPI.java} |  31 +-----
 ...ctory.java => PipelineJobPublicAPIFactory.java} |  12 +-
 .../data/pipeline/api/job/JobType.java             |   2 +-
 .../data/pipeline/api/job/PipelineJobId.java       |   2 +-
 .../data/pipeline/api/job/PipelineJobIdUtils.java  |  56 ++++++++++
 .../data/pipeline/core/api/PipelineAPIFactory.java |  17 +++
 .../data/pipeline/core}/api/PipelineJobAPI.java    |  45 +++++---
 .../data/pipeline/core}/api/RuleAlteredJobAPI.java | 123 ++-------------------
 .../core}/api/RuleAlteredJobAPIFactory.java        |   2 +-
 .../core/api/impl/AbstractPipelineJobAPIImpl.java  |  18 +--
 .../core/api/impl/RuleAlteredJobAPIImpl.java       |  28 ++---
 .../InventoryIncrementalJobItemContext.java}       |  31 +++---
 .../pipeline/core/execute/PipelineJobExecutor.java |   2 +-
 .../data/pipeline/core/job/FinishedCheckJob.java   |   4 +-
 .../persist/PipelineJobProgressPersistService.java |   5 +-
 .../core/task/InventoryIncrementalTasksRunner.java |   5 +-
 .../scenario/rulealtered/RuleAlteredJob.java       |   2 +-
 .../rulealtered/RuleAlteredJobContext.java         |   4 +-
 .../rulealtered/RuleAlteredJobPreparer.java        |   2 +-
 .../scenario/rulealtered/RuleAlteredJobWorker.java |   2 +-
 ...sphere.data.pipeline.api.MigrationJobPublicAPI} |   0
 ...ngsphere.data.pipeline.core.api.PipelineJobAPI} |   0
 ...phere.data.pipeline.core.api.RuleAlteredJobAPI} |   0
 .../core}/RuleAlteredJobAPIFactoryTest.java        |   5 +-
 .../core}/fixture/RuleAlteredJobAPIFixture.java    |  10 +-
 .../pipeline/core/job/PipelineJobIdUtilsTest.java  |  42 +++++++
 ...phere.data.pipeline.core.api.RuleAlteredJobAPI} |   2 +-
 .../core/api/impl/RuleAlteredJobAPIImplTest.java   |   4 +-
 .../pipeline/core/job/FinishedCheckJobTest.java    |   4 +-
 .../rulealtered/RuleAlteredJobWorkerTest.java      |   2 +-
 .../prepare/InventoryTaskSplitterTest.java         |   2 +-
 43 files changed, 272 insertions(+), 388 deletions(-)

diff --git 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/CheckMigrationQueryResultSet.java
 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/CheckMigrationQueryResultSet.java
index 722afe7c2e2..9b08eae1ead 100644
--- 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/CheckMigrationQueryResultSet.java
+++ 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/CheckMigrationQueryResultSet.java
@@ -17,8 +17,8 @@
 
 package org.apache.shardingsphere.migration.distsql.handler.query;
 
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
+import org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI;
+import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
 import org.apache.shardingsphere.distsql.parser.segment.AlgorithmSegment;
 import org.apache.shardingsphere.infra.distsql.query.DatabaseDistSQLResultSet;
@@ -38,7 +38,7 @@ import java.util.stream.Collectors;
  */
 public final class CheckMigrationQueryResultSet implements 
DatabaseDistSQLResultSet {
     
-    private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API = 
RuleAlteredJobAPIFactory.getInstance();
+    private static final MigrationJobPublicAPI JOB_API = 
PipelineJobPublicAPIFactory.getMigrationJobPublicAPI();
     
     private Iterator<Collection<Object>> data;
     
@@ -48,9 +48,9 @@ public final class CheckMigrationQueryResultSet implements 
DatabaseDistSQLResult
         Map<String, DataConsistencyCheckResult> checkResultMap;
         AlgorithmSegment typeStrategy = 
checkMigrationStatement.getTypeStrategy();
         if (null == typeStrategy) {
-            checkResultMap = 
RULE_ALTERED_JOB_API.dataConsistencyCheck(checkMigrationStatement.getJobId());
+            checkResultMap = 
JOB_API.dataConsistencyCheck(checkMigrationStatement.getJobId());
         } else {
-            checkResultMap = 
RULE_ALTERED_JOB_API.dataConsistencyCheck(checkMigrationStatement.getJobId(), 
typeStrategy.getName(), typeStrategy.getProps());
+            checkResultMap = 
JOB_API.dataConsistencyCheck(checkMigrationStatement.getJobId(), 
typeStrategy.getName(), typeStrategy.getProps());
         }
         data = checkResultMap.entrySet().stream()
                 .map(each -> {
diff --git 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckAlgorithmsQueryResultSet.java
 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckAlgorithmsQueryResultSet.java
index 7d875520f92..92d43f35ff0 100644
--- 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckAlgorithmsQueryResultSet.java
+++ 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckAlgorithmsQueryResultSet.java
@@ -17,8 +17,8 @@
 
 package org.apache.shardingsphere.migration.distsql.handler.query;
 
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
+import org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI;
+import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
 import org.apache.shardingsphere.infra.distsql.query.DatabaseDistSQLResultSet;
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import 
org.apache.shardingsphere.migration.distsql.statement.ShowMigrationCheckAlgorithmsStatement;
@@ -34,13 +34,13 @@ import java.util.stream.Collectors;
  */
 public final class ShowMigrationCheckAlgorithmsQueryResultSet implements 
DatabaseDistSQLResultSet {
     
-    private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API = 
RuleAlteredJobAPIFactory.getInstance();
+    private static final MigrationJobPublicAPI JOB_API = 
PipelineJobPublicAPIFactory.getMigrationJobPublicAPI();
     
     private Iterator<Collection<Object>> data;
     
     @Override
     public void init(final ShardingSphereDatabase database, final SQLStatement 
sqlStatement) {
-        data = 
RULE_ALTERED_JOB_API.listDataConsistencyCheckAlgorithms().stream().map(
+        data = JOB_API.listDataConsistencyCheckAlgorithms().stream().map(
                 each -> (Collection<Object>) 
Arrays.<Object>asList(each.getType(), String.join(",", 
each.getSupportedDatabaseTypes()), 
each.getDescription())).collect(Collectors.toList()).iterator();
     }
     
diff --git 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusQueryResultSet.java
 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusQueryResultSet.java
index f573f604e3b..2aa717e9f48 100644
--- 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusQueryResultSet.java
+++ 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusQueryResultSet.java
@@ -17,8 +17,8 @@
 
 package org.apache.shardingsphere.migration.distsql.handler.query;
 
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
+import org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI;
+import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
 import org.apache.shardingsphere.infra.distsql.query.DatabaseDistSQLResultSet;
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import 
org.apache.shardingsphere.migration.distsql.statement.ShowMigrationStatusStatement;
@@ -36,14 +36,14 @@ import java.util.stream.Collectors;
  */
 public final class ShowMigrationJobStatusQueryResultSet implements 
DatabaseDistSQLResultSet {
     
-    private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API = 
RuleAlteredJobAPIFactory.getInstance();
+    private static final MigrationJobPublicAPI JOB_API = 
PipelineJobPublicAPIFactory.getMigrationJobPublicAPI();
     
     private Iterator<Collection<Object>> data;
     
     @Override
     public void init(final ShardingSphereDatabase database, final SQLStatement 
sqlStatement) {
         long currentTimeMillis = System.currentTimeMillis();
-        data = 
RULE_ALTERED_JOB_API.getJobProgress(((ShowMigrationStatusStatement) 
sqlStatement).getJobId()).entrySet().stream()
+        data = JOB_API.getJobProgress(((ShowMigrationStatusStatement) 
sqlStatement).getJobId()).entrySet().stream()
                 .map(entry -> {
                     Collection<Object> result = new LinkedList<>();
                     result.add(entry.getKey());
diff --git 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationListQueryResultSet.java
 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationListQueryResultSet.java
index 90f01d9aac0..fdca0dc082b 100644
--- 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationListQueryResultSet.java
+++ 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationListQueryResultSet.java
@@ -17,8 +17,8 @@
 
 package org.apache.shardingsphere.migration.distsql.handler.query;
 
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
+import org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI;
+import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
 import org.apache.shardingsphere.infra.distsql.query.DatabaseDistSQLResultSet;
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import 
org.apache.shardingsphere.migration.distsql.statement.ShowMigrationListStatement;
@@ -35,13 +35,13 @@ import java.util.stream.Collectors;
  */
 public final class ShowMigrationListQueryResultSet implements 
DatabaseDistSQLResultSet {
     
-    private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API = 
RuleAlteredJobAPIFactory.getInstance();
+    private static final MigrationJobPublicAPI JOB_API = 
PipelineJobPublicAPIFactory.getMigrationJobPublicAPI();
     
     private Iterator<Collection<Object>> data;
     
     @Override
     public void init(final ShardingSphereDatabase database, final SQLStatement 
sqlStatement) {
-        data = RULE_ALTERED_JOB_API.list().stream()
+        data = JOB_API.list().stream()
                 .map(each -> {
                     Collection<Object> result = new LinkedList<>();
                     result.add(each.getJobId());
diff --git 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/ApplyMigrationUpdater.java
 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/ApplyMigrationUpdater.java
index ce616c9a4d5..793be29ceef 100644
--- 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/ApplyMigrationUpdater.java
+++ 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/ApplyMigrationUpdater.java
@@ -17,8 +17,8 @@
 
 package org.apache.shardingsphere.migration.distsql.handler.update;
 
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
+import org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI;
+import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
 import org.apache.shardingsphere.infra.distsql.update.RALUpdater;
 import 
org.apache.shardingsphere.migration.distsql.statement.ApplyMigrationStatement;
 
@@ -27,11 +27,11 @@ import 
org.apache.shardingsphere.migration.distsql.statement.ApplyMigrationState
  */
 public final class ApplyMigrationUpdater implements 
RALUpdater<ApplyMigrationStatement> {
     
-    private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API = 
RuleAlteredJobAPIFactory.getInstance();
+    private static final MigrationJobPublicAPI JOB_API = 
PipelineJobPublicAPIFactory.getMigrationJobPublicAPI();
     
     @Override
     public void executeUpdate(final ApplyMigrationStatement sqlStatement) {
-        
RULE_ALTERED_JOB_API.switchClusterConfiguration(sqlStatement.getJobId());
+        JOB_API.switchClusterConfiguration(sqlStatement.getJobId());
     }
     
     @Override
diff --git 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/DropMigrationUpdater.java
 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/DropMigrationUpdater.java
index 3070ff19947..eac7749d309 100644
--- 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/DropMigrationUpdater.java
+++ 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/DropMigrationUpdater.java
@@ -17,8 +17,8 @@
 
 package org.apache.shardingsphere.migration.distsql.handler.update;
 
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
+import org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI;
+import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
 import org.apache.shardingsphere.infra.distsql.update.RALUpdater;
 import 
org.apache.shardingsphere.migration.distsql.statement.DropMigrationStatement;
 
@@ -27,11 +27,11 @@ import 
org.apache.shardingsphere.migration.distsql.statement.DropMigrationStatem
  */
 public final class DropMigrationUpdater implements 
RALUpdater<DropMigrationStatement> {
     
-    private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API = 
RuleAlteredJobAPIFactory.getInstance();
+    private static final MigrationJobPublicAPI JOB_API = 
PipelineJobPublicAPIFactory.getMigrationJobPublicAPI();
     
     @Override
     public void executeUpdate(final DropMigrationStatement sqlStatement) {
-        RULE_ALTERED_JOB_API.remove(sqlStatement.getJobId());
+        JOB_API.remove(sqlStatement.getJobId());
     }
     
     @Override
diff --git 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/ResetMigrationUpdater.java
 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/ResetMigrationUpdater.java
index e20ae3417e5..5356a368483 100644
--- 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/ResetMigrationUpdater.java
+++ 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/ResetMigrationUpdater.java
@@ -17,8 +17,8 @@
 
 package org.apache.shardingsphere.migration.distsql.handler.update;
 
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
+import org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI;
+import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
 import org.apache.shardingsphere.infra.distsql.update.RALUpdater;
 import 
org.apache.shardingsphere.migration.distsql.statement.ResetMigrationStatement;
 
@@ -27,11 +27,11 @@ import 
org.apache.shardingsphere.migration.distsql.statement.ResetMigrationState
  */
 public final class ResetMigrationUpdater implements 
RALUpdater<ResetMigrationStatement> {
     
-    private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API = 
RuleAlteredJobAPIFactory.getInstance();
+    private static final MigrationJobPublicAPI JOB_API = 
PipelineJobPublicAPIFactory.getMigrationJobPublicAPI();
     
     @Override
     public void executeUpdate(final ResetMigrationStatement sqlStatement) {
-        RULE_ALTERED_JOB_API.reset(sqlStatement.getJobId());
+        JOB_API.reset(sqlStatement.getJobId());
     }
     
     @Override
diff --git 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/RestoreMigrationSourceWritingUpdater.java
 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/RestoreMigrationSourceWritingUpdater.java
index 0f644ceb72a..8907586f6e6 100644
--- 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/RestoreMigrationSourceWritingUpdater.java
+++ 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/RestoreMigrationSourceWritingUpdater.java
@@ -17,8 +17,8 @@
 
 package org.apache.shardingsphere.migration.distsql.handler.update;
 
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
+import org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI;
+import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
 import org.apache.shardingsphere.infra.distsql.update.RALUpdater;
 import 
org.apache.shardingsphere.migration.distsql.statement.RestoreMigrationSourceWritingStatement;
 
@@ -27,11 +27,11 @@ import 
org.apache.shardingsphere.migration.distsql.statement.RestoreMigrationSou
  */
 public final class RestoreMigrationSourceWritingUpdater implements 
RALUpdater<RestoreMigrationSourceWritingStatement> {
     
-    private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API = 
RuleAlteredJobAPIFactory.getInstance();
+    private static final MigrationJobPublicAPI JOB_API = 
PipelineJobPublicAPIFactory.getMigrationJobPublicAPI();
     
     @Override
     public void executeUpdate(final RestoreMigrationSourceWritingStatement 
sqlStatement) {
-        RULE_ALTERED_JOB_API.restoreClusterWriteDB(sqlStatement.getJobId());
+        JOB_API.restoreClusterWriteDB(sqlStatement.getJobId());
     }
     
     @Override
diff --git 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/StartMigrationUpdater.java
 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/StartMigrationUpdater.java
index 905fe5d2c3e..3846be4ceba 100644
--- 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/StartMigrationUpdater.java
+++ 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/StartMigrationUpdater.java
@@ -17,8 +17,8 @@
 
 package org.apache.shardingsphere.migration.distsql.handler.update;
 
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
+import org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI;
+import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
 import org.apache.shardingsphere.infra.distsql.update.RALUpdater;
 import 
org.apache.shardingsphere.migration.distsql.statement.StartMigrationStatement;
 
@@ -27,11 +27,11 @@ import 
org.apache.shardingsphere.migration.distsql.statement.StartMigrationState
  */
 public final class StartMigrationUpdater implements 
RALUpdater<StartMigrationStatement> {
     
-    private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API = 
RuleAlteredJobAPIFactory.getInstance();
+    private static final MigrationJobPublicAPI JOB_API = 
PipelineJobPublicAPIFactory.getMigrationJobPublicAPI();
     
     @Override
     public void executeUpdate(final StartMigrationStatement sqlStatement) {
-        RULE_ALTERED_JOB_API.startDisabledJob(sqlStatement.getJobId());
+        JOB_API.startDisabledJob(sqlStatement.getJobId());
     }
     
     @Override
diff --git 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/StopMigrationSourceWritingUpdater.java
 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/StopMigrationSourceWritingUpdater.java
index 7b42756fb7c..c096aa65e99 100644
--- 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/StopMigrationSourceWritingUpdater.java
+++ 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/StopMigrationSourceWritingUpdater.java
@@ -17,8 +17,8 @@
 
 package org.apache.shardingsphere.migration.distsql.handler.update;
 
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
+import org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI;
+import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
 import org.apache.shardingsphere.infra.distsql.update.RALUpdater;
 import 
org.apache.shardingsphere.migration.distsql.statement.StopMigrationSourceWritingStatement;
 
@@ -27,11 +27,11 @@ import 
org.apache.shardingsphere.migration.distsql.statement.StopMigrationSource
  */
 public final class StopMigrationSourceWritingUpdater implements 
RALUpdater<StopMigrationSourceWritingStatement> {
     
-    private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API = 
RuleAlteredJobAPIFactory.getInstance();
+    private static final MigrationJobPublicAPI JOB_API = 
PipelineJobPublicAPIFactory.getMigrationJobPublicAPI();
     
     @Override
     public void executeUpdate(final StopMigrationSourceWritingStatement 
sqlStatement) {
-        RULE_ALTERED_JOB_API.stopClusterWriteDB(sqlStatement.getJobId());
+        JOB_API.stopClusterWriteDB(sqlStatement.getJobId());
     }
     
     @Override
diff --git 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/StopMigrationUpdater.java
 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/StopMigrationUpdater.java
index 008c64ba269..bb7883bdfbf 100644
--- 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/StopMigrationUpdater.java
+++ 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/StopMigrationUpdater.java
@@ -17,8 +17,8 @@
 
 package org.apache.shardingsphere.migration.distsql.handler.update;
 
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
+import org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI;
+import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
 import org.apache.shardingsphere.infra.distsql.update.RALUpdater;
 import 
org.apache.shardingsphere.migration.distsql.statement.StopMigrationStatement;
 
@@ -27,11 +27,11 @@ import 
org.apache.shardingsphere.migration.distsql.statement.StopMigrationStatem
  */
 public final class StopMigrationUpdater implements 
RALUpdater<StopMigrationStatement> {
     
-    private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API = 
RuleAlteredJobAPIFactory.getInstance();
+    private static final MigrationJobPublicAPI JOB_API = 
PipelineJobPublicAPIFactory.getMigrationJobPublicAPI();
     
     @Override
     public void executeUpdate(final StopMigrationStatement sqlStatement) {
-        RULE_ALTERED_JOB_API.stop(sqlStatement.getJobId());
+        JOB_API.stop(sqlStatement.getJobId());
     }
     
     @Override
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPI.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/MigrationJobPublicAPI.java
similarity index 52%
copy from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPI.java
copy to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/MigrationJobPublicAPI.java
index 9cfe9a3f896..99851859fe4 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPI.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/MigrationJobPublicAPI.java
@@ -18,24 +18,22 @@
 package org.apache.shardingsphere.data.pipeline.api;
 
 import 
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
-import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
-import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
 import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
+import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
 import org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPI;
 
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Properties;
 
 /**
- * Rule altered job API.
+ * Migration job public API.
  */
-public interface RuleAlteredJobAPI extends PipelineJobAPI, RequiredSPI {
+@SingletonSPI
+public interface MigrationJobPublicAPI extends PipelineJobPublicAPI, 
RequiredSPI {
     
     /**
      * List all jobs.
@@ -44,30 +42,15 @@ public interface RuleAlteredJobAPI extends PipelineJobAPI, RequiredSPI {
      */
     List<JobInfo> list();
     
-    /**
-     * Start scaling job by config.
-     *
-     * @param jobConfig job config
-     * @return job id
-     */
-    Optional<String> start(RuleAlteredJobConfiguration jobConfig);
-    
     /**
      * Get job progress.
      *
      * @param jobId job id
      * @return each sharding item progress
      */
+    // TODO add JobProgress
     Map<Integer, InventoryIncrementalJobItemProgress> getJobProgress(String jobId);
     
-    /**
-     * Get job progress.
-     *
-     * @param jobConfig job configuration
-     * @return each sharding item progress
-     */
-    Map<Integer, InventoryIncrementalJobItemProgress> getJobProgress(RuleAlteredJobConfiguration jobConfig);
-    
     /**
      * Stop cluster writing.
      *
@@ -75,13 +58,6 @@ public interface RuleAlteredJobAPI extends PipelineJobAPI, RequiredSPI {
      */
     void stopClusterWriteDB(String jobId);
     
-    /**
-     * Stop cluster writing.
-     *
-     * @param jobConfig job configuration
-     */
-    void stopClusterWriteDB(RuleAlteredJobConfiguration jobConfig);
-    
     /**
      * Restore cluster writing.
      *
@@ -89,13 +65,6 @@ public interface RuleAlteredJobAPI extends PipelineJobAPI, RequiredSPI {
      */
     void restoreClusterWriteDB(String jobId);
     
-    /**
-     * Restore cluster writing.
-     *
-     * @param jobConfig job configuration
-     */
-    void restoreClusterWriteDB(RuleAlteredJobConfiguration jobConfig);
-    
     /**
      * List all data consistency check algorithms from SPI.
      *
@@ -111,14 +80,6 @@ public interface RuleAlteredJobAPI extends PipelineJobAPI, RequiredSPI {
      */
     boolean isDataConsistencyCheckNeeded(String jobId);
     
-    /**
-     * Is data consistency check needed.
-     *
-     * @param jobConfig job configuration
-     * @return data consistency check needed or not
-     */
-    boolean isDataConsistencyCheckNeeded(RuleAlteredJobConfiguration jobConfig);
-    
     /**
      * Do data consistency check.
      *
@@ -127,14 +88,6 @@ public interface RuleAlteredJobAPI extends PipelineJobAPI, RequiredSPI {
      */
     Map<String, DataConsistencyCheckResult> dataConsistencyCheck(String jobId);
     
-    /**
-     * Do data consistency check.
-     *
-     * @param jobConfig job configuration
-     * @return each logic table check result
-     */
-    Map<String, DataConsistencyCheckResult> dataConsistencyCheck(RuleAlteredJobConfiguration jobConfig);
-    
     /**
      * Do data consistency check.
      *
@@ -145,15 +98,6 @@ public interface RuleAlteredJobAPI extends PipelineJobAPI, RequiredSPI {
      */
     Map<String, DataConsistencyCheckResult> dataConsistencyCheck(String jobId, String algorithmType, Properties algorithmProps);
     
-    /**
-     * Aggregate data consistency check results.
-     *
-     * @param jobId job id
-     * @param checkResults check results
-     * @return check success or not
-     */
-    boolean aggregateDataConsistencyCheckResults(String jobId, Map<String, DataConsistencyCheckResult> checkResults);
-    
     /**
      * Switch cluster configuration.
      *
@@ -161,50 +105,10 @@ public interface RuleAlteredJobAPI extends PipelineJobAPI, RequiredSPI {
      */
     void switchClusterConfiguration(String jobId);
     
-    /**
-     * Switch cluster configuration.
-     *
-     * @param jobConfig job configuration
-     */
-    void switchClusterConfiguration(RuleAlteredJobConfiguration jobConfig);
-    
     /**
      * Reset scaling job.
      *
      * @param jobId job id
      */
     void reset(String jobId);
-    
-    /**
-     * Get job configuration.
-     *
-     * @param jobId job id
-     * @return job configuration
-     */
-    RuleAlteredJobConfiguration getJobConfig(String jobId);
-    
-    /**
-     * Persist job item progress.
-     *
-     * @param jobItemContext job item context
-     */
-    void persistJobItemProgress(PipelineJobItemContext jobItemContext);
-    
-    /**
-     * Get job item progress.
-     *
-     * @param jobId job id
-     * @param shardingItem sharding item
-     * @return job item progress
-     */
-    InventoryIncrementalJobItemProgress getJobItemProgress(String jobId, int shardingItem);
-    
-    /**
-     * Update job item status.
-     *
-     * @param jobId job id
-     * @param shardingItem sharding item
-     * @param status status
-     */
-    void updateJobItemStatus(String jobId, int shardingItem, JobStatus status);
 }
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobAPI.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPI.java
similarity index 60%
copy from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobAPI.java
copy to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPI.java
index 7388cf15821..efd9ce54bd8 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobAPI.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPI.java
@@ -17,37 +17,12 @@
 
 package org.apache.shardingsphere.data.pipeline.api;
 
-import 
org.apache.shardingsphere.data.pipeline.api.config.job.YamlPipelineJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
-import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
+import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
 
 /**
- * Pipeline job API.
+ * Pipeline job public API.
  */
-public interface PipelineJobAPI {
-    
-    /**
-     * Marshal pipeline job id.
-     *
-     * @param pipelineJobId pipeline job id
-     * @return marshaled text
-     */
-    String marshalJobId(PipelineJobId pipelineJobId);
-    
-    /**
-     * Parse job type.
-     *
-     * @param jobId job id
-     * @return job type
-     */
-    JobType parseJobType(String jobId);
-    
-    /**
-     * Extend job configuration.
-     *
-     * @param yamlJobConfig yaml job configuration
-     */
-    void extendJobConfiguration(YamlPipelineJobConfiguration yamlJobConfig);
+public interface PipelineJobPublicAPI extends TypedSPI {
     
     /**
      * Start pipeline job by id.
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPIFactory.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactory.java
similarity index 75%
copy from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPIFactory.java
copy to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactory.java
index 2d640a62af8..557e9c49783 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPIFactory.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactory.java
@@ -21,20 +21,20 @@ import 
org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
 import 
org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPIRegistry;
 
 /**
- * Rule altered job API factory.
+ * Pipeline job public API factory.
  */
-public final class RuleAlteredJobAPIFactory {
+public final class PipelineJobPublicAPIFactory {
     
     static {
-        ShardingSphereServiceLoader.register(RuleAlteredJobAPI.class);
+        ShardingSphereServiceLoader.register(MigrationJobPublicAPI.class);
     }
     
     /**
-     * Get instance of rule altered job API.
+     * Get instance of migration job public API.
      *
      * @return got instance
      */
-    public static RuleAlteredJobAPI getInstance() {
-        return RequiredSPIRegistry.getRegisteredService(RuleAlteredJobAPI.class);
+    public static MigrationJobPublicAPI getMigrationJobPublicAPI() {
+        return RequiredSPIRegistry.getRegisteredService(MigrationJobPublicAPI.class);
     }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobType.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobType.java
index 57118ef052c..174550b90fe 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobType.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobType.java
@@ -52,7 +52,7 @@ public enum JobType {
      * Value of by code.
      *
      * @param typeCode type code
-     * @return job type
+     * @return job type, might be null
      */
     public static JobType valueOfByCode(final String typeCode) {
         return CODE_JOB_TYPE_MAP.get(typeCode);
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/PipelineJobId.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/PipelineJobId.java
index 77749a0cf6d..a038c1b953c 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/PipelineJobId.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/PipelineJobId.java
@@ -18,7 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.api.job;
 
 /**
- * Job id.
+ * Pipeline job id.
  */
 public interface PipelineJobId {
     
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/PipelineJobIdUtils.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/PipelineJobIdUtils.java
new file mode 100644
index 00000000000..a25ec5d3b60
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/PipelineJobIdUtils.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.api.job;
+
+/**
+ * Pipeline job id utils.
+ */
+public final class PipelineJobIdUtils {
+    
+    /**
+     * Marshal job id common prefix.
+     *
+     * @param pipelineJobId pipeline job id
+     * @return job id common prefix
+     */
+    public static String marshalJobIdCommonPrefix(final PipelineJobId pipelineJobId) {
+        return 'j' + pipelineJobId.getTypeCode();
+    }
+    
+    /**
+     * Parse job type.
+     *
+     * @param jobId job id
+     * @return job type
+     * @throws IllegalArgumentException if job id is invalid
+     */
+    public static JobType parseJobType(final String jobId) {
+        if (jobId.length() <= 3) {
+            throw new IllegalArgumentException("Invalid jobId length, jobId=" + jobId);
+        }
+        if ('j' != jobId.charAt(0)) {
+            throw new IllegalArgumentException("Invalid jobId, first char=" + jobId.charAt(0));
+        }
+        String typeCode = jobId.substring(1, 3);
+        JobType result = JobType.valueOfByCode(typeCode);
+        if (null == result) {
+            throw new IllegalArgumentException("Could not get JobType by '" + typeCode + "', jobId: " + jobId);
+        }
+        return result;
+    }
+}
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineAPIFactory.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineAPIFactory.java
index 29c30aecf6a..b9438115173 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineAPIFactory.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineAPIFactory.java
@@ -23,6 +23,7 @@ import lombok.NoArgsConstructor;
 import lombok.SneakyThrows;
 import org.apache.commons.lang3.concurrent.ConcurrentException;
 import org.apache.commons.lang3.concurrent.LazyInitializer;
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
 import 
org.apache.shardingsphere.data.pipeline.core.api.impl.GovernanceRepositoryAPIImpl;
 import 
org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
@@ -33,6 +34,8 @@ import 
org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobOperateAPI;
 import 
org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobStatisticsAPI;
 import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
 import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
+import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
+import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPIRegistry;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
 
@@ -42,6 +45,10 @@ import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositor
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
 public final class PipelineAPIFactory {
     
+    static {
+        ShardingSphereServiceLoader.register(PipelineJobAPI.class);
+    }
+    
     private static final LazyInitializer<GovernanceRepositoryAPI> 
REPOSITORY_API_LAZY_INITIALIZER = new 
LazyInitializer<GovernanceRepositoryAPI>() {
         
         @Override
@@ -60,6 +67,16 @@ public final class PipelineAPIFactory {
         return REPOSITORY_API_LAZY_INITIALIZER.get();
     }
     
+    /**
+     * Get pipeline job API.
+     *
+     * @param jobType job type
+     * @return pipeline job API
+     */
+    public static PipelineJobAPI getPipelineJobAPI(final JobType jobType) {
+        return TypedSPIRegistry.getRegisteredService(PipelineJobAPI.class, 
jobType.getTypeName());
+    }
+    
     /**
      * Get job statistics API.
      *
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobAPI.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
similarity index 54%
rename from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobAPI.java
rename to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
index 7388cf15821..36e16b2e8eb 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobAPI.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
@@ -15,16 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.api;
+package org.apache.shardingsphere.data.pipeline.core.api;
 
+import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPI;
+import 
org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.job.YamlPipelineJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import 
org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
+import 
org.apache.shardingsphere.data.pipeline.api.job.progress.PipelineJobItemProgress;
+import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
+import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
 
 /**
  * Pipeline job API.
  */
-public interface PipelineJobAPI {
+@SingletonSPI
+public interface PipelineJobAPI extends PipelineJobPublicAPI, TypedSPI {
     
     /**
      * Marshal pipeline job id.
@@ -34,14 +41,6 @@ public interface PipelineJobAPI {
      */
     String marshalJobId(PipelineJobId pipelineJobId);
     
-    /**
-     * Parse job type.
-     *
-     * @param jobId job id
-     * @return job type
-     */
-    JobType parseJobType(String jobId);
-    
     /**
      * Extend job configuration.
      *
@@ -50,23 +49,35 @@ public interface PipelineJobAPI {
     void extendJobConfiguration(YamlPipelineJobConfiguration yamlJobConfig);
     
     /**
-     * Start pipeline job by id.
+     * Get job configuration.
      *
      * @param jobId job id
+     * @return job configuration
+     */
+    PipelineJobConfiguration getJobConfig(String jobId);
+    
+    /**
+     * Persist job item progress.
+     *
+     * @param jobItemContext job item context
      */
-    void startDisabledJob(String jobId);
+    void persistJobItemProgress(PipelineJobItemContext jobItemContext);
     
     /**
-     * Stop pipeline job.
+     * Get job item progress.
      *
      * @param jobId job id
+     * @param shardingItem sharding item
+     * @return job item progress
      */
-    void stop(String jobId);
+    PipelineJobItemProgress getJobItemProgress(String jobId, int shardingItem);
     
     /**
-     * Remove pipeline job.
+     * Update job item status.
      *
      * @param jobId job id
+     * @param shardingItem sharding item
+     * @param status status
      */
-    void remove(String jobId);
+    void updateJobItemStatus(String jobId, int shardingItem, JobStatus status);
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPI.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/RuleAlteredJobAPI.java
similarity index 51%
rename from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPI.java
rename to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/RuleAlteredJobAPI.java
index 9cfe9a3f896..7a8def8601d 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPI.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/RuleAlteredJobAPI.java
@@ -15,34 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.api;
+package org.apache.shardingsphere.data.pipeline.core.api;
 
+import org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI;
 import 
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
-import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
-import 
org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
-import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
+import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
 import org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPI;
 
-import java.util.Collection;
-import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Properties;
 
 /**
  * Rule altered job API.
  */
-public interface RuleAlteredJobAPI extends PipelineJobAPI, RequiredSPI {
-    
-    /**
-     * List all jobs.
-     *
-     * @return job infos
-     */
-    List<JobInfo> list();
+@SingletonSPI
+public interface RuleAlteredJobAPI extends PipelineJobAPI, 
MigrationJobPublicAPI, RequiredSPI {
     
     /**
      * Start scaling job by config.
@@ -52,14 +41,6 @@ public interface RuleAlteredJobAPI extends PipelineJobAPI, 
RequiredSPI {
      */
     Optional<String> start(RuleAlteredJobConfiguration jobConfig);
     
-    /**
-     * Get job progress.
-     *
-     * @param jobId job id
-     * @return each sharding item progress
-     */
-    Map<Integer, InventoryIncrementalJobItemProgress> getJobProgress(String 
jobId);
-    
     /**
      * Get job progress.
      *
@@ -68,12 +49,8 @@ public interface RuleAlteredJobAPI extends PipelineJobAPI, 
RequiredSPI {
      */
     Map<Integer, InventoryIncrementalJobItemProgress> 
getJobProgress(RuleAlteredJobConfiguration jobConfig);
     
-    /**
-     * Stop cluster writing.
-     *
-     * @param jobId job id
-     */
-    void stopClusterWriteDB(String jobId);
+    @Override
+    InventoryIncrementalJobItemProgress getJobItemProgress(String jobId, int 
shardingItem);
     
     /**
      * Stop cluster writing.
@@ -82,13 +59,6 @@ public interface RuleAlteredJobAPI extends PipelineJobAPI, 
RequiredSPI {
      */
     void stopClusterWriteDB(RuleAlteredJobConfiguration jobConfig);
     
-    /**
-     * Restore cluster writing.
-     *
-     * @param jobId job id
-     */
-    void restoreClusterWriteDB(String jobId);
-    
     /**
      * Restore cluster writing.
      *
@@ -96,21 +66,6 @@ public interface RuleAlteredJobAPI extends PipelineJobAPI, 
RequiredSPI {
      */
     void restoreClusterWriteDB(RuleAlteredJobConfiguration jobConfig);
     
-    /**
-     * List all data consistency check algorithms from SPI.
-     *
-     * @return data consistency check algorithms
-     */
-    Collection<DataConsistencyCheckAlgorithmInfo> 
listDataConsistencyCheckAlgorithms();
-    
-    /**
-     * Is data consistency check needed.
-     *
-     * @param jobId job id
-     * @return data consistency check needed or not
-     */
-    boolean isDataConsistencyCheckNeeded(String jobId);
-    
     /**
      * Is data consistency check needed.
      *
@@ -119,14 +74,6 @@ public interface RuleAlteredJobAPI extends PipelineJobAPI, 
RequiredSPI {
      */
     boolean isDataConsistencyCheckNeeded(RuleAlteredJobConfiguration 
jobConfig);
     
-    /**
-     * Do data consistency check.
-     *
-     * @param jobId job id
-     * @return each logic table check result
-     */
-    Map<String, DataConsistencyCheckResult> dataConsistencyCheck(String jobId);
-    
     /**
      * Do data consistency check.
      *
@@ -135,16 +82,6 @@ public interface RuleAlteredJobAPI extends PipelineJobAPI, 
RequiredSPI {
      */
     Map<String, DataConsistencyCheckResult> 
dataConsistencyCheck(RuleAlteredJobConfiguration jobConfig);
     
-    /**
-     * Do data consistency check.
-     *
-     * @param jobId job id
-     * @param algorithmType algorithm type
-     * @param algorithmProps algorithm props. Nullable
-     * @return each logic table check result
-     */
-    Map<String, DataConsistencyCheckResult> dataConsistencyCheck(String jobId, 
String algorithmType, Properties algorithmProps);
-    
     /**
      * Aggregate data consistency check results.
      *
@@ -154,13 +91,6 @@ public interface RuleAlteredJobAPI extends PipelineJobAPI, 
RequiredSPI {
      */
     boolean aggregateDataConsistencyCheckResults(String jobId, Map<String, 
DataConsistencyCheckResult> checkResults);
     
-    /**
-     * Switch cluster configuration.
-     *
-     * @param jobId job id
-     */
-    void switchClusterConfiguration(String jobId);
-    
     /**
      * Switch cluster configuration.
      *
@@ -168,43 +98,6 @@ public interface RuleAlteredJobAPI extends PipelineJobAPI, 
RequiredSPI {
      */
     void switchClusterConfiguration(RuleAlteredJobConfiguration jobConfig);
     
-    /**
-     * Reset scaling job.
-     *
-     * @param jobId job id
-     */
-    void reset(String jobId);
-    
-    /**
-     * Get job configuration.
-     *
-     * @param jobId job id
-     * @return job configuration
-     */
+    @Override
     RuleAlteredJobConfiguration getJobConfig(String jobId);
-    
-    /**
-     * Persist job item progress.
-     *
-     * @param jobItemContext job item context
-     */
-    void persistJobItemProgress(PipelineJobItemContext jobItemContext);
-    
-    /**
-     * Get job item progress.
-     *
-     * @param jobId job id
-     * @param shardingItem sharding item
-     * @return job item progress
-     */
-    InventoryIncrementalJobItemProgress getJobItemProgress(String jobId, int 
shardingItem);
-    
-    /**
-     * Update job item status.
-     *
-     * @param jobId job id
-     * @param shardingItem sharding item
-     * @param status status
-     */
-    void updateJobItemStatus(String jobId, int shardingItem, JobStatus status);
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPIFactory.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/RuleAlteredJobAPIFactory.java
similarity index 95%
rename from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPIFactory.java
rename to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/RuleAlteredJobAPIFactory.java
index 2d640a62af8..e0a87e03ec4 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPIFactory.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/RuleAlteredJobAPIFactory.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.api;
+package org.apache.shardingsphere.data.pipeline.core.api;
 
 import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
 import 
org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPIRegistry;
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
index 9f496fa20e2..aca2c9f609a 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
@@ -18,9 +18,9 @@
 package org.apache.shardingsphere.data.pipeline.core.api.impl;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.PipelineJobAPI;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI;
 import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
+import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobNotFoundException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineVerifyFailedException;
@@ -39,23 +39,11 @@ public abstract class AbstractPipelineJobAPIImpl implements 
PipelineJobAPI {
     
     @Override
     public final String marshalJobId(final PipelineJobId pipelineJobId) {
-        return 'j' + pipelineJobId.getTypeCode() + 
marshalJobIdLeftPart(pipelineJobId);
+        return PipelineJobIdUtils.marshalJobIdCommonPrefix(pipelineJobId) + 
marshalJobIdLeftPart(pipelineJobId);
     }
     
     protected abstract String marshalJobIdLeftPart(PipelineJobId 
pipelineJobId);
     
-    @Override
-    public JobType parseJobType(final String jobId) {
-        if (jobId.length() <= 3) {
-            throw new IllegalArgumentException("Invalid jobId length, jobId=" 
+ jobId);
-        }
-        if ('j' == jobId.charAt(0)) {
-            throw new IllegalArgumentException("Invalid jobId, first char=" + 
jobId.charAt(0));
-        }
-        String typeCode = jobId.substring(1, 3);
-        return JobType.valueOfByCode(typeCode);
-    }
-    
     @Override
     public void startDisabledJob(final String jobId) {
         log.info("Start disabled pipeline job {}", jobId);
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
index 0822a728cb0..6e17cf8a6f9 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
@@ -22,7 +22,7 @@ import com.google.common.base.Strings;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.codec.binary.Hex;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
+import org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPI;
 import 
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
 import 
org.apache.shardingsphere.data.pipeline.api.config.job.YamlPipelineJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
@@ -46,6 +46,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCalculateAlgorithmFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyChecker;
+import 
org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreationException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineVerifyFailedException;
@@ -58,7 +59,6 @@ import 
org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
 import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredContext;
 import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJob;
-import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
 import 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobWorker;
 import 
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
 import 
org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobConfigurationPreparerFactory;
@@ -408,30 +408,27 @@ public final class RuleAlteredJobAPIImpl extends 
AbstractPipelineJobAPIImpl impl
     
     @Override
     public void persistJobItemProgress(final PipelineJobItemContext 
jobItemContext) {
-        if (!(jobItemContext instanceof RuleAlteredJobContext)) {
-            return;
-        }
-        RuleAlteredJobContext context = (RuleAlteredJobContext) jobItemContext;
+        InventoryIncrementalJobItemContext context = 
(InventoryIncrementalJobItemContext) jobItemContext;
         InventoryIncrementalJobItemProgress jobItemProgress = new 
InventoryIncrementalJobItemProgress();
         jobItemProgress.setStatus(jobItemContext.getStatus());
-        
jobItemProgress.setSourceDatabaseType(context.getJobConfig().getSourceDatabaseType());
-        jobItemProgress.setIncremental(getIncrementalTasksProgress(context));
-        jobItemProgress.setInventory(getInventoryTasksProgress(context));
+        
jobItemProgress.setSourceDatabaseType(jobItemContext.getJobConfig().getSourceDatabaseType());
+        
jobItemProgress.setIncremental(getIncrementalTasksProgress(context.getIncrementalTasks()));
+        
jobItemProgress.setInventory(getInventoryTasksProgress(context.getInventoryTasks()));
         String value = 
YamlEngine.marshal(SWAPPER.swapToYamlConfiguration(jobItemProgress));
         
PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobItemContext.getJobId(),
 jobItemContext.getShardingItem(), value);
     }
     
-    private JobItemIncrementalTasksProgress getIncrementalTasksProgress(final 
RuleAlteredJobContext jobItemContext) {
+    private JobItemIncrementalTasksProgress getIncrementalTasksProgress(final 
Collection<IncrementalTask> incrementalTasks) {
         Map<String, IncrementalTaskProgress> incrementalTaskProgressMap = new 
HashMap<>();
-        for (IncrementalTask each : jobItemContext.getIncrementalTasks()) {
+        for (IncrementalTask each : incrementalTasks) {
             incrementalTaskProgressMap.put(each.getTaskId(), 
each.getTaskProgress());
         }
         return new JobItemIncrementalTasksProgress(incrementalTaskProgressMap);
     }
     
-    private JobItemInventoryTasksProgress getInventoryTasksProgress(final 
RuleAlteredJobContext jobItemContext) {
+    private JobItemInventoryTasksProgress getInventoryTasksProgress(final 
Collection<InventoryTask> inventoryTasks) {
         Map<String, InventoryTaskProgress> inventoryTaskProgressMap = new 
HashMap<>();
-        for (InventoryTask each : jobItemContext.getInventoryTasks()) {
+        for (InventoryTask each : inventoryTasks) {
             inventoryTaskProgressMap.put(each.getTaskId(), 
each.getTaskProgress());
         }
         return new JobItemInventoryTasksProgress(inventoryTaskProgressMap);
@@ -456,4 +453,9 @@ public final class RuleAlteredJobAPIImpl extends 
AbstractPipelineJobAPIImpl impl
         jobItemProgress.setStatus(status);
         
PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobId, 
shardingItem, 
YamlEngine.marshal(SWAPPER.swapToYamlConfiguration(jobItemProgress)));
     }
+    
+    @Override
+    public String getType() {
+        return JobType.MIGRATION.getTypeName();
+    }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/PipelineJobId.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalJobItemContext.java
similarity index 56%
copy from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/PipelineJobId.java
copy to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalJobItemContext.java
index 77749a0cf6d..91637df225d 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/PipelineJobId.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalJobItemContext.java
@@ -15,31 +15,30 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.api.job;
+package org.apache.shardingsphere.data.pipeline.core.context;
+
+import 
org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
+import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
+import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
+
+import java.util.Collection;
 
 /**
- * Job id.
+ * Inventory incremental job item context.
  */
-public interface PipelineJobId {
-    
-    /**
-     * Get type.
-     *
-     * @return type
-     */
-    String getTypeCode();
+public interface InventoryIncrementalJobItemContext extends 
PipelineJobItemContext {
     
     /**
-     * Get format version.
+     * Get inventory tasks.
      *
-     * @return format version
+     * @return inventory tasks
      */
-    String getFormatVersion();
+    Collection<InventoryTask> getInventoryTasks();
     
     /**
-     * Get database name.
+     * Get incremental tasks.
      *
-     * @return database name
+     * @return incremental tasks
      */
-    String getDatabaseName();
+    Collection<IncrementalTask> getIncrementalTasks();
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
index f7f984a13c7..373fa3ac1ec 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
@@ -18,7 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.core.execute;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
+import 
org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
 import 
org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
index ef1f4a4dea0..6bb7b0df73d 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
@@ -18,8 +18,8 @@
 package org.apache.shardingsphere.data.pipeline.core.job;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPI;
+import 
org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
 import 
org.apache.shardingsphere.data.pipeline.api.detect.RuleAlteredJobAlmostCompletedParameter;
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
index 1ff05a8bdf4..20066a1a272 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
@@ -18,8 +18,9 @@
 package org.apache.shardingsphere.data.pipeline.core.job.progress.persist;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
+import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobIdUtils;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
 import 
org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
 
@@ -102,7 +103,7 @@ public final class PipelineJobProgressPersistService {
         }
         persistContext.getHasNewEvents().set(false);
         long startTimeMillis = System.currentTimeMillis();
-        
RuleAlteredJobAPIFactory.getInstance().persistJobItemProgress(jobItemContext.get());
+        
PipelineAPIFactory.getPipelineJobAPI(PipelineJobIdUtils.parseJobType(jobId)).persistJobItemProgress(jobItemContext.get());
         persistContext.getBeforePersistingProgressMillis().set(null);
         if (6 == ThreadLocalRandom.current().nextInt(100)) {
             log.info("persist, jobId={}, shardingItem={}, cost time: {} ms", 
jobId, shardingItem, System.currentTimeMillis() - startTimeMillis);
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
index 649721fe593..5ea0c7045b4 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
@@ -20,11 +20,12 @@ package org.apache.shardingsphere.data.pipeline.core.task;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
@@ -72,7 +73,7 @@ public final class InventoryIncrementalTasksRunner implements 
PipelineTasksRunne
             log.info("job stopping, ignore inventory task");
             return;
         }
-        
RuleAlteredJobAPIFactory.getInstance().persistJobItemProgress(jobItemContext);
+        
PipelineAPIFactory.getPipelineJobAPI(PipelineJobIdUtils.parseJobType(jobItemContext.getJobId())).persistJobItemProgress(jobItemContext);
         if (executeInventoryTask()) {
             if (jobItemContext.isStopping()) {
                 log.info("stopping, ignore incremental task");
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
index f6194582526..58768c0ac95 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
@@ -19,7 +19,7 @@ package 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
 
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
+import 
org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
index 639e59edd84..e2b2e4b8772 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
@@ -25,10 +25,10 @@ import 
org.apache.commons.lang3.concurrent.ConcurrentException;
 import org.apache.commons.lang3.concurrent.LazyInitializer;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
+import 
org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
@@ -43,7 +43,7 @@ import java.util.LinkedList;
 @Getter
 @Setter
 @Slf4j
-public final class RuleAlteredJobContext implements PipelineJobItemContext {
+public final class RuleAlteredJobContext implements 
InventoryIncrementalJobItemContext {
     
     private final String jobId;
     
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
index 9807d41ecd7..ddd31ebe961 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
@@ -18,7 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
+import 
org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
index dbbc32adca7..069797aa427 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
@@ -21,7 +21,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.eventbus.Subscribe;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
+import 
org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI
similarity index 100%
copy from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI
copy to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI
similarity index 100%
copy from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI
copy to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPI
similarity index 100%
rename from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI
rename to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPI
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPIFactoryTest.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/RuleAlteredJobAPIFactoryTest.java
similarity index 83%
rename from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPIFactoryTest.java
rename to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/RuleAlteredJobAPIFactoryTest.java
index f817f3fe269..6a8459b838c 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPIFactoryTest.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/RuleAlteredJobAPIFactoryTest.java
@@ -15,9 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.api;
+package org.apache.shardingsphere.data.pipeline.core;
 
-import 
org.apache.shardingsphere.data.pipeline.api.fixture.RuleAlteredJobAPIFixture;
+import 
org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPIFactory;
+import 
org.apache.shardingsphere.data.pipeline.core.fixture.RuleAlteredJobAPIFixture;
 import org.junit.Test;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/fixture/RuleAlteredJobAPIFixture.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/RuleAlteredJobAPIFixture.java
similarity index 94%
rename from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/fixture/RuleAlteredJobAPIFixture.java
rename to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/RuleAlteredJobAPIFixture.java
index ca36f9f6785..df5abb4fefa 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/fixture/RuleAlteredJobAPIFixture.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/RuleAlteredJobAPIFixture.java
@@ -15,15 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.api.fixture;
+package org.apache.shardingsphere.data.pipeline.core.fixture;
 
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
+import org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPI;
 import 
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
 import 
org.apache.shardingsphere.data.pipeline.api.config.job.YamlPipelineJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
 import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
@@ -42,11 +41,6 @@ public final class RuleAlteredJobAPIFixture implements 
RuleAlteredJobAPI {
         return null;
     }
     
-    @Override
-    public JobType parseJobType(final String jobId) {
-        return null;
-    }
-    
     @Override
     public void extendJobConfiguration(final YamlPipelineJobConfiguration 
yamlJobConfig) {
     }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtilsTest.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtilsTest.java
new file mode 100644
index 00000000000..97128667c37
--- /dev/null
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtilsTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.job;
+
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobIdUtils;
+import org.apache.shardingsphere.data.pipeline.api.job.RuleAlteredJobId;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+public final class PipelineJobIdUtilsTest {
+    
+    @Test
+    public void assertCodec() {
+        RuleAlteredJobId pipelineJobId = new RuleAlteredJobId();
+        pipelineJobId.setTypeCode(JobType.MIGRATION.getTypeCode());
+        pipelineJobId.setFormatVersion(RuleAlteredJobId.CURRENT_VERSION);
+        pipelineJobId.setDatabaseName("sharding_db");
+        pipelineJobId.setCurrentMetadataVersion(0);
+        pipelineJobId.setNewMetadataVersion(1);
+        String jobId = 
PipelineJobIdUtils.marshalJobIdCommonPrefix(pipelineJobId) + "abcd";
+        JobType actualJobType = PipelineJobIdUtils.parseJobType(jobId);
+        assertThat(actualJobType, is(JobType.MIGRATION));
+    }
+}
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPI
similarity index 90%
rename from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI
rename to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPI
index e1837337615..dbf2c59932c 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPI
@@ -15,4 +15,4 @@
 # limitations under the License.
 #
 
-org.apache.shardingsphere.data.pipeline.api.fixture.RuleAlteredJobAPIFixture
+org.apache.shardingsphere.data.pipeline.core.fixture.RuleAlteredJobAPIFixture
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java
index 7444ee930f3..6d364a790ab 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java
@@ -19,8 +19,8 @@ package org.apache.shardingsphere.data.pipeline.core.api.impl;
 
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPI;
+import org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPIFactory;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyContentCheckResult;
 import org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJobTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJobTest.java
index 04ec9197cba..d9545f63918 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJobTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJobTest.java
@@ -18,8 +18,8 @@
 package org.apache.shardingsphere.data.pipeline.core.job;
 
 import lombok.SneakyThrows;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
+import org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPI;
 import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
 import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
index 5305bc155d8..d94acec2c48 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
@@ -18,7 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
 
 import org.apache.commons.io.FileUtils;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPIFactory;
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.YamlRuleAlteredJobConfiguration;
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java
index 795a2e3c838..c7606c53531 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.shardingsphere.data.pipeline.scenario.rulealtered.prepare;
 
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPIFactory;
 import org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;

Reply via email to