keith-turner closed pull request #385: ACCUMULO-4788
URL: https://github.com/apache/accumulo/pull/385
 
 
   

This is a PR merged from a forked repository. Because GitHub hides the
original diff of a foreign pull request (one from a fork) once it is merged,
the diff is reproduced below for the sake of provenance:

diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportKey.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportKey.java
index f1ab501b0a..dcae49cca3 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportKey.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportKey.java
@@ -86,6 +86,10 @@ public boolean equals(Object o) {
         && (!isSasl() || (ttk.isSasl() && saslParams.equals(ttk.saslParams)));
   }
 
+  public final void precomputeHashCode() {
+    hashCode();
+  }
+
   @Override
   public int hashCode() {
     if (hash == -1)
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
index 3d36e694a6..c221607328 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java
@@ -41,6 +41,8 @@
 import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
 
 public class ThriftTransportPool {
   private static SecurityPermission TRANSPORT_POOL_PERMISSION = new SecurityPermission("transportPoolPermission");
@@ -48,7 +50,26 @@
   private static final Random random = new Random();
   private long killTime = 1000 * 3;
 
-  private Map<ThriftTransportKey,List<CachedConnection>> cache = new HashMap<>();
+  private static class CachedConnections {
+    LinkedList<CachedConnection> unreserved = new LinkedList<>();
+    Map<CachedTTransport,CachedConnection> reserved = new HashMap<>();
+
+    public CachedConnection reserveAny() {
+      if (unreserved.size() > 0) {
+        CachedConnection cachedConnection = unreserved.removeFirst();
+        cachedConnection.reserve();
+        reserved.put(cachedConnection.transport, cachedConnection);
+        if (log.isTraceEnabled()) {
+          log.trace("Using existing connection to {}", cachedConnection.transport.cacheKey);
+        }
+        return cachedConnection;
+      }
+
+      return null;
+    }
+  }
+
+  private Map<ThriftTransportKey,CachedConnections> cache = new HashMap<>();
   private Map<ThriftTransportKey,Long> errorCount = new HashMap<>();
   private Map<ThriftTransportKey,Long> errorTime = new HashMap<>();
   private Set<ThriftTransportKey> serversWarnedAbout = new HashSet<>();
@@ -66,15 +87,17 @@ public CachedConnection(CachedTTransport t) {
       this.transport = t;
     }
 
-    void setReserved(boolean reserved) {
-      this.transport.setReserved(reserved);
+    void reserve() {
+      Preconditions.checkState(!this.transport.reserved);
+      this.transport.setReserved(true);
     }
 
-    boolean isReserved() {
-      return this.transport.reserved;
+    void unreserve() {
+      Preconditions.checkState(this.transport.reserved);
+      this.transport.setReserved(false);
     }
 
-    CachedTTransport transport;
+    final CachedTTransport transport;
 
     long lastReturnTime;
   }
@@ -98,20 +121,18 @@ private void closeConnections() {
         ArrayList<CachedConnection> connectionsToClose = new ArrayList<>();
 
         synchronized (pool) {
-          for (List<CachedConnection> ccl : pool.getCache().values()) {
-            Iterator<CachedConnection> iter = ccl.iterator();
+          for (CachedConnections cachedConns : pool.getCache().values()) {
+            Iterator<CachedConnection> iter = cachedConns.unreserved.iterator();
             while (iter.hasNext()) {
               CachedConnection cachedConnection = iter.next();
 
-              if (!cachedConnection.isReserved() && System.currentTimeMillis() - cachedConnection.lastReturnTime > pool.killTime) {
+              if (System.currentTimeMillis() - cachedConnection.lastReturnTime > pool.killTime) {
                 connectionsToClose.add(cachedConnection);
                 iter.remove();
               }
             }
-          }
 
-          for (List<CachedConnection> ccl : pool.getCache().values()) {
-            for (CachedConnection cachedConnection : ccl) {
+            for (CachedConnection cachedConnection : cachedConns.reserved.values()) {
               cachedConnection.transport.checkForStuckIO(STUCK_THRESHOLD);
             }
           }
@@ -389,21 +410,21 @@ public TTransport getTransport(HostAndPort location, long milliseconds, ClientCo
   }
 
   private TTransport getTransport(ThriftTransportKey cacheKey) throws TTransportException {
+    // compute hash code outside of lock, this lowers the time the lock is held
+    cacheKey.precomputeHashCode();
     synchronized (this) {
       // atomically reserve location if it exist in cache
-      List<CachedConnection> ccl = getCache().get(cacheKey);
+      CachedConnections ccl = getCache().get(cacheKey);
 
       if (ccl == null) {
-        ccl = new LinkedList<>();
+        ccl = new CachedConnections();
         getCache().put(cacheKey, ccl);
       }
 
-      for (CachedConnection cachedConnection : ccl) {
-        if (!cachedConnection.isReserved()) {
-          cachedConnection.setReserved(true);
-          log.trace("Using existing connection to {}", cacheKey.getServer());
-          return cachedConnection.transport;
-        }
+      CachedConnection cachedConnection = ccl.reserveAny();
+      if (cachedConnection != null) {
+        log.trace("Using existing connection to {}", cacheKey.getServer());
+        return cachedConnection.transport;
       }
     }
 
@@ -428,13 +449,11 @@ private TTransport getTransport(ThriftTransportKey cacheKey) throws TTransportEx
           Collections.shuffle(cachedServers, random);
 
           for (ThriftTransportKey ttk : cachedServers) {
-            for (CachedConnection cachedConnection : getCache().get(ttk)) {
-              if (!cachedConnection.isReserved()) {
-                cachedConnection.setReserved(true);
-                final String serverAddr = ttk.getServer().toString();
-                log.trace("Using existing connection to {}", serverAddr);
-                return new Pair<String,TTransport>(serverAddr, cachedConnection.transport);
-              }
+            CachedConnection cachedConnection = getCache().get(ttk).reserveAny();
+            if (cachedConnection != null) {
+              final String serverAddr = ttk.getServer().toString();
+              log.trace("Using existing connection to {}", serverAddr);
+              return new Pair<String,TTransport>(serverAddr, cachedConnection.transport);
             }
           }
         }
@@ -448,15 +467,12 @@ private TTransport getTransport(ThriftTransportKey cacheKey) throws TTransportEx
 
       if (preferCachedConnection) {
         synchronized (this) {
-          List<CachedConnection> cachedConnList = getCache().get(ttk);
-          if (cachedConnList != null) {
-            for (CachedConnection cachedConnection : cachedConnList) {
-              if (!cachedConnection.isReserved()) {
-                cachedConnection.setReserved(true);
-                final String serverAddr = ttk.getServer().toString();
-                log.trace("Using existing connection to {} timeout {}", serverAddr, ttk.getTimeout());
-                return new Pair<String,TTransport>(serverAddr, cachedConnection.transport);
-              }
+          CachedConnections cachedConns = getCache().get(ttk);
+          if (cachedConns != null) {
+            CachedConnection cachedConnection = cachedConns.reserveAny();
+            if (cachedConnection != null) {
+              final String serverAddr = ttk.getServer().toString();
+              return new Pair<String,TTransport>(serverAddr, cachedConnection.transport);
             }
           }
         }
@@ -483,18 +499,18 @@ private TTransport createNewTransport(ThriftTransportKey cacheKey) throws TTrans
     CachedTTransport tsc = new CachedTTransport(transport, cacheKey);
 
     CachedConnection cc = new CachedConnection(tsc);
-    cc.setReserved(true);
+    cc.reserve();
 
     try {
       synchronized (this) {
-        List<CachedConnection> ccl = getCache().get(cacheKey);
+        CachedConnections cachedConns = getCache().get(cacheKey);
 
-        if (ccl == null) {
-          ccl = new LinkedList<>();
-          getCache().put(cacheKey, ccl);
+        if (cachedConns == null) {
+          cachedConns = new CachedConnections();
+          getCache().put(cacheKey, cachedConns);
         }
 
-        ccl.add(cc);
+        cachedConns.reserved.put(cc.transport, cc);
       }
     } catch (TransportPoolShutdownException e) {
       cc.transport.close();
@@ -514,13 +530,12 @@ public void returnTransport(TTransport tsc) {
     ArrayList<CachedConnection> closeList = new ArrayList<>();
 
     synchronized (this) {
-      List<CachedConnection> ccl = getCache().get(ctsc.getCacheKey());
-      for (Iterator<CachedConnection> iterator = ccl.iterator(); iterator.hasNext();) {
-        CachedConnection cachedConnection = iterator.next();
-        if (cachedConnection.transport == tsc) {
+      CachedConnections cachedConns = getCache().get(ctsc.getCacheKey());
+      if (cachedConns != null) {
+        CachedConnection cachedConnection = cachedConns.reserved.remove(ctsc);
+        if (cachedConnection != null) {
           if (ctsc.sawError) {
             closeList.add(cachedConnection);
-            iterator.remove();
 
             log.trace("Returned connection had error {}", ctsc.getCacheKey());
 
@@ -540,27 +555,23 @@ public void returnTransport(TTransport tsc) {
               serversWarnedAbout.add(ctsc.getCacheKey());
             }
 
-            cachedConnection.setReserved(false);
+            cachedConnection.unreserve();
+
+            // remove all unreserved cached connection when a sever has an error, not just the connection that was returned
+            closeList.addAll(cachedConns.unreserved);
+            cachedConns.unreserved.clear();
 
           } else {
             log.trace("Returned connection {} ioCount: {}", ctsc.getCacheKey(), cachedConnection.transport.ioCount);
 
             cachedConnection.lastReturnTime = System.currentTimeMillis();
-            cachedConnection.setReserved(false);
+            cachedConnection.unreserve();
+            // Calling addFirst to use unreserved as LIFO queue. Using LIFO ensures that when the # of pooled connections exceeds the working set size that the
+            // idle times at the end of the list grow. The connections with large idle times will be cleaned up. Using a FIFO could continually reset the idle
+            // times of all connections, even when there are more than the working set size.
+            cachedConns.unreserved.addFirst(cachedConnection);
           }
           existInCache = true;
-          break;
-        }
-      }
-
-      // remove all unreserved cached connection when a sever has an error, not just the connection that was returned
-      if (ctsc.sawError) {
-        for (Iterator<CachedConnection> iterator = ccl.iterator(); iterator.hasNext();) {
-          CachedConnection cachedConnection = iterator.next();
-          if (!cachedConnection.isReserved()) {
-            closeList.add(cachedConnection);
-            iterator.remove();
-          }
         }
       }
     }
@@ -616,10 +627,8 @@ public void shutdown() {
         return;
 
       // close any connections in the pool... even ones that are in use
-      for (List<CachedConnection> ccl : getCache().values()) {
-        Iterator<CachedConnection> iter = ccl.iterator();
-        while (iter.hasNext()) {
-          CachedConnection cc = iter.next();
+      for (CachedConnections cachedConn : getCache().values()) {
+        for (CachedConnection cc : Iterables.concat(cachedConn.reserved.values(), cachedConn.unreserved)) {
           try {
             cc.transport.close();
           } catch (Exception e) {
@@ -639,7 +648,7 @@ public void shutdown() {
     }
   }
 
-  private Map<ThriftTransportKey,List<CachedConnection>> getCache() {
+  private Map<ThriftTransportKey,CachedConnections> getCache() {
     if (cache == null)
       throw new TransportPoolShutdownException();
     return cache;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to