This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 2212b39c0a1 Improve PipelineContainerComposer cleanUpPipelineJobs and
sleepSeconds (#37768)
2212b39c0a1 is described below
commit 2212b39c0a11ce5bcaa29399ecaa52766353cb8f
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Mon Jan 19 14:52:23 2026 +0800
Improve PipelineContainerComposer cleanUpPipelineJobs and sleepSeconds
(#37768)
* Simplify PipelineE2EDistSQLFacade constructor
* Improve PipelineContainerComposer.cleanUpPipelineJobsWithType, in case of
job status is not available
* Improve PipelineContainerComposer.sleepSeconds of Awaitility
---
.../pipeline/cases/PipelineContainerComposer.java | 19 +++++++------------
.../pipeline/util/PipelineE2EDistSQLFacade.java | 5 ++---
2 files changed, 9 insertions(+), 15 deletions(-)
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
index 28f0bdfc838..975902d9144 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
@@ -171,9 +171,10 @@ public final class PipelineContainerComposer implements
AutoCloseable {
String jobTypeName = jobType.getType();
for (Map<String, Object> each : queryJobs(connection, jobTypeName)) {
String jobId = each.get("id").toString();
- Map<String, Object> jobInfo =
queryForListWithLog(String.format("SHOW %s STATUS '%s'", jobTypeName,
jobId)).get(0);
- String status = jobInfo.get("status").toString();
+ List<Map<String, Object>> jobInfos =
queryForListWithLog(String.format("SHOW %s STATUS '%s'", jobTypeName, jobId));
+ String status = !jobInfos.isEmpty() ?
jobInfos.get(0).get("status").toString() : "";
String sql = String.format("%s %s '%s'", getOperationType(jobType,
status), jobTypeName, jobId);
+ log.info("Clean up job, sql: {}", sql);
try (Statement statement = connection.createStatement()) {
statement.execute(sql);
}
@@ -181,17 +182,10 @@ public final class PipelineContainerComposer implements
AutoCloseable {
}
private String getOperationType(final PipelineJobType<?> jobType, final
String status) {
- if (JobStatus.FINISHED.name().equals(status)) {
- return isSupportCommit(jobType) ? "COMMIT" : "DROP";
- }
- return isSupportRollback(jobType) ? "ROLLBACK" : "DROP";
- }
-
- private boolean isSupportCommit(final PipelineJobType<?> jobType) {
- return !(jobType instanceof CDCJobType);
+ return isSupportCommitRollback(jobType) ?
(JobStatus.FINISHED.name().equals(status) ? "COMMIT" : "ROLLBACK") : "DROP";
}
- private boolean isSupportRollback(final PipelineJobType<?> jobType) {
+ private boolean isSupportCommitRollback(final PipelineJobType<?> jobType) {
return !(jobType instanceof CDCJobType);
}
@@ -246,7 +240,8 @@ public final class PipelineContainerComposer implements
AutoCloseable {
if (seconds <= 0) {
return;
}
- Awaitility.await().pollDelay(seconds, TimeUnit.SECONDS).until(() ->
true);
+ // Awaitility: WaitConstraint defaultWaitConstraint =
AtMostWaitConstraint.TEN_SECONDS
+ Awaitility.waitAtMost(seconds + 1,
TimeUnit.SECONDS).pollDelay(seconds, TimeUnit.SECONDS).until(() -> true);
}
/**
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
index 27081b183fa..2b7c1b5c693 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
@@ -18,7 +18,6 @@
package org.apache.shardingsphere.test.e2e.operation.pipeline.util;
import lombok.Getter;
-import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
import
org.apache.shardingsphere.test.e2e.operation.pipeline.cases.PipelineContainerComposer;
import org.awaitility.Awaitility;
@@ -29,7 +28,6 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-@RequiredArgsConstructor
@Getter
public final class PipelineE2EDistSQLFacade {
@@ -43,7 +41,8 @@ public final class PipelineE2EDistSQLFacade {
private final String jobTypeName;
public PipelineE2EDistSQLFacade(final PipelineContainerComposer
containerComposer, final PipelineJobType<?> jobType) {
- this(containerComposer, jobType.getType());
+ this.containerComposer = containerComposer;
+ jobTypeName = jobType.getType();
}
/**