This is an automated email from the ASF dual-hosted git repository.

yx9o pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 23775c88db6 Add sharding columns in SQL condition for migration 
incremental task (#21928)
23775c88db6 is described below

commit 23775c88db62d9cb00aec3f81b04fa04c6029372
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Thu Nov 3 16:47:05 2022 +0800

    Add sharding columns in SQL condition for migration incremental task 
(#21928)
    
    * Use tables rule for t_order in IT
    
    * Fix ShardingSphereSQLException missed exception message
    
    * Update table rule DistSQL
    
    * Fix sharding algorithm "inline" case-sensitive
    
    * Print exception
    
    * Change sharding column; Change user_id generator
    
    * Add ShardingColumnsExtractor and impl
    
    * Use shardingColumnsMap for migration job
---
 .../pipeline/ShardingColumnsExtractorImpl.java     | 81 ++++++++++++++++++++++
 ....pipeline.spi.sharding.ShardingColumnsExtractor | 18 +++++
 .../external/sql/ShardingSphereSQLException.java   |  7 +-
 .../ShardingSpherePipelineDataSourceCreator.java   |  2 +-
 .../spi/sharding/ShardingColumnsExtractor.java     | 41 +++++++++++
 .../sharding/ShardingColumnsExtractorFactory.java  | 37 +++++-----
 ...RC32MatchDataConsistencyCalculateAlgorithm.java |  1 +
 ...DataMatchDataConsistencyCalculateAlgorithm.java |  1 +
 .../scenario/migration/MigrationJobAPIImpl.java    |  6 +-
 .../migration/general/MySQLMigrationGeneralIT.java | 14 ++--
 .../general/PostgreSQLMigrationGeneralIT.java      | 13 ++--
 .../framework/helper/ScalingCaseHelper.java        |  6 +-
 .../util/AutoIncrementKeyGenerateAlgorithm.java    |  6 +-
 .../resources/env/common/migration-command.xml     |  4 +-
 .../ShardingColumnsExtractorFactoryTest.java}      | 28 +++-----
 15 files changed, 200 insertions(+), 65 deletions(-)

diff --git 
a/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingColumnsExtractorImpl.java
 
b/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingColumnsExtractorImpl.java
new file mode 100644
index 00000000000..0711d86e71f
--- /dev/null
+++ 
b/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingColumnsExtractorImpl.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.sharding.data.pipeline;
+
+import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
+import 
org.apache.shardingsphere.data.pipeline.spi.sharding.ShardingColumnsExtractor;
+import 
org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlRuleConfiguration;
+import org.apache.shardingsphere.sharding.api.config.ShardingRuleConfiguration;
+import 
org.apache.shardingsphere.sharding.api.config.rule.ShardingAutoTableRuleConfiguration;
+import 
org.apache.shardingsphere.sharding.api.config.rule.ShardingTableRuleConfiguration;
+import 
org.apache.shardingsphere.sharding.api.config.strategy.sharding.ComplexShardingStrategyConfiguration;
+import 
org.apache.shardingsphere.sharding.api.config.strategy.sharding.ShardingStrategyConfiguration;
+import 
org.apache.shardingsphere.sharding.api.config.strategy.sharding.StandardShardingStrategyConfiguration;
+import 
org.apache.shardingsphere.sharding.yaml.swapper.ShardingRuleConfigurationConverter;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Sharding columns extractor implementation.
+ */
+public final class ShardingColumnsExtractorImpl implements 
ShardingColumnsExtractor {
+    
+    @Override
+    public Map<LogicTableName, Set<String>> getShardingColumnsMap(final 
Collection<YamlRuleConfiguration> yamlRuleConfigs, final Set<LogicTableName> 
logicTableNames) {
+        ShardingRuleConfiguration shardingRuleConfig = 
ShardingRuleConfigurationConverter.findAndConvertShardingRuleConfiguration(yamlRuleConfigs);
+        Set<String> defaultDatabaseShardingColumns = 
extractShardingColumns(shardingRuleConfig.getDefaultDatabaseShardingStrategy());
+        Set<String> defaultTableShardingColumns = 
extractShardingColumns(shardingRuleConfig.getDefaultTableShardingStrategy());
+        Map<LogicTableName, Set<String>> result = new ConcurrentHashMap<>();
+        for (ShardingTableRuleConfiguration each : 
shardingRuleConfig.getTables()) {
+            LogicTableName logicTableName = new 
LogicTableName(each.getLogicTable());
+            if (!logicTableNames.contains(logicTableName)) {
+                continue;
+            }
+            Set<String> shardingColumns = new HashSet<>();
+            shardingColumns.addAll(null == each.getDatabaseShardingStrategy() 
? defaultDatabaseShardingColumns : 
extractShardingColumns(each.getDatabaseShardingStrategy()));
+            shardingColumns.addAll(null == each.getTableShardingStrategy() ? 
defaultTableShardingColumns : 
extractShardingColumns(each.getTableShardingStrategy()));
+            result.put(logicTableName, shardingColumns);
+        }
+        for (ShardingAutoTableRuleConfiguration each : 
shardingRuleConfig.getAutoTables()) {
+            LogicTableName logicTableName = new 
LogicTableName(each.getLogicTable());
+            if (!logicTableNames.contains(logicTableName)) {
+                continue;
+            }
+            ShardingStrategyConfiguration shardingStrategy = 
each.getShardingStrategy();
+            Set<String> shardingColumns = new 
HashSet<>(extractShardingColumns(shardingStrategy));
+            result.put(logicTableName, shardingColumns);
+        }
+        return result;
+    }
+    
+    private Set<String> extractShardingColumns(final 
ShardingStrategyConfiguration shardingStrategy) {
+        if (shardingStrategy instanceof StandardShardingStrategyConfiguration) 
{
+            return new 
HashSet<>(Collections.singleton(((StandardShardingStrategyConfiguration) 
shardingStrategy).getShardingColumn()));
+        }
+        if (shardingStrategy instanceof ComplexShardingStrategyConfiguration) {
+            return new 
HashSet<>(Arrays.asList(((ComplexShardingStrategyConfiguration) 
shardingStrategy).getShardingColumns().split(",")));
+        }
+        return Collections.emptySet();
+    }
+}
diff --git 
a/features/sharding/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sharding.ShardingColumnsExtractor
 
b/features/sharding/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sharding.ShardingColumnsExtractor
new file mode 100644
index 00000000000..c52c2307e57
--- /dev/null
+++ 
b/features/sharding/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sharding.ShardingColumnsExtractor
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+ 
+org.apache.shardingsphere.sharding.data.pipeline.ShardingColumnsExtractorImpl
diff --git 
a/infra/util/src/main/java/org/apache/shardingsphere/infra/util/exception/external/sql/ShardingSphereSQLException.java
 
b/infra/util/src/main/java/org/apache/shardingsphere/infra/util/exception/external/sql/ShardingSphereSQLException.java
index 76c412c2224..80ae0ee691c 100644
--- 
a/infra/util/src/main/java/org/apache/shardingsphere/infra/util/exception/external/sql/ShardingSphereSQLException.java
+++ 
b/infra/util/src/main/java/org/apache/shardingsphere/infra/util/exception/external/sql/ShardingSphereSQLException.java
@@ -40,9 +40,14 @@ public abstract class ShardingSphereSQLException extends 
ShardingSphereExternalE
     }
     
     public ShardingSphereSQLException(final String sqlState, final int 
typeOffset, final int errorCode, final String reason, final Object... 
messageArguments) {
+        this(sqlState, typeOffset, errorCode, null == reason ? null : 
String.format(reason, messageArguments));
+    }
+    
+    private ShardingSphereSQLException(final String sqlState, final int 
typeOffset, final int errorCode, final String reason) {
+        super(reason);
         this.sqlState = sqlState;
         vendorCode = typeOffset * 10000 + errorCode;
-        this.reason = null == reason ? null : String.format(reason, 
messageArguments);
+        this.reason = reason;
     }
     
     /**
diff --git 
a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/data/pipeline/datasource/creator/ShardingSpherePipelineDataSourceCreator.java
 
b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/data/pipeline/datasource/creator/ShardingSpherePipelineDataSourceCreator.java
index 5ae9d20b1b0..908b5460d2f 100644
--- 
a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/data/pipeline/datasource/creator/ShardingSpherePipelineDataSourceCreator.java
+++ 
b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/data/pipeline/datasource/creator/ShardingSpherePipelineDataSourceCreator.java
@@ -59,7 +59,7 @@ public final class ShardingSpherePipelineDataSourceCreator 
implements PipelineDa
     
     private void enableRangeQueryForInline(final YamlShardingRuleConfiguration 
shardingRuleConfig) {
         for (YamlAlgorithmConfiguration each : 
shardingRuleConfig.getShardingAlgorithms().values()) {
-            if ("INLINE".equals(each.getType())) {
+            if ("INLINE".equalsIgnoreCase(each.getType())) {
                 each.getProps().put("allow-range-query-with-inline-sharding", 
Boolean.TRUE.toString());
             }
         }
diff --git 
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sharding/ShardingColumnsExtractor.java
 
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sharding/ShardingColumnsExtractor.java
new file mode 100644
index 00000000000..353bd5e1b9f
--- /dev/null
+++ 
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sharding/ShardingColumnsExtractor.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.spi.sharding;
+
+import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
+import org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPI;
+import 
org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlRuleConfiguration;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Sharding columns extractor.
+ */
+public interface ShardingColumnsExtractor extends RequiredSPI {
+    
+    /**
+     * Get sharding columns map.
+     *
+     * @param yamlRuleConfigs YAML rule configurations
+     * @param logicTableNames logic table names
+     * @return sharding columns map
+     */
+    Map<LogicTableName, Set<String>> 
getShardingColumnsMap(Collection<YamlRuleConfiguration> yamlRuleConfigs, 
Set<LogicTableName> logicTableNames);
+}
diff --git 
a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/util/AutoIncrementKeyGenerateAlgorithm.java
 
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sharding/ShardingColumnsExtractorFactory.java
similarity index 51%
copy from 
test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/util/AutoIncrementKeyGenerateAlgorithm.java
copy to 
kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sharding/ShardingColumnsExtractorFactory.java
index 9fba5b1d037..882bffd8595 100644
--- 
a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/util/AutoIncrementKeyGenerateAlgorithm.java
+++ 
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sharding/ShardingColumnsExtractorFactory.java
@@ -15,28 +15,29 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.integration.data.pipeline.util;
+package org.apache.shardingsphere.data.pipeline.spi.sharding;
 
