This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new c2989a4ef6d Refactor ShardingColumnsExtractor from SPI to class (#23744)
c2989a4ef6d is described below

commit c2989a4ef6d53e317202a6d41793f9aea817c1d9
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Sat Jan 28 11:08:18 2023 +0800

    Refactor ShardingColumnsExtractor from SPI to class (#23744)
---
 ....pipeline.spi.sharding.ShardingColumnsExtractor | 18 ----------
 .../spi/sharding/ShardingColumnsExtractor.java     | 41 ----------------------
 .../data/pipeline/cdc/api/impl/CDCJobAPI.java      |  5 ++-
 kernel/data-pipeline/core/pom.xml                  |  5 +++
 ...YamlJobItemIncrementalTasksProgressSwapper.java |  2 +-
 .../core/sharding/ShardingColumnsExtractor.java    | 20 +++++------
 .../migration/api/impl/MigrationJobAPI.java        |  4 +--
 7 files changed, 20 insertions(+), 75 deletions(-)

diff --git 
a/features/sharding/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sharding.ShardingColumnsExtractor
 
b/features/sharding/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sharding.ShardingColumnsExtractor
deleted file mode 100644
index c52c2307e57..00000000000
--- 
a/features/sharding/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sharding.ShardingColumnsExtractor
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
- 
-org.apache.shardingsphere.sharding.data.pipeline.ShardingColumnsExtractorImpl
diff --git 
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sharding/ShardingColumnsExtractor.java
 
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sharding/ShardingColumnsExtractor.java
deleted file mode 100644
index 1556c89e0b8..00000000000
--- 
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sharding/ShardingColumnsExtractor.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.data.pipeline.spi.sharding;
-
-import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
-import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPI;
-import 
org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlRuleConfiguration;
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * Sharding columns extractor.
- */
-public interface ShardingColumnsExtractor extends TypedSPI {
-    
-    /**
-     * Get sharding columns map.
-     *
-     * @param yamlRuleConfigs YAML rule configurations
-     * @param logicTableNames logic table names
-     * @return sharding columns map
-     */
-    Map<LogicTableName, Set<String>> getShardingColumnsMap(Collection<YamlRuleConfiguration> yamlRuleConfigs, Set<LogicTableName> logicTableNames);
-}
diff --git 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index b8980368373..4aa5b30e55a 100644
--- 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -56,15 +56,14 @@ import 
org.apache.shardingsphere.data.pipeline.core.context.InventoryIncremental
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobCreationWithInvalidShardingCountException;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
+import 
org.apache.shardingsphere.data.pipeline.core.sharding.ShardingColumnsExtractor;
 import 
org.apache.shardingsphere.data.pipeline.core.util.JobDataNodeLineConvertUtil;
 import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
 import 
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
-import 
org.apache.shardingsphere.data.pipeline.spi.sharding.ShardingColumnsExtractor;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import 
org.apache.shardingsphere.infra.datasource.props.DataSourcePropertiesCreator;
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import 
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
-import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
 import 
org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlRuleConfiguration;
@@ -205,7 +204,7 @@ public final class CDCJobAPI extends 
AbstractInventoryIncrementalJobAPIImpl {
         CDCProcessContext processContext = new 
CDCProcessContext(jobConfig.getJobId(), pipelineProcessConfig);
         JobRateLimitAlgorithm writeRateLimitAlgorithm = 
processContext.getWriteRateLimitAlgorithm();
         int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
-        Map<LogicTableName, Set<String>> shardingColumnsMap = 
TypedSPILoader.getService(ShardingColumnsExtractor.class, "Sharding")
+        Map<LogicTableName, Set<String>> shardingColumnsMap = new 
ShardingColumnsExtractor()
                 
.getShardingColumnsMap(jobConfig.getDataSourceConfig().getRootConfig().getRules(),
 
logicalTableNames.stream().map(LogicTableName::new).collect(Collectors.toSet()));
         return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap, 
tableNameSchemaNameMapping, batchSize, writeRateLimitAlgorithm, 0, 1);
     }
diff --git a/kernel/data-pipeline/core/pom.xml 
b/kernel/data-pipeline/core/pom.xml
index 3b6625a4826..ff39d34cf12 100644
--- a/kernel/data-pipeline/core/pom.xml
+++ b/kernel/data-pipeline/core/pom.xml
@@ -53,6 +53,11 @@
             <artifactId>shardingsphere-parser-core</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.shardingsphere</groupId>
+            <artifactId>shardingsphere-sharding-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <!-- TODO remove cluster mode dependency after 3 modes have equivalent 
features -->
         <dependency>
             <groupId>org.apache.shardingsphere</groupId>
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemIncrementalTasksProgressSwapper.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemIncrementalTasksProgressSwapper.java
index b8f77601a19..26536ec0e51 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemIncrementalTasksProgressSwapper.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobItemIncrementalTasksProgressSwapper.java
@@ -59,7 +59,7 @@ public final class YamlJobItemIncrementalTasksProgressSwapper 
{
             return new JobItemIncrementalTasksProgress(null);
         }
         IncrementalTaskProgress taskProgress = new IncrementalTaskProgress();
-        // TODO databaseType
+        // TODO consider to remove parameter databaseType
         PositionInitializer positionInitializer = 
TypedSPILoader.getService(PositionInitializer.class, databaseType);
         
taskProgress.setPosition(positionInitializer.init(yamlProgress.getPosition()));
         taskProgress.setIncrementalTaskDelay(yamlProgress.getDelay());
diff --git 
a/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingColumnsExtractorImpl.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sharding/ShardingColumnsExtractor.java
similarity index 92%
rename from 
features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingColumnsExtractorImpl.java
rename to 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sharding/ShardingColumnsExtractor.java
index ef055c4f6f6..1e28d26ea3d 100644
--- 
a/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingColumnsExtractorImpl.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sharding/ShardingColumnsExtractor.java
@@ -15,10 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.sharding.data.pipeline;
+package org.apache.shardingsphere.data.pipeline.core.sharding;
 
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
-import 
org.apache.shardingsphere.data.pipeline.spi.sharding.ShardingColumnsExtractor;
 import 
org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlRuleConfiguration;
 import org.apache.shardingsphere.sharding.api.config.ShardingRuleConfiguration;
 import 
org.apache.shardingsphere.sharding.api.config.rule.ShardingAutoTableRuleConfiguration;
@@ -37,11 +36,17 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
- * Sharding columns extractor implementation.
+ * Sharding columns extractor.
  */
-public final class ShardingColumnsExtractorImpl implements ShardingColumnsExtractor {
+public final class ShardingColumnsExtractor {
     
-    @Override
+    /**
+     * Get sharding columns map.
+     *
+     * @param yamlRuleConfigs YAML rule configurations
+     * @param logicTableNames logic table names
+     * @return sharding columns map
+     */
     public Map<LogicTableName, Set<String>> getShardingColumnsMap(final Collection<YamlRuleConfiguration> yamlRuleConfigs, final Set<LogicTableName> logicTableNames) {
         ShardingRuleConfiguration shardingRuleConfig = 
ShardingRuleConfigurationConverter.findAndConvertShardingRuleConfiguration(yamlRuleConfigs);
         Set<String> defaultDatabaseShardingColumns = 
extractShardingColumns(shardingRuleConfig.getDefaultDatabaseShardingStrategy());
@@ -78,9 +83,4 @@ public final class ShardingColumnsExtractorImpl implements 
ShardingColumnsExtrac
         }
         return Collections.emptySet();
     }
-    
-    @Override
-    public String getType() {
-        return "Sharding";
-    }
 }
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index 636a0e29f3f..47fdd215e20 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -62,6 +62,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.exception.connection.Unregis
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineSchemaUtil;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataUtil;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
+import 
org.apache.shardingsphere.data.pipeline.core.sharding.ShardingColumnsExtractor;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJob;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
@@ -70,7 +71,6 @@ import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.Migrati
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationProcessContext;
 import org.apache.shardingsphere.data.pipeline.spi.job.JobType;
 import org.apache.shardingsphere.data.pipeline.spi.job.JobTypeFactory;
-import 
org.apache.shardingsphere.data.pipeline.spi.sharding.ShardingColumnsExtractor;
 import 
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
 import 
org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
@@ -195,7 +195,7 @@ public final class MigrationJobAPI extends 
AbstractInventoryIncrementalJobAPIImp
         TableNameSchemaNameMapping tableNameSchemaNameMapping = new 
TableNameSchemaNameMapping(tableNameSchemaMap);
         CreateTableConfiguration createTableConfig = 
buildCreateTableConfiguration(jobConfig);
         DumperConfiguration dumperConfig = 
buildDumperConfiguration(jobConfig.getJobId(), 
jobConfig.getSourceResourceName(), jobConfig.getSource(), tableNameMap, 
tableNameSchemaNameMapping);
-        Map<LogicTableName, Set<String>> shardingColumnsMap = 
TypedSPILoader.getService(ShardingColumnsExtractor.class, 
"Sharding").getShardingColumnsMap(
+        Map<LogicTableName, Set<String>> shardingColumnsMap = new 
ShardingColumnsExtractor().getShardingColumnsMap(
                 ((ShardingSpherePipelineDataSourceConfiguration) 
jobConfig.getTarget()).getRootConfig().getRules(), Collections.singleton(new 
LogicTableName(jobConfig.getTargetTableName())));
         ImporterConfiguration importerConfig = 
buildImporterConfiguration(jobConfig, pipelineProcessConfig, 
shardingColumnsMap, tableNameSchemaNameMapping);
         return new 
MigrationTaskConfiguration(jobConfig.getSourceResourceName(), 
createTableConfig, dumperConfig, importerConfig);

Reply via email to