ctubbsii commented on code in PR #5192:
URL: https://github.com/apache/accumulo/pull/5192#discussion_r1901297178


##########
core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java:
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.zookeeper;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Objects.requireNonNull;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static org.apache.accumulo.core.util.LazySingletons.RANDOM;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.fate.zookeeper.ZooReader;
+import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.core.util.AddressUtil;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.zookeeper.AsyncCallback.StringCallback;
+import org.apache.zookeeper.AsyncCallback.VoidCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.Watcher.WatcherType;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A ZooKeeper client facade that maintains a ZooKeeper delegate instance. If the delegate instance
+ * loses its session, it is replaced with a new instance to establish a new session. Any Watchers
+ * registered on a session will need to monitor for the session expired event triggered from the old
+ * delegate, and must be reset on the new session if you intend them to monitor any further events.
+ * That is no different than if you created a new ZooKeeper instance directly after the first one
+ * expired.
+ */
+public class ZooSession implements AutoCloseable {
+
+  public static class ZKUtil {
+    public static void deleteRecursive(ZooSession zk, final String pathRoot)
+        throws InterruptedException, KeeperException {
+      org.apache.zookeeper.ZKUtil.deleteRecursive(zk.verifyConnected(), pathRoot);
+    }
+
+    public static void visitSubTreeDFS(ZooSession zk, final String path, boolean watch,
+        StringCallback cb) throws KeeperException, InterruptedException {
+      org.apache.zookeeper.ZKUtil.visitSubTreeDFS(zk.verifyConnected(), path, watch, cb);
+    }
+  }
+
+  private static class ZooSessionWatcher implements Watcher {
+
+    private final String connectionName;
+    private final AtomicReference<KeeperState> lastState = new AtomicReference<>(null);
+
+    public ZooSessionWatcher(String connectionName) {
+      this.connectionName = connectionName;
+    }
+
+    @Override
+    public void process(WatchedEvent event) {
+      final var newState = event.getState();
+      var oldState = lastState.getAndUpdate(s -> newState);
+      if (oldState == null) {
+        log.debug("{} state changed to {}", connectionName, newState);
+      } else if (newState != oldState) {
+        log.debug("{} state changed from {} to {}", connectionName, oldState, newState);
+      }
+    }
+  }
+
+  private static final Logger log = LoggerFactory.getLogger(ZooSession.class);
+
+  private static void closeZk(ZooKeeper zk) {
+    if (zk != null) {
+      try {
+        zk.close();
+      } catch (InterruptedException e) {
+        // ZooKeeper doesn't actually throw this; it's just there for backwards compatibility
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  private static void digestAuth(ZooKeeper zoo, String secret) {
+    zoo.addAuthInfo("digest", ("accumulo:" + requireNonNull(secret)).getBytes(UTF_8));
+  }
+
+  private final AtomicBoolean closed = new AtomicBoolean();
+  private final AtomicLong connectCounter;
+  private final String connectString;
+  private final AtomicReference<ZooKeeper> delegate = new AtomicReference<>();
+  private final String instanceSecret;
+  private final String sessionName;
+  private final int timeout;
+  private final ZooReaderWriter zrw;
+
+  /**
+   * Construct a new ZooKeeper client, retrying indefinitely if it doesn't work right away. The
+   * caller is responsible for closing instances returned from this method.
+   *
+   * @param clientName a convenient name for logging its connection state changes
+   * @param conf a convenient carrier of ZK connection information using Accumulo properties
+   */
+  public ZooSession(String clientName, AccumuloConfiguration conf) {
+    this(clientName, conf.get(Property.INSTANCE_ZK_HOST),
+        (int) conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT),
+        conf.get(Property.INSTANCE_SECRET));
+  }
+
+  /**
+   * Construct a new ZooKeeper client, retrying indefinitely if it doesn't work right away. The
+   * caller is responsible for closing instances returned from this method.
+   *
+   * @param clientName a convenient name for logging its connection state changes
+   * @param connectString in the form of host1:port1,host2:port2/chroot/path
+   * @param timeout in milliseconds
+   * @param instanceSecret instance secret (may be null)
+   */
+  public ZooSession(String clientName, String connectString, int timeout, String instanceSecret) {
+    // information needed to construct a ZooKeeper instance and add authentication
+    this.connectString = connectString;
+    this.timeout = timeout;
+    this.instanceSecret = instanceSecret;
+
+    // information for logging which instance of ZooSession this is
+    this.sessionName =
+        String.format("%s[%s_%s]", getClass().getSimpleName(), clientName, UUID.randomUUID());
+    this.connectCounter = new AtomicLong(); // incremented when we need to create a new delegate
+    this.zrw = new ZooReaderWriter(this);
+  }
+
+  private ZooKeeper verifyConnected() {
+    if (closed.get()) {
+      throw new IllegalStateException(sessionName + " was closed");
+    }
+    return delegate.updateAndGet(zk -> (zk != null && zk.getState().isAlive()) ? zk : reconnect());

Review Comment:
   I don't think the atomic update changes the memory effects. Those are 
inherent to the use of an atomic/volatile object. While that's something to be 
aware of, that's the trade-off we get for holding a singleton reference, and I 
don't think it has any substantial performance impact here, especially given 
that calls to this are adjacent to network IO, which is going to be 
substantially slower than any CPU cache effects.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to