This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 7ad26f88dc1 Extract waitJobPrepareSuccess and waitJobStatusReached to PipelineE2EDistSQLFacade (#37785)
7ad26f88dc1 is described below
commit 7ad26f88dc19b64894d7778a538112d870e9bcd6
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Tue Jan 20 11:19:30 2026 +0800
Extract waitJobPrepareSuccess and waitJobStatusReached to PipelineE2EDistSQLFacade (#37785)
* Move PipelineContainerComposer.waitJobPrepareSuccess to PipelineE2EDistSQLFacade
* Simplify PipelineE2EDistSQLFacade.waitJobPrepareSuccess param
* Move PipelineContainerComposer.waitJobStatusReached to PipelineE2EDistSQLFacade
* Simplify PipelineE2EDistSQLFacade.waitJobStatusReached param
* Unify PipelineE2EDistSQLFacade var naming
---
.../pipeline/cases/PipelineContainerComposer.java | 39 ---------------
.../general/MySQLMigrationGeneralE2EIT.java | 2 +-
.../general/MySQLTimeTypesMigrationE2EIT.java | 2 +-
.../general/PostgreSQLMigrationGeneralE2EIT.java | 4 +-
.../general/PostgreSQLToMySQLMigrationE2EIT.java | 2 +-
.../migration/general/RulesMigrationE2EIT.java | 2 +-
.../primarykey/IndexesMigrationE2EIT.java | 2 +-
.../primarykey/MariaDBMigrationE2EIT.java | 2 +-
.../pipeline/util/PipelineE2EDistSQLFacade.java | 56 ++++++++++++++++++----
9 files changed, 56 insertions(+), 55 deletions(-)
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
index 0c81d57effa..a9349dad669 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
@@ -72,7 +72,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -434,44 +433,6 @@ public final class PipelineContainerComposer implements
AutoCloseable {
Awaitility.await().timeout(Duration.ofMinutes(1L)).pollDelay(Math.max(sleepSeconds,
0L), TimeUnit.SECONDS).until(() -> true);
}
- /**
- * Wait job prepare success.
- *
- * @param distSQL dist SQL
- */
- public void waitJobPrepareSuccess(final String distSQL) {
- for (int i = 0; i < 5; i++) {
- List<Map<String, Object>> jobStatus = queryForListWithLog(distSQL);
- Set<String> statusSet = jobStatus.stream().map(each ->
String.valueOf(each.get("status"))).collect(Collectors.toSet());
- if (statusSet.contains(JobStatus.PREPARING.name()) ||
statusSet.contains(JobStatus.RUNNING.name())) {
- sleepSeconds(2);
- continue;
- }
- break;
- }
- }
-
- /**
- * Wait job status reached.
- *
- * @param distSQL dist SQL
- * @param jobStatus job status
- * @param maxSleepSeconds max sleep seconds
- * @throws IllegalStateException if job status not reached
- */
- public void waitJobStatusReached(final String distSQL, final JobStatus
jobStatus, final int maxSleepSeconds) {
- for (int i = 0, count = maxSleepSeconds / 2 + (0 == maxSleepSeconds %
2 ? 0 : 1); i < count; i++) {
- List<Map<String, Object>> resultList =
queryForListWithLog(distSQL);
- log.info("Job status result: {}", resultList);
- Set<String> statusSet = resultList.stream().map(each ->
String.valueOf(each.get("status"))).collect(Collectors.toSet());
- if (statusSet.stream().allMatch(each ->
each.equals(jobStatus.name()))) {
- return;
- }
- sleepSeconds(2);
- }
- throw new IllegalStateException("Job status not reached: " +
jobStatus);
- }
-
/**
* Query for list with log.
*
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
index 51609b79c2e..d80cbbb2790 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
@@ -80,7 +80,7 @@ class MySQLMigrationGeneralE2EIT extends
AbstractMigrationE2EIT {
startMigration(containerComposer, SOURCE_TABLE_NAME,
TARGET_TABLE_NAME);
startMigration(containerComposer, "t_order_item", "t_order_item");
String orderJobId = distSQLFacade.getJobIdByTableName("ds_0." +
SOURCE_TABLE_NAME);
- containerComposer.waitJobPrepareSuccess(String.format("SHOW
MIGRATION STATUS '%s'", orderJobId));
+ distSQLFacade.waitJobPrepareSuccess(orderJobId);
containerComposer.startIncrementTask(
new
E2EIncrementalTask(containerComposer.getSourceDataSource(), SOURCE_TABLE_NAME,
new SnowflakeKeyGenerateAlgorithm(), containerComposer.getDatabaseType(), 30));
TimeUnit.SECONDS.timedJoin(containerComposer.getIncreaseTaskThread(), 30L);
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLTimeTypesMigrationE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLTimeTypesMigrationE2EIT.java
index 2e5be76e0cf..e3cf1a19b96 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLTimeTypesMigrationE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLTimeTypesMigrationE2EIT.java
@@ -55,7 +55,7 @@ class MySQLTimeTypesMigrationE2EIT extends
AbstractMigrationE2EIT {
startMigration(containerComposer, "time_e2e", "time_e2e");
PipelineE2EDistSQLFacade distSQLFacade = new
PipelineE2EDistSQLFacade(containerComposer, new MigrationJobType());
String jobId = distSQLFacade.listJobIds().get(0);
- containerComposer.waitJobPrepareSuccess(String.format("SHOW
MIGRATION STATUS '%s'", jobId));
+ distSQLFacade.waitJobPrepareSuccess(jobId);
insertOneRecordWithZeroValue(containerComposer, 2);
distSQLFacade.waitIncrementTaskFinished(jobId);
distSQLFacade.loadAllSingleTables();
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
index f841cde414b..a8f58477883 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
@@ -85,7 +85,7 @@ class PostgreSQLMigrationGeneralE2EIT extends
AbstractMigrationE2EIT {
startMigrationWithSchema(containerComposer, SOURCE_TABLE_NAME,
"t_order");
Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(1L,
TimeUnit.SECONDS).until(() -> !distSQLFacade.listJobIds().isEmpty());
String jobId = distSQLFacade.getJobIdByTableName("ds_0.test." +
SOURCE_TABLE_NAME);
- containerComposer.waitJobPrepareSuccess(String.format("SHOW
MIGRATION STATUS '%s'", jobId));
+ distSQLFacade.waitJobPrepareSuccess(jobId);
String qualifiedTableName = String.join(".",
PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME);
containerComposer.startIncrementTask(new
E2EIncrementalTask(containerComposer.getSourceDataSource(), qualifiedTableName,
new SnowflakeKeyGenerateAlgorithm(),
containerComposer.getDatabaseType(), 20));
@@ -116,7 +116,7 @@ class PostgreSQLMigrationGeneralE2EIT extends
AbstractMigrationE2EIT {
private void checkOrderItemMigration(final PipelineE2EDistSQLFacade
distSQLFacade) throws SQLException {
String jobId =
distSQLFacade.getJobIdByTableName("ds_0.test.t_order_item");
PipelineContainerComposer containerComposer =
distSQLFacade.getContainerComposer();
- containerComposer.waitJobStatusReached(String.format("SHOW MIGRATION
STATUS '%s'", jobId), JobStatus.EXECUTE_INCREMENTAL_TASK, 15);
+ distSQLFacade.waitJobStatusReached(jobId,
JobStatus.EXECUTE_INCREMENTAL_TASK, 15);
assertCheckMigrationSuccess(containerComposer, jobId, "DATA_MATCH");
}
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLToMySQLMigrationE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLToMySQLMigrationE2EIT.java
index 775c1fb9134..d43df9ad176 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLToMySQLMigrationE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLToMySQLMigrationE2EIT.java
@@ -80,7 +80,7 @@ class PostgreSQLToMySQLMigrationE2EIT extends
AbstractMigrationE2EIT {
PipelineE2EDistSQLFacade distSQLFacade = new
PipelineE2EDistSQLFacade(containerComposer, new MigrationJobType());
Awaitility.await().ignoreExceptions().atMost(10L,
TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS).until(() ->
!distSQLFacade.listJobIds().isEmpty());
String jobId = distSQLFacade.listJobIds().get(0);
- containerComposer.waitJobStatusReached(String.format("SHOW
MIGRATION STATUS %s", jobId), JobStatus.EXECUTE_INCREMENTAL_TASK, 15);
+ distSQLFacade.waitJobStatusReached(jobId,
JobStatus.EXECUTE_INCREMENTAL_TASK, 15);
try (Connection connection = DriverManager.getConnection(jdbcUrl,
"postgres", "postgres")) {
connection.createStatement().execute(String.format("INSERT
INTO t_order (order_id,user_id,status) VALUES (%s, %s, '%s')", "1000000000", 1,
"incremental"));
connection.createStatement().execute(String.format("UPDATE
t_order SET status='%s' WHERE order_id IN (1,2)",
RandomStringUtils.randomAlphanumeric(10)));
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/RulesMigrationE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/RulesMigrationE2EIT.java
index 9599287c1ef..8488c46b49e 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/RulesMigrationE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/RulesMigrationE2EIT.java
@@ -88,7 +88,7 @@ class RulesMigrationE2EIT extends AbstractMigrationE2EIT {
startMigration(containerComposer, SOURCE_TABLE_NAME,
TARGET_TABLE_NAME);
PipelineE2EDistSQLFacade distSQLFacade = new
PipelineE2EDistSQLFacade(containerComposer, new MigrationJobType());
String jobId = distSQLFacade.listJobIds().get(0);
- containerComposer.waitJobPrepareSuccess(String.format("SHOW MIGRATION
STATUS '%s'", jobId));
+ distSQLFacade.waitJobPrepareSuccess(jobId);
distSQLFacade.waitIncrementTaskFinished(jobId);
distSQLFacade.loadAllSingleTables();
assertCheckMigrationSuccess(containerComposer, jobId, "DATA_MATCH");
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
index 7acd479caf6..586dae287b8 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
@@ -242,7 +242,7 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
containerComposer.proxyExecuteWithLog(String.format(ORDER_TABLE_SHARDING_RULE_FORMAT,
shardingColumn), 2);
startMigration(containerComposer, SOURCE_TABLE_NAME,
TARGET_TABLE_NAME);
String jobId = distSQLFacade.listJobIds().get(0);
- containerComposer.waitJobPrepareSuccess(String.format("SHOW MIGRATION
STATUS '%s'", jobId));
+ distSQLFacade.waitJobPrepareSuccess(jobId);
DataSource jdbcDataSource =
containerComposer.generateShardingSphereDataSourceFromProxy();
incrementalTaskFn.accept(jdbcDataSource);
distSQLFacade.waitIncrementTaskFinished(jobId);
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
index ab566c1ef5b..97192a5a66e 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
@@ -70,7 +70,7 @@ class MariaDBMigrationE2EIT extends AbstractMigrationE2EIT {
createTargetOrderTableRule(containerComposer);
startMigration(containerComposer, SOURCE_TABLE_NAME,
TARGET_TABLE_NAME);
String jobId = distSQLFacade.listJobIds().get(0);
- containerComposer.waitJobPrepareSuccess(String.format("SHOW
MIGRATION STATUS '%s'", jobId));
+ distSQLFacade.waitJobPrepareSuccess(jobId);
containerComposer.sourceExecuteWithLog("INSERT INTO t_order
(order_id, user_id, status) VALUES ('a1', 1, 'OK')");
DataSource jdbcDataSource =
containerComposer.generateShardingSphereDataSourceFromProxy();
containerComposer.assertOrderRecordExist(jdbcDataSource,
"t_order", "a1");
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
index cd19137fff9..18ee6da4845 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
@@ -26,7 +26,6 @@ import
org.apache.shardingsphere.test.e2e.operation.pipeline.cases.PipelineConta
import org.awaitility.Awaitility;
import java.sql.SQLException;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
@@ -165,6 +164,47 @@ public final class PipelineE2EDistSQLFacade {
containerComposer.proxyExecuteWithLog(String.format("DROP %s %s",
jobTypeName, jobId), 0);
}
+ /**
+ * Wait job prepare success.
+ *
+ * @param jobId job id
+ */
+ public void waitJobPrepareSuccess(final String jobId) {
+ String sql = buildShowJobStatusDistSQL(jobId);
+ for (int i = 0; i < 5; i++) {
+ List<Map<String, Object>> jobStatusRecords =
containerComposer.queryForListWithLog(sql);
+ log.info("Wait job prepare success, job status records: {}",
jobStatusRecords);
+ Set<String> statusSet = jobStatusRecords.stream().map(each ->
String.valueOf(each.get("status"))).collect(Collectors.toSet());
+ if (statusSet.contains(JobStatus.PREPARING.name()) ||
statusSet.contains(JobStatus.RUNNING.name())) {
+ containerComposer.sleepSeconds(2);
+ continue;
+ }
+ break;
+ }
+ }
+
+ /**
+ * Wait job status reached.
+ *
+ * @param jobId job id
+ * @param jobStatus job status
+ * @param maxSleepSeconds max sleep seconds
+ * @throws IllegalStateException if job status not reached
+ */
+ public void waitJobStatusReached(final String jobId, final JobStatus
jobStatus, final int maxSleepSeconds) {
+ String sql = buildShowJobStatusDistSQL(jobId);
+ for (int i = 0, count = maxSleepSeconds / 2 + (0 == maxSleepSeconds %
2 ? 0 : 1); i < count; i++) {
+ List<Map<String, Object>> jobStatusRecords =
containerComposer.queryForListWithLog(sql);
+ log.info("Wait job status reached, job status records: {}",
jobStatusRecords);
+ List<String> statusList = jobStatusRecords.stream().map(each ->
String.valueOf(each.get("status"))).collect(Collectors.toList());
+ if (statusList.stream().allMatch(each ->
each.equals(jobStatus.name()))) {
+ return;
+ }
+ containerComposer.sleepSeconds(2);
+ }
+ throw new IllegalStateException("Job status not reached: " +
jobStatus);
+ }
+
/**
* Wait increment task finished.
*
@@ -172,15 +212,15 @@ public final class PipelineE2EDistSQLFacade {
* @return result
*/
public List<Map<String, Object>> waitIncrementTaskFinished(final String
jobId) {
- String distSQL = buildShowJobStatusDistSQL(jobId);
+ String sql = buildShowJobStatusDistSQL(jobId);
for (int i = 0; i < 10; i++) {
- List<Map<String, Object>> jobStatusRecords =
containerComposer.queryForListWithLog(distSQL);
- log.info("Show status result: {}", jobStatusRecords);
- Set<String> actualStatus = new HashSet<>(jobStatusRecords.size(),
1F);
- Collection<Integer> incrementalIdleSecondsList = new
LinkedList<>();
+ List<Map<String, Object>> jobStatusRecords =
containerComposer.queryForListWithLog(sql);
+ log.info("Wait incremental task finished, job status records: {}",
jobStatusRecords);
+ Set<String> statusSet = new HashSet<>(jobStatusRecords.size(), 1F);
+ List<Integer> incrementalIdleSecondsList = new LinkedList<>();
for (Map<String, Object> each : jobStatusRecords) {
assertTrue(Strings.isNullOrEmpty((String)
each.get("error_message")), "error_message: `" + each.get("error_message") +
"`");
- actualStatus.add(each.get("status").toString());
+ statusSet.add(each.get("status").toString());
String incrementalIdleSeconds = (String)
each.get("incremental_idle_seconds");
incrementalIdleSecondsList.add(Strings.isNullOrEmpty(incrementalIdleSeconds) ?
0 : Integer.parseInt(incrementalIdleSeconds));
}
@@ -188,7 +228,7 @@ public final class PipelineE2EDistSQLFacade {
containerComposer.sleepSeconds(3);
continue;
}
- if (actualStatus.size() == 1 &&
actualStatus.contains(JobStatus.EXECUTE_INCREMENTAL_TASK.name())) {
+ if (1 == statusSet.size() &&
statusSet.contains(JobStatus.EXECUTE_INCREMENTAL_TASK.name())) {
return jobStatusRecords;
}
}