This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new df44c995f31 Add PipelineJobItemManager (#29078)
df44c995f31 is described below
commit df44c995f31df93711afc427496a92ee3809fd3b
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Nov 18 20:53:15 2023 +0800
Add PipelineJobItemManager (#29078)
* Add PipelineJobItemManager
* Add PipelineJobItemManager
---
.../core/job/AbstractSimplePipelineJob.java | 14 ++-
.../persist/PipelineJobProgressPersistService.java | 5 +-
.../core/job/service/PipelineJobItemManager.java | 137 +++++++++++++++++++++
.../core/job/service/PipelineJobManager.java | 100 +--------------
.../AbstractInventoryIncrementalJobAPIImpl.java | 12 +-
.../runner/InventoryIncrementalTasksRunner.java | 20 +--
.../data/pipeline/cdc/api/impl/CDCJobAPI.java | 5 +-
.../data/pipeline/cdc/core/job/CDCJob.java | 14 +--
.../pipeline/cdc/core/prepare/CDCJobPreparer.java | 22 ++--
.../consistencycheck/ConsistencyCheckJob.java | 8 +-
.../api/impl/ConsistencyCheckJobAPI.java | 15 +--
.../task/ConsistencyCheckTasksRunner.java | 17 ++-
.../pipeline/scenario/migration/MigrationJob.java | 6 +-
.../migration/prepare/MigrationJobPreparer.java | 10 +-
.../api/impl/ConsistencyCheckJobAPITest.java | 6 +-
.../migration/api/impl/MigrationJobAPITest.java | 14 ++-
16 files changed, 233 insertions(+), 172 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
index 197c09e2d49..36899a266c8 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.data.pipeline.core.job;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
@@ -48,6 +49,7 @@ public abstract class AbstractSimplePipelineJob extends
AbstractPipelineJob impl
@Override
public void execute(final ShardingContext shardingContext) {
PipelineJobManager jobManager = new PipelineJobManager(getJobAPI());
+ PipelineJobItemManager<?> jobItemManager = new
PipelineJobItemManager<>(getJobAPI().getYamlJobItemProgressSwapper());
String jobId = shardingContext.getJobName();
int shardingItem = shardingContext.getShardingItem();
log.info("Execute job {}-{}", jobId, shardingItem);
@@ -57,31 +59,31 @@ public abstract class AbstractSimplePipelineJob extends
AbstractPipelineJob impl
}
try {
PipelineJobItemContext jobItemContext =
buildPipelineJobItemContext(shardingContext);
- execute0(jobManager, jobItemContext);
+ execute0(jobItemManager, jobItemContext);
// CHECKSTYLE:OFF
} catch (final RuntimeException ex) {
// CHECKSTYLE:ON
- processFailed(jobManager, jobId, shardingItem, ex);
+ processFailed(jobManager, jobItemManager, jobId, shardingItem, ex);
throw ex;
}
}
- private void execute0(final PipelineJobManager jobManager, final
PipelineJobItemContext jobItemContext) {
+ private void execute0(final PipelineJobItemManager<?> jobItemManager,
final PipelineJobItemContext jobItemContext) {
String jobId = jobItemContext.getJobId();
int shardingItem = jobItemContext.getShardingItem();
PipelineTasksRunner tasksRunner =
buildPipelineTasksRunner(jobItemContext);
if (!addTasksRunner(shardingItem, tasksRunner)) {
return;
}
- jobManager.cleanJobItemErrorMessage(jobId, shardingItem);
+ jobItemManager.cleanErrorMessage(jobId, shardingItem);
prepare(jobItemContext);
log.info("start tasks runner, jobId={}, shardingItem={}", jobId,
shardingItem);
tasksRunner.start();
}
- private void processFailed(final PipelineJobManager jobManager, final
String jobId, final int shardingItem, final Exception ex) {
+ private void processFailed(final PipelineJobManager jobManager, final
PipelineJobItemManager<?> jobItemManager, final String jobId, final int
shardingItem, final Exception ex) {
log.error("job execution failed, {}-{}", jobId, shardingItem, ex);
- jobManager.updateJobItemErrorMessage(jobId, shardingItem, ex);
+ jobItemManager.updateErrorMessage(jobId, shardingItem, ex);
try {
jobManager.stop(jobId);
} catch (final PipelineJobNotFoundException ignored) {
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
index 87e13a2ea09..143ba94c50b 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
@@ -24,7 +24,7 @@ import
org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemCon
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
-import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import
org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
@@ -130,7 +130,8 @@ public final class PipelineJobProgressPersistService {
}
persistContext.getHasNewEvents().set(false);
long startTimeMillis = System.currentTimeMillis();
- new
PipelineJobManager(TypedSPILoader.getService(PipelineJobAPI.class,
PipelineJobIdUtils.parseJobType(jobId).getType())).updateJobItemProgress(jobItemContext.get());
+ new
PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobAPI.class,
PipelineJobIdUtils.parseJobType(jobId).getType())
+
.getYamlJobItemProgressSwapper()).updateProgress(jobItemContext.get());
persistContext.getBeforePersistingProgressMillis().set(null);
if (6 == ThreadLocalRandom.current().nextInt(100)) {
log.info("persist, jobId={}, shardingItem={}, cost {} ms",
jobId, shardingItem, System.currentTimeMillis() - startTimeMillis);
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobItemManager.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobItemManager.java
new file mode 100644
index 00000000000..9aa4a01cbb3
--- /dev/null
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobItemManager.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.job.service;
+
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import
org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
+import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
+import
org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;
+import
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
+import
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressConfiguration;
+import
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressSwapper;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+
+import java.util.Optional;
+
+/**
+ * Pipeline job item manager.
+ *
+ * @param <T> type of pipeline job item progress
+ */
+public final class PipelineJobItemManager<T extends PipelineJobItemProgress> {
+
+ private final
YamlPipelineJobItemProgressSwapper<YamlPipelineJobItemProgressConfiguration, T>
swapper;
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ public PipelineJobItemManager(final YamlPipelineJobItemProgressSwapper
swapper) {
+ this.swapper = swapper;
+ }
+
+ /**
+ * Update job item status.
+ *
+ * @param jobId job id
+ * @param shardingItem sharding item
+ * @param status status
+ */
+ public void updateStatus(final String jobId, final int shardingItem, final
JobStatus status) {
+ Optional<T> jobItemProgress = getProgress(jobId, shardingItem);
+ if (!jobItemProgress.isPresent()) {
+ return;
+ }
+ jobItemProgress.get().setStatus(status);
+ PipelineAPIFactory.getGovernanceRepositoryAPI(
+
PipelineJobIdUtils.parseContextKey(jobId)).updateJobItemProgress(jobId,
shardingItem,
YamlEngine.marshal(swapper.swapToYamlConfiguration(jobItemProgress.get())));
+ }
+
+ /**
+ * Get job item progress.
+ *
+ * @param jobId job id
+ * @param shardingItem sharding item
+ * @return job item progress
+ */
+ public Optional<T> getProgress(final String jobId, final int shardingItem)
{
+ return
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemProgress(jobId,
shardingItem)
+ .map(optional ->
swapper.swapToObject(YamlEngine.unmarshal(optional,
swapper.getYamlProgressClass(), true)));
+ }
+
+ /**
+ * Persist job item progress.
+ *
+ * @param jobItemContext job item context
+ */
+ public void persistProgress(final PipelineJobItemContext jobItemContext) {
+
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobItemContext.getJobId()))
+ .persistJobItemProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), convertProgressYamlContent(jobItemContext));
+ }
+
+ /**
+ * Update job item progress.
+ *
+ * @param jobItemContext job item context
+ */
+ public void updateProgress(final PipelineJobItemContext jobItemContext) {
+
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobItemContext.getJobId()))
+ .updateJobItemProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), convertProgressYamlContent(jobItemContext));
+ }
+
+ @SuppressWarnings("unchecked")
+ private String convertProgressYamlContent(final PipelineJobItemContext
jobItemContext) {
+ return YamlEngine.marshal(swapper.swapToYamlConfiguration((T)
jobItemContext.toProgress()));
+ }
+
+ /**
+ * Get job item error message.
+ *
+ * @param jobId job id
+ * @param shardingItem sharding item
+ * @return job item error message; empty string if not present
+ */
+ public String getErrorMessage(final String jobId, final int shardingItem) {
+ return
Optional.ofNullable(PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemErrorMessage(jobId,
shardingItem)).orElse("");
+ }
+
+ /**
+ * Update job item error message.
+ *
+ * @param jobId job id
+ * @param shardingItem sharding item
+ * @param error error
+ */
+ public void updateErrorMessage(final String jobId, final int shardingItem,
final Object error) {
+ String key = PipelineMetaDataNode.getJobItemErrorMessagePath(jobId,
shardingItem);
+ String value = "";
+ if (null != error) {
+ value = error instanceof Throwable ?
ExceptionUtils.getStackTrace((Throwable) error) : error.toString();
+ }
+
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).update(key,
value);
+ }
+
+ /**
+ * Clean job item error message.
+ *
+ * @param jobId job id
+ * @param shardingItem sharding item
+ */
+ public void cleanErrorMessage(final String jobId, final int shardingItem) {
+ String key = PipelineMetaDataNode.getJobItemErrorMessagePath(jobId,
shardingItem);
+
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persist(key,
"");
+ }
+}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
index 4ffeaafbee9..eccbdd80227 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
@@ -19,10 +19,8 @@ package
org.apache.shardingsphere.data.pipeline.core.job.service;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.exception.ExceptionUtils;
import
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
-import
org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
@@ -32,8 +30,6 @@ import
org.apache.shardingsphere.data.pipeline.common.util.PipelineDistributedBa
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationWithInvalidShardingCountException;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobHasAlreadyStartedException;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
-import
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressConfiguration;
-import
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressSwapper;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
@@ -95,7 +91,7 @@ public final class PipelineJobManager {
*/
public void startDisabledJob(final String jobId) {
if (jobAPI.isIgnoreToStartDisabledJobWhenJobItemProgressIsFinished()) {
- Optional<? extends PipelineJobItemProgress> jobItemProgress =
getJobItemProgress(jobId, 0);
+ Optional<? extends PipelineJobItemProgress> jobItemProgress = new
PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper()).getProgress(jobId,
0);
if (jobItemProgress.isPresent() && JobStatus.FINISHED ==
jobItemProgress.get().getStatus()) {
log.info("job status is FINISHED, ignore, jobId={}", jobId);
return;
@@ -197,98 +193,4 @@ public final class PipelineJobManager {
}
return Collections.emptyList();
}
-
- /**
- * Get job item progress.
- *
- * @param jobId job id
- * @param shardingItem sharding item
- * @param <T> type of pipeline job item progress
- * @return job item progress, may be null
- */
- public <T extends PipelineJobItemProgress> Optional<T>
getJobItemProgress(final String jobId, final int shardingItem) {
-
YamlPipelineJobItemProgressSwapper<YamlPipelineJobItemProgressConfiguration, T>
swapper = jobAPI.getYamlJobItemProgressSwapper();
- Optional<String> progress =
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemProgress(jobId,
shardingItem);
- return progress.map(optional ->
swapper.swapToObject(YamlEngine.unmarshal(optional,
swapper.getYamlProgressClass(), true)));
- }
-
- /**
- * Persist job item progress.
- *
- * @param jobItemContext job item context
- */
- public void persistJobItemProgress(final PipelineJobItemContext
jobItemContext) {
-
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobItemContext.getJobId()))
- .persistJobItemProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), convertJobItemProgress(jobItemContext));
- }
-
- /**
- * Update job item status.
- *
- * @param jobId job id
- * @param shardingItem sharding item
- * @param status status
- */
- public void updateJobItemStatus(final String jobId, final int
shardingItem, final JobStatus status) {
- Optional<PipelineJobItemProgress> jobItemProgress =
getJobItemProgress(jobId, shardingItem);
- if (!jobItemProgress.isPresent()) {
- log.warn("updateJobItemStatus, jobProgress is null, jobId={},
shardingItem={}", jobId, shardingItem);
- return;
- }
- jobItemProgress.get().setStatus(status);
-
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).updateJobItemProgress(jobId,
shardingItem,
-
YamlEngine.marshal(jobAPI.getYamlJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress.get())));
- }
-
- /**
- * Update job item progress.
- *
- * @param jobItemContext job item context
- */
- public void updateJobItemProgress(final PipelineJobItemContext
jobItemContext) {
-
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobItemContext.getJobId()))
- .updateJobItemProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), convertJobItemProgress(jobItemContext));
- }
-
- private String convertJobItemProgress(final PipelineJobItemContext
jobItemContext) {
- return
YamlEngine.marshal(jobAPI.getYamlJobItemProgressSwapper().swapToYamlConfiguration(jobItemContext.toProgress()));
- }
-
- /**
- * Get job item error message.
- *
- * @param jobId job id
- * @param shardingItem sharding item
- * @return map, key is sharding item, value is error message
- */
- public String getJobItemErrorMessage(final String jobId, final int
shardingItem) {
- return
Optional.ofNullable(PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemErrorMessage(jobId,
shardingItem)).orElse("");
- }
-
- /**
- * Update job item error message.
- *
- * @param jobId job id
- * @param shardingItem sharding item
- * @param error error
- */
- public void updateJobItemErrorMessage(final String jobId, final int
shardingItem, final Object error) {
- String key = PipelineMetaDataNode.getJobItemErrorMessagePath(jobId,
shardingItem);
- String value = "";
- if (null != error) {
- value = error instanceof Throwable ?
ExceptionUtils.getStackTrace((Throwable) error) : error.toString();
- }
-
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).update(key,
value);
- }
-
- /**
- * Clean job item error message.
- *
- * @param jobId job id
- * @param shardingItem sharding item
- */
- public void cleanJobItemErrorMessage(final String jobId, final int
shardingItem) {
- String key = PipelineMetaDataNode.getJobItemErrorMessagePath(jobId,
shardingItem);
-
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persist(key,
"");
- }
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
index f0c85629db7..c3d0b1d689a 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -35,6 +35,7 @@ import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.Table
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
@@ -76,11 +77,11 @@ public abstract class
AbstractInventoryIncrementalJobAPIImpl implements Inventor
@Override
public Map<Integer, InventoryIncrementalJobItemProgress>
getJobProgress(final PipelineJobConfiguration jobConfig) {
- PipelineJobManager jobManager = new PipelineJobManager(this);
+ PipelineJobItemManager<InventoryIncrementalJobItemProgress>
jobItemManager = new PipelineJobItemManager<>(getYamlJobItemProgressSwapper());
String jobId = jobConfig.getJobId();
JobConfigurationPOJO jobConfigPOJO =
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
return IntStream.range(0,
jobConfig.getJobShardingCount()).boxed().collect(LinkedHashMap::new, (map,
each) -> {
- Optional<InventoryIncrementalJobItemProgress> jobItemProgress =
jobManager.getJobItemProgress(jobId, each);
+ Optional<InventoryIncrementalJobItemProgress> jobItemProgress =
jobItemManager.getProgress(jobId, each);
jobItemProgress.ifPresent(optional ->
optional.setActive(!jobConfigPOJO.isDisabled()));
map.put(each, jobItemProgress.orElse(null));
}, LinkedHashMap::putAll);
@@ -88,9 +89,10 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl
implements Inventor
@Override
public List<InventoryIncrementalJobItemInfo> getJobItemInfos(final String
jobId) {
- PipelineJobManager pipelineJobManager = new PipelineJobManager(this);
+ PipelineJobManager jobManager = new PipelineJobManager(this);
+ PipelineJobItemManager<InventoryIncrementalJobItemProgress>
jobItemManager = new PipelineJobItemManager<>(getYamlJobItemProgressSwapper());
JobConfigurationPOJO jobConfigPOJO =
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
- PipelineJobConfiguration jobConfig =
pipelineJobManager.getJobConfiguration(jobConfigPOJO);
+ PipelineJobConfiguration jobConfig =
jobManager.getJobConfiguration(jobConfigPOJO);
long startTimeMillis =
Long.parseLong(Optional.ofNullable(jobConfigPOJO.getProps().getProperty("start_time_millis")).orElse("0"));
Map<Integer, InventoryIncrementalJobItemProgress> jobProgress =
getJobProgress(jobConfig);
List<InventoryIncrementalJobItemInfo> result = new LinkedList<>();
@@ -98,7 +100,7 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl
implements Inventor
int shardingItem = entry.getKey();
TableBasedPipelineJobInfo jobInfo = (TableBasedPipelineJobInfo)
getJobInfo(jobId);
InventoryIncrementalJobItemProgress jobItemProgress =
entry.getValue();
- String errorMessage =
pipelineJobManager.getJobItemErrorMessage(jobId, shardingItem);
+ String errorMessage = jobItemManager.getErrorMessage(jobId,
shardingItem);
if (null == jobItemProgress) {
result.add(new InventoryIncrementalJobItemInfo(shardingItem,
jobInfo.getTable(), null, startTimeMillis, 0, errorMessage));
continue;
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
index 4af5b5f3b27..3935bec8106 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
@@ -25,15 +25,17 @@ import
org.apache.shardingsphere.data.pipeline.common.execute.ExecuteCallback;
import org.apache.shardingsphere.data.pipeline.common.execute.ExecuteEngine;
import
org.apache.shardingsphere.data.pipeline.common.ingest.position.FinishedPosition;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
+import
org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
-import
org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
-import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
-import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
+import
org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
+import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
import java.util.Collection;
import java.util.LinkedList;
@@ -57,12 +59,15 @@ public class InventoryIncrementalTasksRunner implements
PipelineTasksRunner {
private final PipelineJobManager jobManager;
+ private final PipelineJobItemManager<InventoryIncrementalJobItemProgress>
jobItemManager;
+
public InventoryIncrementalTasksRunner(final
InventoryIncrementalJobItemContext jobItemContext) {
this.jobItemContext = jobItemContext;
inventoryTasks = jobItemContext.getInventoryTasks();
incrementalTasks = jobItemContext.getIncrementalTasks();
jobAPI = TypedSPILoader.getService(PipelineJobAPI.class,
PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType());
jobManager = new PipelineJobManager(jobAPI);
+ jobItemManager = new
PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper());
}
@Override
@@ -83,7 +88,8 @@ public class InventoryIncrementalTasksRunner implements
PipelineTasksRunner {
if (jobItemContext.isStopping()) {
return;
}
- new PipelineJobManager(TypedSPILoader.getService(PipelineJobAPI.class,
PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType())).persistJobItemProgress(jobItemContext);
+ new
PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobAPI.class,
PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType())
+
.getYamlJobItemProgressSwapper()).persistProgress(jobItemContext);
if
(PipelineJobProgressDetector.isAllInventoryTasksFinished(inventoryTasks)) {
log.info("All inventory tasks finished.");
executeIncrementalTask();
@@ -106,7 +112,7 @@ public class InventoryIncrementalTasksRunner implements
PipelineTasksRunner {
private void updateLocalAndRemoteJobItemStatus(final JobStatus jobStatus) {
jobItemContext.setStatus(jobStatus);
- jobManager.updateJobItemStatus(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), jobStatus);
+ jobItemManager.updateStatus(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), jobStatus);
}
private synchronized void executeIncrementalTask() {
@@ -146,7 +152,7 @@ public class InventoryIncrementalTasksRunner implements
PipelineTasksRunner {
protected void inventoryFailureCallback(final Throwable throwable) {
log.error("onFailure, inventory task execute failed.", throwable);
String jobId = jobItemContext.getJobId();
- jobManager.updateJobItemErrorMessage(jobId,
jobItemContext.getShardingItem(), throwable);
+ jobItemManager.updateErrorMessage(jobId,
jobItemContext.getShardingItem(), throwable);
try {
jobManager.stop(jobId);
} catch (final PipelineJobNotFoundException ignored) {
@@ -181,7 +187,7 @@ public class InventoryIncrementalTasksRunner implements
PipelineTasksRunner {
public void onFailure(final Throwable throwable) {
log.error("onFailure, incremental task execute failed.",
throwable);
String jobId = jobItemContext.getJobId();
- jobManager.updateJobItemErrorMessage(jobId,
jobItemContext.getShardingItem(), throwable);
+ jobItemManager.updateErrorMessage(jobId,
jobItemContext.getShardingItem(), throwable);
try {
jobManager.stop(jobId);
} catch (final PipelineJobNotFoundException ignored) {
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index e4aec40e5cb..9f90858e590 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -68,6 +68,7 @@ import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.impl.AbstractInventoryIncrementalJobAPIImpl;
import
org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparerUtils;
@@ -168,10 +169,10 @@ public final class CDCJobAPI extends
AbstractInventoryIncrementalJobAPIImpl {
private void initIncrementalPosition(final CDCJobConfiguration jobConfig) {
String jobId = jobConfig.getJobId();
- PipelineJobManager jobManager = new PipelineJobManager(this);
+ PipelineJobItemManager<InventoryIncrementalJobItemProgress>
jobItemManager = new PipelineJobItemManager<>(getYamlJobItemProgressSwapper());
try (PipelineDataSourceManager pipelineDataSourceManager = new
DefaultPipelineDataSourceManager()) {
for (int i = 0; i < jobConfig.getJobShardingCount(); i++) {
- if (jobManager.getJobItemProgress(jobId, i).isPresent()) {
+ if (jobItemManager.getProgress(jobId, i).isPresent()) {
continue;
}
IncrementalDumperContext dumperContext =
buildDumperContext(jobConfig, i, new
TableAndSchemaNameMapper(jobConfig.getSchemaTableNames()));
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
index 14cdb71513a..0f6b66fa7c1 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
@@ -41,7 +41,7 @@ import
org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncr
import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
-import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
@@ -65,7 +65,7 @@ public final class CDCJob extends AbstractPipelineJob
implements SimpleJob {
private final CDCJobAPI jobAPI = new CDCJobAPI();
- private final PipelineJobManager jobManager = new
PipelineJobManager(jobAPI);
+ private final PipelineJobItemManager<InventoryIncrementalJobItemProgress>
jobItemManager = new
PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper());
private final CDCJobPreparer jobPreparer = new CDCJobPreparer();
@@ -93,7 +93,7 @@ public final class CDCJob extends AbstractPipelineJob
implements SimpleJob {
continue;
}
jobItemContexts.add(jobItemContext);
- jobManager.cleanJobItemErrorMessage(jobId, shardingItem);
+ jobItemManager.cleanErrorMessage(jobId, shardingItem);
log.info("start tasks runner, jobId={}, shardingItem={}", jobId,
shardingItem);
}
if (jobItemContexts.isEmpty()) {
@@ -106,7 +106,7 @@ public final class CDCJob extends AbstractPipelineJob
implements SimpleJob {
}
private CDCJobItemContext buildPipelineJobItemContext(final
CDCJobConfiguration jobConfig, final int shardingItem) {
- Optional<InventoryIncrementalJobItemProgress> initProgress =
jobManager.getJobItemProgress(jobConfig.getJobId(), shardingItem);
+ Optional<InventoryIncrementalJobItemProgress> initProgress =
jobItemManager.getProgress(jobConfig.getJobId(), shardingItem);
CDCProcessContext jobProcessContext =
jobAPI.buildPipelineProcessContext(jobConfig);
CDCTaskConfiguration taskConfig =
jobAPI.buildTaskConfiguration(jobConfig, shardingItem,
jobProcessContext.getPipelineProcessConfig());
return new CDCJobItemContext(jobConfig, shardingItem,
initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager,
sink);
@@ -127,7 +127,7 @@ public final class CDCJob extends AbstractPipelineJob
implements SimpleJob {
private void processFailed(final String jobId, final int shardingItem,
final Exception ex) {
log.error("job execution failed, {}-{}", jobId, shardingItem, ex);
- jobManager.updateJobItemErrorMessage(jobId, shardingItem, ex);
+ jobItemManager.updateErrorMessage(jobId, shardingItem, ex);
PipelineJobCenter.stop(jobId);
jobAPI.updateJobConfigurationDisabled(jobId, true);
}
@@ -151,7 +151,7 @@ public final class CDCJob extends AbstractPipelineJob
implements SimpleJob {
private void updateLocalAndRemoteJobItemStatus(final
PipelineJobItemContext jobItemContext, final JobStatus jobStatus) {
jobItemContext.setStatus(jobStatus);
- jobManager.updateJobItemStatus(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), jobStatus);
+ jobItemManager.updateStatus(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), jobStatus);
}
private void executeIncrementalTasks(final List<CDCJobItemContext>
jobItemContexts) {
@@ -204,7 +204,7 @@ public final class CDCJob extends AbstractPipelineJob
implements SimpleJob {
public void onFailure(final Throwable throwable) {
log.error("onFailure, {} task execute failed.", identifier,
throwable);
String jobId = jobItemContext.getJobId();
- jobManager.updateJobItemErrorMessage(jobId,
jobItemContext.getShardingItem(), throwable);
+ jobItemManager.updateErrorMessage(jobId,
jobItemContext.getShardingItem(), throwable);
if (jobItemContext.getSink() instanceof CDCSocketSink) {
CDCSocketSink cdcSink = (CDCSocketSink)
jobItemContext.getSink();
cdcSink.getChannel().writeAndFlush(CDCResponseUtils.failed("",
"", throwable.getMessage()));
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
index 31f50757bbe..b39a0647458 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
@@ -18,11 +18,6 @@
package org.apache.shardingsphere.data.pipeline.cdc.core.prepare;
import lombok.extern.slf4j.Slf4j;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.InventoryDumperContext;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
import
org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.context.CDCJobItemContext;
@@ -35,18 +30,23 @@ import
org.apache.shardingsphere.data.pipeline.common.config.ImporterConfigurati
import
org.apache.shardingsphere.data.pipeline.common.ingest.position.FinishedPosition;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.JobItemIncrementalTasksProgress;
+import
org.apache.shardingsphere.data.pipeline.common.spi.ingest.dumper.IncrementalDumperCreator;
import
org.apache.shardingsphere.data.pipeline.common.task.progress.IncrementalTaskProgress;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumper;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithGetBinlogPositionException;
import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
import org.apache.shardingsphere.data.pipeline.core.importer.ImporterType;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.Dumper;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumper;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.InventoryDumperContext;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
-import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import
org.apache.shardingsphere.data.pipeline.core.preparer.InventoryTaskSplitter;
import
org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparerUtils;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
-import
org.apache.shardingsphere.data.pipeline.common.spi.ingest.dumper.IncrementalDumperCreator;
import
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import
org.apache.shardingsphere.infra.database.opengauss.type.OpenGaussDatabaseType;
@@ -69,7 +69,7 @@ public final class CDCJobPreparer {
private final CDCJobAPI jobAPI = new CDCJobAPI();
- private final PipelineJobManager jobManager = new
PipelineJobManager(jobAPI);
+ private final PipelineJobItemManager<InventoryIncrementalJobItemProgress>
jobItemManager = new
PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper());
/**
* Do prepare work.
@@ -89,9 +89,9 @@ public final class CDCJobPreparer {
private void initTasks0(final CDCJobItemContext jobItemContext, final
AtomicBoolean inventoryImporterUsed, final List<CDCChannelProgressPair>
inventoryChannelProgressPairs,
final AtomicBoolean incrementalImporterUsed, final
List<CDCChannelProgressPair> incrementalChannelProgressPairs) {
- Optional<InventoryIncrementalJobItemProgress> jobItemProgress =
jobManager.getJobItemProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem());
+ Optional<InventoryIncrementalJobItemProgress> jobItemProgress =
jobItemManager.getProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem());
if (!jobItemProgress.isPresent()) {
- jobManager.persistJobItemProgress(jobItemContext);
+ jobItemManager.persistProgress(jobItemContext);
}
if (jobItemContext.isStopping()) {
PipelineJobCenter.stop(jobItemContext.getJobId());
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
index 913fb82ed29..8932efa6613 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
@@ -22,9 +22,8 @@ import
org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemCon
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.ConsistencyCheckJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.core.job.AbstractSimplePipelineJob;
-import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
-import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl.ConsistencyCheckJobAPI;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.yaml.YamlConsistencyCheckJobConfigurationSwapper;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.context.ConsistencyCheckJobItemContext;
@@ -46,9 +45,8 @@ public final class ConsistencyCheckJob extends
AbstractSimplePipelineJob {
@Override
public ConsistencyCheckJobItemContext buildPipelineJobItemContext(final
ShardingContext shardingContext) {
ConsistencyCheckJobConfiguration jobConfig = new
YamlConsistencyCheckJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
- ConsistencyCheckJobAPI jobAPI = (ConsistencyCheckJobAPI) getJobAPI();
- PipelineJobManager jobManager = new PipelineJobManager(jobAPI);
- Optional<ConsistencyCheckJobItemProgress> jobItemProgress =
jobManager.getJobItemProgress(jobConfig.getJobId(),
shardingContext.getShardingItem());
+ PipelineJobItemManager<ConsistencyCheckJobItemProgress> jobItemManager
= new PipelineJobItemManager<>(getJobAPI().getYamlJobItemProgressSwapper());
+ Optional<ConsistencyCheckJobItemProgress> jobItemProgress =
jobItemManager.getProgress(jobConfig.getJobId(),
shardingContext.getShardingItem());
return new ConsistencyCheckJobItemContext(jobConfig,
shardingContext.getShardingItem(), JobStatus.RUNNING,
jobItemProgress.orElse(null));
}
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index a60fcb44d5e..ff0e63ce7af 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -34,6 +34,7 @@ import
org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJob;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobId;
@@ -82,8 +83,8 @@ public final class ConsistencyCheckJobAPI implements
PipelineJobAPI {
GovernanceRepositoryAPI repositoryAPI =
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(parentJobId));
Optional<String> latestCheckJobId =
repositoryAPI.getLatestCheckJobId(parentJobId);
if (latestCheckJobId.isPresent()) {
- PipelineJobManager jobManager = new PipelineJobManager(this);
- Optional<ConsistencyCheckJobItemProgress> progress =
jobManager.getJobItemProgress(latestCheckJobId.get(), 0);
+ PipelineJobItemManager<ConsistencyCheckJobItemProgress>
jobItemManager = new PipelineJobItemManager<>(getYamlJobItemProgressSwapper());
+ Optional<ConsistencyCheckJobItemProgress> progress =
jobItemManager.getProgress(latestCheckJobId.get(), 0);
if (!progress.isPresent() || JobStatus.FINISHED !=
progress.get().getStatus()) {
log.info("check job already exists and status is not FINISHED,
progress={}", progress);
throw new
UncompletedConsistencyCheckJobExistsException(latestCheckJobId.get());
@@ -174,8 +175,8 @@ public final class ConsistencyCheckJobAPI implements
PipelineJobAPI {
Optional<String> latestCheckJobId =
governanceRepositoryAPI.getLatestCheckJobId(parentJobId);
ShardingSpherePreconditions.checkState(latestCheckJobId.isPresent(),
() -> new ConsistencyCheckJobNotFoundException(parentJobId));
String checkJobId = latestCheckJobId.get();
- PipelineJobManager jobManager = new PipelineJobManager(this);
- Optional<ConsistencyCheckJobItemProgress> progress =
jobManager.getJobItemProgress(checkJobId, 0);
+ PipelineJobItemManager<ConsistencyCheckJobItemProgress> jobItemManager
= new PipelineJobItemManager<>(getYamlJobItemProgressSwapper());
+ Optional<ConsistencyCheckJobItemProgress> progress =
jobItemManager.getProgress(checkJobId, 0);
if (!progress.isPresent()) {
return Collections.emptyList();
}
@@ -215,8 +216,8 @@ public final class ConsistencyCheckJobAPI implements
PipelineJobAPI {
Optional<String> latestCheckJobId =
governanceRepositoryAPI.getLatestCheckJobId(parentJobId);
ShardingSpherePreconditions.checkState(latestCheckJobId.isPresent(),
() -> new ConsistencyCheckJobNotFoundException(parentJobId));
String checkJobId = latestCheckJobId.get();
- PipelineJobManager jobManager = new PipelineJobManager(this);
- Optional<ConsistencyCheckJobItemProgress> progress =
jobManager.getJobItemProgress(checkJobId, 0);
+ PipelineJobItemManager<ConsistencyCheckJobItemProgress> jobItemManager
= new PipelineJobItemManager<>(getYamlJobItemProgressSwapper());
+ Optional<ConsistencyCheckJobItemProgress> progress =
jobItemManager.getProgress(checkJobId, 0);
ConsistencyCheckJobItemInfo result = new ConsistencyCheckJobItemInfo();
JobConfigurationPOJO jobConfigPOJO =
PipelineJobIdUtils.getElasticJobConfigurationPOJO(checkJobId);
result.setActive(!jobConfigPOJO.isDisabled());
@@ -232,7 +233,7 @@ public final class ConsistencyCheckJobAPI implements
PipelineJobAPI {
fillInJobItemInfoWithTimes(result, jobItemProgress, jobConfigPOJO);
result.setTableNames(Optional.ofNullable(jobItemProgress.getTableNames()).orElse(""));
fillInJobItemInfoWithCheckAlgorithm(result, checkJobId);
- result.setErrorMessage(new
PipelineJobManager(this).getJobItemErrorMessage(checkJobId, 0));
+ result.setErrorMessage(new
PipelineJobItemManager<>(getYamlJobItemProgressSwapper()).getErrorMessage(checkJobId,
0));
Map<String, TableDataConsistencyCheckResult> checkJobResult =
governanceRepositoryAPI.getCheckJobResult(parentJobId, checkJobId);
fillInJobItemInfoWithCheckResult(result, checkJobResult, parentJobId);
result.setCheckFailedTableNames(checkJobResult.entrySet().stream().filter(each
-> !each.getValue().isIgnored() && !each.getValue().isMatched())
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
index 37947db77b8..8259022cf89 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
@@ -19,12 +19,13 @@ package
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.task;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
-import
org.apache.shardingsphere.data.pipeline.common.execute.AbstractPipelineLifecycleRunnable;
-import
org.apache.shardingsphere.data.pipeline.common.execute.PipelineLifecycleRunnable;
import
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
+import
org.apache.shardingsphere.data.pipeline.common.execute.AbstractPipelineLifecycleRunnable;
import org.apache.shardingsphere.data.pipeline.common.execute.ExecuteCallback;
import org.apache.shardingsphere.data.pipeline.common.execute.ExecuteEngine;
+import
org.apache.shardingsphere.data.pipeline.common.execute.PipelineLifecycleRunnable;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
+import
org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
@@ -32,6 +33,7 @@ import
org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl.ConsistencyCheckJobAPI;
@@ -54,6 +56,8 @@ public final class ConsistencyCheckTasksRunner implements
PipelineTasksRunner {
private final PipelineJobManager jobManager = new
PipelineJobManager(jobAPI);
+ private final PipelineJobItemManager<InventoryIncrementalJobItemProgress>
jobItemManager = new
PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper());
+
@Getter
private final ConsistencyCheckJobItemContext jobItemContext;
@@ -80,7 +84,8 @@ public final class ConsistencyCheckTasksRunner implements
PipelineTasksRunner {
if (jobItemContext.isStopping()) {
return;
}
- new PipelineJobManager(TypedSPILoader.getService(PipelineJobAPI.class,
PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType())).persistJobItemProgress(jobItemContext);
+ new
PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobAPI.class,
PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType())
+
.getYamlJobItemProgressSwapper()).persistProgress(jobItemContext);
CompletableFuture<?> future =
jobItemContext.getProcessContext().getConsistencyCheckExecuteEngine().submit(checkExecutor);
ExecuteEngine.trigger(Collections.singletonList(future), new
CheckExecuteCallback());
}
@@ -95,7 +100,7 @@ public final class ConsistencyCheckTasksRunner implements
PipelineTasksRunner {
@Override
protected void runBlocking() {
- jobManager.persistJobItemProgress(jobItemContext);
+ jobItemManager.persistProgress(jobItemContext);
JobType jobType = PipelineJobIdUtils.parseJobType(parentJobId);
InventoryIncrementalJobAPI jobAPI = (InventoryIncrementalJobAPI)
TypedSPILoader.getService(PipelineJobAPI.class, jobType.getType());
PipelineJobConfiguration parentJobConfig = new
PipelineJobManager(jobAPI).getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(parentJobId));
@@ -133,7 +138,7 @@ public final class ConsistencyCheckTasksRunner implements
PipelineTasksRunner {
}
log.info("onSuccess, check job id: {}, parent job id: {}",
checkJobId, parentJobId);
jobItemContext.setStatus(JobStatus.FINISHED);
- jobManager.persistJobItemProgress(jobItemContext);
+ jobItemManager.persistProgress(jobItemContext);
jobManager.stop(checkJobId);
}
@@ -146,7 +151,7 @@ public final class ConsistencyCheckTasksRunner implements
PipelineTasksRunner {
return;
}
log.info("onFailure, check job id: {}, parent job id: {}",
checkJobId, parentJobId, throwable);
- jobManager.updateJobItemErrorMessage(checkJobId, 0, throwable);
+ jobItemManager.updateErrorMessage(checkJobId, 0, throwable);
jobManager.stop(checkJobId);
}
}
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index 136842f0541..439cc36ad81 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -24,7 +24,7 @@ import
org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipeline
import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.core.job.AbstractSimplePipelineJob;
-import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import
org.apache.shardingsphere.data.pipeline.core.task.runner.InventoryIncrementalTasksRunner;
import
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
@@ -47,7 +47,7 @@ public final class MigrationJob extends
AbstractSimplePipelineJob {
private final MigrationJobAPI jobAPI = new MigrationJobAPI();
- private final PipelineJobManager jobManager = new
PipelineJobManager(jobAPI);
+ private final PipelineJobItemManager<InventoryIncrementalJobItemProgress>
jobItemManager = new
PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper());
private final PipelineDataSourceManager dataSourceManager = new
DefaultPipelineDataSourceManager();
@@ -62,7 +62,7 @@ public final class MigrationJob extends
AbstractSimplePipelineJob {
protected InventoryIncrementalJobItemContext
buildPipelineJobItemContext(final ShardingContext shardingContext) {
int shardingItem = shardingContext.getShardingItem();
MigrationJobConfiguration jobConfig = new
YamlMigrationJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
- Optional<InventoryIncrementalJobItemProgress> initProgress =
jobManager.getJobItemProgress(shardingContext.getJobName(), shardingItem);
+ Optional<InventoryIncrementalJobItemProgress> initProgress =
jobItemManager.getProgress(shardingContext.getJobName(), shardingItem);
MigrationProcessContext jobProcessContext =
jobAPI.buildPipelineProcessContext(jobConfig);
MigrationTaskConfiguration taskConfig =
jobAPI.buildTaskConfiguration(jobConfig, shardingItem,
jobProcessContext.getPipelineProcessConfig());
return new MigrationJobItemContext(jobConfig, shardingItem,
initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager);
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
index 50ce83843f1..2857e63f58e 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
@@ -45,7 +45,7 @@ import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.Increm
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.InventoryDumperContext;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
-import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import
org.apache.shardingsphere.data.pipeline.core.preparer.InventoryTaskSplitter;
import
org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparerUtils;
import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.PrepareTargetSchemasParameter;
@@ -82,7 +82,7 @@ public final class MigrationJobPreparer {
private final MigrationJobAPI jobAPI = new MigrationJobAPI();
- private final PipelineJobManager jobManager = new
PipelineJobManager(jobAPI);
+ private final PipelineJobItemManager<InventoryIncrementalJobItemProgress>
jobItemManager = new
PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper());
/**
* Do prepare work.
@@ -125,8 +125,8 @@ public final class MigrationJobPreparer {
MigrationJobConfiguration jobConfig = jobItemContext.getJobConfig();
String jobId = jobConfig.getJobId();
LockContext lockContext =
PipelineContextManager.getContext(PipelineJobIdUtils.parseContextKey(jobId)).getContextManager().getInstanceContext().getLockContext();
- if (!jobManager.getJobItemProgress(jobId,
jobItemContext.getShardingItem()).isPresent()) {
- jobManager.persistJobItemProgress(jobItemContext);
+ if (!jobItemManager.getProgress(jobId,
jobItemContext.getShardingItem()).isPresent()) {
+ jobItemManager.persistProgress(jobItemContext);
}
LockDefinition lockDefinition = new
GlobalLockDefinition(String.format(GlobalLockNames.PREPARE.getLockName(),
jobConfig.getJobId()));
long startTimeMillis = System.currentTimeMillis();
@@ -136,7 +136,7 @@ public final class MigrationJobPreparer {
JobOffsetInfo offsetInfo = jobAPI.getJobOffsetInfo(jobId);
if (!offsetInfo.isTargetSchemaTableCreated()) {
jobItemContext.setStatus(JobStatus.PREPARING);
- jobManager.updateJobItemStatus(jobId,
jobItemContext.getShardingItem(), JobStatus.PREPARING);
+ jobItemManager.updateStatus(jobId,
jobItemContext.getShardingItem(), JobStatus.PREPARING);
prepareAndCheckTarget(jobItemContext);
jobAPI.persistJobOffsetInfo(jobId, new
JobOffsetInfo(true));
}
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
index deeae3d64e5..04c35a541bb 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
@@ -18,10 +18,12 @@
package
org.apache.shardingsphere.test.it.data.pipeline.scenario.consistencycheck.api.impl;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
+import
org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobId;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl.ConsistencyCheckJobAPI;
@@ -52,7 +54,7 @@ class ConsistencyCheckJobAPITest {
private final ConsistencyCheckJobAPI jobAPI = new ConsistencyCheckJobAPI();
- private final PipelineJobManager jobManager = new
PipelineJobManager(jobAPI);
+ private final PipelineJobItemManager<InventoryIncrementalJobItemProgress>
jobItemManager = new
PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper());
private final YamlMigrationJobConfigurationSwapper jobConfigSwapper = new
YamlMigrationJobConfigurationSwapper();
@@ -88,7 +90,7 @@ class ConsistencyCheckJobAPITest {
parentJobConfig.getSourceDatabaseType(),
parentJobConfig.getTargetDatabaseType()));
ConsistencyCheckJobItemContext checkJobItemContext = new
ConsistencyCheckJobItemContext(
new ConsistencyCheckJobConfiguration(checkJobId,
parentJobId, null, null, TypedSPILoader.getService(DatabaseType.class, "H2")),
0, JobStatus.FINISHED, null);
- jobManager.persistJobItemProgress(checkJobItemContext);
+ jobItemManager.persistProgress(checkJobItemContext);
Map<String, TableDataConsistencyCheckResult>
dataConsistencyCheckResult = Collections.singletonMap("t_order", new
TableDataConsistencyCheckResult(true));
repositoryAPI.persistCheckJobResult(parentJobId, checkJobId,
dataConsistencyCheckResult);
Optional<String> latestCheckJobId =
repositoryAPI.getLatestCheckJobId(parentJobId);
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
index 6201a6cb507..187448cd273 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
@@ -33,6 +33,7 @@ import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.Tabl
import
org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
@@ -91,6 +92,8 @@ class MigrationJobAPITest {
private static PipelineJobManager jobManager;
+ private static PipelineJobItemManager<InventoryIncrementalJobItemProgress>
jobItemManager;
+
private static DatabaseType databaseType;
@BeforeAll
@@ -98,6 +101,7 @@ class MigrationJobAPITest {
PipelineContextUtils.mockModeConfigAndContextManager();
jobAPI = new MigrationJobAPI();
jobManager = new PipelineJobManager(jobAPI);
+ jobItemManager = new
PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper());
String jdbcUrl =
"jdbc:h2:mem:test_ds_0;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL";
databaseType = DatabaseTypeFactory.get(jdbcUrl);
Map<String, Object> props = new HashMap<>();
@@ -212,8 +216,8 @@ class MigrationJobAPITest {
Optional<String> jobId = jobManager.start(jobConfig);
assertTrue(jobId.isPresent());
MigrationJobItemContext jobItemContext =
PipelineContextUtils.mockMigrationJobItemContext(jobConfig);
- jobManager.persistJobItemProgress(jobItemContext);
- jobManager.updateJobItemStatus(jobId.get(),
jobItemContext.getShardingItem(), JobStatus.EXECUTE_INVENTORY_TASK);
+ jobItemManager.persistProgress(jobItemContext);
+ jobItemManager.updateStatus(jobId.get(),
jobItemContext.getShardingItem(), JobStatus.EXECUTE_INVENTORY_TASK);
Map<Integer, InventoryIncrementalJobItemProgress> progress =
jobAPI.getJobProgress(jobConfig);
for (Entry<Integer, InventoryIncrementalJobItemProgress> entry :
progress.entrySet()) {
assertThat(entry.getValue().getStatus(),
is(JobStatus.EXECUTE_INVENTORY_TASK));
@@ -245,9 +249,9 @@ class MigrationJobAPITest {
void assertRenewJobStatus() {
final MigrationJobConfiguration jobConfig =
JobConfigurationBuilder.createJobConfiguration();
MigrationJobItemContext jobItemContext =
PipelineContextUtils.mockMigrationJobItemContext(jobConfig);
- jobManager.persistJobItemProgress(jobItemContext);
- jobManager.updateJobItemStatus(jobConfig.getJobId(), 0,
JobStatus.FINISHED);
- Optional<InventoryIncrementalJobItemProgress> actual =
jobManager.getJobItemProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem());
+ jobItemManager.persistProgress(jobItemContext);
+ jobItemManager.updateStatus(jobConfig.getJobId(), 0,
JobStatus.FINISHED);
+ Optional<InventoryIncrementalJobItemProgress> actual =
jobItemManager.getProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem());
assertTrue(actual.isPresent());
assertThat(actual.get().getStatus(), is(JobStatus.FINISHED));
}