This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 32e4b3ab0e2 Improve stop pipeline job, extract method to the upper level (#21750)
32e4b3ab0e2 is described below

commit 32e4b3ab0e2cc3e4fd090491450455a29b3b49d6
Author: Xinze Guo <[email protected]>
AuthorDate: Wed Oct 26 13:21:33 2022 +0800

    Improve stop pipeline job, extract method to the upper level (#21750)
    
    * Improve job sync to stop, extract method to the upper level
    
    * revise comment
    
    * Fix divide zero
    
    * Fix codestyle
    
    * Fix unexpected error
    
    * Improve
---
 .../core/api/impl/AbstractPipelineJobAPIImpl.java  |  2 +-
 .../pipeline/core/job/AbstractPipelineJob.java     | 27 ++++++++++++++++++++++
 .../data/pipeline/core/task/IncrementalTask.java   |  6 +++--
 .../core/util/PipelineDistributedBarrier.java      |  4 ++++
 ...tencyCheckChangedJobConfigurationProcessor.java |  5 ++++
 .../consistencycheck/ConsistencyCheckJob.java      | 17 ++++----------
 .../ConsistencyCheckJobAPIImpl.java                |  2 +-
 .../MigrationChangedJobConfigurationProcessor.java |  5 ++++
 .../pipeline/scenario/migration/MigrationJob.java  | 18 ++++-----------
 9 files changed, 55 insertions(+), 31 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
index ff09f119145..c2d64115917 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
@@ -140,7 +140,7 @@ public abstract class AbstractPipelineJobAPIImpl implements 
PipelineJobAPI {
         jobConfigPOJO.getProps().setProperty("stop_time_millis", 
System.currentTimeMillis() + "");
         
PipelineAPIFactory.getJobConfigurationAPI().updateJobConfiguration(jobConfigPOJO);
         String barrierPath = 
PipelineMetaDataNode.getJobBarrierDisablePath(jobId);
-        pipelineDistributedBarrier.register(barrierPath, 
jobConfigPOJO.getShardingTotalCount());
+        pipelineDistributedBarrier.register(barrierPath, 1);
         pipelineDistributedBarrier.await(barrierPath, 5, TimeUnit.SECONDS);
     }
     
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
index c29e4310a55..3b8d5c6f628 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
@@ -17,12 +17,17 @@
 
 package org.apache.shardingsphere.data.pipeline.core.job;
 
+import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.Setter;
 import org.apache.shardingsphere.data.pipeline.api.job.PipelineJob;
 import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
+import 
org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
+import 
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
+import 
org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
 import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.JobBootstrap;
 
+import java.util.Collection;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
@@ -42,8 +47,11 @@ public abstract class AbstractPipelineJob implements 
PipelineJob {
     @Setter
     private volatile JobBootstrap jobBootstrap;
     
+    @Getter(value = AccessLevel.PRIVATE)
     private final Map<Integer, PipelineTasksRunner> tasksRunnerMap = new 
ConcurrentHashMap<>();
     
+    private final PipelineDistributedBarrier distributedBarrier = 
PipelineDistributedBarrier.getInstance();
+    
     protected void runInBackground(final Runnable runnable) {
         new Thread(runnable).start();
     }
@@ -52,4 +60,23 @@ public abstract class AbstractPipelineJob implements 
PipelineJob {
     public Optional<PipelineTasksRunner> getTasksRunner(final int 
shardingItem) {
         return Optional.ofNullable(tasksRunnerMap.get(shardingItem));
     }
+    
+    protected void addTaskRunner(final int shardingItem, final 
PipelineTasksRunner tasksRunner) {
+        tasksRunnerMap.put(shardingItem, tasksRunner);
+        
PipelineJobProgressPersistService.addJobProgressPersistContext(getJobId(), 
shardingItem);
+        
distributedBarrier.persistEphemeralChildrenNode(PipelineMetaDataNode.getJobBarrierEnablePath(getJobId()),
 shardingItem);
+    }
+    
+    protected boolean containsTaskRunner(final int shardingItem) {
+        return tasksRunnerMap.containsKey(shardingItem);
+    }
+    
+    protected void clearTaskRunner() {
+        tasksRunnerMap.clear();
+        
PipelineJobProgressPersistService.removeJobProgressPersistContext(jobId);
+    }
+    
+    protected Collection<PipelineTasksRunner> getTaskRunners() {
+        return tasksRunnerMap.values();
+    }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
index 98de46f4a8a..c30f072def6 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
@@ -42,6 +42,7 @@ import 
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDump
 
 import java.util.Collection;
 import java.util.LinkedList;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -82,8 +83,9 @@ public final class IncrementalTask implements PipelineTask, 
AutoCloseable {
     private IncrementalTaskProgress createIncrementalTaskProgress(final 
IngestPosition<?> position, final InventoryIncrementalJobItemProgress 
jobItemProgress) {
         IncrementalTaskProgress incrementalTaskProgress = new 
IncrementalTaskProgress();
         incrementalTaskProgress.setPosition(position);
-        if (null != jobItemProgress) {
-            
incrementalTaskProgress.setIncrementalTaskDelay(jobItemProgress.getIncremental().getIncrementalTaskProgress().getIncrementalTaskDelay());
+        if (null != jobItemProgress && null != 
jobItemProgress.getIncremental()) {
+            
Optional.ofNullable(jobItemProgress.getIncremental().getIncrementalTaskProgress())
+                    .ifPresent(optional -> 
incrementalTaskProgress.setIncrementalTaskDelay(jobItemProgress.getIncremental().getIncrementalTaskProgress().getIncrementalTaskDelay()));
         }
         return incrementalTaskProgress;
     }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java
index 0e74180f818..c856a565e7c 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java
@@ -84,6 +84,10 @@ public final class PipelineDistributedBarrier {
      * @param shardingItem sharding item
      */
     public void persistEphemeralChildrenNode(final String parentPath, final 
int shardingItem) {
+        if (!getRepository().isExisted(parentPath)) {
+            log.info("parent path {} not exist, ignore", parentPath);
+            return;
+        }
         String key = String.join("/", parentPath, 
Integer.toString(shardingItem));
         getRepository().delete(key);
         getRepository().persistEphemeral(key, "");
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java
index 406fe64310c..e2e5564fa34 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckChangedJobConfigurationProcessor.java
@@ -22,7 +22,9 @@ import 
org.apache.shardingsphere.data.pipeline.api.job.JobType;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
+import 
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.node.event.handler.PipelineChangedJobConfigurationProcessor;
+import 
org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import 
org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
 import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
@@ -35,12 +37,15 @@ import java.util.concurrent.CompletableFuture;
 @Slf4j
 public final class ConsistencyCheckChangedJobConfigurationProcessor implements 
PipelineChangedJobConfigurationProcessor {
     
+    private final PipelineDistributedBarrier distributedBarrier = 
PipelineDistributedBarrier.getInstance();
+    
     @Override
     public void process(final DataChangedEvent.Type eventType, final 
JobConfigurationPOJO jobConfigPOJO) {
         String jobId = jobConfigPOJO.getJobName();
         if (jobConfigPOJO.isDisabled()) {
             log.info("{} is disabled", jobId);
             PipelineJobCenter.stop(jobId);
+            
distributedBarrier.persistEphemeralChildrenNode(PipelineMetaDataNode.getJobBarrierDisablePath(jobId),
 0);
             return;
         }
         switch (eventType) {
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
index 5cc1b0a5066..7012b4690f7 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
@@ -23,9 +23,6 @@ import 
org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.job.PipelineJob;
 import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
 import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
-import 
org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
-import 
org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
 import 
org.apache.shardingsphere.data.pipeline.yaml.job.YamlConsistencyCheckJobConfigurationSwapper;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
 import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
@@ -38,8 +35,6 @@ public final class ConsistencyCheckJob extends 
AbstractPipelineJob implements Si
     
     private final ConsistencyCheckJobAPI jobAPI = 
ConsistencyCheckJobAPIFactory.getInstance();
     
-    private final PipelineDistributedBarrier pipelineDistributedBarrier = 
PipelineDistributedBarrier.getInstance();
-    
     @Override
     public void execute(final ShardingContext shardingContext) {
         String checkJobId = shardingContext.getJobName();
@@ -52,7 +47,7 @@ public final class ConsistencyCheckJob extends 
AbstractPipelineJob implements Si
         setJobId(checkJobId);
         ConsistencyCheckJobConfiguration jobConfig = new 
YamlConsistencyCheckJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
         ConsistencyCheckJobItemContext jobItemContext = new 
ConsistencyCheckJobItemContext(jobConfig, shardingItem, JobStatus.RUNNING);
-        if (getTasksRunnerMap().containsKey(shardingItem)) {
+        if (containsTaskRunner(shardingItem)) {
             log.warn("tasksRunnerMap contains shardingItem {}, ignore", 
shardingItem);
             return;
         }
@@ -60,8 +55,7 @@ public final class ConsistencyCheckJob extends 
AbstractPipelineJob implements Si
         jobAPI.cleanJobItemErrorMessage(jobItemContext.getJobId(), 
jobItemContext.getShardingItem());
         ConsistencyCheckTasksRunner tasksRunner = new 
ConsistencyCheckTasksRunner(jobItemContext);
         tasksRunner.start();
-        
PipelineJobProgressPersistService.addJobProgressPersistContext(checkJobId, 
shardingContext.getShardingItem());
-        getTasksRunnerMap().put(shardingItem, tasksRunner);
+        addTaskRunner(shardingItem, tasksRunner);
     }
     
     @Override
@@ -74,12 +68,9 @@ public final class ConsistencyCheckJob extends 
AbstractPipelineJob implements Si
             log.info("stop consistency check job, jobId is null, ignore");
             return;
         }
-        for (PipelineTasksRunner each : getTasksRunnerMap().values()) {
+        for (PipelineTasksRunner each : getTaskRunners()) {
             each.stop();
         }
-        getTasksRunnerMap().clear();
-        String jobBarrierDisablePath = 
PipelineMetaDataNode.getJobBarrierDisablePath(getJobId());
-        
pipelineDistributedBarrier.persistEphemeralChildrenNode(jobBarrierDisablePath, 
0);
-        
PipelineJobProgressPersistService.removeJobProgressPersistContext(getJobId());
+        clearTaskRunner();
     }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
index 6c7dad4eca7..c2b4ba02466 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobAPIImpl.java
@@ -194,7 +194,7 @@ public final class ConsistencyCheckJobAPIImpl extends 
AbstractPipelineJobAPIImpl
             result.setRemainingSeconds(0L);
         } else {
             long checkedRecordsCount = 
Math.min(jobItemProgress.getCheckedRecordsCount(), recordsCount);
-            result.setFinishedPercentage((int) (checkedRecordsCount * 100 / 
recordsCount));
+            result.setFinishedPercentage(0 == recordsCount ? 0 : (int) 
(checkedRecordsCount * 100 / recordsCount));
             JobConfigurationPOJO jobConfigPOJO = 
getElasticJobConfigPOJO(checkJobId);
             Long stopTimeMillis = jobConfigPOJO.isDisabled() ? 
Long.parseLong(jobConfigPOJO.getProps().getProperty("stop_time_millis")) : null;
             long durationMillis = (null != stopTimeMillis ? stopTimeMillis : 
System.currentTimeMillis()) - jobItemProgress.getCheckBeginTimeMillis();
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java
index b0429f58cd2..532a9a86520 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationChangedJobConfigurationProcessor.java
@@ -18,6 +18,8 @@
 package org.apache.shardingsphere.data.pipeline.scenario.migration;
 
 import lombok.extern.slf4j.Slf4j;
+import 
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
+import 
org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
 import 
org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
 import org.apache.shardingsphere.data.pipeline.api.job.JobType;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
@@ -36,12 +38,15 @@ import java.util.concurrent.CompletableFuture;
 @Slf4j
 public final class MigrationChangedJobConfigurationProcessor implements 
PipelineChangedJobConfigurationProcessor {
     
+    private final PipelineDistributedBarrier distributedBarrier = 
PipelineDistributedBarrier.getInstance();
+    
     @Override
     public void process(final Type eventType, final JobConfigurationPOJO 
jobConfigPOJO) {
         String jobId = jobConfigPOJO.getJobName();
         if (jobConfigPOJO.isDisabled()) {
             log.info("{} is disabled", jobId);
             PipelineJobCenter.stop(jobId);
+            
distributedBarrier.persistEphemeralChildrenNode(PipelineMetaDataNode.getJobBarrierDisablePath(jobId),
 0);
             return;
         }
         switch (eventType) {
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index 0792434e09f..7c354ad2b87 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -27,10 +27,7 @@ import 
org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
-import 
org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
 import 
org.apache.shardingsphere.data.pipeline.core.task.InventoryIncrementalTasksRunner;
-import 
org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
 import 
org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
 import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
@@ -48,8 +45,6 @@ public final class MigrationJob extends AbstractPipelineJob 
implements SimpleJob
     
     private final PipelineDataSourceManager dataSourceManager = new 
DefaultPipelineDataSourceManager();
     
-    private final PipelineDistributedBarrier pipelineDistributedBarrier = 
PipelineDistributedBarrier.getInstance();
-    
     // Shared by all sharding items
     private final MigrationJobPreparer jobPreparer = new 
MigrationJobPreparer();
     
@@ -67,7 +62,7 @@ public final class MigrationJob extends AbstractPipelineJob 
implements SimpleJob
         MigrationProcessContext jobProcessContext = 
jobAPI.buildPipelineProcessContext(jobConfig);
         MigrationTaskConfiguration taskConfig = 
jobAPI.buildTaskConfiguration(jobConfig, shardingItem, 
jobProcessContext.getPipelineProcessConfig());
         MigrationJobItemContext jobItemContext = new 
MigrationJobItemContext(jobConfig, shardingItem, initProgress, 
jobProcessContext, taskConfig, dataSourceManager);
-        if (getTasksRunnerMap().containsKey(shardingItem)) {
+        if (containsTaskRunner(shardingItem)) {
             log.warn("tasksRunnerMap contains shardingItem {}, ignore", 
shardingItem);
             return;
         }
@@ -78,9 +73,7 @@ public final class MigrationJob extends AbstractPipelineJob 
implements SimpleJob
             prepare(jobItemContext);
             tasksRunner.start();
         });
-        getTasksRunnerMap().put(shardingItem, tasksRunner);
-        
PipelineJobProgressPersistService.addJobProgressPersistContext(getJobId(), 
shardingItem);
-        
pipelineDistributedBarrier.persistEphemeralChildrenNode(PipelineMetaDataNode.getJobBarrierEnablePath(getJobId()),
 shardingItem);
+        addTaskRunner(shardingItem, tasksRunner);
     }
     
     private void prepare(final MigrationJobItemContext jobItemContext) {
@@ -118,12 +111,9 @@ public final class MigrationJob extends 
AbstractPipelineJob implements SimpleJob
             return;
         }
         log.info("stop tasks runner, jobId={}", jobId);
-        String jobBarrierDisablePath = 
PipelineMetaDataNode.getJobBarrierDisablePath(jobId);
-        for (PipelineTasksRunner each : getTasksRunnerMap().values()) {
+        for (PipelineTasksRunner each : getTaskRunners()) {
             each.stop();
-            
pipelineDistributedBarrier.persistEphemeralChildrenNode(jobBarrierDisablePath, 
each.getJobItemContext().getShardingItem());
         }
-        getTasksRunnerMap().clear();
-        
PipelineJobProgressPersistService.removeJobProgressPersistContext(jobId);
+        clearTaskRunner();
     }
 }

Reply via email to