This is an automated email from the ASF dual-hosted git repository.

azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new fe5096b681a Improve pipeline job item error message persistance 
(#28466)
fe5096b681a is described below

commit fe5096b681aa6be4d152090f2940164e1653602b
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Tue Sep 19 19:09:24 2023 +0800

    Improve pipeline job item error message persistance (#28466)
    
    * Re-impl cleanJobItemErrorMessage
    
    * Update persistJobItemErrorMessage impl to updateJobItemErrorMessage
    
    * Remove assertStartWithGetEstimatedRowsFailure
    
    * Update cleanJobItemErrorMessage impl
---
 .../repository/GovernanceRepositoryAPI.java            | 16 ++++++++--------
 .../repository/GovernanceRepositoryAPIImpl.java        | 10 +++++-----
 .../data/pipeline/core/job/AbstractPipelineJob.java    |  2 +-
 .../data/pipeline/core/job/service/PipelineJobAPI.java |  4 ++--
 .../job/service/impl/AbstractPipelineJobAPIImpl.java   |  7 ++++---
 .../task/runner/InventoryIncrementalTasksRunner.java   |  4 ++--
 .../data/pipeline/cdc/core/job/CDCJob.java             |  4 ++--
 .../task/ConsistencyCheckTasksRunner.java              |  2 +-
 .../it/data/pipeline/core/task/InventoryTaskTest.java  | 18 ------------------
 9 files changed, 25 insertions(+), 42 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPI.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPI.java
index dff2369cb60..ccc68d2a091 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPI.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPI.java
@@ -170,6 +170,14 @@ public interface GovernanceRepositoryAPI {
      */
     void persist(String key, String value);
     
+    /**
+     * Update data.
+     *
+     * @param key key of data
+     * @param value value of data
+     */
+    void update(String key, String value);
+    
     /**
      * Get sharding items of job.
      *
@@ -218,12 +226,4 @@ public interface GovernanceRepositoryAPI {
      * @return error msg
      */
     String getJobItemErrorMessage(String jobId, int shardingItem);
-    
-    /**
-     * Clean job item error message.
-     *
-     * @param jobId job id
-     * @param shardingItem sharding item
-     */
-    void cleanJobItemErrorMessage(String jobId, int shardingItem);
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPIImpl.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPIImpl.java
index 1e7b9f50b00..b2bd85fa747 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPIImpl.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPIImpl.java
@@ -155,6 +155,11 @@ public final class GovernanceRepositoryAPIImpl implements 
GovernanceRepositoryAP
         repository.persist(key, value);
     }
     
+    @Override
+    public void update(final String key, final String value) {
+        repository.update(key, value);
+    }
+    
     @Override
     public List<Integer> getShardingItems(final String jobId) {
         List<String> result = 
getChildrenKeys(PipelineMetaDataNode.getJobOffsetPath(jobId));
@@ -185,9 +190,4 @@ public final class GovernanceRepositoryAPIImpl implements 
GovernanceRepositoryAP
     public String getJobItemErrorMessage(final String jobId, final int 
shardingItem) {
         return 
repository.getDirectly(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, 
shardingItem));
     }
-    
-    @Override
-    public void cleanJobItemErrorMessage(final String jobId, final int 
shardingItem) {
-        
repository.delete(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, 
shardingItem));
-    }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
index bb8a4a6e051..5fceae71fc5 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
@@ -107,7 +107,7 @@ public abstract class AbstractPipelineJob implements 
PipelineJob {
     protected void processFailed(final PipelineJobItemContext jobItemContext, 
final Exception ex) {
         String jobId = jobItemContext.getJobId();
         log.error("job prepare failed, {}-{}", jobId, 
jobItemContext.getShardingItem(), ex);
-        jobAPI.persistJobItemErrorMessage(jobItemContext.getJobId(), 
jobItemContext.getShardingItem(), ex);
+        jobAPI.updateJobItemErrorMessage(jobItemContext.getJobId(), 
jobItemContext.getShardingItem(), ex);
         jobAPI.stop(jobId);
     }
     
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
index 32cae2477ca..1dc330457f0 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
@@ -162,13 +162,13 @@ public interface PipelineJobAPI extends TypedSPI {
     String getJobItemErrorMessage(String jobId, int shardingItem);
     
     /**
-     * Persist job item error message.
+     * Update job item error message.
      *
      * @param jobId job id
      * @param shardingItem sharding item
      * @param error error
      */
-    void persistJobItemErrorMessage(String jobId, int shardingItem, Object 
error);
+    void updateJobItemErrorMessage(String jobId, int shardingItem, Object 
error);
     
     /**
      * Clean job item error message.
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
index 55a20a90f2b..bb32906c9d0 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
@@ -171,17 +171,18 @@ public abstract class AbstractPipelineJobAPIImpl 
implements PipelineJobAPI {
     }
     
     @Override
-    public void persistJobItemErrorMessage(final String jobId, final int 
shardingItem, final Object error) {
+    public void updateJobItemErrorMessage(final String jobId, final int 
shardingItem, final Object error) {
         String key = PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, 
shardingItem);
         String value = "";
         if (null != error) {
             value = error instanceof Throwable ? 
ExceptionUtils.getStackTrace((Throwable) error) : error.toString();
         }
-        
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persist(key,
 value);
+        
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).update(key,
 value);
     }
     
     @Override
     public void cleanJobItemErrorMessage(final String jobId, final int 
shardingItem) {
-        
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).cleanJobItemErrorMessage(jobId,
 shardingItem);
+        String key = PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, 
shardingItem);
+        
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persist(key,
 "");
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
index 49dd4ed586e..755a049cf12 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
@@ -141,7 +141,7 @@ public class InventoryIncrementalTasksRunner implements 
PipelineTasksRunner {
     protected void inventoryFailureCallback(final Throwable throwable) {
         log.error("onFailure, inventory task execute failed.", throwable);
         String jobId = jobItemContext.getJobId();
-        jobAPI.persistJobItemErrorMessage(jobId, 
jobItemContext.getShardingItem(), throwable);
+        jobAPI.updateJobItemErrorMessage(jobId, 
jobItemContext.getShardingItem(), throwable);
         jobAPI.stop(jobId);
     }
     
@@ -169,7 +169,7 @@ public class InventoryIncrementalTasksRunner implements 
PipelineTasksRunner {
         public void onFailure(final Throwable throwable) {
             log.error("onFailure, incremental task execute failed.", 
throwable);
             String jobId = jobItemContext.getJobId();
-            jobAPI.persistJobItemErrorMessage(jobId, 
jobItemContext.getShardingItem(), throwable);
+            jobAPI.updateJobItemErrorMessage(jobId, 
jobItemContext.getShardingItem(), throwable);
             jobAPI.stop(jobId);
         }
     }
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
index a4f231bc8c5..697fc3b9787 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
@@ -124,7 +124,7 @@ public final class CDCJob extends AbstractPipelineJob 
implements SimpleJob {
     protected void processFailed(final PipelineJobItemContext jobItemContext, 
final Exception ex) {
         String jobId = jobItemContext.getJobId();
         log.error("job prepare failed, {}-{}", jobId, 
jobItemContext.getShardingItem(), ex);
-        jobAPI.persistJobItemErrorMessage(jobItemContext.getJobId(), 
jobItemContext.getShardingItem(), ex);
+        jobAPI.updateJobItemErrorMessage(jobItemContext.getJobId(), 
jobItemContext.getShardingItem(), ex);
         PipelineJobCenter.stop(jobId);
         jobAPI.updateJobConfigurationDisabled(jobId, true);
     }
@@ -197,7 +197,7 @@ public final class CDCJob extends AbstractPipelineJob 
implements SimpleJob {
         public void onFailure(final Throwable throwable) {
             log.error("onFailure, {} task execute failed.", identifier, 
throwable);
             String jobId = jobItemContext.getJobId();
-            jobAPI.persistJobItemErrorMessage(jobId, 
jobItemContext.getShardingItem(), throwable);
+            jobAPI.updateJobItemErrorMessage(jobId, 
jobItemContext.getShardingItem(), throwable);
             PipelineJobCenter.stop(jobId);
             jobAPI.updateJobConfigurationDisabled(jobId, true);
         }
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
index bfb36635f80..4a5a3fe622c 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
@@ -136,7 +136,7 @@ public final class ConsistencyCheckTasksRunner implements 
PipelineTasksRunner {
                 return;
             }
             log.info("onFailure, check job id: {}, parent job id: {}", 
checkJobId, parentJobId, throwable);
-            checkJobAPI.persistJobItemErrorMessage(checkJobId, 0, throwable);
+            checkJobAPI.updateJobItemErrorMessage(checkJobId, 0, throwable);
             checkJobAPI.stop(checkJobId);
         }
     }
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java
index 54fa3ddb6c6..f56ea61452b 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/task/InventoryTaskTest.java
@@ -19,16 +19,12 @@ package 
org.apache.shardingsphere.test.it.data.pipeline.core.task;
 
 import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.Dumper;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
-import 
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
 import 
org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.type.IntegerPrimaryKeyPosition;
-import 
org.apache.shardingsphere.data.pipeline.common.metadata.loader.StandardPipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.core.dumper.InventoryDumper;
 import org.apache.shardingsphere.data.pipeline.core.importer.Importer;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
@@ -52,7 +48,6 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.Mockito.mock;
 
 class InventoryTaskTest {
@@ -76,19 +71,6 @@ class InventoryTaskTest {
         taskConfig = 
PipelineContextUtils.mockMigrationJobItemContext(JobConfigurationBuilder.createJobConfiguration()).getTaskConfig();
     }
     
-    @Test
-    void assertStartWithGetEstimatedRowsFailure() {
-        InventoryDumperConfiguration inventoryDumperConfig = 
createInventoryDumperConfiguration("t_non_exist", "t_non_exist");
-        AtomicReference<IngestPosition> position = new 
AtomicReference<>(inventoryDumperConfig.getPosition());
-        PipelineChannel channel = 
PipelineTaskUtils.createInventoryChannel(PipelineContextUtils.getPipelineChannelCreator(),
 100, position);
-        PipelineDataSourceWrapper dataSource = 
DATA_SOURCE_MANAGER.getDataSource(inventoryDumperConfig.getDataSourceConfig());
-        PipelineTableMetaDataLoader metaDataLoader = new 
StandardPipelineTableMetaDataLoader(dataSource);
-        InventoryTask inventoryTask = new 
InventoryTask(PipelineTaskUtils.generateInventoryTaskId(inventoryDumperConfig),
-                PipelineContextUtils.getExecuteEngine(), 
PipelineContextUtils.getExecuteEngine(),
-                new InventoryDumper(inventoryDumperConfig, channel, 
dataSource, metaDataLoader), mock(Importer.class), position);
-        assertThrows(ExecutionException.class, () -> 
CompletableFuture.allOf(inventoryTask.start().toArray(new 
CompletableFuture[0])).get(10L, TimeUnit.SECONDS));
-    }
-    
     @Test
     void assertGetProgress() throws SQLException, ExecutionException, 
InterruptedException, TimeoutException {
         initTableData(taskConfig.getDumperConfig());

Reply via email to