keith-turner commented on a change in pull request #2324:
URL: https://github.com/apache/accumulo/pull/2324#discussion_r742132361



##########
File path: 
server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/ZooPropStore.java
##########
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.store.impl;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooUtil;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.conf.codec.VersionedPropCodec;
+import org.apache.accumulo.server.conf.codec.VersionedProperties;
+import org.apache.accumulo.server.conf.store.PropCache;
+import org.apache.accumulo.server.conf.store.PropCacheId;
+import org.apache.accumulo.server.conf.store.PropChangeListener;
+import org.apache.accumulo.server.conf.store.PropStore;
+import org.apache.accumulo.server.conf.store.PropStoreException;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ZooPropStore implements PropStore, PropChangeListener {
+
+  private final static Logger log = 
LoggerFactory.getLogger(ZooPropStore.class);
+
+  private final VersionedPropCodec propCodec;
+
+  private final ReadyMonitor zkReadyMon;
+
+  private final ServerContext context;
+  private final ZooReaderWriter zrw;
+  private final PropStoreWatcher propStoreWatcher;
+  private final PropCache cache;
+  private final PropStoreMetrics cacheMetrics = new PropStoreMetrics();
+
+  private Map<String,String> fixedProps = null;
+
+  /**
+   * Create instance using ZooPropStore.Builder
+   *
+   * @param context
+   *          the server context
+   * @param readyMonitor
+   *          coordination utility for ZooKeeper connection status.
+   * @param propStoreWatcher
+   *          an extended ZooKeeper watcher
+   */
+  private ZooPropStore(final ServerContext context, final ReadyMonitor 
readyMonitor,
+      final PropStoreWatcher propStoreWatcher) {
+
+    this.context = context;
+    this.zrw = context.getZooReaderWriter();
+    this.zkReadyMon = readyMonitor;
+    this.propStoreWatcher = propStoreWatcher;
+
+    this.propCodec = context.getVersionedPropertiesCodec();
+
+    ZooPropLoader propLoader = new ZooPropLoader(zrw, 
context.getVersionedPropertiesCodec(),
+        propStoreWatcher, cacheMetrics);
+
+    cache = new CaffeineCache.Builder(propLoader, cacheMetrics).build();
+
+    try {
+      var path = ZooUtil.getRoot(context.getInstanceID());
+      if (zrw.exists(path, propStoreWatcher)) {
+        log.debug("have a ZooKeeper connection");
+        zkReadyMon.setReady();
+      } else {
+        throw new IllegalStateException(
+            "Instance may not have been initialized, root node" + path + " 
does not exist");
+      }
+    } catch (InterruptedException | KeeperException ex) {
+      throw new IllegalStateException("Failed to read root node from 
ZooKeeper", ex);
+    }
+  }
+
+  /**
+   * Create initial system props for the instance. If the node already exists, 
no action is
+   * performed.
+   *
+   * @param context
+   *          the server context.
+   * @param initProps
+   *          map of k, v pairs of initial properties.
+   * @return true if props create, false if the node existed and 
initialization was skipped.
+   */
+  public synchronized static boolean initSysProps(final ServerContext context,
+      final Map<String,String> initProps) {
+    PropCacheId sysPropsId = PropCacheId.forSystem(context.getInstanceID());
+    return createInitialProps(context, sysPropsId, initProps);
+  }
+
+  /**
+   * Create initial properties if they do not exist. If the node exists, 
initialization will be
+   * skipped.
+   *
+   * @param context
+   *          the system context
+   * @param propCacheId
+   *          a prop id
+   * @param props
+   *          initial properties
+   * @return true if props create, false if the node existed and 
initialization was skipped.
+   */
+  public static boolean createInitialProps(final ServerContext context,
+      final PropCacheId propCacheId, Map<String,String> props) {
+
+    try {
+      ZooReaderWriter zrw = context.getZooReaderWriter();
+      if (zrw.exists(propCacheId.getPath())) {
+        return false;
+      }
+      VersionedProperties vProps = new VersionedProperties(props);
+      return zrw.putPersistentData(propCacheId.getPath(),
+          context.getVersionedPropertiesCodec().toBytes(vProps), 
ZooUtil.NodeExistsPolicy.FAIL);
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException("Interrupted creating node " + 
propCacheId, ex);
+    } catch (Exception ex) {
+      throw new IllegalStateException("Failed to create node " + propCacheId, 
ex);
+    }
+  }
+
+  public PropStoreMetrics getMetrics() {
+    return cacheMetrics;
+  }
+
+  // TODO - evaluate returning the props instead of boolean.
+  @Override
+  public boolean create(PropCacheId propCacheId, Map<String,String> props)
+      throws PropStoreException {
+
+    VersionedProperties vProps = new VersionedProperties(props);
+
+    try {
+
+      var path = propCacheId.getPath();
+      if (!zrw.putPersistentData(path, propCodec.toBytes(vProps), 
ZooUtil.NodeExistsPolicy.FAIL)) {
+        return false;
+      }
+
+      Stat stat = zrw.getStatus(path, propStoreWatcher);
+
+      if (stat.getVersion() != vProps.getNextVersion()) {
+        throw new PropStoreException(
+            "Invalid data version on create, received " + stat.getVersion(),
+            new IllegalStateException());
+      }
+
+      return true;
+
+    } catch (IOException | KeeperException | InterruptedException ex) {
+      throw new PropStoreException("Failed to serialize properties for " + 
propCacheId, ex);
+    }
+  }
+
+  @Override
+  public VersionedProperties get(final PropCacheId propCacheId) throws 
PropStoreException {
+    try {
+
+      checkZkConnection(); // if ZK not connected, block, do not just return a 
cached value.
+      propStoreWatcher.registerListener(propCacheId, this);
+      return cache.get(propCacheId);
+
+    } catch (Exception ex) {
+      throw new PropStoreException("read from prop store get() failed for: " + 
propCacheId, ex);
+    }
+  }
+
+  @Override
+  public boolean putAll(final PropCacheId propCacheId, final 
Map<String,String> props)
+      throws PropStoreException {
+
+    try {
+

Review comment:
       > What I think you are suggesting is that both would receive success and 
neither would know which value was set without double checking.
   
   That situation can happen even w/o operations to set a property overlapping 
in time.  Consider the following.
   
    * via the shell user A makes an Accumulo API call to set the split 
threshold to 1G.  This call completes successfully w/o any other API set prop 
calls running concurrently.  The API call returns true and the shell stores 
this return value in a variable for user A's shell instance. 
    * via the shell user B makes an Accumulo API call to set the split 
threshold to 512M.  This call completes successfully w/o any other API set prop 
calls running concurrently.  The API call returns true and the shell stores 
this return value in a variable for user B's shell instance.
    * User A's shell prints success.
    * User B's shell prints success.
   
   So even if the API code failed when concurrency was detected, it would not 
fail in this case because the concurrency happened outside the API call.  
However the concurrency still happened and one user's set was overwritten by 
the other even though success was reported.
   
   

##########
File path: 
server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/ZooPropStore.java
##########
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.store.impl;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooUtil;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.conf.codec.VersionedPropCodec;
+import org.apache.accumulo.server.conf.codec.VersionedProperties;
+import org.apache.accumulo.server.conf.store.PropCache;
+import org.apache.accumulo.server.conf.store.PropCacheId;
+import org.apache.accumulo.server.conf.store.PropChangeListener;
+import org.apache.accumulo.server.conf.store.PropStore;
+import org.apache.accumulo.server.conf.store.PropStoreException;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ZooPropStore implements PropStore, PropChangeListener {
+
+  private final static Logger log = 
LoggerFactory.getLogger(ZooPropStore.class);
+
+  private final VersionedPropCodec propCodec;
+
+  private final ReadyMonitor zkReadyMon;
+
+  private final ServerContext context;
+  private final ZooReaderWriter zrw;
+  private final PropStoreWatcher propStoreWatcher;
+  private final PropCache cache;
+  private final PropStoreMetrics cacheMetrics = new PropStoreMetrics();
+
+  private Map<String,String> fixedProps = null;
+
+  /**
+   * Create instance using ZooPropStore.Builder
+   *
+   * @param context
+   *          the server context
+   * @param readyMonitor
+   *          coordination utility for ZooKeeper connection status.
+   * @param propStoreWatcher
+   *          an extended ZooKeeper watcher
+   */
+  private ZooPropStore(final ServerContext context, final ReadyMonitor 
readyMonitor,
+      final PropStoreWatcher propStoreWatcher) {
+
+    this.context = context;
+    this.zrw = context.getZooReaderWriter();
+    this.zkReadyMon = readyMonitor;
+    this.propStoreWatcher = propStoreWatcher;
+
+    this.propCodec = context.getVersionedPropertiesCodec();
+
+    ZooPropLoader propLoader = new ZooPropLoader(zrw, 
context.getVersionedPropertiesCodec(),
+        propStoreWatcher, cacheMetrics);
+
+    cache = new CaffeineCache.Builder(propLoader, cacheMetrics).build();
+
+    try {
+      var path = ZooUtil.getRoot(context.getInstanceID());
+      if (zrw.exists(path, propStoreWatcher)) {
+        log.debug("have a ZooKeeper connection");
+        zkReadyMon.setReady();
+      } else {
+        throw new IllegalStateException(
+            "Instance may not have been initialized, root node" + path + " 
does not exist");
+      }
+    } catch (InterruptedException | KeeperException ex) {
+      throw new IllegalStateException("Failed to read root node from 
ZooKeeper", ex);
+    }
+  }
+
+  /**
+   * Create initial system props for the instance. If the node already exists, 
no action is
+   * performed.
+   *
+   * @param context
+   *          the server context.
+   * @param initProps
+   *          map of k, v pairs of initial properties.
+   * @return true if props create, false if the node existed and 
initialization was skipped.
+   */
+  public synchronized static boolean initSysProps(final ServerContext context,
+      final Map<String,String> initProps) {
+    PropCacheId sysPropsId = PropCacheId.forSystem(context.getInstanceID());
+    return createInitialProps(context, sysPropsId, initProps);
+  }
+
+  /**
+   * Create initial properties if they do not exist. If the node exists, 
initialization will be
+   * skipped.
+   *
+   * @param context
+   *          the system context
+   * @param propCacheId
+   *          a prop id
+   * @param props
+   *          initial properties
+   * @return true if props create, false if the node existed and 
initialization was skipped.
+   */
+  public static boolean createInitialProps(final ServerContext context,
+      final PropCacheId propCacheId, Map<String,String> props) {
+
+    try {
+      ZooReaderWriter zrw = context.getZooReaderWriter();
+      if (zrw.exists(propCacheId.getPath())) {
+        return false;
+      }
+      VersionedProperties vProps = new VersionedProperties(props);
+      return zrw.putPersistentData(propCacheId.getPath(),
+          context.getVersionedPropertiesCodec().toBytes(vProps), 
ZooUtil.NodeExistsPolicy.FAIL);
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException("Interrupted creating node " + 
propCacheId, ex);
+    } catch (Exception ex) {
+      throw new IllegalStateException("Failed to create node " + propCacheId, 
ex);
+    }
+  }
+
+  public PropStoreMetrics getMetrics() {
+    return cacheMetrics;
+  }
+
+  // TODO - evaluate returning the props instead of boolean.
+  @Override
+  public boolean create(PropCacheId propCacheId, Map<String,String> props)
+      throws PropStoreException {
+
+    VersionedProperties vProps = new VersionedProperties(props);
+
+    try {
+
+      var path = propCacheId.getPath();
+      if (!zrw.putPersistentData(path, propCodec.toBytes(vProps), 
ZooUtil.NodeExistsPolicy.FAIL)) {
+        return false;
+      }
+
+      Stat stat = zrw.getStatus(path, propStoreWatcher);
+
+      if (stat.getVersion() != vProps.getNextVersion()) {
+        throw new PropStoreException(
+            "Invalid data version on create, received " + stat.getVersion(),
+            new IllegalStateException());
+      }
+
+      return true;
+
+    } catch (IOException | KeeperException | InterruptedException ex) {
+      throw new PropStoreException("Failed to serialize properties for " + 
propCacheId, ex);
+    }
+  }
+
+  @Override
+  public VersionedProperties get(final PropCacheId propCacheId) throws 
PropStoreException {
+    try {
+
+      checkZkConnection(); // if ZK not connected, block, do not just return a 
cached value.
+      propStoreWatcher.registerListener(propCacheId, this);
+      return cache.get(propCacheId);
+
+    } catch (Exception ex) {
+      throw new PropStoreException("read from prop store get() failed for: " + 
propCacheId, ex);
+    }
+  }
+
+  @Override
+  public boolean putAll(final PropCacheId propCacheId, final 
Map<String,String> props)
+      throws PropStoreException {
+
+    try {
+

Review comment:
       The only way to deal with the concurrency problem at the Accumulo API level 
would be to expose something like the version for getting and setting 
properties. The current APIs should probably retry.  If we have new APIs that 
expose this information, then those could fail if the version changed since 
reading the properties.

##########
File path: 
server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/ZooPropStore.java
##########
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.store.impl;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooUtil;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.conf.codec.VersionedPropCodec;
+import org.apache.accumulo.server.conf.codec.VersionedProperties;
+import org.apache.accumulo.server.conf.store.PropCache;
+import org.apache.accumulo.server.conf.store.PropCacheId;
+import org.apache.accumulo.server.conf.store.PropChangeListener;
+import org.apache.accumulo.server.conf.store.PropStore;
+import org.apache.accumulo.server.conf.store.PropStoreException;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ZooPropStore implements PropStore, PropChangeListener {
+
+  private final static Logger log = 
LoggerFactory.getLogger(ZooPropStore.class);
+
+  private final VersionedPropCodec propCodec;
+
+  private final ReadyMonitor zkReadyMon;
+
+  private final ServerContext context;
+  private final ZooReaderWriter zrw;
+  private final PropStoreWatcher propStoreWatcher;
+  private final PropCache cache;
+  private final PropStoreMetrics cacheMetrics = new PropStoreMetrics();
+
+  private Map<String,String> fixedProps = null;
+
+  /**
+   * Create instance using ZooPropStore.Builder
+   *
+   * @param context
+   *          the server context
+   * @param readyMonitor
+   *          coordination utility for ZooKeeper connection status.
+   * @param propStoreWatcher
+   *          an extended ZooKeeper watcher
+   */
+  private ZooPropStore(final ServerContext context, final ReadyMonitor 
readyMonitor,
+      final PropStoreWatcher propStoreWatcher) {
+
+    this.context = context;
+    this.zrw = context.getZooReaderWriter();
+    this.zkReadyMon = readyMonitor;
+    this.propStoreWatcher = propStoreWatcher;
+
+    this.propCodec = context.getVersionedPropertiesCodec();
+
+    ZooPropLoader propLoader = new ZooPropLoader(zrw, 
context.getVersionedPropertiesCodec(),
+        propStoreWatcher, cacheMetrics);
+
+    cache = new CaffeineCache.Builder(propLoader, cacheMetrics).build();
+
+    try {
+      var path = ZooUtil.getRoot(context.getInstanceID());
+      if (zrw.exists(path, propStoreWatcher)) {
+        log.debug("have a ZooKeeper connection");
+        zkReadyMon.setReady();
+      } else {
+        throw new IllegalStateException(
+            "Instance may not have been initialized, root node" + path + " 
does not exist");
+      }
+    } catch (InterruptedException | KeeperException ex) {
+      throw new IllegalStateException("Failed to read root node from 
ZooKeeper", ex);
+    }
+  }
+
+  /**
+   * Create initial system props for the instance. If the node already exists, 
no action is
+   * performed.
+   *
+   * @param context
+   *          the server context.
+   * @param initProps
+   *          map of k, v pairs of initial properties.
+   * @return true if props create, false if the node existed and 
initialization was skipped.
+   */
+  public synchronized static boolean initSysProps(final ServerContext context,
+      final Map<String,String> initProps) {
+    PropCacheId sysPropsId = PropCacheId.forSystem(context.getInstanceID());
+    return createInitialProps(context, sysPropsId, initProps);
+  }
+
+  /**
+   * Create initial properties if they do not exist. If the node exists, 
initialization will be
+   * skipped.
+   *
+   * @param context
+   *          the system context
+   * @param propCacheId
+   *          a prop id
+   * @param props
+   *          initial properties
+   * @return true if props create, false if the node existed and 
initialization was skipped.
+   */
+  public static boolean createInitialProps(final ServerContext context,
+      final PropCacheId propCacheId, Map<String,String> props) {
+
+    try {
+      ZooReaderWriter zrw = context.getZooReaderWriter();
+      if (zrw.exists(propCacheId.getPath())) {
+        return false;
+      }
+      VersionedProperties vProps = new VersionedProperties(props);
+      return zrw.putPersistentData(propCacheId.getPath(),
+          context.getVersionedPropertiesCodec().toBytes(vProps), 
ZooUtil.NodeExistsPolicy.FAIL);
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException("Interrupted creating node " + 
propCacheId, ex);
+    } catch (Exception ex) {
+      throw new IllegalStateException("Failed to create node " + propCacheId, 
ex);
+    }
+  }
+
+  public PropStoreMetrics getMetrics() {
+    return cacheMetrics;
+  }
+
+  // TODO - evaluate returning the props instead of boolean.
+  @Override
+  public boolean create(PropCacheId propCacheId, Map<String,String> props)
+      throws PropStoreException {
+
+    VersionedProperties vProps = new VersionedProperties(props);
+
+    try {
+
+      var path = propCacheId.getPath();
+      if (!zrw.putPersistentData(path, propCodec.toBytes(vProps), 
ZooUtil.NodeExistsPolicy.FAIL)) {
+        return false;
+      }
+
+      Stat stat = zrw.getStatus(path, propStoreWatcher);
+
+      if (stat.getVersion() != vProps.getNextVersion()) {
+        throw new PropStoreException(
+            "Invalid data version on create, received " + stat.getVersion(),
+            new IllegalStateException());
+      }
+
+      return true;
+
+    } catch (IOException | KeeperException | InterruptedException ex) {
+      throw new PropStoreException("Failed to serialize properties for " + 
propCacheId, ex);
+    }
+  }
+
+  @Override
+  public VersionedProperties get(final PropCacheId propCacheId) throws 
PropStoreException {
+    try {
+
+      checkZkConnection(); // if ZK not connected, block, do not just return a 
cached value.
+      propStoreWatcher.registerListener(propCacheId, this);
+      return cache.get(propCacheId);
+
+    } catch (Exception ex) {
+      throw new PropStoreException("read from prop store get() failed for: " + 
propCacheId, ex);
+    }
+  }
+
+  @Override
+  public boolean putAll(final PropCacheId propCacheId, final 
Map<String,String> props)
+      throws PropStoreException {
+
+    try {
+

Review comment:
       The only way to deal with the concurrency problem at the Accumulo API level 
would be to expose something like the version for getting and setting 
properties in the API. The current APIs should probably retry.  If we have new 
APIs that expose version information, then those could fail if the version 
changed since reading the properties.

##########
File path: 
server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/ZooPropStore.java
##########
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.store.impl;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooUtil;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.conf.codec.VersionedPropCodec;
+import org.apache.accumulo.server.conf.codec.VersionedProperties;
+import org.apache.accumulo.server.conf.store.PropCache;
+import org.apache.accumulo.server.conf.store.PropCacheId;
+import org.apache.accumulo.server.conf.store.PropChangeListener;
+import org.apache.accumulo.server.conf.store.PropStore;
+import org.apache.accumulo.server.conf.store.PropStoreException;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ZooPropStore implements PropStore, PropChangeListener {
+
+  private final static Logger log = 
LoggerFactory.getLogger(ZooPropStore.class);
+
+  private final VersionedPropCodec propCodec;
+
+  private final ReadyMonitor zkReadyMon;
+
+  private final ServerContext context;
+  private final ZooReaderWriter zrw;
+  private final PropStoreWatcher propStoreWatcher;
+  private final PropCache cache;
+  private final PropStoreMetrics cacheMetrics = new PropStoreMetrics();
+
+  private Map<String,String> fixedProps = null;
+
+  /**
+   * Create instance using ZooPropStore.Builder
+   *
+   * @param context
+   *          the server context
+   * @param readyMonitor
+   *          coordination utility for ZooKeeper connection status.
+   * @param propStoreWatcher
+   *          an extended ZooKeeper watcher
+   */
+  private ZooPropStore(final ServerContext context, final ReadyMonitor 
readyMonitor,
+      final PropStoreWatcher propStoreWatcher) {
+
+    this.context = context;
+    this.zrw = context.getZooReaderWriter();
+    this.zkReadyMon = readyMonitor;
+    this.propStoreWatcher = propStoreWatcher;
+
+    this.propCodec = context.getVersionedPropertiesCodec();
+
+    ZooPropLoader propLoader = new ZooPropLoader(zrw, 
context.getVersionedPropertiesCodec(),
+        propStoreWatcher, cacheMetrics);
+
+    cache = new CaffeineCache.Builder(propLoader, cacheMetrics).build();
+
+    try {
+      var path = ZooUtil.getRoot(context.getInstanceID());
+      if (zrw.exists(path, propStoreWatcher)) {
+        log.debug("have a ZooKeeper connection");
+        zkReadyMon.setReady();
+      } else {
+        throw new IllegalStateException(
+            "Instance may not have been initialized, root node" + path + " 
does not exist");
+      }
+    } catch (InterruptedException | KeeperException ex) {
+      throw new IllegalStateException("Failed to read root node from 
ZooKeeper", ex);
+    }
+  }
+
+  /**
+   * Create initial system props for the instance. If the node already exists, 
no action is
+   * performed.
+   *
+   * @param context
+   *          the server context.
+   * @param initProps
+   *          map of k, v pairs of initial properties.
+   * @return true if props create, false if the node existed and 
initialization was skipped.
+   */
+  public synchronized static boolean initSysProps(final ServerContext context,
+      final Map<String,String> initProps) {
+    PropCacheId sysPropsId = PropCacheId.forSystem(context.getInstanceID());
+    return createInitialProps(context, sysPropsId, initProps);
+  }
+
+  /**
+   * Create initial properties if they do not exist. If the node exists, 
initialization will be
+   * skipped.
+   *
+   * @param context
+   *          the system context
+   * @param propCacheId
+   *          a prop id
+   * @param props
+   *          initial properties
+   * @return true if props create, false if the node existed and 
initialization was skipped.
+   */
+  public static boolean createInitialProps(final ServerContext context,
+      final PropCacheId propCacheId, Map<String,String> props) {
+
+    try {
+      ZooReaderWriter zrw = context.getZooReaderWriter();
+      if (zrw.exists(propCacheId.getPath())) {
+        return false;
+      }
+      VersionedProperties vProps = new VersionedProperties(props);
+      return zrw.putPersistentData(propCacheId.getPath(),
+          context.getVersionedPropertiesCodec().toBytes(vProps), 
ZooUtil.NodeExistsPolicy.FAIL);
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException("Interrupted creating node " + 
propCacheId, ex);
+    } catch (Exception ex) {
+      throw new IllegalStateException("Failed to create node " + propCacheId, 
ex);
+    }
+  }
+
+  public PropStoreMetrics getMetrics() {
+    return cacheMetrics;
+  }
+
+  // TODO - evaluate returning the props instead of boolean.
+  @Override
+  public boolean create(PropCacheId propCacheId, Map<String,String> props)
+      throws PropStoreException {
+
+    VersionedProperties vProps = new VersionedProperties(props);
+
+    try {
+
+      var path = propCacheId.getPath();
+      if (!zrw.putPersistentData(path, propCodec.toBytes(vProps), 
ZooUtil.NodeExistsPolicy.FAIL)) {
+        return false;
+      }
+
+      Stat stat = zrw.getStatus(path, propStoreWatcher);
+
+      if (stat.getVersion() != vProps.getNextVersion()) {
+        throw new PropStoreException(
+            "Invalid data version on create, received " + stat.getVersion(),
+            new IllegalStateException());
+      }
+
+      return true;
+
+    } catch (IOException | KeeperException | InterruptedException ex) {
+      throw new PropStoreException("Failed to serialize properties for " + 
propCacheId, ex);
+    }
+  }
+
+  @Override
+  public VersionedProperties get(final PropCacheId propCacheId) throws 
PropStoreException {
+    try {
+
+      checkZkConnection(); // if ZK not connected, block, do not just return a 
cached value.
+      propStoreWatcher.registerListener(propCacheId, this);
+      return cache.get(propCacheId);
+
+    } catch (Exception ex) {
+      throw new PropStoreException("read from prop store get() failed for: " + 
propCacheId, ex);
+    }
+  }
+
+  @Override
+  public boolean putAll(final PropCacheId propCacheId, final 
Map<String,String> props)
+      throws PropStoreException {
+
+    try {
+

Review comment:
       Another way to deal with concurrency at the API level, instead of 
exposing version information, would be a new API that supports conditional set: 
an API where a user could do something like set `A=x,B=z` only if 
`A=r,B=y,C=m`.  This could be implemented on the backend using the version 
information w/o exposing the version information in the API.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to