keith-turner commented on code in PR #5256:
URL: https://github.com/apache/accumulo/pull/5256#discussion_r1928931295


##########
core/src/main/java/org/apache/accumulo/core/zookeeper/ZooCache.java:
##########
@@ -143,65 +141,107 @@ public void process(WatchedEvent event) {
               clear();
               break;
             default:
-              log.warn("{} Unhandled {}", cacheId, event);
+              log.warn("{} Unhandled state {}", cacheId, event);
               break;
           }
           break;
         default:
-          log.warn("{} Unhandled {}", cacheId, event);
+          log.warn("{} Unhandled event type {}", cacheId, event);
           break;
       }
 
-      externalWatcher.ifPresent(w -> w.accept(event));
+      externalWatchers.forEach(ew -> ew.accept(event));
     }
   }
 
   /**
-   * Creates a new cache without an external watcher.
+   * Creates a ZooCache instance that uses the supplied ZooSession for 
communicating with the
+   * instance's ZooKeeper servers. The ZooCache will create persistent 
watchers at the given
+   * pathsToWatch, if any, to be updated when changes are made in ZooKeeper 
for nodes at or below in
+   * the tree. If ZooCacheWatcher's are added via {@code addZooCacheWatcher}, 
then they will be
+   * notified when this object is notified of changes via the 
PersistentWatcher callback.
    *
-   * @param zk the ZooKeeper instance
-   * @throws NullPointerException if zk is {@code null}
+   * @param zk ZooSession for this instance
+   * @param pathsToWatch Paths in ZooKeeper to watch
    */
