dlmarion commented on code in PR #2569:
URL: https://github.com/apache/accumulo/pull/2569#discussion_r850399223


##########
server/base/src/main/java/org/apache/accumulo/server/conf/ZooBasedConfiguration.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Map;
+import java.util.function.Predicate;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.conf.store.PropCacheKey;
+import org.apache.accumulo.server.conf.store.PropChangeListener;
+import org.apache.accumulo.server.conf.store.PropStore;
+import org.apache.accumulo.server.conf.util.PropSnapshot;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+
+/**
+ * Instances maintain a local cache of the AccumuloConfiguration hierarchy 
that will be consistent
+ * with stored properties.
+ * <p>
+ * When calling getProperties - the local copy will be updated if ZooKeeper 
changes have been
+ * received.
+ * <p>
+ * The getUpdateCount() provides an optimization for clients - the count can 
be used to detect
+ * changes without reading the properties. When the update count changes, the 
next getProperties
+ * call will update the local copy and the change count.
+ */
+public class ZooBasedConfiguration extends AccumuloConfiguration implements 
PropChangeListener {
+
+  protected final Logger log;
+  private final AccumuloConfiguration parent;
+  private final PropCacheKey propCacheKey;
+
+  private final PropSnapshot snapshot;
+
+  public ZooBasedConfiguration(Logger log, ServerContext context, PropCacheKey 
propCacheKey,
+      AccumuloConfiguration parent) {
+    this.log = requireNonNull(log, "a Logger must be supplied");
+    requireNonNull(context, "the context cannot be null");
+    this.propCacheKey = requireNonNull(propCacheKey, "a PropCacheId must be 
supplied");
+    this.parent = requireNonNull(parent, "An AccumuloConfiguration parent must 
be supplied");
+
+    PropStore propStore =
+        requireNonNull(context.getPropStore(), "The PropStore must be supplied 
and exist");
+
+    propStore.registerAsListener(propCacheKey, this);
+
+    snapshot = new PropSnapshot(propCacheKey, propStore);
+
+  }
+
+  public long getDataVersion() {
+    return snapshot.get().getDataVersion();
+  }
+
+  /**
+   * The update count is the sum of the change count of this configuration and 
the change counts of
+   * the parents. The count is used to detect if any changes occurred in the 
configuration hierarchy
+   * and if the configuration needs to be recalculated to maintain consistency 
with values in the
+   * backend store.
+   * <p>
+   * The count is required to be an increasing value.
+   */
+  @Override
+  public long getUpdateCount() {
+    long count = 0;
+    long dataVersion = 0;
+    for (AccumuloConfiguration p = this; p != null; p = p.getParent()) {
+      if (p instanceof ZooBasedConfiguration) {
+        dataVersion = ((ZooBasedConfiguration) p).getDataVersion();
+      } else {
+        dataVersion = p.getUpdateCount();
+      }
+      count += dataVersion;
+    }
+
+    log.trace("update count result for: {} - data version: {} update: {}", 
propCacheKey,
+        dataVersion, count);
+    return count;
+  }
+
+  @Override
+  public AccumuloConfiguration getParent() {
+    return parent;
+  }
+
+  public PropCacheKey getPropCacheKey() {
+    return propCacheKey;
+  }
+
+  @Override
+  public @Nullable String get(final Property property) {
+    Map<String,String> props = getSnapshot();
+    String value = props.get(property.getKey());

Review Comment:
   If getParent() returned `Optional<AccumuloConfiguration>`, then this method 
could end wiith one additional line that maybe looks like:
   ```
   return (value != null) ? value : 
getParent().orElse(Collections.emptyMap()).get(property);
   ```
   
   Or, if getParent() returned an empty AccumuloConfiguration type object:
   ```
   return (value != null) ? value : getParent().get(property);
   ```



##########
server/base/src/main/java/org/apache/accumulo/server/conf/codec/VersionedProperties.java:
##########
@@ -49,7 +51,7 @@ public class VersionedProperties {
   public static final DateTimeFormatter tsFormatter =

Review Comment:
   ```suggestion
     public static final DateTimeFormatter TIMESTAMP_FORMATTER =
   ```



##########
server/base/src/main/java/org/apache/accumulo/server/util/TablePropUtil.java:
##########
@@ -18,48 +18,47 @@
  */
 package org.apache.accumulo.server.util;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
 
-import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.AbstractId;
 import org.apache.accumulo.core.data.TableId;
-import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
-import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
-import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
 import org.apache.accumulo.server.ServerContext;
-import org.apache.zookeeper.KeeperException;
+import org.apache.accumulo.server.conf.store.PropCacheKey;
+import org.apache.accumulo.server.conf.store.PropStoreException;
 
-public class TablePropUtil {
+public class TablePropUtil implements PropUtil {
 
-  public static boolean setTableProperty(ServerContext context, TableId 
tableId, String property,
-      String value) throws KeeperException, InterruptedException {
-    return setTableProperty(context.getZooReaderWriter(), 
context.getZooKeeperRoot(), tableId,
-        property, value);
-  }
-
-  public static boolean setTableProperty(ZooReaderWriter zoo, String zkRoot, 
TableId tableId,
-      String property, String value) throws KeeperException, 
InterruptedException {
-    if (!Property.isTablePropertyValid(property, value))
-      return false;
+  private TablePropUtil() {}
 
-    // create the zk node for per-table properties for this table if it 
doesn't already exist
-    String zkTablePath = getTablePath(zkRoot, tableId);
-    zoo.putPersistentData(zkTablePath, new byte[0], NodeExistsPolicy.SKIP);
+  public static TablePropUtil factory() {
+    return new TablePropUtil();
+  }
 
-    // create the zk node for this property and set it's data to the specified 
value
-    String zPath = zkTablePath + "/" + property;
-    zoo.putPersistentData(zPath, value.getBytes(UTF_8), 
NodeExistsPolicy.OVERWRITE);
+  /**
+   * Helper method to set provided properties for the provided table.
+   *
+   * @throws PropStoreException
+   *           if an underlying exception (KeeperException, 
InterruptException) or other failure to
+   *           read properties from the cache / backend store
+   */
+  @Override
+  public void setProperties(ServerContext context, AbstractId<?> tableId,
+      Map<String,String> props) {
+    Map<String,String> tempProps = new HashMap<>(props);
+    // TODO reconcile with NamespacePropUtil on invalid, this ignores, 
namespace throws exception

Review Comment:
   Is this TODO still valid?



##########
server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImport.java:
##########
@@ -183,8 +183,14 @@ private void checkForMerge(final long tid, final Manager 
manager) throws Excepti
     VolumeManager fs = manager.getVolumeManager();
     final Path bulkDir = new Path(bulkInfo.sourceDir);
 
-    int maxTablets = 
Integer.parseInt(manager.getContext().getTableConfiguration(bulkInfo.tableId)
-        .get(Property.TABLE_BULK_MAX_TABLETS));
+    String value = manager.getContext().getTableConfiguration(bulkInfo.tableId)
+        .get(Property.TABLE_BULK_MAX_TABLETS);
+    if (value == null) {
+      throw new IllegalStateException("The property: " + 
Property.TABLE_BULK_MAX_TABLETS.getKey()

Review Comment:
   Property.TABLE_BULK_MAX_TABLETS has a default value of `0`. Under normal 
circumstances `value` should never be null. If this is being used to see if 
some type of error has occurred in the new Property code, then I would suggest 
that this type of check be moved inside the Configuration classes in the `get` 
method.



##########
server/base/src/main/java/org/apache/accumulo/server/conf/NamespaceConfiguration.java:
##########
@@ -19,134 +19,78 @@
 package org.apache.accumulo.server.conf;
 
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Predicate;
 
-import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.clientImpl.Namespace;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.NamespaceId;
-import org.apache.accumulo.fate.zookeeper.ZooCache;
-import org.apache.accumulo.fate.zookeeper.ZooCacheFactory;
 import org.apache.accumulo.server.ServerContext;
-import org.apache.accumulo.server.conf.ZooCachePropertyAccessor.PropCacheKey;
+import org.apache.accumulo.server.conf.store.PropCacheKey;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-public class NamespaceConfiguration extends AccumuloConfiguration {
+public class NamespaceConfiguration extends ZooBasedConfiguration {
 
-  private static final Map<PropCacheKey,ZooCache> propCaches = new 
java.util.HashMap<>();
-
-  private final AccumuloConfiguration parent;
-  private final AtomicReference<ZooCachePropertyAccessor> propCacheAccessor =
-      new AtomicReference<>();
-  protected NamespaceId namespaceId = null;
+  private static final Logger log = 
LoggerFactory.getLogger(NamespaceConfiguration.class);
   protected ServerContext context;
-  private ZooCacheFactory zcf = new ZooCacheFactory();
-  private final String path;
 
   public NamespaceConfiguration(NamespaceId namespaceId, ServerContext context,
       AccumuloConfiguration parent) {
-    this.context = context;
-    this.parent = parent;
-    this.namespaceId = namespaceId;
-    this.path = context.getZooKeeperRoot() + Constants.ZNAMESPACES + "/" + 
namespaceId
-        + Constants.ZNAMESPACE_CONF;
+    super(log, context, PropCacheKey.forNamespace(context, namespaceId), 
parent);
   }
 
-  /**
-   * Gets the parent configuration of this configuration.
-   *
-   * @return parent configuration
-   */
-  public AccumuloConfiguration getParentConfiguration() {
-    return parent;
-  }
+  @Override
+  public String get(Property property) {
 
-  void setZooCacheFactory(ZooCacheFactory zcf) {
-    this.zcf = zcf;
-  }
+    String key = property.getKey();
 
-  private ZooCache getZooCache() {
-    synchronized (propCaches) {
-      PropCacheKey key = new PropCacheKey(context.getInstanceID(), 
namespaceId.canonical());
-      return propCaches.computeIfAbsent(key,
-          k -> zcf.getZooCache(context.getZooKeepers(), 
context.getZooKeepersSessionTimeOut()));
+    var namespaceId = getPropCacheKey().getNamespaceId();
+    if (namespaceId != null && namespaceId.equals(Namespace.ACCUMULO.id())
+        && isIteratorOrConstraint(key)) {
+      // ignore iterators from parent if system namespace
+      return null;
     }
-  }
-
-  private ZooCachePropertyAccessor getPropCacheAccessor() {
-    // updateAndGet below always calls compare and set, so avoid if not null
-    ZooCachePropertyAccessor zcpa = propCacheAccessor.get();
-    if (zcpa != null)
-      return zcpa;
-
-    return propCacheAccessor
-        .updateAndGet(pca -> pca == null ? new 
ZooCachePropertyAccessor(getZooCache()) : pca);
-  }
-
-  private String getPath() {
-    return path;
-  }
-
-  @Override
-  public boolean isPropertySet(Property prop, boolean cacheAndWatch) {
-    if (!cacheAndWatch)
-      throw new UnsupportedOperationException(
-          "Namespace configuration only supports checking if a property is set 
in cache.");
-
-    if (getPropCacheAccessor().isPropertySet(prop, getPath()))
-      return true;
 
-    return parent.isPropertySet(prop, cacheAndWatch);
-  }
+    Map<String,String> theseProps = getSnapshot();
+    String value = theseProps.get(key);
 
-  @Override
-  public String get(Property property) {
-    String key = property.getKey();
-    AccumuloConfiguration getParent;
-    if (namespaceId.equals(Namespace.ACCUMULO.id()) && 
isIteratorOrConstraint(key)) {
-      // ignore iterators from parent if system namespace
-      getParent = null;
-    } else {
-      getParent = parent;
+    if (value != null) {
+      return value;
     }
-    return getPropCacheAccessor().get(property, getPath(), getParent);
+
+    return getParent().get(key);

Review Comment:
   I'm a little confused as to whether `getParent()` can return null or not. 
There is no null check here, but there is in TableConfiguration. Both this 
class and TableConfiguration extend the same class. Also, I'm wondering if 
there is a way to get rid of all of the null checking around `getParent()`. I 
read through the conversation in `AccumuloConfiguration.getParent()`. I didn't 
see any mention of using `Optional` or maybe returning a non-null, but empty 
configuration.



##########
server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java:
##########
@@ -193,6 +198,38 @@ private boolean doInit(ZooReaderWriter zoo, Opts opts, 
VolumeManager fs,
     return true;
   }
 
+  private void initRequiredPropStoreZKPaths(final Opts opts, final 
ZooReaderWriter zoo,
+      final String instanceNamePath, final InstanceId instanceId) {
+
+    try {
+
+      zoo.putPersistentData(Constants.ZROOT, new byte[0], 
ZooUtil.NodeExistsPolicy.SKIP,
+          ZooDefs.Ids.OPEN_ACL_UNSAFE);
+      zoo.putPersistentData(Constants.ZROOT + Constants.ZINSTANCES, new 
byte[0],
+          ZooUtil.NodeExistsPolicy.SKIP, ZooDefs.Ids.OPEN_ACL_UNSAFE);
+
+      // setup instance name
+      if (opts.clearInstanceName) {
+        zoo.recursiveDelete(instanceNamePath, ZooUtil.NodeMissingPolicy.SKIP);
+      }
+
+      log.warn("INSTANCE PATH path: {}", instanceId);

Review Comment:
   Should this be info?



##########
server/base/src/main/java/org/apache/accumulo/server/conf/codec/VersionedPropCodec.java:
##########
@@ -152,28 +171,26 @@ public static int getEncodingVersion(final byte[] bytes) {
   }
 
   /**
-   * Extracts the data version from the encoded byte array without fully 
decoding the payload.
-   * Normally the data version should be obtained from a fully decoded 
instance of the versioned
-   * properties.
+   * Extracts the timestamp from the encoded byte array without fully decoding 
the payload. Normally
+   * the timestamp should be obtained from a fully decoded instance of the 
versioned properties.
    * <p>
    * The cost of reading the byte array from the backing store should be 
considered verses the
    * additional cost of decoding - with a goal of reducing data reads from the 
store preferred.
    * Generally reading from the store will be followed by some sort of usage 
which would require the
-   * full decode operation anyway.
+   * full decode operation anyway, so uses of this method should be narrow and 
limited.
    *
    * @param bytes
    *          serialized encoded versioned property byte array.
-   * @return the encoding version used to serialize the properties.
+   * @return the timestamp used to serialize the properties.
    */
-  public static int getDataVersion(final byte[] bytes) {
+  public static Instant readTimestamp(final byte[] bytes) {
     try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
         DataInputStream dis = new DataInputStream(bis)) {
       // skip encoding metadata

Review Comment:
   ```suggestion
         // Use EncodingOptions.fromDataStream to read the metadata from the 
stream
   ```



##########
server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/PropCacheCaffeineImpl.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.store.impl;
+
+import java.util.Objects;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.util.threads.ThreadPools;
+import org.apache.accumulo.server.conf.codec.VersionedProperties;
+import org.apache.accumulo.server.conf.store.PropCache;
+import org.apache.accumulo.server.conf.store.PropCacheKey;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.benmanes.caffeine.cache.CacheLoader;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.RemovalCause;
+import com.github.benmanes.caffeine.cache.Ticker;
+import com.google.common.annotations.VisibleForTesting;
+
+public class PropCacheCaffeineImpl implements PropCache {
+
+  public static final TimeUnit BASE_TIME_UNITS = TimeUnit.MINUTES;
+  public static final int REFRESH_MIN = 15;
+  public static final int EXPIRE_MIN = 60;
+  private static final Logger log = 
LoggerFactory.getLogger(PropCacheCaffeineImpl.class);
+  private static final Executor executor = 
ThreadPools.getServerThreadPools().createThreadPool(1,
+      20, 60, TimeUnit.SECONDS, "cache-refresh", false);
+
+  private final PropStoreMetrics metrics;
+
+  private final LoadingCache<PropCacheKey,VersionedProperties> cache;
+
+  private PropCacheCaffeineImpl(final 
CacheLoader<PropCacheKey,VersionedProperties> cacheLoader,
+      final PropStoreMetrics metrics, final Ticker ticker) {
+    this.metrics = metrics;
+
+    if (ticker != null) { // build test instance with artificial clock.

Review Comment:
   Alternatively, could you do something like:
   ```
     var builder = Caffeine.newBuilder().refreshAfterWrite(REFRESH_MIN, 
BASE_TIME_UNITS)
             .expireAfterAccess(EXPIRE_MIN, 
BASE_TIME_UNITS).evictionListener(this::evictionNotifier)
             .executor(executor);
     if (ticker != null) {
       builder = builder.ticker(ticker);
     }
     cache = builder.build;
   ```



##########
server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader9to10.java:
##########
@@ -484,11 +452,10 @@ static Map<String,DataFileValue> 
cleanupRootTabletFiles(VolumeManager fs, String
   public void upgradeFileDeletes(ServerContext context, Ample.DataLevel level) 
{
 
     String tableName = level.metaTable();
-    AccumuloClient c = context;

Review Comment:
   Why remove this and then cast below?



##########
server/base/src/main/java/org/apache/accumulo/server/conf/util/ConfigTransformer.java:
##########
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.util;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.conf.DeprecatedPropertyUtil;
+import org.apache.accumulo.core.util.DurationFormat;
+import org.apache.accumulo.fate.util.Retry;
+import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooUtil;
+import org.apache.accumulo.server.conf.codec.VersionedPropCodec;
+import org.apache.accumulo.server.conf.codec.VersionedProperties;
+import org.apache.accumulo.server.conf.store.PropCacheKey;
+import org.apache.accumulo.server.conf.store.PropStoreException;
+import org.apache.accumulo.server.conf.store.impl.PropStoreWatcher;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Read legacy properties (pre 2.1) from ZooKeeper and transform them into the 
single node format.
+ * The encoded properties are stored in ZooKeeper and then the legacy property 
ZooKeeper nodes are
+ * deleted.
+ */
+public class ConfigTransformer {
+
+  private static final Logger log = 
LoggerFactory.getLogger(ConfigTransformer.class);
+
+  private final ZooReaderWriter zrw;
+  private final VersionedPropCodec codec;
+  private final PropStoreWatcher propStoreWatcher;
+  private final Retry retry;
+
+  /**
+   * Instantiate a transformer instance.
+   *
+   * @param zrw
+   *          a ZooReaderWriter
+   * @param codec
+   *          the codec used to encode to the single-node format.
+   * @param propStoreWatcher
+   *          the watcher registered to receive future notifications of 
changes to the encoded
+   *          property node.
+   */
+  public ConfigTransformer(final ZooReaderWriter zrw, VersionedPropCodec codec,
+      final PropStoreWatcher propStoreWatcher) {
+    this.zrw = zrw;
+    this.codec = codec;
+    this.propStoreWatcher = propStoreWatcher;
+
+    // default - allow for a conservative max delay of about a minute
+    retry =
+        Retry.builder().maxRetries(15).retryAfter(250, 
MILLISECONDS).incrementBy(500, MILLISECONDS)
+            .maxWait(5, SECONDS).backOffFactor(1.75).logInterval(3, 
MINUTES).createRetry();
+
+  }
+
+  public ConfigTransformer(final ZooReaderWriter zrw, VersionedPropCodec codec,
+      final PropStoreWatcher propStoreWatcher, final Retry retry) {
+    this.zrw = zrw;
+    this.codec = codec;
+    this.propStoreWatcher = propStoreWatcher;
+    this.retry = retry;
+  }
+
+  /**
+   * Transform the properties for the provided prop cache key.
+   *
+   * @return the encoded properties.
+   */
+  public VersionedProperties transform(final PropCacheKey propCacheKey) {
+    TransformLock lock = TransformLock.createLock(propCacheKey, zrw);
+    return transform(propCacheKey, lock);
+  }
+
+  // Allow external (mocked) TransformLock to be used
+  @VisibleForTesting
+  VersionedProperties transform(final PropCacheKey propCacheKey, final 
TransformLock lock) {
+
+    VersionedProperties results;
+    Instant start = Instant.now();
+    try {
+      while (!lock.isLocked()) {
+        try {
+          retry.useRetry();
+          retry.waitForNextAttempt();
+          // look and return node if created.
+          if (zrw.exists(propCacheKey.getPath())) {
+            Stat stat = new Stat();
+            byte[] bytes = zrw.getData(propCacheKey.getPath(), 
propStoreWatcher, stat);
+            return codec.fromBytes(stat.getVersion(), bytes);
+          }
+          // still does not exist - try again.
+          lock.lock();
+        } catch (InterruptedException ex) {
+          Thread.currentThread().interrupt();
+          throw new PropStoreException("Failed to get transform lock for " + 
propCacheKey, ex);
+        } catch (IllegalStateException ex) {
+          throw new PropStoreException("Failed to get transform lock for " + 
propCacheKey, ex);
+        }
+      }
+
+      Set<LegacyPropNode> upgradeNodes = readLegacyProps(propCacheKey);
+
+      upgradeNodes = convertDeprecatedProps(propCacheKey, upgradeNodes);
+
+      results = writeConverted(propCacheKey, upgradeNodes);
+
+      if (results == null) {
+        throw new PropStoreException("Could not create properties for " + 
propCacheKey, null);
+      }
+
+      // validate lock still valid before deletion.
+      if (!lock.validateLock()) {
+        throw new PropStoreException(
+            "legacy conversion failed. Lost transform lock for " + 
propCacheKey, null);
+      }
+
+      int errorCount = deleteLegacyProps(upgradeNodes);

Review Comment:
   Do we want to delete transformed properties as we do them, or delete them 
all at the end if successful?



##########
server/base/src/main/java/org/apache/accumulo/server/util/NamespacePropUtil.java:
##########
@@ -18,45 +18,43 @@
  */
 package org.apache.accumulo.server.util;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
+import java.util.Collection;
+import java.util.Map;
 
-import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.AbstractId;
 import org.apache.accumulo.core.data.NamespaceId;
-import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
-import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
-import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
 import org.apache.accumulo.server.ServerContext;
-import org.apache.zookeeper.KeeperException;
+import org.apache.accumulo.server.conf.store.PropCacheKey;
 
-public class NamespacePropUtil {
-  public static boolean setNamespaceProperty(ServerContext context, 
NamespaceId namespaceId,
-      String property, String value) throws KeeperException, 
InterruptedException {
-    if (!Property.isTablePropertyValid(property, value))
-      return false;
+public class NamespacePropUtil implements PropUtil {
 
-    ZooReaderWriter zoo = context.getZooReaderWriter();
+  private NamespaceId namespaceId;
 
-    // create the zk node for per-namespace properties for this namespace if 
it doesn't already
-    // exist
-    String zkNamespacePath = getPath(context, namespaceId);
-    zoo.putPersistentData(zkNamespacePath, new byte[0], NodeExistsPolicy.SKIP);
+  private NamespacePropUtil() {}
 
-    // create the zk node for this property and set it's data to the specified 
value
-    String zPath = zkNamespacePath + "/" + property;
-    zoo.putPersistentData(zPath, value.getBytes(UTF_8), 
NodeExistsPolicy.OVERWRITE);
-
-    return true;
+  public static NamespacePropUtil factory() {
+    return new NamespacePropUtil();
   }
 
-  public static void removeNamespaceProperty(ServerContext context, 
NamespaceId namespaceId,
-      String property) throws InterruptedException, KeeperException {
-    String zPath = getPath(context, namespaceId) + "/" + property;
-    context.getZooReaderWriter().recursiveDelete(zPath, 
NodeMissingPolicy.SKIP);
+  @Override
+  public void setProperties(ServerContext context, AbstractId<?> namespaceId,
+      Map<String,String> properties) {
+    for (Map.Entry<String,String> prop : properties.entrySet()) {
+      // TODO reconcile with TablePropUtil on invalid, this throws exception, 
table ignores

Review Comment:
   Does this TODO still need to be addressed?



##########
server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader9to10.java:
##########
@@ -126,7 +122,7 @@ public class Upgrader9to10 implements Upgrader {
   public void upgradeZookeeper(ServerContext context) {
     setMetaTableProps(context);
     upgradeRootTabletMetadata(context);
-    renameOldMasterPropsinZK(context);
+    // renameOldMasterPropsinZK(context);

Review Comment:
   remove?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to