keith-turner closed pull request #546: Fix multiple concurrency bugs in 
Master.gatherTableInformation()
URL: https://github.com/apache/accumulo/pull/546
 
 
   

This PR was merged from a forked repository. Because GitHub hides the
original diff of a foreign (fork-based) pull request once it has been
merged, the diff is reproduced below for the sake of provenance:

diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java 
b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 1eed867d01..6a2024e63e 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -340,8 +340,9 @@
   
MASTER_REPLICATION_COORDINATOR_THREADCHECK("master.replication.coordinator.threadcheck.time",
       "5s", PropertyType.TIMEDURATION,
       "The time between adjustments of the coordinator thread pool"),
-  MASTER_STATUS_THREAD_POOL_SIZE("master.status.threadpool.size", "1", 
PropertyType.COUNT,
-      "The number of threads to use when fetching the tablet server status for 
balancing."),
+  MASTER_STATUS_THREAD_POOL_SIZE("master.status.threadpool.size", "0", 
PropertyType.COUNT,
+      "The number of threads to use when fetching the tablet server status for 
balancing.  Zero "
+          + "indicates an unlimited number of threads will be used."),
   MASTER_METADATA_SUSPENDABLE("master.metadata.suspendable", "false", 
PropertyType.BOOLEAN,
       "Allow tablets for the " + MetadataTable.NAME
           + " table to be suspended via table.suspend.duration."),
diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java 
b/server/master/src/main/java/org/apache/accumulo/master/Master.java
index 9414b98543..2f124e350a 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/Master.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java
@@ -32,6 +32,7 @@
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -163,6 +164,7 @@
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSortedMap;
 import com.google.common.collect.Iterables;
 
 /**
@@ -1065,7 +1067,7 @@ public void run() {
     private long updateStatus()
         throws AccumuloException, AccumuloSecurityException, 
TableNotFoundException {
       Set<TServerInstance> currentServers = tserverSet.getCurrentServers();
-      tserverStatus = 
Collections.synchronizedSortedMap(gatherTableInformation(currentServers));
+      tserverStatus = gatherTableInformation(currentServers);
       checkForHeldServer(tserverStatus);
 
       if (!badServers.isEmpty()) {
@@ -1146,12 +1148,20 @@ private long balanceTablets() {
 
   private SortedMap<TServerInstance,TabletServerStatus> gatherTableInformation(
       Set<TServerInstance> currentServers) {
+    final long rpcTimeout = 
getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT);
+    int threads = 
getConfiguration().getCount(Property.MASTER_STATUS_THREAD_POOL_SIZE);
+    ExecutorService tp = threads == 0 ? Executors.newCachedThreadPool()
+        : Executors.newFixedThreadPool(threads);
     long start = System.currentTimeMillis();
-    int threads = 
Math.max(getConfiguration().getCount(Property.MASTER_STATUS_THREAD_POOL_SIZE), 
1);
-    ExecutorService tp = Executors.newFixedThreadPool(threads);
-    final SortedMap<TServerInstance,TabletServerStatus> result = new 
TreeMap<>();
+    final SortedMap<TServerInstance,TabletServerStatus> result = new 
ConcurrentSkipListMap<>();
     for (TServerInstance serverInstance : currentServers) {
       final TServerInstance server = serverInstance;
+      if (threads == 0) {
+        // Since an unbounded thread pool is being used, rate limit how fast 
task are added to the
+        // executor. This prevents the threads from growing large unless there 
are lots of
+        // unresponsive tservers.
+        sleepUninterruptibly(Math.max(1, rpcTimeout / 120_000), 
TimeUnit.MILLISECONDS);
+      }
       tp.submit(new Runnable() {
         @Override
         public void run() {
@@ -1191,18 +1201,24 @@ public void run() {
     }
     tp.shutdown();
     try {
-      
tp.awaitTermination(getConfiguration().getTimeInMillis(Property.TSERV_CLIENT_TIMEOUT)
 * 2,
-          TimeUnit.MILLISECONDS);
+      tp.awaitTermination(Math.max(10000, rpcTimeout / 3), 
TimeUnit.MILLISECONDS);
     } catch (InterruptedException e) {
       log.debug("Interrupted while fetching status");
     }
+
+    tp.shutdownNow();
+
+    // Threads may still modify map after shutdownNow is called, so create an 
immutable snapshot.
+    SortedMap<TServerInstance,TabletServerStatus> info = 
ImmutableSortedMap.copyOf(result);
+
     synchronized (badServers) {
       badServers.keySet().retainAll(currentServers);
-      badServers.keySet().removeAll(result.keySet());
+      badServers.keySet().removeAll(info.keySet());
     }
-    log.debug(String.format("Finished gathering information from %d servers in 
%.2f seconds",
-        result.size(), (System.currentTimeMillis() - start) / 1000.));
-    return result;
+    log.debug(String.format("Finished gathering information from %d of %d 
servers in %.2f seconds",
+        info.size(), currentServers.size(), (System.currentTimeMillis() - 
start) / 1000.));
+
+    return info;
   }
 
   public void run() throws IOException, InterruptedException, KeeperException {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to