This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new de02f025ce ensures sessions are always cleaned up (#3569)
de02f025ce is described below

commit de02f025ce5b25acfb86dbad9e9b96c7115d653f
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Mon Jul 10 11:27:46 2023 -0400

    ensures sessions are always cleaned up (#3569)
    
    This is a potential fix for #3512. It ensures that when a session's
    cleanup method returns false, cleanup will be attempted again later.
---
 .../accumulo/tserver/session/ScanSession.java      |  4 ++
 .../apache/accumulo/tserver/session/Session.java   |  6 ++
 .../accumulo/tserver/session/SessionManager.java   | 84 +++++++++++++---------
 3 files changed, 62 insertions(+), 32 deletions(-)

diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java
index 3217fe1b8f..0fefcc1327 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java
@@ -189,4 +189,8 @@ public abstract class ScanSession extends Session 
implements ScanInfo {
     return true;
   }
 
+  @Override
+  public String toString() {
+    return super.toString() + " tableId:" + getTableId();
+  }
 }
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java
index b1c14ca6e8..6e49833729 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java
@@ -49,4 +49,10 @@ public class Session {
   public boolean cleanup() {
     return true;
   }
+
+  @Override
+  public String toString() {
+    return getClass().getSimpleName() + " " + state + " startTime:" + 
startTime + " lastAccessTime:"
+        + lastAccessTime + " client:" + client;
+  }
 }
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
index b8d605ebd8..f0f8a5de2a 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
@@ -18,15 +18,22 @@
  */
 package org.apache.accumulo.tserver.session;
 
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
 import java.security.SecureRandom;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ScheduledFuture;
@@ -42,6 +49,7 @@ import 
org.apache.accumulo.core.tabletserver.thrift.ActiveScan;
 import org.apache.accumulo.core.tabletserver.thrift.ScanState;
 import org.apache.accumulo.core.tabletserver.thrift.ScanType;
 import org.apache.accumulo.core.util.MapCounter;
+import org.apache.accumulo.core.util.Retry;
 import org.apache.accumulo.core.util.threads.ThreadPools;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.tserver.scan.ScanRunState;
@@ -61,7 +69,7 @@ public class SessionManager {
   private final ConcurrentMap<Long,Session> sessions = new 
ConcurrentHashMap<>();
   private final long maxIdle;
   private final long maxUpdateIdle;
-  private final List<Session> idleSessions = new ArrayList<>();
+  private final BlockingQueue<Session> deferredCleanupQueue = new 
ArrayBlockingQueue<>(5000);
   private final Long expiredSessionMarker = (long) -1;
   private final AccumuloConfiguration aconf;
   private final ServerContext ctx;
@@ -209,15 +217,39 @@ public class SessionManager {
       }
 
       if (doCleanup) {
-        session.cleanup();
+        cleanup(session);
       }
     }
 
     return session;
   }
 
+  private void cleanup(Session session) {
+    if (!session.cleanup()) {
+      var retry = Retry.builder().infiniteRetries().retryAfter(25, 
MILLISECONDS)
+          .incrementBy(25, MILLISECONDS).maxWait(5, SECONDS).backOffFactor(1.5)
+          .logInterval(1, MINUTES).createRetry();
+
+      while (!deferredCleanupQueue.offer(session)) {
+        if (session.cleanup()) {
+          break;
+        }
+
+        try {
+          retry.waitForNextAttempt(log, "Unable to cleanup session or defer 
cleanup " + session);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new RuntimeException(e);
+        }
+        retry.logRetry(log, "Unable to cleanup session or defer cleanup " + 
session);
+      }
+
+      retry.logCompletion(log, "Cleaned up session or deferred cleanup " + 
session);
+    }
+  }
+
   private void sweep(final long maxIdle, final long maxUpdateIdle) {
-    List<Session> sessionsToCleanup = new ArrayList<>();
+    List<Session> sessionsToCleanup = new LinkedList<>();
     Iterator<Session> iter = sessions.values().iterator();
     while (iter.hasNext()) {
       Session session = iter.next();
@@ -239,22 +271,14 @@ public class SessionManager {
       }
     }
 
-    // do clean up outside of lock for TabletServer in a synchronized block 
for simplicity vice a
-    // synchronized list
+    // do clean up outside of lock for TabletServer
+    deferredCleanupQueue.drainTo(sessionsToCleanup);
 
-    synchronized (idleSessions) {
-      sessionsToCleanup.addAll(idleSessions);
-      idleSessions.clear();
-    }
+    // make a pass through and remove everything that can be cleaned up before 
calling the
+    // cleanup(Session) method which may block when it can not clean up a 
session.
+    sessionsToCleanup.removeIf(Session::cleanup);
 
-    // perform cleanup for all of the sessions
-    for (Session session : sessionsToCleanup) {
-      if (!session.cleanup()) {
-        synchronized (idleSessions) {
-          idleSessions.add(session);
-        }
-      }
-    }
+    sessionsToCleanup.forEach(this::cleanup);
   }
 
   public void removeIfNotAccessed(final long sessionId, final long delay) {
@@ -282,7 +306,7 @@ public class SessionManager {
               log.info("Closing not accessed session from user=" + 
session2.getUser() + ", client="
                   + session2.client + ", duration=" + delay + "ms");
               sessions.remove(sessionId);
-              session2.cleanup();
+              cleanup(session2);
             }
           }
         }
@@ -299,13 +323,11 @@ public class SessionManager {
 
     Set<Entry<Long,Session>> copiedIdleSessions = new HashSet<>();
 
-    synchronized (idleSessions) {
-      /**
-       * Add sessions so that get the list returned in the active scans call
-       */
-      for (Session session : idleSessions) {
-        copiedIdleSessions.add(Maps.immutableEntry(expiredSessionMarker, 
session));
-      }
+    /**
+     * Add sessions so that get the list returned in the active scans call
+     */
+    for (Session session : deferredCleanupQueue) {
+      copiedIdleSessions.add(Maps.immutableEntry(expiredSessionMarker, 
session));
     }
 
     List.of(sessions.entrySet(), copiedIdleSessions).forEach(set -> 
set.forEach(entry -> {
@@ -341,13 +363,11 @@ public class SessionManager {
     final long ct = System.currentTimeMillis();
     final Set<Entry<Long,Session>> copiedIdleSessions = new HashSet<>();
 
-    synchronized (idleSessions) {
-      /**
-       * Add sessions so that get the list returned in the active scans call
-       */
-      for (Session session : idleSessions) {
-        copiedIdleSessions.add(Maps.immutableEntry(expiredSessionMarker, 
session));
-      }
+    /**
+     * Add sessions so that get the list returned in the active scans call
+     */
+    for (Session session : deferredCleanupQueue) {
+      copiedIdleSessions.add(Maps.immutableEntry(expiredSessionMarker, 
session));
     }
 
     List.of(sessions.entrySet(), copiedIdleSessions).forEach(s -> 
s.forEach(entry -> {

Reply via email to