merlimat closed pull request #1559: Allow proxy to come up before any brokers have URL: https://github.com/apache/incubator-pulsar/pull/1559
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java index 27d5a9b743..78223473b2 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java @@ -20,6 +20,7 @@ import java.io.Closeable; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Set; @@ -32,6 +33,7 @@ import org.apache.pulsar.zookeeper.ZooKeeperChildrenCache; import org.apache.pulsar.zookeeper.ZooKeeperClientFactory; import org.apache.pulsar.zookeeper.ZooKeeperDataCache; +import org.apache.zookeeper.KeeperException.NoNodeException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -95,18 +97,17 @@ public LoadManagerReport deserialize(String key, byte[] content) throws Exceptio }); // Do initial fetch of brokers list - availableBrokersSet = availableBrokersCache.get(); - updateBrokerList(availableBrokersSet); + try { + updateBrokerList(availableBrokersCache.get()); + } catch (NoNodeException nne) { // can happen if no broker started yet + updateBrokerList(Collections.emptySet()); + } } public List<LoadManagerReport> getAvailableBrokers() { return availableBrokers; } - public Set<String> getAvailableBrokersSet() { - return availableBrokersSet; - } - public ZooKeeperCache getLocalZkCache() { return localZkCache; } diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java index 
e2e46aa8f5..fed55c3eb6 100644 --- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java +++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java @@ -194,11 +194,15 @@ public void invalidate(final String path) { * @throws InterruptedException */ public boolean exists(final String path) throws KeeperException, InterruptedException { + return exists(path, this); + } + + private boolean exists(final String path, Watcher watcher) throws KeeperException, InterruptedException { try { return existsCache.get(path, new Callable<Boolean>() { @Override public Boolean call() throws Exception { - return zkSession.get().exists(path, ZooKeeperCache.this) != null; + return zkSession.get().exists(path, watcher) != null; } }); } catch (ExecutionException e) { @@ -386,7 +390,14 @@ public Boolean call() throws Exception { }); } catch (ExecutionException e) { Throwable cause = e.getCause(); - if (cause instanceof KeeperException) { + // The node we want may not exist yet, so put a watcher on its existence + // before throwing up the exception. It's possible that the node could have + // been created after the call to getChildren, but before the call to exists(). + // If this is the case, exists will return true, and we just call getChildren again. 
+ if (cause instanceof KeeperException.NoNodeException + && exists(path, watcher)) { + return getChildren(path, watcher); + } else if (cause instanceof KeeperException) { throw (KeeperException) cause; } else if (cause instanceof InterruptedException) { throw (InterruptedException) cause; diff --git a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java index af00abc42d..09c3ea1524 100644 --- a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java +++ b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java @@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.bookkeeper.common.util.OrderedScheduler; +import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.Code; import org.apache.zookeeper.MockZooKeeper; import org.apache.zookeeper.WatchedEvent; @@ -175,6 +176,63 @@ void testChildrenCache() throws Exception { assertEquals(notificationCount.get(), 3); } + @Test(timeOut = 10000) + void testChildrenCacheZnodeCreatedAfterCache() throws Exception { + + ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, executor); + ZooKeeperChildrenCache cache = new ZooKeeperChildrenCache(zkCacheService, "/test"); + + // Create callback counter + AtomicInteger notificationCount = new AtomicInteger(0); + ZooKeeperCacheListener<Set<String>> counter = (path, data, stat) -> { + notificationCount.incrementAndGet(); + }; + + // Register counter twice and unregister once, so callback should be counted correctly + cache.registerListener(counter); + cache.registerListener(counter); + cache.unregisterListener(counter); + + assertEquals(notificationCount.get(), 0); + try { + cache.get(); + fail("Expect this to fail"); + } catch (KeeperException.NoNodeException nne) { + // correct + } + + zkClient.create("/test", new byte[0], 
null, null); + zkClient.create("/test/z1", new byte[0], null, null); + + // Wait for cache to be updated in background + while (notificationCount.get() < 1) { + Thread.sleep(1); + } + + assertEquals(cache.get(), new TreeSet<String>(Lists.newArrayList("z1"))); + assertEquals(cache.get("/test"), new TreeSet<String>(Lists.newArrayList("z1"))); + assertEquals(notificationCount.get(), 1); + + zkClient.delete("/test/z1", -1); + while (notificationCount.get() < 2) { + Thread.sleep(1); + } + + assertTrue(cache.get().isEmpty()); + assertTrue(cache.get().isEmpty()); + zkCacheService.process(new WatchedEvent(Event.EventType.None, KeeperState.Expired, null)); + zkClient.failNow(Code.SESSIONEXPIRED); + + try { + cache.get(); + fail("should have thrown exception"); + } catch (Exception e) { + // Ok + } + + assertEquals(notificationCount.get(), 2); + } + @Test(timeOut = 10000) void testExistsCache() throws Exception { // Check existence after creation of the node ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services