-import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
+import 
org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPIRegistry;
 
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicLong;
-
-public final class AutoIncrementKeyGenerateAlgorithm implements 
KeyGenerateAlgorithm {
-    
-    private final AtomicLong idGen = new AtomicLong(1);
-    
-    @Override
-    public Comparable<?> generateKey() {
-        return idGen.getAndIncrement();
-    }
+/**
+ * Sharding columns extractor factory.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class ShardingColumnsExtractorFactory {
     
-    @Override
-    public Properties getProps() {
-        return null;
+    static {
+        ShardingSphereServiceLoader.register(ShardingColumnsExtractor.class);
     }
     
-    @Override
-    public void init(final Properties props) {
+    /**
+     * Get sharding columns extractor instance.
+     *
+     * @return sharding columns extractor
+     */
+    public static ShardingColumnsExtractor getInstance() {
+        return 
RequiredSPIRegistry.getRegisteredService(ShardingColumnsExtractor.class);
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
index 493e006f77a..1026be07b83 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
@@ -82,6 +82,7 @@ public final class 
CRC32MatchDataConsistencyCalculateAlgorithm extends AbstractD
             int recordsCount = resultSet.getInt(2);
             return new CalculatedItem(crc32, recordsCount);
         } catch (final SQLException ex) {
+            log.error("calculateCRC32 failed, tableName={}", logicTableName, 
ex);
             throw new 
PipelineTableDataConsistencyCheckLoadingFailedException(logicTableName);
         }
     }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
