keith-turner commented on code in PR #5256:
URL: https://github.com/apache/accumulo/pull/5256#discussion_r1927223793
##########
core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java:
##########
@@ -119,6 +124,7 @@ private static void digestAuth(ZooKeeper zoo, String secret) {
   private final String sessionName;
   private final int timeout;
   private final ZooReaderWriter zrw;
+  private final Map<String,Watcher> persistentWatcherPaths = new HashMap<>();

Review Comment:
   It's possible that different ZooCaches or other future code add the same path w/ different Watcher instances. W/ this Map the last one to add wins, removing the previous additions. Maybe could track the added watches in the following way?
   ```suggestion
     private static class AddedWatches {
       private final Set<String> paths;
       private final Watcher watcher;
     }

     private final List<AddedWatches> persistentWatcherPaths = new ArrayList<>();
   ```

   Could also do something like the following maybe; however, this makes assumptions about hashCode and equals on the watcher that may not be valid.
   ```suggestion
     private final Map<Watcher,List<String>> persistentWatcherPaths = new HashMap<>();
   ```

##########
core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java:
##########
@@ -290,9 +318,19 @@ public void sync(final String path, VoidCallback cb, Object ctx) {
     verifyConnected().sync(path, cb, ctx);
   }

+  public void addPersistentRecursiveWatchers(Set<String> paths, Watcher watcher)
+      throws KeeperException, InterruptedException {
+    for (String path : paths) {
+      verifyConnected().addWatch(path, watcher, AddWatchMode.PERSISTENT_RECURSIVE);
+      persistentWatcherPaths.put(path, watcher);
+      log.debug("Added persistent recursive watcher at {}", path);
+    }
+  }

Review Comment:
   During this loop the zookeepers could potentially change multiple times, and the function could run concurrently w/ reconnect. That makes it harder to reason about these changes; could sync the method to prevent it from running at the same time as reconnect.
   ```suggestion
     public synchronized void addPersistentRecursiveWatchers(Set<String> paths, Watcher watcher)
         throws KeeperException, InterruptedException {
       // this is not really needed if the method is synchronized, just changed it for code clarity.
       var zk = verifyConnected();
       for (String path : paths) {
         zk.addWatch(path, watcher, AddWatchMode.PERSISTENT_RECURSIVE);
         persistentWatcherPaths.put(path, watcher);
         log.debug("Added persistent recursive watcher at {}", path);
       }
     }
   ```
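For concreteness, the `AddedWatches` idea from the first comment above could be fleshed out roughly as follows. This is only a sketch; the class name, the constructor, the defensive copy, and the `record`/`forEachRecorded` helpers are assumptions for illustration, not code from the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.function.BiConsumer;
import org.apache.zookeeper.Watcher;

// Sketch of tracking every (paths, watcher) registration instead of using a Map
// keyed by path, so two callers adding the same path w/ different Watcher
// instances no longer overwrite each other.
class WatchTrackingSketch {

  private static class AddedWatches {
    private final Set<String> paths;
    private final Watcher watcher;

    AddedWatches(Set<String> paths, Watcher watcher) {
      this.paths = Set.copyOf(paths); // defensive copy keeps the entry immutable
      this.watcher = watcher;
    }
  }

  private final List<AddedWatches> addedWatches = new ArrayList<>();

  synchronized void record(Set<String> paths, Watcher watcher) {
    addedWatches.add(new AddedWatches(paths, watcher));
  }

  // on reconnect, every recorded registration can be replayed against the new ZooKeeper
  synchronized void forEachRecorded(BiConsumer<Set<String>,Watcher> action) {
    for (AddedWatches aw : addedWatches) {
      action.accept(aw.paths, aw.watcher);
    }
  }
}
```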
##########
core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java:
##########
@@ -188,6 +194,28 @@ private synchronized ZooKeeper reconnect() {
           digestAuth(zk, instanceSecret);
         }
         tryAgain = false;
+        if (!persistentWatcherPaths.isEmpty()) {
+          // We need to wait until the connection is alive, else we run into
+          // a case where addPersistentRecursiveWatchers calls verifyConnected
+          // which calls reconnect.
+          do {
+            UtilWaitThread.sleep(100);
+          } while (!zk.getState().isAlive());
+          for (Entry<String,Watcher> entry : persistentWatcherPaths.entrySet()) {
+            try {
+              addPersistentRecursiveWatchers(Set.of(entry.getKey()), entry.getValue());
+            } catch (KeeperException e) {
+              log.error("Error setting persistent recursive watcher at " + entry.getKey(), e);
+              tryAgain = true;
+              break;
+            } catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
+              log.error("Interrupted setting persistent recursive watcher at " + entry.getKey(),
+                  e);
+              tryAgain = true;
+            }
+          }

Review Comment:
   Somehow, after successfully adding new watches, the cache must be cleared to avoid a situation like the following from causing problems.

   1. ZooCache is created with ZK1
   2. Something bad happens to ZK1 and it becomes non-functional
   3. The process idles for X seconds, during which time many changes are made on zookeeper servers that the process is completely unaware of.
   4. Eventually ZK2 is created and watches are added.
   5. The changes that happened during step 3 were not cleared from the cache and may never be cleared.

   Always clearing the cache after successfully adding all watches to a single ZK should avoid the problem above. This ensures that anything that happened while no watches were set will eventually be seen. Clearing the caches after all watches are set avoids race conditions; if the caches were cleared prior to re-adding watches, then there could be race conditions.

   The current code has handling to clear caches on events from ZK1, but these may happen before the watches are set. However, ZooSession has synchronization that may avoid problems w/ the clearing happening before the watches are set. So the current code may be correct, but I am having a hard time proving that to myself.

   It would be hacky, but I am wondering if we should go through all the watchers and send an event like Expired, Closed, etc. Or maybe we can create a new interface like the following

   ```java
   interface PersistentWatcher extends Watcher {
     void zookeeperChanged();
   }

   public void addPersistentRecursiveWatchers(Set<String> paths, PersistentWatcher watcher)
   ```

   then the reconnect method could call the above method after it changes zookeepers.
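For concreteness, here is a minimal sketch of how the proposed interface and the reconnect hook might fit together. The class, field, and method names are hypothetical; only the `PersistentWatcher` shape comes from the comment above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.zookeeper.Watcher;

// Hypothetical callback interface from the comment above: implementors still get
// normal watch events via process(), plus a notification once the underlying
// ZooKeeper instance has been replaced and all watches re-added.
interface PersistentWatcher extends Watcher {
  void zookeeperChanged();
}

class ZooSessionSketch {

  // assumes the same path-to-watcher tracking the diff uses, narrowed to the new type
  private final Map<String,PersistentWatcher> persistentWatcherPaths = new ConcurrentHashMap<>();

  // reconnect() would call this after every persistent watch was successfully
  // re-added on the new ZooKeeper, notifying each distinct watcher exactly once
  private void notifyWatchersAfterReconnect() {
    persistentWatcherPaths.values().stream().distinct()
        .forEach(PersistentWatcher::zookeeperChanged);
  }
}
```

A ZooCache registered this way could implement `zookeeperChanged()` as a full clear, which gives the "clear only after all watches are set" ordering argued for above.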
##########
core/src/main/java/org/apache/accumulo/core/zookeeper/ZooCache.java:
##########
@@ -59,68 +63,59 @@ public interface ZooCacheWatcher extends Consumer<WatchedEvent> {}

   private static final Logger log = LoggerFactory.getLogger(ZooCache.class);

-  private final ZCacheWatcher watcher = new ZCacheWatcher();
-  private final Optional<ZooCacheWatcher> externalWatcher;
+  protected volatile NavigableSet<String> watchedPaths =
+      Collections.unmodifiableNavigableSet(new TreeSet<>());
+  // visible for tests
+  protected final ZCacheWatcher watcher = new ZCacheWatcher();
+  private final List<ZooCacheWatcher> externalWatchers =
+      Collections.synchronizedList(new ArrayList<>());

   private static final AtomicLong nextCacheId = new AtomicLong(0);
   private final String cacheId = "ZC" + nextCacheId.incrementAndGet();

-  // The concurrent map returned by Caffiene will only allow one thread to run at a time for a given
-  // key and ZooCache relies on that. Not all concurrent map implementations have this behavior for
-  // their compute functions.
+  public static final Duration CACHE_DURATION = Duration.ofMinutes(30);
+
+  private final Cache<String,ZcNode> cache;
+  private final ConcurrentMap<String,ZcNode> nodeCache;

   private final ZooSession zk;

   private volatile boolean closed = false;

-  public static class ZcStat {
-    private long ephemeralOwner;
-    private long mzxid;
-
-    public ZcStat() {}
-
-    private ZcStat(Stat stat) {
-      this.ephemeralOwner = stat.getEphemeralOwner();
-      this.mzxid = stat.getMzxid();
-    }
-
-    public long getEphemeralOwner() {
-      return ephemeralOwner;
-    }
-
-    private void set(ZcStat cachedStat) {
-      this.ephemeralOwner = cachedStat.ephemeralOwner;
-      this.mzxid = cachedStat.mzxid;
-    }
-
-    @VisibleForTesting
-    public void setEphemeralOwner(long ephemeralOwner) {
-      this.ephemeralOwner = ephemeralOwner;
-    }
-
-    public long getMzxid() {
-      return mzxid;
-    }
-  }
-
   private final AtomicLong updateCount = new AtomicLong(0);

-  private class ZCacheWatcher implements Watcher {
+  class ZCacheWatcher implements Watcher {
     @Override
     public void process(WatchedEvent event) {
       if (log.isTraceEnabled()) {
         log.trace("{}: {}", cacheId, event);
       }

       switch (event.getType()) {
-        case NodeDataChanged:
         case NodeChildrenChanged:
-        case NodeCreated:
-        case NodeDeleted:
+          // According to documentation we should not receive this event.
+          // According to https://issues.apache.org/jira/browse/ZOOKEEPER-4475 we
+          // may receive this event (Fixed in 3.9.0)
+          break;
         case ChildWatchRemoved:
         case DataWatchRemoved:
-          remove(event.getPath());
+          // We don't need to do anything with the cache on these events.
+          break;
+        case NodeDataChanged:
+          log.trace("{} node data changed; clearing {}", cacheId, event.getPath());
+          clear(path -> path.equals(event.getPath()));
+          break;
+        case NodeCreated:
+        case NodeDeleted:
+          // With the Watcher being set at a higher level we need to remove
+          // the parent of the affected node and all of its children from the cache
+          // so that the parent and children node can be re-cached. If we only remove the
+          // affected node, then the cached children in the parent could be incorrect.
+          int lastSlash = event.getPath().lastIndexOf('/');
+          String parent = lastSlash == 0 ? "/" : event.getPath().substring(0, lastSlash);
+          log.trace("{} node created or deleted {}; clearing {}", cacheId, event.getPath(), parent);
+          clear((path) -> path.startsWith(parent));

Review Comment:
   A bit further down there is code that does nothing on a close event and has a comment about why. The reason in the comment is no longer valid, so the comment should be removed. Also, we should probably clear the cache on a closed event.
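For concreteness, a minimal sketch of what clearing on a closed session could look like; the standalone class and its `clear()` stub are illustrative stand-ins for ZooCache, not code from the PR.

```java
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;

// Sketch of reacting to the session state rather than the event type: a Closed or
// Expired session means the watches are gone, so cached data can no longer be trusted.
class StateHandlingSketch implements Watcher {

  @Override
  public void process(WatchedEvent event) {
    switch (event.getState()) {
      case Expired:
      case Closed:
        clear(); // drop everything and let it be re-fetched under fresh watches
        break;
      default:
        break;
    }
  }

  private void clear() {
    // stand-in for ZooCache's cache-clearing logic
  }
}
```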