This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new b15790a9e93 Merge AbstractPipelineJobAPIImpl and PipelineJobManager (#29047)
b15790a9e93 is described below
commit b15790a9e939a04dcf9654618371b6300ace7986
Author: Liang Zhang <[email protected]>
AuthorDate: Thu Nov 16 10:46:40 2023 +0800
Merge AbstractPipelineJobAPIImpl and PipelineJobManager (#29047)
* Move PipelineJobAPI.start() to PipelineJobManager
* Move PipelineJobAPI.dropJob() to PipelineJobManager
* Move PipelineJobAPI.getJobItemErrorMessage() to PipelineJobManager
* Move PipelineJobAPI.updateJobItemErrorMessage() to PipelineJobManager
* Move PipelineJobAPI.stop() to PipelineJobManager
* Merge AbstractPipelineJobAPIImpl and PipelineJobManager
---
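Note on the shape of the change: the template-method base class is removed, and the job lifecycle operations move into PipelineJobManager, which wraps a concrete PipelineJobAPI by composition. A minimal sketch of the resulting caller-side pattern, with MigrationJobAPI standing in for any concrete job API and jobConfig assumed to be an already-built PipelineJobConfiguration:

    PipelineJobAPI jobAPI = new MigrationJobAPI();
    PipelineJobManager jobManager = new PipelineJobManager(jobAPI);
    // lifecycle calls now go through the manager instead of the API itself
    Optional<String> jobId = jobManager.start(jobConfig);   // was jobAPI.start(jobConfig)
    jobManager.stop(jobId.get());                           // was jobAPI.stop(jobId)
    jobManager.drop(jobId.get());                           // was the protected dropJob(jobId)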
.../core/job/AbstractSimplePipelineJob.java | 16 +-
.../pipeline/core/job/service/PipelineJobAPI.java | 69 +++-----
.../core/job/service/PipelineJobManager.java | 174 +++++++++++++++++++++
.../AbstractInventoryIncrementalJobAPIImpl.java | 6 +-
.../service/impl/AbstractPipelineJobAPIImpl.java | 124 ---------------
.../runner/InventoryIncrementalTasksRunner.java | 12 +-
.../handler/update/StartMigrationUpdater.java | 5 +-
.../handler/update/StopMigrationUpdater.java | 5 +-
.../data/pipeline/cdc/api/impl/CDCJobAPI.java | 3 +-
.../data/pipeline/cdc/core/job/CDCJob.java | 9 +-
.../api/impl/ConsistencyCheckJobAPI.java | 28 ++--
.../task/ConsistencyCheckTasksRunner.java | 11 +-
.../migration/api/impl/MigrationJobAPI.java | 42 ++---
.../migration/api/impl/MigrationJobAPITest.java | 26 +--
14 files changed, 277 insertions(+), 253 deletions(-)
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
index fae85d03ad3..197c09e2d49 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.data.pipeline.core.job;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
+import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
@@ -46,6 +47,7 @@ public abstract class AbstractSimplePipelineJob extends AbstractPipelineJob impl
@Override
public void execute(final ShardingContext shardingContext) {
+ PipelineJobManager jobManager = new PipelineJobManager(getJobAPI());
String jobId = shardingContext.getJobName();
int shardingItem = shardingContext.getShardingItem();
log.info("Execute job {}-{}", jobId, shardingItem);
@@ -55,33 +57,33 @@ public abstract class AbstractSimplePipelineJob extends AbstractPipelineJob impl
}
try {
PipelineJobItemContext jobItemContext = buildPipelineJobItemContext(shardingContext);
- execute0(jobItemContext);
+ execute0(jobManager, jobItemContext);
// CHECKSTYLE:OFF
} catch (final RuntimeException ex) {
// CHECKSTYLE:ON
- processFailed(jobId, shardingItem, ex);
+ processFailed(jobManager, jobId, shardingItem, ex);
throw ex;
}
}
- private void execute0(final PipelineJobItemContext jobItemContext) {
+ private void execute0(final PipelineJobManager jobManager, final PipelineJobItemContext jobItemContext) {
String jobId = jobItemContext.getJobId();
int shardingItem = jobItemContext.getShardingItem();
PipelineTasksRunner tasksRunner = buildPipelineTasksRunner(jobItemContext);
if (!addTasksRunner(shardingItem, tasksRunner)) {
return;
}
- getJobAPI().cleanJobItemErrorMessage(jobId, shardingItem);
+ jobManager.cleanJobItemErrorMessage(jobId, shardingItem);
prepare(jobItemContext);
log.info("start tasks runner, jobId={}, shardingItem={}", jobId,
shardingItem);
tasksRunner.start();
}
- private void processFailed(final String jobId, final int shardingItem, final Exception ex) {
+ private void processFailed(final PipelineJobManager jobManager, final String jobId, final int shardingItem, final Exception ex) {
log.error("job execution failed, {}-{}", jobId, shardingItem, ex);
- getJobAPI().updateJobItemErrorMessage(jobId, shardingItem, ex);
+ jobManager.updateJobItemErrorMessage(jobId, shardingItem, ex);
try {
- getJobAPI().stop(jobId);
+ jobManager.stop(jobId);
} catch (final PipelineJobNotFoundException ignored) {
}
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
index 44799e07817..134bc7e7e16 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
@@ -67,42 +67,47 @@ public interface PipelineJobAPI extends TypedSPI {
PipelineProcessContext buildPipelineProcessContext(PipelineJobConfiguration pipelineJobConfig);
/**
- * Start job.
+ * Get job configuration.
*
- * @param jobConfig job configuration
- * @return job id
+ * @param jobId job id
+ * @return job configuration
*/
- Optional<String> start(PipelineJobConfiguration jobConfig);
+ PipelineJobConfiguration getJobConfiguration(String jobId);
/**
- * Start disabled job.
+ * Get job configuration.
*
- * @param jobId job id
+ * @param jobConfigPOJO job configuration POJO
+ * @return pipeline job configuration
*/
- void startDisabledJob(String jobId);
+ PipelineJobConfiguration getJobConfiguration(JobConfigurationPOJO jobConfigPOJO);
/**
- * Stop pipeline job.
- *
- * @param jobId job id
+ * Whether to ignore to start disabled job when job item progress is finished.
+ *
+ * @return ignore to start disabled job when job item progress is finished or not
*/
- void stop(String jobId);
+ default boolean isIgnoreToStartDisabledJobWhenJobItemProgressIsFinished() {
+ return false;
+ }
/**
- * Get job configuration.
+ * Get to be start disabled next job type.
*
- * @param jobId job id
- * @return job configuration
+ * @return to be start disabled next job type
*/
- PipelineJobConfiguration getJobConfiguration(String jobId);
+ default Optional<String> getToBeStartDisabledNextJobType() {
+ return Optional.empty();
+ }
/**
- * Get job configuration.
+ * Get to be stopped previous job type.
*
- * @param jobConfigPOJO job configuration POJO
- * @return pipeline job configuration
+ * @return to be stopped previous job type
*/
- PipelineJobConfiguration getJobConfiguration(JobConfigurationPOJO jobConfigPOJO);
+ default Optional<String> getToBeStoppedPreviousJobType() {
+ return Optional.empty();
+ }
/**
* Get pipeline job info.
@@ -144,32 +149,6 @@ public interface PipelineJobAPI extends TypedSPI {
*/
void updateJobItemStatus(String jobId, int shardingItem, JobStatus status);
- /**
- * Get job item error message.
- *
- * @param jobId job id
- * @param shardingItem sharding item
- * @return map, key is sharding item, value is error message
- */
- String getJobItemErrorMessage(String jobId, int shardingItem);
-
- /**
- * Update job item error message.
- *
- * @param jobId job id
- * @param shardingItem sharding item
- * @param error error
- */
- void updateJobItemErrorMessage(String jobId, int shardingItem, Object error);
-
- /**
- * Clean job item error message.
- *
- * @param jobId job id
- * @param shardingItem sharding item
- */
- void cleanJobItemErrorMessage(String jobId, int shardingItem);
-
/**
* Get pipeline job class.
*
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
index 55a4a6b3337..47d8eecad2e 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
@@ -18,12 +18,30 @@
package org.apache.shardingsphere.data.pipeline.core.job.service;
import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
+import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
+import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
+import org.apache.shardingsphere.data.pipeline.common.util.PipelineDistributedBarrier;
+import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationWithInvalidShardingCountException;
+import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobHasAlreadyStartedException;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
+import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.JobBriefInfo;
+import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
+import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
+import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -31,10 +49,128 @@ import java.util.stream.Stream;
* Pipeline job manager.
*/
@RequiredArgsConstructor
+@Slf4j
public final class PipelineJobManager {
+ private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+
private final PipelineJobAPI pipelineJobAPI;
+ /**
+ * Start job.
+ *
+ * @param jobConfig job configuration
+ * @return job id
+ */
+ public Optional<String> start(final PipelineJobConfiguration jobConfig) {
+ String jobId = jobConfig.getJobId();
+ ShardingSpherePreconditions.checkState(0 != jobConfig.getJobShardingCount(), () -> new PipelineJobCreationWithInvalidShardingCountException(jobId));
+ GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId));
+ String jobConfigKey = PipelineMetaDataNode.getJobConfigPath(jobId);
+ if (repositoryAPI.isExisted(jobConfigKey)) {
+ log.warn("jobId already exists in registry center, ignore, jobConfigKey={}", jobConfigKey);
+ return Optional.of(jobId);
+ }
+ repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobId), pipelineJobAPI.getPipelineJobClass().getName());
+ repositoryAPI.persist(jobConfigKey, YamlEngine.marshal(jobConfig.convertToJobConfigurationPOJO()));
+ return Optional.of(jobId);
+ }
+
+ /**
+ * Start disabled job.
+ *
+ * @param jobId job id
+ */
+ public void startDisabledJob(final String jobId) {
+ if (pipelineJobAPI.isIgnoreToStartDisabledJobWhenJobItemProgressIsFinished()) {
+ Optional<? extends PipelineJobItemProgress> jobItemProgress = pipelineJobAPI.getJobItemProgress(jobId, 0);
+ if (jobItemProgress.isPresent() && JobStatus.FINISHED == jobItemProgress.get().getStatus()) {
+ log.info("job status is FINISHED, ignore, jobId={}", jobId);
+ return;
+ }
+ }
+ startCurrentDisabledJob(jobId);
+ pipelineJobAPI.getToBeStartDisabledNextJobType().ifPresent(optional -> startNextDisabledJob(jobId, optional));
+
+ }
+
+ private void startCurrentDisabledJob(final String jobId) {
+ PipelineDistributedBarrier pipelineDistributedBarrier = PipelineDistributedBarrier.getInstance(PipelineJobIdUtils.parseContextKey(jobId));
+ pipelineDistributedBarrier.unregister(PipelineMetaDataNode.getJobBarrierDisablePath(jobId));
+ JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
+ ShardingSpherePreconditions.checkState(jobConfigPOJO.isDisabled(), () -> new PipelineJobHasAlreadyStartedException(jobId));
+ jobConfigPOJO.setDisabled(false);
+ jobConfigPOJO.getProps().setProperty("start_time_millis", String.valueOf(System.currentTimeMillis()));
+ jobConfigPOJO.getProps().remove("stop_time");
+ jobConfigPOJO.getProps().remove("stop_time_millis");
+ jobConfigPOJO.getProps().setProperty("run_count", String.valueOf(Integer.parseInt(jobConfigPOJO.getProps().getProperty("run_count", "0")) + 1));
+ String barrierEnablePath = PipelineMetaDataNode.getJobBarrierEnablePath(jobId);
+ pipelineDistributedBarrier.register(barrierEnablePath, jobConfigPOJO.getShardingTotalCount());
+ PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobId)).updateJobConfiguration(jobConfigPOJO);
+ pipelineDistributedBarrier.await(barrierEnablePath, 5L, TimeUnit.SECONDS);
+ }
+
+ private void startNextDisabledJob(final String jobId, final String toBeStartDisabledNextJobType) {
+ PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getLatestCheckJobId(jobId).ifPresent(optional -> {
+ try {
+ new PipelineJobManager(TypedSPILoader.getService(PipelineJobAPI.class, toBeStartDisabledNextJobType)).startDisabledJob(optional);
+ // CHECKSTYLE:OFF
+ } catch (final RuntimeException ex) {
+ // CHECKSTYLE:ON
+ log.warn("start related check job failed, check job id: {}, error: {}", optional, ex.getMessage());
+ }
+ });
+ }
+
+ /**
+ * Stop pipeline job.
+ *
+ * @param jobId job id
+ */
+ public void stop(final String jobId) {
+ pipelineJobAPI.getToBeStoppedPreviousJobType().ifPresent(optional -> stopPreviousJob(jobId, optional));
+ stopCurrentJob(jobId);
+ }
+
+ private void stopPreviousJob(final String jobId, final String toBeStoppedPreviousJobType) {
+ PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getLatestCheckJobId(jobId).ifPresent(optional -> {
+ try {
+ new PipelineJobManager(TypedSPILoader.getService(PipelineJobAPI.class, toBeStoppedPreviousJobType)).stop(optional);
+ // CHECKSTYLE:OFF
+ } catch (final RuntimeException ex) {
+ // CHECKSTYLE:ON
+ log.warn("stop related check job failed, check job id: {}, error: {}", optional, ex.getMessage());
+ }
+ });
+ }
+
+ private void stopCurrentJob(final String jobId) {
+ PipelineDistributedBarrier pipelineDistributedBarrier = PipelineDistributedBarrier.getInstance(PipelineJobIdUtils.parseContextKey(jobId));
+ pipelineDistributedBarrier.unregister(PipelineMetaDataNode.getJobBarrierEnablePath(jobId));
+ JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
+ if (jobConfigPOJO.isDisabled()) {
+ return;
+ }
+ jobConfigPOJO.setDisabled(true);
+ jobConfigPOJO.getProps().setProperty("stop_time", LocalDateTime.now().format(DATE_TIME_FORMATTER));
+ jobConfigPOJO.getProps().setProperty("stop_time_millis", String.valueOf(System.currentTimeMillis()));
+ String barrierPath = PipelineMetaDataNode.getJobBarrierDisablePath(jobId);
+ pipelineDistributedBarrier.register(barrierPath, jobConfigPOJO.getShardingTotalCount());
+ PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobId)).updateJobConfiguration(jobConfigPOJO);
+ pipelineDistributedBarrier.await(barrierPath, 5L, TimeUnit.SECONDS);
+ }
+
+ /**
+ * Drop job.
+ *
+ * @param jobId to be dropped job id
+ */
+ public void drop(final String jobId) {
+ PipelineContextKey contextKey = PipelineJobIdUtils.parseContextKey(jobId);
+ PipelineAPIFactory.getJobOperateAPI(contextKey).remove(String.valueOf(jobId), null);
+ PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).deleteJob(jobId);
+ }
+
/**
* Get pipeline jobs info.
*
@@ -49,4 +185,42 @@ public final class PipelineJobManager {
return PipelineAPIFactory.getJobStatisticsAPI(contextKey).getAllJobsBriefInfo().stream().filter(each -> !each.getJobName().startsWith("_"))
.filter(each -> jobType.equals(PipelineJobIdUtils.parseJobType(each.getJobName()).getType()));
}
+
+ /**
+ * Get job item error message.
+ *
+ * @param jobId job id
+ * @param shardingItem sharding item
+ * @return job item error message
+ */
+ public String getJobItemErrorMessage(final String jobId, final int shardingItem) {
+ return Optional.ofNullable(PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemErrorMessage(jobId, shardingItem)).orElse("");
+ }
+
+ /**
+ * Update job item error message.
+ *
+ * @param jobId job id
+ * @param shardingItem sharding item
+ * @param error error
+ */
+ public void updateJobItemErrorMessage(final String jobId, final int shardingItem, final Object error) {
+ String key = PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, shardingItem);
+ String value = "";
+ if (null != error) {
+ value = error instanceof Throwable ? ExceptionUtils.getStackTrace((Throwable) error) : error.toString();
+ }
+ PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).update(key, value);
+ }
+
+ /**
+ * Clean job item error message.
+ *
+ * @param jobId job id
+ * @param shardingItem sharding item
+ */
+ public void cleanJobItemErrorMessage(final String jobId, final int shardingItem) {
+ String key = PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, shardingItem);
+ PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persist(key, "");
+ }
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
index 391e3b8454e..bfc7e8e9fd4 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -45,6 +45,7 @@ import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.Table
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
@@ -68,7 +69,7 @@ import java.util.stream.IntStream;
* Abstract inventory incremental job API implementation.
*/
@Slf4j
-public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPipelineJobAPIImpl implements InventoryIncrementalJobAPI {
+public abstract class AbstractInventoryIncrementalJobAPIImpl implements InventoryIncrementalJobAPI {
private final PipelineProcessConfigurationPersistService processConfigPersistService = new PipelineProcessConfigurationPersistService();
@@ -106,11 +107,12 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
long startTimeMillis = Long.parseLong(Optional.ofNullable(jobConfigPOJO.getProps().getProperty("start_time_millis")).orElse("0"));
Map<Integer, InventoryIncrementalJobItemProgress> jobProgress = getJobProgress(jobConfig);
List<InventoryIncrementalJobItemInfo> result = new LinkedList<>();
+ PipelineJobManager pipelineJobManager = new PipelineJobManager(this);
for (Entry<Integer, InventoryIncrementalJobItemProgress> entry : jobProgress.entrySet()) {
int shardingItem = entry.getKey();
TableBasedPipelineJobInfo jobInfo = (TableBasedPipelineJobInfo) getJobInfo(jobId);
InventoryIncrementalJobItemProgress jobItemProgress = entry.getValue();
- String errorMessage = getJobItemErrorMessage(jobId, shardingItem);
+ String errorMessage = pipelineJobManager.getJobItemErrorMessage(jobId, shardingItem);
if (null == jobItemProgress) {
result.add(new InventoryIncrementalJobItemInfo(shardingItem, jobInfo.getTable(), null, startTimeMillis, 0, errorMessage));
continue;
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
deleted file mode 100644
index cdff9743e88..00000000000
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.job.service.impl;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.exception.ExceptionUtils;
-import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
-import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
-import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
-import org.apache.shardingsphere.data.pipeline.common.util.PipelineDistributedBarrier;
-import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationWithInvalidShardingCountException;
-import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobHasAlreadyStartedException;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
-import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
-import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
-import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
-import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
-import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
-
-import java.time.LocalDateTime;
-import java.time.format.DateTimeFormatter;
-import java.util.Optional;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Abstract pipeline job API impl.
- */
-@Slf4j
-public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
-
- protected static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
-
- @Override
- public Optional<String> start(final PipelineJobConfiguration jobConfig) {
- String jobId = jobConfig.getJobId();
- ShardingSpherePreconditions.checkState(0 != jobConfig.getJobShardingCount(), () -> new PipelineJobCreationWithInvalidShardingCountException(jobId));
- GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId));
- String jobConfigKey = PipelineMetaDataNode.getJobConfigPath(jobId);
- if (repositoryAPI.isExisted(jobConfigKey)) {
- log.warn("jobId already exists in registry center, ignore, jobConfigKey={}", jobConfigKey);
- return Optional.of(jobId);
- }
- repositoryAPI.persist(PipelineMetaDataNode.getJobRootPath(jobId), getPipelineJobClass().getName());
- repositoryAPI.persist(jobConfigKey, YamlEngine.marshal(jobConfig.convertToJobConfigurationPOJO()));
- return Optional.of(jobId);
- }
-
- @Override
- public void startDisabledJob(final String jobId) {
- PipelineDistributedBarrier pipelineDistributedBarrier = PipelineDistributedBarrier.getInstance(PipelineJobIdUtils.parseContextKey(jobId));
- pipelineDistributedBarrier.unregister(PipelineMetaDataNode.getJobBarrierDisablePath(jobId));
- JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
- ShardingSpherePreconditions.checkState(jobConfigPOJO.isDisabled(), () -> new PipelineJobHasAlreadyStartedException(jobId));
- jobConfigPOJO.setDisabled(false);
- jobConfigPOJO.getProps().setProperty("start_time_millis", String.valueOf(System.currentTimeMillis()));
- jobConfigPOJO.getProps().remove("stop_time");
- jobConfigPOJO.getProps().remove("stop_time_millis");
- jobConfigPOJO.getProps().setProperty("run_count", String.valueOf(Integer.parseInt(jobConfigPOJO.getProps().getProperty("run_count", "0")) + 1));
- String barrierEnablePath = PipelineMetaDataNode.getJobBarrierEnablePath(jobId);
- pipelineDistributedBarrier.register(barrierEnablePath, jobConfigPOJO.getShardingTotalCount());
- PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobId)).updateJobConfiguration(jobConfigPOJO);
- pipelineDistributedBarrier.await(barrierEnablePath, 5, TimeUnit.SECONDS);
- }
-
- @Override
- public void stop(final String jobId) {
- PipelineDistributedBarrier pipelineDistributedBarrier = PipelineDistributedBarrier.getInstance(PipelineJobIdUtils.parseContextKey(jobId));
- pipelineDistributedBarrier.unregister(PipelineMetaDataNode.getJobBarrierEnablePath(jobId));
- JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
- if (jobConfigPOJO.isDisabled()) {
- return;
- }
- jobConfigPOJO.setDisabled(true);
- jobConfigPOJO.getProps().setProperty("stop_time", LocalDateTime.now().format(DATE_TIME_FORMATTER));
- jobConfigPOJO.getProps().setProperty("stop_time_millis", String.valueOf(System.currentTimeMillis()));
- String barrierPath = PipelineMetaDataNode.getJobBarrierDisablePath(jobId);
- pipelineDistributedBarrier.register(barrierPath, jobConfigPOJO.getShardingTotalCount());
- PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobId)).updateJobConfiguration(jobConfigPOJO);
- pipelineDistributedBarrier.await(barrierPath, 5, TimeUnit.SECONDS);
- }
-
- protected void dropJob(final String jobId) {
- PipelineContextKey contextKey = PipelineJobIdUtils.parseContextKey(jobId);
- PipelineAPIFactory.getJobOperateAPI(contextKey).remove(String.valueOf(jobId), null);
- PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).deleteJob(jobId);
- }
-
- @Override
- public String getJobItemErrorMessage(final String jobId, final int shardingItem) {
- return Optional.ofNullable(PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemErrorMessage(jobId, shardingItem)).orElse("");
- }
-
- @Override
- public void updateJobItemErrorMessage(final String jobId, final int shardingItem, final Object error) {
- String key = PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, shardingItem);
- String value = "";
- if (null != error) {
- value = error instanceof Throwable ? ExceptionUtils.getStackTrace((Throwable) error) : error.toString();
- }
- PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).update(key, value);
- }
-
- @Override
- public void cleanJobItemErrorMessage(final String jobId, final int shardingItem) {
- String key = PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, shardingItem);
- PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persist(key, "");
- }
-}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
index ff3e9308f58..1923ad14911 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
@@ -27,6 +27,7 @@ import org.apache.shardingsphere.data.pipeline.common.ingest.position.FinishedPo
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
+import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
@@ -54,11 +55,14 @@ public class InventoryIncrementalTasksRunner implements PipelineTasksRunner {
private final PipelineJobAPI jobAPI;
+ private final PipelineJobManager jobManager;
+
public InventoryIncrementalTasksRunner(final InventoryIncrementalJobItemContext jobItemContext) {
this.jobItemContext = jobItemContext;
inventoryTasks = jobItemContext.getInventoryTasks();
incrementalTasks = jobItemContext.getIncrementalTasks();
jobAPI = TypedSPILoader.getService(PipelineJobAPI.class, PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType());
+ jobManager = new PipelineJobManager(jobAPI);
}
@Override
@@ -142,9 +146,9 @@ public class InventoryIncrementalTasksRunner implements PipelineTasksRunner {
protected void inventoryFailureCallback(final Throwable throwable) {
log.error("onFailure, inventory task execute failed.", throwable);
String jobId = jobItemContext.getJobId();
- jobAPI.updateJobItemErrorMessage(jobId, jobItemContext.getShardingItem(), throwable);
+ jobManager.updateJobItemErrorMessage(jobId, jobItemContext.getShardingItem(), throwable);
try {
- jobAPI.stop(jobId);
+ jobManager.stop(jobId);
} catch (final PipelineJobNotFoundException ignored) {
}
}
@@ -177,9 +181,9 @@ public class InventoryIncrementalTasksRunner implements PipelineTasksRunner {
public void onFailure(final Throwable throwable) {
log.error("onFailure, incremental task execute failed.", throwable);
String jobId = jobItemContext.getJobId();
- jobAPI.updateJobItemErrorMessage(jobId, jobItemContext.getShardingItem(), throwable);
+ jobManager.updateJobItemErrorMessage(jobId, jobItemContext.getShardingItem(), throwable);
try {
- jobAPI.stop(jobId);
+ jobManager.stop(jobId);
} catch (final PipelineJobNotFoundException ignored) {
}
}
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/StartMigrationUpdater.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/StartMigrationUpdater.java
index fe1629adaaf..7c504a3f4fc 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/StartMigrationUpdater.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/StartMigrationUpdater.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.migration.distsql.handler.update;
+import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
import org.apache.shardingsphere.migration.distsql.statement.StartMigrationStatement;
@@ -26,11 +27,11 @@ import org.apache.shardingsphere.migration.distsql.statement.StartMigrationState
*/
public final class StartMigrationUpdater implements RALUpdater<StartMigrationStatement> {
- private final MigrationJobAPI jobAPI = new MigrationJobAPI();
+ private final PipelineJobManager jobManager = new PipelineJobManager(new MigrationJobAPI());
@Override
public void executeUpdate(final String databaseName, final StartMigrationStatement sqlStatement) {
- jobAPI.startDisabledJob(sqlStatement.getJobId());
+ jobManager.startDisabledJob(sqlStatement.getJobId());
}
@Override
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/StopMigrationUpdater.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/StopMigrationUpdater.java
index fb3a8b22591..77c4c050611 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/StopMigrationUpdater.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/StopMigrationUpdater.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.migration.distsql.handler.update;
+import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
import org.apache.shardingsphere.migration.distsql.statement.StopMigrationStatement;
@@ -28,9 +29,11 @@ public final class StopMigrationUpdater implements RALUpdater<StopMigrationState
private final MigrationJobAPI jobAPI = new MigrationJobAPI();
+ private final PipelineJobManager jobManager = new PipelineJobManager(jobAPI);
+
@Override
public void executeUpdate(final String databaseName, final StopMigrationStatement sqlStatement) {
- jobAPI.stop(sqlStatement.getJobId());
+ jobManager.stop(sqlStatement.getJobId());
}
@Override
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index e4134a90c0f..1993cde6e7b 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -68,6 +68,7 @@ import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.impl.AbstractInventoryIncrementalJobAPIImpl;
import org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparerUtils;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
@@ -308,7 +309,7 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
CDCJobConfiguration jobConfig = getJobConfiguration(jobConfigPOJO);
ShardingSpherePreconditions.checkState(jobConfigPOJO.isDisabled(), () -> new PipelineInternalException("Can't drop streaming job which is active"));
- dropJob(jobId);
+ new PipelineJobManager(this).drop(jobId);
cleanup(jobConfig);
}
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
index 50e572c5a32..c2637c0de36 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
@@ -41,6 +41,7 @@ import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncr
import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
+import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
@@ -64,6 +65,8 @@ public final class CDCJob extends AbstractPipelineJob implements SimpleJob {
private final CDCJobAPI jobAPI = new CDCJobAPI();
+ private final PipelineJobManager jobManager = new PipelineJobManager(jobAPI);
+
private final CDCJobPreparer jobPreparer = new CDCJobPreparer();
private final PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager();
@@ -90,7 +93,7 @@ public final class CDCJob extends AbstractPipelineJob implements SimpleJob {
continue;
}
jobItemContexts.add(jobItemContext);
- jobAPI.cleanJobItemErrorMessage(jobId, shardingItem);
+ jobManager.cleanJobItemErrorMessage(jobId, shardingItem);
log.info("start tasks runner, jobId={}, shardingItem={}", jobId, shardingItem);
}
if (jobItemContexts.isEmpty()) {
@@ -124,7 +127,7 @@ public final class CDCJob extends AbstractPipelineJob implements SimpleJob {
private void processFailed(final String jobId, final int shardingItem, final Exception ex) {
log.error("job execution failed, {}-{}", jobId, shardingItem, ex);
- jobAPI.updateJobItemErrorMessage(jobId, shardingItem, ex);
+ jobManager.updateJobItemErrorMessage(jobId, shardingItem, ex);
PipelineJobCenter.stop(jobId);
jobAPI.updateJobConfigurationDisabled(jobId, true);
}
@@ -201,7 +204,7 @@ public final class CDCJob extends AbstractPipelineJob implements SimpleJob {
public void onFailure(final Throwable throwable) {
log.error("onFailure, {} task execute failed.", identifier, throwable);
String jobId = jobItemContext.getJobId();
- jobAPI.updateJobItemErrorMessage(jobId, jobItemContext.getShardingItem(), throwable);
+ jobManager.updateJobItemErrorMessage(jobId, jobItemContext.getShardingItem(), throwable);
if (jobItemContext.getSink() instanceof CDCSocketSink) {
CDCSocketSink cdcSink = (CDCSocketSink) jobItemContext.getSink();
cdcSink.getChannel().writeAndFlush(CDCResponseUtils.failed("", "", throwable.getMessage()));
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index 6f825c9f17d..ddbb7e7a89e 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -42,7 +42,7 @@ import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
-import org.apache.shardingsphere.data.pipeline.core.job.service.impl.AbstractPipelineJobAPIImpl;
+import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import org.apache.shardingsphere.data.pipeline.core.task.config.PipelineTaskConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJob;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobId;
@@ -77,7 +77,7 @@ import java.util.stream.Collectors;
* Consistency check job API.
*/
@Slf4j
-public final class ConsistencyCheckJobAPI extends AbstractPipelineJobAPIImpl {
+public final class ConsistencyCheckJobAPI implements PipelineJobAPI {
private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
@@ -106,14 +106,14 @@ public final class ConsistencyCheckJobAPI extends AbstractPipelineJobAPIImpl {
String result = latestCheckJobId.map(s -> new ConsistencyCheckJobId(contextKey, parentJobId, s)).orElseGet(() -> new ConsistencyCheckJobId(contextKey, parentJobId)).marshal();
repositoryAPI.persistLatestCheckJobId(parentJobId, result);
repositoryAPI.deleteCheckJobResult(parentJobId, result);
- dropJob(result);
+ new PipelineJobManager(this).drop(result);
YamlConsistencyCheckJobConfiguration yamlConfig = new YamlConsistencyCheckJobConfiguration();
yamlConfig.setJobId(result);
yamlConfig.setParentJobId(parentJobId);
yamlConfig.setAlgorithmTypeName(param.getAlgorithmTypeName());
yamlConfig.setAlgorithmProps(param.getAlgorithmProps());
yamlConfig.setSourceDatabaseType(param.getSourceDatabaseType().getType());
- start(new YamlConsistencyCheckJobConfigurationSwapper().swapToObject(yamlConfig));
+ new PipelineJobManager(this).start(new YamlConsistencyCheckJobConfigurationSwapper().swapToObject(yamlConfig));
return result;
}
@@ -180,23 +180,13 @@ public final class ConsistencyCheckJobAPI extends AbstractPipelineJobAPIImpl {
YamlEngine.marshal(swapper.swapToYamlConfiguration(jobItemProgress.get())));
}
- @Override
- public void startDisabledJob(final String jobId) {
- Optional<ConsistencyCheckJobItemProgress> jobItemProgress = getJobItemProgress(jobId, 0);
- if (jobItemProgress.isPresent() && JobStatus.FINISHED == jobItemProgress.get().getStatus()) {
- log.info("job status is FINISHED, ignore, jobId={}", jobId);
- return;
- }
- super.startDisabledJob(jobId);
- }
-
/**
* Start by parent job id.
*
* @param parentJobId parent job id
*/
public void startByParentJobId(final String parentJobId) {
- startDisabledJob(getLatestCheckJobId(parentJobId));
+ new PipelineJobManager(this).startDisabledJob(getLatestCheckJobId(parentJobId));
}
private String getLatestCheckJobId(final String parentJobId) {
@@ -211,7 +201,7 @@ public final class ConsistencyCheckJobAPI extends AbstractPipelineJobAPIImpl {
* @param parentJobId parent job id
*/
public void stopByParentJobId(final String parentJobId) {
- stop(getLatestCheckJobId(parentJobId));
+ new PipelineJobManager(this).stop(getLatestCheckJobId(parentJobId));
}
/**
@@ -221,7 +211,7 @@ public final class ConsistencyCheckJobAPI extends AbstractPipelineJobAPIImpl {
*/
public void dropByParentJobId(final String parentJobId) {
String latestCheckJobId = getLatestCheckJobId(parentJobId);
- stop(latestCheckJobId);
+ new PipelineJobManager(this).stop(latestCheckJobId);
PipelineContextKey contextKey = PipelineJobIdUtils.parseContextKey(parentJobId);
GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey);
Collection<String> checkJobIds = repositoryAPI.listCheckJobIds(parentJobId);
@@ -234,7 +224,7 @@ public final class ConsistencyCheckJobAPI extends AbstractPipelineJobAPIImpl {
repositoryAPI.deleteLatestCheckJobId(parentJobId);
}
repositoryAPI.deleteCheckJobResult(parentJobId, latestCheckJobId);
- dropJob(latestCheckJobId);
+ new PipelineJobManager(this).drop(latestCheckJobId);
}
/**
@@ -304,7 +294,7 @@ public final class ConsistencyCheckJobAPI extends AbstractPipelineJobAPIImpl {
fillInJobItemInfoWithTimes(result, jobItemProgress, jobConfigPOJO);
result.setTableNames(Optional.ofNullable(jobItemProgress.getTableNames()).orElse(""));
fillInJobItemInfoWithCheckAlgorithm(result, checkJobId);
- result.setErrorMessage(getJobItemErrorMessage(checkJobId, 0));
+ result.setErrorMessage(new PipelineJobManager(this).getJobItemErrorMessage(checkJobId, 0));
Map<String, TableDataConsistencyCheckResult> checkJobResult = governanceRepositoryAPI.getCheckJobResult(parentJobId, checkJobId);
fillInJobItemInfoWithCheckResult(result, checkJobResult, parentJobId);
result.setCheckFailedTableNames(checkJobResult.entrySet().stream().filter(each -> !each.getValue().isIgnored() && !each.getValue().isMatched())
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
index b6ed5aa0c5f..3ae18d58129 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
@@ -32,6 +32,7 @@ import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
+import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl.ConsistencyCheckJobAPI;
import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration;
@@ -51,6 +52,8 @@ public final class ConsistencyCheckTasksRunner implements PipelineTasksRunner {
private final ConsistencyCheckJobAPI checkJobAPI = new ConsistencyCheckJobAPI();
+ private final PipelineJobManager jobManager = new PipelineJobManager(checkJobAPI);
+
@Getter
private final ConsistencyCheckJobItemContext jobItemContext;
@@ -131,7 +134,7 @@ public final class ConsistencyCheckTasksRunner implements PipelineTasksRunner {
log.info("onSuccess, check job id: {}, parent job id: {}", checkJobId, parentJobId);
jobItemContext.setStatus(JobStatus.FINISHED);
checkJobAPI.persistJobItemProgress(jobItemContext);
- checkJobAPI.stop(checkJobId);
+ jobManager.stop(checkJobId);
}
@Override
@@ -139,12 +142,12 @@ public final class ConsistencyCheckTasksRunner implements PipelineTasksRunner {
PipelineDataConsistencyChecker checker = consistencyChecker.get();
if (null != checker && checker.isCanceling()) {
log.info("onFailure, canceling, check job id: {}, parent job id: {}", checkJobId, parentJobId);
- checkJobAPI.stop(checkJobId);
+ jobManager.stop(checkJobId);
return;
}
log.info("onFailure, check job id: {}, parent job id: {}",
checkJobId, parentJobId, throwable);
- checkJobAPI.updateJobItemErrorMessage(checkJobId, 0, throwable);
- checkJobAPI.stop(checkJobId);
+ jobManager.updateJobItemErrorMessage(checkJobId, 0, throwable);
+ jobManager.stop(checkJobId);
}
}
}
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index 0cfed7317b4..1fe00789a82 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -54,7 +54,7 @@ import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.Increm
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
-import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
+import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.impl.AbstractInventoryIncrementalJobAPIImpl;
import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJob;
@@ -80,7 +80,6 @@ import org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePo
import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
-import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.util.json.JsonUtils;
import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
@@ -102,6 +101,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@@ -122,7 +122,7 @@ public final class MigrationJobAPI extends AbstractInventoryIncrementalJobAPIImp
*/
public String createJobAndStart(final PipelineContextKey contextKey, final MigrateTableStatement param) {
MigrationJobConfiguration jobConfig = new YamlMigrationJobConfigurationSwapper().swapToObject(buildYamlJobConfiguration(contextKey, param));
- start(jobConfig);
+ new PipelineJobManager(this).start(jobConfig);
return jobConfig.getJobId();
}
@@ -293,40 +293,21 @@ public final class MigrationJobAPI extends AbstractInventoryIncrementalJobAPIImp
}
@Override
- public void startDisabledJob(final String jobId) {
- super.startDisabledJob(jobId);
- PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getLatestCheckJobId(jobId).ifPresent(optional -> {
- try {
- TypedSPILoader.getService(PipelineJobAPI.class, "CONSISTENCY_CHECK").startDisabledJob(optional);
- // CHECKSTYLE:OFF
- } catch (final RuntimeException ex) {
- // CHECKSTYLE:ON
- log.warn("start related check job failed, check job id: {}, error: {}", optional, ex.getMessage());
- }
- });
+ public Optional<String> getToBeStartDisabledNextJobType() {
+ return Optional.of("CONSISTENCY_CHECK");
}
@Override
- public void stop(final String jobId) {
- PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getLatestCheckJobId(jobId).ifPresent(optional -> {
- try {
- TypedSPILoader.getService(PipelineJobAPI.class, "CONSISTENCY_CHECK").stop(optional);
- // CHECKSTYLE:OFF
- } catch (final RuntimeException ex) {
- // CHECKSTYLE:ON
- log.warn("stop related check job failed, check job id: {}, error: {}", optional, ex.getMessage());
- }
- });
- super.stop(jobId);
+ public Optional<String> getToBeStoppedPreviousJobType() {
+ return Optional.of("CONSISTENCY_CHECK");
}
@Override
public void rollback(final String jobId) throws SQLException {
final long startTimeMillis = System.currentTimeMillis();
- stop(jobId);
dropCheckJobs(jobId);
cleanTempTableOnRollback(jobId);
- dropJob(jobId);
+ new PipelineJobManager(this).drop(jobId);
log.info("Rollback job {} cost {} ms", jobId,
System.currentTimeMillis() - startTimeMillis);
}
@@ -337,7 +318,7 @@ public final class MigrationJobAPI extends AbstractInventoryIncrementalJobAPIImp
}
for (String each : checkJobIds) {
try {
- dropJob(each);
+ new PipelineJobManager(this).drop(each);
// CHECKSTYLE:OFF
} catch (final RuntimeException ex) {
// CHECKSTYLE:ON
@@ -368,11 +349,12 @@ public final class MigrationJobAPI extends AbstractInventoryIncrementalJobAPIImp
public void commit(final String jobId) {
log.info("Commit job {}", jobId);
final long startTimeMillis = System.currentTimeMillis();
- stop(jobId);
+ PipelineJobManager jobManager = new PipelineJobManager(this);
+ jobManager.stop(jobId);
dropCheckJobs(jobId);
MigrationJobConfiguration jobConfig = getJobConfiguration(jobId);
refreshTableMetadata(jobId, jobConfig.getTargetDatabaseName());
- dropJob(jobId);
+ jobManager.drop(jobId);
log.info("Commit cost {} ms", System.currentTimeMillis() -
startTimeMillis);
}
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
index c51bddd7599..e206c202875 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
@@ -33,6 +33,7 @@ import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.Tabl
import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService;
import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
@@ -88,12 +89,15 @@ class MigrationJobAPITest {
private static MigrationJobAPI jobAPI;
+ private static PipelineJobManager jobManager;
+
private static DatabaseType databaseType;
@BeforeAll
static void beforeClass() {
PipelineContextUtils.mockModeConfigAndContextManager();
jobAPI = new MigrationJobAPI();
+ jobManager = new PipelineJobManager(jobAPI);
String jdbcUrl = "jdbc:h2:mem:test_ds_0;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL";
databaseType = DatabaseTypeFactory.get(jdbcUrl);
Map<String, Object> props = new HashMap<>();
@@ -110,7 +114,7 @@ class MigrationJobAPITest {
@Test
void assertStartAndList() {
- Optional<String> jobId = jobAPI.start(JobConfigurationBuilder.createJobConfiguration());
+ Optional<String> jobId = jobManager.start(JobConfigurationBuilder.createJobConfiguration());
assertTrue(jobId.isPresent());
JobConfigurationPOJO jobConfigPOJO = getJobConfigurationPOJO(jobId.get());
assertFalse(jobConfigPOJO.isDisabled());
@@ -123,20 +127,20 @@ class MigrationJobAPITest {
@Test
void assertStartOrStopById() {
- Optional<String> jobId = jobAPI.start(JobConfigurationBuilder.createJobConfiguration());
+ Optional<String> jobId = jobManager.start(JobConfigurationBuilder.createJobConfiguration());
assertTrue(jobId.isPresent());
assertFalse(getJobConfigurationPOJO(jobId.get()).isDisabled());
PipelineDistributedBarrier mockBarrier = mock(PipelineDistributedBarrier.class);
when(PipelineDistributedBarrier.getInstance(any())).thenReturn(mockBarrier);
- jobAPI.stop(jobId.get());
+ jobManager.stop(jobId.get());
assertTrue(getJobConfigurationPOJO(jobId.get()).isDisabled());
- jobAPI.startDisabledJob(jobId.get());
+ jobManager.startDisabledJob(jobId.get());
assertFalse(getJobConfigurationPOJO(jobId.get()).isDisabled());
}
@Test
void assertRollback() throws SQLException {
- Optional<String> jobId = jobAPI.start(JobConfigurationBuilder.createJobConfiguration());
+ Optional<String> jobId = jobManager.start(JobConfigurationBuilder.createJobConfiguration());
assertTrue(jobId.isPresent());
MigrationJobConfiguration jobConfig = jobAPI.getJobConfiguration(jobId.get());
initTableData(jobConfig);
@@ -148,7 +152,7 @@ class MigrationJobAPITest {
@Test
void assertCommit() {
- Optional<String> jobId = jobAPI.start(JobConfigurationBuilder.createJobConfiguration());
+ Optional<String> jobId = jobManager.start(JobConfigurationBuilder.createJobConfiguration());
assertTrue(jobId.isPresent());
MigrationJobConfiguration jobConfig = jobAPI.getJobConfiguration(jobId.get());
initTableData(jobConfig);
@@ -161,7 +165,7 @@ class MigrationJobAPITest {
@Test
void assertGetProgress() {
MigrationJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration();
- Optional<String> jobId = jobAPI.start(jobConfig);
+ Optional<String> jobId = jobManager.start(jobConfig);
assertTrue(jobId.isPresent());
Map<Integer, InventoryIncrementalJobItemProgress> jobProgressMap = jobAPI.getJobProgress(jobConfig);
assertThat(jobProgressMap.size(), is(1));
@@ -171,7 +175,7 @@ class MigrationJobAPITest {
void assertDataConsistencyCheck() {
MigrationJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration();
initTableData(jobConfig);
- Optional<String> jobId = jobAPI.start(jobConfig);
+ Optional<String> jobId = jobManager.start(jobConfig);
assertTrue(jobId.isPresent());
Map<String, TableDataConsistencyCheckResult> checkResultMap = jobAPI.buildPipelineDataConsistencyChecker(
jobConfig, jobAPI.buildPipelineProcessContext(jobConfig), new ConsistencyCheckJobItemProgressContext(jobId.get(), 0, "H2")).check("FIXTURE", null);
@@ -205,7 +209,7 @@ class MigrationJobAPITest {
@Test
void assertSwitchClusterConfigurationSucceed() {
final MigrationJobConfiguration jobConfig = JobConfigurationBuilder.createJobConfiguration();
- Optional<String> jobId = jobAPI.start(jobConfig);
+ Optional<String> jobId = jobManager.start(jobConfig);
assertTrue(jobId.isPresent());
MigrationJobItemContext jobItemContext = PipelineContextUtils.mockMigrationJobItemContext(jobConfig);
jobAPI.persistJobItemProgress(jobItemContext);
@@ -308,7 +312,7 @@ class MigrationJobAPITest {
@Test
void assertGetJobItemInfosAtBegin() {
- Optional<String> jobId = jobAPI.start(JobConfigurationBuilder.createJobConfiguration());
+ Optional<String> jobId = jobManager.start(JobConfigurationBuilder.createJobConfiguration());
assertTrue(jobId.isPresent());
YamlInventoryIncrementalJobItemProgress yamlJobItemProgress = new YamlInventoryIncrementalJobItemProgress();
yamlJobItemProgress.setStatus(JobStatus.RUNNING.name());
@@ -323,7 +327,7 @@ class MigrationJobAPITest {
@Test
void assertGetJobItemInfosAtIncrementTask() {
- Optional<String> jobId = jobAPI.start(JobConfigurationBuilder.createJobConfiguration());
+ Optional<String> jobId = jobManager.start(JobConfigurationBuilder.createJobConfiguration());
assertTrue(jobId.isPresent());
YamlInventoryIncrementalJobItemProgress yamlJobItemProgress = new YamlInventoryIncrementalJobItemProgress();
yamlJobItemProgress.setSourceDatabaseType("MySQL");
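Note on the new extension hooks exercised above: a concrete PipelineJobAPI no longer overrides startDisabledJob()/stop() to cascade into its related consistency check job; it only declares the related job type, and PipelineJobManager resolves that type through TypedSPILoader and chains the call. A condensed sketch of the migration case from this diff, reduced to the hook-relevant parts:

    public final class MigrationJobAPI extends AbstractInventoryIncrementalJobAPIImpl {

        // PipelineJobManager.startDisabledJob(jobId) re-enables this job, then also
        // starts the latest related job of the declared type via SPI.
        @Override
        public Optional<String> getToBeStartDisabledNextJobType() {
            return Optional.of("CONSISTENCY_CHECK");
        }

        // PipelineJobManager.stop(jobId) stops the related check job first, then
        // disables this job.
        @Override
        public Optional<String> getToBeStoppedPreviousJobType() {
            return Optional.of("CONSISTENCY_CHECK");
        }
    }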