This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 769ff9079b7 Extract PipelineTableMetaDataLoader interface; Move dumper creator SPI to api module (#20739)
769ff9079b7 is described below

commit 769ff9079b79e67030f68d52222de6305cfaab7c
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Sun Sep 4 22:50:20 2022 +0800

    Extract PipelineTableMetaDataLoader interface; Move dumper creator SPI to api module (#20739)
    
    * Extract PipelineTableMetaDataLoader as interface and move to api module
    
    * Move dumper creator SPI to api module
    
    * Move out dumper interface from spi package
    
    * Improve code style
---
 .../DataConsistencyCalculateParameter.java         |  2 +-
 .../{spi => api}/ingest/dumper/Dumper.java         |  2 +-
 .../ingest/dumper/IncrementalDumper.java           |  2 +-
 .../ingest/dumper/InventoryDumper.java             |  2 +-
 .../loader/PipelineTableMetaDataLoader.java}       | 27 +++++------
 .../{ => model}/PipelineColumnMetaData.java        |  2 +-
 .../api}/metadata/model/PipelineIndexMetaData.java |  3 +-
 .../api}/metadata/model/PipelineTableMetaData.java |  3 +-
 .../ingest/dumper/IncrementalDumperCreator.java    | 12 ++---
 .../dumper/IncrementalDumperCreatorFactory.java    |  4 +-
 .../spi}/ingest/dumper/InventoryDumperCreator.java | 23 +++++-----
 .../dumper/InventoryDumperCreatorFactory.java      | 14 +++---
 .../ingest/dumper/AbstractIncrementalDumper.java   |  4 +-
 .../ingest/dumper/AbstractInventoryDumper.java     |  6 +--
 .../core/ingest/dumper/DefaultInventoryDumper.java |  2 +-
 .../dumper/DefaultInventoryDumperCreator.java      | 10 ++++-
 ...va => StandardPipelineTableMetaDataLoader.java} | 22 ++++-----
 .../core/prepare/InventoryTaskSplitter.java        |  8 ++--
 .../data/pipeline/core/task/IncrementalTask.java   |  6 +--
 .../data/pipeline/core/task/InventoryTask.java     |  6 +--
 .../migration/MigrationDataConsistencyChecker.java |  8 ++--
 .../migration/MigrationJobItemContext.java         |  5 ++-
 .../scenario/migration/MigrationJobPreparer.java   |  2 +-
 ...MatchDataConsistencyCalculateAlgorithmTest.java |  2 +-
 .../mysql/MySQLIncrementalDumperCreator.java       |  6 +--
 .../mysql/MySqlInventoryDumperCreator.java         |  9 ++--
 .../mysql/ingest/MySQLIncrementalDumper.java       |  6 +--
 .../mysql/ingest/MySQLInventoryDumper.java         |  2 +-
 ...ine.spi.ingest.dumper.IncrementalDumperCreator} |  0
 ...eline.spi.ingest.dumper.InventoryDumperCreator} |  0
 .../mysql/ingest/MySQLIncrementalDumperTest.java   |  9 ++--
 .../mysql/ingest/MySQLInventoryDumperTest.java     |  4 +-
 .../OpenGaussIncrementalDumperCreator.java         |  6 +--
 .../opengauss/ingest/OpenGaussWalDumper.java       |  2 +-
 ...ine.spi.ingest.dumper.IncrementalDumperCreator} |  0
 .../PostgreSQLIncrementalDumperCreator.java        |  6 +--
 .../PostgreSQLInventoryDumperCreator.java          |  6 +--
 .../ingest/PostgreSQLInventoryDumper.java          |  2 +-
 .../postgresql/ingest/PostgreSQLWalDumper.java     |  2 +-
 .../postgresql/ingest/wal/WalEventConverter.java   |  4 +-
 ...ine.spi.ingest.dumper.IncrementalDumperCreator} |  0
 ...eline.spi.ingest.dumper.InventoryDumperCreator} |  0
 .../ingest/PostgreSQLJdbcDumperTest.java           |  4 +-
 .../postgresql/ingest/PostgreSQLWalDumperTest.java |  5 ++-
 .../ingest/wal/WalEventConverterTest.java          |  4 +-
 .../api/impl/GovernanceRepositoryAPIImplTest.java  |  7 +--
 .../IncrementalDumperCreatorFactoryTest.java       | 52 +++++++++-------------
 .../dumper/InventoryDumperCreatorFactoryTest.java  | 43 +++++++-----------
 .../core/fixture/FixtureIncrementalDumper.java     |  2 +-
 .../fixture/FixtureIncrementalDumperCreator.java   | 13 +++---
 .../core/fixture/FixtureInventoryDumper.java       |  2 +-
 .../fixture/FixtureInventoryDumperCreator.java     | 13 +++---
 ...> StandardPipelineTableMetaDataLoaderTest.java} | 13 +++---
 .../metadata/model/PipelineTableMetaDataTest.java  |  3 +-
 .../pipeline/core/task/IncrementalTaskTest.java    |  5 ++-
 .../data/pipeline/core/task/InventoryTaskTest.java |  7 +--
 ...ine.spi.ingest.dumper.IncrementalDumperCreator} |  0
 ...eline.spi.ingest.dumper.InventoryDumperCreator} |  0
 58 files changed, 200 insertions(+), 214 deletions(-)

diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculateParameter.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculateParameter.java
index 6d6cb7ecb18..7463fadae10 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculateParameter.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/check/consistency/DataConsistencyCalculateParameter.java
@@ -24,7 +24,7 @@ import lombok.Setter;
 import lombok.ToString;
 import 
