This is an automated email from the ASF dual-hosted git repository.
jianglongtao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 78fc6318952 Remove PipelineJobIteErrorMessageManager (#29146)
78fc6318952 is described below
commit 78fc63189529f08372b5a66a8189c45c74a9f64f
Author: Liang Zhang <[email protected]>
AuthorDate: Thu Nov 23 23:56:07 2023 +0800
Remove PipelineJobIteErrorMessageManager (#29146)
---
...ineJobItemErrorMessageGovernanceRepository.java | 21 +++++--
.../core/job/AbstractSimplePipelineJob.java | 6 +-
.../service/InventoryIncrementalJobManager.java | 2 +-
.../service/PipelineJobIteErrorMessageManager.java | 71 ----------------------
.../runner/InventoryIncrementalTasksRunner.java | 6 +-
.../data/pipeline/cdc/core/job/CDCJob.java | 9 +--
.../api/impl/ConsistencyCheckJobAPI.java | 3 +-
.../task/ConsistencyCheckTasksRunner.java | 3 +-
8 files changed, 31 insertions(+), 90 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/item/PipelineJobItemErrorMessageGovernanceRepository.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/item/PipelineJobItemErrorMessageGovernanceRepository.java
index 213302fa88f..a2c20eca007 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/item/PipelineJobItemErrorMessageGovernanceRepository.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/item/PipelineJobItemErrorMessageGovernanceRepository.java
@@ -18,9 +18,12 @@
package
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.item;
import lombok.RequiredArgsConstructor;
+import org.apache.commons.lang3.exception.ExceptionUtils;
import
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+import java.util.Optional;
+
/**
* Pipeline job item error message governance repository.
*/
@@ -34,10 +37,20 @@ public final class
PipelineJobItemErrorMessageGovernanceRepository {
*
* @param jobId job ID
* @param shardingItem sharding item
- * @param errorMessage error message
+ * @param throwable throwable
+ */
+ public void update(final String jobId, final int shardingItem, final
Throwable throwable) {
+
repository.update(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId,
shardingItem), ExceptionUtils.getStackTrace(throwable));
+ }
+
+ /**
+ * Clean job item error message.
+ *
+ * @param jobId job ID
+ * @param shardingItem sharding item
*/
- public void update(final String jobId, final int shardingItem, final
String errorMessage) {
-
repository.update(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId,
shardingItem), errorMessage);
+ public void clean(final String jobId, final int shardingItem) {
+
repository.update(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId,
shardingItem), "");
}
/**
@@ -48,6 +61,6 @@ public final class
PipelineJobItemErrorMessageGovernanceRepository {
* @return error msg
*/
public String load(final String jobId, final int shardingItem) {
- return
repository.getDirectly(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId,
shardingItem));
+ return
Optional.ofNullable(repository.getDirectly(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId,
shardingItem))).orElse("");
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
index 5931c3b681e..fa3e33aab75 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.data.pipeline.core.job;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
-import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobIteErrorMessageManager;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
@@ -74,7 +74,7 @@ public abstract class AbstractSimplePipelineJob extends
AbstractPipelineJob impl
if (!addTasksRunner(shardingItem, tasksRunner)) {
return;
}
- new PipelineJobIteErrorMessageManager(jobId,
shardingItem).cleanErrorMessage();
+
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().clean(jobId,
shardingItem);
prepare(jobItemContext);
log.info("start tasks runner, jobId={}, shardingItem={}", jobId,
shardingItem);
tasksRunner.start();
@@ -82,7 +82,7 @@ public abstract class AbstractSimplePipelineJob extends
AbstractPipelineJob impl
private void processFailed(final PipelineJobManager jobManager, final
String jobId, final int shardingItem, final Exception ex) {
log.error("job execution failed, {}-{}", jobId, shardingItem, ex);
- new PipelineJobIteErrorMessageManager(jobId,
shardingItem).updateErrorMessage(ex);
+
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().update(jobId,
shardingItem, ex);
try {
jobManager.stop(jobId);
} catch (final PipelineJobNotFoundException ignored) {
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java
index ee9f2ef5bc3..afa4fa3e78a 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java
@@ -84,7 +84,7 @@ public final class InventoryIncrementalJobManager {
for (Entry<Integer, InventoryIncrementalJobItemProgress> entry :
jobProgress.entrySet()) {
int shardingItem = entry.getKey();
InventoryIncrementalJobItemProgress jobItemProgress =
entry.getValue();
- String errorMessage = new PipelineJobIteErrorMessageManager(jobId,
shardingItem).getErrorMessage();
+ String errorMessage =
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().load(jobId,
shardingItem);
if (null == jobItemProgress) {
result.add(new InventoryIncrementalJobItemInfo(shardingItem,
jobInfo.getTableName(), null, startTimeMillis, 0, errorMessage));
continue;
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobIteErrorMessageManager.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobIteErrorMessageManager.java
deleted file mode 100644
index 1066c68d9f3..00000000000
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobIteErrorMessageManager.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.job.service;
-
-import org.apache.commons.lang3.exception.ExceptionUtils;
-import
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.PipelineGovernanceFacade;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
-
-import java.util.Optional;
-
-/**
- * Pipeline job item error message manager.
- */
-public final class PipelineJobIteErrorMessageManager {
-
- private final String jobId;
-
- private final int shardingItem;
-
- private final PipelineGovernanceFacade governanceFacade;
-
- public PipelineJobIteErrorMessageManager(final String jobId, final int
shardingItem) {
- this.jobId = jobId;
- this.shardingItem = shardingItem;
- governanceFacade =
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId));
- }
-
- /**
- * Get job item error message.
- *
- * @return map, key is sharding item, value is error message
- */
- public String getErrorMessage() {
- return
Optional.ofNullable(governanceFacade.getJobItemFacade().getErrorMessage().load(jobId,
shardingItem)).orElse("");
- }
-
- /**
- * Update job item error message.
- *
- * @param error error
- */
- public void updateErrorMessage(final Object error) {
- governanceFacade.getJobItemFacade().getErrorMessage().update(jobId,
shardingItem, null == error ? "" : buildErrorMessage(error));
- }
-
- private String buildErrorMessage(final Object error) {
- return error instanceof Throwable ?
ExceptionUtils.getStackTrace((Throwable) error) : error.toString();
- }
-
- /**
- * Clean job item error message.
- */
- public void cleanErrorMessage() {
- governanceFacade.getJobItemFacade().getErrorMessage().update(jobId,
shardingItem, "");
- }
-}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
index 13b9b2477af..be8da1323fe 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
@@ -30,8 +30,8 @@ import
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNot
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
-import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobIteErrorMessageManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
@@ -153,7 +153,7 @@ public class InventoryIncrementalTasksRunner implements
PipelineTasksRunner {
protected void inventoryFailureCallback(final Throwable throwable) {
log.error("onFailure, inventory task execute failed.", throwable);
String jobId = jobItemContext.getJobId();
- new PipelineJobIteErrorMessageManager(jobId,
jobItemContext.getShardingItem()).updateErrorMessage(throwable);
+
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().update(jobId,
jobItemContext.getShardingItem(), throwable);
try {
jobManager.stop(jobId);
} catch (final PipelineJobNotFoundException ignored) {
@@ -188,7 +188,7 @@ public class InventoryIncrementalTasksRunner implements
PipelineTasksRunner {
public void onFailure(final Throwable throwable) {
log.error("onFailure, incremental task execute failed.",
throwable);
String jobId = jobItemContext.getJobId();
- new PipelineJobIteErrorMessageManager(jobId,
jobItemContext.getShardingItem()).updateErrorMessage(throwable);
+
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().update(jobId,
jobItemContext.getShardingItem(), throwable);
try {
jobManager.stop(jobId);
} catch (final PipelineJobNotFoundException ignored) {
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
index 70ead420a66..62250972e4b 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
@@ -41,7 +41,8 @@ import
org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncr
import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
-import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobIteErrorMessageManager;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
@@ -94,7 +95,7 @@ public final class CDCJob extends AbstractPipelineJob
implements SimpleJob {
continue;
}
jobItemContexts.add(jobItemContext);
- new PipelineJobIteErrorMessageManager(jobId,
shardingItem).cleanErrorMessage();
+
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().clean(jobId,
shardingItem);
log.info("start tasks runner, jobId={}, shardingItem={}", jobId,
shardingItem);
}
if (jobItemContexts.isEmpty()) {
@@ -128,7 +129,7 @@ public final class CDCJob extends AbstractPipelineJob
implements SimpleJob {
private void processFailed(final String jobId, final int shardingItem,
final Exception ex) {
log.error("job execution failed, {}-{}", jobId, shardingItem, ex);
- new PipelineJobIteErrorMessageManager(jobId,
shardingItem).updateErrorMessage(ex);
+
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().update(jobId,
shardingItem, ex);
PipelineJobCenter.stop(jobId);
jobAPI.updateJobConfigurationDisabled(jobId, true);
}
@@ -205,7 +206,7 @@ public final class CDCJob extends AbstractPipelineJob
implements SimpleJob {
public void onFailure(final Throwable throwable) {
log.error("onFailure, {} task execute failed.", identifier,
throwable);
String jobId = jobItemContext.getJobId();
- new PipelineJobIteErrorMessageManager(jobId,
jobItemContext.getShardingItem()).updateErrorMessage(throwable);
+
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().update(jobId,
jobItemContext.getShardingItem(), throwable);
if (jobItemContext.getSink() instanceof CDCSocketSink) {
CDCSocketSink cdcSink = (CDCSocketSink)
jobItemContext.getSink();
cdcSink.getChannel().writeAndFlush(CDCResponseUtils.failed("",
"", throwable.getMessage()));
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index a9a846bed49..367b7e19b17 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -33,7 +33,6 @@ import
org.apache.shardingsphere.data.pipeline.core.exception.job.UncompletedCon
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
-import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobIteErrorMessageManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJob;
@@ -232,7 +231,7 @@ public final class ConsistencyCheckJobAPI implements
PipelineJobAPI {
fillInJobItemInfoWithTimes(result, jobItemProgress, jobConfigPOJO);
result.setTableNames(Optional.ofNullable(jobItemProgress.getTableNames()).orElse(""));
fillInJobItemInfoWithCheckAlgorithm(result, checkJobId);
- result.setErrorMessage(new
PipelineJobIteErrorMessageManager(checkJobId, 0).getErrorMessage());
+
result.setErrorMessage(PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(checkJobId)).getJobItemFacade().getErrorMessage().load(checkJobId,
0));
Map<String, TableDataConsistencyCheckResult> checkJobResults =
governanceFacade.getJobFacade().getCheck().getCheckJobResult(parentJobId,
checkJobId);
result.setCheckSuccess(checkJobResults.isEmpty() ? null :
checkJobResults.values().stream().allMatch(TableDataConsistencyCheckResult::isMatched));
result.setCheckFailedTableNames(checkJobResults.entrySet().stream().filter(each
-> !each.getValue().isIgnored() && !each.getValue().isMatched())
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
index f705d6b01ab..27a54b2e22b 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
@@ -33,7 +33,6 @@ import
org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
-import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobIteErrorMessageManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
@@ -153,7 +152,7 @@ public final class ConsistencyCheckTasksRunner implements
PipelineTasksRunner {
return;
}
log.info("onFailure, check job id: {}, parent job id: {}",
checkJobId, parentJobId, throwable);
- new PipelineJobIteErrorMessageManager(checkJobId,
0).updateErrorMessage(throwable);
+
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(checkJobId)).getJobItemFacade().getErrorMessage().update(checkJobId,
0, throwable);
jobManager.stop(checkJobId);
}
}