This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e824cf6afc Improve exception handling in AbstractPipelineJob and refactor processFailed (#28677)
0e824cf6afc is described below

commit 0e824cf6afc5ed9e4651e584b37de2fbbdbf31f4
Author: Xinze Guo <[email protected]>
AuthorDate: Sun Oct 8 19:39:13 2023 +0800

    Improve exception handling in AbstractPipelineJob and refactor processFailed (#28677)
    
    * Improve exception handling in AbstractPipelineJob and refactor processFailed
    
    * Move processFailed to AbstractSimplePipelineJob
---
 .../pipeline/core/job/AbstractPipelineJob.java     | 15 +-------------
 .../core/job/AbstractSimplePipelineJob.java        | 23 ++++++++++++++++++++--
 .../data/pipeline/cdc/core/job/CDCJob.java         | 12 +++++------
 3 files changed, 27 insertions(+), 23 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
index 5fceae71fc5..e307fe0f679 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
@@ -24,7 +24,6 @@ import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemCon
 import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
 import 
org.apache.shardingsphere.data.pipeline.common.listener.PipelineElasticJobListener;
 import 
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
-import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
 import 
org.apache.shardingsphere.data.pipeline.common.util.PipelineDistributedBarrier;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
@@ -34,6 +33,7 @@ import 
org.apache.shardingsphere.elasticjob.infra.listener.ElasticJobListener;
 import org.apache.shardingsphere.elasticjob.infra.spi.ElasticJobServiceLoader;
 import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.JobBootstrap;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
+import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
 
 import java.sql.SQLException;
 import java.util.ArrayList;
@@ -90,27 +90,14 @@ public abstract class AbstractPipelineJob implements 
PipelineJob {
         try {
             doPrepare(jobItemContext);
             // CHECKSTYLE:OFF
-        } catch (final RuntimeException ex) {
-            // CHECKSTYLE:ON
-            processFailed(jobItemContext, ex);
-            throw ex;
-            // CHECKSTYLE:OFF
         } catch (final SQLException ex) {
             // CHECKSTYLE:ON
-            processFailed(jobItemContext, ex);
             throw new PipelineInternalException(ex);
         }
     }
     
     protected abstract void doPrepare(PipelineJobItemContext jobItemContext) 
throws SQLException;
     
-    protected void processFailed(final PipelineJobItemContext jobItemContext, 
final Exception ex) {
-        String jobId = jobItemContext.getJobId();
-        log.error("job prepare failed, {}-{}", jobId, 
jobItemContext.getShardingItem(), ex);
-        jobAPI.updateJobItemErrorMessage(jobItemContext.getJobId(), 
jobItemContext.getShardingItem(), ex);
-        jobAPI.stop(jobId);
-    }
-    
     @Override
     public Optional<PipelineTasksRunner> getTasksRunner(final int 
shardingItem) {
         return Optional.ofNullable(tasksRunnerMap.get(shardingItem));
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
index 7480585c916..3491f3882b1 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
@@ -52,14 +52,33 @@ public abstract class AbstractSimplePipelineJob extends 
AbstractPipelineJob impl
             log.info("stopping true, ignore");
             return;
         }
-        PipelineJobItemContext jobItemContext = 
buildPipelineJobItemContext(shardingContext);
+        try {
+            PipelineJobItemContext jobItemContext = 
buildPipelineJobItemContext(shardingContext);
+            execute0(jobItemContext);
+            // CHECKSTYLE:OFF
+        } catch (final RuntimeException ex) {
+            // CHECKSTYLE:ON
+            processFailed(jobId, shardingItem, ex);
+            throw ex;
+        }
+    }
+    
+    private void execute0(final PipelineJobItemContext jobItemContext) {
+        String jobId = jobItemContext.getJobId();
+        int shardingItem = jobItemContext.getShardingItem();
         PipelineTasksRunner tasksRunner = 
buildPipelineTasksRunner(jobItemContext);
         if (!addTasksRunner(shardingItem, tasksRunner)) {
             return;
         }
-        getJobAPI().cleanJobItemErrorMessage(jobId, 
jobItemContext.getShardingItem());
+        getJobAPI().cleanJobItemErrorMessage(jobId, shardingItem);
         prepare(jobItemContext);
         log.info("start tasks runner, jobId={}, shardingItem={}", jobId, 
shardingItem);
         tasksRunner.start();
     }
+    
+    private void processFailed(final String jobId, final int shardingItem, 
final Exception ex) {
+        log.error("job prepare failed, {}-{}", jobId, shardingItem, ex);
+        getJobAPI().updateJobItemErrorMessage(jobId, shardingItem, ex);
+        getJobAPI().stop(jobId);
+    }
 }
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
index 5f4edfc4bda..7e28a6b3787 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
@@ -36,7 +36,6 @@ import 
org.apache.shardingsphere.data.pipeline.common.execute.ExecuteEngine;
 import 
org.apache.shardingsphere.data.pipeline.common.ingest.position.FinishedPosition;
 import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
-import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
 import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
 import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
@@ -44,6 +43,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
 import 
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
 import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
+import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
 
 import java.util.Collection;
 import java.util.LinkedList;
@@ -114,17 +114,15 @@ public final class CDCJob extends AbstractPipelineJob 
implements SimpleJob {
         } catch (final RuntimeException ex) {
             // CHECKSTYLE:ON
             for (PipelineJobItemContext each : jobItemContexts) {
-                processFailed(each, ex);
+                processFailed(each.getJobId(), each.getShardingItem(), ex);
             }
             throw ex;
         }
     }
     
-    @Override
-    protected void processFailed(final PipelineJobItemContext jobItemContext, 
final Exception ex) {
-        String jobId = jobItemContext.getJobId();
-        log.error("job prepare failed, {}-{}", jobId, 
jobItemContext.getShardingItem(), ex);
-        jobAPI.updateJobItemErrorMessage(jobItemContext.getJobId(), 
jobItemContext.getShardingItem(), ex);
+    private void processFailed(final String jobId, final int shardingItem, 
final Exception ex) {
+        log.error("job prepare failed, {}-{}", jobId, shardingItem, ex);
+        jobAPI.updateJobItemErrorMessage(jobId, shardingItem, ex);
         PipelineJobCenter.stop(jobId);
         jobAPI.updateJobConfigurationDisabled(jobId, true);
     }

Reply via email to