This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push: new de02f025ce ensures sessions are always cleaned up (#3569) de02f025ce is described below commit de02f025ce5b25acfb86dbad9e9b96c7115d653f Author: Keith Turner <ktur...@apache.org> AuthorDate: Mon Jul 10 11:27:46 2023 -0400 ensures sessions are always cleaned up (#3569) This is a potential fix for #3512. It ensures that when a session's cleanup method returns false that cleanup will be attempted again later. --- .../accumulo/tserver/session/ScanSession.java | 4 ++ .../apache/accumulo/tserver/session/Session.java | 6 ++ .../accumulo/tserver/session/SessionManager.java | 84 +++++++++++++--------- 3 files changed, 62 insertions(+), 32 deletions(-) diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java index 3217fe1b8f..0fefcc1327 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java @@ -189,4 +189,8 @@ public abstract class ScanSession extends Session implements ScanInfo { return true; } + @Override + public String toString() { + return super.toString() + " tableId:" + getTableId(); + } } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java index b1c14ca6e8..6e49833729 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java @@ -49,4 +49,10 @@ public class Session { public boolean cleanup() { return true; } + + @Override + public String toString() { + return getClass().getSimpleName() + " " + state + " startTime:" + startTime + " lastAccessTime:" + + lastAccessTime + " client:" + client; + } } diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java index b8d605ebd8..f0f8a5de2a 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java @@ -18,15 +18,22 @@ */ package org.apache.accumulo.tserver.session; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.MINUTES; +import static java.util.concurrent.TimeUnit.SECONDS; + import java.security.SecureRandom; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ScheduledFuture; @@ -42,6 +49,7 @@ import org.apache.accumulo.core.tabletserver.thrift.ActiveScan; import org.apache.accumulo.core.tabletserver.thrift.ScanState; import org.apache.accumulo.core.tabletserver.thrift.ScanType; import org.apache.accumulo.core.util.MapCounter; +import org.apache.accumulo.core.util.Retry; import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.tserver.scan.ScanRunState; @@ -61,7 +69,7 @@ public class SessionManager { private final ConcurrentMap<Long,Session> sessions = new ConcurrentHashMap<>(); private final long maxIdle; private final long maxUpdateIdle; - private final List<Session> idleSessions = new ArrayList<>(); + private final BlockingQueue<Session> deferredCleanupQueue = new ArrayBlockingQueue<>(5000); private final Long expiredSessionMarker = (long) -1; private 
final AccumuloConfiguration aconf; private final ServerContext ctx; @@ -209,15 +217,39 @@ public class SessionManager { } if (doCleanup) { - session.cleanup(); + cleanup(session); } } return session; } + private void cleanup(Session session) { + if (!session.cleanup()) { + var retry = Retry.builder().infiniteRetries().retryAfter(25, MILLISECONDS) + .incrementBy(25, MILLISECONDS).maxWait(5, SECONDS).backOffFactor(1.5) + .logInterval(1, MINUTES).createRetry(); + + while (!deferredCleanupQueue.offer(session)) { + if (session.cleanup()) { + break; + } + + try { + retry.waitForNextAttempt(log, "Unable to cleanup session or defer cleanup " + session); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + retry.logRetry(log, "Unable to cleanup session or defer cleanup " + session); + } + + retry.logCompletion(log, "Cleaned up session or deferred cleanup " + session); + } + } + private void sweep(final long maxIdle, final long maxUpdateIdle) { - List<Session> sessionsToCleanup = new ArrayList<>(); + List<Session> sessionsToCleanup = new LinkedList<>(); Iterator<Session> iter = sessions.values().iterator(); while (iter.hasNext()) { Session session = iter.next(); @@ -239,22 +271,14 @@ public class SessionManager { } } - // do clean up outside of lock for TabletServer in a synchronized block for simplicity vice a - // synchronized list + // do clean up outside of lock for TabletServer + deferredCleanupQueue.drainTo(sessionsToCleanup); - synchronized (idleSessions) { - sessionsToCleanup.addAll(idleSessions); - idleSessions.clear(); - } + // make a pass through and remove everything that can be cleaned up before calling the + // cleanup(Session) method which may block when it can not clean up a session. 
+ sessionsToCleanup.removeIf(Session::cleanup); - // perform cleanup for all of the sessions - for (Session session : sessionsToCleanup) { - if (!session.cleanup()) { - synchronized (idleSessions) { - idleSessions.add(session); - } - } - } + sessionsToCleanup.forEach(this::cleanup); } public void removeIfNotAccessed(final long sessionId, final long delay) { @@ -282,7 +306,7 @@ public class SessionManager { log.info("Closing not accessed session from user=" + session2.getUser() + ", client=" + session2.client + ", duration=" + delay + "ms"); sessions.remove(sessionId); - session2.cleanup(); + cleanup(session2); } } } @@ -299,13 +323,11 @@ public class SessionManager { Set<Entry<Long,Session>> copiedIdleSessions = new HashSet<>(); - synchronized (idleSessions) { - /** - * Add sessions so that get the list returned in the active scans call - */ - for (Session session : idleSessions) { - copiedIdleSessions.add(Maps.immutableEntry(expiredSessionMarker, session)); - } + /** + * Add sessions so that get the list returned in the active scans call + */ + for (Session session : deferredCleanupQueue) { + copiedIdleSessions.add(Maps.immutableEntry(expiredSessionMarker, session)); } List.of(sessions.entrySet(), copiedIdleSessions).forEach(set -> set.forEach(entry -> { @@ -341,13 +363,11 @@ public class SessionManager { final long ct = System.currentTimeMillis(); final Set<Entry<Long,Session>> copiedIdleSessions = new HashSet<>(); - synchronized (idleSessions) { - /** - * Add sessions so that get the list returned in the active scans call - */ - for (Session session : idleSessions) { - copiedIdleSessions.add(Maps.immutableEntry(expiredSessionMarker, session)); - } + /** + * Add sessions so that get the list returned in the active scans call + */ + for (Session session : deferredCleanupQueue) { + copiedIdleSessions.add(Maps.immutableEntry(expiredSessionMarker, session)); } List.of(sessions.entrySet(), copiedIdleSessions).forEach(s -> s.forEach(entry -> {