keith-turner commented on code in PR #2569: URL: https://github.com/apache/accumulo/pull/2569#discussion_r855524825
########## server/base/src/main/java/org/apache/accumulo/server/conf/util/TransformToken.java: ########## @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.server.conf.util; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.util.Arrays; +import java.util.Objects; +import java.util.UUID; + +import org.apache.accumulo.fate.zookeeper.ZooReaderWriter; +import org.apache.accumulo.server.conf.store.PropCacheKey; +import org.apache.accumulo.server.conf.store.PropStoreException; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.Stat; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Provides a token used in property conversion. The token is used to limit the number of processes + * that try to create a property node and transform the legacy property format to the 2.1 encoded + * properties. Processes do not queue for a token (using a sequential node) - processes should look + * for the token to exist, and if present wait and then periodically re-check for the property node + * to be created by the process that created / has the token. + * <p> + * Features + * <ul> + * <li>Uses ephemeral node that will be removed if transform process terminates without + * completing</li> + * <li>Watcher not necessary - the existence of the node and uuid in data sufficient to detect + * changes.</li> + * </ul> + */ +public class TransformToken { + public static final String TRANSFORM_TOKEN = "/transform_token"; + private static final Logger log = LoggerFactory.getLogger(TransformToken.class); + private final TokenUUID tokenUUID = new TokenUUID(); + private final String path; + private final ZooReaderWriter zrw; + private boolean haveToken = false; + + private TransformToken(final @NonNull PropCacheKey key, final ZooReaderWriter zrw) { + path = key.getBasePath() + TRANSFORM_TOKEN; + this.zrw = zrw; + } + + /** + * Create a lock node in ZooKeeper using an ephemeral node. Will not throw and exception except on + * an interrupt. If the lock node is created, the returned lock will be locked. If another lock + * already exists, the lock is unlocked and the caller can decide to either wait for the resource + * to be created by the thread that created the lock, or try calling to {@code lock} to succeed + * + * @param key + * a PropCacheKey that defines the storage location of the created lock and the + * associated property nodes. + * @param zrw + * a ZooReaderWriter + * @return an TransformLock instance. + * @throws PropStoreException + * is the lock creation fails due to an underlying ZooKeeper exception. + */ + public static TransformToken createToken(final @NonNull PropCacheKey key, + final ZooReaderWriter zrw) { + TransformToken token = new TransformToken(key, zrw); + token.haveToken = token.holdToken(); + return token; + } + + public boolean holdToken() { + if (haveToken) { + return true; + } + try { + // existence check should be lighter-weight than failing on NODE_EXISTS exception + if (zrw.exists(path)) { + return false; + } + // if this completes this thread has created the lock + zrw.putEphemeralData(path, tokenUUID.asBytes()); + log.trace("wrote property upgrade lock: {} - {}", path, tokenUUID); + return true; + } catch (KeeperException ex) { + log.debug( + "Failed to write transform lock for " + path + " another process may have created one", + ex); + return false; + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new PropStoreException("Interrupted getting transform lock", ex); + } + } + + /** + * Return the token status + * + * @return true if this instance has created the token, false otherwise. + */ + public boolean haveToken() { + return haveToken; + } + + /** + * Verify lock is still present and valid while keeping the lock. + * + * @return true if lock is valid, false otherwise + */ + public boolean validateToken() { + try { + byte[] readId = zrw.getData(path); + log.trace("validate token: read: {} - expected: {}", readId, tokenUUID); + return Arrays.equals(readId, tokenUUID.asBytes()); + } catch (KeeperException ex) { + throw new PropStoreException("Failed to validate lock", ex); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new PropStoreException("Interrupted while validating lock", ex); + } + } + + /** + * If the lock was created by this instance the uuid created nad the uuid stored in the ZooKeeper + * data will match. + */ + public void releaseToken() { + try { + log.trace("releaseToken called - {} - exists in ZooKeeper: {}", path, zrw.exists(path)); Review Comment: May want to avoid the ZK call when trace logging is not enabled. ```suggestion if(log.isTraceEnabled()){ log.trace("releaseToken called - {} - exists in ZooKeeper: {}", path, zrw.exists(path)); } ``` ########## server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/ZooPropStore.java: ########## @@ -0,0 +1,530 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.server.conf.store.impl; + +import java.io.IOException; +import java.util.Collection; +import java.util.Map; +import java.util.Objects; +import java.util.function.BiFunction; + +import org.apache.accumulo.core.data.InstanceId; +import org.apache.accumulo.core.metrics.MetricsUtil; +import org.apache.accumulo.fate.zookeeper.ZooReaderWriter; +import org.apache.accumulo.fate.zookeeper.ZooUtil; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.conf.codec.VersionedPropCodec; +import org.apache.accumulo.server.conf.codec.VersionedProperties; +import org.apache.accumulo.server.conf.store.PropCache; +import org.apache.accumulo.server.conf.store.PropCacheKey; +import org.apache.accumulo.server.conf.store.PropChangeListener; +import org.apache.accumulo.server.conf.store.PropStore; +import org.apache.accumulo.server.conf.store.PropStoreException; +import org.apache.accumulo.server.conf.util.ConfigTransformer; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.Stat; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.github.benmanes.caffeine.cache.Ticker; +import com.google.common.annotations.VisibleForTesting; + +public class ZooPropStore implements PropStore, PropChangeListener { + + private final static Logger log = LoggerFactory.getLogger(ZooPropStore.class); + private final static VersionedPropCodec codec = VersionedPropCodec.getDefault(); + + private final ZooReaderWriter zrw; + private final PropStoreWatcher propStoreWatcher; + private final PropCache cache; + private final PropStoreMetrics cacheMetrics = new PropStoreMetrics(); + private final ReadyMonitor zkReadyMon; + + /** + * Create instance using ZooPropStore.Builder + * + * @param instanceId + * the instance id + * @param zrw + * a wrapper set of utilities for accessing ZooKeeper. + * @param readyMonitor + * coordination utility for ZooKeeper connection status. + * @param propStoreWatcher + * an extended ZooKeeper watcher + * @param ticker + * a synthetic clock used for testing. + */ + private ZooPropStore(final InstanceId instanceId, final ZooReaderWriter zrw, + final ReadyMonitor readyMonitor, final PropStoreWatcher propStoreWatcher, + final Ticker ticker) { + + this.zrw = zrw; + this.zkReadyMon = readyMonitor; + this.propStoreWatcher = propStoreWatcher; + + MetricsUtil.initializeProducers(cacheMetrics); + + ZooPropLoader propLoader = new ZooPropLoader(zrw, codec, propStoreWatcher, cacheMetrics); + + if (ticker == null) { + cache = new PropCacheCaffeineImpl.Builder(propLoader, cacheMetrics).build(); + } else { + cache = + new PropCacheCaffeineImpl.Builder(propLoader, cacheMetrics).withTicker(ticker).build(); + } + + try { + var path = ZooUtil.getRoot(instanceId); + if (zrw.exists(path, propStoreWatcher)) { + log.debug("Have a ZooKeeper connection and found instance node: {}", instanceId); + zkReadyMon.setReady(); + } else { + throw new IllegalStateException("Instance may not have been initialized, root node: " + path + + " does not exist in ZooKeeper"); + } + } catch (InterruptedException | KeeperException ex) { + throw new IllegalStateException("Failed to read root node " + instanceId + " from ZooKeeper", + ex); + } + } + + public static PropStore initialize(final InstanceId instanceId, final ZooReaderWriter zrw) { + return new ZooPropStore.Builder(instanceId, zrw, zrw.getSessionTimeout()).build(); + } + + /** + * Create the system configuration node and initialize with empty props - used when creating a new + * Accumulo instance. + * <p> + * Needs to be called early in the Accumulo ZooKeeper initialization sequence so that correct + * watchers can be created for the instance when the PropStore is instantiated. + * + * @param instanceId + * the instance uuid. + * @param zrw + * a ZooReaderWriter + */ + public static void instancePathInit(final InstanceId instanceId, final ZooReaderWriter zrw) + throws InterruptedException, KeeperException { + var sysPropPath = PropCacheKey.forSystem(instanceId).getPath(); + VersionedProperties vProps = new VersionedProperties(); + try { + var created = + zrw.putPersistentData(sysPropPath, codec.toBytes(vProps), ZooUtil.NodeExistsPolicy.FAIL); + if (!created) { + throw new IllegalStateException( + "Failed to create default system props during initialization at: {}" + sysPropPath); + } + } catch (IOException ex) { + throw new IllegalStateException( + "Failed to create default system props during initialization at: {}" + sysPropPath, ex); + } + } + + @Override + public boolean exists(final PropCacheKey propCacheKey) throws PropStoreException { + try { + if (zrw.exists(propCacheKey.getPath())) { + return true; + } + } catch (KeeperException ex) { + // ignore Keeper exception on check. + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new PropStoreException("Interrupted testing if node exists", ex); + } + return false; + } + + /** + * Create initial system props for the instance. If the node already exists, no action is + * performed. + * + * @param context + * the server context. + * @param initProps + * map of k, v pairs of initial properties. + */ + public synchronized static void initSysProps(final ServerContext context, + final Map<String,String> initProps) { + PropCacheKey sysPropKey = PropCacheKey.forSystem(context.getInstanceID()); + createInitialProps(context, sysPropKey, initProps); + } + + /** + * Create initial properties if they do not exist. If the node exists, initialization will be + * skipped. + * + * @param context + * the system context + * @param propCacheKey + * a prop id + * @param props + * initial properties + */ + public static void createInitialProps(final ServerContext context, + final PropCacheKey propCacheKey, Map<String,String> props) { + log.trace("createInitialProps() called for {}", propCacheKey); + try { + ZooReaderWriter zrw = context.getZooReaderWriter(); + if (zrw.exists(propCacheKey.getPath())) { + return; + } + log.debug("Creating initial property node for {}", propCacheKey); + VersionedProperties vProps = new VersionedProperties(props); + zrw.putPersistentData(propCacheKey.getPath(), codec.toBytes(vProps), + ZooUtil.NodeExistsPolicy.FAIL); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new PropStoreException("Interrupted creating node " + propCacheKey, ex); + } catch (Exception ex) { + throw new PropStoreException("Failed to create node " + propCacheKey, ex); + } + } + + public PropStoreMetrics getMetrics() { + return cacheMetrics; + } + + @Override + public void create(PropCacheKey propCacheKey, Map<String,String> props) { + + try { + VersionedProperties vProps = new VersionedProperties(props); + String path = propCacheKey.getPath(); + zrw.putPersistentData(path, codec.toBytes(vProps), ZooUtil.NodeExistsPolicy.FAIL); + } catch (IOException | KeeperException | InterruptedException ex) { + throw new PropStoreException("Failed to serialize properties for " + propCacheKey, ex); + } + } + + /** + * get or create properties from the store. If the property node does not exist in ZooKeeper, + * legacy properties exist, they will be converted to the new storage form and naming convention. + * The legacy properties are deleted once the new node format is written. + * + * @param propCacheKey + * the prop cache key + * @return The versioned properties. + * @throws PropStoreException + * if the updates fails because of an underlying store exception + */ + @Override + public @NonNull VersionedProperties get(final PropCacheKey propCacheKey) + throws PropStoreException { + checkZkConnection(); // if ZK not connected, block, do not just return a cached value. + propStoreWatcher.registerListener(propCacheKey, this); + + var props = cache.get(propCacheKey); + if (props != null) { + return props; + } + + if (propCacheKey.getIdType() == PropCacheKey.IdType.SYSTEM) { + return new ConfigTransformer(zrw, codec, propStoreWatcher).transform(propCacheKey); + } + + throw new PropStoreException( + "Invalid request for " + propCacheKey + ", the property node does not exist", null); + } + + /** + * Convenience method for utilities that may not have a PropStore read the encoded properties + * directly from ZooKeeper. This allows utilities access when there is a ZooKeeper, may there may + * not be a full instance running. All exception handling is left to the caller. + * + * @param propCacheKey + * the prop cache key + * @param watcher + * a prop store watcher that will receive / handle ZooKeeper events. + * @param zrw + * a ZooReaderWriter + * @return the versioned properties or null if the node does not exist. + * @throws IOException + * if the underlying data from the ZooKeeper node cannot be decoded. + * @throws KeeperException + * if a ZooKeeper exception occurs + * @throws InterruptedException + * if the ZooKeeper read was interrupted. + */ + public static @Nullable VersionedProperties readFromZk(final PropCacheKey propCacheKey, + final PropStoreWatcher watcher, final ZooReaderWriter zrw) + throws IOException, KeeperException, InterruptedException { + if (zrw.exists(propCacheKey.getPath())) { + Stat stat = new Stat(); + byte[] bytes = zrw.getData(propCacheKey.getPath(), watcher, stat); + return codec.fromBytes(stat.getVersion(), bytes); + } + return null; Review Comment: The following change should be more efficient for the case where the prop exists (assuming thats the most frequent case) and avoids a race condition. ```suggestion try { Stat stat = new Stat(); byte[] bytes = zrw.getData(propCacheKey.getPath(), watcher, stat); return codec.fromBytes(stat.getVersion(), bytes); } catch (NoNodeException e) { return null; } ``` ########## server/base/src/main/java/org/apache/accumulo/server/conf/util/ConfigPropertyPrinter.java: ########## @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.server.conf.util; + +import java.io.BufferedWriter; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.PrintStream; +import java.io.PrintWriter; +import java.util.Map; +import java.util.TreeMap; + +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.NamespaceId; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.fate.zookeeper.ZooReaderWriter; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.cli.ServerUtilOpts; +import org.apache.accumulo.server.conf.codec.VersionedPropCodec; +import org.apache.accumulo.server.conf.codec.VersionedProperties; +import org.apache.accumulo.server.conf.store.PropCacheKey; +import org.apache.accumulo.start.spi.KeywordExecutable; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.beust.jcommander.Parameter; +import com.google.auto.service.AutoService; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; + +// TODO - this is in progress and should not be merged without changes. +// TODO - implement json output (or remove option) +@AutoService(KeywordExecutable.class) +@SuppressFBWarnings(value = "PATH_TRAVERSAL_OUT", + justification = "app is run in same security context as user providing the filename") +public class ConfigPropertyPrinter implements KeywordExecutable { + + private static final Logger log = LoggerFactory.getLogger(ConfigPropertyPrinter.class); + + public ConfigPropertyPrinter() {} + + public static void main(String[] args) throws Exception { + new ConfigPropertyPrinter().execute(args); + } + + @Override + public String keyword() { + return "config-print"; + } + + @Override + public String description() { + return "prints the properties stored in ZooKeeper"; + } + + @Override + public void execute(String[] args) throws Exception { + + ConfigPropertyPrinter.Opts opts = new ConfigPropertyPrinter.Opts(); + opts.parseArgs(ConfigPropertyPrinter.class.getName(), args); + + ServerContext context = opts.getServerContext(); + + print(context, opts.outfile, opts.jsonFmt); + + } + + public void print(final ServerContext context, final String outfile, final boolean jsonFmt) { + + Map<String,String> namespaces; + Map<String,String> tables; + + AccumuloClient client = Accumulo.newClient().from(context.getProperties()).build(); + + ZooReaderWriter zrw = context.getZooReaderWriter(); + + try { + namespaces = client.namespaceOperations().namespaceIdMap(); + tables = client.tableOperations().tableIdMap(); + } catch (AccumuloException | AccumuloSecurityException ex) { + throw new IllegalStateException("Failed to read namespaces / tables", ex); + } + + log.info("Namespaces: {}", namespaces); + log.info("Tables: {}", tables); + + PrintStream origStream = null; + + try { + + OutputStream outStream; + + if (outfile == null || "".equals(outfile)) { + log.info("No output file, using stdout."); + origStream = System.out; + outStream = System.out; + } else { + outStream = new FileOutputStream(outfile); + } + + try (PrintWriter writer = + new PrintWriter(new BufferedWriter(new OutputStreamWriter(outStream)))) { + + Stat stat = new Stat(); + byte[] bytes; + + try { + bytes = zrw.getData(PropCacheKey.forSystem(context).getPath(), stat); + VersionedProperties sysProps = + VersionedPropCodec.getDefault().fromBytes(stat.getVersion(), bytes); + printProps(writer, "System", sysProps); + } catch (KeeperException.NoNodeException nex) { + // skip on no node. Review Comment: Would it made sense to print an empty set here? `printProps(writer, "System", VersionProperties.EMPTY);` ########## server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java: ########## @@ -397,21 +399,25 @@ private void alterNamespaceProperty(TCredentials c, String namespace, String pro try { if (value == null) { - NamespacePropUtil.removeNamespaceProperty(manager.getContext(), namespaceId, property); + NamespacePropUtil.factory().removeProperties(manager.getContext(), namespaceId, + List.of(property)); } else { - NamespacePropUtil.setNamespaceProperty(manager.getContext(), namespaceId, property, value); + NamespacePropUtil.factory().setProperties(manager.getContext(), namespaceId, + Map.of(property, value)); + } + } catch (PropStoreException ex) { + if (ex.getCause() instanceof KeeperException.NoNodeException) { Review Comment: Would you expect the ZK exception to always be one level deep as the cause? If not may need to loop/recurse to search for a ZK cause. ########## server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/ZooPropStore.java: ########## @@ -0,0 +1,530 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.server.conf.store.impl; + +import java.io.IOException; +import java.util.Collection; +import java.util.Map; +import java.util.Objects; +import java.util.function.BiFunction; + +import org.apache.accumulo.core.data.InstanceId; +import org.apache.accumulo.core.metrics.MetricsUtil; +import org.apache.accumulo.fate.zookeeper.ZooReaderWriter; +import org.apache.accumulo.fate.zookeeper.ZooUtil; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.conf.codec.VersionedPropCodec; +import org.apache.accumulo.server.conf.codec.VersionedProperties; +import org.apache.accumulo.server.conf.store.PropCache; +import org.apache.accumulo.server.conf.store.PropCacheKey; +import org.apache.accumulo.server.conf.store.PropChangeListener; +import org.apache.accumulo.server.conf.store.PropStore; +import org.apache.accumulo.server.conf.store.PropStoreException; +import org.apache.accumulo.server.conf.util.ConfigTransformer; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.Stat; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.github.benmanes.caffeine.cache.Ticker; +import com.google.common.annotations.VisibleForTesting; + +public class ZooPropStore implements PropStore, PropChangeListener { + + private final static Logger log = LoggerFactory.getLogger(ZooPropStore.class); + private final static VersionedPropCodec codec = VersionedPropCodec.getDefault(); + + private final ZooReaderWriter zrw; + private final PropStoreWatcher propStoreWatcher; + private final PropCache cache; + private final PropStoreMetrics cacheMetrics = new PropStoreMetrics(); + private final ReadyMonitor zkReadyMon; + + /** + * Create instance using ZooPropStore.Builder + * + * @param instanceId + * the instance id + * @param zrw + * a wrapper set of utilities for accessing ZooKeeper. + * @param readyMonitor + * coordination utility for ZooKeeper connection status. + * @param propStoreWatcher + * an extended ZooKeeper watcher + * @param ticker + * a synthetic clock used for testing. + */ + private ZooPropStore(final InstanceId instanceId, final ZooReaderWriter zrw, + final ReadyMonitor readyMonitor, final PropStoreWatcher propStoreWatcher, + final Ticker ticker) { + + this.zrw = zrw; + this.zkReadyMon = readyMonitor; + this.propStoreWatcher = propStoreWatcher; + + MetricsUtil.initializeProducers(cacheMetrics); + + ZooPropLoader propLoader = new ZooPropLoader(zrw, codec, propStoreWatcher, cacheMetrics); + + if (ticker == null) { + cache = new PropCacheCaffeineImpl.Builder(propLoader, cacheMetrics).build(); + } else { + cache = + new PropCacheCaffeineImpl.Builder(propLoader, cacheMetrics).withTicker(ticker).build(); + } + + try { + var path = ZooUtil.getRoot(instanceId); + if (zrw.exists(path, propStoreWatcher)) { + log.debug("Have a ZooKeeper connection and found instance node: {}", instanceId); + zkReadyMon.setReady(); + } else { + throw new IllegalStateException("Instance may not have been initialized, root node: " + path + + " does not exist in ZooKeeper"); + } + } catch (InterruptedException | KeeperException ex) { + throw new IllegalStateException("Failed to read root node " + instanceId + " from ZooKeeper", + ex); + } + } + + public static PropStore initialize(final InstanceId instanceId, final ZooReaderWriter zrw) { + return new ZooPropStore.Builder(instanceId, zrw, zrw.getSessionTimeout()).build(); + } + + /** + * Create the system configuration node and initialize with empty props - used when creating a new + * Accumulo instance. + * <p> + * Needs to be called early in the Accumulo ZooKeeper initialization sequence so that correct + * watchers can be created for the instance when the PropStore is instantiated. + * + * @param instanceId + * the instance uuid. + * @param zrw + * a ZooReaderWriter + */ + public static void instancePathInit(final InstanceId instanceId, final ZooReaderWriter zrw) + throws InterruptedException, KeeperException { + var sysPropPath = PropCacheKey.forSystem(instanceId).getPath(); + VersionedProperties vProps = new VersionedProperties(); + try { + var created = + zrw.putPersistentData(sysPropPath, codec.toBytes(vProps), ZooUtil.NodeExistsPolicy.FAIL); + if (!created) { + throw new IllegalStateException( + "Failed to create default system props during initialization at: {}" + sysPropPath); + } + } catch (IOException ex) { + throw new IllegalStateException( + "Failed to create default system props during initialization at: {}" + sysPropPath, ex); + } + } + + @Override + public boolean exists(final PropCacheKey propCacheKey) throws PropStoreException { + try { + if (zrw.exists(propCacheKey.getPath())) { + return true; + } + } catch (KeeperException ex) { + // ignore Keeper exception on check. + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new PropStoreException("Interrupted testing if node exists", ex); + } + return false; + } + + /** + * Create initial system props for the instance. If the node already exists, no action is + * performed. + * + * @param context + * the server context. + * @param initProps + * map of k, v pairs of initial properties. + */ + public synchronized static void initSysProps(final ServerContext context, + final Map<String,String> initProps) { + PropCacheKey sysPropKey = PropCacheKey.forSystem(context.getInstanceID()); + createInitialProps(context, sysPropKey, initProps); + } + + /** + * Create initial properties if they do not exist. If the node exists, initialization will be + * skipped. + * + * @param context + * the system context + * @param propCacheKey + * a prop id + * @param props + * initial properties + */ + public static void createInitialProps(final ServerContext context, + final PropCacheKey propCacheKey, Map<String,String> props) { + log.trace("createInitialProps() called for {}", propCacheKey); + try { + ZooReaderWriter zrw = context.getZooReaderWriter(); + if (zrw.exists(propCacheKey.getPath())) { + return; + } + log.debug("Creating initial property node for {}", propCacheKey); + VersionedProperties vProps = new VersionedProperties(props); + zrw.putPersistentData(propCacheKey.getPath(), codec.toBytes(vProps), + ZooUtil.NodeExistsPolicy.FAIL); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new PropStoreException("Interrupted creating node " + propCacheKey, ex); + } catch (Exception ex) { + throw new PropStoreException("Failed to create node " + propCacheKey, ex); + } + } + + public PropStoreMetrics getMetrics() { + return cacheMetrics; + } + + @Override + public void create(PropCacheKey propCacheKey, Map<String,String> props) { + + try { + VersionedProperties vProps = new VersionedProperties(props); + String path = propCacheKey.getPath(); + zrw.putPersistentData(path, codec.toBytes(vProps), ZooUtil.NodeExistsPolicy.FAIL); + } catch (IOException | KeeperException | InterruptedException ex) { + throw new PropStoreException("Failed to serialize properties for " + propCacheKey, ex); + } + } + + /** + * get or create properties from the store. If the property node does not exist in ZooKeeper, + * legacy properties exist, they will be converted to the new storage form and naming convention. + * The legacy properties are deleted once the new node format is written. + * + * @param propCacheKey + * the prop cache key + * @return The versioned properties. + * @throws PropStoreException + * if the updates fails because of an underlying store exception + */ + @Override + public @NonNull VersionedProperties get(final PropCacheKey propCacheKey) + throws PropStoreException { + checkZkConnection(); // if ZK not connected, block, do not just return a cached value. + propStoreWatcher.registerListener(propCacheKey, this); + + var props = cache.get(propCacheKey); + if (props != null) { + return props; + } + + if (propCacheKey.getIdType() == PropCacheKey.IdType.SYSTEM) { + return new ConfigTransformer(zrw, codec, propStoreWatcher).transform(propCacheKey); + } + + throw new PropStoreException( + "Invalid request for " + propCacheKey + ", the property node does not exist", null); + } + + /** + * Convenience method for utilities that may not have a PropStore read the encoded properties + * directly from ZooKeeper. This allows utilities access when there is a ZooKeeper, may there may + * not be a full instance running. All exception handling is left to the caller. + * + * @param propCacheKey + * the prop cache key + * @param watcher + * a prop store watcher that will receive / handle ZooKeeper events. + * @param zrw + * a ZooReaderWriter + * @return the versioned properties or null if the node does not exist. + * @throws IOException + * if the underlying data from the ZooKeeper node cannot be decoded. + * @throws KeeperException + * if a ZooKeeper exception occurs + * @throws InterruptedException + * if the ZooKeeper read was interrupted. + */ + public static @Nullable VersionedProperties readFromZk(final PropCacheKey propCacheKey, + final PropStoreWatcher watcher, final ZooReaderWriter zrw) + throws IOException, KeeperException, InterruptedException { + if (zrw.exists(propCacheKey.getPath())) { + Stat stat = new Stat(); + byte[] bytes = zrw.getData(propCacheKey.getPath(), watcher, stat); + return codec.fromBytes(stat.getVersion(), bytes); + } + return null; + } + + /** + * Copies all mappings from the specified map and into the existing property values and stores + * them into the backend store. New keys are added and keys the may have existed in the current + * properties are overwritten. + * <p> + * If multiple threads attempt to update values concurrently, this method will automatically + * retry. If the threads are setting different keys / values the result will be the sum of the + * changes. If the concurrent threads are attempting to set a value(s) for the same key(s), the + * value(s) will be the set to the values provided by the last thread to complete. The order is + * indeterminate. + * + * @param propCacheKey + * the prop cache id + * @param props + * a map of property k,v pairs + * @throws PropStoreException + * if the values cannot be written or if an underlying store exception occurs. + */ + @Override + public void putAll(PropCacheKey propCacheKey, Map<String,String> props) { + mutateVersionedProps(propCacheKey, VersionedProperties::addOrUpdate, props); Review Comment: I think the following would be save and may avoid some unnecessary work. I noticed the FATE code that creates a table may sometimes call this code with an empty map. ```suggestion public void putAll(PropCacheKey propCacheKey, Map<String,String> props) { if(props.isEmpty()) return; mutateVersionedProps(propCacheKey, VersionedProperties::addOrUpdate, props); ``` ########## server/base/src/main/java/org/apache/accumulo/server/conf/util/PropSnapshot.java: ########## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.server.conf.util; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.accumulo.server.conf.codec.VersionedProperties; +import org.apache.accumulo.server.conf.store.PropCacheKey; +import org.apache.accumulo.server.conf.store.PropStore; +import org.apache.accumulo.server.conf.store.PropStoreException; +import org.checkerframework.checker.nullness.qual.NonNull; + +/** + * This class provides a secondary, local cache of properties and is intended to be used to optimize + * constructing a configuration hierarchy from the underlying properties stored in ZooKeeper. + * <p> + * Configurations and especially the derivers use an updateCount to detect configuration changes. + * The updateCount is checked frequently and the configuration hierarchy rebuilt when a change is + * detected. + */ +public class PropSnapshot { + + private final Lock updateLock = new ReentrantLock(); + private final AtomicBoolean needsUpdate = new AtomicBoolean(true); + private final AtomicReference<VersionedProperties> vPropRef = new AtomicReference<>(); + private final PropCacheKey propCacheKey; + private final PropStore propStore; + + public PropSnapshot(final PropCacheKey propCacheKey, final PropStore propStore) { + this.propCacheKey = propCacheKey; + this.propStore = propStore; + + updateSnapshot(); + } + + /** + * Get the current snapshot - updating if necessary. + * + * @return the current property snapshot. + */ + public @NonNull VersionedProperties get() { + if (needsUpdate.get()) { + updateSnapshot(); + } + var answer = vPropRef.get(); + if (answer == null) { + throw new PropStoreException("Invalid state for property snapshot, no value has been set", + null); + } + return answer; + } + + /** + * Signal the current snapshot is invalid and needs to be updated on next access. + */ + public void requireUpdate() { + updateLock.lock(); + try { + needsUpdate.set(true); + } finally { + updateLock.unlock(); + } + } + + /** + * Update the current snapshot if a refresh is required. + * + * @throws PropStoreException + * if the properties cannot be retrieved from the underlying store. + */ + private void updateSnapshot() throws PropStoreException { + if (!needsUpdate.get()) { + return; + } + updateLock.lock(); + try { + var vProps = propStore.get(propCacheKey); Review Comment: Possible that multiple threads were waiting on the lock. Could check again after getting the lock to see if its really needed. ```suggestion if (!needsUpdate.get()) { return; } var vProps = propStore.get(propCacheKey); ``` ########## server/base/src/main/java/org/apache/accumulo/server/conf/util/ConfigPropertyPrinter.java: ########## @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.server.conf.util; + +import java.io.BufferedWriter; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.PrintStream; +import java.io.PrintWriter; +import java.util.Map; +import java.util.TreeMap; + +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.NamespaceId; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.fate.zookeeper.ZooReaderWriter; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.cli.ServerUtilOpts; +import org.apache.accumulo.server.conf.codec.VersionedPropCodec; +import org.apache.accumulo.server.conf.codec.VersionedProperties; +import org.apache.accumulo.server.conf.store.PropCacheKey; +import org.apache.accumulo.start.spi.KeywordExecutable; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.beust.jcommander.Parameter; +import com.google.auto.service.AutoService; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; + +// TODO - this is in progress and should not be merged without changes. +// TODO - implement json output (or remove option) +@AutoService(KeywordExecutable.class) +@SuppressFBWarnings(value = "PATH_TRAVERSAL_OUT", + justification = "app is run in same security context as user providing the filename") +public class ConfigPropertyPrinter implements KeywordExecutable { + + private static final Logger log = LoggerFactory.getLogger(ConfigPropertyPrinter.class); + + public ConfigPropertyPrinter() {} + + public static void main(String[] args) throws Exception { + new ConfigPropertyPrinter().execute(args); + } + + @Override + public String keyword() { + return "config-print"; + } + + @Override + public String description() { + return "prints the properties stored in ZooKeeper"; + } + + @Override + public void execute(String[] args) throws Exception { + + ConfigPropertyPrinter.Opts opts = new ConfigPropertyPrinter.Opts(); + opts.parseArgs(ConfigPropertyPrinter.class.getName(), args); + + ServerContext context = opts.getServerContext(); + + print(context, opts.outfile, opts.jsonFmt); + + } + + public void print(final ServerContext context, final String outfile, final boolean jsonFmt) { + + Map<String,String> namespaces; + Map<String,String> tables; + + AccumuloClient client = Accumulo.newClient().from(context.getProperties()).build(); + + ZooReaderWriter zrw = context.getZooReaderWriter(); + + try { + namespaces = client.namespaceOperations().namespaceIdMap(); + tables = client.tableOperations().tableIdMap(); + } catch (AccumuloException | AccumuloSecurityException ex) { + throw new IllegalStateException("Failed to read namespaces / tables", ex); + } + + log.info("Namespaces: {}", namespaces); + log.info("Tables: {}", tables); + + PrintStream origStream = null; + + try { + + OutputStream outStream; + + if (outfile == null || "".equals(outfile)) { + log.info("No output file, using stdout."); + origStream = System.out; + outStream = System.out; + } else { + outStream = new FileOutputStream(outfile); + } + + try (PrintWriter writer = + new PrintWriter(new BufferedWriter(new OutputStreamWriter(outStream)))) { + + Stat stat = new Stat(); + byte[] bytes; + + try { + bytes = zrw.getData(PropCacheKey.forSystem(context).getPath(), stat); + VersionedProperties sysProps = + VersionedPropCodec.getDefault().fromBytes(stat.getVersion(), bytes); + printProps(writer, "System", sysProps); + } catch (KeeperException.NoNodeException nex) { + // skip on no node. + } + + for (Map.Entry<String,String> e : namespaces.entrySet()) { + try { + bytes = zrw.getData( + PropCacheKey.forNamespace(context, NamespaceId.of(e.getValue())).getPath(), stat); + VersionedProperties nsProps = + VersionedPropCodec.getDefault().fromBytes(stat.getVersion(), bytes); + printProps(writer, e.getKey(), nsProps); + } catch (KeeperException.NoNodeException nex) { + // skip on no node. + } + } + + for (Map.Entry<String,String> e : tables.entrySet()) { + try { + bytes = zrw.getData(PropCacheKey.forTable(context, TableId.of(e.getValue())).getPath(), + stat); + VersionedProperties tsProps = + VersionedPropCodec.getDefault().fromBytes(stat.getVersion(), bytes); + printProps(writer, e.getKey(), tsProps); + } catch (KeeperException.NoNodeException nex) { + // skip on no node. + } + } + } catch (InterruptedException | KeeperException ex) { + throw new IllegalStateException("ZooKeeper exception reading properties", ex); + } + outStream.close(); + if (origStream != null) { + System.setOut(origStream); + } + + } catch (IOException ex) { + throw new IllegalStateException("Invalid file", ex); + } + + } + + private void printProps(final PrintWriter writer, final String name, + final VersionedProperties props) { + writer.println("************* name: '" + name + "', size: " + props.getProperties().size()); + Map<String,String> sorted = new TreeMap<>(props.getProperties()); + writer.println(name); + sorted + .forEach((k, v) -> writer.printf("%s, %s%n", k, Property.isSensitive(v) ? "<hidden>" : v)); + } + + private void printToWriter(PrintWriter writer) { + writer.println("Printer to a writer"); + } Review Comment: ```suggestion ``` ########## server/manager/src/main/java/org/apache/accumulo/manager/Manager.java: ########## @@ -261,6 +262,19 @@ synchronized void setManagerState(ManagerState newState) { } if (oldState != newState && (newState == ManagerState.HAVE_LOCK)) { + try { + log.info("Starting property conversion"); + var context = getContext(); + ConfigPropertyUpgrader configUpgrader = new ConfigPropertyUpgrader(); + configUpgrader.doUpgrade(context.getInstanceID(), context.getZooReaderWriter()); + log.info("Completed property conversion"); + } catch (Exception ex) { + throw new IllegalStateException("Failed to convert properties to single node format", ex); + } + } + + if (oldState != newState && (newState == ManagerState.HAVE_LOCK)) { + convertPropsToSingleNode(); Review Comment: This code seems redundant, seems like the same thing is done twice (this methods impl seems like its the same as the code block above). It would be much better to move this code into Upgrader9to10.upgradeZookeeper(). The manager used to have a lot of upgrade code from many different versions all mixed together. It got to a point where it was impossible to tell what upgrade code went with which version. Having a class that handles each version keeps things for different versions separate in the code. Also using the existing upgrade code ensures that upgrade changes are applied in the correct order if there are multiple versions to upgrade. ########## server/base/src/main/java/org/apache/accumulo/server/conf/util/TransformToken.java: ########## @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.server.conf.util; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.util.Arrays; +import java.util.Objects; +import java.util.UUID; + +import org.apache.accumulo.fate.zookeeper.ZooReaderWriter; +import org.apache.accumulo.server.conf.store.PropCacheKey; +import org.apache.accumulo.server.conf.store.PropStoreException; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.Stat; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Provides a token used in property conversion. The token is used to limit the number of processes + * that try to create a property node and transform the legacy property format to the 2.1 encoded + * properties. Processes do not queue for a token (using a sequential node) - processes should look + * for the token to exist, and if present wait and then periodically re-check for the property node + * to be created by the process that created / has the token. + * <p> + * Features + * <ul> + * <li>Uses ephemeral node that will be removed if transform process terminates without + * completing</li> + * <li>Watcher not necessary - the existence of the node and uuid in data sufficient to detect + * changes.</li> + * </ul> + */ +public class TransformToken { + public static final String TRANSFORM_TOKEN = "/transform_token"; + private static final Logger log = LoggerFactory.getLogger(TransformToken.class); + private final TokenUUID tokenUUID = new TokenUUID(); + private final String path; + private final ZooReaderWriter zrw; + private boolean haveToken = false; + + private TransformToken(final @NonNull PropCacheKey key, final ZooReaderWriter zrw) { + path = key.getBasePath() + TRANSFORM_TOKEN; + this.zrw = zrw; + } + + /** + * Create a lock node in ZooKeeper using an ephemeral node. Will not throw and exception except on + * an interrupt. If the lock node is created, the returned lock will be locked. If another lock + * already exists, the lock is unlocked and the caller can decide to either wait for the resource + * to be created by the thread that created the lock, or try calling to {@code lock} to succeed + * + * @param key + * a PropCacheKey that defines the storage location of the created lock and the + * associated property nodes. + * @param zrw + * a ZooReaderWriter + * @return an TransformLock instance. + * @throws PropStoreException + * is the lock creation fails due to an underlying ZooKeeper exception. + */ + public static TransformToken createToken(final @NonNull PropCacheKey key, + final ZooReaderWriter zrw) { + TransformToken token = new TransformToken(key, zrw); + token.haveToken = token.holdToken(); + return token; + } + + public boolean holdToken() { + if (haveToken) { + return true; + } + try { + // existence check should be lighter-weight than failing on NODE_EXISTS exception + if (zrw.exists(path)) { + return false; + } + // if this completes this thread has created the lock + zrw.putEphemeralData(path, tokenUUID.asBytes()); + log.trace("wrote property upgrade lock: {} - {}", path, tokenUUID); + return true; + } catch (KeeperException ex) { + log.debug( + "Failed to write transform lock for " + path + " another process may have created one", + ex); + return false; + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new PropStoreException("Interrupted getting transform lock", ex); + } + } + + /** + * Return the token status + * + * @return true if this instance has created the token, false otherwise. + */ + public boolean haveToken() { + return haveToken; + } + + /** + * Verify lock is still present and valid while keeping the lock. + * + * @return true if lock is valid, false otherwise + */ + public boolean validateToken() { + try { + byte[] readId = zrw.getData(path); + log.trace("validate token: read: {} - expected: {}", readId, tokenUUID); + return Arrays.equals(readId, tokenUUID.asBytes()); + } catch (KeeperException ex) { + throw new PropStoreException("Failed to validate lock", ex); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new PropStoreException("Interrupted while validating lock", ex); + } + } + + /** + * If the lock was created by this instance the uuid created nad the uuid stored in the ZooKeeper + * data will match. + */ + public void releaseToken() { + try { + log.trace("releaseToken called - {} - exists in ZooKeeper: {}", path, zrw.exists(path)); + + Stat stat = new Stat(); + byte[] readId = zrw.getData(path, stat); + if (!Arrays.equals(readId, tokenUUID.asBytes())) { + throw new PropStoreException("tried to release a token that was not held by current thread", + null); + } + + log.trace("releaseToken read id: {} - exists: {}", readId, zrw.exists(path)); Review Comment: May want to avoid the ZK call here when trace is not enabled. ########## server/base/src/main/java/org/apache/accumulo/server/conf/util/PropSnapshot.java: ########## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.server.conf.util; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.accumulo.server.conf.codec.VersionedProperties; +import org.apache.accumulo.server.conf.store.PropCacheKey; +import org.apache.accumulo.server.conf.store.PropStore; +import org.apache.accumulo.server.conf.store.PropStoreException; +import org.checkerframework.checker.nullness.qual.NonNull; + +/** + * This class provides a secondary, local cache of properties and is intended to be used to optimize + * constructing a configuration hierarchy from the underlying properties stored in ZooKeeper. + * <p> + * Configurations and especially the derivers use an updateCount to detect configuration changes. + * The updateCount is checked frequently and the configuration hierarchy rebuilt when a change is + * detected. + */ +public class PropSnapshot { + + private final Lock updateLock = new ReentrantLock(); + private final AtomicBoolean needsUpdate = new AtomicBoolean(true); + private final AtomicReference<VersionedProperties> vPropRef = new AtomicReference<>(); + private final PropCacheKey propCacheKey; + private final PropStore propStore; + + public PropSnapshot(final PropCacheKey propCacheKey, final PropStore propStore) { + this.propCacheKey = propCacheKey; + this.propStore = propStore; + + updateSnapshot(); + } + + /** + * Get the current snapshot - updating if necessary. + * + * @return the current property snapshot. + */ + public @NonNull VersionedProperties get() { + if (needsUpdate.get()) { + updateSnapshot(); + } Review Comment: The function checks needsUpdate, so its redundant to check here. ```suggestion updateSnapshot(); ``` ########## server/base/src/main/java/org/apache/accumulo/server/conf/util/ConfigPropertyPrinter.java: ########## @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.server.conf.util; + +import java.io.BufferedWriter; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.PrintStream; +import java.io.PrintWriter; +import java.util.Map; +import java.util.TreeMap; + +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.NamespaceId; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.fate.zookeeper.ZooReaderWriter; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.cli.ServerUtilOpts; +import org.apache.accumulo.server.conf.codec.VersionedPropCodec; +import org.apache.accumulo.server.conf.codec.VersionedProperties; +import org.apache.accumulo.server.conf.store.PropCacheKey; +import org.apache.accumulo.start.spi.KeywordExecutable; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.beust.jcommander.Parameter; +import com.google.auto.service.AutoService; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; + +// TODO - this is in progress and should not be merged without changes. +// TODO - implement json output (or remove option) +@AutoService(KeywordExecutable.class) +@SuppressFBWarnings(value = "PATH_TRAVERSAL_OUT", + justification = "app is run in same security context as user providing the filename") +public class ConfigPropertyPrinter implements KeywordExecutable { + + private static final Logger log = LoggerFactory.getLogger(ConfigPropertyPrinter.class); + + public ConfigPropertyPrinter() {} + + public static void main(String[] args) throws Exception { + new ConfigPropertyPrinter().execute(args); + } + + @Override + public String keyword() { + return "config-print"; + } + + @Override + public String description() { + return "prints the properties stored in ZooKeeper"; + } + + @Override + public void execute(String[] args) throws Exception { + + ConfigPropertyPrinter.Opts opts = new ConfigPropertyPrinter.Opts(); + opts.parseArgs(ConfigPropertyPrinter.class.getName(), args); + + ServerContext context = opts.getServerContext(); + + print(context, opts.outfile, opts.jsonFmt); + + } + + public void print(final ServerContext context, final String outfile, final boolean jsonFmt) { Review Comment: Nothing is done with the jsonFmt variable. ########## server/base/src/main/java/org/apache/accumulo/server/init/FileSystemInitializer.java: ########## @@ -160,20 +160,23 @@ private void createDirectories(VolumeManager fs, String... dirs) throws IOExcept } } - private void initSystemTablesConfig() throws IOException, InterruptedException, KeeperException { - setTableProperties(RootTable.ID, initConfig.getRootTableConf()); - setTableProperties(RootTable.ID, initConfig.getRootMetaConf()); - setTableProperties(MetadataTable.ID, initConfig.getRootMetaConf()); - setTableProperties(MetadataTable.ID, initConfig.getMetaTableConf()); - setTableProperties(REPL_TABLE_ID, initConfig.getReplTableConf()); + private void initSystemTablesConfig(final ServerContext context) + throws IOException, InterruptedException, KeeperException { + setTableProperties(context, RootTable.ID, initConfig.getRootTableConf()); + setTableProperties(context, RootTable.ID, initConfig.getRootMetaConf()); + setTableProperties(context, MetadataTable.ID, initConfig.getRootMetaConf()); + setTableProperties(context, MetadataTable.ID, initConfig.getMetaTableConf()); + setTableProperties(context, REPL_TABLE_ID, initConfig.getReplTableConf()); } - private void setTableProperties(TableId tableId, HashMap<String,String> props) - throws IOException, InterruptedException, KeeperException { - for (Map.Entry<String,String> entry : props.entrySet()) { - if (!TablePropUtil.setTableProperty(zoo, zkRoot, tableId, entry.getKey(), entry.getValue())) { - throw new IOException("Cannot create per-table property " + entry.getKey()); - } + private void setTableProperties(final ServerContext context, TableId tableId, + HashMap<String,String> props) throws IOException, InterruptedException, KeeperException { + var propStore = context.getPropStore(); + PropCacheKey tablePropKey = PropCacheKey.forTable(context, tableId); + if (propStore.exists(tablePropKey)) { + propStore.putAll(tablePropKey, props); + } else { Review Comment: Its been a while since I have looked at the Accumulo init code, does that code make an attempt to be idempotent? ########## server/base/src/main/java/org/apache/accumulo/server/conf/util/TransformToken.java: ########## @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.server.conf.util; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.util.Arrays; +import java.util.Objects; +import java.util.UUID; + +import org.apache.accumulo.fate.zookeeper.ZooReaderWriter; +import org.apache.accumulo.server.conf.store.PropCacheKey; +import org.apache.accumulo.server.conf.store.PropStoreException; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.Stat; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Provides a token used in property conversion. The token is used to limit the number of processes + * that try to create a property node and transform the legacy property format to the 2.1 encoded + * properties. Processes do not queue for a token (using a sequential node) - processes should look + * for the token to exist, and if present wait and then periodically re-check for the property node + * to be created by the process that created / has the token. + * <p> + * Features + * <ul> + * <li>Uses ephemeral node that will be removed if transform process terminates without + * completing</li> + * <li>Watcher not necessary - the existence of the node and uuid in data sufficient to detect + * changes.</li> + * </ul> + */ +public class TransformToken { + public static final String TRANSFORM_TOKEN = "/transform_token"; + private static final Logger log = LoggerFactory.getLogger(TransformToken.class); + private final TokenUUID tokenUUID = new TokenUUID(); + private final String path; + private final ZooReaderWriter zrw; + private boolean haveToken = false; + + private TransformToken(final @NonNull PropCacheKey key, final ZooReaderWriter zrw) { + path = key.getBasePath() + TRANSFORM_TOKEN; + this.zrw = zrw; + } + + /** + * Create a lock node in ZooKeeper using an ephemeral node. Will not throw and exception except on + * an interrupt. If the lock node is created, the returned lock will be locked. If another lock + * already exists, the lock is unlocked and the caller can decide to either wait for the resource + * to be created by the thread that created the lock, or try calling to {@code lock} to succeed + * + * @param key + * a PropCacheKey that defines the storage location of the created lock and the + * associated property nodes. + * @param zrw + * a ZooReaderWriter + * @return an TransformLock instance. + * @throws PropStoreException + * is the lock creation fails due to an underlying ZooKeeper exception. + */ + public static TransformToken createToken(final @NonNull PropCacheKey key, + final ZooReaderWriter zrw) { + TransformToken token = new TransformToken(key, zrw); + token.haveToken = token.holdToken(); + return token; + } + + public boolean holdToken() { + if (haveToken) { + return true; + } + try { + // existence check should be lighter-weight than failing on NODE_EXISTS exception + if (zrw.exists(path)) { + return false; + } + // if this completes this thread has created the lock + zrw.putEphemeralData(path, tokenUUID.asBytes()); + log.trace("wrote property upgrade lock: {} - {}", path, tokenUUID); + return true; Review Comment: should haveToken be set to true before returning? ########## server/base/src/main/java/org/apache/accumulo/server/tables/TableManager.java: ########## @@ -62,23 +63,36 @@ public class TableManager { private final String zkRoot; private final InstanceId instanceID; private final ZooReaderWriter zoo; - private ZooCache zooStateCache; + private final ZooCache zooStateCache; - public static void prepareNewNamespaceState(ZooReaderWriter zoo, InstanceId instanceId, - NamespaceId namespaceId, String namespace, NodeExistsPolicy existsPolicy) - throws KeeperException, InterruptedException { + public static void prepareNewNamespaceState(ZooReaderWriter zoo, final PropStore propStore, + InstanceId instanceId, NamespaceId namespaceId, String namespace, + NodeExistsPolicy existsPolicy) throws KeeperException, InterruptedException { log.debug("Creating ZooKeeper entries for new namespace {} (ID: {})", namespace, namespaceId); String zPath = Constants.ZROOT + "/" + instanceId + Constants.ZNAMESPACES + "/" + namespaceId; zoo.putPersistentData(zPath, new byte[0], existsPolicy); zoo.putPersistentData(zPath + Constants.ZNAMESPACE_NAME, namespace.getBytes(UTF_8), existsPolicy); zoo.putPersistentData(zPath + Constants.ZNAMESPACE_CONF, new byte[0], existsPolicy); + + PropCacheKey propKey = PropCacheKey.forNamespace(instanceId, namespaceId); + if (!propStore.exists(propKey)) { + propStore.create(propKey, Map.of()); Review Comment: If multiple threads could call this, this code can have race condition. However not sure of the context in which this particular code is used or if that is a concern. If it does matter a model of throwing exceptions when it exists (like ZK does and accumulo create table) can avoid race conditions. ########## server/base/src/main/java/org/apache/accumulo/server/tables/TableManager.java: ########## @@ -96,8 +110,21 @@ public static void prepareNewTableState(ZooReaderWriter zoo, InstanceId instance zoo.putPersistentData(zTablePath + Constants.ZTABLE_COMPACT_CANCEL_ID, ZERO_BYTE, existsPolicy); zoo.putPersistentData(zTablePath + Constants.ZTABLE_STATE, state.name().getBytes(UTF_8), existsPolicy); + PropCacheKey propKey = PropCacheKey.forTable(instanceId, tableId); + if (!propStore.exists(propKey)) { + propStore.create(propKey, Map.of()); + } + } + + public static void prepareNewTableState(final ServerContext context, TableId tableId, + NamespaceId namespaceId, String tableName, TableState state, NodeExistsPolicy existsPolicy) + throws KeeperException, InterruptedException { + prepareNewTableState(context.getZooReaderWriter(), context.getPropStore(), + context.getInstanceID(), tableId, namespaceId, tableName, state, existsPolicy); } + // context, REPL_TABLE_ID, replicationTableName, + // TableState.OFFLINE, Namespace.ACCUMULO.id(), ZooUtil.NodeExistsPolicy.FAIL Review Comment: What are the comments? ########## server/manager/src/main/java/org/apache/accumulo/manager/Manager.java: ########## @@ -274,6 +288,18 @@ synchronized void setManagerState(ManagerState newState) { } } + private void convertPropsToSingleNode() { + try { + log.info("Starting property conversion"); + var context = getContext(); + ConfigPropertyUpgrader configUpgrader = new ConfigPropertyUpgrader(); + configUpgrader.doUpgrade(context.getInstanceID(), context.getZooReaderWriter()); + log.info("Completed property conversion"); + } catch (Exception ex) { + throw new IllegalStateException("Failed to convert properties to single node format", ex); + } + } + Review Comment: If this code is in fact redundant, maybe it could be deleted. ```suggestion ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
