EdColeman commented on code in PR #2569: URL: https://github.com/apache/accumulo/pull/2569#discussion_r858064514
########## server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/ZooPropStore.java: ########## @@ -0,0 +1,530 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.server.conf.store.impl; + +import java.io.IOException; +import java.util.Collection; +import java.util.Map; +import java.util.Objects; +import java.util.function.BiFunction; + +import org.apache.accumulo.core.data.InstanceId; +import org.apache.accumulo.core.metrics.MetricsUtil; +import org.apache.accumulo.fate.zookeeper.ZooReaderWriter; +import org.apache.accumulo.fate.zookeeper.ZooUtil; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.conf.codec.VersionedPropCodec; +import org.apache.accumulo.server.conf.codec.VersionedProperties; +import org.apache.accumulo.server.conf.store.PropCache; +import org.apache.accumulo.server.conf.store.PropCacheKey; +import org.apache.accumulo.server.conf.store.PropChangeListener; +import org.apache.accumulo.server.conf.store.PropStore; +import org.apache.accumulo.server.conf.store.PropStoreException; +import org.apache.accumulo.server.conf.util.ConfigTransformer; +import org.apache.zookeeper.KeeperException; +import 
org.apache.zookeeper.data.Stat; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.github.benmanes.caffeine.cache.Ticker; +import com.google.common.annotations.VisibleForTesting; + +public class ZooPropStore implements PropStore, PropChangeListener { + + private final static Logger log = LoggerFactory.getLogger(ZooPropStore.class); + private final static VersionedPropCodec codec = VersionedPropCodec.getDefault(); + + private final ZooReaderWriter zrw; + private final PropStoreWatcher propStoreWatcher; + private final PropCache cache; + private final PropStoreMetrics cacheMetrics = new PropStoreMetrics(); + private final ReadyMonitor zkReadyMon; + + /** + * Create instance using ZooPropStore.Builder + * + * @param instanceId + * the instance id + * @param zrw + * a wrapper set of utilities for accessing ZooKeeper. + * @param readyMonitor + * coordination utility for ZooKeeper connection status. + * @param propStoreWatcher + * an extended ZooKeeper watcher + * @param ticker + * a synthetic clock used for testing. 
+ */ + private ZooPropStore(final InstanceId instanceId, final ZooReaderWriter zrw, + final ReadyMonitor readyMonitor, final PropStoreWatcher propStoreWatcher, + final Ticker ticker) { + + this.zrw = zrw; + this.zkReadyMon = readyMonitor; + this.propStoreWatcher = propStoreWatcher; + + MetricsUtil.initializeProducers(cacheMetrics); + + ZooPropLoader propLoader = new ZooPropLoader(zrw, codec, propStoreWatcher, cacheMetrics); + + if (ticker == null) { + cache = new PropCacheCaffeineImpl.Builder(propLoader, cacheMetrics).build(); + } else { + cache = + new PropCacheCaffeineImpl.Builder(propLoader, cacheMetrics).withTicker(ticker).build(); + } + + try { + var path = ZooUtil.getRoot(instanceId); + if (zrw.exists(path, propStoreWatcher)) { + log.debug("Have a ZooKeeper connection and found instance node: {}", instanceId); + zkReadyMon.setReady(); + } else { + throw new IllegalStateException("Instance may not have been initialized, root node: " + path + + " does not exist in ZooKeeper"); + } + } catch (InterruptedException ex) { + // restore the interrupt status before converting to an unchecked exception + Thread.currentThread().interrupt(); + throw new IllegalStateException("Failed to read root node " + instanceId + " from ZooKeeper", + ex); + } catch (KeeperException ex) { + throw new IllegalStateException("Failed to read root node " + instanceId + " from ZooKeeper", + ex); + } + } + + /** + * Create a PropStore for the instance using ZooPropStore.Builder, passing the session timeout + * reported by the ZooReaderWriter. + * + * @param instanceId + * the instance id + * @param zrw + * a ZooReaderWriter + * @return the property store built by ZooPropStore.Builder + */ + public static PropStore initialize(final InstanceId instanceId, final ZooReaderWriter zrw) { + return new ZooPropStore.Builder(instanceId, zrw, zrw.getSessionTimeout()).build(); + } + + /** + * Create the system configuration node and initialize with empty props - used when creating a new + * Accumulo instance. + * <p> + * Needs to be called early in the Accumulo ZooKeeper initialization sequence so that correct + * watchers can be created for the instance when the PropStore is instantiated. + * + * @param instanceId + * the instance uuid. 
+ * @param zrw + * a ZooReaderWriter + */ + public static void instancePathInit(final InstanceId instanceId, final ZooReaderWriter zrw) + throws InterruptedException, KeeperException { + var sysPropPath = PropCacheKey.forSystem(instanceId).getPath(); + VersionedProperties vProps = new VersionedProperties(); + try { + var created = + zrw.putPersistentData(sysPropPath, codec.toBytes(vProps), ZooUtil.NodeExistsPolicy.FAIL); + if (!created) { + throw new IllegalStateException( + "Failed to create default system props during initialization at: {}" + sysPropPath); + } + } catch (IOException ex) { + throw new IllegalStateException( + "Failed to create default system props during initialization at: {}" + sysPropPath, ex); + } + } + + @Override + public boolean exists(final PropCacheKey propCacheKey) throws PropStoreException { + try { + if (zrw.exists(propCacheKey.getPath())) { + return true; + } + } catch (KeeperException ex) { + // ignore Keeper exception on check. + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new PropStoreException("Interrupted testing if node exists", ex); + } + return false; + } + + /** + * Create initial system props for the instance. If the node already exists, no action is + * performed. + * + * @param context + * the server context. + * @param initProps + * map of k, v pairs of initial properties. + */ + public synchronized static void initSysProps(final ServerContext context, + final Map<String,String> initProps) { + PropCacheKey sysPropKey = PropCacheKey.forSystem(context.getInstanceID()); + createInitialProps(context, sysPropKey, initProps); + } + + /** + * Create initial properties if they do not exist. If the node exists, initialization will be + * skipped. 
+ * + * @param context + * the system context + * @param propCacheKey + * a prop id + * @param props + * initial properties + */ + public static void createInitialProps(final ServerContext context, + final PropCacheKey propCacheKey, Map<String,String> props) { + log.trace("createInitialProps() called for {}", propCacheKey); + try { + ZooReaderWriter zrw = context.getZooReaderWriter(); + if (zrw.exists(propCacheKey.getPath())) { + return; + } + log.debug("Creating initial property node for {}", propCacheKey); + VersionedProperties vProps = new VersionedProperties(props); + zrw.putPersistentData(propCacheKey.getPath(), codec.toBytes(vProps), + ZooUtil.NodeExistsPolicy.FAIL); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new PropStoreException("Interrupted creating node " + propCacheKey, ex); + } catch (Exception ex) { + throw new PropStoreException("Failed to create node " + propCacheKey, ex); + } + } + + /** + * @return the metrics instance held by this store. + */ + public PropStoreMetrics getMetrics() { + return cacheMetrics; + } + + /** + * Encode the properties and write them to a new persistent ZooKeeper node at the path of the + * prop cache key, using NodeExistsPolicy.FAIL. + * + * @param propCacheKey + * the prop cache key + * @param props + * map of k, v pairs of initial properties. + * @throws PropStoreException + * if the properties cannot be encoded, the ZooKeeper write fails or is interrupted. + */ + @Override + public void create(PropCacheKey propCacheKey, Map<String,String> props) { + + try { + VersionedProperties vProps = new VersionedProperties(props); + String path = propCacheKey.getPath(); + zrw.putPersistentData(path, codec.toBytes(vProps), ZooUtil.NodeExistsPolicy.FAIL); + } catch (InterruptedException ex) { + // restore the interrupt status, matching createInitialProps + Thread.currentThread().interrupt(); + throw new PropStoreException("Interrupted creating node " + propCacheKey, ex); + } catch (IOException ex) { + throw new PropStoreException("Failed to serialize properties for " + propCacheKey, ex); + } catch (KeeperException ex) { + throw new PropStoreException("Failed to create node " + propCacheKey, ex); + } + } + + /** + * Get or create properties from the store. If the property node does not exist in ZooKeeper and + * legacy properties exist, they will be converted to the new storage form and naming convention. + * The legacy properties are deleted once the new node format is written. + * + * @param propCacheKey + * the prop cache key + * @return The versioned properties. 
+ * @throws PropStoreException + * if the update fails because of an underlying store exception, or if the property node + * is not found and the key is not a system key. + */ + @Override + public @NonNull VersionedProperties get(final PropCacheKey propCacheKey) + throws PropStoreException { + checkZkConnection(); // if ZK not connected, block, do not just return a cached value. + propStoreWatcher.registerListener(propCacheKey, this); + + var props = cache.get(propCacheKey); + if (props != null) { + return props; + } + + // nothing returned by the cache: only a system key is handed to the ConfigTransformer + if (propCacheKey.getIdType() == PropCacheKey.IdType.SYSTEM) { + return new ConfigTransformer(zrw, codec, propStoreWatcher).transform(propCacheKey); + } + + throw new PropStoreException( + "Invalid request for " + propCacheKey + ", the property node does not exist", null); + } + + /** + * Convenience method for utilities that may not have a PropStore to read the encoded properties + * directly from ZooKeeper. This allows utilities access when there is a ZooKeeper, but there may + * not be a full instance running. All exception handling is left to the caller. + * + * @param propCacheKey + * the prop cache key + * @param watcher + * a prop store watcher that will receive / handle ZooKeeper events. + * @param zrw + * a ZooReaderWriter + * @return the versioned properties or null if the node does not exist. + * @throws IOException + * if the underlying data from the ZooKeeper node cannot be decoded. + * @throws KeeperException + * if a ZooKeeper exception occurs + * @throws InterruptedException + * if the ZooKeeper read was interrupted. 
+ */ + public static @Nullable VersionedProperties readFromZk(final PropCacheKey propCacheKey, + final PropStoreWatcher watcher, final ZooReaderWriter zrw) + throws IOException, KeeperException, InterruptedException { + if (zrw.exists(propCacheKey.getPath())) { + Stat stat = new Stat(); + byte[] bytes = zrw.getData(propCacheKey.getPath(), watcher, stat); + return codec.fromBytes(stat.getVersion(), bytes); + } + return null; Review Comment: Resolved with e8cbf539bd -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
