This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new b9288841882 Add YamlPipelineJobItemProgressSwapper (#29073)
b9288841882 is described below
commit b9288841882245c911fa31bf6902517b0c99a328
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Nov 18 15:01:07 2023 +0800
Add YamlPipelineJobItemProgressSwapper (#29073)
---
...YamlConsistencyCheckJobItemProgressSwapper.java | 4 +-
...InventoryIncrementalJobItemProgressSwapper.java | 4 +-
.../persist/PipelineJobProgressPersistService.java | 3 +-
.../job/service/InventoryIncrementalJobAPI.java | 6 +++
.../pipeline/core/job/service/PipelineJobAPI.java | 30 ++++++---------
.../core/job/service/PipelineJobManager.java | 44 +++++++++++++++++-----
.../AbstractInventoryIncrementalJobAPIImpl.java | 27 +------------
.../yaml/YamlPipelineJobItemProgressSwapper.java | 31 +++++++++++++++
.../runner/InventoryIncrementalTasksRunner.java | 2 +-
.../data/pipeline/cdc/api/impl/CDCJobAPI.java | 6 +--
.../pipeline/cdc/core/prepare/CDCJobPreparer.java | 5 ++-
.../api/impl/ConsistencyCheckJobAPI.java | 30 ++++-----------
.../task/ConsistencyCheckTasksRunner.java | 10 ++---
.../migration/api/impl/MigrationJobAPI.java | 2 +-
.../migration/prepare/MigrationJobPreparer.java | 5 ++-
.../api/impl/ConsistencyCheckJobAPITest.java | 16 ++++----
.../migration/api/impl/MigrationJobAPITest.java | 4 +-
17 files changed, 129 insertions(+), 100 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlConsistencyCheckJobItemProgressSwapper.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlConsistencyCheckJobItemProgressSwapper.java
index c399505edf2..066df10fc28 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlConsistencyCheckJobItemProgressSwapper.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlConsistencyCheckJobItemProgressSwapper.java
@@ -19,12 +19,12 @@ package
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.ConsistencyCheckJobItemProgress;
-import
org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
+import
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressSwapper;
/**
* YAML data check job item progress swapper.
*/
-public final class YamlConsistencyCheckJobItemProgressSwapper implements
YamlConfigurationSwapper<YamlConsistencyCheckJobItemProgress,
ConsistencyCheckJobItemProgress> {
+public final class YamlConsistencyCheckJobItemProgressSwapper implements
YamlPipelineJobItemProgressSwapper<YamlConsistencyCheckJobItemProgress,
ConsistencyCheckJobItemProgress> {
@Override
public YamlConsistencyCheckJobItemProgress swapToYamlConfiguration(final
ConsistencyCheckJobItemProgress data) {
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapper.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapper.java
index 696a4e5f34e..24c2f6a5a74 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapper.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapper.java
@@ -19,14 +19,14 @@ package
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
+import
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressSwapper;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
-import
org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
/**
* YAML inventory incremental job item progress swapper.
*/
-public final class YamlInventoryIncrementalJobItemProgressSwapper implements
YamlConfigurationSwapper<YamlInventoryIncrementalJobItemProgress,
InventoryIncrementalJobItemProgress> {
+public final class YamlInventoryIncrementalJobItemProgressSwapper implements
YamlPipelineJobItemProgressSwapper<YamlInventoryIncrementalJobItemProgress,
InventoryIncrementalJobItemProgress> {
private final YamlJobItemInventoryTasksProgressSwapper
inventoryTasksProgressSwapper = new YamlJobItemInventoryTasksProgressSwapper();
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
index 478cd13bb6c..87e13a2ea09 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
@@ -24,6 +24,7 @@ import
org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemCon
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import
org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
@@ -129,7 +130,7 @@ public final class PipelineJobProgressPersistService {
}
persistContext.getHasNewEvents().set(false);
long startTimeMillis = System.currentTimeMillis();
- TypedSPILoader.getService(PipelineJobAPI.class,
PipelineJobIdUtils.parseJobType(jobId).getType()).updateJobItemProgress(jobItemContext.get());
+ new
PipelineJobManager(TypedSPILoader.getService(PipelineJobAPI.class,
PipelineJobIdUtils.parseJobType(jobId).getType())).updateJobItemProgress(jobItemContext.get());
persistContext.getBeforePersistingProgressMillis().set(null);
if (6 == ThreadLocalRandom.current().nextInt(100)) {
log.info("persist, jobId={}, shardingItem={}, cost {} ms",
jobId, shardingItem, System.currentTimeMillis() - startTimeMillis);
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java
index 487555edcb5..908c3103c76 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java
@@ -24,6 +24,7 @@ import
org.apache.shardingsphere.data.pipeline.common.context.InventoryIncrement
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo;
+import
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlInventoryIncrementalJobItemProgressSwapper;
import
org.apache.shardingsphere.data.pipeline.common.pojo.DataConsistencyCheckAlgorithmInfo;
import
org.apache.shardingsphere.data.pipeline.common.pojo.InventoryIncrementalJobItemInfo;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
@@ -43,6 +44,11 @@ import java.util.Optional;
*/
public interface InventoryIncrementalJobAPI extends PipelineJobAPI {
+ @Override
+ default YamlInventoryIncrementalJobItemProgressSwapper
getYamlJobItemProgressSwapper() {
+ return new YamlInventoryIncrementalJobItemProgressSwapper();
+ }
+
/**
* Get pipeline job info.
*
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
index 6a4d6a5fcc4..b5ab26690a0 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
@@ -17,11 +17,11 @@
package org.apache.shardingsphere.data.pipeline.core.job.service;
-import
org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobConfigurationSwapper;
+import
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressSwapper;
import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI;
@@ -34,12 +34,20 @@ import java.util.Optional;
public interface PipelineJobAPI extends TypedSPI {
/**
- * Get YAML job configuration swapper.
+ * Get YAML pipeline job configuration swapper.
*
- * @return YAML job configuration swapper
+ * @return YAML pipeline job configuration swapper
*/
YamlPipelineJobConfigurationSwapper<?, ?> getYamlJobConfigurationSwapper();
+ /**
+ * Get YAML pipeline job item progress swapper.
+ *
+ * @return YAML pipeline job item progress swapper
+ */
+ @SuppressWarnings("rawtypes")
+ YamlPipelineJobItemProgressSwapper getYamlJobItemProgressSwapper();
+
/**
* Whether to ignore to start disabled job when job item progress is
finished.
*
@@ -67,20 +75,6 @@ public interface PipelineJobAPI extends TypedSPI {
return Optional.empty();
}
- /**
- * Persist job item progress.
- *
- * @param jobItemContext job item context
- */
- void persistJobItemProgress(PipelineJobItemContext jobItemContext);
-
- /**
- * Update job item progress.
- *
- * @param jobItemContext job item context
- */
- void updateJobItemProgress(PipelineJobItemContext jobItemContext);
-
/**
* Get job item progress.
*
@@ -104,7 +98,7 @@ public interface PipelineJobAPI extends TypedSPI {
*
* @return pipeline job class
*/
- Class<? extends PipelineJob> getPipelineJobClass();
+ Class<? extends PipelineJob> getJobClass();
@Override
String getType();
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
index be7c8121cb3..99d846dd6d7 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
@@ -22,6 +22,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
+import
org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
@@ -55,7 +56,7 @@ public final class PipelineJobManager {
private static final DateTimeFormatter DATE_TIME_FORMATTER =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
- private final PipelineJobAPI pipelineJobAPI;
+ private final PipelineJobAPI jobAPI;
/**
* Get job configuration.
@@ -64,7 +65,7 @@ public final class PipelineJobManager {
* @return pipeline job configuration
*/
public PipelineJobConfiguration getJobConfiguration(final
JobConfigurationPOJO jobConfigPOJO) {
- return
pipelineJobAPI.getYamlJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter());
+ return
jobAPI.getYamlJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter());
}
/**
@@ -82,7 +83,7 @@ public final class PipelineJobManager {
log.warn("jobId already exists in registry center, ignore,
jobConfigKey={}", jobConfigKey);
return Optional.of(jobId);
}
- repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobId),
pipelineJobAPI.getPipelineJobClass().getName());
+ repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobId),
jobAPI.getJobClass().getName());
repositoryAPI.persist(jobConfigKey,
YamlEngine.marshal(jobConfig.convertToJobConfigurationPOJO()));
return Optional.of(jobId);
}
@@ -93,15 +94,15 @@ public final class PipelineJobManager {
* @param jobId job id
*/
public void startDisabledJob(final String jobId) {
- if
(pipelineJobAPI.isIgnoreToStartDisabledJobWhenJobItemProgressIsFinished()) {
- Optional<? extends PipelineJobItemProgress> jobItemProgress =
pipelineJobAPI.getJobItemProgress(jobId, 0);
+ if (jobAPI.isIgnoreToStartDisabledJobWhenJobItemProgressIsFinished()) {
+ Optional<? extends PipelineJobItemProgress> jobItemProgress =
jobAPI.getJobItemProgress(jobId, 0);
if (jobItemProgress.isPresent() && JobStatus.FINISHED ==
jobItemProgress.get().getStatus()) {
log.info("job status is FINISHED, ignore, jobId={}", jobId);
return;
}
}
startCurrentDisabledJob(jobId);
- pipelineJobAPI.getToBeStartDisabledNextJobType().ifPresent(optional ->
startNextDisabledJob(jobId, optional));
+ jobAPI.getToBeStartDisabledNextJobType().ifPresent(optional ->
startNextDisabledJob(jobId, optional));
}
@@ -139,7 +140,7 @@ public final class PipelineJobManager {
* @param jobId job id
*/
public void stop(final String jobId) {
- pipelineJobAPI.getToBeStoppedPreviousJobType().ifPresent(optional ->
stopPreviousJob(jobId, optional));
+ jobAPI.getToBeStoppedPreviousJobType().ifPresent(optional ->
stopPreviousJob(jobId, optional));
stopCurrentJob(jobId);
}
@@ -189,8 +190,8 @@ public final class PipelineJobManager {
* @return jobs info
*/
public List<PipelineJobInfo> getPipelineJobInfos(final PipelineContextKey
contextKey) {
- if (pipelineJobAPI instanceof InventoryIncrementalJobAPI) {
- return getJobBriefInfos(contextKey,
pipelineJobAPI.getType()).map(each -> ((InventoryIncrementalJobAPI)
pipelineJobAPI).getJobInfo(each.getJobName())).collect(Collectors.toList());
+ if (jobAPI instanceof InventoryIncrementalJobAPI) {
+ return getJobBriefInfos(contextKey, jobAPI.getType()).map(each ->
((InventoryIncrementalJobAPI)
jobAPI).getJobInfo(each.getJobName())).collect(Collectors.toList());
}
return Collections.emptyList();
}
@@ -200,6 +201,31 @@ public final class PipelineJobManager {
.filter(each ->
jobType.equals(PipelineJobIdUtils.parseJobType(each.getJobName()).getType()));
}
+ /**
+ * Persist job item progress.
+ *
+ * @param jobItemContext job item context
+ */
+ public void persistJobItemProgress(final PipelineJobItemContext
jobItemContext) {
+
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobItemContext.getJobId()))
+ .persistJobItemProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), convertJobItemProgress(jobItemContext));
+ }
+
+ /**
+ * Update job item progress.
+ *
+ * @param jobItemContext job item context
+ */
+ public void updateJobItemProgress(final PipelineJobItemContext
jobItemContext) {
+
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobItemContext.getJobId()))
+ .updateJobItemProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), convertJobItemProgress(jobItemContext));
+ }
+
+ @SuppressWarnings("unchecked")
+ private String convertJobItemProgress(final PipelineJobItemContext
jobItemContext) {
+ return
YamlEngine.marshal(jobAPI.getYamlJobItemProgressSwapper().swapToYamlConfiguration(jobItemContext.toProgress()));
+ }
+
/**
* Get job item error message.
*
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
index 372971129cc..1706b2c4c9f 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -17,19 +17,15 @@
package org.apache.shardingsphere.data.pipeline.core.job.service.impl;
-import lombok.AccessLevel;
-import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
import
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfigurationUtils;
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
-import
org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlInventoryIncrementalJobItemProgress;
-import
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlInventoryIncrementalJobItemProgressSwapper;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfo;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfoSwapper;
import
org.apache.shardingsphere.data.pipeline.common.pojo.DataConsistencyCheckAlgorithmInfo;
@@ -66,9 +62,6 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl
implements Inventor
private final PipelineProcessConfigurationPersistService
processConfigPersistService = new PipelineProcessConfigurationPersistService();
- @Getter(AccessLevel.PROTECTED)
- private final YamlInventoryIncrementalJobItemProgressSwapper
jobItemProgressSwapper = new YamlInventoryIncrementalJobItemProgressSwapper();
-
private final YamlJobOffsetInfoSwapper jobOffsetInfoSwapper = new
YamlJobOffsetInfoSwapper();
@Override
@@ -121,22 +114,6 @@ public abstract class
AbstractInventoryIncrementalJobAPIImpl implements Inventor
return result;
}
- @Override
- public void persistJobItemProgress(final PipelineJobItemContext
jobItemContext) {
-
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobItemContext.getJobId()))
- .persistJobItemProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), convertJobItemProgress(jobItemContext));
- }
-
- @Override
- public void updateJobItemProgress(final PipelineJobItemContext
jobItemContext) {
-
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobItemContext.getJobId()))
- .updateJobItemProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), convertJobItemProgress(jobItemContext));
- }
-
- private String convertJobItemProgress(final PipelineJobItemContext
jobItemContext) {
- return
YamlEngine.marshal(jobItemProgressSwapper.swapToYamlConfiguration((InventoryIncrementalJobItemProgress)
jobItemContext.toProgress()));
- }
-
@Override
public void persistJobOffsetInfo(final String jobId, final JobOffsetInfo
jobOffsetInfo) {
String value =
YamlEngine.marshal(jobOffsetInfoSwapper.swapToYamlConfiguration(jobOffsetInfo));
@@ -156,7 +133,7 @@ public abstract class
AbstractInventoryIncrementalJobAPIImpl implements Inventor
@Override
public Optional<InventoryIncrementalJobItemProgress>
getJobItemProgress(final String jobId, final int shardingItem) {
Optional<String> progress =
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemProgress(jobId,
shardingItem);
- return progress.map(optional ->
jobItemProgressSwapper.swapToObject(YamlEngine.unmarshal(optional,
YamlInventoryIncrementalJobItemProgress.class)));
+ return progress.map(optional ->
getYamlJobItemProgressSwapper().swapToObject(YamlEngine.unmarshal(optional,
YamlInventoryIncrementalJobItemProgress.class)));
}
@Override
@@ -167,7 +144,7 @@ public abstract class
AbstractInventoryIncrementalJobAPIImpl implements Inventor
}
jobItemProgress.get().setStatus(status);
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).updateJobItemProgress(jobId,
shardingItem,
-
YamlEngine.marshal(jobItemProgressSwapper.swapToYamlConfiguration(jobItemProgress.get())));
+
YamlEngine.marshal(getYamlJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress.get())));
}
@Override
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/yaml/YamlPipelineJobItemProgressSwapper.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/yaml/YamlPipelineJobItemProgressSwapper.java
new file mode 100644
index 00000000000..90f522b0910
--- /dev/null
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/yaml/YamlPipelineJobItemProgressSwapper.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.job.yaml;
+
+import
org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;
+import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
+import
org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
+
+/**
+ * YAML pipeline job item progress swapper.
+ *
+ * @param <Y> type of YAML configuration
+ * @param <T> type of swapped pipeline job item progress
+ */
+public interface YamlPipelineJobItemProgressSwapper<Y extends
YamlConfiguration, T extends PipelineJobItemProgress> extends
YamlConfigurationSwapper<Y, T> {
+}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
index 1923ad14911..ad9e3ccb81b 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
@@ -83,7 +83,7 @@ public class InventoryIncrementalTasksRunner implements
PipelineTasksRunner {
if (jobItemContext.isStopping()) {
return;
}
- TypedSPILoader.getService(PipelineJobAPI.class,
PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType()).persistJobItemProgress(jobItemContext);
+ new PipelineJobManager(TypedSPILoader.getService(PipelineJobAPI.class,
PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType())).persistJobItemProgress(jobItemContext);
if
(PipelineJobProgressDetector.isAllInventoryTasksFinished(inventoryTasks)) {
log.info("All inventory tasks finished.");
executeIncrementalTask();
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index dc0bda53380..070328d868e 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -123,7 +123,7 @@ public final class CDCJobAPI extends
AbstractInventoryIncrementalJobAPIImpl {
if (repositoryAPI.isExisted(jobConfigKey)) {
log.warn("CDC job already exists in registry center, ignore,
jobConfigKey={}", jobConfigKey);
} else {
-
repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobConfig.getJobId()),
getPipelineJobClass().getName());
+
repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobConfig.getJobId()),
getJobClass().getName());
JobConfigurationPOJO jobConfigPOJO =
jobConfig.convertToJobConfigurationPOJO();
jobConfigPOJO.setDisabled(true);
repositoryAPI.persist(jobConfigKey,
YamlEngine.marshal(jobConfigPOJO));
@@ -176,7 +176,7 @@ public final class CDCJobAPI extends
AbstractInventoryIncrementalJobAPIImpl {
IncrementalDumperContext dumperContext =
buildDumperContext(jobConfig, i, new
TableAndSchemaNameMapper(jobConfig.getSchemaTableNames()));
InventoryIncrementalJobItemProgress jobItemProgress =
getInventoryIncrementalJobItemProgress(jobConfig, pipelineDataSourceManager,
dumperContext);
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persistJobItemProgress(
- jobId, i,
YamlEngine.marshal(getJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress)));
+ jobId, i,
YamlEngine.marshal(getYamlJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress)));
}
} catch (final SQLException ex) {
throw new PrepareJobWithGetBinlogPositionException(jobId, ex);
@@ -329,7 +329,7 @@ public final class CDCJobAPI extends
AbstractInventoryIncrementalJobAPIImpl {
}
@Override
- public Class<CDCJob> getPipelineJobClass() {
+ public Class<CDCJob> getJobClass() {
return CDCJob.class;
}
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
index 681307a84c1..e79887f33cd 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
@@ -41,6 +41,7 @@ import
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWith
import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
import org.apache.shardingsphere.data.pipeline.core.importer.ImporterType;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import
org.apache.shardingsphere.data.pipeline.core.preparer.InventoryTaskSplitter;
import
org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparerUtils;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
@@ -68,6 +69,8 @@ public final class CDCJobPreparer {
private final CDCJobAPI jobAPI = new CDCJobAPI();
+ private final PipelineJobManager jobManager = new
PipelineJobManager(jobAPI);
+
/**
* Do prepare work.
*
@@ -88,7 +91,7 @@ public final class CDCJobPreparer {
final AtomicBoolean incrementalImporterUsed, final
List<CDCChannelProgressPair> incrementalChannelProgressPairs) {
Optional<InventoryIncrementalJobItemProgress> jobItemProgress =
jobAPI.getJobItemProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem());
if (!jobItemProgress.isPresent()) {
- jobAPI.persistJobItemProgress(jobItemContext);
+ jobManager.persistJobItemProgress(jobItemContext);
}
if (jobItemContext.isStopping()) {
PipelineJobCenter.stop(jobItemContext.getJobId());
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index da0f06e6ed9..62a1520c06b 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -20,7 +20,6 @@ package
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.im
import com.google.common.base.Strings;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
-import
org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.ConsistencyCheckJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlConsistencyCheckJobItemProgress;
@@ -73,8 +72,6 @@ public final class ConsistencyCheckJobAPI implements
PipelineJobAPI {
private static final DateTimeFormatter DATE_TIME_FORMATTER =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
- private final YamlConsistencyCheckJobItemProgressSwapper swapper = new
YamlConsistencyCheckJobItemProgressSwapper();
-
/**
* Create consistency check configuration and start job.
*
@@ -120,26 +117,10 @@ public final class ConsistencyCheckJobAPI implements
PipelineJobAPI {
return true;
}
- @Override
- public void persistJobItemProgress(final PipelineJobItemContext
jobItemContext) {
-
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobItemContext.getJobId()))
- .persistJobItemProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), convertJobItemProgress(jobItemContext));
- }
-
- private String convertJobItemProgress(final PipelineJobItemContext
jobItemContext) {
- return
YamlEngine.marshal(swapper.swapToYamlConfiguration((ConsistencyCheckJobItemProgress)
jobItemContext.toProgress()));
- }
-
- @Override
- public void updateJobItemProgress(final PipelineJobItemContext
jobItemContext) {
-
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobItemContext.getJobId()))
- .updateJobItemProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), convertJobItemProgress(jobItemContext));
- }
-
@Override
public Optional<ConsistencyCheckJobItemProgress> getJobItemProgress(final
String jobId, final int shardingItem) {
Optional<String> progress =
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemProgress(jobId,
shardingItem);
- return progress.map(s -> swapper.swapToObject(YamlEngine.unmarshal(s,
YamlConsistencyCheckJobItemProgress.class, true)));
+ return progress.map(s ->
getYamlJobItemProgressSwapper().swapToObject(YamlEngine.unmarshal(s,
YamlConsistencyCheckJobItemProgress.class, true)));
}
@Override
@@ -151,7 +132,7 @@ public final class ConsistencyCheckJobAPI implements
PipelineJobAPI {
}
jobItemProgress.get().setStatus(status);
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).updateJobItemProgress(jobId,
shardingItem,
-
YamlEngine.marshal(swapper.swapToYamlConfiguration(jobItemProgress.get())));
+
YamlEngine.marshal(getYamlJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress.get())));
}
/**
@@ -326,7 +307,12 @@ public final class ConsistencyCheckJobAPI implements
PipelineJobAPI {
}
@Override
- public Class<ConsistencyCheckJob> getPipelineJobClass() {
+ public YamlConsistencyCheckJobItemProgressSwapper
getYamlJobItemProgressSwapper() {
+ return new YamlConsistencyCheckJobItemProgressSwapper();
+ }
+
+ @Override
+ public Class<ConsistencyCheckJob> getJobClass() {
return ConsistencyCheckJob.class;
}
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
index 43f826dc52c..37947db77b8 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
@@ -50,9 +50,9 @@ import java.util.concurrent.atomic.AtomicReference;
@Slf4j
public final class ConsistencyCheckTasksRunner implements PipelineTasksRunner {
- private final ConsistencyCheckJobAPI checkJobAPI = new
ConsistencyCheckJobAPI();
+ private final ConsistencyCheckJobAPI jobAPI = new ConsistencyCheckJobAPI();
- private final PipelineJobManager jobManager = new
PipelineJobManager(checkJobAPI);
+ private final PipelineJobManager jobManager = new
PipelineJobManager(jobAPI);
@Getter
private final ConsistencyCheckJobItemContext jobItemContext;
@@ -80,7 +80,7 @@ public final class ConsistencyCheckTasksRunner implements
PipelineTasksRunner {
if (jobItemContext.isStopping()) {
return;
}
- TypedSPILoader.getService(PipelineJobAPI.class,
PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType()).persistJobItemProgress(jobItemContext);
+ new PipelineJobManager(TypedSPILoader.getService(PipelineJobAPI.class,
PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType())).persistJobItemProgress(jobItemContext);
CompletableFuture<?> future =
jobItemContext.getProcessContext().getConsistencyCheckExecuteEngine().submit(checkExecutor);
ExecuteEngine.trigger(Collections.singletonList(future), new
CheckExecuteCallback());
}
@@ -95,7 +95,7 @@ public final class ConsistencyCheckTasksRunner implements
PipelineTasksRunner {
@Override
protected void runBlocking() {
- checkJobAPI.persistJobItemProgress(jobItemContext);
+ jobManager.persistJobItemProgress(jobItemContext);
JobType jobType = PipelineJobIdUtils.parseJobType(parentJobId);
InventoryIncrementalJobAPI jobAPI = (InventoryIncrementalJobAPI)
TypedSPILoader.getService(PipelineJobAPI.class, jobType.getType());
PipelineJobConfiguration parentJobConfig = new
PipelineJobManager(jobAPI).getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(parentJobId));
@@ -133,7 +133,7 @@ public final class ConsistencyCheckTasksRunner implements
PipelineTasksRunner {
}
log.info("onSuccess, check job id: {}, parent job id: {}",
checkJobId, parentJobId);
jobItemContext.setStatus(JobStatus.FINISHED);
- checkJobAPI.persistJobItemProgress(jobItemContext);
+ jobManager.persistJobItemProgress(jobItemContext);
jobManager.stop(checkJobId);
}
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index 1f2378d05d3..3d12a4363bc 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -445,7 +445,7 @@ public final class MigrationJobAPI extends
AbstractInventoryIncrementalJobAPIImp
}
@Override
- public Class<MigrationJob> getPipelineJobClass() {
+ public Class<MigrationJob> getJobClass() {
return MigrationJob.class;
}
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
index cf73d15646b..37772491969 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
@@ -45,6 +45,7 @@ import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.Increm
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.InventoryDumperContext;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import
org.apache.shardingsphere.data.pipeline.core.preparer.InventoryTaskSplitter;
import
org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparerUtils;
import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.PrepareTargetSchemasParameter;
@@ -81,6 +82,8 @@ public final class MigrationJobPreparer {
private final MigrationJobAPI jobAPI = new MigrationJobAPI();
+ private final PipelineJobManager jobManager = new
PipelineJobManager(jobAPI);
+
/**
* Do prepare work.
*
@@ -123,7 +126,7 @@ public final class MigrationJobPreparer {
String jobId = jobConfig.getJobId();
LockContext lockContext =
PipelineContextManager.getContext(PipelineJobIdUtils.parseContextKey(jobId)).getContextManager().getInstanceContext().getLockContext();
if (!jobAPI.getJobItemProgress(jobId,
jobItemContext.getShardingItem()).isPresent()) {
- jobAPI.persistJobItemProgress(jobItemContext);
+ jobManager.persistJobItemProgress(jobItemContext);
}
LockDefinition lockDefinition = new
GlobalLockDefinition(String.format(GlobalLockNames.PREPARE.getLockName(),
jobConfig.getJobId()));
long startTimeMillis = System.currentTimeMillis();
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
index 6ab0ef41af0..deeae3d64e5 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
@@ -50,7 +50,9 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
class ConsistencyCheckJobAPITest {
- private final ConsistencyCheckJobAPI checkJobAPI = new
ConsistencyCheckJobAPI();
+ private final ConsistencyCheckJobAPI jobAPI = new ConsistencyCheckJobAPI();
+
+ private final PipelineJobManager jobManager = new
PipelineJobManager(jobAPI);
private final YamlMigrationJobConfigurationSwapper jobConfigSwapper = new
YamlMigrationJobConfigurationSwapper();
@@ -63,9 +65,9 @@ class ConsistencyCheckJobAPITest {
void assertCreateJobConfig() {
MigrationJobConfiguration parentJobConfig =
jobConfigSwapper.swapToObject(JobConfigurationBuilder.createYamlMigrationJobConfiguration());
String parentJobId = parentJobConfig.getJobId();
- String checkJobId = checkJobAPI.createJobAndStart(new
CreateConsistencyCheckJobParameter(parentJobId, null, null,
+ String checkJobId = jobAPI.createJobAndStart(new
CreateConsistencyCheckJobParameter(parentJobId, null, null,
parentJobConfig.getSourceDatabaseType(),
parentJobConfig.getTargetDatabaseType()));
- ConsistencyCheckJobConfiguration checkJobConfig =
(ConsistencyCheckJobConfiguration) new PipelineJobManager(checkJobAPI)
+ ConsistencyCheckJobConfiguration checkJobConfig =
(ConsistencyCheckJobConfiguration) new PipelineJobManager(jobAPI)
.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(checkJobId));
int expectedSequence = ConsistencyCheckSequence.MIN_SEQUENCE;
String expectCheckJobId = new
ConsistencyCheckJobId(PipelineJobIdUtils.parseContextKey(parentJobId),
parentJobId, expectedSequence).marshal();
@@ -82,11 +84,11 @@ class ConsistencyCheckJobAPITest {
GovernanceRepositoryAPI repositoryAPI =
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineContextUtils.getContextKey());
int expectedSequence = 1;
for (int i = 0; i < 3; i++) {
- String checkJobId = checkJobAPI.createJobAndStart(new
CreateConsistencyCheckJobParameter(parentJobId, null, null,
+ String checkJobId = jobAPI.createJobAndStart(new
CreateConsistencyCheckJobParameter(parentJobId, null, null,
parentJobConfig.getSourceDatabaseType(),
parentJobConfig.getTargetDatabaseType()));
ConsistencyCheckJobItemContext checkJobItemContext = new
ConsistencyCheckJobItemContext(
new ConsistencyCheckJobConfiguration(checkJobId,
parentJobId, null, null, TypedSPILoader.getService(DatabaseType.class, "H2")),
0, JobStatus.FINISHED, null);
- checkJobAPI.persistJobItemProgress(checkJobItemContext);
+ jobManager.persistJobItemProgress(checkJobItemContext);
Map<String, TableDataConsistencyCheckResult>
dataConsistencyCheckResult = Collections.singletonMap("t_order", new
TableDataConsistencyCheckResult(true));
repositoryAPI.persistCheckJobResult(parentJobId, checkJobId,
dataConsistencyCheckResult);
Optional<String> latestCheckJobId =
repositoryAPI.getLatestCheckJobId(parentJobId);
@@ -95,12 +97,12 @@ class ConsistencyCheckJobAPITest {
}
expectedSequence = 2;
for (int i = 0; i < 2; i++) {
- checkJobAPI.dropByParentJobId(parentJobId);
+ jobAPI.dropByParentJobId(parentJobId);
Optional<String> latestCheckJobId =
repositoryAPI.getLatestCheckJobId(parentJobId);
assertTrue(latestCheckJobId.isPresent());
assertThat(ConsistencyCheckJobId.parseSequence(latestCheckJobId.get()),
is(expectedSequence--));
}
- checkJobAPI.dropByParentJobId(parentJobId);
+ jobAPI.dropByParentJobId(parentJobId);
Optional<String> latestCheckJobId =
repositoryAPI.getLatestCheckJobId(parentJobId);
assertFalse(latestCheckJobId.isPresent());
}
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
index 472a14550de..fd0e33cdfa0 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
@@ -212,7 +212,7 @@ class MigrationJobAPITest {
Optional<String> jobId = jobManager.start(jobConfig);
assertTrue(jobId.isPresent());
MigrationJobItemContext jobItemContext =
PipelineContextUtils.mockMigrationJobItemContext(jobConfig);
- jobAPI.persistJobItemProgress(jobItemContext);
+ jobManager.persistJobItemProgress(jobItemContext);
jobAPI.updateJobItemStatus(jobId.get(),
jobItemContext.getShardingItem(), JobStatus.EXECUTE_INVENTORY_TASK);
Map<Integer, InventoryIncrementalJobItemProgress> progress =
jobAPI.getJobProgress(jobConfig);
for (Entry<Integer, InventoryIncrementalJobItemProgress> entry :
progress.entrySet()) {
@@ -245,7 +245,7 @@ class MigrationJobAPITest {
void assertRenewJobStatus() {
final MigrationJobConfiguration jobConfig =
JobConfigurationBuilder.createJobConfiguration();
MigrationJobItemContext jobItemContext =
PipelineContextUtils.mockMigrationJobItemContext(jobConfig);
- jobAPI.persistJobItemProgress(jobItemContext);
+ jobManager.persistJobItemProgress(jobItemContext);
jobAPI.updateJobItemStatus(jobConfig.getJobId(), 0,
JobStatus.FINISHED);
Optional<InventoryIncrementalJobItemProgress> actual =
jobAPI.getJobItemProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem());
assertTrue(actual.isPresent());