This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new ca2495ba196 Minor refactoring on PipelineDistributedBarrier (#37729)
ca2495ba196 is described below
commit ca2495ba19690c4ca07abcf3acdb22b869dfe5e3
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Tue Jan 13 18:01:36 2026 +0800
Minor refactoring on PipelineDistributedBarrier (#37729)
* Add log in PipelineDistributedBarrier
* Improve InnerCountDownLatchHolder
* Improve test
---
.../core/util/PipelineDistributedBarrier.java | 36 +++++++++++++++++-----
.../pipeline/cases/PipelineContainerComposer.java | 8 +++--
.../core/util/PipelineDistributedBarrierTest.java | 2 +-
3 files changed, 34 insertions(+), 12 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java
index 4d1938f82e7..9339412584b 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java
@@ -76,8 +76,9 @@ public final class PipelineDistributedBarrier {
* @param totalCount total count
*/
public void register(final String barrierPath, final int totalCount) {
+ log.info("Register, barrier path: {}, total count: {}", barrierPath,
totalCount);
getRepository().persist(barrierPath, "");
- countDownLatchHolders.computeIfAbsent(barrierPath, key -> new
InnerCountDownLatchHolder(totalCount, new CountDownLatch(1)));
+ countDownLatchHolders.computeIfAbsent(barrierPath, key -> new
InnerCountDownLatchHolder(totalCount));
}
/**
@@ -88,8 +89,10 @@ public final class PipelineDistributedBarrier {
*/
public void persistEphemeralChildrenNode(final String barrierPath, final
int shardingItem) {
if (!getRepository().isExisted(barrierPath)) {
+ log.info("Persist ephemeral children node, barrier path not
existed: {}, sharding item: {}", barrierPath, shardingItem);
return;
}
+ log.info("Persist ephemeral children node, barrier path: {}, sharding
item: {}", barrierPath, shardingItem);
String key = String.join("/", barrierPath,
Integer.toString(shardingItem));
getRepository().delete(key);
getRepository().persistEphemeral(key, "");
@@ -101,10 +104,11 @@ public final class PipelineDistributedBarrier {
* @param barrierPath barrier path
*/
public void unregister(final String barrierPath) {
+ log.info("Unregister, barrier path: {}", barrierPath);
getRepository().delete(barrierPath);
InnerCountDownLatchHolder holder =
countDownLatchHolders.remove(barrierPath);
if (null != holder) {
- holder.getCountDownLatch().countDown();
+ holder.releaseLatch();
}
}
@@ -119,12 +123,15 @@ public final class PipelineDistributedBarrier {
public boolean await(final String barrierPath, final long timeout, final
TimeUnit timeUnit) {
InnerCountDownLatchHolder holder =
countDownLatchHolders.get(barrierPath);
if (null == holder) {
+ log.info("Await failed, barrier path not registered: {}",
barrierPath);
return false;
}
try {
- boolean result = holder.getCountDownLatch().await(timeout,
timeUnit);
+ boolean result = holder.awaitLatchReleasing(timeout, timeUnit);
if (!result) {
- log.info("await timeout, barrier path: {}, timeout: {}, time
unit: {}", barrierPath, timeout, timeUnit);
+ log.warn("Await timeout, barrier path: {}, timeout: {}, time
unit: {}", barrierPath, timeout, timeUnit);
+ } else {
+ log.info("Await success, barrier path: {}", barrierPath);
}
return result;
} catch (final InterruptedException ignored) {
@@ -140,22 +147,35 @@ public final class PipelineDistributedBarrier {
*/
public void notifyChildrenNodeCountCheck(final String nodePath) {
if (Strings.isNullOrEmpty(nodePath)) {
+ log.info("Notify children node count check, node path is null or
empty");
return;
}
String barrierPath = nodePath.substring(0, nodePath.lastIndexOf('/'));
InnerCountDownLatchHolder holder =
countDownLatchHolders.get(barrierPath);
- if (null != holder &&
getRepository().getChildrenKeys(barrierPath).size() == holder.getTotalCount()) {
- holder.getCountDownLatch().countDown();
+ if (null != holder) {
+ int childrenSize =
getRepository().getChildrenKeys(barrierPath).size();
+ log.info("Notify children node count check, barrier path: {},
children size: {}, total count: {}", barrierPath, childrenSize,
holder.getTotalCount());
+ if (childrenSize == holder.getTotalCount()) {
+ holder.releaseLatch();
+ }
}
}
@RequiredArgsConstructor
- @Getter
private static final class InnerCountDownLatchHolder {
+ @Getter
private final int totalCount;
- private final CountDownLatch countDownLatch;
+ private final CountDownLatch countDownLatch = new CountDownLatch(1);
+
+ public boolean awaitLatchReleasing(final long timeout, final TimeUnit
timeUnit) throws InterruptedException {
+ return countDownLatch.await(timeout, timeUnit);
+ }
+
+ public void releaseLatch() {
+ countDownLatch.countDown();
+ }
}
private class PersistRepositoryLazyInitializer extends
LazyInitializer<ClusterPersistRepository> {
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
index 9194cb29e0b..440ca60a805 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
@@ -186,10 +186,12 @@ public final class PipelineContainerComposer implements
AutoCloseable {
}
private List<Map<String, Object>> queryJobs(final Connection connection,
final String jobTypeName) {
+ String sql = String.format("SHOW %s LIST", jobTypeName);
try (Statement statement = connection.createStatement()) {
- return
transformResultSetToList(statement.executeQuery(String.format("SHOW %s LIST",
jobTypeName)));
+ log.info("Execute SQL: {}", sql);
+ return transformResultSetToList(statement.executeQuery(sql));
} catch (final SQLException ex) {
- log.warn("{} execute failed, message {}", String.format("SHOW %s
LIST", jobTypeName), ex.getMessage());
+ log.warn("{} execute failed, message {}", sql, ex.getMessage());
return Collections.emptyList();
}
}
@@ -200,10 +202,10 @@ public final class PipelineContainerComposer implements
AutoCloseable {
}
try (Statement statement = connection.createStatement()) {
statement.execute(String.format("DROP DATABASE IF EXISTS %s",
PROXY_DATABASE));
- Awaitility.await().pollDelay(2L, TimeUnit.SECONDS).until(() ->
true);
} catch (final SQLException ex) {
log.warn("Drop proxy database failed, error={}", ex.getMessage());
}
+ Awaitility.await().pollDelay(2L, TimeUnit.SECONDS).until(() -> true);
}
private void createProxyDatabase(final Connection connection) throws
SQLException {
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrierTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrierTest.java
index 5422ce2f32c..e19beb01328 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrierTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrierTest.java
@@ -63,7 +63,7 @@ class PipelineDistributedBarrierTest {
PipelineDistributedBarrier instance =
PipelineDistributedBarrier.getInstance(contextKey);
String barrierEnablePath =
PipelineMetaDataNode.getJobBarrierEnablePath(jobId);
instance.register(barrierEnablePath, 1);
- instance.persistEphemeralChildrenNode(barrierEnablePath, 1);
+ instance.persistEphemeralChildrenNode(barrierEnablePath, 0);
boolean actual = instance.await(barrierEnablePath, 1L,
TimeUnit.SECONDS);
assertFalse(actual);
instance.notifyChildrenNodeCountCheck(barrierEnablePath + "/0");