This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 64f2ef306f0 Support changing table name at migrating (#20668)
64f2ef306f0 is described below

commit 64f2ef306f0bf583eb41245acc93a6353a229a40
Author: Xinze Guo <[email protected]>
AuthorDate: Wed Aug 31 17:09:52 2022 +0800

    Support changing table name at migrating (#20668)
    
    * Support changing table names when migrating
    
    * Fix codestyle
---
 .../handler/update/MigrateTableUpdater.java        |  1 -
 .../migration/MigrationDataConsistencyChecker.java | 37 ++++++++++++++--------
 .../scenario/migration/MigrationJobAPIImpl.java    |  5 +--
 .../pipeline/cases/base/BaseExtraSQLITCase.java    |  6 ++--
 .../data/pipeline/cases/base/BaseITCase.java       | 20 +++++++-----
 .../pipeline/cases/command/ExtraSQLCommand.java    |  6 ----
 .../cases/command/MigrationDistSQLCommand.java     |  9 ++++--
 .../cases/general/MySQLMigrationGeneralIT.java     |  6 ++--
 .../general/PostgreSQLMigrationGeneralIT.java      |  6 ++--
 .../primarykey/TextPrimaryKeyMigrationIT.java      |  2 +-
 .../pipeline/cases/task/MySQLIncrementTask.java    | 18 ++++-------
 .../cases/task/PostgreSQLIncrementTask.java        |  6 ++--
 .../src/test/resources/env/common/command.xml      | 12 ++++---
 .../test/resources/env/scenario/general/mysql.xml  | 12 ++-----
 .../resources/env/scenario/general/postgresql.xml  | 12 ++-----
 15 files changed, 77 insertions(+), 81 deletions(-)

diff --git 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/MigrateTableUpdater.java
 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/MigrateTableUpdater.java
index c8658614503..23f41628a5d 100644
--- 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/MigrateTableUpdater.java
+++ 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/MigrateTableUpdater.java
@@ -39,7 +39,6 @@ public final class MigrateTableUpdater implements 
RALUpdater<MigrateTableStateme
         log.info("start migrate job by {}", sqlStatement);
         String targetDatabaseName = 
ObjectUtils.defaultIfNull(sqlStatement.getTargetDatabaseName(), databaseName);
         Preconditions.checkNotNull(targetDatabaseName, "Target database name 
is null. You could define it in DistSQL or select a database.");
-        
Preconditions.checkArgument(sqlStatement.getSourceTableName().equalsIgnoreCase(sqlStatement.getTargetTableName()),
 "Source table name and target table name must be the same for now.");
         CreateMigrationJobParameter createMigrationJobParameter = new 
CreateMigrationJobParameter(sqlStatement.getSourceResourceName(), 
sqlStatement.getSourceSchemaName(),
                 sqlStatement.getSourceTableName(), targetDatabaseName, 
sqlStatement.getTargetTableName());
         JOB_API.createJobAndStart(createMigrationJobParameter);
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
index 9b61a39e315..dd25752b284 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationDataConsistencyChecker.java
@@ -44,9 +44,11 @@ import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
@@ -69,14 +71,18 @@ public final class MigrationDataConsistencyChecker {
     
     private final String sourceTableName;
     
+    private final String targetTableName;
+    
     private final TableNameSchemaNameMapping tableNameSchemaNameMapping;
     
     private final JobRateLimitAlgorithm readRateLimitAlgorithm;
     
     public MigrationDataConsistencyChecker(final MigrationJobConfiguration 
jobConfig, final JobRateLimitAlgorithm readRateLimitAlgorithm) {
         this.jobConfig = jobConfig;
-        this.sourceTableName = jobConfig.getSourceTableName();
-        tableNameSchemaNameMapping = new 
TableNameSchemaNameMapping(TableNameSchemaNameMapping.convert(jobConfig.getSourceSchemaName(),
 Collections.singletonList(jobConfig.getSourceTableName())));
+        sourceTableName = jobConfig.getSourceTableName();
+        targetTableName = jobConfig.getTargetTableName();
+        tableNameSchemaNameMapping = new 
TableNameSchemaNameMapping(TableNameSchemaNameMapping.convert(jobConfig.getSourceSchemaName(),
 
+                new HashSet<>(Arrays.asList(jobConfig.getSourceTableName(), 
jobConfig.getTargetTableName()))));
         this.readRateLimitAlgorithm = readRateLimitAlgorithm;
     }
     
@@ -107,9 +113,7 @@ public final class MigrationDataConsistencyChecker {
         try (
                 PipelineDataSourceWrapper sourceDataSource = 
PipelineDataSourceFactory.newInstance(sourceDataSourceConfig);
                 PipelineDataSourceWrapper targetDataSource = 
PipelineDataSourceFactory.newInstance(targetDataSourceConfig)) {
-            for (String each : Collections.singletonList(sourceTableName)) {
-                result.put(each, checkCount(each, sourceDataSource, 
targetDataSource, executor));
-            }
+            result.put(sourceTableName, checkCount(sourceDataSource, 
targetDataSource, executor));
             return result;
         } catch (final SQLException ex) {
             throw new PipelineDataConsistencyCheckFailedException("Count check 
failed", ex);
@@ -119,17 +123,23 @@ public final class MigrationDataConsistencyChecker {
         }
     }
     
-    private DataConsistencyCountCheckResult checkCount(final String table, 
final PipelineDataSourceWrapper sourceDataSource, final 
PipelineDataSourceWrapper targetDataSource,
+    private DataConsistencyCountCheckResult checkCount(final 
PipelineDataSourceWrapper sourceDataSource, final PipelineDataSourceWrapper 
targetDataSource,
                                                        final 
ThreadPoolExecutor executor) {
+        Future<Long> sourceFuture = executor.submit(() -> 
count(sourceDataSource, sourceTableName, sourceDataSource.getDatabaseType()));
+        Future<Long> targetFuture = executor.submit(() -> 
count(targetDataSource, targetTableName, targetDataSource.getDatabaseType()));
+        long sourceCount;
+        long targetCount;
+        try {
+            sourceCount = sourceFuture.get();
+        } catch (final InterruptedException | ExecutionException ex) {
+            throw new 
PipelineDataConsistencyCheckFailedException(String.format("Count check failed 
for source table '%s'", sourceTableName), ex);
+        }
         try {
-            Future<Long> sourceFuture = executor.submit(() -> 
count(sourceDataSource, table, sourceDataSource.getDatabaseType()));
-            Future<Long> targetFuture = executor.submit(() -> 
count(targetDataSource, table, targetDataSource.getDatabaseType()));
-            long sourceCount = sourceFuture.get();
-            long targetCount = targetFuture.get();
-            return new DataConsistencyCountCheckResult(sourceCount, 
targetCount);
+            targetCount = targetFuture.get();
         } catch (final InterruptedException | ExecutionException ex) {
-            throw new 
PipelineDataConsistencyCheckFailedException(String.format("Count check failed 
for table '%s'", table), ex);
+            throw new 
PipelineDataConsistencyCheckFailedException(String.format("Count check failed 
for target table '%s'", targetTableName), ex);
         }
+        return new DataConsistencyCountCheckResult(sourceCount, targetCount);
     }
     
     // TODO use digest (crc32, murmurhash)
@@ -171,7 +181,8 @@ public final class MigrationDataConsistencyChecker {
                 Collection<String> columnNames = 
tableMetaData.getColumnNames();
                 PipelineColumnMetaData uniqueKey = 
tableMetaData.getColumnMetaData(tableMetaData.getPrimaryKeyColumns().get(0));
                 DataConsistencyCalculateParameter sourceParameter = 
buildParameter(sourceDataSource, tableNameSchemaNameMapping, each, columnNames, 
sourceDatabaseType, targetDatabaseType, uniqueKey);
-                DataConsistencyCalculateParameter targetParameter = 
buildParameter(targetDataSource, tableNameSchemaNameMapping, each, columnNames, 
targetDatabaseType, sourceDatabaseType, uniqueKey);
+                DataConsistencyCalculateParameter targetParameter = 
buildParameter(targetDataSource, tableNameSchemaNameMapping, targetTableName, 
columnNames, targetDatabaseType, sourceDatabaseType,
+                        uniqueKey);
                 Iterator<Object> sourceCalculatedResults = 
calculator.calculate(sourceParameter).iterator();
                 Iterator<Object> targetCalculatedResults = 
calculator.calculate(targetParameter).iterator();
                 boolean contentMatched = true;
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
index c6826e5b62d..176440a3540 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
@@ -161,7 +161,8 @@ public final class MigrationJobAPIImpl extends 
AbstractPipelineJobAPIImpl implem
             PipelineDataSourceConfiguration targetDataSourceConfig = 
PipelineDataSourceConfigurationFactory.newInstance(config.getTarget().getType(),
 config.getTarget().getParameter());
             
config.setTargetDatabaseType(targetDataSourceConfig.getDatabaseType().getType());
         }
-        JobDataNodeEntry nodeEntry = new 
JobDataNodeEntry(config.getSourceTableName(),
+        // target table name is logic table name, source table name is actual 
table name.
+        JobDataNodeEntry nodeEntry = new 
JobDataNodeEntry(config.getTargetTableName(),
                 Collections.singletonList(new 
DataNode(config.getSourceResourceName(), config.getSourceTableName())));
         String dataNodeLine = new 
JobDataNodeLine(Collections.singletonList(nodeEntry)).marshal();
         config.setTablesFirstDataNodes(dataNodeLine);
@@ -193,7 +194,7 @@ public final class MigrationJobAPIImpl extends 
AbstractPipelineJobAPIImpl implem
     public TaskConfiguration buildTaskConfiguration(final 
PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final 
PipelineProcessConfiguration pipelineProcessConfig) {
         MigrationJobConfiguration jobConfig = (MigrationJobConfiguration) 
pipelineJobConfig;
         Map<ActualTableName, LogicTableName> tableNameMap = new 
LinkedHashMap<>();
-        tableNameMap.put(new ActualTableName(jobConfig.getSourceTableName()), 
new LogicTableName(jobConfig.getSourceTableName()));
+        tableNameMap.put(new ActualTableName(jobConfig.getSourceTableName()), 
new LogicTableName(jobConfig.getTargetTableName()));
         Map<LogicTableName, String> tableNameSchemaMap = 
TableNameSchemaNameMapping.convert(jobConfig.getSourceSchemaName(), 
Collections.singletonList(jobConfig.getTargetTableName()));
         TableNameSchemaNameMapping tableNameSchemaNameMapping = new 
TableNameSchemaNameMapping(tableNameSchemaMap);
         DumperConfiguration dumperConfig = 
createDumperConfiguration(jobConfig.getJobId(), 
jobConfig.getSourceResourceName(), jobConfig.getSource(), tableNameMap, 
tableNameSchemaNameMapping);
diff --git 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseExtraSQLITCase.java
 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseExtraSQLITCase.java
index f95ded78a51..0f260ebe5a2 100644
--- 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseExtraSQLITCase.java
+++ 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseExtraSQLITCase.java
@@ -44,14 +44,14 @@ public abstract class BaseExtraSQLITCase extends BaseITCase 
{
     
     protected void createSourceTableIndexList(final String schema) throws 
SQLException {
         if (DatabaseTypeUtil.isPostgreSQL(getDatabaseType())) {
-            sourceExecuteWithLog(String.format("CREATE INDEX IF NOT EXISTS 
idx_user_id ON %s.t_order ( user_id )", schema));
+            sourceExecuteWithLog(String.format("CREATE INDEX IF NOT EXISTS 
idx_user_id ON %s.t_order_copy ( user_id )", schema));
         } else if (DatabaseTypeUtil.isOpenGauss(getDatabaseType())) {
-            sourceExecuteWithLog(String.format("CREATE INDEX idx_user_id ON 
%s.t_order ( user_id )", schema));
+            sourceExecuteWithLog(String.format("CREATE INDEX idx_user_id ON 
%s.t_order_copy ( user_id )", schema));
         }
     }
     
     protected void createSourceCommentOnList(final String schema) throws 
SQLException {
-        sourceExecuteWithLog(String.format("COMMENT ON COLUMN 
%s.t_order.user_id IS 'user id'", schema));
+        sourceExecuteWithLog(String.format("COMMENT ON COLUMN 
%s.t_order_copy.user_id IS 'user id'", schema));
     }
     
     protected void createSourceOrderItemTable() throws SQLException {
diff --git 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
index ad81f6cacd9..d7b2acea72b 100644
--- 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
+++ 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
@@ -229,19 +229,23 @@ public abstract class BaseITCase {
         
proxyExecuteWithLog(migrationDistSQLCommand.getCreateTargetOrderItemTableRule(),
 2);
     }
     
-    protected void startMigrationOrder(final boolean withSchema) throws 
SQLException {
+    protected void startMigrationOrderCopy(final boolean withSchema) throws 
SQLException {
         if (withSchema) {
-            
proxyExecuteWithLog(migrationDistSQLCommand.getMigrationOrderSingleTableWithSchema(),
 5);
+            
proxyExecuteWithLog(migrationDistSQLCommand.getMigrationOrderCopySingleTableWithSchema(),
 1);
         } else {
-            
proxyExecuteWithLog(migrationDistSQLCommand.getMigrationOrderSingleTable(), 5);
+            
proxyExecuteWithLog(migrationDistSQLCommand.getMigrationOrderCopySingleTable(), 
1);
         }
     }
     
+    protected void startMigrationOrder() throws SQLException {
+        
proxyExecuteWithLog(migrationDistSQLCommand.getMigrationOrderSingleTable(), 1);
+    }
+    
     protected void startMigrationOrderItem(final boolean withSchema) throws 
SQLException {
         if (withSchema) {
-            
proxyExecuteWithLog(migrationDistSQLCommand.getMigrationOrderItemSingleTableWithSchema(),
 5);
+            
proxyExecuteWithLog(migrationDistSQLCommand.getMigrationOrderItemSingleTableWithSchema(),
 1);
         } else {
-            
proxyExecuteWithLog(migrationDistSQLCommand.getMigrationOrderItemSingleTable(), 
5);
+            
proxyExecuteWithLog(migrationDistSQLCommand.getMigrationOrderItemSingleTable(), 
1);
         }
     }
     
@@ -333,12 +337,12 @@ public abstract class BaseITCase {
     }
     
     protected void stopMigrationByJobId(final String jobId) throws 
SQLException {
-        proxyExecuteWithLog(String.format("STOP MIGRATION '%s'", jobId), 5);
+        proxyExecuteWithLog(String.format("STOP MIGRATION '%s'", jobId), 1);
     }
     
     // TODO reopen later
     protected void startMigrationByJobId(final String jobId) throws 
SQLException {
-        proxyExecuteWithLog(String.format("START MIGRATION '%s'", jobId), 10);
+        proxyExecuteWithLog(String.format("START MIGRATION '%s'", jobId), 1);
     }
     
     protected void commitMigrationByJobId(final String jobId) throws 
SQLException {
@@ -352,7 +356,7 @@ public abstract class BaseITCase {
     
     protected String getJobIdByTableName(final String tableName) {
         List<Map<String, Object>> jobList = queryForListWithLog("SHOW 
MIGRATION LIST");
-        return jobList.stream().filter(a -> 
a.get("tables").toString().equals(tableName)).findFirst().orElseThrow(() -> new 
RuntimeException("not find target table")).get("id").toString();
+        return jobList.stream().filter(a -> 
a.get("tables").toString().equals(tableName)).findFirst().orElseThrow(() -> new 
RuntimeException("not find " + tableName + " table")).get("id").toString();
     }
     
     @SneakyThrows(InterruptedException.class)
diff --git 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/command/ExtraSQLCommand.java
 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/command/ExtraSQLCommand.java
index 7e61813433b..1b3788ca445 100644
--- 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/command/ExtraSQLCommand.java
+++ 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/command/ExtraSQLCommand.java
@@ -40,10 +40,4 @@ public final class ExtraSQLCommand {
     
     @XmlElement(name = "full-insert-order-item")
     private String fullInsertOrderItem;
-    
-    @XmlElement(name = "update-table-order-status")
-    private String updateTableOrderStatus;
-    
-    @XmlElement(name = "create-index-status")
-    private String createIndexStatus;
 }
diff --git 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/command/MigrationDistSQLCommand.java
 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/command/MigrationDistSQLCommand.java
index bc873e505e0..30b7a9c0b9e 100644
--- 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/command/MigrationDistSQLCommand.java
+++ 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/command/MigrationDistSQLCommand.java
@@ -50,12 +50,15 @@ public final class MigrationDistSQLCommand {
     @XmlElement(name = "migration-order-single-table")
     private String migrationOrderSingleTable;
     
+    @XmlElement(name = "migration-order-copy-single-table")
+    private String migrationOrderCopySingleTable;
+    
+    @XmlElement(name = "migration-order-copy-single-table-with-schema")
+    private String migrationOrderCopySingleTableWithSchema;
+    
     @XmlElement(name = "migration-order-item-single-table")
     private String migrationOrderItemSingleTable;
     
-    @XmlElement(name = "migration-order-single-table-with-schema")
-    private String migrationOrderSingleTableWithSchema;
-    
     @XmlElement(name = "migration-order-item-single-table-with-schema")
     private String migrationOrderItemSingleTableWithSchema;
 }
diff --git 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/MySQLMigrationGeneralIT.java
 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/MySQLMigrationGeneralIT.java
index 6bd22ddefce..9e78f363a47 100644
--- 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/MySQLMigrationGeneralIT.java
+++ 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/MySQLMigrationGeneralIT.java
@@ -87,10 +87,10 @@ public final class MySQLMigrationGeneralIT extends 
BaseExtraSQLITCase {
             
jdbcTemplate.batchUpdate(getExtraSQLCommand().getFullInsertOrder(), 
dataPair.getLeft());
             
jdbcTemplate.batchUpdate(getExtraSQLCommand().getFullInsertOrderItem(), 
dataPair.getRight());
         }
-        startMigrationOrder(false);
+        startMigrationOrderCopy(false);
         startMigrationOrderItem(false);
-        startIncrementTask(new MySQLIncrementTask(jdbcTemplate, 
keyGenerateAlgorithm, true, 20));
-        String orderJobId = getJobIdByTableName("t_order");
+        startIncrementTask(new MySQLIncrementTask(jdbcTemplate, 
keyGenerateAlgorithm, 20));
+        String orderJobId = getJobIdByTableName("t_order_copy");
         String orderItemJobId = getJobIdByTableName("t_order_item");
         assertMigrationSuccessById(orderJobId);
         assertMigrationSuccessById(orderItemJobId);
diff --git 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/PostgreSQLMigrationGeneralIT.java
 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/PostgreSQLMigrationGeneralIT.java
index addbbbc66f0..38f9e6295fc 100644
--- 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/PostgreSQLMigrationGeneralIT.java
+++ 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/PostgreSQLMigrationGeneralIT.java
@@ -104,12 +104,12 @@ public final class PostgreSQLMigrationGeneralIT extends 
BaseExtraSQLITCase {
     }
     
     private void checkOrderMigration(final JdbcTemplate jdbcTemplate) throws 
SQLException {
-        startMigrationOrder(true);
+        startMigrationOrderCopy(true);
         startIncrementTask(new PostgreSQLIncrementTask(jdbcTemplate, 
SCHEMA_NAME, false, 20));
-        String jobId = getJobIdByTableName("t_order");
+        String jobId = getJobIdByTableName("t_order_copy");
         waitMigrationFinished(jobId);
         stopMigrationByJobId(jobId);
-        sourceExecuteWithLog(String.format("INSERT INTO %s.t_order 
(id,order_id,user_id,status) VALUES (%s, %s, %s, '%s')", SCHEMA_NAME, 
KEY_GENERATE_ALGORITHM.generateKey(),
+        sourceExecuteWithLog(String.format("INSERT INTO %s.t_order_copy 
(id,order_id,user_id,status) VALUES (%s, %s, %s, '%s')", SCHEMA_NAME, 
KEY_GENERATE_ALGORITHM.generateKey(),
                 System.currentTimeMillis(), 1, "afterStop"));
         startMigrationByJobId(jobId);
         assertCheckMigrationSuccess(jobId);
diff --git 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/primarykey/TextPrimaryKeyMigrationIT.java
 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/primarykey/TextPrimaryKeyMigrationIT.java
index 1ce7515cbda..8f747150a91 100644
--- 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/primarykey/TextPrimaryKeyMigrationIT.java
+++ 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/primarykey/TextPrimaryKeyMigrationIT.java
@@ -76,7 +76,7 @@ public class TextPrimaryKeyMigrationIT extends 
BaseExtraSQLITCase {
         addSourceResource();
         addTargetResource();
         createTargetOrderTableRule();
-        startMigrationOrder(false);
+        startMigrationOrder();
         String jobId = listJobId().get(0);
         waitMigrationFinished(jobId);
         stopMigrationByJobId(jobId);
diff --git 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/task/MySQLIncrementTask.java
 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/task/MySQLIncrementTask.java
index a0a049bc846..7cd5035d412 100644
--- 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/task/MySQLIncrementTask.java
+++ 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/task/MySQLIncrementTask.java
@@ -36,8 +36,6 @@ public final class MySQLIncrementTask extends 
BaseIncrementTask {
     
     private final KeyGenerateAlgorithm primaryKeyGenerateAlgorithm;
     
-    private final Boolean incrementOrderItemTogether;
-    
     private final int executeCountLimit;
     
     @Override
@@ -46,15 +44,13 @@ public final class MySQLIncrementTask extends 
BaseIncrementTask {
         while (executeCount < executeCountLimit && 
!Thread.currentThread().isInterrupted()) {
             Object orderPrimaryKey = insertOrder();
             if (executeCount % 2 == 0) {
-                jdbcTemplate.update("DELETE FROM t_order WHERE id = ?", 
orderPrimaryKey);
+                jdbcTemplate.update("DELETE FROM t_order_copy WHERE id = ?", 
orderPrimaryKey);
             } else {
                 setNullToOrderFields(orderPrimaryKey);
                 updateOrderByPrimaryKey(orderPrimaryKey);
             }
-            if (incrementOrderItemTogether) {
-                Object orderItemPrimaryKey = insertOrderItem();
-                jdbcTemplate.update("UPDATE t_order_item SET status = ? WHERE 
item_id = ?", "updated" + Instant.now().getEpochSecond(), orderItemPrimaryKey);
-            }
+            Object orderItemPrimaryKey = insertOrderItem();
+            jdbcTemplate.update("UPDATE t_order_item SET status = ? WHERE 
item_id = ?", "updated" + Instant.now().getEpochSecond(), orderItemPrimaryKey);
             executeCount++;
         }
         log.info("MySQL increment task runnable execute successfully.");
@@ -64,7 +60,7 @@ public final class MySQLIncrementTask extends 
BaseIncrementTask {
         ThreadLocalRandom random = ThreadLocalRandom.current();
         Object[] orderInsertDate = new 
Object[]{primaryKeyGenerateAlgorithm.generateKey(), 
ScalingCaseHelper.generateSnowflakeKey(), random.nextInt(0, 6),
                 random.nextInt(1, 99), RandomStringUtils.randomAlphabetic(10)};
-        jdbcTemplate.update("INSERT INTO t_order 
(id,order_id,user_id,t_unsigned_int,status) VALUES (?, ?, ?, ?, ?)", 
orderInsertDate);
+        jdbcTemplate.update("INSERT INTO t_order_copy 
(id,order_id,user_id,t_unsigned_int,status) VALUES (?, ?, ?, ?, ?)", 
orderInsertDate);
         return orderInsertDate[0];
     }
     
@@ -78,11 +74,11 @@ public final class MySQLIncrementTask extends 
BaseIncrementTask {
     
     private void updateOrderByPrimaryKey(final Object primaryKey) {
         Object[] updateData = {"updated" + Instant.now().getEpochSecond(), 
ThreadLocalRandom.current().nextInt(0, 100), primaryKey};
-        jdbcTemplate.update("UPDATE t_order SET t_char = ?,t_unsigned_int = ? 
WHERE id = ?", updateData);
-        jdbcTemplate.update("UPDATE t_order SET t_char = null,t_unsigned_int = 
299,t_datetime='0000-00-00 00:00:00' WHERE id = ?", primaryKey);
+        jdbcTemplate.update("UPDATE t_order_copy SET t_char = ?,t_unsigned_int 
= ? WHERE id = ?", updateData);
+        jdbcTemplate.update("UPDATE t_order_copy SET t_char = 
null,t_unsigned_int = 299,t_datetime='0000-00-00 00:00:00' WHERE id = ?", 
primaryKey);
     }
     
     private void setNullToOrderFields(final Object primaryKey) {
-        jdbcTemplate.update("UPDATE t_order SET t_char = null, t_unsigned_int 
= null WHERE id = ?", primaryKey);
+        jdbcTemplate.update("UPDATE t_order_copy SET t_char = null, 
t_unsigned_int = null WHERE id = ?", primaryKey);
     }
 }
diff --git 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/task/PostgreSQLIncrementTask.java
 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/task/PostgreSQLIncrementTask.java
index 352e5b00cc0..25bc7ddb446 100644
--- 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/task/PostgreSQLIncrementTask.java
+++ 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/task/PostgreSQLIncrementTask.java
@@ -56,7 +56,7 @@ public final class PostgreSQLIncrementTask extends 
BaseIncrementTask {
         while (executeCount < executeCountLimit && 
!Thread.currentThread().isInterrupted()) {
             Object orderPrimaryKey = insertOrder();
             if (executeCount % 2 == 0) {
-                jdbcTemplate.update(prefixSchema("DELETE FROM ${schema}t_order 
WHERE id = ?", schema), orderPrimaryKey);
+                jdbcTemplate.update(prefixSchema("DELETE FROM 
${schema}t_order_copy WHERE id = ?", schema), orderPrimaryKey);
             } else {
                 updateOrderByPrimaryKey(orderPrimaryKey);
             }
@@ -74,7 +74,7 @@ public final class PostgreSQLIncrementTask extends 
BaseIncrementTask {
         ThreadLocalRandom random = ThreadLocalRandom.current();
         String status = random.nextInt() % 2 == 0 ? null : "NOT-NULL";
         Object[] orderInsertDate = new 
Object[]{KEY_GENERATE_ALGORITHM.generateKey(), 
ScalingCaseHelper.generateSnowflakeKey(), random.nextInt(0, 6), status};
-        jdbcTemplate.update(prefixSchema("INSERT INTO ${schema}t_order 
(id,order_id,user_id,status) VALUES (?, ?, ?, ?)", schema), orderInsertDate);
+        jdbcTemplate.update(prefixSchema("INSERT INTO ${schema}t_order_copy 
(id,order_id,user_id,status) VALUES (?, ?, ?, ?)", schema), orderInsertDate);
         return orderInsertDate[0];
     }
     
@@ -88,7 +88,7 @@ public final class PostgreSQLIncrementTask extends 
BaseIncrementTask {
     
     private void updateOrderByPrimaryKey(final Object primaryKey) {
         Object[] updateData = {"updated" + Instant.now().getEpochSecond(), 
primaryKey};
-        jdbcTemplate.update(prefixSchema("UPDATE ${schema}t_order SET status = 
? WHERE id = ?", schema), updateData);
+        jdbcTemplate.update(prefixSchema("UPDATE ${schema}t_order_copy SET 
status = ? WHERE id = ?", schema), updateData);
     }
     
     private String prefixSchema(final String sql, final String schema) {
diff --git 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/common/command.xml
 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/common/command.xml
index fea5de0e5c4..6944f9100da 100644
--- 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/common/command.xml
+++ 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/common/command.xml
@@ -81,14 +81,18 @@
         );
     </create-target-order-item-table-rule>
     
+    <migration-order-copy-single-table>
+        MIGRATE TABLE ds_0.t_order_copy INTO sharding_db.t_order;
+    </migration-order-copy-single-table>
+    
+    <migration-order-copy-single-table-with-schema>
+        MIGRATE TABLE ds_0.test.t_order_copy INTO sharding_db.t_order;
+    </migration-order-copy-single-table-with-schema>
+    
     <migration-order-single-table>
         MIGRATE TABLE ds_0.t_order INTO sharding_db.t_order;
     </migration-order-single-table>
     
-    <migration-order-single-table-with-schema>
-        MIGRATE TABLE ds_0.test.t_order INTO sharding_db.t_order;
-    </migration-order-single-table-with-schema>
-    
     <migration-order-item-single-table>
         MIGRATE TABLE ds_0.t_order_item INTO sharding_db.t_order_item;
     </migration-order-item-single-table>
diff --git 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/general/mysql.xml
 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/general/mysql.xml
index 3e247f6afe2..3c57a1f9d53 100644
--- 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/general/mysql.xml
+++ 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/general/mysql.xml
@@ -16,7 +16,7 @@
   -->
 <command>
     <create-table-order>
-        CREATE TABLE `t_order` (
+        CREATE TABLE `t_order_copy` (
         `id` bigint NOT NULL COMMENT 'pk id',
         `order_id` bigint NOT NULL,
         `user_id` int NOT NULL,
@@ -64,7 +64,7 @@
     
     <full-insert-order>
         INSERT INTO
-        
t_order(id,order_id,user_id,status,t_mediumint,t_smallint,t_tinyint,t_unsigned_int,t_unsigned_mediumint,
+        
t_order_copy(id,order_id,user_id,status,t_mediumint,t_smallint,t_tinyint,t_unsigned_int,t_unsigned_mediumint,
         t_unsigned_smallint,t_unsigned_tinyint,t_float,t_double,t_decimal, 
t_timestamp,t_datetime,t_date,t_time,t_year,
         
t_bit,t_binary,t_varbinary,t_blob,t_mediumblob,t_char,t_text,t_mediumtext,t_enum,t_set,
 t_json) 
         VALUES
@@ -74,12 +74,4 @@
     <full-insert-order-item>
         INSERT INTO t_order_item(item_id,order_id,user_id,status) 
VALUES(?,?,?,?)
     </full-insert-order-item>
-
-    <update-table-order-status>
-        UPDATE t_order SET status= 'unlock'
-    </update-table-order-status>
-
-    <create-index-status>
-        CREATE index idx_order_status ON t_order (status)
-    </create-index-status>
 </command>
diff --git 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/general/postgresql.xml
 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/general/postgresql.xml
index 9dd2018da57..668d493f96e 100644
--- 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/general/postgresql.xml
+++ 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/resources/env/scenario/general/postgresql.xml
@@ -16,7 +16,7 @@
   -->
 <command>
     <create-table-order>
-        CREATE TABLE test.t_order (
+        CREATE TABLE test.t_order_copy (
         id int8 NOT NULL,
         order_id int8 NOT NULL,
         user_id int4 NOT NULL,
@@ -56,19 +56,11 @@
     
     <full-insert-order>
         INSERT INTO
-        
test.t_order(id,order_id,user_id,status,t_int2,t_numeric,t_bool,t_char,t_varchar,t_float,t_double,t_timestmap,t_timestamptz)
+        
test.t_order_copy(id,order_id,user_id,status,t_int2,t_numeric,t_bool,t_char,t_varchar,t_float,t_double,t_timestmap,t_timestamptz)
         VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)
     </full-insert-order>
     
     <full-insert-order-item>
         INSERT INTO test.t_order_item(item_id,order_id,user_id,status) 
VALUES(?,?,?,?)
     </full-insert-order-item>
-
-    <update-table-order-status>
-        UPDATE test.t_order SET status= 'unlock'
-    </update-table-order-status>
-
-    <create-index-status>
-        CREATE INDEX "idx_user_status" ON test.t_order ( status )
-    </create-index-status>
 </command>

Reply via email to