This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f9847d0ccd Refactor SPI factory and unit test for scaling (#20860)
3f9847d0ccd is described below

commit 3f9847d0ccd0d6fe80ddeb77b98a16fe7c05aad1
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Thu Sep 8 08:34:40 2022 +0800

    Refactor SPI factory and unit test for scaling (#20860)
---
 .../spi/ingest/position/PositionInitializer.java   |   3 +-
 .../position/PositionInitializerFactory.java       |   6 +-
 .../spi/sqlbuilder/PipelineSQLBuilder.java         |   3 +-
 .../position/DefaultPositionInitializer.java}      |  11 +-
 ...YamlJobItemIncrementalTasksProgressSwapper.java |   5 +-
 .../StandardPipelineTableMetaDataLoader.java       |   2 +-
 .../core/prepare/PipelineJobPreparerUtils.java     |   2 +-
 .../core/sqlbuilder/DefaultPipelineSQLBuilder.java |   5 +
 .../core/sqlbuilder/PipelineSQLBuilderFactory.java |   6 +-
 ...eline.spi.ingest.dumper.InventoryDumperCreator} |   2 +-
 ...peline.spi.ingest.position.PositionInitializer} |   2 +-
 ...data.pipeline.spi.sqlbuilder.PipelineSQLBuilder |   1 +
 .../FixturePipelineDataSourceCreator.java          |   4 +-
 .../core/fixture/MigrationJobAPIFixture.java       | 182 ---------------------
 ...re.datasource.creator.PipelineDataSourceCreator |   2 +-
 ...peline.spi.ingest.dumper.InventoryDumperCreator |   1 -
 .../data/pipeline/api/PipelineAPIFactoryTest.java  |  12 +-
 .../api/PipelineJobPublicAPIFactoryTest.java       |  10 +-
 .../PipelineDataSourceCreatorFactoryTest.java      |  18 +-
 .../dumper/InventoryDumperCreatorFactoryTest.java  |   7 +
 .../sqlbuilder/PipelineSQLBuilderFactoryTest.java  |  47 ++++++
 .../migration}/MigrationJobAPIFactoryTest.java     |   6 +-
 .../position/PositionInitializerFactoryTest.java   |  20 ++-
 23 files changed, 132 insertions(+), 225 deletions(-)

diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/position/PositionInitializer.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/position/PositionInitializer.java
index 9ec5cf0d1c9..5febb5ecced 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/position/PositionInitializer.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/position/PositionInitializer.java
@@ -19,6 +19,7 @@ package 
org.apache.shardingsphere.data.pipeline.spi.ingest.position;
 
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import org.apache.shardingsphere.infra.util.spi.annotation.SingletonSPI;
+import org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPI;
 import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
 
 import javax.sql.DataSource;
@@ -28,7 +29,7 @@ import java.sql.SQLException;
  * Position initializer.
  */
 @SingletonSPI
