This is an automated email from the ASF dual-hosted git repository.

sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 670d974860e Refactor TransmissionTasksRunner (#32766)
670d974860e is described below

commit 670d974860e56ef1897cd187f64f464825c22296
Author: Liang Zhang <[email protected]>
AuthorDate: Mon Sep 2 12:46:23 2024 +0800

    Refactor TransmissionTasksRunner (#32766)
---
 .../core/task/runner/TransmissionTasksRunner.java  | 84 +++++++++-------------
 1 file changed, 35 insertions(+), 49 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java
index d869bd2a2ef..0d84ed1f7df 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java
@@ -31,9 +31,9 @@ import 
org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProg
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
-import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
+import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
 
@@ -57,8 +57,6 @@ public class TransmissionTasksRunner implements 
PipelineTasksRunner {
     
     private final PipelineJobType jobType;
     
-    private final PipelineJobManager jobManager;
-    
     private final PipelineJobItemManager<TransmissionJobItemProgress> 
jobItemManager;
     
     public TransmissionTasksRunner(final TransmissionJobItemContext 
jobItemContext) {
@@ -66,40 +64,23 @@ public class TransmissionTasksRunner implements 
PipelineTasksRunner {
         inventoryTasks = jobItemContext.getInventoryTasks();
         incrementalTasks = jobItemContext.getIncrementalTasks();
         jobType = TypedSPILoader.getService(PipelineJobType.class, 
PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType());
-        jobManager = new PipelineJobManager(jobType);
         jobItemManager = new 
PipelineJobItemManager<>(jobType.getYamlJobItemProgressSwapper());
     }
     
-    @Override
-    public void stop() {
-        jobItemContext.setStopping(true);
-        for (PipelineTask each : inventoryTasks) {
-            each.stop();
-            QuietlyCloser.close(each);
-        }
-        for (PipelineTask each : incrementalTasks) {
-            each.stop();
-            QuietlyCloser.close(each);
-        }
-    }
-    
     @Override
     public void start() {
-        if (jobItemContext.isStopping()) {
-            throw new PipelineJobCancelingException();
-        }
-        new 
PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobType.class, 
PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType())
-                
.getYamlJobItemProgressSwapper()).persistProgress(jobItemContext);
+        ShardingSpherePreconditions.checkState(!jobItemContext.isStopping(), 
PipelineJobCancelingException::new);
+        jobItemManager.persistProgress(jobItemContext);
         if 
(PipelineJobProgressDetector.isAllInventoryTasksFinished(inventoryTasks)) {
             log.info("All inventory tasks finished.");
-            executeIncrementalTask();
+            executeIncrementalTasks();
         } else {
-            executeInventoryTask();
+            executeInventoryTasks();
         }
     }
     
-    private synchronized void executeInventoryTask() {
-        updateLocalAndRemoteJobItemStatus(JobStatus.EXECUTE_INVENTORY_TASK);
+    private synchronized void executeInventoryTasks() {
+        updateJobItemStatus(JobStatus.EXECUTE_INVENTORY_TASK);
         Collection<CompletableFuture<?>> futures = new LinkedList<>();
         for (PipelineTask each : inventoryTasks) {
             if (each.getTaskProgress().getPosition() instanceof 
IngestFinishedPosition) {
@@ -110,24 +91,17 @@ public class TransmissionTasksRunner implements 
PipelineTasksRunner {
         ExecuteEngine.trigger(futures, new InventoryTaskExecuteCallback());
     }
     
-    private void updateLocalAndRemoteJobItemStatus(final JobStatus jobStatus) {
-        jobItemContext.setStatus(jobStatus);
-        jobItemManager.updateStatus(jobItemContext.getJobId(), 
jobItemContext.getShardingItem(), jobStatus);
-    }
-    
-    private synchronized void executeIncrementalTask() {
-        if (jobItemContext.isStopping()) {
-            throw new PipelineJobCancelingException();
-        }
+    private synchronized void executeIncrementalTasks() {
+        ShardingSpherePreconditions.checkState(!jobItemContext.isStopping(), 
PipelineJobCancelingException::new);
         if (incrementalTasks.isEmpty()) {
-            log.info("incrementalTasks empty, ignore");
+            log.info("Incremental tasks are empty, ignore.");
             return;
         }
         if (JobStatus.EXECUTE_INCREMENTAL_TASK == jobItemContext.getStatus()) {
-            log.info("job status already EXECUTE_INCREMENTAL_TASK, ignore");
+            log.info("Incremental tasks had already run, ignore.");
             return;
         }
-        updateLocalAndRemoteJobItemStatus(JobStatus.EXECUTE_INCREMENTAL_TASK);
+        updateJobItemStatus(JobStatus.EXECUTE_INCREMENTAL_TASK);
         Collection<CompletableFuture<?>> futures = new LinkedList<>();
         for (PipelineTask each : incrementalTasks) {
             if (each.getTaskProgress().getPosition() instanceof 
IngestFinishedPosition) {
@@ -138,13 +112,21 @@ public class TransmissionTasksRunner implements 
PipelineTasksRunner {
         ExecuteEngine.trigger(futures, new IncrementalExecuteCallback());
     }
     
-    protected void inventorySuccessCallback() {
-        if 
(PipelineJobProgressDetector.isAllInventoryTasksFinished(inventoryTasks)) {
-            log.info("onSuccess, all inventory tasks finished.");
-            
PipelineJobProgressPersistService.persistNow(jobItemContext.getJobId(), 
jobItemContext.getShardingItem());
-            executeIncrementalTask();
-        } else {
-            log.info("onSuccess, inventory tasks not finished");
+    private void updateJobItemStatus(final JobStatus jobStatus) {
+        jobItemContext.setStatus(jobStatus);
+        jobItemManager.updateStatus(jobItemContext.getJobId(), 
jobItemContext.getShardingItem(), jobStatus);
+    }
+    
+    @Override
+    public void stop() {
+        jobItemContext.setStopping(true);
+        for (PipelineTask each : inventoryTasks) {
+            each.stop();
+            QuietlyCloser.close(each);
+        }
+        for (PipelineTask each : incrementalTasks) {
+            each.stop();
+            QuietlyCloser.close(each);
         }
     }
     
@@ -152,10 +134,14 @@ public class TransmissionTasksRunner implements 
PipelineTasksRunner {
         
         @Override
         public void onSuccess() {
-            if (jobItemContext.isStopping()) {
-                throw new PipelineJobCancelingException();
+            
ShardingSpherePreconditions.checkState(!jobItemContext.isStopping(), 
PipelineJobCancelingException::new);
+            if 
(PipelineJobProgressDetector.isAllInventoryTasksFinished(inventoryTasks)) {
+                log.info("onSuccess, all inventory tasks finished.");
+                
PipelineJobProgressPersistService.persistNow(jobItemContext.getJobId(), 
jobItemContext.getShardingItem());
+                executeIncrementalTasks();
+            } else {
+                log.info("onSuccess, inventory tasks did not finish.");
             }
-            inventorySuccessCallback();
         }
         
         @Override
@@ -163,7 +149,7 @@ public class TransmissionTasksRunner implements 
PipelineTasksRunner {
         }
     }
     
-    private final class IncrementalExecuteCallback implements ExecuteCallback {
+    private static final class IncrementalExecuteCallback implements 
ExecuteCallback {
         
         @Override
         public void onSuccess() {

Reply via email to