[accumulo] branch main updated: Fix WebViewsIT

2021-04-19 Thread ctubbsii
This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
 new 8dfe443  Fix WebViewsIT
8dfe443 is described below

commit 8dfe443af55f5d3aacb12fb38c5002dcb61cb772
Author: Christopher Tubbs 
AuthorDate: Mon Apr 19 12:02:03 2021 -0400

Fix WebViewsIT

Fix WebViewsIT, broken by addition of new unexpected method call in mock
code, re #2020

Co-authored-by: Karthick Narendran 
---
 .../monitor/src/test/java/org/apache/accumulo/monitor/it/WebViewsIT.java | 1 +
 1 file changed, 1 insertion(+)

diff --git 
a/server/monitor/src/test/java/org/apache/accumulo/monitor/it/WebViewsIT.java 
b/server/monitor/src/test/java/org/apache/accumulo/monitor/it/WebViewsIT.java
index 9b11c47..27a6c76 100644
--- 
a/server/monitor/src/test/java/org/apache/accumulo/monitor/it/WebViewsIT.java
+++ 
b/server/monitor/src/test/java/org/apache/accumulo/monitor/it/WebViewsIT.java
@@ -93,6 +93,7 @@ public class WebViewsIT extends JerseyTest {
 
expect(contextMock.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes();
 expect(contextMock.getInstanceID()).andReturn("foo").atLeastOnce();
 expect(contextMock.getInstanceName()).andReturn("foo").anyTimes();
+expect(contextMock.getZooKeepers()).andReturn("foo:2181").anyTimes();
 
 Monitor monitorMock = EasyMock.createMock(Monitor.class);
 expect(monitorMock.getContext()).andReturn(contextMock).anyTimes();


[accumulo] branch 1451-external-compactions-feature updated: WIP refactored how tservers are found, not tested

2021-04-19 Thread kturner
This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 1451-external-compactions-feature
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to 
refs/heads/1451-external-compactions-feature by this push:
 new 850044d  WIP refactored how tservers are found, not tested
850044d is described below

commit 850044dcaea3fdcf022a78d799d734e06ab91f07
Author: Keith Turner 
AuthorDate: Mon Apr 19 15:27:15 2021 -0400

WIP refactored how tservers are found, not tested

  * Support last tserver for queue+prio so we can return diff tservers
  * Only remove tserver+prio when tserver reports nothing
  * Remove any extra when getting tserver report
---
 .../coordinator/CompactionCoordinator.java | 129 ---
 .../accumulo/coordinator/QueueSummaries.java   | 183 +
 .../coordinator/CompactionCoordinatorTest.java |   6 +-
 3 files changed, 223 insertions(+), 95 deletions(-)

diff --git 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index 3d6f749..b8a8655 100644
--- 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++ 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -23,17 +23,14 @@ import static 
org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
 import java.net.UnknownHostException;
 import java.time.Duration;
 import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
-import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.accumulo.coordinator.QueueSummaries.PrioTserver;
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.clientImpl.ThriftTransportPool;
@@ -96,12 +93,7 @@ public class CompactionCoordinator extends AbstractServer
   private static final long FIFTEEN_MINUTES =
   TimeUnit.MILLISECONDS.convert(Duration.of(15, 
TimeUnit.MINUTES.toChronoUnit()));
 
-  /* Map of external queue name -> priority -> tservers */
-  protected static final 
Map>> QUEUES =
-  new ConcurrentHashMap<>();
-  /* index of tserver to queue and priority, exists to provide O(1) lookup 
into QUEUES */
-  protected static final Map> INDEX =
-  new ConcurrentHashMap<>();
+  QueueSummaries queueSummaries = new QueueSummaries();
   /* Map of compactionId to RunningCompactions */
   protected static final Map RUNNING =
   new ConcurrentHashMap<>();
@@ -381,9 +373,7 @@ public class CompactionCoordinator extends AbstractServer
   QueueAndPriority.get(summary.getQueue().intern(), 
summary.getPriority());
   synchronized (qp) {
 TIME_COMPACTOR_LAST_CHECKED.computeIfAbsent(qp.getQueue(), k 
-> 0L);
-QUEUES.computeIfAbsent(qp.getQueue(), k -> new TreeMap<>())
-.computeIfAbsent(qp.getPriority(), k -> new 
LinkedHashSet<>()).add(tsi);
-INDEX.computeIfAbsent(tsi, k -> new HashSet<>()).add(qp);
+queueSummaries.update(tsi, summaries);
   }
 });
   } finally {
@@ -444,20 +434,7 @@ public class CompactionCoordinator extends AbstractServer
 // run() will iterate over the current and added tservers and add them to 
the internal
 // data structures. For tservers that are deleted, we need to remove them 
from QUEUES
 // and INDEX
-deleted.forEach(tsi -> {
-  INDEX.get(tsi).forEach(qp -> {
-TreeMap> m = 
QUEUES.get(qp.getQueue());
-if (null != m) {
-  LinkedHashSet tservers = m.get(qp.getPriority());
-  if (null != tservers) {
-synchronized (qp) {
-  tservers.remove(tsi);
-}
-  }
-}
-  });
-  INDEX.remove(tsi);
-});
+queueSummaries.remove(deleted);
   }
 
   /**
@@ -483,81 +460,47 @@ public class CompactionCoordinator extends AbstractServer
 TIME_COMPACTOR_LAST_CHECKED.put(queue, System.currentTimeMillis());
 
 TExternalCompactionJob result = null;
-final TreeMap> m = QUEUES.get(queue);
-if (null != m && !m.isEmpty()) {
-  while (result == null) {
-
-// m could become empty if we have contacted all tservers in this 
queue and
-// there are no compactions
-if (m.isEmpty()) {
-  LOG.debug("No tservers found for queue {}, returning empty job to 
compactor {}", queue,
-  compactorAddress);
-  result = new TExternalCompactionJob();
-  break;
- 

[accumulo] branch 1451-external-compactions-feature updated: Added test for queue summaries

2021-04-19 Thread kturner
This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 1451-external-compactions-feature
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to 
refs/heads/1451-external-compactions-feature by this push:
 new a81e595  Added test for queue summaries
a81e595 is described below

commit a81e5953fce1bdf04cc0bbabc91f3867959b1786
Author: Keith Turner 
AuthorDate: Mon Apr 19 15:50:10 2021 -0400

Added test for queue summaries
---
 .../coordinator/CompactionCoordinator.java |   5 +-
 .../accumulo/coordinator/QueueSummaries.java   |  72 
 .../accumulo/coordinator/QueueSummariesTest.java   | 184 +
 .../apache/accumulo/test/ExternalCompactionIT.java |   4 -
 4 files changed, 231 insertions(+), 34 deletions(-)

diff --git 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index b8a8655..c169701 100644
--- 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++ 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -382,6 +382,7 @@ public class CompactionCoordinator extends AbstractServer
 } catch (TException e) {
   LOG.warn("Error getting external compaction summaries from tablet 
server: {}",
   tsi.getHostAndPort(), e);
+  queueSummaries.remove(Set.of(tsi));
 }
   });
 
@@ -488,8 +489,10 @@ public class CompactionCoordinator extends AbstractServer
 result = job;
 break;
   } catch (TException e) {
-LOG.error("Error from tserver {} while trying to reserve compaction, 
trying next tserver",
+LOG.warn("Error from tserver {} while trying to reserve compaction, 
trying next tserver",
 
ExternalCompactionUtil.getHostPortString(tserver.getHostAndPort()), e);
+queueSummaries.removeSummary(tserver, queueName, prioTserver.prio);
+prioTserver = queueSummaries.getNextTserver(queueName);
   } finally {
 ThriftUtil.returnClient(client);
   }
diff --git 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/QueueSummaries.java
 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/QueueSummaries.java
index fced9a0..e1caa8f 100644
--- 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/QueueSummaries.java
+++ 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/QueueSummaries.java
@@ -18,16 +18,17 @@
  */
 package org.apache.accumulo.coordinator;
 
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Objects;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
-import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.accumulo.core.metadata.TServerInstance;
 import org.apache.accumulo.core.tabletserver.thrift.TCompactionQueueSummary;
