EdColeman commented on a change in pull request #2569:
URL: https://github.com/apache/accumulo/pull/2569#discussion_r829036590
##########
File path:
core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
##########
@@ -617,6 +617,13 @@
String METRICS_UPDATE_WALOG_WRITE = METRICS_UPDATE_PREFIX + "walog.write";
String METRICS_UPDATE_MUTATION_ARRAY_SIZE = METRICS_UPDATE_PREFIX +
"mutation.arrays.size";
+ String METRICS_PROPSTORE_PREFIX = "accumulo.prop.store.";
Review comment:
Fixed in e7dc935faf
##########
File path:
server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/PropStoreWatcher.java
##########
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.store.impl;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import org.apache.accumulo.core.util.threads.ThreadPools;
+import org.apache.accumulo.server.conf.store.PropCacheId;
+import org.apache.accumulo.server.conf.store.PropChangeListener;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class serves as a translator between ZooKeeper events and converts
them to PropStore events.
+ * Using this as an intermediary, the external listeners do not need to set /
manage external
+ * ZooKeeper watchers, they can register for PropStore events if they need to
take active action on
+ * change detection.
+ * <p>
+ * Users of the PropStore.get() will get properties that match what is stored
in ZooKeeper for each
+ * call and do not need to manage any caching. However, the ability to receive
active notification
+ * without needed to register / manage ZooKeeper watchers external to the
PropStore is provided in
+ * case other code is relying on active notifications.
+ * <p>
+ * The notification occurs on a separate thread from the ZooKeeper
notification handling, but
+ * listeners should not perform lengthy operations on the notification thread
so that other listener
+ * notifications are not delayed.
+ */
+public class PropStoreWatcher implements Watcher {
+
+ private static final Logger log =
LoggerFactory.getLogger(PropStoreWatcher.class);
+
+ private final ExecutorService executorService =
+ ThreadPools.getServerThreadPools().createFixedThreadPool(1,
"zoo_change_update", false);
+
+ private final ReentrantReadWriteLock listenerLock = new
ReentrantReadWriteLock();
+ private final ReentrantReadWriteLock.ReadLock listenerReadLock =
listenerLock.readLock();
+ private final ReentrantReadWriteLock.WriteLock listenerWriteLock =
listenerLock.writeLock();
+
+ // access should be guarded by acquiring the listener read or write lock
+ private final Map<PropCacheId,Set<PropChangeListener>> listeners = new
HashMap<>();
+
+ private final ReadyMonitor zkReadyMonitor;
+
+ public PropStoreWatcher(final ReadyMonitor zkReadyMonitor) {
+ this.zkReadyMonitor = zkReadyMonitor;
+ }
+
+ public void registerListener(final PropCacheId propCacheId, final
PropChangeListener listener) {
+ listenerWriteLock.lock();
+ try {
+ Set<PropChangeListener> set = listeners.computeIfAbsent(propCacheId, s
-> new HashSet<>());
+ set.add(listener);
+ } finally {
+ listenerWriteLock.unlock();
+ }
+ }
+
+ /**
+ * Process a ZooKeeper event. This method does not reset the watcher.
Subscribers are notified of
+ * the change - if they call get to update and respond to the change the
watcher will be (re)set
+ * then. This helps clean up watchers by not automatically re-adding the
watcher on the event but
+ * only if being used.
+ *
+ * @param event
+ * ZooKeeper event.
+ */
+ @SuppressWarnings("FutureReturnValueIgnored") // currently, tasks are fire
and forget
+ @Override
+ public void process(final WatchedEvent event) {
+
+ String path;
+ switch (event.getType()) {
+ case NodeDataChanged:
+ path = event.getPath();
+ log.trace("handle change event for path: {}", path);
+ PropCacheId.fromPath(path).ifPresent(this::signalZkChangeEvent);
+ break;
+ case NodeDeleted:
+ path = event.getPath();
+ log.trace("handle delete event for path: {}", path);
+ PropCacheId.fromPath(path).ifPresent(cacheId -> {
+ // notify listeners
+ Set<PropChangeListener> snapshot = getListenerSnapshot(cacheId);
+ if (snapshot != null) {
+ executorService
+ .submit(new
PropStoreEventTask.PropStoreDeleteEventTask(cacheId, snapshot));
+ }
+
+ listenerCleanup(cacheId);
+
+ });
+
+ break;
+ case None:
+ Event.KeeperState state = event.getState();
+ switch (state) {
+ // pause - could reconnect
+ case ConnectedReadOnly:
+ case Disconnected:
+ log.debug("ZooKeeper disconnected event received");
+ zkReadyMonitor.clearReady();
+ executorService.submit(new
PropStoreEventTask.PropStoreConnectionEventTask(null,
+ getAllListenersSnapshot()));
+ break;
+
+ // okay
+ case SyncConnected:
+ log.debug("ZooKeeper connected event received");
+ zkReadyMonitor.setReady();
+ break;
+
+ // terminal - never coming back.
+ case Expired:
+ case Closed:
+ log.info("ZooKeeper connection closed event received");
+ zkReadyMonitor.clearReady();
+ zkReadyMonitor.setClosed(); // terminal condition
+ executorService.submit(new
PropStoreEventTask.PropStoreConnectionEventTask(null,
+ getAllListenersSnapshot()));
+ break;
+
+ default:
+ log.trace("ignoring zooKeeper state: {}", state);
+ }
+ break;
+ default:
+ break;
+ }
+
+ }
+
+ /**
+ * Submit task to notify registered listeners that the propCacheId node
received an event
+ * notification from ZooKeeper and should be updated. The process can be
initiated either by a
+ * ZooKeeper notification or a change detected in the cache based on a
ZooKeeper event.
+ *
+ * @param propCacheId
+ * the cache id
+ */
+ @SuppressWarnings("FutureReturnValueIgnored") // currently, tasks are fire
and forget
Review comment:
fixed in e7dc935faf
##########
File path:
server/base/src/main/java/org/apache/accumulo/server/conf/store/PropCacheId.java
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.store;
+
+import static org.apache.accumulo.core.Constants.ZCONFIG;
+import static org.apache.accumulo.core.Constants.ZNAMESPACES;
+import static org.apache.accumulo.core.Constants.ZNAMESPACE_CONF;
+import static org.apache.accumulo.core.Constants.ZTABLES;
+import static org.apache.accumulo.core.Constants.ZTABLE_CONF;
+
+import java.util.Comparator;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.StringJoiner;
+
+import org.apache.accumulo.core.data.InstanceId;
+import org.apache.accumulo.core.data.NamespaceId;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.fate.zookeeper.ZooUtil;
+import org.apache.accumulo.server.ServerContext;
+import org.checkerframework.checker.nullness.qual.NonNull;
+
+/**
+ * Provides a strongly-typed id for storing properties in ZooKeeper. The path
in ZooKeeper is
+ * determined by the instance id and the type (system, namespace and table),
with different root
+ * paths.
+ * <p>
+ * Provides utility methods from constructing different id based on type and
methods to parse a
+ * ZooKeeper path and return a prop cache id.
+ */
+public class PropCacheId implements Comparable<PropCacheId> {
+
+ public static final String PROP_NODE_NAME = "encoded_props";
+
+ // indices for path.split();
+ public static final int TYPE_TOKEN_POSITION = 3;
+ public static final int IID_TOKEN_POSITION = 2;
+ public static final int ID_TOKEN_POSITION = 4;
+
+ // remove starting slash from constant.
+ public static final String TABLES_NODE_NAME = ZTABLES.substring(1);
+ public static final String NAMESPACE_NODE_NAME = ZNAMESPACES.substring(1);
+
+ private final String path;
+ private final IdType idType;
+ private final NamespaceId namespaceId;
+ private final TableId tableId;
+
+ private PropCacheId(final String path, final IdType idType, final
NamespaceId namespaceId,
+ final TableId tableId) {
+ this.path = path;
+ this.idType = idType;
+ this.namespaceId = namespaceId;
+ this.tableId = tableId;
+ }
+
+ /**
+ * Instantiate a system prop cache id using the instance id from the context.
+ *
+ * @param context
+ * the system context specifying the instance id
+ * @return a prop cache id for system properties,
+ */
+ public static PropCacheId forSystem(final ServerContext context) {
+ return forSystem(context.getInstanceID());
+ }
+
+ /**
+ * Instantiate a system prop cache id.
+ *
+ * @param instanceId
+ * the instance id.
+ * @return a prop cache id for system properties,
+ */
+ public static PropCacheId forSystem(final InstanceId instanceId) {
+ return new PropCacheId(ZooUtil.getRoot(instanceId) + ZCONFIG + "/" +
PROP_NODE_NAME,
+ IdType.SYSTEM, null, null);
+ }
+
+ /**
+ * Instantiate a namespace prop cache id using the instance id from the
context.
+ *
+ * @param context
+ * the system context specifying the instance id
+ * @param namespaceId
+ * the namespace id
+ * @return a prop cache id a namespaces properties,
+ */
+ public static PropCacheId forNamespace(final ServerContext context,
+ final NamespaceId namespaceId) {
+ return forNamespace(context.getInstanceID(), namespaceId);
+ }
+
+ /**
+ * Instantiate a namespace prop cache id using the instance id from the
context.
+ *
+ * @param instanceId
+ * the instance id
+ * @param namespaceId
+ * the namespace id
+ * @return a prop cache id a namespaces properties,
+ */
+ public static PropCacheId forNamespace(final InstanceId instanceId,
+ final NamespaceId namespaceId) {
+ return new PropCacheId(ZooUtil.getRoot(instanceId) + ZNAMESPACES + "/" +
namespaceId.canonical()
+ + ZNAMESPACE_CONF + "/" + PROP_NODE_NAME, IdType.NAMESPACE,
namespaceId, null);
+ }
+
+ /**
+ * Instantiate a namespace prop cache id using the instance id from the
context.
+ *
+ * @param context
+ * the system context specifying the instance id
+ * @param tableId
+ * the table id
+ * @return a prop cache id a namespaces properties,
+ */
+ public static PropCacheId forTable(final ServerContext context, final
TableId tableId) {
+ return forTable(context.getInstanceID(), tableId);
+ }
+
+ /**
+ * Instantiate a namespace prop cache id using the instance id from the
context.
+ *
+ * @param instanceId
+ * the instance id
+ * @param tableId
+ * the table id
+ * @return a prop cache id a namespaces properties,
+ */
+ public static PropCacheId forTable(final InstanceId instanceId, final
TableId tableId) {
+ return new PropCacheId(ZooUtil.getRoot(instanceId) + ZTABLES + "/" +
tableId.canonical()
+ + ZTABLE_CONF + "/" + PROP_NODE_NAME, IdType.TABLE, null, tableId);
+ }
+
+ /**
+ * Determine the prop cache id from a ZooKeeper path
+ *
+ * @param path
+ * the path
+ * @return the prop cache id
+ */
+ public static Optional<PropCacheId> fromPath(final String path) {
+ String[] tokens = path.split("/");
+
+ InstanceId instanceId = InstanceId.of(tokens[IID_TOKEN_POSITION]);
+
+ IdType type = extractType(tokens);
+
+ switch (type) {
+ case SYSTEM:
+ return Optional.of(PropCacheId.forSystem(instanceId));
+ case NAMESPACE:
+ return Optional
+ .of(PropCacheId.forNamespace(instanceId,
NamespaceId.of(tokens[ID_TOKEN_POSITION])));
+ case TABLE:
+ return Optional.of(PropCacheId.forTable(instanceId,
TableId.of(tokens[ID_TOKEN_POSITION])));
+ case UNKNOWN:
+ default:
+ return Optional.empty();
+ }
+ }
+
+ /**
+ * Determine if the IdType is system, namespace or table from a tokenized
path. To be a valid id,
+ * the final token is PROP_NODE_NAME and then the type is defined if the
path has table or
+ * namespace in the path, otherwise it is assumed to be system.
+ *
+ * @param tokens
+ * a path split into String[] of tokens
+ * @return the id type.
+ */
+ public static IdType extractType(final String[] tokens) {
+ if (tokens.length == 0 || !tokens[tokens.length -
1].equals(PROP_NODE_NAME)) {
+ // without tokens or it does not end with PROP_NAME_NAME
+ return IdType.UNKNOWN;
+ }
+ if (tokens[TYPE_TOKEN_POSITION].equals(TABLES_NODE_NAME)) {
+ return IdType.TABLE;
+ }
+ if (tokens[TYPE_TOKEN_POSITION].equals(NAMESPACE_NODE_NAME)) {
+ return IdType.NAMESPACE;
+ }
+ return IdType.SYSTEM;
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ public IdType getIdType() {
+ return idType;
+ }
+
+ @Override
+ public int compareTo(@NonNull PropCacheId other) {
+ return
Comparator.comparing(PropCacheId::getIdType).thenComparing(PropCacheId::getPath)
+ .compare(this, other);
+ }
+
+ // TODO - remove optional and return null.
+ /**
+ * If the prop cache is for a namespace, return the namespace id.
+ *
+ * @return the namespace id.
+ */
+ public Optional<NamespaceId> getNamespaceId() {
+ return Optional.ofNullable(namespaceId);
+ }
+
+ /**
+ * if the prop cache is for a table, return the table id.
+ *
+ * @return the table id.
+ */
+ public Optional<TableId> getTableId() {
+ return Optional.ofNullable(tableId);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+ PropCacheId that = (PropCacheId) o;
+ return path.equals(that.path);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(path);
+ }
+
+ @Override
+ public String toString() {
+ switch (idType) {
+ case SYSTEM:
+ return new StringJoiner(", ", PropCacheId.class.getSimpleName() + "[",
"]")
+ .add("idType=System").toString();
+ case NAMESPACE:
+ return new StringJoiner(", ", PropCacheId.class.getSimpleName() + "[",
"]")
+ .add("idType=Namespace").add("namespaceId=" +
namespaceId).toString();
+ case TABLE:
+ return new StringJoiner(", ", PropCacheId.class.getSimpleName() + "[",
"]")
+ .add("idType=Table").add("tableId=" + tableId).toString();
+ default:
+ return new StringJoiner(", ", PropCacheId.class.getSimpleName() + "[",
"]")
+ .add("idType=" + idType).add("namespaceId=" +
namespaceId).add("tableId=" + tableId)
+ .add("path='" + path + "'").toString();
+ }
+
+ }
+
+ /**
+ * Define types of properties stored in zookeeper. Note: default properties
are not in zookeeper
+ * but come from code.
+ */
+ public enum IdType {
+ UNKNOWN, SYSTEM, NAMESPACE, TABLE
+ }
Review comment:
The intent was to separate the path and the ZooKeeper structure from the
PropCacheKey. It is important to know the type, but the path is an
implementation detail. The ZooKeeper storage internals may have leaked over
time, but the original intent was that things using a prop store would not know
that the storage was actually ZooKeeper. In places where this abstraction has
failed / leaked, my position would be to fix the leak.
I may have failed in this, but it was my original intent. Whether there are
benefits from the abstraction warrants additional discussion, but it seems to
me that it would be beneficial to "hide" ZooKeeper abstractions where
possible.
##########
File path:
core/src/main/java/org/apache/accumulo/core/conf/AccumuloConfiguration.java
##########
@@ -597,4 +599,13 @@ public T derive() {
* this configuration.
*/
public void invalidateCache() {}
+
+ /**
+ * get a parent configuration or null if it does not exist.
+ *
+ * @since 2.1.0
+ */
+ public AccumuloConfiguration getParent() {
+ return null;
Review comment:
When would it be appropriate to throw an Exception and what exception
type do you suggest?
The code that uses Accumulo configuration blindly calls getParent to walk up
the hierarchy if it exists. Some existing configurations (DefaultConfiguration,
ConfigurationCopy) have no parent. In cases like system, namespace and table,
calling getParent returns the next configuration up the chain. Getting a null
indicates that the top of the hierarchy has been reached and is not an error
condition. When would an exception be appropriate?
##########
File path:
server/base/src/main/java/org/apache/accumulo/server/conf/store/PropCacheId.java
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.store;
+
+import static org.apache.accumulo.core.Constants.ZCONFIG;
+import static org.apache.accumulo.core.Constants.ZNAMESPACES;
+import static org.apache.accumulo.core.Constants.ZNAMESPACE_CONF;
+import static org.apache.accumulo.core.Constants.ZTABLES;
+import static org.apache.accumulo.core.Constants.ZTABLE_CONF;
+
+import java.util.Comparator;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.StringJoiner;
+
+import org.apache.accumulo.core.data.InstanceId;
+import org.apache.accumulo.core.data.NamespaceId;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.fate.zookeeper.ZooUtil;
+import org.apache.accumulo.server.ServerContext;
+import org.checkerframework.checker.nullness.qual.NonNull;
+
+/**
+ * Provides a strongly-typed id for storing properties in ZooKeeper. The path
in ZooKeeper is
+ * determined by the instance id and the type (system, namespace and table),
with different root
+ * paths.
+ * <p>
+ * Provides utility methods from constructing different id based on type and
methods to parse a
+ * ZooKeeper path and return a prop cache id.
+ */
+public class PropCacheId implements Comparable<PropCacheId> {
Review comment:
And it will change again in response to the suggestion to rename it to
PropCacheKey - trying to push now, but receiving a GH internal error.
##########
File path:
server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/CaffeineCache.java
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.store.impl;
+
+import java.util.Objects;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.util.threads.ThreadPools;
+import org.apache.accumulo.server.conf.codec.VersionedProperties;
+import org.apache.accumulo.server.conf.store.PropCache;
+import org.apache.accumulo.server.conf.store.PropCacheId;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.benmanes.caffeine.cache.CacheLoader;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.RemovalCause;
+import com.github.benmanes.caffeine.cache.Ticker;
+import com.google.common.annotations.VisibleForTesting;
+
+public class CaffeineCache implements PropCache {
Review comment:
Renamed to PropCacheCaffeineImpl - the intent is that Caffeine is one
possible implementation and to allow for others if that ever becomes desirable
- PropCacheImpl seems to imply (to me) that it is the only one that might exist.
##########
File path:
server/base/src/main/java/org/apache/accumulo/server/conf/store/PropCacheId.java
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.store;
+
+import static org.apache.accumulo.core.Constants.ZCONFIG;
+import static org.apache.accumulo.core.Constants.ZNAMESPACES;
+import static org.apache.accumulo.core.Constants.ZNAMESPACE_CONF;
+import static org.apache.accumulo.core.Constants.ZTABLES;
+import static org.apache.accumulo.core.Constants.ZTABLE_CONF;
+
+import java.util.Comparator;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.StringJoiner;
+
+import org.apache.accumulo.core.data.InstanceId;
+import org.apache.accumulo.core.data.NamespaceId;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.fate.zookeeper.ZooUtil;
+import org.apache.accumulo.server.ServerContext;
+import org.checkerframework.checker.nullness.qual.NonNull;
+
+/**
+ * Provides a strongly-typed id for storing properties in ZooKeeper. The path
in ZooKeeper is
+ * determined by the instance id and the type (system, namespace and table),
with different root
+ * paths.
+ * <p>
+ * Provides utility methods from constructing different id based on type and
methods to parse a
+ * ZooKeeper path and return a prop cache id.
+ */
+public class PropCacheId implements Comparable<PropCacheId> {
+
+ public static final String PROP_NODE_NAME = "encoded_props";
+
+ // indices for path.split();
+ public static final int TYPE_TOKEN_POSITION = 3;
+ public static final int IID_TOKEN_POSITION = 2;
+ public static final int ID_TOKEN_POSITION = 4;
+
+ // remove starting slash from constant.
+ public static final String TABLES_NODE_NAME = ZTABLES.substring(1);
+ public static final String NAMESPACE_NODE_NAME = ZNAMESPACES.substring(1);
+
+ private final String path;
+ private final IdType idType;
+ private final NamespaceId namespaceId;
+ private final TableId tableId;
+
+ private PropCacheId(final String path, final IdType idType, final
NamespaceId namespaceId,
+ final TableId tableId) {
+ this.path = path;
+ this.idType = idType;
+ this.namespaceId = namespaceId;
+ this.tableId = tableId;
+ }
+
+ /**
+ * Instantiate a system prop cache id using the instance id from the context.
+ *
+ * @param context
+ * the system context specifying the instance id
+ * @return a prop cache id for system properties,
+ */
+ public static PropCacheId forSystem(final ServerContext context) {
+ return forSystem(context.getInstanceID());
+ }
+
+ /**
+ * Instantiate a system prop cache id.
+ *
+ * @param instanceId
+ * the instance id.
+ * @return a prop cache id for system properties,
+ */
+ public static PropCacheId forSystem(final InstanceId instanceId) {
+ return new PropCacheId(ZooUtil.getRoot(instanceId) + ZCONFIG + "/" +
PROP_NODE_NAME,
+ IdType.SYSTEM, null, null);
+ }
+
+ /**
+ * Instantiate a namespace prop cache id using the instance id from the
context.
+ *
+ * @param context
+ * the system context specifying the instance id
+ * @param namespaceId
+ * the namespace id
+ * @return a prop cache id a namespaces properties,
+ */
+ public static PropCacheId forNamespace(final ServerContext context,
+ final NamespaceId namespaceId) {
+ return forNamespace(context.getInstanceID(), namespaceId);
+ }
+
+ /**
+ * Instantiate a namespace prop cache id using the instance id from the
context.
+ *
+ * @param instanceId
+ * the instance id
+ * @param namespaceId
+ * the namespace id
+ * @return a prop cache id a namespaces properties,
+ */
+ public static PropCacheId forNamespace(final InstanceId instanceId,
+ final NamespaceId namespaceId) {
+ return new PropCacheId(ZooUtil.getRoot(instanceId) + ZNAMESPACES + "/" +
namespaceId.canonical()
+ + ZNAMESPACE_CONF + "/" + PROP_NODE_NAME, IdType.NAMESPACE,
namespaceId, null);
+ }
+
+ /**
+ * Instantiate a namespace prop cache id using the instance id from the
context.
+ *
+ * @param context
+ * the system context specifying the instance id
+ * @param tableId
+ * the table id
+ * @return a prop cache id a namespaces properties,
+ */
+ public static PropCacheId forTable(final ServerContext context, final
TableId tableId) {
+ return forTable(context.getInstanceID(), tableId);
+ }
+
+ /**
+ * Instantiate a namespace prop cache id using the instance id from the
context.
+ *
+ * @param instanceId
+ * the instance id
+ * @param tableId
+ * the table id
+ * @return a prop cache id a namespaces properties,
+ */
+ public static PropCacheId forTable(final InstanceId instanceId, final
TableId tableId) {
+ return new PropCacheId(ZooUtil.getRoot(instanceId) + ZTABLES + "/" +
tableId.canonical()
+ + ZTABLE_CONF + "/" + PROP_NODE_NAME, IdType.TABLE, null, tableId);
+ }
+
+ /**
+ * Determine the prop cache id from a ZooKeeper path
+ *
+ * @param path
+ * the path
+ * @return the prop cache id
+ */
+ public static Optional<PropCacheId> fromPath(final String path) {
+ String[] tokens = path.split("/");
+
+ InstanceId instanceId = InstanceId.of(tokens[IID_TOKEN_POSITION]);
+
+ IdType type = extractType(tokens);
+
+ switch (type) {
+ case SYSTEM:
+ return Optional.of(PropCacheId.forSystem(instanceId));
+ case NAMESPACE:
+ return Optional
+ .of(PropCacheId.forNamespace(instanceId,
NamespaceId.of(tokens[ID_TOKEN_POSITION])));
+ case TABLE:
+ return Optional.of(PropCacheId.forTable(instanceId,
TableId.of(tokens[ID_TOKEN_POSITION])));
+ case UNKNOWN:
+ default:
+ return Optional.empty();
+ }
+ }
+
+ /**
+ * Determine if the IdType is system, namespace or table from a tokenized
path. To be a valid id,
+ * the final token is PROP_NODE_NAME and then the type is defined if the
path has table or
+ * namespace in the path, otherwise it is assumed to be system.
+ *
+ * @param tokens
+ * a path split into String[] of tokens
+ * @return the id type.
+ */
+ public static IdType extractType(final String[] tokens) {
+ if (tokens.length == 0 || !tokens[tokens.length -
1].equals(PROP_NODE_NAME)) {
+ // without tokens or it does not end with PROP_NAME_NAME
+ return IdType.UNKNOWN;
+ }
+ if (tokens[TYPE_TOKEN_POSITION].equals(TABLES_NODE_NAME)) {
+ return IdType.TABLE;
+ }
+ if (tokens[TYPE_TOKEN_POSITION].equals(NAMESPACE_NODE_NAME)) {
+ return IdType.NAMESPACE;
+ }
+ return IdType.SYSTEM;
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ public IdType getIdType() {
+ return idType;
+ }
+
+ @Override
+ public int compareTo(@NonNull PropCacheId other) {
+ return
Comparator.comparing(PropCacheId::getIdType).thenComparing(PropCacheId::getPath)
+ .compare(this, other);
+ }
+
+ // TODO - remove optional and return null.
+ /**
+ * If the prop cache is for a namespace, return the namespace id.
+ *
+ * @return the namespace id.
+ */
+ public Optional<NamespaceId> getNamespaceId() {
+ return Optional.ofNullable(namespaceId);
+ }
+
+ /**
+ * if the prop cache is for a table, return the table id.
+ *
+ * @return the table id.
+ */
+ public Optional<TableId> getTableId() {
+ return Optional.ofNullable(tableId);
+ }
+
+ @Override
+ public boolean equals(Object o) {
Review comment:
Both should be equivalent - ultimately the path is the canonical source for
comparison. The compareTo method uses the type to provide a sort order of
system, namespace and then table to support utilities that provide
human-friendly output.
##########
File path:
server/base/src/main/java/org/apache/accumulo/server/conf/store/PropCacheId.java
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.store;
+
+import static org.apache.accumulo.core.Constants.ZCONFIG;
+import static org.apache.accumulo.core.Constants.ZNAMESPACES;
+import static org.apache.accumulo.core.Constants.ZNAMESPACE_CONF;
+import static org.apache.accumulo.core.Constants.ZTABLES;
+import static org.apache.accumulo.core.Constants.ZTABLE_CONF;
+
+import java.util.Comparator;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.StringJoiner;
+
+import org.apache.accumulo.core.data.InstanceId;
+import org.apache.accumulo.core.data.NamespaceId;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.fate.zookeeper.ZooUtil;
+import org.apache.accumulo.server.ServerContext;
+import org.checkerframework.checker.nullness.qual.NonNull;
+
+/**
+ * Provides a strongly-typed id for storing properties in ZooKeeper. The path
in ZooKeeper is
+ * determined by the instance id and the type (system, namespace and table),
with different root
+ * paths.
+ * <p>
+ * Provides utility methods from constructing different id based on type and
methods to parse a
+ * ZooKeeper path and return a prop cache id.
+ */
+public class PropCacheId implements Comparable<PropCacheId> {
+
+ public static final String PROP_NODE_NAME = "encoded_props";
+
+ // indices for path.split();
+ public static final int TYPE_TOKEN_POSITION = 3;
+ public static final int IID_TOKEN_POSITION = 2;
+ public static final int ID_TOKEN_POSITION = 4;
+
+ // remove starting slash from constant.
+ public static final String TABLES_NODE_NAME = ZTABLES.substring(1);
+ public static final String NAMESPACE_NODE_NAME = ZNAMESPACES.substring(1);
+
+ private final String path;
+ private final IdType idType;
+ private final NamespaceId namespaceId;
+ private final TableId tableId;
+
+ private PropCacheId(final String path, final IdType idType, final
NamespaceId namespaceId,
+ final TableId tableId) {
+ this.path = path;
+ this.idType = idType;
+ this.namespaceId = namespaceId;
+ this.tableId = tableId;
+ }
+
+ /**
+ * Instantiate a system prop cache id using the instance id from the context.
+ *
+ * @param context
+ * the system context specifying the instance id
+ * @return a prop cache id for system properties,
+ */
+ public static PropCacheId forSystem(final ServerContext context) {
+ return forSystem(context.getInstanceID());
+ }
+
+ /**
+ * Instantiate a system prop cache id.
+ *
+ * @param instanceId
+ * the instance id.
+ * @return a prop cache id for system properties,
+ */
+ public static PropCacheId forSystem(final InstanceId instanceId) {
+ return new PropCacheId(ZooUtil.getRoot(instanceId) + ZCONFIG + "/" +
PROP_NODE_NAME,
Review comment:
Maybe as a follow-on. There is a lot of intermediate caching already - with
a new cache implementation and the ability to get properties in a single call
instead of reading all of the children, the original assumptions made about
caching can be simplified or eliminated.
Rather than add more caching now, it will be easier to determine where and
how things can be cached once that code is simplified. The simplification was
deferred as part of this PR in the hope that keeping as much of the original
code / functionality as possible would make review of this PR somewhat simpler.
##########
File path:
server/base/src/main/java/org/apache/accumulo/server/conf/ZooBasedConfiguration.java
##########
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Predicate;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.conf.store.PropCacheId;
+import org.apache.accumulo.server.conf.store.PropChangeListener;
+import org.apache.accumulo.server.conf.store.PropStore;
+import org.apache.accumulo.server.conf.store.PropStoreException;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+
+/**
+ * Instances maintain a local cache of the AccumuloConfiguration hierarchy
that will be consistent
+ * with stored properties.
+ * <p>
+ * When calling getProperties - the local copy will be updated if ZooKeeper
changes have been
+ * received.
+ * <p>
+ * The getUpdateCount() provides an optimization for clients - the count can
be used to detect
+ * changes without reading the properties. When the update count changes, the
next getProperties
+ * call will update the local copy and the change count.
+ */
+public class ZooBasedConfiguration extends AccumuloConfiguration implements
PropChangeListener {
+
+ protected final Logger log;
+ private final AccumuloConfiguration parent;
+ private final PropCacheId propCacheId;
+ private final PropStore propStore;
+
+ private final AtomicReference<PropSnapshot> snapshotRef = new
AtomicReference<>(null);
+
+ public ZooBasedConfiguration(Logger log, ServerContext context, PropCacheId
propCacheId,
+ AccumuloConfiguration parent) {
+ this.log = requireNonNull(log, "a Logger must be supplied");
+ requireNonNull(context, "the context cannot be null");
+ this.propCacheId = requireNonNull(propCacheId, "a PropCacheId must be
supplied");
+ this.parent = requireNonNull(parent, "An AccumuloConfiguration parent must
be supplied");
+
+ this.propStore =
+ requireNonNull(context.getPropStore(), "The PropStore must be supplied
and exist");
+
+ propStore.registerAsListener(propCacheId, this);
+
+ snapshotRef.set(updateSnapshot());
+
+ }
+
+ public long getDataVersion() {
+ var snapshot = snapshotRef.get();
+ if (snapshot == null) {
+ return updateSnapshot().getDataVersion();
+ }
+ return snapshot.getDataVersion();
+ }
+
+ /**
+ * The update count is the sum of the change count of this configuration and
the change counts of
+ * the parents. The count is used to detect if any changes occurred in the
configuration hierarchy
+ * and if the configuration needs to be recalculated to maintain consistency
with values in the
+ * backend store.
+ * <p>
+ * The count is required to be an increasing value.
+ */
+ @Override
+ public long getUpdateCount() {
+ long count = 0;
+ long dataVersion = 0;
+ for (AccumuloConfiguration p = this; p != null; p = p.getParent()) {
+ if (p instanceof ZooBasedConfiguration) {
+ dataVersion = ((ZooBasedConfiguration) p).getDataVersion();
+ } else {
+ dataVersion = p.getUpdateCount();
+ }
+ count += dataVersion;
+ }
+
+ log.trace("update count result for: {} - data version: {} update: {}",
propCacheId, dataVersion,
+ count);
+ return count;
+ }
+
+ @Override
+ public AccumuloConfiguration getParent() {
+ return parent;
+ }
+
+ public PropCacheId getCacheId() {
+ return propCacheId;
+ }
+
+ @Override
+ public @Nullable String get(final Property property) {
+ Map<String,String> props = getSnapshot();
+ String value = props.get(property.getKey());
+ if (value != null) {
+ return value;
+ }
+ AccumuloConfiguration parent = getParent();
+ if (parent != null) {
+ return parent.get(property);
+ }
+ return null;
+ }
+
+ @Override
+ public void getProperties(final Map<String,String> props, final
Predicate<String> filter) {
+
+ parent.getProperties(props, filter);
+
+ Map<String,String> theseProps = getSnapshot();
+
+ log.trace("getProperties() for: {} filter: {}, have: {}, passed: {}",
getCacheId(), filter,
+ theseProps, props);
+
+ for (Map.Entry<String,String> p : theseProps.entrySet()) {
+ if (filter.test(p.getKey()) && p.getValue() != null) {
+ log.trace("passed filter - add to map: {} = {}", p.getKey(),
p.getValue());
+ props.put(p.getKey(), p.getValue());
+ }
+ }
+ }
+
+ @Override
+ public boolean isPropertySet(final Property property) {
+
+ Map<String,String> theseProps = getSnapshot();
+
+ if (theseProps.get(property.getKey()) != null) {
+ return true;
+ }
+
+ return getParent().isPropertySet(property);
+
+ }
+
+ public Map<String,String> getSnapshot() {
+ if (snapshotRef.get() == null) {
+ return updateSnapshot().getProps();
+ }
+ return snapshotRef.get().getProps();
+ }
+
+ @Override
+ public void invalidateCache() {
+ snapshotRef.set(null);
+ }
+
+ private final Lock updateLock = new ReentrantLock();
+
+ private @NonNull PropSnapshot updateSnapshot() throws PropStoreException {
+
+ PropSnapshot localSnapshot = snapshotRef.get();
Review comment:
The Caffeine cache has two timeouts for cache eviction. One is a hard
eviction and the other is more "soft".
The hard eviction occurs if a cache item has not been accessed after the
timeout period. This allows the cache to evict items that are not being used -
the use cases would be when a table migrates to another server - or if there
are tables that are infrequently accessed.
The soft eviction is being leveraged to ensure eventual consistency without
relying solely on watchers. When the soft-timeout is reached and an item is
accessed in the cache - the cache returns the current value and then runs an
async task to check / reload the value from the back end source of truth
(ZooKeeper in this case). Currently, that check compares the node version. If
the version is the same, no other action is required. If the node changed, and
the version does not match, the new value is loaded and an event is signaled
so that the change is propagated.
The use case for this is for things like iterators - the iterator can keep
accessing the cache value without pausing but be able to react if a change not
signaled by a watcher is detected.
##########
File path:
core/src/main/java/org/apache/accumulo/core/conf/AccumuloConfiguration.java
##########
@@ -183,7 +185,7 @@ public long getUpdateCount() {
localPrefixes.putAll(cachedPrefixProps);
// put the updates
- prefixProps = new PrefixProps(propMap, updateCount);
+ prefixProps = new PrefixProps(propMap, getUpdateCount());
Review comment:
Thanks for catching this - Fixed in 6df9f23cf9
##########
File path:
server/base/src/main/java/org/apache/accumulo/server/conf/store/PropCacheId.java
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.store;
+
+import static org.apache.accumulo.core.Constants.ZCONFIG;
+import static org.apache.accumulo.core.Constants.ZNAMESPACES;
+import static org.apache.accumulo.core.Constants.ZNAMESPACE_CONF;
+import static org.apache.accumulo.core.Constants.ZTABLES;
+import static org.apache.accumulo.core.Constants.ZTABLE_CONF;
+
+import java.util.Comparator;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.StringJoiner;
+
+import org.apache.accumulo.core.data.InstanceId;
+import org.apache.accumulo.core.data.NamespaceId;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.fate.zookeeper.ZooUtil;
+import org.apache.accumulo.server.ServerContext;
+import org.checkerframework.checker.nullness.qual.NonNull;
+
+/**
+ * Provides a strongly-typed id for storing properties in ZooKeeper. The path
in ZooKeeper is
+ * determined by the instance id and the type (system, namespace and table),
with different root
+ * paths.
+ * <p>
+ * Provides utility methods from constructing different id based on type and
methods to parse a
+ * ZooKeeper path and return a prop cache id.
+ */
+public class PropCacheId implements Comparable<PropCacheId> {
Review comment:
Changed in 6df9f23cf9 to be PropCacheKey
##########
File path:
server/base/src/main/java/org/apache/accumulo/server/conf/store/PropCacheId.java
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.store;
+
+import static org.apache.accumulo.core.Constants.ZCONFIG;
+import static org.apache.accumulo.core.Constants.ZNAMESPACES;
+import static org.apache.accumulo.core.Constants.ZNAMESPACE_CONF;
+import static org.apache.accumulo.core.Constants.ZTABLES;
+import static org.apache.accumulo.core.Constants.ZTABLE_CONF;
+
+import java.util.Comparator;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.StringJoiner;
+
+import org.apache.accumulo.core.data.InstanceId;
+import org.apache.accumulo.core.data.NamespaceId;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.fate.zookeeper.ZooUtil;
+import org.apache.accumulo.server.ServerContext;
+import org.checkerframework.checker.nullness.qual.NonNull;
+
+/**
+ * Provides a strongly-typed id for storing properties in ZooKeeper. The path
in ZooKeeper is
+ * determined by the instance id and the type (system, namespace and table),
with different root
+ * paths.
+ * <p>
+ * Provides utility methods from constructing different id based on type and
methods to parse a
+ * ZooKeeper path and return a prop cache id.
+ */
+public class PropCacheId implements Comparable<PropCacheId> {
+
+ public static final String PROP_NODE_NAME = "encoded_props";
+
+ // indices for path.split();
+ public static final int TYPE_TOKEN_POSITION = 3;
+ public static final int IID_TOKEN_POSITION = 2;
+ public static final int ID_TOKEN_POSITION = 4;
+
+ // remove starting slash from constant.
+ public static final String TABLES_NODE_NAME = ZTABLES.substring(1);
+ public static final String NAMESPACE_NODE_NAME = ZNAMESPACES.substring(1);
+
+ private final String path;
+ private final IdType idType;
+ private final NamespaceId namespaceId;
+ private final TableId tableId;
+
+ private PropCacheId(final String path, final IdType idType, final
NamespaceId namespaceId,
+ final TableId tableId) {
+ this.path = path;
+ this.idType = idType;
+ this.namespaceId = namespaceId;
+ this.tableId = tableId;
+ }
+
+ /**
+ * Instantiate a system prop cache id using the instance id from the context.
+ *
+ * @param context
+ * the system context specifying the instance id
+ * @return a prop cache id for system properties,
+ */
+ public static PropCacheId forSystem(final ServerContext context) {
+ return forSystem(context.getInstanceID());
+ }
+
+ /**
+ * Instantiate a system prop cache id.
+ *
+ * @param instanceId
+ * the instance id.
+ * @return a prop cache id for system properties,
+ */
+ public static PropCacheId forSystem(final InstanceId instanceId) {
+ return new PropCacheId(ZooUtil.getRoot(instanceId) + ZCONFIG + "/" +
PROP_NODE_NAME,
+ IdType.SYSTEM, null, null);
+ }
+
+ /**
+ * Instantiate a namespace prop cache id using the instance id from the
context.
+ *
+ * @param context
+ * the system context specifying the instance id
+ * @param namespaceId
+ * the namespace id
+ * @return a prop cache id for a namespace's properties.
+ */
+ public static PropCacheId forNamespace(final ServerContext context,
+ final NamespaceId namespaceId) {
+ return forNamespace(context.getInstanceID(), namespaceId);
+ }
+
+ /**
+ * Instantiate a namespace prop cache id for the given instance id.
+ *
+ * @param instanceId
+ * the instance id
+ * @param namespaceId
+ * the namespace id
+ * @return a prop cache id for a namespace's properties.
+ */
+ public static PropCacheId forNamespace(final InstanceId instanceId,
+ final NamespaceId namespaceId) {
+ return new PropCacheId(ZooUtil.getRoot(instanceId) + ZNAMESPACES + "/" +
namespaceId.canonical()
+ + ZNAMESPACE_CONF + "/" + PROP_NODE_NAME, IdType.NAMESPACE,
namespaceId, null);
+ }
+
+ /**
+ * Instantiate a table prop cache id using the instance id from the
context.
+ *
+ * @param context
+ * the system context specifying the instance id
+ * @param tableId
+ * the table id
+ * @return a prop cache id for a table's properties.
+ */
+ public static PropCacheId forTable(final ServerContext context, final
TableId tableId) {
+ return forTable(context.getInstanceID(), tableId);
+ }
+
+ /**
+ * Instantiate a table prop cache id for the given instance id.
+ *
+ * @param instanceId
+ * the instance id
+ * @param tableId
+ * the table id
+ * @return a prop cache id for a table's properties.
+ */
+ public static PropCacheId forTable(final InstanceId instanceId, final
TableId tableId) {
+ return new PropCacheId(ZooUtil.getRoot(instanceId) + ZTABLES + "/" +
tableId.canonical()
+ + ZTABLE_CONF + "/" + PROP_NODE_NAME, IdType.TABLE, null, tableId);
+ }
+
+ /**
+ * Determine the prop cache id from a ZooKeeper path
+ *
+ * @param path
+ * the path
+ * @return the prop cache id
+ */
+ public static Optional<PropCacheId> fromPath(final String path) {
+ String[] tokens = path.split("/");
+
Review comment:
Fixed in 6df9f23cf9
##########
File path:
server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/PropStoreWatcher.java
##########
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.store.impl;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import org.apache.accumulo.core.util.threads.ThreadPools;
+import org.apache.accumulo.server.conf.store.PropCacheId;
+import org.apache.accumulo.server.conf.store.PropChangeListener;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class serves as a translator between ZooKeeper events and converts
them to PropStore events.
+ * Using this as an intermediary, the external listeners do not need to set /
manage external
+ * ZooKeeper watchers, they can register for PropStore events if they need to
take active action on
+ * change detection.
+ * <p>
+ * Users of the PropStore.get() will get properties that match what is stored
in ZooKeeper for each
+ * call and do not need to manage any caching. However, the ability to receive
active notification
+ * without needed to register / manage ZooKeeper watchers external to the
PropStore is provided in
+ * case other code is relying on active notifications.
+ * <p>
+ * The notification occurs on a separate thread from the ZooKeeper
notification handling, but
+ * listeners should not perform lengthy operations on the notification thread
so that other listener
+ * notifications are not delayed.
+ */
+public class PropStoreWatcher implements Watcher {
+
+ private static final Logger log =
LoggerFactory.getLogger(PropStoreWatcher.class);
+
+ private final ExecutorService executorService =
+ ThreadPools.getServerThreadPools().createFixedThreadPool(1,
"zoo_change_update", false);
Review comment:
Fixed in 6df9f23cf9
##########
File path:
server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/PropStoreMetrics.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.store.impl;
+
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.metrics.MetricsProducer;
+import org.apache.accumulo.core.metrics.MetricsUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.micrometer.core.instrument.Counter;
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.core.instrument.Timer;
+
+public class PropStoreMetrics implements MetricsProducer {
+
+ private static final Logger log =
LoggerFactory.getLogger(PropStoreMetrics.class);
+
+ private Timer load;
+ private Counter refresh;
+ private Counter refreshLoad;
+ private Counter eviction;
+ private Counter zkError;
+
+ @Override
+ public void registerMetrics(MeterRegistry registry) {
+
+ load = Timer.builder(METRICS_PROPSTORE_LOAD_TIMER).description("prop store
load time")
+ .tags(MetricsUtil.getCommonTags()).register(registry);
+
+ refresh =
+ Counter.builder(METRICS_PROPSTORE_REFRESH_COUNT).description("prop
store refresh count")
+ .tags(MetricsUtil.getCommonTags()).register(registry);
+
+ refreshLoad = Counter.builder(METRICS_PROPSTORE_REFRESH_LOAD_COUNT)
+ .description("prop store refresh load
count").tags(MetricsUtil.getCommonTags())
+ .register(registry);
+
+ eviction =
+ Counter.builder(METRICS_PROPSTORE_EVICTION_COUNT).description("prop
store eviction count")
+ .tags(MetricsUtil.getCommonTags()).register(registry);
+
+ zkError = Counter.builder(METRICS_PROPSTORE_ZK_ERROR_COUNT)
+ .description("prop store ZooKeeper error
count").tags(MetricsUtil.getCommonTags())
+ .register(registry);
+
+ }
+
+ public PropStoreMetrics() {
+ log.info("Creating PropStore metrics");
Review comment:
Fixed in 6df9f23cf9
##########
File path:
server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/PropStoreMetrics.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.store.impl;
+
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.metrics.MetricsProducer;
+import org.apache.accumulo.core.metrics.MetricsUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.micrometer.core.instrument.Counter;
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.core.instrument.Timer;
+
+public class PropStoreMetrics implements MetricsProducer {
+
+ private static final Logger log =
LoggerFactory.getLogger(PropStoreMetrics.class);
+
+ private Timer load;
+ private Counter refresh;
+ private Counter refreshLoad;
+ private Counter eviction;
+ private Counter zkError;
+
+ @Override
+ public void registerMetrics(MeterRegistry registry) {
+
+ load = Timer.builder(METRICS_PROPSTORE_LOAD_TIMER).description("prop store
load time")
+ .tags(MetricsUtil.getCommonTags()).register(registry);
+
+ refresh =
+ Counter.builder(METRICS_PROPSTORE_REFRESH_COUNT).description("prop
store refresh count")
+ .tags(MetricsUtil.getCommonTags()).register(registry);
+
+ refreshLoad = Counter.builder(METRICS_PROPSTORE_REFRESH_LOAD_COUNT)
+ .description("prop store refresh load
count").tags(MetricsUtil.getCommonTags())
+ .register(registry);
+
+ eviction =
+ Counter.builder(METRICS_PROPSTORE_EVICTION_COUNT).description("prop
store eviction count")
+ .tags(MetricsUtil.getCommonTags()).register(registry);
+
+ zkError = Counter.builder(METRICS_PROPSTORE_ZK_ERROR_COUNT)
+ .description("prop store ZooKeeper error
count").tags(MetricsUtil.getCommonTags())
+ .register(registry);
+
+ }
+
+ public PropStoreMetrics() {
+ log.info("Creating PropStore metrics");
+ }
+
+ public void addLoadTime(final long value) {
+ log.info("Load time: {}", value);
Review comment:
Fixed in 6df9f23cf9
##########
File path:
server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/PropStoreMetrics.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.store.impl;
+
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.metrics.MetricsProducer;
+import org.apache.accumulo.core.metrics.MetricsUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.micrometer.core.instrument.Counter;
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.core.instrument.Timer;
+
+public class PropStoreMetrics implements MetricsProducer {
+
+ private static final Logger log =
LoggerFactory.getLogger(PropStoreMetrics.class);
+
+ private Timer load;
+ private Counter refresh;
+ private Counter refreshLoad;
+ private Counter eviction;
+ private Counter zkError;
+
+ @Override
+ public void registerMetrics(MeterRegistry registry) {
+
+ load = Timer.builder(METRICS_PROPSTORE_LOAD_TIMER).description("prop store
load time")
+ .tags(MetricsUtil.getCommonTags()).register(registry);
+
+ refresh =
+ Counter.builder(METRICS_PROPSTORE_REFRESH_COUNT).description("prop
store refresh count")
+ .tags(MetricsUtil.getCommonTags()).register(registry);
+
+ refreshLoad = Counter.builder(METRICS_PROPSTORE_REFRESH_LOAD_COUNT)
+ .description("prop store refresh load
count").tags(MetricsUtil.getCommonTags())
+ .register(registry);
+
+ eviction =
+ Counter.builder(METRICS_PROPSTORE_EVICTION_COUNT).description("prop
store eviction count")
+ .tags(MetricsUtil.getCommonTags()).register(registry);
+
+ zkError = Counter.builder(METRICS_PROPSTORE_ZK_ERROR_COUNT)
+ .description("prop store ZooKeeper error
count").tags(MetricsUtil.getCommonTags())
+ .register(registry);
+
+ }
+
+ public PropStoreMetrics() {
+ log.info("Creating PropStore metrics");
+ }
+
+ public void addLoadTime(final long value) {
+ log.info("Load time: {}", value);
+ load.record(Duration.ofMillis(value));
+ log.info("Load count: {} time:{}", load.count(),
load.totalTime(TimeUnit.MILLISECONDS));
Review comment:
Fixed in 6df9f23cf9
##########
File path:
server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/CaffeineCache.java
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.store.impl;
+
+import java.util.Objects;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.util.threads.ThreadPools;
+import org.apache.accumulo.server.conf.codec.VersionedProperties;
+import org.apache.accumulo.server.conf.store.PropCache;
+import org.apache.accumulo.server.conf.store.PropCacheId;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.benmanes.caffeine.cache.CacheLoader;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.RemovalCause;
+import com.github.benmanes.caffeine.cache.Ticker;
+import com.google.common.annotations.VisibleForTesting;
+
+public class CaffeineCache implements PropCache {
Review comment:
Changed in 6df9f23cf9 - renamed to PropCacheCaffeineImpl
##########
File path:
server/base/src/main/java/org/apache/accumulo/server/conf/store/PropCacheId.java
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.store;
+
+import static org.apache.accumulo.core.Constants.ZCONFIG;
+import static org.apache.accumulo.core.Constants.ZNAMESPACES;
+import static org.apache.accumulo.core.Constants.ZNAMESPACE_CONF;
+import static org.apache.accumulo.core.Constants.ZTABLES;
+import static org.apache.accumulo.core.Constants.ZTABLE_CONF;
+
+import java.util.Comparator;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.StringJoiner;
+
+import org.apache.accumulo.core.data.InstanceId;
+import org.apache.accumulo.core.data.NamespaceId;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.fate.zookeeper.ZooUtil;
+import org.apache.accumulo.server.ServerContext;
+import org.checkerframework.checker.nullness.qual.NonNull;
+
+/**
+ * Provides a strongly-typed id for storing properties in ZooKeeper. The path
in ZooKeeper is
+ * determined by the instance id and the type (system, namespace and table),
with different root
+ * paths.
+ * <p>
+ * Provides utility methods for constructing different ids based on type and
methods to parse a
+ * ZooKeeper path and return a prop cache id.
+ */
+public class PropCacheId implements Comparable<PropCacheId> {
+
+ public static final String PROP_NODE_NAME = "encoded_props";
+
+ // indices for path.split();
+ public static final int TYPE_TOKEN_POSITION = 3;
+ public static final int IID_TOKEN_POSITION = 2;
+ public static final int ID_TOKEN_POSITION = 4;
+
+ // remove starting slash from constant.
+ public static final String TABLES_NODE_NAME = ZTABLES.substring(1);
+ public static final String NAMESPACE_NODE_NAME = ZNAMESPACES.substring(1);
+
+ private final String path;
+ private final IdType idType;
+ private final NamespaceId namespaceId;
+ private final TableId tableId;
+
+ private PropCacheId(final String path, final IdType idType, final
NamespaceId namespaceId,
+ final TableId tableId) {
+ this.path = path;
+ this.idType = idType;
+ this.namespaceId = namespaceId;
+ this.tableId = tableId;
+ }
+
+ /**
+ * Instantiate a system prop cache id using the instance id from the context.
+ *
+ * @param context
+ * the system context specifying the instance id
+ * @return a prop cache id for system properties,
+ */
+ public static PropCacheId forSystem(final ServerContext context) {
+ return forSystem(context.getInstanceID());
+ }
+
+ /**
+ * Instantiate a system prop cache id.
+ *
+ * @param instanceId
+ * the instance id.
+ * @return a prop cache id for system properties,
+ */
+ public static PropCacheId forSystem(final InstanceId instanceId) {
+ return new PropCacheId(ZooUtil.getRoot(instanceId) + ZCONFIG + "/" +
PROP_NODE_NAME,
Review comment:
At this point I believe the sentiment is to get a release sooner. It
will depend on time-frames and other priorities. There may be some easier
things that might want to be done now, but there seems to be other portions
that would need to wait for 3.0.
I don't intend to tackle it now - at least until these changes are merged
and then it can be revisited and prioritized wrt a release.
##########
File path:
server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImport.java
##########
@@ -183,8 +183,15 @@ private void checkForMerge(final long tid, final Manager
manager) throws Excepti
VolumeManager fs = manager.getVolumeManager();
final Path bulkDir = new Path(bulkInfo.sourceDir);
- int maxTablets =
Integer.parseInt(manager.getContext().getTableConfiguration(bulkInfo.tableId)
- .get(Property.TABLE_BULK_MAX_TABLETS));
+ String value = manager.getContext().getTableConfiguration(bulkInfo.tableId)
+ .get(Property.TABLE_BULK_MAX_TABLETS);
+ if (value == null) {
+ value = Property.TABLE_BULK_MAX_TABLETS.getDefaultValue();
+ log.info("Property not found " + Property.TABLE_BULK_MAX_TABLETS + "
using default: " + value
+ + " for tableId: " + bulkInfo.tableId + " using default: " + value);
+ }
+
+ int maxTablets = Integer.parseInt(value);
Review comment:
Fixed in e9588a17cc - the issue was introduced because findbugs spotted
the value could be null - so Integer.parseInt was not advised. Modified it to
throw IllegalStateException if the value could not be found instead of
continuing.
##########
File path:
server/base/src/main/java/org/apache/accumulo/server/conf/util/ConfigConverter.java
##########
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.util;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Objects.requireNonNull;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.StringJoiner;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.conf.DeprecatedPropertyUtil;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.InstanceId;
+import org.apache.accumulo.core.data.NamespaceId;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooUtil;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.conf.codec.VersionedProperties;
+import org.apache.accumulo.server.conf.store.PropCacheId;
+import org.apache.accumulo.server.conf.store.PropStore;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// TODO - this is in progress and should not be merged without changes.
+// TODO - needs upgrade integration and testing.
+/**
+ * Convert pre-2.1 system, namespace and table properties to PropEncoded
format.
+ *
+ * <pre>
+ * Source ZooKeeper paths:
+ * srcSysPath - system config source = /accumulo/[iid-id]/config;
+ * srcNsBasePath - namespace config source /accumulo/[iid]/namespaces;
+ * srcTableBasePath - table config source /accumulo/[iid]/tables;
+ * </pre>
+ */
+public class ConfigConverter {
+
+ private static final Logger log =
LoggerFactory.getLogger(ConfigConverter.class);
+
+ private final ZooReaderWriter zrw;
+ private final InstanceId instanceId;
+
+ private final PropStore propStore;
+
+ private final String zkBasePath; // base path for accumulo instance -
/accumulo/[iid]
+
+ private final Set<String> legacyPaths = new HashSet<>();
+
+ public ConfigConverter(final ServerContext context) {
+
+ instanceId = context.getInstanceID();
+ zrw = context.getZooReaderWriter();
+ propStore = context.getPropStore();
+
+ zkBasePath = ZooUtil.getRoot(instanceId);
+ }
+
+ public synchronized static void convert(final ServerContext context,
+ final boolean deleteWhenComplete) {
+ ConfigConverter converter = new ConfigConverter(context);
+ converter.convertSys();
+ converter.convertNamespace();
+ converter.convertTables();
+
+ if (deleteWhenComplete) {
+ converter.removeLegacyPaths();
+ }
+ }
+
+ @Override
+ public String toString() {
+ return new StringJoiner(", ", ConfigConverter.class.getSimpleName() + "[",
"]")
+ .add("converted=" + legacyPaths).toString();
+ }
+
+ public void convertSys() {
+ var sysId = PropCacheId.forSystem(instanceId);
+ var zkPathSysConfig = zkBasePath + Constants.ZCONFIG;
+
+ Map<String,String> props = readLegacyProps(zkPathSysConfig);
+
+ Map<String,String> renamedProps = new HashMap<>();
+ props.forEach((original, value) -> {
+ var finalName = DeprecatedPropertyUtil.getReplacementName(original,
+ (log, replacement) -> log
+ .info("Automatically renaming deprecated property '{}' with its
replacement '{}'"
+ + " in ZooKeeper configuration upgrade.", original,
replacement));
+ renamedProps.put(finalName, value);
+ });
+
+ log.info("system props: {} -> {}", props, renamedProps);
+
+ writeConverted(sysId, renamedProps, zkPathSysConfig);
+
+ // delete - the confirmation and then delete done in two steps so that the
removal is atomic.
+ // If the props were deleted as confirmed
+ }
+
+  /**
+   * Convert the legacy properties of every namespace. The children of the instance's namespaces
+   * node are listed, the legacy properties under each namespace's conf node are read with
+   * readLegacyProps, and the result is handed to writeConverted.
+   *
+   * @throws IllegalStateException
+   *           if a ZooKeeper error occurs or the calling thread is interrupted
+   */
+  public void convertNamespace() {
+    var zkPathNamespaceBase = zkBasePath + Constants.ZNAMESPACES;
+    try {
+      List<String> namespaces = zrw.getChildren(zkPathNamespaceBase);
+      for (String namespace : namespaces) {
+        String zkPropBasePath = zkPathNamespaceBase + "/" + namespace + Constants.ZNAMESPACE_CONF;
+        log.info("NS:{} base path: {}", namespace, zkPropBasePath);
+        Map<String,String> props = readLegacyProps(zkPropBasePath);
+        log.info("Namespace props: {} - {}", namespace, props);
+        writeConverted(PropCacheId.forNamespace(instanceId, NamespaceId.of(namespace)), props,
+            zkPropBasePath);
+      }
+    } catch (KeeperException ex) {
+      throw new IllegalStateException(
+          "Failed to convert namespace from ZooKeeper for path: " + zkPathNamespaceBase, ex);
+    } catch (InterruptedException ex) {
+      // restore the interrupt flag before wrapping so callers can still observe the interrupt
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException(
+          "Interrupted reading namespaces from ZooKeeper for path: " + zkPathNamespaceBase, ex);
+    }
+  }
+
+  /**
+   * Convert the legacy properties of every table. The children of the instance's tables node are
+   * listed, the legacy properties under each table's conf node are read with readLegacyProps, and
+   * the result is handed to writeConverted.
+   *
+   * @throws IllegalStateException
+   *           if a ZooKeeper error occurs or the calling thread is interrupted
+   */
+  public void convertTables() {
+    var zkPathTableBase = zkBasePath + Constants.ZTABLES;
+    try {
+      List<String> tables = zrw.getChildren(zkPathTableBase);
+      for (String table : tables) {
+        String zkPropBasePath = zkPathTableBase + "/" + table + Constants.ZTABLE_CONF;
+        log.info("table:{} base path: {}", table, zkPropBasePath);
+        Map<String,String> props = readLegacyProps(zkPropBasePath);
+        log.info("table props: {} - {}", table, props);
+        writeConverted(PropCacheId.forTable(instanceId, TableId.of(table)), props, zkPropBasePath);
+      }
+    } catch (KeeperException ex) {
+      throw new IllegalStateException(
+          "Failed to convert tables from ZooKeeper for path: " + zkPathTableBase, ex);
+    } catch (InterruptedException ex) {
+      // restore the interrupt flag before wrapping so callers can still observe the interrupt
+      Thread.currentThread().interrupt();
+      // message previously said "namespaces" (copy-paste from convertNamespace)
+      throw new IllegalStateException(
+          "Interrupted reading tables from ZooKeeper for path: " + zkPathTableBase, ex);
+    }
+  }
+
+  /**
+   * Delete every ZooKeeper path recorded in legacyPaths. A ZooKeeper failure on one path is
+   * logged and the remaining paths are still attempted (best effort); an interrupt stops the
+   * clean-up.
+   *
+   * @throws IllegalStateException
+   *           if the calling thread is interrupted while deleting
+   */
+  private void removeLegacyPaths() {
+    for (String path : legacyPaths) {
+      log.debug("delete ZooKeeper path: {}", path);
+      try {
+        zrw.delete(path);
+      } catch (KeeperException ex) {
+        // parameterized message; the trailing exception argument keeps the cause in the log
+        log.warn("Failed to delete path on property conversion {}, reason: {}", path,
+            ex.getMessage(), ex);
+      } catch (InterruptedException ex) {
+        Thread.currentThread().interrupt();
+        throw new IllegalStateException(ex);
+      }
+    }
+  }
+
+  /**
+   * Read the legacy configuration stored as one child node per property under the given path.
+   * Each child whose name is a valid property key is read as a UTF-8 string, and its node path
+   * is recorded in legacyPaths. Children with invalid keys are logged and skipped.
+   *
+   * @param path
+   *          the ZooKeeper path whose children are the legacy property nodes
+   * @return a map of property name to value, empty if no valid property children exist
+   * @throws NullPointerException
+   *           if path is null
+   * @throws IllegalStateException
+   *           if a ZooKeeper error occurs or the calling thread is interrupted
+   */
+  private Map<String,String> readLegacyProps(final String path) {
+    requireNonNull(path, "A ZooKeeper path for configuration properties must be supplied");
+    Map<String,String> props = new HashMap<>();
+    try {
+      List<String> children = zrw.getChildren(path);
+      log.info("Looking in: {}, found: {}", path, children);
+      for (String child : children) {
+        if (Property.isValidPropertyKey(child)) {
+          final String childPath = path + "/" + child;
+          byte[] bytes = zrw.getData(childPath);
+          props.put(child, new String(bytes, UTF_8));
+          legacyPaths.add(childPath);
+        } else {
+          log.info("Skipping invalid property: {} in {}", child, path);
+        }
+      }
+    } catch (KeeperException ex) {
+      throw new IllegalStateException("Failed to get children from ZooKeeper for path: " + path,
+          ex);
+    } catch (InterruptedException ex) {
+      // restore the interrupt flag before wrapping so callers can still observe the interrupt
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException(
+          "Interrupted reading children from ZooKeeper for path: " + path, ex);
+    }
+    return props;
+  }
+
+ private void writeConverted(final PropCacheId propCacheId, final
Map<String,String> props,
Review comment:
I need to look at the conversion / upgrade as a whole before release.
##########
File path:
server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/PropStoreEventTask.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.store.impl;
+
+import java.util.Set;
+
+import org.apache.accumulo.server.conf.store.PropCacheId;
+import org.apache.accumulo.server.conf.store.PropChangeListener;
+
+/**
+ * Provides a simple runnable base task for notifying listeners for PropStore
event change
+ * notifications.
+ */
+public abstract class PropStoreEventTask implements Runnable {
+
+ private final PropCacheId propCacheId;
+ private final Set<PropChangeListener> listeners;
+
+ private PropStoreEventTask(final PropCacheId propCacheId,
+ final Set<PropChangeListener> listeners) {
+ this.propCacheId = propCacheId;
+ this.listeners = listeners;
+ }
+
+ public static class PropStoreZkChangeEventTask extends PropStoreEventTask {
+
+ PropStoreZkChangeEventTask(final PropCacheId propCacheId,
+ final Set<PropChangeListener> listeners) {
+ super(propCacheId, listeners);
+ }
+
+ @Override
+ public void run() {
+ super.listeners.forEach(listener ->
listener.zkChangeEvent(super.propCacheId));
+ }
+ }
+
+ public static class PropStoreCacheChangeEventTask extends PropStoreEventTask
{
+
+ PropStoreCacheChangeEventTask(final PropCacheId propCacheId,
+ final Set<PropChangeListener> listeners) {
+ super(propCacheId, listeners);
+ }
+
+ @Override
+ public void run() {
+ super.listeners.forEach(listener ->
listener.cacheChangeEvent(super.propCacheId));
+ }
+ }
+
+ public static class PropStoreDeleteEventTask extends PropStoreEventTask {
+
+ PropStoreDeleteEventTask(final PropCacheId propCacheId,
+ final Set<PropChangeListener> listeners) {
+ super(propCacheId, listeners);
+ }
+
+ @Override
+ public void run() {
+ super.listeners.forEach(listener ->
listener.deleteEvent(super.propCacheId));
+ }
+ }
+
+ public static class PropStoreConnectionEventTask extends PropStoreEventTask {
+
+ PropStoreConnectionEventTask(final PropCacheId propCacheId,
+ final Set<PropChangeListener> listeners) {
+ super(null, listeners);
+ }
+
+ @Override
+ public void run() {
+ super.listeners.forEach(listener -> listener.connectionEvent());
Review comment:
Fixed in eafaa7da6e
##########
File path:
server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/PropStoreWatcher.java
##########
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.store.impl;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import org.apache.accumulo.core.util.threads.ThreadPools;
+import org.apache.accumulo.server.conf.store.PropCacheId;
+import org.apache.accumulo.server.conf.store.PropChangeListener;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class serves as a translator between ZooKeeper events and converts
them to PropStore events.
+ * Using this as an intermediary, the external listeners do not need to set /
manage external
+ * ZooKeeper watchers, they can register for PropStore events if they need to
take active action on
+ * change detection.
+ * <p>
+ * Users of the PropStore.get() will get properties that match what is stored
in ZooKeeper for each
+ * call and do not need to manage any caching. However, the ability to receive
active notification
+ * without needing to register / manage ZooKeeper watchers external to the
PropStore is provided in
+ * case other code is relying on active notifications.
+ * <p>
+ * The notification occurs on a separate thread from the ZooKeeper
notification handling, but
+ * listeners should not perform lengthy operations on the notification thread
so that other listener
+ * notifications are not delayed.
+ */
+public class PropStoreWatcher implements Watcher {
+
+ private static final Logger log =
LoggerFactory.getLogger(PropStoreWatcher.class);
+
+ private final ExecutorService executorService =
+ ThreadPools.getServerThreadPools().createFixedThreadPool(1,
"zoo_change_update", false);
+
+ private final ReentrantReadWriteLock listenerLock = new
ReentrantReadWriteLock();
+ private final ReentrantReadWriteLock.ReadLock listenerReadLock =
listenerLock.readLock();
+ private final ReentrantReadWriteLock.WriteLock listenerWriteLock =
listenerLock.writeLock();
+
+ // access should be guarded by acquiring the listener read or write lock
+ private final Map<PropCacheId,Set<PropChangeListener>> listeners = new
HashMap<>();
+
+ private final ReadyMonitor zkReadyMonitor;
+
+ public PropStoreWatcher(final ReadyMonitor zkReadyMonitor) {
+ this.zkReadyMonitor = zkReadyMonitor;
+ }
+
+ public void registerListener(final PropCacheId propCacheId, final
PropChangeListener listener) {
+ listenerWriteLock.lock();
+ try {
+ Set<PropChangeListener> set = listeners.computeIfAbsent(propCacheId, s
-> new HashSet<>());
+ set.add(listener);
+ } finally {
+ listenerWriteLock.unlock();
+ }
+ }
+
+ /**
+ * Process a ZooKeeper event. This method does not reset the watcher.
Subscribers are notified of
+ * the change - if they call get to update and respond to the change the
watcher will be (re)set
+ * then. This helps clean up watchers by not automatically re-adding the
watcher on the event but
+ * only if being used.
+ *
+ * @param event
+ * ZooKeeper event.
+ */
+ @SuppressWarnings("FutureReturnValueIgnored") // currently, tasks are fire
and forget
+ @Override
+ public void process(final WatchedEvent event) {
+
+ String path;
+ switch (event.getType()) {
+ case NodeDataChanged:
+ path = event.getPath();
+ log.trace("handle change event for path: {}", path);
+ PropCacheId.fromPath(path).ifPresent(this::signalZkChangeEvent);
+ break;
+ case NodeDeleted:
+ path = event.getPath();
+ log.trace("handle delete event for path: {}", path);
+ PropCacheId.fromPath(path).ifPresent(cacheId -> {
+ // notify listeners
+ Set<PropChangeListener> snapshot = getListenerSnapshot(cacheId);
+ if (snapshot != null) {
+ executorService
+ .submit(new
PropStoreEventTask.PropStoreDeleteEventTask(cacheId, snapshot));
+ }
+
+ listenerCleanup(cacheId);
+
+ });
+
+ break;
+ case None:
+ Event.KeeperState state = event.getState();
+ switch (state) {
+ // pause - could reconnect
+ case ConnectedReadOnly:
+ case Disconnected:
+ log.debug("ZooKeeper disconnected event received");
+ zkReadyMonitor.clearReady();
+ executorService.submit(new
PropStoreEventTask.PropStoreConnectionEventTask(null,
+ getAllListenersSnapshot()));
+ break;
+
+ // okay
+ case SyncConnected:
+ log.debug("ZooKeeper connected event received");
+ zkReadyMonitor.setReady();
+ break;
+
+ // terminal - never coming back.
+ case Expired:
+ case Closed:
+ log.info("ZooKeeper connection closed event received");
+ zkReadyMonitor.clearReady();
+ zkReadyMonitor.setClosed(); // terminal condition
+ executorService.submit(new
PropStoreEventTask.PropStoreConnectionEventTask(null,
+ getAllListenersSnapshot()));
+ break;
+
+ default:
+ log.trace("ignoring zooKeeper state: {}", state);
+ }
+ break;
+ default:
+ break;
+ }
+
+ }
+
+ /**
+ * Submit task to notify registered listeners that the propCacheId node
received an event
+ * notification from ZooKeeper and should be updated. The process can be
initiated either by a
+ * ZooKeeper notification or a change detected in the cache based on a
ZooKeeper event.
+ *
+ * @param propCacheId
+ * the cache id
+ */
+ @SuppressWarnings("FutureReturnValueIgnored") // currently, tasks are fire
and forget
+ public void signalZkChangeEvent(final PropCacheId propCacheId) {
+ log.trace("signal ZooKeeper change event: {}", propCacheId);
+ Set<PropChangeListener> snapshot = getListenerSnapshot(propCacheId);
+ log.trace("Sending change event to: {}", snapshot);
+ if (snapshot != null) {
+ executorService
+ .submit(new
PropStoreEventTask.PropStoreZkChangeEventTask(propCacheId, snapshot));
+ }
+ }
+
+ /**
+ * Submit task to notify registered listeners that a change to the propCacheId
node was detected and
+ * that it should be updated.
+ *
+ * @param propCacheId
+ * the cache id
+ */
+ @SuppressWarnings("FutureReturnValueIgnored") // currently, tasks are fire
and forget
+ public void signalCacheChangeEvent(final PropCacheId propCacheId) {
+ log.trace("cache change event: {}", propCacheId);
+ Set<PropChangeListener> snapshot = getListenerSnapshot(propCacheId);
+ if (snapshot != null) {
+ executorService
+ .submit(new
PropStoreEventTask.PropStoreCacheChangeEventTask(propCacheId, snapshot));
+ }
+ }
+
+ /**
+ * Clean-up the active listeners set when an entry is removed from the
cache, remove it from the
+ * active listeners.
+ *
+ * @param propCacheId
+ * the cache id
+ */
+ public void listenerCleanup(final PropCacheId propCacheId) {
+ listenerWriteLock.lock();
+ try {
+ listeners.remove(propCacheId);
+ } finally {
+ listenerWriteLock.unlock();
+ }
+ }
+
+  /**
+   * Get an immutable snapshot of the listeners for a prop cache id. The set is intended for
+   * notification of changes for a specific prop cache id. The copy is taken while holding the
+   * listener read lock.
+   *
+   * @param propCacheId
+   *          the prop cache id
+   * @return an immutable copy of the listeners, or null if no listener set is registered for the
+   *         id.
+   */
+  private Set<PropChangeListener> getListenerSnapshot(final PropCacheId propCacheId) {
+
+    Set<PropChangeListener> snapshot = null;
+    listenerReadLock.lock();
+    try {
+      Set<PropChangeListener> set = listeners.get(propCacheId);
+      if (set != null) {
+        snapshot = Set.copyOf(set);
+      }
+
+    } finally {
+      listenerReadLock.unlock();
+    }
+    return snapshot;
+  }
+
+ /**
+ * Get an immutable snapshot of all listeners registered for events. The
set is intended for
+ * connection event notifications that are not specific to an individual
prop cache id.
+ *
+ * @return an immutable copy of all registered listeners.
+ */
+ private Set<PropChangeListener> getAllListenersSnapshot() {
+
+ Set<PropChangeListener> snapshot;
+ listenerReadLock.lock();
+ try {
+
+ snapshot = listeners.keySet().stream().flatMap(key ->
listeners.get(key).stream())
+ .collect(Collectors.toSet());
+
+ } finally {
+ listenerReadLock.unlock();
+ }
+ return Collections.unmodifiableSet(snapshot);
Review comment:
Fixed in eafaa7da6e - also addressed a checkstyle issue on connection
events that did not use prop cache key by adding second constructor.
##########
File path:
server/base/src/main/java/org/apache/accumulo/server/conf/util/ConfigPropertyPrinter.java
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.util;
+
+import java.io.BufferedWriter;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.PrintStream;
+import java.io.PrintWriter;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.NamespaceId;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.cli.ServerUtilOpts;
+import org.apache.accumulo.server.conf.codec.VersionedProperties;
+import org.apache.accumulo.server.conf.store.PropCacheId;
+import org.apache.accumulo.server.conf.store.impl.ZooPropStore;
+import org.apache.accumulo.start.spi.KeywordExecutable;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+import com.google.auto.service.AutoService;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+// TODO - this is in progress and should not be merged without changes.
+// TODO - implement json output (or remove option)
+@AutoService(KeywordExecutable.class)
+@SuppressFBWarnings(value = "PATH_TRAVERSAL_OUT",
+ justification = "app is run in same security context as user providing the
filename")
+public class ConfigPropertyPrinter implements KeywordExecutable {
+
+ private static final Logger log =
LoggerFactory.getLogger(ConfigPropertyPrinter.class);
+
+ public ConfigPropertyPrinter() {}
+
+ public static void main(String[] args) throws Exception {
+ new ConfigPropertyPrinter().execute(args);
+ }
+
+ @Override
+ public String keyword() {
+ return "config-property-print";
Review comment:
Fixed in eafaa7da6e
##########
File path:
server/base/src/main/java/org/apache/accumulo/server/conf/RuntimeFixedProperties.java
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class to manage a fixed set of defined properties (designated in
Properties as fixed).
+ * Certain properties are stored for persistence across restarts, they are
read during start-up and
+ * remain unchanged for the life of the instance. Any updates to the
properties will only be
+ * reflected with a restart.
+ * <p>
+ * Note that there are no guarantees that all services will always have the
same values. If a fixed
+ * property value is changed and if all services are not restarted, they would
be operating with
+ * different values.
+ */
+public class RuntimeFixedProperties {
Review comment:
Would prefer to leave it in place and have it evaluated with this PR
##########
File path:
server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/PrepBulkImport.java
##########
@@ -183,8 +183,15 @@ private void checkForMerge(final long tid, final Manager
manager) throws Excepti
VolumeManager fs = manager.getVolumeManager();
final Path bulkDir = new Path(bulkInfo.sourceDir);
- int maxTablets =
Integer.parseInt(manager.getContext().getTableConfiguration(bulkInfo.tableId)
- .get(Property.TABLE_BULK_MAX_TABLETS));
+ String value = manager.getContext().getTableConfiguration(bulkInfo.tableId)
+ .get(Property.TABLE_BULK_MAX_TABLETS);
+ if (value == null) {
+ value = Property.TABLE_BULK_MAX_TABLETS.getDefaultValue();
+ log.info("Property not found " + Property.TABLE_BULK_MAX_TABLETS + "
using default: " + value
+ + " for tableId: " + bulkInfo.tableId + " using default: " + value);
+ }
+
+ int maxTablets = Integer.parseInt(value);
Review comment:
With the original code unchanged the maven build / verify fails with the
following:
```
[ERROR] Medium: Possible null pointer dereference in
org.apache.accumulo.manager.tableOps.bulkVer2.PrepBulkImport.checkForMerge(long,
Manager) due to return value of called method
[org.apache.accumulo.manager.tableOps.bulkVer2.PrepBulkImport,
org.apache.accumulo.manager.tableOps.bulkVer2.PrepBulkImport] Method invoked at
PrepBulkImport.java:[line 186]Known null at PrepBulkImport.java:[line 187]
NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE
```
Wrapping value in requireNonNull fails with the same message.
```
requireNonNull(value,"The property: " +
Property.TABLE_BULK_MAX_TABLETS.getKey()
+ " was not found for tableId: " + bulkInfo.tableId);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]