dlmarion commented on code in PR #5192:
URL: https://github.com/apache/accumulo/pull/5192#discussion_r1888489508
##########
core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java:
##########
@@ -523,7 +526,7 @@ public long timeCreated(long tid) {
verifyReserved(tid);
try {
- Stat stat = zk.getZooKeeper().exists(getTXPath(tid), false);
+ Stat stat = zk.exists(getTXPath(tid), false);
Review Comment:
Is there a reason that this was not changed to use `zrw`? If it's changed,
then I think we can remove the `zk` instance variable.
##########
core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java:
##########
Review Comment:
For a shared ZooCache we probably want to remove the `clear()` method or
make it private. The places that were creating their own ZooCache and calling
`clear` should call `clear(Predicate<String>)` instead and remove everything
that starts with the prefix that the code handles. `clear(Predicate<String>)`
was added in `main` (see
[here](https://github.com/apache/accumulo/blob/main/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java#L518)).
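
A rough sketch of the prefix-based clearing idea, under stated assumptions: `PathCache` below is a hypothetical stand-in keyed by ZooKeeper path, not the real ZooCache, and the paths in the usage note are made up. Only the `clear(Predicate<String>)` shape comes from `main`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

// Hypothetical stand-in for a shared cache keyed by ZooKeeper path.
// It is NOT the real ZooCache; it only illustrates predicate-based clearing.
class PathCache {
  private final Map<String,byte[]> data = new ConcurrentHashMap<>();

  void put(String path, byte[] value) {
    data.put(path, value);
  }

  boolean contains(String path) {
    return data.containsKey(path);
  }

  // Removes only the entries whose path matches the predicate, so one
  // user of a shared cache does not wipe entries that belong to others.
  void clear(Predicate<String> pathPredicate) {
    data.keySet().removeIf(pathPredicate);
  }
}
```

With this shape, code that only handles paths under one prefix would call something like `cache.clear(path -> path.startsWith("/tables/"))` and leave the rest of the shared cache alone.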
##########
core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java:
##########
@@ -126,9 +126,7 @@ public class ClientContext implements AccumuloClient {
private static final Logger log = LoggerFactory.getLogger(ClientContext.class);
private final ClientInfo info;
- private InstanceId instanceId;
- private final ZooReader zooReader;
Review Comment:
I'm thinking that we should still have ZooReader / ZooReaderWriter
instances on the context objects. How does removing them from the context
objects improve things?
##########
core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooUtil.java:
##########
@@ -169,27 +212,198 @@ private static String getFmtTime(final long epoch) {
return fmt.format(timestamp);
}
+ public static void digestAuth(ZooKeeper zoo, String secret) {
+ zoo.addAuthInfo("digest", ("accumulo:" + secret).getBytes(UTF_8));
+ }
+
/**
- * Get the ZooKeeper digest based on the instance secret that is used within ZooKeeper for
- * authentication. This method is primary intended to be used to validate ZooKeeper ACLs. Use
- * {@link #digestAuth(ZooKeeper, String)} to add authorizations to ZooKeeper.
+ * Construct a new ZooKeeper client, retrying if it doesn't work right away. The caller is
+ * responsible for closing instances returned from this method.
+ *
+ * @param clientName a convenient name for logging its connection state changes
+ * @param conf a convenient carrier of ZK connection information using Accumulo properties
*/
- public static Id getZkDigestAuthId(final String secret) {
- try {
- final String scheme = "digest";
- String auth = DigestAuthenticationProvider.generateDigest("accumulo:" + secret);
- return new Id(scheme, auth);
- } catch (NoSuchAlgorithmException ex) {
- throw new IllegalArgumentException("Could not generate ZooKeeper digest string", ex);
+ public static ZooKeeper connect(String clientName, AccumuloConfiguration conf) {
+ return ZooUtil.connect(clientName, conf.get(Property.INSTANCE_ZK_HOST),
+ (int) conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT),
+ conf.get(Property.INSTANCE_SECRET));
+ }
+
+ /**
+ * Construct a new ZooKeeper client, retrying if it doesn't work right away. The caller is
+ * responsible for closing instances returned from this method.
+ *
+ * @param clientName a convenient name for logging its connection state changes
+ * @param connectString in the form of host1:port1,host2:port2/chroot/path
+ * @param timeout in milliseconds
+ * @param instanceSecret instance secret (may be null)
+ */
+ public static ZooKeeper connect(String clientName, String connectString, int timeout,
+ String instanceSecret) {
+ log.debug("Connecting to {} with timeout {} with auth", connectString, timeout);
+ final int TIME_BETWEEN_CONNECT_CHECKS_MS = 100;
+ int connectTimeWait = Math.min(10_000, timeout);
+ boolean tryAgain = true;
+ long sleepTime = 100;
+ ZooKeeper zk = null;
+
+ var watcher = new ZooSessionWatcher(clientName);
+
+ long startTime = System.nanoTime();
+
+ while (tryAgain) {
+ try {
+ zk = new ZooKeeper(connectString, timeout, watcher);
+ // it may take some time to get connected to zookeeper if some of the servers are down
+ for (int i = 0; i < connectTimeWait / TIME_BETWEEN_CONNECT_CHECKS_MS && tryAgain; i++) {
+ if (zk.getState().isConnected()) {
+ if (instanceSecret != null) {
+ ZooUtil.digestAuth(zk, instanceSecret);
+ }
+ tryAgain = false;
+ } else {
+ UtilWaitThread.sleep(TIME_BETWEEN_CONNECT_CHECKS_MS);
+ }
+ }
+
+ } catch (IOException e) {
+ if (e instanceof UnknownHostException) {
+ /*
+ * Make sure we wait at least as long as the JVM TTL for negative DNS responses
+ */
+ int ttl = AddressUtil.getAddressCacheNegativeTtl((UnknownHostException) e);
+ sleepTime = Math.max(sleepTime, (ttl + 1) * 1000L);
+ }
+ log.warn("Connection to zooKeeper failed, will try again in "
+ + String.format("%.2f secs", sleepTime / 1000.0), e);
+ } finally {
+ if (tryAgain && zk != null) {
+ try {
+ zk.close();
+ zk = null;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ log.warn("interrupted", e);
+ }
+ }
+ }
+
+ long stopTime = System.nanoTime();
+ long duration = NANOSECONDS.toMillis(stopTime - startTime);
+
+ if (duration > 2L * timeout) {
+ throw new IllegalStateException("Failed to connect to zookeeper (" + connectString
+ + ") within 2x zookeeper timeout period " + timeout);
+ }
+
+ if (tryAgain) {
+ if (2L * timeout < duration + sleepTime + connectTimeWait) {
+ sleepTime = 2L * timeout - duration - connectTimeWait;
+ }
+ if (sleepTime < 0) {
+ connectTimeWait -= sleepTime;
+ sleepTime = 0;
+ }
+ UtilWaitThread.sleep(sleepTime);
+ if (sleepTime < 10000) {
+ sleepTime = sleepTime + (long) (sleepTime * RANDOM.get().nextDouble());
+ }
+ }
}
+
+ return zk;
}
- public static void digestAuth(ZooKeeper zoo, String secret) {
- auth(zoo, "digest", ("accumulo:" + secret).getBytes(UTF_8));
+ /**
+ * Given a zooCache and instanceId, look up the instance name.
+ */
+ public static String getInstanceName(String zooKeepers, int zkSessionTimeout,
+ InstanceId instanceId) {
+ requireNonNull(zooKeepers);
+ var instanceIdBytes = requireNonNull(instanceId).canonical().getBytes(UTF_8);
+ try (var zk = connect("ZooUtil.getInstanceName", zooKeepers, zkSessionTimeout, null)) {
+ for (String name : zk.getChildren(Constants.ZROOT + Constants.ZINSTANCES, false)) {
+ var bytes = zk.getData(Constants.ZROOT + Constants.ZINSTANCES + "/" + name, false, null);
+ if (Arrays.equals(bytes, instanceIdBytes)) {
+ return name;
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
Review Comment:
I think we should not have started doing this, because I don't think we
check and/or clear the interrupt flag in a lot of cases. If the code that
calls this method does not clear the flag, it persists on the thread until
something checks and clears it. It's possible that some library we use may
check the flag and do something unintended. I'm wondering if we should
just throw InterruptedException.
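
To illustrate the concern, here is a minimal, self-contained sketch comparing the two styles. `InterruptDemo` and both method names are hypothetical and use `Thread.sleep` as a stand-in for the blocking ZooKeeper call; this is not the code in the PR.

```java
// Hypothetical demo class; Thread.sleep stands in for a blocking ZooKeeper call.
final class InterruptDemo {

  // Style questioned above: catch the exception and re-set the interrupt flag.
  // The flag then stays set on the thread until some later code clears it.
  static String lookupRestoringFlag() {
    try {
      Thread.sleep(10);
      return "done";
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return null;
    }
  }

  // Alternative: declare the exception and let the caller decide what to do.
  // The flag was already cleared when the exception was thrown.
  static String lookupPropagating() throws InterruptedException {
    Thread.sleep(10);
    return "done";
  }
}
```

If the calling thread is interrupted, `lookupRestoringFlag()` returns `null` and leaves the flag set, so the next interruptible call on that thread (possibly inside a library) fails immediately. `lookupPropagating()` instead surfaces the interruption as an exception and leaves the flag clear.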