This is an automated email from the ASF dual-hosted git repository.

zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 682c2aa5ee3 Refactor PipelineSQLException (#30817)
682c2aa5ee3 is described below

commit 682c2aa5ee3a0a884103fd02cc1eff1644f4295f
Author: Liang Zhang <[email protected]>
AuthorDate: Tue Apr 9 14:09:28 2024 +0800

    Refactor PipelineSQLException (#30817)
---
 .../user-manual/error-code/sql-error-code.cn.md    | 52 +++++++++++-----------
 .../user-manual/error-code/sql-error-code.en.md    | 52 +++++++++++-----------
 ...edException.java => PipelineDataException.java} | 26 ++++++-----
 ...DataConsistencyCheckLoadingFailedException.java |  5 +--
 ...PipelineUnexpectedDataRecordOrderException.java |  5 +--
 ...C32SingleTableInventoryCalculatorException.java |  5 +--
 .../UnsupportedPipelineDatabaseTypeException.java  |  5 +--
 .../job/ConsistencyCheckJobNotFoundException.java  |  5 +--
 .../CreateTableSQLGenerateException.java           |  7 ++-
 .../MissingRequiredTargetDatabaseException.java    |  5 +--
 .../job/PipelineImporterJobWriteException.java     |  9 +---
 ...bCreationWithInvalidShardingCountException.java |  5 +--
 ...iteException.java => PipelineJobException.java} | 24 ++++++----
 .../job/PipelineJobNotFoundException.java          |  5 +--
 ...repareJobWithCheckPrivilegeFailedException.java |  5 +--
 .../PrepareJobWithGetBinlogPositionException.java  |  5 +--
 ...areJobWithInvalidSourceDataSourceException.java |  5 +--
 ...PrepareJobWithTargetTableNotEmptyException.java |  5 +--
 .../PrepareJobWithoutEnoughPrivilegeException.java |  5 +--
 .../job/PrepareJobWithoutUserException.java        |  5 +--
 .../job/SplitPipelineJobByUniqueKeyException.java  |  5 +--
 ...FoundWhenSplitPipelineJobByRangeException.java} |  9 ++--
 ...ompletedConsistencyCheckJobExistsException.java |  5 +--
 .../param/PipelineInvalidParameterException.java   |  2 +-
 .../loader/PipelineTableMetaDataUtils.java         |  4 +-
 .../StandardPipelineTableMetaDataLoader.java       |  2 +-
 .../mysql/sqlbuilder/MySQLPipelineSQLBuilder.java  |  2 +-
 .../sqlbuilder/OpenGaussPipelineSQLBuilder.java    |  2 +-
 .../h2/sqlbuilder/H2PipelineSQLBuilder.java        |  2 +-
 29 files changed, 132 insertions(+), 141 deletions(-)

diff --git a/docs/document/content/user-manual/error-code/sql-error-code.cn.md 
b/docs/document/content/user-manual/error-code/sql-error-code.cn.md
index 4a15cee1d5e..cbeb6106428 100644
--- a/docs/document/content/user-manual/error-code/sql-error-code.cn.md
+++ b/docs/document/content/user-manual/error-code/sql-error-code.cn.md
@@ -106,32 +106,32 @@ SQL 错误码以标准的 SQL State,Vendor Code 和详细错误信息提供,
 
 ### 数据管道
 
-| Vendor Code | SQL State | 错误信息                                               
                               |
