This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 3a3dc2ce4e1 Improve consistency job finished check at pipeline E2E
(#26009)
3a3dc2ce4e1 is described below
commit 3a3dc2ce4e14fd394de103150c013b3cb4708759
Author: Xinze Guo <[email protected]>
AuthorDate: Fri Jun 2 16:05:23 2023 +0800
Improve consistency job finished check at pipeline E2E (#26009)
* Wait consistency check job really finished at E2E
* Use Awaitility replace Thread sleep
* Improve E2E increment task finished check flag
---
.../data/pipeline/cases/PipelineContainerComposer.java | 9 +++------
.../pipeline/cases/migration/AbstractMigrationE2EIT.java | 15 ++++++++++-----
.../migration/general/MySQLMigrationGeneralE2EIT.java | 5 ++++-
.../general/PostgreSQLMigrationGeneralE2EIT.java | 13 ++++++++-----
4 files changed, 25 insertions(+), 17 deletions(-)
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
index 83bc0b6f8fa..bde28c399e8 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
@@ -377,13 +377,14 @@ public final class PipelineContainerComposer implements
AutoCloseable {
* @throws RuntimeException runtime exception
*/
public List<Map<String, Object>> queryForListWithLog(final String sql) {
+ log.info("Query SQL: {}", sql);
int retryNumber = 0;
while (retryNumber <= 3) {
try (Connection connection = proxyDataSource.getConnection()) {
ResultSet resultSet =
connection.createStatement().executeQuery(sql);
return transformResultSetToList(resultSet);
} catch (final SQLException ex) {
- log.error("Data access error.", ex);
+ log.error("Data access error: ", ex);
}
Awaitility.await().pollDelay(3L, TimeUnit.SECONDS).until(() ->
true);
retryNumber++;
@@ -427,13 +428,9 @@ public final class PipelineContainerComposer implements
AutoCloseable {
*
* @param distSQL dist SQL
* @return result
- * @throws InterruptedException interrupted exception
*/
// TODO use DAO to query via DistSQL
- public List<Map<String, Object>> waitIncrementTaskFinished(final String
distSQL) throws InterruptedException {
- if (null != increaseTaskThread) {
- TimeUnit.SECONDS.timedJoin(increaseTaskThread, 30);
- }
+ public List<Map<String, Object>> waitIncrementTaskFinished(final String
distSQL) {
for (int i = 0; i < 10; i++) {
List<Map<String, Object>> listJobStatus =
queryForListWithLog(distSQL);
log.info("show status result: {}", listJobStatus);
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
index a6554ee7580..eecce00fac5 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/AbstractMigrationE2EIT.java
@@ -34,6 +34,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -97,7 +98,8 @@ public abstract class AbstractMigrationE2EIT {
}
protected void createTargetOrderTableRule(final PipelineContainerComposer
containerComposer) throws SQLException {
-
containerComposer.proxyExecuteWithLog(migrationDistSQL.getCreateTargetOrderTableRule(),
2);
+
containerComposer.proxyExecuteWithLog(migrationDistSQL.getCreateTargetOrderTableRule(),
0);
+ Awaitility.await().atMost(4L, TimeUnit.SECONDS).pollInterval(500L,
TimeUnit.MILLISECONDS).until(() -> !containerComposer.queryForListWithLog("SHOW
SHARDING TABLE RULE t_order").isEmpty());
}
protected void createTargetOrderTableEncryptRule(final
PipelineContainerComposer containerComposer) throws SQLException {
@@ -105,7 +107,9 @@ public abstract class AbstractMigrationE2EIT {
}
protected void createTargetOrderItemTableRule(final
PipelineContainerComposer containerComposer) throws SQLException {
-
containerComposer.proxyExecuteWithLog(migrationDistSQL.getCreateTargetOrderItemTableRule(),
2);
+
containerComposer.proxyExecuteWithLog(migrationDistSQL.getCreateTargetOrderItemTableRule(),
0);
+ Awaitility.await().atMost(4L, TimeUnit.SECONDS).pollInterval(500L,
TimeUnit.MILLISECONDS)
+ .until(() -> !containerComposer.queryForListWithLog("SHOW
SHARDING TABLE RULE t_order_item").isEmpty());
}
protected void startMigration(final PipelineContainerComposer
containerComposer, final String sourceTableName, final String targetTableName)
throws SQLException {
@@ -146,17 +150,18 @@ public abstract class AbstractMigrationE2EIT {
containerComposer.proxyExecuteWithLog(String.format("CHECK MIGRATION
'%s' BY TYPE (NAME='%s')", jobId, algorithmType), 0);
// TODO Need to add after the stop then to start, can continue the
consistency check from the previous progress
List<Map<String, Object>> resultList = Collections.emptyList();
- for (int i = 0; i < 10; i++) {
+ for (int i = 0; i < 30; i++) {
resultList =
containerComposer.queryForListWithLog(String.format("SHOW MIGRATION CHECK
STATUS '%s'", jobId));
if (resultList.isEmpty()) {
Awaitility.await().pollDelay(3L, TimeUnit.SECONDS).until(() ->
true);
continue;
}
List<String> checkEndTimeList = resultList.stream().map(map ->
map.get("check_end_time").toString()).filter(each ->
!Strings.isNullOrEmpty(each)).collect(Collectors.toList());
- if (checkEndTimeList.size() == resultList.size()) {
+ Set<String> finishedPercentages = resultList.stream().map(map ->
map.get("finished_percentage").toString()).collect(Collectors.toSet());
+ if (checkEndTimeList.size() == resultList.size() && 1 ==
finishedPercentages.size() && finishedPercentages.contains("100")) {
break;
} else {
- Awaitility.await().pollDelay(3L, TimeUnit.SECONDS).until(() ->
true);
+ Awaitility.await().pollDelay(1L, TimeUnit.SECONDS).until(() ->
true);
}
}
log.info("check job results: {}", resultList);
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
index bddf7db7799..52703047d55 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
@@ -79,6 +79,9 @@ class MySQLMigrationGeneralE2EIT extends
AbstractMigrationE2EIT {
containerComposer.waitJobPrepareSuccess(String.format("SHOW
MIGRATION STATUS '%s'", orderJobId));
containerComposer.startIncrementTask(
new
E2EIncrementalTask(containerComposer.getSourceDataSource(), SOURCE_TABLE_NAME,
new SnowflakeKeyGenerateAlgorithm(), containerComposer.getDatabaseType(), 30));
+
TimeUnit.SECONDS.timedJoin(containerComposer.getIncreaseTaskThread(), 30);
+ containerComposer.sourceExecuteWithLog(String.format("INSERT INTO
%s (order_id, user_id, status) VALUES (10000, 1, 'OK')", SOURCE_TABLE_NAME));
+ containerComposer.assertProxyOrderRecordExist("t_order", 10000);
assertMigrationSuccessById(containerComposer, orderJobId,
"DATA_MATCH");
String orderItemJobId = getJobIdByTableName(containerComposer,
"ds_0.t_order_item");
assertMigrationSuccessById(containerComposer, orderItemJobId,
"DATA_MATCH");
@@ -94,7 +97,7 @@ class MySQLMigrationGeneralE2EIT extends
AbstractMigrationE2EIT {
}
}
- private void assertMigrationSuccessById(final PipelineContainerComposer
containerComposer, final String jobId, final String algorithmType) throws
SQLException, InterruptedException {
+ private void assertMigrationSuccessById(final PipelineContainerComposer
containerComposer, final String jobId, final String algorithmType) throws
SQLException {
List<Map<String, Object>> jobStatus =
containerComposer.waitIncrementTaskFinished(String.format("SHOW MIGRATION
STATUS '%s'", jobId));
for (Map<String, Object> each : jobStatus) {
assertTrue(Integer.parseInt(each.get("processed_records_count").toString()) >
0);
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
index da030243900..4b79eca1492 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
@@ -79,9 +79,12 @@ class PostgreSQLMigrationGeneralE2EIT extends
AbstractMigrationE2EIT {
Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(1L,
TimeUnit.SECONDS).until(() -> listJobId(containerComposer).size() > 0);
String jobId = getJobIdByTableName(containerComposer, "ds_0.test."
+ SOURCE_TABLE_NAME);
containerComposer.waitIncrementTaskFinished(String.format("SHOW
MIGRATION STATUS '%s'", jobId));
- containerComposer.startIncrementTask(new E2EIncrementalTask(
- containerComposer.getSourceDataSource(), String.join(".",
PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME),
- new SnowflakeKeyGenerateAlgorithm(),
containerComposer.getDatabaseType(), 20));
+ String schemaTableName = String.join(".",
PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME);
+ containerComposer.startIncrementTask(new
E2EIncrementalTask(containerComposer.getSourceDataSource(), schemaTableName,
new SnowflakeKeyGenerateAlgorithm(),
+ containerComposer.getDatabaseType(), 20));
+
TimeUnit.SECONDS.timedJoin(containerComposer.getIncreaseTaskThread(), 30);
+ containerComposer.sourceExecuteWithLog(String.format("INSERT INTO
%s (order_id, user_id, status) VALUES (10000, 1, 'OK')", schemaTableName));
+ containerComposer.assertProxyOrderRecordExist(schemaTableName,
10000);
checkOrderMigration(containerComposer, jobId);
checkOrderItemMigration(containerComposer);
for (String each : listJobId(containerComposer)) {
@@ -94,7 +97,7 @@ class PostgreSQLMigrationGeneralE2EIT extends
AbstractMigrationE2EIT {
}
}
- private void checkOrderMigration(final PipelineContainerComposer
containerComposer, final String jobId) throws SQLException,
InterruptedException {
+ private void checkOrderMigration(final PipelineContainerComposer
containerComposer, final String jobId) throws SQLException {
containerComposer.waitIncrementTaskFinished(String.format("SHOW
MIGRATION STATUS '%s'", jobId));
stopMigrationByJobId(containerComposer, jobId);
// must refresh firstly, otherwise proxy can't get schema and table
info
@@ -109,7 +112,7 @@ class PostgreSQLMigrationGeneralE2EIT extends
AbstractMigrationE2EIT {
assertCheckMigrationSuccess(containerComposer, jobId, "DATA_MATCH");
}
- private void checkOrderItemMigration(final PipelineContainerComposer
containerComposer) throws SQLException, InterruptedException {
+ private void checkOrderItemMigration(final PipelineContainerComposer
containerComposer) throws SQLException {
startMigrationWithSchema(containerComposer, "t_order_item",
"t_order_item");
String jobId = getJobIdByTableName(containerComposer,
"ds_0.test.t_order_item");
containerComposer.waitIncrementTaskFinished(String.format("SHOW
MIGRATION STATUS '%s'", jobId));