This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch 1.8 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/1.8 by this push: new 3db52e0 ACCUMULO-4782 switch session manager to a concurrent map (#382) 3db52e0 is described below commit 3db52e0bff7d855d2b769966f9a9804837ad93a0 Author: Keith Turner <ke...@deenlo.com> AuthorDate: Wed Feb 14 16:32:29 2018 -0500 ACCUMULO-4782 switch session manager to a concurrent map (#382) --- .../apache/accumulo/tserver/session/Session.java | 7 +- .../accumulo/tserver/session/SessionManager.java | 218 +++++++++++++-------- 2 files changed, 147 insertions(+), 78 deletions(-) diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java index 1d2d88d..eed45cf 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java @@ -20,10 +20,15 @@ import org.apache.accumulo.core.security.thrift.TCredentials; import org.apache.accumulo.server.rpc.TServerUtils; public class Session { + + enum State { + NEW, UNRESERVED, RESERVED, REMOVED + } + public final String client; long lastAccessTime; public long startTime; - boolean reserved; + State state = State.NEW; private final TCredentials credentials; Session(TCredentials credentials) { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java index bf37855..b04a367 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java @@ -16,6 +16,7 @@ */ package org.apache.accumulo.tserver.session; +import java.security.NoSuchAlgorithmException; import java.security.SecureRandom; import java.util.ArrayList; import java.util.HashMap; @@ -26,6 +27,8 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.TimerTask; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.accumulo.core.client.impl.Translator; import org.apache.accumulo.core.client.impl.Translators; @@ -39,18 +42,20 @@ import org.apache.accumulo.core.util.MapCounter; import org.apache.accumulo.server.util.time.SimpleTimer; import org.apache.accumulo.tserver.scan.ScanRunState; import org.apache.accumulo.tserver.scan.ScanTask; +import org.apache.accumulo.tserver.session.Session.State; import org.apache.accumulo.tserver.tablet.ScanBatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; import com.google.common.collect.Iterables; import com.google.common.collect.Maps; public class SessionManager { private static final Logger log = LoggerFactory.getLogger(SessionManager.class); - private final SecureRandom random = new SecureRandom(); - private final Map<Long,Session> sessions = new HashMap<>(); + private final SecureRandom random; + private final ConcurrentMap<Long,Session> sessions = new ConcurrentHashMap<>(); private final long maxIdle; private final long maxUpdateIdle; private final List<Session> idleSessions = new ArrayList<>(); @@ -62,6 +67,16 @@ public class SessionManager { maxUpdateIdle = conf.getTimeInMillis(Property.TSERV_UPDATE_SESSION_MAXIDLE); maxIdle = conf.getTimeInMillis(Property.TSERV_SESSION_MAXIDLE); + SecureRandom sr; + try { + // This is faster than the default secure random which uses /dev/urandom + sr = SecureRandom.getInstance("SHA1PRNG"); + } catch (NoSuchAlgorithmException e) { + log.debug("Unable to create SHA1PRNG secure random, using default"); + sr = new SecureRandom(); + } + random = sr; + Runnable r = new Runnable() { @Override public void run() { @@ -70,20 +85,21 @@ public class SessionManager { }; SimpleTimer.getInstance(conf).schedule(r, 0, Math.max(maxIdle / 2, 1000)); + } - public synchronized long createSession(Session session, boolean reserve) { + public long createSession(Session session, boolean reserve) { long sid = random.nextLong(); - while (sessions.containsKey(sid)) { - sid = random.nextLong(); + synchronized (session) { + Preconditions.checkArgument(session.state == State.NEW); + session.state = reserve ? State.RESERVED : State.UNRESERVED; + session.startTime = session.lastAccessTime = System.currentTimeMillis(); } - sessions.put(sid, session); - - session.reserved = reserve; - - session.startTime = session.lastAccessTime = System.currentTimeMillis(); + while (sessions.putIfAbsent(sid, session) != null) { + sid = random.nextLong(); + } return sid; } @@ -96,56 +112,83 @@ public class SessionManager { * while a session is reserved, it cannot be canceled or removed */ - public synchronized Session reserveSession(long sessionId) { + public Session reserveSession(long sessionId) { Session session = sessions.get(sessionId); if (session != null) { - if (session.reserved) - throw new IllegalStateException(); - session.reserved = true; + synchronized (session) { + if (session.state == State.RESERVED) + throw new IllegalStateException("Attempted to reserved session that is already reserved " + sessionId); + if (session.state == State.REMOVED) + return null; + session.state = State.RESERVED; + } } return session; } - public synchronized Session reserveSession(long sessionId, boolean wait) { + public Session reserveSession(long sessionId, boolean wait) { Session session = sessions.get(sessionId); + if (session != null) { - while (wait && session.reserved) { - try { - wait(1000); - } catch (InterruptedException e) { - throw new RuntimeException(); + synchronized (session) { + + if (session.state == State.REMOVED) + return null; + + while (wait && session.state == State.RESERVED) { + try { + session.wait(1000); + } catch (InterruptedException e) { + throw new RuntimeException(); + } } - } - if (session.reserved) - throw new IllegalStateException(); - session.reserved = true; + if (session.state == State.RESERVED) + throw new IllegalStateException("Attempted to reserved session that is already reserved " + sessionId); + if (session.state == State.REMOVED) + return null; + session.state = State.RESERVED; + } } return session; } - public synchronized void unreserveSession(Session session) { - if (!session.reserved) - throw new IllegalStateException(); - notifyAll(); - session.reserved = false; - session.lastAccessTime = System.currentTimeMillis(); + public void unreserveSession(Session session) { + synchronized (session) { + if (session.state == State.REMOVED) + return; + if (session.state != State.RESERVED) + throw new IllegalStateException("Cannon unreserve, state: " + session.state); + session.notifyAll(); + session.state = State.UNRESERVED; + session.lastAccessTime = System.currentTimeMillis(); + } } - public synchronized void unreserveSession(long sessionId) { + public void unreserveSession(long sessionId) { Session session = getSession(sessionId); - if (session != null) + if (session != null) { unreserveSession(session); + } + } - public synchronized Session getSession(long sessionId) { + public Session getSession(long sessionId) { Session session = sessions.get(sessionId); - if (session != null) - session.lastAccessTime = System.currentTimeMillis(); + + if (session != null) { + synchronized (session) { + if (session.state == State.REMOVED) { + return null; + } + session.lastAccessTime = System.currentTimeMillis(); + } + } + return session; } @@ -154,35 +197,47 @@ public class SessionManager { } public Session removeSession(long sessionId, boolean unreserve) { - Session session = null; - synchronized (this) { - session = sessions.remove(sessionId); - if (unreserve && session != null) - unreserveSession(session); - } - // do clean up out side of lock.. - if (session != null) - session.cleanup(); + Session session = sessions.remove(sessionId); + if (session != null) { + boolean doCleanup = false; + synchronized (session) { + if (session.state != State.REMOVED) { + if (unreserve) { + unreserveSession(session); + } + doCleanup = true; + session.state = State.REMOVED; + } + } + + if (doCleanup) { + session.cleanup(); + } + } return session; } private void sweep(final long maxIdle, final long maxUpdateIdle) { List<Session> sessionsToCleanup = new ArrayList<>(); - synchronized (this) { - Iterator<Session> iter = sessions.values().iterator(); - while (iter.hasNext()) { - Session session = iter.next(); - long configuredIdle = maxIdle; - if (session instanceof UpdateSession) { - configuredIdle = maxUpdateIdle; - } - long idleTime = System.currentTimeMillis() - session.lastAccessTime; - if (idleTime > configuredIdle && !session.reserved) { - log.info("Closing idle session from user=" + session.getUser() + ", client=" + session.client + ", idle=" + idleTime + "ms"); - iter.remove(); - sessionsToCleanup.add(session); + + Iterator<Session> iter = sessions.values().iterator(); + while (iter.hasNext()) { + Session session = iter.next(); + synchronized (session) { + if (session.state == State.UNRESERVED) { + long configuredIdle = maxIdle; + if (session instanceof UpdateSession) { + configuredIdle = maxUpdateIdle; + } + long idleTime = System.currentTimeMillis() - session.lastAccessTime; + if (idleTime > configuredIdle) { + log.info("Closing idle session from user=" + session.getUser() + ", client=" + session.client + ", idle=" + idleTime + "ms"); + iter.remove(); + sessionsToCleanup.add(session); + session.state = State.REMOVED; + } } } } @@ -190,39 +245,47 @@ public class SessionManager { // do clean up outside of lock for TabletServer in a synchronized block for simplicity vice a synchronized list synchronized (idleSessions) { - sessionsToCleanup.addAll(idleSessions); - idleSessions.clear(); + } - // perform cleanup for all of the sessions - for (Session session : sessionsToCleanup) { - if (!session.cleanup()) + // perform cleanup for all of the sessions + for (Session session : sessionsToCleanup) { + if (!session.cleanup()) + synchronized (idleSessions) { idleSessions.add(session); - } + } } + } - public synchronized void removeIfNotAccessed(final long sessionId, final long delay) { + public void removeIfNotAccessed(final long sessionId, final long delay) { Session session = sessions.get(sessionId); if (session != null) { - final long removeTime = session.lastAccessTime; + long tmp; + synchronized (session) { + tmp = session.lastAccessTime; + } + final long removeTime = tmp; TimerTask r = new TimerTask() { @Override public void run() { - Session sessionToCleanup = null; - synchronized (SessionManager.this) { - Session session2 = sessions.get(sessionId); - if (session2 != null && session2.lastAccessTime == removeTime && !session2.reserved) { + Session session2 = sessions.get(sessionId); + if (session2 != null) { + boolean shouldRemove = false; + synchronized (session2) { + if (session2.lastAccessTime == removeTime && session2.state == State.UNRESERVED) { + session2.state = State.REMOVED; + shouldRemove = true; + } + } + + if (shouldRemove) { log.info("Closing not accessed session from user=" + session2.getUser() + ", client=" + session2.client + ", duration=" + delay + "ms"); sessions.remove(sessionId); - sessionToCleanup = session2; + session2.cleanup(); } } - - // call clean up outside of lock - if (sessionToCleanup != null) - sessionToCleanup.cleanup(); } }; @@ -230,7 +293,7 @@ public class SessionManager { } } - public synchronized Map<String,MapCounter<ScanRunState>> getActiveScansPerTable() { + public Map<String,MapCounter<ScanRunState>> getActiveScansPerTable() { Map<String,MapCounter<ScanRunState>> counts = new HashMap<>(); Set<Entry<Long,Session>> copiedIdleSessions = new HashSet<>(); @@ -243,9 +306,10 @@ public class SessionManager { } } - for (Entry<Long,Session> entry : sessions.entrySet()) { + for (Entry<Long,Session> entry : Iterables.concat(sessions.entrySet(), copiedIdleSessions)) { Session session = entry.getValue(); + @SuppressWarnings("rawtypes") ScanTask nbt = null; String tableID = null; @@ -280,7 +344,7 @@ public class SessionManager { return counts; } - public synchronized List<ActiveScan> getActiveScans() { + public List<ActiveScan> getActiveScans() { final List<ActiveScan> activeScans = new ArrayList<>(); final long ct = System.currentTimeMillis(); -- To stop receiving notification emails like this one, please contact ktur...@apache.org.