This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 871a1e0 Ensure getting list of topics for namespace is handled asynchronously (#5188) 871a1e0 is described below commit 871a1e028febc45742842754a801073a47d71698 Author: Matteo Merli <mme...@apache.org> AuthorDate: Tue Sep 17 17:30:50 2019 -0700 Ensure getting list of topics for namespace is handled asynchronously (#5188) * Ensure getting list of topics for namespace is handled asynchrounously * Fixed mocked zk deadlock * Test fixes * Fixed mutex unlocking in mock-zookeeper create method * Fixed caching of empty values * Do async call in background * Fixed merge conflicts * Fixed broken import from shaded class --- .../java/org/apache/zookeeper/MockZooKeeper.java | 81 ++++++----- .../org/apache/pulsar/broker/PulsarService.java | 3 +- .../apache/pulsar/broker/admin/AdminResource.java | 1 + .../pulsar/broker/admin/impl/NamespacesBase.java | 6 +- .../broker/admin/impl/PersistentTopicsBase.java | 3 +- .../apache/pulsar/broker/admin/v1/Namespaces.java | 20 ++- .../broker/admin/v1/NonPersistentTopics.java | 1 + .../apache/pulsar/broker/admin/v2/Namespaces.java | 32 ++-- .../broker/admin/v2/NonPersistentTopics.java | 1 + .../pulsar/broker/namespace/NamespaceService.java | 60 ++++---- .../apache/pulsar/broker/service/ServerCnx.java | 36 ++--- .../client/impl/PatternTopicsConsumerImplTest.java | 4 +- .../pulsar/zookeeper/GlobalZooKeeperCache.java | 2 +- .../pulsar/zookeeper/LocalZooKeeperCache.java | 2 +- .../apache/pulsar/zookeeper/ZooKeeperCache.java | 161 +++++++++++++-------- .../pulsar/zookeeper/ZooKeeperChildrenCache.java | 43 ++++-- .../pulsar/zookeeper/ZookeeperCacheTest.java | 23 +-- 17 files changed, 276 insertions(+), 203 deletions(-) diff --git a/managed-ledger/src/test/java/org/apache/zookeeper/MockZooKeeper.java b/managed-ledger/src/test/java/org/apache/zookeeper/MockZooKeeper.java index 0afbcd6..c417604 100644 --- a/managed-ledger/src/test/java/org/apache/zookeeper/MockZooKeeper.java +++ b/managed-ledger/src/test/java/org/apache/zookeeper/MockZooKeeper.java @@ -126,6 +126,11 @@ public class MockZooKeeper extends ZooKeeper { public String create(String path, byte[] data, List<ACL> acl, CreateMode createMode) throws KeeperException, InterruptedException { mutex.lock(); + + final Set<Watcher> toNotifyCreate = Sets.newHashSet(); + final Set<Watcher> toNotifyParent = Sets.newHashSet(); + final String parent = path.substring(0, path.lastIndexOf("/")); + try { checkProgrammedFail(); @@ -136,7 +141,6 @@ public class MockZooKeeper extends ZooKeeper { throw new KeeperException.NodeExistsException(path); } - final String parent = path.substring(0, path.lastIndexOf("/")); if (!parent.isEmpty() && !tree.containsKey(parent)) { throw new KeeperException.NoNodeException(); } @@ -152,55 +156,57 @@ public class MockZooKeeper extends ZooKeeper { tree.put(path, Pair.of(data, 0)); - final Set<Watcher> toNotifyCreate = Sets.newHashSet(); toNotifyCreate.addAll(watchers.get(path)); - final Set<Watcher> toNotifyParent = Sets.newHashSet(); if (!parent.isEmpty()) { toNotifyParent.addAll(watchers.get(parent)); } watchers.removeAll(path); - final String finalPath = path; - executor.execute(() -> { - toNotifyCreate.forEach( - watcher -> watcher.process( - new WatchedEvent(EventType.NodeCreated, - KeeperState.SyncConnected, - finalPath))); - toNotifyParent.forEach( - watcher -> watcher.process( - new WatchedEvent(EventType.NodeChildrenChanged, - KeeperState.SyncConnected, - parent))); - }); - - return path; } finally { + mutex.unlock(); } + final String finalPath = path; + executor.execute(() -> { + + toNotifyCreate.forEach( + watcher -> watcher.process( + new WatchedEvent(EventType.NodeCreated, + KeeperState.SyncConnected, + finalPath))); + toNotifyParent.forEach( + watcher -> watcher.process( + new WatchedEvent(EventType.NodeChildrenChanged, + KeeperState.SyncConnected, + parent))); + }); + + return path; } @Override public void create(final String path, final byte[] data, final List<ACL> acl, CreateMode createMode, final StringCallback cb, final Object ctx) { - if (stopped) { - cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null); - return; - } - final Set<Watcher> toNotifyCreate = Sets.newHashSet(); - toNotifyCreate.addAll(watchers.get(path)); - - final Set<Watcher> toNotifyParent = Sets.newHashSet(); - final String parent = path.substring(0, path.lastIndexOf("/")); - if (!parent.isEmpty()) { - toNotifyParent.addAll(watchers.get(parent)); - } - watchers.removeAll(path); executor.execute(() -> { mutex.lock(); + + if (stopped) { + cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null); + return; + } + + final Set<Watcher> toNotifyCreate = Sets.newHashSet(); + toNotifyCreate.addAll(watchers.get(path)); + + final Set<Watcher> toNotifyParent = Sets.newHashSet(); + final String parent = path.substring(0, path.lastIndexOf("/")); + if (!parent.isEmpty()) { + toNotifyParent.addAll(watchers.get(parent)); + } + if (getProgrammedFailStatus()) { mutex.unlock(); cb.processResult(failReturnCode.intValue(), path, ctx, null); @@ -215,6 +221,7 @@ public class MockZooKeeper extends ZooKeeper { cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null); } else { tree.put(path, Pair.of(data, 0)); + watchers.removeAll(path); mutex.unlock(); cb.processResult(0, path, ctx, null); @@ -331,6 +338,12 @@ public class MockZooKeeper extends ZooKeeper { return; } + if (!tree.containsKey(path)) { + mutex.unlock(); + cb.processResult(KeeperException.Code.NoNode, path, ctx, null); + return; + } + List<String> children = Lists.newArrayList(); for (String item : tree.tailMap(path).keySet()) { if (!item.startsWith(path)) { @@ -347,12 +360,12 @@ public class MockZooKeeper extends ZooKeeper { } } - mutex.unlock(); - - cb.processResult(0, path, ctx, children); if (watcher != null) { watchers.put(path, watcher); } + mutex.unlock(); + + cb.processResult(0, path, ctx, children); }); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 113c081..2818744 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -649,7 +649,7 @@ public class PulsarService implements AutoCloseable { List<CompletableFuture<Topic>> persistentTopics = Lists.newArrayList(); long topicLoadStart = System.nanoTime(); - for (String topic : getNamespaceService().getListOfPersistentTopics(nsName)) { + for (String topic : getNamespaceService().getListOfPersistentTopics(nsName).join()) { try { TopicName topicName = TopicName.get(topic); if (bundle.includes(topicName)) { @@ -974,7 +974,6 @@ public class PulsarService implements AutoCloseable { return brokerServiceUrl != null ? brokerServiceUrl : brokerServiceUrlTls; } - private void startWorkerService(AuthenticationService authenticationService, AuthorizationService authorizationService) throws InterruptedException, IOException, KeeperException { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index ed497e8..92aaecd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -597,6 +597,7 @@ public abstract class AdminResource extends PulsarWebResource { try { topicExist = pulsar.getNamespaceService() .getListOfTopics(topicName.getNamespaceObject(), PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL) + .join() .contains(topicName.toString()); } catch (Exception e) { log.warn("Unexpected error while getting list of topics. topic={}. Error: {}", diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 5d6d8fc..02c3e46 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -24,8 +24,6 @@ import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT; -import static org.apache.pulsar.broker.web.PulsarWebResource.joinPath; -import static org.apache.pulsar.common.naming.NamespaceBundleFactory.getBundlesData; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -189,7 +187,7 @@ public abstract class NamespacesBase extends AdminResource { boolean isEmpty; try { - isEmpty = pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName).isEmpty() + isEmpty = pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName).join().isEmpty() && getPartitionedTopicList(TopicDomain.persistent).isEmpty() && getPartitionedTopicList(TopicDomain.non_persistent).isEmpty(); } catch (Exception e) { @@ -319,7 +317,7 @@ public abstract class NamespacesBase extends AdminResource { NamespaceBundle bundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange, authoritative, true); try { - List<String> topics = pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName); + List<String> topics = pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName).join(); for (String topic : topics) { NamespaceBundle topicBundle = (NamespaceBundle) pulsar().getNamespaceService() .getBundle(TopicName.get(topic)); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 509b41d..71c758b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -378,6 +378,7 @@ public class PersistentTopicsBase extends AdminResource { try { boolean topicExist = pulsar().getNamespaceService() .getListOfTopics(topicName.getNamespaceObject(), PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL) + .join() .contains(topicName.toString()); if (topicExist) { log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName); @@ -1201,7 +1202,7 @@ public class PersistentTopicsBase extends AdminResource { return; } } - + if (partitionException.get() != null) { log.warn("[{}] [{}] Failed to create subscription {} at message id {}", clientAppId(), topicName, subscriptionName, targetMessageId, partitionException.get()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java index a47bc23..a0fbeca 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java @@ -113,21 +113,25 @@ public class Namespaces extends NamespacesBase { @ApiOperation(hidden = true, value = "Get the list of all the topics under a certain namespace.", response = String.class, responseContainer = "Set") @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist") }) - public List<String> getTopics(@PathParam("property") String property, + public void getTopics(@PathParam("property") String property, @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, - @QueryParam("mode") @DefaultValue("PERSISTENT") Mode mode) { + @QueryParam("mode") @DefaultValue("PERSISTENT") Mode mode, + @Suspended AsyncResponse asyncResponse) { validateAdminAccessForTenant(property); validateNamespaceName(property, cluster, namespace); // Validate that namespace exists, throws 404 if it doesn't exist getNamespacePolicies(namespaceName); - try { - return pulsar().getNamespaceService().getListOfTopics(namespaceName, mode); - } catch (Exception e) { - log.error("Failed to get topics list for namespace {}/{}/{}", property, cluster, namespace, e); - throw new RestException(e); - } + pulsar().getNamespaceService().getListOfTopics(namespaceName, mode) + .thenAccept(topics -> { + asyncResponse.resume(topics); + }) + .exceptionally(ex -> { + log.error("Failed to get topics list for namespace {}", namespaceName, ex); + asyncResponse.resume(ex); + return null; + }); } @GET diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java index 96bc084..7667167 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java @@ -129,6 +129,7 @@ public class NonPersistentTopics extends PersistentTopics { try { boolean topicExist = pulsar().getNamespaceService() .getListOfTopics(topicName.getNamespaceObject(), PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL) + .join() .contains(topicName.toString()); if (topicExist) { log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index 435892e..bd5ff0b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -18,6 +18,11 @@ */ package org.apache.pulsar.broker.admin.v2; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; + import java.util.List; import java.util.Map; import java.util.Set; @@ -49,17 +54,12 @@ import org.apache.pulsar.common.policies.data.DispatchRate; import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.RetentionPolicies; +import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy; import org.apache.pulsar.common.policies.data.SubscribeRate; import org.apache.pulsar.common.policies.data.SubscriptionAuthMode; -import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.swagger.annotations.Api; -import io.swagger.annotations.ApiOperation; -import io.swagger.annotations.ApiResponse; -import io.swagger.annotations.ApiResponses; - @Path("/namespaces") @Produces(MediaType.APPLICATION_JSON) @Consumes(MediaType.APPLICATION_JSON) @@ -80,21 +80,25 @@ public class Namespaces extends NamespacesBase { @ApiOperation(value = "Get the list of all the topics under a certain namespace.", response = String.class, responseContainer = "Set") @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") }) - public List<String> getTopics(@PathParam("tenant") String tenant, + public void getTopics(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace, - @QueryParam("mode") @DefaultValue("PERSISTENT") Mode mode) { + @QueryParam("mode") @DefaultValue("PERSISTENT") Mode mode, + @Suspended AsyncResponse asyncResponse) { validateAdminAccessForTenant(tenant); validateNamespaceName(tenant, namespace); // Validate that namespace exists, throws 404 if it doesn't exist getNamespacePolicies(namespaceName); - try { - return pulsar().getNamespaceService().getListOfTopics(namespaceName, mode); - } catch (Exception e) { - log.error("Failed to get topics list for namespace {}", namespaceName, e); - throw new RestException(e); - } + pulsar().getNamespaceService().getListOfTopics(namespaceName, mode) + .thenAccept(topics -> { + asyncResponse.resume(topics); + }) + .exceptionally(ex -> { + log.error("Failed to get topics list for namespace {}", namespaceName, ex); + asyncResponse.resume(ex); + return null; + }); } @GET diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java index 10dc5ee..9bc65e3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java @@ -174,6 +174,7 @@ public class NonPersistentTopics extends PersistentTopics { try { boolean topicExist = pulsar().getNamespaceService() .getListOfTopics(topicName.getNamespaceObject(), PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL) + .join() .contains(topicName.toString()); if (topicExist) { log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 5fbc20f..83e3ade 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -21,6 +21,8 @@ package org.apache.pulsar.broker.namespace; import com.google.common.collect.Lists; import com.google.common.hash.Hashing; import io.netty.channel.EventLoopGroup; + +import org.apache.commons.collections4.ListUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.PulsarServerException; @@ -54,7 +56,6 @@ import org.apache.pulsar.common.policies.data.BundlesData; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.LocalPolicies; import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus; -import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies; import org.apache.pulsar.common.util.Codec; import org.apache.pulsar.common.util.ObjectMapperFactory; @@ -68,6 +69,7 @@ import org.slf4j.LoggerFactory; import java.net.URI; import java.net.URL; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -863,14 +865,15 @@ public class NamespaceService { return getBundle(topicName); } - public List<String> getFullListOfTopics(NamespaceName namespaceName) throws Exception { - List<String> topics = getListOfPersistentTopics(namespaceName); - topics.addAll(getListOfNonPersistentTopics(namespaceName)); - return topics; + public CompletableFuture<List<String>> getFullListOfTopics(NamespaceName namespaceName) { + return getListOfPersistentTopics(namespaceName) + .thenCombine(getListOfNonPersistentTopics(namespaceName), + (persistentTopics, nonPersistentTopics) -> { + return ListUtils.union(persistentTopics, nonPersistentTopics); + }); } - public List<String> getListOfTopics(NamespaceName namespaceName, Mode mode) - throws Exception { + public CompletableFuture<List<String>> getListOfTopics(NamespaceName namespaceName, Mode mode) { switch (mode) { case ALL: return getFullListOfTopics(namespaceName); @@ -882,30 +885,26 @@ public class NamespaceService { } } - public List<String> getListOfPersistentTopics(NamespaceName namespaceName) throws Exception { - List<String> topics = Lists.newArrayList(); - + public CompletableFuture<List<String>> getListOfPersistentTopics(NamespaceName namespaceName) { // For every topic there will be a managed ledger created. - try { - String path = String.format("/managed-ledgers/%s/persistent", namespaceName); - if (LOG.isDebugEnabled()) { - LOG.debug("Getting children from managed-ledgers now: {}", path); - } - - for (String topic : pulsar.getLocalZkCacheService().managedLedgerListCache().get(path)) { - topics.add(String.format("persistent://%s/%s", namespaceName, Codec.decode(topic))); - } - } catch (KeeperException.NoNodeException e) { - // NoNode means there are no persistent topics for this namespace + String path = String.format("/managed-ledgers/%s/persistent", namespaceName); + if (LOG.isDebugEnabled()) { + LOG.debug("Getting children from managed-ledgers now: {}", path); } - topics.sort(null); - return topics; - } + return pulsar.getLocalZkCacheService().managedLedgerListCache().getAsync(path) + .thenApply(znodes -> { + List<String> topics = Lists.newArrayList(); + for (String znode : znodes) { + topics.add(String.format("persistent://%s/%s", namespaceName, Codec.decode(znode))); + } - public List<String> getListOfNonPersistentTopics(NamespaceName namespaceName) throws Exception { - List<String> topics = Lists.newArrayList(); + topics.sort(null); + return topics; + }); + } + public CompletableFuture<List<String>> getListOfNonPersistentTopics(NamespaceName namespaceName) { ClusterData peerClusterData; try { peerClusterData = PulsarWebResource.checkLocalOrGetPeerReplicationCluster(pulsar, namespaceName) @@ -921,6 +920,7 @@ public class NamespaceService { } // Non-persistent topics don't have managed ledgers so we have to retrieve them from local cache. + List<String> topics = Lists.newArrayList(); synchronized (pulsar.getBrokerService().getMultiLayerTopicMap()) { if (pulsar.getBrokerService().getMultiLayerTopicMap().containsKey(namespaceName.toString())) { pulsar.getBrokerService().getMultiLayerTopicMap().get(namespaceName.toString()).values() @@ -935,13 +935,13 @@ public class NamespaceService { } topics.sort(null); - return topics; + return CompletableFuture.completedFuture(topics); } - private List<String> getNonPersistentTopicsFromPeerCluster(ClusterData peerClusterData, - NamespaceName namespace) throws Exception { + private CompletableFuture<List<String>> getNonPersistentTopicsFromPeerCluster(ClusterData peerClusterData, + NamespaceName namespace) { PulsarClientImpl client = getNamespaceClient(peerClusterData); - return client.getLookup().getTopicsUnderNamespace(namespace, Mode.NON_PERSISTENT).get(); + return client.getLookup().getTopicsUnderNamespace(namespace, Mode.NON_PERSISTENT); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index d91bd1a..1cab8e6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1297,27 +1297,27 @@ public class ServerCnx extends PulsarHandler { final long requestId = commandGetTopicsOfNamespace.getRequestId(); final String namespace = commandGetTopicsOfNamespace.getNamespace(); final CommandGetTopicsOfNamespace.Mode mode = commandGetTopicsOfNamespace.getMode(); + final NamespaceName namespaceName = NamespaceName.get(namespace); - try { - final NamespaceName namespaceName = NamespaceName.get(namespace); - - final List<String> topics = getBrokerService().pulsar().getNamespaceService() - .getListOfTopics(namespaceName, mode); + getBrokerService().pulsar().getNamespaceService().getListOfTopics(namespaceName, mode) + .thenAccept(topics -> { + if (log.isDebugEnabled()) { + log.debug("[{}] Received CommandGetTopicsOfNamespace for namespace [//{}] by {}, size:{}", + remoteAddress, namespace, requestId, topics.size()); + } - if (log.isDebugEnabled()) { - log.debug("[{}] Received CommandGetTopicsOfNamespace for namespace [//{}] by {}, size:{}", - remoteAddress, namespace, requestId, topics.size()); - } + ctx.writeAndFlush(Commands.newGetTopicsOfNamespaceResponse(topics, requestId)); + }) + .exceptionally(ex -> { + log.warn("[{}] Error GetTopicsOfNamespace for namespace [//{}] by {}", + remoteAddress, namespace, requestId); + ctx.writeAndFlush( + Commands.newError(requestId, + BrokerServiceException.getClientErrorCode(new ServerMetadataException(ex)), + ex.getMessage())); - ctx.writeAndFlush(Commands.newGetTopicsOfNamespaceResponse(topics, requestId)); - } catch (Exception e) { - log.warn("[{}] Error GetTopicsOfNamespace for namespace [//{}] by {}", - remoteAddress, namespace, requestId); - ctx.writeAndFlush( - Commands.newError(requestId, - BrokerServiceException.getClientErrorCode(new ServerMetadataException(e)), - e.getMessage())); - } + return null; + }); } @Override diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java index 1d33641..18cf12c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java @@ -27,6 +27,7 @@ import static org.testng.Assert.fail; import com.google.common.collect.Lists; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; import java.util.stream.IntStream; @@ -725,7 +726,8 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase { // seems no direct way to verify auto-unsubscribe, because this patternConsumer also referenced the topic. List<String> topicNames = Lists.newArrayList(topicName2); NamespaceService nss = pulsar.getNamespaceService(); - doReturn(topicNames).when(nss).getListOfPersistentTopics(NamespaceName.get("my-property/my-ns")); + doReturn(CompletableFuture.completedFuture(topicNames)).when(nss) + .getListOfPersistentTopics(NamespaceName.get("my-property/my-ns")); // 7. call recheckTopics to unsubscribe topic 1,3 , verify topics number: 2=6-1-3 log.debug("recheck topics change"); diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java index 8a7c3e7..671ef50 100644 --- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java +++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java @@ -115,7 +115,7 @@ public class GlobalZooKeeperCache extends ZooKeeperCache implements Closeable { // dataCache.synchronous().invalidateAll(); - childrenCache.invalidateAll(); + childrenCache.synchronous().invalidateAll(); return; default: break; diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperCache.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperCache.java index 3045a0b..79b9738 100644 --- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperCache.java +++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalZooKeeperCache.java @@ -51,7 +51,7 @@ public class LocalZooKeeperCache extends ZooKeeperCache { // in case of expired, the zkSession is no longer good LOG.warn("Lost connection from local ZK. Invalidating the whole cache."); dataCache.synchronous().invalidateAll(); - childrenCache.invalidateAll(); + childrenCache.synchronous().invalidateAll(); return; default: break; diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java index 6136481..eaab17b 100644 --- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java +++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperCache.java @@ -22,17 +22,17 @@ import static com.google.common.base.Preconditions.checkNotNull; import com.github.benmanes.caffeine.cache.AsyncLoadingCache; import com.github.benmanes.caffeine.cache.Caffeine; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; import com.google.common.collect.Sets; +import java.io.IOException; import java.nio.file.Paths; import java.util.AbstractMap.SimpleImmutableEntry; +import java.util.Collections; import java.util.Map.Entry; import java.util.Optional; import java.util.Set; -import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; @@ -41,6 +41,9 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.util.SafeRunnable; +import org.apache.pulsar.common.util.FutureUtil; +import org.apache.zookeeper.AsyncCallback.ChildrenCallback; +import org.apache.zookeeper.AsyncCallback.StatCallback; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.Code; import org.apache.zookeeper.KeeperException.NoNodeException; @@ -77,8 +80,8 @@ public abstract class ZooKeeperCache implements Watcher { public static final String ZK_CACHE_INSTANCE = "zk_cache_instance"; protected final AsyncLoadingCache<String, Entry<Object, Stat>> dataCache; - protected final Cache<String, Set<String>> childrenCache; - protected final Cache<String, Boolean> existsCache; + protected final AsyncLoadingCache<String, Set<String>> childrenCache; + protected final AsyncLoadingCache<String, Boolean> existsCache; private final OrderedExecutor executor; private final OrderedExecutor backgroundExecutor = OrderedExecutor.newBuilder().name("zk-cache-background").numThreads(2).build(); private boolean shouldShutdownExecutor; @@ -96,8 +99,10 @@ public abstract class ZooKeeperCache implements Watcher { this.dataCache = Caffeine.newBuilder().expireAfterWrite(5, TimeUnit.MINUTES) .buildAsync((key, executor1) -> null); - this.childrenCache = CacheBuilder.newBuilder().expireAfterWrite(5, TimeUnit.MINUTES).build(); - this.existsCache = CacheBuilder.newBuilder().expireAfterWrite(5, TimeUnit.MINUTES).build(); + this.childrenCache = Caffeine.newBuilder().expireAfterWrite(5, TimeUnit.MINUTES) + .buildAsync((key, executor1) -> null); + this.existsCache = Caffeine.newBuilder().expireAfterWrite(5, TimeUnit.MINUTES) + .buildAsync((key, executor1) -> null); } public ZooKeeperCache(ZooKeeper zkSession, int zkOperationTimeoutSeconds) { @@ -114,14 +119,14 @@ public abstract class ZooKeeperCache implements Watcher { final String path = event.getPath(); if (path != null) { dataCache.synchronous().invalidate(path); - childrenCache.invalidate(path); + childrenCache.synchronous().invalidate(path); // sometimes zk triggers one watch per zk-session and if zkDataCache and ZkChildrenCache points to this // ZookeeperCache instance then ZkChildrenCache may not invalidate for it's parent. Therefore, invalidate // cache for parent if child is created/deleted if (event.getType().equals(EventType.NodeCreated) || event.getType().equals(EventType.NodeDeleted)) { - childrenCache.invalidate(Paths.get(path).getParent().toString()); + childrenCache.synchronous().invalidate(Paths.get(path).getParent().toString()); } - existsCache.invalidate(path); + existsCache.synchronous().invalidate(path); if (executor != null && updater != null) { if (LOG.isDebugEnabled()) { LOG.debug("Submitting reload cache task to the executor for path: {}, updater: {}", path, updater); @@ -152,7 +157,7 @@ public abstract class ZooKeeperCache implements Watcher { } private void invalidateAllExists() { - existsCache.invalidateAll(); + existsCache.synchronous().invalidateAll(); } public void invalidateAllData() { @@ -160,7 +165,7 @@ public abstract class ZooKeeperCache implements Watcher { } public void invalidateAllChildren() { - childrenCache.invalidateAll(); + childrenCache.synchronous().invalidateAll(); } public void invalidateData(String path) { @@ -168,11 +173,11 @@ public abstract class ZooKeeperCache implements Watcher { } public void invalidateChildren(String path) { - childrenCache.invalidate(path); + childrenCache.synchronous().invalidate(path); } private void invalidateExists(String path) { - existsCache.invalidate(path); + existsCache.synchronous().invalidate(path); } public void asyncInvalidate(String path) { @@ -203,20 +208,30 @@ public abstract class ZooKeeperCache implements Watcher { } private boolean exists(final String path, Watcher watcher) throws KeeperException, InterruptedException { - try { - return existsCache.get(path, () -> zkSession.get().exists(path, watcher) != null); - } catch (ExecutionException e) { - Throwable cause = e.getCause(); - if (cause instanceof KeeperException) { - throw (KeeperException) cause; - } else if (cause instanceof InterruptedException) { - throw (InterruptedException) cause; - } else if (cause instanceof RuntimeException) { - throw (RuntimeException) cause; - } else { - throw new RuntimeException(cause); + return existsAsync(path, watcher).join(); + } + + @SuppressWarnings("deprecation") + public CompletableFuture<Boolean> existsAsync(String path, Watcher watcher) { + return existsCache.get(path, (p, executor) -> { + ZooKeeper zk = zkSession.get(); + if (zk == null) { + return FutureUtil.failedFuture(new IOException("ZK session not ready")); } - } + + CompletableFuture<Boolean> future = new CompletableFuture<>(); + zk.exists(path, watcher, (StatCallback) (rc, path1, ctx, stat) -> { + if (rc == Code.OK.intValue()) { + future.complete(true); + } else if (rc == Code.NONODE.intValue()) { + future.complete(false); + } else { + future.completeExceptionally(KeeperException.create(rc)); + } + }, null); + + return future; + }); } /** @@ -365,7 +380,15 @@ public abstract class ZooKeeperCache implements Watcher { * @throws InterruptedException */ public Set<String> getChildren(final String path) throws KeeperException, InterruptedException { - return getChildren(path, this); + try { + return getChildrenAsync(path, this).join(); + } catch (CompletionException e) { + if (e.getCause() instanceof KeeperException) { + throw (KeeperException)e.getCause(); + } else { + throw e; + } + } } /** @@ -375,35 +398,50 @@ public abstract class ZooKeeperCache implements Watcher { * @param path * @param watcher * @return - * @throws KeeperException - * @throws InterruptedException */ - public Set<String> getChildren(final String path, final Watcher watcher) - throws KeeperException, InterruptedException { - try { - return childrenCache.get(path, () -> { - LOG.debug("Fetching children at {}", path); - return Sets.newTreeSet(checkNotNull(zkSession.get()).getChildren(path, watcher)); - }); - } catch (ExecutionException e) { - Throwable cause = e.getCause(); - // The node we want may not exist yet, so put a watcher on its existance - // before throwing up the exception. Its possible that the node could have - // been created after the call to getChildren, but before the call to exists(). - // If this is the case, exists will return true, and we just call getChildren again. - if (cause instanceof KeeperException.NoNodeException - && exists(path, watcher)) { - return getChildren(path, watcher); - } else if (cause instanceof KeeperException) { - throw (KeeperException) cause; - } else if (cause instanceof InterruptedException) { - throw (InterruptedException) cause; - } else if (cause instanceof RuntimeException) { - throw (RuntimeException) cause; - } else { - throw new RuntimeException(cause); - } - } + @SuppressWarnings("deprecation") + public CompletableFuture<Set<String>> getChildrenAsync(String path, Watcher watcher) { + return childrenCache.get(path, (p, executor) -> { + CompletableFuture<Set<String>> future = new CompletableFuture<>(); + executor.execute(SafeRunnable.safeRun(() -> { + ZooKeeper zk = zkSession.get(); + if (zk == null) { + future.completeExceptionally(new IOException("ZK session not ready")); + return; + } + + zk.getChildren(path, watcher, (ChildrenCallback) (rc, path1, ctx, children) -> { + if (rc == Code.OK.intValue()) { + future.complete(Sets.newTreeSet(children)); + } else if (rc == Code.NONODE.intValue()) { + // The node we want may not exist yet, so put a watcher on its existence + // before throwing up the exception. Its possible that the node could have + // been created after the call to getChildren, but before the call to exists(). + // If this is the case, exists will return true, and we just call getChildren again. + existsAsync(path, watcher).thenAccept(exists -> { + if (exists) { + getChildrenAsync(path, watcher) + .thenAccept(c -> future.complete(c)) + .exceptionally(ex -> { + future.completeExceptionally(ex); + return null; + }); + } else { + // Z-node does not exist + future.complete(Collections.emptySet()); + } + }).exceptionally(ex -> { + future.completeExceptionally(ex); + return null; + }); + } else { + future.completeExceptionally(KeeperException.create(rc)); + } + }, null); + })); + + return future; + }); } @SuppressWarnings("unchecked") @@ -412,7 +450,12 @@ public abstract class ZooKeeperCache implements Watcher { } public Set<String> getChildrenIfPresent(String path) { - return childrenCache.getIfPresent(path); + CompletableFuture<Set<String>> future = childrenCache.getIfPresent(path); + if (future != null && future.isDone() && !future.isCompletedExceptionally()) { + return future.getNow(null); + } else { + return null; + } } @Override @@ -422,9 +465,9 @@ public abstract class ZooKeeperCache implements Watcher { } public void invalidateRoot(String root) { - for (String key : childrenCache.asMap().keySet()) { + for (String key : childrenCache.synchronous().asMap().keySet()) { if (key.startsWith(root)) { - childrenCache.invalidate(key); + childrenCache.synchronous().invalidate(key); } } } diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperChildrenCache.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperChildrenCache.java index cc8352e..0bb6f46 100644 --- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperChildrenCache.java +++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperChildrenCache.java @@ -20,6 +20,8 @@ package org.apache.pulsar.zookeeper; import java.util.List; import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.pulsar.zookeeper.ZooKeeperCache.CacheUpdater; @@ -49,12 +51,24 @@ public class ZooKeeperChildrenCache implements Watcher, CacheUpdater<Set<String> } public Set<String> get() throws KeeperException, InterruptedException { - return cache.getChildren(path, this); + return get(this.path); } public Set<String> get(String path) throws KeeperException, InterruptedException { - LOG.debug("getChildren called at: {}", path); - return cache.getChildren(path, this); + if (LOG.isDebugEnabled()) { + LOG.debug("getChildren called at: {}", path); + } + + Set<String> children = cache.getChildrenAsync(path, this).join(); + if (children == null) { + throw KeeperException.create(KeeperException.Code.NONODE); + } + + return children; + } + + public CompletableFuture<Set<String>> getAsync(String path) { + return cache.getChildrenAsync(path, this); } public void clear() { @@ -67,18 +81,17 @@ public class ZooKeeperChildrenCache implements Watcher, CacheUpdater<Set<String> @Override public void reloadCache(final String path) { - try { - cache.invalidate(path); - Set<String> children = cache.getChildren(path, this); - LOG.info("reloadCache called in zookeeperChildrenCache for path {}", path); - for (ZooKeeperCacheListener<Set<String>> listener : listeners) { - listener.onUpdate(path, children, null); - } - } catch (KeeperException.NoNodeException nne) { - LOG.debug("Node [{}] does not exist", nne.getPath()); - } catch (Exception e) { - LOG.warn("Reloading ZooKeeperDataCache failed at path:{}", path); - } + cache.invalidate(path); + cache.getChildrenAsync(path, this) + .thenAccept(children -> { + LOG.info("reloadCache called in zookeeperChildrenCache for path {}", path); + for (ZooKeeperCacheListener<Set<String>> listener : listeners) { + listener.onUpdate(path, children, null); + } + }).exceptionally(ex -> { + LOG.warn("Reloading ZooKeeperDataCache failed at path:{}", path, ex); + return null; + }).join(); } @Override diff --git a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java index e25c1ad..834844a 100644 --- a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java +++ b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java @@ -25,6 +25,13 @@ import static org.testng.Assert.fail; import static org.testng.AssertJUnit.assertNotNull; import static org.testng.AssertJUnit.assertNull; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.MoreExecutors; + +import io.netty.util.concurrent.DefaultThreadFactory; + +import java.util.Collections; import java.util.Set; import java.util.TreeSet; import java.util.UUID; @@ -36,7 +43,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicInteger; import org.apache.bookkeeper.common.util.OrderedScheduler; -import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.Code; import org.apache.zookeeper.MockZooKeeper; import org.apache.zookeeper.WatchedEvent; @@ -50,12 +56,6 @@ import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; -import com.google.common.util.concurrent.MoreExecutors; - -import io.netty.util.concurrent.DefaultThreadFactory; - @Test public class ZookeeperCacheTest { private MockZooKeeper zkClient; @@ -194,12 +194,7 @@ public class ZookeeperCacheTest { cache.unregisterListener(counter); assertEquals(notificationCount.get(), 0); - try { - cache.get(); - fail("Expect this to fail"); - } catch (KeeperException.NoNodeException nne) { - // correct - } + assertEquals(cache.get(), Collections.emptySet()); zkClient.create("/test", new byte[0], null, null); zkClient.create("/test/z1", new byte[0], null, null); @@ -231,8 +226,6 @@ public class ZookeeperCacheTest { } catch (Exception e) { // Ok } - - assertEquals(notificationCount.get(), (recvNotifications + 1)); } @Test(timeOut = 10000)