dlmarion commented on code in PR #5256: URL: https://github.com/apache/accumulo/pull/5256#discussion_r1928890082
########## core/src/main/java/org/apache/accumulo/core/zookeeper/ZooCache.java: ########## @@ -143,65 +141,106 @@ public void process(WatchedEvent event) { clear(); break; default: - log.warn("{} Unhandled {}", cacheId, event); + log.warn("{} Unhandled state {}", cacheId, event); break; } break; default: - log.warn("{} Unhandled {}", cacheId, event); + log.warn("{} Unhandled event type {}", cacheId, event); break; } - externalWatcher.ifPresent(w -> w.accept(event)); + externalWatchers.forEach(ew -> ew.accept(event)); } } /** - * Creates a new cache without an external watcher. + * Creates a ZooCache instance that uses the supplied ZooSession for communicating with the + * instance's ZooKeeper servers. The ZooCache will create persistent watchers at the given + * pathsToWatch, if any, to be updated when changes are made in ZooKeeper for nodes at or below in + * the tree. If ZooCacheWatcher's are added via {@code addZooCacheWatcher}, then they will be + * notified when this object is notified of changes via the PersistentWatcher callback. * - * @param zk the ZooKeeper instance - * @throws NullPointerException if zk is {@code null} + * @param zk ZooSession for this instance + * @param pathsToWatch Paths in ZooKeeper to watch */ - public ZooCache(ZooSession zk) { - this(zk, Optional.empty(), Duration.ofMinutes(3)); + public ZooCache(ZooSession zk, Set<String> pathsToWatch) { + this(zk, pathsToWatch, Ticker.systemTicker()); } - /** - * Creates a new cache. The given watcher is called whenever a watched node changes. 
- * - * @param zk the ZooKeeper instance - * @param watcher watcher object - * @throws NullPointerException if zk or watcher is {@code null} - */ - public ZooCache(ZooSession zk, ZooCacheWatcher watcher) { - this(zk, Optional.of(watcher), Duration.ofMinutes(3)); + // visible for tests that use a Ticker + public ZooCache(ZooSession zk, Set<String> pathsToWatch, Ticker ticker) { + this.zk = requireNonNull(zk); + this.zkClientTracker.set(this.getZKClientObjectVersion()); + this.cache = Caches.getInstance().createNewBuilder(Caches.CacheName.ZOO_CACHE, false) + .ticker(requireNonNull(ticker)).expireAfterAccess(CACHE_DURATION).build(); + // The concurrent map returned by Caffiene will only allow one thread to run at a time for a + // given key and ZooCache relies on that. Not all concurrent map implementations have this + // behavior for their compute functions. + this.nodeCache = cache.asMap(); + setupWatchers(pathsToWatch); + log.trace("{} created new cache", cacheId, new Exception()); } - public ZooCache(ZooSession zk, Optional<ZooCacheWatcher> watcher, Duration timeout) { - this.zk = requireNonNull(zk); - this.externalWatcher = watcher; - RemovalListener<String,ZcNode> removalListerner = (path, zcNode, reason) -> { - try { - log.trace("{} removing watches for {} because {} accesses {}", cacheId, path, reason, - zcNode == null ? -1 : zcNode.getAccessCount()); - zk.removeWatches(path, ZooCache.this.watcher, Watcher.WatcherType.Any, false); - } catch (InterruptedException | KeeperException | RuntimeException e) { - log.warn("{} failed to remove watches on path {} in zookeeper", cacheId, path, e); + public void addZooCacheWatcher(ZooCacheWatcher watcher) { + externalWatchers.add(requireNonNull(watcher)); + } + + // visible for tests + long getZKClientObjectVersion() { + return zk.getConnectionCounter(); + } + + private boolean handleZKConnectionChange() { Review Comment: Added in a481f9b -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org