EdColeman commented on code in PR #2569:
URL: https://github.com/apache/accumulo/pull/2569#discussion_r854660722


##########
server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/ZooPropStore.java:
##########
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.store.impl;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.BiFunction;
+
+import org.apache.accumulo.core.data.InstanceId;
+import org.apache.accumulo.core.metrics.MetricsUtil;
+import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooUtil;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.conf.codec.VersionedPropCodec;
+import org.apache.accumulo.server.conf.codec.VersionedProperties;
+import org.apache.accumulo.server.conf.store.PropCache;
+import org.apache.accumulo.server.conf.store.PropCacheKey;
+import org.apache.accumulo.server.conf.store.PropChangeListener;
+import org.apache.accumulo.server.conf.store.PropStore;
+import org.apache.accumulo.server.conf.store.PropStoreException;
+import org.apache.accumulo.server.conf.util.ConfigTransformer;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.benmanes.caffeine.cache.Ticker;
+import com.google.common.annotations.VisibleForTesting;
+
+public class ZooPropStore implements PropStore, PropChangeListener {
+
+  private final static Logger log = 
LoggerFactory.getLogger(ZooPropStore.class);
+  private final static VersionedPropCodec codec = 
VersionedPropCodec.getDefault();
+
+  private final ZooReaderWriter zrw;
+  private final PropStoreWatcher propStoreWatcher;
+  private final PropCache cache;
+  private final PropStoreMetrics cacheMetrics = new PropStoreMetrics();
+  private final ReadyMonitor zkReadyMon;
+
+  /**
+   * Create instance using ZooPropStore.Builder
+   *
+   * @param instanceId
+   *          the instance id
+   * @param zrw
+   *          a wrapper set of utilities for accessing ZooKeeper.
+   * @param readyMonitor
+   *          coordination utility for ZooKeeper connection status.
+   * @param propStoreWatcher
+   *          an extended ZooKeeper watcher
+   * @param ticker
+   *          a synthetic clock used for testing.
+   */
+  private ZooPropStore(final InstanceId instanceId, final ZooReaderWriter zrw,
+      final ReadyMonitor readyMonitor, final PropStoreWatcher propStoreWatcher,
+      final Ticker ticker) {
+
+    this.zrw = zrw;
+    this.zkReadyMon = readyMonitor;
+    this.propStoreWatcher = propStoreWatcher;
+
+    MetricsUtil.initializeProducers(cacheMetrics);
+
+    ZooPropLoader propLoader = new ZooPropLoader(zrw, codec, propStoreWatcher, 
cacheMetrics);
+
+    if (ticker == null) {
+      cache = new PropCacheCaffeineImpl.Builder(propLoader, 
cacheMetrics).build();
+    } else {
+      cache =
+          new PropCacheCaffeineImpl.Builder(propLoader, 
cacheMetrics).withTicker(ticker).build();
+    }
+
+    try {
+      var path = ZooUtil.getRoot(instanceId);
+      if (zrw.exists(path, propStoreWatcher)) {
+        log.debug("Have a ZooKeeper connection and found instance node: {}", 
instanceId);
+        zkReadyMon.setReady();
+      } else {
+        throw new IllegalStateException("Instance may not have been 
initialized, root node: " + path
+            + " does not exist in ZooKeeper");
+      }
+    } catch (InterruptedException | KeeperException ex) {
+      throw new IllegalStateException("Failed to read root node " + instanceId 
+ " from ZooKeeper",
+          ex);
+    }
+  }
+
+  public static PropStore initialize(final InstanceId instanceId, final 
ZooReaderWriter zrw) {
+    return new ZooPropStore.Builder(instanceId, zrw, 
zrw.getSessionTimeout()).build();
+  }
+
+  /**
+   * Create the system configuration node and initialize with empty props - 
used when creating a new
+   * Accumulo instance.
+   * <p>
+   * Needs to be called early in the Accumulo ZooKeeper initialization 
sequence so that correct
+   * watchers can be created for the instance when the PropStore is 
instantiated.
+   *
+   * @param instanceId
+   *          the instance uuid.
+   * @param zrw
+   *          a ZooReaderWriter
+   */
+  public static void instancePathInit(final InstanceId instanceId, final 
ZooReaderWriter zrw)
+      throws InterruptedException, KeeperException {
+    var sysPropPath = PropCacheKey.forSystem(instanceId).getPath();
+    VersionedProperties vProps = new VersionedProperties();
+    try {
+      var created =
+          zrw.putPersistentData(sysPropPath, codec.toBytes(vProps), 
ZooUtil.NodeExistsPolicy.FAIL);
+      if (!created) {
+        throw new IllegalStateException(
+            "Failed to create default system props during initialization at: 
{}" + sysPropPath);
+      }
+    } catch (IOException ex) {
+      throw new IllegalStateException(
+          "Failed to create default system props during initialization at: {}" 
+ sysPropPath, ex);
+    }
+  }
+
+  @Override
+  public boolean exists(final PropCacheKey propCacheKey) throws 
PropStoreException {
+    try {
+      if (zrw.exists(propCacheKey.getPath())) {
+        return true;
+      }
+
+    } catch (KeeperException ex) {
+      // ignore Keeper exception on check.
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+      throw new PropStoreException("Interrupted testing if node exists", ex);
+    }
+    return false;
+  }
+
+  /**
+   * Create initial system props for the instance. If the node already exists, 
no action is
+   * performed.
+   *
+   * @param context
+   *          the server context.
+   * @param initProps
+   *          map of k, v pairs of initial properties.
+   */
+  public synchronized static void initSysProps(final ServerContext context,
+      final Map<String,String> initProps) {
+    PropCacheKey sysPropKey = PropCacheKey.forSystem(context.getInstanceID());
+    createInitialProps(context, sysPropKey, initProps);
+  }
+
+  /**
+   * Create initial properties if they do not exist. If the node exists, 
initialization will be
+   * skipped.
+   *
+   * @param context
+   *          the system context
+   * @param propCacheKey
+   *          a prop id
+   * @param props
+   *          initial properties
+   */
+  public static void createInitialProps(final ServerContext context,
+      final PropCacheKey propCacheKey, Map<String,String> props) {
+
+    try {
+      ZooReaderWriter zrw = context.getZooReaderWriter();
+      if (zrw.exists(propCacheKey.getPath())) {
+        return;
+      }
+      VersionedProperties vProps = new VersionedProperties(props);
+      zrw.putPersistentData(propCacheKey.getPath(), codec.toBytes(vProps),
+          ZooUtil.NodeExistsPolicy.FAIL);
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+      throw new PropStoreException("Interrupted creating node " + 
propCacheKey, ex);
+    } catch (Exception ex) {
+      throw new PropStoreException("Failed to create node " + propCacheKey, 
ex);
+    }
+  }
+
+  public PropStoreMetrics getMetrics() {
+    return cacheMetrics;
+  }
+
+  @Override
+  public void create(PropCacheKey propCacheKey, Map<String,String> props) {
+
+    try {
+      VersionedProperties vProps = new VersionedProperties(props);
+      String path = propCacheKey.getPath();
+      zrw.putPersistentData(path, codec.toBytes(vProps), 
ZooUtil.NodeExistsPolicy.FAIL);
+    } catch (IOException | KeeperException | InterruptedException ex) {
+      throw new PropStoreException("Failed to serialize properties for " + 
propCacheKey, ex);
+    }
+  }
+
+  /**
+   * get or create properties from the store. If the property node does not 
exist in ZooKeeper,
+   * legacy properties exist, they will be converted to the new storage form 
and naming convention.
+   * The legacy properties are deleted once the new node format is written.
+   *
+   * @param propCacheKey
+   *          the prop cache key
+   * @return The versioned properties or null if the properties do not exist 
for the id.
+   * @throws PropStoreException
+   *           if the updates fails because of an underlying store exception
+   */
+  @Override
+  public @NonNull VersionedProperties get(final PropCacheKey propCacheKey)
+      throws PropStoreException {
+    checkZkConnection(); // if ZK not connected, block, do not just return a 
cached value.

Review Comment:
   Again - this code was to handle the case when ZooKeeper session errors 
are transient. Primarily the case of a ZooKeeper node failure, but when 
there is still a quorum.  When that happens, ZooKeeper sends a disconnected 
event and then a reconnected event, if the connection is re-established before 
the time-out. In those cases, you are in an unknown state while disconnected - 
you have not failed, but you are also unable to establish the current state, so 
you likely should not proceed as if everything is okay either.
   
   The check / ready monitor will block until the connection event is received 
- future reads may succeed, but if you allow them to try without the connection 
they will fail.  So the code is making a best effort to handle that situation. 
Because of the asynchronous nature and the reality that failures can happen 
anywhere, I don't think that the race can be completely eliminated, but the 
errors are handled and the code should allow for more graceful handling of 
disconnect events than just allowing the errors to occur.
   
   Transient ZooKeeper errors are not uncommon, and this should allow 
better handling of things like a rolling ZooKeeper restart. If that is not 
handled 100% in the current code, this still allows us to move in that 
direction and then add additional handling where it is needed.
   
   I can remove the code if you disagree - or maybe I've missed the point and 
we are talking past each other?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to