This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 56a258187c0 Refactor PipelineJobPreparer (#29427)
56a258187c0 is described below

commit 56a258187c004b1932f2115e1d8e04eae6cd2da3
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Dec 17 22:35:44 2023 +0800

    Refactor PipelineJobPreparer (#29427)
    
    * Refactor DefaultDataSourcePrepareOption
    
    * Rename DialectDataSourcePrepareOption
    
    * Move DialectDataSourcePrepareOption
    
    * Add checker package
    
    * Refactor DataSourceCheckEngine
    
    * Refactor DataSourceCheckEngine
    
    * Refactor InventoryTaskSplitter
    
    * Move InventoryTaskSplitter
    
    * Refactor CreateTableConfiguration
    
    * Rename PipelineJobDataSourcePreparer
    
    * Rename PipelineJobPreparer
    
    * Refactor PipelineJobPreparer
    
    * Refactor PipelineJobPreparer
    
    * Refactor PipelineJobPreparer
    
    * Refactor PipelineJobPreparer
---
 .../DataSourceCheckEngine.java                     | 22 ++++----
 .../DialectDataSourceChecker.java                  |  2 +-
 ...PreparerUtils.java => PipelineJobPreparer.java} | 66 +++++++++-------------
 ...rer.java => PipelineJobDataSourcePreparer.java} |  9 +--
 ...DefaultPipelineJobDataSourcePrepareOption.java} | 19 ++++++-
 ...DialectPipelineJobDataSourcePrepareOption.java} | 15 ++---
 .../param}/CreateTableConfiguration.java           |  4 +-
 .../param/PrepareTargetSchemasParameter.java       |  1 -
 .../param/PrepareTargetTablesParameter.java        |  1 -
 .../InventoryRecordsCountCalculator.java           |  2 +-
 .../{ => inventory}/InventoryTaskSplitter.java     |  4 +-
 ...tion.DialectPipelineJobDataSourcePrepareOption} |  2 +-
 .../datasource/DataSourceCheckEngineTest.java      |  1 +
 .../check/datasource/MySQLDataSourceChecker.java   |  2 +-
 ...pipeline.core.checker.DialectDataSourceChecker} |  0
 .../datasource/OpenGaussDataSourceChecker.java     |  2 +-
 ...enGaussPipelineJobDataSourcePrepareOption.java} |  6 +-
 ...pipeline.core.checker.DialectDataSourceChecker} |  0
 ...tion.DialectPipelineJobDataSourcePrepareOption} |  2 +-
 .../datasource/PostgreSQLDataSourceChecker.java    |  2 +-
 ...pipeline.core.checker.DialectDataSourceChecker} |  0
 .../data/pipeline/cdc/api/CDCJobAPI.java           |  8 ++-
 .../pipeline/cdc/core/prepare/CDCJobPreparer.java  |  8 +--
 .../pipeline/scenario/migration/MigrationJob.java  |  9 +--
 .../config/MigrationTaskConfiguration.java         |  2 +-
 .../migration/preparer/MigrationJobPreparer.java   | 30 +++++-----
 .../core/fixture/FixtureDataSourceChecker.java     |  2 +-
 .../core/prepare/InventoryTaskSplitterTest.java    |  2 +-
 .../pipeline/core/util/PipelineContextUtils.java   |  2 +-
 ...pipeline.core.checker.DialectDataSourceChecker} |  0
 30 files changed, 110 insertions(+), 115 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngine.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DataSourceCheckEngine.java
similarity index 83%
rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngine.java
rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DataSourceCheckEngine.java
index 6b8565fb5dd..89dbd1d2b75 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngine.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DataSourceCheckEngine.java
@@ -15,13 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.preparer.datasource;
+package org.apache.shardingsphere.data.pipeline.core.checker;
 
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
 import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineCommonSQLBuilder;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithInvalidConnectionException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithTargetTableNotEmptyException;
-import 
org.apache.shardingsphere.data.pipeline.core.spi.datasource.DialectDataSourceChecker;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import 
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
 
