This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e78436e21e Refactor PipelineJobException (#20958)
6e78436e21e is described below

commit 6e78436e21e5bed43d11a3dcad1389bd3c1d972f
Author: Liang Zhang <[email protected]>
AuthorDate: Wed Sep 14 15:39:12 2022 +0800

    Refactor PipelineJobException (#20958)
    
    * Add PipelineImporterJobWriteException
    
    * Add BinlogSyncChannelAlreadyClosedException
    
    * Add PipelineJobExecutionException
    
    * Add SplitTableRangeException
    
    * Rename ZeroShardingCountException
    
    * Refactor PipelineJobException
    
    * Fix test case
    
    * Fix test case
---
 .../user-manual/error-code/sql-error-code.cn.md        |  5 +++++
 .../user-manual/error-code/sql-error-code.en.md        |  5 +++++
 .../core/api/impl/AbstractPipelineJobAPIImpl.java      | 11 ++++-------
 ...va => BinlogSyncChannelAlreadyClosedException.java} | 10 +++++-----
 ...ion.java => PipelineImporterJobWriteException.java} | 10 +++++-----
 ...eJobCreationWithInvalidShardingCountException.java} | 11 +++++++----
 .../exception/job/PipelineJobExecutionException.java   | 15 +++++++--------
 ...java => PipelineJobHasAlreadyStartedException.java} |  6 +++---
 ...edException.java => SplitPipelineJobException.java} | 10 +++++-----
 .../data/pipeline/core/importer/DefaultImporter.java   |  7 +++----
 .../pipeline/core/prepare/InventoryTaskSplitter.java   |  8 ++++----
 .../data/pipeline/core/task/IncrementalTask.java       |  2 +-
 .../data/pipeline/core/task/InventoryTask.java         |  2 +-
 .../pipeline/core/util/PipelineTableMetaDataUtil.java  | 18 ++++++------------
 .../data/pipeline/mysql/ingest/client/MySQLClient.java |  9 ++++-----
 .../pipeline/mysql/ingest/client/MySQLClientTest.java  |  4 ++--
 .../core/prepare/InventoryTaskSplitterTest.java        |  6 +++---
 17 files changed, 70 insertions(+), 69 deletions(-)

