This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new b95c4fc9d4b Refactor DatabaseTypedSPILoader (#26842)
b95c4fc9d4b is described below

commit b95c4fc9d4b3ee6cc1be7a350f465fd1e074d079
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Jul 8 19:51:14 2023 +0800

    Refactor DatabaseTypedSPILoader (#26842)
---
 .../infra/spi/DatabaseTypedSPILoader.java          | 20 +++----
 .../infra/spi/DatabaseTypedSPILoaderTest.java      | 55 +++++++++++++++++++
 .../infra/spi/fixture/DatabaseTypedSPIFixture.java | 23 ++++++++
 .../fixture/impl/FooDatabaseTypedSPIFixture.java   | 28 ++++++++++
 ...phere.infra.spi.fixture.DatabaseTypedSPIFixture | 18 +++++++
 ...rdingSpherePipelineDataSourceConfiguration.java |  4 +-
 .../StandardPipelineDataSourceConfiguration.java   |  4 +-
 ...YamlJobItemIncrementalTasksProgressSwapper.java |  4 +-
 .../metadata/generator/PipelineDDLGenerator.java   |  4 +-
 ...RC32MatchDataConsistencyCalculateAlgorithm.java |  4 +-
 ...DataMatchDataConsistencyCalculateAlgorithm.java |  6 +--
 .../data/pipeline/core/dumper/InventoryDumper.java |  6 +--
 .../core/importer/sink/PipelineDataSourceSink.java |  4 +-
 .../preparer/InventoryRecordsCountCalculator.java  |  4 +-
 .../core/preparer/InventoryTaskSplitter.java       |  4 +-
 .../core/preparer/PipelineJobPreparerUtils.java    | 16 +++---
 .../datasource/AbstractDataSourcePreparer.java     |  4 +-
 .../checker/AbstractDataSourceChecker.java         |  4 +-
 .../pipeline/cdc/core/prepare/CDCJobPreparer.java  |  4 +-
 .../migration/api/impl/MigrationJobAPI.java        |  4 +-
 .../migration/prepare/MigrationJobPreparer.java    |  6 +--
 .../core/util/spi/PipelineTypedSPILoaderTest.java  | 62 ----------------------
 22 files changed, 175 insertions(+), 113 deletions(-)

diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/util/spi/PipelineTypedSPILoader.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/spi/DatabaseTypedSPILoader.java
similarity index 76%
rename from kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/util/spi/PipelineTypedSPILoader.java
rename to infra/common/src/main/java/org/apache/shardingsphere/infra/spi/DatabaseTypedSPILoader.java
index 9be5e7c1251..088bb0dbd7c 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/util/spi/PipelineTypedSPILoader.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/spi/DatabaseTypedSPILoader.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.util.spi;
+package org.apache.shardingsphere.infra.spi;
 
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
@@ -28,20 +28,20 @@ import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
 import java.util.Optional;
 
 /**
- * Pipeline typed SPI loader.
+ * Database typed SPI loader.
  */
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class PipelineTypedSPILoader {
+public final class DatabaseTypedSPILoader {
     
     /**
-     * Find database typed service.
+     * Find service.
      *
      * @param spiClass typed SPI class
      * @param databaseType database type
      * @param <T> SPI class type
-     * @return service
+     * @return found service
      */
-    public static <T extends TypedSPI> Optional<T> findDatabaseTypedService(final Class<T> spiClass, final String databaseType) {
+    public static <T extends TypedSPI> Optional<T> findService(final Class<T> spiClass, final String databaseType) {
         Optional<T> result = TypedSPILoader.findService(spiClass, databaseType);
         if (result.isPresent()) {
             return result;
@@ -54,14 +54,14 @@ public final class PipelineTypedSPILoader {
     }
     
     /**
-     * Get database typed service.
+     * Get service.
      *
      * @param spiClass typed SPI class
      * @param databaseType database type
      * @param <T> SPI class type
-     * @return service
+     * @return found service
      */
-    public static <T extends TypedSPI> T getDatabaseTypedService(final Class<T> spiClass, final String databaseType) {
-        return findDatabaseTypedService(spiClass, databaseType).orElseThrow(() -> new ServiceProviderNotFoundServerException(spiClass));
+    public static <T extends TypedSPI> T getService(final Class<T> spiClass, final String databaseType) {
+        return findService(spiClass, databaseType).orElseThrow(() -> new ServiceProviderNotFoundServerException(spiClass));
     }
 }
diff --git a/infra/common/src/test/java/org/apache/shardingsphere/infra/spi/DatabaseTypedSPILoaderTest.java b/infra/common/src/test/java/org/apache/shardingsphere/infra/spi/DatabaseTypedSPILoaderTest.java
new file mode 100644
index 00000000000..b59f86b2841
--- /dev/null
+++ b/infra/common/src/test/java/org/apache/shardingsphere/infra/spi/DatabaseTypedSPILoaderTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.spi;
+
+import org.apache.shardingsphere.infra.spi.fixture.DatabaseTypedSPIFixture;
+import org.apache.shardingsphere.infra.util.spi.exception.ServiceProviderNotFoundServerException;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class DatabaseTypedSPILoaderTest {
+    
+    @Test
+    void assertFindServiceWithTrunkDatabaseType() {
+        assertTrue(DatabaseTypedSPILoader.findService(DatabaseTypedSPIFixture.class, "MySQL").isPresent());
+    }
+    
+    @Test
+    void assertFindServiceWithBranchDatabaseType() {
+        assertTrue(DatabaseTypedSPILoader.findService(DatabaseTypedSPIFixture.class, "MariaDB").isPresent());
+    }
+    
+    @Test
+    void assertFindServiceWithUnknownDatabaseType() {
+        assertFalse(DatabaseTypedSPILoader.findService(DatabaseTypedSPIFixture.class, "Unknown").isPresent());
+    }
+    
+    @Test
+    void assertGetExistedService() {
+        assertDoesNotThrow(() -> DatabaseTypedSPILoader.getService(DatabaseTypedSPIFixture.class, "MySQL"));
+    }
+    
+    @Test
+    void assertGetNotExistedService() {
+        assertThrows(ServiceProviderNotFoundServerException.class, () -> DatabaseTypedSPILoader.getService(DatabaseTypedSPIFixture.class, "Unknown"));
+    }
+}
diff --git a/infra/common/src/test/java/org/apache/shardingsphere/infra/spi/fixture/DatabaseTypedSPIFixture.java b/infra/common/src/test/java/org/apache/shardingsphere/infra/spi/fixture/DatabaseTypedSPIFixture.java
new file mode 100644
index 00000000000..eefabb86218
--- /dev/null
+++ b/infra/common/src/test/java/org/apache/shardingsphere/infra/spi/fixture/DatabaseTypedSPIFixture.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.spi.fixture;
+
+import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
+
+public interface DatabaseTypedSPIFixture extends TypedSPI {
+}
diff --git a/infra/common/src/test/java/org/apache/shardingsphere/infra/spi/fixture/impl/FooDatabaseTypedSPIFixture.java b/infra/common/src/test/java/org/apache/shardingsphere/infra/spi/fixture/impl/FooDatabaseTypedSPIFixture.java
new file mode 100644
index 00000000000..c3da9f81ddf
--- /dev/null
+++ b/infra/common/src/test/java/org/apache/shardingsphere/infra/spi/fixture/impl/FooDatabaseTypedSPIFixture.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.spi.fixture.impl;
+
+import org.apache.shardingsphere.infra.spi.fixture.DatabaseTypedSPIFixture;
+
+public final class FooDatabaseTypedSPIFixture implements DatabaseTypedSPIFixture {
+    
+    @Override
+    public String getType() {
+        return "MySQL";
+    }
+}
diff --git a/infra/common/src/test/resources/META-INF/services/org.apache.shardingsphere.infra.spi.fixture.DatabaseTypedSPIFixture b/infra/common/src/test/resources/META-INF/services/org.apache.shardingsphere.infra.spi.fixture.DatabaseTypedSPIFixture
new file mode 100644
index 00000000000..fc816c9799a
--- /dev/null
+++ b/infra/common/src/test/resources/META-INF/services/org.apache.shardingsphere.infra.spi.fixture.DatabaseTypedSPIFixture
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.infra.spi.fixture.impl.FooDatabaseTypedSPIFixture
diff --git 
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfiguration.java
 
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfiguration.java
index f358ec13a22..615b6104163 100644
--- 
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfiguration.java
+++ 
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfiguration.java
@@ -23,7 +23,7 @@ import lombok.Getter;
 import lombok.Setter;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.spi.datasource.JdbcQueryPropertiesExtension;
-import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
+import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
 import org.apache.shardingsphere.infra.database.metadata.url.JdbcUrlAppender;
 import 
org.apache.shardingsphere.infra.database.metadata.url.StandardJdbcUrlParser;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
@@ -88,7 +88,7 @@ public final class 
ShardingSpherePipelineDataSourceConfiguration implements Pipe
     }
     
     private void appendJdbcQueryProperties(final String databaseType) {
-        Optional<JdbcQueryPropertiesExtension> extension = 
PipelineTypedSPILoader.findDatabaseTypedService(JdbcQueryPropertiesExtension.class,
 databaseType);
+        Optional<JdbcQueryPropertiesExtension> extension = 
DatabaseTypedSPILoader.findService(JdbcQueryPropertiesExtension.class, 
databaseType);
         if (!extension.isPresent()) {
             return;
         }
diff --git 
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/StandardPipelineDataSourceConfiguration.java
 
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/StandardPipelineDataSourceConfiguration.java
index 0b4f12a6e14..0567dbc89d9 100644
--- 
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/StandardPipelineDataSourceConfiguration.java
+++ 
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/StandardPipelineDataSourceConfiguration.java
@@ -22,7 +22,7 @@ import lombok.Getter;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlJdbcConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.spi.datasource.JdbcQueryPropertiesExtension;
-import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
+import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
 import org.apache.shardingsphere.infra.database.metadata.url.JdbcUrlAppender;
 import 
org.apache.shardingsphere.infra.database.metadata.url.StandardJdbcUrlParser;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
@@ -99,7 +99,7 @@ public final class StandardPipelineDataSourceConfiguration 
implements PipelineDa
     }
     
     private void appendJdbcQueryProperties(final String databaseType, final 
Map<String, Object> yamlConfig) {
-        Optional<JdbcQueryPropertiesExtension> extension = 
PipelineTypedSPILoader.findDatabaseTypedService(JdbcQueryPropertiesExtension.class,
 databaseType);
+        Optional<JdbcQueryPropertiesExtension> extension = 
DatabaseTypedSPILoader.findService(JdbcQueryPropertiesExtension.class, 
databaseType);
         if (!extension.isPresent()) {
             return;
         }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlJobItemIncrementalTasksProgressSwapper.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlJobItemIncrementalTasksProgressSwapper.java
index 6caeac00923..fb27e9f7628 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlJobItemIncrementalTasksProgressSwapper.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlJobItemIncrementalTasksProgressSwapper.java
@@ -20,7 +20,7 @@ package 
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.JobItemIncrementalTasksProgress;
 import 
org.apache.shardingsphere.data.pipeline.common.task.progress.IncrementalTaskProgress;
 import 
org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializer;
-import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
+import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
 
 /**
  * YAML job item incremental tasks progress swapper.
@@ -59,7 +59,7 @@ public final class YamlJobItemIncrementalTasksProgressSwapper 
{
             return new JobItemIncrementalTasksProgress(null);
         }
         // TODO consider to remove parameter databaseType
-        PositionInitializer positionInitializer = 
PipelineTypedSPILoader.getDatabaseTypedService(PositionInitializer.class, 
databaseType);
+        PositionInitializer positionInitializer = 
DatabaseTypedSPILoader.getService(PositionInitializer.class, databaseType);
         IncrementalTaskProgress taskProgress = new 
IncrementalTaskProgress(positionInitializer.init(yamlProgress.getPosition()));
         taskProgress.setIncrementalTaskDelay(yamlProgress.getDelay());
         return new JobItemIncrementalTasksProgress(taskProgress);
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/generator/PipelineDDLGenerator.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/generator/PipelineDDLGenerator.java
index 5fa32b5ce4d..9580f676687 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/generator/PipelineDDLGenerator.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/generator/PipelineDDLGenerator.java
@@ -20,7 +20,7 @@ package 
org.apache.shardingsphere.data.pipeline.common.metadata.generator;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import 
org.apache.shardingsphere.data.pipeline.spi.ddlgenerator.CreateTableSQLGenerator;
-import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
+import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
 import org.apache.shardingsphere.infra.binder.SQLStatementContextFactory;
 import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
 import 
org.apache.shardingsphere.infra.binder.statement.ddl.AlterTableStatementContext;
@@ -76,7 +76,7 @@ public final class PipelineDDLGenerator {
                                    final String schemaName, final String 
sourceTableName, final String targetTableName, final SQLParserEngine 
parserEngine) throws SQLException {
         long startTimeMillis = System.currentTimeMillis();
         StringBuilder result = new StringBuilder();
-        for (String each : 
PipelineTypedSPILoader.getDatabaseTypedService(CreateTableSQLGenerator.class, 
databaseType.getType()).generate(sourceDataSource, schemaName, 
sourceTableName)) {
+        for (String each : 
DatabaseTypedSPILoader.getService(CreateTableSQLGenerator.class, 
databaseType.getType()).generate(sourceDataSource, schemaName, 
sourceTableName)) {
             Optional<String> queryContext = decorate(databaseType, 
sourceDataSource, schemaName, targetTableName, parserEngine, each);
             queryContext.ifPresent(optional -> 
result.append(optional).append(DELIMITER).append(System.lineSeparator()));
         }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
index 86141cbbe29..f049106641f 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
@@ -26,7 +26,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.Data
 import 
org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.data.UnsupportedCRC32DataConsistencyCalculateAlgorithmException;
 import 
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
-import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
+import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
 import 
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
 import org.apache.shardingsphere.infra.util.spi.annotation.SPIDescription;
@@ -52,7 +52,7 @@ public final class 
CRC32MatchDataConsistencyCalculateAlgorithm extends AbstractD
     
     @Override
     public Iterable<DataConsistencyCalculatedResult> calculate(final 
DataConsistencyCalculateParameter param) {
-        PipelineSQLBuilder sqlBuilder = 
PipelineTypedSPILoader.getDatabaseTypedService(PipelineSQLBuilder.class, 
param.getDatabaseType());
+        PipelineSQLBuilder sqlBuilder = 
DatabaseTypedSPILoader.getService(PipelineSQLBuilder.class, 
param.getDatabaseType());
         List<CalculatedItem> calculatedItems = 
param.getColumnNames().stream().map(each -> calculateCRC32(sqlBuilder, param, 
each)).collect(Collectors.toList());
         return Collections.singletonList(new 
CalculatedResult(calculatedItems.get(0).getRecordsCount(), 
calculatedItems.stream().map(CalculatedItem::getCrc32).collect(Collectors.toList())));
     }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
index 860d891246d..83aa4a547e9 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
@@ -29,7 +29,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLExcepti
 import 
org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
 import 
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader;
 import 
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
-import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
+import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
 import 
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
@@ -91,7 +91,7 @@ public final class DataMatchDataConsistencyCalculateAlgorithm 
extends AbstractSt
         try {
             Collection<Collection<Object>> records = new LinkedList<>();
             Object maxUniqueKeyValue = null;
-            ColumnValueReader columnValueReader = 
PipelineTypedSPILoader.getDatabaseTypedService(ColumnValueReader.class, 
param.getDatabaseType());
+            ColumnValueReader columnValueReader = 
DatabaseTypedSPILoader.getService(ColumnValueReader.class, 
param.getDatabaseType());
             ResultSet resultSet = calculationContext.getResultSet();
             while (resultSet.next()) {
                 ShardingSpherePreconditions.checkState(!isCanceling(), () -> 
new 
PipelineTableDataConsistencyCheckLoadingFailedException(param.getSchemaName(), 
param.getLogicTableName()));
@@ -167,7 +167,7 @@ public final class 
DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
         if (null == param.getUniqueKey()) {
             throw new UnsupportedOperationException("Data consistency of 
DATA_MATCH type not support table without unique key and primary key now");
         }
-        PipelineSQLBuilder sqlBuilder = 
PipelineTypedSPILoader.getDatabaseTypedService(PipelineSQLBuilder.class, 
param.getDatabaseType());
+        PipelineSQLBuilder sqlBuilder = 
DatabaseTypedSPILoader.getService(PipelineSQLBuilder.class, 
param.getDatabaseType());
         boolean firstQuery = null == param.getTableCheckPosition();
         return sqlBuilder.buildQueryAllOrderingSQL(param.getSchemaName(), 
param.getLogicTableName(), param.getColumnNames(), 
param.getUniqueKey().getName(), firstQuery);
     }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
index c87ba9f56fb..7926fa26341 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
@@ -47,7 +47,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInva
 import 
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader;
 import 
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import 
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
-import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
+import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
 import 
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
@@ -91,8 +91,8 @@ public final class InventoryDumper extends 
AbstractLifecycleExecutor implements
         this.channel = channel;
         this.dataSource = dataSource;
         String databaseType = 
dumperConfig.getDataSourceConfig().getDatabaseType().getType();
-        sqlBuilder = 
PipelineTypedSPILoader.getDatabaseTypedService(PipelineSQLBuilder.class, 
databaseType);
-        columnValueReader = 
PipelineTypedSPILoader.getDatabaseTypedService(ColumnValueReader.class, 
databaseType);
+        sqlBuilder = 
DatabaseTypedSPILoader.getService(PipelineSQLBuilder.class, databaseType);
+        columnValueReader = 
DatabaseTypedSPILoader.getService(ColumnValueReader.class, databaseType);
         this.metaDataLoader = metaDataLoader;
     }
     
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
index c4a4155206a..a7b69a89d39 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
@@ -37,7 +37,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineImport
 import org.apache.shardingsphere.data.pipeline.core.importer.DataRecordMerger;
 import 
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import 
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
-import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
+import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
 
 import javax.sql.DataSource;
 import java.sql.Connection;
@@ -79,7 +79,7 @@ public final class PipelineDataSourceSink implements 
PipelineSink {
         this.importerConfig = importerConfig;
         rateLimitAlgorithm = importerConfig.getRateLimitAlgorithm();
         this.dataSourceManager = dataSourceManager;
-        pipelineSqlBuilder = 
PipelineTypedSPILoader.getDatabaseTypedService(PipelineSQLBuilder.class, 
importerConfig.getDataSourceConfig().getDatabaseType().getType());
+        pipelineSqlBuilder = 
DatabaseTypedSPILoader.getService(PipelineSQLBuilder.class, 
importerConfig.getDataSourceConfig().getDatabaseType().getType());
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java
index faf961bf0c8..cc8327ca6e6 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java
@@ -25,7 +25,7 @@ import 
org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import 
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobByUniqueKeyException;
 import 
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
-import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
+import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
 import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
@@ -55,7 +55,7 @@ public final class InventoryRecordsCountCalculator {
     public static long getTableRecordsCount(final InventoryDumperConfiguration 
dumperConfig, final PipelineDataSourceWrapper dataSource) {
         String schemaName = dumperConfig.getSchemaName(new 
LogicTableName(dumperConfig.getLogicTableName()));
         String actualTableName = dumperConfig.getActualTableName();
-        PipelineSQLBuilder pipelineSQLBuilder = 
PipelineTypedSPILoader.getDatabaseTypedService(PipelineSQLBuilder.class, 
dataSource.getDatabaseType().getType());
+        PipelineSQLBuilder pipelineSQLBuilder = 
DatabaseTypedSPILoader.getService(PipelineSQLBuilder.class, 
dataSource.getDatabaseType().getType());
         Optional<String> sql = 
pipelineSQLBuilder.buildEstimatedCountSQL(schemaName, actualTableName);
         try {
             if (sql.isPresent()) {
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java
index ca9537187d5..5a0dae236ef 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java
@@ -48,7 +48,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
 import 
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import 
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
-import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
+import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
 
 import javax.sql.DataSource;
 import java.sql.Connection;
@@ -203,7 +203,7 @@ public final class InventoryTaskSplitter {
     
     private Range<Long> getUniqueKeyValuesRange(final 
InventoryIncrementalJobItemContext jobItemContext, final DataSource dataSource, 
final InventoryDumperConfiguration dumperConfig) {
         String uniqueKey = dumperConfig.getUniqueKeyColumns().get(0).getName();
-        String sql = 
PipelineTypedSPILoader.getDatabaseTypedService(PipelineSQLBuilder.class, 
jobItemContext.getJobConfig().getSourceDatabaseType())
+        String sql = 
DatabaseTypedSPILoader.getService(PipelineSQLBuilder.class, 
jobItemContext.getJobConfig().getSourceDatabaseType())
                 .buildUniqueKeyMinMaxValuesSQL(dumperConfig.getSchemaName(new 
LogicTableName(dumperConfig.getLogicTableName())), 
dumperConfig.getActualTableName(), uniqueKey);
         try (
                 Connection connection = dataSource.getConnection();
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java
index 2b88928f2bd..79af4bd3485 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java
@@ -36,7 +36,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.checker.
 import 
org.apache.shardingsphere.data.pipeline.spi.check.datasource.DataSourceChecker;
 import 
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreator;
 import 
org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializer;
-import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
+import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
 import org.apache.shardingsphere.infra.database.type.BranchDatabaseType;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import 
org.apache.shardingsphere.infra.datasource.pool.creator.DataSourcePoolCreator;
@@ -71,7 +71,7 @@ public final class PipelineJobPreparerUtils {
         if ("H2".equalsIgnoreCase(databaseType)) {
             return TypedSPILoader.findService(IncrementalDumperCreator.class, 
databaseType).isPresent();
         }
-        return 
PipelineTypedSPILoader.findDatabaseTypedService(IncrementalDumperCreator.class, 
databaseType).isPresent();
+        return 
DatabaseTypedSPILoader.findService(IncrementalDumperCreator.class, 
databaseType).isPresent();
     }
     
     /**
@@ -82,7 +82,7 @@ public final class PipelineJobPreparerUtils {
      * @throws SQLException if prepare target schema fail
      */
     public static void prepareTargetSchema(final String databaseType, final 
PrepareTargetSchemasParameter prepareTargetSchemasParam) throws SQLException {
-        Optional<DataSourcePreparer> dataSourcePreparer = 
PipelineTypedSPILoader.findDatabaseTypedService(DataSourcePreparer.class, 
databaseType);
+        Optional<DataSourcePreparer> dataSourcePreparer = 
DatabaseTypedSPILoader.findService(DataSourcePreparer.class, databaseType);
         if (!dataSourcePreparer.isPresent()) {
             log.info("dataSourcePreparer null, ignore prepare target");
             return;
@@ -114,7 +114,7 @@ public final class PipelineJobPreparerUtils {
      * @throws SQLException SQL exception
      */
     public static void prepareTargetTables(final String databaseType, final 
PrepareTargetTablesParameter prepareTargetTablesParam) throws SQLException {
-        Optional<DataSourcePreparer> dataSourcePreparer = 
PipelineTypedSPILoader.findDatabaseTypedService(DataSourcePreparer.class, 
databaseType);
+        Optional<DataSourcePreparer> dataSourcePreparer = 
DatabaseTypedSPILoader.findService(DataSourcePreparer.class, databaseType);
         if (!dataSourcePreparer.isPresent()) {
             log.info("dataSourcePreparer null, ignore prepare target");
             return;
@@ -143,7 +143,7 @@ public final class PipelineJobPreparerUtils {
         }
         String databaseType = 
dumperConfig.getDataSourceConfig().getDatabaseType().getType();
         DataSource dataSource = 
dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
-        return 
PipelineTypedSPILoader.getDatabaseTypedService(PositionInitializer.class, 
databaseType).init(dataSource, dumperConfig.getJobId());
+        return DatabaseTypedSPILoader.getService(PositionInitializer.class, 
databaseType).init(dataSource, dumperConfig.getJobId());
     }
     
     /**
@@ -156,7 +156,7 @@ public final class PipelineJobPreparerUtils {
         if (dataSources.isEmpty()) {
             return;
         }
-        DataSourceChecker dataSourceChecker = 
PipelineTypedSPILoader.findDatabaseTypedService(DataSourceChecker.class, 
databaseType).orElseGet(() -> new BasicDataSourceChecker(databaseType));
+        DataSourceChecker dataSourceChecker = 
DatabaseTypedSPILoader.findService(DataSourceChecker.class, 
databaseType).orElseGet(() -> new BasicDataSourceChecker(databaseType));
         dataSourceChecker.checkConnection(dataSources);
         dataSourceChecker.checkPrivilege(dataSources);
         dataSourceChecker.checkVariable(dataSources);
@@ -174,7 +174,7 @@ public final class PipelineJobPreparerUtils {
             log.info("target data source is empty, skip check");
             return;
         }
-        DataSourceChecker dataSourceChecker = 
PipelineTypedSPILoader.findDatabaseTypedService(DataSourceChecker.class, 
databaseType).orElseGet(() -> new BasicDataSourceChecker(databaseType));
+        DataSourceChecker dataSourceChecker = 
DatabaseTypedSPILoader.findService(DataSourceChecker.class, 
databaseType).orElseGet(() -> new BasicDataSourceChecker(databaseType));
         dataSourceChecker.checkConnection(targetDataSources);
         dataSourceChecker.checkTargetTable(targetDataSources, 
importerConfig.getTableNameSchemaNameMapping(), 
importerConfig.getLogicTableNames());
     }
@@ -188,7 +188,7 @@ public final class PipelineJobPreparerUtils {
      */
     public static void destroyPosition(final String jobId, final 
PipelineDataSourceConfiguration pipelineDataSourceConfig) throws SQLException {
         DatabaseType databaseType = pipelineDataSourceConfig.getDatabaseType();
-        PositionInitializer positionInitializer = 
PipelineTypedSPILoader.getDatabaseTypedService(PositionInitializer.class, 
databaseType.getType());
+        PositionInitializer positionInitializer = 
DatabaseTypedSPILoader.getService(PositionInitializer.class, 
databaseType.getType());
         final long startTimeMillis = System.currentTimeMillis();
         log.info("Cleanup database type:{}, data source type:{}", 
databaseType.getType(), pipelineDataSourceConfig.getType());
         if (pipelineDataSourceConfig instanceof 
ShardingSpherePipelineDataSourceConfiguration) {
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/AbstractDataSourcePreparer.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/AbstractDataSourcePreparer.java
index 6bbd0f0424a..909612efb90 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/AbstractDataSourcePreparer.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/AbstractDataSourcePreparer.java
@@ -25,7 +25,7 @@ import 
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSou
 import 
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
 import 
org.apache.shardingsphere.data.pipeline.common.metadata.generator.PipelineDDLGenerator;
 import 
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
-import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
+import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.type.DatabaseTypeEngine;
 import org.apache.shardingsphere.infra.parser.SQLParserEngine;
@@ -57,7 +57,7 @@ public abstract class AbstractDataSourcePreparer implements 
DataSourcePreparer {
         }
         CreateTableConfiguration createTableConfig = 
param.getCreateTableConfig();
         String defaultSchema = 
DatabaseTypeEngine.getDefaultSchemaName(targetDatabaseType).orElse(null);
-        PipelineSQLBuilder sqlBuilder = 
PipelineTypedSPILoader.getDatabaseTypedService(PipelineSQLBuilder.class, 
targetDatabaseType.getType());
+        PipelineSQLBuilder sqlBuilder = 
DatabaseTypedSPILoader.getService(PipelineSQLBuilder.class, 
targetDatabaseType.getType());
         Collection<String> createdSchemaNames = new HashSet<>();
         for (CreateTableEntry each : 
createTableConfig.getCreateTableEntries()) {
             String targetSchemaName = 
each.getTargetName().getSchemaName().getOriginal();
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/checker/AbstractDataSourceChecker.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/checker/AbstractDataSourceChecker.java
index 1517b226a81..a7ae7bfbe73 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/checker/AbstractDataSourceChecker.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/checker/AbstractDataSourceChecker.java
@@ -23,7 +23,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWith
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithTargetTableNotEmptyException;
 import 
org.apache.shardingsphere.data.pipeline.spi.check.datasource.DataSourceChecker;
 import 
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
-import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
+import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
 
 import javax.sql.DataSource;
 import java.sql.Connection;
@@ -65,7 +65,7 @@ public abstract class AbstractDataSourceChecker implements 
DataSourceChecker {
     }
     
     private boolean checkEmpty(final DataSource dataSource, final String 
schemaName, final String tableName) throws SQLException {
-        String sql = 
PipelineTypedSPILoader.getDatabaseTypedService(PipelineSQLBuilder.class, 
getDatabaseType()).buildCheckEmptySQL(schemaName, tableName);
+        String sql = 
DatabaseTypedSPILoader.getService(PipelineSQLBuilder.class, 
getDatabaseType()).buildCheckEmptySQL(schemaName, tableName);
         try (
                 Connection connection = dataSource.getConnection();
                 PreparedStatement preparedStatement = 
connection.prepareStatement(sql);
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
index 28d9617a341..5853056800f 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
@@ -45,7 +45,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparer
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
 import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
 import 
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreator;
-import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
+import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import 
org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
 
@@ -146,7 +146,7 @@ public final class CDCJobPreparer {
         IncrementalTaskProgress taskProgress = 
PipelineTaskUtils.createIncrementalTaskProgress(dumperConfig.getPosition(), 
jobItemContext.getInitProgress());
         PipelineChannel channel = 
PipelineTaskUtils.createIncrementalChannel(importerConfig.getConcurrency(), 
jobItemContext.getJobProcessContext().getPipelineChannelCreator(), 
taskProgress);
         channelProgressPairs.add(new CDCChannelProgressPair(channel, 
jobItemContext));
-        Dumper dumper = 
PipelineTypedSPILoader.getDatabaseTypedService(IncrementalDumperCreator.class, 
dumperConfig.getDataSourceConfig().getDatabaseType().getType())
+        Dumper dumper = 
DatabaseTypedSPILoader.getService(IncrementalDumperCreator.class, 
dumperConfig.getDataSourceConfig().getDatabaseType().getType())
                 .createIncrementalDumper(dumperConfig, 
dumperConfig.getPosition(), channel, jobItemContext.getSourceMetaDataLoader());
         boolean needSorting = needSorting(ImporterType.INCREMENTAL, 
hasGlobalCSN(importerConfig.getDataSourceConfig().getDatabaseType()));
         Importer importer = importerUsed.get() ? null
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index de2dc1b6c04..d1f59ab2c52 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -73,7 +73,7 @@ import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.Migrati
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationProcessContext;
 import 
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import 
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
-import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
+import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
 import 
org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
@@ -396,7 +396,7 @@ public final class MigrationJobAPI extends 
AbstractInventoryIncrementalJobAPIImp
     
     private void cleanTempTableOnRollback(final String jobId) throws 
SQLException {
         MigrationJobConfiguration jobConfig = getJobConfiguration(jobId);
-        PipelineSQLBuilder pipelineSQLBuilder = 
PipelineTypedSPILoader.getDatabaseTypedService(PipelineSQLBuilder.class, 
jobConfig.getTargetDatabaseType());
+        PipelineSQLBuilder pipelineSQLBuilder = 
DatabaseTypedSPILoader.getService(PipelineSQLBuilder.class, 
jobConfig.getTargetDatabaseType());
         TableNameSchemaNameMapping mapping = new 
TableNameSchemaNameMapping(jobConfig.getTargetTableSchemaMap());
         try (
                 PipelineDataSourceWrapper dataSource = 
PipelineDataSourceFactory.newInstance(jobConfig.getTarget());
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
index a8cab249a97..f5c2cc09d3d 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
@@ -56,7 +56,7 @@ import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.Migrati
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
 import 
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreator;
-import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
+import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.lock.GlobalLockNames;
 import org.apache.shardingsphere.infra.lock.LockContext;
@@ -161,7 +161,7 @@ public final class MigrationJobPreparer {
         CreateTableConfiguration createTableConfig = 
jobItemContext.getTaskConfig().getCreateTableConfig();
         PipelineDataSourceManager dataSourceManager = 
jobItemContext.getDataSourceManager();
         PrepareTargetSchemasParameter prepareTargetSchemasParam = new 
PrepareTargetSchemasParameter(
-                
PipelineTypedSPILoader.getDatabaseTypedService(DatabaseType.class, 
targetDatabaseType), createTableConfig, dataSourceManager);
+                DatabaseTypedSPILoader.getService(DatabaseType.class, 
targetDatabaseType), createTableConfig, dataSourceManager);
         PipelineJobPreparerUtils.prepareTargetSchema(targetDatabaseType, 
prepareTargetSchemasParam);
         ShardingSphereMetaData metaData = 
PipelineContextManager.getContext(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId())).getContextManager().getMetaDataContexts().getMetaData();
         SQLParserEngine sqlParserEngine = 
PipelineJobPreparerUtils.getSQLParserEngine(metaData, 
jobConfig.getTargetDatabaseName());
@@ -193,7 +193,7 @@ public final class MigrationJobPreparer {
         ExecuteEngine incrementalExecuteEngine = 
jobItemContext.getJobProcessContext().getIncrementalExecuteEngine();
         IncrementalTaskProgress taskProgress = 
PipelineTaskUtils.createIncrementalTaskProgress(dumperConfig.getPosition(), 
jobItemContext.getInitProgress());
         PipelineChannel channel = 
PipelineTaskUtils.createIncrementalChannel(importerConfig.getConcurrency(), 
pipelineChannelCreator, taskProgress);
-        Dumper dumper = 
PipelineTypedSPILoader.getDatabaseTypedService(IncrementalDumperCreator.class, 
dumperConfig.getDataSourceConfig().getDatabaseType().getType())
+        Dumper dumper = 
DatabaseTypedSPILoader.getService(IncrementalDumperCreator.class, 
dumperConfig.getDataSourceConfig().getDatabaseType().getType())
                 .createIncrementalDumper(dumperConfig, 
dumperConfig.getPosition(), channel, sourceMetaDataLoader);
         Collection<Importer> importers = createImporters(importerConfig, 
jobItemContext.getSink(), channel, jobItemContext);
         PipelineTask incrementalTask = new 
IncrementalTask(dumperConfig.getDataSourceName(), incrementalExecuteEngine, 
dumper, importers, taskProgress);
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/spi/PipelineTypedSPILoaderTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/spi/PipelineTypedSPILoaderTest.java
deleted file mode 100644
index aefb6eff3de..00000000000
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/spi/PipelineTypedSPILoaderTest.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.test.it.data.pipeline.core.util.spi;
-
-import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.MySQLColumnValueReader;
-import 
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader;
-import 
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
-import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
-import org.junit.jupiter.api.Test;
-
-import java.util.Optional;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-class PipelineTypedSPILoaderTest {
-    
-    @Test
-    void assertFindDatabaseTypedService() {
-        Optional<PipelineSQLBuilder> actual = 
PipelineTypedSPILoader.findDatabaseTypedService(PipelineSQLBuilder.class, 
"MariaDB");
-        assertTrue(actual.isPresent());
-        assertThat(actual.get().getType(), is("MySQL"));
-    }
-    
-    @Test
-    void assertGetPipelineSQLBuilder() {
-        PipelineSQLBuilder actual = 
PipelineTypedSPILoader.getDatabaseTypedService(PipelineSQLBuilder.class, 
"MariaDB");
-        assertNotNull(actual);
-        assertThat(actual.getType(), is("MySQL"));
-    }
-    
-    @Test
-    void assertFindColumnValueReaderByUnknown() {
-        Optional<ColumnValueReader> actual = 
PipelineTypedSPILoader.findDatabaseTypedService(ColumnValueReader.class, 
"Unknown");
-        assertFalse(actual.isPresent());
-    }
-    
-    @Test
-    void assertGetColumnValueReaderByBranchDB() {
-        ColumnValueReader actual = 
PipelineTypedSPILoader.getDatabaseTypedService(ColumnValueReader.class, 
"MariaDB");
-        assertNotNull(actual);
-        assertThat(actual.getClass().getName(), 
is(MySQLColumnValueReader.class.getName()));
-    }
-}

Reply via email to