This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new e980e13de7f Refactor ShardingColumnsExtractor as TypedSPI (#23736)
e980e13de7f is described below
commit e980e13de7f06f94f428d06fb5c17bafe58555be
Author: Liang Zhang <[email protected]>
AuthorDate: Fri Jan 27 11:00:30 2023 +0800
Refactor ShardingColumnsExtractor as TypedSPI (#23736)
---
.../sharding/data/pipeline/ShardingColumnsExtractorImpl.java | 5 +++++
.../data/pipeline/spi/sharding/ShardingColumnsExtractor.java | 3 ++-
.../apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java | 5 +++--
.../data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java | 4 ++--
4 files changed, 12 insertions(+), 5 deletions(-)
diff --git a/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingColumnsExtractorImpl.java b/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingColumnsExtractorImpl.java
index 0711d86e71f..ef055c4f6f6 100644
--- a/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingColumnsExtractorImpl.java
+++ b/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingColumnsExtractorImpl.java
@@ -78,4 +78,9 @@ public final class ShardingColumnsExtractorImpl implements ShardingColumnsExtrac
}
return Collections.emptySet();
}
+
+ @Override
+ public String getType() {
+ return "Sharding";
+ }
}
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sharding/ShardingColumnsExtractor.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sharding/ShardingColumnsExtractor.java
index 353bd5e1b9f..3c9a95d638a 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sharding/ShardingColumnsExtractor.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sharding/ShardingColumnsExtractor.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.data.pipeline.spi.sharding;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
import org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPI;
+import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
import org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlRuleConfiguration;
import java.util.Collection;
@@ -28,7 +29,7 @@ import java.util.Set;
/**
* Sharding columns extractor.
*/
-public interface ShardingColumnsExtractor extends RequiredSPI {
+public interface ShardingColumnsExtractor extends TypedSPI, RequiredSPI {
/**
* Get sharding columns map.
diff --git a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index bd57673c3f1..09e8aac196e 100644
--- a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -64,7 +64,7 @@ import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.infra.datasource.props.DataSourcePropertiesCreator;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
-import org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPIRegistry;
+import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPIRegistry;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
import org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlRuleConfiguration;
@@ -81,6 +81,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
@@ -205,7 +206,7 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
CDCProcessContext processContext = new CDCProcessContext(jobConfig.getJobId(), pipelineProcessConfig);
JobRateLimitAlgorithm writeRateLimitAlgorithm = processContext.getWriteRateLimitAlgorithm();
int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
- Map<LogicTableName, Set<String>> shardingColumnsMap = RequiredSPIRegistry.getService(ShardingColumnsExtractor.class)
+ Map<LogicTableName, Set<String>> shardingColumnsMap = TypedSPIRegistry.getService(ShardingColumnsExtractor.class, "Sharding", new Properties())
.getShardingColumnsMap(jobConfig.getDataSourceConfig().getRootConfig().getRules(), logicalTableNames.stream().map(LogicTableName::new).collect(Collectors.toSet()));
return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap, tableNameSchemaNameMapping, batchSize, writeRateLimitAlgorithm, 0, 1);
}
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index 3a49262abba..88a9d76b6cd 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -86,7 +86,6 @@ import org.apache.shardingsphere.infra.datasource.props.DataSourceProperties;
import org.apache.shardingsphere.infra.datasource.props.DataSourcePropertiesCreator;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
-import org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPIRegistry;
import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPIRegistry;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
@@ -109,6 +108,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
@@ -196,7 +196,7 @@ public final class MigrationJobAPI extends AbstractInventoryIncrementalJobAPIImp
TableNameSchemaNameMapping tableNameSchemaNameMapping = new TableNameSchemaNameMapping(tableNameSchemaMap);
CreateTableConfiguration createTableConfig = buildCreateTableConfiguration(jobConfig);
DumperConfiguration dumperConfig = buildDumperConfiguration(jobConfig.getJobId(), jobConfig.getSourceResourceName(), jobConfig.getSource(), tableNameMap, tableNameSchemaNameMapping);
- Map<LogicTableName, Set<String>> shardingColumnsMap = RequiredSPIRegistry.getService(ShardingColumnsExtractor.class).getShardingColumnsMap(
+ Map<LogicTableName, Set<String>> shardingColumnsMap = TypedSPIRegistry.getService(ShardingColumnsExtractor.class, "Sharding", new Properties()).getShardingColumnsMap(
((ShardingSpherePipelineDataSourceConfiguration) jobConfig.getTarget()).getRootConfig().getRules(), Collections.singleton(new LogicTableName(jobConfig.getTargetTableName())));
ImporterConfiguration importerConfig = buildImporterConfiguration(jobConfig, pipelineProcessConfig, shardingColumnsMap, tableNameSchemaNameMapping);
return new MigrationTaskConfiguration(jobConfig.getSourceResourceName(), createTableConfig, dumperConfig, importerConfig);