index 1415cee9d0c..1875a55f58a 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
@@ -122,6 +122,7 @@ public final class 
DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
             }
             return records.isEmpty() ? Optional.empty() : Optional.of(new 
CalculatedResult(maxUniqueKeyValue, records.size(), records));
         } catch (final SQLException ex) {
+            log.error("calculateChunk failed, schemaName={}, tableName={}", 
parameter.getSchemaName(), parameter.getLogicTableName(), ex);
             throw new 
PipelineTableDataConsistencyCheckLoadingFailedException(parameter.getLogicTableName());
         }
     }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
index 0e5b65d7b84..d20f547f4c1 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
@@ -64,6 +64,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipe
 import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobItemContext;
+import 
org.apache.shardingsphere.data.pipeline.spi.sharding.ShardingColumnsExtractorFactory;
 import 
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
 import 
org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
@@ -188,8 +189,9 @@ public final class MigrationJobAPIImpl extends 
AbstractInventoryIncrementalJobAP
         TableNameSchemaNameMapping tableNameSchemaNameMapping = new 
TableNameSchemaNameMapping(tableNameSchemaMap);
         CreateTableConfiguration createTableConfig = 
buildCreateTableConfiguration(jobConfig);
         DumperConfiguration dumperConfig = 
buildDumperConfiguration(jobConfig.getJobId(), 
jobConfig.getSourceResourceName(), jobConfig.getSource(), tableNameMap, 
tableNameSchemaNameMapping);
-        // TODO now shardingColumnsMap always empty,
-        ImporterConfiguration importerConfig = 
buildImporterConfiguration(jobConfig, pipelineProcessConfig, 
Collections.emptyMap(), tableNameSchemaNameMapping);
+        Map<LogicTableName, Set<String>> shardingColumnsMap = 
ShardingColumnsExtractorFactory.getInstance().getShardingColumnsMap(
+                ((ShardingSpherePipelineDataSourceConfiguration) 
jobConfig.getTarget()).getRootConfig().getRules(), Collections.singleton(new 
LogicTableName(jobConfig.getTargetTableName())));
+        ImporterConfiguration importerConfig = 
buildImporterConfiguration(jobConfig, pipelineProcessConfig, 
shardingColumnsMap, tableNameSchemaNameMapping);
         MigrationTaskConfiguration result = new 
