keith-turner commented on code in PR #2569:
URL: https://github.com/apache/accumulo/pull/2569#discussion_r846528435


##########
server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/PropStoreWatcher.java:
##########
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.store.impl;
+
+import static java.util.stream.Collectors.collectingAndThen;
+import static java.util.stream.Collectors.toSet;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.accumulo.core.util.threads.ThreadPools;
+import org.apache.accumulo.server.conf.store.PropCacheKey;
+import org.apache.accumulo.server.conf.store.PropChangeListener;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class serves as a translator between ZooKeeper events and converts 
them to PropStore events.
+ * Using this as an intermediary, the external listeners do not need to set / 
manage external
+ * ZooKeeper watchers, they can register for PropStore events if they need to 
take active action on
+ * change detection.
+ * <p>
+ * Users of the PropStore.get() will get properties that match what is stored 
in ZooKeeper for each
+ * call and do not need to manage any caching. However, the ability to receive 
active notification
+ * without needed to register / manage ZooKeeper watchers external to the 
PropStore is provided in
+ * case other code is relying on active notifications.
+ * <p>
+ * The notification occurs on a separate thread from the ZooKeeper 
notification handling, but
+ * listeners should not perform lengthy operations on the notification thread 
so that other listener
+ * notifications are not delayed.
+ */
+public class PropStoreWatcher implements Watcher {
+
+  private static final Logger log = 
LoggerFactory.getLogger(PropStoreWatcher.class);
+
+  private static final ExecutorService executorService =
+      ThreadPools.getServerThreadPools().createFixedThreadPool(2, 
"zoo_change_update", false);
+
+  private final ReentrantReadWriteLock listenerLock = new 
ReentrantReadWriteLock();
+  private final ReentrantReadWriteLock.ReadLock listenerReadLock = 
listenerLock.readLock();
+  private final ReentrantReadWriteLock.WriteLock listenerWriteLock = 
listenerLock.writeLock();
+
+  // access should be guarded by acquiring the listener read or write lock
+  private final Map<PropCacheKey,Set<PropChangeListener>> listeners = new 
HashMap<>();
+
+  private final ReadyMonitor zkReadyMonitor;
+
+  public PropStoreWatcher(final ReadyMonitor zkReadyMonitor) {
+    this.zkReadyMonitor = zkReadyMonitor;
+  }
+
+  public void registerListener(final PropCacheKey propCacheKey, final 
PropChangeListener listener) {
+    listenerWriteLock.lock();
+    try {
+      Set<PropChangeListener> set = listeners.computeIfAbsent(propCacheKey, s 
-> new HashSet<>());
+      set.add(listener);
+    } finally {
+      listenerWriteLock.unlock();
+    }
+  }
+
+  /**
+   * Process a ZooKeeper event. This method does not reset the watcher. 
Subscribers are notified of
+   * the change - if they call get to update and respond to the change the 
watcher will be (re)set
+   * then. This helps clean up watchers by not automatically re-adding the 
watcher on the event but
+   * only if being used.
+   *
+   * @param event
+   *          ZooKeeper event.
+   */
+  @Override
+  public void process(final WatchedEvent event) {
+
+    String path;
+    PropCacheKey propCacheKey;
+    switch (event.getType()) {
+      case NodeDataChanged:
+        path = event.getPath();
+        log.trace("handle change event for path: {}", path);
+        propCacheKey = PropCacheKey.fromPath(path);
+        if (propCacheKey != null) {
+          signalZkChangeEvent(propCacheKey);
+        }
+        break;
+      case NodeDeleted:
+        path = event.getPath();
+        log.trace("handle delete event for path: {}", path);
+        propCacheKey = PropCacheKey.fromPath(path);
+        if (propCacheKey != null) {
+          // notify listeners
+          Set<PropChangeListener> snapshot = getListenerSnapshot(propCacheKey);
+          if (snapshot != null) {
+            executorService
+                .execute(new 
PropStoreEventTask.PropStoreDeleteEventTask(propCacheKey, snapshot));
+          }
+          listenerCleanup(propCacheKey);
+        }
+        break;
+      case None:
+        Event.KeeperState state = event.getState();
+        switch (state) {
+          // pause - could reconnect
+          case ConnectedReadOnly:
+          case Disconnected:
+            log.debug("ZooKeeper disconnected event received");
+            zkReadyMonitor.clearReady();
+            executorService.execute(
+                new 
PropStoreEventTask.PropStoreConnectionEventTask(getAllListenersSnapshot()));

Review Comment:
   The executor service has two threads.  That means events from ZooKeeper can 
be delivered to listeners in parallel, and in a different order than they were 
observed from ZooKeeper.  Does this matter?  I think if the thread pool had one 
thread and a linked queue, it would process the events in the same order in 
which they were observed from ZooKeeper.  Not sure if this even matters though.



##########
server/base/src/main/java/org/apache/accumulo/server/conf/ZooBasedConfiguration.java:
##########
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Predicate;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.conf.store.PropCacheKey;
+import org.apache.accumulo.server.conf.store.PropChangeListener;
+import org.apache.accumulo.server.conf.store.PropStore;
+import org.apache.accumulo.server.conf.store.PropStoreException;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+
+/**
+ * Instances maintain a local cache of the AccumuloConfiguration hierarchy 
that will be consistent
+ * with stored properties.
+ * <p>
+ * When calling getProperties - the local copy will be updated if ZooKeeper 
changes have been
+ * received.
+ * <p>
+ * The getUpdateCount() provides an optimization for clients - the count can 
be used to detect
+ * changes without reading the properties. When the update count changes, the 
next getProperties
+ * call will update the local copy and the change count.
+ */
+public class ZooBasedConfiguration extends AccumuloConfiguration implements 
PropChangeListener {
+
+  protected final Logger log;
+  private final AccumuloConfiguration parent;
+  private final PropCacheKey propCacheKey;
+  private final PropStore propStore;
+
+  private final AtomicReference<PropSnapshot> snapshotRef = new 
AtomicReference<>(null);
+
+  public ZooBasedConfiguration(Logger log, ServerContext context, PropCacheKey 
propCacheKey,
+      AccumuloConfiguration parent) {
+    this.log = requireNonNull(log, "a Logger must be supplied");
+    requireNonNull(context, "the context cannot be null");
+    this.propCacheKey = requireNonNull(propCacheKey, "a PropCacheId must be 
supplied");
+    this.parent = requireNonNull(parent, "An AccumuloConfiguration parent must 
be supplied");
+
+    this.propStore =
+        requireNonNull(context.getPropStore(), "The PropStore must be supplied 
and exist");
+
+    propStore.registerAsListener(propCacheKey, this);
+
+    snapshotRef.set(updateSnapshot());
+
+  }
+
+  public long getDataVersion() {
+    var snapshot = snapshotRef.get();
+    if (snapshot == null) {
+      return updateSnapshot().getDataVersion();
+    }
+    return snapshot.getDataVersion();
+  }
+
+  /**
+   * The update count is the sum of the change count of this configuration and 
the change counts of
+   * the parents. The count is used to detect if any changes occurred in the 
configuration hierarchy
+   * and if the configuration needs to be recalculated to maintain consistency 
with values in the
+   * backend store.
+   * <p>
+   * The count is required to be an increasing value.
+   */
+  @Override
+  public long getUpdateCount() {
+    long count = 0;
+    long dataVersion = 0;
+    for (AccumuloConfiguration p = this; p != null; p = p.getParent()) {
+      if (p instanceof ZooBasedConfiguration) {
+        dataVersion = ((ZooBasedConfiguration) p).getDataVersion();
+      } else {
+        dataVersion = p.getUpdateCount();
+      }
+      count += dataVersion;
+    }
+
+    log.trace("update count result for: {} - data version: {} update: {}", 
propCacheKey,
+        dataVersion, count);
+    return count;
+  }
+
+  @Override
+  public AccumuloConfiguration getParent() {
+    return parent;
+  }
+
+  public PropCacheKey getPropCacheKey() {
+    return propCacheKey;
+  }
+
+  @Override
+  public @Nullable String get(final Property property) {
+    Map<String,String> props = getSnapshot();
+    String value = props.get(property.getKey());
+    if (value != null) {
+      return value;
+    }
+    AccumuloConfiguration parent = getParent();
+    if (parent != null) {
+      return parent.get(property);
+    }
+    return null;
+  }
+
+  @Override
+  public void getProperties(final Map<String,String> props, final 
Predicate<String> filter) {
+
+    parent.getProperties(props, filter);
+
+    Map<String,String> theseProps = getSnapshot();
+
+    log.trace("getProperties() for: {} filter: {}, have: {}, passed: {}", 
getPropCacheKey(), filter,
+        theseProps, props);
+
+    for (Map.Entry<String,String> p : theseProps.entrySet()) {
+      if (filter.test(p.getKey()) && p.getValue() != null) {
+        log.trace("passed filter - add to map: {} = {}", p.getKey(), 
p.getValue());
+        props.put(p.getKey(), p.getValue());
+      }
+    }
+  }
+
+  @Override
+  public boolean isPropertySet(final Property property) {
+
+    Map<String,String> theseProps = getSnapshot();
+
+    if (theseProps.get(property.getKey()) != null) {
+      return true;
+    }
+
+    return getParent().isPropertySet(property);
+
+  }
+
+  public Map<String,String> getSnapshot() {
+    var snap = snapshotRef.get();
+    if (snap == null) {
+      return updateSnapshot().getProps();
+    }
+    return snap.getProps();
+  }
+
+  @Override
+  public void invalidateCache() {
+    snapshotRef.set(null);
+  }
+
+  private final Lock updateLock = new ReentrantLock();
+
+  private @NonNull PropSnapshot updateSnapshot() throws PropStoreException {
+
+    PropSnapshot localSnapshot = snapshotRef.get();
+
+    if (localSnapshot != null) {
+      // no changes return locally cached config
+      return localSnapshot;
+    }
+    updateLock.lock();
+    int retryCount = 5;
+    try {
+      localSnapshot = snapshotRef.get();
+      // check for update while waiting for lock.
+      if (localSnapshot != null) {
+        return localSnapshot;
+      }
+
+      PropSnapshot propsRead;
+      var vProps = propStore.get(propCacheKey);
+      if (vProps == null) {
+        snapshotRef.set(null);
+        throw new IllegalStateException("Failed to read properties for " + 
propCacheKey);
+      } else {
+        snapshotRef.set(new PropSnapshot(vProps.getDataVersion(), 
vProps.getProperties()));
+      }
+      return snapshotRef.get();

Review Comment:
   There might be a race condition here.  There are many places in this class 
that do `snapshotRef.set(null);` without acquiring `updateLock`, so another 
thread could sneak in and produce the following interleaving:
   
    1. Thread 1 runs `snapshotRef.set(new PropSnapshot(vProps.getDataVersion(), 
vProps.getProperties()));`
    2. Thread 2 runs `snapshotRef.set(null);`
    3. Thread 1 runs `return snapshotRef.get();`, which now returns `null` even 
though the method is declared `@NonNull`
   
   One fix is to acquire `updateLock` everywhere `snapshotRef` is set to null. 
Alternatively, return the locally constructed snapshot from this function, which 
avoids having to acquire the lock everywhere:
   
   ```suggestion
           var newSnapshot = new PropSnapshot(vProps.getDataVersion(), 
vProps.getProperties()); 
           snapshotRef.set(newSnapshot);
           return newSnapshot;
         }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to