This is an automated email from the ASF dual-hosted git repository.
menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new bad450099fc Refactor PipelineJobAPI as SPI, extract
PipelineJobPublicAPI for isolation and common usage (#20112)
bad450099fc is described below
commit bad450099fc8de3a6ddf12330e64d9f77864e397
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Fri Aug 12 16:18:05 2022 +0800
Refactor PipelineJobAPI as SPI, extract PipelineJobPublicAPI for isolation
and common usage (#20112)
---
.../query/CheckMigrationQueryResultSet.java | 10 +-
...ShowMigrationCheckAlgorithmsQueryResultSet.java | 8 +-
.../ShowMigrationJobStatusQueryResultSet.java | 8 +-
.../query/ShowMigrationListQueryResultSet.java | 8 +-
.../handler/update/ApplyMigrationUpdater.java | 8 +-
.../handler/update/DropMigrationUpdater.java | 8 +-
.../handler/update/ResetMigrationUpdater.java | 8 +-
.../RestoreMigrationSourceWritingUpdater.java | 8 +-
.../handler/update/StartMigrationUpdater.java | 8 +-
.../update/StopMigrationSourceWritingUpdater.java | 8 +-
.../handler/update/StopMigrationUpdater.java | 8 +-
...teredJobAPI.java => MigrationJobPublicAPI.java} | 106 +-----------------
...pelineJobAPI.java => PipelineJobPublicAPI.java} | 31 +-----
...ctory.java => PipelineJobPublicAPIFactory.java} | 12 +-
.../data/pipeline/api/job/JobType.java | 2 +-
.../data/pipeline/api/job/PipelineJobId.java | 2 +-
.../data/pipeline/api/job/PipelineJobIdUtils.java | 56 ++++++++++
.../data/pipeline/core/api/PipelineAPIFactory.java | 17 +++
.../data/pipeline/core}/api/PipelineJobAPI.java | 45 +++++---
.../data/pipeline/core}/api/RuleAlteredJobAPI.java | 123 ++-------------------
.../core}/api/RuleAlteredJobAPIFactory.java | 2 +-
.../core/api/impl/AbstractPipelineJobAPIImpl.java | 18 +--
.../core/api/impl/RuleAlteredJobAPIImpl.java | 28 ++---
.../InventoryIncrementalJobItemContext.java} | 31 +++---
.../pipeline/core/execute/PipelineJobExecutor.java | 2 +-
.../data/pipeline/core/job/FinishedCheckJob.java | 4 +-
.../persist/PipelineJobProgressPersistService.java | 5 +-
.../core/task/InventoryIncrementalTasksRunner.java | 5 +-
.../scenario/rulealtered/RuleAlteredJob.java | 2 +-
.../rulealtered/RuleAlteredJobContext.java | 4 +-
.../rulealtered/RuleAlteredJobPreparer.java | 2 +-
.../scenario/rulealtered/RuleAlteredJobWorker.java | 2 +-
...sphere.data.pipeline.api.MigrationJobPublicAPI} | 0
...ngsphere.data.pipeline.core.api.PipelineJobAPI} | 0
...phere.data.pipeline.core.api.RuleAlteredJobAPI} | 0
.../core}/RuleAlteredJobAPIFactoryTest.java | 5 +-
.../core}/fixture/RuleAlteredJobAPIFixture.java | 10 +-
.../pipeline/core/job/PipelineJobIdUtilsTest.java | 42 +++++++
...phere.data.pipeline.core.api.RuleAlteredJobAPI} | 2 +-
.../core/api/impl/RuleAlteredJobAPIImplTest.java | 4 +-
.../pipeline/core/job/FinishedCheckJobTest.java | 4 +-
.../rulealtered/RuleAlteredJobWorkerTest.java | 2 +-
.../prepare/InventoryTaskSplitterTest.java | 2 +-
43 files changed, 272 insertions(+), 388 deletions(-)
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/CheckMigrationQueryResultSet.java
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/CheckMigrationQueryResultSet.java
index 722afe7c2e2..9b08eae1ead 100644
---
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/CheckMigrationQueryResultSet.java
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/CheckMigrationQueryResultSet.java
@@ -17,8 +17,8 @@
package org.apache.shardingsphere.migration.distsql.handler.query;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
+import org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI;
+import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
import org.apache.shardingsphere.distsql.parser.segment.AlgorithmSegment;
import org.apache.shardingsphere.infra.distsql.query.DatabaseDistSQLResultSet;
@@ -38,7 +38,7 @@ import java.util.stream.Collectors;
*/
public final class CheckMigrationQueryResultSet implements
DatabaseDistSQLResultSet {
- private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API =
RuleAlteredJobAPIFactory.getInstance();
+ private static final MigrationJobPublicAPI JOB_API =
PipelineJobPublicAPIFactory.getMigrationJobPublicAPI();
private Iterator<Collection<Object>> data;
@@ -48,9 +48,9 @@ public final class CheckMigrationQueryResultSet implements
DatabaseDistSQLResult
Map<String, DataConsistencyCheckResult> checkResultMap;
AlgorithmSegment typeStrategy =
checkMigrationStatement.getTypeStrategy();
if (null == typeStrategy) {
- checkResultMap =
RULE_ALTERED_JOB_API.dataConsistencyCheck(checkMigrationStatement.getJobId());
+ checkResultMap =
JOB_API.dataConsistencyCheck(checkMigrationStatement.getJobId());
} else {
- checkResultMap =
RULE_ALTERED_JOB_API.dataConsistencyCheck(checkMigrationStatement.getJobId(),
typeStrategy.getName(), typeStrategy.getProps());
+ checkResultMap =
JOB_API.dataConsistencyCheck(checkMigrationStatement.getJobId(),
typeStrategy.getName(), typeStrategy.getProps());
}
data = checkResultMap.entrySet().stream()
.map(each -> {
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckAlgorithmsQueryResultSet.java
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckAlgorithmsQueryResultSet.java
index 7d875520f92..92d43f35ff0 100644
---
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckAlgorithmsQueryResultSet.java
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckAlgorithmsQueryResultSet.java
@@ -17,8 +17,8 @@
package org.apache.shardingsphere.migration.distsql.handler.query;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
+import org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI;
+import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
import org.apache.shardingsphere.infra.distsql.query.DatabaseDistSQLResultSet;
import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import
org.apache.shardingsphere.migration.distsql.statement.ShowMigrationCheckAlgorithmsStatement;
@@ -34,13 +34,13 @@ import java.util.stream.Collectors;
*/
public final class ShowMigrationCheckAlgorithmsQueryResultSet implements
DatabaseDistSQLResultSet {
- private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API =
RuleAlteredJobAPIFactory.getInstance();
+ private static final MigrationJobPublicAPI JOB_API =
PipelineJobPublicAPIFactory.getMigrationJobPublicAPI();
private Iterator<Collection<Object>> data;
@Override
public void init(final ShardingSphereDatabase database, final SQLStatement
sqlStatement) {
- data =
RULE_ALTERED_JOB_API.listDataConsistencyCheckAlgorithms().stream().map(
+ data = JOB_API.listDataConsistencyCheckAlgorithms().stream().map(
each -> (Collection<Object>)
Arrays.<Object>asList(each.getType(), String.join(",",
each.getSupportedDatabaseTypes()),
each.getDescription())).collect(Collectors.toList()).iterator();
}
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusQueryResultSet.java
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusQueryResultSet.java
index f573f604e3b..2aa717e9f48 100644
---
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusQueryResultSet.java
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusQueryResultSet.java
@@ -17,8 +17,8 @@
package org.apache.shardingsphere.migration.distsql.handler.query;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
+import org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI;
+import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
import org.apache.shardingsphere.infra.distsql.query.DatabaseDistSQLResultSet;
import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import
org.apache.shardingsphere.migration.distsql.statement.ShowMigrationStatusStatement;
@@ -36,14 +36,14 @@ import java.util.stream.Collectors;
*/
public final class ShowMigrationJobStatusQueryResultSet implements
DatabaseDistSQLResultSet {
- private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API =
RuleAlteredJobAPIFactory.getInstance();
+ private static final MigrationJobPublicAPI JOB_API =
PipelineJobPublicAPIFactory.getMigrationJobPublicAPI();
private Iterator<Collection<Object>> data;
@Override
public void init(final ShardingSphereDatabase database, final SQLStatement
sqlStatement) {
long currentTimeMillis = System.currentTimeMillis();
- data =
RULE_ALTERED_JOB_API.getJobProgress(((ShowMigrationStatusStatement)
sqlStatement).getJobId()).entrySet().stream()
+ data = JOB_API.getJobProgress(((ShowMigrationStatusStatement)
sqlStatement).getJobId()).entrySet().stream()
.map(entry -> {
Collection<Object> result = new LinkedList<>();
result.add(entry.getKey());
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationListQueryResultSet.java
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationListQueryResultSet.java
index 90f01d9aac0..fdca0dc082b 100644
---
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationListQueryResultSet.java
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationListQueryResultSet.java
@@ -17,8 +17,8 @@
package org.apache.shardingsphere.migration.distsql.handler.query;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
+import org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI;
+import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
import org.apache.shardingsphere.infra.distsql.query.DatabaseDistSQLResultSet;
import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import
org.apache.shardingsphere.migration.distsql.statement.ShowMigrationListStatement;
@@ -35,13 +35,13 @@ import java.util.stream.Collectors;
*/
public final class ShowMigrationListQueryResultSet implements
DatabaseDistSQLResultSet {
- private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API =
RuleAlteredJobAPIFactory.getInstance();
+ private static final MigrationJobPublicAPI JOB_API =
PipelineJobPublicAPIFactory.getMigrationJobPublicAPI();
private Iterator<Collection<Object>> data;
@Override
public void init(final ShardingSphereDatabase database, final SQLStatement
sqlStatement) {
- data = RULE_ALTERED_JOB_API.list().stream()
+ data = JOB_API.list().stream()
.map(each -> {
Collection<Object> result = new LinkedList<>();
result.add(each.getJobId());
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/ApplyMigrationUpdater.java
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/ApplyMigrationUpdater.java
index ce616c9a4d5..793be29ceef 100644
---
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/ApplyMigrationUpdater.java
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/ApplyMigrationUpdater.java
@@ -17,8 +17,8 @@
package org.apache.shardingsphere.migration.distsql.handler.update;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
+import org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI;
+import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
import org.apache.shardingsphere.infra.distsql.update.RALUpdater;
import
org.apache.shardingsphere.migration.distsql.statement.ApplyMigrationStatement;
@@ -27,11 +27,11 @@ import
org.apache.shardingsphere.migration.distsql.statement.ApplyMigrationState
*/
public final class ApplyMigrationUpdater implements
RALUpdater<ApplyMigrationStatement> {
- private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API =
RuleAlteredJobAPIFactory.getInstance();
+ private static final MigrationJobPublicAPI JOB_API =
PipelineJobPublicAPIFactory.getMigrationJobPublicAPI();
@Override
public void executeUpdate(final ApplyMigrationStatement sqlStatement) {
-
RULE_ALTERED_JOB_API.switchClusterConfiguration(sqlStatement.getJobId());
+ JOB_API.switchClusterConfiguration(sqlStatement.getJobId());
}
@Override
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/DropMigrationUpdater.java
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/DropMigrationUpdater.java
index 3070ff19947..eac7749d309 100644
---
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/DropMigrationUpdater.java
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/DropMigrationUpdater.java
@@ -17,8 +17,8 @@
package org.apache.shardingsphere.migration.distsql.handler.update;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
+import org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI;
+import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
import org.apache.shardingsphere.infra.distsql.update.RALUpdater;
import
org.apache.shardingsphere.migration.distsql.statement.DropMigrationStatement;
@@ -27,11 +27,11 @@ import
org.apache.shardingsphere.migration.distsql.statement.DropMigrationStatem
*/
public final class DropMigrationUpdater implements
RALUpdater<DropMigrationStatement> {
- private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API =
RuleAlteredJobAPIFactory.getInstance();
+ private static final MigrationJobPublicAPI JOB_API =
PipelineJobPublicAPIFactory.getMigrationJobPublicAPI();
@Override
public void executeUpdate(final DropMigrationStatement sqlStatement) {
- RULE_ALTERED_JOB_API.remove(sqlStatement.getJobId());
+ JOB_API.remove(sqlStatement.getJobId());
}
@Override
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/ResetMigrationUpdater.java
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/ResetMigrationUpdater.java
index e20ae3417e5..5356a368483 100644
---
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/ResetMigrationUpdater.java
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/ResetMigrationUpdater.java
@@ -17,8 +17,8 @@
package org.apache.shardingsphere.migration.distsql.handler.update;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
+import org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI;
+import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
import org.apache.shardingsphere.infra.distsql.update.RALUpdater;
import
org.apache.shardingsphere.migration.distsql.statement.ResetMigrationStatement;
@@ -27,11 +27,11 @@ import
org.apache.shardingsphere.migration.distsql.statement.ResetMigrationState
*/
public final class ResetMigrationUpdater implements
RALUpdater<ResetMigrationStatement> {
- private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API =
RuleAlteredJobAPIFactory.getInstance();
+ private static final MigrationJobPublicAPI JOB_API =
PipelineJobPublicAPIFactory.getMigrationJobPublicAPI();
@Override
public void executeUpdate(final ResetMigrationStatement sqlStatement) {
- RULE_ALTERED_JOB_API.reset(sqlStatement.getJobId());
+ JOB_API.reset(sqlStatement.getJobId());
}
@Override
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/RestoreMigrationSourceWritingUpdater.java
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/RestoreMigrationSourceWritingUpdater.java
index 0f644ceb72a..8907586f6e6 100644
---
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/RestoreMigrationSourceWritingUpdater.java
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/RestoreMigrationSourceWritingUpdater.java
@@ -17,8 +17,8 @@
package org.apache.shardingsphere.migration.distsql.handler.update;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
+import org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI;
+import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
import org.apache.shardingsphere.infra.distsql.update.RALUpdater;
import
org.apache.shardingsphere.migration.distsql.statement.RestoreMigrationSourceWritingStatement;
@@ -27,11 +27,11 @@ import
org.apache.shardingsphere.migration.distsql.statement.RestoreMigrationSou
*/
public final class RestoreMigrationSourceWritingUpdater implements
RALUpdater<RestoreMigrationSourceWritingStatement> {
- private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API =
RuleAlteredJobAPIFactory.getInstance();
+ private static final MigrationJobPublicAPI JOB_API =
PipelineJobPublicAPIFactory.getMigrationJobPublicAPI();
@Override
public void executeUpdate(final RestoreMigrationSourceWritingStatement
sqlStatement) {
- RULE_ALTERED_JOB_API.restoreClusterWriteDB(sqlStatement.getJobId());
+ JOB_API.restoreClusterWriteDB(sqlStatement.getJobId());
}
@Override
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/StartMigrationUpdater.java
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/StartMigrationUpdater.java
index 905fe5d2c3e..3846be4ceba 100644
---
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/StartMigrationUpdater.java
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/StartMigrationUpdater.java
@@ -17,8 +17,8 @@
package org.apache.shardingsphere.migration.distsql.handler.update;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
+import org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI;
+import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
import org.apache.shardingsphere.infra.distsql.update.RALUpdater;
import
org.apache.shardingsphere.migration.distsql.statement.StartMigrationStatement;
@@ -27,11 +27,11 @@ import
org.apache.shardingsphere.migration.distsql.statement.StartMigrationState
*/
public final class StartMigrationUpdater implements
RALUpdater<StartMigrationStatement> {
- private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API =
RuleAlteredJobAPIFactory.getInstance();
+ private static final MigrationJobPublicAPI JOB_API =
PipelineJobPublicAPIFactory.getMigrationJobPublicAPI();
@Override
public void executeUpdate(final StartMigrationStatement sqlStatement) {
- RULE_ALTERED_JOB_API.startDisabledJob(sqlStatement.getJobId());
+ JOB_API.startDisabledJob(sqlStatement.getJobId());
}
@Override
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/StopMigrationSourceWritingUpdater.java
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/StopMigrationSourceWritingUpdater.java
index 7b42756fb7c..c096aa65e99 100644
---
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/StopMigrationSourceWritingUpdater.java
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/StopMigrationSourceWritingUpdater.java
@@ -17,8 +17,8 @@
package org.apache.shardingsphere.migration.distsql.handler.update;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
+import org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI;
+import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
import org.apache.shardingsphere.infra.distsql.update.RALUpdater;
import
org.apache.shardingsphere.migration.distsql.statement.StopMigrationSourceWritingStatement;
@@ -27,11 +27,11 @@ import
org.apache.shardingsphere.migration.distsql.statement.StopMigrationSource
*/
public final class StopMigrationSourceWritingUpdater implements
RALUpdater<StopMigrationSourceWritingStatement> {
- private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API =
RuleAlteredJobAPIFactory.getInstance();
+ private static final MigrationJobPublicAPI JOB_API =
PipelineJobPublicAPIFactory.getMigrationJobPublicAPI();
@Override
public void executeUpdate(final StopMigrationSourceWritingStatement
sqlStatement) {
- RULE_ALTERED_JOB_API.stopClusterWriteDB(sqlStatement.getJobId());
+ JOB_API.stopClusterWriteDB(sqlStatement.getJobId());
}
@Override
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/StopMigrationUpdater.java
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/StopMigrationUpdater.java
index 008c64ba269..bb7883bdfbf 100644
---
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/StopMigrationUpdater.java
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/StopMigrationUpdater.java
@@ -17,8 +17,8 @@
package org.apache.shardingsphere.migration.distsql.handler.update;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
+import org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI;
+import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPIFactory;
import org.apache.shardingsphere.infra.distsql.update.RALUpdater;
import
org.apache.shardingsphere.migration.distsql.statement.StopMigrationStatement;
@@ -27,11 +27,11 @@ import
org.apache.shardingsphere.migration.distsql.statement.StopMigrationStatem
*/
public final class StopMigrationUpdater implements
RALUpdater<StopMigrationStatement> {
- private static final RuleAlteredJobAPI RULE_ALTERED_JOB_API =
RuleAlteredJobAPIFactory.getInstance();
+ private static final MigrationJobPublicAPI JOB_API =
PipelineJobPublicAPIFactory.getMigrationJobPublicAPI();
@Override
public void executeUpdate(final StopMigrationStatement sqlStatement) {
- RULE_ALTERED_JOB_API.stop(sqlStatement.getJobId());
+ JOB_API.stop(sqlStatement.getJobId());
}
@Override
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPI.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/MigrationJobPublicAPI.java
similarity index 52%
copy from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPI.java
copy to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/MigrationJobPublicAPI.java
index 9cfe9a3f896..99851859fe4 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPI.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/MigrationJobPublicAPI.java
@@ -18,24 +18,22 @@
package org.apache.shardingsphere.data.pipeline.api;
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
-import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
-import
org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
-import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
+import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
import org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPI;
import java.util.Collection;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.Properties;
/**
- * Rule altered job API.
+ * Migration job public API.
*/
-public interface RuleAlteredJobAPI extends PipelineJobAPI, RequiredSPI {
+@SingletonSPI
+public interface MigrationJobPublicAPI extends PipelineJobPublicAPI,
RequiredSPI {
/**
* List all jobs.
@@ -44,30 +42,15 @@ public interface RuleAlteredJobAPI extends PipelineJobAPI,
RequiredSPI {
*/
List<JobInfo> list();
- /**
- * Start scaling job by config.
- *
- * @param jobConfig job config
- * @return job id
- */
- Optional<String> start(RuleAlteredJobConfiguration jobConfig);
-
/**
* Get job progress.
*
* @param jobId job id
* @return each sharding item progress
*/
+ // TODO add JobProgress
Map<Integer, InventoryIncrementalJobItemProgress> getJobProgress(String
jobId);
- /**
- * Get job progress.
- *
- * @param jobConfig job configuration
- * @return each sharding item progress
- */
- Map<Integer, InventoryIncrementalJobItemProgress>
getJobProgress(RuleAlteredJobConfiguration jobConfig);
-
/**
* Stop cluster writing.
*
@@ -75,13 +58,6 @@ public interface RuleAlteredJobAPI extends PipelineJobAPI,
RequiredSPI {
*/
void stopClusterWriteDB(String jobId);
- /**
- * Stop cluster writing.
- *
- * @param jobConfig job configuration
- */
- void stopClusterWriteDB(RuleAlteredJobConfiguration jobConfig);
-
/**
* Restore cluster writing.
*
@@ -89,13 +65,6 @@ public interface RuleAlteredJobAPI extends PipelineJobAPI,
RequiredSPI {
*/
void restoreClusterWriteDB(String jobId);
- /**
- * Restore cluster writing.
- *
- * @param jobConfig job configuration
- */
- void restoreClusterWriteDB(RuleAlteredJobConfiguration jobConfig);
-
/**
* List all data consistency check algorithms from SPI.
*
@@ -111,14 +80,6 @@ public interface RuleAlteredJobAPI extends PipelineJobAPI,
RequiredSPI {
*/
boolean isDataConsistencyCheckNeeded(String jobId);
- /**
- * Is data consistency check needed.
- *
- * @param jobConfig job configuration
- * @return data consistency check needed or not
- */
- boolean isDataConsistencyCheckNeeded(RuleAlteredJobConfiguration
jobConfig);
-
/**
* Do data consistency check.
*
@@ -127,14 +88,6 @@ public interface RuleAlteredJobAPI extends PipelineJobAPI,
RequiredSPI {
*/
Map<String, DataConsistencyCheckResult> dataConsistencyCheck(String jobId);
- /**
- * Do data consistency check.
- *
- * @param jobConfig job configuration
- * @return each logic table check result
- */
- Map<String, DataConsistencyCheckResult>
dataConsistencyCheck(RuleAlteredJobConfiguration jobConfig);
-
/**
* Do data consistency check.
*
@@ -145,15 +98,6 @@ public interface RuleAlteredJobAPI extends PipelineJobAPI,
RequiredSPI {
*/
Map<String, DataConsistencyCheckResult> dataConsistencyCheck(String jobId,
String algorithmType, Properties algorithmProps);
- /**
- * Aggregate data consistency check results.
- *
- * @param jobId job id
- * @param checkResults check results
- * @return check success or not
- */
- boolean aggregateDataConsistencyCheckResults(String jobId, Map<String,
DataConsistencyCheckResult> checkResults);
-
/**
* Switch cluster configuration.
*
@@ -161,50 +105,10 @@ public interface RuleAlteredJobAPI extends
PipelineJobAPI, RequiredSPI {
*/
void switchClusterConfiguration(String jobId);
- /**
- * Switch cluster configuration.
- *
- * @param jobConfig job configuration
- */
- void switchClusterConfiguration(RuleAlteredJobConfiguration jobConfig);
-
/**
* Reset scaling job.
*
* @param jobId job id
*/
void reset(String jobId);
-
- /**
- * Get job configuration.
- *
- * @param jobId job id
- * @return job configuration
- */
- RuleAlteredJobConfiguration getJobConfig(String jobId);
-
- /**
- * Persist job item progress.
- *
- * @param jobItemContext job item context
- */
- void persistJobItemProgress(PipelineJobItemContext jobItemContext);
-
- /**
- * Get job item progress.
- *
- * @param jobId job id
- * @param shardingItem sharding item
- * @return job item progress
- */
- InventoryIncrementalJobItemProgress getJobItemProgress(String jobId, int
shardingItem);
-
- /**
- * Update job item status.
- *
- * @param jobId job id
- * @param shardingItem sharding item
- * @param status status
- */
- void updateJobItemStatus(String jobId, int shardingItem, JobStatus status);
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobAPI.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPI.java
similarity index 60%
copy from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobAPI.java
copy to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPI.java
index 7388cf15821..efd9ce54bd8 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobAPI.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPI.java
@@ -17,37 +17,12 @@
package org.apache.shardingsphere.data.pipeline.api;
-import
org.apache.shardingsphere.data.pipeline.api.config.job.YamlPipelineJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
-import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
+import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
/**
- * Pipeline job API.
+ * Pipeline job public API.
*/
-public interface PipelineJobAPI {
-
- /**
- * Marshal pipeline job id.
- *
- * @param pipelineJobId pipeline job id
- * @return marshaled text
- */
- String marshalJobId(PipelineJobId pipelineJobId);
-
- /**
- * Parse job type.
- *
- * @param jobId job id
- * @return job type
- */
- JobType parseJobType(String jobId);
-
- /**
- * Extend job configuration.
- *
- * @param yamlJobConfig yaml job configuration
- */
- void extendJobConfiguration(YamlPipelineJobConfiguration yamlJobConfig);
+public interface PipelineJobPublicAPI extends TypedSPI {
/**
* Start pipeline job by id.
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPIFactory.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactory.java
similarity index 75%
copy from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPIFactory.java
copy to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactory.java
index 2d640a62af8..557e9c49783 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPIFactory.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactory.java
@@ -21,20 +21,20 @@ import
org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
import
org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPIRegistry;
/**
- * Rule altered job API factory.
+ * Pipeline job public API factory.
*/
-public final class RuleAlteredJobAPIFactory {
+public final class PipelineJobPublicAPIFactory {
static {
- ShardingSphereServiceLoader.register(RuleAlteredJobAPI.class);
+ ShardingSphereServiceLoader.register(MigrationJobPublicAPI.class);
}
/**
- * Get instance of rule altered job API.
+ * Get instance of migration job public API.
*
* @return got instance
*/
- public static RuleAlteredJobAPI getInstance() {
- return
RequiredSPIRegistry.getRegisteredService(RuleAlteredJobAPI.class);
+ public static MigrationJobPublicAPI getMigrationJobPublicAPI() {
+ return
RequiredSPIRegistry.getRegisteredService(MigrationJobPublicAPI.class);
}
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobType.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobType.java
index 57118ef052c..174550b90fe 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobType.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobType.java
@@ -52,7 +52,7 @@ public enum JobType {
* Value of by code.
*
* @param typeCode type code
- * @return job type
+ * @return job type, might be null
*/
public static JobType valueOfByCode(final String typeCode) {
return CODE_JOB_TYPE_MAP.get(typeCode);
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/PipelineJobId.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/PipelineJobId.java
index 77749a0cf6d..a038c1b953c 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/PipelineJobId.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/PipelineJobId.java
@@ -18,7 +18,7 @@
package org.apache.shardingsphere.data.pipeline.api.job;
/**
- * Job id.
+ * Pipeline job id.
*/
public interface PipelineJobId {
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/PipelineJobIdUtils.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/PipelineJobIdUtils.java
new file mode 100644
index 00000000000..a25ec5d3b60
--- /dev/null
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/PipelineJobIdUtils.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.api.job;
+
+/**
+ * Pipeline job id utils.
+ */
+public final class PipelineJobIdUtils {
+
+ /**
+ * Marshal job id common prefix.
+ *
+ * @param pipelineJobId pipeline job id
+ * @return job id common prefix
+ */
+ public static String marshalJobIdCommonPrefix(final PipelineJobId
pipelineJobId) {
+ return 'j' + pipelineJobId.getTypeCode();
+ }
+
+ /**
+ * Parse job type.
+ *
+ * @param jobId job id
+ * @return job type
+ * @throws IllegalArgumentException if job id is invalid
+ */
+ public static JobType parseJobType(final String jobId) {
+ if (jobId.length() <= 3) {
+ throw new IllegalArgumentException("Invalid jobId length, jobId="
+ jobId);
+ }
+ if ('j' != jobId.charAt(0)) {
+ throw new IllegalArgumentException("Invalid jobId, first char=" +
jobId.charAt(0));
+ }
+ String typeCode = jobId.substring(1, 3);
+ JobType result = JobType.valueOfByCode(typeCode);
+ if (null == result) {
+ throw new IllegalArgumentException("Could not get JobType by '" +
typeCode + "', jobId: " + jobId);
+ }
+ return result;
+ }
+}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineAPIFactory.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineAPIFactory.java
index 29c30aecf6a..b9438115173 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineAPIFactory.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineAPIFactory.java
@@ -23,6 +23,7 @@ import lombok.NoArgsConstructor;
import lombok.SneakyThrows;
import org.apache.commons.lang3.concurrent.ConcurrentException;
import org.apache.commons.lang3.concurrent.LazyInitializer;
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
import
org.apache.shardingsphere.data.pipeline.core.api.impl.GovernanceRepositoryAPIImpl;
import
org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
@@ -33,6 +34,8 @@ import
org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobOperateAPI;
import
org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobStatisticsAPI;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
+import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
+import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPIRegistry;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
@@ -42,6 +45,10 @@ import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositor
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class PipelineAPIFactory {
+ static {
+ ShardingSphereServiceLoader.register(PipelineJobAPI.class);
+ }
+
private static final LazyInitializer<GovernanceRepositoryAPI>
REPOSITORY_API_LAZY_INITIALIZER = new
LazyInitializer<GovernanceRepositoryAPI>() {
@Override
@@ -60,6 +67,16 @@ public final class PipelineAPIFactory {
return REPOSITORY_API_LAZY_INITIALIZER.get();
}
+ /**
+ * Get pipeline job API.
+ *
+ * @param jobType job type
+ * @return pipeline job API
+ */
+ public static PipelineJobAPI getPipelineJobAPI(final JobType jobType) {
+ return TypedSPIRegistry.getRegisteredService(PipelineJobAPI.class,
jobType.getTypeName());
+ }
+
/**
* Get job statistics API.
*
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobAPI.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
similarity index 54%
rename from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobAPI.java
rename to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
index 7388cf15821..36e16b2e8eb 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobAPI.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/PipelineJobAPI.java
@@ -15,16 +15,23 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.api;
+package org.apache.shardingsphere.data.pipeline.core.api;
+import org.apache.shardingsphere.data.pipeline.api.PipelineJobPublicAPI;
+import
org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.job.YamlPipelineJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import
org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
+import
org.apache.shardingsphere.data.pipeline.api.job.progress.PipelineJobItemProgress;
+import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
+import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
/**
* Pipeline job API.
*/
-public interface PipelineJobAPI {
+@SingletonSPI
+public interface PipelineJobAPI extends PipelineJobPublicAPI, TypedSPI {
/**
* Marshal pipeline job id.
@@ -34,14 +41,6 @@ public interface PipelineJobAPI {
*/
String marshalJobId(PipelineJobId pipelineJobId);
- /**
- * Parse job type.
- *
- * @param jobId job id
- * @return job type
- */
- JobType parseJobType(String jobId);
-
/**
* Extend job configuration.
*
@@ -50,23 +49,35 @@ public interface PipelineJobAPI {
void extendJobConfiguration(YamlPipelineJobConfiguration yamlJobConfig);
/**
- * Start pipeline job by id.
+ * Get job configuration.
*
* @param jobId job id
+ * @return job configuration
+ */
+ PipelineJobConfiguration getJobConfig(String jobId);
+
+ /**
+ * Persist job item progress.
+ *
+ * @param jobItemContext job item context
*/
- void startDisabledJob(String jobId);
+ void persistJobItemProgress(PipelineJobItemContext jobItemContext);
/**
- * Stop pipeline job.
+ * Get job item progress.
*
* @param jobId job id
+ * @param shardingItem sharding item
+ * @return job item progress
*/
- void stop(String jobId);
+ PipelineJobItemProgress getJobItemProgress(String jobId, int shardingItem);
/**
- * Remove pipeline job.
+ * Update job item status.
*
* @param jobId job id
+ * @param shardingItem sharding item
+ * @param status status
*/
- void remove(String jobId);
+ void updateJobItemStatus(String jobId, int shardingItem, JobStatus status);
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPI.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/RuleAlteredJobAPI.java
similarity index 51%
rename from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPI.java
rename to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/RuleAlteredJobAPI.java
index 9cfe9a3f896..7a8def8601d 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPI.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/RuleAlteredJobAPI.java
@@ -15,34 +15,23 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.api;
+package org.apache.shardingsphere.data.pipeline.core.api;
+import org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI;
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
-import
org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
-import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
-import
org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
-import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
+import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
import org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPI;
-import java.util.Collection;
-import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.Properties;
/**
* Rule altered job API.
*/
-public interface RuleAlteredJobAPI extends PipelineJobAPI, RequiredSPI {
-
- /**
- * List all jobs.
- *
- * @return job infos
- */
- List<JobInfo> list();
+@SingletonSPI
+public interface RuleAlteredJobAPI extends PipelineJobAPI,
MigrationJobPublicAPI, RequiredSPI {
/**
* Start scaling job by config.
@@ -52,14 +41,6 @@ public interface RuleAlteredJobAPI extends PipelineJobAPI,
RequiredSPI {
*/
Optional<String> start(RuleAlteredJobConfiguration jobConfig);
- /**
- * Get job progress.
- *
- * @param jobId job id
- * @return each sharding item progress
- */
- Map<Integer, InventoryIncrementalJobItemProgress> getJobProgress(String
jobId);
-
/**
* Get job progress.
*
@@ -68,12 +49,8 @@ public interface RuleAlteredJobAPI extends PipelineJobAPI,
RequiredSPI {
*/
Map<Integer, InventoryIncrementalJobItemProgress>
getJobProgress(RuleAlteredJobConfiguration jobConfig);
- /**
- * Stop cluster writing.
- *
- * @param jobId job id
- */
- void stopClusterWriteDB(String jobId);
+ @Override
+ InventoryIncrementalJobItemProgress getJobItemProgress(String jobId, int
shardingItem);
/**
* Stop cluster writing.
@@ -82,13 +59,6 @@ public interface RuleAlteredJobAPI extends PipelineJobAPI,
RequiredSPI {
*/
void stopClusterWriteDB(RuleAlteredJobConfiguration jobConfig);
- /**
- * Restore cluster writing.
- *
- * @param jobId job id
- */
- void restoreClusterWriteDB(String jobId);
-
/**
* Restore cluster writing.
*
@@ -96,21 +66,6 @@ public interface RuleAlteredJobAPI extends PipelineJobAPI,
RequiredSPI {
*/
void restoreClusterWriteDB(RuleAlteredJobConfiguration jobConfig);
- /**
- * List all data consistency check algorithms from SPI.
- *
- * @return data consistency check algorithms
- */
- Collection<DataConsistencyCheckAlgorithmInfo>
listDataConsistencyCheckAlgorithms();
-
- /**
- * Is data consistency check needed.
- *
- * @param jobId job id
- * @return data consistency check needed or not
- */
- boolean isDataConsistencyCheckNeeded(String jobId);
-
/**
* Is data consistency check needed.
*
@@ -119,14 +74,6 @@ public interface RuleAlteredJobAPI extends PipelineJobAPI,
RequiredSPI {
*/
boolean isDataConsistencyCheckNeeded(RuleAlteredJobConfiguration
jobConfig);
- /**
- * Do data consistency check.
- *
- * @param jobId job id
- * @return each logic table check result
- */
- Map<String, DataConsistencyCheckResult> dataConsistencyCheck(String jobId);
-
/**
* Do data consistency check.
*
@@ -135,16 +82,6 @@ public interface RuleAlteredJobAPI extends PipelineJobAPI,
RequiredSPI {
*/
Map<String, DataConsistencyCheckResult>
dataConsistencyCheck(RuleAlteredJobConfiguration jobConfig);
- /**
- * Do data consistency check.
- *
- * @param jobId job id
- * @param algorithmType algorithm type
- * @param algorithmProps algorithm props. Nullable
- * @return each logic table check result
- */
- Map<String, DataConsistencyCheckResult> dataConsistencyCheck(String jobId,
String algorithmType, Properties algorithmProps);
-
/**
* Aggregate data consistency check results.
*
@@ -154,13 +91,6 @@ public interface RuleAlteredJobAPI extends PipelineJobAPI,
RequiredSPI {
*/
boolean aggregateDataConsistencyCheckResults(String jobId, Map<String,
DataConsistencyCheckResult> checkResults);
- /**
- * Switch cluster configuration.
- *
- * @param jobId job id
- */
- void switchClusterConfiguration(String jobId);
-
/**
* Switch cluster configuration.
*
@@ -168,43 +98,6 @@ public interface RuleAlteredJobAPI extends PipelineJobAPI,
RequiredSPI {
*/
void switchClusterConfiguration(RuleAlteredJobConfiguration jobConfig);
- /**
- * Reset scaling job.
- *
- * @param jobId job id
- */
- void reset(String jobId);
-
- /**
- * Get job configuration.
- *
- * @param jobId job id
- * @return job configuration
- */
+ @Override
RuleAlteredJobConfiguration getJobConfig(String jobId);
-
- /**
- * Persist job item progress.
- *
- * @param jobItemContext job item context
- */
- void persistJobItemProgress(PipelineJobItemContext jobItemContext);
-
- /**
- * Get job item progress.
- *
- * @param jobId job id
- * @param shardingItem sharding item
- * @return job item progress
- */
- InventoryIncrementalJobItemProgress getJobItemProgress(String jobId, int
shardingItem);
-
- /**
- * Update job item status.
- *
- * @param jobId job id
- * @param shardingItem sharding item
- * @param status status
- */
- void updateJobItemStatus(String jobId, int shardingItem, JobStatus status);
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPIFactory.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/RuleAlteredJobAPIFactory.java
similarity index 95%
rename from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPIFactory.java
rename to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/RuleAlteredJobAPIFactory.java
index 2d640a62af8..e0a87e03ec4 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPIFactory.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/RuleAlteredJobAPIFactory.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.api;
+package org.apache.shardingsphere.data.pipeline.core.api;
import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
import
org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPIRegistry;
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
index 9f496fa20e2..aca2c9f609a 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
@@ -18,9 +18,9 @@
package org.apache.shardingsphere.data.pipeline.core.api.impl;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.PipelineJobAPI;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI;
import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
+import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobNotFoundException;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineVerifyFailedException;
@@ -39,23 +39,11 @@ public abstract class AbstractPipelineJobAPIImpl implements
PipelineJobAPI {
@Override
public final String marshalJobId(final PipelineJobId pipelineJobId) {
- return 'j' + pipelineJobId.getTypeCode() +
marshalJobIdLeftPart(pipelineJobId);
+ return PipelineJobIdUtils.marshalJobIdCommonPrefix(pipelineJobId) +
marshalJobIdLeftPart(pipelineJobId);
}
protected abstract String marshalJobIdLeftPart(PipelineJobId
pipelineJobId);
- @Override
- public JobType parseJobType(final String jobId) {
- if (jobId.length() <= 3) {
- throw new IllegalArgumentException("Invalid jobId length, jobId="
+ jobId);
- }
- if ('j' == jobId.charAt(0)) {
- throw new IllegalArgumentException("Invalid jobId, first char=" +
jobId.charAt(0));
- }
- String typeCode = jobId.substring(1, 3);
- return JobType.valueOfByCode(typeCode);
- }
-
@Override
public void startDisabledJob(final String jobId) {
log.info("Start disabled pipeline job {}", jobId);
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
index 0822a728cb0..6e17cf8a6f9 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImpl.java
@@ -22,7 +22,7 @@ import com.google.common.base.Strings;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.lang3.StringUtils;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
+import org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPI;
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
import
org.apache.shardingsphere.data.pipeline.api.config.job.YamlPipelineJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
@@ -46,6 +46,7 @@ import
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import
org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyCalculateAlgorithmFactory;
import
org.apache.shardingsphere.data.pipeline.core.check.consistency.DataConsistencyChecker;
+import
org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreationException;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineVerifyFailedException;
@@ -58,7 +59,6 @@ import
org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredContext;
import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJob;
-import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobContext;
import
org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobWorker;
import
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
import
org.apache.shardingsphere.data.pipeline.spi.rulealtered.RuleAlteredJobConfigurationPreparerFactory;
@@ -408,30 +408,27 @@ public final class RuleAlteredJobAPIImpl extends
AbstractPipelineJobAPIImpl impl
@Override
public void persistJobItemProgress(final PipelineJobItemContext
jobItemContext) {
- if (!(jobItemContext instanceof RuleAlteredJobContext)) {
- return;
- }
- RuleAlteredJobContext context = (RuleAlteredJobContext) jobItemContext;
+ InventoryIncrementalJobItemContext context =
(InventoryIncrementalJobItemContext) jobItemContext;
InventoryIncrementalJobItemProgress jobItemProgress = new
InventoryIncrementalJobItemProgress();
jobItemProgress.setStatus(jobItemContext.getStatus());
-
jobItemProgress.setSourceDatabaseType(context.getJobConfig().getSourceDatabaseType());
- jobItemProgress.setIncremental(getIncrementalTasksProgress(context));
- jobItemProgress.setInventory(getInventoryTasksProgress(context));
+
jobItemProgress.setSourceDatabaseType(jobItemContext.getJobConfig().getSourceDatabaseType());
+
jobItemProgress.setIncremental(getIncrementalTasksProgress(context.getIncrementalTasks()));
+
jobItemProgress.setInventory(getInventoryTasksProgress(context.getInventoryTasks()));
String value =
YamlEngine.marshal(SWAPPER.swapToYamlConfiguration(jobItemProgress));
PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), value);
}
- private JobItemIncrementalTasksProgress getIncrementalTasksProgress(final
RuleAlteredJobContext jobItemContext) {
+ private JobItemIncrementalTasksProgress getIncrementalTasksProgress(final
Collection<IncrementalTask> incrementalTasks) {
Map<String, IncrementalTaskProgress> incrementalTaskProgressMap = new
HashMap<>();
- for (IncrementalTask each : jobItemContext.getIncrementalTasks()) {
+ for (IncrementalTask each : incrementalTasks) {
incrementalTaskProgressMap.put(each.getTaskId(),
each.getTaskProgress());
}
return new JobItemIncrementalTasksProgress(incrementalTaskProgressMap);
}
- private JobItemInventoryTasksProgress getInventoryTasksProgress(final
RuleAlteredJobContext jobItemContext) {
+ private JobItemInventoryTasksProgress getInventoryTasksProgress(final
Collection<InventoryTask> inventoryTasks) {
Map<String, InventoryTaskProgress> inventoryTaskProgressMap = new
HashMap<>();
- for (InventoryTask each : jobItemContext.getInventoryTasks()) {
+ for (InventoryTask each : inventoryTasks) {
inventoryTaskProgressMap.put(each.getTaskId(),
each.getTaskProgress());
}
return new JobItemInventoryTasksProgress(inventoryTaskProgressMap);
@@ -456,4 +453,9 @@ public final class RuleAlteredJobAPIImpl extends
AbstractPipelineJobAPIImpl impl
jobItemProgress.setStatus(status);
PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobId,
shardingItem,
YamlEngine.marshal(SWAPPER.swapToYamlConfiguration(jobItemProgress)));
}
+
+ @Override
+ public String getType() {
+ return JobType.MIGRATION.getTypeName();
+ }
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/PipelineJobId.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalJobItemContext.java
similarity index 56%
copy from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/PipelineJobId.java
copy to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalJobItemContext.java
index 77749a0cf6d..91637df225d 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/PipelineJobId.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalJobItemContext.java
@@ -15,31 +15,30 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.api.job;
+package org.apache.shardingsphere.data.pipeline.core.context;
+
+import
org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
+import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
+import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
+
+import java.util.Collection;
/**
- * Job id.
+ * Inventory incremental job item context.
*/
-public interface PipelineJobId {
-
- /**
- * Get type.
- *
- * @return type
- */
- String getTypeCode();
+public interface InventoryIncrementalJobItemContext extends
PipelineJobItemContext {
/**
- * Get format version.
+ * Get inventory tasks.
*
- * @return format version
+ * @return inventory tasks
*/
- String getFormatVersion();
+ Collection<InventoryTask> getInventoryTasks();
/**
- * Get database name.
+ * Get incremental tasks.
*
- * @return database name
+ * @return incremental tasks
*/
- String getDatabaseName();
+ Collection<IncrementalTask> getIncrementalTasks();
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
index f7f984a13c7..373fa3ac1ec 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
@@ -18,7 +18,7 @@
package org.apache.shardingsphere.data.pipeline.core.execute;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
+import
org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPIFactory;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
import
org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
index ef1f4a4dea0..6bb7b0df73d 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJob.java
@@ -18,8 +18,8 @@
package org.apache.shardingsphere.data.pipeline.core.job;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPI;
+import
org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPIFactory;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
import
org.apache.shardingsphere.data.pipeline.api.detect.RuleAlteredJobAlmostCompletedParameter;
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
index 1ff05a8bdf4..20066a1a272 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
@@ -18,8 +18,9 @@
package org.apache.shardingsphere.data.pipeline.core.job.progress.persist;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
import
org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
+import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobIdUtils;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import
org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
@@ -102,7 +103,7 @@ public final class PipelineJobProgressPersistService {
}
persistContext.getHasNewEvents().set(false);
long startTimeMillis = System.currentTimeMillis();
-
RuleAlteredJobAPIFactory.getInstance().persistJobItemProgress(jobItemContext.get());
+
PipelineAPIFactory.getPipelineJobAPI(PipelineJobIdUtils.parseJobType(jobId)).persistJobItemProgress(jobItemContext.get());
persistContext.getBeforePersistingProgressMillis().set(null);
if (6 == ThreadLocalRandom.current().nextInt(100)) {
log.info("persist, jobId={}, shardingItem={}, cost time: {} ms",
jobId, shardingItem, System.currentTimeMillis() - startTimeMillis);
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
index 649721fe593..5ea0c7045b4 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
@@ -20,11 +20,12 @@ package org.apache.shardingsphere.data.pipeline.core.task;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
import
org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
@@ -72,7 +73,7 @@ public final class InventoryIncrementalTasksRunner implements
PipelineTasksRunne
log.info("job stopping, ignore inventory task");
return;
}
-
RuleAlteredJobAPIFactory.getInstance().persistJobItemProgress(jobItemContext);
+
PipelineAPIFactory.getPipelineJobAPI(PipelineJobIdUtils.parseJobType(jobItemContext.getJobId())).persistJobItemProgress(jobItemContext);
if (executeInventoryTask()) {
if (jobItemContext.isStopping()) {
log.info("stopping, ignore incremental task");
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
index f6194582526..58768c0ac95 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
@@ -19,7 +19,7 @@ package
org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
+import
org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPIFactory;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
index 639e59edd84..e2b2e4b8772 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
@@ -25,10 +25,10 @@ import
org.apache.commons.lang3.concurrent.ConcurrentException;
import org.apache.commons.lang3.concurrent.LazyInitializer;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
-import
org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
import
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
+import
org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
@@ -43,7 +43,7 @@ import java.util.LinkedList;
@Getter
@Setter
@Slf4j
-public final class RuleAlteredJobContext implements PipelineJobItemContext {
+public final class RuleAlteredJobContext implements
InventoryIncrementalJobItemContext {
private final String jobId;
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
index 9807d41ecd7..ddd31ebe961 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
@@ -18,7 +18,7 @@
package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
+import
org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPIFactory;
import
org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
index dbbc32adca7..069797aa427 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorker.java
@@ -21,7 +21,7 @@ import com.google.common.base.Preconditions;
import com.google.common.eventbus.Subscribe;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
+import
org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPIFactory;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI
similarity index 100%
copy from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI
copy to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.MigrationJobPublicAPI
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI
similarity index 100%
copy from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI
copy to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPI
similarity index 100%
rename from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI
rename to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPI
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPIFactoryTest.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/RuleAlteredJobAPIFactoryTest.java
similarity index 83%
rename from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPIFactoryTest.java
rename to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/RuleAlteredJobAPIFactoryTest.java
index f817f3fe269..6a8459b838c 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/RuleAlteredJobAPIFactoryTest.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/RuleAlteredJobAPIFactoryTest.java
@@ -15,9 +15,10 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.api;
+package org.apache.shardingsphere.data.pipeline.core;
-import
org.apache.shardingsphere.data.pipeline.api.fixture.RuleAlteredJobAPIFixture;
+import
org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPIFactory;
+import
org.apache.shardingsphere.data.pipeline.core.fixture.RuleAlteredJobAPIFixture;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.instanceOf;
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/fixture/RuleAlteredJobAPIFixture.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/RuleAlteredJobAPIFixture.java
similarity index 94%
rename from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/fixture/RuleAlteredJobAPIFixture.java
rename to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/RuleAlteredJobAPIFixture.java
index ca36f9f6785..df5abb4fefa 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/java/org/apache/shardingsphere/data/pipeline/api/fixture/RuleAlteredJobAPIFixture.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/RuleAlteredJobAPIFixture.java
@@ -15,15 +15,14 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.api.fixture;
+package org.apache.shardingsphere.data.pipeline.core.fixture;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
+import org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPI;
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
import
org.apache.shardingsphere.data.pipeline.api.config.job.YamlPipelineJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
-import org.apache.shardingsphere.data.pipeline.api.job.JobType;
import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
import
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
@@ -42,11 +41,6 @@ public final class RuleAlteredJobAPIFixture implements
RuleAlteredJobAPI {
return null;
}
- @Override
- public JobType parseJobType(final String jobId) {
- return null;
- }
-
@Override
public void extendJobConfiguration(final YamlPipelineJobConfiguration
yamlJobConfig) {
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtilsTest.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtilsTest.java
new file mode 100644
index 00000000000..97128667c37
--- /dev/null
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtilsTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.job;
+
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobIdUtils;
+import org.apache.shardingsphere.data.pipeline.api.job.RuleAlteredJobId;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+public final class PipelineJobIdUtilsTest {
+
+ @Test
+ public void assertCodec() {
+ RuleAlteredJobId pipelineJobId = new RuleAlteredJobId();
+ pipelineJobId.setTypeCode(JobType.MIGRATION.getTypeCode());
+ pipelineJobId.setFormatVersion(RuleAlteredJobId.CURRENT_VERSION);
+ pipelineJobId.setDatabaseName("sharding_db");
+ pipelineJobId.setCurrentMetadataVersion(0);
+ pipelineJobId.setNewMetadataVersion(1);
+ String jobId =
PipelineJobIdUtils.marshalJobIdCommonPrefix(pipelineJobId) + "abcd";
+ JobType actualJobType = PipelineJobIdUtils.parseJobType(jobId);
+ assertThat(actualJobType, is(JobType.MIGRATION));
+ }
+}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPI
similarity index 90%
rename from
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI
rename to
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPI
index e1837337615..dbf2c59932c 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPI
@@ -15,4 +15,4 @@
# limitations under the License.
#
-org.apache.shardingsphere.data.pipeline.api.fixture.RuleAlteredJobAPIFixture
+org.apache.shardingsphere.data.pipeline.core.fixture.RuleAlteredJobAPIFixture
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java
index 7444ee930f3..6d364a790ab 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java
@@ -19,8 +19,8 @@ package org.apache.shardingsphere.data.pipeline.core.api.impl;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPI;
+import
org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPIFactory;
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyContentCheckResult;
import
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJobTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJobTest.java
index 04ec9197cba..d9545f63918 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJobTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/FinishedCheckJobTest.java
@@ -18,8 +18,8 @@
package org.apache.shardingsphere.data.pipeline.core.job;
import lombok.SneakyThrows;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPI;
+import
org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPI;
import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
import
org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
index 5305bc155d8..d94acec2c48 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
@@ -18,7 +18,7 @@
package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
import org.apache.commons.io.FileUtils;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
+import
org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPIFactory;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.YamlRuleAlteredJobConfiguration;
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java
index 795a2e3c838..c7606c53531 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.data.pipeline.scenario.rulealtered.prepare;
-import org.apache.shardingsphere.data.pipeline.api.RuleAlteredJobAPIFactory;
+import
org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPIFactory;
import
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;