This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 0e824cf6afc Improve exception handling in AbstractPipelineJob and refactor processFailed (#28677)
0e824cf6afc is described below
commit 0e824cf6afc5ed9e4651e584b37de2fbbdbf31f4
Author: Xinze Guo <[email protected]>
AuthorDate: Sun Oct 8 19:39:13 2023 +0800
Improve exception handling in AbstractPipelineJob and refactor processFailed (#28677)
* Improve exception handling in AbstractPipelineJob and refactor processFailed
* Move processFailed to AbstractSimplePipelineJob
---
.../pipeline/core/job/AbstractPipelineJob.java | 15 +-------------
.../core/job/AbstractSimplePipelineJob.java | 23 ++++++++++++++++++++--
.../data/pipeline/cdc/core/job/CDCJob.java | 12 +++++------
3 files changed, 27 insertions(+), 23 deletions(-)
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
index 5fceae71fc5..e307fe0f679 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
@@ -24,7 +24,6 @@ import org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemCon
 import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
 import org.apache.shardingsphere.data.pipeline.common.listener.PipelineElasticJobListener;
 import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
-import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
 import org.apache.shardingsphere.data.pipeline.common.util.PipelineDistributedBarrier;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
@@ -34,6 +33,7 @@ import org.apache.shardingsphere.elasticjob.infra.listener.ElasticJobListener;
 import org.apache.shardingsphere.elasticjob.infra.spi.ElasticJobServiceLoader;
 import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.JobBootstrap;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
+import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
 
 import java.sql.SQLException;
 import java.util.ArrayList;
@@ -90,27 +90,14 @@ public abstract class AbstractPipelineJob implements PipelineJob {
         try {
             doPrepare(jobItemContext);
             // CHECKSTYLE:OFF
-        } catch (final RuntimeException ex) {
-            // CHECKSTYLE:ON
-            processFailed(jobItemContext, ex);
-            throw ex;
-            // CHECKSTYLE:OFF
         } catch (final SQLException ex) {
             // CHECKSTYLE:ON
-            processFailed(jobItemContext, ex);
             throw new PipelineInternalException(ex);
         }
     }
 
     protected abstract void doPrepare(PipelineJobItemContext jobItemContext) throws SQLException;
 
-    protected void processFailed(final PipelineJobItemContext jobItemContext, final Exception ex) {
-        String jobId = jobItemContext.getJobId();
-        log.error("job prepare failed, {}-{}", jobId, jobItemContext.getShardingItem(), ex);
-        jobAPI.updateJobItemErrorMessage(jobItemContext.getJobId(), jobItemContext.getShardingItem(), ex);
-        jobAPI.stop(jobId);
-    }
-
     @Override
     public Optional<PipelineTasksRunner> getTasksRunner(final int shardingItem) {
         return Optional.ofNullable(tasksRunnerMap.get(shardingItem));
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
index 7480585c916..3491f3882b1 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSimplePipelineJob.java
@@ -52,14 +52,33 @@ public abstract class AbstractSimplePipelineJob extends AbstractPipelineJob impl
             log.info("stopping true, ignore");
             return;
         }
-        PipelineJobItemContext jobItemContext = buildPipelineJobItemContext(shardingContext);
+        try {
+            PipelineJobItemContext jobItemContext = buildPipelineJobItemContext(shardingContext);
+            execute0(jobItemContext);
+            // CHECKSTYLE:OFF
+        } catch (final RuntimeException ex) {
+            // CHECKSTYLE:ON
+            processFailed(jobId, shardingItem, ex);
+            throw ex;
+        }
+    }
+
+    private void execute0(final PipelineJobItemContext jobItemContext) {
+        String jobId = jobItemContext.getJobId();
+        int shardingItem = jobItemContext.getShardingItem();
         PipelineTasksRunner tasksRunner = buildPipelineTasksRunner(jobItemContext);
         if (!addTasksRunner(shardingItem, tasksRunner)) {
             return;
         }
-        getJobAPI().cleanJobItemErrorMessage(jobId, jobItemContext.getShardingItem());
+        getJobAPI().cleanJobItemErrorMessage(jobId, shardingItem);
         prepare(jobItemContext);
         log.info("start tasks runner, jobId={}, shardingItem={}", jobId, shardingItem);
         tasksRunner.start();
     }
+
+    private void processFailed(final String jobId, final int shardingItem, final Exception ex) {
+        log.error("job prepare failed, {}-{}", jobId, shardingItem, ex);
+        getJobAPI().updateJobItemErrorMessage(jobId, shardingItem, ex);
+        getJobAPI().stop(jobId);
+    }
 }
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
index 5f4edfc4bda..7e28a6b3787 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
@@ -36,7 +36,6 @@ import org.apache.shardingsphere.data.pipeline.common.execute.ExecuteEngine;
 import org.apache.shardingsphere.data.pipeline.common.ingest.position.FinishedPosition;
 import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
-import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
 import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
 import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
@@ -44,6 +43,7 @@ import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
 import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
 import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
+import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
 
 import java.util.Collection;
 import java.util.LinkedList;
@@ -114,17 +114,15 @@ public final class CDCJob extends AbstractPipelineJob implements SimpleJob {
         } catch (final RuntimeException ex) {
             // CHECKSTYLE:ON
             for (PipelineJobItemContext each : jobItemContexts) {
-                processFailed(each, ex);
+                processFailed(each.getJobId(), each.getShardingItem(), ex);
             }
             throw ex;
         }
     }
 
-    @Override
-    protected void processFailed(final PipelineJobItemContext jobItemContext, final Exception ex) {
-        String jobId = jobItemContext.getJobId();
-        log.error("job prepare failed, {}-{}", jobId, jobItemContext.getShardingItem(), ex);
-        jobAPI.updateJobItemErrorMessage(jobItemContext.getJobId(), jobItemContext.getShardingItem(), ex);
+    private void processFailed(final String jobId, final int shardingItem, final Exception ex) {
+        log.error("job prepare failed, {}-{}", jobId, shardingItem, ex);
+        jobAPI.updateJobItemErrorMessage(jobId, shardingItem, ex);
         PipelineJobCenter.stop(jobId);
         jobAPI.updateJobConfigurationDisabled(jobId, true);
     }