diff --git a/docs/document/content/user-manual/error-code/sql-error-code.cn.md 
b/docs/document/content/user-manual/error-code/sql-error-code.cn.md
index c6a9e1805a1..af6843f255e 100644
--- a/docs/document/content/user-manual/error-code/sql-error-code.cn.md
+++ b/docs/document/content/user-manual/error-code/sql-error-code.cn.md
@@ -94,6 +94,11 @@ SQL 错误码以标准的 SQL State,Vendor Code 和详细错误信息提供,
 | HY000     | 18020       | Failed to get DDL for table \`%s\` |
 | HY000     | 18080       | Can not find pipeline job \`%s\` |
 | HY000     | 18081       | Job has already started |
+| HY000     | 18082       | Sharding count of job \`%s\` is 0 |
+| HY000     | 18083       | Can not split range for table \`%s\`, reason: %s |
+| HY000     | 18090       | Importer job write data failed |
+| HY000     | 18091       | Can not poll event because of binlog sync channel 
already closed |
+| HY000     | 18092       | Task \`%s\` execute failed |
 
 ## 功能异常
 
diff --git a/docs/document/content/user-manual/error-code/sql-error-code.en.md 
b/docs/document/content/user-manual/error-code/sql-error-code.en.md
index 04d6f9ca6e0..ee600f344e7 100644
--- a/docs/document/content/user-manual/error-code/sql-error-code.en.md
+++ b/docs/document/content/user-manual/error-code/sql-error-code.en.md
@@ -94,6 +94,11 @@ SQL error codes provide by standard `SQL State`, `Vendor 
Code` and `Reason`, whi
 | HY000     | 18020       | Failed to get DDL for table \`%s\` |
 | HY000     | 18080       | Can not find pipeline job \`%s\` |
 | HY000     | 18081       | Job has already started |
+| HY000     | 18082       | Sharding count of job \`%s\` is 0 |
+| HY000     | 18083       | Can not split range for table \`%s\`, reason: %s |
+| HY000     | 18090       | Importer job write data failed |
+| HY000     | 18091       | Can not poll event because of binlog sync channel 
already closed |
+| HY000     | 18092       | Task \`%s\` execute failed |
 
 ## Feature Exception
 
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
index 8b0b4d5fb36..b05649ea31f 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
@@ -28,8 +28,8 @@ import 
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
-import 
org.apache.shardingsphere.data.pipeline.core.exception.job.JobHasAlreadyStartedException;
-import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationException;
+import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobHasAlreadyStartedException;
+import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationWithInvalidShardingCountException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.metadata.AlterNotExistProcessConfigurationException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.metadata.CreateExistsProcessConfigurationException;
@@ -145,10 +145,7 @@ public abstract class AbstractPipelineJobAPIImpl 
implements PipelineJobAPI {
     @Override
     public Optional<String> start(final PipelineJobConfiguration jobConfig) {
         String jobId = jobConfig.getJobId();
-        if (0 == jobConfig.getJobShardingCount()) {
-            log.warn("Invalid job config since job sharding count is 0, 
jobId={}", jobId);
-            throw new PipelineJobCreationException("job sharding count is 0, 
jobId: " + jobId);
-        }
+        Preconditions.checkState(0 != jobConfig.getJobShardingCount(), new 
PipelineJobCreationWithInvalidShardingCountException(jobId));
         log.info("Start job by {}", jobConfig);
         GovernanceRepositoryAPI repositoryAPI = 
PipelineAPIFactory.getGovernanceRepositoryAPI();
         String jobConfigKey = PipelineMetaDataNode.getJobConfigPath(jobId);
@@ -179,7 +176,7 @@ public abstract class AbstractPipelineJobAPIImpl implements 
PipelineJobAPI {
         log.info("Start disabled pipeline job {}", jobId);
         
pipelineDistributedBarrier.removeParentNode(PipelineMetaDataNode.getJobBarrierDisablePath(jobId));
         JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
-        ShardingSpherePreconditions.checkState(jobConfigPOJO.isDisabled(), new 
JobHasAlreadyStartedException(jobId));
+        ShardingSpherePreconditions.checkState(jobConfigPOJO.isDisabled(), new 
PipelineJobHasAlreadyStartedException(jobId));
         jobConfigPOJO.setDisabled(false);
         jobConfigPOJO.getProps().remove("stop_time");
         
PipelineAPIFactory.getJobConfigurationAPI().updateJobConfiguration(jobConfigPOJO);
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/JobHasAlreadyStartedException.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/BinlogSyncChannelAlreadyClosedException.java
similarity index 73%
copy from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/JobHasAlreadyStartedException.java
copy to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/BinlogSyncChannelAlreadyClosedException.java
index debed7788fc..284256a42e6 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/JobHasAlreadyStartedException.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/BinlogSyncChannelAlreadyClosedException.java
@@ -21,13 +21,13 @@ import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLExcepti
 import 
org.apache.shardingsphere.infra.util.exception.external.sql.sqlstate.XOpenSQLState;
 
 /**
- * Job has already started exception.
+ * Binlog sync channel already closed exception.
  */
-public final class JobHasAlreadyStartedException extends PipelineSQLException {
+public final class BinlogSyncChannelAlreadyClosedException extends 
PipelineSQLException {
     
-    private static final long serialVersionUID = 2854259384634892428L;
+    private static final long serialVersionUID = -8897293295641185703L;
     
-    public JobHasAlreadyStartedException(final String jobId) {
-        super(XOpenSQLState.GENERAL_ERROR, 81, "Job `%s` has already started", 
jobId);
+    public BinlogSyncChannelAlreadyClosedException() {
+        super(XOpenSQLState.GENERAL_ERROR, 91, "Can not poll event because of 
binlog sync channel already closed");
     }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/JobHasAlreadyStartedException.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineImporterJobWriteException.java
similarity index 76%
copy from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/JobHasAlreadyStartedException.java
copy to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineImporterJobWriteException.java
index debed7788fc..2476a1d0594 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/JobHasAlreadyStartedException.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineImporterJobWriteException.java
@@ -21,13 +21,13 @@ import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLExcepti
 import 
org.apache.shardingsphere.infra.util.exception.external.sql.sqlstate.XOpenSQLState;
 
 /**
- * Job has already started exception.
+ * Pipeline importer job write exception.
  */
-public final class JobHasAlreadyStartedException extends PipelineSQLException {
+public final class PipelineImporterJobWriteException extends 
PipelineSQLException {
     
-    private static final long serialVersionUID = 2854259384634892428L;
+    private static final long serialVersionUID = -7924663094479253130L;
     
-    public JobHasAlreadyStartedException(final String jobId) {
-        super(XOpenSQLState.GENERAL_ERROR, 81, "Job `%s` has already started", 
jobId);
+    public PipelineImporterJobWriteException() {
+        super(XOpenSQLState.GENERAL_ERROR, 90, "Importer job write data 
failed");
     }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobCreationException.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobCreationWithInvalidShardingCountException.java
similarity index 65%
rename from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobCreationException.java
rename to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobCreationWithInvalidShardingCountException.java
index 75935cf3a33..d18868f735f 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobCreationException.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobCreationWithInvalidShardingCountException.java
@@ -17,14 +17,17 @@
 
 package org.apache.shardingsphere.data.pipeline.core.exception.job;
 
+import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLException;
+import 
org.apache.shardingsphere.infra.util.exception.external.sql.sqlstate.XOpenSQLState;
+
 /**
- * Pipeline job creation exception.
+ * Pipeline job creation with invalid sharding count exception.
  */
-public final class PipelineJobCreationException extends RuntimeException {
+public final class PipelineJobCreationWithInvalidShardingCountException 
extends PipelineSQLException {
     
     private static final long serialVersionUID = 5829502315976905271L;
     
-    public PipelineJobCreationException(final String message) {
-        super(message);
+    public PipelineJobCreationWithInvalidShardingCountException(final String 
jobId) {
+        super(XOpenSQLState.GENERAL_ERROR, 82, "Sharding count of job `%s` is 
0", jobId);
     }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobExecutionException.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobExecutionException.java
index 30438b797f2..13d2df77bb5 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobExecutionException.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobExecutionException.java
@@ -17,18 +17,17 @@
 
 package org.apache.shardingsphere.data.pipeline.core.exception.job;
 
+import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLException;
+import 
org.apache.shardingsphere.infra.util.exception.external.sql.sqlstate.XOpenSQLState;
+
 /**
  * Pipeline job execution exception.
  */
-public final class PipelineJobExecutionException extends RuntimeException {
-    
-    private static final long serialVersionUID = 1797495940081148743L;
+public final class PipelineJobExecutionException extends PipelineSQLException {
     
-    public PipelineJobExecutionException(final String message) {
-        super(message);
-    }
+    private static final long serialVersionUID = -5530453461378051166L;
     
-    public PipelineJobExecutionException(final String message, final Throwable 
cause) {
-        super(message, cause);
+    public PipelineJobExecutionException(final String taskId, final Throwable 
cause) {
+        super(XOpenSQLState.GENERAL_ERROR, 92, "Task `%s` execute failed", 
taskId, cause.getMessage());
     }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/JobHasAlreadyStartedException.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobHasAlreadyStartedException.java
similarity index 85%
copy from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/JobHasAlreadyStartedException.java
copy to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobHasAlreadyStartedException.java
index debed7788fc..1cc534bf38d 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/JobHasAlreadyStartedException.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobHasAlreadyStartedException.java
@@ -21,13 +21,13 @@ import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLExcepti
 import 
org.apache.shardingsphere.infra.util.exception.external.sql.sqlstate.XOpenSQLState;
 
 /**
- * Job has already started exception.
+ * Pipeline job has already started exception.
  */
-public final class JobHasAlreadyStartedException extends PipelineSQLException {
+public final class PipelineJobHasAlreadyStartedException extends 
PipelineSQLException {
     
     private static final long serialVersionUID = 2854259384634892428L;
     
-    public JobHasAlreadyStartedException(final String jobId) {
+    public PipelineJobHasAlreadyStartedException(final String jobId) {
         super(XOpenSQLState.GENERAL_ERROR, 81, "Job `%s` has already started", 
jobId);
     }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/JobHasAlreadyStartedException.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/SplitPipelineJobException.java
similarity index 73%
rename from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/JobHasAlreadyStartedException.java
rename to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/SplitPipelineJobException.java
index debed7788fc..7a82e6da397 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/JobHasAlreadyStartedException.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/SplitPipelineJobException.java
@@ -21,13 +21,13 @@ import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLExcepti
 import 
org.apache.shardingsphere.infra.util.exception.external.sql.sqlstate.XOpenSQLState;
 
 /**
- * Job has already started exception.
+ * Split pipeline job exception.
  */
-public final class JobHasAlreadyStartedException extends PipelineSQLException {
+public final class SplitPipelineJobException extends PipelineSQLException {
     
-    private static final long serialVersionUID = 2854259384634892428L;
+    private static final long serialVersionUID = -8509592086832334026L;
     
-    public JobHasAlreadyStartedException(final String jobId) {
-        super(XOpenSQLState.GENERAL_ERROR, 81, "Job `%s` has already started", 
jobId);
+    public SplitPipelineJobException(final String tableName, final String 
reason) {
+        super(XOpenSQLState.GENERAL_ERROR, 83, "Can not split range for table 
`%s`, reason: %s", tableName, reason);
     }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java
index 3de72582f09..5ca7e20d3cc 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java
@@ -33,13 +33,14 @@ import 
org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
-import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobExecutionException;
+import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineImporterJobWriteException;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
 import org.apache.shardingsphere.data.pipeline.core.record.RecordUtil;
 import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
 import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
 import 
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import 
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
+import 
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
 
 import javax.sql.DataSource;
 import java.sql.Connection;
@@ -126,9 +127,7 @@ public final class DefaultImporter extends 
AbstractLifecycleExecutor implements
             return;
         }
         boolean success = tryFlush(dataSource, buffer);
-        if (isRunning() && !success) {
-            throw new PipelineJobExecutionException("write failed.");
-        }
+        ShardingSpherePreconditions.checkState(!isRunning() || success, new 
PipelineImporterJobWriteException());
     }
     
     private boolean tryFlush(final DataSource dataSource, final 
List<DataRecord> buffer) {
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
index 7cc8a52f8ec..e116ec9e290 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
@@ -34,8 +34,8 @@ import 
org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import 
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
 import 
org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
-import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobPrepareFailedException;
+import 
org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobException;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.DefaultPipelineJobProgressListener;
 import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
@@ -148,11 +148,11 @@ public final class InventoryTaskSplitter {
         int uniqueKeyDataType = dumperConfig.getUniqueKeyDataType();
         if (PipelineJdbcUtils.isIntegerColumn(uniqueKeyDataType)) {
             return getPositionByIntegerPrimaryKeyRange(jobItemContext, 
dataSource, dumperConfig);
-        } else if (PipelineJdbcUtils.isStringColumn(uniqueKeyDataType)) {
+        }
+        if (PipelineJdbcUtils.isStringColumn(uniqueKeyDataType)) {
             return getPositionByStringPrimaryKeyRange();
-        } else {
-            throw new PipelineJobCreationException(String.format("Can not 
split range for table %s, reason: primary key is not integer or string type", 
dumperConfig.getActualTableName()));
         }
+        throw new SplitPipelineJobException(dumperConfig.getActualTableName(), 
"primary key is not integer or string type");
     }
     
     private Collection<IngestPosition<?>> 
getPositionByIntegerPrimaryKeyRange(final InventoryIncrementalJobItemContext 
jobItemContext, final DataSource dataSource,
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
index 0af479927af..475107073b9 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
@@ -136,7 +136,7 @@ public final class IncrementalTask extends 
AbstractLifecycleExecutor implements
             future.get();
         } catch (final InterruptedException ignored) {
         } catch (final ExecutionException ex) {
-            throw new PipelineJobExecutionException(String.format("Task %s 
execute failed ", taskId), ex.getCause());
+            throw new PipelineJobExecutionException(taskId, ex.getCause());
         }
     }
     
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
index 60a1afb27dd..b955a9d4ed1 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
@@ -128,7 +128,7 @@ public final class InventoryTask extends 
AbstractLifecycleExecutor implements Pi
             future.get();
         } catch (final InterruptedException ignored) {
         } catch (final ExecutionException ex) {
-            throw new PipelineJobExecutionException(String.format("Task %s 
execute failed ", taskId), ex.getCause());
+            throw new PipelineJobExecutionException(taskId, ex.getCause());
         }
     }
     
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineTableMetaDataUtil.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineTableMetaDataUtil.java
index f6d04b192e5..eb8b74d6f87 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineTableMetaDataUtil.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineTableMetaDataUtil.java
@@ -25,8 +25,9 @@ import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumn
 import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineIndexMetaData;
 import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
-import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationException;
+import 
org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobException;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
+import 
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
 
 import java.sql.SQLException;
 import java.util.Collection;
@@ -103,27 +104,20 @@ public final class PipelineTableMetaDataUtil {
     }
     
     private static PipelineColumnMetaData 
mustGetAnAppropriateUniqueKeyColumn(final PipelineTableMetaData tableMetaData, 
final String tableName) {
-        if (null == tableMetaData) {
-            throw new PipelineJobCreationException(String.format("Can not 
split range for table %s, reason: can not get table metadata ", tableName));
-        }
+        ShardingSpherePreconditions.checkNotNull(tableMetaData, new 
SplitPipelineJobException(tableName, "can not get table metadata"));
         List<String> primaryKeys = tableMetaData.getPrimaryKeyColumns();
-        if (primaryKeys.size() > 1) {
-            throw new PipelineJobCreationException(String.format("Can not 
split range for table %s, reason: primary key is union primary", tableName));
-        }
         if (1 == primaryKeys.size()) {
             return 
tableMetaData.getColumnMetaData(tableMetaData.getPrimaryKeyColumns().get(0));
         }
+        ShardingSpherePreconditions.checkState(primaryKeys.isEmpty(), new 
SplitPipelineJobException(tableName, "primary key is union primary"));
         Collection<PipelineIndexMetaData> uniqueIndexes = 
tableMetaData.getUniqueIndexes();
-        if (uniqueIndexes.isEmpty()) {
-            throw new PipelineJobCreationException(String.format("Can not 
split range for table %s, reason: no primary key or unique index", tableName));
-        }
+        ShardingSpherePreconditions.checkState(!uniqueIndexes.isEmpty(), new 
SplitPipelineJobException(tableName, "no primary key or unique index"));
         if (1 == uniqueIndexes.size() && 1 == 
uniqueIndexes.iterator().next().getColumns().size()) {
             PipelineColumnMetaData column = 
uniqueIndexes.iterator().next().getColumns().get(0);
             if (!column.isNullable()) {
                 return column;
             }
         }
-        throw new PipelineJobCreationException(
-                String.format("Can not split range for table %s, reason: table 
contains multiple unique index or unique index contains nullable/multiple 
column(s)", tableName));
+        throw new SplitPipelineJobException(tableName, "table contains 
multiple unique index or unique index contains nullable/multiple column(s)");
     }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
index 610c34f1d9e..8036c022092 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
@@ -31,7 +31,7 @@ import io.netty.util.concurrent.DefaultPromise;
 import io.netty.util.concurrent.Promise;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobExecutionException;
+import 
org.apache.shardingsphere.data.pipeline.core.exception.job.BinlogSyncChannelAlreadyClosedException;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.GlobalTableMapEventMapping;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractBinlogEvent;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.client.netty.MySQLBinlogEventPacketDecoder;
@@ -46,6 +46,7 @@ import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.query.text.que
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLErrPacket;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLOKPacket;
 import org.apache.shardingsphere.db.protocol.netty.ChannelAttrInitializer;
+import 
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
 import 
org.apache.shardingsphere.infra.util.exception.external.sql.type.generic.UnsupportedSQLOperationException;
 
 import java.net.InetSocketAddress;
@@ -206,11 +207,9 @@ public final class MySQLClient {
      * @return binlog event
      */
     public synchronized AbstractBinlogEvent poll() {
-        if (!running) {
-            throw new PipelineJobExecutionException("binlog sync channel 
already closed, can't poll event");
-        }
+        ShardingSpherePreconditions.checkState(running, new 
BinlogSyncChannelAlreadyClosedException());
         try {
-            return blockingEventQueue.poll(100, TimeUnit.MILLISECONDS);
+            return blockingEventQueue.poll(100L, TimeUnit.MILLISECONDS);
         } catch (final InterruptedException ignored) {
             return null;
         }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClientTest.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClientTest.java
index 43f1d777bf7..dec7b1a84ed 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClientTest.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClientTest.java
@@ -22,7 +22,7 @@ import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelPipeline;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.util.concurrent.Promise;
-import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobExecutionException;
+import 
org.apache.shardingsphere.data.pipeline.core.exception.job.BinlogSyncChannelAlreadyClosedException;
 import org.apache.shardingsphere.data.pipeline.core.util.ReflectionUtil;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.binlog.MySQLComBinlogDumpCommandPacket;
 import 
org.apache.shardingsphere.db.protocol.mysql.packet.command.binlog.MySQLComRegisterSlaveCommandPacket;
@@ -150,7 +150,7 @@ public final class MySQLClientTest {
         assertFalse(channel.isOpen());
     }
     
-    @Test(expected = PipelineJobExecutionException.class)
+    @Test(expected = BinlogSyncChannelAlreadyClosedException.class)
     public void assertPollFailed() throws NoSuchFieldException, 
IllegalAccessException {
         ReflectionUtil.setFieldValue(mysqlClient, "channel", channel);
         ReflectionUtil.setFieldValue(mysqlClient, "running", false);
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
index 06d4a52e159..636970ee778 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
@@ -24,7 +24,7 @@ import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSource
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IntegerPrimaryKeyPosition;
 import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
-import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationException;
+import 
org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobException;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
 import 
org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
@@ -120,7 +120,7 @@ public final class InventoryTaskSplitterTest {
         assertThat(actual.size(), is(1));
     }
     
-    @Test(expected = PipelineJobCreationException.class)
+    @Test(expected = SplitPipelineJobException.class)
     public void assertSplitInventoryDataWithIllegalKeyDataType() throws 
SQLException, NoSuchFieldException, IllegalAccessException {
         initUnionPrimaryEnvironment(taskConfig.getDumperConfig());
         InventoryDumperConfiguration dumperConfig = 
ReflectionUtil.getFieldValue(inventoryTaskSplitter, "dumperConfig", 
InventoryDumperConfiguration.class);
@@ -130,7 +130,7 @@ public final class InventoryTaskSplitterTest {
         inventoryTaskSplitter.splitInventoryData(jobItemContext);
     }
     
-    @Test(expected = PipelineJobCreationException.class)
+    @Test(expected = SplitPipelineJobException.class)
     public void assertSplitInventoryDataWithoutPrimaryAndUniqueIndex() throws 
SQLException, NoSuchFieldException, IllegalAccessException {
         initNoPrimaryEnvironment(taskConfig.getDumperConfig());
         try (PipelineDataSourceWrapper dataSource = 
dataSourceManager.getDataSource(taskConfig.getDumperConfig().getDataSourceConfig()))
 {

Reply via email to