This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 99278110ab9 Add PipelineJobIteErrorMessageManager (#29102)
99278110ab9 is described below
commit 99278110ab91b76e8cda902ecc018eece2a04023
Author: Liang Zhang <[email protected]>
AuthorDate: Tue Nov 21 16:44:20 2023 +0800
Add PipelineJobIteErrorMessageManager (#29102)
---
.../core/job/AbstractSimplePipelineJob.java | 15 +++--
.../service/InventoryIncrementalJobManager.java | 5 +-
.../service/PipelineJobIteErrorMessageManager.java | 72 ++++++++++++++++++++++
.../core/job/service/PipelineJobItemManager.java | 42 +------------
.../runner/InventoryIncrementalTasksRunner.java | 5 +-
.../data/pipeline/cdc/core/job/CDCJob.java | 7 ++-
.../api/impl/ConsistencyCheckJobAPI.java | 3 +-
.../task/ConsistencyCheckTasksRunner.java | 3 +-
8 files changed, 93 insertions(+), 59 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
index 36899a266c8..5931c3b681e 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.data.pipeline.core.job;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
-import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobIteErrorMessageManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
@@ -49,7 +49,6 @@ public abstract class AbstractSimplePipelineJob extends
AbstractPipelineJob impl
@Override
public void execute(final ShardingContext shardingContext) {
PipelineJobManager jobManager = new PipelineJobManager(getJobAPI());
- PipelineJobItemManager<?> jobItemManager = new
PipelineJobItemManager<>(getJobAPI().getYamlJobItemProgressSwapper());
String jobId = shardingContext.getJobName();
int shardingItem = shardingContext.getShardingItem();
log.info("Execute job {}-{}", jobId, shardingItem);
@@ -59,31 +58,31 @@ public abstract class AbstractSimplePipelineJob extends
AbstractPipelineJob impl
}
try {
PipelineJobItemContext jobItemContext =
buildPipelineJobItemContext(shardingContext);
- execute0(jobItemManager, jobItemContext);
+ execute0(jobItemContext);
// CHECKSTYLE:OFF
} catch (final RuntimeException ex) {
// CHECKSTYLE:ON
- processFailed(jobManager, jobItemManager, jobId, shardingItem, ex);
+ processFailed(jobManager, jobId, shardingItem, ex);
throw ex;
}
}
- private void execute0(final PipelineJobItemManager<?> jobItemManager,
final PipelineJobItemContext jobItemContext) {
+ private void execute0(final PipelineJobItemContext jobItemContext) {
String jobId = jobItemContext.getJobId();
int shardingItem = jobItemContext.getShardingItem();
PipelineTasksRunner tasksRunner =
buildPipelineTasksRunner(jobItemContext);
if (!addTasksRunner(shardingItem, tasksRunner)) {
return;
}
- jobItemManager.cleanErrorMessage(jobId, shardingItem);
+ new PipelineJobIteErrorMessageManager(jobId,
shardingItem).cleanErrorMessage();
prepare(jobItemContext);
log.info("start tasks runner, jobId={}, shardingItem={}", jobId,
shardingItem);
tasksRunner.start();
}
- private void processFailed(final PipelineJobManager jobManager, final
PipelineJobItemManager<?> jobItemManager, final String jobId, final int
shardingItem, final Exception ex) {
+ private void processFailed(final PipelineJobManager jobManager, final
String jobId, final int shardingItem, final Exception ex) {
log.error("job execution failed, {}-{}", jobId, shardingItem, ex);
- jobItemManager.updateErrorMessage(jobId, shardingItem, ex);
+ new PipelineJobIteErrorMessageManager(jobId,
shardingItem).updateErrorMessage(ex);
try {
jobManager.stop(jobId);
} catch (final PipelineJobNotFoundException ignored) {
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java
index 4e754a5d453..7e41b403aac 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java
@@ -80,12 +80,11 @@ public final class InventoryIncrementalJobManager {
long startTimeMillis =
Long.parseLong(Optional.ofNullable(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).getProps().getProperty("start_time_millis")).orElse("0"));
Map<Integer, InventoryIncrementalJobItemProgress> jobProgress =
getJobProgress(jobConfig);
List<InventoryIncrementalJobItemInfo> result = new LinkedList<>();
- PipelineJobItemManager<InventoryIncrementalJobItemProgress>
jobItemManager = new
PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper());
+ TableBasedPipelineJobInfo jobInfo = (TableBasedPipelineJobInfo)
jobAPI.getJobInfo(jobId);
for (Entry<Integer, InventoryIncrementalJobItemProgress> entry :
jobProgress.entrySet()) {
int shardingItem = entry.getKey();
- TableBasedPipelineJobInfo jobInfo = (TableBasedPipelineJobInfo)
jobAPI.getJobInfo(jobId);
InventoryIncrementalJobItemProgress jobItemProgress =
entry.getValue();
- String errorMessage = jobItemManager.getErrorMessage(jobId,
shardingItem);
+ String errorMessage = new PipelineJobIteErrorMessageManager(jobId,
shardingItem).getErrorMessage();
if (null == jobItemProgress) {
result.add(new InventoryIncrementalJobItemInfo(shardingItem,
jobInfo.getTable(), null, startTimeMillis, 0, errorMessage));
continue;
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobIteErrorMessageManager.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobIteErrorMessageManager.java
new file mode 100644
index 00000000000..a20cd2e7a4f
--- /dev/null
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobIteErrorMessageManager.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.job.service;
+
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
+import
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
+
+import java.util.Optional;
+
+/**
+ * Pipeline job item error message manager.
+ */
+public final class PipelineJobIteErrorMessageManager {
+
+ private final String jobId;
+
+ private final int shardingItem;
+
+ private final GovernanceRepositoryAPI governanceRepositoryAPI;
+
+ public PipelineJobIteErrorMessageManager(final String jobId, final int
shardingItem) {
+ this.jobId = jobId;
+ this.shardingItem = shardingItem;
+ governanceRepositoryAPI =
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId));
+ }
+
+ /**
+ * Get job item error message.
+ *
+ * @return job item error message, or an empty string when the repository returns null
+ */
+ public String getErrorMessage() {
+ return
Optional.ofNullable(governanceRepositoryAPI.getJobItemErrorMessage(jobId,
shardingItem)).orElse("");
+ }
+
+ /**
+ * Update job item error message.
+ *
+ * @param error error
+ */
+ public void updateErrorMessage(final Object error) {
+
governanceRepositoryAPI.update(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId,
shardingItem), null == error ? "" : buildErrorMessage(error));
+ }
+
+ private String buildErrorMessage(final Object error) {
+ return error instanceof Throwable ?
ExceptionUtils.getStackTrace((Throwable) error) : error.toString();
+ }
+
+ /**
+ * Clean job item error message.
+ */
+ public void cleanErrorMessage() {
+
governanceRepositoryAPI.persist(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId,
shardingItem), "");
+ }
+}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobItemManager.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobItemManager.java
index 9aa4a01cbb3..c6e6ec47e5a 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobItemManager.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobItemManager.java
@@ -17,11 +17,9 @@
package org.apache.shardingsphere.data.pipeline.core.job.service;
-import org.apache.commons.lang3.exception.ExceptionUtils;
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;
-import
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressConfiguration;
import
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressSwapper;
@@ -30,7 +28,7 @@ import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import java.util.Optional;
/**
- * Pipeline job manager.
+ * Pipeline job item manager.
*
* @param <T> type of pipeline job item progress
*/
@@ -96,42 +94,4 @@ public final class PipelineJobItemManager<T extends
PipelineJobItemProgress> {
private String convertProgressYamlContent(final PipelineJobItemContext
jobItemContext) {
return YamlEngine.marshal(swapper.swapToYamlConfiguration((T)
jobItemContext.toProgress()));
}
-
- /**
- * Get job item error message.
- *
- * @param jobId job id
- * @param shardingItem sharding item
- * @return map, key is sharding item, value is error message
- */
- public String getErrorMessage(final String jobId, final int shardingItem) {
- return
Optional.ofNullable(PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemErrorMessage(jobId,
shardingItem)).orElse("");
- }
-
- /**
- * Update job item error message.
- *
- * @param jobId job id
- * @param shardingItem sharding item
- * @param error error
- */
- public void updateErrorMessage(final String jobId, final int shardingItem,
final Object error) {
- String key = PipelineMetaDataNode.getJobItemErrorMessagePath(jobId,
shardingItem);
- String value = "";
- if (null != error) {
- value = error instanceof Throwable ?
ExceptionUtils.getStackTrace((Throwable) error) : error.toString();
- }
-
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).update(key,
value);
- }
-
- /**
- * Clean job item error message.
- *
- * @param jobId job id
- * @param shardingItem sharding item
- */
- public void cleanErrorMessage(final String jobId, final int shardingItem) {
- String key = PipelineMetaDataNode.getJobItemErrorMessagePath(jobId,
shardingItem);
-
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persist(key,
"");
- }
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
index 3935bec8106..13b9b2477af 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
@@ -31,6 +31,7 @@ import
org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobIteErrorMessageManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
@@ -152,7 +153,7 @@ public class InventoryIncrementalTasksRunner implements
PipelineTasksRunner {
protected void inventoryFailureCallback(final Throwable throwable) {
log.error("onFailure, inventory task execute failed.", throwable);
String jobId = jobItemContext.getJobId();
- jobItemManager.updateErrorMessage(jobId,
jobItemContext.getShardingItem(), throwable);
+ new PipelineJobIteErrorMessageManager(jobId,
jobItemContext.getShardingItem()).updateErrorMessage(throwable);
try {
jobManager.stop(jobId);
} catch (final PipelineJobNotFoundException ignored) {
@@ -187,7 +188,7 @@ public class InventoryIncrementalTasksRunner implements
PipelineTasksRunner {
public void onFailure(final Throwable throwable) {
log.error("onFailure, incremental task execute failed.",
throwable);
String jobId = jobItemContext.getJobId();
- jobItemManager.updateErrorMessage(jobId,
jobItemContext.getShardingItem(), throwable);
+ new PipelineJobIteErrorMessageManager(jobId,
jobItemContext.getShardingItem()).updateErrorMessage(throwable);
try {
jobManager.stop(jobId);
} catch (final PipelineJobNotFoundException ignored) {
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
index 0f6b66fa7c1..70ead420a66 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
@@ -41,6 +41,7 @@ import
org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncr
import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobIteErrorMessageManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
@@ -93,7 +94,7 @@ public final class CDCJob extends AbstractPipelineJob
implements SimpleJob {
continue;
}
jobItemContexts.add(jobItemContext);
- jobItemManager.cleanErrorMessage(jobId, shardingItem);
+ new PipelineJobIteErrorMessageManager(jobId,
shardingItem).cleanErrorMessage();
log.info("start tasks runner, jobId={}, shardingItem={}", jobId,
shardingItem);
}
if (jobItemContexts.isEmpty()) {
@@ -127,7 +128,7 @@ public final class CDCJob extends AbstractPipelineJob
implements SimpleJob {
private void processFailed(final String jobId, final int shardingItem,
final Exception ex) {
log.error("job execution failed, {}-{}", jobId, shardingItem, ex);
- jobItemManager.updateErrorMessage(jobId, shardingItem, ex);
+ new PipelineJobIteErrorMessageManager(jobId,
shardingItem).updateErrorMessage(ex);
PipelineJobCenter.stop(jobId);
jobAPI.updateJobConfigurationDisabled(jobId, true);
}
@@ -204,7 +205,7 @@ public final class CDCJob extends AbstractPipelineJob
implements SimpleJob {
public void onFailure(final Throwable throwable) {
log.error("onFailure, {} task execute failed.", identifier,
throwable);
String jobId = jobItemContext.getJobId();
- jobItemManager.updateErrorMessage(jobId,
jobItemContext.getShardingItem(), throwable);
+ new PipelineJobIteErrorMessageManager(jobId,
jobItemContext.getShardingItem()).updateErrorMessage(throwable);
if (jobItemContext.getSink() instanceof CDCSocketSink) {
CDCSocketSink cdcSink = (CDCSocketSink)
jobItemContext.getSink();
cdcSink.getChannel().writeAndFlush(CDCResponseUtils.failed("",
"", throwable.getMessage()));
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index cf166153d54..f9c1db64a65 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -33,6 +33,7 @@ import
org.apache.shardingsphere.data.pipeline.core.exception.job.UncompletedCon
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobIteErrorMessageManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJob;
@@ -231,7 +232,7 @@ public final class ConsistencyCheckJobAPI implements
PipelineJobAPI {
fillInJobItemInfoWithTimes(result, jobItemProgress, jobConfigPOJO);
result.setTableNames(Optional.ofNullable(jobItemProgress.getTableNames()).orElse(""));
fillInJobItemInfoWithCheckAlgorithm(result, checkJobId);
- result.setErrorMessage(new
PipelineJobItemManager<>(getYamlJobItemProgressSwapper()).getErrorMessage(checkJobId,
0));
+ result.setErrorMessage(new
PipelineJobIteErrorMessageManager(checkJobId, 0).getErrorMessage());
Map<String, TableDataConsistencyCheckResult> checkJobResults =
governanceRepositoryAPI.getCheckJobResult(parentJobId, checkJobId);
result.setCheckSuccess(checkJobResults.isEmpty() ? null :
checkJobResults.values().stream().allMatch(TableDataConsistencyCheckResult::isMatched));
result.setCheckFailedTableNames(checkJobResults.entrySet().stream().filter(each
-> !each.getValue().isIgnored() && !each.getValue().isMatched())
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
index 136e92b3dc5..e75f71f126d 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
@@ -33,6 +33,7 @@ import
org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobIteErrorMessageManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
@@ -151,7 +152,7 @@ public final class ConsistencyCheckTasksRunner implements
PipelineTasksRunner {
return;
}
log.info("onFailure, check job id: {}, parent job id: {}",
checkJobId, parentJobId, throwable);
- jobItemManager.updateErrorMessage(checkJobId, 0, throwable);
+ new PipelineJobIteErrorMessageManager(checkJobId,
0).updateErrorMessage(throwable);
jobManager.stop(checkJobId);
}
}