EdColeman commented on code in PR #2569:
URL: https://github.com/apache/accumulo/pull/2569#discussion_r847335370


##########
server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/ZooPropStore.java:
##########
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.store.impl;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.BiFunction;
+
+import org.apache.accumulo.core.data.InstanceId;
+import org.apache.accumulo.core.metrics.MetricsUtil;
+import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooUtil;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.conf.codec.VersionedPropCodec;
+import org.apache.accumulo.server.conf.codec.VersionedProperties;
+import org.apache.accumulo.server.conf.store.PropCache;
+import org.apache.accumulo.server.conf.store.PropCacheKey;
+import org.apache.accumulo.server.conf.store.PropChangeListener;
+import org.apache.accumulo.server.conf.store.PropStore;
+import org.apache.accumulo.server.conf.store.PropStoreException;
+import org.apache.accumulo.server.conf.util.ConfigTransformer;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.benmanes.caffeine.cache.Ticker;
+import com.google.common.annotations.VisibleForTesting;
+
+public class ZooPropStore implements PropStore, PropChangeListener {
+
+  private final static Logger log = 
LoggerFactory.getLogger(ZooPropStore.class);
+  private final static VersionedPropCodec codec = 
VersionedPropCodec.getDefault();
+
+  private final ZooReaderWriter zrw;
+  private final PropStoreWatcher propStoreWatcher;
+  private final PropCache cache;
+  private final PropStoreMetrics cacheMetrics = new PropStoreMetrics();
+  private final ReadyMonitor zkReadyMon;
+
+  /**
+   * Create instance using ZooPropStore.Builder
+   *
+   * @param instanceId
+   *          the instance id
+   * @param zrw
+   *          a wrapper set of utilities for accessing ZooKeeper.
+   * @param readyMonitor
+   *          coordination utility for ZooKeeper connection status.
+   * @param propStoreWatcher
+   *          an extended ZooKeeper watcher
+   * @param ticker
+   *          a synthetic clock used for testing.
+   */
+  private ZooPropStore(final InstanceId instanceId, final ZooReaderWriter zrw,
+      final ReadyMonitor readyMonitor, final PropStoreWatcher propStoreWatcher,
+      final Ticker ticker) {
+
+    this.zrw = zrw;
+    this.zkReadyMon = readyMonitor;
+    this.propStoreWatcher = propStoreWatcher;
+
+    MetricsUtil.initializeProducers(cacheMetrics);
+
+    ZooPropLoader propLoader = new ZooPropLoader(zrw, codec, propStoreWatcher, 
cacheMetrics);
+
+    if (ticker == null) {
+      cache = new PropCacheCaffeineImpl.Builder(propLoader, 
cacheMetrics).build();
+    } else {
+      cache =
+          new PropCacheCaffeineImpl.Builder(propLoader, 
cacheMetrics).withTicker(ticker).build();
+    }
+
+    try {
+      var path = ZooUtil.getRoot(instanceId);
+      if (zrw.exists(path, propStoreWatcher)) {
+        log.debug("Have a ZooKeeper connection and found instance node: {}", 
instanceId);
+        zkReadyMon.setReady();
+      } else {
+        throw new IllegalStateException("Instance may not have been 
initialized, root node: " + path
+            + " does not exist in ZooKeeper");
+      }
+    } catch (InterruptedException | KeeperException ex) {
+      throw new IllegalStateException("Failed to read root node " + instanceId 
+ " from ZooKeeper",
+          ex);
+    }
+  }
+
+  public static PropStore initialize(final InstanceId instanceId, final 
ZooReaderWriter zrw) {
+    return new ZooPropStore.Builder(instanceId, zrw, 
zrw.getSessionTimeout()).build();
+  }
+
+  /**
+   * Create the system configuration node and initialize with empty props - 
used when creating a new
+   * Accumulo instance.
+   * <p>
+   * Needs to be called early in the Accumulo ZooKeeper initialization 
sequence so that correct
+   * watchers can be created for the instance when the PropStore is 
instantiated.
+   *
+   * @param instanceId
+   *          the instance uuid.
+   * @param zrw
+   *          a ZooReaderWriter
+   */
+  public static void instancePathInit(final InstanceId instanceId, final 
ZooReaderWriter zrw)
+      throws InterruptedException, KeeperException {
+    var sysPropPath = PropCacheKey.forSystem(instanceId).getPath();
+    VersionedProperties vProps = new VersionedProperties();
+    try {
+      var created =
+          zrw.putPersistentData(sysPropPath, codec.toBytes(vProps), 
ZooUtil.NodeExistsPolicy.FAIL);
+      if (!created) {
+        throw new IllegalStateException(
+            "Failed to create default system props during initialization at: 
{}" + sysPropPath);
+      }
+    } catch (IOException ex) {
+      throw new IllegalStateException(
+          "Failed to create default system props during initialization at: {}" 
+ sysPropPath, ex);
+    }
+  }
+
+  @Override
+  public boolean exists(final PropCacheKey propCacheKey) throws 
PropStoreException {
+    try {
+      if (zrw.exists(propCacheKey.getPath())) {
+        return true;
+      }
+
+    } catch (KeeperException ex) {
+      // ignore Keeper exception on check.
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+      throw new PropStoreException("Interrupted testing if node exists", ex);
+    }
+    return false;
+  }
+
+  /**
+   * Create initial system props for the instance. If the node already exists, 
no action is
+   * performed.
+   *
+   * @param context
+   *          the server context.
+   * @param initProps
+   *          map of k, v pairs of initial properties.
+   */
+  public synchronized static void initSysProps(final ServerContext context,
+      final Map<String,String> initProps) {
+    PropCacheKey sysPropKey = PropCacheKey.forSystem(context.getInstanceID());
+    createInitialProps(context, sysPropKey, initProps);
+  }
+
+  /**
+   * Create initial properties if they do not exist. If the node exists, 
initialization will be
+   * skipped.
+   *
+   * @param context
+   *          the system context
+   * @param propCacheKey
+   *          a prop id
+   * @param props
+   *          initial properties
+   */
+  public static void createInitialProps(final ServerContext context,
+      final PropCacheKey propCacheKey, Map<String,String> props) {
+
+    try {
+      ZooReaderWriter zrw = context.getZooReaderWriter();
+      if (zrw.exists(propCacheKey.getPath())) {
+        return;
+      }
+      VersionedProperties vProps = new VersionedProperties(props);
+      zrw.putPersistentData(propCacheKey.getPath(), codec.toBytes(vProps),
+          ZooUtil.NodeExistsPolicy.FAIL);
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+      throw new PropStoreException("Interrupted creating node " + 
propCacheKey, ex);
+    } catch (Exception ex) {
+      throw new PropStoreException("Failed to create node " + propCacheKey, 
ex);
+    }
+  }
+
+  public PropStoreMetrics getMetrics() {
+    return cacheMetrics;
+  }
+
+  @Override
+  public void create(PropCacheKey propCacheKey, Map<String,String> props) {
+
+    try {
+      VersionedProperties vProps = new VersionedProperties(props);
+      String path = propCacheKey.getPath();
+      zrw.putPersistentData(path, codec.toBytes(vProps), 
ZooUtil.NodeExistsPolicy.FAIL);
+    } catch (IOException | KeeperException | InterruptedException ex) {
+      throw new PropStoreException("Failed to serialize properties for " + 
propCacheKey, ex);
+    }
+  }
+
+  /**
+   * get or create properties from the store. If the property node does not 
exist in ZooKeeper,
+   * legacy properties exist, they will be converted to the new storage form 
and naming convention.
+   * The legacy properties are deleted once the new node format is written.
+   *
+   * @param propCacheKey
+   *          the prop cache key
+   * @return The versioned properties or null if the properties do not exist 
for the id.
+   * @throws PropStoreException
+   *           if the updates fails because of an underlying store exception
+   */
+  @Override
+  public @NonNull VersionedProperties get(final PropCacheKey propCacheKey)
+      throws PropStoreException {
+    checkZkConnection(); // if ZK not connected, block, do not just return a 
cached value.

Review Comment:
   On a disconnect, the cache will be cleared and the event propagated. This 
check the guaranties that all subsequent reads block in case ZooKeeper 
recovers. Any code that passes this check and then ZooKeeper disconnects will 
either be retrying (reties with zoo reader writer) or have the ZooKeeper calls 
fail.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to