EdColeman commented on code in PR #2569:
URL: https://github.com/apache/accumulo/pull/2569#discussion_r846547866


##########
server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/PropStoreWatcher.java:
##########
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.store.impl;
+
+import static java.util.stream.Collectors.collectingAndThen;
+import static java.util.stream.Collectors.toSet;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.accumulo.core.util.threads.ThreadPools;
+import org.apache.accumulo.server.conf.store.PropCacheKey;
+import org.apache.accumulo.server.conf.store.PropChangeListener;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class serves as a translator between ZooKeeper events and converts 
them to PropStore events.
+ * Using this as an intermediary, the external listeners do not need to set / 
manage external
+ * ZooKeeper watchers, they can register for PropStore events if they need to 
take active action on
+ * change detection.
+ * <p>
+ * Users of the PropStore.get() will get properties that match what is stored 
in ZooKeeper for each
+ * call and do not need to manage any caching. However, the ability to receive 
active notification
+ * without needed to register / manage ZooKeeper watchers external to the 
PropStore is provided in
+ * case other code is relying on active notifications.
+ * <p>
+ * The notification occurs on a separate thread from the ZooKeeper 
notification handling, but
+ * listeners should not perform lengthy operations on the notification thread 
so that other listener
+ * notifications are not delayed.
+ */
+public class PropStoreWatcher implements Watcher {
+
+  private static final Logger log = 
LoggerFactory.getLogger(PropStoreWatcher.class);
+
+  private static final ExecutorService executorService =
+      ThreadPools.getServerThreadPools().createFixedThreadPool(2, 
"zoo_change_update", false);
+
+  private final ReentrantReadWriteLock listenerLock = new 
ReentrantReadWriteLock();
+  private final ReentrantReadWriteLock.ReadLock listenerReadLock = 
listenerLock.readLock();
+  private final ReentrantReadWriteLock.WriteLock listenerWriteLock = 
listenerLock.writeLock();
+
+  // access should be guarded by acquiring the listener read or write lock
+  private final Map<PropCacheKey,Set<PropChangeListener>> listeners = new 
HashMap<>();
+
+  private final ReadyMonitor zkReadyMonitor;
+
+  public PropStoreWatcher(final ReadyMonitor zkReadyMonitor) {
+    this.zkReadyMonitor = zkReadyMonitor;
+  }
+
+  public void registerListener(final PropCacheKey propCacheKey, final 
PropChangeListener listener) {
+    listenerWriteLock.lock();
+    try {
+      Set<PropChangeListener> set = listeners.computeIfAbsent(propCacheKey, s 
-> new HashSet<>());
+      set.add(listener);
+    } finally {
+      listenerWriteLock.unlock();
+    }
+  }
+
+  /**
+   * Process a ZooKeeper event. This method does not reset the watcher. 
Subscribers are notified of
+   * the change - if they call get to update and respond to the change the 
watcher will be (re)set
+   * then. This helps clean up watchers by not automatically re-adding the 
watcher on the event but
+   * only if being used.
+   *
+   * @param event
+   *          ZooKeeper event.
+   */
+  @Override
+  public void process(final WatchedEvent event) {
+
+    String path;
+    PropCacheKey propCacheKey;
+    switch (event.getType()) {
+      case NodeDataChanged:
+        path = event.getPath();
+        log.trace("handle change event for path: {}", path);
+        propCacheKey = PropCacheKey.fromPath(path);
+        if (propCacheKey != null) {
+          signalZkChangeEvent(propCacheKey);
+        }
+        break;
+      case NodeDeleted:
+        path = event.getPath();
+        log.trace("handle delete event for path: {}", path);
+        propCacheKey = PropCacheKey.fromPath(path);
+        if (propCacheKey != null) {
+          // notify listeners
+          Set<PropChangeListener> snapshot = getListenerSnapshot(propCacheKey);
+          if (snapshot != null) {
+            executorService
+                .execute(new 
PropStoreEventTask.PropStoreDeleteEventTask(propCacheKey, snapshot));
+          }
+          listenerCleanup(propCacheKey);
+        }
+        break;
+      case None:
+        Event.KeeperState state = event.getState();
+        switch (state) {
+          // pause - could reconnect
+          case ConnectedReadOnly:
+          case Disconnected:
+            log.debug("ZooKeeper disconnected event received");
+            zkReadyMonitor.clearReady();
+            executorService.execute(
+                new 
PropStoreEventTask.PropStoreConnectionEventTask(getAllListenersSnapshot()));

Review Comment:
   I did not think that the code would be sensitive to order. All events 
basically signal the cache to clear the entry.  The cache is not trying to 
respond to each change, it will fetch a changed value on demand when / if the 
next read occurs.  This means that the event order for changes should not 
matter.
   
   In the case of a disconnect - the prop store watcher first sets ready to 
false so any reads triggered by changes will block until ready. And that occurs 
independently of the notification of the changes. Even if there are change 
notifications queued and in-flight, any reads will be blocked. 
   
   In the case of multiple events and reads while other events are in-flight, 
each event may just re-clear a cleared entry - so basically a no-op.
   
   (below, "current - final" is the stabilized "final" value, until changed 
again by future updates)
   
   In the cases where a clear / read occurs and there are additional events in 
the pipeline, each change could trigger a clear and then a potential read. The 
read may not get the values that were set with that specific change event, but 
it will get a value from that event or a value set later. If the value was 
an intermediate change, subsequent events should re-clear and trigger reads 
until the current, final value is read.  Once it has the current, final value, 
events in the pipeline may be clearing / re-reading the same "current - final"  
value, but the final state of the event chain should always end up with the 
correct, current state.
   
   
   
   
    



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to