MigrationTaskConfiguration(jobConfig.getSourceResourceName(), 
createTableConfig, dumperConfig, importerConfig);
         log.info("buildTaskConfiguration, sourceResourceName={}, result={}", 
jobConfig.getSourceResourceName(), result);
         return result;
diff --git 
a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java
 
b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java
index 6cb495c3062..401fae91a1f 100644
--- 
a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java
+++ 
b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java
@@ -26,7 +26,6 @@ import 
org.apache.shardingsphere.integration.data.pipeline.env.enums.ITEnvTypeEn
 import 
org.apache.shardingsphere.integration.data.pipeline.framework.helper.ScalingCaseHelper;
 import 
org.apache.shardingsphere.integration.data.pipeline.framework.param.ScalingParameterized;
 import 
org.apache.shardingsphere.integration.data.pipeline.util.AutoIncrementKeyGenerateAlgorithm;
-import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -87,7 +86,7 @@ public final class MySQLMigrationGeneralIT extends 
AbstractMigrationITCase {
         createTargetOrderTableRule();
         createTargetOrderTableEncryptRule();
         createTargetOrderItemTableRule();
-        KeyGenerateAlgorithm keyGenerateAlgorithm = new 
AutoIncrementKeyGenerateAlgorithm();
+        AutoIncrementKeyGenerateAlgorithm keyGenerateAlgorithm = new 
AutoIncrementKeyGenerateAlgorithm();
         JdbcTemplate jdbcTemplate = new JdbcTemplate(getSourceDataSource());
         Pair<List<Object[]>, List<Object[]>> dataPair = 
ScalingCaseHelper.generateFullInsertData(keyGenerateAlgorithm, 
parameterized.getDatabaseType(), 3000);
         log.info("init data begin: {}", LocalDateTime.now());
@@ -96,19 +95,18 @@ public final class MySQLMigrationGeneralIT extends 
AbstractMigrationITCase {
         log.info("init data end: {}", LocalDateTime.now());
         startMigration(getSourceTableOrderName(), getTargetTableOrderName());
         startMigration("t_order_item", "t_order_item");
+        // TODO wait preparation done (binlog position got), else 
insert/update/delete might be consumed in inventory task
         startIncrementTask(new MySQLIncrementTask(jdbcTemplate, 
getSourceTableOrderName(), keyGenerateAlgorithm, 30));
         String orderJobId = getJobIdByTableName(getSourceTableOrderName());
         String orderItemJobId = getJobIdByTableName("t_order_item");
         assertMigrationSuccessById(orderJobId, "DATA_MATCH");
         assertMigrationSuccessById(orderItemJobId, "DATA_MATCH");
         assertMigrationSuccessById(orderItemJobId, "CRC32_MATCH");
-        if (ENV.getItEnvType() == ITEnvTypeEnum.DOCKER) {
-            for (String each : listJobId()) {
-                commitMigrationByJobId(each);
-            }
-            List<String> lastJobIds = listJobId();
-            assertThat(lastJobIds.size(), is(0));
+        for (String each : listJobId()) {
+            commitMigrationByJobId(each);
         }
+        List<String> lastJobIds = listJobId();
+        assertThat(lastJobIds.size(), is(0));
         assertGreaterThanOrderTableInitRows(TABLE_INIT_ROW_COUNT, "");
     }
     
diff --git 
a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java
 
b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java
index 34fe2e7816e..f4ab9cada7d 100644
--- 
a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java
+++ 
b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java
@@ -27,7 +27,6 @@ import 
org.apache.shardingsphere.integration.data.pipeline.env.enums.ITEnvTypeEn
 import 
org.apache.shardingsphere.integration.data.pipeline.framework.helper.ScalingCaseHelper;
 import 
org.apache.shardingsphere.integration.data.pipeline.framework.param.ScalingParameterized;
 import 
org.apache.shardingsphere.integration.data.pipeline.util.AutoIncrementKeyGenerateAlgorithm;
-import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -50,7 +49,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 @Slf4j
 public final class PostgreSQLMigrationGeneralIT extends 
AbstractMigrationITCase {
     
-    private static final KeyGenerateAlgorithm KEY_GENERATE_ALGORITHM = new 
AutoIncrementKeyGenerateAlgorithm();
+    private static final AutoIncrementKeyGenerateAlgorithm 
KEY_GENERATE_ALGORITHM = new AutoIncrementKeyGenerateAlgorithm();
     
     private final ScalingParameterized parameterized;
     
@@ -100,13 +99,11 @@ public final class PostgreSQLMigrationGeneralIT extends 
AbstractMigrationITCase
         log.info("init data end: {}", LocalDateTime.now());
         checkOrderMigration(jdbcTemplate);
         checkOrderItemMigration();
-        if (ENV.getItEnvType() == ITEnvTypeEnum.DOCKER) {
-            for (String each : listJobId()) {
-                commitMigrationByJobId(each);
-            }
-            List<String> lastJobIds = listJobId();
-            assertThat(lastJobIds.size(), is(0));
+        for (String each : listJobId()) {
+            commitMigrationByJobId(each);
         }
+        List<String> lastJobIds = listJobId();
+        assertThat(lastJobIds.size(), is(0));
         assertGreaterThanOrderTableInitRows(TABLE_INIT_ROW_COUNT, SCHEMA_NAME);
     }
     
diff --git 
a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/helper/ScalingCaseHelper.java
 
b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/helper/ScalingCaseHelper.java
index fd8ed623eb7..8742fc131a7 100644
--- 
a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/helper/ScalingCaseHelper.java
+++ 
b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/helper/ScalingCaseHelper.java
@@ -21,8 +21,8 @@ import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import 
org.apache.shardingsphere.integration.data.pipeline.util.AutoIncrementKeyGenerateAlgorithm;
 import 
org.apache.shardingsphere.sharding.algorithm.keygen.SnowflakeKeyGenerateAlgorithm;
-import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
 
 import java.math.BigDecimal;
 import java.sql.Timestamp;
@@ -57,7 +57,7 @@ public final class ScalingCaseHelper {
      * @param insertRows insert rows
      * @return insert data list
      */
-    public static Pair<List<Object[]>, List<Object[]>> 
generateFullInsertData(final KeyGenerateAlgorithm orderIdGenerate, final 
DatabaseType databaseType, final int insertRows) {
+    public static Pair<List<Object[]>, List<Object[]>> 
generateFullInsertData(final AutoIncrementKeyGenerateAlgorithm orderIdGenerate, 
final DatabaseType databaseType, final int insertRows) {
         if (insertRows < 0) {
             return Pair.of(null, null);
         }
@@ -65,7 +65,7 @@ public final class ScalingCaseHelper {
         List<Object[]> orderItemData = new ArrayList<>(insertRows);
         for (int i = 0; i < insertRows; i++) {
             Comparable<?> orderId = orderIdGenerate.generateKey();
-            int userId = generateInt(0, 6);
+            int userId = orderIdGenerate.generateKey();
             LocalDateTime now = LocalDateTime.now();
             int randomInt = generateInt(-100, 100);
             int randomUnsignedInt = generateInt(0, 100);
diff --git 
a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/util/AutoIncrementKeyGenerateAlgorithm.java
 
b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/util/AutoIncrementKeyGenerateAlgorithm.java
index 9fba5b1d037..00011b10ff4 100644
--- 
a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/util/AutoIncrementKeyGenerateAlgorithm.java
+++ 
b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/util/AutoIncrementKeyGenerateAlgorithm.java
@@ -20,14 +20,14 @@ package 
org.apache.shardingsphere.integration.data.pipeline.util;
 import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
 
 import java.util.Properties;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public final class AutoIncrementKeyGenerateAlgorithm implements 
KeyGenerateAlgorithm {
     
-    private final AtomicLong idGen = new AtomicLong(1);
+    private final AtomicInteger idGen = new AtomicInteger(1);
     
     @Override
-    public Comparable<?> generateKey() {
+    public Integer generateKey() {
         return idGen.getAndIncrement();
     }
     
diff --git 
a/test/integration-test/scaling/src/test/resources/env/common/migration-command.xml
 
b/test/integration-test/scaling/src/test/resources/env/common/migration-command.xml
index 11a0fa76b34..17757c96b60 100644
--- 
a/test/integration-test/scaling/src/test/resources/env/common/migration-command.xml
+++ 
b/test/integration-test/scaling/src/test/resources/env/common/migration-command.xml
@@ -66,10 +66,10 @@
     <create-target-order-table-rule>
         CREATE SHARDING TABLE RULE t_order(
         STORAGE_UNITS(ds_2,ds_3,ds_4),
-        SHARDING_COLUMN=order_id,
+        SHARDING_COLUMN=user_id,
         TYPE(NAME="hash_mod",PROPERTIES("sharding-count"="6")),
         KEY_GENERATE_STRATEGY(COLUMN=order_id,TYPE(NAME="snowflake"))
-        )
+        );
     </create-target-order-table-rule>
     
     <create-target-order-item-table-rule>
diff --git 
a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/util/AutoIncrementKeyGenerateAlgorithm.java
 
b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/spi/sharding/ShardingColumnsExtractorFactoryTest.java
similarity index 57%
copy from 
test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/util/AutoIncrementKeyGenerateAlgorithm.java
copy to 
test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/spi/sharding/ShardingColumnsExtractorFactoryTest.java
index 9fba5b1d037..4532c7a84c7 100644
--- 
a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/util/AutoIncrementKeyGenerateAlgorithm.java
+++ 
b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/spi/sharding/ShardingColumnsExtractorFactoryTest.java
@@ -15,28 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.integration.data.pipeline.util;
+package org.apache.shardingsphere.data.pipeline.spi.sharding;
 
-import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
+import 
org.apache.shardingsphere.sharding.data.pipeline.ShardingColumnsExtractorImpl;
+import org.junit.Test;
 
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicLong;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
 
-public final class AutoIncrementKeyGenerateAlgorithm implements 
KeyGenerateAlgorithm {
+public final class ShardingColumnsExtractorFactoryTest {
     
-    private final AtomicLong idGen = new AtomicLong(1);
-    
-    @Override
-    public Comparable<?> generateKey() {
-        return idGen.getAndIncrement();
-    }
-    
-    @Override
-    public Properties getProps() {
-        return null;
-    }
-    
-    @Override
-    public void init(final Properties props) {
+    @Test
+    public void assertGetInstance() {
+        assertThat(ShardingColumnsExtractorFactory.getInstance(), 
instanceOf(ShardingColumnsExtractorImpl.class));
     }
 }

Reply via email to