-  public ZooCache(ZooSession zk) {
-    this(zk, Optional.empty(), Duration.ofMinutes(3));
+  public ZooCache(ZooSession zk, Set<String> pathsToWatch) {
+    this(zk, pathsToWatch, Ticker.systemTicker());
+  }
+
+  // visible for tests that use a Ticker
+  public ZooCache(ZooSession zk, Set<String> pathsToWatch, Ticker ticker) {
+    this.zk = requireNonNull(zk);
+    this.zkClientTracker.set(this.getZKClientObjectVersion());
+    this.cache = 
Caches.getInstance().createNewBuilder(Caches.CacheName.ZOO_CACHE, false)
+        
.ticker(requireNonNull(ticker)).expireAfterAccess(CACHE_DURATION).build();
+    // The concurrent map returned by Caffeine will only allow one thread to 
run at a time for a
+    // given key and ZooCache relies on that. Not all concurrent map 
implementations have this
+    // behavior for their compute functions.
+    this.nodeCache = cache.asMap();
+    this.watchedPaths = Collections.unmodifiableNavigableSet(new 
TreeSet<>(pathsToWatch));
+    setupWatchers();
+    log.trace("{} created new cache", cacheId, new Exception());
+  }
+
+  public void addZooCacheWatcher(ZooCacheWatcher watcher) {
+    externalWatchers.add(requireNonNull(watcher));
+  }
+
+  // visible for tests
+  long getZKClientObjectVersion() {
+    return zk.getConnectionCounter();
   }
 
   /**
-   * Creates a new cache. The given watcher is called whenever a watched node 
changes.
-   *
-   * @param zk the ZooKeeper instance
-   * @param watcher watcher object
-   * @throws NullPointerException if zk or watcher is {@code null}
+   * @return true if ZK has changed; false otherwise
    */
-  public ZooCache(ZooSession zk, ZooCacheWatcher watcher) {
-    this(zk, Optional.of(watcher), Duration.ofMinutes(3));
+  private boolean handleZKConnectionChange() {
+    final long currentCount = getZKClientObjectVersion();
+    final long oldCount = zkClientTracker.get();
+    if (oldCount != currentCount) {
+      if (zkClientTracker.compareAndSet(oldCount, currentCount)) {
+        setupWatchers();
+      }
+      return true;
+    }
+    return false;
   }
 
-  public ZooCache(ZooSession zk, Optional<ZooCacheWatcher> watcher, Duration 
timeout) {
-    this.zk = requireNonNull(zk);
-    this.externalWatcher = watcher;
-    RemovalListener<String,ZcNode> removalListerner = (path, zcNode, reason) 
-> {
-      try {
-        log.trace("{} removing watches for {} because {} accesses {}", 
cacheId, path, reason,
-            zcNode == null ? -1 : zcNode.getAccessCount());
-        zk.removeWatches(path, ZooCache.this.watcher, Watcher.WatcherType.Any, 
false);
-      } catch (InterruptedException | KeeperException | RuntimeException e) {
-        log.warn("{} failed to remove watches on path {} in zookeeper", 
cacheId, path, e);
+  // Called on construction and when ZooKeeper connection changes
+  synchronized void setupWatchers() {
+
+    for (String left : watchedPaths) {
+      for (String right : watchedPaths) {
+        if (!left.equals(right) && left.contains(right)) {
+          throw new IllegalArgumentException(
+              "Overlapping paths found in paths to watch. left: " + left + ", 
right: " + right);
+        }
       }
-    };
-    // Must register the removal listener using evictionListener inorder for 
removal to be mutually
-    // exclusive with any other operations on the same path. This is important 
for watcher
-    // consistency, concurrently adding and removing watches for the same path 
would leave zoocache
-    // in a really bad state. The cache builder has another way to register a 
removal listener that
-    // is not mutually exclusive.
-    Cache<String,ZcNode> cache =
-        Caches.getInstance().createNewBuilder(Caches.CacheName.ZOO_CACHE, 
false)
-            
.expireAfterAccess(timeout).evictionListener(removalListerner).build();
-    nodeCache = cache.asMap();
-    log.trace("{} created new cache", cacheId, new Exception());
+    }
+
+    try {
+      zk.addPersistentRecursiveWatchers(watchedPaths, watcher);
+      clear();

Review Comment:
   ```suggestion
         clear();
         log.trace("{} Reinitialized persistent watchers and cleared cache {}", cacheId, getZKClientObjectVersion());
   ```



##########
core/src/main/java/org/apache/accumulo/core/zookeeper/ZooCache.java:
##########
@@ -143,65 +141,107 @@ public void process(WatchedEvent event) {
               clear();
               break;
             default:
-              log.warn("{} Unhandled {}", cacheId, event);
+              log.warn("{} Unhandled state {}", cacheId, event);
               break;
           }
           break;
         default:
-          log.warn("{} Unhandled {}", cacheId, event);
+          log.warn("{} Unhandled event type {}", cacheId, event);
           break;
       }
 
-      externalWatcher.ifPresent(w -> w.accept(event));
+      externalWatchers.forEach(ew -> ew.accept(event));
     }
   }
 
   /**
-   * Creates a new cache without an external watcher.
+   * Creates a ZooCache instance that uses the supplied ZooSession for 
communicating with the
+   * instance's ZooKeeper servers. The ZooCache will create persistent 
watchers at the given
+   * pathsToWatch, if any, to be updated when changes are made in ZooKeeper 
for nodes at or below in
+   * the tree. If ZooCacheWatcher's are added via {@code addZooCacheWatcher}, 
then they will be
+   * notified when this object is notified of changes via the 
PersistentWatcher callback.
    *
-   * @param zk the ZooKeeper instance
-   * @throws NullPointerException if zk is {@code null}
+   * @param zk ZooSession for this instance
+   * @param pathsToWatch Paths in ZooKeeper to watch
    */
-  public ZooCache(ZooSession zk) {
-    this(zk, Optional.empty(), Duration.ofMinutes(3));
+  public ZooCache(ZooSession zk, Set<String> pathsToWatch) {
+    this(zk, pathsToWatch, Ticker.systemTicker());
+  }
+
+  // visible for tests that use a Ticker
+  public ZooCache(ZooSession zk, Set<String> pathsToWatch, Ticker ticker) {
+    this.zk = requireNonNull(zk);
+    this.zkClientTracker.set(this.getZKClientObjectVersion());
+    this.cache = 
Caches.getInstance().createNewBuilder(Caches.CacheName.ZOO_CACHE, false)
+        
.ticker(requireNonNull(ticker)).expireAfterAccess(CACHE_DURATION).build();
+    // The concurrent map returned by Caffeine will only allow one thread to 
run at a time for a
+    // given key and ZooCache relies on that. Not all concurrent map 
implementations have this
+    // behavior for their compute functions.
+    this.nodeCache = cache.asMap();
+    this.watchedPaths = Collections.unmodifiableNavigableSet(new 
TreeSet<>(pathsToWatch));
+    setupWatchers();
+    log.trace("{} created new cache", cacheId, new Exception());

Review Comment:
   ```suggestion
       log.trace("{} created new cache watching {}", cacheId, pathsToWatch, new Exception());
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to