[accumulo] branch main updated: Fix WebViewsIT
This is an automated email from the ASF dual-hosted git repository. ctubbsii pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/main by this push: new 8dfe443 Fix WebViewsIT 8dfe443 is described below commit 8dfe443af55f5d3aacb12fb38c5002dcb61cb772 Author: Christopher Tubbs AuthorDate: Mon Apr 19 12:02:03 2021 -0400 Fix WebViewsIT Fix WebViewsIT, broken by addition of new unexpected method call in mock code, re #2020 Co-authored-by: Karthick Narendran --- .../monitor/src/test/java/org/apache/accumulo/monitor/it/WebViewsIT.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/monitor/src/test/java/org/apache/accumulo/monitor/it/WebViewsIT.java b/server/monitor/src/test/java/org/apache/accumulo/monitor/it/WebViewsIT.java index 9b11c47..27a6c76 100644 --- a/server/monitor/src/test/java/org/apache/accumulo/monitor/it/WebViewsIT.java +++ b/server/monitor/src/test/java/org/apache/accumulo/monitor/it/WebViewsIT.java @@ -93,6 +93,7 @@ public class WebViewsIT extends JerseyTest { expect(contextMock.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes(); expect(contextMock.getInstanceID()).andReturn("foo").atLeastOnce(); expect(contextMock.getInstanceName()).andReturn("foo").anyTimes(); +expect(contextMock.getZooKeepers()).andReturn("foo:2181").anyTimes(); Monitor monitorMock = EasyMock.createMock(Monitor.class); expect(monitorMock.getContext()).andReturn(contextMock).anyTimes();
[accumulo] branch 1451-external-compactions-feature updated: WIP refactored how tservers are found, not tested
This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch 1451-external-compactions-feature in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/1451-external-compactions-feature by this push: new 850044d WIP refactored how tservers are found, not tested 850044d is described below commit 850044dcaea3fdcf022a78d799d734e06ab91f07 Author: Keith Turner AuthorDate: Mon Apr 19 15:27:15 2021 -0400 WIP refactored how tservers are found, not tested * Support last tserver for queue+prio so we can return diff tservers * Only remove tserver+prio when tserver reports nothing * Remove any extra when getting tserver report --- .../coordinator/CompactionCoordinator.java | 129 --- .../accumulo/coordinator/QueueSummaries.java | 183 + .../coordinator/CompactionCoordinatorTest.java | 6 +- 3 files changed, 223 insertions(+), 95 deletions(-) diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java index 3d6f749..b8a8655 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java @@ -23,17 +23,14 @@ import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly; import java.net.UnknownHostException; import java.time.Duration; import java.util.ArrayList; -import java.util.HashSet; -import java.util.LinkedHashSet; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.Set; -import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import org.apache.accumulo.coordinator.QueueSummaries.PrioTserver; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.clientImpl.ThriftTransportPool; @@ -96,12 +93,7 @@ public class CompactionCoordinator extends AbstractServer private static final long FIFTEEN_MINUTES = TimeUnit.MILLISECONDS.convert(Duration.of(15, TimeUnit.MINUTES.toChronoUnit())); - /* Map of external queue name -> priority -> tservers */ - protected static final Map>> QUEUES = - new ConcurrentHashMap<>(); - /* index of tserver to queue and priority, exists to provide O(1) lookup into QUEUES */ - protected static final Map> INDEX = - new ConcurrentHashMap<>(); + QueueSummaries queueSummaries = new QueueSummaries(); /* Map of compactionId to RunningCompactions */ protected static final Map RUNNING = new ConcurrentHashMap<>(); @@ -381,9 +373,7 @@ public class CompactionCoordinator extends AbstractServer QueueAndPriority.get(summary.getQueue().intern(), summary.getPriority()); synchronized (qp) { TIME_COMPACTOR_LAST_CHECKED.computeIfAbsent(qp.getQueue(), k -> 0L); -QUEUES.computeIfAbsent(qp.getQueue(), k -> new TreeMap<>()) -.computeIfAbsent(qp.getPriority(), k -> new LinkedHashSet<>()).add(tsi); -INDEX.computeIfAbsent(tsi, k -> new HashSet<>()).add(qp); +queueSummaries.update(tsi, summaries); } }); } finally { @@ -444,20 +434,7 @@ public class CompactionCoordinator extends AbstractServer // run() will iterate over the current and added tservers and add them to the internal // data structures. For tservers that are deleted, we need to remove them from QUEUES // and INDEX -deleted.forEach(tsi -> { - INDEX.get(tsi).forEach(qp -> { -TreeMap> m = QUEUES.get(qp.getQueue()); -if (null != m) { - LinkedHashSet tservers = m.get(qp.getPriority()); - if (null != tservers) { -synchronized (qp) { - tservers.remove(tsi); -} - } -} - }); - INDEX.remove(tsi); -}); +queueSummaries.remove(deleted); } /** @@ -483,81 +460,47 @@ public class CompactionCoordinator extends AbstractServer TIME_COMPACTOR_LAST_CHECKED.put(queue, System.currentTimeMillis()); TExternalCompactionJob result = null; -final TreeMap> m = QUEUES.get(queue); -if (null != m && !m.isEmpty()) { - while (result == null) { - -// m could become empty if we have contacted all tservers in this queue and -// there are no compactions -if (m.isEmpty()) { - LOG.debug("No tservers found for queue {}, returning empty job to compactor {}", queue, - compactorAddress); - result = new TExternalCompactionJob(); - break; -
[accumulo] branch 1451-external-compactions-feature updated: Added test for queue summaries
This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch 1451-external-compactions-feature in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/1451-external-compactions-feature by this push: new a81e595 Added test for queue summaries a81e595 is described below commit a81e5953fce1bdf04cc0bbabc91f3867959b1786 Author: Keith Turner AuthorDate: Mon Apr 19 15:50:10 2021 -0400 Added test for queue summaries --- .../coordinator/CompactionCoordinator.java | 5 +- .../accumulo/coordinator/QueueSummaries.java | 72 .../accumulo/coordinator/QueueSummariesTest.java | 184 + .../apache/accumulo/test/ExternalCompactionIT.java | 4 - 4 files changed, 231 insertions(+), 34 deletions(-) diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java index b8a8655..c169701 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java @@ -382,6 +382,7 @@ public class CompactionCoordinator extends AbstractServer } catch (TException e) { LOG.warn("Error getting external compaction summaries from tablet server: {}", tsi.getHostAndPort(), e); + queueSummaries.remove(Set.of(tsi)); } }); @@ -488,8 +489,10 @@ public class CompactionCoordinator extends AbstractServer result = job; break; } catch (TException e) { -LOG.error("Error from tserver {} while trying to reserve compaction, trying next tserver", +LOG.warn("Error from tserver {} while trying to reserve compaction, trying next tserver", ExternalCompactionUtil.getHostPortString(tserver.getHostAndPort()), e); +queueSummaries.removeSummary(tserver, queueName, prioTserver.prio); +prioTserver = queueSummaries.getNextTserver(queueName); } finally { ThriftUtil.returnClient(client); } diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/QueueSummaries.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/QueueSummaries.java index fced9a0..e1caa8f 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/QueueSummaries.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/QueueSummaries.java @@ -18,16 +18,17 @@ */ package org.apache.accumulo.coordinator; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Objects; import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; -import java.util.concurrent.ConcurrentHashMap; import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.tabletserver.thrift.TCompactionQueueSummary; @@ -37,27 +38,24 @@ import com.google.common.collect.Sets; public class QueueSummaries { // keep track of the last tserver retunred for qeueue - final Map LAST = new HashMap<>(); + final Map LAST = new HashMap<>(); - //CBUG may not need concurrent hash map depending on how sync is done /* Map of external queue name -> priority -> tservers */ - final Map>> QUEUES = - new ConcurrentHashMap<>(); + final Map>> QUEUES = new HashMap<>(); /* index of tserver to queue and priority, exists to provide O(1) lookup into QUEUES */ - final Map> INDEX = - new ConcurrentHashMap<>(); + final Map> INDEX = new HashMap<>(); private Entry> getNextTserverEntry(String queue) { TreeMap> m = QUEUES.get(queue); -if(m == null) { +if (m == null) { return null; } Iterator>> iter = m.entrySet().iterator(); -while(iter.hasNext()) { +while (iter.hasNext()) { Entry> next = iter.next(); - if(next.getValue().isEmpty()) { + if (next.getValue().isEmpty()) { iter.remove(); } else { return next; @@ -70,24 +68,41 @@ public class QueueSummaries { } - static class PrioTserver { - -final TServerInstance tserver; +TServerInstance tserver; final long prio; public PrioTserver(TServerInstance t, long p) { this.tserver = t; this.prio = p; } + +@Override +public boolean equals(Object obj) { + if (obj instanceof PrioTserver) { +PrioTserver opt = (PrioTserver) obj; +return tserver.equals(opt.tserver) && prio == opt.prio; + } + + return false; +} + +@Override +public int hashCode() { + return Objects.hash(tserver,