This is an automated email from the ASF dual-hosted git repository.
yx9o pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 23775c88db6 Add sharding columns in SQL condition for migration
incremental task (#21928)
23775c88db6 is described below
commit 23775c88db62d9cb00aec3f81b04fa04c6029372
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Thu Nov 3 16:47:05 2022 +0800
Add sharding columns in SQL condition for migration incremental task
(#21928)
* Use tables rule for t_order in IT
* Fix missing exception message in ShardingSphereSQLException
* Update table rule DistSQL
* Fix case-sensitive matching of sharding algorithm type "inline"
* Log SQLException when data consistency calculation fails
* Change sharding column; Change user_id generator
* Add ShardingColumnsExtractor and impl
* Use shardingColumnsMap for migration job
---
.../pipeline/ShardingColumnsExtractorImpl.java | 81 ++++++++++++++++++++++
....pipeline.spi.sharding.ShardingColumnsExtractor | 18 +++++
.../external/sql/ShardingSphereSQLException.java | 7 +-
.../ShardingSpherePipelineDataSourceCreator.java | 2 +-
.../spi/sharding/ShardingColumnsExtractor.java | 41 +++++++++++
.../sharding/ShardingColumnsExtractorFactory.java | 37 +++++-----
...RC32MatchDataConsistencyCalculateAlgorithm.java | 1 +
...DataMatchDataConsistencyCalculateAlgorithm.java | 1 +
.../scenario/migration/MigrationJobAPIImpl.java | 6 +-
.../migration/general/MySQLMigrationGeneralIT.java | 14 ++--
.../general/PostgreSQLMigrationGeneralIT.java | 13 ++--
.../framework/helper/ScalingCaseHelper.java | 6 +-
.../util/AutoIncrementKeyGenerateAlgorithm.java | 6 +-
.../resources/env/common/migration-command.xml | 4 +-
.../ShardingColumnsExtractorFactoryTest.java} | 28 +++-----
15 files changed, 200 insertions(+), 65 deletions(-)
diff --git
a/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingColumnsExtractorImpl.java
b/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingColumnsExtractorImpl.java
new file mode 100644
index 00000000000..0711d86e71f
--- /dev/null
+++
b/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/data/pipeline/ShardingColumnsExtractorImpl.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.sharding.data.pipeline;
+
+import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
+import
org.apache.shardingsphere.data.pipeline.spi.sharding.ShardingColumnsExtractor;
+import
org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlRuleConfiguration;
+import org.apache.shardingsphere.sharding.api.config.ShardingRuleConfiguration;
+import
org.apache.shardingsphere.sharding.api.config.rule.ShardingAutoTableRuleConfiguration;
+import
org.apache.shardingsphere.sharding.api.config.rule.ShardingTableRuleConfiguration;
+import
org.apache.shardingsphere.sharding.api.config.strategy.sharding.ComplexShardingStrategyConfiguration;
+import
org.apache.shardingsphere.sharding.api.config.strategy.sharding.ShardingStrategyConfiguration;
+import
org.apache.shardingsphere.sharding.api.config.strategy.sharding.StandardShardingStrategyConfiguration;
+import
org.apache.shardingsphere.sharding.yaml.swapper.ShardingRuleConfigurationConverter;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Sharding columns extractor implementation.
+ */
+public final class ShardingColumnsExtractorImpl implements
ShardingColumnsExtractor {
+
+ @Override
+ public Map<LogicTableName, Set<String>> getShardingColumnsMap(final
Collection<YamlRuleConfiguration> yamlRuleConfigs, final Set<LogicTableName>
logicTableNames) {
+ ShardingRuleConfiguration shardingRuleConfig =
ShardingRuleConfigurationConverter.findAndConvertShardingRuleConfiguration(yamlRuleConfigs);
+ Set<String> defaultDatabaseShardingColumns =
extractShardingColumns(shardingRuleConfig.getDefaultDatabaseShardingStrategy());
+ Set<String> defaultTableShardingColumns =
extractShardingColumns(shardingRuleConfig.getDefaultTableShardingStrategy());
+ Map<LogicTableName, Set<String>> result = new ConcurrentHashMap<>();
+ for (ShardingTableRuleConfiguration each :
shardingRuleConfig.getTables()) {
+ LogicTableName logicTableName = new
LogicTableName(each.getLogicTable());
+ if (!logicTableNames.contains(logicTableName)) {
+ continue;
+ }
+ Set<String> shardingColumns = new HashSet<>();
+ shardingColumns.addAll(null == each.getDatabaseShardingStrategy()
? defaultDatabaseShardingColumns :
extractShardingColumns(each.getDatabaseShardingStrategy()));
+ shardingColumns.addAll(null == each.getTableShardingStrategy() ?
defaultTableShardingColumns :
extractShardingColumns(each.getTableShardingStrategy()));
+ result.put(logicTableName, shardingColumns);
+ }
+ for (ShardingAutoTableRuleConfiguration each :
shardingRuleConfig.getAutoTables()) {
+ LogicTableName logicTableName = new
LogicTableName(each.getLogicTable());
+ if (!logicTableNames.contains(logicTableName)) {
+ continue;
+ }
+ ShardingStrategyConfiguration shardingStrategy =
each.getShardingStrategy();
+ Set<String> shardingColumns = new
HashSet<>(extractShardingColumns(shardingStrategy));
+ result.put(logicTableName, shardingColumns);
+ }
+ return result;
+ }
+
+ private Set<String> extractShardingColumns(final
ShardingStrategyConfiguration shardingStrategy) {
+ if (shardingStrategy instanceof StandardShardingStrategyConfiguration)
{
+ return new
HashSet<>(Collections.singleton(((StandardShardingStrategyConfiguration)
shardingStrategy).getShardingColumn()));
+ }
+ if (shardingStrategy instanceof ComplexShardingStrategyConfiguration) {
+ return new
HashSet<>(Arrays.asList(((ComplexShardingStrategyConfiguration)
shardingStrategy).getShardingColumns().split(",")));
+ }
+ return Collections.emptySet();
+ }
+}
diff --git
a/features/sharding/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sharding.ShardingColumnsExtractor
b/features/sharding/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sharding.ShardingColumnsExtractor
new file mode 100644
index 00000000000..c52c2307e57
--- /dev/null
+++
b/features/sharding/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.spi.sharding.ShardingColumnsExtractor
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.sharding.data.pipeline.ShardingColumnsExtractorImpl
diff --git
a/infra/util/src/main/java/org/apache/shardingsphere/infra/util/exception/external/sql/ShardingSphereSQLException.java
b/infra/util/src/main/java/org/apache/shardingsphere/infra/util/exception/external/sql/ShardingSphereSQLException.java
index 76c412c2224..80ae0ee691c 100644
---
a/infra/util/src/main/java/org/apache/shardingsphere/infra/util/exception/external/sql/ShardingSphereSQLException.java
+++
b/infra/util/src/main/java/org/apache/shardingsphere/infra/util/exception/external/sql/ShardingSphereSQLException.java
@@ -40,9 +40,14 @@ public abstract class ShardingSphereSQLException extends
ShardingSphereExternalE
}
public ShardingSphereSQLException(final String sqlState, final int
typeOffset, final int errorCode, final String reason, final Object...
messageArguments) {
+ this(sqlState, typeOffset, errorCode, null == reason ? null :
String.format(reason, messageArguments));
+ }
+
+ private ShardingSphereSQLException(final String sqlState, final int
typeOffset, final int errorCode, final String reason) {
+ super(reason);
this.sqlState = sqlState;
vendorCode = typeOffset * 10000 + errorCode;
- this.reason = null == reason ? null : String.format(reason,
messageArguments);
+ this.reason = reason;
}
/**
diff --git
a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/data/pipeline/datasource/creator/ShardingSpherePipelineDataSourceCreator.java
b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/data/pipeline/datasource/creator/ShardingSpherePipelineDataSourceCreator.java
index 5ae9d20b1b0..908b5460d2f 100644
---
a/jdbc/core/src/main/java/org/apache/shardingsphere/driver/data/pipeline/datasource/creator/ShardingSpherePipelineDataSourceCreator.java
+++
b/jdbc/core/src/main/java/org/apache/shardingsphere/driver/data/pipeline/datasource/creator/ShardingSpherePipelineDataSourceCreator.java
@@ -59,7 +59,7 @@ public final class ShardingSpherePipelineDataSourceCreator
implements PipelineDa
private void enableRangeQueryForInline(final YamlShardingRuleConfiguration
shardingRuleConfig) {
for (YamlAlgorithmConfiguration each :
shardingRuleConfig.getShardingAlgorithms().values()) {
- if ("INLINE".equals(each.getType())) {
+ if ("INLINE".equalsIgnoreCase(each.getType())) {
each.getProps().put("allow-range-query-with-inline-sharding",
Boolean.TRUE.toString());
}
}
diff --git
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sharding/ShardingColumnsExtractor.java
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sharding/ShardingColumnsExtractor.java
new file mode 100644
index 00000000000..353bd5e1b9f
--- /dev/null
+++
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sharding/ShardingColumnsExtractor.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.spi.sharding;
+
+import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
+import org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPI;
+import
org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlRuleConfiguration;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Sharding columns extractor.
+ */
+public interface ShardingColumnsExtractor extends RequiredSPI {
+
+ /**
+ * Get sharding columns map.
+ *
+ * @param yamlRuleConfigs YAML rule configurations
+ * @param logicTableNames logic table names
+ * @return sharding columns map
+ */
+ Map<LogicTableName, Set<String>>
getShardingColumnsMap(Collection<YamlRuleConfiguration> yamlRuleConfigs,
Set<LogicTableName> logicTableNames);
+}
diff --git
a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/util/AutoIncrementKeyGenerateAlgorithm.java
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sharding/ShardingColumnsExtractorFactory.java
similarity index 51%
copy from
test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/util/AutoIncrementKeyGenerateAlgorithm.java
copy to
kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sharding/ShardingColumnsExtractorFactory.java
index 9fba5b1d037..882bffd8595 100644
---
a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/util/AutoIncrementKeyGenerateAlgorithm.java
+++
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/spi/sharding/ShardingColumnsExtractorFactory.java
@@ -15,28 +15,29 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.integration.data.pipeline.util;
+package org.apache.shardingsphere.data.pipeline.spi.sharding;
-import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.infra.util.spi.ShardingSphereServiceLoader;
+import
org.apache.shardingsphere.infra.util.spi.type.required.RequiredSPIRegistry;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicLong;
-
-public final class AutoIncrementKeyGenerateAlgorithm implements
KeyGenerateAlgorithm {
-
- private final AtomicLong idGen = new AtomicLong(1);
-
- @Override
- public Comparable<?> generateKey() {
- return idGen.getAndIncrement();
- }
+/**
+ * Sharding columns extractor factory.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public final class ShardingColumnsExtractorFactory {
- @Override
- public Properties getProps() {
- return null;
+ static {
+ ShardingSphereServiceLoader.register(ShardingColumnsExtractor.class);
}
- @Override
- public void init(final Properties props) {
+ /**
+ * Get sharding columns extractor instance.
+ *
+ * @return sharding columns extractor
+ */
+ public static ShardingColumnsExtractor getInstance() {
+ return
RequiredSPIRegistry.getRegisteredService(ShardingColumnsExtractor.class);
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
index 493e006f77a..1026be07b83 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/CRC32MatchDataConsistencyCalculateAlgorithm.java
@@ -82,6 +82,7 @@ public final class
CRC32MatchDataConsistencyCalculateAlgorithm extends AbstractD
int recordsCount = resultSet.getInt(2);
return new CalculatedItem(crc32, recordsCount);
} catch (final SQLException ex) {
+ log.error("calculateCRC32 failed, tableName={}", logicTableName,
ex);
throw new
PipelineTableDataConsistencyCheckLoadingFailedException(logicTableName);
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
index 1415cee9d0c..1875a55f58a 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
@@ -122,6 +122,7 @@ public final class
DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
}
return records.isEmpty() ? Optional.empty() : Optional.of(new
CalculatedResult(maxUniqueKeyValue, records.size(), records));
} catch (final SQLException ex) {
+ log.error("calculateChunk failed, schemaName={}, tableName={}",
parameter.getSchemaName(), parameter.getLogicTableName(), ex);
throw new
PipelineTableDataConsistencyCheckLoadingFailedException(parameter.getLogicTableName());
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
index 0e5b65d7b84..d20f547f4c1 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
@@ -64,6 +64,7 @@ import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipe
import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobAPIFactory;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobItemContext;
+import
org.apache.shardingsphere.data.pipeline.spi.sharding.ShardingColumnsExtractorFactory;
import
org.apache.shardingsphere.data.pipeline.spi.sqlbuilder.PipelineSQLBuilder;
import
org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
@@ -188,8 +189,9 @@ public final class MigrationJobAPIImpl extends
AbstractInventoryIncrementalJobAP
TableNameSchemaNameMapping tableNameSchemaNameMapping = new
TableNameSchemaNameMapping(tableNameSchemaMap);
CreateTableConfiguration createTableConfig =
buildCreateTableConfiguration(jobConfig);
DumperConfiguration dumperConfig =
buildDumperConfiguration(jobConfig.getJobId(),
jobConfig.getSourceResourceName(), jobConfig.getSource(), tableNameMap,
tableNameSchemaNameMapping);
- // TODO now shardingColumnsMap always empty,
- ImporterConfiguration importerConfig =
buildImporterConfiguration(jobConfig, pipelineProcessConfig,
Collections.emptyMap(), tableNameSchemaNameMapping);
+ Map<LogicTableName, Set<String>> shardingColumnsMap =
ShardingColumnsExtractorFactory.getInstance().getShardingColumnsMap(
+ ((ShardingSpherePipelineDataSourceConfiguration)
jobConfig.getTarget()).getRootConfig().getRules(), Collections.singleton(new
LogicTableName(jobConfig.getTargetTableName())));
+ ImporterConfiguration importerConfig =
buildImporterConfiguration(jobConfig, pipelineProcessConfig,
shardingColumnsMap, tableNameSchemaNameMapping);
MigrationTaskConfiguration result = new
MigrationTaskConfiguration(jobConfig.getSourceResourceName(),
createTableConfig, dumperConfig, importerConfig);
log.info("buildTaskConfiguration, sourceResourceName={}, result={}",
jobConfig.getSourceResourceName(), result);
return result;
diff --git
a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java
b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java
index 6cb495c3062..401fae91a1f 100644
---
a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java
+++
b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/MySQLMigrationGeneralIT.java
@@ -26,7 +26,6 @@ import
org.apache.shardingsphere.integration.data.pipeline.env.enums.ITEnvTypeEn
import
org.apache.shardingsphere.integration.data.pipeline.framework.helper.ScalingCaseHelper;
import
org.apache.shardingsphere.integration.data.pipeline.framework.param.ScalingParameterized;
import
org.apache.shardingsphere.integration.data.pipeline.util.AutoIncrementKeyGenerateAlgorithm;
-import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -87,7 +86,7 @@ public final class MySQLMigrationGeneralIT extends
AbstractMigrationITCase {
createTargetOrderTableRule();
createTargetOrderTableEncryptRule();
createTargetOrderItemTableRule();
- KeyGenerateAlgorithm keyGenerateAlgorithm = new
AutoIncrementKeyGenerateAlgorithm();
+ AutoIncrementKeyGenerateAlgorithm keyGenerateAlgorithm = new
AutoIncrementKeyGenerateAlgorithm();
JdbcTemplate jdbcTemplate = new JdbcTemplate(getSourceDataSource());
Pair<List<Object[]>, List<Object[]>> dataPair =
ScalingCaseHelper.generateFullInsertData(keyGenerateAlgorithm,
parameterized.getDatabaseType(), 3000);
log.info("init data begin: {}", LocalDateTime.now());
@@ -96,19 +95,18 @@ public final class MySQLMigrationGeneralIT extends
AbstractMigrationITCase {
log.info("init data end: {}", LocalDateTime.now());
startMigration(getSourceTableOrderName(), getTargetTableOrderName());
startMigration("t_order_item", "t_order_item");
+ // TODO wait preparation done (binlog position got), else
insert/update/delete might be consumed in inventory task
startIncrementTask(new MySQLIncrementTask(jdbcTemplate,
getSourceTableOrderName(), keyGenerateAlgorithm, 30));
String orderJobId = getJobIdByTableName(getSourceTableOrderName());
String orderItemJobId = getJobIdByTableName("t_order_item");
assertMigrationSuccessById(orderJobId, "DATA_MATCH");
assertMigrationSuccessById(orderItemJobId, "DATA_MATCH");
assertMigrationSuccessById(orderItemJobId, "CRC32_MATCH");
- if (ENV.getItEnvType() == ITEnvTypeEnum.DOCKER) {
- for (String each : listJobId()) {
- commitMigrationByJobId(each);
- }
- List<String> lastJobIds = listJobId();
- assertThat(lastJobIds.size(), is(0));
+ for (String each : listJobId()) {
+ commitMigrationByJobId(each);
}
+ List<String> lastJobIds = listJobId();
+ assertThat(lastJobIds.size(), is(0));
assertGreaterThanOrderTableInitRows(TABLE_INIT_ROW_COUNT, "");
}
diff --git
a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java
b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java
index 34fe2e7816e..f4ab9cada7d 100644
---
a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java
+++
b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralIT.java
@@ -27,7 +27,6 @@ import
org.apache.shardingsphere.integration.data.pipeline.env.enums.ITEnvTypeEn
import
org.apache.shardingsphere.integration.data.pipeline.framework.helper.ScalingCaseHelper;
import
org.apache.shardingsphere.integration.data.pipeline.framework.param.ScalingParameterized;
import
org.apache.shardingsphere.integration.data.pipeline.util.AutoIncrementKeyGenerateAlgorithm;
-import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -50,7 +49,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
@Slf4j
public final class PostgreSQLMigrationGeneralIT extends
AbstractMigrationITCase {
- private static final KeyGenerateAlgorithm KEY_GENERATE_ALGORITHM = new
AutoIncrementKeyGenerateAlgorithm();
+ private static final AutoIncrementKeyGenerateAlgorithm
KEY_GENERATE_ALGORITHM = new AutoIncrementKeyGenerateAlgorithm();
private final ScalingParameterized parameterized;
@@ -100,13 +99,11 @@ public final class PostgreSQLMigrationGeneralIT extends
AbstractMigrationITCase
log.info("init data end: {}", LocalDateTime.now());
checkOrderMigration(jdbcTemplate);
checkOrderItemMigration();
- if (ENV.getItEnvType() == ITEnvTypeEnum.DOCKER) {
- for (String each : listJobId()) {
- commitMigrationByJobId(each);
- }
- List<String> lastJobIds = listJobId();
- assertThat(lastJobIds.size(), is(0));
+ for (String each : listJobId()) {
+ commitMigrationByJobId(each);
}
+ List<String> lastJobIds = listJobId();
+ assertThat(lastJobIds.size(), is(0));
assertGreaterThanOrderTableInitRows(TABLE_INIT_ROW_COUNT, SCHEMA_NAME);
}
diff --git
a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/helper/ScalingCaseHelper.java
b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/helper/ScalingCaseHelper.java
index fd8ed623eb7..8742fc131a7 100644
---
a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/helper/ScalingCaseHelper.java
+++
b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/framework/helper/ScalingCaseHelper.java
@@ -21,8 +21,8 @@ import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.shardingsphere.infra.database.type.DatabaseType;
import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import
org.apache.shardingsphere.integration.data.pipeline.util.AutoIncrementKeyGenerateAlgorithm;
import
org.apache.shardingsphere.sharding.algorithm.keygen.SnowflakeKeyGenerateAlgorithm;
-import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
import java.math.BigDecimal;
import java.sql.Timestamp;
@@ -57,7 +57,7 @@ public final class ScalingCaseHelper {
* @param insertRows insert rows
* @return insert data list
*/
- public static Pair<List<Object[]>, List<Object[]>>
generateFullInsertData(final KeyGenerateAlgorithm orderIdGenerate, final
DatabaseType databaseType, final int insertRows) {
+ public static Pair<List<Object[]>, List<Object[]>>
generateFullInsertData(final AutoIncrementKeyGenerateAlgorithm orderIdGenerate,
final DatabaseType databaseType, final int insertRows) {
if (insertRows < 0) {
return Pair.of(null, null);
}
@@ -65,7 +65,7 @@ public final class ScalingCaseHelper {
List<Object[]> orderItemData = new ArrayList<>(insertRows);
for (int i = 0; i < insertRows; i++) {
Comparable<?> orderId = orderIdGenerate.generateKey();
- int userId = generateInt(0, 6);
+ int userId = orderIdGenerate.generateKey();
LocalDateTime now = LocalDateTime.now();
int randomInt = generateInt(-100, 100);
int randomUnsignedInt = generateInt(0, 100);
diff --git
a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/util/AutoIncrementKeyGenerateAlgorithm.java
b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/util/AutoIncrementKeyGenerateAlgorithm.java
index 9fba5b1d037..00011b10ff4 100644
---
a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/util/AutoIncrementKeyGenerateAlgorithm.java
+++
b/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/util/AutoIncrementKeyGenerateAlgorithm.java
@@ -20,14 +20,14 @@ package
org.apache.shardingsphere.integration.data.pipeline.util;
import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
import java.util.Properties;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicInteger;
public final class AutoIncrementKeyGenerateAlgorithm implements
KeyGenerateAlgorithm {
- private final AtomicLong idGen = new AtomicLong(1);
+ private final AtomicInteger idGen = new AtomicInteger(1);
@Override
- public Comparable<?> generateKey() {
+ public Integer generateKey() {
return idGen.getAndIncrement();
}
diff --git
a/test/integration-test/scaling/src/test/resources/env/common/migration-command.xml
b/test/integration-test/scaling/src/test/resources/env/common/migration-command.xml
index 11a0fa76b34..17757c96b60 100644
---
a/test/integration-test/scaling/src/test/resources/env/common/migration-command.xml
+++
b/test/integration-test/scaling/src/test/resources/env/common/migration-command.xml
@@ -66,10 +66,10 @@
<create-target-order-table-rule>
CREATE SHARDING TABLE RULE t_order(
STORAGE_UNITS(ds_2,ds_3,ds_4),
- SHARDING_COLUMN=order_id,
+ SHARDING_COLUMN=user_id,
TYPE(NAME="hash_mod",PROPERTIES("sharding-count"="6")),
KEY_GENERATE_STRATEGY(COLUMN=order_id,TYPE(NAME="snowflake"))
- )
+ );
</create-target-order-table-rule>
<create-target-order-item-table-rule>
diff --git
a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/util/AutoIncrementKeyGenerateAlgorithm.java
b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/spi/sharding/ShardingColumnsExtractorFactoryTest.java
similarity index 57%
copy from
test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/util/AutoIncrementKeyGenerateAlgorithm.java
copy to
test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/spi/sharding/ShardingColumnsExtractorFactoryTest.java
index 9fba5b1d037..4532c7a84c7 100644
---
a/test/integration-test/scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/util/AutoIncrementKeyGenerateAlgorithm.java
+++
b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/spi/sharding/ShardingColumnsExtractorFactoryTest.java
@@ -15,28 +15,18 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.integration.data.pipeline.util;
+package org.apache.shardingsphere.data.pipeline.spi.sharding;
-import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
+import
org.apache.shardingsphere.sharding.data.pipeline.ShardingColumnsExtractorImpl;
+import org.junit.Test;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicLong;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
-public final class AutoIncrementKeyGenerateAlgorithm implements
KeyGenerateAlgorithm {
+public final class ShardingColumnsExtractorFactoryTest {
- private final AtomicLong idGen = new AtomicLong(1);
-
- @Override
- public Comparable<?> generateKey() {
- return idGen.getAndIncrement();
- }
-
- @Override
- public Properties getProps() {
- return null;
- }
-
- @Override
- public void init(final Properties props) {
+ @Test
+ public void assertGetInstance() {
+ assertThat(ShardingColumnsExtractorFactory.getInstance(),
instanceOf(ShardingColumnsExtractorImpl.class));
}
}