This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 39850eb7a18 Improve migration data consistency checker. (#20407)
39850eb7a18 is described below

commit 39850eb7a188b59fbcf65fb55b098b07b7f6132f
Author: Xinze Guo <[email protected]>
AuthorDate: Mon Aug 22 20:09:04 2022 +0800

    Improve migration data consistency checker. (#20407)
    
    * Improve migration data consistency checker.
    
    * Fix codestyle
    
    * Fix codestyle
---
 .../ShardingSpherePipelineDataSourceCreator.java   |  3 ++-
 .../check/consistency/DataConsistencyChecker.java  | 16 +++++------
 ...DataMatchDataConsistencyCalculateAlgorithm.java |  3 ++-
 .../sqlbuilder/AbstractPipelineSQLBuilder.java     |  2 +-
 ...TableUtil.java => PipelineSchemaTableUtil.java} | 31 +++-------------------
 .../scenario/migration/MigrationJobAPIImpl.java    |  6 ++---
 .../data/pipeline/cases/base/BaseITCase.java       | 13 ++++++---
 .../cases/general/MySQLMigrationGeneralIT.java     | 12 ++++++++-
 .../general/PostgreSQLMigrationGeneralIT.java      | 12 ++++++++-
 .../primarykey/TextPrimaryKeyMigrationIT.java      |  2 +-
 10 files changed, 50 insertions(+), 50 deletions(-)

diff --git 
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/data/pipeline/datasource/creator/ShardingSpherePipelineDataSourceCreator.java
 
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/data/pipeline/datasource/creator/ShardingSpherePipelineDataSourceCreator.java
index c3ecd5cecc2..908b5460d2f 100644
--- 
a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/data/pipeline/datasource/creator/ShardingSpherePipelineDataSourceCreator.java
+++ 
b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/data/pipeline/datasource/creator/ShardingSpherePipelineDataSourceCreator.java
@@ -44,10 +44,11 @@ public final class ShardingSpherePipelineDataSourceCreator 
implements PipelineDa
         YamlRootConfiguration rootConfig = (YamlRootConfiguration) 
pipelineDataSourceConfig;
         YamlShardingRuleConfiguration shardingRuleConfig = 
ShardingRuleConfigurationConverter.findYamlShardingRuleConfiguration(rootConfig.getRules());
         enableRangeQueryForInline(shardingRuleConfig);
+        String databaseName = null;
         Map<String, DataSource> dataSourceMap = new 
YamlDataSourceConfigurationSwapper().swapToDataSources(rootConfig.getDataSources(),
 false);
         Collection<RuleConfiguration> ruleConfigs = new 
YamlRuleConfigurationSwapperEngine().swapToRuleConfigurations(rootConfig.getRules());
         try {
-            return 
ShardingSphereDataSourceFactory.createDataSource(rootConfig.getDatabaseName(), 
dataSourceMap, ruleConfigs, null);
+            return 
ShardingSphereDataSourceFactory.createDataSource(databaseName, dataSourceMap, 
ruleConfigs, null);
             // CHECKSTYLE:OFF
         } catch (final Exception ex) {
             // CHECKSTYLE:ON
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
index 50edd3ca848..741d7c8a305 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
@@ -32,8 +32,9 @@ import 
org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineDataConsistencyCheckFailedException;
+import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
 import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
-import org.apache.shardingsphere.data.pipeline.core.util.SchemaTableUtil;
 import 
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
 import 
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
@@ -53,7 +54,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
@@ -82,9 +82,7 @@ public final class DataConsistencyChecker {
     public DataConsistencyChecker(final MigrationJobConfiguration jobConfig, 
final JobRateLimitAlgorithm readRateLimitAlgorithm) {
         this.jobConfig = jobConfig;
         logicTableNames = 
Collections.singletonList(jobConfig.getTargetTableName());
-        // TODO need get from actual data source.
-        Map<String, List<String>> schemaTablesMap = 
SchemaTableUtil.getSchemaTablesMap(jobConfig.getTargetDatabaseName(), 
Collections.singleton(jobConfig.getTargetTableName()));
-        tableNameSchemaNameMapping = new 
TableNameSchemaNameMapping(TableNameSchemaNameMapping.convert(schemaTablesMap));
+        tableNameSchemaNameMapping = new 
TableNameSchemaNameMapping(TableNameSchemaNameMapping.convert(jobConfig.getSchemaTablesMap()));
         this.readRateLimitAlgorithm = readRateLimitAlgorithm;
     }
     
@@ -172,12 +170,12 @@ public final class DataConsistencyChecker {
             String sourceDatabaseType = 
sourceDataSourceConfig.getDatabaseType().getType();
             String targetDatabaseType = 
targetDataSourceConfig.getDatabaseType().getType();
             for (String each : logicTableNames) {
-                ShardingSphereTable table = 
getTableMetaData(jobConfig.getTargetDatabaseName(), each);
-                if (null == table) {
+                PipelineTableMetaData tableMetaData = new 
PipelineTableMetaDataLoader(sourceDataSource).getTableMetaData(tableNameSchemaNameMapping.getSchemaName(each),
 each);
+                if (null == tableMetaData) {
                     throw new PipelineDataConsistencyCheckFailedException("Can 
not get metadata for table " + each);
                 }
-                Collection<String> columnNames = table.getColumns().keySet();
-                String uniqueKey = table.getPrimaryKeyColumns().get(0);
+                Collection<String> columnNames = 
tableMetaData.getColumnNames();
+                String uniqueKey = tableMetaData.getPrimaryKeyColumns().get(0);
                 DataConsistencyCalculateParameter sourceParameter = 
buildParameter(sourceDataSource, tableNameSchemaNameMapping, each, columnNames, 
sourceDatabaseType, targetDatabaseType, uniqueKey);
                 DataConsistencyCalculateParameter targetParameter = 
buildParameter(targetDataSource, tableNameSchemaNameMapping, each, columnNames, 
targetDatabaseType, sourceDatabaseType, uniqueKey);
                 Iterator<Object> sourceCalculatedResults = 
calculator.calculate(sourceParameter).iterator();
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
index f1437b7d125..244d7779ebb 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
@@ -120,7 +120,8 @@ public final class 
DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
         String logicTableName = parameter.getLogicTableName();
         String schemaName = 
parameter.getTableNameSchemaNameMapping().getSchemaName(logicTableName);
         String uniqueKey = parameter.getUniqueKey();
-        String cacheKey = schemaName.toLowerCase() + "." + 
logicTableName.toLowerCase();
+        String cacheKey = 
DatabaseTypeFactory.getInstance(parameter.getDatabaseType()).isSchemaAvailable()
 ? schemaName.toLowerCase() + "." + logicTableName.toLowerCase()
+                : logicTableName.toLowerCase();
         if (null == parameter.getPreviousCalculatedResult()) {
             return firstSQLCache.computeIfAbsent(cacheKey, s -> 
sqlBuilder.buildChunkedQuerySQL(schemaName, logicTableName, uniqueKey, true));
         } else {
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
index 369d1261b14..190062f858c 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
@@ -183,7 +183,7 @@ public abstract class AbstractPipelineSQLBuilder implements 
PipelineSQLBuilder {
     }
     
     @Override
-    public String buildChunkedQuerySQL(final @NonNull String schemaName, final 
@NonNull String tableName, final @NonNull String uniqueKey, final boolean 
firstQuery) {
+    public String buildChunkedQuerySQL(final String schemaName, final @NonNull 
String tableName, final @NonNull String uniqueKey, final boolean firstQuery) {
         if (firstQuery) {
             return "SELECT * FROM " + decorate(schemaName, tableName) + " 
ORDER BY " + quote(uniqueKey) + " ASC LIMIT ?";
         } else {
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/SchemaTableUtil.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineSchemaTableUtil.java
similarity index 69%
rename from 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/SchemaTableUtil.java
rename to 
shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineSchemaTableUtil.java
index fe42737b2b7..98fcbe165fd 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/SchemaTableUtil.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineSchemaTableUtil.java
@@ -20,10 +20,8 @@ package org.apache.shardingsphere.data.pipeline.core.util;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.AddMigrationSourceResourceException;
-import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 
 import java.sql.Connection;
 import java.sql.DatabaseMetaData;
@@ -31,39 +29,16 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 /**
- * Schema table util.
+ * Pipeline schema table util.
  */
 @Slf4j
-public final class SchemaTableUtil {
+public final class PipelineSchemaTableUtil {
     
-    /**
-     * Get schema table map.
-     *
-     * @param databaseName database name
-     * @param logicTables logic tables
-     * @return schema table map
-     */
-    public static Map<String, List<String>> getSchemaTablesMap(final String 
databaseName, final Set<String> logicTables) {
-        // TODO get by search_path
-        ShardingSphereDatabase database = 
PipelineContext.getContextManager().getMetaDataContexts().getMetaData().getDatabase(databaseName);
-        Map<String, List<String>> result = new LinkedHashMap<>();
-        database.getSchemas().forEach((schemaName, schema) -> {
-            for (String each : schema.getAllTableNames()) {
-                if (!logicTables.contains(each)) {
-                    continue;
-                }
-                result.computeIfAbsent(schemaName, unused -> new 
LinkedList<>()).add(each);
-            }
-        });
-        log.info("getSchemaTablesMap, result={}", result);
-        return result;
+    private PipelineSchemaTableUtil() {
     }
     
     /**
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
index a7908548509..7ce1ec50bbd 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
@@ -61,7 +61,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.exception.AddMigrationSource
 import 
org.apache.shardingsphere.data.pipeline.core.exception.DropMigrationSourceResourceException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineVerifyFailedException;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
-import org.apache.shardingsphere.data.pipeline.core.util.SchemaTableUtil;
+import 
org.apache.shardingsphere.data.pipeline.core.util.PipelineSchemaTableUtil;
 import 
org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
 import 
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
@@ -171,7 +171,7 @@ public final class MigrationJobAPIImpl extends 
AbstractPipelineJobAPIImpl implem
     public TaskConfiguration buildTaskConfiguration(final 
MigrationJobConfiguration jobConfig, final int jobShardingItem, final 
PipelineProcessConfiguration pipelineProcessConfig) {
         Map<ActualTableName, LogicTableName> tableNameMap = new 
LinkedHashMap<>();
         tableNameMap.put(new ActualTableName(jobConfig.getSourceTableName()), 
new LogicTableName(jobConfig.getSourceTableName()));
-        Map<LogicTableName, String> tableNameSchemaMap = 
TableNameSchemaNameMapping.convert(SchemaTableUtil.getSchemaTablesMapFromActual(jobConfig.getSource(),
 jobConfig.getSourceTableName()));
+        Map<LogicTableName, String> tableNameSchemaMap = 
TableNameSchemaNameMapping.convert(jobConfig.getSchemaTablesMap());
         TableNameSchemaNameMapping tableNameSchemaNameMapping = new 
TableNameSchemaNameMapping(tableNameSchemaMap);
         DumperConfiguration dumperConfig = 
createDumperConfiguration(jobConfig.getJobId(), 
jobConfig.getSourceDataSourceName(), jobConfig.getSource(), tableNameMap, 
tableNameSchemaNameMapping);
         // TODO now shardingColumnsMap always empty, 
@@ -493,7 +493,7 @@ public final class MigrationJobAPIImpl extends 
AbstractPipelineJobAPIImpl implem
         
result.setTargetDatabaseType(targetPipelineDataSource.getDatabaseType().getType());
         result.setTargetDatabaseName(targetDatabaseName);
         result.setTargetTableName(parameter.getTargetTableName());
-        
result.setSchemaTablesMap(SchemaTableUtil.getSchemaTablesMapFromActual(PIPELINE_DATA_SOURCE_CONFIG_SWAPPER.swapToObject(sourcePipelineDataSourceConfiguration),
+        
result.setSchemaTablesMap(PipelineSchemaTableUtil.getSchemaTablesMapFromActual(PIPELINE_DATA_SOURCE_CONFIG_SWAPPER.swapToObject(sourcePipelineDataSourceConfiguration),
                 parameter.getSourceTableName()));
         extendYamlJobConfiguration(result);
         MigrationJobConfiguration jobConfiguration = new 
YamlMigrationJobConfigurationSwapper().swapToObject(result);
diff --git 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
index b0dcc2f8433..76a0b4e5473 100644
--- 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
+++ 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
@@ -283,6 +283,7 @@ public abstract class BaseITCase {
     }
     
     protected List<Map<String, Object>> queryForListWithLog(final String sql) {
+        log.info("proxy query for list:{}", sql);
         int retryNumber = 0;
         while (retryNumber <= 3) {
             try (Connection connection = proxyDataSource.getConnection()) {
@@ -316,15 +317,19 @@ public abstract class BaseITCase {
         getIncreaseTaskThread().start();
     }
     
-    protected void stopMigration(final String jobId) {
+    protected void stopMigrationByJobId(final String jobId) {
         proxyExecuteWithLog(String.format("STOP MIGRATION '%s'", jobId), 5);
     }
     
     // TODO reopen later
-    protected void startMigrationByJob(final String jobId) {
+    protected void startMigrationByJobId(final String jobId) {
         proxyExecuteWithLog(String.format("START MIGRATION '%s'", jobId), 10);
     }
     
+    protected void cleanMigrationByJobId(final String jobId) {
+        proxyExecuteWithLog(String.format("CLEAN MIGRATION '%s'", jobId), 1);
+    }
+    
     protected List<String> listJobId() {
         List<Map<String, Object>> jobList = queryForListWithLog("SHOW 
MIGRATION LIST");
         return jobList.stream().map(a -> 
a.get("id").toString()).collect(Collectors.toList());
@@ -357,7 +362,8 @@ public abstract class BaseITCase {
         }
     }
     
-    protected void assertGreaterThanInitTableInitRows(final int tableInitRows, 
final String schema) {
+    protected void assertGreaterThanOrderTableInitRows(final int 
tableInitRows, final String schema) {
+        proxyExecuteWithLog("REFRESH TABLE METADATA", 2);
         String countSQL = StringUtils.isBlank(schema) ? "SELECT COUNT(*) as 
count FROM t_order" : String.format("SELECT COUNT(*) as count FROM %s.t_order", 
schema);
         Map<String, Object> actual = queryForListWithLog(countSQL).get(0);
         assertTrue("actual count " + actual.get("count"), 
Integer.parseInt(actual.get("count").toString()) > tableInitRows);
@@ -376,7 +382,6 @@ public abstract class BaseITCase {
         }
         boolean secondCheckJobResult = checkJobIncrementTaskFinished(jobId);
         log.info("second check job result: {}", secondCheckJobResult);
-        proxyExecuteWithLog("REFRESH TABLE METADATA", 2);
         List<Map<String, Object>> checkScalingResults = 
queryForListWithLog(String.format("CHECK MIGRATION '%s' BY TYPE 
(NAME='DATA_MATCH')", jobId));
         log.info("checkScalingResults: {}", checkScalingResults);
         for (Map<String, Object> entry : checkScalingResults) {
diff --git 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/MySQLMigrationGeneralIT.java
 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/MySQLMigrationGeneralIT.java
index d7a8fa4f61e..cbeef5e5d5d 100644
--- 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/MySQLMigrationGeneralIT.java
+++ 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/MySQLMigrationGeneralIT.java
@@ -37,6 +37,9 @@ import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
 
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
 /**
  * General scaling test case, includes multiple cases.
  */
@@ -84,6 +87,12 @@ public final class MySQLMigrationGeneralIT extends 
BaseExtraSQLITCase {
         startMigrationOrderItem();
         checkOrderMigration(keyGenerateAlgorithm, jdbcTemplate);
         checkOrderItemMigration();
+        for (String each : listJobId()) {
+            cleanMigrationByJobId(each);
+        }
+        List<String> lastJobIds = listJobId();
+        assertThat(lastJobIds.size(), is(0));
+        assertGreaterThanOrderTableInitRows(TABLE_INIT_ROW_COUNT, "");
     }
     
     private void checkOrderMigration(final KeyGenerateAlgorithm 
keyGenerateAlgorithm, final JdbcTemplate jdbcTemplate) {
@@ -92,7 +101,7 @@ public final class MySQLMigrationGeneralIT extends 
BaseExtraSQLITCase {
         String jobId = getJobIdByTableName("t_order");
         waitMigrationFinished(jobId);
         assertCheckScalingSuccess(jobId);
-        assertGreaterThanInitTableInitRows(TABLE_INIT_ROW_COUNT, "");
+        stopMigrationByJobId(jobId);
     }
     
     private void checkOrderItemMigration() {
@@ -100,5 +109,6 @@ public final class MySQLMigrationGeneralIT extends 
BaseExtraSQLITCase {
         String jobId = getJobIdByTableName("t_order_item");
         waitMigrationFinished(jobId);
         assertCheckScalingSuccess(jobId);
+        stopMigrationByJobId(jobId);
     }
 }
diff --git 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/PostgreSQLMigrationGeneralIT.java
 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/PostgreSQLMigrationGeneralIT.java
index 6cd69231d09..4d15ee9af87 100644
--- 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/PostgreSQLMigrationGeneralIT.java
+++ 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/PostgreSQLMigrationGeneralIT.java
@@ -37,6 +37,9 @@ import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
 
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
 /**
  * PostgreSQL general scaling test case. include openGauss type, same process.
  */
@@ -86,6 +89,12 @@ public final class PostgreSQLMigrationGeneralIT extends 
BaseExtraSQLITCase {
         
jdbcTemplate.batchUpdate(getExtraSQLCommand().getFullInsertOrderItem(), 
dataPair.getRight());
         checkOrderMigration(jdbcTemplate);
         checkOrderItemMigration();
+        for (String each : listJobId()) {
+            cleanMigrationByJobId(each);
+        }
+        List<String> lastJobIds = listJobId();
+        assertThat(lastJobIds.size(), is(0));
+        assertGreaterThanOrderTableInitRows(TABLE_INIT_ROW_COUNT, "test");
     }
     
     private void checkOrderMigration(final JdbcTemplate jdbcTemplate) {
@@ -94,7 +103,7 @@ public final class PostgreSQLMigrationGeneralIT extends 
BaseExtraSQLITCase {
         String jobId = getJobIdByTableName("t_order");
         waitMigrationFinished(jobId);
         assertCheckScalingSuccess(jobId);
-        assertGreaterThanInitTableInitRows(TABLE_INIT_ROW_COUNT, "test");
+        stopMigrationByJobId(jobId);
     }
     
     private void checkOrderItemMigration() {
@@ -102,5 +111,6 @@ public final class PostgreSQLMigrationGeneralIT extends 
BaseExtraSQLITCase {
         String jobId = getJobIdByTableName("t_order_item");
         waitMigrationFinished(jobId);
         assertCheckScalingSuccess(jobId);
+        stopMigrationByJobId(jobId);
     }
 }
diff --git 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/primarykey/TextPrimaryKeyMigrationIT.java
 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/primarykey/TextPrimaryKeyMigrationIT.java
index fbe0c01c3ce..cf751ea098d 100644
--- 
a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/primarykey/TextPrimaryKeyMigrationIT.java
+++ 
b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/primarykey/TextPrimaryKeyMigrationIT.java
@@ -75,7 +75,7 @@ public class TextPrimaryKeyMigrationIT extends 
BaseExtraSQLITCase {
         startMigrationOrder();
         String jobId = listJobId().get(0);
         waitMigrationFinished(jobId);
-        stopMigration(jobId);
+        stopMigrationByJobId(jobId);
         assertCheckScalingSuccess(jobId);
     }
     

Reply via email to