-|-------------|-----------|-----------------------------------------------------------------------------------|
-| 18004       | 44000     | Target database \`%s\` isn't exist.                
                               |
-| 18005       | 22023     | There is invalid parameter value: \`%s\`.          
                               |
-| 18020       | HY000     | Failed to get DDL for table \`%s\`.                
                               |
-| 18050       | HY000     | Before data record is \`%s\`, after data record is 
\`%s\`.                        |
-| 18051       | 08000     | Data check table \`%s\` failed.                    
                               |
-| 18052       | 0A000     | Unsupported pipeline database type \`%s\`.         
                               |
-| 18053       | 0A000     | Unsupported CRC32 data consistency calculate 
algorithm with database type \`%s\`. |
-| 18080       | HY000     | Can not find pipeline job \`%s\`.                  
                               |
-| 18082       | HY000     | Sharding count of job \`%s\` is 0.                 
                               |
-| 18083       | HY000     | Can not split by range for table \`%s\`, reason 
is: %s                            |
-| 18084       | HY000     | Can not split by unique key \`%s\` for table 
\`%s\`, reason is: %s                |
-| 18085       | HY000     | Target table \`%s\` is not empty.                  
                               |
-| 18086       | 01007     | Source data source lacks %s privilege(s).          
                               |
-| 18087       | HY000     | Source data source required \`%s = %s\`, now is 
\`%s\`.                           |
-| 18088       | HY000     | User \`%s\` does exist.                            
                               |
-| 18089       | 08000     | Check privileges failed on source data source, 
reason is: %s                      |
-| 18091       | HY000     | Importer job write data failed.                    
                               |
-| 18092       | 08000     | Get binlog position failed by job \`%s\`, reason 
is: %s                           |
-| 18095       | HY000     | Can not find consistency check job of \`%s\`.      
                               |
-| 18096       | HY000     | Uncompleted consistency check job \`%s\` exists.   
                               |
-| 18400       | 42S02     | Can not find stream data source table.             
                               |
-| 18401       | 42S02     | Database '%s' does not exist.                      
                               |
-| 18410       | 42S02     | CDC Login request body is empty.                   
                               |
-| 18411       | 08004     | Illegal username or password.                      
                               |
+| Vendor Code | SQL State | 错误信息                                               
                             |
+|-------------|-----------|---------------------------------------------------------------------------------|
+| 18000       | 22023     | There is invalid parameter value '%s'.             
                             |
+| 18100       | 42S02     | Target database '%s' does not exist.               
                             |
+| 18101       | 42S02     | Can not find pipeline job '%s'.                    
                             |
+| 18102       | 44000     | Sharding count of job '%s' is 0.                   
                             |
+| 18103       | 42S02     | Can not get meta data for table '%s' when split by 
range.                       |
+| 18104       | HY000     | Can not split by unique key '%s' for table '%s'.   
                             |
+| 18105       | HY000     | Target table '%s' is not empty.                    
                             |
+| 18106       | 01007     | Source data source lacks '%s' privilege(s).        
                             |
+| 18107       | HY000     | Source data source required '%s = %s', now is 
'%s'.                             |
+| 18108       | HY000     | User '%s' does exist.                              
                             |
+| 18109       | 08000     | Check privileges failed on source data source.     
                             |
+| 18110       | HY000     | Importer job write data failed.                    
                             |
+| 18111       | 08000     | Get binlog position failed by job '%s'.            
                             |
+| 18112       | HY000     | Can not find consistency check job of '%s'.        
                             |
+| 18113       | HY000     | Uncompleted consistency check job '%s' exists, 
progress '%s'.                   |
+| 18014       | HY000     | Failed to get DDL for table '%s'.                  
                             |
+| 18200       | HY000     | Before data record is '%s', after data record is 
'%s'.                          |
+| 18201       | 08000     | Data check table '%s' failed.                      
                             |
+| 18202       | 0A000     | Unsupported pipeline database type '%s'.           
                             |
+| 18203       | 0A000     | Unsupported CRC32 data consistency calculate 
algorithm with database type '%s'. |
+| 18400       | 42S02     | Can not find stream data source table.             
                             |
+| 18401       | 42S02     | Database '%s' does not exist.                      
                             |
+| 18410       | 42S02     | CDC Login request body is empty.                   
                             |
+| 18411       | 08004     | Illegal username or password.                      
                             |
 
 ## 功能异常
 
diff --git a/docs/document/content/user-manual/error-code/sql-error-code.en.md 
b/docs/document/content/user-manual/error-code/sql-error-code.en.md
index 6b0787fdd45..a9df7d0efd4 100644
--- a/docs/document/content/user-manual/error-code/sql-error-code.en.md
+++ b/docs/document/content/user-manual/error-code/sql-error-code.en.md
@@ -107,32 +107,32 @@ SQL error codes provide by standard `SQL State`, `Vendor 
Code` and `Reason`, whi
 
 ### Data Pipeline
 
-| Vendor Code | SQL State | Reason                                             
                               |
-|-------------|-----------|-----------------------------------------------------------------------------------|
-| 18004       | 44000     | Target database \`%s\` isn't exist.                
                               |
-| 18005       | 22023     | There is invalid parameter value: \`%s\`.          
                               |
-| 18020       | HY000     | Failed to get DDL for table \`%s\`.                
                               |
-| 18050       | HY000     | Before data record is \`%s\`, after data record is 
\`%s\`.                        |
-| 18051       | 08000     | Data check table \`%s\` failed.                    
                               |
-| 18052       | 0A000     | Unsupported pipeline database type \`%s\`.         
                               |
-| 18053       | 0A000     | Unsupported CRC32 data consistency calculate 
algorithm with database type \`%s\`. |
-| 18080       | HY000     | Can not find pipeline job \`%s\`.                  
                               |
-| 18082       | HY000     | Sharding count of job \`%s\` is 0.                 
                               |
-| 18083       | HY000     | Can not split by range for table \`%s\`, reason 
is: %s                            |
-| 18084       | HY000     | Can not split by unique key \`%s\` for table 
\`%s\`, reason is: %s                |
-| 18085       | HY000     | Target table \`%s\` is not empty.                  
                               |
-| 18086       | 01007     | Source data source lacks %s privilege(s).          
                               |
-| 18087       | HY000     | Source data source required \`%s = %s\`, now is 
\`%s\`.                           |
-| 18088       | HY000     | User \`%s\` does exist.                            
                               |
-| 18089       | 08000     | Check privileges failed on source data source, 
reason is: %s                      |
-| 18091       | HY000     | Importer job write data failed.                    
                               |
-| 18092       | 08000     | Get binlog position failed by job \`%s\`, reason 
is: %s                           |
-| 18095       | HY000     | Can not find consistency check job of \`%s\`.      
                               |
-| 18096       | HY000     | Uncompleted consistency check job \`%s\` exists.   
                               |
-| 18400       | 42S02     | Can not find stream data source table.             
                               |
-| 18401       | 42S02     | Database '%s' does not exist.                      
                               |
-| 18410       | 42S02     | CDC Login request body is empty.                   
                               |
-| 18411       | 08004     | Illegal username or password.                      
                               |
+| Vendor Code | SQL State | Reason                                             
                             |
+|-------------|-----------|---------------------------------------------------------------------------------|
+| 18000       | 22023     | There is invalid parameter value '%s'.             
                             |
+| 18100       | 42S02     | Target database '%s' does not exist.               
                             |
+| 18101       | 42S02     | Can not find pipeline job '%s'.                    
                             |
+| 18102       | 44000     | Sharding count of job '%s' is 0.                   
                             |
+| 18103       | 42S02     | Can not get meta data for table '%s' when split by 
range.                       |
+| 18104       | HY000     | Can not split by unique key '%s' for table '%s'.   
                             |
+| 18105       | HY000     | Target table '%s' is not empty.                    
                             |
+| 18106       | 01007     | Source data source lacks '%s' privilege(s).        
                             |
+| 18107       | HY000     | Source data source required '%s = %s', now is 
'%s'.                             |
+| 18108       | HY000     | User '%s' does exist.                              
                             |
+| 18109       | 08000     | Check privileges failed on source data source.     
                             |
+| 18110       | HY000     | Importer job write data failed.                    
                             |
+| 18111       | 08000     | Get binlog position failed by job '%s'.            
                             |
+| 18112       | HY000     | Can not find consistency check job of '%s'.        
                             |
+| 18113       | HY000     | Uncompleted consistency check job '%s' exists, 
progress '%s'.                   |
+| 18014       | HY000     | Failed to get DDL for table '%s'.                  
                             |
+| 18200       | HY000     | Before data record is '%s', after data record is 
'%s'.                          |
+| 18201       | 08000     | Data check table '%s' failed.                      
                             |
+| 18202       | 0A000     | Unsupported pipeline database type '%s'.           
                             |
+| 18203       | 0A000     | Unsupported CRC32 data consistency calculate 
algorithm with database type '%s'. |
+| 18400       | 42S02     | Can not find stream data source table.             
                             |
+| 18401       | 42S02     | Database '%s' does not exist.                      
                             |
+| 18410       | 42S02     | CDC Login request body is empty.                   
                             |
+| 18411       | 08004     | Illegal username or password.                      
                             |
 
 ## Feature Exception
 
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/data/PipelineTableDataConsistencyCheckLoadingFailedException.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/data/PipelineDataException.java
similarity index 54%
copy from 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/data/PipelineTableDataConsistencyCheckLoadingFailedException.java
copy to 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/data/PipelineDataException.java
index 6edab67d77f..650d437ebbd 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/data/PipelineTableDataConsistencyCheckLoadingFailedException.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/data/PipelineDataException.java
@@ -17,23 +17,29 @@
 
 package org.apache.shardingsphere.data.pipeline.core.exception.data;
 
+import com.google.common.base.Preconditions;
+import 
org.apache.shardingsphere.infra.exception.core.external.sql.sqlstate.SQLState;
 import 
org.apache.shardingsphere.infra.exception.core.external.sql.type.kernel.category.PipelineSQLException;
-import 
org.apache.shardingsphere.infra.exception.core.external.sql.sqlstate.XOpenSQLState;
-
-import javax.annotation.Nullable;
 
 /**
- * Pipeline table data consistency check loading failed exception.
+ * Pipeline job exception.
  */
-public final class PipelineTableDataConsistencyCheckLoadingFailedException 
extends PipelineSQLException {
+public abstract class PipelineDataException extends PipelineSQLException {
+    
+    private static final long serialVersionUID = -4353142247923068753L;
     
-    private static final long serialVersionUID = 8965231249677009738L;
+    private static final int JOB_CODE = 2;
+    
+    protected PipelineDataException(final SQLState sqlState, final int 
errorCode, final String reason, final Object... messageArgs) {
+        super(sqlState, getErrorCode(errorCode), reason, messageArgs);
+    }
     
-    public PipelineTableDataConsistencyCheckLoadingFailedException(@Nullable 
final String schemaName, final String tableName) {
-        this(schemaName, tableName, null);
+    protected PipelineDataException(final SQLState sqlState, final int 
errorCode, final String reason, final Exception cause) {
+        super(sqlState, getErrorCode(errorCode), reason, cause);
     }
     
-    public PipelineTableDataConsistencyCheckLoadingFailedException(@Nullable 
final String schemaName, final String tableName, final Exception cause) {
-        super(XOpenSQLState.CONNECTION_EXCEPTION, 51, String.format("Data 
check table `%s` failed.", null != schemaName ? schemaName + "." + tableName : 
tableName), cause);
+    private static int getErrorCode(final int errorCode) {
+        Preconditions.checkArgument(errorCode >= 0 && errorCode < 100, "The 
value range of error code should be [0, 100).");
+        return JOB_CODE * 100 + errorCode;
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/data/PipelineTableDataConsistencyCheckLoadingFailedException.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/data/PipelineTableDataConsistencyCheckLoadingFailedException.java
index 6edab67d77f..6641476adf8 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/data/PipelineTableDataConsistencyCheckLoadingFailedException.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/data/PipelineTableDataConsistencyCheckLoadingFailedException.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.data.pipeline.core.exception.data;
 
-import 
org.apache.shardingsphere.infra.exception.core.external.sql.type.kernel.category.PipelineSQLException;
 import 
org.apache.shardingsphere.infra.exception.core.external.sql.sqlstate.XOpenSQLState;
 
 import javax.annotation.Nullable;
@@ -25,7 +24,7 @@ import javax.annotation.Nullable;
 /**
  * Pipeline table data consistency check loading failed exception.
  */
-public final class PipelineTableDataConsistencyCheckLoadingFailedException 
extends PipelineSQLException {
+public final class PipelineTableDataConsistencyCheckLoadingFailedException 
extends PipelineDataException {
     
     private static final long serialVersionUID = 8965231249677009738L;
     
@@ -34,6 +33,6 @@ public final class 
PipelineTableDataConsistencyCheckLoadingFailedException exten
     }
     
     public PipelineTableDataConsistencyCheckLoadingFailedException(@Nullable 
final String schemaName, final String tableName, final Exception cause) {
-        super(XOpenSQLState.CONNECTION_EXCEPTION, 51, String.format("Data 
check table `%s` failed.", null != schemaName ? schemaName + "." + tableName : 
tableName), cause);
+        super(XOpenSQLState.CONNECTION_EXCEPTION, 1, String.format("Data check 
table '%s' failed.", null != schemaName ? schemaName + "." + tableName : 
tableName), cause);
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/data/PipelineUnexpectedDataRecordOrderException.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/data/PipelineUnexpectedDataRecordOrderException.java
index e285cbf1166..1a531d3da75 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/data/PipelineUnexpectedDataRecordOrderException.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/data/PipelineUnexpectedDataRecordOrderException.java
@@ -19,16 +19,15 @@ package 
org.apache.shardingsphere.data.pipeline.core.exception.data;
 
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
 import 
org.apache.shardingsphere.infra.exception.core.external.sql.sqlstate.XOpenSQLState;
-import 
org.apache.shardingsphere.infra.exception.core.external.sql.type.kernel.category.PipelineSQLException;
 
 /**
  * Pipeline unexpected data record order exception.
  */
-public final class PipelineUnexpectedDataRecordOrderException extends 
PipelineSQLException {
+public final class PipelineUnexpectedDataRecordOrderException extends 
PipelineDataException {
     
     private static final long serialVersionUID = 6023695604738387750L;
     
     public PipelineUnexpectedDataRecordOrderException(final DataRecord 
beforeDataRecord, final DataRecord afterDataRecord) {
-        super(XOpenSQLState.GENERAL_ERROR, 50, String.format("Before data 
record is `%s`, after data record is `%s`.", beforeDataRecord, 
afterDataRecord));
+        super(XOpenSQLState.GENERAL_ERROR, 0, "Before data record is '%s', 
after data record is '%s'.", beforeDataRecord, afterDataRecord);
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/data/UnsupportedCRC32SingleTableInventoryCalculatorException.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/data/UnsupportedCRC32SingleTableInventoryCalculatorException.java
index 79ea2048815..c44871e31b8 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/data/UnsupportedCRC32SingleTableInventoryCalculatorException.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/data/UnsupportedCRC32SingleTableInventoryCalculatorException.java
@@ -17,18 +17,17 @@
 
 package org.apache.shardingsphere.data.pipeline.core.exception.data;
 
-import 
org.apache.shardingsphere.infra.exception.core.external.sql.type.kernel.category.PipelineSQLException;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import 
org.apache.shardingsphere.infra.exception.core.external.sql.sqlstate.XOpenSQLState;
 
 /**
  * Unsupported CRC32 single table inventory calculator exception.
  */
-public final class UnsupportedCRC32SingleTableInventoryCalculatorException 
extends PipelineSQLException {
+public final class UnsupportedCRC32SingleTableInventoryCalculatorException 
extends PipelineDataException {
     
     private static final long serialVersionUID = 580323508713524816L;
     
     public UnsupportedCRC32SingleTableInventoryCalculatorException(final 
DatabaseType databaseType) {
-        super(XOpenSQLState.FEATURE_NOT_SUPPORTED, 53, 
String.format("Unsupported CRC32 data consistency calculate algorithm with 
database type `%s`.", databaseType.getType()));
+        super(XOpenSQLState.FEATURE_NOT_SUPPORTED, 3, "Unsupported CRC32 data 
consistency calculate algorithm with database type '%s'.", 
databaseType.getType());
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/data/UnsupportedPipelineDatabaseTypeException.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/data/UnsupportedPipelineDatabaseTypeException.java
index c1d30fcce4d..16430123043 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/data/UnsupportedPipelineDatabaseTypeException.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/data/UnsupportedPipelineDatabaseTypeException.java
@@ -17,18 +17,17 @@
 
 package org.apache.shardingsphere.data.pipeline.core.exception.data;
 
-import 
org.apache.shardingsphere.infra.exception.core.external.sql.type.kernel.category.PipelineSQLException;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import 
org.apache.shardingsphere.infra.exception.core.external.sql.sqlstate.XOpenSQLState;
 
 /**
  * Unsupported pipeline database type exception.
  */
-public final class UnsupportedPipelineDatabaseTypeException extends 
PipelineSQLException {
+public final class UnsupportedPipelineDatabaseTypeException extends 
PipelineDataException {
     
     private static final long serialVersionUID = -4100671584682823997L;
     
     public UnsupportedPipelineDatabaseTypeException(final DatabaseType 
databaseType) {
-        super(XOpenSQLState.FEATURE_NOT_SUPPORTED, 52, 
String.format("Unsupported pipeline database type `%s`.", 
databaseType.getType()));
+        super(XOpenSQLState.FEATURE_NOT_SUPPORTED, 2, "Unsupported pipeline 
database type '%s'.", databaseType.getType());
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/ConsistencyCheckJobNotFoundException.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/ConsistencyCheckJobNotFoundException.java
index 0dba4ad0d95..40090d436cd 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/ConsistencyCheckJobNotFoundException.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/ConsistencyCheckJobNotFoundException.java
@@ -17,17 +17,16 @@
 
 package org.apache.shardingsphere.data.pipeline.core.exception.job;
 
-import 
org.apache.shardingsphere.infra.exception.core.external.sql.type.kernel.category.PipelineSQLException;
 import 
org.apache.shardingsphere.infra.exception.core.external.sql.sqlstate.XOpenSQLState;
 
 /**
  * Can not find consistency check job exception.
  */
-public final class ConsistencyCheckJobNotFoundException extends 
PipelineSQLException {
+public final class ConsistencyCheckJobNotFoundException extends 
PipelineJobException {
     
     private static final long serialVersionUID = 6881217592831423520L;
     
     public ConsistencyCheckJobNotFoundException(final String jobId) {
-        super(XOpenSQLState.GENERAL_ERROR, 95, String.format("Can not find 
consistency check job of `%s`.", jobId));
+        super(XOpenSQLState.GENERAL_ERROR, 12, "Can not find consistency check 
job of '%s'.", jobId);
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/syntax/CreateTableSQLGenerateException.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/CreateTableSQLGenerateException.java
similarity index 78%
rename from 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/syntax/CreateTableSQLGenerateException.java
rename to 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/CreateTableSQLGenerateException.java
index 286c92a8bc9..e3196bb51aa 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/syntax/CreateTableSQLGenerateException.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/CreateTableSQLGenerateException.java
@@ -15,19 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.exception.syntax;
+package org.apache.shardingsphere.data.pipeline.core.exception.job;
 
-import 
org.apache.shardingsphere.infra.exception.core.external.sql.type.kernel.category.PipelineSQLException;
 import 
org.apache.shardingsphere.infra.exception.core.external.sql.sqlstate.XOpenSQLState;
 
 /**
  * Create table SQL generate exception.
  */
-public final class CreateTableSQLGenerateException extends 
PipelineSQLException {
+public final class CreateTableSQLGenerateException extends 
PipelineJobException {
     
     private static final long serialVersionUID = -219467568498936298L;
     
     public CreateTableSQLGenerateException(final String tableName) {
-        super(XOpenSQLState.GENERAL_ERROR, 20, String.format("Failed to get 
DDL for table `%s`.", tableName));
+        super(XOpenSQLState.GENERAL_ERROR, 14, "Failed to get DDL for table 
'%s'.", tableName);
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/MissingRequiredTargetDatabaseException.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/MissingRequiredTargetDatabaseException.java
index ea04b5962fa..3e72baf662d 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/MissingRequiredTargetDatabaseException.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/MissingRequiredTargetDatabaseException.java
@@ -17,17 +17,16 @@
 
 package org.apache.shardingsphere.data.pipeline.core.exception.job;
 
-import 
org.apache.shardingsphere.infra.exception.core.external.sql.type.kernel.category.PipelineSQLException;
 import 
org.apache.shardingsphere.infra.exception.core.external.sql.sqlstate.XOpenSQLState;
 
 /**
  * Missing required target database exception.
  */
-public final class MissingRequiredTargetDatabaseException extends 
PipelineSQLException {
+public final class MissingRequiredTargetDatabaseException extends 
PipelineJobException {
     
     private static final long serialVersionUID = -1557471818392592482L;
     
     public MissingRequiredTargetDatabaseException(final String databaseName) {
-        super(XOpenSQLState.CHECK_OPTION_VIOLATION, 4, String.format("Target 
database `%s` isn't exist.", databaseName));
+        super(XOpenSQLState.NOT_FOUND, 0, "Target database '%s' does not 
exist.", databaseName);
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineImporterJobWriteException.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineImporterJobWriteException.java
index 80443889282..331648148f5 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineImporterJobWriteException.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineImporterJobWriteException.java
@@ -17,21 +17,16 @@
 
 package org.apache.shardingsphere.data.pipeline.core.exception.job;
 
-import 
org.apache.shardingsphere.infra.exception.core.external.sql.type.kernel.category.PipelineSQLException;
 import 
org.apache.shardingsphere.infra.exception.core.external.sql.sqlstate.XOpenSQLState;
 
 /**
  * Pipeline importer job write exception.
  */
-public final class PipelineImporterJobWriteException extends 
PipelineSQLException {
+public final class PipelineImporterJobWriteException extends 
PipelineJobException {
     
     private static final long serialVersionUID = -7924663094479253130L;
     
     public PipelineImporterJobWriteException(final Exception cause) {
-        super(XOpenSQLState.GENERAL_ERROR, 91, "Importer job write data 
failed.", cause);
-    }
-    
-    public PipelineImporterJobWriteException(final String reason, final 
Exception cause) {
-        super(XOpenSQLState.GENERAL_ERROR, 91, reason, cause);
+        super(XOpenSQLState.GENERAL_ERROR, 10, "Importer job write data 
failed.", cause);
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobCreationWithInvalidShardingCountException.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobCreationWithInvalidShardingCountException.java
index 6f3351972cc..d027fbedfe4 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobCreationWithInvalidShardingCountException.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobCreationWithInvalidShardingCountException.java
@@ -17,17 +17,16 @@
 
 package org.apache.shardingsphere.data.pipeline.core.exception.job;
 
-import 
org.apache.shardingsphere.infra.exception.core.external.sql.type.kernel.category.PipelineSQLException;
 import 
org.apache.shardingsphere.infra.exception.core.external.sql.sqlstate.XOpenSQLState;
 
 /**
  * Pipeline job creation with invalid sharding count exception.
  */
-public final class PipelineJobCreationWithInvalidShardingCountException 
extends PipelineSQLException {
+public final class PipelineJobCreationWithInvalidShardingCountException 
extends PipelineJobException {
     
     private static final long serialVersionUID = 5829502315976905271L;
     
     public PipelineJobCreationWithInvalidShardingCountException(final String 
jobId) {
-        super(XOpenSQLState.GENERAL_ERROR, 82, String.format("Sharding count 
of job `%s` is 0.", jobId));
+        super(XOpenSQLState.CHECK_OPTION_VIOLATION, 2, "Sharding count of job 
'%s' is 0.", jobId);
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineImporterJobWriteException.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobException.java
similarity index 54%
copy from 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineImporterJobWriteException.java
copy to 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobException.java
index 80443889282..d42e4e8fa38 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineImporterJobWriteException.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobException.java
@@ -17,21 +17,29 @@
 
 package org.apache.shardingsphere.data.pipeline.core.exception.job;
 
+import com.google.common.base.Preconditions;
+import 
org.apache.shardingsphere.infra.exception.core.external.sql.sqlstate.SQLState;
 import 
org.apache.shardingsphere.infra.exception.core.external.sql.type.kernel.category.PipelineSQLException;
-import 
org.apache.shardingsphere.infra.exception.core.external.sql.sqlstate.XOpenSQLState;
 
 /**
- * Pipeline importer job write exception.
+ * Pipeline job exception.
  */
-public final class PipelineImporterJobWriteException extends 
PipelineSQLException {
+public abstract class PipelineJobException extends PipelineSQLException {
     
-    private static final long serialVersionUID = -7924663094479253130L;
+    private static final long serialVersionUID = -5622432104488993484L;
     
-    public PipelineImporterJobWriteException(final Exception cause) {
-        super(XOpenSQLState.GENERAL_ERROR, 91, "Importer job write data 
failed.", cause);
+    private static final int JOB_CODE = 1;
+    
+    protected PipelineJobException(final SQLState sqlState, final int 
errorCode, final String reason, final Object... messageArgs) {
+        super(sqlState, getErrorCode(errorCode), reason, messageArgs);
+    }
+    
+    protected PipelineJobException(final SQLState sqlState, final int 
errorCode, final String reason, final Exception cause) {
+        super(sqlState, getErrorCode(errorCode), reason, cause);
     }
     
-    public PipelineImporterJobWriteException(final String reason, final 
Exception cause) {
-        super(XOpenSQLState.GENERAL_ERROR, 91, reason, cause);
+    private static int getErrorCode(final int errorCode) {
+        Preconditions.checkArgument(errorCode >= 0 && errorCode < 100, "The 
value range of error code should be [0, 100).");
+        return JOB_CODE * 100 + errorCode;
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobNotFoundException.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobNotFoundException.java
index 1cb2c1c9a63..039ce479818 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobNotFoundException.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PipelineJobNotFoundException.java
@@ -17,17 +17,16 @@
 
 package org.apache.shardingsphere.data.pipeline.core.exception.job;
 
-import 
org.apache.shardingsphere.infra.exception.core.external.sql.type.kernel.category.PipelineSQLException;
 import 
org.apache.shardingsphere.infra.exception.core.external.sql.sqlstate.XOpenSQLState;
 
 /**
  * Pipeline job not found exception.
  */
-public final class PipelineJobNotFoundException extends PipelineSQLException {
+public final class PipelineJobNotFoundException extends PipelineJobException {
     
     private static final long serialVersionUID = -903289953649758722L;
     
     public PipelineJobNotFoundException(final String jobId) {
-        super(XOpenSQLState.GENERAL_ERROR, 80, String.format("Can not find 
pipeline job `%s`.", jobId));
+        super(XOpenSQLState.NOT_FOUND, 1, "Can not find pipeline job '%s'.", 
jobId);
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PrepareJobWithCheckPrivilegeFailedException.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PrepareJobWithCheckPrivilegeFailedException.java
index 89a9a9d5b0f..b574b154b4a 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PrepareJobWithCheckPrivilegeFailedException.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PrepareJobWithCheckPrivilegeFailedException.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.data.pipeline.core.exception.job;
 
-import 
org.apache.shardingsphere.infra.exception.core.external.sql.type.kernel.category.PipelineSQLException;
 import 
org.apache.shardingsphere.infra.exception.core.external.sql.sqlstate.XOpenSQLState;
 
 import java.sql.SQLException;
@@ -25,11 +24,11 @@ import java.sql.SQLException;
 /**
  * Prepare job with check privilege failed exception.
  */
-public final class PrepareJobWithCheckPrivilegeFailedException extends 
PipelineSQLException {
+public final class PrepareJobWithCheckPrivilegeFailedException extends 
PipelineJobException {
     
     private static final long serialVersionUID = -8462039913248251254L;
     
     public PrepareJobWithCheckPrivilegeFailedException(final SQLException 
cause) {
-        super(XOpenSQLState.CONNECTION_EXCEPTION, 89, String.format("Check 
privileges failed on source data source, reason is: %s", cause.getMessage()), 
cause);
+        super(XOpenSQLState.CONNECTION_EXCEPTION, 9, "Check privileges failed 
on source data source.", cause);
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PrepareJobWithGetBinlogPositionException.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PrepareJobWithGetBinlogPositionException.java
index f062b307b76..bc518bf5af6 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PrepareJobWithGetBinlogPositionException.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PrepareJobWithGetBinlogPositionException.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.data.pipeline.core.exception.job;
 
-import 
org.apache.shardingsphere.infra.exception.core.external.sql.type.kernel.category.PipelineSQLException;
 import 
org.apache.shardingsphere.infra.exception.core.external.sql.sqlstate.XOpenSQLState;
 
 import java.sql.SQLException;
@@ -25,11 +24,11 @@ import java.sql.SQLException;
 /**
  * Prepare job with get binlog position.
  */
-public final class PrepareJobWithGetBinlogPositionException extends 
PipelineSQLException {
+public final class PrepareJobWithGetBinlogPositionException extends 
PipelineJobException {
     
     private static final long serialVersionUID = -3701189611685636704L;
     
     public PrepareJobWithGetBinlogPositionException(final String jobId, final 
SQLException cause) {
-        super(XOpenSQLState.CONNECTION_EXCEPTION, 92, String.format("Get 
binlog position failed by job `%s`, reason is: %s", jobId, cause.getMessage()), 
cause);
+        super(XOpenSQLState.CONNECTION_EXCEPTION, 11, String.format("Get 
binlog position failed by job '%s'.", jobId), cause);
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PrepareJobWithInvalidSourceDataSourceException.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PrepareJobWithInvalidSourceDataSourceException.java
index d9043d29732..df116aca476 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PrepareJobWithInvalidSourceDataSourceException.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PrepareJobWithInvalidSourceDataSourceException.java
@@ -17,17 +17,16 @@
 
 package org.apache.shardingsphere.data.pipeline.core.exception.job;
 
-import 
org.apache.shardingsphere.infra.exception.core.external.sql.type.kernel.category.PipelineSQLException;
 import 
org.apache.shardingsphere.infra.exception.core.external.sql.sqlstate.XOpenSQLState;
 
 /**
  * Prepare job with invalid source data source exception.
  */
-public final class PrepareJobWithInvalidSourceDataSourceException extends 
PipelineSQLException {
+public final class PrepareJobWithInvalidSourceDataSourceException extends 
PipelineJobException {
     
     private static final long serialVersionUID = -7710035889344958565L;
     
     public PrepareJobWithInvalidSourceDataSourceException(final String 
dataSourceKey, final String toBeCheckedValue, final String actualValue) {
-        super(XOpenSQLState.GENERAL_ERROR, 87, String.format("Source data 
source required `%s = %s`, now is `%s`.", dataSourceKey, toBeCheckedValue, 
actualValue));
+        super(XOpenSQLState.GENERAL_ERROR, 7, "Source data source required '%s 
= %s', now is '%s'.", dataSourceKey, toBeCheckedValue, actualValue);
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PrepareJobWithTargetTableNotEmptyException.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PrepareJobWithTargetTableNotEmptyException.java
index c066573fa56..9f541acd06f 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PrepareJobWithTargetTableNotEmptyException.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PrepareJobWithTargetTableNotEmptyException.java
@@ -17,17 +17,16 @@
 
 package org.apache.shardingsphere.data.pipeline.core.exception.job;
 
-import 
org.apache.shardingsphere.infra.exception.core.external.sql.type.kernel.category.PipelineSQLException;
 import 
org.apache.shardingsphere.infra.exception.core.external.sql.sqlstate.XOpenSQLState;
 
 /**
  * Prepare job with target table not empty exception.
  */
-public final class PrepareJobWithTargetTableNotEmptyException extends 
PipelineSQLException {
+public final class PrepareJobWithTargetTableNotEmptyException extends 
PipelineJobException {
     
     private static final long serialVersionUID = -8462039913248251254L;
     
     public PrepareJobWithTargetTableNotEmptyException(final String tableName) {
-        super(XOpenSQLState.GENERAL_ERROR, 85, String.format("Target table 
`%s` is not empty.", tableName));
+        super(XOpenSQLState.GENERAL_ERROR, 5, "Target table '%s' is not 
empty.", tableName);
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PrepareJobWithoutEnoughPrivilegeException.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PrepareJobWithoutEnoughPrivilegeException.java
index b7ae0a99223..e17a05f0c82 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PrepareJobWithoutEnoughPrivilegeException.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PrepareJobWithoutEnoughPrivilegeException.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.data.pipeline.core.exception.job;
 
-import 
org.apache.shardingsphere.infra.exception.core.external.sql.type.kernel.category.PipelineSQLException;
 import 
org.apache.shardingsphere.infra.exception.core.external.sql.sqlstate.XOpenSQLState;
 
 import java.util.Collection;
@@ -25,11 +24,11 @@ import java.util.Collection;
 /**
  * Prepare job without enough privilege exception.
  */
-public final class PrepareJobWithoutEnoughPrivilegeException extends 
PipelineSQLException {
+public final class PrepareJobWithoutEnoughPrivilegeException extends 
PipelineJobException {
     
     private static final long serialVersionUID = -8462039913248251254L;
     
     public PrepareJobWithoutEnoughPrivilegeException(final Collection<String> 
privileges) {
-        super(XOpenSQLState.PRIVILEGE_NOT_GRANTED, 86, String.format("Source 
data source lacks %s privilege(s).", privileges));
+        super(XOpenSQLState.PRIVILEGE_NOT_GRANTED, 6, "Source data source 
lacks '%s' privilege(s).", privileges);
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PrepareJobWithoutUserException.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PrepareJobWithoutUserException.java
index 69deab65aa0..3d16809074c 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PrepareJobWithoutUserException.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/PrepareJobWithoutUserException.java
@@ -17,17 +17,16 @@
 
 package org.apache.shardingsphere.data.pipeline.core.exception.job;
 
-import 
org.apache.shardingsphere.infra.exception.core.external.sql.type.kernel.category.PipelineSQLException;
 import 
org.apache.shardingsphere.infra.exception.core.external.sql.sqlstate.XOpenSQLState;
 
 /**
  * Prepare job without user exception.
  */
-public final class PrepareJobWithoutUserException extends PipelineSQLException 
{
+public final class PrepareJobWithoutUserException extends PipelineJobException 
{
     
     private static final long serialVersionUID = 7250019436391155770L;
     
     public PrepareJobWithoutUserException(final String username) {
-        super(XOpenSQLState.PRIVILEGE_NOT_GRANTED, 88, String.format("User 
`%s` does exist.", username));
+        super(XOpenSQLState.PRIVILEGE_NOT_GRANTED, 8, "User '%s' does exist.", 
username);
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/SplitPipelineJobByUniqueKeyException.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/SplitPipelineJobByUniqueKeyException.java
index e0f8efa7c47..7e5bf03b301 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/SplitPipelineJobByUniqueKeyException.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/SplitPipelineJobByUniqueKeyException.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.data.pipeline.core.exception.job;
 
-import 
org.apache.shardingsphere.infra.exception.core.external.sql.type.kernel.category.PipelineSQLException;
 import 
org.apache.shardingsphere.infra.exception.core.external.sql.sqlstate.XOpenSQLState;
 
 import java.sql.SQLException;
@@ -25,11 +24,11 @@ import java.sql.SQLException;
 /**
  * Split pipeline job by unique key exception.
  */
-public final class SplitPipelineJobByUniqueKeyException extends 
PipelineSQLException {
+public final class SplitPipelineJobByUniqueKeyException extends 
PipelineJobException {
     
     private static final long serialVersionUID = -7804078676439253443L;
     
     public SplitPipelineJobByUniqueKeyException(final String tableName, final 
String uniqueKey, final SQLException cause) {
-        super(XOpenSQLState.GENERAL_ERROR, 84, String.format("Can not split by 
unique key `%s` for table `%s`, reason is: %s", uniqueKey, tableName, 
cause.getMessage()), cause);
+        super(XOpenSQLState.GENERAL_ERROR, 4, String.format("Can not split by 
unique key '%s' for table '%s'.", uniqueKey, tableName), cause);
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/SplitPipelineJobByRangeException.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/TableNotFoundWhenSplitPipelineJobByRangeException.java
similarity index 69%
rename from 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/SplitPipelineJobByRangeException.java
rename to 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/TableNotFoundWhenSplitPipelineJobByRangeException.java
index 3cfeee6f98f..6dab6c02a6c 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/SplitPipelineJobByRangeException.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/TableNotFoundWhenSplitPipelineJobByRangeException.java
@@ -17,17 +17,16 @@
 
 package org.apache.shardingsphere.data.pipeline.core.exception.job;
 
-import 
org.apache.shardingsphere.infra.exception.core.external.sql.type.kernel.category.PipelineSQLException;
 import 
org.apache.shardingsphere.infra.exception.core.external.sql.sqlstate.XOpenSQLState;
 
 /**
- * Split pipeline job by range exception.
+ * Table not found when split pipeline job by range exception.
  */
-public final class SplitPipelineJobByRangeException extends 
PipelineSQLException {
+public final class TableNotFoundWhenSplitPipelineJobByRangeException extends 
PipelineJobException {
     
     private static final long serialVersionUID = -8509592086832334026L;
     
-    public SplitPipelineJobByRangeException(final String tableName, final 
String reason) {
-        super(XOpenSQLState.GENERAL_ERROR, 83, String.format("Can not split by 
range for table `%s`, reason is: %s", tableName, reason));
+    public TableNotFoundWhenSplitPipelineJobByRangeException(final String 
tableName) {
+        super(XOpenSQLState.NOT_FOUND, 3, "Can not get meta data for table 
'%s' when split by range.", tableName);
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/UncompletedConsistencyCheckJobExistsException.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/UncompletedConsistencyCheckJobExistsException.java
index eb584f2a043..e3c9f77a4dc 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/UncompletedConsistencyCheckJobExistsException.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/job/UncompletedConsistencyCheckJobExistsException.java
@@ -18,17 +18,16 @@
 package org.apache.shardingsphere.data.pipeline.core.exception.job;
 
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.ConsistencyCheckJobItemProgress;
-import 
org.apache.shardingsphere.infra.exception.core.external.sql.type.kernel.category.PipelineSQLException;
 import 
org.apache.shardingsphere.infra.exception.core.external.sql.sqlstate.XOpenSQLState;
 
 /**
  * Uncompleted consistency check job exists exception.
  */
-public final class UncompletedConsistencyCheckJobExistsException extends 
PipelineSQLException {
+public final class UncompletedConsistencyCheckJobExistsException extends 
PipelineJobException {
     
     private static final long serialVersionUID = 2854259384634892428L;
     
     public UncompletedConsistencyCheckJobExistsException(final String jobId, 
final ConsistencyCheckJobItemProgress progress) {
-        super(XOpenSQLState.GENERAL_ERROR, 96, String.format("Uncompleted 
consistency check job `%s` exists, progress `%s`", jobId, progress));
+        super(XOpenSQLState.GENERAL_ERROR, 13, "Uncompleted consistency check 
job '%s' exists, progress '%s'.", jobId, progress);
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/param/PipelineInvalidParameterException.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/param/PipelineInvalidParameterException.java
index c6aed43ec31..2a2d0cda666 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/param/PipelineInvalidParameterException.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/param/PipelineInvalidParameterException.java
@@ -28,6 +28,6 @@ public final class PipelineInvalidParameterException extends 
PipelineSQLExceptio
     private static final long serialVersionUID = -2162309404414015630L;
     
     public PipelineInvalidParameterException(final String message) {
-        super(XOpenSQLState.INVALID_PARAMETER_VALUE, 5, "There is invalid 
parameter value: %s.", message);
+        super(XOpenSQLState.INVALID_PARAMETER_VALUE, 0, "There is invalid 
parameter value '%s'.", message);
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineTableMetaDataUtils.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineTableMetaDataUtils.java
index b3d6fc1ec41..b7ccd0f4da7 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineTableMetaDataUtils.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineTableMetaDataUtils.java
@@ -22,7 +22,7 @@ import lombok.NoArgsConstructor;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineIndexMetaData;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
-import 
org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobByRangeException;
+import 
org.apache.shardingsphere.data.pipeline.core.exception.job.TableNotFoundWhenSplitPipelineJobByRangeException;
 import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
 
 import java.util.Collection;
@@ -46,7 +46,7 @@ public final class PipelineTableMetaDataUtils {
      */
     public static List<PipelineColumnMetaData> getUniqueKeyColumns(final 
String schemaName, final String tableName, final PipelineTableMetaDataLoader 
metaDataLoader) {
         PipelineTableMetaData tableMetaData = 
metaDataLoader.getTableMetaData(schemaName, tableName);
-        ShardingSpherePreconditions.checkNotNull(tableMetaData, () -> new 
SplitPipelineJobByRangeException(tableName, "Can not get table meta data"));
+        ShardingSpherePreconditions.checkNotNull(tableMetaData, () -> new 
TableNotFoundWhenSplitPipelineJobByRangeException(tableName));
         List<String> primaryKeys = tableMetaData.getPrimaryKeyColumns();
         if (!primaryKeys.isEmpty()) {
             return 
primaryKeys.stream().map(tableMetaData::getColumnMetaData).collect(Collectors.toList());
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/StandardPipelineTableMetaDataLoader.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/StandardPipelineTableMetaDataLoader.java
index 11a21830329..659777adb18 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/StandardPipelineTableMetaDataLoader.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/StandardPipelineTableMetaDataLoader.java
@@ -68,7 +68,7 @@ public final class StandardPipelineTableMetaDataLoader 
implements PipelineTableM
         }
         result = tableMetaDataMap.get(new 
CaseInsensitiveIdentifier(tableName));
         if (null == result) {
-            log.warn("getTableMetaData, can not load meta data for table 
'{}'", tableName);
+            log.warn("Can not load meta data for table '{}'", tableName);
         }
         return result;
     }
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
index b79f152a811..a06fa80441b 100644
--- 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/sqlbuilder/MySQLPipelineSQLBuilder.java
@@ -17,7 +17,7 @@
 
 package org.apache.shardingsphere.data.pipeline.mysql.sqlbuilder;
 
-import 
org.apache.shardingsphere.data.pipeline.core.exception.syntax.CreateTableSQLGenerateException;
+import 
org.apache.shardingsphere.data.pipeline.core.exception.job.CreateTableSQLGenerateException;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
 import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.segment.PipelineSQLSegmentBuilder;
diff --git 
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
 
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
index d204d71fb02..534710f5b1d 100644
--- 
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
+++ 
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
@@ -17,7 +17,7 @@
 
 package org.apache.shardingsphere.data.pipeline.opengauss.sqlbuilder;
 
-import 
org.apache.shardingsphere.data.pipeline.core.exception.syntax.CreateTableSQLGenerateException;
+import 
org.apache.shardingsphere.data.pipeline.core.exception.job.CreateTableSQLGenerateException;
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
 import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.segment.PipelineSQLSegmentBuilder;
 import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder;
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/h2/sqlbuilder/H2PipelineSQLBuilder.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/h2/sqlbuilder/H2PipelineSQLBuilder.java
index e0a8b73e81d..79254e833db 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/h2/sqlbuilder/H2PipelineSQLBuilder.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/h2/sqlbuilder/H2PipelineSQLBuilder.java
@@ -17,7 +17,7 @@
 
 package 
org.apache.shardingsphere.test.it.data.pipeline.core.fixture.h2.sqlbuilder;
 
-import 
org.apache.shardingsphere.data.pipeline.core.exception.syntax.CreateTableSQLGenerateException;
+import 
org.apache.shardingsphere.data.pipeline.core.exception.job.CreateTableSQLGenerateException;
 import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder;
 import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
 import 
org.apache.shardingsphere.test.it.data.pipeline.core.util.PipelineContextUtils;

Reply via email to