@@ -37,13 +36,13 @@ import java.util.Collection;
  */
 public final class DataSourceCheckEngine {
     
-    private final DatabaseType databaseType;
+    private final DialectDataSourceChecker checker;
     
-    private final DialectDataSourceChecker dialectDataSourceChecker;
+    private final PipelineCommonSQLBuilder sqlBuilder;
     
     public DataSourceCheckEngine(final DatabaseType databaseType) {
-        this.databaseType = databaseType;
-        dialectDataSourceChecker = DatabaseTypedSPILoader.findService(DialectDataSourceChecker.class, databaseType).orElse(null);
+        checker = DatabaseTypedSPILoader.findService(DialectDataSourceChecker.class, databaseType).orElse(null);
+        sqlBuilder = new PipelineCommonSQLBuilder(databaseType);
     }
     
     /**
@@ -87,8 +86,7 @@ public final class DataSourceCheckEngine {
     }
     
     private boolean checkEmpty(final DataSource dataSource, final String 
schemaName, final String tableName) throws SQLException {
-        PipelineCommonSQLBuilder pipelineSQLBuilder = new 
PipelineCommonSQLBuilder(databaseType);
-        String sql = pipelineSQLBuilder.buildCheckEmptySQL(schemaName, 
tableName);
+        String sql = sqlBuilder.buildCheckEmptySQL(schemaName, tableName);
         try (
                 Connection connection = dataSource.getConnection();
                 PreparedStatement preparedStatement = 
connection.prepareStatement(sql);
@@ -103,11 +101,11 @@ public final class DataSourceCheckEngine {
      * @param dataSources data sources
      */
     public void checkPrivilege(final Collection<? extends DataSource> 
dataSources) {
-        if (null == dialectDataSourceChecker) {
+        if (null == checker) {
             return;
         }
         for (DataSource each : dataSources) {
-            dialectDataSourceChecker.checkPrivilege(each);
+            checker.checkPrivilege(each);
         }
     }
     
@@ -117,11 +115,11 @@ public final class DataSourceCheckEngine {
      * @param dataSources data sources
      */
     public void checkVariable(final Collection<? extends DataSource> 
dataSources) {
-        if (null == dialectDataSourceChecker) {
+        if (null == checker) {
             return;
         }
         for (DataSource each : dataSources) {
-            dialectDataSourceChecker.checkVariable(each);
+            checker.checkVariable(each);
         }
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/datasource/DialectDataSourceChecker.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DialectDataSourceChecker.java
similarity index 95%
rename from 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/datasource/DialectDataSourceChecker.java
rename to 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DialectDataSourceChecker.java
index 0f9bca5cea9..946f6b53e71 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/spi/datasource/DialectDataSourceChecker.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/DialectDataSourceChecker.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.spi.datasource;
+package org.apache.shardingsphere.data.pipeline.core.checker;
 
 import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPI;
 import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparer.java
similarity index 71%
rename from 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java
rename to 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparer.java
index 8be1d3e1e3d..51d3a5e17d1 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparer.java
@@ -17,21 +17,20 @@
 
 package org.apache.shardingsphere.data.pipeline.core.preparer;
 
-import lombok.AccessLevel;
-import lombok.NoArgsConstructor;
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.core.checker.DataSourceCheckEngine;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceWrapper;
 import 
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress;
-import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourceCheckEngine;
-import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePreparer;
-import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePrepareOption;
+import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.PipelineJobDataSourcePreparer;
+import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.option.DialectPipelineJobDataSourcePrepareOption;
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter;
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter;
 import 
org.apache.shardingsphere.data.pipeline.core.spi.ingest.dumper.IncrementalDumperCreator;
@@ -41,7 +40,6 @@ import 
org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import 
org.apache.shardingsphere.infra.datasource.pool.creator.DataSourcePoolCreator;
 import 
org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties;
 import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
-import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import org.apache.shardingsphere.infra.parser.SQLParserEngine;
 import 
org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
 import org.apache.shardingsphere.parser.rule.SQLParserRule;
@@ -52,60 +50,56 @@ import java.util.Collection;
 import java.util.Optional;
 
 /**
- * Pipeline job preparer utility class.
+ * Pipeline job preparer.
  */
-@NoArgsConstructor(access = AccessLevel.PRIVATE)
+@RequiredArgsConstructor
 @Slf4j
-public final class PipelineJobPreparerUtils {
+public final class PipelineJobPreparer {
+    
+    private final DatabaseType databaseType;
     
     /**
      * Is incremental supported.
      *
-     * @param databaseType database type
-     * @return true if supported, otherwise false
+     * @return support incremental or not
      */
-    public static boolean isIncrementalSupported(final DatabaseType 
databaseType) {
+    public boolean isIncrementalSupported() {
         return 
DatabaseTypedSPILoader.findService(IncrementalDumperCreator.class, 
databaseType).map(IncrementalDumperCreator::isSupportIncrementalDump).orElse(false);
     }
     
     /**
      * Prepare target schema.
      *
-     * @param databaseType database type
-     * @param prepareTargetSchemasParam prepare target schemas parameter
+     * @param param prepare target schemas parameter
      * @throws SQLException if prepare target schema fail
      */
-    public static void prepareTargetSchema(final DatabaseType databaseType, 
final PrepareTargetSchemasParameter prepareTargetSchemasParam) throws 
SQLException {
-        DataSourcePrepareOption option = 
DatabaseTypedSPILoader.findService(DataSourcePrepareOption.class, databaseType)
-                .orElseGet(() -> 
DatabaseTypedSPILoader.getService(DataSourcePrepareOption.class, null));
-        new 
DataSourcePreparer(option).prepareTargetSchemas(prepareTargetSchemasParam);
+    public void prepareTargetSchema(final PrepareTargetSchemasParameter param) 
throws SQLException {
+        DialectPipelineJobDataSourcePrepareOption option = 
DatabaseTypedSPILoader.findService(DialectPipelineJobDataSourcePrepareOption.class,
 databaseType)
+                .orElseGet(() -> 
DatabaseTypedSPILoader.getService(DialectPipelineJobDataSourcePrepareOption.class,
 null));
+        new PipelineJobDataSourcePreparer(option).prepareTargetSchemas(param);
     }
     
     /**
      * Get SQL parser engine.
      *
      * @param metaData meta data
-     * @param targetDatabaseName target database name
      * @return SQL parser engine
      */
-    public static SQLParserEngine getSQLParserEngine(final 
ShardingSphereMetaData metaData, final String targetDatabaseName) {
-        ShardingSphereDatabase database = 
metaData.getDatabase(targetDatabaseName);
-        DatabaseType databaseType = 
database.getProtocolType().getTrunkDatabaseType().orElse(database.getProtocolType());
+    public SQLParserEngine getSQLParserEngine(final ShardingSphereMetaData 
metaData) {
         return 
metaData.getGlobalRuleMetaData().getSingleRule(SQLParserRule.class).getSQLParserEngine(databaseType);
     }
     
     /**
      * Prepare target tables.
      *
-     * @param databaseType database type
      * @param prepareTargetTablesParam prepare target tables parameter
      * @throws SQLException SQL exception
      */
-    public static void prepareTargetTables(final DatabaseType databaseType, 
final PrepareTargetTablesParameter prepareTargetTablesParam) throws 
SQLException {
-        DataSourcePrepareOption option = 
DatabaseTypedSPILoader.findService(DataSourcePrepareOption.class, databaseType)
-                .orElseGet(() -> 
DatabaseTypedSPILoader.getService(DataSourcePrepareOption.class, null));
+    public void prepareTargetTables(final PrepareTargetTablesParameter 
prepareTargetTablesParam) throws SQLException {
+        DialectPipelineJobDataSourcePrepareOption option = 
DatabaseTypedSPILoader.findService(DialectPipelineJobDataSourcePrepareOption.class,
 databaseType)
+                .orElseGet(() -> 
DatabaseTypedSPILoader.getService(DialectPipelineJobDataSourcePrepareOption.class,
 null));
         long startTimeMillis = System.currentTimeMillis();
-        new 
DataSourcePreparer(option).prepareTargetTables(prepareTargetTablesParam);
+        new 
PipelineJobDataSourcePreparer(option).prepareTargetTables(prepareTargetTablesParam);
         log.info("prepareTargetTables cost {} ms", System.currentTimeMillis() 
- startTimeMillis);
     }
     
@@ -118,15 +112,14 @@ public final class PipelineJobPreparerUtils {
      * @return ingest position
      * @throws SQLException sql exception
      */
-    public static IngestPosition getIncrementalPosition(final 
JobItemIncrementalTasksProgress initIncremental, final IncrementalDumperContext 
dumperContext,
-                                                        final 
PipelineDataSourceManager dataSourceManager) throws SQLException {
+    public IngestPosition getIncrementalPosition(final 
JobItemIncrementalTasksProgress initIncremental, final IncrementalDumperContext 
dumperContext,
+                                                 final 
PipelineDataSourceManager dataSourceManager) throws SQLException {
         if (null != initIncremental) {
             Optional<IngestPosition> position = 
initIncremental.getIncrementalPosition();
             if (position.isPresent()) {
                 return position.get();
             }
         }
-        DatabaseType databaseType = 
dumperContext.getCommonContext().getDataSourceConfig().getDatabaseType();
         DataSource dataSource = 
dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig());
         return DatabaseTypedSPILoader.getService(PositionInitializer.class, 
databaseType).init(dataSource, dumperContext.getJobId());
     }
@@ -134,10 +127,9 @@ public final class PipelineJobPreparerUtils {
     /**
      * Check data source.
      *
-     * @param databaseType database type
      * @param dataSources data source
      */
-    public static void checkSourceDataSource(final DatabaseType databaseType, 
final Collection<? extends DataSource> dataSources) {
+    public void checkSourceDataSource(final Collection<? extends DataSource> 
dataSources) {
         if (dataSources.isEmpty()) {
             return;
         }
@@ -150,11 +142,10 @@ public final class PipelineJobPreparerUtils {
     /**
      * Check target data source.
      *
-     * @param databaseType database type
      * @param importerConfig importer config
      * @param targetDataSources target data sources
      */
-    public static void checkTargetDataSource(final DatabaseType databaseType, 
final ImporterConfiguration importerConfig, final Collection<? extends 
DataSource> targetDataSources) {
+    public void checkTargetDataSource(final ImporterConfiguration 
importerConfig, final Collection<? extends DataSource> targetDataSources) {
         if (null == targetDataSources || targetDataSources.isEmpty()) {
             log.info("target data source is empty, skip check");
             return;
@@ -167,12 +158,11 @@ public final class PipelineJobPreparerUtils {
     /**
      * Cleanup job preparer.
      *
-     * @param jobId job id
+     * @param jobId pipeline job id
      * @param pipelineDataSourceConfig pipeline data source config
-     * @throws SQLException sql exception
+     * @throws SQLException SQL exception
      */
-    public static void destroyPosition(final String jobId, final 
PipelineDataSourceConfiguration pipelineDataSourceConfig) throws SQLException {
-        DatabaseType databaseType = pipelineDataSourceConfig.getDatabaseType();
+    public void destroyPosition(final String jobId, final 
PipelineDataSourceConfiguration pipelineDataSourceConfig) throws SQLException {
         PositionInitializer positionInitializer = 
DatabaseTypedSPILoader.getService(PositionInitializer.class, databaseType);
         final long startTimeMillis = System.currentTimeMillis();
         log.info("Cleanup database type:{}, data source type:{}", 
databaseType.getType(), pipelineDataSourceConfig.getType());
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourcePreparer.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparer.java
similarity index 95%
rename from 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourcePreparer.java
rename to 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparer.java
index d738edeba8e..3f9ae7a8bc0 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourcePreparer.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparer.java
@@ -23,7 +23,8 @@ import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.generator.PipelineDDLGenerator;
-import 
org.apache.shardingsphere.data.pipeline.core.preparer.CreateTableConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.option.DialectPipelineJobDataSourcePrepareOption;
+import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.CreateTableConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter;
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter;
 import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineCommonSQLBuilder;
@@ -42,17 +43,17 @@ import java.util.Optional;
 import java.util.regex.Pattern;
 
 /**
- * Data source preparer.
+ * Pipeline job data source preparer.
  */
 @RequiredArgsConstructor
 @Slf4j
-public final class DataSourcePreparer {
+public final class PipelineJobDataSourcePreparer {
     
     private static final Pattern PATTERN_CREATE_TABLE_IF_NOT_EXISTS = 
Pattern.compile("CREATE\\s+TABLE\\s+IF\\s+NOT\\s+EXISTS\\s+", 
Pattern.CASE_INSENSITIVE);
     
     private static final Pattern PATTERN_CREATE_TABLE = 
Pattern.compile("CREATE\\s+TABLE\\s+", Pattern.CASE_INSENSITIVE);
     
-    private final DataSourcePrepareOption option;
+    private final DialectPipelineJobDataSourcePrepareOption option;
     
     /**
      * Prepare target schemas.
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DefaultDataSourcePrepareOption.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/option/DefaultPipelineJobDataSourcePrepareOption.java
similarity index 66%
rename from 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DefaultDataSourcePrepareOption.java
rename to 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/option/DefaultPipelineJobDataSourcePrepareOption.java
index fd22b23e244..34a42ed258e 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DefaultDataSourcePrepareOption.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/option/DefaultPipelineJobDataSourcePrepareOption.java
@@ -15,12 +15,25 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.preparer.datasource;
+package 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.option;
+
+import java.util.Collection;
+import java.util.Collections;
 
 /**
- * Default data source prepare option.
+ * Default pipeline job data source prepare option.
  */
-public final class DefaultDataSourcePrepareOption implements 
DataSourcePrepareOption {
+public final class DefaultPipelineJobDataSourcePrepareOption implements 
DialectPipelineJobDataSourcePrepareOption {
+    
+    @Override
+    public boolean isSupportIfNotExistsOnCreateSchema() {
+        return true;
+    }
+    
+    @Override
+    public Collection<String> getIgnoredExceptionMessages() {
+        return Collections.emptyList();
+    }
     
     @Override
     public boolean isDefault() {
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourcePrepareOption.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/option/DialectPipelineJobDataSourcePrepareOption.java
similarity index 79%
rename from 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourcePrepareOption.java
rename to 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/option/DialectPipelineJobDataSourcePrepareOption.java
index 52bc1962bcd..70a0d7944ac 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourcePrepareOption.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/option/DialectPipelineJobDataSourcePrepareOption.java
@@ -15,35 +15,30 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.preparer.datasource;
+package 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.option;
 
 import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPI;
 import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
 
 import java.util.Collection;
-import java.util.Collections;
 
 /**
- * Data source prepare option.
+ * Dialect pipeline job data source prepare option.
  */
 @SingletonSPI
-public interface DataSourcePrepareOption extends DatabaseTypedSPI {
+public interface DialectPipelineJobDataSourcePrepareOption extends 
DatabaseTypedSPI {
     
     /**
      * Is support if not exists on create schema SQL.
      * 
      * @return supported or not
      */
-    default boolean isSupportIfNotExistsOnCreateSchema() {
-        return true;
-    }
+    boolean isSupportIfNotExistsOnCreateSchema();
     
     /**
      * Get ignored exception messages.
      *
      * @return ignored exception messages
      */
-    default Collection<String> getIgnoredExceptionMessages() {
-        return Collections.emptyList();
-    }
+    Collection<String> getIgnoredExceptionMessages();
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/CreateTableConfiguration.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/CreateTableConfiguration.java
similarity index 90%
rename from 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/CreateTableConfiguration.java
rename to 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/CreateTableConfiguration.java
index 2f7d284e103..737e16159bf 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/CreateTableConfiguration.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/CreateTableConfiguration.java
@@ -15,11 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.preparer;
+package org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
-import lombok.ToString;
 import 
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveQualifiedTable;
 
@@ -28,7 +27,6 @@ import 
org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveQual
  */
 @RequiredArgsConstructor
 @Getter
-@ToString(exclude = {"sourceDataSourceConfig", "targetDataSourceConfig"})
 public final class CreateTableConfiguration {
     
     private final PipelineDataSourceConfiguration sourceDataSourceConfig;
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/PrepareTargetSchemasParameter.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/PrepareTargetSchemasParameter.java
index 7b4cf8e8d36..a4ae2cc3942 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/PrepareTargetSchemasParameter.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/PrepareTargetSchemasParameter.java
@@ -19,7 +19,6 @@ package 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
-import 
org.apache.shardingsphere.data.pipeline.core.preparer.CreateTableConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/PrepareTargetTablesParameter.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/PrepareTargetTablesParameter.java
index 348c931eb0a..26487299886 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/PrepareTargetTablesParameter.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/param/PrepareTargetTablesParameter.java
@@ -19,7 +19,6 @@ package 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
-import 
org.apache.shardingsphere.data.pipeline.core.preparer.CreateTableConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.infra.parser.SQLParserEngine;
 
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/InventoryRecordsCountCalculator.java
similarity index 98%
rename from 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java
rename to 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/InventoryRecordsCountCalculator.java
index f910490fe56..894d5d55f91 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/InventoryRecordsCountCalculator.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.preparer;
+package org.apache.shardingsphere.data.pipeline.core.preparer.inventory;
 
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/InventoryTaskSplitter.java
similarity index 99%
rename from 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java
rename to 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/InventoryTaskSplitter.java
index 74d1bf53d04..95d9c9e4eaa 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/InventoryTaskSplitter.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.preparer;
+package org.apache.shardingsphere.data.pipeline.core.preparer.inventory;
 
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
@@ -60,7 +60,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
- * Inventory data task splitter.
+ * Inventory task splitter.
  */
 @RequiredArgsConstructor
 @Slf4j
diff --git 
a/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePrepareOption
 
b/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.option.DialectPipelineJobDataSourcePrepareOption
similarity index 94%
rename from 
kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePrepareOption
rename to 
kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.option.DialectPipelineJobDataSourcePrepareOption
index 6acc3318877..9a154a9272c 100644
--- 
a/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePrepareOption
+++ 
b/kernel/data-pipeline/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.option.DialectPipelineJobDataSourcePrepareOption
@@ -15,4 +15,4 @@
 # limitations under the License.
 #
 
-org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DefaultDataSourcePrepareOption
+org.apache.shardingsphere.data.pipeline.core.preparer.datasource.option.DefaultPipelineJobDataSourcePrepareOption
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngineTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngineTest.java
index 3f8f36c97a8..f117cf933dd 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngineTest.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngineTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.data.pipeline.core.preparer.datasource;
 
+import 
org.apache.shardingsphere.data.pipeline.core.checker.DataSourceCheckEngine;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithInvalidConnectionException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithTargetTableNotEmptyException;
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/check/datasource/MySQLDataSourceChecker.java
 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/check/datasource/MySQLDataSourceChecker.java
index 2bc39c2ecd4..36132ba2769 100644
--- 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/check/datasource/MySQLDataSourceChecker.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/check/datasource/MySQLDataSourceChecker.java
@@ -20,7 +20,7 @@ package 
org.apache.shardingsphere.data.pipeline.mysql.check.datasource;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithCheckPrivilegeFailedException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithInvalidSourceDataSourceException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithoutEnoughPrivilegeException;
-import 
org.apache.shardingsphere.data.pipeline.core.spi.datasource.DialectDataSourceChecker;
+import 
org.apache.shardingsphere.data.pipeline.core.checker.DialectDataSourceChecker;
 import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
 
 import javax.sql.DataSource;
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.datasource.DialectDataSourceChecker
 
b/kernel/data-pipeline/dialect/mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.checker.DialectDataSourceChecker
similarity index 100%
rename from 
kernel/data-pipeline/dialect/mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.datasource.DialectDataSourceChecker
rename to 
kernel/data-pipeline/dialect/mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.checker.DialectDataSourceChecker
diff --git 
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/check/datasource/OpenGaussDataSourceChecker.java
 
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/check/datasource/OpenGaussDataSourceChecker.java
index eae8d6e6672..f9819c42f46 100644
--- 
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/check/datasource/OpenGaussDataSourceChecker.java
+++ 
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/check/datasource/OpenGaussDataSourceChecker.java
@@ -20,7 +20,7 @@ package 
org.apache.shardingsphere.data.pipeline.opengauss.check.datasource;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithCheckPrivilegeFailedException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithoutEnoughPrivilegeException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithoutUserException;
-import 
org.apache.shardingsphere.data.pipeline.core.spi.datasource.DialectDataSourceChecker;
+import 
org.apache.shardingsphere.data.pipeline.core.checker.DialectDataSourceChecker;
 import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
 
 import javax.sql.DataSource;
diff --git 
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePrepareOption.java
 
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussPipelineJobDataSourcePrepareOption.java
similarity index 85%
rename from 
kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePrepareOption.java
rename to 
kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussPipelineJobDataSourcePrepareOption.java
index 846a041b4d2..2771c5d9353 100644
--- 
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussDataSourcePrepareOption.java
+++ 
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/prepare/datasource/OpenGaussPipelineJobDataSourcePrepareOption.java
@@ -18,16 +18,16 @@
 package org.apache.shardingsphere.data.pipeline.opengauss.prepare.datasource;
 
 import lombok.extern.slf4j.Slf4j;
-import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePrepareOption;
+import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.option.DialectPipelineJobDataSourcePrepareOption;
 
 import java.util.Arrays;
 import java.util.Collection;
 
 /**
- * Data source prepare option for openGauss.
+ * Pipeline job data source prepare option for openGauss.
  */
 @Slf4j
-public final class OpenGaussDataSourcePrepareOption implements 
DataSourcePrepareOption {
+public final class OpenGaussPipelineJobDataSourcePrepareOption implements 
DialectPipelineJobDataSourcePrepareOption {
     
     @Override
     public boolean isSupportIfNotExistsOnCreateSchema() {
diff --git 
a/kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.datasource.DialectDataSourceChecker
 
b/kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.checker.DialectDataSourceChecker
similarity index 100%
rename from 
kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.datasource.DialectDataSourceChecker
rename to 
kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.checker.DialectDataSourceChecker
diff --git 
a/kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePrepareOption
 
b/kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.option.DialectPipelineJobDataSourcePrepareOption
similarity index 94%
rename from 
kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePrepareOption
rename to 
kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.option.DialectPipelineJobDataSourcePrepareOption
index 72923bbe665..e1fd88a4da6 100644
--- 
a/kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.DataSourcePrepareOption
+++ 
b/kernel/data-pipeline/dialect/opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.preparer.datasource.option.DialectPipelineJobDataSourcePrepareOption
@@ -15,4 +15,4 @@
 # limitations under the License.
 #
 
-org.apache.shardingsphere.data.pipeline.opengauss.prepare.datasource.OpenGaussDataSourcePrepareOption
+org.apache.shardingsphere.data.pipeline.opengauss.prepare.datasource.OpenGaussPipelineJobDataSourcePrepareOption
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/check/datasource/PostgreSQLDataSourceChecker.java
 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/check/datasource/PostgreSQLDataSourceChecker.java
index c8e00146663..693afc16076 100644
--- 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/check/datasource/PostgreSQLDataSourceChecker.java
+++ 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/check/datasource/PostgreSQLDataSourceChecker.java
@@ -21,7 +21,7 @@ import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithCheckPrivilegeFailedException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithoutEnoughPrivilegeException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithoutUserException;
-import 
org.apache.shardingsphere.data.pipeline.core.spi.datasource.DialectDataSourceChecker;
+import 
org.apache.shardingsphere.data.pipeline.core.checker.DialectDataSourceChecker;
 import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
 
 import javax.sql.DataSource;
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.datasource.DialectDataSourceChecker
 
b/kernel/data-pipeline/dialect/postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.checker.DialectDataSourceChecker
similarity index 100%
rename from 
kernel/data-pipeline/dialect/postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.datasource.DialectDataSourceChecker
rename to 
kernel/data-pipeline/dialect/postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.checker.DialectDataSourceChecker
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
index 8474fcfe2e9..04ef66dcac7 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
@@ -52,7 +52,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJob
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
-import 
org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparerUtils;
+import 
org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparer;
 import 
org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.PipelineGovernanceFacade;
 import 
org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
@@ -202,7 +202,8 @@ public final class CDCJobAPI implements TransmissionJobAPI {
         TransmissionJobItemProgress result = new TransmissionJobItemProgress();
         result.setSourceDatabaseType(jobConfig.getSourceDatabaseType());
         
result.setDataSourceName(incrementalDumperContext.getCommonContext().getDataSourceName());
-        IncrementalTaskProgress incrementalTaskProgress = new 
IncrementalTaskProgress(PipelineJobPreparerUtils.getIncrementalPosition(null, 
incrementalDumperContext, dataSourceManager));
+        IncrementalTaskProgress incrementalTaskProgress = new 
IncrementalTaskProgress(new PipelineJobPreparer(
+                
incrementalDumperContext.getCommonContext().getDataSourceConfig().getDatabaseType()).getIncrementalPosition(null,
 incrementalDumperContext, dataSourceManager));
         result.setIncremental(new 
JobItemIncrementalTasksProgress(incrementalTaskProgress));
         return result;
     }
@@ -258,7 +259,8 @@ public final class CDCJobAPI implements TransmissionJobAPI {
     private void cleanup(final CDCJobConfiguration jobConfig) {
         for (Entry<String, Map<String, Object>> entry : 
jobConfig.getDataSourceConfig().getRootConfig().getDataSources().entrySet()) {
             try {
-                PipelineJobPreparerUtils.destroyPosition(jobConfig.getJobId(), 
new StandardPipelineDataSourceConfiguration(entry.getValue()));
+                StandardPipelineDataSourceConfiguration 
pipelineDataSourceConfig = new 
StandardPipelineDataSourceConfiguration(entry.getValue());
+                new 
PipelineJobPreparer(pipelineDataSourceConfig.getDatabaseType()).destroyPosition(jobConfig.getJobId(),
 pipelineDataSourceConfig);
             } catch (final SQLException ex) {
                 log.warn("job destroying failed, jobId={}, dataSourceName={}", 
jobConfig.getJobId(), entry.getKey(), ex);
             }
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
index 82e1330c395..9b991c3e8e1 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
@@ -41,8 +41,8 @@ import 
org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
-import 
org.apache.shardingsphere.data.pipeline.core.preparer.InventoryTaskSplitter;
-import 
org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparerUtils;
+import 
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.InventoryTaskSplitter;
+import 
org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparer;
 import 
org.apache.shardingsphere.data.pipeline.core.spi.ingest.dumper.IncrementalDumperCreator;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
@@ -106,8 +106,8 @@ public final class CDCJobPreparer {
         CDCTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
         JobItemIncrementalTasksProgress initIncremental = null == 
jobItemContext.getInitProgress() ? null : 
jobItemContext.getInitProgress().getIncremental();
         try {
-            taskConfig.getDumperContext().getCommonContext().setPosition(
-                    
PipelineJobPreparerUtils.getIncrementalPosition(initIncremental, 
taskConfig.getDumperContext(), jobItemContext.getDataSourceManager()));
+            taskConfig.getDumperContext().getCommonContext().setPosition(new 
PipelineJobPreparer(taskConfig.getDumperContext().getCommonContext().getDataSourceConfig().getDatabaseType())
+                    .getIncrementalPosition(initIncremental, 
taskConfig.getDumperContext(), jobItemContext.getDataSourceManager()));
         } catch (final SQLException ex) {
             throw new 
PrepareJobWithGetBinlogPositionException(jobItemContext.getJobId(), ex);
         }
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index af98652a279..7b276da8749 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -30,7 +30,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJob
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveIdentifier;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveQualifiedTable;
-import 
org.apache.shardingsphere.data.pipeline.core.preparer.CreateTableConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.CreateTableConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.spi.algorithm.JobRateLimitAlgorithm;
 import 
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
 import 
org.apache.shardingsphere.data.pipeline.core.task.runner.TransmissionTasksRunner;
@@ -45,7 +45,6 @@ import org.apache.shardingsphere.infra.datanode.DataNode;
 
 import java.sql.SQLException;
 import java.util.Collection;
-import java.util.LinkedList;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -76,11 +75,7 @@ public final class MigrationJob extends 
AbstractSeparablePipelineJob<MigrationJo
     }
     
     private Collection<CreateTableConfiguration> 
buildCreateTableConfigurations(final MigrationJobConfiguration jobConfig, final 
TableAndSchemaNameMapper mapper) {
-        Collection<CreateTableConfiguration> result = new LinkedList<>();
-        for (JobDataNodeEntry each : 
jobConfig.getTablesFirstDataNodes().getEntries()) {
-            result.add(getCreateTableConfiguration(jobConfig, mapper, each));
-        }
-        return result;
+        return 
jobConfig.getTablesFirstDataNodes().getEntries().stream().map(each -> 
getCreateTableConfiguration(jobConfig, mapper, 
each)).collect(Collectors.toList());
     }
     
     private CreateTableConfiguration getCreateTableConfiguration(final 
MigrationJobConfiguration jobConfig, final TableAndSchemaNameMapper mapper, 
final JobDataNodeEntry jobDataNodeEntry) {
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationTaskConfiguration.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationTaskConfiguration.java
index 07aba7d6dd9..d36dd904cde 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationTaskConfiguration.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/MigrationTaskConfiguration.java
@@ -20,7 +20,7 @@ package 
org.apache.shardingsphere.data.pipeline.scenario.migration.config;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.ToString;
-import 
org.apache.shardingsphere.data.pipeline.core.preparer.CreateTableConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.CreateTableConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.task.config.PipelineTaskConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
index 69c4780ab64..cf01823a977 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/preparer/MigrationJobPreparer.java
@@ -20,7 +20,7 @@ package 
org.apache.shardingsphere.data.pipeline.scenario.migration.preparer;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.core.preparer.CreateTableConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.CreateTableConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
@@ -47,8 +47,8 @@ import 
org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
 import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
 import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
-import 
org.apache.shardingsphere.data.pipeline.core.preparer.InventoryTaskSplitter;
-import 
org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparerUtils;
+import 
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.InventoryTaskSplitter;
+import 
org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparer;
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetSchemasParameter;
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.PrepareTargetTablesParameter;
 import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
@@ -95,7 +95,8 @@ public final class MigrationJobPreparer {
         
ShardingSpherePreconditions.checkState(StandardPipelineDataSourceConfiguration.class.equals(
                 
jobItemContext.getTaskConfig().getDumperContext().getCommonContext().getDataSourceConfig().getClass()),
                 () -> new UnsupportedSQLOperationException("Migration 
inventory dumper only support StandardPipelineDataSourceConfiguration"));
-        
PipelineJobPreparerUtils.checkSourceDataSource(jobItemContext.getJobConfig().getSourceDatabaseType(),
 Collections.singleton(jobItemContext.getSourceDataSource()));
+        PipelineJobPreparer preparer = new 
PipelineJobPreparer(jobItemContext.getJobConfig().getSourceDatabaseType());
+        
preparer.checkSourceDataSource(Collections.singleton(jobItemContext.getSourceDataSource()));
         if (jobItemContext.isStopping()) {
             PipelineJobRegistry.stop(jobItemContext.getJobId());
             return;
@@ -105,7 +106,7 @@ public final class MigrationJobPreparer {
             PipelineJobRegistry.stop(jobItemContext.getJobId());
             return;
         }
-        boolean isIncrementalSupported = 
PipelineJobPreparerUtils.isIncrementalSupported(jobItemContext.getJobConfig().getSourceDatabaseType());
+        boolean isIncrementalSupported = preparer.isIncrementalSupported();
         if (isIncrementalSupported) {
             prepareIncremental(jobItemContext);
         }
@@ -157,8 +158,8 @@ public final class MigrationJobPreparer {
         TransmissionJobItemProgress initProgress = 
jobItemContext.getInitProgress();
         if (null == initProgress) {
             PipelineDataSourceWrapper targetDataSource = 
jobItemContext.getDataSourceManager().getDataSource(jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig());
-            PipelineJobPreparerUtils.checkTargetDataSource(
-                    jobItemContext.getJobConfig().getTargetDatabaseType(), 
jobItemContext.getTaskConfig().getImporterConfig(), 
Collections.singleton(targetDataSource));
+            new 
PipelineJobPreparer(jobItemContext.getJobConfig().getTargetDatabaseType()).checkTargetDataSource(
+                    jobItemContext.getTaskConfig().getImporterConfig(), 
Collections.singleton(targetDataSource));
         }
     }
     
@@ -167,18 +168,21 @@ public final class MigrationJobPreparer {
         Collection<CreateTableConfiguration> createTableConfigs = 
jobItemContext.getTaskConfig().getCreateTableConfigurations();
         PipelineDataSourceManager dataSourceManager = 
jobItemContext.getDataSourceManager();
         PrepareTargetSchemasParameter prepareTargetSchemasParam = new 
PrepareTargetSchemasParameter(jobItemContext.getJobConfig().getTargetDatabaseType(),
 createTableConfigs, dataSourceManager);
-        
PipelineJobPreparerUtils.prepareTargetSchema(jobItemContext.getJobConfig().getTargetDatabaseType(),
 prepareTargetSchemasParam);
+        PipelineJobPreparer targetDataSourcePreparer = new 
PipelineJobPreparer(jobItemContext.getJobConfig().getTargetDatabaseType());
+        
targetDataSourcePreparer.prepareTargetSchema(prepareTargetSchemasParam);
         ShardingSphereMetaData metaData = 
PipelineContextManager.getContext(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId())).getContextManager().getMetaDataContexts().getMetaData();
-        SQLParserEngine sqlParserEngine = 
PipelineJobPreparerUtils.getSQLParserEngine(metaData, 
jobConfig.getTargetDatabaseName());
-        
PipelineJobPreparerUtils.prepareTargetTables(jobItemContext.getJobConfig().getTargetDatabaseType(),
 new PrepareTargetTablesParameter(createTableConfigs, dataSourceManager, 
sqlParserEngine));
+        SQLParserEngine sqlParserEngine = new PipelineJobPreparer(
+                
metaData.getDatabase(jobConfig.getTargetDatabaseName()).getProtocolType().getTrunkDatabaseType().orElse(metaData.getDatabase(jobConfig.getTargetDatabaseName()).getProtocolType()))
+                        .getSQLParserEngine(metaData);
+        targetDataSourcePreparer.prepareTargetTables(new 
PrepareTargetTablesParameter(createTableConfigs, dataSourceManager, 
sqlParserEngine));
     }
     
     private void prepareIncremental(final MigrationJobItemContext 
jobItemContext) {
         MigrationTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
         JobItemIncrementalTasksProgress initIncremental = null == 
jobItemContext.getInitProgress() ? null : 
jobItemContext.getInitProgress().getIncremental();
         try {
-            taskConfig.getDumperContext().getCommonContext().setPosition(
-                    
PipelineJobPreparerUtils.getIncrementalPosition(initIncremental, 
taskConfig.getDumperContext(), jobItemContext.getDataSourceManager()));
+            taskConfig.getDumperContext().getCommonContext().setPosition(new 
PipelineJobPreparer(taskConfig.getDumperContext().getCommonContext().getDataSourceConfig().getDatabaseType())
+                    .getIncrementalPosition(initIncremental, 
taskConfig.getDumperContext(), jobItemContext.getDataSourceManager()));
         } catch (final SQLException ex) {
             throw new 
PrepareJobWithGetBinlogPositionException(jobItemContext.getJobId(), ex);
         }
@@ -224,7 +228,7 @@ public final class MigrationJobPreparer {
     public void cleanup(final MigrationJobConfiguration jobConfig) {
         for (Entry<String, PipelineDataSourceConfiguration> entry : 
jobConfig.getSources().entrySet()) {
             try {
-                PipelineJobPreparerUtils.destroyPosition(jobConfig.getJobId(), 
entry.getValue());
+                new 
PipelineJobPreparer(entry.getValue().getDatabaseType()).destroyPosition(jobConfig.getJobId(),
 entry.getValue());
             } catch (final SQLException ex) {
                 log.warn("job destroying failed, jobId={}, dataSourceName={}", 
jobConfig.getJobId(), entry.getKey(), ex);
             }
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureDataSourceChecker.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureDataSourceChecker.java
index e80538695ab..23018527cf3 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureDataSourceChecker.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/FixtureDataSourceChecker.java
@@ -17,7 +17,7 @@
 
 package org.apache.shardingsphere.test.it.data.pipeline.core.fixture;
 
-import 
org.apache.shardingsphere.data.pipeline.core.spi.datasource.DialectDataSourceChecker;
+import 
org.apache.shardingsphere.data.pipeline.core.checker.DialectDataSourceChecker;
 
 import javax.sql.DataSource;
 
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
index 473baa149d5..9581f5452bb 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/prepare/InventoryTaskSplitterTest.java
@@ -25,7 +25,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourc
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.pk.type.IntegerPrimaryKeyPosition;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
-import 
org.apache.shardingsphere.data.pipeline.core.preparer.InventoryTaskSplitter;
+import 
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.InventoryTaskSplitter;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
index 939ff8881a7..970e4ad2625 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
@@ -38,7 +38,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.job.progress.config.yaml.swa
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveIdentifier;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.CaseInsensitiveQualifiedTable;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
-import 
org.apache.shardingsphere.data.pipeline.core.preparer.CreateTableConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.CreateTableConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.spi.algorithm.JobRateLimitAlgorithm;
 import 
org.apache.shardingsphere.data.pipeline.core.util.ShardingColumnsExtractor;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
diff --git 
a/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.datasource.DialectDataSourceChecker
 
b/test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.checker.DialectDataSourceChecker
similarity index 100%
rename from 
test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.spi.datasource.DialectDataSourceChecker
rename to 
test/it/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.checker.DialectDataSourceChecker

Reply via email to