ctubbsii commented on a change in pull request #2569:
URL: https://github.com/apache/accumulo/pull/2569#discussion_r828105000
##########
File path: test/src/main/java/org/apache/accumulo/test/shell/ShellServerIT.java
##########
@@ -572,7 +572,7 @@ public void addauths() throws Exception {
sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
}
}
- assertTrue("Could not successfully see updated authoriations", passed);
+ assertTrue("Could not successfully see updated authorizations", passed);
Review comment:
The changes in this class appear unrelated to this PR. If so, please
submit them in a separate PR, so we can merge them quickly, without slowing
down this one. The same goes for any other minor, unrelated changes in this PR.
##########
File path:
server/base/src/main/java/org/apache/accumulo/server/conf/store/PropCacheId.java
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.store;
+
+import static org.apache.accumulo.core.Constants.ZCONFIG;
+import static org.apache.accumulo.core.Constants.ZNAMESPACES;
+import static org.apache.accumulo.core.Constants.ZNAMESPACE_CONF;
+import static org.apache.accumulo.core.Constants.ZTABLES;
+import static org.apache.accumulo.core.Constants.ZTABLE_CONF;
+
+import java.util.Comparator;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.StringJoiner;
+
+import org.apache.accumulo.core.data.InstanceId;
+import org.apache.accumulo.core.data.NamespaceId;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.fate.zookeeper.ZooUtil;
+import org.apache.accumulo.server.ServerContext;
+import org.checkerframework.checker.nullness.qual.NonNull;
+
+/**
+ * Provides a strongly-typed id for storing properties in ZooKeeper. The path
in ZooKeeper is
+ * determined by the instance id and the type (system, namespace and table),
with different root
+ * paths.
+ * <p>
+ * Provides utility methods for constructing different ids based on type and
methods to parse a
+ * ZooKeeper path and return a prop cache id.
+ */
+public class PropCacheId implements Comparable<PropCacheId> {
Review comment:
> Should / could this extend `AbstractId` ?
This is more of an abstract reference to a path in ZK rather than a
typed-POJO wrapper around a canonical String like AbstractId is. It's used more
like a cache key than an identifier. It could be renamed to PropCacheKey.
##########
File path:
server/base/src/main/java/org/apache/accumulo/server/ServerContext.java
##########
@@ -158,10 +165,8 @@ public synchronized ServerConfigurationFactory
getServerConfFactory() {
@Override
public AccumuloConfiguration getConfiguration() {
if (systemConfig == null) {
- // system configuration uses its own instance of ZooCache
- // this could be useful to keep its update counter independent
- ZooCache propCache = new ZooCache(getZooReader(), null);
- systemConfig = new ZooConfiguration(this, propCache,
getSiteConfiguration());
+ systemConfig = new SystemConfiguration(log, this,
PropCacheId.forSystem(getInstanceID()),
Review comment:
I don't think you need to pass in the ServerContext logger; SystemConfiguration
can use its own logger.
##########
File path:
core/src/main/java/org/apache/accumulo/core/conf/AccumuloConfiguration.java
##########
@@ -597,4 +599,13 @@ public T derive() {
* this configuration.
*/
public void invalidateCache() {}
+
+ /**
+ * get a parent configuration or null if it does not exist.
+ *
+ * @since 2.1.0
+ */
+ public AccumuloConfiguration getParent() {
+ return null;
Review comment:
I think the idea is that subclasses override this if they have a parent.
##########
File path:
server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/PropStoreWatcher.java
##########
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.store.impl;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import org.apache.accumulo.core.util.threads.ThreadPools;
+import org.apache.accumulo.server.conf.store.PropCacheId;
+import org.apache.accumulo.server.conf.store.PropChangeListener;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class serves as a translator of ZooKeeper events, converting
them to PropStore events.
+ * Using this as an intermediary, the external listeners do not need to set /
manage external
+ * ZooKeeper watchers, they can register for PropStore events if they need to
take active action on
+ * change detection.
+ * <p>
+ * Users of the PropStore.get() will get properties that match what is stored
in ZooKeeper for each
+ * call and do not need to manage any caching. However, the ability to receive
active notification
+ * without needing to register / manage ZooKeeper watchers external to the
PropStore is provided in
+ * case other code is relying on active notifications.
+ * <p>
+ * The notification occurs on a separate thread from the ZooKeeper
notification handling, but
+ * listeners should not perform lengthy operations on the notification thread
so that other listener
+ * notifications are not delayed.
+ */
+public class PropStoreWatcher implements Watcher {
+
+ private static final Logger log =
LoggerFactory.getLogger(PropStoreWatcher.class);
+
+ private final ExecutorService executorService =
+ ThreadPools.getServerThreadPools().createFixedThreadPool(1,
"zoo_change_update", false);
+
+ private final ReentrantReadWriteLock listenerLock = new
ReentrantReadWriteLock();
+ private final ReentrantReadWriteLock.ReadLock listenerReadLock =
listenerLock.readLock();
+ private final ReentrantReadWriteLock.WriteLock listenerWriteLock =
listenerLock.writeLock();
+
+ // access should be guarded by acquiring the listener read or write lock
+ private final Map<PropCacheId,Set<PropChangeListener>> listeners = new
HashMap<>();
+
+ private final ReadyMonitor zkReadyMonitor;
+
+ public PropStoreWatcher(final ReadyMonitor zkReadyMonitor) {
+ this.zkReadyMonitor = zkReadyMonitor;
+ }
+
+ public void registerListener(final PropCacheId propCacheId, final
PropChangeListener listener) {
+ listenerWriteLock.lock();
+ try {
+ Set<PropChangeListener> set = listeners.computeIfAbsent(propCacheId, s
-> new HashSet<>());
+ set.add(listener);
+ } finally {
+ listenerWriteLock.unlock();
+ }
+ }
+
+ /**
+ * Process a ZooKeeper event. This method does not reset the watcher.
Subscribers are notified of
+ * the change - if they call get to update and respond to the change the
watcher will be (re)set
+ * then. This helps clean up watchers by not automatically re-adding the
watcher on the event but
+ * only if being used.
+ *
+ * @param event
+ * ZooKeeper event.
+ */
+ @SuppressWarnings("FutureReturnValueIgnored") // currently, tasks are fire
and forget
+ @Override
+ public void process(final WatchedEvent event) {
+
+ String path;
+ switch (event.getType()) {
+ case NodeDataChanged:
+ path = event.getPath();
+ log.trace("handle change event for path: {}", path);
+ PropCacheId.fromPath(path).ifPresent(this::signalZkChangeEvent);
+ break;
+ case NodeDeleted:
+ path = event.getPath();
+ log.trace("handle delete event for path: {}", path);
+ PropCacheId.fromPath(path).ifPresent(cacheId -> {
+ // notify listeners
+ Set<PropChangeListener> snapshot = getListenerSnapshot(cacheId);
+ if (snapshot != null) {
+ executorService
+ .submit(new
PropStoreEventTask.PropStoreDeleteEventTask(cacheId, snapshot));
+ }
+
+ listenerCleanup(cacheId);
+
+ });
+
+ break;
+ case None:
+ Event.KeeperState state = event.getState();
+ switch (state) {
+ // pause - could reconnect
+ case ConnectedReadOnly:
+ case Disconnected:
+ log.debug("ZooKeeper disconnected event received");
+ zkReadyMonitor.clearReady();
+ executorService.submit(new
PropStoreEventTask.PropStoreConnectionEventTask(null,
+ getAllListenersSnapshot()));
+ break;
+
+ // okay
+ case SyncConnected:
+ log.debug("ZooKeeper connected event received");
+ zkReadyMonitor.setReady();
+ break;
+
+ // terminal - never coming back.
+ case Expired:
+ case Closed:
+ log.info("ZooKeeper connection closed event received");
+ zkReadyMonitor.clearReady();
+ zkReadyMonitor.setClosed(); // terminal condition
+ executorService.submit(new
PropStoreEventTask.PropStoreConnectionEventTask(null,
+ getAllListenersSnapshot()));
+ break;
+
+ default:
+ log.trace("ignoring zooKeeper state: {}", state);
+ }
+ break;
+ default:
+ break;
+ }
+
+ }
+
+ /**
+ * Submit task to notify registered listeners that the propCacheId node
received an event
+ * notification from ZooKeeper and should be updated. The process can be
initiated either by a
+ * ZooKeeper notification or a change detected in the cache based on a
ZooKeeper event.
+ *
+ * @param propCacheId
+ * the cache id
+ */
+ @SuppressWarnings("FutureReturnValueIgnored") // currently, tasks are fire
and forget
+ public void signalZkChangeEvent(final PropCacheId propCacheId) {
+ log.trace("signal ZooKeeper change event: {}", propCacheId);
+ Set<PropChangeListener> snapshot = getListenerSnapshot(propCacheId);
+ log.trace("Sending change event to: {}", snapshot);
+ if (snapshot != null) {
+ executorService
+ .submit(new
PropStoreEventTask.PropStoreZkChangeEventTask(propCacheId, snapshot));
+ }
+ }
+
+ /**
+ * Submit task to notify registered listeners that a propCacheId node
change was detected and the node should
+ * be updated.
+ *
+ * @param propCacheId
+ * the cache id
+ */
+ @SuppressWarnings("FutureReturnValueIgnored") // currently, tasks are fire
and forget
+ public void signalCacheChangeEvent(final PropCacheId propCacheId) {
+ log.trace("cache change event: {}", propCacheId);
+ Set<PropChangeListener> snapshot = getListenerSnapshot(propCacheId);
+ if (snapshot != null) {
+ executorService
+ .submit(new
PropStoreEventTask.PropStoreCacheChangeEventTask(propCacheId, snapshot));
+ }
+ }
+
+ /**
+ * Clean up the active listeners set: when an entry is removed from the
cache, remove it from the
+ * active listeners.
+ *
+ * @param propCacheId
+ * the cache id
+ */
+ public void listenerCleanup(final PropCacheId propCacheId) {
+ listenerWriteLock.lock();
+ try {
+ listeners.remove(propCacheId);
+ } finally {
+ listenerWriteLock.unlock();
+ }
+ }
+
+ /**
+ * Get an immutable snapshot of the listeners for a prop cache id. The set
is intended for
+ * notification of changes for a specific prop cache id.
+ *
+ * @param PropCacheId
+ * the prop cache id
+ * @return an immutable copy of listeners.
+ */
+ private Set<PropChangeListener> getListenerSnapshot(final PropCacheId
PropCacheId) {
+
+ Set<PropChangeListener> snapshot = null;
+ listenerReadLock.lock();
+ try {
+ Set<PropChangeListener> set = listeners.get(PropCacheId);
+ if (set != null) {
+ snapshot = Set.copyOf(set);
+ }
+
+ } finally {
+ listenerReadLock.unlock();
+ }
+ return snapshot;
+ }
+
+ /**
+ * Get an immutable snapshot of all the listeners registered for events. The
set is intended for
+ * connection event notifications that are not specific to an individual
prop cache id.
+ *
+ * @return an immutable copy of all registered listeners.
+ */
+ private Set<PropChangeListener> getAllListenersSnapshot() {
+
+ Set<PropChangeListener> snapshot;
+ listenerReadLock.lock();
+ try {
+
+ snapshot = listeners.keySet().stream().flatMap(key ->
listeners.get(key).stream())
+ .collect(Collectors.toSet());
+
+ } finally {
+ listenerReadLock.unlock();
+ }
+ return Collections.unmodifiableSet(snapshot);
Review comment:
This could be simplified as shown below. Note that it requires static imports of
Collectors.collectingAndThen and Collectors.toSet:
```suggestion
listenerReadLock.lock();
try {
return listeners.keySet().stream().flatMap(key ->
listeners.get(key).stream())
.collect(collectingAndThen(toSet(), Collections::unmodifiableSet));
} finally {
listenerReadLock.unlock();
}
```
##########
File path:
server/base/src/main/java/org/apache/accumulo/server/conf/util/ConfigPropertyPrinter.java
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.util;
+
+import java.io.BufferedWriter;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.PrintStream;
+import java.io.PrintWriter;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.NamespaceId;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.cli.ServerUtilOpts;
+import org.apache.accumulo.server.conf.codec.VersionedProperties;
+import org.apache.accumulo.server.conf.store.PropCacheId;
+import org.apache.accumulo.server.conf.store.impl.ZooPropStore;
+import org.apache.accumulo.start.spi.KeywordExecutable;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+import com.google.auto.service.AutoService;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+// TODO - this is in progress and should not be merged without changes.
+// TODO - implement json output (or remove option)
+@AutoService(KeywordExecutable.class)
+@SuppressFBWarnings(value = "PATH_TRAVERSAL_OUT",
+ justification = "app is run in same security context as user providing the
filename")
+public class ConfigPropertyPrinter implements KeywordExecutable {
+
+ private static final Logger log =
LoggerFactory.getLogger(ConfigPropertyPrinter.class);
+
+ public ConfigPropertyPrinter() {}
+
+ public static void main(String[] args) throws Exception {
+ new ConfigPropertyPrinter().execute(args);
+ }
+
+ @Override
+ public String keyword() {
+ return "config-property-print";
Review comment:
Maybe `print-config`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]