This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 394353400b1 Add AbstractInseparablePipelineJob (#29336)
394353400b1 is described below
commit 394353400b12f93ecd816414e733570233478069
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Dec 9 15:01:24 2023 +0800
Add AbstractInseparablePipelineJob (#29336)
* Rename AbstractSeparablePipelineJob
* Add AbstractInseparablePipelineJob
* Add AbstractInseparablePipelineJob
* Add AbstractInseparablePipelineJob
---
.../core/job/AbstractInseparablePipelineJob.java | 152 +++++++++++++++++++++
.../pipeline/core/job/AbstractPipelineJob.java | 31 ++---
...eJob.java => AbstractSeparablePipelineJob.java} | 22 ++-
.../data/pipeline/core/job/PipelineJob.java | 4 +-
.../shardingsphere/data/pipeline/cdc/CDCJob.java | 120 ++++------------
.../consistencycheck/ConsistencyCheckJob.java | 4 +-
.../pipeline/scenario/migration/MigrationJob.java | 4 +-
7 files changed, 211 insertions(+), 126 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
new file mode 100644
index 00000000000..845d3605f0a
--- /dev/null
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.job;
+
+import lombok.extern.slf4j.Slf4j;
+import
org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
+import
org.apache.shardingsphere.data.pipeline.core.context.TransmissionJobItemContext;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.FinishedPosition;
+import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
+import
org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
+import
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
+import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
+import
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
+import org.apache.shardingsphere.elasticjob.api.ShardingContext;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Abstract inseparable pipeline job.
+ */
+@Slf4j
+public abstract class AbstractInseparablePipelineJob extends
AbstractPipelineJob {
+
+ private final PipelineJobItemManager<TransmissionJobItemProgress>
jobItemManager;
+
+ protected AbstractInseparablePipelineJob(final String jobId) {
+ super(jobId);
+ jobItemManager = new
PipelineJobItemManager<>(getJobType().getYamlJobItemProgressSwapper());
+ }
+
+ @Override
+ public final void execute(final ShardingContext shardingContext) {
+ String jobId = shardingContext.getJobName();
+ log.info("Execute job {}", jobId);
+ PipelineJobConfiguration jobConfig =
getJobConfiguration(shardingContext);
+ Collection<PipelineJobItemContext> jobItemContexts = new
LinkedList<>();
+ for (int shardingItem = 0; shardingItem <
jobConfig.getJobShardingCount(); shardingItem++) {
+ if (isStopping()) {
+ log.info("stopping true, ignore");
+ return;
+ }
+ PipelineJobItemContext jobItemContext =
buildPipelineJobItemContext(jobConfig, shardingItem);
+ if (!addTasksRunner(shardingItem,
buildPipelineTasksRunner(jobItemContext))) {
+ continue;
+ }
+ jobItemContexts.add(jobItemContext);
+
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().clean(jobId,
shardingItem);
+ log.info("start tasks runner, jobId={}, shardingItem={}", jobId,
shardingItem);
+ }
+ if (jobItemContexts.isEmpty()) {
+ log.warn("job item contexts empty, ignore");
+ return;
+ }
+ prepare(jobItemContexts);
+ executeInventoryTasks(jobItemContexts);
+ executeIncrementalTasks(jobItemContexts);
+ }
+
+ protected abstract PipelineJobConfiguration
getJobConfiguration(ShardingContext shardingContext);
+
+ protected abstract PipelineJobItemContext
buildPipelineJobItemContext(PipelineJobConfiguration jobConfig, int
shardingItem);
+
+ protected abstract PipelineTasksRunner
buildPipelineTasksRunner(PipelineJobItemContext pipelineJobItemContext);
+
+ private void prepare(final Collection<PipelineJobItemContext>
jobItemContexts) {
+ try {
+ doPrepare0(jobItemContexts);
+ // CHECKSTYLE:OFF
+ } catch (final RuntimeException ex) {
+ // CHECKSTYLE:ON
+ for (PipelineJobItemContext each : jobItemContexts) {
+ processFailed(each.getJobId(), each.getShardingItem(), ex);
+ }
+ throw ex;
+ }
+ }
+
+ protected abstract void doPrepare0(Collection<PipelineJobItemContext>
jobItemContexts);
+
+ private void processFailed(final String jobId, final int shardingItem,
final Exception ex) {
+ log.error("job execution failed, {}-{}", jobId, shardingItem, ex);
+
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().update(jobId,
shardingItem, ex);
+ PipelineJobRegistry.stop(jobId);
+ processFailed(jobId);
+ }
+
+ protected abstract void processFailed(String jobId);
+
+ private void executeInventoryTasks(final
Collection<PipelineJobItemContext> jobItemContexts) {
+ Collection<CompletableFuture<?>> futures = new LinkedList<>();
+ for (PipelineJobItemContext each : jobItemContexts) {
+ updateLocalAndRemoteJobItemStatus(each,
JobStatus.EXECUTE_INVENTORY_TASK);
+ for (PipelineTask task : ((TransmissionJobItemContext)
each).getInventoryTasks()) {
+ if (task.getTaskProgress().getPosition() instanceof
FinishedPosition) {
+ continue;
+ }
+ futures.addAll(task.start());
+ }
+ }
+ if (futures.isEmpty()) {
+ return;
+ }
+ executeInventoryTasks(futures, jobItemContexts);
+ }
+
+ protected abstract void
executeInventoryTasks(Collection<CompletableFuture<?>> futures,
Collection<PipelineJobItemContext> jobItemContexts);
+
+ private void updateLocalAndRemoteJobItemStatus(final
PipelineJobItemContext jobItemContext, final JobStatus jobStatus) {
+ jobItemContext.setStatus(jobStatus);
+ jobItemManager.updateStatus(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), jobStatus);
+ }
+
+ private void executeIncrementalTasks(final
Collection<PipelineJobItemContext> jobItemContexts) {
+ log.info("execute incremental tasks, jobId={}", getJobId());
+ Collection<CompletableFuture<?>> futures = new LinkedList<>();
+ for (PipelineJobItemContext each : jobItemContexts) {
+ if (JobStatus.EXECUTE_INCREMENTAL_TASK == each.getStatus()) {
+ log.info("job status already EXECUTE_INCREMENTAL_TASK,
ignore");
+ return;
+ }
+ updateLocalAndRemoteJobItemStatus(each,
JobStatus.EXECUTE_INCREMENTAL_TASK);
+ for (PipelineTask task : ((TransmissionJobItemContext)
each).getIncrementalTasks()) {
+ if (task.getTaskProgress().getPosition() instanceof
FinishedPosition) {
+ continue;
+ }
+ futures.addAll(task.start());
+ }
+ }
+ executeIncrementalTasks(futures, jobItemContexts);
+ }
+
+ protected abstract void
executeIncrementalTasks(Collection<CompletableFuture<?>> futures,
Collection<PipelineJobItemContext> jobItemContexts);
+}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
index 910e7ceaff2..402d9546f7a 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
@@ -20,22 +20,19 @@ package org.apache.shardingsphere.data.pipeline.core.job;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
-import
org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
+import
org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
import
org.apache.shardingsphere.data.pipeline.core.listener.PipelineElasticJobListener;
import
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
-import
org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
-import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
-import
org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
import
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
+import
org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
import org.apache.shardingsphere.elasticjob.infra.listener.ElasticJobListener;
import org.apache.shardingsphere.elasticjob.infra.spi.ElasticJobServiceLoader;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.JobBootstrap;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
-import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
@@ -61,7 +58,7 @@ public abstract class AbstractPipelineJob implements
PipelineJob {
private final AtomicReference<JobBootstrap> jobBootstrap = new
AtomicReference<>();
- private final Map<Integer, PipelineTasksRunner> tasksRunnerMap = new
ConcurrentHashMap<>();
+ private final Map<Integer, PipelineTasksRunner> tasksRunners = new
ConcurrentHashMap<>();
protected AbstractPipelineJob(final String jobId) {
this.jobId = jobId;
@@ -86,30 +83,18 @@ public abstract class AbstractPipelineJob implements
PipelineJob {
this.jobBootstrap.set(jobBootstrap);
}
- protected void prepare(final PipelineJobItemContext jobItemContext) {
- try {
- doPrepare(jobItemContext);
- // CHECKSTYLE:OFF
- } catch (final SQLException ex) {
- // CHECKSTYLE:ON
- throw new PipelineInternalException(ex);
- }
- }
-
- protected abstract void doPrepare(PipelineJobItemContext jobItemContext)
throws SQLException;
-
@Override
public final Optional<PipelineTasksRunner> getTasksRunner(final int
shardingItem) {
- return Optional.ofNullable(tasksRunnerMap.get(shardingItem));
+ return Optional.ofNullable(tasksRunners.get(shardingItem));
}
@Override
public final Collection<Integer> getShardingItems() {
- return new ArrayList<>(tasksRunnerMap.keySet());
+ return new ArrayList<>(tasksRunners.keySet());
}
protected final boolean addTasksRunner(final int shardingItem, final
PipelineTasksRunner tasksRunner) {
- if (null != tasksRunnerMap.putIfAbsent(shardingItem, tasksRunner)) {
+ if (null != tasksRunners.putIfAbsent(shardingItem, tasksRunner)) {
log.warn("shardingItem {} tasks runner exists, ignore",
shardingItem);
return false;
}
@@ -132,7 +117,7 @@ public abstract class AbstractPipelineJob implements
PipelineJob {
private void innerStop() {
stopping.set(true);
log.info("stop tasks runner, jobId={}", jobId);
- for (PipelineTasksRunner each : tasksRunnerMap.values()) {
+ for (PipelineTasksRunner each : tasksRunners.values()) {
each.stop();
}
Optional<ElasticJobListener> pipelineJobListener =
ElasticJobServiceLoader.getCachedTypedServiceInstance(ElasticJobListener.class,
PipelineElasticJobListener.class.getName());
@@ -161,7 +146,7 @@ public abstract class AbstractPipelineJob implements
PipelineJob {
private void innerClean() {
PipelineJobProgressPersistService.remove(jobId);
- for (PipelineTasksRunner each : tasksRunnerMap.values()) {
+ for (PipelineTasksRunner each : tasksRunners.values()) {
QuietlyCloser.close(each.getJobItemContext().getJobProcessContext());
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
similarity index 83%
rename from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
rename to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
index 96861144eb8..cd668ca127f 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
@@ -19,21 +19,23 @@ package org.apache.shardingsphere.data.pipeline.core.job;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
+import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
-import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
+
+import java.sql.SQLException;
/**
- * Abstract simple pipeline job.
+ * Abstract separable pipeline job.
*/
@Slf4j
-public abstract class AbstractSimplePipelineJob extends AbstractPipelineJob
implements SimpleJob {
+public abstract class AbstractSeparablePipelineJob extends AbstractPipelineJob
{
- protected AbstractSimplePipelineJob(final String jobId) {
+ protected AbstractSeparablePipelineJob(final String jobId) {
super(jobId);
}
@@ -69,6 +71,18 @@ public abstract class AbstractSimplePipelineJob extends
AbstractPipelineJob impl
tasksRunner.start();
}
+ protected final void prepare(final PipelineJobItemContext jobItemContext) {
+ try {
+ doPrepare(jobItemContext);
+ // CHECKSTYLE:OFF
+ } catch (final SQLException ex) {
+ // CHECKSTYLE:ON
+ throw new PipelineInternalException(ex);
+ }
+ }
+
+ protected abstract void doPrepare(PipelineJobItemContext jobItemContext)
throws SQLException;
+
protected abstract PipelineJobItemContext
buildPipelineJobItemContext(ShardingContext shardingContext);
protected abstract PipelineTasksRunner
buildPipelineTasksRunner(PipelineJobItemContext pipelineJobItemContext);
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJob.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJob.java
index 0d20a209072..e71792d6f9a 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJob.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJob.java
@@ -18,7 +18,7 @@
package org.apache.shardingsphere.data.pipeline.core.job;
import
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
-import org.apache.shardingsphere.elasticjob.api.ElasticJob;
+import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
import java.util.Collection;
import java.util.Optional;
@@ -26,7 +26,7 @@ import java.util.Optional;
/**
* Pipeline job.
*/
-public interface PipelineJob extends ElasticJob {
+public interface PipelineJob extends SimpleJob {
/**
* Get tasks runner.
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
index b8999fb58bb..b957fb2c7b5 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
@@ -45,12 +45,11 @@ import
org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.DumperCommonContext;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.position.FinishedPosition;
-import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
-import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
+import
org.apache.shardingsphere.data.pipeline.core.job.AbstractInseparablePipelineJob;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
+import
org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
@@ -60,7 +59,7 @@ import
org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
import
org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveIdentifier;
import
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
import
org.apache.shardingsphere.data.pipeline.core.spi.algorithm.JobRateLimitAlgorithm;
-import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
+import
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
import
org.apache.shardingsphere.data.pipeline.core.util.ShardingColumnsExtractor;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
@@ -68,8 +67,6 @@ import
org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -80,7 +77,7 @@ import java.util.stream.Collectors;
* CDC job.
*/
@Slf4j
-public final class CDCJob extends AbstractPipelineJob implements SimpleJob {
+public final class CDCJob extends AbstractInseparablePipelineJob implements
SimpleJob {
@Getter
private final PipelineSink sink;
@@ -103,40 +100,28 @@ public final class CDCJob extends AbstractPipelineJob
implements SimpleJob {
}
@Override
- public void execute(final ShardingContext shardingContext) {
- String jobId = shardingContext.getJobName();
- log.info("Execute job {}", jobId);
- CDCJobConfiguration jobConfig = new
YamlCDCJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
- List<CDCJobItemContext> jobItemContexts = new LinkedList<>();
- for (int shardingItem = 0; shardingItem <
jobConfig.getJobShardingCount(); shardingItem++) {
- if (isStopping()) {
- log.info("stopping true, ignore");
- return;
- }
- CDCJobItemContext jobItemContext =
buildCDCJobItemContext(jobConfig, shardingItem);
- if (!addTasksRunner(shardingItem, new
CDCTasksRunner(jobItemContext))) {
- continue;
- }
- jobItemContexts.add(jobItemContext);
-
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().clean(jobId,
shardingItem);
- log.info("start tasks runner, jobId={}, shardingItem={}", jobId,
shardingItem);
- }
- if (jobItemContexts.isEmpty()) {
- log.warn("job item contexts empty, ignore");
- return;
- }
- prepare(jobItemContexts);
- executeInventoryTasks(jobItemContexts);
- executeIncrementalTasks(jobItemContexts);
+ protected PipelineJobConfiguration getJobConfiguration(final
ShardingContext shardingContext) {
+ return new
YamlCDCJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
}
- private CDCJobItemContext buildCDCJobItemContext(final CDCJobConfiguration
jobConfig, final int shardingItem) {
+ @Override
+ protected PipelineJobItemContext buildPipelineJobItemContext(final
PipelineJobConfiguration jobConfig, final int shardingItem) {
Optional<TransmissionJobItemProgress> initProgress =
jobItemManager.getProgress(jobConfig.getJobId(), shardingItem);
PipelineProcessConfiguration processConfig =
PipelineProcessConfigurationUtils.convertWithDefaultValue(
processConfigPersistService.load(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()),
jobType.getType()));
TransmissionProcessContext jobProcessContext = new
TransmissionProcessContext(jobConfig.getJobId(), processConfig);
- CDCTaskConfiguration taskConfig = buildTaskConfiguration(jobConfig,
shardingItem, jobProcessContext.getPipelineProcessConfig());
- return new CDCJobItemContext(jobConfig, shardingItem,
initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager,
sink);
+ CDCTaskConfiguration taskConfig =
buildTaskConfiguration((CDCJobConfiguration) jobConfig, shardingItem,
jobProcessContext.getPipelineProcessConfig());
+ return new CDCJobItemContext((CDCJobConfiguration) jobConfig,
shardingItem, initProgress.orElse(null), jobProcessContext, taskConfig,
dataSourceManager, sink);
+ }
+
+ @Override
+ protected PipelineTasksRunner buildPipelineTasksRunner(final
PipelineJobItemContext jobItemContext) {
+ return new CDCTasksRunner((CDCJobItemContext) jobItemContext);
+ }
+
+ @Override
+ protected void doPrepare0(final Collection<PipelineJobItemContext>
jobItemContexts) {
+ jobPreparer.initTasks(jobItemContexts.stream().map(each ->
(CDCJobItemContext) each).collect(Collectors.toList()));
}
private CDCTaskConfiguration buildTaskConfiguration(final
CDCJobConfiguration jobConfig, final int jobShardingItem, final
PipelineProcessConfiguration processConfig) {
@@ -169,70 +154,19 @@ public final class CDCJob extends AbstractPipelineJob
implements SimpleJob {
return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap,
tableAndSchemaNameMapper, batchSize, writeRateLimitAlgorithm, 0, 1);
}
- private void prepare(final Collection<CDCJobItemContext> jobItemContexts) {
- try {
- jobPreparer.initTasks(jobItemContexts);
- // CHECKSTYLE:OFF
- } catch (final RuntimeException ex) {
- // CHECKSTYLE:ON
- for (PipelineJobItemContext each : jobItemContexts) {
- processFailed(each.getJobId(), each.getShardingItem(), ex);
- }
- throw ex;
- }
- }
-
- private void processFailed(final String jobId, final int shardingItem,
final Exception ex) {
- log.error("job execution failed, {}-{}", jobId, shardingItem, ex);
-
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().update(jobId,
shardingItem, ex);
- PipelineJobRegistry.stop(jobId);
+ @Override
+ protected void processFailed(final String jobId) {
jobAPI.disable(jobId);
}
- private void executeInventoryTasks(final List<CDCJobItemContext>
jobItemContexts) {
- Collection<CompletableFuture<?>> futures = new LinkedList<>();
- for (CDCJobItemContext each : jobItemContexts) {
- updateLocalAndRemoteJobItemStatus(each,
JobStatus.EXECUTE_INVENTORY_TASK);
- for (PipelineTask task : each.getInventoryTasks()) {
- if (task.getTaskProgress().getPosition() instanceof
FinishedPosition) {
- continue;
- }
- futures.addAll(task.start());
- }
- }
- if (futures.isEmpty()) {
- return;
- }
- ExecuteEngine.trigger(futures, new CDCExecuteCallback("inventory",
jobItemContexts.get(0)));
- }
-
- private void updateLocalAndRemoteJobItemStatus(final
PipelineJobItemContext jobItemContext, final JobStatus jobStatus) {
- jobItemContext.setStatus(jobStatus);
- jobItemManager.updateStatus(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), jobStatus);
- }
-
- private void executeIncrementalTasks(final List<CDCJobItemContext>
jobItemContexts) {
- log.info("execute incremental tasks, jobId={}", getJobId());
- Collection<CompletableFuture<?>> futures = new LinkedList<>();
- for (CDCJobItemContext each : jobItemContexts) {
- if (JobStatus.EXECUTE_INCREMENTAL_TASK == each.getStatus()) {
- log.info("job status already EXECUTE_INCREMENTAL_TASK,
ignore");
- return;
- }
- updateLocalAndRemoteJobItemStatus(each,
JobStatus.EXECUTE_INCREMENTAL_TASK);
- for (PipelineTask task : each.getIncrementalTasks()) {
- if (task.getTaskProgress().getPosition() instanceof
FinishedPosition) {
- continue;
- }
- futures.addAll(task.start());
- }
- }
- ExecuteEngine.trigger(futures, new CDCExecuteCallback("incremental",
jobItemContexts.get(0)));
+ @Override
+ protected void executeInventoryTasks(final
Collection<CompletableFuture<?>> futures, final
Collection<PipelineJobItemContext> jobItemContexts) {
+ ExecuteEngine.trigger(futures, new CDCExecuteCallback("inventory",
(CDCJobItemContext) jobItemContexts.iterator().next()));
}
@Override
- protected void doPrepare(final PipelineJobItemContext jobItemContext) {
- throw new UnsupportedOperationException();
+ protected void executeIncrementalTasks(final
Collection<CompletableFuture<?>> futures, final
Collection<PipelineJobItemContext> jobItemContexts) {
+ ExecuteEngine.trigger(futures, new CDCExecuteCallback("incremental",
(CDCJobItemContext) jobItemContexts.iterator().next()));
}
@Override
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
index 039418b6ac1..d669eda81e1 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
@@ -21,7 +21,7 @@ import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.ConsistencyCheckJobItemProgress;
-import
org.apache.shardingsphere.data.pipeline.core.job.AbstractSimplePipelineJob;
+import
org.apache.shardingsphere.data.pipeline.core.job.AbstractSeparablePipelineJob;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration;
@@ -36,7 +36,7 @@ import java.util.Optional;
* Consistency check job.
*/
@Slf4j
-public final class ConsistencyCheckJob extends AbstractSimplePipelineJob {
+public final class ConsistencyCheckJob extends AbstractSeparablePipelineJob {
public ConsistencyCheckJob(final String jobId) {
super(jobId);
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index e9a3b6440ea..b990ef1a620 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -37,7 +37,7 @@ import
org.apache.shardingsphere.data.pipeline.core.spi.algorithm.JobRateLimitAl
import
org.apache.shardingsphere.data.pipeline.core.util.ShardingColumnsExtractor;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
-import
org.apache.shardingsphere.data.pipeline.core.job.AbstractSimplePipelineJob;
+import
org.apache.shardingsphere.data.pipeline.core.job.AbstractSeparablePipelineJob;
import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
@@ -66,7 +66,7 @@ import java.util.stream.Collectors;
* Migration job.
*/
@Slf4j
-public final class MigrationJob extends AbstractSimplePipelineJob {
+public final class MigrationJob extends AbstractSeparablePipelineJob {
private final PipelineJobItemManager<TransmissionJobItemProgress>
jobItemManager = new PipelineJobItemManager<>(new
MigrationJobType().getYamlJobItemProgressSwapper());