This is an automated email from the ASF dual-hosted git repository.

sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new a17d519718b Refactor AbstractPipelineJob (#29337)
a17d519718b is described below

commit a17d519718ba5f95f856ebe522fa4ae23fe071c7
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Dec 9 15:43:05 2023 +0800

    Refactor AbstractPipelineJob (#29337)
---
 .../pipeline/core/job/AbstractPipelineJob.java     | 54 +++++++++-------------
 .../shardingsphere/data/pipeline/cdc/CDCJob.java   |  2 +-
 .../consistencycheck/ConsistencyCheckJob.java      |  2 +-
 .../pipeline/scenario/migration/MigrationJob.java  |  2 +-
 4 files changed, 26 insertions(+), 34 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
index 402d9546f7a..75b09d70ad4 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
@@ -38,7 +38,6 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -48,6 +47,8 @@ import java.util.concurrent.atomic.AtomicReference;
 @Slf4j
 public abstract class AbstractPipelineJob implements PipelineJob {
     
+    private static final long JOB_WAITING_TIMEOUT_MILLS = 2000L;
+    
     @Getter
     private final String jobId;
     
@@ -107,49 +108,40 @@ public abstract class AbstractPipelineJob implements 
PipelineJob {
     @Override
     public final void stop() {
         try {
-            innerStop();
+            stopping.set(true);
+            log.info("Stop tasks runner, jobId={}", jobId);
+            tasksRunners.values().forEach(PipelineTasksRunner::stop);
+            awaitJobStopped(jobId);
+            if (null != jobBootstrap.get()) {
+                jobBootstrap.get().shutdown();
+            }
         } finally {
-            innerClean();
-            doClean();
+            PipelineJobProgressPersistService.remove(jobId);
+            tasksRunners.values().stream().map(each -> 
each.getJobItemContext().getJobProcessContext()).forEach(QuietlyCloser::close);
+            clean();
         }
     }
     
-    private void innerStop() {
-        stopping.set(true);
-        log.info("stop tasks runner, jobId={}", jobId);
-        for (PipelineTasksRunner each : tasksRunners.values()) {
-            each.stop();
+    private void awaitJobStopped(final String jobId) {
+        Optional<ElasticJobListener> jobListener = 
ElasticJobServiceLoader.getCachedTypedServiceInstance(ElasticJobListener.class, 
PipelineElasticJobListener.class.getName());
+        if (!jobListener.isPresent()) {
+            return;
         }
-        Optional<ElasticJobListener> pipelineJobListener = 
ElasticJobServiceLoader.getCachedTypedServiceInstance(ElasticJobListener.class, 
PipelineElasticJobListener.class.getName());
-        pipelineJobListener.ifPresent(optional -> 
awaitJobStopped((PipelineElasticJobListener) optional, jobId, 
TimeUnit.SECONDS.toMillis(2)));
-        if (null != jobBootstrap.get()) {
-            jobBootstrap.get().shutdown();
-        }
-    }
-    
-    private void awaitJobStopped(final PipelineElasticJobListener jobListener, 
final String jobId, final long timeoutMillis) {
-        int time = 0;
-        int sleepTime = 50;
-        while (time < timeoutMillis) {
-            if (!jobListener.isJobRunning(jobId)) {
+        long spentMills = 0L;
+        long sleepMillis = 50L;
+        while (spentMills < JOB_WAITING_TIMEOUT_MILLS) {
+            if (!((PipelineElasticJobListener) 
jobListener.get()).isJobRunning(jobId)) {
                 break;
             }
             try {
-                Thread.sleep(sleepTime);
+                Thread.sleep(sleepMillis);
             } catch (final InterruptedException ignored) {
                 Thread.currentThread().interrupt();
                 break;
             }
-            time += sleepTime;
-        }
-    }
-    
-    private void innerClean() {
-        PipelineJobProgressPersistService.remove(jobId);
-        for (PipelineTasksRunner each : tasksRunners.values()) {
-            
QuietlyCloser.close(each.getJobItemContext().getJobProcessContext());
+            spentMills += sleepMillis;
         }
     }
     
-    protected abstract void doClean();
+    protected abstract void clean();
 }
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
index b957fb2c7b5..3150895b804 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
@@ -170,7 +170,7 @@ public final class CDCJob extends 
AbstractInseparablePipelineJob implements Simp
     }
     
     @Override
-    protected void doClean() {
+    protected void clean() {
         dataSourceManager.close();
         QuietlyCloser.close(sink);
     }
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
index d669eda81e1..5d13dd54240 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
@@ -60,6 +60,6 @@ public final class ConsistencyCheckJob extends 
AbstractSeparablePipelineJob {
     }
     
     @Override
-    protected void doClean() {
+    protected void clean() {
     }
 }
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index b990ef1a620..5a204212d5b 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -142,7 +142,7 @@ public final class MigrationJob extends 
AbstractSeparablePipelineJob {
     }
     
     @Override
-    public void doClean() {
+    public void clean() {
         dataSourceManager.close();
     }
 }

Reply via email to