This is an automated email from the ASF dual-hosted git repository.

jianglongtao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 78fc6318952 Remove PipelineJobIteErrorMessageManager (#29146)
78fc6318952 is described below

commit 78fc63189529f08372b5a66a8189c45c74a9f64f
Author: Liang Zhang <[email protected]>
AuthorDate: Thu Nov 23 23:56:07 2023 +0800

    Remove PipelineJobIteErrorMessageManager (#29146)
---
 ...ineJobItemErrorMessageGovernanceRepository.java | 21 +++++--
 .../core/job/AbstractSimplePipelineJob.java        |  6 +-
 .../service/InventoryIncrementalJobManager.java    |  2 +-
 .../service/PipelineJobIteErrorMessageManager.java | 71 ----------------------
 .../runner/InventoryIncrementalTasksRunner.java    |  6 +-
 .../data/pipeline/cdc/core/job/CDCJob.java         |  9 +--
 .../api/impl/ConsistencyCheckJobAPI.java           |  3 +-
 .../task/ConsistencyCheckTasksRunner.java          |  3 +-
 8 files changed, 31 insertions(+), 90 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/item/PipelineJobItemErrorMessageGovernanceRepository.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/item/PipelineJobItemErrorMessageGovernanceRepository.java
index 213302fa88f..a2c20eca007 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/item/PipelineJobItemErrorMessageGovernanceRepository.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/item/PipelineJobItemErrorMessageGovernanceRepository.java
@@ -18,9 +18,12 @@
 package 
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.item;
 
 import lombok.RequiredArgsConstructor;
+import org.apache.commons.lang3.exception.ExceptionUtils;
 import 
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
 
+import java.util.Optional;
+
 /**
  * Pipeline job item error message governance repository.
  */
@@ -34,10 +37,20 @@ public final class 
PipelineJobItemErrorMessageGovernanceRepository {
      *
      * @param jobId job ID
      * @param shardingItem sharding item
-     * @param errorMessage error message
+     * @param throwable throwable
+     */
+    public void update(final String jobId, final int shardingItem, final 
Throwable throwable) {
+        
repository.update(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, 
shardingItem), ExceptionUtils.getStackTrace(throwable));
+    }
+    
+    /**
+     * Clean job item error message.
+     *
+     * @param jobId job ID
+     * @param shardingItem sharding item
      */
-    public void update(final String jobId, final int shardingItem, final 
String errorMessage) {
-        
repository.update(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, 
shardingItem), errorMessage);
+    public void clean(final String jobId, final int shardingItem) {
+        
repository.update(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, 
shardingItem), "");
     }
     
     /**
@@ -48,6 +61,6 @@ public final class 
PipelineJobItemErrorMessageGovernanceRepository {
      * @return error msg
      */
     public String load(final String jobId, final int shardingItem) {
-        return 
repository.getDirectly(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId, 
shardingItem));
+        return 
Optional.ofNullable(repository.getDirectly(PipelineMetaDataNode.getJobItemErrorMessagePath(jobId,
 shardingItem))).orElse("");
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
index 5931c3b681e..fa3e33aab75 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.data.pipeline.core.job;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
-import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobIteErrorMessageManager;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import 
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
@@ -74,7 +74,7 @@ public abstract class AbstractSimplePipelineJob extends 
AbstractPipelineJob impl
         if (!addTasksRunner(shardingItem, tasksRunner)) {
             return;
         }
-        new PipelineJobIteErrorMessageManager(jobId, 
shardingItem).cleanErrorMessage();
+        
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().clean(jobId,
 shardingItem);
         prepare(jobItemContext);
         log.info("start tasks runner, jobId={}, shardingItem={}", jobId, 
shardingItem);
         tasksRunner.start();
@@ -82,7 +82,7 @@ public abstract class AbstractSimplePipelineJob extends 
AbstractPipelineJob impl
     
     private void processFailed(final PipelineJobManager jobManager, final 
String jobId, final int shardingItem, final Exception ex) {
         log.error("job execution failed, {}-{}", jobId, shardingItem, ex);
-        new PipelineJobIteErrorMessageManager(jobId, 
shardingItem).updateErrorMessage(ex);
+        
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().update(jobId,
 shardingItem, ex);
         try {
             jobManager.stop(jobId);
         } catch (final PipelineJobNotFoundException ignored) {
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java
index ee9f2ef5bc3..afa4fa3e78a 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java
@@ -84,7 +84,7 @@ public final class InventoryIncrementalJobManager {
         for (Entry<Integer, InventoryIncrementalJobItemProgress> entry : 
jobProgress.entrySet()) {
             int shardingItem = entry.getKey();
             InventoryIncrementalJobItemProgress jobItemProgress = 
entry.getValue();
-            String errorMessage = new PipelineJobIteErrorMessageManager(jobId, 
shardingItem).getErrorMessage();
+            String errorMessage = 
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().load(jobId,
 shardingItem);
             if (null == jobItemProgress) {
                 result.add(new InventoryIncrementalJobItemInfo(shardingItem, 
jobInfo.getTableName(), null, startTimeMillis, 0, errorMessage));
                 continue;
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobIteErrorMessageManager.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobIteErrorMessageManager.java
deleted file mode 100644
index 1066c68d9f3..00000000000
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobIteErrorMessageManager.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.job.service;
-
-import org.apache.commons.lang3.exception.ExceptionUtils;
-import 
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.PipelineGovernanceFacade;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
-
-import java.util.Optional;
-
-/**
- * Pipeline job item error message manager.
- */
-public final class PipelineJobIteErrorMessageManager {
-    
-    private final String jobId;
-    
-    private final int shardingItem;
-    
-    private final PipelineGovernanceFacade governanceFacade;
-    
-    public PipelineJobIteErrorMessageManager(final String jobId, final int 
shardingItem) {
-        this.jobId = jobId;
-        this.shardingItem = shardingItem;
-        governanceFacade = 
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId));
-    }
-    
-    /**
-     * Get job item error message.
-     *
-     * @return map, key is sharding item, value is error message
-     */
-    public String getErrorMessage() {
-        return 
Optional.ofNullable(governanceFacade.getJobItemFacade().getErrorMessage().load(jobId,
 shardingItem)).orElse("");
-    }
-    
-    /**
-     * Update job item error message.
-     *
-     * @param error error
-     */
-    public void updateErrorMessage(final Object error) {
-        governanceFacade.getJobItemFacade().getErrorMessage().update(jobId, 
shardingItem, null == error ? "" : buildErrorMessage(error));
-    }
-    
-    private String buildErrorMessage(final Object error) {
-        return error instanceof Throwable ? 
ExceptionUtils.getStackTrace((Throwable) error) : error.toString();
-    }
-    
-    /**
-     * Clean job item error message.
-     */
-    public void cleanErrorMessage() {
-        governanceFacade.getJobItemFacade().getErrorMessage().update(jobId, 
shardingItem, "");
-    }
-}
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
index 13b9b2477af..be8da1323fe 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/InventoryIncrementalTasksRunner.java
@@ -30,8 +30,8 @@ import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNot
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
-import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobIteErrorMessageManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
@@ -153,7 +153,7 @@ public class InventoryIncrementalTasksRunner implements 
PipelineTasksRunner {
     protected void inventoryFailureCallback(final Throwable throwable) {
         log.error("onFailure, inventory task execute failed.", throwable);
         String jobId = jobItemContext.getJobId();
-        new PipelineJobIteErrorMessageManager(jobId, 
jobItemContext.getShardingItem()).updateErrorMessage(throwable);
+        
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().update(jobId,
 jobItemContext.getShardingItem(), throwable);
         try {
             jobManager.stop(jobId);
         } catch (final PipelineJobNotFoundException ignored) {
@@ -188,7 +188,7 @@ public class InventoryIncrementalTasksRunner implements 
PipelineTasksRunner {
         public void onFailure(final Throwable throwable) {
             log.error("onFailure, incremental task execute failed.", 
throwable);
             String jobId = jobItemContext.getJobId();
-            new PipelineJobIteErrorMessageManager(jobId, 
jobItemContext.getShardingItem()).updateErrorMessage(throwable);
+            
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().update(jobId,
 jobItemContext.getShardingItem(), throwable);
             try {
                 jobManager.stop(jobId);
             } catch (final PipelineJobNotFoundException ignored) {
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
index 70ead420a66..62250972e4b 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
@@ -41,7 +41,8 @@ import 
org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncr
 import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
 import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
-import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobIteErrorMessageManager;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
 import 
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
@@ -94,7 +95,7 @@ public final class CDCJob extends AbstractPipelineJob 
implements SimpleJob {
                 continue;
             }
             jobItemContexts.add(jobItemContext);
-            new PipelineJobIteErrorMessageManager(jobId, 
shardingItem).cleanErrorMessage();
+            
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().clean(jobId,
 shardingItem);
             log.info("start tasks runner, jobId={}, shardingItem={}", jobId, 
shardingItem);
         }
         if (jobItemContexts.isEmpty()) {
@@ -128,7 +129,7 @@ public final class CDCJob extends AbstractPipelineJob 
implements SimpleJob {
     
     private void processFailed(final String jobId, final int shardingItem, 
final Exception ex) {
         log.error("job execution failed, {}-{}", jobId, shardingItem, ex);
-        new PipelineJobIteErrorMessageManager(jobId, 
shardingItem).updateErrorMessage(ex);
+        
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().update(jobId,
 shardingItem, ex);
         PipelineJobCenter.stop(jobId);
         jobAPI.updateJobConfigurationDisabled(jobId, true);
     }
@@ -205,7 +206,7 @@ public final class CDCJob extends AbstractPipelineJob 
implements SimpleJob {
         public void onFailure(final Throwable throwable) {
             log.error("onFailure, {} task execute failed.", identifier, 
throwable);
             String jobId = jobItemContext.getJobId();
-            new PipelineJobIteErrorMessageManager(jobId, 
jobItemContext.getShardingItem()).updateErrorMessage(throwable);
+            
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().update(jobId,
 jobItemContext.getShardingItem(), throwable);
             if (jobItemContext.getSink() instanceof CDCSocketSink) {
                 CDCSocketSink cdcSink = (CDCSocketSink) 
jobItemContext.getSink();
                 cdcSink.getChannel().writeAndFlush(CDCResponseUtils.failed("", 
"", throwable.getMessage()));
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index a9a846bed49..367b7e19b17 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -33,7 +33,6 @@ import 
org.apache.shardingsphere.data.pipeline.core.exception.job.UncompletedCon
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
-import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobIteErrorMessageManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJob;
@@ -232,7 +231,7 @@ public final class ConsistencyCheckJobAPI implements 
PipelineJobAPI {
         fillInJobItemInfoWithTimes(result, jobItemProgress, jobConfigPOJO);
         
result.setTableNames(Optional.ofNullable(jobItemProgress.getTableNames()).orElse(""));
         fillInJobItemInfoWithCheckAlgorithm(result, checkJobId);
-        result.setErrorMessage(new 
PipelineJobIteErrorMessageManager(checkJobId, 0).getErrorMessage());
+        
result.setErrorMessage(PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(checkJobId)).getJobItemFacade().getErrorMessage().load(checkJobId,
 0));
         Map<String, TableDataConsistencyCheckResult> checkJobResults = 
governanceFacade.getJobFacade().getCheck().getCheckJobResult(parentJobId, 
checkJobId);
         result.setCheckSuccess(checkJobResults.isEmpty() ? null : 
checkJobResults.values().stream().allMatch(TableDataConsistencyCheckResult::isMatched));
         
result.setCheckFailedTableNames(checkJobResults.entrySet().stream().filter(each 
-> !each.getValue().isIgnored() && !each.getValue().isMatched())
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
index f705d6b01ab..27a54b2e22b 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
@@ -33,7 +33,6 @@ import 
org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
-import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobIteErrorMessageManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import 
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
@@ -153,7 +152,7 @@ public final class ConsistencyCheckTasksRunner implements 
PipelineTasksRunner {
                 return;
             }
             log.info("onFailure, check job id: {}, parent job id: {}", 
checkJobId, parentJobId, throwable);
-            new PipelineJobIteErrorMessageManager(checkJobId, 
0).updateErrorMessage(throwable);
+            
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(checkJobId)).getJobItemFacade().getErrorMessage().update(checkJobId,
 0, throwable);
             jobManager.stop(checkJobId);
         }
     }

Reply via email to