This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 6e78436e21e Refactor PipelineJobException (#20958)
6e78436e21e is described below
commit 6e78436e21e5bed43d11a3dcad1389bd3c1d972f
Author: Liang Zhang <[email protected]>
AuthorDate: Wed Sep 14 15:39:12 2022 +0800
Refactor PipelineJobException (#20958)
* Add PipelineImporterJobWriteException
* Add BinlogSyncChannelAlreadyClosedException
* Add PipelineJobExecutionException
* Add SplitTableRangeException
* Rename ZeroShardingCountException
* Refactor PipelineJobException
* Fix test case
* Fix test case
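
For context, and not part of the patch below: the refactor described in the bullet list above converges on one pattern, where each failure case gets a dedicated PipelineSQLException subclass carrying its own vendor code, and call sites replace if/throw blocks with ShardingSpherePreconditions checks. A minimal Java sketch of that pattern, mirroring the SplitPipelineJobException introduced by this commit:

import org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLException;
import org.apache.shardingsphere.infra.util.exception.external.sql.sqlstate.XOpenSQLState;

/**
 * Split pipeline job exception: one subclass per failure case, one vendor code per subclass.
 */
public final class SplitPipelineJobException extends PipelineSQLException {

    private static final long serialVersionUID = -8509592086832334026L;

    public SplitPipelineJobException(final String tableName, final String reason) {
        super(XOpenSQLState.GENERAL_ERROR, 83, "Can not split range for table `%s`, reason: %s", tableName, reason);
    }
}

Call sites then shrink to a single precondition check, for example:
ShardingSpherePreconditions.checkState(!uniqueIndexes.isEmpty(), new SplitPipelineJobException(tableName, "no primary key or unique index"));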
---
.../user-manual/error-code/sql-error-code.cn.md | 5 +++++
.../user-manual/error-code/sql-error-code.en.md | 5 +++++
.../core/api/impl/AbstractPipelineJobAPIImpl.java | 11 ++++-------
...va => BinlogSyncChannelAlreadyClosedException.java} | 10 +++++-----
...ion.java => PipelineImporterJobWriteException.java} | 10 +++++-----
...eJobCreationWithInvalidShardingCountException.java} | 11 +++++++----
.../exception/job/PipelineJobExecutionException.java | 15 +++++++--------
...java => PipelineJobHasAlreadyStartedException.java} | 6 +++---
...edException.java => SplitPipelineJobException.java} | 10 +++++-----
.../data/pipeline/core/importer/DefaultImporter.java | 7 +++----
.../pipeline/core/prepare/InventoryTaskSplitter.java | 8 ++++----
.../data/pipeline/core/task/IncrementalTask.java | 2 +-
.../data/pipeline/core/task/InventoryTask.java | 2 +-
.../pipeline/core/util/PipelineTableMetaDataUtil.java | 18 ++++++------------
.../data/pipeline/mysql/ingest/client/MySQLClient.java | 9 ++++-----
.../pipeline/mysql/ingest/client/MySQLClientTest.java | 4 ++--
.../core/prepare/InventoryTaskSplitterTest.java | 6 +++---
17 files changed, 70 insertions(+), 69 deletions(-)
diff --git a/docs/document/content/user-manual/error-code/sql-error-code.cn.md b/docs/document/content/user-manual/error-code/sql-error-code.cn.md
index c6a9e1805a1..af6843f255e 100644
--- a/docs/document/content/user-manual/error-code/sql-error-code.cn.md
+++ b/docs/document/content/user-manual/error-code/sql-error-code.cn.md
@@ -94,6 +94,11 @@ SQL 错误码以标准的 SQL State,Vendor Code 和详细错误信息提供,
| HY000 | 18020 | Failed to get DDL for table \`%s\` |
| HY000 | 18080 | Can not find pipeline job \`%s\` |
| HY000 | 18081 | Job has already started |
+| HY000 | 18082 | Sharding count of job \`%s\` is 0 |
+| HY000 | 18083 | Can not split range for table \`%s\`, reason: %s" |
+| HY000 | 18090 | Importer job write data failed |
+| HY000 | 18091 | Can not poll event because of binlog sync channel already closed |
+| HY000 | 18092 | Task \`%s\` execute failed |
## 功能异常
diff --git a/docs/document/content/user-manual/error-code/sql-error-code.en.md b/docs/document/content/user-manual/error-code/sql-error-code.en.md
index 04d6f9ca6e0..ee600f344e7 100644
--- a/docs/document/content/user-manual/error-code/sql-error-code.en.md
+++ b/docs/document/content/user-manual/error-code/sql-error-code.en.md
@@ -94,6 +94,11 @@ SQL error codes provide by standard `SQL State`, `Vendor Code` and `Reason`, whi
| HY000 | 18020 | Failed to get DDL for table \`%s\` |
| HY000 | 18080 | Can not find pipeline job \`%s\` |
| HY000 | 18081 | Job has already started |
+| HY000 | 18082 | Sharding count of job \`%s\` is 0 |
+| HY000 | 18083 | Can not split range for table \`%s\`, reason: %s" |
+| HY000 | 18090 | Importer job write data failed |
+| HY000 | 18091 | Can not poll event because of binlog sync channel already closed |
+| HY000 | 18092 | Task \`%s\` execute failed |
## Feature Exception
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
index 8b0b4d5fb36..b05649ea31f 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
@@ -28,8 +28,8 @@ import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineJobAPI;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
-import org.apache.shardingsphere.data.pipeline.core.exception.job.JobHasAlreadyStartedException;
-import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationException;
+import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobHasAlreadyStartedException;
+import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationWithInvalidShardingCountException;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
import org.apache.shardingsphere.data.pipeline.core.exception.metadata.AlterNotExistProcessConfigurationException;
import org.apache.shardingsphere.data.pipeline.core.exception.metadata.CreateExistsProcessConfigurationException;
@@ -145,10 +145,7 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
@Override
public Optional<String> start(final PipelineJobConfiguration jobConfig) {
String jobId = jobConfig.getJobId();
- if (0 == jobConfig.getJobShardingCount()) {
-            log.warn("Invalid job config since job sharding count is 0, jobId={}", jobId);
-            throw new PipelineJobCreationException("job sharding count is 0, jobId: " + jobId);
-        }
+        Preconditions.checkState(0 != jobConfig.getJobShardingCount(), new PipelineJobCreationWithInvalidShardingCountException(jobId));
log.info("Start job by {}", jobConfig);
        GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI();
String jobConfigKey = PipelineMetaDataNode.getJobConfigPath(jobId);
@@ -179,7 +176,7 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
log.info("Start disabled pipeline job {}", jobId);
pipelineDistributedBarrier.removeParentNode(PipelineMetaDataNode.getJobBarrierDisablePath(jobId));
JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
-        ShardingSpherePreconditions.checkState(jobConfigPOJO.isDisabled(), new JobHasAlreadyStartedException(jobId));
+        ShardingSpherePreconditions.checkState(jobConfigPOJO.isDisabled(), new PipelineJobHasAlreadyStartedException(jobId));
jobConfigPOJO.setDisabled(false);
jobConfigPOJO.getProps().remove("stop_time");
PipelineAPIFactory.getJobConfigurationAPI().updateJobConfiguration(jobConfigPOJO);
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/JobHasAlreadyStartedException.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/BinlogSyncChannelAlreadyClosedException.java
similarity index 73%
copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/JobHasAlreadyStartedException.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/BinlogSyncChannelAlreadyClosedException.java
index debed7788fc..284256a42e6 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/JobHasAlreadyStartedException.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/BinlogSyncChannelAlreadyClosedException.java
@@ -21,13 +21,13 @@ import org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLExcepti
import org.apache.shardingsphere.infra.util.exception.external.sql.sqlstate.XOpenSQLState;
/**
- * Job has already started exception.
+ * Binlog sync channel already closed exception.
*/
-public final class JobHasAlreadyStartedException extends PipelineSQLException {
+public final class BinlogSyncChannelAlreadyClosedException extends PipelineSQLException {
-    private static final long serialVersionUID = 2854259384634892428L;
+    private static final long serialVersionUID = -8897293295641185703L;
-    public JobHasAlreadyStartedException(final String jobId) {
-        super(XOpenSQLState.GENERAL_ERROR, 81, "Job `%s` has already started", jobId);
+    public BinlogSyncChannelAlreadyClosedException() {
+        super(XOpenSQLState.GENERAL_ERROR, 91, "Can not poll event because of binlog sync channel already closed");
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/JobHasAlreadyStartedException.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineImporterJobWriteException.java
similarity index 76%
copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/JobHasAlreadyStartedException.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineImporterJobWriteException.java
index debed7788fc..2476a1d0594 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/JobHasAlreadyStartedException.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineImporterJobWriteException.java
@@ -21,13 +21,13 @@ import org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLExcepti
import org.apache.shardingsphere.infra.util.exception.external.sql.sqlstate.XOpenSQLState;
/**
- * Job has already started exception.
+ * Pipeline importer job write exception.
*/
-public final class JobHasAlreadyStartedException extends PipelineSQLException {
+public final class PipelineImporterJobWriteException extends PipelineSQLException {
-    private static final long serialVersionUID = 2854259384634892428L;
+    private static final long serialVersionUID = -7924663094479253130L;
-    public JobHasAlreadyStartedException(final String jobId) {
-        super(XOpenSQLState.GENERAL_ERROR, 81, "Job `%s` has already started", jobId);
+    public PipelineImporterJobWriteException() {
+        super(XOpenSQLState.GENERAL_ERROR, 90, "Importer job write data failed");
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobCreationException.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobCreationWithInvalidShardingCountException.java
similarity index 65%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobCreationException.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobCreationWithInvalidShardingCountException.java
index 75935cf3a33..d18868f735f 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobCreationException.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobCreationWithInvalidShardingCountException.java
@@ -17,14 +17,17 @@
package org.apache.shardingsphere.data.pipeline.core.exception.job;
+import org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLException;
+import org.apache.shardingsphere.infra.util.exception.external.sql.sqlstate.XOpenSQLState;
+
/**
- * Pipeline job creation exception.
+ * Pipeline job creation with invalid sharding count exception.
*/
-public final class PipelineJobCreationException extends RuntimeException {
+public final class PipelineJobCreationWithInvalidShardingCountException extends PipelineSQLException {
    private static final long serialVersionUID = 5829502315976905271L;
-    public PipelineJobCreationException(final String message) {
-        super(message);
+    public PipelineJobCreationWithInvalidShardingCountException(final String jobId) {
+        super(XOpenSQLState.GENERAL_ERROR, 82, "Sharding count of job `%s` is 0", jobId);
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobExecutionException.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobExecutionException.java
index 30438b797f2..13d2df77bb5 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobExecutionException.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobExecutionException.java
@@ -17,18 +17,17 @@
package org.apache.shardingsphere.data.pipeline.core.exception.job;
+import org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLException;
+import org.apache.shardingsphere.infra.util.exception.external.sql.sqlstate.XOpenSQLState;
+
/**
* Pipeline job execution exception.
*/
-public final class PipelineJobExecutionException extends RuntimeException {
-
- private static final long serialVersionUID = 1797495940081148743L;
+public final class PipelineJobExecutionException extends PipelineSQLException {
- public PipelineJobExecutionException(final String message) {
- super(message);
- }
+ private static final long serialVersionUID = -5530453461378051166L;
-    public PipelineJobExecutionException(final String message, final Throwable cause) {
-        super(message, cause);
+    public PipelineJobExecutionException(final String taskId, final Throwable cause) {
+        super(XOpenSQLState.GENERAL_ERROR, 92, "Task `%s` execute failed", taskId, cause.getMessage());
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/JobHasAlreadyStartedException.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobHasAlreadyStartedException.java
similarity index 85%
copy from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/JobHasAlreadyStartedException.java
copy to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobHasAlreadyStartedException.java
index debed7788fc..1cc534bf38d 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/JobHasAlreadyStartedException.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobHasAlreadyStartedException.java
@@ -21,13 +21,13 @@ import org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLExcepti
import org.apache.shardingsphere.infra.util.exception.external.sql.sqlstate.XOpenSQLState;
/**
- * Job has already started exception.
+ * Pipeline job has already started exception.
*/
-public final class JobHasAlreadyStartedException extends PipelineSQLException {
+public final class PipelineJobHasAlreadyStartedException extends PipelineSQLException {
    private static final long serialVersionUID = 2854259384634892428L;
-    public JobHasAlreadyStartedException(final String jobId) {
+    public PipelineJobHasAlreadyStartedException(final String jobId) {
        super(XOpenSQLState.GENERAL_ERROR, 81, "Job `%s` has already started", jobId);
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/JobHasAlreadyStartedException.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/SplitPipelineJobException.java
similarity index 73%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/JobHasAlreadyStartedException.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/SplitPipelineJobException.java
index debed7788fc..7a82e6da397 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/JobHasAlreadyStartedException.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/SplitPipelineJobException.java
@@ -21,13 +21,13 @@ import org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLExcepti
import org.apache.shardingsphere.infra.util.exception.external.sql.sqlstate.XOpenSQLState;
/**
- * Job has already started exception.
+ * Split pipeline job exception.
*/
-public final class JobHasAlreadyStartedException extends PipelineSQLException {
+public final class SplitPipelineJobException extends PipelineSQLException {
- private static final long serialVersionUID = 2854259384634892428L;
+ private static final long serialVersionUID = -8509592086832334026L;
- public JobHasAlreadyStartedException(final String jobId) {
-        super(XOpenSQLState.GENERAL_ERROR, 81, "Job `%s` has already started", jobId);
+    public SplitPipelineJobException(final String tableName, final String reason) {
+        super(XOpenSQLState.GENERAL_ERROR, 83, "Can not split range for table `%s`, reason: %s", tableName, reason);
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java
index 3de72582f09..5ca7e20d3cc 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java
@@ -33,13 +33,14 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
import org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
-import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobExecutionException;
+import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineImporterJobWriteException;
import org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
import org.apache.shardingsphere.data.pipeline.core.record.RecordUtil;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
+import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
import javax.sql.DataSource;
import java.sql.Connection;
@@ -126,9 +127,7 @@ public final class DefaultImporter extends AbstractLifecycleExecutor implements
return;
}
boolean success = tryFlush(dataSource, buffer);
- if (isRunning() && !success) {
- throw new PipelineJobExecutionException("write failed.");
- }
+        ShardingSpherePreconditions.checkState(!isRunning() || success, new PipelineImporterJobWriteException());
}
    private boolean tryFlush(final DataSource dataSource, final List<DataRecord> buffer) {
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
index 7cc8a52f8ec..e116ec9e290 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
@@ -34,8 +34,8 @@ import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
import org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalProcessContext;
-import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationException;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobPrepareFailedException;
+import org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobException;
import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
import org.apache.shardingsphere.data.pipeline.core.job.progress.listener.DefaultPipelineJobProgressListener;
import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
@@ -148,11 +148,11 @@ public final class InventoryTaskSplitter {
int uniqueKeyDataType = dumperConfig.getUniqueKeyDataType();
if (PipelineJdbcUtils.isIntegerColumn(uniqueKeyDataType)) {
            return getPositionByIntegerPrimaryKeyRange(jobItemContext, dataSource, dumperConfig);
- } else if (PipelineJdbcUtils.isStringColumn(uniqueKeyDataType)) {
+ }
+ if (PipelineJdbcUtils.isStringColumn(uniqueKeyDataType)) {
return getPositionByStringPrimaryKeyRange();
- } else {
-            throw new PipelineJobCreationException(String.format("Can not split range for table %s, reason: primary key is not integer or string type", dumperConfig.getActualTableName()));
        }
+        throw new SplitPipelineJobException(dumperConfig.getActualTableName(), "primary key is not integer or string type");
}
    private Collection<IngestPosition<?>> getPositionByIntegerPrimaryKeyRange(final InventoryIncrementalJobItemContext jobItemContext, final DataSource dataSource,
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
index 0af479927af..475107073b9 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
@@ -136,7 +136,7 @@ public final class IncrementalTask extends AbstractLifecycleExecutor implements
future.get();
} catch (final InterruptedException ignored) {
} catch (final ExecutionException ex) {
-            throw new PipelineJobExecutionException(String.format("Task %s execute failed ", taskId), ex.getCause());
+ throw new PipelineJobExecutionException(taskId, ex.getCause());
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
index 60a1afb27dd..b955a9d4ed1 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
@@ -128,7 +128,7 @@ public final class InventoryTask extends AbstractLifecycleExecutor implements Pi
future.get();
} catch (final InterruptedException ignored) {
} catch (final ExecutionException ex) {
-            throw new PipelineJobExecutionException(String.format("Task %s execute failed ", taskId), ex.getCause());
+ throw new PipelineJobExecutionException(taskId, ex.getCause());
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineTableMetaDataUtil.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineTableMetaDataUtil.java
index f6d04b192e5..eb8b74d6f87 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineTableMetaDataUtil.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineTableMetaDataUtil.java
@@ -25,8 +25,9 @@ import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumn
import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineIndexMetaData;
import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
-import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationException;
+import org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobException;
import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
+import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
import java.sql.SQLException;
import java.util.Collection;
@@ -103,27 +104,20 @@ public final class PipelineTableMetaDataUtil {
}
    private static PipelineColumnMetaData mustGetAnAppropriateUniqueKeyColumn(final PipelineTableMetaData tableMetaData, final String tableName) {
-        if (null == tableMetaData) {
-            throw new PipelineJobCreationException(String.format("Can not split range for table %s, reason: can not get table metadata ", tableName));
-        }
+        ShardingSpherePreconditions.checkNotNull(tableMetaData, new SplitPipelineJobException(tableName, "can not get table metadata"));
        List<String> primaryKeys = tableMetaData.getPrimaryKeyColumns();
-        if (primaryKeys.size() > 1) {
-            throw new PipelineJobCreationException(String.format("Can not split range for table %s, reason: primary key is union primary", tableName));
-        }
        if (1 == primaryKeys.size()) {
            return tableMetaData.getColumnMetaData(tableMetaData.getPrimaryKeyColumns().get(0));
        }
+        ShardingSpherePreconditions.checkState(primaryKeys.isEmpty(), new SplitPipelineJobException(tableName, "primary key is union primary"));
        Collection<PipelineIndexMetaData> uniqueIndexes = tableMetaData.getUniqueIndexes();
-        if (uniqueIndexes.isEmpty()) {
-            throw new PipelineJobCreationException(String.format("Can not split range for table %s, reason: no primary key or unique index", tableName));
-        }
+        ShardingSpherePreconditions.checkState(!uniqueIndexes.isEmpty(), new SplitPipelineJobException(tableName, "no primary key or unique index"));
        if (1 == uniqueIndexes.size() && 1 == uniqueIndexes.iterator().next().getColumns().size()) {
            PipelineColumnMetaData column = uniqueIndexes.iterator().next().getColumns().get(0);
            if (!column.isNullable()) {
                return column;
            }
        }
-        throw new PipelineJobCreationException(
-                String.format("Can not split range for table %s, reason: table contains multiple unique index or unique index contains nullable/multiple column(s)", tableName));
+        throw new SplitPipelineJobException(tableName, "table contains multiple unique index or unique index contains nullable/multiple column(s)");
}
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
index 610c34f1d9e..8036c022092 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClient.java
@@ -31,7 +31,7 @@ import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Promise;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobExecutionException;
+import org.apache.shardingsphere.data.pipeline.core.exception.job.BinlogSyncChannelAlreadyClosedException;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.GlobalTableMapEventMapping;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractBinlogEvent;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.client.netty.MySQLBinlogEventPacketDecoder;
@@ -46,6 +46,7 @@ import org.apache.shardingsphere.db.protocol.mysql.packet.command.query.text.que
import org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLErrPacket;
import org.apache.shardingsphere.db.protocol.mysql.packet.generic.MySQLOKPacket;
import org.apache.shardingsphere.db.protocol.netty.ChannelAttrInitializer;
+import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.util.exception.external.sql.type.generic.UnsupportedSQLOperationException;
import java.net.InetSocketAddress;
@@ -206,11 +207,9 @@ public final class MySQLClient {
* @return binlog event
*/
public synchronized AbstractBinlogEvent poll() {
- if (!running) {
-            throw new PipelineJobExecutionException("binlog sync channel already closed, can't poll event");
-        }
+        ShardingSpherePreconditions.checkState(running, new BinlogSyncChannelAlreadyClosedException());
try {
- return blockingEventQueue.poll(100, TimeUnit.MILLISECONDS);
+ return blockingEventQueue.poll(100L, TimeUnit.MILLISECONDS);
} catch (final InterruptedException ignored) {
return null;
}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClientTest.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClientTest.java
index 43f1d777bf7..dec7b1a84ed 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClientTest.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/client/MySQLClientTest.java
@@ -22,7 +22,7 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.Promise;
-import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobExecutionException;
+import org.apache.shardingsphere.data.pipeline.core.exception.job.BinlogSyncChannelAlreadyClosedException;
import org.apache.shardingsphere.data.pipeline.core.util.ReflectionUtil;
import org.apache.shardingsphere.db.protocol.mysql.packet.command.binlog.MySQLComBinlogDumpCommandPacket;
import org.apache.shardingsphere.db.protocol.mysql.packet.command.binlog.MySQLComRegisterSlaveCommandPacket;
@@ -150,7 +150,7 @@ public final class MySQLClientTest {
assertFalse(channel.isOpen());
}
- @Test(expected = PipelineJobExecutionException.class)
+ @Test(expected = BinlogSyncChannelAlreadyClosedException.class)
    public void assertPollFailed() throws NoSuchFieldException, IllegalAccessException {
ReflectionUtil.setFieldValue(mysqlClient, "channel", channel);
ReflectionUtil.setFieldValue(mysqlClient, "running", false);
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
index 06d4a52e159..636970ee778 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
@@ -24,7 +24,7 @@ import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSource
import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IntegerPrimaryKeyPosition;
import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
-import org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationException;
+import org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobException;
import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
import org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
@@ -120,7 +120,7 @@ public final class InventoryTaskSplitterTest {
assertThat(actual.size(), is(1));
}
- @Test(expected = PipelineJobCreationException.class)
+ @Test(expected = SplitPipelineJobException.class)
    public void assertSplitInventoryDataWithIllegalKeyDataType() throws SQLException, NoSuchFieldException, IllegalAccessException {
        initUnionPrimaryEnvironment(taskConfig.getDumperConfig());
        InventoryDumperConfiguration dumperConfig = ReflectionUtil.getFieldValue(inventoryTaskSplitter, "dumperConfig", InventoryDumperConfiguration.class);
@@ -130,7 +130,7 @@ public final class InventoryTaskSplitterTest {
inventoryTaskSplitter.splitInventoryData(jobItemContext);
}
- @Test(expected = PipelineJobCreationException.class)
+ @Test(expected = SplitPipelineJobException.class)
    public void assertSplitInventoryDataWithoutPrimaryAndUniqueIndex() throws SQLException, NoSuchFieldException, IllegalAccessException {
        initNoPrimaryEnvironment(taskConfig.getDumperConfig());
        try (PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(taskConfig.getDumperConfig().getDataSourceConfig())) {