This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new f218cf727c0 Merge AbstractInseparablePipelineJob and CDCJob (#32759)
f218cf727c0 is described below
commit f218cf727c0f0c7e813270368fdfa6308b92bd83
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Sep 1 17:27:45 2024 +0800
Merge AbstractInseparablePipelineJob and CDCJob (#32759)
* Merge AbstractInseparablePipelineJob and CDCJob
* Merge AbstractInseparablePipelineJob and CDCJob
* Merge AbstractInseparablePipelineJob and CDCJob
* Merge AbstractInseparablePipelineJob and CDCJob
* Merge AbstractInseparablePipelineJob and CDCJob
---
.../core/job/AbstractInseparablePipelineJob.java | 178 ---------------------
.../shardingsphere/data/pipeline/cdc/CDCJob.java | 139 ++++++++++++----
2 files changed, 112 insertions(+), 205 deletions(-)
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
deleted file mode 100644
index 3d3047bcdea..00000000000
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.job;
-
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
-import
org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
-import
org.apache.shardingsphere.data.pipeline.core.context.TransmissionJobItemContext;
-import
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
-import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
-import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.finished.IngestFinishedPosition;
-import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
-import
org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
-import
org.apache.shardingsphere.data.pipeline.core.job.engine.PipelineJobRunnerManager;
-import
org.apache.shardingsphere.data.pipeline.core.job.engine.cleaner.PipelineJobRunnerCleaner;
-import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
-import
org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobItemProgress;
-import
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
-import
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfigurationUtils;
-import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
-import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
-import
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
-import
org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.PipelineGovernanceFacade;
-import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
-import
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
-import org.apache.shardingsphere.elasticjob.api.ShardingContext;
-
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.concurrent.CompletableFuture;
-
-/**
- * Abstract inseparable pipeline job.
- *
- * @param <T> type of pipeline job configuration
- * @param <I> type of pipeline job item context
- * @param <P> type of pipeline job item progress
- */
-@Getter
-@Slf4j
-public abstract class AbstractInseparablePipelineJob<T extends
PipelineJobConfiguration, I extends PipelineJobItemContext, P extends
PipelineJobItemProgress> implements PipelineJob {
-
- private final PipelineJobRunnerManager jobRunnerManager;
-
- protected AbstractInseparablePipelineJob(final PipelineJobRunnerCleaner
cleaner) {
- jobRunnerManager = new PipelineJobRunnerManager(cleaner);
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public final void execute(final ShardingContext shardingContext) {
- String jobId = shardingContext.getJobName();
- log.info("Execute job {}", jobId);
- PipelineJobType jobType = PipelineJobIdUtils.parseJobType(jobId);
- PipelineContextKey contextKey =
PipelineJobIdUtils.parseContextKey(jobId);
- T jobConfig = (T)
jobType.getYamlJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
- PipelineJobItemManager<P> jobItemManager = new
PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
- TransmissionProcessContext jobProcessContext =
createTransmissionProcessContext(jobId, jobType, contextKey);
- PipelineGovernanceFacade governanceFacade =
PipelineAPIFactory.getPipelineGovernanceFacade(contextKey);
- Collection<I> jobItemContexts = new LinkedList<>();
- for (int shardingItem = 0; shardingItem <
jobConfig.getJobShardingCount(); shardingItem++) {
- if (jobRunnerManager.isStopping()) {
- log.info("Job is stopping, ignore.");
- return;
- }
- P jobItemProgress =
jobItemManager.getProgress(shardingContext.getJobName(),
shardingItem).orElse(null);
- I jobItemContext = buildJobItemContext(jobConfig, shardingItem,
jobItemProgress, jobProcessContext);
- if (!jobRunnerManager.addTasksRunner(shardingItem,
buildTasksRunner(jobItemContext))) {
- continue;
- }
- jobItemContexts.add(jobItemContext);
- governanceFacade.getJobItemFacade().getErrorMessage().clean(jobId,
shardingItem);
- log.info("Start tasks runner, jobId={}, shardingItem={}.", jobId,
shardingItem);
- }
- if (jobItemContexts.isEmpty()) {
- log.warn("Job item contexts are empty, ignore.");
- return;
- }
- prepare(jobItemContexts, governanceFacade);
- executeInventoryTasks(jobItemContexts, jobItemManager);
- executeIncrementalTasks(jobItemContexts, jobItemManager);
- }
-
- private TransmissionProcessContext createTransmissionProcessContext(final
String jobId, final PipelineJobType jobType, final PipelineContextKey
contextKey) {
- if (!jobType.isTransmissionJob()) {
- return null;
- }
- PipelineProcessConfiguration processConfig =
PipelineProcessConfigurationUtils.fillInDefaultValue(new
PipelineProcessConfigurationPersistService().load(contextKey,
jobType.getType()));
- return new TransmissionProcessContext(jobId, processConfig);
- }
-
- protected abstract I buildJobItemContext(T jobConfig, int shardingItem, P
jobItemProgress, TransmissionProcessContext jobProcessContext);
-
- protected abstract PipelineTasksRunner buildTasksRunner(I jobItemContext);
-
- private void prepare(final Collection<I> jobItemContexts, final
PipelineGovernanceFacade governanceFacade) {
- try {
- doPrepare(jobItemContexts);
- // CHECKSTYLE:OFF
- } catch (final RuntimeException ex) {
- // CHECKSTYLE:ON
- for (PipelineJobItemContext each : jobItemContexts) {
- processFailed(each.getJobId(), each.getShardingItem(), ex,
governanceFacade);
- }
- throw ex;
- }
- }
-
- protected abstract void doPrepare(Collection<I> jobItemContexts);
-
- private void processFailed(final String jobId, final int shardingItem,
final Exception ex, final PipelineGovernanceFacade governanceFacade) {
- log.error("Job {}-{} execution failed.", jobId, shardingItem, ex);
- governanceFacade.getJobItemFacade().getErrorMessage().update(jobId,
shardingItem, ex);
- PipelineJobRegistry.stop(jobId);
- processFailed(jobId);
- }
-
- protected abstract void processFailed(String jobId);
-
- private void executeInventoryTasks(final Collection<I> jobItemContexts,
final PipelineJobItemManager<P> jobItemManager) {
- Collection<CompletableFuture<?>> futures = new LinkedList<>();
- for (I each : jobItemContexts) {
- updateJobItemStatus(each, JobStatus.EXECUTE_INVENTORY_TASK,
jobItemManager);
- for (PipelineTask task : ((TransmissionJobItemContext)
each).getInventoryTasks()) {
- if (task.getTaskProgress().getPosition() instanceof
IngestFinishedPosition) {
- continue;
- }
- futures.addAll(task.start());
- }
- }
- if (futures.isEmpty()) {
- return;
- }
- ExecuteEngine.trigger(futures, buildExecuteCallback("inventory",
jobItemContexts.iterator().next()));
- }
-
- private void executeIncrementalTasks(final Collection<I> jobItemContexts,
final PipelineJobItemManager<P> jobItemManager) {
- Collection<CompletableFuture<?>> futures = new LinkedList<>();
- for (I each : jobItemContexts) {
- if (JobStatus.EXECUTE_INCREMENTAL_TASK == each.getStatus()) {
- log.info("Job status has already EXECUTE_INCREMENTAL_TASK,
ignore.");
- return;
- }
- updateJobItemStatus(each, JobStatus.EXECUTE_INCREMENTAL_TASK,
jobItemManager);
- for (PipelineTask task : ((TransmissionJobItemContext)
each).getIncrementalTasks()) {
- if (task.getTaskProgress().getPosition() instanceof
IngestFinishedPosition) {
- continue;
- }
- futures.addAll(task.start());
- }
- }
- ExecuteEngine.trigger(futures, buildExecuteCallback("incremental",
jobItemContexts.iterator().next()));
- }
-
- private void updateJobItemStatus(final I jobItemContext, final JobStatus
jobStatus, final PipelineJobItemManager<P> jobItemManager) {
- jobItemContext.setStatus(jobStatus);
- jobItemManager.updateStatus(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), jobStatus);
- }
-
- protected abstract ExecuteCallback buildExecuteCallback(String identifier,
I jobItemContext);
-}
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
index 8fcd1de568f..71474b4f6c5 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
@@ -31,59 +31,104 @@ import
org.apache.shardingsphere.data.pipeline.cdc.core.prepare.CDCJobPreparer;
import org.apache.shardingsphere.data.pipeline.cdc.core.task.CDCTasksRunner;
import org.apache.shardingsphere.data.pipeline.cdc.engine.CDCJobRunnerCleaner;
import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseUtils;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
+import
org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
+import
org.apache.shardingsphere.data.pipeline.core.context.TransmissionJobItemContext;
import
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLine;
import
org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLineConvertUtils;
import
org.apache.shardingsphere.data.pipeline.core.datasource.config.PipelineDataSourceConfigurationFactory;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
+import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
import
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.DumperCommonContext;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.TableAndSchemaNameMapper;
-import
org.apache.shardingsphere.data.pipeline.core.job.AbstractInseparablePipelineJob;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.finished.IngestFinishedPosition;
+import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJob;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
+import
org.apache.shardingsphere.data.pipeline.core.job.engine.PipelineJobRunnerManager;
import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
+import
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfigurationUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineWriteConfiguration;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
+import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
+import
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
import
org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
-import
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
+import
org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.PipelineGovernanceFacade;
+import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import
org.apache.shardingsphere.data.pipeline.core.util.ShardingColumnsExtractor;
+import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import java.util.Collection;
+import java.util.LinkedList;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
/**
* CDC job.
*/
@Slf4j
-public final class CDCJob extends
AbstractInseparablePipelineJob<CDCJobConfiguration, CDCJobItemContext,
TransmissionJobItemProgress> {
+public final class CDCJob implements PipelineJob {
- private final CDCJobAPI jobAPI = (CDCJobAPI)
TypedSPILoader.getService(TransmissionJobAPI.class, "STREAMING");
+ @Getter
+ private final PipelineJobRunnerManager jobRunnerManager;
- private final CDCJobPreparer jobPreparer = new CDCJobPreparer();
+ private final CDCJobAPI jobAPI = (CDCJobAPI)
TypedSPILoader.getService(TransmissionJobAPI.class, "STREAMING");
@Getter
private final PipelineSink sink;
public CDCJob(final PipelineSink sink) {
- super(new CDCJobRunnerCleaner(sink));
+ jobRunnerManager = new PipelineJobRunnerManager(new
CDCJobRunnerCleaner(sink));
this.sink = sink;
}
@Override
- protected CDCJobItemContext buildJobItemContext(final CDCJobConfiguration
jobConfig,
- final int shardingItem,
final TransmissionJobItemProgress jobItemProgress, final
TransmissionProcessContext jobProcessContext) {
- CDCTaskConfiguration taskConfig = buildTaskConfiguration(jobConfig,
shardingItem, jobProcessContext.getProcessConfiguration());
- return new CDCJobItemContext(jobConfig, shardingItem, jobItemProgress,
jobProcessContext, taskConfig, getJobRunnerManager().getDataSourceManager(),
sink);
+ public void execute(final ShardingContext shardingContext) {
+ String jobId = shardingContext.getJobName();
+ log.info("Execute job {}", jobId);
+ PipelineJobType jobType = PipelineJobIdUtils.parseJobType(jobId);
+ PipelineContextKey contextKey =
PipelineJobIdUtils.parseContextKey(jobId);
+ CDCJobConfiguration jobConfig = (CDCJobConfiguration)
jobType.getYamlJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
+ PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager =
new PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
+ TransmissionProcessContext jobProcessContext = new
TransmissionProcessContext(
+ jobId,
PipelineProcessConfigurationUtils.fillInDefaultValue(new
PipelineProcessConfigurationPersistService().load(contextKey,
jobType.getType())));
+ PipelineGovernanceFacade governanceFacade =
PipelineAPIFactory.getPipelineGovernanceFacade(contextKey);
+ Collection<CDCJobItemContext> jobItemContexts = new LinkedList<>();
+ for (int shardingItem = 0; shardingItem <
jobConfig.getJobShardingCount(); shardingItem++) {
+ if (jobRunnerManager.isStopping()) {
+ log.info("Job is stopping, ignore.");
+ return;
+ }
+ TransmissionJobItemProgress jobItemProgress =
jobItemManager.getProgress(shardingContext.getJobName(),
shardingItem).orElse(null);
+ CDCTaskConfiguration taskConfig =
buildTaskConfiguration(jobConfig, shardingItem,
jobProcessContext.getProcessConfiguration());
+ CDCJobItemContext jobItemContext = new
CDCJobItemContext(jobConfig, shardingItem, jobItemProgress, jobProcessContext,
taskConfig, getJobRunnerManager().getDataSourceManager(), sink);
+ if (!jobRunnerManager.addTasksRunner(shardingItem, new
CDCTasksRunner(jobItemContext))) {
+ continue;
+ }
+ jobItemContexts.add(jobItemContext);
+ governanceFacade.getJobItemFacade().getErrorMessage().clean(jobId,
shardingItem);
+ log.info("Start tasks runner, jobId={}, shardingItem={}.", jobId,
shardingItem);
+ }
+ if (jobItemContexts.isEmpty()) {
+ log.warn("Job item contexts are empty, ignore.");
+ return;
+ }
+ initTasks(jobItemContexts, governanceFacade);
+ executeInventoryTasks(jobItemContexts, jobItemManager);
+ executeIncrementalTasks(jobItemContexts, jobItemManager);
}
private CDCTaskConfiguration buildTaskConfiguration(final
CDCJobConfiguration jobConfig, final int jobShardingItem, final
PipelineProcessConfiguration processConfig) {
@@ -101,8 +146,8 @@ public final class CDCJob extends
AbstractInseparablePipelineJob<CDCJobConfigura
return new IncrementalDumperContext(dumperCommonContext,
jobConfig.getJobId(), jobConfig.isDecodeWithTX());
}
- private ImporterConfiguration buildImporterConfiguration(final
CDCJobConfiguration jobConfig, final PipelineProcessConfiguration
pipelineProcessConfig, final Collection<String> schemaTableNames,
- final
TableAndSchemaNameMapper mapper) {
+ private ImporterConfiguration buildImporterConfiguration(final
CDCJobConfiguration jobConfig, final PipelineProcessConfiguration
pipelineProcessConfig,
+ final
Collection<String> schemaTableNames, final TableAndSchemaNameMapper mapper) {
PipelineDataSourceConfiguration dataSourceConfig =
PipelineDataSourceConfigurationFactory.newInstance(
jobConfig.getDataSourceConfig().getType(),
jobConfig.getDataSourceConfig().getParameter());
Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap = new
ShardingColumnsExtractor()
@@ -113,24 +158,64 @@ public final class CDCJob extends
AbstractInseparablePipelineJob<CDCJobConfigura
return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap,
mapper, write.getBatchSize(), writeRateLimitAlgorithm, 0, 1);
}
- @Override
- protected PipelineTasksRunner buildTasksRunner(final CDCJobItemContext
jobItemContext) {
- return new CDCTasksRunner(jobItemContext);
+ private void initTasks(final Collection<CDCJobItemContext>
jobItemContexts, final PipelineGovernanceFacade governanceFacade) {
+ try {
+ new CDCJobPreparer().initTasks(jobItemContexts);
+ // CHECKSTYLE:OFF
+ } catch (final RuntimeException ex) {
+ // CHECKSTYLE:ON
+ for (PipelineJobItemContext each : jobItemContexts) {
+ initTasksFailed(each.getJobId(), each.getShardingItem(), ex,
governanceFacade);
+ }
+ throw ex;
+ }
}
- @Override
- protected void doPrepare(final Collection<CDCJobItemContext>
jobItemContexts) {
- jobPreparer.initTasks(jobItemContexts);
+ private void initTasksFailed(final String jobId, final int shardingItem,
final Exception ex, final PipelineGovernanceFacade governanceFacade) {
+ log.error("Job {}-{} execution failed.", jobId, shardingItem, ex);
+ governanceFacade.getJobItemFacade().getErrorMessage().update(jobId,
shardingItem, ex);
+ PipelineJobRegistry.stop(jobId);
+ jobAPI.disable(jobId);
}
- @Override
- protected void processFailed(final String jobId) {
- jobAPI.disable(jobId);
+ private void executeInventoryTasks(final Collection<CDCJobItemContext>
jobItemContexts, final PipelineJobItemManager<TransmissionJobItemProgress>
jobItemManager) {
+ Collection<CompletableFuture<?>> futures = new LinkedList<>();
+ for (CDCJobItemContext each : jobItemContexts) {
+ updateJobItemStatus(each, JobStatus.EXECUTE_INVENTORY_TASK,
jobItemManager);
+ for (PipelineTask task : ((TransmissionJobItemContext)
each).getInventoryTasks()) {
+ if (task.getTaskProgress().getPosition() instanceof
IngestFinishedPosition) {
+ continue;
+ }
+ futures.addAll(task.start());
+ }
+ }
+ if (futures.isEmpty()) {
+ return;
+ }
+ ExecuteEngine.trigger(futures, new CDCExecuteCallback("inventory",
jobItemContexts.iterator().next()));
}
- @Override
- protected ExecuteCallback buildExecuteCallback(final String identifier,
final CDCJobItemContext jobItemContext) {
- return new CDCExecuteCallback(identifier, jobItemContext);
+ private void executeIncrementalTasks(final Collection<CDCJobItemContext>
jobItemContexts, final PipelineJobItemManager<TransmissionJobItemProgress>
jobItemManager) {
+ Collection<CompletableFuture<?>> futures = new LinkedList<>();
+ for (CDCJobItemContext each : jobItemContexts) {
+ if (JobStatus.EXECUTE_INCREMENTAL_TASK == each.getStatus()) {
+ log.info("Job status has already EXECUTE_INCREMENTAL_TASK,
ignore.");
+ return;
+ }
+ updateJobItemStatus(each, JobStatus.EXECUTE_INCREMENTAL_TASK,
jobItemManager);
+ for (PipelineTask task : ((TransmissionJobItemContext)
each).getIncrementalTasks()) {
+ if (task.getTaskProgress().getPosition() instanceof
IngestFinishedPosition) {
+ continue;
+ }
+ futures.addAll(task.start());
+ }
+ }
+ ExecuteEngine.trigger(futures, new CDCExecuteCallback("incremental",
jobItemContexts.iterator().next()));
+ }
+
+ private void updateJobItemStatus(final CDCJobItemContext jobItemContext,
final JobStatus jobStatus, final
PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager) {
+ jobItemContext.setStatus(jobStatus);
+ jobItemManager.updateStatus(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), jobStatus);
}
@RequiredArgsConstructor
@@ -143,15 +228,15 @@ public final class CDCJob extends
AbstractInseparablePipelineJob<CDCJobConfigura
@Override
public void onSuccess() {
if (jobItemContext.isStopping()) {
- log.info("onSuccess, stopping true, ignore");
+ log.info("Job is stopping, ignore.");
return;
}
- log.info("onSuccess, all {} tasks finished.", identifier);
+ log.info("All {} tasks finished successful.", identifier);
}
@Override
public void onFailure(final Throwable throwable) {
- log.error("onFailure, {} task execute failed.", identifier,
throwable);
+ log.error("Task {} execute failed.", identifier, throwable);
String jobId = jobItemContext.getJobId();
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().update(jobId,
jobItemContext.getShardingItem(), throwable);
if (jobItemContext.getSink() instanceof PipelineCDCSocketSink) {