This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 1.8
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/1.8 by this push:
     new 3db52e0  ACCUMULO-4782 switch session manager to a concurrent map 
(#382)
3db52e0 is described below

commit 3db52e0bff7d855d2b769966f9a9804837ad93a0
Author: Keith Turner <ke...@deenlo.com>
AuthorDate: Wed Feb 14 16:32:29 2018 -0500

    ACCUMULO-4782 switch session manager to a concurrent map (#382)
---
 .../apache/accumulo/tserver/session/Session.java   |   7 +-
 .../accumulo/tserver/session/SessionManager.java   | 218 +++++++++++++--------
 2 files changed, 147 insertions(+), 78 deletions(-)

diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java
index 1d2d88d..eed45cf 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java
@@ -20,10 +20,15 @@ import 
org.apache.accumulo.core.security.thrift.TCredentials;
 import org.apache.accumulo.server.rpc.TServerUtils;
 
 public class Session {
+
+  enum State {
+    NEW, UNRESERVED, RESERVED, REMOVED
+  }
+
   public final String client;
   long lastAccessTime;
   public long startTime;
-  boolean reserved;
+  State state = State.NEW;
   private final TCredentials credentials;
 
   Session(TCredentials credentials) {
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
index bf37855..b04a367 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
@@ -16,6 +16,7 @@
  */
 package org.apache.accumulo.tserver.session;
 
+import java.security.NoSuchAlgorithmException;
 import java.security.SecureRandom;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -26,6 +27,8 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.accumulo.core.client.impl.Translator;
 import org.apache.accumulo.core.client.impl.Translators;
@@ -39,18 +42,20 @@ import org.apache.accumulo.core.util.MapCounter;
 import org.apache.accumulo.server.util.time.SimpleTimer;
 import org.apache.accumulo.tserver.scan.ScanRunState;
 import org.apache.accumulo.tserver.scan.ScanTask;
+import org.apache.accumulo.tserver.session.Session.State;
 import org.apache.accumulo.tserver.tablet.ScanBatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 
 public class SessionManager {
   private static final Logger log = 
LoggerFactory.getLogger(SessionManager.class);
 
-  private final SecureRandom random = new SecureRandom();
-  private final Map<Long,Session> sessions = new HashMap<>();
+  private final SecureRandom random;
+  private final ConcurrentMap<Long,Session> sessions = new 
ConcurrentHashMap<>();
   private final long maxIdle;
   private final long maxUpdateIdle;
   private final List<Session> idleSessions = new ArrayList<>();
@@ -62,6 +67,16 @@ public class SessionManager {
     maxUpdateIdle = 
conf.getTimeInMillis(Property.TSERV_UPDATE_SESSION_MAXIDLE);
     maxIdle = conf.getTimeInMillis(Property.TSERV_SESSION_MAXIDLE);
 
+    SecureRandom sr;
+    try {
+      // This is faster than the default secure random which uses /dev/urandom
+      sr = SecureRandom.getInstance("SHA1PRNG");
+    } catch (NoSuchAlgorithmException e) {
+      log.debug("Unable to create SHA1PRNG secure random, using default");
+      sr = new SecureRandom();
+    }
+    random = sr;
+
     Runnable r = new Runnable() {
       @Override
       public void run() {
@@ -70,20 +85,21 @@ public class SessionManager {
     };
 
     SimpleTimer.getInstance(conf).schedule(r, 0, Math.max(maxIdle / 2, 1000));
+
   }
 
-  public synchronized long createSession(Session session, boolean reserve) {
+  public long createSession(Session session, boolean reserve) {
     long sid = random.nextLong();
 
-    while (sessions.containsKey(sid)) {
-      sid = random.nextLong();
+    synchronized (session) {
+      Preconditions.checkArgument(session.state == State.NEW);
+      session.state = reserve ? State.RESERVED : State.UNRESERVED;
+      session.startTime = session.lastAccessTime = System.currentTimeMillis();
     }
 
-    sessions.put(sid, session);
-
-    session.reserved = reserve;
-
-    session.startTime = session.lastAccessTime = System.currentTimeMillis();
+    while (sessions.putIfAbsent(sid, session) != null) {
+      sid = random.nextLong();
+    }
 
     return sid;
   }
@@ -96,56 +112,83 @@ public class SessionManager {
    * while a session is reserved, it cannot be canceled or removed
    */
 
-  public synchronized Session reserveSession(long sessionId) {
+  public Session reserveSession(long sessionId) {
     Session session = sessions.get(sessionId);
     if (session != null) {
-      if (session.reserved)
-        throw new IllegalStateException();
-      session.reserved = true;
+      synchronized (session) {
+        if (session.state == State.RESERVED)
+          throw new IllegalStateException("Attempted to reserved session that 
is already reserved " + sessionId);
+        if (session.state == State.REMOVED)
+          return null;
+        session.state = State.RESERVED;
+      }
     }
 
     return session;
 
   }
 
-  public synchronized Session reserveSession(long sessionId, boolean wait) {
+  public Session reserveSession(long sessionId, boolean wait) {
     Session session = sessions.get(sessionId);
+
     if (session != null) {
-      while (wait && session.reserved) {
-        try {
-          wait(1000);
-        } catch (InterruptedException e) {
-          throw new RuntimeException();
+      synchronized (session) {
+
+        if (session.state == State.REMOVED)
+          return null;
+
+        while (wait && session.state == State.RESERVED) {
+          try {
+            session.wait(1000);
+          } catch (InterruptedException e) {
+            throw new RuntimeException();
+          }
         }
-      }
 
-      if (session.reserved)
-        throw new IllegalStateException();
-      session.reserved = true;
+        if (session.state == State.RESERVED)
+          throw new IllegalStateException("Attempted to reserved session that 
is already reserved " + sessionId);
+        if (session.state == State.REMOVED)
+          return null;
+        session.state = State.RESERVED;
+      }
     }
 
     return session;
 
   }
 
-  public synchronized void unreserveSession(Session session) {
-    if (!session.reserved)
-      throw new IllegalStateException();
-    notifyAll();
-    session.reserved = false;
-    session.lastAccessTime = System.currentTimeMillis();
+  public void unreserveSession(Session session) {
+    synchronized (session) {
+      if (session.state == State.REMOVED)
+        return;
+      if (session.state != State.RESERVED)
+        throw new IllegalStateException("Cannon unreserve, state: " + 
session.state);
+      session.notifyAll();
+      session.state = State.UNRESERVED;
+      session.lastAccessTime = System.currentTimeMillis();
+    }
   }
 
-  public synchronized void unreserveSession(long sessionId) {
+  public void unreserveSession(long sessionId) {
     Session session = getSession(sessionId);
-    if (session != null)
+    if (session != null) {
       unreserveSession(session);
+    }
+
   }
 
-  public synchronized Session getSession(long sessionId) {
+  public Session getSession(long sessionId) {
     Session session = sessions.get(sessionId);
-    if (session != null)
-      session.lastAccessTime = System.currentTimeMillis();
+
+    if (session != null) {
+      synchronized (session) {
+        if (session.state == State.REMOVED) {
+          return null;
+        }
+        session.lastAccessTime = System.currentTimeMillis();
+      }
+    }
+
     return session;
   }
 
@@ -154,35 +197,47 @@ public class SessionManager {
   }
 
   public Session removeSession(long sessionId, boolean unreserve) {
-    Session session = null;
-    synchronized (this) {
-      session = sessions.remove(sessionId);
-      if (unreserve && session != null)
-        unreserveSession(session);
-    }
 
-    // do clean up out side of lock..
-    if (session != null)
-      session.cleanup();
+    Session session = sessions.remove(sessionId);
+    if (session != null) {
+      boolean doCleanup = false;
+      synchronized (session) {
+        if (session.state != State.REMOVED) {
+          if (unreserve) {
+            unreserveSession(session);
+          }
+          doCleanup = true;
+          session.state = State.REMOVED;
+        }
+      }
+
+      if (doCleanup) {
+        session.cleanup();
+      }
+    }
 
     return session;
   }
 
   private void sweep(final long maxIdle, final long maxUpdateIdle) {
     List<Session> sessionsToCleanup = new ArrayList<>();
-    synchronized (this) {
-      Iterator<Session> iter = sessions.values().iterator();
-      while (iter.hasNext()) {
-        Session session = iter.next();
-        long configuredIdle = maxIdle;
-        if (session instanceof UpdateSession) {
-          configuredIdle = maxUpdateIdle;
-        }
-        long idleTime = System.currentTimeMillis() - session.lastAccessTime;
-        if (idleTime > configuredIdle && !session.reserved) {
-          log.info("Closing idle session from user=" + session.getUser() + ", 
client=" + session.client + ", idle=" + idleTime + "ms");
-          iter.remove();
-          sessionsToCleanup.add(session);
+
+    Iterator<Session> iter = sessions.values().iterator();
+    while (iter.hasNext()) {
+      Session session = iter.next();
+      synchronized (session) {
+        if (session.state == State.UNRESERVED) {
+          long configuredIdle = maxIdle;
+          if (session instanceof UpdateSession) {
+            configuredIdle = maxUpdateIdle;
+          }
+          long idleTime = System.currentTimeMillis() - session.lastAccessTime;
+          if (idleTime > configuredIdle) {
+            log.info("Closing idle session from user=" + session.getUser() + 
", client=" + session.client + ", idle=" + idleTime + "ms");
+            iter.remove();
+            sessionsToCleanup.add(session);
+            session.state = State.REMOVED;
+          }
         }
       }
     }
@@ -190,39 +245,47 @@ public class SessionManager {
     // do clean up outside of lock for TabletServer in a synchronized block 
for simplicity vice a synchronized list
 
     synchronized (idleSessions) {
-
       sessionsToCleanup.addAll(idleSessions);
-
       idleSessions.clear();
+    }
 
-      // perform cleanup for all of the sessions
-      for (Session session : sessionsToCleanup) {
-        if (!session.cleanup())
+    // perform cleanup for all of the sessions
+    for (Session session : sessionsToCleanup) {
+      if (!session.cleanup())
+        synchronized (idleSessions) {
           idleSessions.add(session);
-      }
+        }
     }
+
   }
 
-  public synchronized void removeIfNotAccessed(final long sessionId, final 
long delay) {
+  public void removeIfNotAccessed(final long sessionId, final long delay) {
     Session session = sessions.get(sessionId);
     if (session != null) {
-      final long removeTime = session.lastAccessTime;
+      long tmp;
+      synchronized (session) {
+        tmp = session.lastAccessTime;
+      }
+      final long removeTime = tmp;
       TimerTask r = new TimerTask() {
         @Override
         public void run() {
-          Session sessionToCleanup = null;
-          synchronized (SessionManager.this) {
-            Session session2 = sessions.get(sessionId);
-            if (session2 != null && session2.lastAccessTime == removeTime && 
!session2.reserved) {
+          Session session2 = sessions.get(sessionId);
+          if (session2 != null) {
+            boolean shouldRemove = false;
+            synchronized (session2) {
+              if (session2.lastAccessTime == removeTime && session2.state == 
State.UNRESERVED) {
+                session2.state = State.REMOVED;
+                shouldRemove = true;
+              }
+            }
+
+            if (shouldRemove) {
               log.info("Closing not accessed session from user=" + 
session2.getUser() + ", client=" + session2.client + ", duration=" + delay + 
"ms");
               sessions.remove(sessionId);
-              sessionToCleanup = session2;
+              session2.cleanup();
             }
           }
-
-          // call clean up outside of lock
-          if (sessionToCleanup != null)
-            sessionToCleanup.cleanup();
         }
       };
 
@@ -230,7 +293,7 @@ public class SessionManager {
     }
   }
 
-  public synchronized Map<String,MapCounter<ScanRunState>> 
getActiveScansPerTable() {
+  public Map<String,MapCounter<ScanRunState>> getActiveScansPerTable() {
     Map<String,MapCounter<ScanRunState>> counts = new HashMap<>();
     Set<Entry<Long,Session>> copiedIdleSessions = new HashSet<>();
 
@@ -243,9 +306,10 @@ public class SessionManager {
       }
     }
 
-    for (Entry<Long,Session> entry : sessions.entrySet()) {
+    for (Entry<Long,Session> entry : Iterables.concat(sessions.entrySet(), 
copiedIdleSessions)) {
 
       Session session = entry.getValue();
+
       @SuppressWarnings("rawtypes")
       ScanTask nbt = null;
       String tableID = null;
@@ -280,7 +344,7 @@ public class SessionManager {
     return counts;
   }
 
-  public synchronized List<ActiveScan> getActiveScans() {
+  public List<ActiveScan> getActiveScans() {
 
     final List<ActiveScan> activeScans = new ArrayList<>();
     final long ct = System.currentTimeMillis();

-- 
To stop receiving notification emails like this one, please contact
ktur...@apache.org.

Reply via email to