This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 7782e303e2c The method of start scaling and stop scaling change async 
to sync (#20222)
7782e303e2c is described below

commit 7782e303e2c3c5e957de35c8646df04abd005805
Author: Xinze Guo <[email protected]>
AuthorDate: Wed Aug 17 11:49:09 2022 +0800

    The method of start scaling and stop scaling change async to sync (#20222)
---
 .../core/api/impl/AbstractPipelineJobAPIImpl.java  |  12 ++
 .../pipeline/core/execute/PipelineJobExecutor.java |  11 +-
 .../core/metadata/node/PipelineMetaDataNode.java   |  20 +++
 .../core/util/PipelineDistributedBarrier.java      | 156 +++++++++++++++++++++
 .../pipeline/scenario/migration/MigrationJob.java  |   7 +
 .../core/util/PipelineDistributedBarrierTest.java  |  65 +++++++++
 6 files changed, 270 insertions(+), 1 deletion(-)

diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
index 7b2797395a9..a9d8f53cfb1 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
@@ -29,6 +29,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobNotFoun
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineVerifyFailedException;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
+import 
org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJob;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
@@ -36,6 +37,7 @@ import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
 import java.util.Optional;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Abstract pipeline job API impl.
@@ -45,6 +47,8 @@ public abstract class AbstractPipelineJobAPIImpl implements 
PipelineJobAPI {
     
     protected static final DateTimeFormatter DATE_TIME_FORMATTER = 
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
     
+    private final PipelineDistributedBarrier pipelineDistributedBarrier = 
PipelineDistributedBarrier.getInstance();
+    
     @Override
     public final String marshalJobId(final PipelineJobId pipelineJobId) {
         return PipelineJobIdUtils.marshalJobIdCommonPrefix(pipelineJobId) + 
marshalJobIdLeftPart(pipelineJobId);
@@ -85,6 +89,7 @@ public abstract class AbstractPipelineJobAPIImpl implements 
PipelineJobAPI {
     @Override
     public void startDisabledJob(final String jobId) {
         log.info("Start disabled pipeline job {}", jobId);
+        
pipelineDistributedBarrier.removeParentNode(PipelineMetaDataNode.getScalingJobBarrierDisablePath(jobId));
         JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
         if (!jobConfigPOJO.isDisabled()) {
             throw new PipelineVerifyFailedException("Job is already started.");
@@ -92,16 +97,23 @@ public abstract class AbstractPipelineJobAPIImpl implements 
PipelineJobAPI {
         jobConfigPOJO.setDisabled(false);
         jobConfigPOJO.getProps().remove("stop_time");
         
PipelineAPIFactory.getJobConfigurationAPI().updateJobConfiguration(jobConfigPOJO);
+        String barrierPath = 
PipelineMetaDataNode.getScalingJobBarrierEnablePath(jobId);
+        pipelineDistributedBarrier.register(barrierPath, 
jobConfigPOJO.getShardingTotalCount());
+        pipelineDistributedBarrier.await(barrierPath, 5, TimeUnit.SECONDS);
     }
     
     @Override
     public void stop(final String jobId) {
         log.info("Stop pipeline job {}", jobId);
+        
pipelineDistributedBarrier.removeParentNode(PipelineMetaDataNode.getScalingJobBarrierEnablePath(jobId));
         JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
         jobConfigPOJO.setDisabled(true);
         jobConfigPOJO.getProps().setProperty("stop_time", 
LocalDateTime.now().format(DATE_TIME_FORMATTER));
         // TODO updateJobConfiguration might doesn't work
         
PipelineAPIFactory.getJobConfigurationAPI().updateJobConfiguration(jobConfigPOJO);
+        String barrierPath = 
PipelineMetaDataNode.getScalingJobBarrierDisablePath(jobId);
+        pipelineDistributedBarrier.register(barrierPath, 
jobConfigPOJO.getShardingTotalCount());
+        pipelineDistributedBarrier.await(barrierPath, 5, TimeUnit.SECONDS);
     }
     
     @Override
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
index 7c050174342..b8d93725514 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/execute/PipelineJobExecutor.java
@@ -25,6 +25,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.constant.DataPipelineConstants;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobProgressDetector;
+import 
org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJob;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobPreparer;
@@ -32,6 +33,7 @@ import 
org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import 
org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
+import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
 
 import java.util.Optional;
 import java.util.concurrent.ExecutorService;
@@ -46,11 +48,18 @@ public final class PipelineJobExecutor extends 
AbstractLifecycleExecutor {
     
     private static final Pattern CONFIG_PATTERN = 
Pattern.compile(DataPipelineConstants.DATA_PIPELINE_ROOT + 
"/(j\\d{2}[0-9a-f]+)/config");
     
+    private static final Pattern BARRIER_MATCH_PATTERN = 
Pattern.compile(DataPipelineConstants.DATA_PIPELINE_ROOT + 
"/(j\\d{2}[0-9a-f]+)/barrier/(enable|disable)/\\d+");
+    
     private final ExecutorService executor = Executors.newFixedThreadPool(20);
     
     @Override
     protected void doStart() {
-        
PipelineAPIFactory.getGovernanceRepositoryAPI().watch(DataPipelineConstants.DATA_PIPELINE_ROOT,
 event -> getJobConfigPOJO(event).ifPresent(optional -> processEvent(event, 
optional)));
+        
PipelineAPIFactory.getGovernanceRepositoryAPI().watch(DataPipelineConstants.DATA_PIPELINE_ROOT,
 event -> {
+            if (BARRIER_MATCH_PATTERN.matcher(event.getKey()).matches() && 
event.getType() == Type.ADDED) {
+                
PipelineDistributedBarrier.getInstance().checkChildrenNodeCount(event);
+            }
+            getJobConfigPOJO(event).ifPresent(optional -> processEvent(event, 
optional));
+        });
     }
     
     private Optional<JobConfigurationPOJO> getJobConfigPOJO(final 
DataChangedEvent event) {
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java
index b31a0064bed..9e75d46ef13 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/node/PipelineMetaDataNode.java
@@ -87,4 +87,24 @@ public final class PipelineMetaDataNode {
     public static String getScalingCheckResultPath(final String jobId) {
         return String.join("/", DataPipelineConstants.DATA_PIPELINE_ROOT, 
jobId, "check", "result");
     }
+    
+    /**
+     * Get scaling job barrier enable path.
+     *
+     * @param jobId job id
+     * @return job barrier path.
+     */
+    public static String getScalingJobBarrierEnablePath(final String jobId) {
+        return String.join("/", DataPipelineConstants.DATA_PIPELINE_ROOT, 
jobId, "barrier", "enable");
+    }
+    
+    /**
+     * Get scaling job barrier disable path.
+     *
+     * @param jobId job id
+     * @return job barrier path.
+     */
+    public static String getScalingJobBarrierDisablePath(final String jobId) {
+        return String.join("/", DataPipelineConstants.DATA_PIPELINE_ROOT, 
jobId, "barrier", "disable");
+    }
 }
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java
new file mode 100644
index 00000000000..038fe1f90c7
--- /dev/null
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrier.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.util;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.concurrent.LazyInitializer;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
+import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Pipeline distributed barrier.
+ */
+@Slf4j
+public final class PipelineDistributedBarrier {
+    
+    private static final PipelineDistributedBarrier INSTANCE = new 
PipelineDistributedBarrier();
+    
+    private static final LazyInitializer<ClusterPersistRepository> 
REPOSITORY_LAZY_INITIALIZER = new LazyInitializer<ClusterPersistRepository>() {
+        @Override
+        protected ClusterPersistRepository initialize() {
+            return (ClusterPersistRepository) 
PipelineContext.getContextManager().getMetaDataContexts().getPersistService().getRepository();
+        }
+    };
+    
+    private final Map<String, InnerCountDownLatchHolder> countDownLatchMap = 
new ConcurrentHashMap<>();
+    
+    @SneakyThrows
+    private static ClusterPersistRepository getRepository() {
+        return REPOSITORY_LAZY_INITIALIZER.get();
+    }
+    
+    /**
+     * Get instance.
+     *
+     * @return instance
+     */
+    public static PipelineDistributedBarrier getInstance() {
+        return INSTANCE;
+    }
+    
+    /**
+     * Register count down latch.
+     *
+     * @param parentPath parent path
+     * @param totalCount total count
+     */
+    public void register(final String parentPath, final int totalCount) {
+        getRepository().persist(parentPath, "");
+        countDownLatchMap.computeIfAbsent(parentPath, k -> new 
InnerCountDownLatchHolder(totalCount, new CountDownLatch(1)));
+    }
+    
+    /**
+     * Persist ephemeral children node.
+     *
+     * @param parentPath parent path
+     * @param shardingItem sharding item
+     */
+    public void persistEphemeralChildrenNode(final String parentPath, final 
int shardingItem) {
+        String key = String.join("/", parentPath, 
Integer.toString(shardingItem));
+        getRepository().delete(key);
+        getRepository().persistEphemeral(key, "");
+    }
+    
+    /**
+     * Persist ephemeral children node.
+     *
+     * @param parentPath parent path
+     */
+    public void removeParentNode(final String parentPath) {
+        getRepository().delete(String.join("/", parentPath));
+        InnerCountDownLatchHolder holder = 
countDownLatchMap.remove(parentPath);
+        if (null != holder) {
+            holder.getCountDownLatch().countDown();
+        }
+    }
+    
+    /**
+     * Await unitl all children node is ready.
+     *
+     * @param parentPath parent path
+     * @param timeout timeout
+     * @param timeUnit time unit
+     * @return true if the count reached zero and false if the waiting time 
elapsed before the count reached zero
+     */
+    public boolean await(final String parentPath, final long timeout, final 
TimeUnit timeUnit) {
+        InnerCountDownLatchHolder holder = countDownLatchMap.get(parentPath);
+        if (null == holder) {
+            return false;
+        }
+        try {
+            boolean result = holder.getCountDownLatch().await(timeout, 
timeUnit);
+            if (!result) {
+                log.info("await timeout, parent path: {}, timeout: {}, time 
unit: {}", parentPath, timeout, timeUnit);
+            }
+            return result;
+        } catch (final InterruptedException ignored) {
+        }
+        return false;
+    }
+    
+    /**
+     * Check child node count equal shardingCount.
+     *
+     * @param event event
+     */
+    public void checkChildrenNodeCount(final DataChangedEvent event) {
+        if (StringUtils.isBlank(event.getKey())) {
+            return;
+        }
+        String parentPath = event.getKey().substring(0, 
event.getKey().lastIndexOf("/"));
+        InnerCountDownLatchHolder holder = countDownLatchMap.get(parentPath);
+        if (null == holder) {
+            return;
+        }
+        List<String> childrenKeys = 
getRepository().getChildrenKeys(parentPath);
+        log.info("children keys: {}, total count: {}", childrenKeys, 
holder.getTotalCount());
+        if (childrenKeys.size() == holder.getTotalCount()) {
+            holder.getCountDownLatch().countDown();
+        }
+    }
+    
+    @RequiredArgsConstructor
+    @Getter
+    private static class InnerCountDownLatchHolder {
+        
+        private final int totalCount;
+        
+        private final CountDownLatch countDownLatch;
+    }
+}
diff --git 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index 001da102255..f730c8f3758 100644
--- 
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++ 
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -31,7 +31,9 @@ import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineIgnoredExc
 import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
+import 
org.apache.shardingsphere.data.pipeline.core.metadata.node.PipelineMetaDataNode;
 import 
org.apache.shardingsphere.data.pipeline.core.task.InventoryIncrementalTasksRunner;
+import 
org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
 import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
 
@@ -44,6 +46,8 @@ public final class MigrationJob extends AbstractPipelineJob 
implements SimpleJob
     
     private final PipelineDataSourceManager dataSourceManager = new 
DefaultPipelineDataSourceManager();
     
+    private final PipelineDistributedBarrier pipelineDistributedBarrier = 
PipelineDistributedBarrier.getInstance();
+    
     // Shared by all sharding items
     private final MigrationJobPreparer jobPreparer = new 
MigrationJobPreparer();
     
@@ -72,6 +76,7 @@ public final class MigrationJob extends AbstractPipelineJob 
implements SimpleJob
         });
         getTasksRunnerMap().put(shardingItem, tasksRunner);
         
PipelineJobProgressPersistService.addJobProgressPersistContext(getJobId(), 
shardingItem);
+        
pipelineDistributedBarrier.persistEphemeralChildrenNode(PipelineMetaDataNode.getScalingJobBarrierEnablePath(getJobId()),
 shardingItem);
     }
     
     private void prepare(final MigrationJobItemContext jobItemContext) {
@@ -105,8 +110,10 @@ public final class MigrationJob extends 
AbstractPipelineJob implements SimpleJob
             return;
         }
         log.info("stop tasks runner, jobId={}", getJobId());
+        String scalingJobBarrierDisablePath = 
PipelineMetaDataNode.getScalingJobBarrierDisablePath(getJobId());
         for (PipelineTasksRunner each : getTasksRunnerMap().values()) {
             each.stop();
+            
pipelineDistributedBarrier.persistEphemeralChildrenNode(scalingJobBarrierDisablePath,
 each.getJobItemContext().getShardingItem());
         }
         getTasksRunnerMap().clear();
         
PipelineJobProgressPersistService.removeJobProgressPersistContext(getJobId());
diff --git 
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrierTest.java
 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrierTest.java
new file mode 100644
index 00000000000..94641521d09
--- /dev/null
+++ 
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/util/PipelineDistributedBarrierTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.util;
+
+import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
+import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+@SuppressWarnings("rawtypes")
+public final class PipelineDistributedBarrierTest {
+    
+    @BeforeClass
+    public static void setUp() {
+        PipelineContextUtil.mockModeConfigAndContextManager();
+    }
+    
+    @Test
+    public void assertRegisterAndRemove() throws NoSuchFieldException, 
IllegalAccessException {
+        PipelineDistributedBarrier instance = 
PipelineDistributedBarrier.getInstance();
+        instance.register("/test", 1);
+        Map countDownLatchMap = ReflectionUtil.getFieldValue(instance, 
"countDownLatchMap", Map.class);
+        assertNotNull(countDownLatchMap);
+        assertThat(countDownLatchMap.size(), is(1));
+        instance.removeParentNode("/test");
+        assertThat(countDownLatchMap.size(), is(0));
+    }
+    
+    @Test
+    public void assertAwait() {
+        PipelineDistributedBarrier instance = 
PipelineDistributedBarrier.getInstance();
+        String parentPath = 
"/scaling/j0130317c3054317c7363616c696e675f626d73716c/barrier/enable";
+        instance.register(parentPath, 1);
+        instance.persistEphemeralChildrenNode(parentPath, 1);
+        boolean actual = instance.await(parentPath, 1, TimeUnit.SECONDS);
+        assertFalse(actual);
+        instance.checkChildrenNodeCount(new 
DataChangedEvent("/scaling/j0130317c3054317c7363616c696e675f626d73716c/barrier/enable/1",
 "", Type.ADDED));
+        actual = instance.await(parentPath, 1, TimeUnit.SECONDS);
+        assertTrue(actual);
+    }
+}

Reply via email to