Repository: accumulo
Updated Branches:
  refs/heads/master 642b7a95f -> aaaca5240


ACCUMULO-3999 use a threadpool to fetch status from tablet servers


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/aaaca524
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/aaaca524
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/aaaca524

Branch: refs/heads/master
Commit: aaaca5240b13f4ba029c8202a3c2f446f81fb8c5
Parents: 642b7a9
Author: Eric C. Newton <eric.new...@gmail.com>
Authored: Mon Oct 19 10:38:51 2015 -0400
Committer: Eric C. Newton <eric.new...@gmail.com>
Committed: Mon Oct 19 10:38:51 2015 -0400

----------------------------------------------------------------------
 .../org/apache/accumulo/core/conf/Property.java |  2 +
 .../java/org/apache/accumulo/master/Master.java | 74 ++++++++++++--------
 2 files changed, 47 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/aaaca524/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 400577c..ab5c6d2 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -222,6 +222,8 @@ public enum Property {
       "Minimum number of threads dedicated to answering coordinator requests"),
   
MASTER_REPLICATION_COORDINATOR_THREADCHECK("master.replication.coordinator.threadcheck.time",
 "5s", PropertyType.TIMEDURATION,
       "The time between adjustments of the coordinator thread pool"),
+  MASTER_STATUS_THREAD_POOL_SIZE("master.status.threadpool.size", "1", 
PropertyType.COUNT,
+      "The number of threads to use when fetching the tablet server status for 
balancing."),
 
   // properties that are specific to tablet server behavior
   TSERV_PREFIX("tserver.", null, PropertyType.PREFIX, "Properties in this 
category affect the behavior of the tablet servers"),

http://git-wip-us.apache.org/repos/asf/accumulo/blob/aaaca524/server/master/src/main/java/org/apache/accumulo/master/Master.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java b/server/master/src/main/java/org/apache/accumulo/master/Master.java
index ff4705e..152831d 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/Master.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java
@@ -33,6 +33,8 @@ import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -1049,40 +1051,54 @@ public class Master extends AccumuloServerContext implements LiveTServerSet.List
 
   private SortedMap<TServerInstance,TabletServerStatus> 
gatherTableInformation() {
     long start = System.currentTimeMillis();
-    SortedMap<TServerInstance,TabletServerStatus> result = new 
TreeMap<TServerInstance,TabletServerStatus>();
+    int threads = 
Math.max(getConfiguration().getCount(Property.MASTER_STATUS_THREAD_POOL_SIZE), 
1);
+    ExecutorService tp = Executors.newFixedThreadPool(threads);
+    final SortedMap<TServerInstance,TabletServerStatus> result = new 
TreeMap<TServerInstance,TabletServerStatus>();
     Set<TServerInstance> currentServers = tserverSet.getCurrentServers();
-    for (TServerInstance server : currentServers) {
-      try {
-        Thread t = Thread.currentThread();
-        String oldName = t.getName();
-        try {
-          t.setName("Getting status from " + server);
-          TServerConnection connection = tserverSet.getConnection(server);
-          if (connection == null)
-            throw new IOException("No connection to " + server);
-          TabletServerStatus status = connection.getTableMap(false);
-          result.put(server, status);
-        } finally {
-          t.setName(oldName);
-        }
-      } catch (Exception ex) {
-        log.error("unable to get tablet server status " + server + " " + 
ex.toString());
-        log.debug("unable to get tablet server status " + server, ex);
-        if (badServers.get(server).incrementAndGet() > MAX_BAD_STATUS_COUNT) {
-          log.warn("attempting to stop " + server);
+    for (TServerInstance serverInstance : currentServers) {
+      final TServerInstance server = serverInstance;
+      tp.submit(new Runnable() {
+        @Override
+        public void run() {
           try {
-            TServerConnection connection = tserverSet.getConnection(server);
-            if (connection != null) {
-              connection.halt(masterLock);
+            Thread t = Thread.currentThread();
+            String oldName = t.getName();
+            try {
+              t.setName("Getting status from " + server);
+              TServerConnection connection = tserverSet.getConnection(server);
+              if (connection == null)
+                throw new IOException("No connection to " + server);
+              TabletServerStatus status = connection.getTableMap(false);
+              result.put(server, status);
+            } finally {
+              t.setName(oldName);
+            }
+          } catch (Exception ex) {
+            log.error("unable to get tablet server status " + server + " " + 
ex.toString());
+            log.debug("unable to get tablet server status " + server, ex);
+            if (badServers.get(server).incrementAndGet() > 
MAX_BAD_STATUS_COUNT) {
+              log.warn("attempting to stop " + server);
+              try {
+                TServerConnection connection = 
tserverSet.getConnection(server);
+                if (connection != null) {
+                  connection.halt(masterLock);
+                }
+              } catch (TTransportException e) {
+                // ignore: it's probably down
+              } catch (Exception e) {
+                log.info("error talking to troublesome tablet server ", e);
+              }
+              badServers.remove(server);
             }
-          } catch (TTransportException e) {
-            // ignore: it's probably down
-          } catch (Exception e) {
-            log.info("error talking to troublesome tablet server ", e);
           }
-          badServers.remove(server);
         }
-      }
+      });
+    }
+    tp.shutdown();
+    try {
+      tp.awaitTermination(5, TimeUnit.MINUTES);
+    } catch (InterruptedException e) {
+      log.debug("Interrupted while fetching status");
     }
     synchronized (badServers) {
       badServers.keySet().retainAll(currentServers);

Reply via email to