This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 39850eb7a18 Improve migration data consistency checker. (#20407)
39850eb7a18 is described below
commit 39850eb7a188b59fbcf65fb55b098b07b7f6132f
Author: Xinze Guo <[email protected]>
AuthorDate: Mon Aug 22 20:09:04 2022 +0800
Improve migration data consistency checker. (#20407)
* Improve migration data consistency checker.
* Fix codestyle
* Fix codestyle
---
.../ShardingSpherePipelineDataSourceCreator.java | 3 ++-
.../check/consistency/DataConsistencyChecker.java | 16 +++++------
...DataMatchDataConsistencyCalculateAlgorithm.java | 3 ++-
.../sqlbuilder/AbstractPipelineSQLBuilder.java | 2 +-
...TableUtil.java => PipelineSchemaTableUtil.java} | 31 +++-------------------
.../scenario/migration/MigrationJobAPIImpl.java | 6 ++---
.../data/pipeline/cases/base/BaseITCase.java | 13 ++++++---
.../cases/general/MySQLMigrationGeneralIT.java | 12 ++++++++-
.../general/PostgreSQLMigrationGeneralIT.java | 12 ++++++++-
.../primarykey/TextPrimaryKeyMigrationIT.java | 2 +-
10 files changed, 50 insertions(+), 50 deletions(-)
diff --git a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/data/pipeline/datasource/creator/ShardingSpherePipelineDataSourceCreator.java b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/data/pipeline/datasource/creator/ShardingSpherePipelineDataSourceCreator.java
index c3ecd5cecc2..908b5460d2f 100644
--- a/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/data/pipeline/datasource/creator/ShardingSpherePipelineDataSourceCreator.java
+++ b/shardingsphere-jdbc/shardingsphere-jdbc-core/src/main/java/org/apache/shardingsphere/driver/data/pipeline/datasource/creator/ShardingSpherePipelineDataSourceCreator.java
@@ -44,10 +44,11 @@ public final class ShardingSpherePipelineDataSourceCreator implements PipelineDa
         YamlRootConfiguration rootConfig = (YamlRootConfiguration) pipelineDataSourceConfig;
         YamlShardingRuleConfiguration shardingRuleConfig = ShardingRuleConfigurationConverter.findYamlShardingRuleConfiguration(rootConfig.getRules());
         enableRangeQueryForInline(shardingRuleConfig);
+        String databaseName = null;
         Map<String, DataSource> dataSourceMap = new YamlDataSourceConfigurationSwapper().swapToDataSources(rootConfig.getDataSources(), false);
         Collection<RuleConfiguration> ruleConfigs = new YamlRuleConfigurationSwapperEngine().swapToRuleConfigurations(rootConfig.getRules());
         try {
-            return ShardingSphereDataSourceFactory.createDataSource(rootConfig.getDatabaseName(), dataSourceMap, ruleConfigs, null);
+            return ShardingSphereDataSourceFactory.createDataSource(databaseName, dataSourceMap, ruleConfigs, null);
             // CHECKSTYLE:OFF
         } catch (final Exception ex) {
             // CHECKSTYLE:ON
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
index 50edd3ca848..741d7c8a305 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/DataConsistencyChecker.java
@@ -32,8 +32,9 @@ import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineDataConsistencyCheckFailedException;
+import org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
+import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
 import org.apache.shardingsphere.data.pipeline.core.sqlbuilder.PipelineSQLBuilderFactory;
-import org.apache.shardingsphere.data.pipeline.core.util.SchemaTableUtil;
 import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
 import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import org.apache.shardingsphere.infra.database.type.DatabaseType;
@@ -53,7 +54,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
@@ -82,9 +82,7 @@ public final class DataConsistencyChecker {
     public DataConsistencyChecker(final MigrationJobConfiguration jobConfig, final JobRateLimitAlgorithm readRateLimitAlgorithm) {
         this.jobConfig = jobConfig;
         logicTableNames = Collections.singletonList(jobConfig.getTargetTableName());
-        // TODO need get from actual data source.
-        Map<String, List<String>> schemaTablesMap = SchemaTableUtil.getSchemaTablesMap(jobConfig.getTargetDatabaseName(), Collections.singleton(jobConfig.getTargetTableName()));
-        tableNameSchemaNameMapping = new TableNameSchemaNameMapping(TableNameSchemaNameMapping.convert(schemaTablesMap));
+        tableNameSchemaNameMapping = new TableNameSchemaNameMapping(TableNameSchemaNameMapping.convert(jobConfig.getSchemaTablesMap()));
         this.readRateLimitAlgorithm = readRateLimitAlgorithm;
     }
 
@@ -172,12 +170,12 @@ public final class DataConsistencyChecker {
         String sourceDatabaseType = sourceDataSourceConfig.getDatabaseType().getType();
         String targetDatabaseType = targetDataSourceConfig.getDatabaseType().getType();
         for (String each : logicTableNames) {
-            ShardingSphereTable table = getTableMetaData(jobConfig.getTargetDatabaseName(), each);
-            if (null == table) {
+            PipelineTableMetaData tableMetaData = new PipelineTableMetaDataLoader(sourceDataSource).getTableMetaData(tableNameSchemaNameMapping.getSchemaName(each), each);
+            if (null == tableMetaData) {
                 throw new PipelineDataConsistencyCheckFailedException("Can not get metadata for table " + each);
             }
-            Collection<String> columnNames = table.getColumns().keySet();
-            String uniqueKey = table.getPrimaryKeyColumns().get(0);
+            Collection<String> columnNames = tableMetaData.getColumnNames();
+            String uniqueKey = tableMetaData.getPrimaryKeyColumns().get(0);
             DataConsistencyCalculateParameter sourceParameter = buildParameter(sourceDataSource, tableNameSchemaNameMapping, each, columnNames, sourceDatabaseType, targetDatabaseType, uniqueKey);
             DataConsistencyCalculateParameter targetParameter = buildParameter(targetDataSource, tableNameSchemaNameMapping, each, columnNames, targetDatabaseType, sourceDatabaseType, uniqueKey);
             Iterator<Object> sourceCalculatedResults = calculator.calculate(sourceParameter).iterator();
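
For context, the reworked loop takes each table's metadata from the actual source data source and derives the column list and unique key from it before comparing source and target chunk by chunk. A standalone sketch of that flow follows; the loader and calculate methods are simplified placeholders, not ShardingSphere APIs.

    import java.util.Collection;
    import java.util.Collections;
    import java.util.Iterator;
    import java.util.List;
    
    final class ConsistencyCheckSketch {
        
        void check(final Collection<String> logicTableNames) {
            for (String each : logicTableNames) {
                Collection<String> columnNames = loadColumnNames(each);
                List<String> primaryKeyColumns = loadPrimaryKeyColumns(each);
                if (columnNames.isEmpty() || primaryKeyColumns.isEmpty()) {
                    throw new IllegalStateException("Can not get metadata for table " + each);
                }
                String uniqueKey = primaryKeyColumns.get(0);
                // Compare source and target chunk by chunk, ordered by the unique key.
                Iterator<Object> sourceResults = calculate(each, columnNames, uniqueKey, true);
                Iterator<Object> targetResults = calculate(each, columnNames, uniqueKey, false);
                while (sourceResults.hasNext() && targetResults.hasNext()) {
                    if (!sourceResults.next().equals(targetResults.next())) {
                        throw new IllegalStateException("Data not consistent for table " + each);
                    }
                }
            }
        }
        
        private Collection<String> loadColumnNames(final String tableName) {
            return Collections.emptyList();    // Placeholder: the real loader reads metadata from the actual source data source.
        }
        
        private List<String> loadPrimaryKeyColumns(final String tableName) {
            return Collections.emptyList();    // Placeholder for the table's primary key columns.
        }
        
        private Iterator<Object> calculate(final String tableName, final Collection<String> columnNames, final String uniqueKey, final boolean source) {
            return Collections.emptyIterator();    // Placeholder for the calculate algorithm.
        }
    }
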
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
index f1437b7d125..244d7779ebb 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/check/consistency/algorithm/DataMatchDataConsistencyCalculateAlgorithm.java
@@ -120,7 +120,8 @@ public final class DataMatchDataConsistencyCalculateAlgorithm extends AbstractSt
         String logicTableName = parameter.getLogicTableName();
         String schemaName = parameter.getTableNameSchemaNameMapping().getSchemaName(logicTableName);
         String uniqueKey = parameter.getUniqueKey();
-        String cacheKey = schemaName.toLowerCase() + "." + logicTableName.toLowerCase();
+        String cacheKey = DatabaseTypeFactory.getInstance(parameter.getDatabaseType()).isSchemaAvailable() ? schemaName.toLowerCase() + "." + logicTableName.toLowerCase()
+                : logicTableName.toLowerCase();
         if (null == parameter.getPreviousCalculatedResult()) {
             return firstSQLCache.computeIfAbsent(cacheKey, s -> sqlBuilder.buildChunkedQuerySQL(schemaName, logicTableName, uniqueKey, true));
         } else {
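
The cache key above is only schema-qualified when the database type actually supports schemas. Reduced to a standalone sketch (the boolean flag stands in for the DatabaseTypeFactory/isSchemaAvailable lookup):

    final class CacheKeySketch {
        
        // Schema-qualify the key only for schema-aware databases (e.g. PostgreSQL/openGauss); MySQL-like databases use the table name alone.
        static String buildCacheKey(final boolean schemaAvailable, final String schemaName, final String logicTableName) {
            return schemaAvailable ? schemaName.toLowerCase() + "." + logicTableName.toLowerCase() : logicTableName.toLowerCase();
        }
    }

So a PostgreSQL job might cache under "public.t_order" while a MySQL job caches under "t_order", and the key never depends on a schema name that MySQL-like databases do not have.
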
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
index 369d1261b14..190062f858c 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
@@ -183,7 +183,7 @@ public abstract class AbstractPipelineSQLBuilder implements PipelineSQLBuilder {
     }
 
     @Override
-    public String buildChunkedQuerySQL(final @NonNull String schemaName, final @NonNull String tableName, final @NonNull String uniqueKey, final boolean firstQuery) {
+    public String buildChunkedQuerySQL(final String schemaName, final @NonNull String tableName, final @NonNull String uniqueKey, final boolean firstQuery) {
         if (firstQuery) {
             return "SELECT * FROM " + decorate(schemaName, tableName) + " ORDER BY " + quote(uniqueKey) + " ASC LIMIT ?";
         } else {
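
With schemaName no longer @NonNull, the builder has to tolerate a missing schema when qualifying the table. A minimal sketch of that shape, with simplified decorate/quote helpers and an assumed continuation query for the non-first chunk (only the firstQuery branch is visible in the hunk):

    final class ChunkedQuerySketch {
        
        static String quote(final String identifier) {
            return "`" + identifier + "`";    // Simplified: the real builder quotes per database type.
        }
        
        // Qualify with the schema only when one is present.
        static String decorate(final String schemaName, final String tableName) {
            return null == schemaName || schemaName.isEmpty() ? quote(tableName) : quote(schemaName) + "." + quote(tableName);
        }
        
        static String buildChunkedQuerySQL(final String schemaName, final String tableName, final String uniqueKey, final boolean firstQuery) {
            return firstQuery
                    ? "SELECT * FROM " + decorate(schemaName, tableName) + " ORDER BY " + quote(uniqueKey) + " ASC LIMIT ?"
                    // Assumed continuation query: resume after the last seen unique key value.
                    : "SELECT * FROM " + decorate(schemaName, tableName) + " WHERE " + quote(uniqueKey) + " > ? ORDER BY " + quote(uniqueKey) + " ASC LIMIT ?";
        }
    }
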
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/SchemaTableUtil.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineSchemaTableUtil.java
similarity index 69%
rename from shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/SchemaTableUtil.java
rename to shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineSchemaTableUtil.java
index fe42737b2b7..98fcbe165fd 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/SchemaTableUtil.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineSchemaTableUtil.java
@@ -20,10 +20,8 @@ package org.apache.shardingsphere.data.pipeline.core.util;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceFactory;
 import org.apache.shardingsphere.data.pipeline.core.exception.AddMigrationSourceResourceException;
-import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 
 import java.sql.Connection;
 import java.sql.DatabaseMetaData;
@@ -31,39 +29,16 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 /**
- * Schema table util.
+ * Pipeline schema table util.
  */
 @Slf4j
-public final class SchemaTableUtil {
+public final class PipelineSchemaTableUtil {
 
-    /**
-     * Get schema table map.
-     *
-     * @param databaseName database name
-     * @param logicTables logic tables
-     * @return schema table map
-     */
-    public static Map<String, List<String>> getSchemaTablesMap(final String databaseName, final Set<String> logicTables) {
-        // TODO get by search_path
-        ShardingSphereDatabase database = PipelineContext.getContextManager().getMetaDataContexts().getMetaData().getDatabase(databaseName);
-        Map<String, List<String>> result = new LinkedHashMap<>();
-        database.getSchemas().forEach((schemaName, schema) -> {
-            for (String each : schema.getAllTableNames()) {
-                if (!logicTables.contains(each)) {
-                    continue;
-                }
-                result.computeIfAbsent(schemaName, unused -> new LinkedList<>()).add(each);
-            }
-        });
-        log.info("getSchemaTablesMap, result={}", result);
-        return result;
+    private PipelineSchemaTableUtil() {
     }
 
     /**
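
The renamed util keeps only the "FromActual" variant, i.e. the schema-to-tables map is now read from the actual source database rather than from ShardingSphere's own metadata. A rough JDBC-only sketch of that idea (not the util's real implementation):

    import java.sql.Connection;
    import java.sql.DatabaseMetaData;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    import javax.sql.DataSource;
    
    final class SchemaTablesSketch {
        
        // Group the matching tables visible on the actual data source by schema name.
        static Map<String, List<String>> getSchemaTablesMap(final DataSource dataSource, final String tableNamePattern) throws SQLException {
            Map<String, List<String>> result = new HashMap<>();
            try (Connection connection = dataSource.getConnection()) {
                DatabaseMetaData metaData = connection.getMetaData();
                try (ResultSet resultSet = metaData.getTables(connection.getCatalog(), null, tableNamePattern, new String[]{"TABLE"})) {
                    while (resultSet.next()) {
                        String schemaName = resultSet.getString("TABLE_SCHEM");
                        String tableName = resultSet.getString("TABLE_NAME");
                        result.computeIfAbsent(null == schemaName ? "" : schemaName, unused -> new ArrayList<>()).add(tableName);
                    }
                }
            }
            return result;
        }
    }
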
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
index a7908548509..7ce1ec50bbd 100644
--- a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
@@ -61,7 +61,7 @@ import org.apache.shardingsphere.data.pipeline.core.exception.AddMigrationSource
 import org.apache.shardingsphere.data.pipeline.core.exception.DropMigrationSourceResourceException;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineVerifyFailedException;
 import org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
-import org.apache.shardingsphere.data.pipeline.core.util.SchemaTableUtil;
+import org.apache.shardingsphere.data.pipeline.core.util.PipelineSchemaTableUtil;
 import org.apache.shardingsphere.data.pipeline.spi.check.consistency.DataConsistencyCalculateAlgorithm;
 import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
@@ -171,7 +171,7 @@ public final class MigrationJobAPIImpl extends AbstractPipelineJobAPIImpl implem
     public TaskConfiguration buildTaskConfiguration(final MigrationJobConfiguration jobConfig, final int jobShardingItem, final PipelineProcessConfiguration pipelineProcessConfig) {
         Map<ActualTableName, LogicTableName> tableNameMap = new LinkedHashMap<>();
         tableNameMap.put(new ActualTableName(jobConfig.getSourceTableName()), new LogicTableName(jobConfig.getSourceTableName()));
-        Map<LogicTableName, String> tableNameSchemaMap = TableNameSchemaNameMapping.convert(SchemaTableUtil.getSchemaTablesMapFromActual(jobConfig.getSource(), jobConfig.getSourceTableName()));
+        Map<LogicTableName, String> tableNameSchemaMap = TableNameSchemaNameMapping.convert(jobConfig.getSchemaTablesMap());
         TableNameSchemaNameMapping tableNameSchemaNameMapping = new TableNameSchemaNameMapping(tableNameSchemaMap);
         DumperConfiguration dumperConfig = createDumperConfiguration(jobConfig.getJobId(), jobConfig.getSourceDataSourceName(), jobConfig.getSource(), tableNameMap, tableNameSchemaNameMapping);
         // TODO now shardingColumnsMap always empty,
@@ -493,7 +493,7 @@ public final class MigrationJobAPIImpl extends AbstractPipelineJobAPIImpl implem
         result.setTargetDatabaseType(targetPipelineDataSource.getDatabaseType().getType());
         result.setTargetDatabaseName(targetDatabaseName);
         result.setTargetTableName(parameter.getTargetTableName());
-        result.setSchemaTablesMap(SchemaTableUtil.getSchemaTablesMapFromActual(PIPELINE_DATA_SOURCE_CONFIG_SWAPPER.swapToObject(sourcePipelineDataSourceConfiguration),
+        result.setSchemaTablesMap(PipelineSchemaTableUtil.getSchemaTablesMapFromActual(PIPELINE_DATA_SOURCE_CONFIG_SWAPPER.swapToObject(sourcePipelineDataSourceConfiguration),
                 parameter.getSourceTableName()));
         extendYamlJobConfiguration(result);
         MigrationJobConfiguration jobConfiguration = new YamlMigrationJobConfigurationSwapper().swapToObject(result);
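
Both call sites now feed TableNameSchemaNameMapping.convert with the schema-to-tables map that is captured once on the job configuration. The conversion itself is just an inversion of that map; a minimal sketch with plain strings instead of LogicTableName:

    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;
    
    final class MappingSketch {
        
        // Invert {schema -> [table, ...]} into {table -> schema}.
        static Map<String, String> convert(final Map<String, List<String>> schemaTablesMap) {
            Map<String, String> result = new LinkedHashMap<>();
            schemaTablesMap.forEach((schemaName, tableNames) -> tableNames.forEach(each -> result.put(each, schemaName)));
            return result;
        }
    }

For example, {"test": ["t_order", "t_order_item"]} becomes {"t_order": "test", "t_order_item": "test"}, which is the per-table schema lookup the dumper and the consistency checker rely on.
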
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
index b0dcc2f8433..76a0b4e5473 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/base/BaseITCase.java
@@ -283,6 +283,7 @@ public abstract class BaseITCase {
     }
 
     protected List<Map<String, Object>> queryForListWithLog(final String sql) {
+        log.info("proxy query for list:{}", sql);
         int retryNumber = 0;
         while (retryNumber <= 3) {
             try (Connection connection = proxyDataSource.getConnection()) {
@@ -316,15 +317,19 @@ public abstract class BaseITCase {
         getIncreaseTaskThread().start();
     }
 
-    protected void stopMigration(final String jobId) {
+    protected void stopMigrationByJobId(final String jobId) {
         proxyExecuteWithLog(String.format("STOP MIGRATION '%s'", jobId), 5);
     }
 
     // TODO reopen later
-    protected void startMigrationByJob(final String jobId) {
+    protected void startMigrationByJobId(final String jobId) {
         proxyExecuteWithLog(String.format("START MIGRATION '%s'", jobId), 10);
     }
 
+    protected void cleanMigrationByJobId(final String jobId) {
+        proxyExecuteWithLog(String.format("CLEAN MIGRATION '%s'", jobId), 1);
+    }
+
     protected List<String> listJobId() {
         List<Map<String, Object>> jobList = queryForListWithLog("SHOW MIGRATION LIST");
         return jobList.stream().map(a -> a.get("id").toString()).collect(Collectors.toList());
@@ -357,7 +362,8 @@ public abstract class BaseITCase {
         }
     }
 
-    protected void assertGreaterThanInitTableInitRows(final int tableInitRows, final String schema) {
+    protected void assertGreaterThanOrderTableInitRows(final int tableInitRows, final String schema) {
+        proxyExecuteWithLog("REFRESH TABLE METADATA", 2);
         String countSQL = StringUtils.isBlank(schema) ? "SELECT COUNT(*) as count FROM t_order" : String.format("SELECT COUNT(*) as count FROM %s.t_order", schema);
         Map<String, Object> actual = queryForListWithLog(countSQL).get(0);
         assertTrue("actual count " + actual.get("count"), Integer.parseInt(actual.get("count").toString()) > tableInitRows);
@@ -376,7 +382,6 @@ public abstract class BaseITCase {
         }
         boolean secondCheckJobResult = checkJobIncrementTaskFinished(jobId);
         log.info("second check job result: {}", secondCheckJobResult);
-        proxyExecuteWithLog("REFRESH TABLE METADATA", 2);
         List<Map<String, Object>> checkScalingResults = queryForListWithLog(String.format("CHECK MIGRATION '%s' BY TYPE (NAME='DATA_MATCH')", jobId));
         log.info("checkScalingResults: {}", checkScalingResults);
         for (Map<String, Object> entry : checkScalingResults) {
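
queryForListWithLog now logs the DistSQL statement before running it against the proxy and, as before, retries a few times. A standalone sketch of that retry-and-map pattern over plain JDBC (the row mapping and the 3-second pause are assumptions, not the test base class itself):

    import java.sql.Connection;
    import java.sql.ResultSet;
    import java.sql.ResultSetMetaData;
    import java.sql.SQLException;
    import java.sql.Statement;
    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;
    
    import javax.sql.DataSource;
    
    final class RetryQuerySketch {
        
        // Retry the query up to 3 times, e.g. while the proxy metadata is still refreshing.
        static List<Map<String, Object>> queryForList(final DataSource dataSource, final String sql) throws SQLException, InterruptedException {
            int retryNumber = 0;
            while (true) {
                try (Connection connection = dataSource.getConnection(); Statement statement = connection.createStatement(); ResultSet resultSet = statement.executeQuery(sql)) {
                    return toRows(resultSet);
                } catch (final SQLException ex) {
                    if (++retryNumber > 3) {
                        throw ex;
                    }
                    Thread.sleep(3000L);
                }
            }
        }
        
        private static List<Map<String, Object>> toRows(final ResultSet resultSet) throws SQLException {
            List<Map<String, Object>> result = new ArrayList<>();
            ResultSetMetaData metaData = resultSet.getMetaData();
            while (resultSet.next()) {
                Map<String, Object> row = new LinkedHashMap<>();
                for (int i = 1; i <= metaData.getColumnCount(); i++) {
                    row.put(metaData.getColumnLabel(i), resultSet.getObject(i));
                }
                result.add(row);
            }
            return result;
        }
    }
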
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/MySQLMigrationGeneralIT.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/MySQLMigrationGeneralIT.java
index d7a8fa4f61e..cbeef5e5d5d 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/MySQLMigrationGeneralIT.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/MySQLMigrationGeneralIT.java
@@ -37,6 +37,9 @@ import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
 
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
 /**
  * General scaling test case, includes multiple cases.
  */
@@ -84,6 +87,12 @@ public final class MySQLMigrationGeneralIT extends BaseExtraSQLITCase {
         startMigrationOrderItem();
         checkOrderMigration(keyGenerateAlgorithm, jdbcTemplate);
         checkOrderItemMigration();
+        for (String each : listJobId()) {
+            cleanMigrationByJobId(each);
+        }
+        List<String> lastJobIds = listJobId();
+        assertThat(lastJobIds.size(), is(0));
+        assertGreaterThanOrderTableInitRows(TABLE_INIT_ROW_COUNT, "");
     }
 
     private void checkOrderMigration(final KeyGenerateAlgorithm keyGenerateAlgorithm, final JdbcTemplate jdbcTemplate) {
@@ -92,7 +101,7 @@ public final class MySQLMigrationGeneralIT extends BaseExtraSQLITCase {
         String jobId = getJobIdByTableName("t_order");
         waitMigrationFinished(jobId);
         assertCheckScalingSuccess(jobId);
-        assertGreaterThanInitTableInitRows(TABLE_INIT_ROW_COUNT, "");
+        stopMigrationByJobId(jobId);
     }
 
     private void checkOrderItemMigration() {
@@ -100,5 +109,6 @@ public final class MySQLMigrationGeneralIT extends BaseExtraSQLITCase {
         String jobId = getJobIdByTableName("t_order_item");
         waitMigrationFinished(jobId);
         assertCheckScalingSuccess(jobId);
+        stopMigrationByJobId(jobId);
     }
 }
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/PostgreSQLMigrationGeneralIT.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/PostgreSQLMigrationGeneralIT.java
index 6cd69231d09..4d15ee9af87 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/PostgreSQLMigrationGeneralIT.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/general/PostgreSQLMigrationGeneralIT.java
@@ -37,6 +37,9 @@ import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
 
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
 /**
  * PostgreSQL general scaling test case. include openGauss type, same process.
  */
@@ -86,6 +89,12 @@ public final class PostgreSQLMigrationGeneralIT extends BaseExtraSQLITCase {
         jdbcTemplate.batchUpdate(getExtraSQLCommand().getFullInsertOrderItem(), dataPair.getRight());
         checkOrderMigration(jdbcTemplate);
         checkOrderItemMigration();
+        for (String each : listJobId()) {
+            cleanMigrationByJobId(each);
+        }
+        List<String> lastJobIds = listJobId();
+        assertThat(lastJobIds.size(), is(0));
+        assertGreaterThanOrderTableInitRows(TABLE_INIT_ROW_COUNT, "test");
     }
 
     private void checkOrderMigration(final JdbcTemplate jdbcTemplate) {
@@ -94,7 +103,7 @@ public final class PostgreSQLMigrationGeneralIT extends BaseExtraSQLITCase {
         String jobId = getJobIdByTableName("t_order");
         waitMigrationFinished(jobId);
         assertCheckScalingSuccess(jobId);
-        assertGreaterThanInitTableInitRows(TABLE_INIT_ROW_COUNT, "test");
+        stopMigrationByJobId(jobId);
     }
 
     private void checkOrderItemMigration() {
@@ -102,5 +111,6 @@ public final class PostgreSQLMigrationGeneralIT extends BaseExtraSQLITCase {
         String jobId = getJobIdByTableName("t_order_item");
         waitMigrationFinished(jobId);
         assertCheckScalingSuccess(jobId);
+        stopMigrationByJobId(jobId);
     }
 }
diff --git a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/primarykey/TextPrimaryKeyMigrationIT.java b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/primarykey/TextPrimaryKeyMigrationIT.java
index fbe0c01c3ce..cf751ea098d 100644
--- a/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/primarykey/TextPrimaryKeyMigrationIT.java
+++ b/shardingsphere-test/shardingsphere-integration-test/shardingsphere-integration-test-scaling/src/test/java/org/apache/shardingsphere/integration/data/pipeline/cases/primarykey/TextPrimaryKeyMigrationIT.java
@@ -75,7 +75,7 @@ public class TextPrimaryKeyMigrationIT extends BaseExtraSQLITCase {
         startMigrationOrder();
         String jobId = listJobId().get(0);
         waitMigrationFinished(jobId);
-        stopMigration(jobId);
+        stopMigrationByJobId(jobId);
         assertCheckScalingSuccess(jobId);
     }