[ARIES-1774] Switch to watching all endpoints in zookeeper to be able to pass tck
Project: http://git-wip-us.apache.org/repos/asf/aries-rsa/repo Commit: http://git-wip-us.apache.org/repos/asf/aries-rsa/commit/ca922a42 Tree: http://git-wip-us.apache.org/repos/asf/aries-rsa/tree/ca922a42 Diff: http://git-wip-us.apache.org/repos/asf/aries-rsa/diff/ca922a42 Branch: refs/heads/master Commit: ca922a42e0705150d39c6f6955d647f7b1c3ad39 Parents: f07ee8b Author: Christian Schneider <cschn...@adobe.com> Authored: Thu Feb 8 09:30:22 2018 +0100 Committer: Christian Schneider <cschn...@adobe.com> Committed: Thu Feb 8 17:40:34 2018 +0100 ---------------------------------------------------------------------- .../discovery/zookeeper/ZooKeeperDiscovery.java | 3 +- .../repository/ZookeeperEndpointRepository.java | 162 ++++++----- .../subscribe/EndpointListenerTracker.java | 16 +- .../zookeeper/subscribe/InterfaceMonitor.java | 266 ------------------- .../subscribe/InterfaceMonitorManager.java | 213 ++++----------- .../ZookeeperEndpointRepositoryTest.java | 25 +- .../subscribe/InterfaceMonitorManagerTest.java | 71 ++--- .../subscribe/InterfaceMonitorTest.java | 71 ----- 8 files changed, 198 insertions(+), 629 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/ca922a42/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java ---------------------------------------------------------------------- diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java index d265a22..50d9598 100644 --- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java +++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java @@ -98,7 +98,8 @@ public class ZooKeeperDiscovery implements Watcher, ManagedService { repository = new ZookeeperEndpointRepository(zkClient); endpointListener = new PublishingEndpointListener(repository); endpointListener.start(bctx); - imManager = new InterfaceMonitorManager(bctx, zkClient); + imManager = new InterfaceMonitorManager(repository); + repository.addListener(imManager); endpointListenerTracker = new EndpointListenerTracker(bctx, imManager); endpointListenerTracker.open(); started = true; http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/ca922a42/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepository.java ---------------------------------------------------------------------- diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepository.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepository.java index 2349c45..c5c03a4 100644 --- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepository.java +++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepository.java @@ -15,6 +15,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import org.apache.aries.rsa.discovery.endpoint.EndpointDescriptionParser; +import org.apache.aries.rsa.discovery.zookeeper.subscribe.InterfaceMonitorManager; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.NoNodeException; @@ -53,33 +54,11 @@ public class ZookeeperEndpointRepository implements Closeable, Watcher { } catch (Exception e) { throw new IllegalStateException("Unable to create base path"); } - // Not yet needed - //this.registerWatcher(); + this.registerWatcher(); } - private void registerWatcher() { - try { - List<String> children = zk.getChildren(ZookeeperEndpointRepository.PATH_PREFIX, this); - System.out.println(children); - } catch (KeeperException e) { - e.printStackTrace(); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - - protected void notifyListener(WatchedEvent wevent) { - EndpointDescription ep = read(wevent.getPath()); - if (ep != null) { - int type = getEndpointEventType(wevent); - EndpointEvent event = new EndpointEvent(type, ep); - listener.endpointChanged(event, null); - } - } - - private int getEndpointEventType(WatchedEvent wevent) { - EventType type = wevent.getType(); - return EndpointEvent.ADDED; + public void addListener(EndpointEventListener listener) { + this.listener = listener; } /** @@ -89,23 +68,7 @@ public class ZookeeperEndpointRepository implements Closeable, Watcher { * @return endpoint found in the node or null if no endpoint was found */ public EndpointDescription read(String path) { - try { - Stat stat = zk.exists(path, false); - if (stat == null || stat.getDataLength() <= 0) { - return null; - } - byte[] data = zk.getData(path, false, null); - LOG.debug("Got data for node: {}", path); - - EndpointDescription endpoint = parser.readEndpoint(new ByteArrayInputStream(data)); - if (endpoint != null) { - return endpoint; - } - LOG.warn("No Discovery information found for node: {}", path); - } catch (Exception e) { - LOG.error("Problem getting EndpointDescription from node " + path, e); - } - return null; + return nodes.get(path); } public void add(EndpointDescription endpoint) throws URISyntaxException, KeeperException, @@ -113,6 +76,8 @@ public class ZookeeperEndpointRepository implements Closeable, Watcher { Collection<String> interfaces = endpoint.getInterfaces(); String endpointKey = getKey(endpoint); + createEphemeralNode(ZookeeperEndpointRepository.getZooKeeperPath("") + endpointKey, getData(endpoint)); + LOG.info("Exporting endpoint to zookeeper: {}", endpoint); for (String name : interfaces) { String path = ZookeeperEndpointRepository.getZooKeeperPath(name); @@ -152,8 +117,42 @@ public class ZookeeperEndpointRepository implements Closeable, Watcher { } } - public List<EndpointDescription> getAll() throws KeeperException, InterruptedException { - return null; + public Collection<EndpointDescription> getAll() { + return nodes.values(); + } + + /** + * Removes nulls and empty strings from the given string array. + * + * @param strings an array of strings + * @return a new array containing the non-null and non-empty + * elements of the original array in the same order + */ + public static List<String> removeEmpty(List<String> strings) { + List<String> result = new ArrayList<String>(); + if (strings == null) { + return result; + } + for (String s : strings) { + if (s != null && !s.isEmpty()) { + result.add(s); + } + } + return result; + } + + public static String getZooKeeperPath(String name) { + return name == null || name.isEmpty() ? PATH_PREFIX : PATH_PREFIX + '/' + name.replace('.', '/'); + } + + @Override + public void process(WatchedEvent event) { + LOG.info("Received event {}", event); + if (event.getType() == EventType.NodeDeleted) { + handleRemoved(event.getPath()); + return; + } + watchRecursive(event.getPath()); } @Override @@ -161,6 +160,28 @@ public class ZookeeperEndpointRepository implements Closeable, Watcher { } + private void registerWatcher() { + try { + watchRecursive(ZookeeperEndpointRepository.PATH_PREFIX); + } catch (Exception e) { + LOG.info(e.getMessage(), e); + } + } + + private void watchRecursive(String path) { + LOG.info("Watching {}", path); + handleZNodeChanged(path); + try { + List<String> children = zk.getChildren(path, this); + for (String child : children) { + String childPath = (path.endsWith("/") ? path : path + "/") + child; + watchRecursive(childPath); + } + } catch (Exception e) { + LOG.info(e.getMessage(), e); + } + } + private byte[] getData(EndpointDescription epd) { ByteArrayOutputStream bos = new ByteArrayOutputStream(); parser.writeEndpoint(epd, bos); @@ -201,39 +222,38 @@ public class ZookeeperEndpointRepository implements Closeable, Watcher { } } - /** - * Removes nulls and empty strings from the given string array. - * - * @param strings an array of strings - * @return a new array containing the non-null and non-empty - * elements of the original array in the same order - */ - public static List<String> removeEmpty(List<String> strings) { - List<String> result = new ArrayList<String>(); - if (strings == null) { - return result; - } - for (String s : strings) { - if (s != null && !s.isEmpty()) { - result.add(s); - } - } - return result; - } - - public static String getZooKeeperPath(String name) { - return name == null || name.isEmpty() ? PATH_PREFIX : PATH_PREFIX + '/' + name.replace('.', '/'); - } - private static String getKey(EndpointDescription endpoint) throws URISyntaxException { URI uri = new URI(endpoint.getId()); return new StringBuilder().append(uri.getHost()).append("#").append(uri.getPort()) .append("#").append(uri.getPath().replace('/', '#')).toString(); } - @Override - public void process(WatchedEvent event) { - + private void handleZNodeChanged(String path) { + try { + Stat stat = new Stat(); + byte[] data = zk.getData(path, false, stat); + if (data == null || data.length == 0) { + return; + } + EndpointDescription endpoint = parser.readEndpoint(new ByteArrayInputStream(data)); + if (endpoint != null) { + handleChanged(path, endpoint); + } + } catch (Exception e) { + LOG.info(e.getMessage(), e); + } + } + + private void handleRemoved(String path) { + EndpointDescription endpoint = nodes.remove(path); + EndpointEvent event = new EndpointEvent(EndpointEvent.REMOVED, endpoint); + listener.endpointChanged(event, null); + } + + private void handleChanged(String path, EndpointDescription endpoint) { + EndpointDescription old = nodes.put(path, endpoint); + EndpointEvent event = new EndpointEvent(old == null ? EndpointEvent.ADDED : EndpointEvent.MODIFIED, endpoint); + listener.endpointChanged(event, null); } } http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/ca922a42/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/EndpointListenerTracker.java ---------------------------------------------------------------------- diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/EndpointListenerTracker.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/EndpointListenerTracker.java index 0ed1097..d4805d0 100644 --- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/EndpointListenerTracker.java +++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/EndpointListenerTracker.java @@ -51,21 +51,23 @@ public class EndpointListenerTracker extends ServiceTracker { } @Override - public Object addingService(ServiceReference endpointListener) { - imManager.addInterest(endpointListener); - return null; + public Object addingService(ServiceReference sref) { + Object epListener = super.addingService(sref); + imManager.addInterest(sref, epListener); + return epListener; } @Override - public void modifiedService(ServiceReference endpointListener, Object service) { + public void modifiedService(ServiceReference sref, Object epListener) { // called when an EndpointListener updates its service properties, // e.g. when its interest scope is expanded/reduced - imManager.addInterest(endpointListener); + imManager.addInterest(sref, epListener); } @Override - public void removedService(ServiceReference endpointListener, Object service) { - imManager.removeInterest(endpointListener); + public void removedService(ServiceReference sref, Object epListener) { + imManager.removeInterest(sref); + super.removedService(sref, epListener); } } http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/ca922a42/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java ---------------------------------------------------------------------- diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java deleted file mode 100644 index 2c90b3c..0000000 --- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitor.java +++ /dev/null @@ -1,266 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.aries.rsa.discovery.zookeeper.subscribe; - -import java.io.ByteArrayInputStream; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.aries.rsa.discovery.endpoint.EndpointDescriptionParser; -import org.apache.aries.rsa.discovery.zookeeper.repository.ZookeeperEndpointRepository; -import org.apache.zookeeper.AsyncCallback.StatCallback; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.KeeperException.Code; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.data.Stat; -import org.osgi.service.remoteserviceadmin.EndpointDescription; -import org.osgi.service.remoteserviceadmin.EndpointEvent; -import org.osgi.service.remoteserviceadmin.EndpointEventListener; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Monitors ZooKeeper for changes in published endpoints. - * <p> - * Specifically, it monitors the node path associated with a given interface class, - * whose data is a serialized version of an EndpointDescription, and notifies an - * EndpointListener when changes are detected (which can then propagate the - * notification to other EndpointListeners with a matching scope). - * <p> - * Note that the EndpointListener is used here as a decoupling interface for - * convenience, and is not necessarily used according to its documented contract. - */ -public class InterfaceMonitor implements Watcher, StatCallback { - - private static final Logger LOG = LoggerFactory.getLogger(InterfaceMonitor.class); - - private final String znode; - private final ZooKeeper zk; - private final EndpointEventListener endpointListener; - private final boolean recursive; - private volatile boolean closed; - - // This map reference changes, so don't synchronize on it - private Map<String, EndpointDescription> nodes = new HashMap<String, EndpointDescription>(); - - private EndpointDescriptionParser parser; - - public InterfaceMonitor(ZooKeeper zk, String objClass, EndpointEventListener endpointListener, String scope) { - this.zk = zk; - this.znode = ZookeeperEndpointRepository.getZooKeeperPath(objClass); - this.recursive = objClass == null || objClass.isEmpty(); - this.endpointListener = endpointListener; - this.parser = new EndpointDescriptionParser(); - LOG.debug("Creating new InterfaceMonitor {} for scope [{}] and objectClass [{}]", - new Object[] {recursive ? "(recursive)" : "", scope, objClass}); - } - - /** - * Returns all endpoints that are currently known to this monitor. - * - * @return all endpoints that are currently known to this monitor - */ - public synchronized List<EndpointDescription> getEndpoints() { - return new ArrayList<EndpointDescription>(nodes.values()); - } - - public void start() { - watch(); - } - - private void watch() { - LOG.debug("registering a ZooKeeper.exists({}) callback", znode); - zk.exists(znode, this, this, null); - zk.getData(znode, this, new DataCallback() { - - @Override - public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { - processDelta(); - } - }, null); - } - - /** - * Zookeeper Watcher interface callback. - */ - public void process(WatchedEvent event) { - LOG.debug("ZooKeeper watcher callback on node {} for event {}", znode, event); - processDelta(); - } - - /** - * Zookeeper StatCallback interface callback. - */ - @SuppressWarnings("deprecation") - public void processResult(int rc, String path, Object ctx, Stat stat) { - LOG.debug("ZooKeeper callback on node: {} code: {}", znode, rc); - - switch (rc) { - case Code.Ok: - case Code.NoNode: - processDelta(); - return; - - case Code.SessionExpired: - case Code.NoAuth: - case Code.ConnectionLoss: - return; - - default: - watch(); - } - } - - private void processDelta() { - if (closed) { - return; - } - - if (zk.getState() != ZooKeeper.States.CONNECTED) { - LOG.debug("ZooKeeper connection was already closed! Not processing changed event."); - return; - } - - try { - if (zk.exists(znode, false) != null) { - zk.getChildren(znode, this); - refreshNodes(); - } else { - LOG.debug("znode {} doesn't exist -> not processing any changes", znode); - } - } catch (Exception e) { - if (zk.getState() != ZooKeeper.States.CONNECTED) { - LOG.debug("Error getting Zookeeper data: " + e); // e.g. session expired, handled by ZooKeeperDiscovery - } else { - LOG.error("Error getting ZooKeeper data.", e); - } - } - } - - public synchronized void close() { - closed = true; - for (EndpointDescription endpoint : nodes.values()) { - EndpointEvent event = new EndpointEvent(EndpointEvent.REMOVED, endpoint); - endpointListener.endpointChanged(event, null); - } - nodes.clear(); - } - - private synchronized void refreshNodes() { - if (closed) { - return; - } - LOG.info("Processing change on node: {}", znode); - - Map<String, EndpointDescription> newNodes = new HashMap<String, EndpointDescription>(); - Map<String, EndpointDescription> prevNodes = new HashMap<String, EndpointDescription>(nodes); - processChildren(znode, newNodes, prevNodes); - - // whatever is left in prevNodes now has been removed from Discovery - LOG.debug("processChildren done. Nodes that are missing now and need to be removed: {}", prevNodes.values()); - for (EndpointDescription endpoint : prevNodes.values()) { - EndpointEvent event = new EndpointEvent(EndpointEvent.REMOVED, endpoint); - endpointListener.endpointChanged(event, null); - } - nodes = newNodes; - } - - /** - * Iterates through all child nodes of the given node and tries to find - * endpoints. If the recursive flag is set it also traverses into the child - * nodes. - * - * @return true if an endpoint was found and if the node therefore needs to - * be monitored for changes - */ - private boolean processChildren(String zn, Map<String, EndpointDescription> newNodes, - Map<String, EndpointDescription> prevNodes) { - List<String> children; - try { - LOG.debug("Processing the children of {}", zn); - children = zk.getChildren(zn, false); - - boolean foundANode = false; - for (String child : children) { - String childZNode = zn + '/' + child; - EndpointDescription endpoint = getEndpointDescriptionFromNode(childZNode); - if (endpoint != null) { - EndpointDescription prevEndpoint = prevNodes.get(child); - - newNodes.put(child, endpoint); - prevNodes.remove(child); - foundANode = true; - LOG.debug("Properties: {}", endpoint.getProperties()); - if (prevEndpoint == null) { - // This guy is new - LOG.info("found new node " + zn + "/[" + child + "] ( []->child ) props: " - + endpoint.getProperties().values()); - EndpointEvent event = new EndpointEvent(EndpointEvent.ADDED, endpoint); - endpointListener.endpointChanged(event, null); - } else if (!prevEndpoint.getProperties().equals(endpoint.getProperties())) { - LOG.info("Found changed node " + zn + "/[" + child + "] ( []->child ) props: " - + endpoint.getProperties().values()); - EndpointEvent event = new EndpointEvent(EndpointEvent.MODIFIED, endpoint); - endpointListener.endpointChanged(event, null); - } - } - if (recursive && processChildren(childZNode, newNodes, prevNodes)) { - zk.getChildren(childZNode, this); - } - } - - return foundANode; - } catch (KeeperException e) { - LOG.error("Problem processing ZooKeeper node", e); - } catch (InterruptedException e) { - LOG.error("Problem processing ZooKeeper node", e); - } - return false; - } - - /** - * Retrieves data from the given node and parses it into an EndpointDescription. - * - * @param node a node path - * @return endpoint found in the node or null if no endpoint was found - */ - private EndpointDescription getEndpointDescriptionFromNode(String node) { - try { - Stat stat = zk.exists(node, false); - if (stat == null || stat.getDataLength() <= 0) { - return null; - } - byte[] data = zk.getData(node, false, null); - LOG.debug("Got data for node: {}", node); - - EndpointDescription endpoint = parser.readEndpoint(new ByteArrayInputStream(data)); - if (endpoint != null) { - return endpoint; - } - LOG.warn("No Discovery information found for node: {}", node); - } catch (Exception e) { - LOG.error("Problem getting EndpointDescription from node " + node, e); - } - return null; - } -} http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/ca922a42/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java ---------------------------------------------------------------------- diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java index 0aa98b3..7b5c7d2 100644 --- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java +++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManager.java @@ -18,23 +18,13 @@ */ package org.apache.aries.rsa.discovery.zookeeper.subscribe; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Dictionary; import java.util.HashMap; -import java.util.Hashtable; import java.util.List; import java.util.Map; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import org.apache.aries.rsa.discovery.zookeeper.ZooKeeperDiscovery; import org.apache.aries.rsa.discovery.zookeeper.repository.ZookeeperEndpointRepository; import org.apache.aries.rsa.util.StringPlus; -import org.apache.zookeeper.ZooKeeper; -import org.osgi.framework.BundleContext; -import org.osgi.framework.Filter; import org.osgi.framework.ServiceReference; import org.osgi.service.remoteserviceadmin.EndpointDescription; import org.osgi.service.remoteserviceadmin.EndpointEvent; @@ -50,165 +40,95 @@ import org.slf4j.LoggerFactory; * These events are then forwarded to all interested EndpointEventListeners. */ @SuppressWarnings({"deprecation", "rawtypes"}) -public class InterfaceMonitorManager { +public class InterfaceMonitorManager implements EndpointEventListener { private static final Logger LOG = LoggerFactory.getLogger(InterfaceMonitorManager.class); - private static final Pattern OBJECTCLASS_PATTERN = Pattern.compile(".*\\(objectClass=([^)]+)\\).*"); - private final BundleContext bctx; - private final ZooKeeper zk; - // map of EndpointEventListeners and the scopes they are interested in - private final Map<ServiceReference, List<String>> epListenerScopes = - new HashMap<ServiceReference, List<String>>(); - // map of scopes and their interest data - private final Map<String, Interest> interests = new HashMap<String, Interest>(); + private final ZookeeperEndpointRepository repository; + private final Map<ServiceReference, Interest> interests = new HashMap<ServiceReference, Interest>(); protected static class Interest { - List<ServiceReference> epListeners = - new CopyOnWriteArrayList<ServiceReference>(); - InterfaceMonitor monitor; + List<String> scopes; + Object epListener; } - public InterfaceMonitorManager(BundleContext bctx, ZooKeeper zk) { - this.bctx = bctx; - this.zk = zk; + public InterfaceMonitorManager(ZookeeperEndpointRepository repository) { + this.repository = repository; } - public void addInterest(ServiceReference<?> eplistener) { - if (isOurOwnEndpointEventListener(eplistener)) { + public void addInterest(ServiceReference<?> sref, Object epListener) { + if (isOurOwnEndpointEventListener(sref)) { LOG.debug("Skipping our own EndpointEventListener"); return; } - List<String> scopes = getScopes(eplistener); + List<String> scopes = getScopes(sref); LOG.debug("adding Interests: {}", scopes); - for (String scope : scopes) { - String objClass = getObjectClass(scope); - addInterest(eplistener, scope, objClass); - } - } - - private static boolean isOurOwnEndpointEventListener(ServiceReference<?> EndpointEventListener) { - return Boolean.parseBoolean(String.valueOf( - EndpointEventListener.getProperty(ZooKeeperDiscovery.DISCOVERY_ZOOKEEPER_ID))); - } - - public synchronized void addInterest(ServiceReference epListener, - String scope, String objClass) { // get or create interest for given scope and add listener to it - Interest interest = interests.get(scope); + Interest interest = interests.get(epListener); if (interest == null) { // create interest, add listener and start monitor interest = new Interest(); - interests.put(scope, interest); - interest.epListeners.add(epListener); // add it before monitor starts so we don't miss events - interest.monitor = createInterfaceMonitor(scope, objClass, interest); - interest.monitor.start(); - } else { - // interest already exists, so just add listener to it - if (!interest.epListeners.contains(epListener)) { - interest.epListeners.add(epListener); - } - // notify listener of all known endpoints for given scope - // (as EndpointEventListener contract requires of all added/modified listeners) - for (EndpointDescription endpoint : interest.monitor.getEndpoints()) { - EndpointEvent event = new EndpointEvent(EndpointEvent.ADDED, endpoint); - notifyListeners(event, scope, Arrays.asList(epListener)); - } - } - - // add scope to listener's scopes list - List<String> scopes = epListenerScopes.get(epListener); - if (scopes == null) { - scopes = new ArrayList<String>(1); - epListenerScopes.put(epListener, scopes); - } - if (!scopes.contains(scope)) { - scopes.add(scope); + interest.epListener = epListener; + interest.scopes = scopes; + interests.put(sref, interest); + sendExistingEndpoints(scopes, epListener); } } - public synchronized void removeInterest(ServiceReference<EndpointEventListener> EndpointEventListener) { - LOG.info("removing EndpointEventListener interests: {}", EndpointEventListener); - List<String> scopes = epListenerScopes.get(EndpointEventListener); - if (scopes == null) { - return; + private void sendExistingEndpoints(List<String> scopes, Object epListener) { + for (EndpointDescription endpoint : repository.getAll()) { + EndpointEvent event = new EndpointEvent(EndpointEvent.ADDED, endpoint); + notifyListener(event, scopes, epListener); } + } - for (String scope : scopes) { - Interest interest = interests.get(scope); - if (interest != null) { - interest.epListeners.remove(EndpointEventListener); - if (interest.epListeners.isEmpty()) { - interest.monitor.close(); - interests.remove(scope); - } - } - } - epListenerScopes.remove(EndpointEventListener); + private static boolean isOurOwnEndpointEventListener(ServiceReference<?> EndpointEventListener) { + return Boolean.parseBoolean(String.valueOf( + EndpointEventListener.getProperty(ZooKeeperDiscovery.DISCOVERY_ZOOKEEPER_ID))); } - protected InterfaceMonitor createInterfaceMonitor(final String scope, String objClass, final Interest interest) { - // holding this object's lock in the callbacks can lead to a deadlock with InterfaceMonitor - EndpointEventListener listener = new EndpointEventListener() { + public synchronized void removeInterest(ServiceReference<EndpointEventListener> epListenerRef) { + LOG.info("removing EndpointEventListener interests: {}", epListenerRef); + interests.remove(epListenerRef); + } - @Override - public void endpointChanged(EndpointEvent event, String filter) { - notifyListeners(event, scope, interest.epListeners); - } - }; - return new InterfaceMonitor(zk, objClass, listener, scope); + @Override + public void endpointChanged(EndpointEvent event, String filter) { + for (Interest interest : interests.values()) { + notifyListener(event, interest.scopes, interest.epListener); + } } - private void notifyListeners(EndpointEvent event, String currentScope, - List<ServiceReference> epListeners) { + private void notifyListener(EndpointEvent event, List<String> scopes, Object service) { EndpointDescription endpoint = event.getEndpoint(); - for (ServiceReference<?> epListenerRef : epListeners) { - if (epListenerRef.getBundle() == null) { - LOG.info("listening service was unregistered, ignoring"); - } - Object service = bctx.getService(epListenerRef); - LOG.trace("matching {} against {}", endpoint, currentScope); - if (matchFilter(bctx, currentScope, endpoint)) { - LOG.debug("Matched {} against {}", endpoint, currentScope); - try { - if (service instanceof EndpointEventListener) { - EndpointEventListener epeListener = (EndpointEventListener)service; - notifyListener(event, currentScope, epeListener); - } else if (service instanceof EndpointListener) { - EndpointListener epListener = (EndpointListener)service; - notifyListener(event, currentScope, epListener); - } - } finally { - if (service != null) { - bctx.ungetService(epListenerRef); - } - } - } + String currentScope = getFirstMatch(scopes, endpoint); + if (currentScope == null) { + return; } - } - - private static boolean matchFilter(BundleContext bctx, String filter, EndpointDescription endpoint) { - if (filter == null) { - return false; + LOG.debug("Matched {} against {}", endpoint, currentScope); + if (service instanceof EndpointEventListener) { + notifyEEListener(event, currentScope, (EndpointEventListener)service); + } else if (service instanceof EndpointListener) { + notifyEListener(event, currentScope, (EndpointListener)service); } + } - try { - Filter f = bctx.createFilter(filter); - Dictionary<String, Object> dict = new Hashtable<String, Object>(endpoint.getProperties()); - return f.match(dict); - } catch (Exception e) { - return false; + private String getFirstMatch(List<String> scopes, EndpointDescription endpoint) { + for (String scope : scopes) { + if (endpoint.matches(scope)) { + return scope; + } } + return null; } - - private void notifyListener(EndpointEvent event, String currentScope, EndpointEventListener listener) { + private void notifyEEListener(EndpointEvent event, String currentScope, EndpointEventListener listener) { EndpointDescription endpoint = event.getEndpoint(); LOG.info("Calling endpointchanged on class {} for filter {}, type {}, endpoint {} ", listener, currentScope, endpoint); listener.endpointChanged(event, currentScope); } - private void notifyListener(EndpointEvent event, String currentScope, EndpointListener listener) { + private void notifyEListener(EndpointEvent event, String currentScope, EndpointListener listener) { EndpointDescription endpoint = event.getEndpoint(); LOG.info("Calling old listener on class {} for filter {}, type {}, endpoint {} ", listener, currentScope, endpoint); switch (event.getType()) { @@ -228,49 +148,18 @@ public class InterfaceMonitorManager { } public synchronized void close() { - for (Interest interest : interests.values()) { - interest.monitor.close(); - } interests.clear(); - epListenerScopes.clear(); } /** * Only for test case! */ - protected synchronized Map<String, Interest> getInterests() { + protected synchronized Map<ServiceReference, Interest> getInterests() { return interests; } - /** - * Only for test case! - */ - protected synchronized Map<ServiceReference, List<String>> getEndpointListenerScopes() { - return epListenerScopes; - } - protected List<String> getScopes(ServiceReference<?> sref) { return StringPlus.normalize(sref.getProperty(EndpointEventListener.ENDPOINT_LISTENER_SCOPE)); } - - public static String getObjectClass(String scope) { - Matcher m = OBJECTCLASS_PATTERN.matcher(scope); - return m.matches() ? m.group(1) : null; - } - /** - * Returns a service's properties as a map. - * - * @param serviceReference a service reference - * @return the service's properties as a map - */ - public static Map<String, Object> getProperties(ServiceReference<?> serviceReference) { - String[] keys = serviceReference.getPropertyKeys(); - Map<String, Object> props = new HashMap<String, Object>(keys.length); - for (String key : keys) { - Object val = serviceReference.getProperty(key); - props.put(key, val); - } - return props; - } } http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/ca922a42/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepositoryTest.java ---------------------------------------------------------------------- diff --git a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepositoryTest.java b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepositoryTest.java index 3a20f5a..d9f23e6 100644 --- a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepositoryTest.java +++ b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepositoryTest.java @@ -1,15 +1,22 @@ package org.apache.aries.rsa.discovery.zookeeper.repository; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.samePropertyValuesAs; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.URISyntaxException; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; @@ -32,6 +39,7 @@ public class ZookeeperEndpointRepositoryTest { private ZooKeeperServer server; private ZooKeeper zk; private ServerCnxnFactory factory; + private List<EndpointEvent> events = new ArrayList<>(); @Before public void before() throws IOException, InterruptedException, KeeperException { @@ -62,25 +70,38 @@ public class ZookeeperEndpointRepositoryTest { @After public void after() throws InterruptedException { - zk.close(); + //zk.close(); // Seems to cause SessionTimeout error factory.shutdown(); } @Test public void test() throws IOException, URISyntaxException, KeeperException, InterruptedException { + final Semaphore sem = new Semaphore(0); EndpointEventListener listener = new EndpointEventListener() { @Override public void endpointChanged(EndpointEvent event, String filter) { - System.out.println(event); + events.add(event); + sem.release(); } }; ZookeeperEndpointRepository repository = new ZookeeperEndpointRepository(zk, listener); + EndpointDescription endpoint = createEndpoint(); repository.add(endpoint); + assertThat(sem.tryAcquire(1000, TimeUnit.SECONDS), equalTo(true)); + String path = "/osgi/service_registry/java/lang/Runnable/test.de#-1##service1"; EndpointDescription ep2 = repository.read(path); + assertNotNull(ep2); + + repository.remove(endpoint); + + assertThat(sem.tryAcquire(1000, TimeUnit.SECONDS), equalTo(true)); + assertThat(events.get(0), samePropertyValuesAs(new EndpointEvent(EndpointEvent.ADDED, endpoint))); + assertThat(events.get(1), samePropertyValuesAs(new EndpointEvent(EndpointEvent.REMOVED, endpoint))); + repository.close(); } http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/ca922a42/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManagerTest.java ---------------------------------------------------------------------- diff --git a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManagerTest.java b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManagerTest.java index 84eca09..41b0795 100644 --- a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManagerTest.java +++ b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManagerTest.java @@ -18,95 +18,68 @@ */ package org.apache.aries.rsa.discovery.zookeeper.subscribe; -import static org.easymock.EasyMock.getCurrentArguments; +import static org.easymock.EasyMock.expect; import static org.junit.Assert.assertEquals; -import java.util.Collections; -import java.util.Dictionary; -import java.util.Hashtable; +import java.util.ArrayList; import java.util.List; -import org.apache.zookeeper.ZooKeeper; +import org.apache.aries.rsa.discovery.zookeeper.ZooKeeperDiscovery; +import org.apache.aries.rsa.discovery.zookeeper.repository.ZookeeperEndpointRepository; import org.easymock.EasyMock; -import org.easymock.IAnswer; import org.easymock.IMocksControl; import org.junit.Test; -import org.osgi.framework.BundleContext; -import org.osgi.framework.Constants; import org.osgi.framework.ServiceReference; +import org.osgi.service.remoteserviceadmin.EndpointDescription; import org.osgi.service.remoteserviceadmin.EndpointEventListener; public class InterfaceMonitorManagerTest { @Test public void testEndpointListenerTrackerCustomizer() { - IMocksControl c = EasyMock.createNiceControl(); - BundleContext ctx = c.createMock(BundleContext.class); - ServiceReference<EndpointEventListener> sref = createService(c, "(objectClass=mine)", "mine"); - ServiceReference<EndpointEventListener> sref2 = createService(c, "(objectClass=mine)", "mine"); - ZooKeeper zk = c.createMock(ZooKeeper.class); - InterfaceMonitorManager eltc = new InterfaceMonitorManager(ctx, zk); + IMocksControl c = EasyMock.createControl(); + ServiceReference<EndpointEventListener> sref = createService(c, "(objectClass=mine)"); + ServiceReference<EndpointEventListener> sref2 = createService(c, "(objectClass=mine)"); + ZookeeperEndpointRepository repository = c.createMock(ZookeeperEndpointRepository.class); + List<EndpointDescription> endpoints = new ArrayList<>(); + expect(repository.getAll()).andReturn(endpoints).atLeastOnce(); + EndpointEventListener epListener1 = c.createMock(EndpointEventListener.class); + EndpointEventListener epListener2 = c.createMock(EndpointEventListener.class); c.replay(); + InterfaceMonitorManager eltc = new InterfaceMonitorManager(repository); // sref has no scope -> nothing should happen - assertEquals(0, eltc.getEndpointListenerScopes().size()); assertEquals(0, eltc.getInterests().size()); - eltc.addInterest(sref); - assertScopeIncludes(sref, eltc); - assertEquals(1, eltc.getEndpointListenerScopes().size()); - assertEquals(1, eltc.getInterests().size()); - eltc.addInterest(sref); - assertScopeIncludes(sref, eltc); - assertEquals(1, eltc.getEndpointListenerScopes().size()); + eltc.addInterest(sref, epListener1); assertEquals(1, eltc.getInterests().size()); - eltc.addInterest(sref2); - assertScopeIncludes(sref, eltc); - assertScopeIncludes(sref2, eltc); - assertEquals(2, eltc.getEndpointListenerScopes().size()); + eltc.addInterest(sref, epListener1); assertEquals(1, eltc.getInterests().size()); + eltc.addInterest(sref2, epListener2); + assertEquals(2, eltc.getInterests().size()); + eltc.removeInterest(sref); - assertScopeIncludes(sref2, eltc); - assertEquals(1, eltc.getEndpointListenerScopes().size()); assertEquals(1, eltc.getInterests().size()); eltc.removeInterest(sref); - assertScopeIncludes(sref2, eltc); - assertEquals(1, eltc.getEndpointListenerScopes().size()); assertEquals(1, eltc.getInterests().size()); eltc.removeInterest(sref2); - assertEquals(0, eltc.getEndpointListenerScopes().size()); assertEquals(0, eltc.getInterests().size()); c.verify(); } @SuppressWarnings("unchecked") - private ServiceReference<EndpointEventListener> createService(IMocksControl c, String scope, String objectClass) { + private ServiceReference<EndpointEventListener> createService(IMocksControl c, String scope) { ServiceReference<EndpointEventListener> sref = c.createMock(ServiceReference.class); - final Dictionary<String, String> props = new Hashtable<>(); - props.put(EndpointEventListener.ENDPOINT_LISTENER_SCOPE, scope); - props.put(Constants.OBJECTCLASS, objectClass); - String[] keys = Collections.list(props.keys()).toArray(new String[]{}); - EasyMock.expect(sref.getPropertyKeys()).andReturn(keys).anyTimes(); - EasyMock.expect(sref.getProperty((String)EasyMock.anyObject())).andAnswer(new IAnswer<Object>() { - public Object answer() throws Throwable { - return props.get(getCurrentArguments()[0]); - } - }).anyTimes(); + expect(sref.getProperty(EndpointEventListener.ENDPOINT_LISTENER_SCOPE)).andReturn(scope).atLeastOnce(); + expect(sref.getProperty(ZooKeeperDiscovery.DISCOVERY_ZOOKEEPER_ID)).andReturn(null).atLeastOnce(); return sref; } - private void assertScopeIncludes(ServiceReference<EndpointEventListener> sref, InterfaceMonitorManager imm) { - List<String> srefScope = imm.getEndpointListenerScopes().get(sref); - assertEquals(1, srefScope.size()); - assertEquals("(objectClass=mine)", srefScope.get(0)); - - } - } http://git-wip-us.apache.org/repos/asf/aries-rsa/blob/ca922a42/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorTest.java ---------------------------------------------------------------------- diff --git a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorTest.java b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorTest.java deleted file mode 100644 index e09cfbf..0000000 --- a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorTest.java +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.aries.rsa.discovery.zookeeper.subscribe; - -import static org.easymock.EasyMock.eq; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.expectLastCall; - -import java.util.Collections; - -import org.apache.aries.rsa.discovery.zookeeper.repository.ZookeeperEndpointRepository; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher.Event.EventType; -import org.apache.zookeeper.Watcher.Event.KeeperState; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.AsyncCallback.DataCallback; -import org.apache.zookeeper.data.Stat; -import org.easymock.EasyMock; -import org.easymock.IMocksControl; -import org.osgi.service.remoteserviceadmin.EndpointEventListener; - -import junit.framework.TestCase; - -public class InterfaceMonitorTest extends TestCase { - - public void testInterfaceMonitor() throws KeeperException, InterruptedException { - IMocksControl c = EasyMock.createControl(); - - ZooKeeper zk = c.createMock(ZooKeeper.class); - expect(zk.getState()).andReturn(ZooKeeper.States.CONNECTED).anyTimes(); - - String scope = "(myProp=test)"; - String interf = "es.schaaf.test"; - String node = ZookeeperEndpointRepository.getZooKeeperPath(interf); - - EndpointEventListener endpointListener = c.createMock(EndpointEventListener.class); - InterfaceMonitor im = new InterfaceMonitor(zk, interf, endpointListener, scope); - zk.exists(eq(node), eq(im), eq(im), EasyMock.anyObject()); - EasyMock.expectLastCall().once(); - zk.getData(eq(node), eq(im), EasyMock.anyObject(DataCallback.class), EasyMock.anyObject()); - expectLastCall(); - - expect(zk.exists(eq(node), eq(false))).andReturn(new Stat()).anyTimes(); - expect(zk.getChildren(eq(node), eq(false))).andReturn(Collections.<String> emptyList()).once(); - expect(zk.getChildren(eq(node), eq(im))).andReturn(Collections.<String> emptyList()).once(); - - c.replay(); - im.start(); - // simulate a zk callback - WatchedEvent we = new WatchedEvent(EventType.NodeCreated, KeeperState.SyncConnected, node); - im.process(we); - c.verify(); - } -}