This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch 1451-external-compactions-feature in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/1451-external-compactions-feature by this push: new 00bde13 Make synchronization tighter 00bde13 is described below commit 00bde136655633339a42783fd41a23ba3f324c94 Author: Dave Marion <dlmar...@apache.org> AuthorDate: Tue Mar 16 20:38:54 2021 +0000 Make synchronization tighter Instead of synchronizing on the QUEUES collection in total, synchronize on the QueueAndPriority objects which are used in the code and are cached singletons in the JVM --- .../coordinator/CompactionCoordinator.java | 68 +++++++++++----------- 1 file changed, 34 insertions(+), 34 deletions(-) diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java index b4b2cb4..3f38d6c 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java @@ -84,7 +84,7 @@ public class CompactionCoordinator extends AbstractServer /* Map of external queue name -> priority -> tservers */ private static final Map<String,TreeMap<Long,LinkedHashSet<TServerInstance>>> QUEUES = - new HashMap<>(); + new ConcurrentHashMap<>(); /* index of tserver to queue and priority, exists to provide O(1) lookup into QUEUES */ private static final Map<TServerInstance,HashSet<QueueAndPriority>> INDEX = new ConcurrentHashMap<>(); @@ -209,7 +209,7 @@ public class CompactionCoordinator extends AbstractServer summaries.forEach(summary -> { QueueAndPriority qp = QueueAndPriority.get(summary.getQueue().intern(), summary.getPriority()); - synchronized (QUEUES) { + synchronized (qp) { QUEUES.computeIfAbsent(qp.getQueue(), k -> new TreeMap<>()) .computeIfAbsent(qp.getPriority(), k -> new LinkedHashSet<>()).add(tsi); INDEX.computeIfAbsent(tsi, k -> new HashSet<>()).add(qp); @@ -264,7 +264,7 @@ public class CompactionCoordinator extends AbstractServer if (null != m) { LinkedHashSet<TServerInstance> tservers = m.get(qp.getPriority()); if (null != tservers) { - synchronized (QUEUES) { + synchronized (qp) { tservers.remove(tsi); INDEX.remove(tsi); } @@ -299,31 +299,31 @@ public class CompactionCoordinator extends AbstractServer LOG.debug("getCompactionJob " + queueName + " " + compactorAddress); String queue = queueName.intern(); TExternalCompactionJob result = null; - // CBUG Review synchronization on QUEUES - synchronized (QUEUES) { - TreeMap<Long,LinkedHashSet<TServerInstance>> m = QUEUES.get(queue); - if (null != m && !m.isEmpty()) { - while (result == null) { - - // m could become empty if we have contacted all tservers in this queue and - // there are no compactions - if (m.isEmpty()) { - LOG.debug("No tservers found for queue {}, returning empty job to compactor {}", queue, - compactorAddress); - result = new TExternalCompactionJob(); - break; - } - - // Get the first TServerInstance from the highest priority queue - Entry<Long,LinkedHashSet<TServerInstance>> entry = m.firstEntry(); - Long priority = entry.getKey(); - LinkedHashSet<TServerInstance> tservers = entry.getValue(); + TreeMap<Long,LinkedHashSet<TServerInstance>> m = QUEUES.get(queue); + if (null != m && !m.isEmpty()) { + while (result == null) { + + // m could become empty if we have contacted all tservers in this queue and + // there are no compactions + if (m.isEmpty()) { + LOG.debug("No tservers found for queue {}, returning empty job to compactor {}", queue, + compactorAddress); + result = new TExternalCompactionJob(); + break; + } - if (null == tservers || tservers.isEmpty()) { - // Clean up the map entry when no tservers for this queue and priority - m.remove(entry.getKey(), entry.getValue()); - continue; - } else { + // Get the first TServerInstance from the highest priority queue + Entry<Long,LinkedHashSet<TServerInstance>> entry = m.firstEntry(); + Long priority = entry.getKey(); + LinkedHashSet<TServerInstance> tservers = entry.getValue(); + QueueAndPriority qp = QueueAndPriority.get(queue, priority); + + if (null == tservers || tservers.isEmpty()) { + // Clean up the map entry when no tservers for this queue and priority + m.remove(entry.getKey(), entry.getValue()); + continue; + } else { + synchronized(qp) { TServerInstance tserver = tservers.iterator().next(); LOG.debug("Found tserver {} with priority {} for queue {}", tserver.getHostAndPort(), priority, queue); @@ -334,9 +334,9 @@ public class CompactionCoordinator extends AbstractServer // CBUG This may be redundant as cleanup happens in the 'if' clause above m.remove(entry.getKey(), entry.getValue()); } - HashSet<QueueAndPriority> qp = INDEX.get(tserver); - qp.remove(QueueAndPriority.get(queue, priority)); - if (qp.isEmpty()) { + HashSet<QueueAndPriority> queues = INDEX.get(tserver); + queues.remove(QueueAndPriority.get(queue, priority)); + if (queues.isEmpty()) { // Remove the tserver from the index INDEX.remove(tserver); } @@ -368,11 +368,11 @@ public class CompactionCoordinator extends AbstractServer } } } - } else { - LOG.debug("No tservers found for queue {}, returning empty job to compactor {}", queue, - compactorAddress); - result = new TExternalCompactionJob(); } + } else { + LOG.debug("No tservers found for queue {}, returning empty job to compactor {}", queue, + compactorAddress); + result = new TExternalCompactionJob(); } return result;