@@ -37,27 +38,24 @@ import com.google.common.collect.Sets;
 public class QueueSummaries {
 
   // keep track of the last tserver returned for a queue
- final Map LAST = new HashMap<>();
+  final Map LAST = new HashMap<>();
 
- //CBUG may not need concurrent hash map depending on how sync is done
   /* Map of external queue name -> priority -> tservers */
-  final Map>> QUEUES =
-  new ConcurrentHashMap<>();
+  final Map>> QUEUES = new 
HashMap<>();
   /* index of tserver to queue and priority, exists to provide O(1) lookup 
into QUEUES */
-  final Map> INDEX =
-  new ConcurrentHashMap<>();
+  final Map> INDEX = new HashMap<>();
 
   private Entry> getNextTserverEntry(String 
queue) {
 TreeMap> m = QUEUES.get(queue);
-if(m == null) {
+if (m == null) {
   return null;
 }
 
 Iterator>> iter = 
m.entrySet().iterator();
 
-while(iter.hasNext()) {
+while (iter.hasNext()) {
   Entry> next = iter.next();
-  if(next.getValue().isEmpty()) {
+  if (next.getValue().isEmpty()) {
 iter.remove();
   } else {
 return next;
@@ -70,24 +68,41 @@ public class QueueSummaries {
 
   }
 
-
   static class PrioTserver {
-
-final TServerInstance tserver;
+TServerInstance tserver;
 final long prio;
 
 public PrioTserver(TServerInstance t, long p) {
   this.tserver = t;
   this.prio = p;
 }
+
+@Override
+public boolean equals(Object obj) {
+  if (obj instanceof PrioTserver) {
+PrioTserver opt = (PrioTserver) obj;
+return tserver.equals(opt.tserver) && prio == opt.prio;
+  }
+
+  return false;
+}
+
+@Override
+public int hashCode() {
+  return Objects.hash(tserver,