This is an automated email from the ASF dual-hosted git repository.

zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 414964caa8a Revise ImporterCreator and ImporterCreatorFactoryTest 
(#20144)
414964caa8a is described below

commit 414964caa8a43e53a5dc3adb7158d550750ca534
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Sat Aug 13 21:39:58 2022 +0800

    Revise ImporterCreator and ImporterCreatorFactoryTest (#20144)
    
    * Revise ImporterCreator and ImporterCreatorFactoryTest
    
    * Fix rat
    
    * Refactor PipelineDataSourceManager as interface, add default impl
    
    * Move ImporterCreator and factory to api module spi package
---
 .../datasource/PipelineDataSourceManager.java}     | 21 +++++-
 .../pipeline/{spi => api}/importer/Importer.java   |  2 +-
 .../pipeline/spi}/importer/ImporterCreator.java    | 16 ++--
 .../spi}/importer/ImporterCreatorFactory.java      | 11 +--
 ....java => DefaultPipelineDataSourceManager.java} |  5 +-
 .../pipeline/core/importer/DefaultImporter.java    |  4 +-
 .../core}/importer/DefaultImporterCreator.java     | 21 +++---
 .../datasource/AbstractDataSourcePreparer.java     | 15 ++--
 .../datasource/PrepareTargetSchemasParameter.java  |  5 +-
 .../datasource/PrepareTargetTablesParameter.java   |  2 +-
 .../data/pipeline/core/task/IncrementalTask.java   |  8 +-
 .../data/pipeline/core/task/InventoryTask.java     |  6 +-
 .../core/util/PipelineJobPreparerUtils.java        |  2 +-
 .../scenario/rulealtered/RuleAlteredJob.java       |  7 +-
 .../rulealtered/RuleAlteredJobContext.java         |  2 +-
 .../rulealtered/RuleAlteredJobPreparer.java        |  2 +-
 .../rulealtered/prepare/InventoryTaskSplitter.java |  2 +-
 ...ere.data.pipeline.spi.importer.ImporterCreator} |  2 +-
 ...phere.scaling.core.job.importer.ImporterCreator | 18 -----
 .../datasource/MySQLDataSourcePreparer.java        |  2 +-
 .../mysql/ingest/MySQLIncrementalDumperTest.java   |  7 +-
 .../mysql/ingest/MySQLInventoryDumperTest.java     |  5 +-
 .../datasource/MySQLDataSourcePreparerTest.java    |  2 +-
 .../ingest/PostgreSQLJdbcDumperTest.java           |  5 +-
 .../postgresql/ingest/PostgreSQLWalDumperTest.java |  5 +-
 .../ingest/wal/WalEventConverterTest.java          |  7 +-
 .../shardingsphere-pipeline-test/pom.xml           | 12 ++-
 .../api/impl/GovernanceRepositoryAPIImplTest.java  |  9 ++-
 .../core/api/impl/RuleAlteredJobAPIImplTest.java   | 12 +--
 .../consistency/DataConsistencyCheckerTest.java    |  6 +-
 ...a => DefaultPipelineDataSourceManagerTest.java} | 13 +++-
 .../pipeline/core/fixture/FixtureImporter.java     |  4 +-
 .../core/fixture/FixtureImporterCreator.java       |  6 +-
 .../FixturePipelineDataSourceConfiguration.java    | 36 ++++-----
 .../core/importer/DefaultImporterTest.java         |  2 +-
 .../core/importer/ImporterCreatorFactoryTest.java  | 71 ++++++++++++++++++
 .../pipeline/core/task/IncrementalTaskTest.java    |  6 +-
 .../data/pipeline/core/task/InventoryTaskTest.java | 12 +--
 .../rulealtered/RuleAlteredJobWorkerTest.java      |  6 +-
 .../prepare/InventoryTaskSplitterTest.java         |  7 +-
 .../job/importer/ImporterCreatorFactoryTest.java   | 87 ----------------------
 ...ere.data.pipeline.spi.importer.ImporterCreator} |  0
 42 files changed, 245 insertions(+), 228 deletions(-)

diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/Importer.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/PipelineDataSourceManager.java
similarity index 59%
copy from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/Importer.java
copy to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/PipelineDataSourceManager.java
index 2ecb413685d..41f52958bda 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/Importer.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/PipelineDataSourceManager.java
@@ -15,12 +15,25 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.spi.importer;
+package org.apache.shardingsphere.data.pipeline.api.datasource;
 
-import org.apache.shardingsphere.data.pipeline.api.executor.LifecycleExecutor;
+import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 
 /**
- * Importer.
+ * Pipeline data source manager.
  */
