dlmarion commented on code in PR #3230:
URL: https://github.com/apache/accumulo/pull/3230#discussion_r1132531140


##########
server/manager/src/main/java/org/apache/accumulo/manager/upgrade/PreUpgradeValidation.java:
##########
@@ -117,4 +138,111 @@ private void fail(Exception e) {
     System.exit(1);
   }
 
+  private void validateTableLocks(final ServerContext context) {
+
+    final ZooReaderWriter zrw = context.getZooReaderWriter();
+    final ZooKeeper zk = zrw.getZooKeeper();
+    final String rootPath = context.getZooKeeperRoot();
+    final String tserverLockRoot = rootPath + Constants.ZTSERVERS;
+
+    log.debug("Looking for locks that may be from previous version in path: {}", tserverLockRoot);
+
+    AtomicInteger errorCount = new AtomicInteger(0);
+
+    List<Pair<HostAndPort,ServiceLock.ServiceLockPath>> hostsWithLocks =
+        gatherLocks(zk, tserverLockRoot);
+
+    // try a thrift call to the hosts - hosts running previous versions will fail the call
+    ThreadPoolExecutor lockCheckPool = ThreadPools.getServerThreadPools().createThreadPool(8, 64,

Review Comment:
   ```suggestion
       ThreadPoolExecutor lockCheckPool = ThreadPools.getServerThreadPools().createThreadPool(8, Runtime.getRuntime().availableProcessors() - 2,
   ```
   
   Why not use almost all of the available processors?
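A minimal sketch of the sizing idea. It uses plain `java.util.concurrent.ThreadPoolExecutor` rather than Accumulo's `ThreadPools` helper, whose queue choice is not visible in this diff. The names `LockCheckPoolSizing` and `poolSize` are hypothetical, and the 10-minute keep-alive is assumed to correspond to the PR's `10, MINUTES` arguments. The size is clamped to at least the PR's core size of 8, because `ThreadPoolExecutor` throws `IllegalArgumentException` when `maximumPoolSize < corePoolSize`; on a host with fewer than 10 processors, `availableProcessors() - 2` would likely trigger that if the values are passed straight through.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class LockCheckPoolSizing {

  // Core size used in the PR; everything else in this class is illustrative.
  static final int MIN_THREADS = 8;

  // Hypothetical helper: leave two processors free, but never drop below 8.
  public static int poolSize(int availableProcessors) {
    return Math.max(MIN_THREADS, availableProcessors - 2);
  }

  public static ThreadPoolExecutor createLockCheckPool() {
    int threads = poolSize(Runtime.getRuntime().availableProcessors());
    // Core and maximum are equal: with an unbounded LinkedBlockingQueue the pool
    // never grows past its core size, so raising only the maximum adds no threads.
    ThreadPoolExecutor pool = new ThreadPoolExecutor(threads, threads, 10, TimeUnit.MINUTES,
        new LinkedBlockingQueue<>());
    // Let idle core threads exit after the keep-alive (requires keep-alive > 0).
    pool.allowCoreThreadTimeOut(true);
    return pool;
  }

  public static void main(String[] args) {
    ThreadPoolExecutor pool = createLockCheckPool();
    System.out.println("core=" + pool.getCorePoolSize() + " max=" + pool.getMaximumPoolSize());
    pool.shutdown();
  }
}
```

The equal core/maximum choice matters only if the underlying helper uses an unbounded queue; with a bounded or synchronous queue, a larger maximum than core would take effect.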



##########
server/manager/src/main/java/org/apache/accumulo/manager/upgrade/PreUpgradeValidation.java:
##########
@@ -117,4 +138,111 @@ private void fail(Exception e) {
     System.exit(1);
   }
 
+  private void validateTableLocks(final ServerContext context) {
+
+    final ZooReaderWriter zrw = context.getZooReaderWriter();
+    final ZooKeeper zk = zrw.getZooKeeper();
+    final String rootPath = context.getZooKeeperRoot();
+    final String tserverLockRoot = rootPath + Constants.ZTSERVERS;
+
+    log.debug("Looking for locks that may be from previous version in path: {}", tserverLockRoot);
+
+    AtomicInteger errorCount = new AtomicInteger(0);
+
+    List<Pair<HostAndPort,ServiceLock.ServiceLockPath>> hostsWithLocks =
+        gatherLocks(zk, tserverLockRoot);
+
+    // try a thrift call to the hosts - hosts running previous versions will fail the call
+    ThreadPoolExecutor lockCheckPool = ThreadPools.getServerThreadPools().createThreadPool(8, 64,
+        10, MINUTES, "update-lock-check", false);
+
+    hostsWithLocks.forEach(h -> lockCheckPool.execute(() -> {
+      HostAndPort host = h.getFirst();
+      ServiceLock.ServiceLockPath lockPath = h.getSecond();
+      try (TTransport transport = ThriftUtil.createTransport(host, context)) {
+        log.trace("found valid lock at: {}", lockPath);
+      } catch (TException ex) {
+        log.debug("Could not establish a connection for to service holding lock. Deleting node: {}",
+            lockPath, ex);
+        try {
+          zk.delete(lockPath.toString(), -1);
+          errorCount.incrementAndGet();
+        } catch (KeeperException.NoNodeException e) {
+          // ignore - node already gone.
+        } catch (InterruptedException | KeeperException e) {
+          // task will be terminated - ignore interrupt.
+          errorCount.incrementAndGet();
+        }
+      }
+    }));
+    lockCheckPool.shutdown();
+    try {
+      // wait to all to finish
+      if (!lockCheckPool.awaitTermination(10, MINUTES)) {
+        log.warn(
+            "Timed out waiting for lock check to finish - continuing, but tservers running prior versions may be present");
+      }
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException("interrupted validating service locks in ZooKeeper", ex);
+    }
+    log.debug("Completed tserver lock check.");
+  }
+
+  private List<Pair<HostAndPort,ServiceLock.ServiceLockPath>> gatherLocks(final ZooKeeper zk,
+      final String zkPathRoot) {
+    List<Pair<HostAndPort,ServiceLock.ServiceLockPath>> hosts = new ArrayList<>();
+    try {
+      ZKUtil.visitSubTreeDFS(zk, zkPathRoot, false, (rc, path, ctx, name) -> {
+        if (name.startsWith(ServiceLock.ZLOCK_PREFIX)) {

Review Comment:
   If `name` does not start with `ZLOCK_PREFIX`, shouldn't we delete it as well?
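A minimal sketch of splitting the visited child names into lock nodes and everything else, so the non-lock entries could be reported or deleted rather than silently skipped. It assumes `ZLOCK_PREFIX` is `"zlock#"`, matching `ServiceLock.ZLOCK_PREFIX` on the branch under review (worth verifying); the class name, method name, and sample node names are hypothetical. Actual deletion is left out on purpose: ZooKeeper's `delete` fails with `KeeperException.NotEmptyException` on a node that still has children, so a stray directory would need its subtree removed first.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class LockNodePartition {

  // Assumption: same value as ServiceLock.ZLOCK_PREFIX in the branch under review.
  static final String ZLOCK_PREFIX = "zlock#";

  // Hypothetical helper: key true holds lock nodes (to be probed over Thrift),
  // key false holds everything else (candidates for deletion or a warning).
  // partitioningBy always returns both keys, even when one side is empty.
  public static Map<Boolean,List<String>> partition(List<String> childNames) {
    return childNames.stream()
        .collect(Collectors.partitioningBy(name -> name.startsWith(ZLOCK_PREFIX)));
  }

  public static void main(String[] args) {
    // Made-up node names, for illustration only.
    Map<Boolean,List<String>> parts = partition(List.of("zlock#example", "leftover-node"));
    System.out.println("probe: " + parts.get(true));
    System.out.println("delete candidates: " + parts.get(false));
  }
}
```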



##########
server/manager/src/main/java/org/apache/accumulo/manager/upgrade/PreUpgradeValidation.java:
##########
@@ -117,4 +138,111 @@ private void fail(Exception e) {
     System.exit(1);
   }
 
+  private void validateTableLocks(final ServerContext context) {
+
+    final ZooReaderWriter zrw = context.getZooReaderWriter();
+    final ZooKeeper zk = zrw.getZooKeeper();
+    final String rootPath = context.getZooKeeperRoot();
+    final String tserverLockRoot = rootPath + Constants.ZTSERVERS;
+
+    log.debug("Looking for locks that may be from previous version in path: {}", tserverLockRoot);
+
+    AtomicInteger errorCount = new AtomicInteger(0);
+
+    List<Pair<HostAndPort,ServiceLock.ServiceLockPath>> hostsWithLocks =
+        gatherLocks(zk, tserverLockRoot);
+
+    // try a thrift call to the hosts - hosts running previous versions will fail the call
+    ThreadPoolExecutor lockCheckPool = ThreadPools.getServerThreadPools().createThreadPool(8, 64,
+        10, MINUTES, "update-lock-check", false);
+
+    hostsWithLocks.forEach(h -> lockCheckPool.execute(() -> {
+      HostAndPort host = h.getFirst();
+      ServiceLock.ServiceLockPath lockPath = h.getSecond();
+      try (TTransport transport = ThriftUtil.createTransport(host, context)) {
+        log.trace("found valid lock at: {}", lockPath);
+      } catch (TException ex) {
+        log.debug("Could not establish a connection for to service holding lock. Deleting node: {}",
+            lockPath, ex);
+        try {
+          zk.delete(lockPath.toString(), -1);
+          errorCount.incrementAndGet();
+        } catch (KeeperException.NoNodeException e) {
+          // ignore - node already gone.
+        } catch (InterruptedException | KeeperException e) {
+          // task will be terminated - ignore interrupt.
+          errorCount.incrementAndGet();
+        }
+      }
+    }));
+    lockCheckPool.shutdown();
+    try {
+      // wait to all to finish
+      if (!lockCheckPool.awaitTermination(10, MINUTES)) {
+        log.warn(
+            "Timed out waiting for lock check to finish - continuing, but tservers running prior versions may be present");
+      }
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();

Review Comment:
   Might want to review the Manager code where `setManagerState()` is called. It's called in `StatusThread`, which catches `Exception`, logs it, and retries. I think you will want to check and reset the interrupt state on the thread.
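A minimal sketch of the pattern, with hypothetical names throughout; it is not the actual `StatusThread` code. The step restores the interrupt flag before rethrowing, as the PR's catch block already does, and a retry loop that catches `Exception` in the style described above checks `isInterrupted()` so an interrupted attempt ends the loop instead of being logged and retried.

```java
public class InterruptAwareRetry {

  // Hypothetical stand-in for a step that can be interrupted, such as awaitTermination.
  public interface Step {
    void run() throws InterruptedException;
  }

  // Mirrors the PR's handling: restore the interrupt flag, then rethrow unchecked.
  public static void runStep(Step step) {
    try {
      step.run();
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted validating service locks in ZooKeeper", ex);
    }
  }

  // Hypothetical retry loop that catches Exception. Checking isInterrupted() (which does
  // not clear the flag) stops the loop once a step was interrupted. Returns attempts made.
  public static int retryLoop(Step step, int maxAttempts) {
    int attempts = 0;
    while (attempts < maxAttempts && !Thread.currentThread().isInterrupted()) {
      attempts++;
      try {
        runStep(step);
        return attempts;
      } catch (Exception e) {
        System.err.println("attempt " + attempts + " failed: " + e);
      }
    }
    return attempts;
  }
}
```

Without the `isInterrupted()` check, the interrupted attempt would be logged like any other failure and the loop would keep retrying with the flag still set.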



##########
server/manager/src/main/java/org/apache/accumulo/manager/upgrade/PreUpgradeValidation.java:
##########
@@ -117,4 +138,111 @@ private void fail(Exception e) {
     System.exit(1);
   }
 
+  private void validateTableLocks(final ServerContext context) {
+
+    final ZooReaderWriter zrw = context.getZooReaderWriter();
+    final ZooKeeper zk = zrw.getZooKeeper();
+    final String rootPath = context.getZooKeeperRoot();
+    final String tserverLockRoot = rootPath + Constants.ZTSERVERS;
+
+    log.debug("Looking for locks that may be from previous version in path: {}", tserverLockRoot);
+
+    AtomicInteger errorCount = new AtomicInteger(0);

Review Comment:
   I don't think `errorCount` is checked after the threads have finished.
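A minimal sketch of reading the counter only after `awaitTermination` returns `true`, at which point every task has finished updating it. The `Predicate` stands in for the Thrift probe, and the class and method names are hypothetical. Unlike the PR, which logs a warning and continues on timeout, this sketch throws, because a count read before all tasks finish may be incomplete. Whether a nonzero count should fail validation or only be logged is the author's call; this sketch fails.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

public class ErrorCountCheck {

  // Runs one check per host on a pool and returns how many checks failed.
  // "check" is a hypothetical stand-in for the Thrift connection attempt in the PR.
  public static int countFailures(List<String> hosts, Predicate<String> check) {
    AtomicInteger errorCount = new AtomicInteger(0);
    ExecutorService pool = Executors.newFixedThreadPool(4);
    hosts.forEach(host -> pool.execute(() -> {
      if (!check.test(host)) {
        errorCount.incrementAndGet();
      }
    }));
    pool.shutdown();
    try {
      // Only read the counter once every task is known to have completed.
      if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
        throw new IllegalStateException("timed out waiting for lock checks; count may be incomplete");
      }
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted waiting for lock checks", ex);
    }
    return errorCount.get();
  }

  // Hypothetical caller: act on the count instead of dropping it.
  public static void validate(List<String> hosts, Predicate<String> check) {
    int errors = countFailures(hosts, check);
    if (errors > 0) {
      throw new IllegalStateException(errors + " lock check(s) failed");
    }
  }
}
```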



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
