This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new b95c4fc9d4b Refactor DatabaseTypedSPILoader (#26842)
b95c4fc9d4b is described below
commit b95c4fc9d4b3ee6cc1be7a350f465fd1e074d079
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Jul 8 19:51:14 2023 +0800
Refactor DatabaseTypedSPILoader (#26842)
---
.../infra/spi/DatabaseTypedSPILoader.java | 20 +++----
.../infra/spi/DatabaseTypedSPILoaderTest.java | 55 +++++++++++++++++++
.../infra/spi/fixture/DatabaseTypedSPIFixture.java | 23 ++++++++
.../fixture/impl/FooDatabaseTypedSPIFixture.java | 28 ++++++++++
...phere.infra.spi.fixture.DatabaseTypedSPIFixture | 18 +++++++
...rdingSpherePipelineDataSourceConfiguration.java | 4 +-
.../StandardPipelineDataSourceConfiguration.java | 4 +-
...YamlJobItemIncrementalTasksProgressSwapper.java | 4 +-
.../metadata/generator/PipelineDDLGenerator.java | 4 +-
...RC32MatchDataConsistencyCalculateAlgorithm.java | 4 +-
...DataMatchDataConsistencyCalculateAlgorithm.java | 6 +--
.../data/pipeline/core/dumper/InventoryDumper.java | 6 +--
.../core/importer/sink/PipelineDataSourceSink.java | 4 +-
.../preparer/InventoryRecordsCountCalculator.java | 4 +-
.../core/preparer/InventoryTaskSplitter.java | 4 +-
.../core/preparer/PipelineJobPreparerUtils.java | 16 +++---
.../datasource/AbstractDataSourcePreparer.java | 4 +-
.../checker/AbstractDataSourceChecker.java | 4 +-
.../pipeline/cdc/core/prepare/CDCJobPreparer.java | 4 +-
.../migration/api/impl/MigrationJobAPI.java | 4 +-
.../migration/prepare/MigrationJobPreparer.java | 6 +--
.../core/util/spi/PipelineTypedSPILoaderTest.java | 62 ----------------------
22 files changed, 175 insertions(+), 113 deletions(-)
diff --git
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/util/spi/PipelineTypedSPILoader.java
b/infra/common/src/main/java/org/apache/shardingsphere/infra/spi/DatabaseTypedSPILoader.java
similarity index 76%
rename from
kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/util/spi/PipelineTypedSPILoader.java
rename to
infra/common/src/main/java/org/apache/shardingsphere/infra/spi/DatabaseTypedSPILoader.java
index 9be5e7c1251..088bb0dbd7c 100644
---
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/util/spi/PipelineTypedSPILoader.java
+++
b/infra/common/src/main/java/org/apache/shardingsphere/infra/spi/DatabaseTypedSPILoader.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.util.spi;
+package org.apache.shardingsphere.infra.spi;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
@@ -28,20 +28,20 @@ import
org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
import java.util.Optional;
/**
- * Pipeline typed SPI loader.
+ * Database typed SPI loader.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class PipelineTypedSPILoader {
+public final class DatabaseTypedSPILoader {
/**
- * Find database typed service.
+ * Find service.
*
* @param spiClass typed SPI class
* @param databaseType database type
* @param <T> SPI class type
- * @return service
+ * @return found service
*/
- public static <T extends TypedSPI> Optional<T>
findDatabaseTypedService(final Class<T> spiClass, final String databaseType) {
+ public static <T extends TypedSPI> Optional<T> findService(final Class<T>
spiClass, final String databaseType) {
Optional<T> result = TypedSPILoader.findService(spiClass,
databaseType);
if (result.isPresent()) {
return result;
@@ -54,14 +54,14 @@ public final class PipelineTypedSPILoader {
}
/**
- * Get database typed service.
+ * Get service.
*
* @param spiClass typed SPI class
* @param databaseType database type
* @param <T> SPI class type
- * @return service
+ * @return found service
*/
- public static <T extends TypedSPI> T getDatabaseTypedService(final
Class<T> spiClass, final String databaseType) {
- return findDatabaseTypedService(spiClass, databaseType).orElseThrow(()
-> new ServiceProviderNotFoundServerException(spiClass));
+ public static <T extends TypedSPI> T getService(final Class<T> spiClass,
final String databaseType) {
+ return findService(spiClass, databaseType).orElseThrow(() -> new
ServiceProviderNotFoundServerException(spiClass));
}
}
diff --git
a/infra/common/src/test/java/org/apache/shardingsphere/infra/spi/DatabaseTypedSPILoaderTest.java
b/infra/common/src/test/java/org/apache/shardingsphere/infra/spi/DatabaseTypedSPILoaderTest.java
new file mode 100644
index 00000000000..b59f86b2841
--- /dev/null
+++
b/infra/common/src/test/java/org/apache/shardingsphere/infra/spi/DatabaseTypedSPILoaderTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.spi;
+
+import org.apache.shardingsphere.infra.spi.fixture.DatabaseTypedSPIFixture;
+import
org.apache.shardingsphere.infra.util.spi.exception.ServiceProviderNotFoundServerException;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class DatabaseTypedSPILoaderTest {
+
+ @Test
+ void assertFindServiceWithTrunkDatabaseType() {
+
assertTrue(DatabaseTypedSPILoader.findService(DatabaseTypedSPIFixture.class,
"MySQL").isPresent());
+ }
+
+ @Test
+ void assertFindServiceWithBranchDatabaseType() {
+
assertTrue(DatabaseTypedSPILoader.findService(DatabaseTypedSPIFixture.class,
"MariaDB").isPresent());
+ }
+
+ @Test
+ void assertFindServiceWithUnknownDatabaseType() {
+
assertFalse(DatabaseTypedSPILoader.findService(DatabaseTypedSPIFixture.class,
"Unknown").isPresent());
+ }
+
+ @Test
+ void assertGetExistedService() {
+ assertDoesNotThrow(() ->
DatabaseTypedSPILoader.getService(DatabaseTypedSPIFixture.class, "MySQL"));
+ }
+
+ @Test
+ void assertGetNotExistedService() {
+ assertThrows(ServiceProviderNotFoundServerException.class, () ->
DatabaseTypedSPILoader.getService(DatabaseTypedSPIFixture.class, "Unknown"));
+ }
+}
diff --git
a/infra/common/src/test/java/org/apache/shardingsphere/infra/spi/fixture/DatabaseTypedSPIFixture.java
b/infra/common/src/test/java/org/apache/shardingsphere/infra/spi/fixture/DatabaseTypedSPIFixture.java
new file mode 100644
index 00000000000..eefabb86218
--- /dev/null
+++
b/infra/common/src/test/java/org/apache/shardingsphere/infra/spi/fixture/DatabaseTypedSPIFixture.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.spi.fixture;
+
+import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
+
+public interface DatabaseTypedSPIFixture extends TypedSPI {
+}
diff --git
a/infra/common/src/test/java/org/apache/shardingsphere/infra/spi/fixture/impl/FooDatabaseTypedSPIFixture.java
b/infra/common/src/test/java/org/apache/shardingsphere/infra/spi/fixture/impl/FooDatabaseTypedSPIFixture.java
new file mode 100644
index 00000000000..c3da9f81ddf
--- /dev/null
+++
b/infra/common/src/test/java/org/apache/shardingsphere/infra/spi/fixture/impl/FooDatabaseTypedSPIFixture.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.infra.spi.fixture.impl;
+
+import org.apache.shardingsphere.infra.spi.fixture.DatabaseTypedSPIFixture;
+
+public final class FooDatabaseTypedSPIFixture implements
DatabaseTypedSPIFixture {
+
+ @Override
+ public String getType() {
+ return "MySQL";
+ }
+}
diff --git
a/infra/common/src/test/resources/META-INF/services/org.apache.shardingsphere.infra.spi.fixture.DatabaseTypedSPIFixture
b/infra/common/src/test/resources/META-INF/services/org.apache.shardingsphere.infra.spi.fixture.DatabaseTypedSPIFixture
new file mode 100644
index 00000000000..fc816c9799a
--- /dev/null
+++
b/infra/common/src/test/resources/META-INF/services/org.apache.shardingsphere.infra.spi.fixture.DatabaseTypedSPIFixture
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.infra.spi.fixture.impl.FooDatabaseTypedSPIFixture
diff --git
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfiguration.java
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfiguration.java
index f358ec13a22..615b6104163 100644
---
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfiguration.java
+++
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/ShardingSpherePipelineDataSourceConfiguration.java
@@ -23,7 +23,7 @@ import lombok.Getter;
import lombok.Setter;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.spi.datasource.JdbcQueryPropertiesExtension;
-import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
+import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.infra.database.metadata.url.JdbcUrlAppender;
import
org.apache.shardingsphere.infra.database.metadata.url.StandardJdbcUrlParser;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
@@ -88,7 +88,7 @@ public final class
ShardingSpherePipelineDataSourceConfiguration implements Pipe
}
private void appendJdbcQueryProperties(final String databaseType) {
- Optional<JdbcQueryPropertiesExtension> extension =
PipelineTypedSPILoader.findDatabaseTypedService(JdbcQueryPropertiesExtension.class,
databaseType);
+ Optional<JdbcQueryPropertiesExtension> extension =
DatabaseTypedSPILoader.findService(JdbcQueryPropertiesExtension.class,
databaseType);
if (!extension.isPresent()) {
return;
}
diff --git
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/StandardPipelineDataSourceConfiguration.java
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/StandardPipelineDataSourceConfiguration.java
index 0b4f12a6e14..0567dbc89d9 100644
---
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/StandardPipelineDataSourceConfiguration.java
+++
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/datasource/config/impl/StandardPipelineDataSourceConfiguration.java
@@ -22,7 +22,7 @@ import lombok.Getter;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlJdbcConfiguration;
import
org.apache.shardingsphere.data.pipeline.spi.datasource.JdbcQueryPropertiesExtension;
-import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
+import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.infra.database.metadata.url.JdbcUrlAppender;
import
org.apache.shardingsphere.infra.database.metadata.url.StandardJdbcUrlParser;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
@@ -99,7 +99,7 @@ public final class StandardPipelineDataSourceConfiguration
implements PipelineDa
}
private void appendJdbcQueryProperties(final String databaseType, final
Map<String, Object> yamlConfig) {
- Optional<JdbcQueryPropertiesExtension> extension =
PipelineTypedSPILoader.findDatabaseTypedService(JdbcQueryPropertiesExtension.class,
databaseType);
+ Optional<JdbcQueryPropertiesExtension> extension =
DatabaseTypedSPILoader.findService(JdbcQueryPropertiesExtension.class,
databaseType);
if (!extension.isPresent()) {
return;
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlJobItemIncrementalTasksProgressSwapper.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlJobItemIncrementalTasksProgressSwapper.java
index 6caeac00923..fb27e9f7628 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlJobItemIncrementalTasksProgressSwapper.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/progress/yaml/YamlJobItemIncrementalTasksProgressSwapper.java
@@ -20,7 +20,7 @@ package
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.JobItemIncrementalTasksProgress;
import
org.apache.shardingsphere.data.pipeline.common.task.progress.IncrementalTaskProgress;
import
org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializer;
-import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
+import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
/**
* YAML job item incremental tasks progress swapper.
@@ -59,7 +59,7 @@ public final class YamlJobItemIncrementalTasksProgressSwapper
{
return new JobItemIncrementalTasksProgress(null);
}
// TODO consider to remove parameter databaseType
- PositionInitializer positionInitializer =
PipelineTypedSPILoader.getDatabaseTypedService(PositionInitializer.class,
databaseType);
+ PositionInitializer positionInitializer =
DatabaseTypedSPILoader.getService(PositionInitializer.class, databaseType);
IncrementalTaskProgress taskProgress = new
IncrementalTaskProgress(positionInitializer.init(yamlProgress.getPosition()));
taskProgress.setIncrementalTaskDelay(yamlProgress.getDelay());
return new JobItemIncrementalTasksProgress(taskProgress);
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/generator/PipelineDDLGenerator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/generator/PipelineDDLGenerator.java
index 5fa32b5ce4d..9580f676687 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/generator/PipelineDDLGenerator.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/generator/PipelineDDLGenerator.java
@@ -20,7 +20,7 @@ package
org.apache.shardingsphere.data.pipeline.common.metadata.generator;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import
org.apache.shardingsphere.data.pipeline.spi.ddlgenerator.CreateTableSQLGenerator;
-import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
+import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.infra.binder.SQLStatementContextFactory;
import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
import
org.apache.shardingsphere.infra.binder.statement.ddl.AlterTableStatementContext;
@@ -76,7 +76,7 @@ public final class PipelineDDLGenerator {
final String schemaName, final String
sourceTableName, final String targetTableName, final SQLParserEngine
parserEngine) throws SQLException {
long startTimeMillis = System.currentTimeMillis();
StringBuilder result = new StringBuilder();
- for (String each :
PipelineTypedSPILoader.getDatabaseTypedService(CreateTableSQLGenerator.class,
databaseType.getType()).generate(sourceDataSource, schemaName,
sourceTableName)) {
+ for (String each :
DatabaseTypedSPILoader.getService(CreateTableSQLGenerator.class,
databaseType.getType()).generate(sourceDataSource, schemaName,
sourceTableName)) {
Optional<String> queryContext = decorate(databaseType,
sourceDataSource, schemaName, targetTableName, parserEngine, each);
queryContext.ifPresent(optional ->
result.append(optional).append(DELIMITER).append(System.lineSeparator()));
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
index 86141cbbe29..f049106641f 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
@@ -26,7 +26,7 @@ import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.Data
import
org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
import
org.apache.shardingsphere.data.pipeline.core.exception.data.UnsupportedCRC32DataConsistencyCalculateAlgorithmException;
import
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
-import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
+import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
import
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.util.spi.annotation.SPIDescription;
@@ -52,7 +52,7 @@ public final class
CRC32MatchDataConsistencyCalculateAlgorithm extends AbstractD
@Override
public Iterable<DataConsistencyCalculatedResult> calculate(final
DataConsistencyCalculateParameter param) {
- PipelineSQLBuilder sqlBuilder =
PipelineTypedSPILoader.getDatabaseTypedService(PipelineSQLBuilder.class,
param.getDatabaseType());
+ PipelineSQLBuilder sqlBuilder =
DatabaseTypedSPILoader.getService(PipelineSQLBuilder.class,
param.getDatabaseType());
List<CalculatedItem> calculatedItems =
param.getColumnNames().stream().map(each -> calculateCRC32(sqlBuilder, param,
each)).collect(Collectors.toList());
return Collections.singletonList(new
CalculatedResult(calculatedItems.get(0).getRecordsCount(),
calculatedItems.stream().map(CalculatedItem::getCrc32).collect(Collectors.toList())));
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
index 860d891246d..83aa4a547e9 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
@@ -29,7 +29,7 @@ import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineSQLExcepti
import
org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
import
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader;
import
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
-import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
+import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
import
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
@@ -91,7 +91,7 @@ public final class DataMatchDataConsistencyCalculateAlgorithm
extends AbstractSt
try {
Collection<Collection<Object>> records = new LinkedList<>();
Object maxUniqueKeyValue = null;
- ColumnValueReader columnValueReader =
PipelineTypedSPILoader.getDatabaseTypedService(ColumnValueReader.class,
param.getDatabaseType());
+ ColumnValueReader columnValueReader =
DatabaseTypedSPILoader.getService(ColumnValueReader.class,
param.getDatabaseType());
ResultSet resultSet = calculationContext.getResultSet();
while (resultSet.next()) {
ShardingSpherePreconditions.checkState(!isCanceling(), () ->
new
PipelineTableDataConsistencyCheckLoadingFailedException(param.getSchemaName(),
param.getLogicTableName()));
@@ -167,7 +167,7 @@ public final class
DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
if (null == param.getUniqueKey()) {
throw new UnsupportedOperationException("Data consistency of
DATA_MATCH type not support table without unique key and primary key now");
}
- PipelineSQLBuilder sqlBuilder =
PipelineTypedSPILoader.getDatabaseTypedService(PipelineSQLBuilder.class,
param.getDatabaseType());
+ PipelineSQLBuilder sqlBuilder =
DatabaseTypedSPILoader.getService(PipelineSQLBuilder.class,
param.getDatabaseType());
boolean firstQuery = null == param.getTableCheckPosition();
return sqlBuilder.buildQueryAllOrderingSQL(param.getSchemaName(),
param.getLogicTableName(), param.getColumnNames(),
param.getUniqueKey().getName(), firstQuery);
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
index c87ba9f56fb..7926fa26341 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
@@ -47,7 +47,7 @@ import
org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInva
import
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader;
import
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
-import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
+import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
import
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
@@ -91,8 +91,8 @@ public final class InventoryDumper extends
AbstractLifecycleExecutor implements
this.channel = channel;
this.dataSource = dataSource;
String databaseType =
dumperConfig.getDataSourceConfig().getDatabaseType().getType();
- sqlBuilder =
PipelineTypedSPILoader.getDatabaseTypedService(PipelineSQLBuilder.class,
databaseType);
- columnValueReader =
PipelineTypedSPILoader.getDatabaseTypedService(ColumnValueReader.class,
databaseType);
+ sqlBuilder =
DatabaseTypedSPILoader.getService(PipelineSQLBuilder.class, databaseType);
+ columnValueReader =
DatabaseTypedSPILoader.getService(ColumnValueReader.class, databaseType);
this.metaDataLoader = metaDataLoader;
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
index c4a4155206a..a7b69a89d39 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
@@ -37,7 +37,7 @@ import
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineImport
import org.apache.shardingsphere.data.pipeline.core.importer.DataRecordMerger;
import
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
-import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
+import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
import javax.sql.DataSource;
import java.sql.Connection;
@@ -79,7 +79,7 @@ public final class PipelineDataSourceSink implements
PipelineSink {
this.importerConfig = importerConfig;
rateLimitAlgorithm = importerConfig.getRateLimitAlgorithm();
this.dataSourceManager = dataSourceManager;
- pipelineSqlBuilder =
PipelineTypedSPILoader.getDatabaseTypedService(PipelineSQLBuilder.class,
importerConfig.getDataSourceConfig().getDatabaseType().getType());
+ pipelineSqlBuilder =
DatabaseTypedSPILoader.getService(PipelineSQLBuilder.class,
importerConfig.getDataSourceConfig().getDatabaseType().getType());
}
@Override
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java
index faf961bf0c8..cc8327ca6e6 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java
@@ -25,7 +25,7 @@ import
org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobByUniqueKeyException;
import
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
-import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
+import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
@@ -55,7 +55,7 @@ public final class InventoryRecordsCountCalculator {
public static long getTableRecordsCount(final InventoryDumperConfiguration
dumperConfig, final PipelineDataSourceWrapper dataSource) {
String schemaName = dumperConfig.getSchemaName(new
LogicTableName(dumperConfig.getLogicTableName()));
String actualTableName = dumperConfig.getActualTableName();
- PipelineSQLBuilder pipelineSQLBuilder =
PipelineTypedSPILoader.getDatabaseTypedService(PipelineSQLBuilder.class,
dataSource.getDatabaseType().getType());
+ PipelineSQLBuilder pipelineSQLBuilder =
DatabaseTypedSPILoader.getService(PipelineSQLBuilder.class,
dataSource.getDatabaseType().getType());
Optional<String> sql =
pipelineSQLBuilder.buildEstimatedCountSQL(schemaName, actualTableName);
try {
if (sql.isPresent()) {
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java
index ca9537187d5..5a0dae236ef 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java
@@ -48,7 +48,7 @@ import
org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
import
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
-import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
+import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
import javax.sql.DataSource;
import java.sql.Connection;
@@ -203,7 +203,7 @@ public final class InventoryTaskSplitter {
private Range<Long> getUniqueKeyValuesRange(final
InventoryIncrementalJobItemContext jobItemContext, final DataSource dataSource,
final InventoryDumperConfiguration dumperConfig) {
String uniqueKey = dumperConfig.getUniqueKeyColumns().get(0).getName();
- String sql =
PipelineTypedSPILoader.getDatabaseTypedService(PipelineSQLBuilder.class,
jobItemContext.getJobConfig().getSourceDatabaseType())
+ String sql =
DatabaseTypedSPILoader.getService(PipelineSQLBuilder.class,
jobItemContext.getJobConfig().getSourceDatabaseType())
.buildUniqueKeyMinMaxValuesSQL(dumperConfig.getSchemaName(new
LogicTableName(dumperConfig.getLogicTableName())),
dumperConfig.getActualTableName(), uniqueKey);
try (
Connection connection = dataSource.getConnection();
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java
index 2b88928f2bd..79af4bd3485 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java
@@ -36,7 +36,7 @@ import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.checker.
import
org.apache.shardingsphere.data.pipeline.spi.check.datasource.DataSourceChecker;
import
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreator;
import
org.apache.shardingsphere.data.pipeline.spi.ingest.position.PositionInitializer;
-import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
+import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.infra.database.type.BranchDatabaseType;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import
org.apache.shardingsphere.infra.datasource.pool.creator.DataSourcePoolCreator;
@@ -71,7 +71,7 @@ public final class PipelineJobPreparerUtils {
if ("H2".equalsIgnoreCase(databaseType)) {
return TypedSPILoader.findService(IncrementalDumperCreator.class,
databaseType).isPresent();
}
- return
PipelineTypedSPILoader.findDatabaseTypedService(IncrementalDumperCreator.class,
databaseType).isPresent();
+ return
DatabaseTypedSPILoader.findService(IncrementalDumperCreator.class,
databaseType).isPresent();
}
/**
@@ -82,7 +82,7 @@ public final class PipelineJobPreparerUtils {
* @throws SQLException if prepare target schema fail
*/
public static void prepareTargetSchema(final String databaseType, final
PrepareTargetSchemasParameter prepareTargetSchemasParam) throws SQLException {
- Optional<DataSourcePreparer> dataSourcePreparer =
PipelineTypedSPILoader.findDatabaseTypedService(DataSourcePreparer.class,
databaseType);
+ Optional<DataSourcePreparer> dataSourcePreparer =
DatabaseTypedSPILoader.findService(DataSourcePreparer.class, databaseType);
if (!dataSourcePreparer.isPresent()) {
log.info("dataSourcePreparer null, ignore prepare target");
return;
@@ -114,7 +114,7 @@ public final class PipelineJobPreparerUtils {
* @throws SQLException SQL exception
*/
public static void prepareTargetTables(final String databaseType, final
PrepareTargetTablesParameter prepareTargetTablesParam) throws SQLException {
- Optional<DataSourcePreparer> dataSourcePreparer =
PipelineTypedSPILoader.findDatabaseTypedService(DataSourcePreparer.class,
databaseType);
+ Optional<DataSourcePreparer> dataSourcePreparer =
DatabaseTypedSPILoader.findService(DataSourcePreparer.class, databaseType);
if (!dataSourcePreparer.isPresent()) {
log.info("dataSourcePreparer null, ignore prepare target");
return;
@@ -143,7 +143,7 @@ public final class PipelineJobPreparerUtils {
}
String databaseType =
dumperConfig.getDataSourceConfig().getDatabaseType().getType();
DataSource dataSource =
dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
- return
PipelineTypedSPILoader.getDatabaseTypedService(PositionInitializer.class,
databaseType).init(dataSource, dumperConfig.getJobId());
+ return DatabaseTypedSPILoader.getService(PositionInitializer.class,
databaseType).init(dataSource, dumperConfig.getJobId());
}
/**
@@ -156,7 +156,7 @@ public final class PipelineJobPreparerUtils {
if (dataSources.isEmpty()) {
return;
}
- DataSourceChecker dataSourceChecker =
PipelineTypedSPILoader.findDatabaseTypedService(DataSourceChecker.class,
databaseType).orElseGet(() -> new BasicDataSourceChecker(databaseType));
+ DataSourceChecker dataSourceChecker =
DatabaseTypedSPILoader.findService(DataSourceChecker.class,
databaseType).orElseGet(() -> new BasicDataSourceChecker(databaseType));
dataSourceChecker.checkConnection(dataSources);
dataSourceChecker.checkPrivilege(dataSources);
dataSourceChecker.checkVariable(dataSources);
@@ -174,7 +174,7 @@ public final class PipelineJobPreparerUtils {
log.info("target data source is empty, skip check");
return;
}
- DataSourceChecker dataSourceChecker =
PipelineTypedSPILoader.findDatabaseTypedService(DataSourceChecker.class,
databaseType).orElseGet(() -> new BasicDataSourceChecker(databaseType));
+ DataSourceChecker dataSourceChecker =
DatabaseTypedSPILoader.findService(DataSourceChecker.class,
databaseType).orElseGet(() -> new BasicDataSourceChecker(databaseType));
dataSourceChecker.checkConnection(targetDataSources);
dataSourceChecker.checkTargetTable(targetDataSources,
importerConfig.getTableNameSchemaNameMapping(),
importerConfig.getLogicTableNames());
}
@@ -188,7 +188,7 @@ public final class PipelineJobPreparerUtils {
*/
public static void destroyPosition(final String jobId, final
PipelineDataSourceConfiguration pipelineDataSourceConfig) throws SQLException {
DatabaseType databaseType = pipelineDataSourceConfig.getDatabaseType();
- PositionInitializer positionInitializer =
PipelineTypedSPILoader.getDatabaseTypedService(PositionInitializer.class,
databaseType.getType());
+ PositionInitializer positionInitializer =
DatabaseTypedSPILoader.getService(PositionInitializer.class,
databaseType.getType());
final long startTimeMillis = System.currentTimeMillis();
log.info("Cleanup database type:{}, data source type:{}",
databaseType.getType(), pipelineDataSourceConfig.getType());
if (pipelineDataSourceConfig instanceof
ShardingSpherePipelineDataSourceConfiguration) {
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/AbstractDataSourcePreparer.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/AbstractDataSourcePreparer.java
index 6bbd0f0424a..909612efb90 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/AbstractDataSourcePreparer.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/AbstractDataSourcePreparer.java
@@ -25,7 +25,7 @@ import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSou
import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
import
org.apache.shardingsphere.data.pipeline.common.metadata.generator.PipelineDDLGenerator;
import
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
-import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
+import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeEngine;
import org.apache.shardingsphere.infra.parser.SQLParserEngine;
@@ -57,7 +57,7 @@ public abstract class AbstractDataSourcePreparer implements
DataSourcePreparer {
}
CreateTableConfiguration createTableConfig =
param.getCreateTableConfig();
String defaultSchema =
DatabaseTypeEngine.getDefaultSchemaName(targetDatabaseType).orElse(null);
- PipelineSQLBuilder sqlBuilder =
PipelineTypedSPILoader.getDatabaseTypedService(PipelineSQLBuilder.class,
targetDatabaseType.getType());
+ PipelineSQLBuilder sqlBuilder =
DatabaseTypedSPILoader.getService(PipelineSQLBuilder.class,
targetDatabaseType.getType());
Collection<String> createdSchemaNames = new HashSet<>();
for (CreateTableEntry each :
createTableConfig.getCreateTableEntries()) {
String targetSchemaName =
each.getTargetName().getSchemaName().getOriginal();
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/checker/AbstractDataSourceChecker.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/checker/AbstractDataSourceChecker.java
index 1517b226a81..a7ae7bfbe73 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/checker/AbstractDataSourceChecker.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/checker/AbstractDataSourceChecker.java
@@ -23,7 +23,7 @@ import
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWith
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithTargetTableNotEmptyException;
import
org.apache.shardingsphere.data.pipeline.spi.check.datasource.DataSourceChecker;
import
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
-import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
+import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
import javax.sql.DataSource;
import java.sql.Connection;
@@ -65,7 +65,7 @@ public abstract class AbstractDataSourceChecker implements
DataSourceChecker {
}
private boolean checkEmpty(final DataSource dataSource, final String
schemaName, final String tableName) throws SQLException {
- String sql =
PipelineTypedSPILoader.getDatabaseTypedService(PipelineSQLBuilder.class,
getDatabaseType()).buildCheckEmptySQL(schemaName, tableName);
+ String sql =
DatabaseTypedSPILoader.getService(PipelineSQLBuilder.class,
getDatabaseType()).buildCheckEmptySQL(schemaName, tableName);
try (
Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement =
connection.prepareStatement(sql);
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
index 28d9617a341..5853056800f 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
@@ -45,7 +45,7 @@ import
org.apache.shardingsphere.data.pipeline.core.preparer.PipelineJobPreparer
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
import
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreator;
-import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
+import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import
org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
@@ -146,7 +146,7 @@ public final class CDCJobPreparer {
IncrementalTaskProgress taskProgress =
PipelineTaskUtils.createIncrementalTaskProgress(dumperConfig.getPosition(),
jobItemContext.getInitProgress());
PipelineChannel channel =
PipelineTaskUtils.createIncrementalChannel(importerConfig.getConcurrency(),
jobItemContext.getJobProcessContext().getPipelineChannelCreator(),
taskProgress);
channelProgressPairs.add(new CDCChannelProgressPair(channel,
jobItemContext));
- Dumper dumper =
PipelineTypedSPILoader.getDatabaseTypedService(IncrementalDumperCreator.class,
dumperConfig.getDataSourceConfig().getDatabaseType().getType())
+ Dumper dumper =
DatabaseTypedSPILoader.getService(IncrementalDumperCreator.class,
dumperConfig.getDataSourceConfig().getDatabaseType().getType())
.createIncrementalDumper(dumperConfig,
dumperConfig.getPosition(), channel, jobItemContext.getSourceMetaDataLoader());
boolean needSorting = needSorting(ImporterType.INCREMENTAL,
hasGlobalCSN(importerConfig.getDataSourceConfig().getDatabaseType()));
Importer importer = importerUsed.get() ? null
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index de2dc1b6c04..d1f59ab2c52 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -73,7 +73,7 @@ import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.Migrati
import
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationProcessContext;
import
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
import
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
-import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
+import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
import
org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
@@ -396,7 +396,7 @@ public final class MigrationJobAPI extends
AbstractInventoryIncrementalJobAPIImp
private void cleanTempTableOnRollback(final String jobId) throws
SQLException {
MigrationJobConfiguration jobConfig = getJobConfiguration(jobId);
- PipelineSQLBuilder pipelineSQLBuilder =
PipelineTypedSPILoader.getDatabaseTypedService(PipelineSQLBuilder.class,
jobConfig.getTargetDatabaseType());
+ PipelineSQLBuilder pipelineSQLBuilder =
DatabaseTypedSPILoader.getService(PipelineSQLBuilder.class,
jobConfig.getTargetDatabaseType());
TableNameSchemaNameMapping mapping = new
TableNameSchemaNameMapping(jobConfig.getTargetTableSchemaMap());
try (
PipelineDataSourceWrapper dataSource =
PipelineDataSourceFactory.newInstance(jobConfig.getTarget());
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
index a8cab249a97..f5c2cc09d3d 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/prepare/MigrationJobPreparer.java
@@ -56,7 +56,7 @@ import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.Migrati
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
import
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.IncrementalDumperCreator;
-import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
+import org.apache.shardingsphere.infra.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.lock.GlobalLockNames;
import org.apache.shardingsphere.infra.lock.LockContext;
@@ -161,7 +161,7 @@ public final class MigrationJobPreparer {
CreateTableConfiguration createTableConfig =
jobItemContext.getTaskConfig().getCreateTableConfig();
PipelineDataSourceManager dataSourceManager =
jobItemContext.getDataSourceManager();
PrepareTargetSchemasParameter prepareTargetSchemasParam = new
PrepareTargetSchemasParameter(
-
PipelineTypedSPILoader.getDatabaseTypedService(DatabaseType.class,
targetDatabaseType), createTableConfig, dataSourceManager);
+ DatabaseTypedSPILoader.getService(DatabaseType.class,
targetDatabaseType), createTableConfig, dataSourceManager);
PipelineJobPreparerUtils.prepareTargetSchema(targetDatabaseType,
prepareTargetSchemasParam);
ShardingSphereMetaData metaData =
PipelineContextManager.getContext(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId())).getContextManager().getMetaDataContexts().getMetaData();
SQLParserEngine sqlParserEngine =
PipelineJobPreparerUtils.getSQLParserEngine(metaData,
jobConfig.getTargetDatabaseName());
@@ -193,7 +193,7 @@ public final class MigrationJobPreparer {
ExecuteEngine incrementalExecuteEngine =
jobItemContext.getJobProcessContext().getIncrementalExecuteEngine();
IncrementalTaskProgress taskProgress =
PipelineTaskUtils.createIncrementalTaskProgress(dumperConfig.getPosition(),
jobItemContext.getInitProgress());
PipelineChannel channel =
PipelineTaskUtils.createIncrementalChannel(importerConfig.getConcurrency(),
pipelineChannelCreator, taskProgress);
- Dumper dumper =
PipelineTypedSPILoader.getDatabaseTypedService(IncrementalDumperCreator.class,
dumperConfig.getDataSourceConfig().getDatabaseType().getType())
+ Dumper dumper =
DatabaseTypedSPILoader.getService(IncrementalDumperCreator.class,
dumperConfig.getDataSourceConfig().getDatabaseType().getType())
.createIncrementalDumper(dumperConfig,
dumperConfig.getPosition(), channel, sourceMetaDataLoader);
Collection<Importer> importers = createImporters(importerConfig,
jobItemContext.getSink(), channel, jobItemContext);
PipelineTask incrementalTask = new
IncrementalTask(dumperConfig.getDataSourceName(), incrementalExecuteEngine,
dumper, importers, taskProgress);
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/spi/PipelineTypedSPILoaderTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/spi/PipelineTypedSPILoaderTest.java
deleted file mode 100644
index aefb6eff3de..00000000000
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/spi/PipelineTypedSPILoaderTest.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.test.it.data.pipeline.core.util.spi;
-
-import
org.apache.shardingsphere.data.pipeline.mysql.ingest.MySQLColumnValueReader;
-import
org.apache.shardingsphere.data.pipeline.spi.ingest.dumper.ColumnValueReader;
-import
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
-import org.apache.shardingsphere.data.pipeline.util.spi.PipelineTypedSPILoader;
-import org.junit.jupiter.api.Test;
-
-import java.util.Optional;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-class PipelineTypedSPILoaderTest {
-
- @Test
- void assertFindDatabaseTypedService() {
- Optional<PipelineSQLBuilder> actual =
PipelineTypedSPILoader.findDatabaseTypedService(PipelineSQLBuilder.class,
"MariaDB");
- assertTrue(actual.isPresent());
- assertThat(actual.get().getType(), is("MySQL"));
- }
-
- @Test
- void assertGetPipelineSQLBuilder() {
- PipelineSQLBuilder actual =
PipelineTypedSPILoader.getDatabaseTypedService(PipelineSQLBuilder.class,
"MariaDB");
- assertNotNull(actual);
- assertThat(actual.getType(), is("MySQL"));
- }
-
- @Test
- void assertFindColumnValueReaderByUnknown() {
- Optional<ColumnValueReader> actual =
PipelineTypedSPILoader.findDatabaseTypedService(ColumnValueReader.class,
"Unknown");
- assertFalse(actual.isPresent());
- }
-
- @Test
- void assertGetColumnValueReaderByBranchDB() {
- ColumnValueReader actual =
PipelineTypedSPILoader.getDatabaseTypedService(ColumnValueReader.class,
"MariaDB");
- assertNotNull(actual);
- assertThat(actual.getClass().getName(),
is(MySQLColumnValueReader.class.getName()));
- }
-}