This is an automated email from the ASF dual-hosted git repository.

azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b19fe744e4 Break success process if job is stopping (#28476)
3b19fe744e4 is described below

commit 3b19fe744e41698f4a5fd834bb13ca9ded82ac4e
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Wed Sep 20 19:57:19 2023 +0800

    Break success process if job is stopping (#28476)
    
    * Break success process if job is stopping
    
    * Add run_count in job props
    
    * Update IncrementalExecuteCallback
    
    * Update unit test
    
    * Remove log
---
 .../pipeline/core/importer/sink/PipelineDataSourceSink.java   |  7 +------
 .../core/job/service/impl/AbstractPipelineJobAPIImpl.java     | 10 +++++++++-
 .../core/task/runner/InventoryIncrementalTasksRunner.java     |  4 ++++
 .../shardingsphere/data/pipeline/cdc/core/job/CDCJob.java     |  4 ++++
 .../consistencycheck/task/ConsistencyCheckTasksRunner.java    | 11 +++++++++--
 .../consistency/MigrationDataConsistencyCheckerTest.java      |  6 +++---
 6 files changed, 30 insertions(+), 12 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
index b193b5324f5..ca3a5165184 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
@@ -43,13 +43,11 @@ import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.sql.Statement;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 
 /**
  * Pipeline data source sink.
@@ -274,10 +272,7 @@ public final class PipelineDataSourceSink implements PipelineSink {
                 }
                 preparedStatement.addBatch();
             }
-            int[] counts = preparedStatement.executeBatch();
-            if (IntStream.of(counts).anyMatch(value -> 1 != value)) {
-                log.warn("batchDelete failed, counts={}, sql={}, dataRecords={}", Arrays.toString(counts), deleteSQL, dataRecords);
-            }
+            preparedStatement.executeBatch();
         } finally {
             batchDeleteStatement.set(null);
         }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
index bb32906c9d0..4978dbd6192 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
@@ -107,6 +107,7 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
         String createTimeFormat = LocalDateTime.now().format(DATE_TIME_FORMATTER);
         result.getProps().setProperty("create_time", createTimeFormat);
         result.getProps().setProperty("start_time_millis", String.valueOf(System.currentTimeMillis()));
+        result.getProps().setProperty("run_count", "1");
         result.setJobListenerTypes(Collections.singletonList(PipelineElasticJobListener.class.getName()));
         return result;
     }
@@ -125,6 +126,7 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
         jobConfigPOJO.getProps().setProperty("start_time_millis", String.valueOf(System.currentTimeMillis()));
         jobConfigPOJO.getProps().remove("stop_time");
         jobConfigPOJO.getProps().remove("stop_time_millis");
+        jobConfigPOJO.getProps().setProperty("run_count", String.valueOf(Integer.parseInt(jobConfigPOJO.getProps().getProperty("run_count", "0")) + 1));
         String barrierEnablePath = PipelineMetaDataNode.getJobBarrierEnablePath(jobId);
         pipelineDistributedBarrier.register(barrierEnablePath, jobConfigPOJO.getShardingTotalCount());
         PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobId)).updateJobConfiguration(jobConfigPOJO);
@@ -154,7 +156,13 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
         PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).deleteJob(jobId);
     }
     
-    protected final JobConfigurationPOJO getElasticJobConfigPOJO(final String jobId) {
+    /**
+     * Get ElasticJob configuration POJO.
+     *
+     * @param jobId job id
+     * @return ElasticJob configuration POJO
+     */
+    public final JobConfigurationPOJO getElasticJobConfigPOJO(final String jobId) {
         JobConfigurationPOJO result = PipelineAPIFactory.getJobConfigurationAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobConfiguration(jobId);
         ShardingSpherePreconditions.checkNotNull(result, () -> new PipelineJobNotFoundException(jobId));
         return result;
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
index 755a049cf12..08415b7ecba 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
@@ -149,6 +149,10 @@ public class InventoryIncrementalTasksRunner implements PipelineTasksRunner {
         
         @Override
         public void onSuccess() {
+            if (jobItemContext.isStopping()) {
+                log.info("Inventory task onSuccess, stopping true, ignore");
+                return;
+            }
             inventorySuccessCallback();
         }
         
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
index 697fc3b9787..5f4edfc4bda 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
@@ -190,6 +190,10 @@ public final class CDCJob extends AbstractPipelineJob implements SimpleJob {
         
         @Override
         public void onSuccess() {
+            if (jobItemContext.isStopping()) {
+                log.info("onSuccess, stopping true, ignore");
+                return;
+            }
             log.info("onSuccess, all {} tasks finished.", identifier);
         }
         
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
index 4a5a3fe622c..a55671d4a18 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
@@ -101,8 +101,11 @@ public final class ConsistencyCheckTasksRunner implements PipelineTasksRunner {
                         parentJobConfig, jobAPI.buildPipelineProcessContext(parentJobConfig), jobItemContext.getProgressContext());
                 consistencyChecker.set(checker);
                 Map<String, TableDataConsistencyCheckResult> checkResultMap = checker.check(checkJobConfig.getAlgorithmTypeName(), checkJobConfig.getAlgorithmProps());
-                log.info("job {} with check algorithm '{}' data consistency checker result: {}", parentJobId, checkJobConfig.getAlgorithmTypeName(), checkResultMap);
-                PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(parentJobId)).persistCheckJobResult(parentJobId, checkJobId, checkResultMap);
+                log.info("job {} with check algorithm '{}' data consistency checker result: {}, stopping: {}",
+                        parentJobId, checkJobConfig.getAlgorithmTypeName(), checkResultMap, jobItemContext.isStopping());
+                if (!jobItemContext.isStopping()) {
+                    PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(parentJobId)).persistCheckJobResult(parentJobId, checkJobId, checkResultMap);
+                }
             } finally {
                 jobItemContext.getProgressContext().setCheckEndTimeMillis(System.currentTimeMillis());
             }
@@ -121,6 +124,10 @@ public final class ConsistencyCheckTasksRunner implements PipelineTasksRunner {
         
         @Override
         public void onSuccess() {
+            if (jobItemContext.isStopping()) {
+                log.info("onSuccess, stopping true, ignore");
+                return;
+            }
             log.info("onSuccess, check job id: {}, parent job id: {}", checkJobId, parentJobId);
             jobItemContext.setStatus(JobStatus.FINISHED);
             checkJobAPI.persistJobItemProgress(jobItemContext);
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
index 3862d8c061b..ccafd074e0f 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyCheckerTest.java
@@ -64,15 +64,15 @@ class MigrationDataConsistencyCheckerTest {
         governanceRepositoryAPI.persist(String.format("/pipeline/jobs/%s/config", jobConfig.getJobId()), YamlEngine.marshal(jobConfigurationPOJO));
         governanceRepositoryAPI.persistJobItemProgress(jobConfig.getJobId(), 0, "");
         Map<String, TableDataConsistencyCheckResult> actual = new MigrationDataConsistencyChecker(jobConfig, new MigrationProcessContext(jobConfig.getJobId(), null),
-                createConsistencyCheckJobItemProgressContext()).check("FIXTURE", null);
+                createConsistencyCheckJobItemProgressContext(jobConfig.getJobId())).check("FIXTURE", null);
         String checkKey = "t_order";
         assertTrue(actual.get(checkKey).getCountCheckResult().isMatched());
         assertThat(actual.get(checkKey).getCountCheckResult().getSourceRecordsCount(), is(actual.get(checkKey).getCountCheckResult().getTargetRecordsCount()));
         assertTrue(actual.get(checkKey).getContentCheckResult().isMatched());
     }
     
-    private ConsistencyCheckJobItemProgressContext createConsistencyCheckJobItemProgressContext() {
-        return new ConsistencyCheckJobItemProgressContext("", 0, "H2");
+    private ConsistencyCheckJobItemProgressContext createConsistencyCheckJobItemProgressContext(final String jobId) {
+        return new ConsistencyCheckJobItemProgressContext(jobId, 0, "H2");
     }
     
     private MigrationJobConfiguration createJobConfiguration() throws SQLException {

Reply via email to