This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new e51d73d0b2c Improve cleaning on pipeline job stopping (#26839)
e51d73d0b2c is described below

commit e51d73d0b2cf5c6fc704f7c782c1d43e1589d891
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Sat Jul 8 18:10:51 2023 +0800

    Improve cleaning on pipeline job stopping (#26839)
---
 .../data/pipeline/core/job/AbstractPipelineJob.java      |  9 ++++-----
 .../pipeline/core/job/AbstractSimplePipelineJob.java     | 15 ---------------
 .../data/pipeline/cdc/core/job/CDCJob.java               | 16 +---------------
 .../data/pipeline/cases/PipelineContainerComposer.java   |  2 +-
 4 files changed, 6 insertions(+), 36 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
index a74f5aa1bf8..b071052777d 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
@@ -24,6 +24,7 @@ import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemCon
 import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
 import 
org.apache.shardingsphere.data.pipeline.common.listener.PipelineElasticJobListener;
 import 
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
+import org.apache.shardingsphere.data.pipeline.common.util.CloseUtils;
 import 
org.apache.shardingsphere.data.pipeline.common.util.PipelineDistributedBarrier;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
@@ -37,7 +38,6 @@ import 
org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
@@ -121,10 +121,6 @@ public abstract class AbstractPipelineJob implements 
PipelineJob {
         return new ArrayList<>(tasksRunnerMap.keySet());
     }
     
-    protected Collection<PipelineTasksRunner> getTasksRunners() {
-        return Collections.unmodifiableCollection(tasksRunnerMap.values());
-    }
-    
     protected boolean addTasksRunner(final int shardingItem, final 
PipelineTasksRunner tasksRunner) {
         if (null != tasksRunnerMap.putIfAbsent(shardingItem, tasksRunner)) {
             log.warn("shardingItem {} tasks runner exists, ignore", 
shardingItem);
@@ -178,6 +174,9 @@ public abstract class AbstractPipelineJob implements 
PipelineJob {
     
     private void innerClean() {
         PipelineJobProgressPersistService.remove(jobId);
+        for (PipelineTasksRunner each : tasksRunnerMap.values()) {
+            
CloseUtils.closeQuietly(each.getJobItemContext().getJobProcessContext());
+        }
     }
     
     protected abstract void doClean();
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
index 2aba307d8c3..7480585c916 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
@@ -19,7 +19,6 @@ package org.apache.shardingsphere.data.pipeline.core.job;
 
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
-import org.apache.shardingsphere.data.pipeline.common.util.CloseUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
 import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
@@ -46,14 +45,6 @@ public abstract class AbstractSimplePipelineJob extends 
AbstractPipelineJob impl
     
     @Override
     public void execute(final ShardingContext shardingContext) {
-        try {
-            execute0(shardingContext);
-        } finally {
-            clean();
-        }
-    }
-    
-    private void execute0(final ShardingContext shardingContext) {
         String jobId = shardingContext.getJobName();
         int shardingItem = shardingContext.getShardingItem();
         log.info("Execute job {}-{}", jobId, shardingItem);
@@ -71,10 +62,4 @@ public abstract class AbstractSimplePipelineJob extends 
AbstractPipelineJob impl
         log.info("start tasks runner, jobId={}, shardingItem={}", jobId, 
shardingItem);
         tasksRunner.start();
     }
-    
-    private void clean() {
-        for (PipelineTasksRunner each : getTasksRunners()) {
-            
CloseUtils.closeQuietly(each.getJobItemContext().getJobProcessContext());
-        }
-    }
 }
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
index 1d7ee243f9a..c62bdfe7032 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
@@ -23,8 +23,8 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
 import 
org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
-import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext;
 import org.apache.shardingsphere.data.pipeline.cdc.context.CDCJobItemContext;
+import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext;
 import org.apache.shardingsphere.data.pipeline.cdc.core.prepare.CDCJobPreparer;
 import org.apache.shardingsphere.data.pipeline.cdc.core.task.CDCTasksRunner;
 import 
org.apache.shardingsphere.data.pipeline.cdc.yaml.swapper.YamlCDCJobConfigurationSwapper;
@@ -73,14 +73,6 @@ public final class CDCJob extends AbstractPipelineJob 
implements SimpleJob {
     
     @Override
     public void execute(final ShardingContext shardingContext) {
-        try {
-            execute0(shardingContext);
-        } finally {
-            clean();
-        }
-    }
-    
-    private void execute0(final ShardingContext shardingContext) {
         String jobId = shardingContext.getJobName();
         log.info("Execute job {}", jobId);
         CDCJobConfiguration jobConfig = new 
YamlCDCJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
@@ -108,12 +100,6 @@ public final class CDCJob extends AbstractPipelineJob 
implements SimpleJob {
         executeIncrementalTasks(jobItemContexts);
     }
     
-    private void clean() {
-        for (PipelineTasksRunner each : getTasksRunners()) {
-            
CloseUtils.closeQuietly(each.getJobItemContext().getJobProcessContext());
-        }
-    }
-    
     private CDCJobItemContext buildPipelineJobItemContext(final 
CDCJobConfiguration jobConfig, final int shardingItem) {
         Optional<InventoryIncrementalJobItemProgress> initProgress = 
jobAPI.getJobItemProgress(jobConfig.getJobId(), shardingItem);
         CDCProcessContext jobProcessContext = 
jobAPI.buildPipelineProcessContext(jobConfig);
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
index dcd6bdcfc40..6c50af1aaa7 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
@@ -458,7 +458,7 @@ public final class PipelineContainerComposer implements 
AutoCloseable {
             Set<String> actualStatus = new HashSet<>();
             Collection<Integer> incrementalIdleSecondsList = new 
LinkedList<>();
             for (Map<String, Object> each : listJobStatus) {
-                
assertTrue(Strings.isNullOrEmpty(each.get("error_message").toString()), 
"error_message is not null");
+                assertTrue(Strings.isNullOrEmpty((String) 
each.get("error_message")), "error_message: `" + each.get("error_message") + 
"`");
                 actualStatus.add(each.get("status").toString());
                 String incrementalIdleSeconds = 
each.get("incremental_idle_seconds").toString();
                 
incrementalIdleSecondsList.add(Strings.isNullOrEmpty(incrementalIdleSeconds) ? 
0 : Integer.parseInt(incrementalIdleSeconds));

Reply via email to