-public interface PositionInitializer extends TypedSPI {
+public interface PositionInitializer extends TypedSPI, RequiredSPI {
     
     /**
      * Init position by data source.
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/PositionInitializerFactory.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/position/PositionInitializerFactory.java
similarity index 81%
rename from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/PositionInitializerFactory.java
rename to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/position/PositionInitializerFactory.java
index 526db00a750..9f7afb41881 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/PositionInitializerFactory.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/position/PositionInitializerFactory.java
@@ -15,12 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.ingest.position;
+package org.apache.shardingsphere.data.pipeline.spi.ingest.position;
 
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
-import 
org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializer;
 import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
+import 
org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPIRegistry;
 import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPIRegistry;
 
 /**
@@ -40,6 +40,6 @@ public final class PositionInitializerFactory {
      * @return got instance
      */
     public static PositionInitializer getInstance(final String databaseType) {
-        return 
TypedSPIRegistry.getRegisteredService(PositionInitializer.class, databaseType);
+        return 
TypedSPIRegistry.findRegisteredService(PositionInitializer.class, 
databaseType).orElseGet(() -> 
RequiredSPIRegistry.getRegisteredService(PositionInitializer.class));
     }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
index a96f009bace..9f13819ca95 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sqlbuilder/PipelineSQLBuilder.java
@@ -20,6 +20,7 @@ package 
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
+import org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPI;
 import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
 
 import java.util.Collection;
@@ -31,7 +32,7 @@ import java.util.Set;
 /**
  * Pipeline SQL builder.
  */
-public interface PipelineSQLBuilder extends TypedSPI {
+public interface PipelineSQLBuilder extends TypedSPI, RequiredSPI {
     
     /**
      * Build create schema SQL.
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/fixture/FixturePositionInitializer.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/DefaultPositionInitializer.java
similarity index 89%
rename from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/fixture/FixturePositionInitializer.java
rename to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/DefaultPositionInitializer.java
index 0f423a77a86..8c2a667c698 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/fixture/FixturePositionInitializer.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/DefaultPositionInitializer.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.ingest.position.fixture;
+package org.apache.shardingsphere.data.pipeline.core.ingest.position;
 
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import 
org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializer;
@@ -23,7 +23,10 @@ import 
org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionIniti
 import javax.sql.DataSource;
 import java.sql.SQLException;
 
-public final class FixturePositionInitializer implements PositionInitializer {
+/**
+ * Default position initializer.
+ */
+public final class DefaultPositionInitializer implements PositionInitializer {
     
     @Override
     public IngestPosition<?> init(final DataSource dataSource, final String 
slotNameSuffix) throws SQLException {
@@ -36,7 +39,7 @@ public final class FixturePositionInitializer implements 
PositionInitializer {
     }
     
     @Override
-    public String getType() {
-        return "FIXTURE";
+    public boolean isDefault() {
+        return true;
     }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemIncrementalTasksProgressSwapper.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemIncrementalTasksProgressSwapper.java
index b78115699a3..2452490c192 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemIncrementalTasksProgressSwapper.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemIncrementalTasksProgressSwapper.java
@@ -17,10 +17,11 @@
 
 package org.apache.shardingsphere.data.pipeline.core.job.progress.yaml;
 
-import java.util.Collections;
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress;
 import 
org.apache.shardingsphere.data.pipeline.api.task.progress.IncrementalTaskProgress;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.PositionInitializerFactory;
+import 
org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializerFactory;
+
+import java.util.Collections;
 
 /**
  * YAML job item incremental tasks progress swapper.
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/StandardPipelineTableMetaDataLoader.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/StandardPipelineTableMetaDataLoader.java
index dcc6ed43e6a..aadbcce129c 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/StandardPipelineTableMetaDataLoader.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/StandardPipelineTableMetaDataLoader.java
@@ -62,7 +62,7 @@ public final class StandardPipelineTableMetaDataLoader 
implements PipelineTableM
         try {
             loadTableMetaData(schemaName, tableName);
         } catch (final SQLException ex) {
-            throw new RuntimeException(String.format("Load metadata for table 
'%s' failed", tableName), ex);
+            throw new RuntimeException(String.format("Load metadata for schema 
'%s' and table '%s' failed", schemaName, tableName), ex);
         }
         result = tableMetaDataMap.get(new TableName(tableName));
         if (null == result) {
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
index 0a080946ad9..99087c7358e 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/PipelineJobPreparerUtils.java
@@ -29,13 +29,13 @@ import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPositio
 import 
org.apache.shardingsphere.data.pipeline.api.job.progress.JobItemIncrementalTasksProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.check.datasource.DataSourceCheckerFactory;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.PositionInitializerFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.prepare.datasource.DataSourcePreparer;
 import 
org.apache.shardingsphere.data.pipeline.core.prepare.datasource.DataSourcePreparerFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetSchemasParameter;
 import 
org.apache.shardingsphere.data.pipeline.core.prepare.datasource.PrepareTargetTablesParameter;
 import 
org.apache.shardingsphere.data.pipeline.spi.check.datasource.DataSourceChecker;
 import 
org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializer;
+import 
org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializerFactory;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import 
org.apache.shardingsphere.infra.datasource.pool.creator.DataSourcePoolCreator;
 import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/DefaultPipelineSQLBuilder.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/DefaultPipelineSQLBuilder.java
index 6a00c558493..88d0089df31 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/DefaultPipelineSQLBuilder.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/DefaultPipelineSQLBuilder.java
@@ -31,4 +31,9 @@ public final class DefaultPipelineSQLBuilder extends 
AbstractPipelineSQLBuilder
     public String getRightIdentifierQuoteString() {
         return "";
     }
+    
+    @Override
+    public boolean isDefault() {
+        return true;
+    }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineSQLBuilderFactory.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineSQLBuilderFactory.java
index c3951d0d9be..db2ad04f18c 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineSQLBuilderFactory.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineSQLBuilderFactory.java
@@ -21,10 +21,9 @@ import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import 
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
 import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
+import 
org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPIRegistry;
 import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPIRegistry;
 
-import java.util.Optional;
-
 /**
  * Pipeline SQL builder factory.
  */
@@ -42,7 +41,6 @@ public final class PipelineSQLBuilderFactory {
      * @return got instance
      */
     public static PipelineSQLBuilder getInstance(final String databaseType) {
-        Optional<PipelineSQLBuilder> result = 
TypedSPIRegistry.findRegisteredService(PipelineSQLBuilder.class, databaseType, 
null);
-        return result.orElseGet(DefaultPipelineSQLBuilder::new);
+        return 
TypedSPIRegistry.findRegisteredService(PipelineSQLBuilder.class, databaseType, 
null).orElseGet(() -> 
RequiredSPIRegistry.getRegisteredService(PipelineSQLBuilder.class));
     }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializer
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumperCreator
similarity index 89%
rename from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializer
rename to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumperCreator
index 15029e72b0b..03a43360d78 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializer
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumperCreator
@@ -15,4 +15,4 @@
 # limitations under the License.
 #
 
-org.apache.shardingsphere.data.pipeline.core.ingest.position.fixture.FixturePositionInitializer
+org.apache.shardingsphere.data.pipeline.core.ingest.dumper.DefaultInventoryDumperCreator
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPI
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializer
similarity index 89%
rename from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPI
rename to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializer
index d4f8cbc7c86..7cc67f99c2e 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPI
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializer
@@ -15,4 +15,4 @@
 # limitations under the License.
 #
 
-org.apache.shardingsphere.data.pipeline.core.fixture.MigrationJobAPIFixture
+org.apache.shardingsphere.data.pipeline.core.ingest.position.DefaultPositionInitializer
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder
index 918e0b10c24..ea00b5cb926 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder
@@ -16,3 +16,4 @@
 #
 
 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.OraclePipelineSQLBuilder
+org.apache.shardingsphere.data.pipeline.core.sqlbuilder.DefaultPipelineSQLBuilder
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/creator/fixture/FixturePipelineDataSourceCreator.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/creator/FixturePipelineDataSourceCreator.java
similarity index 91%
rename from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/creator/fixture/FixturePipelineDataSourceCreator.java
rename to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/creator/FixturePipelineDataSourceCreator.java
index 19eafd5a5c6..aa2d13cd13e 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/creator/fixture/FixturePipelineDataSourceCreator.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/creator/FixturePipelineDataSourceCreator.java
@@ -15,9 +15,7 @@
  * limitations under the License.
  */
 
-package 
org.apache.shardingsphere.data.pipeline.core.datasource.creator.fixture;
-
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.creator.PipelineDataSourceCreator;
+package org.apache.shardingsphere.data.pipeline.core.datasource.creator;
 
 import javax.sql.DataSource;
 import java.sql.SQLException;
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/MigrationJobAPIFixture.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/MigrationJobAPIFixture.java
deleted file mode 100644
index 9608049f057..00000000000
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/fixture/MigrationJobAPIFixture.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.core.fixture;
-
-import 
org.apache.shardingsphere.data.pipeline.api.check.consistency.DataConsistencyCheckResult;
-import 
org.apache.shardingsphere.data.pipeline.api.config.job.MigrationJobConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
-import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
-import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
-import 
org.apache.shardingsphere.data.pipeline.api.job.progress.InventoryIncrementalJobItemProgress;
-import 
org.apache.shardingsphere.data.pipeline.api.pojo.CreateMigrationJobParameter;
-import 
org.apache.shardingsphere.data.pipeline.api.pojo.DataConsistencyCheckAlgorithmInfo;
-import org.apache.shardingsphere.data.pipeline.api.pojo.MigrationJobInfo;
-import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPI;
-import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationProcessContext;
-import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationTaskConfiguration;
-import 
org.apache.shardingsphere.infra.config.rule.data.pipeline.PipelineProcessConfiguration;
-import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-
-public final class MigrationJobAPIFixture implements MigrationJobAPI {
-    
-    @Override
-    public String marshalJobId(final PipelineJobId pipelineJobId) {
-        return null;
-    }
-    
-    @Override
-    public void extendYamlJobConfiguration(final YamlPipelineJobConfiguration 
yamlJobConfig) {
-    }
-    
-    @Override
-    public void createProcessConfiguration(final PipelineProcessConfiguration 
processConfig) {
-    }
-    
-    @Override
-    public void alterProcessConfiguration(final PipelineProcessConfiguration 
processConfig) {
-    }
-    
-    @Override
-    public void dropProcessConfiguration(final String confPath) {
-    }
-    
-    @Override
-    public PipelineProcessConfiguration showProcessConfiguration() {
-        return null;
-    }
-    
-    @Override
-    public void startDisabledJob(final String jobId) {
-    }
-    
-    @Override
-    public void stop(final String jobId) {
-    }
-    
-    @Override
-    public void rollback(final String jobId) {
-    }
-    
-    @Override
-    public List<MigrationJobInfo> list() {
-        return null;
-    }
-    
-    @Override
-    public Optional<String> start(final PipelineJobConfiguration jobConfig) {
-        return Optional.empty();
-    }
-    
-    @Override
-    public Map<Integer, InventoryIncrementalJobItemProgress> 
getJobProgress(final String jobId) {
-        return null;
-    }
-    
-    @Override
-    public Map<Integer, InventoryIncrementalJobItemProgress> 
getJobProgress(final MigrationJobConfiguration jobConfig) {
-        return null;
-    }
-    
-    @Override
-    public Collection<DataConsistencyCheckAlgorithmInfo> 
listDataConsistencyCheckAlgorithms() {
-        return null;
-    }
-    
-    @Override
-    public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final 
String jobId) {
-        return null;
-    }
-    
-    @Override
-    public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final 
MigrationJobConfiguration jobConfig) {
-        return null;
-    }
-    
-    @Override
-    public Map<String, DataConsistencyCheckResult> dataConsistencyCheck(final 
String jobId, final String algorithmType, final Properties algorithmProps) {
-        return null;
-    }
-    
-    @Override
-    public boolean aggregateDataConsistencyCheckResults(final String jobId, 
final Map<String, DataConsistencyCheckResult> checkResults) {
-        return false;
-    }
-    
-    @Override
-    public void commit(final String jobId) {
-    }
-    
-    @Override
-    public void addMigrationSourceResources(final Map<String, 
DataSourceProperties> dataSourcePropsMap) {
-    }
-    
-    @Override
-    public void dropMigrationSourceResources(final Collection<String> 
resourceNames) {
-    }
-    
-    @Override
-    public Collection<Collection<Object>> listMigrationSourceResources() {
-        return null;
-    }
-    
-    @Override
-    public String createJobAndStart(final CreateMigrationJobParameter 
parameter) {
-        return null;
-    }
-    
-    @Override
-    public MigrationJobConfiguration getJobConfiguration(final String jobId) {
-        return null;
-    }
-    
-    @Override
-    public MigrationTaskConfiguration buildTaskConfiguration(final 
PipelineJobConfiguration jobConfig, final int jobShardingItem, final 
PipelineProcessConfiguration pipelineProcessConfig) {
-        return null;
-    }
-    
-    @Override
-    public MigrationProcessContext buildPipelineProcessContext(final 
PipelineJobConfiguration pipelineJobConfig) {
-        return null;
-    }
-    
-    @Override
-    public boolean isDefault() {
-        return MigrationJobAPI.super.isDefault();
-    }
-    
-    @Override
-    public void persistJobItemProgress(final PipelineJobItemContext 
jobItemContext) {
-    }
-    
-    @Override
-    public InventoryIncrementalJobItemProgress getJobItemProgress(final String 
jobId, final int shardingItem) {
-        return null;
-    }
-    
-    @Override
-    public void updateJobItemStatus(final String jobId, final int 
shardingItem, final JobStatus status) {
-    }
-}
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.datasource.creator.PipelineDataSourceCreator
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.datasource.creator.PipelineDataSourceCreator
index a579961f541..cdf64e9dcf7 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.datasource.creator.PipelineDataSourceCreator
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.datasource.creator.PipelineDataSourceCreator
@@ -15,4 +15,4 @@
 # limitations under the License.
 #
 
-org.apache.shardingsphere.data.pipeline.core.datasource.creator.fixture.FixturePipelineDataSourceCreator
+org.apache.shardingsphere.data.pipeline.core.datasource.creator.FixturePipelineDataSourceCreator
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumperCreator
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumperCreator
index 031f57d19b6..b7d86e9a51b 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumperCreator
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-dialect/shardingsphere-data-pipeline-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.InventoryDumperCreator
@@ -16,4 +16,3 @@
 #
 
 org.apache.shardingsphere.data.pipeline.mysql.MySqlInventoryDumperCreator
-
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineSQLBuilderFactoryTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineAPIFactoryTest.java
similarity index 66%
rename from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineSQLBuilderFactoryTest.java
rename to 
shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineAPIFactoryTest.java
index 53c18567c86..b0a6c4d42bc 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineSQLBuilderFactoryTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineAPIFactoryTest.java
@@ -15,18 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.sqlbuilder;
+package org.apache.shardingsphere.data.pipeline.api;
 
-import 
org.apache.shardingsphere.data.pipeline.core.check.consistency.algorithm.fixture.FixturePipelineSQLBuilder;
+import org.apache.shardingsphere.data.pipeline.api.job.JobType;
+import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
+import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIImpl;
 import org.junit.Test;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.junit.Assert.assertThat;
 
-public final class PipelineSQLBuilderFactoryTest {
+public final class PipelineAPIFactoryTest {
     
     @Test
-    public void assertGetInstance() {
-        assertThat(PipelineSQLBuilderFactory.getInstance("FIXTURE"), 
instanceOf(FixturePipelineSQLBuilder.class));
+    public void assertGetPipelineJobAPI() {
+        assertThat(PipelineAPIFactory.getPipelineJobAPI(JobType.MIGRATION), 
instanceOf(MigrationJobAPIImpl.class));
     }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/PositionInitializerFactoryTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactoryTest.java
similarity index 71%
copy from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/PositionInitializerFactoryTest.java
copy to 
shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactoryTest.java
index ff70bc5e641..1060131fd4e 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/PositionInitializerFactoryTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/PipelineJobPublicAPIFactoryTest.java
@@ -15,18 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.ingest.position;
+package org.apache.shardingsphere.data.pipeline.api;
 
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.fixture.FixturePositionInitializer;
+import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIImpl;
 import org.junit.Test;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.junit.Assert.assertThat;
 
-public final class PositionInitializerFactoryTest {
+public final class PipelineJobPublicAPIFactoryTest {
     
     @Test
-    public void assertGetInstance() {
-        assertThat(PositionInitializerFactory.getInstance("FIXTURE"), 
instanceOf(FixturePositionInitializer.class));
+    public void assertGetMigrationJobPublicAPI() {
+        assertThat(PipelineJobPublicAPIFactory.getMigrationJobPublicAPI(), 
instanceOf(MigrationJobAPIImpl.class));
     }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/creator/PipelineDataSourceCreatorFactoryTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/creator/PipelineDataSourceCreatorFactoryTest.java
similarity index 51%
rename from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/creator/PipelineDataSourceCreatorFactoryTest.java
rename to 
shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/creator/PipelineDataSourceCreatorFactoryTest.java
index b62f797f432..312a665e1ef 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/creator/PipelineDataSourceCreatorFactoryTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/datasource/creator/PipelineDataSourceCreatorFactoryTest.java
@@ -17,9 +17,16 @@
 
 package org.apache.shardingsphere.data.pipeline.core.datasource.creator;
 
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.creator.fixture.FixturePipelineDataSourceCreator;
+import org.apache.commons.lang3.tuple.Pair;
+import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.creator.impl.StandardPipelineDataSourceCreator;
+import 
org.apache.shardingsphere.driver.data.pipeline.datasource.creator.ShardingSpherePipelineDataSourceCreator;
 import org.junit.Test;
 
+import java.util.Arrays;
+import java.util.Collection;
+
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.junit.Assert.assertThat;
 
@@ -27,6 +34,13 @@ public final class PipelineDataSourceCreatorFactoryTest {
     
     @Test
     public void assertGetInstance() {
-        assertThat(PipelineDataSourceCreatorFactory.getInstance("FIXTURE"), 
instanceOf(FixturePipelineDataSourceCreator.class));
+        Collection<Pair<String, Class<? extends PipelineDataSourceCreator>>> 
paramResult = Arrays.asList(
+                Pair.of(StandardPipelineDataSourceConfiguration.TYPE, 
StandardPipelineDataSourceCreator.class),
+                Pair.of(ShardingSpherePipelineDataSourceConfiguration.TYPE, 
ShardingSpherePipelineDataSourceCreator.class)
+        );
+        for (Pair<String, Class<? extends PipelineDataSourceCreator>> each : 
paramResult) {
+            PipelineDataSourceCreator actual = 
PipelineDataSourceCreatorFactory.getInstance(each.getKey());
+            assertThat(actual, instanceOf(each.getValue()));
+        }
     }
 }
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumperCreatorFactoryTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumperCreatorFactoryTest.java
index d8bcbf36379..e0d7335a9d4 100644
--- 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumperCreatorFactoryTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumperCreatorFactoryTest.java
@@ -24,6 +24,7 @@ import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.Standa
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.InventoryDumper;
 import 
org.apache.shardingsphere.data.pipeline.core.fixture.FixtureInventoryDumper;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.channel.memory.SimpleMemoryPipelineChannel;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.DefaultInventoryDumper;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.MySQLInventoryDumper;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLInventoryDumper;
@@ -53,6 +54,12 @@ public final class InventoryDumperCreatorFactoryTest {
         assertThat(actual, instanceOf(PostgreSQLInventoryDumper.class));
     }
     
+    @Test
+    public void assertInventoryDumperCreatorForOracle() {
+        InventoryDumper actual = createInventoryDumper("Oracle");
+        assertThat(actual, instanceOf(DefaultInventoryDumper.class));
+    }
+    
     @Test
     public void assertInventoryDumperCreatorForFixture() {
         InventoryDumper actual = createInventoryDumper("Fixture");
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineSQLBuilderFactoryTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineSQLBuilderFactoryTest.java
new file mode 100644
index 00000000000..c17297f9c47
--- /dev/null
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/PipelineSQLBuilderFactoryTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.sqlbuilder;
+
+import org.apache.commons.lang3.tuple.Pair;
+import 
org.apache.shardingsphere.data.pipeline.mysql.sqlbuilder.MySQLPipelineSQLBuilder;
+import 
org.apache.shardingsphere.data.pipeline.opengauss.sqlbuilder.OpenGaussPipelineSQLBuilder;
+import 
org.apache.shardingsphere.data.pipeline.postgresql.sqlbuilder.PostgreSQLPipelineSQLBuilder;
+import 
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertThat;
+
+public final class PipelineSQLBuilderFactoryTest {
+    
+    @Test
+    public void assertGetInstance() {
+        Collection<Pair<String, Class<? extends PipelineSQLBuilder>>> 
paramResult = Arrays.asList(
+                Pair.of("MySQL", MySQLPipelineSQLBuilder.class), 
Pair.of("PostgreSQL", PostgreSQLPipelineSQLBuilder.class),
+                Pair.of("openGauss", OpenGaussPipelineSQLBuilder.class), 
Pair.of("Oracle", OraclePipelineSQLBuilder.class),
+                Pair.of("DB2", DefaultPipelineSQLBuilder.class)
+        );
+        for (Pair<String, Class<? extends PipelineSQLBuilder>> each : 
paramResult) {
+            PipelineSQLBuilder actual = 
PipelineSQLBuilderFactory.getInstance(each.getKey());
+            assertThat(actual, instanceOf(each.getValue()));
+        }
+    }
+}
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/MigrationJobAPIFactoryTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIFactoryTest.java
similarity index 80%
rename from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/MigrationJobAPIFactoryTest.java
rename to 
shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIFactoryTest.java
index 06ca59cfd4f..a8d7bf9a88d 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/MigrationJobAPIFactoryTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIFactoryTest.java
@@ -15,10 +15,8 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core;
+package org.apache.shardingsphere.data.pipeline.scenario.migration;
 
-import 
org.apache.shardingsphere.data.pipeline.core.fixture.MigrationJobAPIFixture;
-import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIFactory;
 import org.junit.Test;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
@@ -28,6 +26,6 @@ public final class MigrationJobAPIFactoryTest {
     
     @Test
     public void assertGetInstance() {
-        assertThat(MigrationJobAPIFactory.getInstance(), 
instanceOf(MigrationJobAPIFixture.class));
+        assertThat(MigrationJobAPIFactory.getInstance(), 
instanceOf(MigrationJobAPIImpl.class));
     }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/PositionInitializerFactoryTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/spi/ingest/position/PositionInitializerFactoryTest.java
similarity index 50%
rename from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/PositionInitializerFactoryTest.java
rename to 
shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/spi/ingest/position/PositionInitializerFactoryTest.java
index ff70bc5e641..fa76fd61d62 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/test/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/PositionInitializerFactoryTest.java
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/spi/ingest/position/PositionInitializerFactoryTest.java
@@ -15,11 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.ingest.position;
+package org.apache.shardingsphere.data.pipeline.spi.ingest.position;
 
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.fixture.FixturePositionInitializer;
+import org.apache.commons.lang3.tuple.Pair;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.DefaultPositionInitializer;
+import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.MySQLPositionInitializer;
+import 
org.apache.shardingsphere.data.pipeline.opengauss.ingest.OpenGaussPositionInitializer;
+import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLPositionInitializer;
 import org.junit.Test;
 
+import java.util.Arrays;
+import java.util.Collection;
+
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.junit.Assert.assertThat;
 
@@ -27,6 +34,13 @@ public final class PositionInitializerFactoryTest {
     
     @Test
     public void assertGetInstance() {
-        assertThat(PositionInitializerFactory.getInstance("FIXTURE"), 
instanceOf(FixturePositionInitializer.class));
+        Collection<Pair<String, Class<? extends PositionInitializer>>> 
paramResult = Arrays.asList(
+                Pair.of("MySQL", MySQLPositionInitializer.class), 
Pair.of("PostgreSQL", PostgreSQLPositionInitializer.class),
+                Pair.of("openGauss", OpenGaussPositionInitializer.class), 
Pair.of("Oracle", DefaultPositionInitializer.class)
+        );
+        for (Pair<String, Class<? extends PositionInitializer>> each : 
paramResult) {
+            PositionInitializer actual = 
PositionInitializerFactory.getInstance(each.getKey());
+            assertThat(actual, instanceOf(each.getValue()));
+        }
     }
 }

Reply via email to