This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 19c6551e3c5 Add YamlPipelineJobItemProgressConfiguration (#29074)
19c6551e3c5 is described below
commit 19c6551e3c540dcf49b554859fe9627007303f72
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Nov 18 17:11:35 2023 +0800
Add YamlPipelineJobItemProgressConfiguration (#29074)
---
.../yaml/YamlConsistencyCheckJobItemProgress.java | 4 ++--
.../YamlConsistencyCheckJobItemProgressSwapper.java | 5 +++++
.../YamlInventoryIncrementalJobItemProgress.java | 4 ++--
...mlInventoryIncrementalJobItemProgressSwapper.java | 5 +++++
.../core/job/service/InventoryIncrementalJobAPI.java | 5 +----
.../pipeline/core/job/service/PipelineJobAPI.java | 14 +++-----------
.../core/job/service/PipelineJobManager.java | 19 +++++++++++++++++--
.../impl/AbstractInventoryIncrementalJobAPIImpl.java | 13 ++++---------
...=> YamlPipelineJobItemProgressConfiguration.java} | 9 ++-------
.../job/yaml/YamlPipelineJobItemProgressSwapper.java | 12 +++++++++---
.../data/pipeline/cdc/api/impl/CDCJobAPI.java | 3 ++-
.../data/pipeline/cdc/core/job/CDCJob.java | 2 +-
.../pipeline/cdc/core/prepare/CDCJobPreparer.java | 2 +-
.../consistencycheck/ConsistencyCheckJob.java | 4 +++-
.../api/impl/ConsistencyCheckJobAPI.java | 20 +++++++++-----------
.../pipeline/scenario/migration/MigrationJob.java | 5 ++++-
.../migration/prepare/MigrationJobPreparer.java | 2 +-
.../migration/api/impl/MigrationJobAPITest.java | 2 +-
18 files changed, 72 insertions(+), 58 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlConsistencyCheckJobItemProgress.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlConsistencyCheckJobItemProgress.java
index 2ed595408b3..72ea55e1865 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlConsistencyCheckJobItemProgress.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlConsistencyCheckJobItemProgress.java
@@ -19,7 +19,7 @@ package
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml;
import lombok.Getter;
import lombok.Setter;
-import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
+import
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressConfiguration;
import java.util.LinkedHashMap;
import java.util.Map;
@@ -29,7 +29,7 @@ import java.util.Map;
*/
@Getter
@Setter
-public final class YamlConsistencyCheckJobItemProgress implements
YamlConfiguration {
+public final class YamlConsistencyCheckJobItemProgress implements
YamlPipelineJobItemProgressConfiguration {
private String status;
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlConsistencyCheckJobItemProgressSwapper.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlConsistencyCheckJobItemProgressSwapper.java
index 066df10fc28..eb582d1d896 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlConsistencyCheckJobItemProgressSwapper.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlConsistencyCheckJobItemProgressSwapper.java
@@ -50,4 +50,9 @@ public final class YamlConsistencyCheckJobItemProgressSwapper
implements YamlPip
result.setStatus(JobStatus.valueOf(yamlConfig.getStatus()));
return result;
}
+
+ @Override
+ public Class<YamlConsistencyCheckJobItemProgress> getYamlProgressClass() {
+ return YamlConsistencyCheckJobItemProgress.class;
+ }
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlInventoryIncrementalJobItemProgress.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlInventoryIncrementalJobItemProgress.java
index e73596ce667..1e44dcf8984 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlInventoryIncrementalJobItemProgress.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlInventoryIncrementalJobItemProgress.java
@@ -19,14 +19,14 @@ package
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml;
import lombok.Getter;
import lombok.Setter;
-import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
+import
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressConfiguration;
/**
* YAML inventory incremental job item progress.
*/
@Getter
@Setter
-public final class YamlInventoryIncrementalJobItemProgress implements
YamlConfiguration {
+public final class YamlInventoryIncrementalJobItemProgress implements
YamlPipelineJobItemProgressConfiguration {
private String status;
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapper.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapper.java
index 24c2f6a5a74..1b29d4f52de 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapper.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapper.java
@@ -57,4 +57,9 @@ public final class
YamlInventoryIncrementalJobItemProgressSwapper implements Yam
result.setInventoryRecordsCount(yamlProgress.getInventoryRecordsCount());
return result;
}
+
+ @Override
+ public Class<YamlInventoryIncrementalJobItemProgress>
getYamlProgressClass() {
+ return YamlInventoryIncrementalJobItemProgress.class;
+ }
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java
index 908c3103c76..fa4594f7eab 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobAPI.java
@@ -37,13 +37,13 @@ import java.sql.SQLException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
/**
* Inventory incremental job API.
*/
public interface InventoryIncrementalJobAPI extends PipelineJobAPI {
+ @SuppressWarnings("unchecked")
@Override
default YamlInventoryIncrementalJobItemProgressSwapper
getYamlJobItemProgressSwapper() {
return new YamlInventoryIncrementalJobItemProgressSwapper();
@@ -123,9 +123,6 @@ public interface InventoryIncrementalJobAPI extends
PipelineJobAPI {
*/
Map<Integer, InventoryIncrementalJobItemProgress>
getJobProgress(PipelineJobConfiguration pipelineJobConfig);
- @Override
- Optional<InventoryIncrementalJobItemProgress> getJobItemProgress(String
jobId, int shardingItem);
-
/**
* Get job infos.
*
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
index b5ab26690a0..168b18d88ad 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
@@ -21,6 +21,7 @@ import
org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobConfigurationSwapper;
+import
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressConfiguration;
import
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressSwapper;
import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI;
@@ -43,10 +44,10 @@ public interface PipelineJobAPI extends TypedSPI {
/**
* Get YAML pipeline job item progress swapper.
*
+ * @param <T> type of pipeline job item progress
* @return YAML pipeline job item progress swapper
*/
- @SuppressWarnings("rawtypes")
- YamlPipelineJobItemProgressSwapper getYamlJobItemProgressSwapper();
+ <T extends PipelineJobItemProgress>
YamlPipelineJobItemProgressSwapper<YamlPipelineJobItemProgressConfiguration, T>
getYamlJobItemProgressSwapper();
/**
* Whether to ignore to start disabled job when job item progress is
finished.
@@ -75,15 +76,6 @@ public interface PipelineJobAPI extends TypedSPI {
return Optional.empty();
}
- /**
- * Get job item progress.
- *
- * @param jobId job id
- * @param shardingItem sharding item
- * @return job item progress, may be null
- */
- Optional<? extends PipelineJobItemProgress> getJobItemProgress(String
jobId, int shardingItem);
-
/**
* Update job item status.
*
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
index 99d846dd6d7..e7fd3ad85e6 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
@@ -32,6 +32,8 @@ import
org.apache.shardingsphere.data.pipeline.common.util.PipelineDistributedBa
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationWithInvalidShardingCountException;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobHasAlreadyStartedException;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
+import
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressConfiguration;
+import
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressSwapper;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.JobBriefInfo;
import
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
@@ -95,7 +97,7 @@ public final class PipelineJobManager {
*/
public void startDisabledJob(final String jobId) {
if (jobAPI.isIgnoreToStartDisabledJobWhenJobItemProgressIsFinished()) {
- Optional<? extends PipelineJobItemProgress> jobItemProgress =
jobAPI.getJobItemProgress(jobId, 0);
+ Optional<? extends PipelineJobItemProgress> jobItemProgress =
getJobItemProgress(jobId, 0);
if (jobItemProgress.isPresent() && JobStatus.FINISHED ==
jobItemProgress.get().getStatus()) {
log.info("job status is FINISHED, ignore, jobId={}", jobId);
return;
@@ -201,6 +203,20 @@ public final class PipelineJobManager {
.filter(each ->
jobType.equals(PipelineJobIdUtils.parseJobType(each.getJobName()).getType()));
}
+ /**
+ * Get job item progress.
+ *
+ * @param jobId job id
+ * @param shardingItem sharding item
+ * @param <T> type of pipeline job item progress
+ * @return job item progress, may be null
+ */
+ public <T extends PipelineJobItemProgress> Optional<T>
getJobItemProgress(final String jobId, final int shardingItem) {
+
YamlPipelineJobItemProgressSwapper<YamlPipelineJobItemProgressConfiguration, T>
swapper = jobAPI.getYamlJobItemProgressSwapper();
+ Optional<String> progress =
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemProgress(jobId,
shardingItem);
+ return progress.map(optional ->
swapper.swapToObject(YamlEngine.unmarshal(optional,
swapper.getYamlProgressClass(), true)));
+ }
+
/**
* Persist job item progress.
*
@@ -221,7 +237,6 @@ public final class PipelineJobManager {
.updateJobItemProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), convertJobItemProgress(jobItemContext));
}
- @SuppressWarnings("unchecked")
private String convertJobItemProgress(final PipelineJobItemContext
jobItemContext) {
return
YamlEngine.marshal(jobAPI.getYamlJobItemProgressSwapper().swapToYamlConfiguration(jobItemContext.toProgress()));
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
index 1706b2c4c9f..6b0cf4171e6 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -25,7 +25,6 @@ import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo;
-import
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlInventoryIncrementalJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfo;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfoSwapper;
import
org.apache.shardingsphere.data.pipeline.common.pojo.DataConsistencyCheckAlgorithmInfo;
@@ -77,10 +76,11 @@ public abstract class
AbstractInventoryIncrementalJobAPIImpl implements Inventor
@Override
public Map<Integer, InventoryIncrementalJobItemProgress>
getJobProgress(final PipelineJobConfiguration jobConfig) {
+ PipelineJobManager jobManager = new PipelineJobManager(this);
String jobId = jobConfig.getJobId();
JobConfigurationPOJO jobConfigPOJO =
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
return IntStream.range(0,
jobConfig.getJobShardingCount()).boxed().collect(LinkedHashMap::new, (map,
each) -> {
- Optional<InventoryIncrementalJobItemProgress> jobItemProgress =
getJobItemProgress(jobId, each);
+ Optional<InventoryIncrementalJobItemProgress> jobItemProgress =
jobManager.getJobItemProgress(jobId, each);
jobItemProgress.ifPresent(optional ->
optional.setActive(!jobConfigPOJO.isDisabled()));
map.put(each, jobItemProgress.orElse(null));
}, LinkedHashMap::putAll);
@@ -130,15 +130,10 @@ public abstract class
AbstractInventoryIncrementalJobAPIImpl implements Inventor
return jobOffsetInfoSwapper.swapToObject(new YamlJobOffsetInfo());
}
- @Override
- public Optional<InventoryIncrementalJobItemProgress>
getJobItemProgress(final String jobId, final int shardingItem) {
- Optional<String> progress =
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemProgress(jobId,
shardingItem);
- return progress.map(optional ->
getYamlJobItemProgressSwapper().swapToObject(YamlEngine.unmarshal(optional,
YamlInventoryIncrementalJobItemProgress.class)));
- }
-
@Override
public void updateJobItemStatus(final String jobId, final int
shardingItem, final JobStatus status) {
- Optional<InventoryIncrementalJobItemProgress> jobItemProgress =
getJobItemProgress(jobId, shardingItem);
+ PipelineJobManager jobManager = new PipelineJobManager(this);
+ Optional<InventoryIncrementalJobItemProgress> jobItemProgress =
jobManager.getJobItemProgress(jobId, shardingItem);
if (!jobItemProgress.isPresent()) {
return;
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/yaml/YamlPipelineJobItemProgressSwapper.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/yaml/YamlPipelineJobItemProgressConfiguration.java
similarity index 66%
copy from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/yaml/YamlPipelineJobItemProgressSwapper.java
copy to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/yaml/YamlPipelineJobItemProgressConfiguration.java
index 90f522b0910..1a1ef05cbdb 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/yaml/YamlPipelineJobItemProgressSwapper.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/yaml/YamlPipelineJobItemProgressConfiguration.java
@@ -17,15 +17,10 @@
package org.apache.shardingsphere.data.pipeline.core.job.yaml;
-import
org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;
import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
-import
org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
/**
- * YAML pipeline job configuration swapper.
- *
- * @param <Y> type of YAML configuration
- * @param <T> type of swapped pipeline job item progress
+ * YAML pipeline job item progress configuration.
*/
-public interface YamlPipelineJobItemProgressSwapper<Y extends
YamlConfiguration, T extends PipelineJobItemProgress> extends
YamlConfigurationSwapper<Y, T> {
+public interface YamlPipelineJobItemProgressConfiguration extends
YamlConfiguration {
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/yaml/YamlPipelineJobItemProgressSwapper.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/yaml/YamlPipelineJobItemProgressSwapper.java
index 90f522b0910..4f803634df9 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/yaml/YamlPipelineJobItemProgressSwapper.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/yaml/YamlPipelineJobItemProgressSwapper.java
@@ -18,14 +18,20 @@
package org.apache.shardingsphere.data.pipeline.core.job.yaml;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;
-import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
import
org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
/**
* YAML pipeline job configuration swapper.
*
- * @param <Y> type of YAML configuration
+ * @param <Y> type of YAML pipeline job item progress configuration
* @param <T> type of swapped pipeline job item progress
*/
-public interface YamlPipelineJobItemProgressSwapper<Y extends
YamlConfiguration, T extends PipelineJobItemProgress> extends
YamlConfigurationSwapper<Y, T> {
+public interface YamlPipelineJobItemProgressSwapper<Y extends
YamlPipelineJobItemProgressConfiguration, T extends PipelineJobItemProgress>
extends YamlConfigurationSwapper<Y, T> {
+
+ /**
+ * Get YAML pipeline job item progress configuration class.
+ *
+ * @return YAML pipeline job item progress configuration class
+ */
+ Class<Y> getYamlProgressClass();
}
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 070328d868e..e4aec40e5cb 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -168,9 +168,10 @@ public final class CDCJobAPI extends
AbstractInventoryIncrementalJobAPIImpl {
private void initIncrementalPosition(final CDCJobConfiguration jobConfig) {
String jobId = jobConfig.getJobId();
+ PipelineJobManager jobManager = new PipelineJobManager(this);
try (PipelineDataSourceManager pipelineDataSourceManager = new
DefaultPipelineDataSourceManager()) {
for (int i = 0; i < jobConfig.getJobShardingCount(); i++) {
- if (getJobItemProgress(jobId, i).isPresent()) {
+ if (jobManager.getJobItemProgress(jobId, i).isPresent()) {
continue;
}
IncrementalDumperContext dumperContext =
buildDumperContext(jobConfig, i, new
TableAndSchemaNameMapper(jobConfig.getSchemaTableNames()));
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
index c2637c0de36..657b7b48ccd 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
@@ -106,7 +106,7 @@ public final class CDCJob extends AbstractPipelineJob
implements SimpleJob {
}
private CDCJobItemContext buildPipelineJobItemContext(final
CDCJobConfiguration jobConfig, final int shardingItem) {
- Optional<InventoryIncrementalJobItemProgress> initProgress =
jobAPI.getJobItemProgress(jobConfig.getJobId(), shardingItem);
+ Optional<InventoryIncrementalJobItemProgress> initProgress =
jobManager.getJobItemProgress(jobConfig.getJobId(), shardingItem);
CDCProcessContext jobProcessContext =
jobAPI.buildPipelineProcessContext(jobConfig);
CDCTaskConfiguration taskConfig =
jobAPI.buildTaskConfiguration(jobConfig, shardingItem,
jobProcessContext.getPipelineProcessConfig());
return new CDCJobItemContext(jobConfig, shardingItem,
initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager,
sink);
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
index e79887f33cd..31f50757bbe 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
@@ -89,7 +89,7 @@ public final class CDCJobPreparer {
private void initTasks0(final CDCJobItemContext jobItemContext, final
AtomicBoolean inventoryImporterUsed, final List<CDCChannelProgressPair>
inventoryChannelProgressPairs,
final AtomicBoolean incrementalImporterUsed, final
List<CDCChannelProgressPair> incrementalChannelProgressPairs) {
- Optional<InventoryIncrementalJobItemProgress> jobItemProgress =
jobAPI.getJobItemProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem());
+ Optional<InventoryIncrementalJobItemProgress> jobItemProgress =
jobManager.getJobItemProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem());
if (!jobItemProgress.isPresent()) {
jobManager.persistJobItemProgress(jobItemContext);
}
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
index 7d289dd0a89..913fb82ed29 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
@@ -22,6 +22,7 @@ import
org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemCon
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.ConsistencyCheckJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.core.job.AbstractSimplePipelineJob;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl.ConsistencyCheckJobAPI;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration;
@@ -46,7 +47,8 @@ public final class ConsistencyCheckJob extends
AbstractSimplePipelineJob {
public ConsistencyCheckJobItemContext buildPipelineJobItemContext(final
ShardingContext shardingContext) {
ConsistencyCheckJobConfiguration jobConfig = new
YamlConsistencyCheckJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
ConsistencyCheckJobAPI jobAPI = (ConsistencyCheckJobAPI) getJobAPI();
- Optional<ConsistencyCheckJobItemProgress> jobItemProgress =
jobAPI.getJobItemProgress(jobConfig.getJobId(),
shardingContext.getShardingItem());
+ PipelineJobManager jobManager = new PipelineJobManager(jobAPI);
+ Optional<ConsistencyCheckJobItemProgress> jobItemProgress =
jobManager.getJobItemProgress(jobConfig.getJobId(),
shardingContext.getShardingItem());
return new ConsistencyCheckJobItemContext(jobConfig,
shardingContext.getShardingItem(), JobStatus.RUNNING,
jobItemProgress.orElse(null));
}
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index 62a1520c06b..4ba4d05079d 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -22,7 +22,6 @@ import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.ConsistencyCheckJobItemProgress;
-import
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlConsistencyCheckJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlConsistencyCheckJobItemProgressSwapper;
import
org.apache.shardingsphere.data.pipeline.common.pojo.ConsistencyCheckJobItemInfo;
import
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
@@ -84,7 +83,8 @@ public final class ConsistencyCheckJobAPI implements
PipelineJobAPI {
GovernanceRepositoryAPI repositoryAPI =
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(parentJobId));
Optional<String> latestCheckJobId =
repositoryAPI.getLatestCheckJobId(parentJobId);
if (latestCheckJobId.isPresent()) {
- Optional<ConsistencyCheckJobItemProgress> progress =
getJobItemProgress(latestCheckJobId.get(), 0);
+ PipelineJobManager jobManager = new PipelineJobManager(this);
+ Optional<ConsistencyCheckJobItemProgress> progress =
jobManager.getJobItemProgress(latestCheckJobId.get(), 0);
if (!progress.isPresent() || JobStatus.FINISHED !=
progress.get().getStatus()) {
log.info("check job already exists and status is not FINISHED,
progress={}", progress);
throw new
UncompletedConsistencyCheckJobExistsException(latestCheckJobId.get());
@@ -117,15 +117,10 @@ public final class ConsistencyCheckJobAPI implements
PipelineJobAPI {
return true;
}
- @Override
- public Optional<ConsistencyCheckJobItemProgress> getJobItemProgress(final
String jobId, final int shardingItem) {
- Optional<String> progress =
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemProgress(jobId,
shardingItem);
- return progress.map(s ->
getYamlJobItemProgressSwapper().swapToObject(YamlEngine.unmarshal(s,
YamlConsistencyCheckJobItemProgress.class, true)));
- }
-
@Override
public void updateJobItemStatus(final String jobId, final int
shardingItem, final JobStatus status) {
- Optional<ConsistencyCheckJobItemProgress> jobItemProgress =
getJobItemProgress(jobId, shardingItem);
+ PipelineJobManager jobManager = new PipelineJobManager(this);
+ Optional<ConsistencyCheckJobItemProgress> jobItemProgress =
jobManager.getJobItemProgress(jobId, shardingItem);
if (!jobItemProgress.isPresent()) {
log.warn("updateJobItemStatus, jobProgress is null, jobId={},
shardingItem={}", jobId, shardingItem);
return;
@@ -193,7 +188,8 @@ public final class ConsistencyCheckJobAPI implements
PipelineJobAPI {
Optional<String> latestCheckJobId =
governanceRepositoryAPI.getLatestCheckJobId(parentJobId);
ShardingSpherePreconditions.checkState(latestCheckJobId.isPresent(),
() -> new ConsistencyCheckJobNotFoundException(parentJobId));
String checkJobId = latestCheckJobId.get();
- Optional<ConsistencyCheckJobItemProgress> progress =
getJobItemProgress(checkJobId, 0);
+ PipelineJobManager jobManager = new PipelineJobManager(this);
+ Optional<ConsistencyCheckJobItemProgress> progress =
jobManager.getJobItemProgress(checkJobId, 0);
if (!progress.isPresent()) {
return Collections.emptyList();
}
@@ -233,7 +229,8 @@ public final class ConsistencyCheckJobAPI implements
PipelineJobAPI {
Optional<String> latestCheckJobId =
governanceRepositoryAPI.getLatestCheckJobId(parentJobId);
ShardingSpherePreconditions.checkState(latestCheckJobId.isPresent(),
() -> new ConsistencyCheckJobNotFoundException(parentJobId));
String checkJobId = latestCheckJobId.get();
- Optional<ConsistencyCheckJobItemProgress> progress =
getJobItemProgress(checkJobId, 0);
+ PipelineJobManager jobManager = new PipelineJobManager(this);
+ Optional<ConsistencyCheckJobItemProgress> progress =
jobManager.getJobItemProgress(checkJobId, 0);
ConsistencyCheckJobItemInfo result = new ConsistencyCheckJobItemInfo();
JobConfigurationPOJO jobConfigPOJO =
PipelineJobIdUtils.getElasticJobConfigurationPOJO(checkJobId);
result.setActive(!jobConfigPOJO.isDisabled());
@@ -306,6 +303,7 @@ public final class ConsistencyCheckJobAPI implements
PipelineJobAPI {
return new YamlConsistencyCheckJobConfigurationSwapper();
}
+ @SuppressWarnings("unchecked")
@Override
public YamlConsistencyCheckJobItemProgressSwapper
getYamlJobItemProgressSwapper() {
return new YamlConsistencyCheckJobItemProgressSwapper();
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index adef5960815..136842f0541 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -24,6 +24,7 @@ import
org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipeline
import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.core.job.AbstractSimplePipelineJob;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import
org.apache.shardingsphere.data.pipeline.core.task.runner.InventoryIncrementalTasksRunner;
import
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
@@ -46,6 +47,8 @@ public final class MigrationJob extends
AbstractSimplePipelineJob {
private final MigrationJobAPI jobAPI = new MigrationJobAPI();
+ private final PipelineJobManager jobManager = new
PipelineJobManager(jobAPI);
+
private final PipelineDataSourceManager dataSourceManager = new
DefaultPipelineDataSourceManager();
// Shared by all sharding items
@@ -59,7 +62,7 @@ public final class MigrationJob extends
AbstractSimplePipelineJob {
protected InventoryIncrementalJobItemContext
buildPipelineJobItemContext(final ShardingContext shardingContext) {
int shardingItem = shardingContext.getShardingItem();
MigrationJobConfiguration jobConfig = new
YamlMigrationJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
- Optional<InventoryIncrementalJobItemProgress> initProgress =
jobAPI.getJobItemProgress(shardingContext.getJobName(), shardingItem);
+ Optional<InventoryIncrementalJobItemProgress> initProgress =
jobManager.getJobItemProgress(shardingContext.getJobName(), shardingItem);
MigrationProcessContext jobProcessContext =
jobAPI.buildPipelineProcessContext(jobConfig);
MigrationTaskConfiguration taskConfig =
jobAPI.buildTaskConfiguration(jobConfig, shardingItem,
jobProcessContext.getPipelineProcessConfig());
return new MigrationJobItemContext(jobConfig, shardingItem,
initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager);
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
index 37772491969..d473001b873 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
@@ -125,7 +125,7 @@ public final class MigrationJobPreparer {
MigrationJobConfiguration jobConfig = jobItemContext.getJobConfig();
String jobId = jobConfig.getJobId();
LockContext lockContext =
PipelineContextManager.getContext(PipelineJobIdUtils.parseContextKey(jobId)).getContextManager().getInstanceContext().getLockContext();
- if (!jobAPI.getJobItemProgress(jobId,
jobItemContext.getShardingItem()).isPresent()) {
+ if (!jobManager.getJobItemProgress(jobId,
jobItemContext.getShardingItem()).isPresent()) {
jobManager.persistJobItemProgress(jobItemContext);
}
LockDefinition lockDefinition = new
GlobalLockDefinition(String.format(GlobalLockNames.PREPARE.getLockName(),
jobConfig.getJobId()));
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
index fd0e33cdfa0..a1a1c905dbd 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
@@ -247,7 +247,7 @@ class MigrationJobAPITest {
MigrationJobItemContext jobItemContext =
PipelineContextUtils.mockMigrationJobItemContext(jobConfig);
jobManager.persistJobItemProgress(jobItemContext);
jobAPI.updateJobItemStatus(jobConfig.getJobId(), 0,
JobStatus.FINISHED);
- Optional<InventoryIncrementalJobItemProgress> actual =
jobAPI.getJobItemProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem());
+ Optional<InventoryIncrementalJobItemProgress> actual =
jobManager.getJobItemProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem());
assertTrue(actual.isPresent());
assertThat(actual.get().getStatus(), is(JobStatus.FINISHED));
}