dlmarion commented on code in PR #5192:
URL: https://github.com/apache/accumulo/pull/5192#discussion_r1900952551


##########
core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java:
##########
@@ -51,10 +53,13 @@
  * A cache for values stored in ZooKeeper. Values are kept up to date as they 
change.
  */
 public class ZooCache {
+
+  public interface ZooCacheWatcher extends Consumer<WatchedEvent> {}

Review Comment:
   I'm wondering if we should rename `ZCacheWatcher`, which is the watcher used 
to process events from ZooKeeper and which ends up calling the external 
watcher. Maybe just call it `ZooKeeperWatcher`, or some other name that 
doesn't use `Cache`.



##########
core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java:
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.zookeeper;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Objects.requireNonNull;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static org.apache.accumulo.core.util.LazySingletons.RANDOM;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.fate.zookeeper.ZooReader;
+import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.core.util.AddressUtil;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.zookeeper.AsyncCallback.StringCallback;
+import org.apache.zookeeper.AsyncCallback.VoidCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.Watcher.WatcherType;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A ZooKeeper client facade that maintains a ZooKeeper delegate instance. If 
the delegate instance
+ * loses its session, it is replaced with a new instance to establish a new 
session. Any Watchers
+ * registered on a session will need to monitor for the session expired event 
triggered from the old
+ * delegate, and must be reset on the new session if you intend them to 
monitor any further events.
+ * That is no different than if you created a new ZooKeeper instance directly 
after the first one
+ * expired.
+ */
+public class ZooSession implements AutoCloseable {
+
+  public static class ZKUtil {
+    public static void deleteRecursive(ZooSession zk, final String pathRoot)
+        throws InterruptedException, KeeperException {
+      org.apache.zookeeper.ZKUtil.deleteRecursive(zk.verifyConnected(), 
pathRoot);
+    }
+
+    public static void visitSubTreeDFS(ZooSession zk, final String path, 
boolean watch,
+        StringCallback cb) throws KeeperException, InterruptedException {
+      org.apache.zookeeper.ZKUtil.visitSubTreeDFS(zk.verifyConnected(), path, 
watch, cb);
+    }
+  }
+
+  private static class ZooSessionWatcher implements Watcher {
+
+    private final String connectionName;
+    private final AtomicReference<KeeperState> lastState = new 
AtomicReference<>(null);
+
+    public ZooSessionWatcher(String connectionName) {
+      this.connectionName = connectionName;
+    }
+
+    @Override
+    public void process(WatchedEvent event) {
+      final var newState = event.getState();
+      var oldState = lastState.getAndUpdate(s -> newState);
+      if (oldState == null) {
+        log.debug("{} state changed to {}", connectionName, newState);
+      } else if (newState != oldState) {
+        log.debug("{} state changed from {} to {}", connectionName, oldState, 
newState);
+      }
+    }
+  }
+
+  private static final Logger log = LoggerFactory.getLogger(ZooSession.class);
+
+  private static void closeZk(ZooKeeper zk) {
+    if (zk != null) {
+      try {
+        zk.close();
+      } catch (InterruptedException e) {
+        // ZooKeeper doesn't actually throw this; it's just there for 
backwards compatibility
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  private static void digestAuth(ZooKeeper zoo, String secret) {
+    zoo.addAuthInfo("digest", ("accumulo:" + 
requireNonNull(secret)).getBytes(UTF_8));
+  }
+
+  private final AtomicBoolean closed = new AtomicBoolean();
+  private final AtomicLong connectCounter;
+  private final String connectString;
+  private final AtomicReference<ZooKeeper> delegate = new AtomicReference<>();
+  private final String instanceSecret;
+  private final String sessionName;
+  private final int timeout;
+  private final ZooReaderWriter zrw;
+
+  /**
+   * Construct a new ZooKeeper client, retrying indefinitely if it doesn't 
work right away. The
+   * caller is responsible for closing instances returned from this method.
+   *
+   * @param clientName a convenient name for logging its connection state 
changes
+   * @param conf a convenient carrier of ZK connection information using 
Accumulo properties
+   */
+  public ZooSession(String clientName, AccumuloConfiguration conf) {
+    this(clientName, conf.get(Property.INSTANCE_ZK_HOST),
+        (int) conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT),
+        conf.get(Property.INSTANCE_SECRET));
+  }
+
+  /**
+   * Construct a new ZooKeeper client, retrying indefinitely if it doesn't 
work right away. The
+   * caller is responsible for closing instances returned from this method.
+   *
+   * @param clientName a convenient name for logging its connection state 
changes
+   * @param connectString in the form of host1:port1,host2:port2/chroot/path
+   * @param timeout in milliseconds
+   * @param instanceSecret instance secret (may be null)
+   */
+  public ZooSession(String clientName, String connectString, int timeout, 
String instanceSecret) {
+    // information needed to construct a ZooKeeper instance and add 
authentication
+    this.connectString = connectString;
+    this.timeout = timeout;
+    this.instanceSecret = instanceSecret;
+
+    // information for logging which instance of ZooSession this is
+    this.sessionName =
+        String.format("%s[%s_%s]", getClass().getSimpleName(), clientName, 
UUID.randomUUID());
+    this.connectCounter = new AtomicLong(); // incremented when we need to 
create a new delegate
+    this.zrw = new ZooReaderWriter(this);
+  }
+
+  private ZooKeeper verifyConnected() {
+    if (closed.get()) {
+      throw new IllegalStateException(sessionName + " was closed");
+    }
+    return delegate.updateAndGet(zk -> (zk != null && zk.getState().isAlive()) 
? zk : reconnect());

Review Comment:
   I'm wondering about the memory effects of always updating the 
AtomicReference, even when its value is unchanged: does that cause an 
unnecessary CPU cache line invalidation? I don't have a definitive answer, but 
some of the things I looked at indicate that it does.
   
   @keith-turner - do you know?



##########
core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooUtil.java:
##########
@@ -170,26 +176,78 @@ private static String getFmtTime(final long epoch) {
   }
 
   /**
-   * Get the ZooKeeper digest based on the instance secret that is used within 
ZooKeeper for
-   * authentication. This method is primary intended to be used to validate 
ZooKeeper ACLs. Use
-   * {@link #digestAuth(ZooKeeper, String)} to add authorizations to ZooKeeper.
+   * Given a zooCache and instanceId, look up the instance name.
    */
-  public static Id getZkDigestAuthId(final String secret) {
+  public static String getInstanceName(ZooSession zk, InstanceId instanceId) {
+    requireNonNull(zk);
+    var instanceIdBytes = 
requireNonNull(instanceId).canonical().getBytes(UTF_8);
+    for (String name : getInstanceNames(zk)) {
+      var bytes = getInstanceIdBytesFromName(zk, name);
+      if (Arrays.equals(bytes, instanceIdBytes)) {
+        return name;
+      }
+    }
+    return null;
+  }
+
+  private static List<String> getInstanceNames(ZooSession zk) {
     try {
-      final String scheme = "digest";
-      String auth = DigestAuthenticationProvider.generateDigest("accumulo:" + 
secret);
-      return new Id(scheme, auth);
-    } catch (NoSuchAlgorithmException ex) {
-      throw new IllegalArgumentException("Could not generate ZooKeeper digest 
string", ex);
+      return new ZooReader(zk).getChildren(Constants.ZROOT + 
Constants.ZINSTANCES);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException("Interrupted reading instance names from 
ZooKeeper", e);
+    } catch (KeeperException e) {
+      throw new IllegalStateException("Failed to read instance names from 
ZooKeeper", e);
     }
   }
 
-  public static void digestAuth(ZooKeeper zoo, String secret) {
-    auth(zoo, "digest", ("accumulo:" + secret).getBytes(UTF_8));
+  private static byte[] getInstanceIdBytesFromName(ZooSession zk, String name) 
{
+    try {
+      return new ZooReader(zk)

Review Comment:
   Why not use `zk.asReader()`?



##########
core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooUtil.java:
##########
@@ -170,26 +176,78 @@ private static String getFmtTime(final long epoch) {
   }
 
   /**
-   * Get the ZooKeeper digest based on the instance secret that is used within 
ZooKeeper for
-   * authentication. This method is primary intended to be used to validate 
ZooKeeper ACLs. Use
-   * {@link #digestAuth(ZooKeeper, String)} to add authorizations to ZooKeeper.
+   * Given a zooCache and instanceId, look up the instance name.
    */
-  public static Id getZkDigestAuthId(final String secret) {
+  public static String getInstanceName(ZooSession zk, InstanceId instanceId) {
+    requireNonNull(zk);
+    var instanceIdBytes = 
requireNonNull(instanceId).canonical().getBytes(UTF_8);
+    for (String name : getInstanceNames(zk)) {
+      var bytes = getInstanceIdBytesFromName(zk, name);
+      if (Arrays.equals(bytes, instanceIdBytes)) {
+        return name;
+      }
+    }
+    return null;
+  }
+
+  private static List<String> getInstanceNames(ZooSession zk) {
     try {
-      final String scheme = "digest";
-      String auth = DigestAuthenticationProvider.generateDigest("accumulo:" + 
secret);
-      return new Id(scheme, auth);
-    } catch (NoSuchAlgorithmException ex) {
-      throw new IllegalArgumentException("Could not generate ZooKeeper digest 
string", ex);
+      return new ZooReader(zk).getChildren(Constants.ZROOT + 
Constants.ZINSTANCES);

Review Comment:
   Why not use `zk.asReader()`?



##########
server/base/src/main/java/org/apache/accumulo/server/conf/util/ZooPropEditor.java:
##########
@@ -82,24 +82,26 @@ public void execute(String[] args) throws Exception {
     ZooPropEditor.Opts opts = new ZooPropEditor.Opts();
     opts.parseArgs(ZooPropEditor.class.getName(), args);
 
-    ZooReaderWriter zrw = new ZooReaderWriter(opts.getSiteConfiguration());
-
     var siteConfig = opts.getSiteConfiguration();
-    try (ServerContext context = new ServerContext(siteConfig)) {
-      PropStoreKey<?> propKey = getPropKey(context, opts);
-      switch (opts.getCmdMode()) {
-        case SET:
-          setProperty(context, propKey, opts);
-          break;
-        case DELETE:
-          deleteProperty(context, propKey, readPropNode(propKey, zrw), opts);
-          break;
-        case PRINT:
-          printProperties(context, propKey, readPropNode(propKey, zrw));
-          break;
-        case ERROR:
-        default:
-          throw new IllegalArgumentException("Invalid operation requested");
+    try (var zk = new ZooSession(getClass().getSimpleName(), siteConfig)) {
+      var zrw = zk.asReaderWriter();
+
+      try (ServerContext context = new ServerContext(siteConfig)) {

Review Comment:
   I think you could collapse these into one try-with-resources statement.



##########
core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooUtil.java:
##########
@@ -116,7 +122,7 @@ public static String getRoot(final InstanceId instanceId) {
   /**
    * This method will delete a node and all its children.
    */
-  public static void recursiveDelete(ZooKeeper zooKeeper, String zPath, 
NodeMissingPolicy policy)
+  public static void recursiveDelete(ZooSession zooKeeper, String zPath, 
NodeMissingPolicy policy)

Review Comment:
   Does this do something different from `ZooSession.ZKUtil.deleteRecursive`?



##########
server/base/src/main/java/org/apache/accumulo/server/manager/state/DeadServerList.java:
##########
@@ -43,9 +43,9 @@ public class DeadServerList {
 
   public DeadServerList(ServerContext context) {
     this.path = context.getZooKeeperRoot() + Constants.ZDEADTSERVERS;
-    zoo = context.getZooReaderWriter();
+    zoo = context.getZooSession().asReaderWriter();
     try {
-      context.getZooReaderWriter().mkdirs(path);
+      context.getZooSession().asReaderWriter().mkdirs(path);

Review Comment:
   ```suggestion
         zoo.mkdirs(path);
   ```



##########
minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java:
##########
@@ -644,74 +641,48 @@ private void verifyUp() throws InterruptedException, 
IOException {
       waitForProcessStart(tsp, "TabletServer" + tsExpectedCount);
     }
 
-    try (ZooKeeper zk = new ZooKeeper(getZooKeepers(), 60000, event -> 
log.warn("{}", event))) {
-
-      String secret = getSiteConfiguration().get(Property.INSTANCE_SECRET);
-
-      while (!(zk.getState() == States.CONNECTED)) {
-        log.info("Waiting for ZK client to connect, state: {} - will retry", 
zk.getState());
-        Thread.sleep(1000);
-      }
-
-      String instanceId = null;
+    String secret = getSiteConfiguration().get(Property.INSTANCE_SECRET);
+    String instanceNamePath = Constants.ZROOT + Constants.ZINSTANCES + "/" + 
getInstanceName();
+    try (var zk = new ZooSession(MiniAccumuloClusterImpl.class.getSimpleName() 
+ ".verifyUp()",
+        getZooKeepers(), 60000, secret)) {
+      var rdr = new ZooReader(zk);

Review Comment:
   Use `zk.asReader()`?



##########
server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java:
##########
@@ -678,7 +678,7 @@ public long getFlushID() throws NoNodeException {
     try {
       String zTablePath = tabletServer.getContext().getZooKeeperRoot() + 
Constants.ZTABLES + "/"
           + extent.tableId() + Constants.ZTABLE_FLUSH_ID;
-      String id = new String(context.getZooReaderWriter().getData(zTablePath), 
UTF_8);
+      String id = new 
String(context.getZooSession().asReaderWriter().getData(zTablePath), UTF_8);

Review Comment:
   Do we need a `ZooReaderWriter` to call `getData()`? There is a `getData()` on 
`ZooSession`. There may actually be a lot of places where methods on 
`ZooReader` are being called where there is a similar method on `ZooSession`. 
I'm wondering if it might make sense to get rid of `ZooReader` and use only 
`ZooSession`.



##########
server/base/src/test/java/org/apache/accumulo/server/init/InitializeTest.java:
##########
@@ -59,59 +62,60 @@ public void setUp() {
     expect(sconf.get(Property.INSTANCE_SECRET))
         .andReturn(Property.INSTANCE_SECRET.getDefaultValue()).anyTimes();
     expect(sconf.get(Property.INSTANCE_ZK_HOST)).andReturn("zk1").anyTimes();
-    zoo = createMock(ZooReaderWriter.class);
+    zk = createMock(ZooSession.class);
+    zrw = new ZooReaderWriter(zk);
   }
 
   @AfterEach
   public void tearDown() {
-    verify(sconf, zoo, fs);
+    verify(sconf, zk, fs);
   }
 
   @Test
   public void testIsInitialized_HasInstanceId() throws Exception {
     expect(fs.exists(anyObject(Path.class))).andReturn(true);
-    replay(sconf, zoo, fs);
+    replay(sconf, zk, fs);
     assertTrue(Initialize.isInitialized(fs, initConfig));
   }
 
   @Test
   public void testIsInitialized_HasDataVersion() throws Exception {
     expect(fs.exists(anyObject(Path.class))).andReturn(false);
     expect(fs.exists(anyObject(Path.class))).andReturn(true);
-    replay(sconf, zoo, fs);
+    replay(sconf, zk, fs);
     assertTrue(Initialize.isInitialized(fs, initConfig));
   }
 
   @Test
   public void testCheckInit_NoZK() throws Exception {
-    expect(zoo.exists("/")).andReturn(false);
-    replay(sconf, zoo, fs);
-    assertThrows(IllegalStateException.class, () -> Initialize.checkInit(zoo, 
fs, initConfig));
+    expect(zk.exists("/", null)).andReturn(null);
+    replay(sconf, zk, fs);
+    assertThrows(IllegalStateException.class, () -> Initialize.checkInit(zrw, 
fs, initConfig));
   }
 
   @Test
   public void testCheckInit_AlreadyInit() throws Exception {
-    expect(zoo.exists("/")).andReturn(true);
+    expect(zk.exists("/", null)).andReturn(new Stat());
     expect(fs.exists(anyObject(Path.class))).andReturn(true);
-    replay(sconf, zoo, fs);
-    assertThrows(IOException.class, () -> Initialize.checkInit(zoo, fs, 
initConfig));
+    replay(sconf, zk, fs);
+    assertThrows(IOException.class, () -> Initialize.checkInit(zrw, fs, 
initConfig));
   }
 
   @Test
   public void testCheckInit_FSException() throws Exception {
-    expect(zoo.exists("/")).andReturn(true);
+    expect(zk.exists("/", null)).andReturn(new Stat());
     expect(fs.exists(anyObject(Path.class))).andThrow(new IOException());
-    replay(sconf, zoo, fs);
-    assertThrows(IOException.class, () -> Initialize.checkInit(zoo, fs, 
initConfig));
+    replay(sconf, zk, fs);
+    assertThrows(IOException.class, () -> Initialize.checkInit(zrw, fs, 
initConfig));

Review Comment:
   You could change this to the following to remove the `zrw` instance variable:
   
   ```
       expect(zk.asReaderWriter()).andReturn(new ZooReaderWriter(zk));
       replay(sconf, zk, fs);
       assertThrows(IOException.class, () -> 
Initialize.checkInit(zk.asReaderWriter(), fs, initConfig));
   ```
   
   I see now why you still need the ZR and ZRW constructors to be public. 
However, I think you could probably make them `protected` so that they are only 
constructed in `ZooSession`. For the tests, you could subclass them.
   
   In other tests you are using a mock ZRW; I'm not sure if that is possible here.



##########
server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java:
##########
@@ -704,7 +704,8 @@ public Pair<Long,CompactionConfig> getCompactionID() throws 
NoNodeException {
           + extent.tableId() + Constants.ZTABLE_COMPACT_ID;
 
       String[] tokens =
-          new String(context.getZooReaderWriter().getData(zTablePath), 
UTF_8).split(",");
+          new 
String(context.getZooSession().asReaderWriter().getData(zTablePath), UTF_8)

Review Comment:
   Same comment as above: `ZooSession` also has a `getData()`, so is `asReaderWriter()` needed here?



##########
server/base/src/test/java/org/apache/accumulo/server/MockServerContext.java:
##########
@@ -47,22 +47,20 @@ public static ServerContext get() {
     return context;
   }
 
-  public static ServerContext getWithZK(InstanceId instanceID, String zk, int 
zkTimeout) {
+  public static ServerContext getWithMockZK(ZooSession zk) {
     var sc = get();
-    
expect(sc.getZooKeeperRoot()).andReturn(ZooUtil.getRoot(instanceID)).anyTimes();
-    expect(sc.getInstanceID()).andReturn(instanceID).anyTimes();
-    expect(sc.zkUserPath()).andReturn(ZooUtil.getRoot(instanceID) + 
Constants.ZUSERS).anyTimes();
-    expect(sc.getZooKeepers()).andReturn(zk).anyTimes();
-    expect(sc.getZooKeepersSessionTimeOut()).andReturn(zkTimeout).anyTimes();
+    var zrw = new ZooReaderWriter(zk);

Review Comment:
   ```suggestion
       var zrw = zk.asReaderWriter();
   ```



##########
server/base/src/test/java/org/apache/accumulo/server/conf/SystemConfigurationTest.java:
##########
@@ -78,7 +81,7 @@ public void initMocks() {
     VersionedProperties sysProps =
         new VersionedProperties(1, Instant.now(), Map.of(GC_PORT.getKey(), 
"1234",
             TSERV_SCAN_MAX_OPENFILES.getKey(), "19", 
TABLE_BLOOM_ENABLED.getKey(), "true"));
-    expect(propStore.get(eq(sysPropKey))).andReturn(sysProps).times(2);
+    expect(propStore.get(eq(sysPropKey))).andReturn(sysProps).once();

Review Comment:
   Do you know why this changed?



##########
core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooUtil.java:
##########
@@ -170,26 +176,78 @@ private static String getFmtTime(final long epoch) {
   }
 
   /**
-   * Get the ZooKeeper digest based on the instance secret that is used within 
ZooKeeper for
-   * authentication. This method is primary intended to be used to validate 
ZooKeeper ACLs. Use
-   * {@link #digestAuth(ZooKeeper, String)} to add authorizations to ZooKeeper.
+   * Given a zooCache and instanceId, look up the instance name.

Review Comment:
   ```suggestion
      * Given a ZooKeeper and instanceId, look up the instance name.
   ```



##########
test/src/main/java/org/apache/accumulo/test/conf/store/PropStoreZooKeeperIT.java:
##########
@@ -101,11 +100,11 @@ public static void shutdownZK() throws Exception {
 
   @BeforeEach
   public void setupZnodes() throws Exception {
-    var zrw = testZk.getZooReaderWriter();
+    var zrw = new ZooReaderWriter(zooKeeper);
     instanceId = InstanceId.of(UUID.randomUUID());
-    context = EasyMock.createNiceMock(ServerContext.class);
+    context = createMock(ServerContext.class);
     expect(context.getInstanceID()).andReturn(instanceId).anyTimes();
-    expect(context.getZooReaderWriter()).andReturn(zrw).anyTimes();
+    expect(context.getZooSession()).andReturn(zooKeeper).anyTimes();

Review Comment:
   Should `zooKeeper` be `zrw` on this line?



##########
server/base/src/main/java/org/apache/accumulo/server/security/delegation/ZooAuthenticationKeyWatcher.java:
##########
@@ -42,10 +43,10 @@ public class ZooAuthenticationKeyWatcher implements Watcher 
{
   private final ZooReader zk;
   private final String baseNode;
 
-  public ZooAuthenticationKeyWatcher(AuthenticationTokenSecretManager 
secretManager, ZooReader zk,
+  public ZooAuthenticationKeyWatcher(AuthenticationTokenSecretManager 
secretManager, ZooSession zk,
       String baseNode) {
     this.secretManager = secretManager;
-    this.zk = zk;
+    this.zk = new ZooReader(zk);

Review Comment:
   Use `zk.asReader()`?



##########
server/base/src/main/java/org/apache/accumulo/server/util/ListInstances.java:
##########
@@ -80,43 +81,46 @@ public static void main(String[] args) {
     boolean printErrors = opts.printErrors;
 
     listInstances(keepers, printAll, printErrors);
-
   }
 
-  static synchronized void listInstances(String keepers, boolean printAll, 
boolean printErrors) {
+  static synchronized void listInstances(String keepers, boolean printAll, 
boolean printErrors)
+      throws InterruptedException {
     errors = 0;
 
     System.out.println("INFO : Using ZooKeepers " + keepers);
-    ZooReader rdr = new ZooReader(keepers, ZOOKEEPER_TIMER_MILLIS);
-    ZooCache cache = new ZooCache(rdr, null);
+    try (var zk = new ZooSession(ListInstances.class.getSimpleName(), keepers,
+        ZOOKEEPER_TIMER_MILLIS, null)) {
+      ZooReader rdr = new ZooReader(zk);

Review Comment:
   ```suggestion
         ZooReader rdr = zk.asReader();
   ```
   



##########
core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooUtil.java:
##########
@@ -170,26 +176,78 @@ private static String getFmtTime(final long epoch) {
   }
 
   /**
-   * Get the ZooKeeper digest based on the instance secret that is used within 
ZooKeeper for
-   * authentication. This method is primary intended to be used to validate 
ZooKeeper ACLs. Use
-   * {@link #digestAuth(ZooKeeper, String)} to add authorizations to ZooKeeper.
+   * Given a zooCache and instanceId, look up the instance name.
    */
-  public static Id getZkDigestAuthId(final String secret) {
+  public static String getInstanceName(ZooSession zk, InstanceId instanceId) {
+    requireNonNull(zk);
+    var instanceIdBytes = 
requireNonNull(instanceId).canonical().getBytes(UTF_8);
+    for (String name : getInstanceNames(zk)) {
+      var bytes = getInstanceIdBytesFromName(zk, name);
+      if (Arrays.equals(bytes, instanceIdBytes)) {
+        return name;
+      }
+    }
+    return null;
+  }
+
+  private static List<String> getInstanceNames(ZooSession zk) {
     try {
-      final String scheme = "digest";
-      String auth = DigestAuthenticationProvider.generateDigest("accumulo:" + 
secret);
-      return new Id(scheme, auth);
-    } catch (NoSuchAlgorithmException ex) {
-      throw new IllegalArgumentException("Could not generate ZooKeeper digest 
string", ex);
+      return new ZooReader(zk).getChildren(Constants.ZROOT + 
Constants.ZINSTANCES);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException("Interrupted reading instance names from 
ZooKeeper", e);
+    } catch (KeeperException e) {
+      throw new IllegalStateException("Failed to read instance names from 
ZooKeeper", e);
     }
   }
 
-  public static void digestAuth(ZooKeeper zoo, String secret) {
-    auth(zoo, "digest", ("accumulo:" + secret).getBytes(UTF_8));
+  private static byte[] getInstanceIdBytesFromName(ZooSession zk, String name) 
{
+    try {
+      return new ZooReader(zk)
+          .getData(Constants.ZROOT + Constants.ZINSTANCES + "/" + 
requireNonNull(name));
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException(
+          "Interrupted reading InstanceId from ZooKeeper for instance named " 
+ name, e);
+    } catch (KeeperException e) {
+      log.warn("Failed to read InstanceId from ZooKeeper for instance named 
{}", name, e);
+      return null;
+    }
   }
 
-  public static void auth(ZooKeeper zoo, String scheme, byte[] auth) {
-    zoo.addAuthInfo(scheme, auth);
+  public static Map<String,InstanceId> getInstanceMap(ZooSession zk) {
+    Map<String,InstanceId> idMap = new TreeMap<>();
+    getInstanceNames(zk).forEach(name -> {
+      byte[] instanceId = getInstanceIdBytesFromName(zk, name);
+      if (instanceId != null) {
+        idMap.put(name, InstanceId.of(new String(instanceId, UTF_8)));
+      }
+    });
+    return idMap;
+  }
+
+  public static InstanceId getInstanceId(ZooSession zk, String name) {
+    byte[] data = getInstanceIdBytesFromName(zk, name);
+    if (data == null) {
+      throw new IllegalStateException("Instance name " + name + " does not 
exist in ZooKeeper. "
+          + "Run \"accumulo org.apache.accumulo.server.util.ListInstances\" to 
see a list.");
+    }
+    String instanceIdString = new String(data, UTF_8);
+    try {
+      // verify that the instanceId found via the name actually exists
+      if (new ZooReader(zk).getData(Constants.ZROOT + "/" + instanceIdString) 
== null) {

Review Comment:
   Same comment as above: why not use `zk.asReader()` instead of `new ZooReader(zk)`?



##########
core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java:
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.zookeeper;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Objects.requireNonNull;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static org.apache.accumulo.core.util.LazySingletons.RANDOM;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.fate.zookeeper.ZooReader;
+import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.core.util.AddressUtil;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.zookeeper.AsyncCallback.StringCallback;
+import org.apache.zookeeper.AsyncCallback.VoidCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.Watcher.WatcherType;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A ZooKeeper client facade that maintains a ZooKeeper delegate instance. If 
the delegate instance
+ * loses its session, it is replaced with a new instance to establish a new 
session. Any Watchers
+ * registered on a session will need to monitor for the session expired event 
triggered from the old
+ * delegate, and must be reset on the new session if you intend them to 
monitor any further events.
+ * That is no different than if you created a new ZooKeeper instance directly 
after the first one
+ * expired.
+ */
+public class ZooSession implements AutoCloseable {
+
+  public static class ZKUtil {
+    public static void deleteRecursive(ZooSession zk, final String pathRoot)
+        throws InterruptedException, KeeperException {
+      org.apache.zookeeper.ZKUtil.deleteRecursive(zk.verifyConnected(), 
pathRoot);
+    }
+
+    public static void visitSubTreeDFS(ZooSession zk, final String path, 
boolean watch,
+        StringCallback cb) throws KeeperException, InterruptedException {
+      org.apache.zookeeper.ZKUtil.visitSubTreeDFS(zk.verifyConnected(), path, 
watch, cb);
+    }
+  }
+
+  private static class ZooSessionWatcher implements Watcher {
+
+    private final String connectionName;
+    private final AtomicReference<KeeperState> lastState = new 
AtomicReference<>(null);
+
+    public ZooSessionWatcher(String connectionName) {
+      this.connectionName = connectionName;
+    }
+
+    @Override
+    public void process(WatchedEvent event) {
+      final var newState = event.getState();
+      var oldState = lastState.getAndUpdate(s -> newState);
+      if (oldState == null) {
+        log.debug("{} state changed to {}", connectionName, newState);
+      } else if (newState != oldState) {
+        log.debug("{} state changed from {} to {}", connectionName, oldState, 
newState);
+      }
+    }
+  }
+
+  private static final Logger log = LoggerFactory.getLogger(ZooSession.class);
+
+  private static void closeZk(ZooKeeper zk) {
+    if (zk != null) {
+      try {
+        zk.close();
+      } catch (InterruptedException e) {
+        // ZooKeeper doesn't actually throw this; it's just there for 
backwards compatibility
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  private static void digestAuth(ZooKeeper zoo, String secret) {
+    zoo.addAuthInfo("digest", ("accumulo:" + 
requireNonNull(secret)).getBytes(UTF_8));
+  }
+
+  private final AtomicBoolean closed = new AtomicBoolean();
+  private final AtomicLong connectCounter;
+  private final String connectString;
+  private final AtomicReference<ZooKeeper> delegate = new AtomicReference<>();
+  private final String instanceSecret;
+  private final String sessionName;
+  private final int timeout;
+  private final ZooReaderWriter zrw;
+
+  /**
+   * Construct a new ZooKeeper client, retrying indefinitely if it doesn't 
work right away. The
+   * caller is responsible for closing instances returned from this method.
+   *
+   * @param clientName a convenient name for logging its connection state 
changes
+   * @param conf a convenient carrier of ZK connection information using 
Accumulo properties
+   */
+  public ZooSession(String clientName, AccumuloConfiguration conf) {
+    this(clientName, conf.get(Property.INSTANCE_ZK_HOST),
+        (int) conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT),
+        conf.get(Property.INSTANCE_SECRET));
+  }
+
+  /**
+   * Construct a new ZooKeeper client, retrying indefinitely if it doesn't 
work right away. The
+   * caller is responsible for closing instances returned from this method.
+   *
+   * @param clientName a convenient name for logging its connection state 
changes
+   * @param connectString in the form of host1:port1,host2:port2/chroot/path
+   * @param timeout in milliseconds
+   * @param instanceSecret instance secret (may be null)
+   */
+  public ZooSession(String clientName, String connectString, int timeout, 
String instanceSecret) {
+    // information needed to construct a ZooKeeper instance and add 
authentication
+    this.connectString = connectString;
+    this.timeout = timeout;
+    this.instanceSecret = instanceSecret;
+
+    // information for logging which instance of ZooSession this is
+    this.sessionName =
+        String.format("%s[%s_%s]", getClass().getSimpleName(), clientName, 
UUID.randomUUID());
+    this.connectCounter = new AtomicLong(); // incremented when we need to 
create a new delegate
+    this.zrw = new ZooReaderWriter(this);
+  }
+
+  private ZooKeeper verifyConnected() {
+    if (closed.get()) {
+      throw new IllegalStateException(sessionName + " was closed");
+    }
+    return delegate.updateAndGet(zk -> (zk != null && zk.getState().isAlive()) 
? zk : reconnect());
+  }
+
+  private synchronized ZooKeeper reconnect() {
+    ZooKeeper zk;
+    if ((zk = delegate.get()) != null && zk.getState().isAlive()) {
+      return zk;
+    }
+    zk = null;
+    var reconnectName = String.format("%s#%s", sessionName, 
connectCounter.getAndIncrement());
+    log.debug("{} (re-)connecting to {} with timeout {}{}", reconnectName, 
connectString, timeout,
+        instanceSecret == null ? "" : " with auth");
+    final int TIME_BETWEEN_CONNECT_CHECKS_MS = 100;
+    int connectTimeWait = Math.min(10_000, timeout);
+    boolean tryAgain = true;
+    long sleepTime = 100;
+
+    long startTime = System.nanoTime();
+
+    while (tryAgain) {
+      try {
+        zk = new ZooKeeper(connectString, timeout, new 
ZooSessionWatcher(reconnectName));
+        // it may take some time to get connected to zookeeper if some of the 
servers are down
+        for (int i = 0; i < connectTimeWait / TIME_BETWEEN_CONNECT_CHECKS_MS 
&& tryAgain; i++) {
+          if (zk.getState().isConnected()) {
+            if (instanceSecret != null) {
+              digestAuth(zk, instanceSecret);
+            }
+            tryAgain = false;
+          } else {
+            UtilWaitThread.sleep(TIME_BETWEEN_CONNECT_CHECKS_MS);
+          }
+        }
+
+      } catch (IOException e) {
+        if (e instanceof UnknownHostException) {
+          /*
+           * Make sure we wait at least as long as the JVM TTL for negative 
DNS responses
+           */
+          int ttl = 
AddressUtil.getAddressCacheNegativeTtl((UnknownHostException) e);
+          sleepTime = Math.max(sleepTime, (ttl + 1) * 1000L);
+        }
+        log.warn("Connection to zooKeeper failed, will try again in "
+            + String.format("%.2f secs", sleepTime / 1000.0), e);
+      } finally {
+        if (tryAgain && zk != null) {
+          closeZk(zk);
+          zk = null;
+        }
+      }
+
+      long stopTime = System.nanoTime();
+      long duration = NANOSECONDS.toMillis(stopTime - startTime);
+
+      if (duration > 2L * timeout) {
+        throw new IllegalStateException("Failed to connect to zookeeper (" + 
connectString
+            + ") within 2x zookeeper timeout period " + timeout);
+      }
+
+      if (tryAgain) {
+        if (2L * timeout < duration + sleepTime + connectTimeWait) {
+          sleepTime = 2L * timeout - duration - connectTimeWait;
+        }
+        if (sleepTime < 0) {
+          connectTimeWait -= sleepTime;
+          sleepTime = 0;
+        }
+        UtilWaitThread.sleep(sleepTime);
+        if (sleepTime < 10000) {
+          sleepTime = sleepTime + (long) (sleepTime * 
RANDOM.get().nextDouble());
+        }
+      }
+    }
+    return zk;
+  }
+
+  public void addAuthInfo(String scheme, byte[] auth) {
+    verifyConnected().addAuthInfo(scheme, auth);
+  }
+
+  public String create(final String path, byte[] data, List<ACL> acl, 
CreateMode createMode)
+      throws KeeperException, InterruptedException {
+    return verifyConnected().create(path, data, acl, createMode);
+  }
+
+  public void delete(final String path, int version) throws 
InterruptedException, KeeperException {
+    verifyConnected().delete(path, version);
+  }
+
+  public Stat exists(final String path, Watcher watcher)
+      throws KeeperException, InterruptedException {
+    return verifyConnected().exists(path, watcher);
+  }
+
+  public List<ACL> getACL(final String path, Stat stat)
+      throws KeeperException, InterruptedException {
+    return verifyConnected().getACL(path, stat);
+  }
+
+  public List<String> getChildren(final String path, Watcher watcher)
+      throws KeeperException, InterruptedException {
+    return verifyConnected().getChildren(path, watcher);
+  }
+
+  public byte[] getData(final String path, Watcher watcher, Stat stat)
+      throws KeeperException, InterruptedException {
+    return verifyConnected().getData(path, watcher, stat);
+  }
+
+  public long getSessionId() {
+    return verifyConnected().getSessionId();
+  }
+
+  public int getSessionTimeout() {
+    return verifyConnected().getSessionTimeout();
+  }
+
+  public void removeWatches(String path, Watcher watcher, WatcherType 
watcherType, boolean local)
+      throws InterruptedException, KeeperException {
+    verifyConnected().removeWatches(path, watcher, watcherType, local);
+  }
+
+  public Stat setData(final String path, byte[] data, int version)
+      throws KeeperException, InterruptedException {
+    return verifyConnected().setData(path, data, version);
+  }
+
+  public void sync(final String path, VoidCallback cb, Object ctx) {
+    verifyConnected().sync(path, cb, ctx);
+  }
+
+  @Override
+  public void close() {
+    if (closed.compareAndSet(false, true)) {
+      closeZk(delegate.getAndSet(null));
+    }
+  }
+
+  public void addAccumuloDigestAuth(String auth) {
+    digestAuth(verifyConnected(), auth);
+  }
+
+  public ZooReader asReader() {
+    return zrw;

Review Comment:
   If this class were located in the same package as `ZooReader` and 
`ZooReaderWriter`, then we could remove the `public` modifier from their 
constructors and make accessor methods like this one (`asReader()`) the only way to obtain an instance.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to