dlmarion commented on code in PR #5192:
URL: https://github.com/apache/accumulo/pull/5192#discussion_r1900952551
##########
core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java:
##########
@@ -51,10 +53,13 @@
* A cache for values stored in ZooKeeper. Values are kept up to date as they
change.
*/
public class ZooCache {
+
+ public interface ZooCacheWatcher extends Consumer<WatchedEvent> {}
Review Comment:
I'm wondering if we should rename `ZCacheWatcher`, which is the watcher
used to process events from ZooKeeper and which ends up calling the
external watcher. Maybe just call it `ZooKeeperWatcher`, or something else that
doesn't include `Cache`, so it isn't confused with the new `ZooCacheWatcher` interface.
##########
core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java:
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.zookeeper;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Objects.requireNonNull;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static org.apache.accumulo.core.util.LazySingletons.RANDOM;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.fate.zookeeper.ZooReader;
+import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.core.util.AddressUtil;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.zookeeper.AsyncCallback.StringCallback;
+import org.apache.zookeeper.AsyncCallback.VoidCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.Watcher.WatcherType;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A ZooKeeper client facade that maintains a ZooKeeper delegate instance. If
the delegate instance
+ * loses its session, it is replaced with a new instance to establish a new
session. Any Watchers
+ * registered on a session will need to monitor for the session expired event
triggered from the old
+ * delegate, and must be reset on the new session if you intend them to
monitor any further events.
+ * That is no different than if you created a new ZooKeeper instance directly
after the first one
+ * expired.
+ */
+public class ZooSession implements AutoCloseable {
+
+ public static class ZKUtil {
+ public static void deleteRecursive(ZooSession zk, final String pathRoot)
+ throws InterruptedException, KeeperException {
+ org.apache.zookeeper.ZKUtil.deleteRecursive(zk.verifyConnected(),
pathRoot);
+ }
+
+ public static void visitSubTreeDFS(ZooSession zk, final String path,
boolean watch,
+ StringCallback cb) throws KeeperException, InterruptedException {
+ org.apache.zookeeper.ZKUtil.visitSubTreeDFS(zk.verifyConnected(), path,
watch, cb);
+ }
+ }
+
+ private static class ZooSessionWatcher implements Watcher {
+
+ private final String connectionName;
+ private final AtomicReference<KeeperState> lastState = new
AtomicReference<>(null);
+
+ public ZooSessionWatcher(String connectionName) {
+ this.connectionName = connectionName;
+ }
+
+ @Override
+ public void process(WatchedEvent event) {
+ final var newState = event.getState();
+ var oldState = lastState.getAndUpdate(s -> newState);
+ if (oldState == null) {
+ log.debug("{} state changed to {}", connectionName, newState);
+ } else if (newState != oldState) {
+ log.debug("{} state changed from {} to {}", connectionName, oldState,
newState);
+ }
+ }
+ }
+
+ private static final Logger log = LoggerFactory.getLogger(ZooSession.class);
+
+ private static void closeZk(ZooKeeper zk) {
+ if (zk != null) {
+ try {
+ zk.close();
+ } catch (InterruptedException e) {
+ // ZooKeeper doesn't actually throw this; it's just there for
backwards compatibility
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ private static void digestAuth(ZooKeeper zoo, String secret) {
+ zoo.addAuthInfo("digest", ("accumulo:" +
requireNonNull(secret)).getBytes(UTF_8));
+ }
+
+ private final AtomicBoolean closed = new AtomicBoolean();
+ private final AtomicLong connectCounter;
+ private final String connectString;
+ private final AtomicReference<ZooKeeper> delegate = new AtomicReference<>();
+ private final String instanceSecret;
+ private final String sessionName;
+ private final int timeout;
+ private final ZooReaderWriter zrw;
+
+ /**
+ * Construct a new ZooKeeper client, retrying indefinitely if it doesn't
work right away. The
+ * caller is responsible for closing instances returned from this method.
+ *
+ * @param clientName a convenient name for logging its connection state
changes
+ * @param conf a convenient carrier of ZK connection information using
Accumulo properties
+ */
+ public ZooSession(String clientName, AccumuloConfiguration conf) {
+ this(clientName, conf.get(Property.INSTANCE_ZK_HOST),
+ (int) conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT),
+ conf.get(Property.INSTANCE_SECRET));
+ }
+
+ /**
+ * Construct a new ZooKeeper client, retrying indefinitely if it doesn't
work right away. The
+ * caller is responsible for closing instances returned from this method.
+ *
+ * @param clientName a convenient name for logging its connection state
changes
+ * @param connectString in the form of host1:port1,host2:port2/chroot/path
+ * @param timeout in milliseconds
+ * @param instanceSecret instance secret (may be null)
+ */
+ public ZooSession(String clientName, String connectString, int timeout,
String instanceSecret) {
+ // information needed to construct a ZooKeeper instance and add
authentication
+ this.connectString = connectString;
+ this.timeout = timeout;
+ this.instanceSecret = instanceSecret;
+
+ // information for logging which instance of ZooSession this is
+ this.sessionName =
+ String.format("%s[%s_%s]", getClass().getSimpleName(), clientName,
UUID.randomUUID());
+ this.connectCounter = new AtomicLong(); // incremented when we need to
create a new delegate
+ this.zrw = new ZooReaderWriter(this);
+ }
+
+ private ZooKeeper verifyConnected() {
+ if (closed.get()) {
+ throw new IllegalStateException(sessionName + " was closed");
+ }
+ return delegate.updateAndGet(zk -> (zk != null && zk.getState().isAlive())
? zk : reconnect());
Review Comment:
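I'm wondering about the memory effects of always updating the
AtomicReference, even when its value is unchanged. `updateAndGet` attempts a
compare-and-set on every call, even when the update function returns the current
value, so this may cause an unnecessary CPU cache-line invalidation. I don't have
a definitive answer, but some of the things I looked at indicate that it does.
@keith-turner - do you know?
Related: the `updateAndGet` Javadoc says the update function should be
side-effect-free because it may be re-applied when updates fail due to contention.
Here `reconnect()` creates a new ZooKeeper instance, so concurrent callers could
create (and leak) extra instances.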
##########
core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooUtil.java:
##########
@@ -170,26 +176,78 @@ private static String getFmtTime(final long epoch) {
}
/**
- * Get the ZooKeeper digest based on the instance secret that is used within
ZooKeeper for
- * authentication. This method is primary intended to be used to validate
ZooKeeper ACLs. Use
- * {@link #digestAuth(ZooKeeper, String)} to add authorizations to ZooKeeper.
+ * Given a zooCache and instanceId, look up the instance name.
*/
- public static Id getZkDigestAuthId(final String secret) {
+ public static String getInstanceName(ZooSession zk, InstanceId instanceId) {
+ requireNonNull(zk);
+ var instanceIdBytes =
requireNonNull(instanceId).canonical().getBytes(UTF_8);
+ for (String name : getInstanceNames(zk)) {
+ var bytes = getInstanceIdBytesFromName(zk, name);
+ if (Arrays.equals(bytes, instanceIdBytes)) {
+ return name;
+ }
+ }
+ return null;
+ }
+
+ private static List<String> getInstanceNames(ZooSession zk) {
try {
- final String scheme = "digest";
- String auth = DigestAuthenticationProvider.generateDigest("accumulo:" +
secret);
- return new Id(scheme, auth);
- } catch (NoSuchAlgorithmException ex) {
- throw new IllegalArgumentException("Could not generate ZooKeeper digest
string", ex);
+ return new ZooReader(zk).getChildren(Constants.ZROOT +
Constants.ZINSTANCES);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IllegalStateException("Interrupted reading instance names from
ZooKeeper", e);
+ } catch (KeeperException e) {
+ throw new IllegalStateException("Failed to read instance names from
ZooKeeper", e);
}
}
- public static void digestAuth(ZooKeeper zoo, String secret) {
- auth(zoo, "digest", ("accumulo:" + secret).getBytes(UTF_8));
+ private static byte[] getInstanceIdBytesFromName(ZooSession zk, String name)
{
+ try {
+ return new ZooReader(zk)
Review Comment:
Why not use `zk.asReader()`?
##########
core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooUtil.java:
##########
@@ -170,26 +176,78 @@ private static String getFmtTime(final long epoch) {
}
/**
- * Get the ZooKeeper digest based on the instance secret that is used within
ZooKeeper for
- * authentication. This method is primary intended to be used to validate
ZooKeeper ACLs. Use
- * {@link #digestAuth(ZooKeeper, String)} to add authorizations to ZooKeeper.
+ * Given a zooCache and instanceId, look up the instance name.
*/
- public static Id getZkDigestAuthId(final String secret) {
+ public static String getInstanceName(ZooSession zk, InstanceId instanceId) {
+ requireNonNull(zk);
+ var instanceIdBytes =
requireNonNull(instanceId).canonical().getBytes(UTF_8);
+ for (String name : getInstanceNames(zk)) {
+ var bytes = getInstanceIdBytesFromName(zk, name);
+ if (Arrays.equals(bytes, instanceIdBytes)) {
+ return name;
+ }
+ }
+ return null;
+ }
+
+ private static List<String> getInstanceNames(ZooSession zk) {
try {
- final String scheme = "digest";
- String auth = DigestAuthenticationProvider.generateDigest("accumulo:" +
secret);
- return new Id(scheme, auth);
- } catch (NoSuchAlgorithmException ex) {
- throw new IllegalArgumentException("Could not generate ZooKeeper digest
string", ex);
+ return new ZooReader(zk).getChildren(Constants.ZROOT +
Constants.ZINSTANCES);
Review Comment:
Why not use `zk.asReader()`?
##########
server/base/src/main/java/org/apache/accumulo/server/conf/util/ZooPropEditor.java:
##########
@@ -82,24 +82,26 @@ public void execute(String[] args) throws Exception {
ZooPropEditor.Opts opts = new ZooPropEditor.Opts();
opts.parseArgs(ZooPropEditor.class.getName(), args);
- ZooReaderWriter zrw = new ZooReaderWriter(opts.getSiteConfiguration());
-
var siteConfig = opts.getSiteConfiguration();
- try (ServerContext context = new ServerContext(siteConfig)) {
- PropStoreKey<?> propKey = getPropKey(context, opts);
- switch (opts.getCmdMode()) {
- case SET:
- setProperty(context, propKey, opts);
- break;
- case DELETE:
- deleteProperty(context, propKey, readPropNode(propKey, zrw), opts);
- break;
- case PRINT:
- printProperties(context, propKey, readPropNode(propKey, zrw));
- break;
- case ERROR:
- default:
- throw new IllegalArgumentException("Invalid operation requested");
+ try (var zk = new ZooSession(getClass().getSimpleName(), siteConfig)) {
+ var zrw = zk.asReaderWriter();
+
+ try (ServerContext context = new ServerContext(siteConfig)) {
Review Comment:
I think you could collapse these into a single try-with-resources statement.
##########
core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooUtil.java:
##########
@@ -116,7 +122,7 @@ public static String getRoot(final InstanceId instanceId) {
/**
* This method will delete a node and all its children.
*/
- public static void recursiveDelete(ZooKeeper zooKeeper, String zPath,
NodeMissingPolicy policy)
+ public static void recursiveDelete(ZooSession zooKeeper, String zPath,
NodeMissingPolicy policy)
Review Comment:
Does this do something different from `ZooSession.ZKUtil.deleteRecursive`?
##########
server/base/src/main/java/org/apache/accumulo/server/manager/state/DeadServerList.java:
##########
@@ -43,9 +43,9 @@ public class DeadServerList {
public DeadServerList(ServerContext context) {
this.path = context.getZooKeeperRoot() + Constants.ZDEADTSERVERS;
- zoo = context.getZooReaderWriter();
+ zoo = context.getZooSession().asReaderWriter();
try {
- context.getZooReaderWriter().mkdirs(path);
+ context.getZooSession().asReaderWriter().mkdirs(path);
Review Comment:
```suggestion
zoo.mkdirs(path);
```
##########
minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java:
##########
@@ -644,74 +641,48 @@ private void verifyUp() throws InterruptedException,
IOException {
waitForProcessStart(tsp, "TabletServer" + tsExpectedCount);
}
- try (ZooKeeper zk = new ZooKeeper(getZooKeepers(), 60000, event ->
log.warn("{}", event))) {
-
- String secret = getSiteConfiguration().get(Property.INSTANCE_SECRET);
-
- while (!(zk.getState() == States.CONNECTED)) {
- log.info("Waiting for ZK client to connect, state: {} - will retry",
zk.getState());
- Thread.sleep(1000);
- }
-
- String instanceId = null;
+ String secret = getSiteConfiguration().get(Property.INSTANCE_SECRET);
+ String instanceNamePath = Constants.ZROOT + Constants.ZINSTANCES + "/" +
getInstanceName();
+ try (var zk = new ZooSession(MiniAccumuloClusterImpl.class.getSimpleName()
+ ".verifyUp()",
+ getZooKeepers(), 60000, secret)) {
+ var rdr = new ZooReader(zk);
Review Comment:
Use `zk.asReader()`?
##########
server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java:
##########
@@ -678,7 +678,7 @@ public long getFlushID() throws NoNodeException {
try {
String zTablePath = tabletServer.getContext().getZooKeeperRoot() +
Constants.ZTABLES + "/"
+ extent.tableId() + Constants.ZTABLE_FLUSH_ID;
- String id = new String(context.getZooReaderWriter().getData(zTablePath),
UTF_8);
+ String id = new
String(context.getZooSession().asReaderWriter().getData(zTablePath), UTF_8);
Review Comment:
Do we need a `ZooReaderWriter` just to call `getData()`? `ZooSession` also has a
`getData()` method. There may be many places where methods on
`ZooReader` are called even though `ZooSession` has a similar method.
I'm wondering if it might make sense to get rid of `ZooReader` and use only
`ZooSession`.
##########
server/base/src/test/java/org/apache/accumulo/server/init/InitializeTest.java:
##########
@@ -59,59 +62,60 @@ public void setUp() {
expect(sconf.get(Property.INSTANCE_SECRET))
.andReturn(Property.INSTANCE_SECRET.getDefaultValue()).anyTimes();
expect(sconf.get(Property.INSTANCE_ZK_HOST)).andReturn("zk1").anyTimes();
- zoo = createMock(ZooReaderWriter.class);
+ zk = createMock(ZooSession.class);
+ zrw = new ZooReaderWriter(zk);
}
@AfterEach
public void tearDown() {
- verify(sconf, zoo, fs);
+ verify(sconf, zk, fs);
}
@Test
public void testIsInitialized_HasInstanceId() throws Exception {
expect(fs.exists(anyObject(Path.class))).andReturn(true);
- replay(sconf, zoo, fs);
+ replay(sconf, zk, fs);
assertTrue(Initialize.isInitialized(fs, initConfig));
}
@Test
public void testIsInitialized_HasDataVersion() throws Exception {
expect(fs.exists(anyObject(Path.class))).andReturn(false);
expect(fs.exists(anyObject(Path.class))).andReturn(true);
- replay(sconf, zoo, fs);
+ replay(sconf, zk, fs);
assertTrue(Initialize.isInitialized(fs, initConfig));
}
@Test
public void testCheckInit_NoZK() throws Exception {
- expect(zoo.exists("/")).andReturn(false);
- replay(sconf, zoo, fs);
- assertThrows(IllegalStateException.class, () -> Initialize.checkInit(zoo,
fs, initConfig));
+ expect(zk.exists("/", null)).andReturn(null);
+ replay(sconf, zk, fs);
+ assertThrows(IllegalStateException.class, () -> Initialize.checkInit(zrw,
fs, initConfig));
}
@Test
public void testCheckInit_AlreadyInit() throws Exception {
- expect(zoo.exists("/")).andReturn(true);
+ expect(zk.exists("/", null)).andReturn(new Stat());
expect(fs.exists(anyObject(Path.class))).andReturn(true);
- replay(sconf, zoo, fs);
- assertThrows(IOException.class, () -> Initialize.checkInit(zoo, fs,
initConfig));
+ replay(sconf, zk, fs);
+ assertThrows(IOException.class, () -> Initialize.checkInit(zrw, fs,
initConfig));
}
@Test
public void testCheckInit_FSException() throws Exception {
- expect(zoo.exists("/")).andReturn(true);
+ expect(zk.exists("/", null)).andReturn(new Stat());
expect(fs.exists(anyObject(Path.class))).andThrow(new IOException());
- replay(sconf, zoo, fs);
- assertThrows(IOException.class, () -> Initialize.checkInit(zoo, fs,
initConfig));
+ replay(sconf, zk, fs);
+ assertThrows(IOException.class, () -> Initialize.checkInit(zrw, fs,
initConfig));
Review Comment:
You could change this to the following to remove the `zrw` instance variable:
```
expect(zk.asReaderWriter()).andReturn(new ZooReaderWriter(zk));
replay(sconf, zk, fs);
assertThrows(IOException.class, () ->
Initialize.checkInit(zk.asReaderWriter(), fs, initConfig));
```
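I see now why you still need the ZR and ZRW constructors to be public.
However, I think you could probably make them `protected` so that they are only
constructed in `ZooSession`. For the tests, you could subclass them. Note, though,
that `ZooSession` is in `org.apache.accumulo.core.zookeeper`, while `ZooReader` and
`ZooReaderWriter` are in `org.apache.accumulo.core.fate.zookeeper`. Protected
constructors would therefore not be accessible from `ZooSession` unless these
classes are moved into the same package.
In other tests you are using a mock ZRW; I'm not sure whether that is possible here.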
##########
server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java:
##########
@@ -704,7 +704,8 @@ public Pair<Long,CompactionConfig> getCompactionID() throws
NoNodeException {
+ extent.tableId() + Constants.ZTABLE_COMPACT_ID;
String[] tokens =
- new String(context.getZooReaderWriter().getData(zTablePath),
UTF_8).split(",");
+ new
String(context.getZooSession().asReaderWriter().getData(zTablePath), UTF_8)
Review Comment:
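Same comment as on `getFlushID()`: `ZooSession` has its own `getData()`, so going
through `asReaderWriter()` may not be necessary here.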
##########
server/base/src/test/java/org/apache/accumulo/server/MockServerContext.java:
##########
@@ -47,22 +47,20 @@ public static ServerContext get() {
return context;
}
- public static ServerContext getWithZK(InstanceId instanceID, String zk, int
zkTimeout) {
+ public static ServerContext getWithMockZK(ZooSession zk) {
var sc = get();
-
expect(sc.getZooKeeperRoot()).andReturn(ZooUtil.getRoot(instanceID)).anyTimes();
- expect(sc.getInstanceID()).andReturn(instanceID).anyTimes();
- expect(sc.zkUserPath()).andReturn(ZooUtil.getRoot(instanceID) +
Constants.ZUSERS).anyTimes();
- expect(sc.getZooKeepers()).andReturn(zk).anyTimes();
- expect(sc.getZooKeepersSessionTimeOut()).andReturn(zkTimeout).anyTimes();
+ var zrw = new ZooReaderWriter(zk);
Review Comment:
```suggestion
var zrw = zk.asReaderWriter();
```
##########
server/base/src/test/java/org/apache/accumulo/server/conf/SystemConfigurationTest.java:
##########
@@ -78,7 +81,7 @@ public void initMocks() {
VersionedProperties sysProps =
new VersionedProperties(1, Instant.now(), Map.of(GC_PORT.getKey(),
"1234",
TSERV_SCAN_MAX_OPENFILES.getKey(), "19",
TABLE_BLOOM_ENABLED.getKey(), "true"));
- expect(propStore.get(eq(sysPropKey))).andReturn(sysProps).times(2);
+ expect(propStore.get(eq(sysPropKey))).andReturn(sysProps).once();
Review Comment:
Do you know why the expected number of `propStore.get()` calls changed from `times(2)` to `once()`?
##########
core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooUtil.java:
##########
@@ -170,26 +176,78 @@ private static String getFmtTime(final long epoch) {
}
/**
- * Get the ZooKeeper digest based on the instance secret that is used within
ZooKeeper for
- * authentication. This method is primary intended to be used to validate
ZooKeeper ACLs. Use
- * {@link #digestAuth(ZooKeeper, String)} to add authorizations to ZooKeeper.
+ * Given a zooCache and instanceId, look up the instance name.
Review Comment:
```suggestion
* Given a ZooKeeper and instanceId, look up the instance name.
```
##########
test/src/main/java/org/apache/accumulo/test/conf/store/PropStoreZooKeeperIT.java:
##########
@@ -101,11 +100,11 @@ public static void shutdownZK() throws Exception {
@BeforeEach
public void setupZnodes() throws Exception {
- var zrw = testZk.getZooReaderWriter();
+ var zrw = new ZooReaderWriter(zooKeeper);
instanceId = InstanceId.of(UUID.randomUUID());
- context = EasyMock.createNiceMock(ServerContext.class);
+ context = createMock(ServerContext.class);
expect(context.getInstanceID()).andReturn(instanceId).anyTimes();
- expect(context.getZooReaderWriter()).andReturn(zrw).anyTimes();
+ expect(context.getZooSession()).andReturn(zooKeeper).anyTimes();
Review Comment:
Should `zooKeeper` be `zrw` on this line?
##########
server/base/src/main/java/org/apache/accumulo/server/security/delegation/ZooAuthenticationKeyWatcher.java:
##########
@@ -42,10 +43,10 @@ public class ZooAuthenticationKeyWatcher implements Watcher
{
private final ZooReader zk;
private final String baseNode;
- public ZooAuthenticationKeyWatcher(AuthenticationTokenSecretManager
secretManager, ZooReader zk,
+ public ZooAuthenticationKeyWatcher(AuthenticationTokenSecretManager
secretManager, ZooSession zk,
String baseNode) {
this.secretManager = secretManager;
- this.zk = zk;
+ this.zk = new ZooReader(zk);
Review Comment:
Use `zk.asReader()`?
##########
server/base/src/main/java/org/apache/accumulo/server/util/ListInstances.java:
##########
@@ -80,43 +81,46 @@ public static void main(String[] args) {
boolean printErrors = opts.printErrors;
listInstances(keepers, printAll, printErrors);
-
}
- static synchronized void listInstances(String keepers, boolean printAll,
boolean printErrors) {
+ static synchronized void listInstances(String keepers, boolean printAll,
boolean printErrors)
+ throws InterruptedException {
errors = 0;
System.out.println("INFO : Using ZooKeepers " + keepers);
- ZooReader rdr = new ZooReader(keepers, ZOOKEEPER_TIMER_MILLIS);
- ZooCache cache = new ZooCache(rdr, null);
+ try (var zk = new ZooSession(ListInstances.class.getSimpleName(), keepers,
+ ZOOKEEPER_TIMER_MILLIS, null)) {
+ ZooReader rdr = new ZooReader(zk);
Review Comment:
```suggestion
ZooReader rdr = zk.asReader();
```
##########
core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooUtil.java:
##########
@@ -170,26 +176,78 @@ private static String getFmtTime(final long epoch) {
}
/**
- * Get the ZooKeeper digest based on the instance secret that is used within
ZooKeeper for
- * authentication. This method is primary intended to be used to validate
ZooKeeper ACLs. Use
- * {@link #digestAuth(ZooKeeper, String)} to add authorizations to ZooKeeper.
+ * Given a zooCache and instanceId, look up the instance name.
*/
- public static Id getZkDigestAuthId(final String secret) {
+ public static String getInstanceName(ZooSession zk, InstanceId instanceId) {
+ requireNonNull(zk);
+ var instanceIdBytes =
requireNonNull(instanceId).canonical().getBytes(UTF_8);
+ for (String name : getInstanceNames(zk)) {
+ var bytes = getInstanceIdBytesFromName(zk, name);
+ if (Arrays.equals(bytes, instanceIdBytes)) {
+ return name;
+ }
+ }
+ return null;
+ }
+
+ private static List<String> getInstanceNames(ZooSession zk) {
try {
- final String scheme = "digest";
- String auth = DigestAuthenticationProvider.generateDigest("accumulo:" +
secret);
- return new Id(scheme, auth);
- } catch (NoSuchAlgorithmException ex) {
- throw new IllegalArgumentException("Could not generate ZooKeeper digest
string", ex);
+ return new ZooReader(zk).getChildren(Constants.ZROOT +
Constants.ZINSTANCES);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IllegalStateException("Interrupted reading instance names from
ZooKeeper", e);
+ } catch (KeeperException e) {
+ throw new IllegalStateException("Failed to read instance names from
ZooKeeper", e);
}
}
- public static void digestAuth(ZooKeeper zoo, String secret) {
- auth(zoo, "digest", ("accumulo:" + secret).getBytes(UTF_8));
+ private static byte[] getInstanceIdBytesFromName(ZooSession zk, String name)
{
+ try {
+ return new ZooReader(zk)
+ .getData(Constants.ZROOT + Constants.ZINSTANCES + "/" +
requireNonNull(name));
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IllegalStateException(
+ "Interrupted reading InstanceId from ZooKeeper for instance named "
+ name, e);
+ } catch (KeeperException e) {
+ log.warn("Failed to read InstanceId from ZooKeeper for instance named
{}", name, e);
+ return null;
+ }
}
- public static void auth(ZooKeeper zoo, String scheme, byte[] auth) {
- zoo.addAuthInfo(scheme, auth);
+ public static Map<String,InstanceId> getInstanceMap(ZooSession zk) {
+ Map<String,InstanceId> idMap = new TreeMap<>();
+ getInstanceNames(zk).forEach(name -> {
+ byte[] instanceId = getInstanceIdBytesFromName(zk, name);
+ if (instanceId != null) {
+ idMap.put(name, InstanceId.of(new String(instanceId, UTF_8)));
+ }
+ });
+ return idMap;
+ }
+
+ public static InstanceId getInstanceId(ZooSession zk, String name) {
+ byte[] data = getInstanceIdBytesFromName(zk, name);
+ if (data == null) {
+ throw new IllegalStateException("Instance name " + name + " does not
exist in ZooKeeper. "
+ + "Run \"accumulo org.apache.accumulo.server.util.ListInstances\" to
see a list.");
+ }
+ String instanceIdString = new String(data, UTF_8);
+ try {
+ // verify that the instanceId found via the name actually exists
+ if (new ZooReader(zk).getData(Constants.ZROOT + "/" + instanceIdString)
== null) {
Review Comment:
Same comment: why not use `zk.asReader()` here instead of constructing a new `ZooReader`?
##########
core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java:
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.zookeeper;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Objects.requireNonNull;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static org.apache.accumulo.core.util.LazySingletons.RANDOM;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.fate.zookeeper.ZooReader;
+import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.core.util.AddressUtil;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.zookeeper.AsyncCallback.StringCallback;
+import org.apache.zookeeper.AsyncCallback.VoidCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.Watcher.WatcherType;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A ZooKeeper client facade that maintains a ZooKeeper delegate instance. If
the delegate instance
+ * loses its session, it is replaced with a new instance to establish a new
session. Any Watchers
+ * registered on a session will need to monitor for the session expired event
triggered from the old
+ * delegate, and must be reset on the new session if you intend them to
monitor any further events.
+ * That is no different than if you created a new ZooKeeper instance directly
after the first one
+ * expired.
+ */
+public class ZooSession implements AutoCloseable {
+
+  /**
+   * Counterparts of selected {@link org.apache.zookeeper.ZKUtil} helpers that take a
+   * {@link ZooSession} and run against its currently connected delegate.
+   */
+  public static class ZKUtil {
+
+    /**
+     * Delegates to {@link org.apache.zookeeper.ZKUtil#deleteRecursive(ZooKeeper, String)} using a
+     * connected delegate of {@code zk}.
+     */
+    public static void deleteRecursive(ZooSession zk, final String pathRoot)
+        throws InterruptedException, KeeperException {
+      ZooKeeper connected = zk.verifyConnected();
+      org.apache.zookeeper.ZKUtil.deleteRecursive(connected, pathRoot);
+    }
+
+    /**
+     * Delegates to
+     * {@link org.apache.zookeeper.ZKUtil#visitSubTreeDFS(ZooKeeper, String, boolean, StringCallback)}
+     * using a connected delegate of {@code zk}.
+     */
+    public static void visitSubTreeDFS(ZooSession zk, final String path, boolean watch,
+        StringCallback cb) throws KeeperException, InterruptedException {
+      ZooKeeper connected = zk.verifyConnected();
+      org.apache.zookeeper.ZKUtil.visitSubTreeDFS(connected, path, watch, cb);
+    }
+  }
+
+  /**
+   * Default watcher given to each delegate ZooKeeper instance. It logs connection state
+   * transitions at debug level, tagged with the name of the connection it was created for.
+   */
+  private static class ZooSessionWatcher implements Watcher {
+
+    private final String connectionName;
+    // most recently observed state; null until the first event is processed
+    private final AtomicReference<KeeperState> lastState = new AtomicReference<>();
+
+    public ZooSessionWatcher(String connectionName) {
+      this.connectionName = connectionName;
+    }
+
+    @Override
+    public void process(WatchedEvent event) {
+      final KeeperState newState = event.getState();
+      // atomically record the new state and fetch the one it replaces
+      final KeeperState oldState = lastState.getAndSet(newState);
+      if (oldState == null) {
+        log.debug("{} state changed to {}", connectionName, newState);
+      } else if (newState != oldState) {
+        log.debug("{} state changed from {} to {}", connectionName, oldState, newState);
+      }
+    }
+  }
+
+  private static final Logger log = LoggerFactory.getLogger(ZooSession.class);
+
+  /**
+   * Closes the given ZooKeeper client. A {@code null} argument is ignored.
+   *
+   * @param zk the client to close, or null
+   */
+  private static void closeZk(ZooKeeper zk) {
+    if (zk == null) {
+      return;
+    }
+    try {
+      zk.close();
+    } catch (InterruptedException e) {
+      // ZooKeeper doesn't actually throw this; it's just there for backwards compatibility
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Adds "digest" authentication for the {@code accumulo} user with the given secret.
+   *
+   * @param zoo the client to authenticate
+   * @param secret the instance secret; must not be null
+   */
+  private static void digestAuth(ZooKeeper zoo, String secret) {
+    String credentials = "accumulo:" + requireNonNull(secret);
+    zoo.addAuthInfo("digest", credentials.getBytes(UTF_8));
+  }
+
+  private final AtomicBoolean closed = new AtomicBoolean();
+  // counts delegates created so far; used to give each (re-)connection a distinct log name
+  private final AtomicLong connectCounter;
+  private final String connectString;
+  // current ZooKeeper client; null until first use and after close
+  private final AtomicReference<ZooKeeper> delegate = new AtomicReference<>();
+  private final String instanceSecret;
+  private final String sessionName;
+  private final int timeout;
+  private final ZooReaderWriter zrw;
+
+  /**
+   * Creates a session using the ZooKeeper hosts, timeout, and instance secret taken from
+   * {@code conf}. No connection is attempted here; one is established on first use. The caller is
+   * responsible for closing this instance.
+   *
+   * @param clientName a convenient name for logging its connection state changes
+   * @param conf a convenient carrier of ZK connection information using Accumulo properties
+   * @throws ArithmeticException if the configured ZooKeeper timeout exceeds
+   *         {@link Integer#MAX_VALUE} milliseconds
+   */
+  public ZooSession(String clientName, AccumuloConfiguration conf) {
+    // toIntExact: fail loudly rather than silently wrapping an oversized timeout into a bogus int
+    this(clientName, conf.get(Property.INSTANCE_ZK_HOST),
+        Math.toIntExact(conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT)),
+        conf.get(Property.INSTANCE_SECRET));
+  }
+
+  /**
+   * Creates a session facade for the given ZooKeeper ensemble. No connection is attempted here;
+   * the first operation that needs a client establishes one. The caller is responsible for closing
+   * this instance.
+   *
+   * @param clientName a convenient name for logging its connection state changes
+   * @param connectString in the form of host1:port1,host2:port2/chroot/path
+   * @param timeout ZooKeeper session timeout, in milliseconds
+   * @param instanceSecret instance secret used for digest authentication; may be null, in which
+   *        case no digest authentication is added when connecting
+   */
+  public ZooSession(String clientName, String connectString, int timeout, String instanceSecret) {
+    // label that tells this ZooSession apart from others in log messages
+    this.sessionName =
+        String.format("%s[%s_%s]", getClass().getSimpleName(), clientName, UUID.randomUUID());
+
+    // everything needed later to build (and authenticate) a ZooKeeper delegate
+    this.connectString = connectString;
+    this.timeout = timeout;
+    this.instanceSecret = instanceSecret;
+
+    // bumped each time a new delegate has to be created
+    this.connectCounter = new AtomicLong();
+    this.zrw = new ZooReaderWriter(this);
+  }
+
+  /**
+   * Returns a live ZooKeeper delegate, creating a new one if there is none yet or the current one
+   * is no longer alive.
+   *
+   * @return a delegate whose state reported alive when checked
+   * @throws IllegalStateException if this session has been closed, or if a new connection could
+   *         not be established within twice the session timeout
+   */
+  private ZooKeeper verifyConnected() {
+    if (closed.get()) {
+      throw new IllegalStateException(sessionName + " was closed");
+    }
+    ZooKeeper zk = delegate.get();
+    if (zk != null && zk.getState().isAlive()) {
+      return zk;
+    }
+    // Deliberately not delegate.updateAndGet(..): its update function may be re-applied when the
+    // CAS loses a race, which could create a ZooKeeper instance that is never stored or closed.
+    // reconnect() installs the new delegate itself while holding this object's monitor.
+    return reconnect();
+  }
+
+  /**
+   * Replaces a missing or dead delegate with a newly connected ZooKeeper instance, installs it as
+   * the delegate, and returns it. Synchronized so only one thread (re-)connects at a time; threads
+   * that waited on the monitor return the delegate the first thread installed.
+   *
+   * @throws IllegalStateException if this session is (or becomes) closed, or if no connection is
+   *         established within twice the session timeout
+   */
+  private synchronized ZooKeeper reconnect() {
+    ZooKeeper zk = delegate.get();
+    if (zk != null && zk.getState().isAlive()) {
+      // another thread already reconnected while this one waited for the monitor
+      return zk;
+    }
+    if (closed.get()) {
+      throw new IllegalStateException(sessionName + " was closed");
+    }
+    zk = null;
+    var reconnectName = String.format("%s#%s", sessionName, connectCounter.getAndIncrement());
+    log.debug("{} (re-)connecting to {} with timeout {}{}", reconnectName, connectString, timeout,
+        instanceSecret == null ? "" : " with auth");
+    final int TIME_BETWEEN_CONNECT_CHECKS_MS = 100;
+    int connectTimeWait = Math.min(10_000, timeout);
+    boolean tryAgain = true;
+    long sleepTime = 100;
+
+    long startTime = System.nanoTime();
+
+    while (tryAgain) {
+      try {
+        zk = new ZooKeeper(connectString, timeout, new ZooSessionWatcher(reconnectName));
+        // it may take some time to get connected to zookeeper if some of the servers are down
+        for (int i = 0; i < connectTimeWait / TIME_BETWEEN_CONNECT_CHECKS_MS && tryAgain; i++) {
+          if (zk.getState().isConnected()) {
+            if (instanceSecret != null) {
+              digestAuth(zk, instanceSecret);
+            }
+            tryAgain = false;
+          } else {
+            UtilWaitThread.sleep(TIME_BETWEEN_CONNECT_CHECKS_MS);
+          }
+        }
+
+      } catch (IOException e) {
+        if (e instanceof UnknownHostException) {
+          /*
+           * Make sure we wait at least as long as the JVM TTL for negative DNS responses
+           */
+          int ttl = AddressUtil.getAddressCacheNegativeTtl((UnknownHostException) e);
+          sleepTime = Math.max(sleepTime, (ttl + 1) * 1000L);
+        }
+        log.warn("Connection to zooKeeper failed, will try again in "
+            + String.format("%.2f secs", sleepTime / 1000.0), e);
+      } finally {
+        // discard an instance that did not become connected before retrying (or throwing)
+        if (tryAgain && zk != null) {
+          closeZk(zk);
+          zk = null;
+        }
+      }
+
+      long stopTime = System.nanoTime();
+      long duration = NANOSECONDS.toMillis(stopTime - startTime);
+
+      if (duration > 2L * timeout) {
+        throw new IllegalStateException("Failed to connect to zookeeper (" + connectString
+            + ") within 2x zookeeper timeout period " + timeout);
+      }
+
+      if (tryAgain) {
+        // shorten the back-off so that sleeping plus the next attempt fit within 2x the timeout
+        if (2L * timeout < duration + sleepTime + connectTimeWait) {
+          sleepTime = 2L * timeout - duration - connectTimeWait;
+        }
+        if (sleepTime < 0) {
+          // NOTE(review): sleepTime is negative here, so this lengthens connectTimeWait. If the
+          // intent is to shrink the next attempt to the remaining budget this should be `+=`;
+          // confirm before changing. Also, once sleepTime reaches 0 the back-off stays 0.
+          connectTimeWait -= sleepTime;
+          sleepTime = 0;
+        }
+        UtilWaitThread.sleep(sleepTime);
+        if (sleepTime < 10000) {
+          // grow the back-off by a random factor in [1, 2) until it reaches 10s
+          sleepTime = sleepTime + (long) (sleepTime * RANDOM.get().nextDouble());
+        }
+      }
+    }
+    // publish the new delegate while still holding the monitor, so waiting threads reuse it
+    delegate.set(zk);
+    if (closed.get()) {
+      // close() may have run while we were connecting. Whichever of close() and this line takes
+      // the instance out of the delegate closes it, so it is never left open.
+      closeZk(delegate.getAndSet(null));
+      throw new IllegalStateException(sessionName + " was closed");
+    }
+    return zk;
+  }
+
+  /**
+   * Adds authentication info to the current delegate. NOTE(review): a delegate created later by a
+   * reconnect only receives the instance-secret digest auth (when configured), not auth added
+   * through this method.
+   */
+  public void addAuthInfo(String scheme, byte[] auth) {
+    ZooKeeper zk = verifyConnected();
+    zk.addAuthInfo(scheme, auth);
+  }
+
+  /** Delegates to {@link ZooKeeper#create(String, byte[], List, CreateMode)}. */
+  public String create(final String path, byte[] data, List<ACL> acl, CreateMode createMode)
+      throws KeeperException, InterruptedException {
+    ZooKeeper zk = verifyConnected();
+    return zk.create(path, data, acl, createMode);
+  }
+
+  /** Delegates to {@link ZooKeeper#delete(String, int)}. */
+  public void delete(final String path, int version) throws InterruptedException, KeeperException {
+    ZooKeeper zk = verifyConnected();
+    zk.delete(path, version);
+  }
+
+  /** Delegates to {@link ZooKeeper#exists(String, Watcher)}. */
+  public Stat exists(final String path, Watcher watcher)
+      throws KeeperException, InterruptedException {
+    ZooKeeper zk = verifyConnected();
+    return zk.exists(path, watcher);
+  }
+
+  /** Delegates to {@link ZooKeeper#getACL(String, Stat)}. */
+  public List<ACL> getACL(final String path, Stat stat)
+      throws KeeperException, InterruptedException {
+    ZooKeeper zk = verifyConnected();
+    return zk.getACL(path, stat);
+  }
+
+  /** Delegates to {@link ZooKeeper#getChildren(String, Watcher)}. */
+  public List<String> getChildren(final String path, Watcher watcher)
+      throws KeeperException, InterruptedException {
+    ZooKeeper zk = verifyConnected();
+    return zk.getChildren(path, watcher);
+  }
+
+  /** Delegates to {@link ZooKeeper#getData(String, Watcher, Stat)}. */
+  public byte[] getData(final String path, Watcher watcher, Stat stat)
+      throws KeeperException, InterruptedException {
+    ZooKeeper zk = verifyConnected();
+    return zk.getData(path, watcher, stat);
+  }
+
+  /** Returns the session id of the current (possibly newly created) delegate. */
+  public long getSessionId() {
+    ZooKeeper zk = verifyConnected();
+    return zk.getSessionId();
+  }
+
+  /** Returns the session timeout of the current (possibly newly created) delegate. */
+  public int getSessionTimeout() {
+    ZooKeeper zk = verifyConnected();
+    return zk.getSessionTimeout();
+  }
+
+  /** Delegates to {@link ZooKeeper#removeWatches(String, Watcher, WatcherType, boolean)}. */
+  public void removeWatches(String path, Watcher watcher, WatcherType watcherType, boolean local)
+      throws InterruptedException, KeeperException {
+    ZooKeeper zk = verifyConnected();
+    zk.removeWatches(path, watcher, watcherType, local);
+  }
+
+  /** Delegates to {@link ZooKeeper#setData(String, byte[], int)}. */
+  public Stat setData(final String path, byte[] data, int version)
+      throws KeeperException, InterruptedException {
+    ZooKeeper zk = verifyConnected();
+    return zk.setData(path, data, version);
+  }
+
+  /** Delegates to {@link ZooKeeper#sync(String, VoidCallback, Object)}. */
+  public void sync(final String path, VoidCallback cb, Object ctx) {
+    ZooKeeper zk = verifyConnected();
+    zk.sync(path, cb, ctx);
+  }
+
+ @Override
+ public void close() {
+ if (closed.compareAndSet(false, true)) {
+ closeZk(delegate.getAndSet(null));
+ }
+ }
+
+ public void addAccumuloDigestAuth(String auth) {
+ digestAuth(verifyConnected(), auth);
+ }
+
+ public ZooReader asReader() {
+ return zrw;
Review Comment:
If this class was located in the same package as `ZooReader` and
`ZooReaderWriter`, then we could remove the `public` modifier from their
constructors and make this the only way to construct one.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org