This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 1c9c074 Allow proxy to come up before any brokers have (#1559) 1c9c074 is described below commit 1c9c07428b6281b88c56ebeed5f595cf7dafabda Author: Ivan Kelly <iv...@apache.org> AuthorDate: Thu Apr 12 18:39:13 2018 +0200 Allow proxy to come up before any brokers have (#1559) If no brokers have come up, then /loadbalance/brokers will not have been created. Previously, if a proxy came up at this point, it would get a NoNodeException when it tried to watch the children of this path, and the proxy itself would hang. This change modifies the ZooKeeper cache, so that if you try to getChildren on a node that doesn't exist, a watcher on that path's existence will be created before the NoNodeException is thrown. Callers can then call getChildren with a watcher, and expect the watcher to trigger when the node is created and has another node created below it. --- .../proxy/server/util/ZookeeperCacheLoader.java | 13 ++--- .../apache/pulsar/zookeeper/ZooKeeperCache.java | 15 +++++- .../pulsar/zookeeper/ZookeeperCacheTest.java | 58 ++++++++++++++++++++++ 3 files changed, 78 insertions(+), 8 deletions(-) diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java index 27d5a9b..7822347 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/util/ZookeeperCacheLoader.java @@ -20,6 +20,7 @@ package org.apache.pulsar.proxy.server.util; import java.io.Closeable; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Set; @@ -32,6 +33,7 @@ import org.apache.pulsar.zookeeper.ZooKeeperCache; import org.apache.pulsar.zookeeper.ZooKeeperChildrenCache; import org.apache.pulsar.zookeeper.ZooKeeperClientFactory; import 
org.apache.pulsar.zookeeper.ZooKeeperDataCache; +import org.apache.zookeeper.KeeperException.NoNodeException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -95,18 +97,17 @@ public class ZookeeperCacheLoader implements Closeable { }); // Do initial fetch of brokers list - availableBrokersSet = availableBrokersCache.get(); - updateBrokerList(availableBrokersSet); + try { + updateBrokerList(availableBrokersCache.get()); + } catch (NoNodeException nne) { // can happen if no broker started yet + updateBrokerList(Collections.emptySet()); + } } public List<LoadManagerReport> getAvailableBrokers() { return availableBrokers; } - public Set<String> getAvailableBrokersSet() { - return availableBrokersSet; - } - public ZooKeeperCache getLocalZkCache() { return localZkCache; } diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java index e2e46aa..fed55c3 100644 --- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java +++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java @@ -194,11 +194,15 @@ public abstract class ZooKeeperCache implements Watcher { * @throws InterruptedException */ public boolean exists(final String path) throws KeeperException, InterruptedException { + return exists(path, this); + } + + private boolean exists(final String path, Watcher watcher) throws KeeperException, InterruptedException { try { return existsCache.get(path, new Callable<Boolean>() { @Override public Boolean call() throws Exception { - return zkSession.get().exists(path, ZooKeeperCache.this) != null; + return zkSession.get().exists(path, watcher) != null; } }); } catch (ExecutionException e) { @@ -386,7 +390,14 @@ public abstract class ZooKeeperCache implements Watcher { }); } catch (ExecutionException e) { Throwable cause = e.getCause(); - if (cause instanceof KeeperException) { + 
// The node we want may not exist yet, so put a watcher on its existance + // before throwing up the exception. Its possible that the node could have + // been created after the call to getChildren, but before the call to exists(). + // If this is the case, exists will return true, and we just call getChildren again. + if (cause instanceof KeeperException.NoNodeException + && exists(path, watcher)) { + return getChildren(path, watcher); + } else if (cause instanceof KeeperException) { throw (KeeperException) cause; } else if (cause instanceof InterruptedException) { throw (InterruptedException) cause; diff --git a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java index af00abc..09c3ea1 100644 --- a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java +++ b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java @@ -36,6 +36,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicInteger; import org.apache.bookkeeper.common.util.OrderedScheduler; +import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.Code; import org.apache.zookeeper.MockZooKeeper; import org.apache.zookeeper.WatchedEvent; @@ -176,6 +177,63 @@ public class ZookeeperCacheTest { } @Test(timeOut = 10000) + void testChildrenCacheZnodeCreatedAfterCache() throws Exception { + + ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, executor); + ZooKeeperChildrenCache cache = new ZooKeeperChildrenCache(zkCacheService, "/test"); + + // Create callback counter + AtomicInteger notificationCount = new AtomicInteger(0); + ZooKeeperCacheListener<Set<String>> counter = (path, data, stat) -> { + notificationCount.incrementAndGet(); + }; + + // Register counter twice and unregister once, so callback should be counted correctly + 
cache.registerListener(counter); + cache.registerListener(counter); + cache.unregisterListener(counter); + + assertEquals(notificationCount.get(), 0); + try { + cache.get(); + fail("Expect this to fail"); + } catch (KeeperException.NoNodeException nne) { + // correct + } + + zkClient.create("/test", new byte[0], null, null); + zkClient.create("/test/z1", new byte[0], null, null); + + // Wait for cache to be updated in background + while (notificationCount.get() < 1) { + Thread.sleep(1); + } + + assertEquals(cache.get(), new TreeSet<String>(Lists.newArrayList("z1"))); + assertEquals(cache.get("/test"), new TreeSet<String>(Lists.newArrayList("z1"))); + assertEquals(notificationCount.get(), 1); + + zkClient.delete("/test/z1", -1); + while (notificationCount.get() < 2) { + Thread.sleep(1); + } + + assertTrue(cache.get().isEmpty()); + assertTrue(cache.get().isEmpty()); + zkCacheService.process(new WatchedEvent(Event.EventType.None, KeeperState.Expired, null)); + zkClient.failNow(Code.SESSIONEXPIRED); + + try { + cache.get(); + fail("should have thrown exception"); + } catch (Exception e) { + // Ok + } + + assertEquals(notificationCount.get(), 2); + } + + @Test(timeOut = 10000) void testExistsCache() throws Exception { // Check existence after creation of the node zkClient.create("/test", new byte[0], null, null); -- To stop receiving notification emails like this one, please contact mme...@apache.org.