keith-turner commented on code in PR #5256:
URL: https://github.com/apache/accumulo/pull/5256#discussion_r1927223793


##########
core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java:
##########
@@ -119,6 +124,7 @@ private static void digestAuth(ZooKeeper zoo, String secret) {
   private final String sessionName;
   private final int timeout;
   private final ZooReaderWriter zrw;
+  private final Map<String,Watcher> persistentWatcherPaths = new HashMap<>();

Review Comment:
   It's possible that different ZooCaches or other future code could add the same path w/ different Watcher instances. With this Map, the last one to add wins, removing the previous additions. Maybe we could track the added watches in the following way?
   
   ```suggestion
     private static class AddedWatches {
         private final Set<String> paths;
         private final Watcher watcher;
     }
     private final List<AddedWatches> persistentWatcherPaths = new ArrayList<>();
   ```
   
   Could also do something like the following maybe, however this makes assumptions about hashCode and equals on the watcher that may not be valid.
   
   ```suggestion
     private final Map<Watcher,List<String>> persistentWatcherPaths = new HashMap<>();
   ```
   
   
   

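   To make the first suggestion concrete, here is a minimal, self-contained sketch of the `AddedWatches` idea. The `Watcher` interface below is a stand-in for `org.apache.zookeeper.Watcher` (so the sketch compiles without ZooKeeper on the classpath), and `AddedWatchesDemo`/`countWatchesOn` are hypothetical names for illustration only:
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Set;
   
   public class AddedWatchesDemo {
   
     // Stand-in for org.apache.zookeeper.Watcher, just for this sketch.
     interface Watcher {
       void process(String event);
     }
   
     // One entry per addPersistentRecursiveWatchers() call, so two different
     // Watcher instances registered on the same path are both retained.
     static final class AddedWatches {
       final Set<String> paths;
       final Watcher watcher;
   
       AddedWatches(Set<String> paths, Watcher watcher) {
         this.paths = Set.copyOf(paths);
         this.watcher = watcher;
       }
     }
   
     static int countWatchesOn(List<AddedWatches> added, String path) {
       int count = 0;
       for (AddedWatches aw : added) {
         if (aw.paths.contains(path)) {
           count++;
         }
       }
       return count;
     }
   
     public static void main(String[] args) {
       List<AddedWatches> persistentWatcherPaths = new ArrayList<>();
       Watcher w1 = event -> {};
       Watcher w2 = event -> {};
       // Two callers watch the same path with different Watcher instances.
       persistentWatcherPaths.add(new AddedWatches(Set.of("/tables"), w1));
       persistentWatcherPaths.add(new AddedWatches(Set.of("/tables"), w2));
       // Unlike Map<String,Watcher>, both registrations survive and could be
       // replayed against a new ZooKeeper after reconnect.
       System.out.println(countWatchesOn(persistentWatcherPaths, "/tables")); // prints 2
     }
   }
   ```
   
   The point of the list-of-records shape is that registration order and duplicates are preserved, so reconnect can replay every registration exactly as it was made.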


##########
core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java:
##########
@@ -290,9 +318,19 @@ public void sync(final String path, VoidCallback cb, Object ctx) {
     verifyConnected().sync(path, cb, ctx);
   }
 
+  public void addPersistentRecursiveWatchers(Set<String> paths, Watcher watcher)
+      throws KeeperException, InterruptedException {
+    for (String path : paths) {
+      verifyConnected().addWatch(path, watcher, AddWatchMode.PERSISTENT_RECURSIVE);
+      persistentWatcherPaths.put(path, watcher);
+      log.debug("Added persistent recursive watcher at {}", path);
+    }
+  }

Review Comment:
   During this loop the zookeepers could potentially change multiple times, and the function could run concurrently w/ reconnect. That makes it harder to reason about these changes; we could synchronize the method to prevent it from running at the same time as reconnect.
   
   ```suggestion
     public synchronized void addPersistentRecursiveWatchers(Set<String> paths, Watcher watcher)
         throws KeeperException, InterruptedException {
       // this is not really needed if the method is synchronized, just changed it for code clarity.
       var zk = verifyConnected();
       for (String path : paths) {
         zk.addWatch(path, watcher, AddWatchMode.PERSISTENT_RECURSIVE);
         persistentWatcherPaths.put(path, watcher);
         log.debug("Added persistent recursive watcher at {}", path);
       }
     }
   ```



##########
core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java:
##########
@@ -188,6 +194,28 @@ private synchronized ZooKeeper reconnect() {
               digestAuth(zk, instanceSecret);
             }
             tryAgain = false;
+            if (!persistentWatcherPaths.isEmpty()) {
+              // We need to wait until the connection is alive, else we run into
+              // a case where addPersistentRecursiveWatchers calls verifyConnected
+              // which calls reconnect.
+              do {
+                UtilWaitThread.sleep(100);
+              } while (!zk.getState().isAlive());
+              for (Entry<String,Watcher> entry : persistentWatcherPaths.entrySet()) {
+                try {
+                  addPersistentRecursiveWatchers(Set.of(entry.getKey()), entry.getValue());
+                } catch (KeeperException e) {
+                  log.error("Error setting persistent recursive watcher at " + entry.getKey(), e);
+                  tryAgain = true;
+                  break;
+                } catch (InterruptedException e) {
+                  Thread.currentThread().interrupt();
+                  log.error("Interrupted setting persistent recursive watcher at " + entry.getKey(),
+                      e);
+                  tryAgain = true;
+                }
+              }

Review Comment:
   Somehow, after successfully adding new watches, the cache must be cleared to prevent a situation like the following from causing problems.
   
    1. Zoocache is created with ZK1
    2. Something bad happens to ZK1 and it becomes non-functional
    3. The process idles for X seconds, during which time many changes are made on zookeeper servers that the process is completely unaware of.
    4. Eventually ZK2 is created and watches are added.
    5. The changes that happened during step 3 were not cleared from the cache and may never be cleared.
   
   Always clearing the cache after successfully adding all watches to a single ZK should avoid the problem above. This ensures that anything that happened while no watches were set will eventually be seen. Clearing the caches after all watches are set avoids race conditions; if the caches were cleared prior to re-adding watches, then there could be race conditions.
   
   The current code has handling to clear caches on events from ZK1, but these may happen before the watches are set. However, ZooSession has synchronization that may avoid problems w/ the clearing happening before the watches are set. So the current code may be correct, but I am having a hard time proving that to myself.
   
   It would be hacky, but I am wondering if we should go through all the watchers and send an event like Expired, Closed, etc. Or maybe we can create a new interface like the following
   
   ```java
   interface PersistentWatcher extends Watcher {
      void zookeeperChanged();
   }
   
   public void addPersistentRecursiveWatchers(Set<String> paths, PersistentWatcher watcher)
   ```
    then the reconnect method could call zookeeperChanged() on each registered watcher after it changes zookeepers.
   
   
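   A minimal sketch of how that `PersistentWatcher` idea could work. The `Watcher` interface below is a stand-in for `org.apache.zookeeper.Watcher`, and `PersistentWatcherDemo`, `addPersistentRecursiveWatcher`, and `notifyZooKeeperChanged` are hypothetical names for illustration, not the real ZooSession API:
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.atomic.AtomicInteger;
   
   public class PersistentWatcherDemo {
   
     // Stand-in for org.apache.zookeeper.Watcher, just for this sketch.
     interface Watcher {
       void process(String event);
     }
   
     interface PersistentWatcher extends Watcher {
       // Called when the session switches to a different ZooKeeper instance,
       // signalling the owner (e.g. a ZooCache) to drop any derived state.
       void zookeeperChanged();
     }
   
     private final List<PersistentWatcher> persistentWatchers = new ArrayList<>();
   
     void addPersistentRecursiveWatcher(PersistentWatcher watcher) {
       persistentWatchers.add(watcher);
     }
   
     // What reconnect() could do after re-adding all watches to the new
     // ZooKeeper: tell every persistent watcher the underlying session changed,
     // so caches populated against the old session are cleared.
     void notifyZooKeeperChanged() {
       for (PersistentWatcher w : persistentWatchers) {
         w.zookeeperChanged();
       }
     }
   
     public static void main(String[] args) {
       PersistentWatcherDemo session = new PersistentWatcherDemo();
       AtomicInteger clears = new AtomicInteger();
       session.addPersistentRecursiveWatcher(new PersistentWatcher() {
         @Override
         public void process(String event) {}
   
         @Override
         public void zookeeperChanged() {
           clears.incrementAndGet(); // e.g. ZooCache would clear() here
         }
       });
       session.notifyZooKeeperChanged();
       System.out.println(clears.get()); // prints 1
     }
   }
   ```
   
   Notifying only after all watches are re-added keeps the ordering the comment above asks for: nothing is cleared until the new session is fully watched.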



##########
core/src/main/java/org/apache/accumulo/core/zookeeper/ZooCache.java:
##########
@@ -59,68 +63,59 @@ public interface ZooCacheWatcher extends Consumer<WatchedEvent> {}
 
   private static final Logger log = LoggerFactory.getLogger(ZooCache.class);
 
-  private final ZCacheWatcher watcher = new ZCacheWatcher();
-  private final Optional<ZooCacheWatcher> externalWatcher;
+  protected volatile NavigableSet<String> watchedPaths =
+      Collections.unmodifiableNavigableSet(new TreeSet<>());
+  // visible for tests
+  protected final ZCacheWatcher watcher = new ZCacheWatcher();
+  private final List<ZooCacheWatcher> externalWatchers =
+      Collections.synchronizedList(new ArrayList<>());
 
   private static final AtomicLong nextCacheId = new AtomicLong(0);
   private final String cacheId = "ZC" + nextCacheId.incrementAndGet();
 
-  // The concurrent map returned by Caffiene will only allow one thread to run at a time for a given
-  // key and ZooCache relies on that. Not all concurrent map implementations have this behavior for
-  // their compute functions.
+  public static final Duration CACHE_DURATION = Duration.ofMinutes(30);
+
+  private final Cache<String,ZcNode> cache;
+
   private final ConcurrentMap<String,ZcNode> nodeCache;
 
   private final ZooSession zk;
 
   private volatile boolean closed = false;
 
-  public static class ZcStat {
-    private long ephemeralOwner;
-    private long mzxid;
-
-    public ZcStat() {}
-
-    private ZcStat(Stat stat) {
-      this.ephemeralOwner = stat.getEphemeralOwner();
-      this.mzxid = stat.getMzxid();
-    }
-
-    public long getEphemeralOwner() {
-      return ephemeralOwner;
-    }
-
-    private void set(ZcStat cachedStat) {
-      this.ephemeralOwner = cachedStat.ephemeralOwner;
-      this.mzxid = cachedStat.mzxid;
-    }
-
-    @VisibleForTesting
-    public void setEphemeralOwner(long ephemeralOwner) {
-      this.ephemeralOwner = ephemeralOwner;
-    }
-
-    public long getMzxid() {
-      return mzxid;
-    }
-  }
-
   private final AtomicLong updateCount = new AtomicLong(0);
 
-  private class ZCacheWatcher implements Watcher {
+  class ZCacheWatcher implements Watcher {
     @Override
     public void process(WatchedEvent event) {
       if (log.isTraceEnabled()) {
         log.trace("{}: {}", cacheId, event);
       }
 
       switch (event.getType()) {
-        case NodeDataChanged:
         case NodeChildrenChanged:
-        case NodeCreated:
-        case NodeDeleted:
+          // According to documentation we should not receive this event.
+          // According to https://issues.apache.org/jira/browse/ZOOKEEPER-4475 we
+          // may receive this event (Fixed in 3.9.0)
+          break;
         case ChildWatchRemoved:
         case DataWatchRemoved:
-          remove(event.getPath());
+          // We don't need to do anything with the cache on these events.
+          break;
+        case NodeDataChanged:
+          log.trace("{} node data changed; clearing {}", cacheId, event.getPath());
+          clear(path -> path.equals(event.getPath()));
+          break;
+        case NodeCreated:
+        case NodeDeleted:
+          // With the Watcher being set at a higher level we need to remove
+          // the parent of the affected node and all of its children from the cache
+          // so that the parent and children node can be re-cached. If we only remove the
+          // affected node, then the cached children in the parent could be incorrect.
+          int lastSlash = event.getPath().lastIndexOf('/');
+          String parent = lastSlash == 0 ? "/" : event.getPath().substring(0, lastSlash);
+          log.trace("{} node created or deleted {}; clearing {}", cacheId, event.getPath(), parent);
+          clear((path) -> path.startsWith(parent));

Review Comment:
   A bit further down there is code that does nothing on a close event and has a comment about why. The reason in the comment is no longer valid, so the comment should be removed. We should probably also clear the cache on a Closed event.
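   The parent-path computation and prefix clear from the diff above can be exercised in isolation. This sketch mocks the cache with a plain map; `parentOf` and `clear` here mirror the diff's logic but the harness (`ParentClearDemo`) is hypothetical:
   
   ```java
   import java.util.Map;
   import java.util.TreeMap;
   import java.util.function.Predicate;
   
   public class ParentClearDemo {
   
     // Mirrors the parent computation from the NodeCreated/NodeDeleted case:
     // "/a/b/c" -> "/a/b", and a top-level node like "/a" -> "/".
     static String parentOf(String path) {
       int lastSlash = path.lastIndexOf('/');
       return lastSlash == 0 ? "/" : path.substring(0, lastSlash);
     }
   
     // Stand-in for ZooCache.clear(Predicate): drop every cached path matching.
     static void clear(Map<String,String> cache, Predicate<String> matcher) {
       cache.keySet().removeIf(matcher);
     }
   
     public static void main(String[] args) {
       Map<String,String> cache = new TreeMap<>();
       cache.put("/tables", "...");
       cache.put("/tables/t1", "...");
       cache.put("/tables/t2", "...");
       cache.put("/users", "...");
   
       // A node created under /tables invalidates the parent and its children,
       // because the parent's cached child list is now stale.
       String parent = parentOf("/tables/t3");
       clear(cache, path -> path.startsWith(parent));
   
       System.out.println(cache.keySet()); // prints [/users]
     }
   }
   ```
   
   Note that for a top-level node the computed parent is "/", so the prefix clear wipes the whole cache, which is also what a clear-on-Closed handler would want.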



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
