This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new ca2495ba196 Minor refactoring on PipelineDistributedBarrier (#37729)
ca2495ba196 is described below

commit ca2495ba19690c4ca07abcf3acdb22b869dfe5e3
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Tue Jan 13 18:01:36 2026 +0800

    Minor refactoring on PipelineDistributedBarrier (#37729)
    
    * Add log in PipelineDistributedBarrier
    
    * Improve InnerCountDownLatchHolder
    
    * Improve test
---
 .../core/util/PipelineDistributedBarrier.java      | 36 +++++++++++++++++-----
 .../pipeline/cases/PipelineContainerComposer.java  |  8 +++--
 .../core/util/PipelineDistributedBarrierTest.java  |  2 +-
 3 files changed, 34 insertions(+), 12 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java
index 4d1938f82e7..9339412584b 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java
@@ -76,8 +76,9 @@ public final class PipelineDistributedBarrier {
      * @param totalCount total count
      */
     public void register(final String barrierPath, final int totalCount) {
+        log.info("Register, barrier path: {}, total count: {}", barrierPath, 
totalCount);
         getRepository().persist(barrierPath, "");
-        countDownLatchHolders.computeIfAbsent(barrierPath, key -> new 
InnerCountDownLatchHolder(totalCount, new CountDownLatch(1)));
+        countDownLatchHolders.computeIfAbsent(barrierPath, key -> new 
InnerCountDownLatchHolder(totalCount));
     }
     
     /**
@@ -88,8 +89,10 @@ public final class PipelineDistributedBarrier {
      */
     public void persistEphemeralChildrenNode(final String barrierPath, final 
int shardingItem) {
         if (!getRepository().isExisted(barrierPath)) {
+            log.info("Persist ephemeral children node, barrier path not 
existed: {}, sharding item: {}", barrierPath, shardingItem);
             return;
         }
+        log.info("Persist ephemeral children node, barrier path: {}, sharding 
item: {}", barrierPath, shardingItem);
         String key = String.join("/", barrierPath, 
Integer.toString(shardingItem));
         getRepository().delete(key);
         getRepository().persistEphemeral(key, "");
@@ -101,10 +104,11 @@ public final class PipelineDistributedBarrier {
      * @param barrierPath barrier path
      */
     public void unregister(final String barrierPath) {
+        log.info("Unregister, barrier path: {}", barrierPath);
         getRepository().delete(barrierPath);
         InnerCountDownLatchHolder holder = 
countDownLatchHolders.remove(barrierPath);
         if (null != holder) {
-            holder.getCountDownLatch().countDown();
+            holder.releaseLatch();
         }
     }
     
@@ -119,12 +123,15 @@ public final class PipelineDistributedBarrier {
     public boolean await(final String barrierPath, final long timeout, final 
TimeUnit timeUnit) {
         InnerCountDownLatchHolder holder = 
countDownLatchHolders.get(barrierPath);
         if (null == holder) {
+            log.info("Await failed, barrier path not registered: {}", 
barrierPath);
             return false;
         }
         try {
-            boolean result = holder.getCountDownLatch().await(timeout, 
timeUnit);
+            boolean result = holder.awaitLatchReleasing(timeout, timeUnit);
             if (!result) {
-                log.info("await timeout, barrier path: {}, timeout: {}, time 
unit: {}", barrierPath, timeout, timeUnit);
+                log.warn("Await timeout, barrier path: {}, timeout: {}, time 
unit: {}", barrierPath, timeout, timeUnit);
+            } else {
+                log.info("Await success, barrier path: {}", barrierPath);
             }
             return result;
         } catch (final InterruptedException ignored) {
@@ -140,22 +147,35 @@ public final class PipelineDistributedBarrier {
      */
     public void notifyChildrenNodeCountCheck(final String nodePath) {
         if (Strings.isNullOrEmpty(nodePath)) {
+            log.info("Notify children node count check, node path is null or 
empty");
             return;
         }
         String barrierPath = nodePath.substring(0, nodePath.lastIndexOf('/'));
         InnerCountDownLatchHolder holder = 
countDownLatchHolders.get(barrierPath);
-        if (null != holder && 
getRepository().getChildrenKeys(barrierPath).size() == holder.getTotalCount()) {
-            holder.getCountDownLatch().countDown();
+        if (null != holder) {
+            int childrenSize = 
getRepository().getChildrenKeys(barrierPath).size();
+            log.info("Notify children node count check, barrier path: {}, 
children size: {}, total count: {}", barrierPath, childrenSize, 
holder.getTotalCount());
+            if (childrenSize == holder.getTotalCount()) {
+                holder.releaseLatch();
+            }
         }
     }
     
     @RequiredArgsConstructor
-    @Getter
     private static final class InnerCountDownLatchHolder {
         
+        @Getter
         private final int totalCount;
         
-        private final CountDownLatch countDownLatch;
+        private final CountDownLatch countDownLatch = new CountDownLatch(1);
+        
+        public boolean awaitLatchReleasing(final long timeout, final TimeUnit 
timeUnit) throws InterruptedException {
+            return countDownLatch.await(timeout, timeUnit);
+        }
+        
+        public void releaseLatch() {
+            countDownLatch.countDown();
+        }
     }
     
     private class PersistRepositoryLazyInitializer extends 
LazyInitializer<ClusterPersistRepository> {
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
index 9194cb29e0b..440ca60a805 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
@@ -186,10 +186,12 @@ public final class PipelineContainerComposer implements 
AutoCloseable {
     }
     
     private List<Map<String, Object>> queryJobs(final Connection connection, 
final String jobTypeName) {
+        String sql = String.format("SHOW %s LIST", jobTypeName);
         try (Statement statement = connection.createStatement()) {
-            return 
transformResultSetToList(statement.executeQuery(String.format("SHOW %s LIST", 
jobTypeName)));
+            log.info("Execute SQL: {}", sql);
+            return transformResultSetToList(statement.executeQuery(sql));
         } catch (final SQLException ex) {
-            log.warn("{} execute failed, message {}", String.format("SHOW %s 
LIST", jobTypeName), ex.getMessage());
+            log.warn("{} execute failed, message {}", sql, ex.getMessage());
             return Collections.emptyList();
         }
     }
@@ -200,10 +202,10 @@ public final class PipelineContainerComposer implements 
AutoCloseable {
         }
         try (Statement statement = connection.createStatement()) {
             statement.execute(String.format("DROP DATABASE IF EXISTS %s", 
PROXY_DATABASE));
-            Awaitility.await().pollDelay(2L, TimeUnit.SECONDS).until(() -> 
true);
         } catch (final SQLException ex) {
             log.warn("Drop proxy database failed, error={}", ex.getMessage());
         }
+        Awaitility.await().pollDelay(2L, TimeUnit.SECONDS).until(() -> true);
     }
     
     private void createProxyDatabase(final Connection connection) throws 
SQLException {
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrierTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrierTest.java
index 5422ce2f32c..e19beb01328 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrierTest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrierTest.java
@@ -63,7 +63,7 @@ class PipelineDistributedBarrierTest {
         PipelineDistributedBarrier instance = 
PipelineDistributedBarrier.getInstance(contextKey);
         String barrierEnablePath = 
PipelineMetaDataNode.getJobBarrierEnablePath(jobId);
         instance.register(barrierEnablePath, 1);
-        instance.persistEphemeralChildrenNode(barrierEnablePath, 1);
+        instance.persistEphemeralChildrenNode(barrierEnablePath, 0);
         boolean actual = instance.await(barrierEnablePath, 1L, 
TimeUnit.SECONDS);
         assertFalse(actual);
         instance.notifyChildrenNodeCountCheck(barrierEnablePath + "/0");

Reply via email to