This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c9c074  Allow proxy to come up before any brokers have (#1559)
1c9c074 is described below

commit 1c9c07428b6281b88c56ebeed5f595cf7dafabda
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Thu Apr 12 18:39:13 2018 +0200

    Allow proxy to come up before any brokers have (#1559)
    
    If no brokers have come up, then /loadbalance/brokers will not have
    been created. Previously, if a proxy came up at this point, it would
    get a NoNodeException when it tried to watch the children of this
    path, and the proxy itself would hang.
    
    This change modifies the ZooKeeper cache, so that if you try to
    getChildren on a node that doesn't exist, a watcher on that path's
    existence will be created before the NoNodeException is thrown.
    
    Callers can then call getChildren with a watcher, and expect the
    watcher to trigger when the node is created and has another node
    created below it.
---
 .../proxy/server/util/ZookeeperCacheLoader.java    | 13 ++---
 .../apache/pulsar/zookeeper/ZooKeeperCache.java    | 15 +++++-
 .../pulsar/zookeeper/ZookeeperCacheTest.java       | 58 ++++++++++++++++++++++
 3 files changed, 78 insertions(+), 8 deletions(-)

diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java
index 27d5a9b..7822347 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.proxy.server.util;
 
 import java.io.Closeable;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 
@@ -32,6 +33,7 @@ import org.apache.pulsar.zookeeper.ZooKeeperCache;
 import org.apache.pulsar.zookeeper.ZooKeeperChildrenCache;
 import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
 import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
+import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -95,18 +97,17 @@ public class ZookeeperCacheLoader implements Closeable {
         });
 
         // Do initial fetch of brokers list
-        availableBrokersSet = availableBrokersCache.get();
-        updateBrokerList(availableBrokersSet);
+        try {
+            updateBrokerList(availableBrokersCache.get());
+        } catch (NoNodeException nne) { // can happen if no broker started yet
+            updateBrokerList(Collections.emptySet());
+        }
     }
 
     public List<LoadManagerReport> getAvailableBrokers() {
         return availableBrokers;
     }
 
-    public Set<String> getAvailableBrokersSet() {
-        return availableBrokersSet;
-    }
-
     public ZooKeeperCache getLocalZkCache() {
         return localZkCache;
     }
diff --git 
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
 
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
index e2e46aa..fed55c3 100644
--- 
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
+++ 
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
@@ -194,11 +194,15 @@ public abstract class ZooKeeperCache implements Watcher {
      * @throws InterruptedException
      */
     public boolean exists(final String path) throws KeeperException, 
InterruptedException {
+        return exists(path, this);
+    }
+
+    private boolean exists(final String path, Watcher watcher) throws 
KeeperException, InterruptedException {
         try {
             return existsCache.get(path, new Callable<Boolean>() {
                 @Override
                 public Boolean call() throws Exception {
-                    return zkSession.get().exists(path, ZooKeeperCache.this) 
!= null;
+                    return zkSession.get().exists(path, watcher) != null;
                 }
             });
         } catch (ExecutionException e) {
@@ -386,7 +390,14 @@ public abstract class ZooKeeperCache implements Watcher {
             });
         } catch (ExecutionException e) {
             Throwable cause = e.getCause();
-            if (cause instanceof KeeperException) {
+            // The node we want may not exist yet, so put a watcher on its 
existence
+            // before throwing up the exception. It's possible that the node 
could have
+            // been created after the call to getChildren, but before the call 
to exists().
+            // If this is the case, exists will return true, and we just call 
getChildren again.
+            if (cause instanceof KeeperException.NoNodeException
+                    && exists(path, watcher)) {
+                return getChildren(path, watcher);
+            } else if (cause instanceof KeeperException) {
                 throw (KeeperException) cause;
             } else if (cause instanceof InterruptedException) {
                 throw (InterruptedException) cause;
diff --git 
a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
 
b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
index af00abc..09c3ea1 100644
--- 
a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
+++ 
b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
@@ -36,6 +36,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.MockZooKeeper;
 import org.apache.zookeeper.WatchedEvent;
@@ -176,6 +177,63 @@ public class ZookeeperCacheTest {
     }
 
     @Test(timeOut = 10000)
+    void testChildrenCacheZnodeCreatedAfterCache() throws Exception {
+
+        ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, 
executor);
+        ZooKeeperChildrenCache cache = new 
ZooKeeperChildrenCache(zkCacheService, "/test");
+
+        // Create callback counter
+        AtomicInteger notificationCount = new AtomicInteger(0);
+        ZooKeeperCacheListener<Set<String>> counter = (path, data, stat) -> {
+            notificationCount.incrementAndGet();
+        };
+
+        // Register counter twice and unregister once, so callback should be 
counted correctly
+        cache.registerListener(counter);
+        cache.registerListener(counter);
+        cache.unregisterListener(counter);
+
+        assertEquals(notificationCount.get(), 0);
+        try {
+            cache.get();
+            fail("Expect this to fail");
+        } catch (KeeperException.NoNodeException nne) {
+            // correct
+        }
+
+        zkClient.create("/test", new byte[0], null, null);
+        zkClient.create("/test/z1", new byte[0], null, null);
+
+        // Wait for cache to be updated in background
+        while (notificationCount.get() < 1) {
+            Thread.sleep(1);
+        }
+
+        assertEquals(cache.get(), new 
TreeSet<String>(Lists.newArrayList("z1")));
+        assertEquals(cache.get("/test"), new 
TreeSet<String>(Lists.newArrayList("z1")));
+        assertEquals(notificationCount.get(), 1);
+
+        zkClient.delete("/test/z1", -1);
+        while (notificationCount.get() < 2) {
+            Thread.sleep(1);
+        }
+
+        assertTrue(cache.get().isEmpty());
+        assertTrue(cache.get().isEmpty());
+        zkCacheService.process(new WatchedEvent(Event.EventType.None, 
KeeperState.Expired, null));
+        zkClient.failNow(Code.SESSIONEXPIRED);
+
+        try {
+            cache.get();
+            fail("should have thrown exception");
+        } catch (Exception e) {
+            // Ok
+        }
+
+        assertEquals(notificationCount.get(), 2);
+    }
+
+    @Test(timeOut = 10000)
     void testExistsCache() throws Exception {
         // Check existence after creation of the node
         zkClient.create("/test", new byte[0], null, null);

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.

Reply via email to