This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 8ada091156d Add generic type of AbstractInseparablePipelineJob (#29340)
8ada091156d is described below
commit 8ada091156d815cb6917223f7715a38b9411d471
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Dec 9 18:08:33 2023 +0800
Add generic type of AbstractInseparablePipelineJob (#29340)
---
.../core/job/AbstractInseparablePipelineJob.java | 30 +++++++-------
.../shardingsphere/data/pipeline/cdc/CDCJob.java | 47 ++++++++++------------
2 files changed, 38 insertions(+), 39 deletions(-)
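
In short: the patch adds a type parameter to AbstractInseparablePipelineJob so a subclass binds its concrete job item context type once, and the template methods (buildJobItemContext, buildTasksRunner, doPrepare, executeInventoryTasks, executeIncrementalTasks) exchange that concrete type instead of the raw PipelineJobItemContext, removing the downcasts in CDCJob. A minimal sketch of the pattern, using simplified stand-in names (TemplateJob, ItemContext, CdcItemContext, ExampleCdcJob), not the real pipeline API:

    import java.util.Collection;
    import java.util.LinkedList;

    interface ItemContext {
    }

    // Generic template: T is the concrete job item context type a subclass binds.
    abstract class TemplateJob<T extends ItemContext> {

        // Returns the subclass's own context type; callers need no cast.
        protected abstract T buildJobItemContext(int shardingItem);

        protected abstract void doPrepare(Collection<T> jobItemContexts);

        void execute(final int shardingCount) {
            Collection<T> jobItemContexts = new LinkedList<>();
            for (int shardingItem = 0; shardingItem < shardingCount; shardingItem++) {
                jobItemContexts.add(buildJobItemContext(shardingItem));
            }
            doPrepare(jobItemContexts);
        }
    }

    final class CdcItemContext implements ItemContext {
    }

    // Binding T to CdcItemContext removes the per-call (CdcItemContext) casts
    // that the pre-patch overrides needed.
    final class ExampleCdcJob extends TemplateJob<CdcItemContext> {

        @Override
        protected CdcItemContext buildJobItemContext(final int shardingItem) {
            return new CdcItemContext();
        }

        @Override
        protected void doPrepare(final Collection<CdcItemContext> jobItemContexts) {
            // Elements arrive already typed; no stream().map(cast) step remains.
        }
    }

With this shape, new ExampleCdcJob().execute(1) builds one typed context and hands it straight to doPrepare, which is exactly what the diff below does for CDCJob.
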
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
index 157fd259845..9dcde50e8e2 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
@@ -36,9 +36,11 @@ import java.util.concurrent.CompletableFuture;
/**
* Abstract inseparable pipeline job.
+ *
+ * @param <T> type of pipeline job item context
*/
@Slf4j
-public abstract class AbstractInseparablePipelineJob extends AbstractPipelineJob {
+public abstract class AbstractInseparablePipelineJob<T extends PipelineJobItemContext> extends AbstractPipelineJob {
private final PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager;
@@ -52,13 +54,13 @@ public abstract class AbstractInseparablePipelineJob extends AbstractPipelineJob
String jobId = shardingContext.getJobName();
log.info("Execute job {}", jobId);
PipelineJobConfiguration jobConfig = getJobType().getYamlJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
- Collection<PipelineJobItemContext> jobItemContexts = new LinkedList<>();
+ Collection<T> jobItemContexts = new LinkedList<>();
for (int shardingItem = 0; shardingItem < jobConfig.getJobShardingCount(); shardingItem++) {
if (isStopping()) {
log.info("Stopping true, ignore");
return;
}
- PipelineJobItemContext jobItemContext = buildJobItemContext(jobConfig, shardingItem);
+ T jobItemContext = buildJobItemContext(jobConfig, shardingItem);
if (!addTasksRunner(shardingItem, buildTasksRunner(jobItemContext))) {
continue;
}
@@ -75,11 +77,11 @@ public abstract class AbstractInseparablePipelineJob extends AbstractPipelineJob
executeIncrementalTasks(jobItemContexts);
}
- protected abstract PipelineJobItemContext buildJobItemContext(PipelineJobConfiguration jobConfig, int shardingItem);
+ protected abstract T buildJobItemContext(PipelineJobConfiguration jobConfig, int shardingItem);
- protected abstract PipelineTasksRunner buildTasksRunner(PipelineJobItemContext jobItemContext);
+ protected abstract PipelineTasksRunner buildTasksRunner(T jobItemContext);
- private void prepare(final Collection<PipelineJobItemContext> jobItemContexts) {
+ private void prepare(final Collection<T> jobItemContexts) {
try {
doPrepare(jobItemContexts);
// CHECKSTYLE:OFF
@@ -92,7 +94,7 @@ public abstract class AbstractInseparablePipelineJob extends AbstractPipelineJob
}
}
- protected abstract void doPrepare(Collection<PipelineJobItemContext> jobItemContexts);
+ protected abstract void doPrepare(Collection<T> jobItemContexts);
private void processFailed(final String jobId, final int shardingItem, final Exception ex) {
log.error("Job execution failed, {}-{}", jobId, shardingItem, ex);
@@ -103,9 +105,9 @@ public abstract class AbstractInseparablePipelineJob extends AbstractPipelineJob
protected abstract void processFailed(String jobId);
- private void executeInventoryTasks(final Collection<PipelineJobItemContext> jobItemContexts) {
+ private void executeInventoryTasks(final Collection<T> jobItemContexts) {
Collection<CompletableFuture<?>> futures = new LinkedList<>();
- for (PipelineJobItemContext each : jobItemContexts) {
+ for (T each : jobItemContexts) {
updateJobItemStatus(each, JobStatus.EXECUTE_INVENTORY_TASK);
for (PipelineTask task : ((TransmissionJobItemContext) each).getInventoryTasks()) {
if (task.getTaskProgress().getPosition() instanceof FinishedPosition) {
@@ -120,17 +122,17 @@ public abstract class AbstractInseparablePipelineJob extends AbstractPipelineJob
executeInventoryTasks(futures, jobItemContexts);
}
- protected abstract void executeInventoryTasks(Collection<CompletableFuture<?>> futures, Collection<PipelineJobItemContext> jobItemContexts);
+ protected abstract void executeInventoryTasks(Collection<CompletableFuture<?>> futures, Collection<T> jobItemContexts);
- private void updateJobItemStatus(final PipelineJobItemContext jobItemContext, final JobStatus jobStatus) {
+ private void updateJobItemStatus(final T jobItemContext, final JobStatus jobStatus) {
jobItemContext.setStatus(jobStatus);
jobItemManager.updateStatus(jobItemContext.getJobId(), jobItemContext.getShardingItem(), jobStatus);
}
- private void executeIncrementalTasks(final Collection<PipelineJobItemContext> jobItemContexts) {
+ private void executeIncrementalTasks(final Collection<T> jobItemContexts) {
log.info("Execute incremental tasks, jobId={}", getJobId());
Collection<CompletableFuture<?>> futures = new LinkedList<>();
- for (PipelineJobItemContext each : jobItemContexts) {
+ for (T each : jobItemContexts) {
if (JobStatus.EXECUTE_INCREMENTAL_TASK == each.getStatus()) {
log.info("job status already EXECUTE_INCREMENTAL_TASK,
ignore");
return;
@@ -146,5 +148,5 @@ public abstract class AbstractInseparablePipelineJob extends AbstractPipelineJob
executeIncrementalTasks(futures, jobItemContexts);
}
- protected abstract void executeIncrementalTasks(Collection<CompletableFuture<?>> futures, Collection<PipelineJobItemContext> jobItemContexts);
+ protected abstract void executeIncrementalTasks(Collection<CompletableFuture<?>> futures, Collection<T> jobItemContexts);
}
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
index 81cf62b827e..009ea5df9f6 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
@@ -30,7 +30,6 @@ import org.apache.shardingsphere.data.pipeline.cdc.core.importer.sink.CDCSocketS
import org.apache.shardingsphere.data.pipeline.cdc.core.prepare.CDCJobPreparer;
import org.apache.shardingsphere.data.pipeline.cdc.core.task.CDCTasksRunner;
import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseUtils;
-import org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLine;
import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLineConvertUtils;
@@ -74,7 +73,7 @@ import java.util.stream.Collectors;
* CDC job.
*/
@Slf4j
-public final class CDCJob extends AbstractInseparablePipelineJob implements SimpleJob {
+public final class CDCJob extends AbstractInseparablePipelineJob<CDCJobItemContext> implements SimpleJob {
@Getter
private final PipelineSink sink;
@@ -100,51 +99,49 @@ public final class CDCJob extends AbstractInseparablePipelineJob implements Simp
}
@Override
- protected PipelineJobItemContext buildJobItemContext(final PipelineJobConfiguration jobConfig, final int shardingItem) {
+ protected CDCJobItemContext buildJobItemContext(final PipelineJobConfiguration jobConfig, final int shardingItem) {
Optional<TransmissionJobItemProgress> initProgress = jobItemManager.getProgress(jobConfig.getJobId(), shardingItem);
PipelineProcessConfiguration processConfig = PipelineProcessConfigurationUtils.convertWithDefaultValue(
processConfigPersistService.load(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()), getJobType().getType()));
TransmissionProcessContext jobProcessContext = new TransmissionProcessContext(jobConfig.getJobId(), processConfig);
CDCTaskConfiguration taskConfig = buildTaskConfiguration((CDCJobConfiguration) jobConfig, shardingItem, jobProcessContext.getPipelineProcessConfig());
- log.debug("buildTaskConfiguration, result={}", taskConfig);
return new CDCJobItemContext((CDCJobConfiguration) jobConfig, shardingItem, initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager, sink);
}
private CDCTaskConfiguration buildTaskConfiguration(final CDCJobConfiguration jobConfig, final int jobShardingItem, final PipelineProcessConfiguration processConfig) {
- TableAndSchemaNameMapper tableAndSchemaNameMapper = new TableAndSchemaNameMapper(jobConfig.getSchemaTableNames());
- IncrementalDumperContext dumperContext = buildDumperContext(jobConfig, jobShardingItem, tableAndSchemaNameMapper);
- ImporterConfiguration importerConfig = buildImporterConfiguration(jobConfig, processConfig, jobConfig.getSchemaTableNames(), tableAndSchemaNameMapper);
+ TableAndSchemaNameMapper mapper = new TableAndSchemaNameMapper(jobConfig.getSchemaTableNames());
+ IncrementalDumperContext dumperContext = buildDumperContext(jobConfig, jobShardingItem, mapper);
+ ImporterConfiguration importerConfig = buildImporterConfiguration(jobConfig, processConfig, jobConfig.getSchemaTableNames(), mapper);
return new CDCTaskConfiguration(dumperContext, importerConfig);
}
- private IncrementalDumperContext buildDumperContext(final CDCJobConfiguration jobConfig, final int jobShardingItem, final TableAndSchemaNameMapper tableAndSchemaNameMapper) {
+ private IncrementalDumperContext buildDumperContext(final CDCJobConfiguration jobConfig, final int jobShardingItem, final TableAndSchemaNameMapper mapper) {
JobDataNodeLine dataNodeLine = jobConfig.getJobShardingDataNodes().get(jobShardingItem);
String dataSourceName = dataNodeLine.getEntries().iterator().next().getDataNodes().iterator().next().getDataSourceName();
StandardPipelineDataSourceConfiguration actualDataSourceConfig = jobConfig.getDataSourceConfig().getActualDataSourceConfiguration(dataSourceName);
- return new IncrementalDumperContext(
- new DumperCommonContext(dataSourceName, actualDataSourceConfig, JobDataNodeLineConvertUtils.buildTableNameMapper(dataNodeLine), tableAndSchemaNameMapper),
- jobConfig.getJobId(), jobConfig.isDecodeWithTX());
+ DumperCommonContext dumperCommonContext = new DumperCommonContext(dataSourceName, actualDataSourceConfig, JobDataNodeLineConvertUtils.buildTableNameMapper(dataNodeLine), mapper);
+ return new IncrementalDumperContext(dumperCommonContext, jobConfig.getJobId(), jobConfig.isDecodeWithTX());
}
private ImporterConfiguration buildImporterConfiguration(final CDCJobConfiguration jobConfig, final PipelineProcessConfiguration pipelineProcessConfig, final Collection<String> schemaTableNames,
- final TableAndSchemaNameMapper tableAndSchemaNameMapper) {
- PipelineDataSourceConfiguration dataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getDataSourceConfig().getType(),
- jobConfig.getDataSourceConfig().getParameter());
- JobRateLimitAlgorithm writeRateLimitAlgorithm = new TransmissionProcessContext(jobConfig.getJobId(), pipelineProcessConfig).getWriteRateLimitAlgorithm();
- int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
+ final TableAndSchemaNameMapper mapper) {
+ PipelineDataSourceConfiguration dataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(
+ jobConfig.getDataSourceConfig().getType(), jobConfig.getDataSourceConfig().getParameter());
Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap = new ShardingColumnsExtractor()
.getShardingColumnsMap(jobConfig.getDataSourceConfig().getRootConfig().getRules(), schemaTableNames.stream().map(CaseInsensitiveIdentifier::new).collect(Collectors.toSet()));
- return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap, tableAndSchemaNameMapper, batchSize, writeRateLimitAlgorithm, 0, 1);
+ int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
+ JobRateLimitAlgorithm writeRateLimitAlgorithm = new TransmissionProcessContext(jobConfig.getJobId(), pipelineProcessConfig).getWriteRateLimitAlgorithm();
+ return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap, mapper, batchSize, writeRateLimitAlgorithm, 0, 1);
}
@Override
- protected PipelineTasksRunner buildTasksRunner(final PipelineJobItemContext jobItemContext) {
- return new CDCTasksRunner((CDCJobItemContext) jobItemContext);
+ protected PipelineTasksRunner buildTasksRunner(final CDCJobItemContext jobItemContext) {
+ return new CDCTasksRunner(jobItemContext);
}
@Override
- protected void doPrepare(final Collection<PipelineJobItemContext> jobItemContexts) {
- jobPreparer.initTasks(jobItemContexts.stream().map(each -> (CDCJobItemContext) each).collect(Collectors.toList()));
+ protected void doPrepare(final Collection<CDCJobItemContext> jobItemContexts) {
+ jobPreparer.initTasks(jobItemContexts);
}
@Override
@@ -153,13 +150,13 @@ public final class CDCJob extends AbstractInseparablePipelineJob implements Simp
}
@Override
- protected void executeInventoryTasks(final Collection<CompletableFuture<?>> futures, final Collection<PipelineJobItemContext> jobItemContexts) {
- ExecuteEngine.trigger(futures, new CDCExecuteCallback("inventory", (CDCJobItemContext) jobItemContexts.iterator().next()));
+ protected void executeInventoryTasks(final Collection<CompletableFuture<?>> futures, final Collection<CDCJobItemContext> jobItemContexts) {
+ ExecuteEngine.trigger(futures, new CDCExecuteCallback("inventory", jobItemContexts.iterator().next()));
}
@Override
- protected void executeIncrementalTasks(final Collection<CompletableFuture<?>> futures, final Collection<PipelineJobItemContext> jobItemContexts) {
- ExecuteEngine.trigger(futures, new CDCExecuteCallback("incremental", (CDCJobItemContext) jobItemContexts.iterator().next()));
+ protected void executeIncrementalTasks(final Collection<CompletableFuture<?>> futures, final Collection<CDCJobItemContext> jobItemContexts) {
+ ExecuteEngine.trigger(futures, new CDCExecuteCallback("incremental", jobItemContexts.iterator().next()));
}
@Override