This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 2212b39c0a1 Improve PipelineContainerComposer cleanUpPipelineJobs and sleepSeconds (#37768)
2212b39c0a1 is described below

commit 2212b39c0a11ce5bcaa29399ecaa52766353cb8f
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Mon Jan 19 14:52:23 2026 +0800

    Improve PipelineContainerComposer cleanUpPipelineJobs and sleepSeconds (#37768)
    
    * Simplify PipelineE2EDistSQLFacade constructor
    
    * Improve PipelineContainerComposer.cleanUpPipelineJobsWithType, in case the job status is not available
    
    * Improve PipelineContainerComposer.sleepSeconds of Awaitility
---
 .../pipeline/cases/PipelineContainerComposer.java     | 19 +++++++------------
 .../pipeline/util/PipelineE2EDistSQLFacade.java       |  5 ++---
 2 files changed, 9 insertions(+), 15 deletions(-)

diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
index 28f0bdfc838..975902d9144 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
@@ -171,9 +171,10 @@ public final class PipelineContainerComposer implements 
AutoCloseable {
         String jobTypeName = jobType.getType();
         for (Map<String, Object> each : queryJobs(connection, jobTypeName)) {
             String jobId = each.get("id").toString();
-            Map<String, Object> jobInfo = 
queryForListWithLog(String.format("SHOW %s STATUS '%s'", jobTypeName, 
jobId)).get(0);
-            String status = jobInfo.get("status").toString();
+            List<Map<String, Object>> jobInfos = 
queryForListWithLog(String.format("SHOW %s STATUS '%s'", jobTypeName, jobId));
+            String status = !jobInfos.isEmpty() ? 
jobInfos.get(0).get("status").toString() : "";
             String sql = String.format("%s %s '%s'", getOperationType(jobType, 
status), jobTypeName, jobId);
+            log.info("Clean up job, sql: {}", sql);
             try (Statement statement = connection.createStatement()) {
                 statement.execute(sql);
             }
@@ -181,17 +182,10 @@ public final class PipelineContainerComposer implements 
AutoCloseable {
     }
     
     private String getOperationType(final PipelineJobType<?> jobType, final 
String status) {
-        if (JobStatus.FINISHED.name().equals(status)) {
-            return isSupportCommit(jobType) ? "COMMIT" : "DROP";
-        }
-        return isSupportRollback(jobType) ? "ROLLBACK" : "DROP";
-    }
-    
-    private boolean isSupportCommit(final PipelineJobType<?> jobType) {
-        return !(jobType instanceof CDCJobType);
+        return isSupportCommitRollback(jobType) ? 
(JobStatus.FINISHED.name().equals(status) ? "COMMIT" : "ROLLBACK") : "DROP";
     }
     
-    private boolean isSupportRollback(final PipelineJobType<?> jobType) {
+    private boolean isSupportCommitRollback(final PipelineJobType<?> jobType) {
         return !(jobType instanceof CDCJobType);
     }
     
@@ -246,7 +240,8 @@ public final class PipelineContainerComposer implements 
AutoCloseable {
         if (seconds <= 0) {
             return;
         }
-        Awaitility.await().pollDelay(seconds, TimeUnit.SECONDS).until(() -> 
true);
+        // Awaitility: WaitConstraint defaultWaitConstraint = 
AtMostWaitConstraint.TEN_SECONDS
+        Awaitility.waitAtMost(seconds + 1, 
TimeUnit.SECONDS).pollDelay(seconds, TimeUnit.SECONDS).until(() -> true);
     }
     
     /**
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
index 27081b183fa..2b7c1b5c693 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
@@ -18,7 +18,6 @@
 package org.apache.shardingsphere.test.e2e.operation.pipeline.util;
 
 import lombok.Getter;
-import lombok.RequiredArgsConstructor;
 import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
 import 
org.apache.shardingsphere.test.e2e.operation.pipeline.cases.PipelineContainerComposer;
 import org.awaitility.Awaitility;
@@ -29,7 +28,6 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
-@RequiredArgsConstructor
 @Getter
 public final class PipelineE2EDistSQLFacade {
     
@@ -43,7 +41,8 @@ public final class PipelineE2EDistSQLFacade {
     private final String jobTypeName;
     
     public PipelineE2EDistSQLFacade(final PipelineContainerComposer 
containerComposer, final PipelineJobType<?> jobType) {
-        this(containerComposer, jobType.getType());
+        this.containerComposer = containerComposer;
+        jobTypeName = jobType.getType();
     }
     
     /**

Reply via email to