This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new ee50359dbd8 Refactor CDCJob (#29338)
ee50359dbd8 is described below
commit ee50359dbd8bee8b5a8d767ae4b55dc080eb5aba
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Dec 9 16:21:52 2023 +0800
Refactor CDCJob (#29338)
* Refactor AbstractInseparablePipelineJob
* Refactor CDCJob
---
.../core/job/AbstractInseparablePipelineJob.java | 32 ++++++------
.../shardingsphere/data/pipeline/cdc/CDCJob.java | 57 ++++++++++------------
2 files changed, 40 insertions(+), 49 deletions(-)
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
index 845d3605f0a..157fd259845 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
@@ -51,23 +51,23 @@ public abstract class AbstractInseparablePipelineJob extends AbstractPipelineJob
public final void execute(final ShardingContext shardingContext) {
String jobId = shardingContext.getJobName();
log.info("Execute job {}", jobId);
- PipelineJobConfiguration jobConfig = getJobConfiguration(shardingContext);
+ PipelineJobConfiguration jobConfig = getJobType().getYamlJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
Collection<PipelineJobItemContext> jobItemContexts = new LinkedList<>();
for (int shardingItem = 0; shardingItem < jobConfig.getJobShardingCount(); shardingItem++) {
if (isStopping()) {
- log.info("stopping true, ignore");
+ log.info("Stopping true, ignore");
return;
}
- PipelineJobItemContext jobItemContext = buildPipelineJobItemContext(jobConfig, shardingItem);
- if (!addTasksRunner(shardingItem, buildPipelineTasksRunner(jobItemContext))) {
+ PipelineJobItemContext jobItemContext = buildJobItemContext(jobConfig, shardingItem);
+ if (!addTasksRunner(shardingItem, buildTasksRunner(jobItemContext))) {
continue;
}
jobItemContexts.add(jobItemContext);
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().clean(jobId, shardingItem);
- log.info("start tasks runner, jobId={}, shardingItem={}", jobId, shardingItem);
+ log.info("Start tasks runner, jobId={}, shardingItem={}", jobId, shardingItem);
}
if (jobItemContexts.isEmpty()) {
- log.warn("job item contexts empty, ignore");
+ log.warn("Job item contexts is empty, ignore");
return;
}
prepare(jobItemContexts);
@@ -75,15 +75,13 @@ public abstract class AbstractInseparablePipelineJob extends AbstractPipelineJob
executeIncrementalTasks(jobItemContexts);
}
- protected abstract PipelineJobConfiguration getJobConfiguration(ShardingContext shardingContext);
+ protected abstract PipelineJobItemContext buildJobItemContext(PipelineJobConfiguration jobConfig, int shardingItem);
- protected abstract PipelineJobItemContext buildPipelineJobItemContext(PipelineJobConfiguration jobConfig, int shardingItem);
-
- protected abstract PipelineTasksRunner buildPipelineTasksRunner(PipelineJobItemContext pipelineJobItemContext);
+ protected abstract PipelineTasksRunner buildTasksRunner(PipelineJobItemContext jobItemContext);
private void prepare(final Collection<PipelineJobItemContext> jobItemContexts) {
try {
- doPrepare0(jobItemContexts);
+ doPrepare(jobItemContexts);
// CHECKSTYLE:OFF
} catch (final RuntimeException ex) {
// CHECKSTYLE:ON
@@ -94,10 +92,10 @@ public abstract class AbstractInseparablePipelineJob extends AbstractPipelineJob
}
}
- protected abstract void doPrepare0(Collection<PipelineJobItemContext> jobItemContexts);
+ protected abstract void doPrepare(Collection<PipelineJobItemContext> jobItemContexts);
private void processFailed(final String jobId, final int shardingItem, final Exception ex) {
- log.error("job execution failed, {}-{}", jobId, shardingItem, ex);
+ log.error("Job execution failed, {}-{}", jobId, shardingItem, ex);
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().update(jobId, shardingItem, ex);
PipelineJobRegistry.stop(jobId);
processFailed(jobId);
@@ -108,7 +106,7 @@ public abstract class AbstractInseparablePipelineJob extends AbstractPipelineJob
private void executeInventoryTasks(final Collection<PipelineJobItemContext> jobItemContexts) {
Collection<CompletableFuture<?>> futures = new LinkedList<>();
for (PipelineJobItemContext each : jobItemContexts) {
- updateLocalAndRemoteJobItemStatus(each, JobStatus.EXECUTE_INVENTORY_TASK);
+ updateJobItemStatus(each, JobStatus.EXECUTE_INVENTORY_TASK);
for (PipelineTask task : ((TransmissionJobItemContext) each).getInventoryTasks()) {
if (task.getTaskProgress().getPosition() instanceof FinishedPosition) {
continue;
@@ -124,20 +122,20 @@ public abstract class AbstractInseparablePipelineJob extends AbstractPipelineJob
protected abstract void executeInventoryTasks(Collection<CompletableFuture<?>> futures, Collection<PipelineJobItemContext> jobItemContexts);
- private void updateLocalAndRemoteJobItemStatus(final PipelineJobItemContext jobItemContext, final JobStatus jobStatus) {
+ private void updateJobItemStatus(final PipelineJobItemContext jobItemContext, final JobStatus jobStatus) {
jobItemContext.setStatus(jobStatus);
jobItemManager.updateStatus(jobItemContext.getJobId(), jobItemContext.getShardingItem(), jobStatus);
}
private void executeIncrementalTasks(final Collection<PipelineJobItemContext> jobItemContexts) {
- log.info("execute incremental tasks, jobId={}", getJobId());
+ log.info("Execute incremental tasks, jobId={}", getJobId());
Collection<CompletableFuture<?>> futures = new LinkedList<>();
for (PipelineJobItemContext each : jobItemContexts) {
if (JobStatus.EXECUTE_INCREMENTAL_TASK == each.getStatus()) {
log.info("job status already EXECUTE_INCREMENTAL_TASK,
ignore");
return;
}
- updateLocalAndRemoteJobItemStatus(each, JobStatus.EXECUTE_INCREMENTAL_TASK);
+ updateJobItemStatus(each, JobStatus.EXECUTE_INCREMENTAL_TASK);
for (PipelineTask task : ((TransmissionJobItemContext) each).getIncrementalTasks()) {
if (task.getTaskProgress().getPosition() instanceof FinishedPosition) {
continue;
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
index 3150895b804..81cf62b827e 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
@@ -25,7 +25,6 @@ import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSour
import org.apache.shardingsphere.data.pipeline.cdc.api.CDCJobAPI;
import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
-import org.apache.shardingsphere.data.pipeline.cdc.config.yaml.swapper.YamlCDCJobConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.cdc.context.CDCJobItemContext;
import org.apache.shardingsphere.data.pipeline.cdc.core.importer.sink.CDCSocketSink;
import org.apache.shardingsphere.data.pipeline.cdc.core.prepare.CDCJobPreparer;
@@ -55,13 +54,11 @@ import org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJob
import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
import org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfigurationUtils;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
-import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
import org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveIdentifier;
import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
import org.apache.shardingsphere.data.pipeline.core.spi.algorithm.JobRateLimitAlgorithm;
import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
import org.apache.shardingsphere.data.pipeline.core.util.ShardingColumnsExtractor;
-import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
@@ -82,55 +79,42 @@ public final class CDCJob extends AbstractInseparablePipelineJob implements Simp
@Getter
private final PipelineSink sink;
- private final PipelineJobType jobType = TypedSPILoader.getService(PipelineJobType.class, "STREAMING");
+ private final CDCJobAPI jobAPI;
- private final CDCJobAPI jobAPI = (CDCJobAPI) TypedSPILoader.getService(TransmissionJobAPI.class, "STREAMING");
+ private final PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager;
- private final PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = new PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
+ private final PipelineProcessConfigurationPersistService processConfigPersistService;
- private final PipelineProcessConfigurationPersistService processConfigPersistService = new PipelineProcessConfigurationPersistService();
+ private final PipelineDataSourceManager dataSourceManager;
- private final CDCJobPreparer jobPreparer = new CDCJobPreparer();
-
- private final PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager();
+ private final CDCJobPreparer jobPreparer;
public CDCJob(final String jobId, final PipelineSink sink) {
super(jobId);
this.sink = sink;
+ jobAPI = (CDCJobAPI) TypedSPILoader.getService(TransmissionJobAPI.class, "STREAMING");
+ jobItemManager = new PipelineJobItemManager<>(getJobType().getYamlJobItemProgressSwapper());
+ processConfigPersistService = new PipelineProcessConfigurationPersistService();
+ dataSourceManager = new DefaultPipelineDataSourceManager();
+ jobPreparer = new CDCJobPreparer();
}
@Override
- protected PipelineJobConfiguration getJobConfiguration(final ShardingContext shardingContext) {
- return new YamlCDCJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
- }
-
- @Override
- protected PipelineJobItemContext buildPipelineJobItemContext(final PipelineJobConfiguration jobConfig, final int shardingItem) {
+ protected PipelineJobItemContext buildJobItemContext(final PipelineJobConfiguration jobConfig, final int shardingItem) {
Optional<TransmissionJobItemProgress> initProgress = jobItemManager.getProgress(jobConfig.getJobId(), shardingItem);
PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.convertWithDefaultValue(
- processConfigPersistService.load(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()), jobType.getType()));
+ processConfigPersistService.load(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()), getJobType().getType()));
TransmissionProcessContext jobProcessContext = new TransmissionProcessContext(jobConfig.getJobId(), processConfig);
CDCTaskConfiguration taskConfig = buildTaskConfiguration((CDCJobConfiguration) jobConfig, shardingItem, jobProcessContext.getPipelineProcessConfig());
+ log.debug("buildTaskConfiguration, result={}", taskConfig);
return new CDCJobItemContext((CDCJobConfiguration) jobConfig, shardingItem, initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager, sink);
}
- @Override
- protected PipelineTasksRunner buildPipelineTasksRunner(final PipelineJobItemContext jobItemContext) {
- return new CDCTasksRunner((CDCJobItemContext) jobItemContext);
- }
-
- @Override
- protected void doPrepare0(final Collection<PipelineJobItemContext> jobItemContexts) {
- jobPreparer.initTasks(jobItemContexts.stream().map(each -> (CDCJobItemContext) each).collect(Collectors.toList()));
- }
-
private CDCTaskConfiguration buildTaskConfiguration(final CDCJobConfiguration jobConfig, final int jobShardingItem, final PipelineProcessConfiguration processConfig) {
TableAndSchemaNameMapper tableAndSchemaNameMapper = new TableAndSchemaNameMapper(jobConfig.getSchemaTableNames());
IncrementalDumperContext dumperContext = buildDumperContext(jobConfig, jobShardingItem, tableAndSchemaNameMapper);
ImporterConfiguration importerConfig = buildImporterConfiguration(jobConfig, processConfig, jobConfig.getSchemaTableNames(), tableAndSchemaNameMapper);
- CDCTaskConfiguration result = new CDCTaskConfiguration(dumperContext, importerConfig);
- log.debug("buildTaskConfiguration, result={}", result);
- return result;
+ return new CDCTaskConfiguration(dumperContext, importerConfig);
}
private IncrementalDumperContext buildDumperContext(final CDCJobConfiguration jobConfig, final int jobShardingItem, final TableAndSchemaNameMapper tableAndSchemaNameMapper) {
@@ -146,14 +130,23 @@ public final class CDCJob extends AbstractInseparablePipelineJob implements Simp
final TableAndSchemaNameMapper tableAndSchemaNameMapper) {
PipelineDataSourceConfiguration dataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getDataSourceConfig().getType(), jobConfig.getDataSourceConfig().getParameter());
- TransmissionProcessContext processContext = new TransmissionProcessContext(jobConfig.getJobId(), pipelineProcessConfig);
- JobRateLimitAlgorithm writeRateLimitAlgorithm = processContext.getWriteRateLimitAlgorithm();
+ JobRateLimitAlgorithm writeRateLimitAlgorithm = new TransmissionProcessContext(jobConfig.getJobId(), pipelineProcessConfig).getWriteRateLimitAlgorithm();
int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap = new ShardingColumnsExtractor()
.getShardingColumnsMap(jobConfig.getDataSourceConfig().getRootConfig().getRules(), schemaTableNames.stream().map(CaseInsensitiveIdentifier::new).collect(Collectors.toSet()));
return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap, tableAndSchemaNameMapper, batchSize, writeRateLimitAlgorithm, 0, 1);
}
+ @Override
+ protected PipelineTasksRunner buildTasksRunner(final PipelineJobItemContext jobItemContext) {
+ return new CDCTasksRunner((CDCJobItemContext) jobItemContext);
+ }
+
+ @Override
+ protected void doPrepare(final Collection<PipelineJobItemContext> jobItemContexts) {
+ jobPreparer.initTasks(jobItemContexts.stream().map(each -> (CDCJobItemContext) each).collect(Collectors.toList()));
+ }
+
@Override
protected void processFailed(final String jobId) {
jobAPI.disable(jobId);
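
For readers skimming the diff, the shape of the refactor is a plain template method: AbstractInseparablePipelineJob keeps execute() and owns configuration swapping, while CDCJob now only implements the renamed hooks buildJobItemContext, buildTasksRunner, and doPrepare. The following is a minimal illustrative sketch of that shape, using simplified stand-in types (InseparablePipelineJobSketch, generic C/T, Runnable), not the real ShardingSphere classes:

// Illustrative sketch only; stand-in types, not the real ShardingSphere classes.
import java.util.Collection;
import java.util.LinkedList;

abstract class InseparablePipelineJobSketch<C, T> {

    // The base class drives execution; subclasses supply only the hooks
    // that this commit renames to buildJobItemContext, buildTasksRunner and doPrepare.
    public final void execute(final C jobConfig, final int shardingCount) {
        Collection<T> jobItemContexts = new LinkedList<>();
        for (int shardingItem = 0; shardingItem < shardingCount; shardingItem++) {
            T jobItemContext = buildJobItemContext(jobConfig, shardingItem);
            jobItemContexts.add(jobItemContext);
            buildTasksRunner(jobItemContext);
        }
        doPrepare(jobItemContexts);
    }

    protected abstract T buildJobItemContext(C jobConfig, int shardingItem);

    protected abstract Runnable buildTasksRunner(T jobItemContext);

    protected abstract void doPrepare(Collection<T> jobItemContexts);
}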