ctubbsii commented on a change in pull request #2569:
URL: https://github.com/apache/accumulo/pull/2569#discussion_r828289950



##########
File path: 
server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfigurationFactory.java
##########
@@ -29,15 +29,19 @@
 import org.apache.accumulo.core.data.InstanceId;
 import org.apache.accumulo.core.data.NamespaceId;
 import org.apache.accumulo.core.data.TableId;
-import org.apache.accumulo.fate.zookeeper.ZooCache;
-import org.apache.accumulo.fate.zookeeper.ZooCacheFactory;
 import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.conf.store.PropCacheId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A factor for configurations used by a server process. Instance of this 
class are thread-safe.
  */
 public class ServerConfigurationFactory extends ServerConfiguration {
 
+  // TODO - would it add clarity if log passed in by caller?
+  private static final Logger log = 
LoggerFactory.getLogger(ServerConfigurationFactory.class);
+

Review comment:
       I don't think this class needs its own logger. It's just a trivial 
wrapper that we can't get rid of because it leaks into old balancer APIs. 
ServerConfiguration should have its own logger.

##########
File path: 
server/base/src/main/java/org/apache/accumulo/server/conf/RuntimeFixedProperties.java
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class to a manage a fixed set of defined properties (designated in 
Properties as fixed).
+ * Certain properties are stored for persistence across restarts, they are 
read during start-up and
+ * remain unchanged for the life of the instance. Any updates to the 
properties will only be
+ * reflected with a restart.
+ * <p>
+ * Note that there are no guarantees that all services will always have the 
same values. If a fixed
+ * property value is changed and if all services are not restarted, they would 
be operating with
+ * different values.
+ */
+public class RuntimeFixedProperties {

Review comment:
       Not sure if this class needs to be part of this PR, or if it could be 
its own change first. It seems like it could be evaluated on its own.

##########
File path: 
server/base/src/main/java/org/apache/accumulo/server/ServerContext.java
##########
@@ -107,6 +111,9 @@ private ServerContext(ServerInfo info) {
     this.info = info;
     zooReaderWriter = new ZooReaderWriter(info.getSiteConfiguration());
     serverDirs = info.getServerDirs();
+
+    propStore = new ZooPropStore.Builder(info.getInstanceID(), zooReaderWriter,

Review comment:
       > Why not call `ZooPropStore.initialize()` ?
   
   I think initialize implies that it's creating the entry in ZK.

##########
File path: 
server/base/src/main/java/org/apache/accumulo/server/conf/ZooBasedConfiguration.java
##########
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Predicate;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.conf.store.PropCacheId;
+import org.apache.accumulo.server.conf.store.PropChangeListener;
+import org.apache.accumulo.server.conf.store.PropStore;
+import org.apache.accumulo.server.conf.store.PropStoreException;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+
+/**
+ * Instances maintain a local cache of the AccumuloConfiguration hierarchy 
that will be consistent
+ * with stored properties.
+ * <p>
+ * When calling getProperties - the local copy will be updated if ZooKeeper 
changes have been
+ * received.
+ * <p>
+ * The getUpdateCount() provides an optimization for clients - the count can 
be used to detect
+ * changes without reading the properties. When the update count changes, the 
next getProperties
+ * call will update the local copy and the change count.
+ */
+public class ZooBasedConfiguration extends AccumuloConfiguration implements 
PropChangeListener {
+
+  protected final Logger log;
+  private final AccumuloConfiguration parent;
+  private final PropCacheId propCacheId;
+  private final PropStore propStore;
+
+  private final AtomicReference<PropSnapshot> snapshotRef = new 
AtomicReference<>(null);
+
+  public ZooBasedConfiguration(Logger log, ServerContext context, PropCacheId 
propCacheId,
+      AccumuloConfiguration parent) {
+    this.log = requireNonNull(log, "a Logger must be supplied");
+    requireNonNull(context, "the context cannot be null");
+    this.propCacheId = requireNonNull(propCacheId, "a PropCacheId must be 
supplied");
+    this.parent = requireNonNull(parent, "An AccumuloConfiguration parent must 
be supplied");
+
+    this.propStore =
+        requireNonNull(context.getPropStore(), "The PropStore must be supplied 
and exist");
+
+    propStore.registerAsListener(propCacheId, this);
+
+    snapshotRef.set(updateSnapshot());
+
+  }
+
+  public long getDataVersion() {
+    var snapshot = snapshotRef.get();
+    if (snapshot == null) {
+      return updateSnapshot().getDataVersion();
+    }
+    return snapshot.getDataVersion();
+  }
+
+  /**
+   * The update count is the sum of the change count of this configuration and 
the change counts of
+   * the parents. The count is used to detect if any changes occurred in the 
configuration hierarchy
+   * and if the configuration needs to be recalculated to maintain consistency 
with values in the
+   * backend store.
+   * <p>
+   * The count is required to be an increasing value.
+   */
+  @Override
+  public long getUpdateCount() {
+    long count = 0;
+    long dataVersion = 0;
+    for (AccumuloConfiguration p = this; p != null; p = p.getParent()) {
+      if (p instanceof ZooBasedConfiguration) {
+        dataVersion = ((ZooBasedConfiguration) p).getDataVersion();
+      } else {
+        dataVersion = p.getUpdateCount();
+      }
+      count += dataVersion;
+    }
+
+    log.trace("update count result for: {} - data version: {} update: {}", 
propCacheId, dataVersion,
+        count);
+    return count;
+  }
+
+  @Override
+  public AccumuloConfiguration getParent() {
+    return parent;
+  }
+
+  public PropCacheId getCacheId() {
+    return propCacheId;
+  }
+
+  @Override
+  public @Nullable String get(final Property property) {
+    Map<String,String> props = getSnapshot();
+    String value = props.get(property.getKey());
+    if (value != null) {
+      return value;
+    }
+    AccumuloConfiguration parent = getParent();
+    if (parent != null) {
+      return parent.get(property);
+    }
+    return null;
+  }
+
+  @Override
+  public void getProperties(final Map<String,String> props, final 
Predicate<String> filter) {
+
+    parent.getProperties(props, filter);
+
+    Map<String,String> theseProps = getSnapshot();
+
+    log.trace("getProperties() for: {} filter: {}, have: {}, passed: {}", 
getCacheId(), filter,
+        theseProps, props);
+
+    for (Map.Entry<String,String> p : theseProps.entrySet()) {
+      if (filter.test(p.getKey()) && p.getValue() != null) {
+        log.trace("passed filter - add to map: {} = {}", p.getKey(), 
p.getValue());
+        props.put(p.getKey(), p.getValue());
+      }
+    }
+  }
+
+  @Override
+  public boolean isPropertySet(final Property property) {
+
+    Map<String,String> theseProps = getSnapshot();
+
+    if (theseProps.get(property.getKey()) != null) {
+      return true;
+    }
+
+    return getParent().isPropertySet(property);
+
+  }
+
+  public Map<String,String> getSnapshot() {
+    if (snapshotRef.get() == null) {
+      return updateSnapshot().getProps();
+    }
+    return snapshotRef.get().getProps();
+  }

Review comment:
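       There are two calls to `snapshotRef.get()`. The second could return 
null even after you've checked that the first didn't, because another thread 
can clear the reference in between (e.g. via `invalidateCache()`).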
   
   ```suggestion
       var snap = snapshotRef.get();
       if (snap == null) {
         return updateSnapshot().getProps();
       }
       return snap.getProps();
     }
   ```

##########
File path: 
server/base/src/main/java/org/apache/accumulo/server/conf/ZooBasedConfiguration.java
##########
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Predicate;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.conf.store.PropCacheId;
+import org.apache.accumulo.server.conf.store.PropChangeListener;
+import org.apache.accumulo.server.conf.store.PropStore;
+import org.apache.accumulo.server.conf.store.PropStoreException;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+
+/**
+ * Instances maintain a local cache of the AccumuloConfiguration hierarchy 
that will be consistent
+ * with stored properties.
+ * <p>
+ * When calling getProperties - the local copy will be updated if ZooKeeper 
changes have been
+ * received.
+ * <p>
+ * The getUpdateCount() provides an optimization for clients - the count can 
be used to detect
+ * changes without reading the properties. When the update count changes, the 
next getProperties
+ * call will update the local copy and the change count.
+ */
+public class ZooBasedConfiguration extends AccumuloConfiguration implements 
PropChangeListener {
+
+  protected final Logger log;
+  private final AccumuloConfiguration parent;
+  private final PropCacheId propCacheId;
+  private final PropStore propStore;
+
+  private final AtomicReference<PropSnapshot> snapshotRef = new 
AtomicReference<>(null);
+
+  public ZooBasedConfiguration(Logger log, ServerContext context, PropCacheId 
propCacheId,
+      AccumuloConfiguration parent) {
+    this.log = requireNonNull(log, "a Logger must be supplied");
+    requireNonNull(context, "the context cannot be null");
+    this.propCacheId = requireNonNull(propCacheId, "a PropCacheId must be 
supplied");
+    this.parent = requireNonNull(parent, "An AccumuloConfiguration parent must 
be supplied");
+
+    this.propStore =
+        requireNonNull(context.getPropStore(), "The PropStore must be supplied 
and exist");
+
+    propStore.registerAsListener(propCacheId, this);
+
+    snapshotRef.set(updateSnapshot());
+
+  }
+
+  public long getDataVersion() {
+    var snapshot = snapshotRef.get();
+    if (snapshot == null) {
+      return updateSnapshot().getDataVersion();
+    }
+    return snapshot.getDataVersion();
+  }
+
+  /**
+   * The update count is the sum of the change count of this configuration and 
the change counts of
+   * the parents. The count is used to detect if any changes occurred in the 
configuration hierarchy
+   * and if the configuration needs to be recalculated to maintain consistency 
with values in the
+   * backend store.
+   * <p>
+   * The count is required to be an increasing value.
+   */
+  @Override
+  public long getUpdateCount() {
+    long count = 0;
+    long dataVersion = 0;
+    for (AccumuloConfiguration p = this; p != null; p = p.getParent()) {
+      if (p instanceof ZooBasedConfiguration) {
+        dataVersion = ((ZooBasedConfiguration) p).getDataVersion();
+      } else {
+        dataVersion = p.getUpdateCount();
+      }
+      count += dataVersion;
+    }
+
+    log.trace("update count result for: {} - data version: {} update: {}", 
propCacheId, dataVersion,
+        count);
+    return count;
+  }
+
+  @Override
+  public AccumuloConfiguration getParent() {
+    return parent;
+  }
+
+  public PropCacheId getCacheId() {
+    return propCacheId;
+  }
+
+  @Override
+  public @Nullable String get(final Property property) {
+    Map<String,String> props = getSnapshot();
+    String value = props.get(property.getKey());
+    if (value != null) {
+      return value;
+    }
+    AccumuloConfiguration parent = getParent();
+    if (parent != null) {
+      return parent.get(property);
+    }
+    return null;
+  }
+
+  @Override
+  public void getProperties(final Map<String,String> props, final 
Predicate<String> filter) {
+
+    parent.getProperties(props, filter);
+
+    Map<String,String> theseProps = getSnapshot();
+
+    log.trace("getProperties() for: {} filter: {}, have: {}, passed: {}", 
getCacheId(), filter,
+        theseProps, props);
+
+    for (Map.Entry<String,String> p : theseProps.entrySet()) {
+      if (filter.test(p.getKey()) && p.getValue() != null) {
+        log.trace("passed filter - add to map: {} = {}", p.getKey(), 
p.getValue());
+        props.put(p.getKey(), p.getValue());
+      }
+    }
+  }
+
+  @Override
+  public boolean isPropertySet(final Property property) {
+
+    Map<String,String> theseProps = getSnapshot();
+
+    if (theseProps.get(property.getKey()) != null) {
+      return true;
+    }
+
+    return getParent().isPropertySet(property);
+
+  }
+
+  public Map<String,String> getSnapshot() {
+    if (snapshotRef.get() == null) {
+      return updateSnapshot().getProps();
+    }
+    return snapshotRef.get().getProps();
+  }
+
+  @Override
+  public void invalidateCache() {
+    snapshotRef.set(null);
+  }
+
+  private final Lock updateLock = new ReentrantLock();
+
+  private @NonNull PropSnapshot updateSnapshot() throws PropStoreException {
+
+    PropSnapshot localSnapshot = snapshotRef.get();
+
+    if (localSnapshot != null) {
+      // no changes return locally cached config
+      return localSnapshot;
+    }
+    updateLock.lock();
+    int retryCount = 5;
+    try {
+      localSnapshot = snapshotRef.get();
+      // check for update while waiting for lock.
+      if (localSnapshot != null) {
+        return localSnapshot;
+      }
+
+      PropSnapshot propsRead;
+
+      long startCount;
+      do {
+        startCount = propStore.getNodeVersion(propCacheId);
+        propsRead = doRead();
+        if (propsRead.getDataVersion() == startCount) {
+          snapshotRef.set(propsRead);
+          return snapshotRef.get();
+        }
+      } while (--retryCount > 0);
+
+      snapshotRef.set(null);
+    } finally {
+      updateLock.unlock();
+    }
+    throw new IllegalStateException(
+        "Failed to read property updates for " + propCacheId + " after " + 
retryCount + " tries");
+
+  }
+
+  private PropSnapshot doRead() throws PropStoreException {
+
+    var vProps = propStore.get(propCacheId);
+    log.trace("doRead() - updateSnapshot() for {}, returned: {}", propCacheId, 
vProps);
+    if (vProps == null) {
+      // TODO - this could return marker instead?
+      // return new PropSnapshot(INVALID_DATA, Map.of());
+      throw new IllegalStateException("Properties for " + propCacheId + " do 
not exist");
+    } else {
+      return new PropSnapshot(vProps.getDataVersion(), vProps.getProperties());
+    }
+  }
+
+  @Override
+  public void zkChangeEvent(PropCacheId watchedId) {
+    if (propCacheId.equals(watchedId)) {
+      log.debug("Received zookeeper property change event for {} - current 
version: {}",
+          propCacheId,
+          snapshotRef.get() != null ? snapshotRef.get().getDataVersion() : "no 
data version set");
+      // snapshotRef.set(new PropSnapshot(INVALID_DATA_VER, Map.of()));
+      snapshotRef.set(null);
+    }
+  }
+
+  @Override
+  public void cacheChangeEvent(PropCacheId watchedId) {
+    if (propCacheId.equals(watchedId)) {
+      log.debug("Received cache property change event for {} - current 
version: {}", propCacheId,
+          snapshotRef.get() != null ? snapshotRef.get().getDataVersion() : "no 
data version set");

Review comment:
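       Same issue again: `snapshotRef.get()` is called twice here, and the 
second call could return null after the first check passed. Read it once into 
a local variable.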

##########
File path: 
server/base/src/main/java/org/apache/accumulo/server/conf/store/PropCacheId.java
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.store;
+
+import static org.apache.accumulo.core.Constants.ZCONFIG;
+import static org.apache.accumulo.core.Constants.ZNAMESPACES;
+import static org.apache.accumulo.core.Constants.ZNAMESPACE_CONF;
+import static org.apache.accumulo.core.Constants.ZTABLES;
+import static org.apache.accumulo.core.Constants.ZTABLE_CONF;
+
+import java.util.Comparator;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.StringJoiner;
+
+import org.apache.accumulo.core.data.InstanceId;
+import org.apache.accumulo.core.data.NamespaceId;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.fate.zookeeper.ZooUtil;
+import org.apache.accumulo.server.ServerContext;
+import org.checkerframework.checker.nullness.qual.NonNull;
+
+/**
+ * Provides a strongly-typed id for storing properties in ZooKeeper. The path 
in ZooKeeper is
+ * determined by the instance id and the type (system, namespace and table), 
with different root
+ * paths.
+ * <p>
+ * Provides utility methods from constructing different id based on type and 
methods to parse a
+ * ZooKeeper path and return a prop cache id.
+ */
+public class PropCacheId implements Comparable<PropCacheId> {
+
+  public static final String PROP_NODE_NAME = "encoded_props";
+
+  // indices for path.split();
+  public static final int TYPE_TOKEN_POSITION = 3;
+  public static final int IID_TOKEN_POSITION = 2;
+  public static final int ID_TOKEN_POSITION = 4;
+
+  // remove starting slash from constant.
+  public static final String TABLES_NODE_NAME = ZTABLES.substring(1);
+  public static final String NAMESPACE_NODE_NAME = ZNAMESPACES.substring(1);
+
+  private final String path;
+  private final IdType idType;
+  private final NamespaceId namespaceId;
+  private final TableId tableId;
+
+  private PropCacheId(final String path, final IdType idType, final 
NamespaceId namespaceId,
+      final TableId tableId) {
+    this.path = path;
+    this.idType = idType;
+    this.namespaceId = namespaceId;
+    this.tableId = tableId;
+  }
+
+  /**
+   * Instantiate a system prop cache id using the instance id from the context.
+   *
+   * @param context
+   *          the system context specifying the instance id
+   * @return a prop cache id for system properties,
+   */
+  public static PropCacheId forSystem(final ServerContext context) {
+    return forSystem(context.getInstanceID());
+  }
+
+  /**
+   * Instantiate a system prop cache id.
+   *
+   * @param instanceId
+   *          the instance id.
+   * @return a prop cache id for system properties,
+   */
+  public static PropCacheId forSystem(final InstanceId instanceId) {
+    return new PropCacheId(ZooUtil.getRoot(instanceId) + ZCONFIG + "/" + 
PROP_NODE_NAME,
+        IdType.SYSTEM, null, null);
+  }
+
+  /**
+   * Instantiate a namespace prop cache id using the instance id from the 
context.
+   *
+   * @param context
+   *          the system context specifying the instance id
+   * @param namespaceId
+   *          the namespace id
+   * @return a prop cache id a namespaces properties,
+   */
+  public static PropCacheId forNamespace(final ServerContext context,
+      final NamespaceId namespaceId) {
+    return forNamespace(context.getInstanceID(), namespaceId);
+  }
+
+  /**
+   * Instantiate a namespace prop cache id using the instance id from the 
context.
+   *
+   * @param instanceId
+   *          the instance id
+   * @param namespaceId
+   *          the namespace id
+   * @return a prop cache id a namespaces properties,
+   */
+  public static PropCacheId forNamespace(final InstanceId instanceId,
+      final NamespaceId namespaceId) {
+    return new PropCacheId(ZooUtil.getRoot(instanceId) + ZNAMESPACES + "/" + 
namespaceId.canonical()
+        + ZNAMESPACE_CONF + "/" + PROP_NODE_NAME, IdType.NAMESPACE, 
namespaceId, null);
+  }
+
+  /**
+   * Instantiate a namespace prop cache id using the instance id from the 
context.
+   *
+   * @param context
+   *          the system context specifying the instance id
+   * @param tableId
+   *          the table id
+   * @return a prop cache id a namespaces properties,
+   */
+  public static PropCacheId forTable(final ServerContext context, final 
TableId tableId) {
+    return forTable(context.getInstanceID(), tableId);
+  }
+
+  /**
+   * Instantiate a namespace prop cache id using the instance id from the 
context.
+   *
+   * @param instanceId
+   *          the instance id
+   * @param tableId
+   *          the table id
+   * @return a prop cache id a namespaces properties,
+   */
+  public static PropCacheId forTable(final InstanceId instanceId, final 
TableId tableId) {
+    return new PropCacheId(ZooUtil.getRoot(instanceId) + ZTABLES + "/" + 
tableId.canonical()
+        + ZTABLE_CONF + "/" + PROP_NODE_NAME, IdType.TABLE, null, tableId);
+  }
+
+  /**
+   * Determine the prop cache id from a ZooKeeper path
+   *
+   * @param path
+   *          the path
+   * @return the prop cache id
+   */
+  public static Optional<PropCacheId> fromPath(final String path) {
+    String[] tokens = path.split("/");
+
+    InstanceId instanceId = InstanceId.of(tokens[IID_TOKEN_POSITION]);
+
+    IdType type = extractType(tokens);
+
+    switch (type) {
+      case SYSTEM:
+        return Optional.of(PropCacheId.forSystem(instanceId));
+      case NAMESPACE:
+        return Optional
+            .of(PropCacheId.forNamespace(instanceId, 
NamespaceId.of(tokens[ID_TOKEN_POSITION])));
+      case TABLE:
+        return Optional.of(PropCacheId.forTable(instanceId, 
TableId.of(tokens[ID_TOKEN_POSITION])));
+      case UNKNOWN:
+      default:
+        return Optional.empty();
+    }
+  }
+
+  /**
+   * Determine if the IdType is system, namespace or table from a tokenized 
path. To be a valid id,
+   * the final token is PROP_NODE_NAME and then the type is defined if the 
path has table or
+   * namespace in the path, otherwise it is assumed to be system.
+   *
+   * @param tokens
+   *          a path split into String[] of tokens
+   * @return the id type.
+   */
+  public static IdType extractType(final String[] tokens) {
+    if (tokens.length == 0 || !tokens[tokens.length - 
1].equals(PROP_NODE_NAME)) {
+      // without tokens or it does not end with PROP_NAME_NAME
+      return IdType.UNKNOWN;
+    }
+    if (tokens[TYPE_TOKEN_POSITION].equals(TABLES_NODE_NAME)) {
+      return IdType.TABLE;
+    }
+    if (tokens[TYPE_TOKEN_POSITION].equals(NAMESPACE_NODE_NAME)) {
+      return IdType.NAMESPACE;
+    }
+    return IdType.SYSTEM;
+  }
+
+  public String getPath() {
+    return path;
+  }
+
+  public IdType getIdType() {
+    return idType;
+  }
+
+  @Override
+  public int compareTo(@NonNull PropCacheId other) {
+    return 
Comparator.comparing(PropCacheId::getIdType).thenComparing(PropCacheId::getPath)
+        .compare(this, other);
+  }
+
+  // TODO - remove optional and return null.
+  /**
+   * If the prop cache is for a namespace, return the namespace id.
+   *
+   * @return the namespace id.
+   */
+  public Optional<NamespaceId> getNamespaceId() {
+    return Optional.ofNullable(namespaceId);
+  }
+
+  /**
+   * if the prop cache is for a table, return the table id.
+   *
+   * @return the table id.
+   */
+  public Optional<TableId> getTableId() {
+    return Optional.ofNullable(tableId);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o)
+      return true;
+    if (o == null || getClass() != o.getClass())
+      return false;
+    PropCacheId that = (PropCacheId) o;
+    return path.equals(that.path);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(path);
+  }
+
+  @Override
+  public String toString() {
+    switch (idType) {
+      case SYSTEM:
+        return new StringJoiner(", ", PropCacheId.class.getSimpleName() + "[", 
"]")
+            .add("idType=System").toString();
+      case NAMESPACE:
+        return new StringJoiner(", ", PropCacheId.class.getSimpleName() + "[", 
"]")
+            .add("idType=Namespace").add("namespaceId=" + 
namespaceId).toString();
+      case TABLE:
+        return new StringJoiner(", ", PropCacheId.class.getSimpleName() + "[", 
"]")
+            .add("idType=Table").add("tableId=" + tableId).toString();
+      default:
+        return new StringJoiner(", ", PropCacheId.class.getSimpleName() + "[", 
"]")
+            .add("idType=" + idType).add("namespaceId=" + 
namespaceId).add("tableId=" + tableId)
+            .add("path='" + path + "'").toString();
+    }
+
+  }
+
+  /**
+   * Define types of properties stored in zookeeper. Note: default properties 
are not in zookeeper
+   * but come from code.
+   */
+  public enum IdType {
+    UNKNOWN, SYSTEM, NAMESPACE, TABLE
+  }

Review comment:
       I don't think we need these types. It's sufficient for the class to 
expose the static entry points (`forSystem`, `forNamespace`, `forTable`). The 
underlying key should just be the ZK path constructed using the InstanceId and 
the selected entry point method.

##########
File path: 
server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/PropStoreWatcher.java
##########
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.store.impl;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import org.apache.accumulo.core.util.threads.ThreadPools;
+import org.apache.accumulo.server.conf.store.PropCacheId;
+import org.apache.accumulo.server.conf.store.PropChangeListener;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class serves as a translator between ZooKeeper events and converts 
them to PropStore events.
+ * Using this as an intermediary, the external listeners do not need to set / 
manage external
+ * ZooKeeper watchers, they can register for PropStore events if they need to 
take active action on
+ * change detection.
+ * <p>
+ * Users of the PropStore.get() will get properties that match what is stored 
in ZooKeeper for each
+ * call and do not need to manage any caching. However, the ability to receive 
active notification
+ * without needed to register / manage ZooKeeper watchers external to the 
PropStore is provided in
+ * case other code is relying on active notifications.
+ * <p>
+ * The notification occurs on a separate thread from the ZooKeeper 
notification handling, but
+ * listeners should not perform lengthy operations on the notification thread 
so that other listener
+ * notifications are not delayed.
+ */
+public class PropStoreWatcher implements Watcher {
+
+  private static final Logger log = 
LoggerFactory.getLogger(PropStoreWatcher.class);
+
+  private final ExecutorService executorService =
+      ThreadPools.getServerThreadPools().createFixedThreadPool(1, 
"zoo_change_update", false);
+
+  private final ReentrantReadWriteLock listenerLock = new 
ReentrantReadWriteLock();
+  private final ReentrantReadWriteLock.ReadLock listenerReadLock = 
listenerLock.readLock();
+  private final ReentrantReadWriteLock.WriteLock listenerWriteLock = 
listenerLock.writeLock();
+
+  // access should be guarded by acquiring the listener read or write lock
+  private final Map<PropCacheId,Set<PropChangeListener>> listeners = new 
HashMap<>();
+
+  private final ReadyMonitor zkReadyMonitor;
+
+  public PropStoreWatcher(final ReadyMonitor zkReadyMonitor) {
+    this.zkReadyMonitor = zkReadyMonitor;
+  }
+
+  public void registerListener(final PropCacheId propCacheId, final 
PropChangeListener listener) {
+    listenerWriteLock.lock();
+    try {
+      Set<PropChangeListener> set = listeners.computeIfAbsent(propCacheId, s 
-> new HashSet<>());
+      set.add(listener);
+    } finally {
+      listenerWriteLock.unlock();
+    }
+  }
+
+  /**
+   * Process a ZooKeeper event. This method does not reset the watcher. 
Subscribers are notified of
+   * the change - if they call get to update and respond to the change the 
watcher will be (re)set
+   * then. This helps clean up watchers by not automatically re-adding the 
watcher on the event but
+   * only if being used.
+   *
+   * @param event
+   *          ZooKeeper event.
+   */
+  @SuppressWarnings("FutureReturnValueIgnored") // currently, tasks are fire 
and forget
+  @Override
+  public void process(final WatchedEvent event) {
+
+    String path;
+    switch (event.getType()) {
+      case NodeDataChanged:
+        path = event.getPath();
+        log.trace("handle change event for path: {}", path);
+        PropCacheId.fromPath(path).ifPresent(this::signalZkChangeEvent);
+        break;
+      case NodeDeleted:
+        path = event.getPath();
+        log.trace("handle delete event for path: {}", path);
+        PropCacheId.fromPath(path).ifPresent(cacheId -> {
+          // notify listeners
+          Set<PropChangeListener> snapshot = getListenerSnapshot(cacheId);
+          if (snapshot != null) {
+            executorService
+                .submit(new 
PropStoreEventTask.PropStoreDeleteEventTask(cacheId, snapshot));
+          }
+
+          listenerCleanup(cacheId);
+
+        });
+
+        break;
+      case None:
+        Event.KeeperState state = event.getState();
+        switch (state) {
+          // pause - could reconnect
+          case ConnectedReadOnly:
+          case Disconnected:
+            log.debug("ZooKeeper disconnected event received");
+            zkReadyMonitor.clearReady();
+            executorService.submit(new 
PropStoreEventTask.PropStoreConnectionEventTask(null,
+                getAllListenersSnapshot()));
+            break;
+
+          // okay
+          case SyncConnected:
+            log.debug("ZooKeeper connected event received");
+            zkReadyMonitor.setReady();
+            break;
+
+          // terminal - never coming back.
+          case Expired:
+          case Closed:
+            log.info("ZooKeeper connection closed event received");
+            zkReadyMonitor.clearReady();
+            zkReadyMonitor.setClosed(); // terminal condition
+            executorService.submit(new 
PropStoreEventTask.PropStoreConnectionEventTask(null,
+                getAllListenersSnapshot()));
+            break;
+
+          default:
+            log.trace("ignoring zooKeeper state: {}", state);
+        }
+        break;
+      default:
+        break;
+    }
+
+  }
+
+  /**
+   * Submit task to notify registered listeners that the propCacheId node 
received an event
+   * notification from ZooKeeper and should be updated. The process can be 
initiated either by a
+   * ZooKeeper notification or a change detected in the cache based on a 
ZooKeeper event.
+   *
+   * @param propCacheId
+   *          the cache id
+   */
+  @SuppressWarnings("FutureReturnValueIgnored") // currently, tasks are fire 
and forget

Review comment:
       Suppressing non-standard warnings can generate new warnings in 
compilers/IDEs that don't recognize these non-standard ones (for good reason: 
an unrecognized warning name could be a typo rather than what the user 
intended). The ignored return value should be handled in a different way, 
such as by calling execute instead of submit.

##########
File path: 
server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
##########
@@ -253,6 +253,8 @@ public TabletServerResourceManager(ServerContext context) {
     this.context = context;
     final AccumuloConfiguration acuConf = context.getConfiguration();
 
+    log.info("Using configuration: {}", acuConf);
+

Review comment:
       We already log the configuration on startup for all the servers. So, 
this is a bit redundant and should be removed before merging.
   ```suggestion
   
   ```

##########
File path: 
server/base/src/main/java/org/apache/accumulo/server/conf/NamespaceConfiguration.java
##########
@@ -121,32 +74,23 @@ public void getProperties(Map<String,String> props, 
Predicate<String> filter) {
     if (getNamespaceId().equals(Namespace.ACCUMULO.id()))
       parentFilter = key -> isIteratorOrConstraint(key) ? false : 
filter.test(key);
 
-    getPropCacheAccessor().getProperties(props, getPath(), filter, parent, 
parentFilter);
+    getParent().getProperties(props, parentFilter != null ? parentFilter : 
filter);
+
+    Map<String,String> theseProps = getSnapshot();
+    for (Map.Entry<String,String> p : theseProps.entrySet()) {
+      if (filter.test(p.getKey()) && p.getValue() != null) {
+        props.put(p.getKey(), p.getValue());
+      }
+    }

Review comment:
       ```suggestion
       getSnapshot().entrySet().stream()
           .filter(e -> filter.test(e.getKey()) && e.getValue() != null)
           .forEach(e -> props.put(e.getKey(), e.getValue()));
   ```

##########
File path: 
test/src/main/java/org/apache/accumulo/test/functional/WatchTheWatchCountIT.java
##########
@@ -60,8 +60,8 @@ public void test() throws Exception {
       }
       c.tableOperations().list();
       String zooKeepers = ClientProperty.INSTANCE_ZOOKEEPERS.getValue(props);
-      final long MIN = 475L;
-      final long MAX = 900L;
+      final long MIN = 150L; // 475L;
+      final long MAX = 250L; // 900L;

Review comment:
       The old values don't need to be commented out. A better comment might 
explain where the range of values comes from (like *X number of tables' 
properties times Y threads plus Z non-property watched fields* or whatever).
   ```suggestion
         final long MIN = 150L;
         final long MAX = 250L;
   ```

##########
File path: 
server/base/src/main/java/org/apache/accumulo/server/conf/util/ConfigConverter.java
##########
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.util;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Objects.requireNonNull;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.StringJoiner;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.conf.DeprecatedPropertyUtil;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.InstanceId;
+import org.apache.accumulo.core.data.NamespaceId;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooUtil;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.conf.codec.VersionedProperties;
+import org.apache.accumulo.server.conf.store.PropCacheId;
+import org.apache.accumulo.server.conf.store.PropStore;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// TODO - this is in progress and should not be merged without changes.
+// TODO - needs upgrade integration and testing.
+/**
+ * Convert pre-2.1 system, namespace and table properties to PropEncoded 
format.
+ *
+ * <pre>
+ * Source ZooKeeper paths:
+ *   srcSysPath - system config source = /accumulo/[iid-id]/config;
+ *   srcNsBasePath - namespace config source /accumulo/[iid]/namespaces;
+ *   srcTableBasePath - table config source /accumulo/[iid]/tables;
+ * </pre>
+ */
+public class ConfigConverter {
+
+  private static final Logger log = 
LoggerFactory.getLogger(ConfigConverter.class);
+
+  private final ZooReaderWriter zrw;
+  private final InstanceId instanceId;
+
+  private final PropStore propStore;
+
+  private final String zkBasePath; // base path for accumulo instance - 
/accumulo/[iid]
+
+  private final Set<String> legacyPaths = new HashSet<>();
+
+  public ConfigConverter(final ServerContext context) {
+
+    instanceId = context.getInstanceID();
+    zrw = context.getZooReaderWriter();
+    propStore = context.getPropStore();
+
+    zkBasePath = ZooUtil.getRoot(instanceId);
+  }
+
+  public synchronized static void convert(final ServerContext context,
+      final boolean deleteWhenComplete) {
+    ConfigConverter converter = new ConfigConverter(context);
+    converter.convertSys();
+    converter.convertNamespace();
+    converter.convertTables();
+
+    if (deleteWhenComplete) {
+      converter.removeLegacyPaths();
+    }
+  }
+
+  @Override
+  public String toString() {
+    return new StringJoiner(", ", ConfigConverter.class.getSimpleName() + "[", 
"]")
+        .add("converted=" + legacyPaths).toString();
+  }
+
+  public void convertSys() {
+    var sysId = PropCacheId.forSystem(instanceId);
+    var zkPathSysConfig = zkBasePath + Constants.ZCONFIG;
+
+    Map<String,String> props = readLegacyProps(zkPathSysConfig);
+
+    Map<String,String> renamedProps = new HashMap<>();
+    props.forEach((original, value) -> {
+      var finalName = DeprecatedPropertyUtil.getReplacementName(original,
+          (log, replacement) -> log
+              .info("Automatically renaming deprecated property '{}' with its 
replacement '{}'"
+                  + " in ZooKeeper configuration upgrade.", original, 
replacement));
+      renamedProps.put(finalName, value);
+    });
+
+    log.info("system props: {} -> {}", props, renamedProps);
+
+    writeConverted(sysId, renamedProps, zkPathSysConfig);
+
+    // delete - the confirmation and then delete done in two steps so that the 
removal is atomic.
+    // If the props were deleted as confirmed
+  }
+
+  public void convertNamespace() {
+    var zkPathNamespaceBase = zkBasePath + Constants.ZNAMESPACES;
+    try {
+      List<String> namespaces = zrw.getChildren(zkPathNamespaceBase);
+      for (String namespace : namespaces) {
+        String zkPropBasePath = zkPathNamespaceBase + "/" + namespace + 
Constants.ZNAMESPACE_CONF;
+        log.info("NS:{} base path: {}", namespace, zkPropBasePath);
+        Map<String,String> props = readLegacyProps(zkPropBasePath);
+        log.info("Namespace props: {} - {}", namespace, props);
+        writeConverted(PropCacheId.forNamespace(instanceId, 
NamespaceId.of(namespace)), props,
+            zkPropBasePath);
+      }
+    } catch (KeeperException ex) {
+      throw new IllegalStateException(
+          "Failed to convert namespace from ZooKeeper for path: " + 
zkPathNamespaceBase, ex);
+    } catch (InterruptedException ex) {
+      throw new IllegalStateException(
+          "Interrupted reading namespaces from ZooKeeper for path: " + 
zkPathNamespaceBase, ex);
+    }
+  }
+
+  public void convertTables() {
+    var zkPathTableBase = zkBasePath + Constants.ZTABLES;
+    try {
+      List<String> tables = zrw.getChildren(zkPathTableBase);
+      for (String table : tables) {
+        String zkPropBasePath = zkPathTableBase + "/" + table + 
Constants.ZTABLE_CONF;
+        log.info("table:{} base path: {}", table, zkPropBasePath);
+        Map<String,String> props = readLegacyProps(zkPropBasePath);
+        log.info("table props: {} - {}", table, props);
+        writeConverted(PropCacheId.forTable(instanceId, TableId.of(table)), 
props, zkPropBasePath);
+      }
+    } catch (KeeperException ex) {
+      throw new IllegalStateException(
+          "Failed to convert tables from ZooKeeper for path: " + 
zkPathTableBase, ex);
+    } catch (InterruptedException ex) {
+      throw new IllegalStateException(
+          "Interrupted reading namespaces from ZooKeeper for path: " + 
zkPathTableBase, ex);
+    }
+  }
+
+  private void removeLegacyPaths() {
+    for (String path : legacyPaths) {
+      log.debug("delete ZooKeeper path: {}", path);
+      try {
+        zrw.delete(path);
+      } catch (KeeperException ex) {
+        log.warn(
+            "Failed to delete path on property conversion " + path + ", 
reason" + ex.getMessage());
+      } catch (InterruptedException ex) {
+        Thread.currentThread().interrupt();
+        throw new IllegalStateException(ex);
+      }
+    }
+  }
+
+  private Map<String,String> readLegacyProps(final String path) {
+    requireNonNull(path, "A ZooKeeper path for configuration properties must 
be supplied");
+    Map<String,String> props = new HashMap<>();
+    try {
+      List<String> children = zrw.getChildren(path);
+      log.info("Looking in: {}, found: {}", path, children);
+      for (String child : children) {
+        if (Property.isValidPropertyKey(child)) {
+          byte[] bytes = zrw.getData(path + "/" + child);
+          props.put(child, new String(bytes, UTF_8));
+          legacyPaths.add(path + "/" + child);
+        } else {
+          log.info("Skipping invalid property: {} in {}", child, path);
+        }
+      }
+    } catch (KeeperException ex) {
+      throw new IllegalStateException("Failed to get children from ZooKeeper 
for path: " + path,
+          ex);
+    } catch (InterruptedException ex) {
+      throw new IllegalStateException(
+          "Interrupted reading children from ZooKeeper for path: " + path, ex);
+    }
+    return props;
+  }
+
+  private void writeConverted(final PropCacheId propCacheId, final 
Map<String,String> props,

Review comment:
       I don't think this converter util needs to use PropCacheId. I think it 
would be simpler if it just used a path directly. A lot of this code might be 
redundant.

##########
File path: 
server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImport.java
##########
@@ -183,8 +183,15 @@ private void checkForMerge(final long tid, final Manager 
manager) throws Excepti
     VolumeManager fs = manager.getVolumeManager();
     final Path bulkDir = new Path(bulkInfo.sourceDir);
 
-    int maxTablets = 
Integer.parseInt(manager.getContext().getTableConfiguration(bulkInfo.tableId)
-        .get(Property.TABLE_BULK_MAX_TABLETS));
+    String value = manager.getContext().getTableConfiguration(bulkInfo.tableId)
+        .get(Property.TABLE_BULK_MAX_TABLETS);
+    if (value == null) {
+      value = Property.TABLE_BULK_MAX_TABLETS.getDefaultValue();
+      log.info("Property not found " + Property.TABLE_BULK_MAX_TABLETS + " 
using default: " + value
+          + " for tableId: " + bulkInfo.tableId + " using default: " + value);
+    }
+
+    int maxTablets = Integer.parseInt(value);

Review comment:
       This is a change in behavior that causes it to use a different value 
than the user intended to specify when there is a typo, instead of erroring 
out. It also does so with merely an info message, and not even a warning about 
the error. This seems completely unrelated to this PR and could be evaluated on 
its own in a different PR.

##########
File path: 
server/base/src/main/java/org/apache/accumulo/server/conf/ZooBasedConfiguration.java
##########
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Predicate;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.conf.store.PropCacheId;
+import org.apache.accumulo.server.conf.store.PropChangeListener;
+import org.apache.accumulo.server.conf.store.PropStore;
+import org.apache.accumulo.server.conf.store.PropStoreException;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+
+/**
+ * Instances maintain a local cache of the AccumuloConfiguration hierarchy 
that will be consistent
+ * with stored properties.
+ * <p>
+ * When calling getProperties - the local copy will be updated if ZooKeeper 
changes have been
+ * received.
+ * <p>
+ * The getUpdateCount() provides an optimization for clients - the count can 
be used to detect
+ * changes without reading the properties. When the update count changes, the 
next getProperties
+ * call will update the local copy and the change count.
+ */
+public class ZooBasedConfiguration extends AccumuloConfiguration implements 
PropChangeListener {
+
+  protected final Logger log;
+  private final AccumuloConfiguration parent;
+  private final PropCacheId propCacheId;
+  private final PropStore propStore;
+
+  private final AtomicReference<PropSnapshot> snapshotRef = new 
AtomicReference<>(null);
+
+  public ZooBasedConfiguration(Logger log, ServerContext context, PropCacheId 
propCacheId,
+      AccumuloConfiguration parent) {
+    this.log = requireNonNull(log, "a Logger must be supplied");
+    requireNonNull(context, "the context cannot be null");
+    this.propCacheId = requireNonNull(propCacheId, "a PropCacheId must be 
supplied");
+    this.parent = requireNonNull(parent, "An AccumuloConfiguration parent must 
be supplied");
+
+    this.propStore =
+        requireNonNull(context.getPropStore(), "The PropStore must be supplied 
and exist");
+
+    propStore.registerAsListener(propCacheId, this);
+
+    snapshotRef.set(updateSnapshot());
+
+  }
+
+  public long getDataVersion() {
+    var snapshot = snapshotRef.get();
+    if (snapshot == null) {
+      return updateSnapshot().getDataVersion();
+    }
+    return snapshot.getDataVersion();
+  }
+
+  /**
+   * The update count is the sum of the change count of this configuration and 
the change counts of
+   * the parents. The count is used to detect if any changes occurred in the 
configuration hierarchy
+   * and if the configuration needs to be recalculated to maintain consistency 
with values in the
+   * backend store.
+   * <p>
+   * The count is required to be an increasing value.
+   */
+  @Override
+  public long getUpdateCount() {
+    long count = 0;
+    long dataVersion = 0;
+    for (AccumuloConfiguration p = this; p != null; p = p.getParent()) {
+      if (p instanceof ZooBasedConfiguration) {
+        dataVersion = ((ZooBasedConfiguration) p).getDataVersion();
+      } else {
+        dataVersion = p.getUpdateCount();
+      }
+      count += dataVersion;
+    }
+
+    log.trace("update count result for: {} - data version: {} update: {}", 
propCacheId, dataVersion,
+        count);
+    return count;
+  }
+
+  @Override
+  public AccumuloConfiguration getParent() {
+    return parent;
+  }
+
+  public PropCacheId getCacheId() {
+    return propCacheId;
+  }
+
+  @Override
+  public @Nullable String get(final Property property) {
+    Map<String,String> props = getSnapshot();
+    String value = props.get(property.getKey());
+    if (value != null) {
+      return value;
+    }
+    AccumuloConfiguration parent = getParent();
+    if (parent != null) {
+      return parent.get(property);
+    }
+    return null;
+  }
+
+  @Override
+  public void getProperties(final Map<String,String> props, final 
Predicate<String> filter) {
+
+    parent.getProperties(props, filter);
+
+    Map<String,String> theseProps = getSnapshot();
+
+    log.trace("getProperties() for: {} filter: {}, have: {}, passed: {}", 
getCacheId(), filter,
+        theseProps, props);
+
+    for (Map.Entry<String,String> p : theseProps.entrySet()) {
+      if (filter.test(p.getKey()) && p.getValue() != null) {
+        log.trace("passed filter - add to map: {} = {}", p.getKey(), 
p.getValue());
+        props.put(p.getKey(), p.getValue());
+      }
+    }
+  }
+
+  @Override
+  public boolean isPropertySet(final Property property) {
+
+    Map<String,String> theseProps = getSnapshot();
+
+    if (theseProps.get(property.getKey()) != null) {
+      return true;
+    }
+
+    return getParent().isPropertySet(property);
+
+  }
+
+  public Map<String,String> getSnapshot() {
+    if (snapshotRef.get() == null) {
+      return updateSnapshot().getProps();
+    }
+    return snapshotRef.get().getProps();
+  }
+
+  @Override
+  public void invalidateCache() {
+    snapshotRef.set(null);
+  }
+
+  private final Lock updateLock = new ReentrantLock();
+
+  private @NonNull PropSnapshot updateSnapshot() throws PropStoreException {
+
+    PropSnapshot localSnapshot = snapshotRef.get();
+
+    if (localSnapshot != null) {
+      // no changes return locally cached config
+      return localSnapshot;
+    }
+    updateLock.lock();
+    int retryCount = 5;
+    try {
+      localSnapshot = snapshotRef.get();
+      // check for update while waiting for lock.
+      if (localSnapshot != null) {
+        return localSnapshot;
+      }
+
+      PropSnapshot propsRead;
+
+      long startCount;
+      do {
+        startCount = propStore.getNodeVersion(propCacheId);
+        propsRead = doRead();
+        if (propsRead.getDataVersion() == startCount) {
+          snapshotRef.set(propsRead);
+          return snapshotRef.get();
+        }
+      } while (--retryCount > 0);
+
+      snapshotRef.set(null);
+    } finally {
+      updateLock.unlock();
+    }
+    throw new IllegalStateException(
+        "Failed to read property updates for " + propCacheId + " after " + 
retryCount + " tries");
+
+  }
+
+  private PropSnapshot doRead() throws PropStoreException {
+
+    var vProps = propStore.get(propCacheId);
+    log.trace("doRead() - updateSnapshot() for {}, returned: {}", propCacheId, 
vProps);
+    if (vProps == null) {
+      // TODO - this could return marker instead?
+      // return new PropSnapshot(INVALID_DATA, Map.of());
+      throw new IllegalStateException("Properties for " + propCacheId + " do 
not exist");
+    } else {
+      return new PropSnapshot(vProps.getDataVersion(), vProps.getProperties());
+    }
+  }
+
+  @Override
+  public void zkChangeEvent(PropCacheId watchedId) {
+    if (propCacheId.equals(watchedId)) {
+      log.debug("Received zookeeper property change event for {} - current 
version: {}",
+          propCacheId,
+          snapshotRef.get() != null ? snapshotRef.get().getDataVersion() : "no 
data version set");

Review comment:
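       Multiple get() calls on snapshotRef could return null after the null 
check has passed, because another thread may clear the reference in between. 
Read the reference once into a local variable and use that for both the check 
and the getDataVersion() call.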




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to