This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ad26f88dc1 Extract waitJobPrepareSuccess and waitJobStatusReached to PipelineE2EDistSQLFacade (#37785)
7ad26f88dc1 is described below

commit 7ad26f88dc19b64894d7778a538112d870e9bcd6
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Tue Jan 20 11:19:30 2026 +0800

    Extract waitJobPrepareSuccess and waitJobStatusReached to PipelineE2EDistSQLFacade (#37785)
    
    * Move PipelineContainerComposer.waitJobPrepareSuccess to PipelineE2EDistSQLFacade
    
    * Simplify PipelineE2EDistSQLFacade.waitJobPrepareSuccess param
    
    * Move PipelineContainerComposer.waitJobStatusReached to PipelineE2EDistSQLFacade
    
    * Simplify PipelineE2EDistSQLFacade.waitJobStatusReached param
    
    * Unify PipelineE2EDistSQLFacade var naming
---
 .../pipeline/cases/PipelineContainerComposer.java  | 39 ---------------
 .../general/MySQLMigrationGeneralE2EIT.java        |  2 +-
 .../general/MySQLTimeTypesMigrationE2EIT.java      |  2 +-
 .../general/PostgreSQLMigrationGeneralE2EIT.java   |  4 +-
 .../general/PostgreSQLToMySQLMigrationE2EIT.java   |  2 +-
 .../migration/general/RulesMigrationE2EIT.java     |  2 +-
 .../primarykey/IndexesMigrationE2EIT.java          |  2 +-
 .../primarykey/MariaDBMigrationE2EIT.java          |  2 +-
 .../pipeline/util/PipelineE2EDistSQLFacade.java    | 56 ++++++++++++++++++----
 9 files changed, 56 insertions(+), 55 deletions(-)

diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
index 0c81d57effa..a9349dad669 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
@@ -72,7 +72,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
@@ -434,44 +433,6 @@ public final class PipelineContainerComposer implements 
AutoCloseable {
         
Awaitility.await().timeout(Duration.ofMinutes(1L)).pollDelay(Math.max(sleepSeconds,
 0L), TimeUnit.SECONDS).until(() -> true);
     }
     
-    /**
-     * Wait job prepare success.
-     *
-     * @param distSQL dist SQL
-     */
-    public void waitJobPrepareSuccess(final String distSQL) {
-        for (int i = 0; i < 5; i++) {
-            List<Map<String, Object>> jobStatus = queryForListWithLog(distSQL);
-            Set<String> statusSet = jobStatus.stream().map(each -> 
String.valueOf(each.get("status"))).collect(Collectors.toSet());
-            if (statusSet.contains(JobStatus.PREPARING.name()) || 
statusSet.contains(JobStatus.RUNNING.name())) {
-                sleepSeconds(2);
-                continue;
-            }
-            break;
-        }
-    }
-    
-    /**
-     * Wait job status reached.
-     *
-     * @param distSQL dist SQL
-     * @param jobStatus job status
-     * @param maxSleepSeconds max sleep seconds
-     * @throws IllegalStateException if job status not reached
-     */
-    public void waitJobStatusReached(final String distSQL, final JobStatus 
jobStatus, final int maxSleepSeconds) {
-        for (int i = 0, count = maxSleepSeconds / 2 + (0 == maxSleepSeconds % 
2 ? 0 : 1); i < count; i++) {
-            List<Map<String, Object>> resultList = 
queryForListWithLog(distSQL);
-            log.info("Job status result: {}", resultList);
-            Set<String> statusSet = resultList.stream().map(each -> 
String.valueOf(each.get("status"))).collect(Collectors.toSet());
-            if (statusSet.stream().allMatch(each -> 
each.equals(jobStatus.name()))) {
-                return;
-            }
-            sleepSeconds(2);
-        }
-        throw new IllegalStateException("Job status not reached: " + 
jobStatus);
-    }
-    
     /**
      * Query for list with log.
      *
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
index 51609b79c2e..d80cbbb2790 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
@@ -80,7 +80,7 @@ class MySQLMigrationGeneralE2EIT extends 
AbstractMigrationE2EIT {
             startMigration(containerComposer, SOURCE_TABLE_NAME, 
TARGET_TABLE_NAME);
             startMigration(containerComposer, "t_order_item", "t_order_item");
             String orderJobId = distSQLFacade.getJobIdByTableName("ds_0." + 
SOURCE_TABLE_NAME);
-            containerComposer.waitJobPrepareSuccess(String.format("SHOW 
MIGRATION STATUS '%s'", orderJobId));
+            distSQLFacade.waitJobPrepareSuccess(orderJobId);
             containerComposer.startIncrementTask(
                     new 
E2EIncrementalTask(containerComposer.getSourceDataSource(), SOURCE_TABLE_NAME, 
new SnowflakeKeyGenerateAlgorithm(), containerComposer.getDatabaseType(), 30));
             
TimeUnit.SECONDS.timedJoin(containerComposer.getIncreaseTaskThread(), 30L);
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLTimeTypesMigrationE2EIT.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLTimeTypesMigrationE2EIT.java
index 2e5be76e0cf..e3cf1a19b96 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLTimeTypesMigrationE2EIT.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLTimeTypesMigrationE2EIT.java
@@ -55,7 +55,7 @@ class MySQLTimeTypesMigrationE2EIT extends 
AbstractMigrationE2EIT {
             startMigration(containerComposer, "time_e2e", "time_e2e");
             PipelineE2EDistSQLFacade distSQLFacade = new 
PipelineE2EDistSQLFacade(containerComposer, new MigrationJobType());
             String jobId = distSQLFacade.listJobIds().get(0);
-            containerComposer.waitJobPrepareSuccess(String.format("SHOW 
MIGRATION STATUS '%s'", jobId));
+            distSQLFacade.waitJobPrepareSuccess(jobId);
             insertOneRecordWithZeroValue(containerComposer, 2);
             distSQLFacade.waitIncrementTaskFinished(jobId);
             distSQLFacade.loadAllSingleTables();
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
index f841cde414b..a8f58477883 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
@@ -85,7 +85,7 @@ class PostgreSQLMigrationGeneralE2EIT extends 
AbstractMigrationE2EIT {
             startMigrationWithSchema(containerComposer, SOURCE_TABLE_NAME, 
"t_order");
             Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(1L, 
TimeUnit.SECONDS).until(() -> !distSQLFacade.listJobIds().isEmpty());
             String jobId = distSQLFacade.getJobIdByTableName("ds_0.test." + 
SOURCE_TABLE_NAME);
-            containerComposer.waitJobPrepareSuccess(String.format("SHOW 
MIGRATION STATUS '%s'", jobId));
+            distSQLFacade.waitJobPrepareSuccess(jobId);
             String qualifiedTableName = String.join(".", 
PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME);
             containerComposer.startIncrementTask(new 
E2EIncrementalTask(containerComposer.getSourceDataSource(), qualifiedTableName, 
new SnowflakeKeyGenerateAlgorithm(),
                     containerComposer.getDatabaseType(), 20));
@@ -116,7 +116,7 @@ class PostgreSQLMigrationGeneralE2EIT extends 
AbstractMigrationE2EIT {
     private void checkOrderItemMigration(final PipelineE2EDistSQLFacade 
distSQLFacade) throws SQLException {
         String jobId = 
distSQLFacade.getJobIdByTableName("ds_0.test.t_order_item");
         PipelineContainerComposer containerComposer = 
distSQLFacade.getContainerComposer();
-        containerComposer.waitJobStatusReached(String.format("SHOW MIGRATION 
STATUS '%s'", jobId), JobStatus.EXECUTE_INCREMENTAL_TASK, 15);
+        distSQLFacade.waitJobStatusReached(jobId, 
JobStatus.EXECUTE_INCREMENTAL_TASK, 15);
         assertCheckMigrationSuccess(containerComposer, jobId, "DATA_MATCH");
     }
     
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLToMySQLMigrationE2EIT.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLToMySQLMigrationE2EIT.java
index 775c1fb9134..d43df9ad176 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLToMySQLMigrationE2EIT.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLToMySQLMigrationE2EIT.java
@@ -80,7 +80,7 @@ class PostgreSQLToMySQLMigrationE2EIT extends 
AbstractMigrationE2EIT {
             PipelineE2EDistSQLFacade distSQLFacade = new 
PipelineE2EDistSQLFacade(containerComposer, new MigrationJobType());
             Awaitility.await().ignoreExceptions().atMost(10L, 
TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS).until(() -> 
!distSQLFacade.listJobIds().isEmpty());
             String jobId = distSQLFacade.listJobIds().get(0);
-            containerComposer.waitJobStatusReached(String.format("SHOW 
MIGRATION STATUS %s", jobId), JobStatus.EXECUTE_INCREMENTAL_TASK, 15);
+            distSQLFacade.waitJobStatusReached(jobId, 
JobStatus.EXECUTE_INCREMENTAL_TASK, 15);
             try (Connection connection = DriverManager.getConnection(jdbcUrl, 
"postgres", "postgres")) {
                 connection.createStatement().execute(String.format("INSERT 
INTO t_order (order_id,user_id,status) VALUES (%s, %s, '%s')", "1000000000", 1, 
"incremental"));
                 connection.createStatement().execute(String.format("UPDATE 
t_order SET status='%s' WHERE order_id IN (1,2)", 
RandomStringUtils.randomAlphanumeric(10)));
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/RulesMigrationE2EIT.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/RulesMigrationE2EIT.java
index 9599287c1ef..8488c46b49e 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/RulesMigrationE2EIT.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/RulesMigrationE2EIT.java
@@ -88,7 +88,7 @@ class RulesMigrationE2EIT extends AbstractMigrationE2EIT {
         startMigration(containerComposer, SOURCE_TABLE_NAME, 
TARGET_TABLE_NAME);
         PipelineE2EDistSQLFacade distSQLFacade = new 
PipelineE2EDistSQLFacade(containerComposer, new MigrationJobType());
         String jobId = distSQLFacade.listJobIds().get(0);
-        containerComposer.waitJobPrepareSuccess(String.format("SHOW MIGRATION 
STATUS '%s'", jobId));
+        distSQLFacade.waitJobPrepareSuccess(jobId);
         distSQLFacade.waitIncrementTaskFinished(jobId);
         distSQLFacade.loadAllSingleTables();
         assertCheckMigrationSuccess(containerComposer, jobId, "DATA_MATCH");
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
index 7acd479caf6..586dae287b8 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
@@ -242,7 +242,7 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
         
containerComposer.proxyExecuteWithLog(String.format(ORDER_TABLE_SHARDING_RULE_FORMAT,
 shardingColumn), 2);
         startMigration(containerComposer, SOURCE_TABLE_NAME, 
TARGET_TABLE_NAME);
         String jobId = distSQLFacade.listJobIds().get(0);
-        containerComposer.waitJobPrepareSuccess(String.format("SHOW MIGRATION 
STATUS '%s'", jobId));
+        distSQLFacade.waitJobPrepareSuccess(jobId);
         DataSource jdbcDataSource = 
containerComposer.generateShardingSphereDataSourceFromProxy();
         incrementalTaskFn.accept(jdbcDataSource);
         distSQLFacade.waitIncrementTaskFinished(jobId);
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
index ab566c1ef5b..97192a5a66e 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
@@ -70,7 +70,7 @@ class MariaDBMigrationE2EIT extends AbstractMigrationE2EIT {
             createTargetOrderTableRule(containerComposer);
             startMigration(containerComposer, SOURCE_TABLE_NAME, 
TARGET_TABLE_NAME);
             String jobId = distSQLFacade.listJobIds().get(0);
-            containerComposer.waitJobPrepareSuccess(String.format("SHOW 
MIGRATION STATUS '%s'", jobId));
+            distSQLFacade.waitJobPrepareSuccess(jobId);
             containerComposer.sourceExecuteWithLog("INSERT INTO t_order 
(order_id, user_id, status) VALUES ('a1', 1, 'OK')");
             DataSource jdbcDataSource = 
containerComposer.generateShardingSphereDataSourceFromProxy();
             containerComposer.assertOrderRecordExist(jdbcDataSource, 
"t_order", "a1");
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
index cd19137fff9..18ee6da4845 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
@@ -26,7 +26,6 @@ import 
org.apache.shardingsphere.test.e2e.operation.pipeline.cases.PipelineConta
 import org.awaitility.Awaitility;
 
 import java.sql.SQLException;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -165,6 +164,47 @@ public final class PipelineE2EDistSQLFacade {
         containerComposer.proxyExecuteWithLog(String.format("DROP %s %s", 
jobTypeName, jobId), 0);
     }
     
+    /**
+     * Wait job prepare success.
+     *
+     * @param jobId job id
+     */
+    public void waitJobPrepareSuccess(final String jobId) {
+        String sql = buildShowJobStatusDistSQL(jobId);
+        for (int i = 0; i < 5; i++) {
+            List<Map<String, Object>> jobStatusRecords = 
containerComposer.queryForListWithLog(sql);
+            log.info("Wait job prepare success, job status records: {}", 
jobStatusRecords);
+            Set<String> statusSet = jobStatusRecords.stream().map(each -> 
String.valueOf(each.get("status"))).collect(Collectors.toSet());
+            if (statusSet.contains(JobStatus.PREPARING.name()) || 
statusSet.contains(JobStatus.RUNNING.name())) {
+                containerComposer.sleepSeconds(2);
+                continue;
+            }
+            break;
+        }
+    }
+    
+    /**
+     * Wait job status reached.
+     *
+     * @param jobId job id
+     * @param jobStatus job status
+     * @param maxSleepSeconds max sleep seconds
+     * @throws IllegalStateException if job status not reached
+     */
+    public void waitJobStatusReached(final String jobId, final JobStatus 
jobStatus, final int maxSleepSeconds) {
+        String sql = buildShowJobStatusDistSQL(jobId);
+        for (int i = 0, count = maxSleepSeconds / 2 + (0 == maxSleepSeconds % 
2 ? 0 : 1); i < count; i++) {
+            List<Map<String, Object>> jobStatusRecords = 
containerComposer.queryForListWithLog(sql);
+            log.info("Wait job status reached, job status records: {}", 
jobStatusRecords);
+            List<String> statusList = jobStatusRecords.stream().map(each -> 
String.valueOf(each.get("status"))).collect(Collectors.toList());
+            if (statusList.stream().allMatch(each -> 
each.equals(jobStatus.name()))) {
+                return;
+            }
+            containerComposer.sleepSeconds(2);
+        }
+        throw new IllegalStateException("Job status not reached: " + 
jobStatus);
+    }
+    
     /**
      * Wait increment task finished.
      *
@@ -172,15 +212,15 @@ public final class PipelineE2EDistSQLFacade {
      * @return result
      */
     public List<Map<String, Object>> waitIncrementTaskFinished(final String 
jobId) {
-        String distSQL = buildShowJobStatusDistSQL(jobId);
+        String sql = buildShowJobStatusDistSQL(jobId);
         for (int i = 0; i < 10; i++) {
-            List<Map<String, Object>> jobStatusRecords = 
containerComposer.queryForListWithLog(distSQL);
-            log.info("Show status result: {}", jobStatusRecords);
-            Set<String> actualStatus = new HashSet<>(jobStatusRecords.size(), 
1F);
-            Collection<Integer> incrementalIdleSecondsList = new 
LinkedList<>();
+            List<Map<String, Object>> jobStatusRecords = 
containerComposer.queryForListWithLog(sql);
+            log.info("Wait incremental task finished, job status records: {}", 
jobStatusRecords);
+            Set<String> statusSet = new HashSet<>(jobStatusRecords.size(), 1F);
+            List<Integer> incrementalIdleSecondsList = new LinkedList<>();
             for (Map<String, Object> each : jobStatusRecords) {
                 assertTrue(Strings.isNullOrEmpty((String) 
each.get("error_message")), "error_message: `" + each.get("error_message") + 
"`");
-                actualStatus.add(each.get("status").toString());
+                statusSet.add(each.get("status").toString());
                 String incrementalIdleSeconds = (String) 
each.get("incremental_idle_seconds");
                 
incrementalIdleSecondsList.add(Strings.isNullOrEmpty(incrementalIdleSeconds) ? 
0 : Integer.parseInt(incrementalIdleSeconds));
             }
@@ -188,7 +228,7 @@ public final class PipelineE2EDistSQLFacade {
                 containerComposer.sleepSeconds(3);
                 continue;
             }
-            if (actualStatus.size() == 1 && 
actualStatus.contains(JobStatus.EXECUTE_INCREMENTAL_TASK.name())) {
+            if (1 == statusSet.size() && 
statusSet.contains(JobStatus.EXECUTE_INCREMENTAL_TASK.name())) {
                 return jobStatusRecords;
             }
         }

Reply via email to