This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 64f2ef306f0 Support changing table name at migrating (#20668)
64f2ef306f0 is described below
commit 64f2ef306f0bf583eb41245acc93a6353a229a40
Author: Xinze Guo <[email protected]>
AuthorDate: Wed Aug 31 17:09:52 2022 +0800
Support changing table name at migrating (#20668)
* Support changing table names when migrating
* Fix codestyle
---
.../handler/update/MigrateTableUpdater.java | 1 -
.../migration/MigrationDataConsistencyChecker.java | 37 ++++++++++++++--------
.../scenario/migration/MigrationJobAPIImpl.java | 5 +--
.../pipeline/cases/base/BaseExtraSQLITCase.java | 6 ++--
.../data/pipeline/cases/base/BaseITCase.java | 20 +++++++-----
.../pipeline/cases/command/ExtraSQLCommand.java | 6 ----
.../cases/command/MigrationDistSQLCommand.java | 9 ++++--
.../cases/general/MySQLMigrationGeneralIT.java | 6 ++--
.../general/PostgreSQLMigrationGeneralIT.java | 6 ++--
.../primarykey/TextPrimaryKeyMigrationIT.java | 2 +-
.../pipeline/cases/task/MySQLIncrementTask.java | 18 ++++-------
.../cases/task/PostgreSQLIncrementTask.java | 6 ++--
.../src/test/resources/env/common/command.xml | 12 ++++---
.../test/resources/env/scenario/general/mysql.xml | 12 ++-----
.../resources/env/scenario/general/postgresql.xml | 12 ++-----
15 files changed, 77 insertions(+), 81 deletions(-)
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/MigrateTableUpdater.java
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/MigrateTableUpdater.java
index c8658614503..23f41628a5d 100644
---
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/MigrateTableUpdater.java
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/MigrateTableUpdater.java
@@ -39,7 +39,6 @@ public final class MigrateTableUpdater implements
RALUpdater<MigrateTableStateme
log.info("start migrate job by {}", sqlStatement);
String targetDatabaseName =
ObjectUtils.defaultIfNull(sqlStatement.getTargetDatabaseName(), databaseName);
Preconditions.checkNotNull(targetDatabaseName, "Target database name
is null. You could define it in DistSQL or select a database.");
-
Preconditions.checkArgument(sqlStatement.getSourceTableName().equalsIgnoreCase(sqlStatement.getTargetTableName()),
"Source table name and target table name must be the same for now.");
CreateMigrationJobParameter createMigrationJobParameter = new
CreateMigrationJobParameter(sqlStatement.getSourceResourceName(),
sqlStatement.getSourceSchemaName(),
sqlStatement.getSourceTableName(), targetDatabaseName,
sqlStatement.getTargetTableName());
JOB_API.createJobAndStart(createMigrationJobParameter);
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
index 9b61a39e315..dd25752b284 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
@@ -44,9 +44,11 @@ import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
@@ -69,14 +71,18 @@ public final class MigrationDataConsistencyChecker {
private final String sourceTableName;
+ private final String targetTableName;
+
private final TableNameSchemaNameMapping tableNameSchemaNameMapping;
private final JobRateLimitAlgorithm readRateLimitAlgorithm;
public MigrationDataConsistencyChecker(final MigrationJobConfiguration
jobConfig, final JobRateLimitAlgorithm readRateLimitAlgorithm) {
this.jobConfig = jobConfig;
- this.sourceTableName = jobConfig.getSourceTableName();
- tableNameSchemaNameMapping = new
TableNameSchemaNameMapping(TableNameSchemaNameMapping.convert(jobConfig.getSourceSchemaName(),
Collections.singletonList(jobConfig.getSourceTableName())));
+ sourceTableName = jobConfig.getSourceTableName();
+ targetTableName = jobConfig.getTargetTableName();
+ tableNameSchemaNameMapping = new
TableNameSchemaNameMapping(TableNameSchemaNameMapping.convert(jobConfig.getSourceSchemaName(),
+ new HashSet<>(Arrays.asList(jobConfig.getSourceTableName(),
jobConfig.getTargetTableName()))));
this.readRateLimitAlgorithm = readRateLimitAlgorithm;
}
@@ -107,9 +113,7 @@ public final class MigrationDataConsistencyChecker {
try (
PipelineDataSourceWrapper sourceDataSource =
PipelineDataSourceFactory.newInstance(sourceDataSourceConfig);
PipelineDataSourceWrapper targetDataSource =
PipelineDataSourceFactory.newInstance(targetDataSourceConfig)) {
- for (String each : Collections.singletonList(sourceTableName)) {
- result.put(each, checkCount(each, sourceDataSource,
targetDataSource, executor));
- }
+ result.put(sourceTableName, checkCount(sourceDataSource,
targetDataSource, executor));
return result;
} catch (final SQLException ex) {
throw new PipelineDataConsistencyCheckFailedException("Count check
failed", ex);
@@ -119,17 +123,23 @@ public final class MigrationDataConsistencyChecker {
}
}
- private DataConsistencyCountCheckResult checkCount(final String table,
final PipelineDataSourceWrapper sourceDataSource, final
PipelineDataSourceWrapper targetDataSource,
+ private DataConsistencyCountCheckResult checkCount(final
PipelineDataSourceWrapper sourceDataSource, final PipelineDataSourceWrapper
targetDataSource,
final
ThreadPoolExecutor executor) {
+ Future<Long> sourceFuture = executor.submit(() ->
count(sourceDataSource, sourceTableName, sourceDataSource.getDatabaseType()));
+ Future<Long> targetFuture = executor.submit(() ->
count(targetDataSource, targetTableName, targetDataSource.getDatabaseType()));
+ long sourceCount;
+ long targetCount;
+ try {
+ sourceCount = sourceFuture.get();
+ } catch (final InterruptedException | ExecutionException ex) {
+ throw new
PipelineDataConsistencyCheckFailedException(String.format("Count check failed
for source table '%s'", sourceTableName), ex);
+ }
try {
- Future<Long> sourceFuture = executor.submit(() ->
count(sourceDataSource, table, sourceDataSource.getDatabaseType()));
- Future<Long> targetFuture = executor.submit(() ->
count(targetDataSource, table, targetDataSource.getDatabaseType()));
- long sourceCount = sourceFuture.get();
- long targetCount = targetFuture.get();
- return new DataConsistencyCountCheckResult(sourceCount,
targetCount);
+ targetCount = targetFuture.get();
} catch (final InterruptedException | ExecutionException ex) {
- throw new
PipelineDataConsistencyCheckFailedException(String.format("Count check failed
for table '%s'", table), ex);
+ throw new
PipelineDataConsistencyCheckFailedException(String.format("Count check failed
for target table '%s'", targetTableName), ex);
}
+ return new DataConsistencyCountCheckResult(sourceCount, targetCount);
}
// TODO use digest (crc32, murmurhash)
@@ -171,7 +181,8 @@ public final class MigrationDataConsistencyChecker {
Collection<String> columnNames =
tableMetaData.getColumnNames();
PipelineColumnMetaData uniqueKey =
tableMetaData.getColumnMetaData(tableMetaData.getPrimaryKeyColumns().get(0));
DataConsistencyCalculateParameter sourceParameter =
buildParameter(sourceDataSource, tableNameSchemaNameMapping, each, columnNames,
sourceDatabaseType, targetDatabaseType, uniqueKey);
- DataConsistencyCalculateParameter targetParameter =
buildParameter(targetDataSource, tableNameSchemaNameMapping, each, columnNames,
targetDatabaseType, sourceDatabaseType, uniqueKey);
+ DataConsistencyCalculateParameter targetParameter =
buildParameter(targetDataSource, tableNameSchemaNameMapping, targetTableName,
columnNames, targetDatabaseType, sourceDatabaseType,
+ uniqueKey);
Iterator<Object> sourceCalculatedResults =
calculator.calculate(sourceParameter).iterator();
Iterator<Object> targetCalculatedResults =
calculator.calculate(targetParameter).iterator();
boolean contentMatched = true;
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
index c6826e5b62d..176440a3540 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
@@ -161,7 +161,8 @@ public final class MigrationJobAPIImpl extends
AbstractPipelineJobAPIImpl implem
PipelineDataSourceConfiguration targetDataSourceConfig =
PipelineDataSourceConfigurationFactory.newInstance(config.getTarget().getType(),
config.getTarget().getParameter());
config.setTargetDatabaseType(targetDataSourceConfig.getDatabaseType().getType());
}
- JobDataNodeEntry nodeEntry = new
JobDataNodeEntry(config.getSourceTableName(),
+ // target table name is logic table name, source table name is actual
table name.
+ JobDataNodeEntry nodeEntry = new
JobDataNodeEntry(config.getTargetTableName(),
Collections.singletonList(new
DataNode(config.getSourceResourceName(), config.getSourceTableName())));
String dataNodeLine = new
JobDataNodeLine(Collections.singletonList(nodeEntry)).marshal();
config.setTablesFirstDataNodes(dataNodeLine);
@@ -193,7 +194,7 @@ public final class MigrationJobAPIImpl extends
AbstractPipelineJobAPIImpl implem
public TaskConfiguration buildTaskConfiguration(final
PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final
PipelineProcessConfiguration pipelineProcessConfig) {
MigrationJobConfiguration jobConfig = (MigrationJobConfiguration)
pipelineJobConfig;
Map<ActualTableName, LogicTableName> tableNameMap = new
LinkedHashMap<>();
- tableNameMap.put(new ActualTableName(jobConfig.getSourceTableName()),
new LogicTableName(jobConfig.getSourceTableName()));
+ tableNameMap.put(new ActualTableName(jobConfig.getSourceTableName()),
new LogicTableName(jobConfig.getTargetTableName()));
Map<LogicTableName, String> tableNameSchemaMap =
TableNameSchemaNameMapping.convert(jobConfig.getSourceSchemaName(),
Collections.singletonList(jobConfig.getTargetTableName()));
TableNameSchemaNameMapping tableNameSchemaNameMapping = new
TableNameSchemaNameMapping(tableNameSchemaMap);
DumperConfiguration dumperConfig =
createDumperConfiguration(jobConfig.getJobId(),
jobConfig.getSourceResourceName(), jobConfig.getSource(), tableNameMap,
tableNameSchemaNameMapping);
diff --git
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseExtraSQLITCase.java
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseExtraSQLITCase.java
index f95ded78a51..0f260ebe5a2 100644
---
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseExtraSQLITCase.java
+++
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseExtraSQLITCase.java
@@ -44,14 +44,14 @@ public abstract class BaseExtraSQLITCase extends BaseITCase
{
protected void createSourceTableIndexList(final String schema) throws
SQLException {
if (DatabaseTypeUtil.isPostgreSQL(getDatabaseType())) {
- sourceExecuteWithLog(String.format("CREATE INDEX IF NOT EXISTS
idx_user_id ON %s.t_order ( user_id )", schema));
+ sourceExecuteWithLog(String.format("CREATE INDEX IF NOT EXISTS
idx_user_id ON %s.t_order_copy ( user_id )", schema));
} else if (DatabaseTypeUtil.isOpenGauss(getDatabaseType())) {
- sourceExecuteWithLog(String.format("CREATE INDEX idx_user_id ON
%s.t_order ( user_id )", schema));
+ sourceExecuteWithLog(String.format("CREATE INDEX idx_user_id ON
%s.t_order_copy ( user_id )", schema));
}
}
protected void createSourceCommentOnList(final String schema) throws
SQLException {
- sourceExecuteWithLog(String.format("COMMENT ON COLUMN
%s.t_order.user_id IS 'user id'", schema));
+ sourceExecuteWithLog(String.format("COMMENT ON COLUMN
%s.t_order_copy.user_id IS 'user id'", schema));
}
protected void createSourceOrderItemTable() throws SQLException {
diff --git
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
index ad81f6cacd9..d7b2acea72b 100644
---
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
+++
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
@@ -229,19 +229,23 @@ public abstract class BaseITCase {
proxyExecuteWithLog(migrationDistSQLCommand.getCreateTargetOrderItemTableRule(),
2);
}
- protected void startMigrationOrder(final boolean withSchema) throws
SQLException {
+ protected void startMigrationOrderCopy(final boolean withSchema) throws
SQLException {
if (withSchema) {
-
proxyExecuteWithLog(migrationDistSQLCommand.getMigrationOrderSingleTableWithSchema(),
5);
+
proxyExecuteWithLog(migrationDistSQLCommand.getMigrationOrderCopySingleTableWithSchema(),
1);
} else {
-
proxyExecuteWithLog(migrationDistSQLCommand.getMigrationOrderSingleTable(), 5);
+
proxyExecuteWithLog(migrationDistSQLCommand.getMigrationOrderCopySingleTable(),
1);
}
}
+ protected void startMigrationOrder() throws SQLException {
+
proxyExecuteWithLog(migrationDistSQLCommand.getMigrationOrderSingleTable(), 1);
+ }
+
protected void startMigrationOrderItem(final boolean withSchema) throws
SQLException {
if (withSchema) {
-
proxyExecuteWithLog(migrationDistSQLCommand.getMigrationOrderItemSingleTableWithSchema(),
5);
+
proxyExecuteWithLog(migrationDistSQLCommand.getMigrationOrderItemSingleTableWithSchema(),
1);
} else {
-
proxyExecuteWithLog(migrationDistSQLCommand.getMigrationOrderItemSingleTable(),
5);
+
proxyExecuteWithLog(migrationDistSQLCommand.getMigrationOrderItemSingleTable(),
1);
}
}
@@ -333,12 +337,12 @@ public abstract class BaseITCase {
}
protected void stopMigrationByJobId(final String jobId) throws
SQLException {
- proxyExecuteWithLog(String.format("STOP MIGRATION '%s'", jobId), 5);
+ proxyExecuteWithLog(String.format("STOP MIGRATION '%s'", jobId), 1);
}
// TODO reopen later
protected void startMigrationByJobId(final String jobId) throws
SQLException {
- proxyExecuteWithLog(String.format("START MIGRATION '%s'", jobId), 10);
+ proxyExecuteWithLog(String.format("START MIGRATION '%s'", jobId), 1);
}
protected void commitMigrationByJobId(final String jobId) throws
SQLException {
@@ -352,7 +356,7 @@ public abstract class BaseITCase {
protected String getJobIdByTableName(final String tableName) {
List<Map<String, Object>> jobList = queryForListWithLog("SHOW
MIGRATION LIST");
- return jobList.stream().filter(a ->
a.get("tables").toString().equals(tableName)).findFirst().orElseThrow(() -> new
RuntimeException("not find target table")).get("id").toString();
+ return jobList.stream().filter(a ->
a.get("tables").toString().equals(tableName)).findFirst().orElseThrow(() -> new
RuntimeException("not find " + tableName + " table")).get("id").toString();
}
@SneakyThrows(InterruptedException.class)
diff --git
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/command/ExtraSQLCommand.java
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/command/ExtraSQLCommand.java
index 7e61813433b..1b3788ca445 100644
---
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/command/ExtraSQLCommand.java
+++
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/command/ExtraSQLCommand.java
@@ -40,10 +40,4 @@ public final class ExtraSQLCommand {
@XmlElement(name = "full-insert-order-item")
private String fullInsertOrderItem;
-
- @XmlElement(name = "update-table-order-status")
- private String updateTableOrderStatus;
-
- @XmlElement(name = "create-index-status")
- private String createIndexStatus;
}
diff --git
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/command/MigrationDistSQLCommand.java
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/command/MigrationDistSQLCommand.java
index bc873e505e0..30b7a9c0b9e 100644
---
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/command/MigrationDistSQLCommand.java
+++
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/command/MigrationDistSQLCommand.java
@@ -50,12 +50,15 @@ public final class MigrationDistSQLCommand {
@XmlElement(name = "migration-order-single-table")
private String migrationOrderSingleTable;
+ @XmlElement(name = "migration-order-copy-single-table")
+ private String migrationOrderCopySingleTable;
+
+ @XmlElement(name = "migration-order-copy-single-table-with-schema")
+ private String migrationOrderCopySingleTableWithSchema;
+
@XmlElement(name = "migration-order-item-single-table")
private String migrationOrderItemSingleTable;
- @XmlElement(name = "migration-order-single-table-with-schema")
- private String migrationOrderSingleTableWithSchema;
-
@XmlElement(name = "migration-order-item-single-table-with-schema")
private String migrationOrderItemSingleTableWithSchema;
}
diff --git
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/MySQLMigrationGeneralIT.java
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/MySQLMigrationGeneralIT.java
index 6bd22ddefce..9e78f363a47 100644
---
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/MySQLMigrationGeneralIT.java
+++
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/MySQLMigrationGeneralIT.java
@@ -87,10 +87,10 @@ public final class MySQLMigrationGeneralIT extends
BaseExtraSQLITCase {
jdbcTemplate.batchUpdate(getExtraSQLCommand().getFullInsertOrder(),
dataPair.getLeft());
jdbcTemplate.batchUpdate(getExtraSQLCommand().getFullInsertOrderItem(),
dataPair.getRight());
}
- startMigrationOrder(false);
+ startMigrationOrderCopy(false);
startMigrationOrderItem(false);
- startIncrementTask(new MySQLIncrementTask(jdbcTemplate,
keyGenerateAlgorithm, true, 20));
- String orderJobId = getJobIdByTableName("t_order");
+ startIncrementTask(new MySQLIncrementTask(jdbcTemplate,
keyGenerateAlgorithm, 20));
+ String orderJobId = getJobIdByTableName("t_order_copy");
String orderItemJobId = getJobIdByTableName("t_order_item");
assertMigrationSuccessById(orderJobId);
assertMigrationSuccessById(orderItemJobId);
diff --git
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/PostgreSQLMigrationGeneralIT.java
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/PostgreSQLMigrationGeneralIT.java
index addbbbc66f0..38f9e6295fc 100644
---
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/PostgreSQLMigrationGeneralIT.java
+++
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/PostgreSQLMigrationGeneralIT.java
@@ -104,12 +104,12 @@ public final class PostgreSQLMigrationGeneralIT extends
BaseExtraSQLITCase {
}
private void checkOrderMigration(final JdbcTemplate jdbcTemplate) throws
SQLException {
- startMigrationOrder(true);
+ startMigrationOrderCopy(true);
startIncrementTask(new PostgreSQLIncrementTask(jdbcTemplate,
SCHEMA_NAME, false, 20));
- String jobId = getJobIdByTableName("t_order");
+ String jobId = getJobIdByTableName("t_order_copy");
waitMigrationFinished(jobId);
stopMigrationByJobId(jobId);
- sourceExecuteWithLog(String.format("INSERT INTO %s.t_order
(id,order_id,user_id,status) VALUES (%s, %s, %s, '%s')", SCHEMA_NAME,
KEY_GENERATE_ALGORITHM.generateKey(),
+ sourceExecuteWithLog(String.format("INSERT INTO %s.t_order_copy
(id,order_id,user_id,status) VALUES (%s, %s, %s, '%s')", SCHEMA_NAME,
KEY_GENERATE_ALGORITHM.generateKey(),
System.currentTimeMillis(), 1, "afterStop"));
startMigrationByJobId(jobId);
assertCheckMigrationSuccess(jobId);
diff --git
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/primarykey/TextPrimaryKeyMigrationIT.java
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/primarykey/TextPrimaryKeyMigrationIT.java
index 1ce7515cbda..8f747150a91 100644
---
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/primarykey/TextPrimaryKeyMigrationIT.java
+++
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/primarykey/TextPrimaryKeyMigrationIT.java
@@ -76,7 +76,7 @@ public class TextPrimaryKeyMigrationIT extends
BaseExtraSQLITCase {
addSourceResource();
addTargetResource();
createTargetOrderTableRule();
- startMigrationOrder(false);
+ startMigrationOrder();
String jobId = listJobId().get(0);
waitMigrationFinished(jobId);
stopMigrationByJobId(jobId);
diff --git
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/task/MySQLIncrementTask.java
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/task/MySQLIncrementTask.java
index a0a049bc846..7cd5035d412 100644
---
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/task/MySQLIncrementTask.java
+++
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/task/MySQLIncrementTask.java
@@ -36,8 +36,6 @@ public final class MySQLIncrementTask extends
BaseIncrementTask {
private final KeyGenerateAlgorithm primaryKeyGenerateAlgorithm;
- private final Boolean incrementOrderItemTogether;
-
private final int executeCountLimit;
@Override
@@ -46,15 +44,13 @@ public final class MySQLIncrementTask extends
BaseIncrementTask {
while (executeCount < executeCountLimit &&
!Thread.currentThread().isInterrupted()) {
Object orderPrimaryKey = insertOrder();
if (executeCount % 2 == 0) {
- jdbcTemplate.update("DELETE FROM t_order WHERE id = ?",
orderPrimaryKey);
+ jdbcTemplate.update("DELETE FROM t_order_copy WHERE id = ?",
orderPrimaryKey);
} else {
setNullToOrderFields(orderPrimaryKey);
updateOrderByPrimaryKey(orderPrimaryKey);
}
- if (incrementOrderItemTogether) {
- Object orderItemPrimaryKey = insertOrderItem();
- jdbcTemplate.update("UPDATE t_order_item SET status = ? WHERE
item_id = ?", "updated" + Instant.now().getEpochSecond(), orderItemPrimaryKey);
- }
+ Object orderItemPrimaryKey = insertOrderItem();
+ jdbcTemplate.update("UPDATE t_order_item SET status = ? WHERE
item_id = ?", "updated" + Instant.now().getEpochSecond(), orderItemPrimaryKey);
executeCount++;
}
log.info("MySQL increment task runnable execute successfully.");
@@ -64,7 +60,7 @@ public final class MySQLIncrementTask extends
BaseIncrementTask {
ThreadLocalRandom random = ThreadLocalRandom.current();
Object[] orderInsertDate = new
Object[]{primaryKeyGenerateAlgorithm.generateKey(),
ScalingCaseHelper.generateSnowflakeKey(), random.nextInt(0, 6),
random.nextInt(1, 99), RandomStringUtils.randomAlphabetic(10)};
- jdbcTemplate.update("INSERT INTO t_order
(id,order_id,user_id,t_unsigned_int,status) VALUES (?, ?, ?, ?, ?)",
orderInsertDate);
+ jdbcTemplate.update("INSERT INTO t_order_copy
(id,order_id,user_id,t_unsigned_int,status) VALUES (?, ?, ?, ?, ?)",
orderInsertDate);
return orderInsertDate[0];
}
@@ -78,11 +74,11 @@ public final class MySQLIncrementTask extends
BaseIncrementTask {
private void updateOrderByPrimaryKey(final Object primaryKey) {
Object[] updateData = {"updated" + Instant.now().getEpochSecond(),
ThreadLocalRandom.current().nextInt(0, 100), primaryKey};
- jdbcTemplate.update("UPDATE t_order SET t_char = ?,t_unsigned_int = ?
WHERE id = ?", updateData);
- jdbcTemplate.update("UPDATE t_order SET t_char = null,t_unsigned_int =
299,t_datetime='0000-00-00 00:00:00' WHERE id = ?", primaryKey);
+ jdbcTemplate.update("UPDATE t_order_copy SET t_char = ?,t_unsigned_int
= ? WHERE id = ?", updateData);
+ jdbcTemplate.update("UPDATE t_order_copy SET t_char =
null,t_unsigned_int = 299,t_datetime='0000-00-00 00:00:00' WHERE id = ?",
primaryKey);
}
private void setNullToOrderFields(final Object primaryKey) {
- jdbcTemplate.update("UPDATE t_order SET t_char = null, t_unsigned_int
= null WHERE id = ?", primaryKey);
+ jdbcTemplate.update("UPDATE t_order_copy SET t_char = null,
t_unsigned_int = null WHERE id = ?", primaryKey);
}
}
diff --git
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/task/PostgreSQLIncrementTask.java
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/task/PostgreSQLIncrementTask.java
index 352e5b00cc0..25bc7ddb446 100644
---
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/task/PostgreSQLIncrementTask.java
+++
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/task/PostgreSQLIncrementTask.java
@@ -56,7 +56,7 @@ public final class PostgreSQLIncrementTask extends
BaseIncrementTask {
while (executeCount < executeCountLimit &&
!Thread.currentThread().isInterrupted()) {
Object orderPrimaryKey = insertOrder();
if (executeCount % 2 == 0) {
- jdbcTemplate.update(prefixSchema("DELETE FROM ${schema}t_order
WHERE id = ?", schema), orderPrimaryKey);
+ jdbcTemplate.update(prefixSchema("DELETE FROM
${schema}t_order_copy WHERE id = ?", schema), orderPrimaryKey);
} else {
updateOrderByPrimaryKey(orderPrimaryKey);
}
@@ -74,7 +74,7 @@ public final class PostgreSQLIncrementTask extends
BaseIncrementTask {
ThreadLocalRandom random = ThreadLocalRandom.current();
String status = random.nextInt() % 2 == 0 ? null : "NOT-NULL";
Object[] orderInsertDate = new
Object[]{KEY_GENERATE_ALGORITHM.generateKey(),
ScalingCaseHelper.generateSnowflakeKey(), random.nextInt(0, 6), status};
- jdbcTemplate.update(prefixSchema("INSERT INTO ${schema}t_order
(id,order_id,user_id,status) VALUES (?, ?, ?, ?)", schema), orderInsertDate);
+ jdbcTemplate.update(prefixSchema("INSERT INTO ${schema}t_order_copy
(id,order_id,user_id,status) VALUES (?, ?, ?, ?)", schema), orderInsertDate);
return orderInsertDate[0];
}
@@ -88,7 +88,7 @@ public final class PostgreSQLIncrementTask extends
BaseIncrementTask {
private void updateOrderByPrimaryKey(final Object primaryKey) {
Object[] updateData = {"updated" + Instant.now().getEpochSecond(),
primaryKey};
- jdbcTemplate.update(prefixSchema("UPDATE ${schema}t_order SET status =
? WHERE id = ?", schema), updateData);
+ jdbcTemplate.update(prefixSchema("UPDATE ${schema}t_order_copy SET
status = ? WHERE id = ?", schema), updateData);
}
private String prefixSchema(final String sql, final String schema) {
diff --git
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/common/command.xml
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/common/command.xml
index fea5de0e5c4..6944f9100da 100644
---
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/common/command.xml
+++
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/common/command.xml
@@ -81,14 +81,18 @@
);
</create-target-order-item-table-rule>
+ <migration-order-copy-single-table>
+ MIGRATE TABLE ds_0.t_order_copy INTO sharding_db.t_order;
+ </migration-order-copy-single-table>
+
+ <migration-order-copy-single-table-with-schema>
+ MIGRATE TABLE ds_0.test.t_order_copy INTO sharding_db.t_order;
+ </migration-order-copy-single-table-with-schema>
+
<migration-order-single-table>
MIGRATE TABLE ds_0.t_order INTO sharding_db.t_order;
</migration-order-single-table>
- <migration-order-single-table-with-schema>
- MIGRATE TABLE ds_0.test.t_order INTO sharding_db.t_order;
- </migration-order-single-table-with-schema>
-
<migration-order-item-single-table>
MIGRATE TABLE ds_0.t_order_item INTO sharding_db.t_order_item;
</migration-order-item-single-table>
diff --git
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/general/mysql.xml
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/general/mysql.xml
index 3e247f6afe2..3c57a1f9d53 100644
---
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/general/mysql.xml
+++
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/general/mysql.xml
@@ -16,7 +16,7 @@
-->
<command>
<create-table-order>
- CREATE TABLE `t_order` (
+ CREATE TABLE `t_order_copy` (
`id` bigint NOT NULL COMMENT 'pk id',
`order_id` bigint NOT NULL,
`user_id` int NOT NULL,
@@ -64,7 +64,7 @@
<full-insert-order>
INSERT INTO
-
t_order(id,order_id,user_id,status,t_mediumint,t_smallint,t_tinyint,t_unsigned_int,t_unsigned_mediumint,
+
t_order_copy(id,order_id,user_id,status,t_mediumint,t_smallint,t_tinyint,t_unsigned_int,t_unsigned_mediumint,
t_unsigned_smallint,t_unsigned_tinyint,t_float,t_double,t_decimal,
t_timestamp,t_datetime,t_date,t_time,t_year,
t_bit,t_binary,t_varbinary,t_blob,t_mediumblob,t_char,t_text,t_mediumtext,t_enum,t_set,
t_json)
VALUES
@@ -74,12 +74,4 @@
<full-insert-order-item>
INSERT INTO t_order_item(item_id,order_id,user_id,status)
VALUES(?,?,?,?)
</full-insert-order-item>
-
- <update-table-order-status>
- UPDATE t_order SET status= 'unlock'
- </update-table-order-status>
-
- <create-index-status>
- CREATE index idx_order_status ON t_order (status)
- </create-index-status>
</command>
diff --git
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/general/postgresql.xml
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/general/postgresql.xml
index 9dd2018da57..668d493f96e 100644
---
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/general/postgresql.xml
+++
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/general/postgresql.xml
@@ -16,7 +16,7 @@
-->
<command>
<create-table-order>
- CREATE TABLE test.t_order (
+ CREATE TABLE test.t_order_copy (
id int8 NOT NULL,
order_id int8 NOT NULL,
user_id int4 NOT NULL,
@@ -56,19 +56,11 @@
<full-insert-order>
INSERT INTO
-
test.t_order(id,order_id,user_id,status,t_int2,t_numeric,t_bool,t_char,t_varchar,t_float,t_double,t_timestmap,t_timestamptz)
+
test.t_order_copy(id,order_id,user_id,status,t_int2,t_numeric,t_bool,t_char,t_varchar,t_float,t_double,t_timestmap,t_timestamptz)
VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)
</full-insert-order>
<full-insert-order-item>
INSERT INTO test.t_order_item(item_id,order_id,user_id,status)
VALUES(?,?,?,?)
</full-insert-order-item>
-
- <update-table-order-status>
- UPDATE test.t_order SET status= 'unlock'
- </update-table-order-status>
-
- <create-index-status>
- CREATE INDEX "idx_user_status" ON test.t_order ( status )
- </create-index-status>
</command>