This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ae3d101fc0 Add AbstractInseparablePipelineJob (#32739)
8ae3d101fc0 is described below

commit 8ae3d101fc0d18939268b1e792ddc185347678d0
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Aug 31 12:16:51 2024 +0800

    Add AbstractInseparablePipelineJob (#32739)
---
 .../core/job/AbstractInseparablePipelineJob.java   | 36 ++++++++++++----------
 .../shardingsphere/data/pipeline/cdc/CDCJob.java   | 11 +++----
 2 files changed, 24 insertions(+), 23 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
index 40737f682f4..6f770eafe38 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
@@ -42,29 +42,31 @@ import java.util.concurrent.CompletableFuture;
 
 /**
  * Abstract inseparable pipeline job.
- * 
- * @param <T> type of pipeline job item context
+ *
+ * @param <T> type of pipeline job configuration
+ * @param <I> type of pipeline job item context
  */
 @RequiredArgsConstructor
 @Getter
 @Slf4j
-public abstract class AbstractInseparablePipelineJob<T extends 
PipelineJobItemContext> implements PipelineJob {
+public abstract class AbstractInseparablePipelineJob<T extends 
PipelineJobConfiguration, I extends PipelineJobItemContext> implements 
PipelineJob {
     
     private final PipelineJobRunnerManager jobRunnerManager;
     
+    @SuppressWarnings("unchecked")
     @Override
     public final void execute(final ShardingContext shardingContext) {
         String jobId = shardingContext.getJobName();
         log.info("Execute job {}", jobId);
         PipelineJobType jobType = PipelineJobIdUtils.parseJobType(jobId);
-        PipelineJobConfiguration jobConfig = 
jobType.getYamlJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
-        Collection<T> jobItemContexts = new LinkedList<>();
+        T jobConfig = (T) 
jobType.getYamlJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
+        Collection<I> jobItemContexts = new LinkedList<>();
         for (int shardingItem = 0; shardingItem < 
jobConfig.getJobShardingCount(); shardingItem++) {
             if (jobRunnerManager.isStopping()) {
-                log.info("Stopping true, ignore");
+                log.info("Job is stopping, ignore.");
                 return;
             }
-            T jobItemContext = buildJobItemContext(jobConfig, shardingItem);
+            I jobItemContext = buildJobItemContext(jobConfig, shardingItem);
             if (!jobRunnerManager.addTasksRunner(shardingItem, 
buildTasksRunner(jobItemContext))) {
                 continue;
             }
@@ -81,11 +83,11 @@ public abstract class AbstractInseparablePipelineJob<T 
extends PipelineJobItemCo
         executeIncrementalTasks(jobType, jobItemContexts);
     }
     
-    protected abstract T buildJobItemContext(PipelineJobConfiguration 
jobConfig, int shardingItem);
+    protected abstract I buildJobItemContext(T jobConfig, int shardingItem);
     
-    protected abstract PipelineTasksRunner buildTasksRunner(T jobItemContext);
+    protected abstract PipelineTasksRunner buildTasksRunner(I jobItemContext);
     
-    private void prepare(final Collection<T> jobItemContexts) {
+    private void prepare(final Collection<I> jobItemContexts) {
         try {
             doPrepare(jobItemContexts);
             // CHECKSTYLE:OFF
@@ -98,7 +100,7 @@ public abstract class AbstractInseparablePipelineJob<T 
extends PipelineJobItemCo
         }
     }
     
-    protected abstract void doPrepare(Collection<T> jobItemContexts);
+    protected abstract void doPrepare(Collection<I> jobItemContexts);
     
     private void processFailed(final String jobId, final int shardingItem, 
final Exception ex) {
         log.error("Job execution failed, {}-{}", jobId, shardingItem, ex);
@@ -109,9 +111,9 @@ public abstract class AbstractInseparablePipelineJob<T 
extends PipelineJobItemCo
     
     protected abstract void processFailed(String jobId);
     
-    private void executeInventoryTasks(final PipelineJobType jobType, final 
Collection<T> jobItemContexts) {
+    private void executeInventoryTasks(final PipelineJobType jobType, final 
Collection<I> jobItemContexts) {
         Collection<CompletableFuture<?>> futures = new LinkedList<>();
-        for (T each : jobItemContexts) {
+        for (I each : jobItemContexts) {
             updateJobItemStatus(each, jobType, 
JobStatus.EXECUTE_INVENTORY_TASK);
             for (PipelineTask task : ((TransmissionJobItemContext) 
each).getInventoryTasks()) {
                 if (task.getTaskProgress().getPosition() instanceof 
IngestFinishedPosition) {
@@ -126,9 +128,9 @@ public abstract class AbstractInseparablePipelineJob<T 
extends PipelineJobItemCo
         ExecuteEngine.trigger(futures, buildExecuteCallback("inventory", 
jobItemContexts.iterator().next()));
     }
     
-    private void executeIncrementalTasks(final PipelineJobType jobType, final 
Collection<T> jobItemContexts) {
+    private void executeIncrementalTasks(final PipelineJobType jobType, final 
Collection<I> jobItemContexts) {
         Collection<CompletableFuture<?>> futures = new LinkedList<>();
-        for (T each : jobItemContexts) {
+        for (I each : jobItemContexts) {
             if (JobStatus.EXECUTE_INCREMENTAL_TASK == each.getStatus()) {
                 log.info("job status already EXECUTE_INCREMENTAL_TASK, 
ignore");
                 return;
@@ -144,11 +146,11 @@ public abstract class AbstractInseparablePipelineJob<T 
extends PipelineJobItemCo
         ExecuteEngine.trigger(futures, buildExecuteCallback("incremental", 
jobItemContexts.iterator().next()));
     }
     
-    private void updateJobItemStatus(final T jobItemContext, final 
PipelineJobType jobType, final JobStatus jobStatus) {
+    private void updateJobItemStatus(final I jobItemContext, final 
PipelineJobType jobType, final JobStatus jobStatus) {
         jobItemContext.setStatus(jobStatus);
         PipelineJobItemManager<TransmissionJobItemProgress> jobItemManager = 
new PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
         jobItemManager.updateStatus(jobItemContext.getJobId(), 
jobItemContext.getShardingItem(), jobStatus);
     }
     
-    protected abstract ExecuteCallback buildExecuteCallback(String identifier, 
T jobItemContext);
+    protected abstract ExecuteCallback buildExecuteCallback(String identifier, 
I jobItemContext);
 }
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
index 9964e219b6c..64c6c8952b8 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
@@ -45,7 +45,6 @@ import 
org.apache.shardingsphere.data.pipeline.core.job.AbstractInseparablePipel
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
 import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
-import 
org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.job.engine.PipelineJobRunnerManager;
 import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
@@ -53,11 +52,11 @@ import 
org.apache.shardingsphere.data.pipeline.core.job.progress.config.Pipeline
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfigurationUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineWriteConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
-import 
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
 import 
org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
 import 
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
 import 
org.apache.shardingsphere.data.pipeline.core.util.ShardingColumnsExtractor;
+import 
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 
 import java.util.Collection;
@@ -70,7 +69,7 @@ import java.util.stream.Collectors;
  * CDC job.
  */
 @Slf4j
-public final class CDCJob extends 
AbstractInseparablePipelineJob<CDCJobItemContext> {
+public final class CDCJob extends 
AbstractInseparablePipelineJob<CDCJobConfiguration, CDCJobItemContext> {
     
     private final CDCJobAPI jobAPI = (CDCJobAPI) 
TypedSPILoader.getService(TransmissionJobAPI.class, "STREAMING");
     
@@ -89,13 +88,13 @@ public final class CDCJob extends 
AbstractInseparablePipelineJob<CDCJobItemConte
     }
     
     @Override
-    protected CDCJobItemContext buildJobItemContext(final 
PipelineJobConfiguration jobConfig, final int shardingItem) {
+    protected CDCJobItemContext buildJobItemContext(final CDCJobConfiguration 
jobConfig, final int shardingItem) {
         Optional<TransmissionJobItemProgress> initProgress = 
jobItemManager.getProgress(jobConfig.getJobId(), shardingItem);
         PipelineProcessConfiguration processConfig = 
PipelineProcessConfigurationUtils.fillInDefaultValue(
                 
processConfigPersistService.load(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()),
 "STREAMING"));
         TransmissionProcessContext jobProcessContext = new 
TransmissionProcessContext(jobConfig.getJobId(), processConfig);
-        CDCTaskConfiguration taskConfig = 
buildTaskConfiguration((CDCJobConfiguration) jobConfig, shardingItem, 
jobProcessContext.getProcessConfiguration());
-        return new CDCJobItemContext((CDCJobConfiguration) jobConfig, 
shardingItem, initProgress.orElse(null), jobProcessContext, taskConfig, 
getJobRunnerManager().getDataSourceManager(), sink);
+        CDCTaskConfiguration taskConfig = buildTaskConfiguration(jobConfig, 
shardingItem, jobProcessContext.getProcessConfiguration());
+        return new CDCJobItemContext(jobConfig, shardingItem, 
initProgress.orElse(null), jobProcessContext, taskConfig, 
getJobRunnerManager().getDataSourceManager(), sink);
     }
     
     private CDCTaskConfiguration buildTaskConfiguration(final 
CDCJobConfiguration jobConfig, final int jobShardingItem, final 
PipelineProcessConfiguration processConfig) {

Reply via email to