merlimat closed pull request #1559: Allow proxy to come up before any brokers 
have
URL: https://github.com/apache/incubator-pulsar/pull/1559
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java
index 27d5a9b743..78223473b2 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java
@@ -20,6 +20,7 @@
 
 import java.io.Closeable;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 
@@ -32,6 +33,7 @@
 import org.apache.pulsar.zookeeper.ZooKeeperChildrenCache;
 import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
 import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
+import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -95,18 +97,17 @@ public LoadManagerReport deserialize(String key, byte[] 
content) throws Exceptio
         });
 
         // Do initial fetch of brokers list
-        availableBrokersSet = availableBrokersCache.get();
-        updateBrokerList(availableBrokersSet);
+        try {
+            updateBrokerList(availableBrokersCache.get());
+        } catch (NoNodeException nne) { // can happen if no broker started yet
+            updateBrokerList(Collections.emptySet());
+        }
     }
 
     public List<LoadManagerReport> getAvailableBrokers() {
         return availableBrokers;
     }
 
-    public Set<String> getAvailableBrokersSet() {
-        return availableBrokersSet;
-    }
-
     public ZooKeeperCache getLocalZkCache() {
         return localZkCache;
     }
diff --git 
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
 
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
index e2e46aa8f5..fed55c3eb6 100644
--- 
a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
+++ 
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java
@@ -194,11 +194,15 @@ public void invalidate(final String path) {
      * @throws InterruptedException
      */
     public boolean exists(final String path) throws KeeperException, 
InterruptedException {
+        return exists(path, this);
+    }
+
+    private boolean exists(final String path, Watcher watcher) throws 
KeeperException, InterruptedException {
         try {
             return existsCache.get(path, new Callable<Boolean>() {
                 @Override
                 public Boolean call() throws Exception {
-                    return zkSession.get().exists(path, ZooKeeperCache.this) 
!= null;
+                    return zkSession.get().exists(path, watcher) != null;
                 }
             });
         } catch (ExecutionException e) {
@@ -386,7 +390,14 @@ public Boolean call() throws Exception {
             });
         } catch (ExecutionException e) {
             Throwable cause = e.getCause();
-            if (cause instanceof KeeperException) {
+            // The node we want may not exist yet, so put a watcher on its 
existence
+            // before throwing up the exception. It's possible that the node 
could have
+            // been created after the call to getChildren, but before the call 
to exists().
+            // If this is the case, exists will return true, and we just call 
getChildren again.
+            if (cause instanceof KeeperException.NoNodeException
+                    && exists(path, watcher)) {
+                return getChildren(path, watcher);
+            } else if (cause instanceof KeeperException) {
                 throw (KeeperException) cause;
             } else if (cause instanceof InterruptedException) {
                 throw (InterruptedException) cause;
diff --git 
a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
 
b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
index af00abc42d..09c3ea1524 100644
--- 
a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
+++ 
b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java
@@ -36,6 +36,7 @@
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.MockZooKeeper;
 import org.apache.zookeeper.WatchedEvent;
@@ -175,6 +176,63 @@ void testChildrenCache() throws Exception {
         assertEquals(notificationCount.get(), 3);
     }
 
+    @Test(timeOut = 10000)
+    void testChildrenCacheZnodeCreatedAfterCache() throws Exception {
+
+        ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, 
executor);
+        ZooKeeperChildrenCache cache = new 
ZooKeeperChildrenCache(zkCacheService, "/test");
+
+        // Create callback counter
+        AtomicInteger notificationCount = new AtomicInteger(0);
+        ZooKeeperCacheListener<Set<String>> counter = (path, data, stat) -> {
+            notificationCount.incrementAndGet();
+        };
+
+        // Register counter twice and unregister once, so callback should be 
counted correctly
+        cache.registerListener(counter);
+        cache.registerListener(counter);
+        cache.unregisterListener(counter);
+
+        assertEquals(notificationCount.get(), 0);
+        try {
+            cache.get();
+            fail("Expect this to fail");
+        } catch (KeeperException.NoNodeException nne) {
+            // correct
+        }
+
+        zkClient.create("/test", new byte[0], null, null);
+        zkClient.create("/test/z1", new byte[0], null, null);
+
+        // Wait for cache to be updated in background
+        while (notificationCount.get() < 1) {
+            Thread.sleep(1);
+        }
+
+        assertEquals(cache.get(), new 
TreeSet<String>(Lists.newArrayList("z1")));
+        assertEquals(cache.get("/test"), new 
TreeSet<String>(Lists.newArrayList("z1")));
+        assertEquals(notificationCount.get(), 1);
+
+        zkClient.delete("/test/z1", -1);
+        while (notificationCount.get() < 2) {
+            Thread.sleep(1);
+        }
+
+        assertTrue(cache.get().isEmpty());
+        assertTrue(cache.get().isEmpty());
+        zkCacheService.process(new WatchedEvent(Event.EventType.None, 
KeeperState.Expired, null));
+        zkClient.failNow(Code.SESSIONEXPIRED);
+
+        try {
+            cache.get();
+            fail("should have thrown exception");
+        } catch (Exception e) {
+            // Ok
+        }
+
+        assertEquals(notificationCount.get(), 2);
+    }
+
     @Test(timeOut = 10000)
     void testExistsCache() throws Exception {
         // Check existence after creation of the node


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to