org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
-import 
org.apache.shardingsphere.data.pipeline.api.metadata.PipelineColumnMetaData;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
 
 import java.util.Collection;
 
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/Dumper.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/Dumper.java
similarity index 93%
rename from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/Dumper.java
rename to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/Dumper.java
index bd4e101fb61..79724699ab6 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/Dumper.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/Dumper.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.spi.ingest.dumper;
+package org.apache.shardingsphere.data.pipeline.api.ingest.dumper;
 
 import org.apache.shardingsphere.data.pipeline.api.executor.LifecycleExecutor;
 
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/IncrementalDumper.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/IncrementalDumper.java
similarity index 93%
rename from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/IncrementalDumper.java
rename to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/IncrementalDumper.java
index 6871815663a..db292c1b1bf 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/IncrementalDumper.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/IncrementalDumper.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.spi.ingest.dumper;
+package org.apache.shardingsphere.data.pipeline.api.ingest.dumper;
 
 /**
  * Incremental dumper.
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/InventoryDumper.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/InventoryDumper.java
similarity index 93%
rename from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/InventoryDumper.java
rename to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/InventoryDumper.java
index cd79d9b89d9..9405e29878b 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/InventoryDumper.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/InventoryDumper.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.spi.ingest.dumper;
+package org.apache.shardingsphere.data.pipeline.api.ingest.dumper;
 
 /**
  * Inventory dumper.
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineIndexMetaData.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/loader/PipelineTableMetaDataLoader.java
similarity index 60%
copy from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineIndexMetaData.java
copy to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/loader/PipelineTableMetaDataLoader.java
index 759ce69a0fe..5fff7bb307c 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineIndexMetaData.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/loader/PipelineTableMetaDataLoader.java
@@ -15,24 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.metadata.model;
+package org.apache.shardingsphere.data.pipeline.api.metadata.loader;
 
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import lombok.ToString;
-import 
org.apache.shardingsphere.data.pipeline.api.metadata.PipelineColumnMetaData;
-
-import java.util.List;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
 
 /**
- * Pipeline meta data of index.
+ * Pipeline table metadata loader.
  */
