This is an automated email from the ASF dual-hosted git repository.

azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new fbf06128641 Improve pipeline job failure status persistence and usage 
(#24745)
fbf06128641 is described below

commit fbf06128641ae784bf29fe6934ae04361da08a45
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Wed Mar 22 19:49:50 2023 +0800

    Improve pipeline job failure status persistence and usage (#24745)
    
    * Return inventory position if it's present in InventoryTaskSplitter
    
    * Remove FAILURE job item progress persistence
    
    * Only prepareAndCheckTarget when initProgress is null in 
MigrationJobPreparer
---
 .../apache/shardingsphere/data/pipeline/api/job/JobStatus.java    | 1 +
 .../data/pipeline/core/job/AbstractPipelineJob.java               | 3 ---
 .../data/pipeline/core/prepare/InventoryTaskSplitter.java         | 8 +++++---
 .../data/pipeline/core/task/InventoryIncrementalTasksRunner.java  | 2 --
 .../consistencycheck/task/ConsistencyCheckTasksRunner.java        | 2 --
 .../pipeline/scenario/migration/prepare/MigrationJobPreparer.java | 4 ++--
 .../test/e2e/data/pipeline/cases/PipelineContainerComposer.java   | 3 ---
 7 files changed, 8 insertions(+), 15 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobStatus.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobStatus.java
index d31c2505715..d7f2cd93e3e 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobStatus.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/JobStatus.java
@@ -73,5 +73,6 @@ public enum JobStatus {
      */
     EXECUTE_INCREMENTAL_TASK_FAILURE(false);
     
+    // TODO Remove unused field; Remove unused enum
     private final boolean running;
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
index 3b146248b76..0e636e73067 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
@@ -22,7 +22,6 @@ import lombok.Getter;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
-import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.job.PipelineJob;
 import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI;
@@ -71,8 +70,6 @@ public abstract class AbstractPipelineJob implements 
PipelineJob {
             // CHECKSTYLE:ON
             String jobId = jobItemContext.getJobId();
             log.error("job prepare failed, {}-{}", jobId, 
jobItemContext.getShardingItem(), ex);
-            jobItemContext.setStatus(JobStatus.PREPARING_FAILURE);
-            jobAPI.persistJobItemProgress(jobItemContext);
             jobAPI.persistJobItemErrorMessage(jobItemContext.getJobId(), 
jobItemContext.getShardingItem(), ex);
             jobAPI.stop(jobId);
             if (ex instanceof RuntimeException) {
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
index ecd85d92b3d..2b750b64ed5 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
@@ -30,7 +30,6 @@ import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.NoUniqueKeyPosition
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.StringPrimaryKeyPosition;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.UnsupportedKeyPosition;
-import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
@@ -144,9 +143,12 @@ public final class InventoryTaskSplitter {
     private Collection<IngestPosition<?>> getInventoryPositions(final 
InventoryIncrementalJobItemContext jobItemContext, final 
InventoryDumperConfiguration dumperConfig,
                                                                 final 
DataSource dataSource) {
         InventoryIncrementalJobItemProgress initProgress = 
jobItemContext.getInitProgress();
-        if (null != initProgress && initProgress.getStatus() != 
JobStatus.PREPARING_FAILURE) {
+        if (null != initProgress) {
             // Do NOT filter FinishedPosition here, since whole inventory 
tasks are required in job progress when persisting to register center.
-            return 
initProgress.getInventory().getInventoryPosition(dumperConfig.getActualTableName()).values();
+            Collection<IngestPosition<?>> result = 
initProgress.getInventory().getInventoryPosition(dumperConfig.getActualTableName()).values();
+            if (!result.isEmpty()) {
+                return result;
+            }
         }
         if (!dumperConfig.hasUniqueKey()) {
             return getPositionWithoutUniqueKey(jobItemContext, dataSource, 
dumperConfig);
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
index a26d40ab923..0834db969a9 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryIncrementalTasksRunner.java
@@ -141,7 +141,6 @@ public class InventoryIncrementalTasksRunner implements 
PipelineTasksRunner {
         @Override
         public void onFailure(final Throwable throwable) {
             log.error("onFailure, inventory task execute failed.", throwable);
-            
updateLocalAndRemoteJobItemStatus(JobStatus.EXECUTE_INVENTORY_TASK_FAILURE);
             String jobId = jobItemContext.getJobId();
             jobAPI.persistJobItemErrorMessage(jobId, 
jobItemContext.getShardingItem(), throwable);
             jobAPI.stop(jobId);
@@ -158,7 +157,6 @@ public class InventoryIncrementalTasksRunner implements 
PipelineTasksRunner {
         @Override
         public void onFailure(final Throwable throwable) {
             log.error("onFailure, incremental task execute failed.", 
throwable);
-            
updateLocalAndRemoteJobItemStatus(JobStatus.EXECUTE_INCREMENTAL_TASK_FAILURE);
             String jobId = jobItemContext.getJobId();
             jobAPI.persistJobItemErrorMessage(jobId, 
jobItemContext.getShardingItem(), throwable);
             jobAPI.stop(jobId);
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
index 91a629c9ec6..0850dbbbb2d 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
@@ -141,8 +141,6 @@ public final class ConsistencyCheckTasksRunner implements 
PipelineTasksRunner {
             }
             log.info("onFailure, check job id: {}, parent job id: {}", 
checkJobId, parentJobId, throwable);
             checkJobAPI.persistJobItemErrorMessage(checkJobId, 0, throwable);
-            jobItemContext.setStatus(JobStatus.CONSISTENCY_CHECK_FAILURE);
-            checkJobAPI.persistJobItemProgress(jobItemContext);
             checkJobAPI.stop(checkJobId);
         }
     }
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
index 7b22e4b3e61..b4b3b0417bb 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
@@ -115,7 +115,7 @@ public final class MigrationJobPreparer {
                 lockContext.unlock(lockDefinition);
             }
         } else {
-            log.warn("jobId={}, shardingItem={} try lock failed", 
jobConfig.getJobId(), jobItemContext.getShardingItem());
+            log.warn("try lock failed, jobId={}, shardingItem={}", 
jobConfig.getJobId(), jobItemContext.getShardingItem());
         }
     }
     
@@ -124,7 +124,7 @@ public final class MigrationJobPreparer {
             prepareTarget(jobItemContext);
         }
         InventoryIncrementalJobItemProgress initProgress = 
jobItemContext.getInitProgress();
-        if (null == initProgress || initProgress.getStatus() == 
JobStatus.PREPARING_FAILURE) {
+        if (null == initProgress) {
             PipelineDataSourceWrapper targetDataSource = 
((PipelineDataSourceManager) 
jobItemContext.getImporterConnector().getConnector())
                     
.getDataSource(jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig());
             
PipelineJobPreparerUtils.checkTargetDataSource(jobItemContext.getJobConfig().getTargetDatabaseType(),
 jobItemContext.getTaskConfig().getImporterConfig(),
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
index 029ea4e5d65..5a3a9cf812d 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
@@ -450,9 +450,6 @@ public final class PipelineContainerComposer implements 
AutoCloseable {
                 String incrementalIdleSeconds = 
each.get("incremental_idle_seconds").toString();
                 
incrementalIdleSecondsList.add(Strings.isNullOrEmpty(incrementalIdleSeconds) ? 
0 : Integer.parseInt(incrementalIdleSeconds));
             }
-            
assertFalse(actualStatus.contains(JobStatus.PREPARING_FAILURE.name()), "status 
is JobStatus.PREPARING_FAILURE");
-            
assertFalse(actualStatus.contains(JobStatus.EXECUTE_INVENTORY_TASK_FAILURE.name()),
 "status is JobStatus.EXECUTE_INVENTORY_TASK_FAILURE");
-            
assertFalse(actualStatus.contains(JobStatus.EXECUTE_INCREMENTAL_TASK_FAILURE.name()),
 "status is JobStatus.EXECUTE_INCREMENTAL_TASK_FAILURE");
             if (Collections.min(incrementalIdleSecondsList) <= 5) {
                 Thread.sleep(3000L);
                 continue;

Reply via email to