keith-turner closed pull request #383: ACCUMULO-4809 Avoid blocking during session cleanup
URL: https://github.com/apache/accumulo/pull/383
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java index 78c351ded7..a03c0321b8 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java @@ -169,36 +169,38 @@ public Session removeSession(long sessionId, boolean unreserve) { } private void sweep(final long maxIdle, final long maxUpdateIdle) { - List<Session> sessionsToCleanup = new ArrayList<>(); - synchronized (this) { - Iterator<Session> iter = sessions.values().iterator(); - while (iter.hasNext()) { - Session session = iter.next(); - long configuredIdle = maxIdle; - if (session instanceof UpdateSession) { - configuredIdle = maxUpdateIdle; - } - long idleTime = System.currentTimeMillis() - session.lastAccessTime; - if (idleTime > configuredIdle && !session.reserved) { - log.info("Closing idle session from user=" + session.getUser() + ", client=" + session.client + ", idle=" + idleTime + "ms"); - iter.remove(); - sessionsToCleanup.add(session); + // In Accumulo's current code only one thread will ever call this method. However if multiple threads called this method concurrently it could result in + // sessions being lost. This method synchronizes on idleSessions to prevent the loss. Its not expected that anything else will synchronize on idleSessions. 
+ synchronized (idleSessions) { + synchronized (this) { + Iterator<Session> iter = sessions.values().iterator(); + while (iter.hasNext()) { + Session session = iter.next(); + long configuredIdle = maxIdle; + if (session instanceof UpdateSession) { + configuredIdle = maxUpdateIdle; + } + long idleTime = System.currentTimeMillis() - session.lastAccessTime; + if (idleTime > configuredIdle && !session.reserved) { + log.info("Closing idle session from user=" + session.getUser() + ", client=" + session.client + ", idle=" + idleTime + "ms"); + iter.remove(); + idleSessions.add(session); + } } } - } - - // do clean up outside of lock for TabletServer in a synchronized block for simplicity vice a synchronized list - synchronized (idleSessions) { + List<Session> sessionsToCleanup = new ArrayList<>(); - sessionsToCleanup.addAll(idleSessions); - - idleSessions.clear(); + // do clean up outside of lock for TabletServer + for (Session session : idleSessions) { + if (!session.cleanup()) { + sessionsToCleanup.add(session); + } + } - // perform cleanup for all of the sessions - for (Session session : sessionsToCleanup) { - if (!session.cleanup()) - idleSessions.add(session); + synchronized (this) { + idleSessions.clear(); + idleSessions.addAll(sessionsToCleanup); } } } @@ -234,16 +236,14 @@ public void run() { Map<String,MapCounter<ScanRunState>> counts = new HashMap<>(); Set<Entry<Long,Session>> copiedIdleSessions = new HashSet<>(); - synchronized (idleSessions) { - /** - * Add sessions so that get the list returned in the active scans call - */ - for (Session session : idleSessions) { - copiedIdleSessions.add(Maps.immutableEntry(expiredSessionMarker, session)); - } + /** + * Add sessions so that get the list returned in the active scans call + */ + for (Session session : idleSessions) { + copiedIdleSessions.add(Maps.immutableEntry(expiredSessionMarker, session)); } - for (Entry<Long,Session> entry : sessions.entrySet()) { + for (Entry<Long,Session> entry : 
Iterables.concat(sessions.entrySet(), copiedIdleSessions)) { Session session = entry.getValue(); @SuppressWarnings("rawtypes") @@ -286,13 +286,11 @@ public void run() { final long ct = System.currentTimeMillis(); final Set<Entry<Long,Session>> copiedIdleSessions = new HashSet<>(); - synchronized (idleSessions) { - /** - * Add sessions so that get the list returned in the active scans call - */ - for (Session session : idleSessions) { - copiedIdleSessions.add(Maps.immutableEntry(expiredSessionMarker, session)); - } + /** + * Add sessions so that get the list returned in the active scans call + */ + for (Session session : idleSessions) { + copiedIdleSessions.add(Maps.immutableEntry(expiredSessionMarker, session)); } for (Entry<Long,Session> entry : Iterables.concat(sessions.entrySet(), copiedIdleSessions)) { ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services