This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 8ae3d101fc0 Add AbstractInseparablePipelineJob (#32739)
8ae3d101fc0 is described below
commit 8ae3d101fc0d18939268b1e792ddc185347678d0
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Aug 31 12:16:51 2024 +0800
Add AbstractInseparablePipelineJob (#32739)
---
.../core/job/AbstractInseparablePipelineJob.java | 36 ++++++++++++----------
.../shardingsphere/data/pipeline/cdc/CDCJob.java | 11 +++----
2 files changed, 24 insertions(+), 23 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
index 40737f682f4..6f770eafe38 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
@@ -42,29 +42,31 @@ import java.util.concurrent.CompletableFuture;
/**
* Abstract inseparable pipeline job.
- *
- * @param <T> type of pipeline job item context
+ *
+ * @param <T> type of pipeline job configuration
+ * @param <I> type of pipeline job item context
*/
@RequiredArgsConstructor
@Getter
@Slf4j
-public abstract class AbstractInseparablePipelineJob<T extends
PipelineJobItemContext> implements PipelineJob {
+public abstract class AbstractInseparablePipelineJob<T extends
PipelineJobConfiguration, I extends PipelineJobItemContext> implements
PipelineJob {
private final PipelineJobRunnerManager jobRunnerManager;
+ @SuppressWarnings("unchecked")
@Override
public final void execute(final ShardingContext shardingContext) {
String jobId = shardingContext.getJobName();
log.info("Execute job {}", jobId);
PipelineJobType jobType = PipelineJobIdUtils.parseJobType(jobId);
- PipelineJobConfiguration jobConfig =
jobType.getYamlJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
- Collection<T> jobItemContexts = new LinkedList<>();
+ T jobConfig = (T)
jobType.getYamlJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
+ Collection<I> jobItemContexts = new LinkedList<>();
for (int shardingItem = 0; shardingItem <
jobConfig.getJobShardingCount(); shardingItem++) {
if (jobRunnerManager.isStopping()) {
- log.info("Stopping true, ignore");
+ log.info("Job is stopping, ignore.");
return;
}
- T jobItemContext = buildJobItemContext(jobConfig, shardingItem);
+ I jobItemContext = buildJobItemContext(jobConfig, shardingItem);
if (!jobRunnerManager.addTasksRunner(shardingItem,
buildTasksRunner(jobItemContext))) {
continue;
}
@@ -81,11 +83,11 @@ public abstract class AbstractInseparablePipelineJob<T
extends PipelineJobItemCo
executeIncrementalTasks(jobType, jobItemContexts);
}
- protected abstract T buildJobItemContext(PipelineJobConfiguration
jobConfig, int shardingItem);
+ protected abstract I buildJobItemContext(T jobConfig, int shardingItem);
- protected abstract PipelineTasksRunner buildTasksRunner(T jobItemContext);
+ protected abstract PipelineTasksRunner buildTasksRunner(I jobItemContext);
- private void prepare(final Collection<T> jobItemContexts) {
+ private void prepare(final Collection<I> jobItemContexts) {
try {
doPrepare(jobItemContexts);
// CHECKSTYLE:OFF
@@ -98,7 +100,7 @@ public abstract class AbstractInseparablePipelineJob<T
extends PipelineJobItemCo
}
}
- protected abstract void doPrepare(Collection<T> jobItemContexts);
+ protected abstract void doPrepare(Collection<I> jobItemContexts);
private void processFailed(final String jobId, final int shardingItem,
final Exception ex) {
log.error("Job execution failed, {}-{}", jobId, shardingItem, ex);
@@ -109,9 +111,9 @@ public abstract class AbstractInseparablePipelineJob<T
extends PipelineJobItemCo
protected abstract void processFailed(String jobId);
- private void executeInventoryTasks(final PipelineJobType jobType, final
Collection<T> jobItemContexts) {
+ private void executeInventoryTasks(final PipelineJobType jobType, final
Collection<I> jobItemContexts) {
Collection<CompletableFuture<?>> futures = new LinkedList<>();
- for (T each : jobItemContexts) {
+ for (I each : jobItemContexts) {
updateJobItemStatus(each, jobType,
JobStatus.EXECUTE_INVENTORY_TASK);
for (PipelineTask task : ((TransmissionJobItemContext)
each).getInventoryTasks()) {
if (task.getTaskProgress().getPosition() instanceof
IngestFinishedPosition) {
@@ -126,9 +128,9 @@ public abstract class AbstractInseparablePipelineJob<T
extends PipelineJobItemCo
ExecuteEngine.trigger(futures, buildExecuteCallback("inventory",
jobItemContexts.iterator().next()));
}
- private void executeIncrementalTasks(final PipelineJobType jobType, final
Collection<T> jobItemContexts) {
+ private void executeIncrementalTasks(final PipelineJobType jobType, final
Collection<I> jobItemContexts) {
Collection<CompletableFuture<?>> futures = new LinkedList<>();
- for (T each : jobItemContexts) {
+ for (I each : jobItemContexts) {
if (JobStatus.EXECUTE_INCREMENTAL_TASK == each.getStatus()) {
log.info("job status already EXECUTE_INCREMENTAL_TASK,
ignore");
return;
@@ -144,11 +146,11 @@ public abstract class AbstractInseparablePipelineJob<T
extends PipelineJobItemCo
ExecuteEngine.trigger(futures, buildExecuteCallback("incremental",
jobItemContexts.iterator().next()));
}
- private void updateJobItemStatus(final T jobItemContext, final
PipelineJobType jobType, final JobStatus jobStatus) {
+ private void updateJobItemStatus(final I jobItemContext, final
PipelineJobType jobType, final JobStatus jobStatus) {
jobItemContext.setStatus(jobStatus);
PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager =
new PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
jobItemManager.updateStatus(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), jobStatus);
}
- protected abstract ExecuteCallback buildExecuteCallback(String identifier,
T jobItemContext);
+ protected abstract ExecuteCallback buildExecuteCallback(String identifier,
I jobItemContext);
}
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
index 9964e219b6c..64c6c8952b8 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
@@ -45,7 +45,6 @@ import
org.apache.shardingsphere.data.pipeline.core.job.AbstractInseparablePipel
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
-import
org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.core.job.engine.PipelineJobRunnerManager;
import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
@@ -53,11 +52,11 @@ import
org.apache.shardingsphere.data.pipeline.core.job.progress.config.Pipeline
import
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfigurationUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineWriteConfiguration;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
-import
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
import
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
import
org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
import
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
import
org.apache.shardingsphere.data.pipeline.core.util.ShardingColumnsExtractor;
+import
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import java.util.Collection;
@@ -70,7 +69,7 @@ import java.util.stream.Collectors;
* CDC job.
*/
@Slf4j
-public final class CDCJob extends
AbstractInseparablePipelineJob<CDCJobItemContext> {
+public final class CDCJob extends
AbstractInseparablePipelineJob<CDCJobConfiguration, CDCJobItemContext> {
private final CDCJobAPI jobAPI = (CDCJobAPI)
TypedSPILoader.getService(TransmissionJobAPI.class, "STREAMING");
@@ -89,13 +88,13 @@ public final class CDCJob extends
AbstractInseparablePipelineJob<CDCJobItemConte
}
@Override
- protected CDCJobItemContext buildJobItemContext(final
PipelineJobConfiguration jobConfig, final int shardingItem) {
+ protected CDCJobItemContext buildJobItemContext(final CDCJobConfiguration
jobConfig, final int shardingItem) {
Optional<TransmissionJobItemProgress> initProgress =
jobItemManager.getProgress(jobConfig.getJobId(), shardingItem);
PipelineProcessConfiguration processConfig =
PipelineProcessConfigurationUtils.fillInDefaultValue(
processConfigPersistService.load(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()),
"STREAMING"));
TransmissionProcessContext jobProcessContext = new
TransmissionProcessContext(jobConfig.getJobId(), processConfig);
- CDCTaskConfiguration taskConfig =
buildTaskConfiguration((CDCJobConfiguration) jobConfig, shardingItem,
jobProcessContext.getProcessConfiguration());
- return new CDCJobItemContext((CDCJobConfiguration) jobConfig,
shardingItem, initProgress.orElse(null), jobProcessContext, taskConfig,
getJobRunnerManager().getDataSourceManager(), sink);
+ CDCTaskConfiguration taskConfig = buildTaskConfiguration(jobConfig,
shardingItem, jobProcessContext.getProcessConfiguration());
+ return new CDCJobItemContext(jobConfig, shardingItem,
initProgress.orElse(null), jobProcessContext, taskConfig,
getJobRunnerManager().getDataSourceManager(), sink);
}
private CDCTaskConfiguration buildTaskConfiguration(final
CDCJobConfiguration jobConfig, final int jobShardingItem, final
PipelineProcessConfiguration processConfig) {