-public interface Importer extends LifecycleExecutor {
+public interface PipelineDataSourceManager {
+    
+    /**
+     * Get cached data source.
+     *
+     * @param dataSourceConfig data source configuration
+     * @return data source
+     */
+    PipelineDataSourceWrapper getDataSource(PipelineDataSourceConfiguration 
dataSourceConfig);
+    
+    /**
+     * Close the cached data sources.
+     */
+    void close();
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/Importer.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/importer/Importer.java
similarity index 93%
rename from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/Importer.java
rename to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/importer/Importer.java
index 2ecb413685d..29a1a104998 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/Importer.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/importer/Importer.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.spi.importer;
+package org.apache.shardingsphere.data.pipeline.api.importer;
 
 import org.apache.shardingsphere.data.pipeline.api.executor.LifecycleExecutor;
 
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/importer/ImporterCreator.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/ImporterCreator.java
similarity index 79%
rename from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/importer/ImporterCreator.java
rename to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/ImporterCreator.java
index 20a96972b3d..8289e767199 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/importer/ImporterCreator.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/ImporterCreator.java
@@ -15,25 +15,29 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.scaling.core.job.importer;
+package org.apache.shardingsphere.data.pipeline.spi.importer;
 
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
 import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
 import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
 
+/**
+ * Importer creator.
+ */
 @SingletonSPI
 public interface ImporterCreator extends TypedSPI {
     
     /**
      * Create importer.
-     * @param importerConfig importerConfig
-     * @param dataSourceManager dataSourceManager
+     *
+     * @param importerConfig importer configuration
+     * @param dataSourceManager data source manager
      * @param channel channel
-     * @param jobProgressListener jobProgressListener
+     * @param jobProgressListener job progress listener
      * @return importer
      */
     Importer createImporter(ImporterConfiguration importerConfig, 
PipelineDataSourceManager dataSourceManager, PipelineChannel channel,
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/importer/ImporterCreatorFactory.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/ImporterCreatorFactory.java
similarity index 87%
rename from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/importer/ImporterCreatorFactory.java
rename to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/ImporterCreatorFactory.java
index b10924edc3e..35e85f0d5f7 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/importer/ImporterCreatorFactory.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/importer/ImporterCreatorFactory.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.scaling.core.job.importer;
+package org.apache.shardingsphere.data.pipeline.spi.importer;
 
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
@@ -23,7 +23,7 @@ import 
org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
 import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPIRegistry;
 
 /**
- * Importer factory.
+ * Importer creator factory.
  */
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
 public final class ImporterCreatorFactory {
@@ -33,9 +33,10 @@ public final class ImporterCreatorFactory {
     }
     
     /**
-     * Get ImporterCreator.
-     * @param databaseType databaseType
-     * @return ImporterCreator
+     * Get importer creator instance.
+     *
+     * @param databaseType database type
+     * @return importer creator
      */
     public static ImporterCreator getInstance(final String databaseType) {
         return TypedSPIRegistry.getRegisteredService(ImporterCreator.class, 
databaseType);
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManager.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/DefaultPipelineDataSourceManager.java
similarity index 91%
rename from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManager.java
rename to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/DefaultPipelineDataSourceManager.java
index 8659174fe4b..66ad7498d3c 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManager.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datasource/DefaultPipelineDataSourceManager.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.core.datasource;
 
 import lombok.extern.slf4j.Slf4j;
+import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 
@@ -26,10 +27,10 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
- * Pipeline data source manager.
+ * Default pipeline data source manager.
  */
 @Slf4j
-public final class PipelineDataSourceManager implements AutoCloseable {
+public final class DefaultPipelineDataSourceManager implements 
PipelineDataSourceManager {
     
     private final Map<PipelineDataSourceConfiguration, 
PipelineDataSourceWrapper> cachedDataSources = new ConcurrentHashMap<>();
     
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java
index 28b87b12eb2..2e069cc5e4b 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporter.java
@@ -21,7 +21,9 @@ import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
+import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
@@ -30,13 +32,11 @@ import 
org.apache.shardingsphere.data.pipeline.api.ingest.record.GroupedDataReco
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobExecutionException;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
 import org.apache.shardingsphere.data.pipeline.core.record.RecordUtil;
 import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
 import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
-import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
 import 
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
 
 import javax.sql.DataSource;
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/importer/DefaultImporterCreator.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporterCreator.java
similarity index 80%
rename from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/importer/DefaultImporterCreator.java
rename to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporterCreator.java
index 2e407671b8c..c8d5b4ce50f 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/scaling/core/job/importer/DefaultImporterCreator.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporterCreator.java
@@ -15,22 +15,26 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.scaling.core.job.importer;
+package org.apache.shardingsphere.data.pipeline.core.importer;
 
-import java.util.Collection;
-import java.util.LinkedList;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.core.importer.DefaultImporter;
-import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
+import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
 
 /**
  * Default importer creator.
  */
 public final class DefaultImporterCreator implements ImporterCreator {
     
+    private static final Collection<String> TYPE_ALIASES = 
Collections.unmodifiableList(Arrays.asList("PostgreSQL", "openGauss"));
+    
     @Override
     public Importer createImporter(final ImporterConfiguration importerConfig,
                                    final PipelineDataSourceManager 
dataSourceManager, final PipelineChannel channel,
@@ -45,9 +49,6 @@ public final class DefaultImporterCreator implements 
ImporterCreator {
     
     @Override
     public Collection<String> getTypeAliases() {
-        Collection<String> aliases = new LinkedList<>();
-        aliases.add("PostgreSQL");
-        aliases.add("openGauss");
-        return aliases;
+        return TYPE_ALIASES;
     }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
index a5bde33ac74..ed7a73235ea 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/AbstractDataSourcePreparer.java
@@ -17,23 +17,24 @@
 
 package org.apache.shardingsphere.data.pipeline.core.prepare.datasource;
 
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.regex.Pattern;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
 import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
 import 
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
 import org.apache.shardingsphere.infra.database.type.DatabaseTypeEngine;
 
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.regex.Pattern;
+
 /**
  * Abstract data source preparer.
  */
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetSchemasParameter.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetSchemasParameter.java
index 5c4a4b382bc..df94812d018 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetSchemasParameter.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetSchemasParameter.java
@@ -17,14 +17,15 @@
 
 package org.apache.shardingsphere.data.pipeline.core.prepare.datasource;
 
-import java.util.List;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import 
org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
+import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 
+import java.util.List;
+
 /**
  * Prepare target schemas parameter.
  */
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetTablesParameter.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetTablesParameter.java
index cf399464508..9aaf034ba3a 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetTablesParameter.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetTablesParameter.java
@@ -21,8 +21,8 @@ import lombok.Getter;
 import lombok.NonNull;
 import 
org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
 import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
+import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 
 /**
  * Prepare target tables parameter.
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
index f21ca5e85d2..df8d0859487 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
@@ -22,23 +22,23 @@ import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
+import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
 import 
org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobExecutionException;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
+import 
org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreatorFactory;
 import 
org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.Dumper;
 import org.apache.shardingsphere.scaling.core.job.dumper.DumperFactory;
-import 
org.apache.shardingsphere.scaling.core.job.importer.ImporterCreatorFactory;
 
 import java.util.Collection;
 import java.util.LinkedList;
@@ -49,7 +49,7 @@ import java.util.concurrent.Future;
  * Incremental task.
  */
 @Slf4j
-@ToString(exclude = {"incrementalDumperExecuteEngine", "channel", "dumper", 
"importers", "progress"})
+@ToString(exclude = {"incrementalDumperExecuteEngine", "channel", "dumper", 
"importers", "taskProgress"})
 public final class IncrementalTask extends AbstractLifecycleExecutor 
implements PipelineTask, AutoCloseable {
     
     @Getter
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
index 401841be999..ffc9f328832 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
@@ -22,23 +22,23 @@ import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
+import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
 import 
org.apache.shardingsphere.data.pipeline.api.task.progress.InventoryTaskProgress;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobExecutionException;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
-import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
+import 
org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreatorFactory;
 import 
org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
 import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.Dumper;
 import org.apache.shardingsphere.scaling.core.job.dumper.DumperFactory;
-import 
org.apache.shardingsphere.scaling.core.job.importer.ImporterCreatorFactory;
 
 import javax.sql.DataSource;
 import java.util.List;
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineJobPreparerUtils.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineJobPreparerUtils.java
index de6dcb4b892..71f590b1b32 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineJobPreparerUtils.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineJobPreparerUtils.java
@@ -20,13 +20,13 @@ package org.apache.shardingsphere.data.pipeline.core.util;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.PositionInitializerFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.prepare.datasource.DataSourcePreparer;
 import 
org.apache.shardingsphere.data.pipeline.core.prepare.datasource.DataSourcePreparerFactory;
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
index 58768c0ac95..358e0af29b7 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJob.java
@@ -19,14 +19,15 @@ package 
org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
 
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import 
org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
+import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.job.PipelineJob;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.api.task.PipelineTasksRunner;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import 
org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPIFactory;
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineIgnoredException;
 import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
@@ -42,7 +43,7 @@ import 
org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
 @RequiredArgsConstructor
 public final class RuleAlteredJob extends AbstractPipelineJob implements 
SimpleJob, PipelineJob {
     
-    private final PipelineDataSourceManager dataSourceManager = new 
PipelineDataSourceManager();
+    private final PipelineDataSourceManager dataSourceManager = new 
DefaultPipelineDataSourceManager();
     
     // Shared by all sharding items
     private final RuleAlteredJobPreparer jobPreparer = new 
RuleAlteredJobPreparer();
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
index e2b2e4b8772..142497c64d8 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobContext.java
@@ -25,11 +25,11 @@ import 
org.apache.commons.lang3.concurrent.ConcurrentException;
 import org.apache.commons.lang3.concurrent.LazyInitializer;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
index 304bc4fcc32..ebf4b7d2b68 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
@@ -21,6 +21,7 @@ import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
@@ -28,7 +29,6 @@ import 
org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrement
 import org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPI;
 import 
org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineIgnoredException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
index 23c1a6e3d6a..5740d80df71 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
@@ -25,6 +25,7 @@ import 
org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfigu
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
 import 
org.apache.shardingsphere.data.pipeline.api.context.PipelineProcessContext;
+import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IntegerPrimaryKeyPosition;
@@ -34,7 +35,6 @@ import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.StringPrimary
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreationException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.scaling.core.job.importer.ImporterCreator
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator
similarity index 91%
rename from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.scaling.core.job.importer.ImporterCreator
rename to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator
index f7c30c08c48..377062ac343 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.scaling.core.job.importer.ImporterCreator
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator
@@ -15,4 +15,4 @@
 # limitations under the License.
 #
 
-org.apache.shardingsphere.scaling.core.job.importer.DefaultImporterCreator
+org.apache.shardingsphere.data.pipeline.core.importer.DefaultImporterCreator
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/resources/META-INF/services/org.apache.shardingsphere.scaling.core.job.importer.ImporterCreator
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/resources/META-INF/services/org.apache.shardingsphere.scaling.core.job.importer.ImporterCreator
deleted file mode 100644
index f7c30c08c48..00000000000
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/resources/META-INF/services/org.apache.shardingsphere.scaling.core.job.importer.ImporterCreator
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.shardingsphere.scaling.core.job.importer.DefaultImporterCreator
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
index 5c282fbfe0d..84a3879e5cf 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparer.java
@@ -19,8 +19,8 @@ package 
org.apache.shardingsphere.data.pipeline.mysql.prepare.datasource;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeEntry;
+import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.generator.PipelineDDLGenerator;
 import 
org.apache.shardingsphere.data.pipeline.core.prepare.datasource.AbstractDataSourcePreparer;
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
index 8ecb162cc34..09d0e2bc24d 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
@@ -20,13 +20,14 @@ package 
org.apache.shardingsphere.data.pipeline.mysql.ingest;
 import lombok.SneakyThrows;
 import 
org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
 import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.MultiplexMemoryPipelineChannel;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
@@ -70,7 +71,7 @@ public final class MySQLIncrementalDumperTest {
     
     private MultiplexMemoryPipelineChannel channel;
     
-    private final PipelineDataSourceManager dataSourceManager = new 
PipelineDataSourceManager();
+    private final PipelineDataSourceManager dataSourceManager = new 
DefaultPipelineDataSourceManager();
     
     @Mock
     private PipelineTableMetaData pipelineTableMetaData;
@@ -97,7 +98,7 @@ public final class MySQLIncrementalDumperTest {
     
     @SneakyThrows(SQLException.class)
     private void initTableData(final DumperConfiguration dumperConfig) {
-        DataSource dataSource = new 
PipelineDataSourceManager().getDataSource(dumperConfig.getDataSourceConfig());
+        DataSource dataSource = new 
DefaultPipelineDataSourceManager().getDataSource(dumperConfig.getDataSourceConfig());
         try (
                 Connection connection = dataSource.getConnection();
                 Statement statement = connection.createStatement()) {
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumperTest.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumperTest.java
index a8bffd315fb..61892fd483a 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumperTest.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumperTest.java
@@ -20,9 +20,10 @@ package org.apache.shardingsphere.data.pipeline.mysql.ingest;
 import lombok.SneakyThrows;
 import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.SimpleMemoryPipelineChannel;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import org.junit.Before;
@@ -50,7 +51,7 @@ public final class MySQLInventoryDumperTest {
     
     @Before
     public void setUp() {
-        PipelineDataSourceManager dataSourceManager = new 
PipelineDataSourceManager();
+        PipelineDataSourceManager dataSourceManager = new 
DefaultPipelineDataSourceManager();
         InventoryDumperConfiguration dumperConfig = 
mockInventoryDumperConfiguration();
         PipelineDataSourceWrapper dataSource = 
dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
         mysqlJdbcDumper = new 
MySQLInventoryDumper(mockInventoryDumperConfiguration(), new 
SimpleMemoryPipelineChannel(100), dataSource, new 
PipelineTableMetaDataLoader(dataSource));
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
index 98c6d100a99..64995eeecfc 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/prepare/datasource/MySQLDataSourcePreparerTest.java
@@ -19,11 +19,11 @@ package 
org.apache.shardingsphere.data.pipeline.mysql.prepare.datasource;
 
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datanode.JobDataNodeLine;
+import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
 import 
org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetTablesParameter;
 import org.junit.Before;
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLJdbcDumperTest.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLJdbcDumperTest.java
index ea3f6b9be84..d4388d1473f 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLJdbcDumperTest.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLJdbcDumperTest.java
@@ -20,9 +20,10 @@ package 
org.apache.shardingsphere.data.pipeline.postgresql.ingest;
 import lombok.SneakyThrows;
 import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.SimpleMemoryPipelineChannel;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import org.junit.Before;
@@ -45,7 +46,7 @@ public final class PostgreSQLJdbcDumperTest {
     
     @Before
     public void setUp() {
-        PipelineDataSourceManager dataSourceManager = new 
PipelineDataSourceManager();
+        PipelineDataSourceManager dataSourceManager = new 
DefaultPipelineDataSourceManager();
         InventoryDumperConfiguration dumperConfig = 
mockInventoryDumperConfiguration();
         dataSource = 
dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
         jdbcDumper = new 
PostgreSQLInventoryDumper(mockInventoryDumperConfiguration(), new 
SimpleMemoryPipelineChannel(100),
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java
index 8c0912d1874..fef17423483 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java
@@ -19,11 +19,12 @@ package 
org.apache.shardingsphere.data.pipeline.postgresql.ingest;
 
 import 
org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
 import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.MultiplexMemoryPipelineChannel;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
@@ -72,7 +73,7 @@ public final class PostgreSQLWalDumperTest {
     
     private MultiplexMemoryPipelineChannel channel;
     
-    private final PipelineDataSourceManager dataSourceManager = new 
PipelineDataSourceManager();
+    private final PipelineDataSourceManager dataSourceManager = new 
DefaultPipelineDataSourceManager();
     
     @Before
     public void setUp() {
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverterTest.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverterTest.java
index d0f7b954a82..16681b8b044 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverterTest.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverterTest.java
@@ -20,13 +20,14 @@ package 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal;
 import lombok.SneakyThrows;
 import 
org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
 import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractRowEvent;
@@ -53,7 +54,7 @@ public final class WalEventConverterTest {
     
     private WalEventConverter walEventConverter;
     
-    private final PipelineDataSourceManager dataSourceManager = new 
PipelineDataSourceManager();
+    private final PipelineDataSourceManager dataSourceManager = new 
DefaultPipelineDataSourceManager();
     
     @Before
     public void setUp() {
@@ -77,7 +78,7 @@ public final class WalEventConverterTest {
     
     @SneakyThrows(SQLException.class)
     private void initTableData(final DumperConfiguration dumperConfig) {
-        DataSource dataSource = new 
PipelineDataSourceManager().getDataSource(dumperConfig.getDataSourceConfig());
+        DataSource dataSource = new 
DefaultPipelineDataSourceManager().getDataSource(dumperConfig.getDataSourceConfig());
         try (
                 Connection connection = dataSource.getConnection();
                 Statement statement = connection.createStatement()) {
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/pom.xml 
b/shardingsphere-test/shardingsphere-pipeline-test/pom.xml
index 7f0534a81c9..046ef5758df 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/pom.xml
+++ b/shardingsphere-test/shardingsphere-pipeline-test/pom.xml
@@ -30,7 +30,17 @@
     <dependencies>
         <dependency>
             <groupId>org.apache.shardingsphere</groupId>
-            <artifactId>shardingsphere-data-pipeline-core</artifactId>
+            <artifactId>shardingsphere-data-pipeline-mysql</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.shardingsphere</groupId>
+            <artifactId>shardingsphere-data-pipeline-postgresql</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.shardingsphere</groupId>
+            <artifactId>shardingsphere-data-pipeline-opengauss</artifactId>
             <version>${project.version}</version>
         </dependency>
         <dependency>
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
index 3f8a59ac51f..41335152a85 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
@@ -26,7 +26,7 @@ import 
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncreme
 import 
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineJobProgressListener;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
@@ -126,7 +126,8 @@ public final class GovernanceRepositoryAPIImplTest {
     }
     
     private RuleAlteredJobContext mockJobItemContext() {
-        RuleAlteredJobContext result = new 
RuleAlteredJobContext(JobConfigurationBuilder.createJobConfiguration(), 0, new 
InventoryIncrementalJobItemProgress(), new PipelineDataSourceManager());
+        RuleAlteredJobContext result = new 
RuleAlteredJobContext(JobConfigurationBuilder.createJobConfiguration(),
+                0, new InventoryIncrementalJobItemProgress(), new 
DefaultPipelineDataSourceManager());
         TaskConfiguration taskConfig = result.getTaskConfig();
         result.getInventoryTasks().add(mockInventoryTask(taskConfig));
         result.getIncrementalTasks().add(mockIncrementalTask(taskConfig));
@@ -144,7 +145,7 @@ public final class GovernanceRepositoryAPIImplTest {
         PipelineDataSourceWrapper dataSource = 
mock(PipelineDataSourceWrapper.class);
         PipelineTableMetaDataLoader metaDataLoader = new 
PipelineTableMetaDataLoader(dataSource);
         return new InventoryTask(dumperConfig, taskConfig.getImporterConfig(), 
PipelineContextUtil.getPipelineChannelCreator(),
-                new PipelineDataSourceManager(), dataSource, metaDataLoader, 
PipelineContextUtil.getExecuteEngine(), new 
FixturePipelineJobProgressListener());
+                new DefaultPipelineDataSourceManager(), dataSource, 
metaDataLoader, PipelineContextUtil.getExecuteEngine(), new 
FixturePipelineJobProgressListener());
     }
     
     private IncrementalTask mockIncrementalTask(final TaskConfiguration 
taskConfig) {
@@ -152,6 +153,6 @@ public final class GovernanceRepositoryAPIImplTest {
         dumperConfig.setPosition(new PlaceholderPosition());
         PipelineTableMetaDataLoader metaDataLoader = new 
PipelineTableMetaDataLoader(mock(PipelineDataSourceWrapper.class));
         return new IncrementalTask(3, dumperConfig, 
taskConfig.getImporterConfig(), PipelineContextUtil.getPipelineChannelCreator(),
-                new PipelineDataSourceManager(), metaDataLoader, 
PipelineContextUtil.getExecuteEngine(), new 
FixturePipelineJobProgressListener());
+                new DefaultPipelineDataSourceManager(), metaDataLoader, 
PipelineContextUtil.getExecuteEngine(), new 
FixturePipelineJobProgressListener());
     }
 }
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java
index 9a465a59670..8a0ce93370d 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/RuleAlteredJobAPIImplTest.java
@@ -19,8 +19,6 @@ package org.apache.shardingsphere.data.pipeline.core.api.impl;
 
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPI;
-import 
org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
 import 
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyContentCheckResult;
 import 
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCountCheckResult;
@@ -32,7 +30,9 @@ import 
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncreme
 import org.apache.shardingsphere.data.pipeline.api.pojo.JobInfo;
 import 
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPI;
+import 
org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPIFactory;
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.creator.PipelineDataSourceCreatorFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineVerifyFailedException;
 import 
org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
@@ -200,7 +200,7 @@ public final class RuleAlteredJobAPIImplTest {
         Optional<String> jobId = ruleAlteredJobAPI.start(jobConfig);
         assertTrue(jobId.isPresent());
         final GovernanceRepositoryAPI repositoryAPI = 
PipelineAPIFactory.getGovernanceRepositoryAPI();
-        RuleAlteredJobContext jobItemContext = new 
RuleAlteredJobContext(jobConfig, 0, new InventoryIncrementalJobItemProgress(), 
new PipelineDataSourceManager());
+        RuleAlteredJobContext jobItemContext = new 
RuleAlteredJobContext(jobConfig, 0, new InventoryIncrementalJobItemProgress(), 
new DefaultPipelineDataSourceManager());
         ruleAlteredJobAPI.persistJobItemProgress(jobItemContext);
         repositoryAPI.persistJobCheckResult(jobId.get(), true);
         ruleAlteredJobAPI.updateJobItemStatus(jobId.get(), 0, 
JobStatus.FINISHED);
@@ -213,7 +213,7 @@ public final class RuleAlteredJobAPIImplTest {
         Optional<String> jobId = ruleAlteredJobAPI.start(jobConfig);
         assertTrue(jobId.isPresent());
         GovernanceRepositoryAPI repositoryAPI = 
PipelineAPIFactory.getGovernanceRepositoryAPI();
-        RuleAlteredJobContext jobItemContext = new 
RuleAlteredJobContext(jobConfig, 0, new InventoryIncrementalJobItemProgress(), 
new PipelineDataSourceManager());
+        RuleAlteredJobContext jobItemContext = new 
RuleAlteredJobContext(jobConfig, 0, new InventoryIncrementalJobItemProgress(), 
new DefaultPipelineDataSourceManager());
         ruleAlteredJobAPI.persistJobItemProgress(jobItemContext);
         repositoryAPI.persistJobCheckResult(jobId.get(), true);
         ruleAlteredJobAPI.updateJobItemStatus(jobId.get(), 
jobItemContext.getShardingItem(), JobStatus.EXECUTE_INVENTORY_TASK);
@@ -257,7 +257,7 @@ public final class RuleAlteredJobAPIImplTest {
     @Test
     public void assertRenewJobStatus() {
         final RuleAlteredJobConfiguration jobConfig = 
JobConfigurationBuilder.createJobConfiguration();
-        RuleAlteredJobContext jobItemContext = new 
RuleAlteredJobContext(jobConfig, 0, new InventoryIncrementalJobItemProgress(), 
new PipelineDataSourceManager());
+        RuleAlteredJobContext jobItemContext = new 
RuleAlteredJobContext(jobConfig, 0, new InventoryIncrementalJobItemProgress(), 
new DefaultPipelineDataSourceManager());
         ruleAlteredJobAPI.persistJobItemProgress(jobItemContext);
         ruleAlteredJobAPI.updateJobItemStatus(jobConfig.getJobId(), 0, 
JobStatus.FINISHED);
         InventoryIncrementalJobItemProgress actual = 
ruleAlteredJobAPI.getJobItemProgress(jobItemContext.getJobId(), 
jobItemContext.getShardingItem());
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java
index fbffa7cb359..743e5ee4f06 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyCheckerTest.java
@@ -21,7 +21,7 @@ import 
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsist
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.fixture.DataConsistencyCalculateAlgorithmFixture;
 import 
org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
@@ -55,7 +55,7 @@ public final class DataConsistencyCheckerTest {
     
     private RuleAlteredJobConfiguration createJobConfiguration() throws 
SQLException {
         RuleAlteredJobContext jobItemContext = new 
RuleAlteredJobContext(JobConfigurationBuilder.createJobConfiguration(), 0,
-                new InventoryIncrementalJobItemProgress(), new 
PipelineDataSourceManager());
+                new InventoryIncrementalJobItemProgress(), new 
DefaultPipelineDataSourceManager());
         
initTableData(jobItemContext.getTaskConfig().getDumperConfig().getDataSourceConfig());
         
initTableData(jobItemContext.getTaskConfig().getImporterConfig().getDataSourceConfig());
         return jobItemContext.getJobConfig();
@@ -63,7 +63,7 @@ public final class DataConsistencyCheckerTest {
     
     private void initTableData(final PipelineDataSourceConfiguration 
dataSourceConfig) throws SQLException {
         try (
-                Connection connection = new 
PipelineDataSourceManager().getDataSource(dataSourceConfig).getConnection();
+                Connection connection = new 
DefaultPipelineDataSourceManager().getDataSource(dataSourceConfig).getConnection();
                 Statement statement = connection.createStatement()) {
             statement.execute("DROP TABLE IF EXISTS t_order");
             statement.execute("CREATE TABLE t_order (order_id INT PRIMARY KEY, 
user_id VARCHAR(12))");
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManagerTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/DefaultPipelineDataSourceManagerTest.java
similarity index 86%
rename from 
shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManagerTest.java
rename to 
shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/DefaultPipelineDataSourceManagerTest.java
index 7cf6418e7df..d7b80a38fac 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/PipelineDataSourceManagerTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/DefaultPipelineDataSourceManagerTest.java
@@ -18,6 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.core.datasource;
 
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
@@ -36,7 +37,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
-public final class PipelineDataSourceManagerTest {
+public final class DefaultPipelineDataSourceManagerTest {
     
     private RuleAlteredJobConfiguration jobConfig;
     
@@ -52,7 +53,7 @@ public final class PipelineDataSourceManagerTest {
     
     @Test
     public void assertGetDataSource() {
-        PipelineDataSourceManager dataSourceManager = new 
PipelineDataSourceManager();
+        PipelineDataSourceManager dataSourceManager = new 
DefaultPipelineDataSourceManager();
         DataSource actual = dataSourceManager.getDataSource(
                 
PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getSource().getType(),
 jobConfig.getSource().getParameter()));
         assertThat(actual, instanceOf(PipelineDataSourceWrapper.class));
@@ -60,7 +61,9 @@ public final class PipelineDataSourceManagerTest {
     
     @Test
     public void assertClose() throws NoSuchFieldException, 
IllegalAccessException {
-        try (PipelineDataSourceManager dataSourceManager = new 
PipelineDataSourceManager()) {
+        PipelineDataSourceManager dataSourceManager = null;
+        try {
+            dataSourceManager = new DefaultPipelineDataSourceManager();
             dataSourceManager.getDataSource(
                     
PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getSource().getType(),
 jobConfig.getSource().getParameter()));
             dataSourceManager.getDataSource(
@@ -70,6 +73,10 @@ public final class PipelineDataSourceManagerTest {
             assertThat(cachedDataSources.size(), is(2));
             dataSourceManager.close();
             assertTrue(cachedDataSources.isEmpty());
+        } finally {
+            if (null != dataSourceManager) {
+                dataSourceManager.close();
+            }
         }
     }
 }
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java
index fdbd30b03dc..ec3d2ef7220 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporter.java
@@ -18,10 +18,10 @@
 package org.apache.shardingsphere.data.pipeline.core.fixture;
 
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
 
 public final class FixtureImporter implements Importer {
     
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporterCreator.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporterCreator.java
index 36484d23195..9d0ae14fdfb 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporterCreator.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureImporterCreator.java
@@ -18,11 +18,11 @@
 package org.apache.shardingsphere.data.pipeline.core.fixture;
 
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
-import org.apache.shardingsphere.scaling.core.job.importer.ImporterCreator;
+import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator;
 
 /**
  * Fixture importer creator.
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetSchemasParameter.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePipelineDataSourceConfiguration.java
similarity index 58%
copy from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetSchemasParameter.java
copy to 
shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePipelineDataSourceConfiguration.java
index 5c4a4b382bc..1d03f7f84ca 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/datasource/PrepareTargetSchemasParameter.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixturePipelineDataSourceConfiguration.java
@@ -15,32 +15,34 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.prepare.datasource;
+package org.apache.shardingsphere.data.pipeline.core.fixture;
 
-import java.util.List;
-import lombok.Getter;
 import lombok.RequiredArgsConstructor;
-import 
org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 
-/**
- * Prepare target schemas parameter.
- */
 @RequiredArgsConstructor
-@Getter
-public final class PrepareTargetSchemasParameter {
-    
-    private final List<String> logicTableNames;
+public final class FixturePipelineDataSourceConfiguration implements 
PipelineDataSourceConfiguration {
     
-    private final DatabaseType targetDatabaseType;
+    private final DatabaseType databaseType;
     
-    private final String databaseName;
+    @Override
+    public String getParameter() {
+        return null;
+    }
     
-    private final PipelineDataSourceConfiguration dataSourceConfig;
+    @Override
+    public Object getDataSourceConfiguration() {
+        return null;
+    }
     
-    private final PipelineDataSourceManager dataSourceManager;
+    @Override
+    public DatabaseType getDatabaseType() {
+        return databaseType;
+    }
     
-    private final TableNameSchemaNameMapping tableNameSchemaNameMapping;
+    @Override
+    public String getType() {
+        return "FIXTURE";
+    }
 }
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporterTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporterTest.java
index 49ab326e7a9..e1f29a5f5b3 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporterTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/DefaultImporterTest.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.data.pipeline.core.importer;
 
 import 
org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
@@ -29,7 +30,6 @@ import 
org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineJobProgressListener;
 import org.apache.shardingsphere.data.pipeline.core.record.RecordUtil;
 import org.junit.Before;
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterCreatorFactoryTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterCreatorFactoryTest.java
new file mode 100644
index 00000000000..9b09961c71c
--- /dev/null
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterCreatorFactoryTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.importer;
+
+import 
org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
+import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
+import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
+import org.apache.shardingsphere.data.pipeline.core.fixture.FixtureImporter;
+import 
org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineDataSourceConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineJobProgressListener;
+import 
org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreatorFactory;
+import org.apache.shardingsphere.infra.database.type.DatabaseTypeFactory;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Test case for {@link ImporterCreatorFactory}.
+ *
+ * <p>Mocks are initialized by {@link MockitoJUnitRunner}; without it the
+ * {@code @Mock} fields would be passed to the creators as {@code null}.</p>
+ */
+@RunWith(MockitoJUnitRunner.class)
+public final class ImporterCreatorFactoryTest {
+    
+    @Mock
+    private PipelineDataSourceManager dataSourceManager;
+    
+    @Mock
+    private PipelineChannel channel;
+    
+    @Test
+    public void assertCreateImporter() {
+        for (String each : Arrays.asList("MySQL", "PostgreSQL", "openGauss")) {
+            Importer actual = 
ImporterCreatorFactory.getInstance(each).createImporter(createImporterConfiguration(each),
 dataSourceManager, channel, new FixturePipelineJobProgressListener());
+            assertThat(actual, instanceOf(DefaultImporter.class));
+        }
+    }
+    
+    @Test
+    public void assertCreateImporterForH2() {
+        Importer actual = 
ImporterCreatorFactory.getInstance("H2").createImporter(createImporterConfiguration("H2"),
 dataSourceManager, channel, new FixturePipelineJobProgressListener());
+        assertThat(actual, instanceOf(FixtureImporter.class));
+    }
+    
+    /**
+     * Create importer configuration for the given database type.
+     */
+    private ImporterConfiguration createImporterConfiguration(final String 
databaseType) {
+        Map<LogicTableName, Set<String>> shardingColumnsMap = 
Collections.singletonMap(new LogicTableName("t_order"), new 
HashSet<>(Arrays.asList("order_id", "user_id", "status")));
+        PipelineDataSourceConfiguration dataSourceConfig = new 
FixturePipelineDataSourceConfiguration(DatabaseTypeFactory.getInstance(databaseType));
+        return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap, 
new TableNameSchemaNameMapping(Collections.emptyMap()), 1000, 3, 3);
+    }
+}
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
index 6eac12dddb6..bbbd8b323e7 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
@@ -21,7 +21,7 @@ import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfig
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineJobProgressListener;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
@@ -49,12 +49,12 @@ public final class IncrementalTaskTest {
     @Before
     public void setUp() {
         TaskConfiguration taskConfig = new 
RuleAlteredJobContext(JobConfigurationBuilder.createJobConfiguration(), 0, new 
InventoryIncrementalJobItemProgress(),
-                new PipelineDataSourceManager()).getTaskConfig();
+                new DefaultPipelineDataSourceManager()).getTaskConfig();
         taskConfig.getDumperConfig().setPosition(new PlaceholderPosition());
         PipelineTableMetaDataLoader metaDataLoader = new 
PipelineTableMetaDataLoader(mock(PipelineDataSourceWrapper.class));
         incrementalTask = new IncrementalTask(3, taskConfig.getDumperConfig(), 
taskConfig.getImporterConfig(),
                 PipelineContextUtil.getPipelineChannelCreator(),
-                new PipelineDataSourceManager(), metaDataLoader, 
PipelineContextUtil.getExecuteEngine(), new 
FixturePipelineJobProgressListener());
+                new DefaultPipelineDataSourceManager(), metaDataLoader, 
PipelineContextUtil.getExecuteEngine(), new 
FixturePipelineJobProgressListener());
     }
     
     @Test
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
index 0a0b42f39b1..fa507f89363 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
@@ -20,10 +20,11 @@ package org.apache.shardingsphere.data.pipeline.core.task;
 import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IntegerPrimaryKeyPosition;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineJobProgressListener;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
@@ -45,7 +46,7 @@ import static org.junit.Assert.assertThat;
 
 public final class InventoryTaskTest {
     
-    private static final PipelineDataSourceManager DATA_SOURCE_MANAGER = new 
PipelineDataSourceManager();
+    private static final PipelineDataSourceManager DATA_SOURCE_MANAGER = new 
DefaultPipelineDataSourceManager();
     
     private TaskConfiguration taskConfig;
     
@@ -61,7 +62,7 @@ public final class InventoryTaskTest {
     
     @Before
     public void setUp() {
-        taskConfig = new 
RuleAlteredJobContext(JobConfigurationBuilder.createJobConfiguration(), 0, new 
InventoryIncrementalJobItemProgress(), new 
PipelineDataSourceManager()).getTaskConfig();
+        taskConfig = new 
RuleAlteredJobContext(JobConfigurationBuilder.createJobConfiguration(), 0, new 
InventoryIncrementalJobItemProgress(), new 
DefaultPipelineDataSourceManager()).getTaskConfig();
     }
     
     @Test(expected = IngestException.class)
@@ -87,15 +88,15 @@ public final class InventoryTaskTest {
         try (
                 InventoryTask inventoryTask = new 
InventoryTask(inventoryDumperConfig, taskConfig.getImporterConfig(),
                         PipelineContextUtil.getPipelineChannelCreator(),
-                        new PipelineDataSourceManager(), dataSource, 
metaDataLoader, PipelineContextUtil.getExecuteEngine(), new 
FixturePipelineJobProgressListener())) {
+                        new DefaultPipelineDataSourceManager(), dataSource, 
metaDataLoader, PipelineContextUtil.getExecuteEngine(), new 
FixturePipelineJobProgressListener())) {
             inventoryTask.start();
             assertThat(inventoryTask.getTaskProgress().getPosition(), 
instanceOf(IntegerPrimaryKeyPosition.class));
         }
     }
     
     private void initTableData(final DumperConfiguration dumperConfig) throws 
SQLException {
+        PipelineDataSourceManager dataSourceManager = new 
DefaultPipelineDataSourceManager();
         try (
-                PipelineDataSourceManager dataSourceManager = new 
PipelineDataSourceManager();
                 PipelineDataSourceWrapper dataSource = 
dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
                 Connection connection = dataSource.getConnection();
                 Statement statement = connection.createStatement()) {
@@ -103,6 +104,7 @@ public final class InventoryTaskTest {
             statement.execute("CREATE TABLE t_order (order_id INT PRIMARY KEY, 
user_id VARCHAR(12))");
             statement.execute("INSERT INTO t_order (order_id, user_id) VALUES 
(1, 'xxx'), (999, 'yyy')");
         }
+        dataSourceManager.close();
     }
     
     private InventoryDumperConfiguration 
createInventoryDumperConfiguration(final String logicTableName, final String 
actualTableName) {
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
index d94acec2c48..a05b8849877 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobWorkerTest.java
@@ -18,7 +18,6 @@
 package org.apache.shardingsphere.data.pipeline.scenario.rulealtered;
 
 import org.apache.commons.io.FileUtils;
-import 
org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.RuleAlteredJobConfigurationSwapper;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.yaml.YamlRuleAlteredJobConfiguration;
@@ -27,7 +26,8 @@ import 
org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import 
org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPIFactory;
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreationException;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
 import org.apache.shardingsphere.data.pipeline.core.util.ConfigurationFileUtil;
@@ -98,7 +98,7 @@ public final class RuleAlteredJobWorkerTest {
     // @Test
     public void assertHasUncompletedJob() throws InvocationTargetException, 
NoSuchMethodException, IllegalAccessException, IOException {
         final RuleAlteredJobConfiguration jobConfig = 
JobConfigurationBuilder.createJobConfiguration();
-        RuleAlteredJobContext jobItemContext = new 
RuleAlteredJobContext(jobConfig, 0, new InventoryIncrementalJobItemProgress(), 
new PipelineDataSourceManager());
+        RuleAlteredJobContext jobItemContext = new 
RuleAlteredJobContext(jobConfig, 0, new InventoryIncrementalJobItemProgress(), 
new DefaultPipelineDataSourceManager());
         jobItemContext.setStatus(JobStatus.PREPARING);
         GovernanceRepositoryAPI repositoryAPI = 
PipelineAPIFactory.getGovernanceRepositoryAPI();
         
RuleAlteredJobAPIFactory.getInstance().persistJobItemProgress(jobItemContext);
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java
index c7606c53531..449e484a0ac 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitterTest.java
@@ -17,13 +17,14 @@
 
 package org.apache.shardingsphere.data.pipeline.scenario.rulealtered.prepare;
 
-import 
org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.RuleAlteredJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IntegerPrimaryKeyPosition;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import 
org.apache.shardingsphere.data.pipeline.core.api.RuleAlteredJobAPIFactory;
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreationException;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
 import 
org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
@@ -68,7 +69,7 @@ public final class InventoryTaskSplitterTest {
     private void initJobItemContext() {
         RuleAlteredJobConfiguration jobConfig = 
JobConfigurationBuilder.createJobConfiguration();
         InventoryIncrementalJobItemProgress initProgress = 
RuleAlteredJobAPIFactory.getInstance().getJobItemProgress(jobConfig.getJobId(), 
0);
-        jobItemContext = new RuleAlteredJobContext(jobConfig, 0, initProgress, 
new PipelineDataSourceManager());
+        jobItemContext = new RuleAlteredJobContext(jobConfig, 0, initProgress, 
new DefaultPipelineDataSourceManager());
         dataSourceManager = jobItemContext.getDataSourceManager();
         taskConfig = jobItemContext.getTaskConfig();
     }
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/scaling/cor/job/importer/ImporterCreatorFactoryTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/scaling/cor/job/importer/ImporterCreatorFactoryTest.java
deleted file mode 100644
index 2f54c27594d..00000000000
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/scaling/cor/job/importer/ImporterCreatorFactoryTest.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.scaling.cor.job.importer;
-
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.junit.Assert.assertThat;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Set;
-import 
org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
-import 
org.apache.shardingsphere.data.pipeline.api.config.rulealtered.ImporterConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
-import org.apache.shardingsphere.data.pipeline.core.fixture.FixtureImporter;
-import 
org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineJobProgressListener;
-import org.apache.shardingsphere.data.pipeline.core.importer.DefaultImporter;
-import org.apache.shardingsphere.data.pipeline.spi.importer.Importer;
-import 
org.apache.shardingsphere.scaling.core.job.importer.ImporterCreatorFactory;
-import org.junit.Test;
-import org.mockito.Mock;
-
-public final class ImporterCreatorFactoryTest {
-    
-    @Mock
-    private PipelineDataSourceManager dataSourceManager;
-    
-    @Mock
-    private PipelineChannel channel;
-    
-    private final PipelineDataSourceConfiguration dataSourceConfig = new 
StandardPipelineDataSourceConfiguration(
-            
"jdbc:h2:mem:test_db;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL;USER=root;PASSWORD=root",
 "root", "root");
-    
-    @Test
-    public void assertCreateImporterForMysql() {
-        Importer importer = ImporterCreatorFactory.getInstance("MySQL")
-                .createImporter(mockImporterConfiguration(), 
dataSourceManager, channel,
-                        new FixturePipelineJobProgressListener());
-        assertThat(importer, instanceOf(DefaultImporter.class));
-    }
-    
-    @Test
-    public void assertCreateImporterForPostgreSQL() {
-        Importer importer = ImporterCreatorFactory.getInstance("PostgreSQL")
-                .createImporter(mockImporterConfiguration(), 
dataSourceManager, channel,
-                        new FixturePipelineJobProgressListener());
-        assertThat(importer, instanceOf(DefaultImporter.class));
-    }
-    
-    @Test
-    public void assertCreateImporterForOpenGauss() {
-        Importer importer = ImporterCreatorFactory.getInstance("openGauss")
-                .createImporter(mockImporterConfiguration(), 
dataSourceManager, channel,
-                        new FixturePipelineJobProgressListener());
-        assertThat(importer, instanceOf(DefaultImporter.class));
-    }
-    
-    @Test
-    public void assertCreateImporterForH2() {
-        Importer importer = ImporterCreatorFactory.getInstance("H2")
-                .createImporter(mockImporterConfiguration(), 
dataSourceManager, channel,
-                        new FixturePipelineJobProgressListener());
-        assertThat(importer, instanceOf(FixtureImporter.class));
-    }
-    
-    private ImporterConfiguration mockImporterConfiguration() {
-        Map<LogicTableName, Set<String>> shardingColumnsMap = 
Collections.singletonMap(new LogicTableName("test_table"), 
Collections.singleton("user"));
-        return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap, 
new TableNameSchemaNameMapping(Collections.emptyMap()), 1000, 3, 3);
-    }
-}
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/META-INF/services/org.apache.shardingsphere.scaling.core.job.importer.ImporterCreator
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator
similarity index 100%
rename from 
shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/META-INF/services/org.apache.shardingsphere.scaling.core.job.importer.ImporterCreator
rename to 
shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator

Reply via email to