-@RequiredArgsConstructor
-@Getter
-@ToString
-public final class PipelineIndexMetaData {
-    
-    private final String name;
+public interface PipelineTableMetaDataLoader {
     
-    private final List<PipelineColumnMetaData> columns;
+    /**
+     * Get table metadata, load if it does not exist.
+     *
+     * @param schemaName schema name. nullable
+     * @param tableName dedicated table name, not table name pattern
+     * @return table metadata
+     */
+    PipelineTableMetaData getTableMetaData(String schemaName, String 
tableName);
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/PipelineColumnMetaData.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/model/PipelineColumnMetaData.java
similarity index 96%
rename from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/PipelineColumnMetaData.java
rename to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/model/PipelineColumnMetaData.java
index d4476eeb53f..ccc001cf6c1 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/PipelineColumnMetaData.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/model/PipelineColumnMetaData.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.api.metadata;
+package org.apache.shardingsphere.data.pipeline.api.metadata.model;
 
 import lombok.Getter;
 import lombok.NonNull;
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineIndexMetaData.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/model/PipelineIndexMetaData.java
similarity index 88%
rename from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineIndexMetaData.java
rename to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/model/PipelineIndexMetaData.java
index 759ce69a0fe..2a1713efeac 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineIndexMetaData.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/model/PipelineIndexMetaData.java
@@ -15,12 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.metadata.model;
+package org.apache.shardingsphere.data.pipeline.api.metadata.model;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.ToString;
-import 
org.apache.shardingsphere.data.pipeline.api.metadata.PipelineColumnMetaData;
 
 import java.util.List;
 
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaData.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/model/PipelineTableMetaData.java
similarity index 96%
rename from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaData.java
rename to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/model/PipelineTableMetaData.java
index 7a1ed2d9789..c35eb8f463f 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaData.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/metadata/model/PipelineTableMetaData.java
@@ -15,13 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.metadata.model;
+package org.apache.shardingsphere.data.pipeline.api.metadata.model;
 
 import lombok.Getter;
 import lombok.NonNull;
 import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
-import 
org.apache.shardingsphere.data.pipeline.api.metadata.PipelineColumnMetaData;
 
 import java.util.ArrayList;
 import java.util.Collection;
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/IncrementalDumperCreator.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/IncrementalDumperCreator.java
similarity index 82%
rename from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/IncrementalDumperCreator.java
rename to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/IncrementalDumperCreator.java
index 5583e056e1e..6eb1b956145 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/IncrementalDumperCreator.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/IncrementalDumperCreator.java
@@ -15,13 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.ingest.dumper;
+package org.apache.shardingsphere.data.pipeline.spi.ingest.dumper;
 
 import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
-import 
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumper;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
 import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
 
@@ -34,11 +34,11 @@ public interface IncrementalDumperCreator<P> extends 
TypedSPI {
     /**
      * Create incremental dumper.
      *
-     * @param dumperConfig dumperConfig
+     * @param dumperConfig dumper configuration
      * @param position position
      * @param channel channel
-     * @param metaDataLoader metaDataLoader
-     * @return IncrementalDumper
+     * @param metaDataLoader meta data loader
+     * @return incremental dumper
      */
     IncrementalDumper createIncrementalDumper(DumperConfiguration 
dumperConfig, IngestPosition<P> position,
                                               PipelineChannel channel, 
PipelineTableMetaDataLoader metaDataLoader);
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/IncrementalDumperCreatorFactory.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/IncrementalDumperCreatorFactory.java
similarity index 93%
rename from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/IncrementalDumperCreatorFactory.java
rename to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/IncrementalDumperCreatorFactory.java
index 4db34d487ef..f81e9fbcd61 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/IncrementalDumperCreatorFactory.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/IncrementalDumperCreatorFactory.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.ingest.dumper;
+package org.apache.shardingsphere.data.pipeline.spi.ingest.dumper;
 
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
@@ -36,7 +36,7 @@ public class IncrementalDumperCreatorFactory {
      * Incremental dumper creator.
      *
      * @param databaseType database type
-     * @return Incremental dumper creator
+     * @return incremental dumper creator
      */
     public static IncrementalDumperCreator getInstance(final String 
databaseType) {
         return 
TypedSPIRegistry.getRegisteredService(IncrementalDumperCreator.class, 
databaseType);
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumperCreator.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/InventoryDumperCreator.java
similarity index 70%
rename from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumperCreator.java
rename to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/InventoryDumperCreator.java
index 0bc3df83de2..496e9bb7a66 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumperCreator.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/InventoryDumperCreator.java
@@ -15,29 +15,32 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.ingest.dumper;
+package org.apache.shardingsphere.data.pipeline.spi.ingest.dumper;
 
-import javax.sql.DataSource;
 import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
-import 
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumper;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.InventoryDumper;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
+import org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPI;
 import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
 
+import javax.sql.DataSource;
+
 /**
  * Inventory dumper creator.
  */
 @SingletonSPI
-public interface InventoryDumperCreator extends TypedSPI {
+public interface InventoryDumperCreator extends TypedSPI, RequiredSPI {
     
     /**
-     * Create Inventory Dumper.
-     * @param inventoryDumperConfig inventoryDumperConfig
+     * Create inventory dumper.
+     *
+     * @param inventoryDumperConfig inventory dumper configuration
      * @param channel chanel
-     * @param sourceDataSource sourceDataSource
-     * @param sourceMetaDataLoader sourceMetaDataLoader
-     * @return InventoryDumper
+     * @param sourceDataSource source data source
+     * @param sourceMetaDataLoader source meta data loader
+     * @return inventory dumper
      */
     InventoryDumper createInventoryDumper(InventoryDumperConfiguration 
inventoryDumperConfig, PipelineChannel channel,
                                           DataSource sourceDataSource, 
PipelineTableMetaDataLoader sourceMetaDataLoader);
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumperCreatorFactory.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/InventoryDumperCreatorFactory.java
similarity index 74%
rename from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumperCreatorFactory.java
rename to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/InventoryDumperCreatorFactory.java
index 6c218b56391..6c7b2120630 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/InventoryDumperCreatorFactory.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/dumper/InventoryDumperCreatorFactory.java
@@ -15,15 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.ingest.dumper;
+package org.apache.shardingsphere.data.pipeline.spi.ingest.dumper;
 
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
+import 
org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPIRegistry;
 import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPIRegistry;
 
-import java.util.Optional;
-
 /**
  * Inventory dumper creator factory.
  */
@@ -35,13 +34,12 @@ public class InventoryDumperCreatorFactory {
     }
     
     /**
-     * Get inventoryDumper creator instance.
+     * Get inventory dumper creator instance.
      *
-     * @param databaseType databaseType
-     * @return InventoryDumperCreator
+     * @param databaseType database type
+     * @return inventory dumper creator
      */
     public static InventoryDumperCreator getInstance(final String 
databaseType) {
-        Optional<InventoryDumperCreator> result = 
TypedSPIRegistry.findRegisteredService(InventoryDumperCreator.class, 
databaseType);
-        return result.orElseGet(DefaultInventoryDumperCreator::new);
+        return 
TypedSPIRegistry.findRegisteredService(InventoryDumperCreator.class, 
databaseType).orElseGet(() -> 
RequiredSPIRegistry.getRegisteredService(InventoryDumperCreator.class));
     }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractIncrementalDumper.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractIncrementalDumper.java
index f36e16e6f2d..bd396dac9cb 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractIncrementalDumper.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractIncrementalDumper.java
@@ -20,9 +20,9 @@ package 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper;
 import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
-import 
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumper;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 
 /**
  * Abstract incremental dumper.
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
index 34f0ebee193..78de64e85dd 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/AbstractInventoryDumper.java
@@ -27,6 +27,7 @@ import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumper
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.InventoryDumper;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
@@ -38,14 +39,13 @@ import 
org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
 import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
 import 
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader;
-import 
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumper;
 import 
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import 
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
 
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/DefaultInventoryDumper.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/DefaultInventoryDumper.java
index 22e9c343dc8..596f740a609 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/DefaultInventoryDumper.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/DefaultInventoryDumper.java
@@ -19,7 +19,7 @@ package 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper;
 
 import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 
 import javax.sql.DataSource;
 import java.sql.Connection;
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/DefaultInventoryDumperCreator.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/DefaultInventoryDumperCreator.java
index 335b989d454..2860dcb14f0 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/DefaultInventoryDumperCreator.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/DefaultInventoryDumperCreator.java
@@ -19,8 +19,9 @@ package 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper;
 
 import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
-import 
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumper;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.InventoryDumper;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
+import 
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumperCreator;
 
 import javax.sql.DataSource;
 
@@ -34,4 +35,9 @@ public final class DefaultInventoryDumperCreator implements 
InventoryDumperCreat
                                                  final DataSource 
sourceDataSource, final PipelineTableMetaDataLoader sourceMetaDataLoader) {
         return new DefaultInventoryDumper(inventoryDumperConfig, channel, 
sourceDataSource, sourceMetaDataLoader);
     }
+    
+    @Override
+    public boolean isDefault() {
+        return true;
+    }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineTableMetaDataLoader.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/StandardPipelineTableMetaDataLoader.java
similarity index 91%
rename from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineTableMetaDataLoader.java
rename to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/StandardPipelineTableMetaDataLoader.java
index 2b5f943091f..dcc6ed43e6a 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineTableMetaDataLoader.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/StandardPipelineTableMetaDataLoader.java
@@ -21,9 +21,10 @@ import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.metadata.TableName;
-import 
org.apache.shardingsphere.data.pipeline.api.metadata.PipelineColumnMetaData;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineIndexMetaData;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineIndexMetaData;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
 import org.apache.shardingsphere.infra.database.type.DatabaseTypeFactory;
 
 import java.sql.Connection;
@@ -41,25 +42,18 @@ import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
- * Pipeline table metadata loader.
+ * Standard pipeline table metadata loader.
  */
 @RequiredArgsConstructor
 @Slf4j
-// TODO PipelineTableMetaDataLoader SPI
-public final class PipelineTableMetaDataLoader {
+public final class StandardPipelineTableMetaDataLoader implements 
PipelineTableMetaDataLoader {
     
-    // TODO it doesn't support ShardingSphereDataSource
+    // It doesn't support ShardingSphereDataSource
     private final PipelineDataSourceWrapper dataSource;
     
     private final Map<TableName, PipelineTableMetaData> tableMetaDataMap = new 
ConcurrentHashMap<>();
     
-    /**
-     * Get table metadata, load if it does not exist.
-     *
-     * @param schemaName schema name. nullable
-     * @param tableName dedicated table name, not table name pattern
-     * @return table metadata
-     */
+    @Override
     public PipelineTableMetaData getTableMetaData(final String schemaName, 
final String tableName) {
         PipelineTableMetaData result = tableMetaDataMap.get(new 
TableName(tableName));
         if (null != result) {
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
index c5d10c6f5b4..f2e3a02064b 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
@@ -35,14 +35,14 @@ import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.StringPrimary
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineIndexMetaData;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCreationException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.DefaultPipelineJobProgressListener;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
-import 
org.apache.shardingsphere.data.pipeline.api.metadata.PipelineColumnMetaData;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineIndexMetaData;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
 import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
index df9150dff41..d6aad311a44 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTask.java
@@ -26,19 +26,19 @@ import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSource
 import 
org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
 import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.Dumper;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobExecutionException;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.IncrementalDumperCreatorFactory;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreatorFactory;
 import 
org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
-import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.Dumper;
+import 
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreatorFactory;
 
 import java.util.Collection;
 import java.util.LinkedList;
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
index 23dbf5ffc77..ba1f187c65d 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTask.java
@@ -26,19 +26,19 @@ import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSource
 import 
org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
 import org.apache.shardingsphere.data.pipeline.api.importer.Importer;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.Dumper;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.listener.PipelineJobProgressListener;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.api.task.progress.InventoryTaskProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobExecutionException;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteCallback;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumperCreatorFactory;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreatorFactory;
 import 
org.apache.shardingsphere.data.pipeline.spi.ingest.channel.PipelineChannelCreator;
-import org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.Dumper;
+import 
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumperCreatorFactory;
 
 import javax.sql.DataSource;
 import java.util.List;
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
index 201422f5ed0..3c14a669de0 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
@@ -28,11 +28,11 @@ import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSource
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
 import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
-import 
org.apache.shardingsphere.data.pipeline.api.metadata.PipelineColumnMetaData;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineDataConsistencyCheckFailedException;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
+import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
 import 
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
 import 
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
@@ -174,7 +174,7 @@ public final class MigrationDataConsistencyChecker {
             String sourceDatabaseType = 
sourceDataSourceConfig.getDatabaseType().getType();
             String targetDatabaseType = 
targetDataSourceConfig.getDatabaseType().getType();
             for (String each : Collections.singletonList(sourceTableName)) {
-                PipelineTableMetaData tableMetaData = new 
PipelineTableMetaDataLoader(sourceDataSource).getTableMetaData(tableNameSchemaNameMapping.getSchemaName(each),
 each);
+                PipelineTableMetaData tableMetaData = new 
StandardPipelineTableMetaDataLoader(sourceDataSource).getTableMetaData(tableNameSchemaNameMapping.getSchemaName(each),
 each);
                 if (null == tableMetaData) {
                     throw new PipelineDataConsistencyCheckFailedException("Can 
not get metadata for table " + each);
                 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
index c5a4470921b..9a72f14c16c 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
@@ -29,8 +29,9 @@ import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSource
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
 
@@ -79,7 +80,7 @@ public final class MigrationJobItemContext implements 
InventoryIncrementalJobIte
         
         @Override
         protected PipelineTableMetaDataLoader initialize() throws 
ConcurrentException {
-            return new 
PipelineTableMetaDataLoader(sourceDataSourceLazyInitializer.get());
+            return new 
StandardPipelineTableMetaDataLoader(sourceDataSourceLazyInitializer.get());
         }
     };
     
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
index c3cd181a04d..c02cd51ea0f 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobPreparer.java
@@ -28,12 +28,12 @@ import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSource
 import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineIgnoredException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
 import org.apache.shardingsphere.data.pipeline.core.execute.ExecuteEngine;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.DefaultPipelineJobProgressListener;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.core.prepare.InventoryTaskSplitter;
 import 
org.apache.shardingsphere.data.pipeline.core.prepare.PipelineJobPreparerUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetSchemasParameter;
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithmTest.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithmTest.java
index 04c6b47d9a9..d4240d77e4a 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithmTest.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithmTest.java
@@ -20,7 +20,7 @@ package 
org.apache.shardingsphere.data.pipeline.core.check.consistency.algorithm
 import 
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCalculateParameter;
 import 
org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
-import 
org.apache.shardingsphere.data.pipeline.api.metadata.PipelineColumnMetaData;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineDataConsistencyCheckFailedException;
 import org.junit.Before;
 import org.junit.Test;
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLIncrementalDumperCreator.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLIncrementalDumperCreator.java
index 6fe23ba7a34..076c700b23b 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLIncrementalDumperCreator.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/MySQLIncrementalDumperCreator.java
@@ -19,12 +19,12 @@ package org.apache.shardingsphere.data.pipeline.mysql;
 
 import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.IncrementalDumperCreator;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.MySQLIncrementalDumper;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.BinlogPosition;
-import 
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumper;
+import 
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreator;
 
 /**
  * MySql incremental dumper creator.
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/MySqlInventoryDumperCreator.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/MySqlInventoryDumperCreator.java
index 7697ee5d13c..225fbeb9e9e 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/MySqlInventoryDumperCreator.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/MySqlInventoryDumperCreator.java
@@ -17,13 +17,14 @@
 
 package org.apache.shardingsphere.data.pipeline.mysql;
 
-import javax.sql.DataSource;
 import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumperCreator;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.InventoryDumper;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.MySQLInventoryDumper;
-import 
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumper;
+import 
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumperCreator;
+
+import javax.sql.DataSource;
 
 public class MySqlInventoryDumperCreator implements InventoryDumperCreator {
     
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index 40727cb4ac3..b6f0e056f20 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -31,11 +31,11 @@ import 
org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractIncrementalDumper;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
-import 
org.apache.shardingsphere.data.pipeline.api.metadata.PipelineColumnMetaData;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.BinlogPosition;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractBinlogEvent;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractRowsEvent;
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumper.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumper.java
index 2fde33ba951..6fd46b68b3a 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumper.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumper.java
@@ -19,8 +19,8 @@ package org.apache.shardingsphere.data.pipeline.mysql.ingest;
 
 import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractInventoryDumper;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 
 import javax.sql.DataSource;
 import java.sql.Connection;
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.ingest.dumper.IncrementalDumperCreator
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreator
similarity index 100%
rename from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.ingest.dumper.IncrementalDumperCreator
rename to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreator
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumperCreator
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumperCreator
similarity index 100%
rename from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumperCreator
rename to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumperCreator
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
index 3ee5618cd76..002201f3ec3 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
@@ -27,12 +27,13 @@ import 
org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderReco
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.MultiplexMemoryPipelineChannel;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
-import 
org.apache.shardingsphere.data.pipeline.api.metadata.PipelineColumnMetaData;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
+import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.util.ReflectionUtil;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.BinlogPosition;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.event.AbstractBinlogEvent;
@@ -82,7 +83,7 @@ public final class MySQLIncrementalDumperTest {
         initTableData(dumperConfig);
         dumperConfig.setDataSourceConfig(new 
StandardPipelineDataSourceConfiguration("jdbc:mysql://127.0.0.1:3306/ds_0", 
"root", "root"));
         channel = new MultiplexMemoryPipelineChannel();
-        PipelineTableMetaDataLoader metaDataLoader = new 
PipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig()));
+        PipelineTableMetaDataLoader metaDataLoader = new 
StandardPipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig()));
         incrementalDumper = new MySQLIncrementalDumper(dumperConfig, new 
BinlogPosition("binlog-000001", 4L), channel, metaDataLoader);
         PipelineColumnMetaData column = new PipelineColumnMetaData(1, "test", 
Types.INTEGER, "INTEGER", true, true);
         
when(pipelineTableMetaData.getColumnMetaData(anyInt())).thenReturn(column);
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumperTest.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumperTest.java
index 0d8bc4f31fc..d8aaf019c89 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumperTest.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLInventoryDumperTest.java
@@ -25,7 +25,7 @@ import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSource
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.SimpleMemoryPipelineChannel;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -49,7 +49,7 @@ public final class MySQLInventoryDumperTest {
         PipelineDataSourceManager dataSourceManager = new 
DefaultPipelineDataSourceManager();
         InventoryDumperConfiguration dumperConfig = 
mockInventoryDumperConfiguration();
         PipelineDataSourceWrapper dataSource = 
dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
-        mysqlJdbcDumper = new 
MySQLInventoryDumper(mockInventoryDumperConfiguration(), new 
SimpleMemoryPipelineChannel(100), dataSource, new 
PipelineTableMetaDataLoader(dataSource));
+        mysqlJdbcDumper = new 
MySQLInventoryDumper(mockInventoryDumperConfiguration(), new 
SimpleMemoryPipelineChannel(100), dataSource, new 
StandardPipelineTableMetaDataLoader(dataSource));
         initTableData(dataSource);
     }
     
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussIncrementalDumperCreator.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussIncrementalDumperCreator.java
index fce078e1591..4083a4cd690 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussIncrementalDumperCreator.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/OpenGaussIncrementalDumperCreator.java
@@ -19,12 +19,12 @@ package org.apache.shardingsphere.data.pipeline.opengauss;
 
 import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.IncrementalDumperCreator;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.opengauss.ingest.OpenGaussWalDumper;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalPosition;
-import 
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumper;
+import 
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreator;
 
 /**
  * OpenGauss incremental dumper creator.
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
index 8362bed849d..84123622fb8 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWalDumper.java
@@ -23,9 +23,9 @@ import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.Standa
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractIncrementalDumper;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
 import 
org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.OpenGaussLogicalReplication;
 import 
org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.decode.MppdbDecodingPlugin;
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.ingest.dumper.IncrementalDumperCreator
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreator
similarity index 100%
rename from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.ingest.dumper.IncrementalDumperCreator
rename to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-opengauss/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreator
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLIncrementalDumperCreator.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLIncrementalDumperCreator.java
index 684f191b380..4b067556abc 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLIncrementalDumperCreator.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLIncrementalDumperCreator.java
@@ -19,12 +19,12 @@ package org.apache.shardingsphere.data.pipeline.postgresql;
 
 import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.IncrementalDumperCreator;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLWalDumper;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalPosition;
-import 
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumper;
+import 
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreator;
 
 /**
  * PostgreSQL incremental dumper creator.
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLInventoryDumperCreator.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLInventoryDumperCreator.java
index aedabf2fd75..6394e2e9cb2 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLInventoryDumperCreator.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/PostgreSQLInventoryDumperCreator.java
@@ -19,10 +19,10 @@ package org.apache.shardingsphere.data.pipeline.postgresql;
 
 import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumperCreator;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.InventoryDumper;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLInventoryDumper;
-import 
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumper;
+import 
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumperCreator;
 
 import javax.sql.DataSource;
 import java.util.Collection;
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLInventoryDumper.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLInventoryDumper.java
index 7f56c647281..3cbf9a158b5 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLInventoryDumper.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLInventoryDumper.java
@@ -19,8 +19,8 @@ package 
org.apache.shardingsphere.data.pipeline.postgresql.ingest;
 
 import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractInventoryDumper;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 
 import javax.sql.DataSource;
 import java.sql.Connection;
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
index 67f181511ab..c304037e96d 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumper.java
@@ -23,9 +23,9 @@ import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.Standa
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractIncrementalDumper;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.LogicalReplication;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalEventConverter;
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverter.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverter.java
index cc9e26117d4..4ffeabcc458 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverter.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverter.java
@@ -23,9 +23,9 @@ import 
org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractRowEvent;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractWalEvent;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.DeleteRowEvent;
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.ingest.dumper.IncrementalDumperCreator
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreator
similarity index 100%
rename from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.ingest.dumper.IncrementalDumperCreator
rename to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreator
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumperCreator
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumperCreator
similarity index 100%
rename from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumperCreator
rename to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumperCreator
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLJdbcDumperTest.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLJdbcDumperTest.java
index d4388d1473f..2ed82a1616e 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLJdbcDumperTest.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLJdbcDumperTest.java
@@ -25,7 +25,7 @@ import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSource
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.SimpleMemoryPipelineChannel;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -50,7 +50,7 @@ public final class PostgreSQLJdbcDumperTest {
         InventoryDumperConfiguration dumperConfig = 
mockInventoryDumperConfiguration();
         dataSource = 
dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
         jdbcDumper = new 
PostgreSQLInventoryDumper(mockInventoryDumperConfiguration(), new 
SimpleMemoryPipelineChannel(100),
-                dataSource, new PipelineTableMetaDataLoader(dataSource));
+                dataSource, new 
StandardPipelineTableMetaDataLoader(dataSource));
         initTableData(dataSource);
     }
     
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java
index 7e686c000ec..2ecb055f62d 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWalDumperTest.java
@@ -24,10 +24,11 @@ import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDat
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.MultiplexMemoryPipelineChannel;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.util.ReflectionUtil;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.LogicalReplication;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalPosition;
@@ -84,7 +85,7 @@ public final class PostgreSQLWalDumperTest {
         position = new WalPosition(new 
PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L)));
         channel = new MultiplexMemoryPipelineChannel();
         dumperConfig = mockDumperConfiguration();
-        PipelineTableMetaDataLoader metaDataLoader = new 
PipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig()));
+        PipelineTableMetaDataLoader metaDataLoader = new 
StandardPipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig()));
         walDumper = new PostgreSQLWalDumper(dumperConfig, position, channel, 
metaDataLoader);
     }
     
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverterTest.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverterTest.java
index 16681b8b044..dd93c267854 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverterTest.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WalEventConverterTest.java
@@ -29,7 +29,7 @@ import 
org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.IngestDataChangeType;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.AbstractRowEvent;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.DeleteRowEvent;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.event.PlaceholderEvent;
@@ -59,7 +59,7 @@ public final class WalEventConverterTest {
     @Before
     public void setUp() {
         DumperConfiguration dumperConfig = mockDumperConfiguration();
-        walEventConverter = new WalEventConverter(dumperConfig, new 
PipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig())));
+        walEventConverter = new WalEventConverter(dumperConfig, new 
StandardPipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig())));
         initTableData(dumperConfig);
     }
     
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
index f340101601c..f64fde18e31 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
@@ -22,12 +22,13 @@ import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfigura
 import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineJobProgressListener;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
 import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
 import 
org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
@@ -141,7 +142,7 @@ public final class GovernanceRepositoryAPIImplTest {
         dumperConfig.setUniqueKeyDataType(Types.INTEGER);
         dumperConfig.setShardingItem(0);
         PipelineDataSourceWrapper dataSource = 
mock(PipelineDataSourceWrapper.class);
-        PipelineTableMetaDataLoader metaDataLoader = new 
PipelineTableMetaDataLoader(dataSource);
+        PipelineTableMetaDataLoader metaDataLoader = new 
StandardPipelineTableMetaDataLoader(dataSource);
         return new InventoryTask(dumperConfig, taskConfig.getImporterConfig(), 
PipelineContextUtil.getPipelineChannelCreator(),
                 new DefaultPipelineDataSourceManager(), dataSource, 
metaDataLoader, PipelineContextUtil.getExecuteEngine(), new 
FixturePipelineJobProgressListener());
     }
@@ -149,7 +150,7 @@ public final class GovernanceRepositoryAPIImplTest {
     private IncrementalTask mockIncrementalTask(final TaskConfiguration 
taskConfig) {
         DumperConfiguration dumperConfig = taskConfig.getDumperConfig();
         dumperConfig.setPosition(new PlaceholderPosition());
-        PipelineTableMetaDataLoader metaDataLoader = new 
PipelineTableMetaDataLoader(mock(PipelineDataSourceWrapper.class));
+        PipelineTableMetaDataLoader metaDataLoader = new 
StandardPipelineTableMetaDataLoader(mock(PipelineDataSourceWrapper.class));
         return new IncrementalTask(3, dumperConfig, 
taskConfig.getImporterConfig(), PipelineContextUtil.getPipelineChannelCreator(),
                 new DefaultPipelineDataSourceManager(), metaDataLoader, 
PipelineContextUtil.getExecuteEngine(), new 
FixturePipelineJobProgressListener());
     }
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/dumper/IncrementalDumperCreatorFactoryTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/dumper/IncrementalDumperCreatorFactoryTest.java
index 6352be60159..b9b1477b6b8 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/dumper/IncrementalDumperCreatorFactoryTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/dumper/IncrementalDumperCreatorFactoryTest.java
@@ -17,80 +17,68 @@
 
 package org.apache.shardingsphere.data.pipeline.core.dumper;
 
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.junit.Assert.assertThat;
-import java.util.Collections;
 import 
org.apache.shardingsphere.data.pipeline.api.config.TableNameSchemaNameMapping;
 import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
-import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.fixture.FixtureIncrementalDumper;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.SimpleMemoryPipelineChannel;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.IncrementalDumperCreatorFactory;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.MySQLIncrementalDumper;
-import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.binlog.BinlogPosition;
 import 
org.apache.shardingsphere.data.pipeline.opengauss.ingest.OpenGaussWalDumper;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLWalDumper;
-import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WalPosition;
-import 
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumper;
-import org.junit.Before;
+import 
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreatorFactory;
 import org.junit.Test;
 import org.mockito.Mock;
 
+import java.util.Collections;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertThat;
+
 public final class IncrementalDumperCreatorFactoryTest {
     
-    private PipelineDataSourceWrapper dataSource;
-    
     @Mock
-    private WalPosition walPosition;
-    
-    @Before
-    public void setUp() {
-        PipelineDataSourceManager dataSourceManager = new 
DefaultPipelineDataSourceManager();
-        DumperConfiguration dumperConfig = mockDumperConfiguration();
-        dataSource = 
dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
-    }
+    private IngestPosition<?> ingestPosition;
     
     @Test
     public void assertIncrementalDumperCreatorForMysql() {
-        IncrementalDumper actual = 
IncrementalDumperCreatorFactory.getInstance("MySQL")
-                .createIncrementalDumper(mockDumperConfiguration(), new 
BinlogPosition("binlog-000001", 4L), new SimpleMemoryPipelineChannel(100), new 
PipelineTableMetaDataLoader(dataSource));
+        IncrementalDumper actual = createIncrementalDumper("MySQL");
         assertThat(actual, instanceOf(MySQLIncrementalDumper.class));
     }
     
     @Test
     public void assertIncrementalDumperCreatorForPostgreSQL() {
-        IncrementalDumper actual = 
IncrementalDumperCreatorFactory.getInstance("PostgreSQL")
-                .createIncrementalDumper(mockDumperConfiguration(), 
walPosition, new SimpleMemoryPipelineChannel(100), new 
PipelineTableMetaDataLoader(dataSource));
+        IncrementalDumper actual = createIncrementalDumper("PostgreSQL");
         assertThat(actual, instanceOf(PostgreSQLWalDumper.class));
     }
     
     @Test
     public void assertIncrementalDumperCreatorForOpenGauss() {
-        IncrementalDumper actual = 
IncrementalDumperCreatorFactory.getInstance("openGauss")
-                .createIncrementalDumper(mockDumperConfiguration(), 
walPosition, new SimpleMemoryPipelineChannel(100), new 
PipelineTableMetaDataLoader(dataSource));
+        IncrementalDumper actual = createIncrementalDumper("openGauss");
         assertThat(actual, instanceOf(OpenGaussWalDumper.class));
     }
     
     @Test
     public void assertIncrementalDumperCreatorForFixture() {
-        IncrementalDumper actual = 
IncrementalDumperCreatorFactory.getInstance("Fixture")
-                .createIncrementalDumper(mockDumperConfiguration(), 
walPosition, new SimpleMemoryPipelineChannel(100), new 
PipelineTableMetaDataLoader(dataSource));
+        IncrementalDumper actual = createIncrementalDumper("Fixture");
         assertThat(actual, instanceOf(FixtureIncrementalDumper.class));
     }
     
     @Test
     public void assertIncrementalDumperCreatorForH2() {
-        IncrementalDumper actual = 
IncrementalDumperCreatorFactory.getInstance("H2")
-                .createIncrementalDumper(mockDumperConfiguration(), 
walPosition, new SimpleMemoryPipelineChannel(100), new 
PipelineTableMetaDataLoader(dataSource));
+        IncrementalDumper actual = createIncrementalDumper("H2");
         assertThat(actual, instanceOf(FixtureIncrementalDumper.class));
     }
     
+    private IncrementalDumper createIncrementalDumper(final String 
databaseType) {
+        return IncrementalDumperCreatorFactory.getInstance(databaseType)
+                .createIncrementalDumper(mockDumperConfiguration(), 
ingestPosition, new SimpleMemoryPipelineChannel(100), new 
StandardPipelineTableMetaDataLoader(null));
+    }
+    
     private DumperConfiguration mockDumperConfiguration() {
         DumperConfiguration result = new DumperConfiguration();
         result.setDataSourceConfig(new 
StandardPipelineDataSourceConfiguration("jdbc:mysql://127.0.0.1:3306/ds_0", 
"root", "root"));
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumperCreatorFactoryTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumperCreatorFactoryTest.java
index 3a9c90b14d8..d8bcbf36379 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumperCreatorFactoryTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumperCreatorFactoryTest.java
@@ -17,63 +17,54 @@
 
 package org.apache.shardingsphere.data.pipeline.core.dumper;
 
-import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.junit.Assert.assertThat;
 import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.InventoryDumper;
 import 
org.apache.shardingsphere.data.pipeline.core.fixture.FixtureInventoryDumper;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.SimpleMemoryPipelineChannel;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumperCreatorFactory;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.MySQLInventoryDumper;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLInventoryDumper;
-import 
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumper;
-import org.junit.Before;
+import 
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumperCreatorFactory;
 import org.junit.Test;
 
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertThat;
+
 public final class InventoryDumperCreatorFactoryTest {
     
-    private PipelineDataSourceWrapper dataSource;
-    
-    @Before
-    public void setUp() {
-        PipelineDataSourceManager dataSourceManager = new 
DefaultPipelineDataSourceManager();
-        InventoryDumperConfiguration dumperConfig = 
mockInventoryDumperConfiguration();
-        dataSource = 
dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
-    }
-    
     @Test
-    public void assertInventoryDumperCreatorForMysql() {
-        InventoryDumper actual = 
InventoryDumperCreatorFactory.getInstance("MySql")
-                .createInventoryDumper(mockInventoryDumperConfiguration(), new 
SimpleMemoryPipelineChannel(100), dataSource, new 
PipelineTableMetaDataLoader(dataSource));
+    public void assertInventoryDumperCreatorForMySQL() {
+        InventoryDumper actual = createInventoryDumper("MySQL");
         assertThat(actual, instanceOf(MySQLInventoryDumper.class));
     }
     
     @Test
     public void assertInventoryDumperCreatorForPostgreSQL() {
-        InventoryDumper actual = 
InventoryDumperCreatorFactory.getInstance("PostgreSQL")
-                .createInventoryDumper(mockInventoryDumperConfiguration(), new 
SimpleMemoryPipelineChannel(100), dataSource, new 
PipelineTableMetaDataLoader(dataSource));
+        InventoryDumper actual = createInventoryDumper("PostgreSQL");
         assertThat(actual, instanceOf(PostgreSQLInventoryDumper.class));
     }
     
     @Test
     public void assertInventoryDumperCreatorForOpenGauss() {
-        InventoryDumper actual = 
InventoryDumperCreatorFactory.getInstance("openGauss")
-                .createInventoryDumper(mockInventoryDumperConfiguration(), new 
SimpleMemoryPipelineChannel(100), dataSource, new 
PipelineTableMetaDataLoader(dataSource));
+        InventoryDumper actual = createInventoryDumper("openGauss");
         assertThat(actual, instanceOf(PostgreSQLInventoryDumper.class));
     }
     
     @Test
     public void assertInventoryDumperCreatorForFixture() {
-        InventoryDumper actual = 
InventoryDumperCreatorFactory.getInstance("Fixture")
-                .createInventoryDumper(mockInventoryDumperConfiguration(), new 
SimpleMemoryPipelineChannel(100), dataSource, new 
PipelineTableMetaDataLoader(dataSource));
+        InventoryDumper actual = createInventoryDumper("Fixture");
         assertThat(actual, instanceOf(FixtureInventoryDumper.class));
     }
     
+    private InventoryDumper createInventoryDumper(final String databaseType) {
+        PipelineDataSourceWrapper dataSource = null;
+        return InventoryDumperCreatorFactory.getInstance(databaseType)
+                .createInventoryDumper(mockInventoryDumperConfiguration(), new 
SimpleMemoryPipelineChannel(100), dataSource, new 
StandardPipelineTableMetaDataLoader(dataSource));
+    }
+    
     private InventoryDumperConfiguration mockInventoryDumperConfiguration() {
         DumperConfiguration dumperConfig = mockDumperConfiguration();
         InventoryDumperConfiguration result = new 
InventoryDumperConfiguration(dumperConfig);
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumper.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumper.java
index b97eebb219a..c613f92e5b7 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumper.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumper.java
@@ -21,8 +21,8 @@ import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfigura
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractIncrementalDumper;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 
 public final class FixtureIncrementalDumper extends 
AbstractIncrementalDumper<FinishedPosition> {
     
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumperCreator.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumperCreator.java
index 7ca4d4d49eb..b0152f36f90 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumperCreator.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureIncrementalDumperCreator.java
@@ -17,16 +17,17 @@
 
 package org.apache.shardingsphere.data.pipeline.core.fixture;
 
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
 import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.DumperConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.IncrementalDumperCreator;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
-import 
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumper;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
+import 
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreator;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
 
 /**
  * Fixture incremental dumper creator.
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureInventoryDumper.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureInventoryDumper.java
index 3da47584451..dfa4d701596 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureInventoryDumper.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureInventoryDumper.java
@@ -19,8 +19,8 @@ package org.apache.shardingsphere.data.pipeline.core.fixture;
 
 import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.AbstractInventoryDumper;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 
 import javax.sql.DataSource;
 import java.sql.Connection;
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureInventoryDumperCreator.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureInventoryDumperCreator.java
index d099e0ab3a9..d41cde2b172 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureInventoryDumperCreator.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/FixtureInventoryDumperCreator.java
@@ -17,15 +17,16 @@
 
 package org.apache.shardingsphere.data.pipeline.core.fixture;
 
+import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.InventoryDumper;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
+import 
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumperCreator;
+
+import javax.sql.DataSource;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import javax.sql.DataSource;
-import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumperConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumperCreator;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
-import 
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumper;
 
 /**
  * Fixture iInventory dumper creator.
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineTableMetaDataLoaderTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/StandardPipelineTableMetaDataLoaderTest.java
similarity index 91%
rename from 
shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineTableMetaDataLoaderTest.java
rename to 
shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/StandardPipelineTableMetaDataLoaderTest.java
index 3c59c49838e..a6223c54031 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/PipelineTableMetaDataLoaderTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/StandardPipelineTableMetaDataLoaderTest.java
@@ -18,9 +18,10 @@
 package org.apache.shardingsphere.data.pipeline.core.metadata.loader;
 
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
-import 
org.apache.shardingsphere.data.pipeline.api.metadata.PipelineColumnMetaData;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineIndexMetaData;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineIndexMetaData;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
 import org.apache.shardingsphere.infra.database.type.dialect.H2DatabaseType;
 import org.apache.shardingsphere.test.mock.MockedDataSource;
 import org.junit.Before;
@@ -43,7 +44,7 @@ import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
 // TODO use H2 to do real test
-public final class PipelineTableMetaDataLoaderTest {
+public final class StandardPipelineTableMetaDataLoaderTest {
     
     private static final String TEST_CATALOG = "catalog";
     
@@ -115,7 +116,7 @@ public final class PipelineTableMetaDataLoaderTest {
     
     @Test
     public void assertGetTableMetaData() {
-        PipelineTableMetaDataLoader metaDataLoader = new 
PipelineTableMetaDataLoader(dataSource);
+        PipelineTableMetaDataLoader metaDataLoader = new 
StandardPipelineTableMetaDataLoader(dataSource);
         PipelineTableMetaData tableMetaData = 
metaDataLoader.getTableMetaData(null, TEST_TABLE);
         assertColumnMetaData(tableMetaData);
         assertPrimaryKeys(tableMetaData.getPrimaryKeyColumns());
@@ -151,6 +152,6 @@ public final class PipelineTableMetaDataLoaderTest {
     @Test(expected = RuntimeException.class)
     public void assertGetTableMetaDataFailure() throws SQLException {
         when(dataSource.getConnection()).thenThrow(new SQLException(""));
-        new PipelineTableMetaDataLoader(dataSource).getTableMetaData(null, 
TEST_TABLE);
+        new 
StandardPipelineTableMetaDataLoader(dataSource).getTableMetaData(null, 
TEST_TABLE);
     }
 }
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaDataTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaDataTest.java
index 53638b69e49..59b1449474f 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaDataTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaDataTest.java
@@ -17,7 +17,8 @@
 
 package org.apache.shardingsphere.data.pipeline.core.metadata.model;
 
-import 
org.apache.shardingsphere.data.pipeline.api.metadata.PipelineColumnMetaData;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
 import org.junit.Before;
 import org.junit.Test;
 
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
index e5818fb9667..c0097e72f04 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/IncrementalTaskTest.java
@@ -20,9 +20,10 @@ package org.apache.shardingsphere.data.pipeline.core.task;
 import org.apache.shardingsphere.data.pipeline.api.config.TaskConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineJobProgressListener;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
 import org.junit.After;
@@ -48,7 +49,7 @@ public final class IncrementalTaskTest {
     public void setUp() {
         TaskConfiguration taskConfig = 
PipelineContextUtil.mockMigrationJobItemContext(JobConfigurationBuilder.createJobConfiguration()).getTaskConfig();
         taskConfig.getDumperConfig().setPosition(new PlaceholderPosition());
-        PipelineTableMetaDataLoader metaDataLoader = new 
PipelineTableMetaDataLoader(mock(PipelineDataSourceWrapper.class));
+        PipelineTableMetaDataLoader metaDataLoader = new 
StandardPipelineTableMetaDataLoader(mock(PipelineDataSourceWrapper.class));
         incrementalTask = new IncrementalTask(3, taskConfig.getDumperConfig(), 
taskConfig.getImporterConfig(),
                 PipelineContextUtil.getPipelineChannelCreator(),
                 new DefaultPipelineDataSourceManager(), metaDataLoader, 
PipelineContextUtil.getExecuteEngine(), new 
FixturePipelineJobProgressListener());
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
index d1a6d3b28e2..03096ef1cad 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/task/InventoryTaskTest.java
@@ -23,10 +23,11 @@ import 
org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumper
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IntegerPrimaryKeyPosition;
+import 
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.fixture.FixturePipelineJobProgressListener;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.exception.IngestException;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.core.util.JobConfigurationBuilder;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineContextUtil;
 import org.junit.AfterClass;
@@ -67,7 +68,7 @@ public final class InventoryTaskTest {
     public void assertStartWithGetEstimatedRowsFailure() {
         InventoryDumperConfiguration inventoryDumperConfig = createInventoryDumperConfiguration("t_non_exist", "t_non_exist");
         PipelineDataSourceWrapper dataSource = DATA_SOURCE_MANAGER.getDataSource(inventoryDumperConfig.getDataSourceConfig());
-        PipelineTableMetaDataLoader metaDataLoader = new PipelineTableMetaDataLoader(dataSource);
+        PipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(dataSource);
         try (
                 InventoryTask inventoryTask = new InventoryTask(inventoryDumperConfig, taskConfig.getImporterConfig(),
                         PipelineContextUtil.getPipelineChannelCreator(),
@@ -82,7 +83,7 @@ public final class InventoryTaskTest {
         // TODO use t_order_0, and also others
         InventoryDumperConfiguration inventoryDumperConfig = createInventoryDumperConfiguration("t_order", "t_order");
         PipelineDataSourceWrapper dataSource = DATA_SOURCE_MANAGER.getDataSource(inventoryDumperConfig.getDataSourceConfig());
-        PipelineTableMetaDataLoader metaDataLoader = new PipelineTableMetaDataLoader(dataSource);
+        PipelineTableMetaDataLoader metaDataLoader = new StandardPipelineTableMetaDataLoader(dataSource);
         try (
                 InventoryTask inventoryTask = new InventoryTask(inventoryDumperConfig, taskConfig.getImporterConfig(),
                         PipelineContextUtil.getPipelineChannelCreator(),
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.ingest.dumper.IncrementalDumperCreator b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreator
similarity index 100%
rename from shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.ingest.dumper.IncrementalDumperCreator
rename to shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreator
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumperCreator b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumperCreator
similarity index 100%
rename from shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.ingest.dumper.InventoryDumperCreator
rename to shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumperCreator

Reply via email to