This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 99278110ab9 Add PipelineJobIteErrorMessageManager (#29102)
99278110ab9 is described below

commit 99278110ab91b76e8cda902ecc018eece2a04023
Author: Liang Zhang <[email protected]>
AuthorDate: Tue Nov 21 16:44:20 2023 +0800

    Add PipelineJobIteErrorMessageManager (#29102)
---
 .../core/job/AbstractSimplePipelineJob.java        | 15 +++--
 .../service/InventoryIncrementalJobManager.java    |  5 +-
 .../service/PipelineJobIteErrorMessageManager.java | 72 ++++++++++++++++++++++
 .../core/job/service/PipelineJobItemManager.java   | 42 +------------
 .../runner/InventoryIncrementalTasksRunner.java    |  5 +-
 .../data/pipeline/cdc/core/job/CDCJob.java         |  7 ++-
 .../api/impl/ConsistencyCheckJobAPI.java           |  3 +-
 .../task/ConsistencyCheckTasksRunner.java          |  3 +-
 8 files changed, 93 insertions(+), 59 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
index 36899a266c8..5931c3b681e 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.data.pipeline.core.job;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
-import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobIteErrorMessageManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import 
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
@@ -49,7 +49,6 @@ public abstract class AbstractSimplePipelineJob extends 
AbstractPipelineJob impl
     @Override
     public void execute(final ShardingContext shardingContext) {
         PipelineJobManager jobManager = new PipelineJobManager(getJobAPI());
-        PipelineJobItemManager<?> jobItemManager = new 
PipelineJobItemManager<>(getJobAPI().getYamlJobItemProgressSwapper());
         String jobId = shardingContext.getJobName();
         int shardingItem = shardingContext.getShardingItem();
         log.info("Execute job {}-{}", jobId, shardingItem);
@@ -59,31 +58,31 @@ public abstract class AbstractSimplePipelineJob extends 
AbstractPipelineJob impl
         }
         try {
             PipelineJobItemContext jobItemContext = 
buildPipelineJobItemContext(shardingContext);
-            execute0(jobItemManager, jobItemContext);
+            execute0(jobItemContext);
             // CHECKSTYLE:OFF
         } catch (final RuntimeException ex) {
             // CHECKSTYLE:ON
-            processFailed(jobManager, jobItemManager, jobId, shardingItem, ex);
+            processFailed(jobManager, jobId, shardingItem, ex);
             throw ex;
         }
     }
     
-    private void execute0(final PipelineJobItemManager<?> jobItemManager, 
final PipelineJobItemContext jobItemContext) {
+    private void execute0(final PipelineJobItemContext jobItemContext) {
         String jobId = jobItemContext.getJobId();
         int shardingItem = jobItemContext.getShardingItem();
         PipelineTasksRunner tasksRunner = 
buildPipelineTasksRunner(jobItemContext);
         if (!addTasksRunner(shardingItem, tasksRunner)) {
             return;
         }
-        jobItemManager.cleanErrorMessage(jobId, shardingItem);
+        new PipelineJobIteErrorMessageManager(jobId, 
shardingItem).cleanErrorMessage();
         prepare(jobItemContext);
         log.info("start tasks runner, jobId={}, shardingItem={}", jobId, 
shardingItem);
         tasksRunner.start();
     }
     
-    private void processFailed(final PipelineJobManager jobManager, final 
PipelineJobItemManager<?> jobItemManager, final String jobId, final int 
shardingItem, final Exception ex) {
+    private void processFailed(final PipelineJobManager jobManager, final 
String jobId, final int shardingItem, final Exception ex) {
         log.error("job execution failed, {}-{}", jobId, shardingItem, ex);
-        jobItemManager.updateErrorMessage(jobId, shardingItem, ex);
+        new PipelineJobIteErrorMessageManager(jobId, 
shardingItem).updateErrorMessage(ex);
         try {
             jobManager.stop(jobId);
         } catch (final PipelineJobNotFoundException ignored) {
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java
index 4e754a5d453..7e41b403aac 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java
@@ -80,12 +80,11 @@ public final class InventoryIncrementalJobManager {
         long startTimeMillis = 
Long.parseLong(Optional.ofNullable(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).getProps().getProperty("start_time_millis")).orElse("0"));
         Map<Integer, InventoryIncrementalJobItemProgress> jobProgress = 
getJobProgress(jobConfig);
         List<InventoryIncrementalJobItemInfo> result = new LinkedList<>();
-        PipelineJobItemManager<InventoryIncrementalJobItemProgress> 
jobItemManager = new 
PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper());
+        TableBasedPipelineJobInfo jobInfo = (TableBasedPipelineJobInfo) 
jobAPI.getJobInfo(jobId);
         for (Entry<Integer, InventoryIncrementalJobItemProgress> entry : 
jobProgress.entrySet()) {
             int shardingItem = entry.getKey();
-            TableBasedPipelineJobInfo jobInfo = (TableBasedPipelineJobInfo) 
jobAPI.getJobInfo(jobId);
             InventoryIncrementalJobItemProgress jobItemProgress = 
entry.getValue();
-            String errorMessage = jobItemManager.getErrorMessage(jobId, 
shardingItem);
+            String errorMessage = new PipelineJobIteErrorMessageManager(jobId, 
shardingItem).getErrorMessage();
             if (null == jobItemProgress) {
                 result.add(new InventoryIncrementalJobItemInfo(shardingItem, 
jobInfo.getTable(), null, startTimeMillis, 0, errorMessage));
                 continue;
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobIteErrorMessageManager.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobIteErrorMessageManager.java
new file mode 100644
index 00000000000..a20cd2e7a4f
--- /dev/null
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobIteErrorMessageManager.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.job.service;
+
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import 
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
+import 
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
+
+import java.util.Optional;
+
+/**
+ * Pipeline job item error message manager.
+ */
+public final class PipelineJobIteErrorMessageManager {
+    
+    private final String jobId;
+    
+    private final int shardingItem;
+    
+    private final GovernanceRepositoryAPI governanceRepositoryAPI;
+    
+    public PipelineJobIteErrorMessageManager(final String jobId, final int 
shardingItem) {
+        this.jobId = jobId;
+        this.shardingItem = shardingItem;
+        governanceRepositoryAPI = 
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId));
+    }
+    
+    /**
+     * Get job item error message.
+     *
+     * @return error message of the job item, empty string if no error message exists
+     */
+    public String getErrorMessage() {
+        return 
Optional.ofNullable(governanceRepositoryAPI.getJobItemErrorMessage(jobId, 
shardingItem)).orElse("");
+    }
+    
+    /**
+     * Update job item error message.
+     *
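+     * @param error error to record, a throwable is stored as its stack trace, other objects via toString, null as empty string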
+     */
+    public void updateErrorMessage(final Object error) {
+        
governanceRepositoryAPI.update(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId,
 shardingItem), null == error ? "" : buildErrorMessage(error));
+    }
+    
+    private String buildErrorMessage(final Object error) {
+        return error instanceof Throwable ? 
ExceptionUtils.getStackTrace((Throwable) error) : error.toString();
+    }
+    
+    /**
+     * Clean job item error message.
+     */
+    public void cleanErrorMessage() {
+        
governanceRepositoryAPI.persist(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId,
 shardingItem), "");
+    }
+}
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobItemManager.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobItemManager.java
index 9aa4a01cbb3..c6e6ec47e5a 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobItemManager.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobItemManager.java
@@ -17,11 +17,9 @@
 
 package org.apache.shardingsphere.data.pipeline.core.job.service;
 
-import org.apache.commons.lang3.exception.ExceptionUtils;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
 import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;
-import 
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressSwapper;
@@ -30,7 +28,7 @@ import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import java.util.Optional;
 
 /**
- * Pipeline job manager.
+ * Pipeline job item manager.
  * 
  * @param <T> type of pipeline job item progress
  */
@@ -96,42 +94,4 @@ public final class PipelineJobItemManager<T extends 
PipelineJobItemProgress> {
     private String convertProgressYamlContent(final PipelineJobItemContext 
jobItemContext) {
         return YamlEngine.marshal(swapper.swapToYamlConfiguration((T) 
jobItemContext.toProgress()));
     }
-    
-    /**
-     * Get job item error message.
-     *
-     * @param jobId job id
-     * @param shardingItem sharding item
-     * @return map, key is sharding item, value is error message
-     */
-    public String getErrorMessage(final String jobId, final int shardingItem) {
-        return 
Optional.ofNullable(PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemErrorMessage(jobId,
 shardingItem)).orElse("");
-    }
-    
-    /**
-     * Update job item error message.
-     *
-     * @param jobId job id
-     * @param shardingItem sharding item
-     * @param error error
-     */
-    public void updateErrorMessage(final String jobId, final int shardingItem, 
final Object error) {
-        String key = PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, 
shardingItem);
-        String value = "";
-        if (null != error) {
-            value = error instanceof Throwable ? 
ExceptionUtils.getStackTrace((Throwable) error) : error.toString();
-        }
-        
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).update(key,
 value);
-    }
-    
-    /**
-     * Clean job item error message.
-     *
-     * @param jobId job id
-     * @param shardingItem sharding item
-     */
-    public void cleanErrorMessage(final String jobId, final int shardingItem) {
-        String key = PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, 
shardingItem);
-        
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persist(key,
 "");
-    }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
index 3935bec8106..13b9b2477af 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
@@ -31,6 +31,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobIteErrorMessageManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
@@ -152,7 +153,7 @@ public class InventoryIncrementalTasksRunner implements 
PipelineTasksRunner {
     protected void inventoryFailureCallback(final Throwable throwable) {
         log.error("onFailure, inventory task execute failed.", throwable);
         String jobId = jobItemContext.getJobId();
-        jobItemManager.updateErrorMessage(jobId, 
jobItemContext.getShardingItem(), throwable);
+        new PipelineJobIteErrorMessageManager(jobId, 
jobItemContext.getShardingItem()).updateErrorMessage(throwable);
         try {
             jobManager.stop(jobId);
         } catch (final PipelineJobNotFoundException ignored) {
@@ -187,7 +188,7 @@ public class InventoryIncrementalTasksRunner implements 
PipelineTasksRunner {
         public void onFailure(final Throwable throwable) {
             log.error("onFailure, incremental task execute failed.", 
throwable);
             String jobId = jobItemContext.getJobId();
-            jobItemManager.updateErrorMessage(jobId, 
jobItemContext.getShardingItem(), throwable);
+            new PipelineJobIteErrorMessageManager(jobId, 
jobItemContext.getShardingItem()).updateErrorMessage(throwable);
             try {
                 jobManager.stop(jobId);
             } catch (final PipelineJobNotFoundException ignored) {
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
index 0f6b66fa7c1..70ead420a66 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
@@ -41,6 +41,7 @@ import 
org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncr
 import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
 import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobIteErrorMessageManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
 import 
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
@@ -93,7 +94,7 @@ public final class CDCJob extends AbstractPipelineJob 
implements SimpleJob {
                 continue;
             }
             jobItemContexts.add(jobItemContext);
-            jobItemManager.cleanErrorMessage(jobId, shardingItem);
+            new PipelineJobIteErrorMessageManager(jobId, 
shardingItem).cleanErrorMessage();
             log.info("start tasks runner, jobId={}, shardingItem={}", jobId, 
shardingItem);
         }
         if (jobItemContexts.isEmpty()) {
@@ -127,7 +128,7 @@ public final class CDCJob extends AbstractPipelineJob 
implements SimpleJob {
     
     private void processFailed(final String jobId, final int shardingItem, 
final Exception ex) {
         log.error("job execution failed, {}-{}", jobId, shardingItem, ex);
-        jobItemManager.updateErrorMessage(jobId, shardingItem, ex);
+        new PipelineJobIteErrorMessageManager(jobId, 
shardingItem).updateErrorMessage(ex);
         PipelineJobCenter.stop(jobId);
         jobAPI.updateJobConfigurationDisabled(jobId, true);
     }
@@ -204,7 +205,7 @@ public final class CDCJob extends AbstractPipelineJob 
implements SimpleJob {
         public void onFailure(final Throwable throwable) {
             log.error("onFailure, {} task execute failed.", identifier, 
throwable);
             String jobId = jobItemContext.getJobId();
-            jobItemManager.updateErrorMessage(jobId, 
jobItemContext.getShardingItem(), throwable);
+            new PipelineJobIteErrorMessageManager(jobId, 
jobItemContext.getShardingItem()).updateErrorMessage(throwable);
             if (jobItemContext.getSink() instanceof CDCSocketSink) {
                 CDCSocketSink cdcSink = (CDCSocketSink) 
jobItemContext.getSink();
                 cdcSink.getChannel().writeAndFlush(CDCResponseUtils.failed("", 
"", throwable.getMessage()));
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index cf166153d54..f9c1db64a65 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -33,6 +33,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.exception.job.UncompletedCon
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobIteErrorMessageManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJob;
@@ -231,7 +232,7 @@ public final class ConsistencyCheckJobAPI implements 
PipelineJobAPI {
         fillInJobItemInfoWithTimes(result, jobItemProgress, jobConfigPOJO);
         
result.setTableNames(Optional.ofNullable(jobItemProgress.getTableNames()).orElse(""));
         fillInJobItemInfoWithCheckAlgorithm(result, checkJobId);
-        result.setErrorMessage(new 
PipelineJobItemManager<>(getYamlJobItemProgressSwapper()).getErrorMessage(checkJobId,
 0));
+        result.setErrorMessage(new 
PipelineJobIteErrorMessageManager(checkJobId, 0).getErrorMessage());
         Map<String, TableDataConsistencyCheckResult> checkJobResults = 
governanceRepositoryAPI.getCheckJobResult(parentJobId, checkJobId);
         result.setCheckSuccess(checkJobResults.isEmpty() ? null : 
checkJobResults.values().stream().allMatch(TableDataConsistencyCheckResult::isMatched));
         
result.setCheckFailedTableNames(checkJobResults.entrySet().stream().filter(each 
-> !each.getValue().isIgnored() && !each.getValue().isMatched())
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
index 136e92b3dc5..e75f71f126d 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
@@ -33,6 +33,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobIteErrorMessageManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import 
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
@@ -151,7 +152,7 @@ public final class ConsistencyCheckTasksRunner implements 
PipelineTasksRunner {
                 return;
             }
             log.info("onFailure, check job id: {}, parent job id: {}", 
checkJobId, parentJobId, throwable);
-            jobItemManager.updateErrorMessage(checkJobId, 0, throwable);
+            new PipelineJobIteErrorMessageManager(checkJobId, 
0).updateErrorMessage(throwable);
             jobManager.stop(checkJobId);
         }
     }

Reply via email to