This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch 1451-external-compactions-feature
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to 
refs/heads/1451-external-compactions-feature by this push:
     new 00bde13  Make synchronization tighter
00bde13 is described below

commit 00bde136655633339a42783fd41a23ba3f324c94
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Tue Mar 16 20:38:54 2021 +0000

    Make synchronization tighter
    
    Instead of synchronizing on the QUEUES collection in total, synchronize
    on the QueueAndPriority objects which are used in the code and are cached
    singletons in the JVM
---
 .../coordinator/CompactionCoordinator.java         | 68 +++++++++++-----------
 1 file changed, 34 insertions(+), 34 deletions(-)

diff --git 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index b4b2cb4..3f38d6c 100644
--- 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++ 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -84,7 +84,7 @@ public class CompactionCoordinator extends AbstractServer
 
   /* Map of external queue name -> priority -> tservers */
   private static final 
Map<String,TreeMap<Long,LinkedHashSet<TServerInstance>>> QUEUES =
-      new HashMap<>();
+      new ConcurrentHashMap<>();
   /* index of tserver to queue and priority, exists to provide O(1) lookup 
into QUEUES */
   private static final Map<TServerInstance,HashSet<QueueAndPriority>> INDEX =
       new ConcurrentHashMap<>();
@@ -209,7 +209,7 @@ public class CompactionCoordinator extends AbstractServer
             summaries.forEach(summary -> {
               QueueAndPriority qp =
                   QueueAndPriority.get(summary.getQueue().intern(), 
summary.getPriority());
-              synchronized (QUEUES) {
+              synchronized (qp) {
                 QUEUES.computeIfAbsent(qp.getQueue(), k -> new TreeMap<>())
                     .computeIfAbsent(qp.getPriority(), k -> new 
LinkedHashSet<>()).add(tsi);
                 INDEX.computeIfAbsent(tsi, k -> new HashSet<>()).add(qp);
@@ -264,7 +264,7 @@ public class CompactionCoordinator extends AbstractServer
         if (null != m) {
           LinkedHashSet<TServerInstance> tservers = m.get(qp.getPriority());
           if (null != tservers) {
-            synchronized (QUEUES) {
+            synchronized (qp) {
               tservers.remove(tsi);
               INDEX.remove(tsi);
             }
@@ -299,31 +299,31 @@ public class CompactionCoordinator extends AbstractServer
     LOG.debug("getCompactionJob " + queueName + " " + compactorAddress);
     String queue = queueName.intern();
     TExternalCompactionJob result = null;
-    // CBUG Review synchronization on QUEUES
-    synchronized (QUEUES) {
-      TreeMap<Long,LinkedHashSet<TServerInstance>> m = QUEUES.get(queue);
-      if (null != m && !m.isEmpty()) {
-        while (result == null) {
-
-          // m could become empty if we have contacted all tservers in this 
queue and
-          // there are no compactions
-          if (m.isEmpty()) {
-            LOG.debug("No tservers found for queue {}, returning empty job to 
compactor {}", queue,
-                compactorAddress);
-            result = new TExternalCompactionJob();
-            break;
-          }
-
-          // Get the first TServerInstance from the highest priority queue
-          Entry<Long,LinkedHashSet<TServerInstance>> entry = m.firstEntry();
-          Long priority = entry.getKey();
-          LinkedHashSet<TServerInstance> tservers = entry.getValue();
+    TreeMap<Long,LinkedHashSet<TServerInstance>> m = QUEUES.get(queue);
+    if (null != m && !m.isEmpty()) {
+      while (result == null) {
+
+        // m could become empty if we have contacted all tservers in this 
queue and
+        // there are no compactions
+        if (m.isEmpty()) {
+          LOG.debug("No tservers found for queue {}, returning empty job to 
compactor {}", queue,
+              compactorAddress);
+          result = new TExternalCompactionJob();
+          break;
+        }
 
-          if (null == tservers || tservers.isEmpty()) {
-            // Clean up the map entry when no tservers for this queue and 
priority
-            m.remove(entry.getKey(), entry.getValue());
-            continue;
-          } else {
+        // Get the first TServerInstance from the highest priority queue
+        Entry<Long,LinkedHashSet<TServerInstance>> entry = m.firstEntry();
+        Long priority = entry.getKey();
+        LinkedHashSet<TServerInstance> tservers = entry.getValue();
+        QueueAndPriority qp = QueueAndPriority.get(queue, priority);
+
+        if (null == tservers || tservers.isEmpty()) {
+          // Clean up the map entry when no tservers for this queue and 
priority
+          m.remove(entry.getKey(), entry.getValue());
+          continue;
+        } else {
+          synchronized(qp) {
             TServerInstance tserver = tservers.iterator().next();
             LOG.debug("Found tserver {} with priority {} for queue {}", 
tserver.getHostAndPort(),
                 priority, queue);
@@ -334,9 +334,9 @@ public class CompactionCoordinator extends AbstractServer
               // CBUG This may be redundant as cleanup happens in the 'if' 
clause above
               m.remove(entry.getKey(), entry.getValue());
             }
-            HashSet<QueueAndPriority> qp = INDEX.get(tserver);
-            qp.remove(QueueAndPriority.get(queue, priority));
-            if (qp.isEmpty()) {
+            HashSet<QueueAndPriority> queues = INDEX.get(tserver);
+            queues.remove(QueueAndPriority.get(queue, priority));
+            if (queues.isEmpty()) {
               // Remove the tserver from the index
               INDEX.remove(tserver);
             }
@@ -368,11 +368,11 @@ public class CompactionCoordinator extends AbstractServer
             }
           }
         }
-      } else {
-        LOG.debug("No tservers found for queue {}, returning empty job to 
compactor {}", queue,
-            compactorAddress);
-        result = new TExternalCompactionJob();
       }
+    } else {
+      LOG.debug("No tservers found for queue {}, returning empty job to 
compactor {}", queue,
+          compactorAddress);
+      result = new TExternalCompactionJob();
     }
     return result;
 

Reply via email to