[accumulo-testing] branch main updated: Add deletes to continuous ingest (#166)

2021-11-19 Thread domgarguilo
This is an automated email from the ASF dual-hosted git repository.

domgarguilo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo-testing.git


The following commit(s) were added to refs/heads/main by this push:
 new 711d28e  Add deletes to continuous ingest (#166)
711d28e is described below

commit 711d28e83a91cbaa0754064a7d72c0f975f7de31
Author: Dom G <47725857+domgargu...@users.noreply.github.com>
AuthorDate: Fri Nov 19 13:04:02 2021 -0500

Add deletes to continuous ingest (#166)

* Add the ability for the deletion of entries to occur while running continuous ingest
---
 conf/accumulo-testing.properties   |  3 +
 .../org/apache/accumulo/testing/TestProps.java |  2 +
 .../testing/continuous/ContinuousIngest.java   | 85 --
 3 files changed, 69 insertions(+), 21 deletions(-)
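
The delete path itself is not visible in the truncated diff that follows, so purely as an illustration, here is a rough Java sketch of how a probability gate such as test.ci.ingest.delete.probability could turn a previously written set of entries into Accumulo delete mutations. All class, method, and variable names below are hypothetical and are not taken from the patch.

// Illustrative sketch only; not part of this commit.
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.security.ColumnVisibility;

public class DeleteGateSketch {

  /**
   * Flips a coin once per flushed set: with the configured probability, returns delete
   * mutations for every entry in the set; otherwise returns an empty list and the set is kept.
   */
  public static List<Mutation> maybeDelete(Random rand, float deleteProbability,
      List<String[]> writtenEntries /* each entry: {row, colFam, colQual} */, ColumnVisibility cv) {
    List<Mutation> deletes = new ArrayList<>();
    if (rand.nextFloat() >= deleteProbability) {
      return deletes; // keep this set of entries
    }
    for (String[] entry : writtenEntries) {
      Mutation m = new Mutation(entry[0]);
      m.putDelete(entry[1], entry[2], cv); // delete marker for the previously written cell
      deletes.add(m);
    }
    return deletes;
  }
}

With test.ci.ingest.delete.probability=0 the gate never fires, which matches the "to disable deletes" note on the new configuration property below.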

diff --git a/conf/accumulo-testing.properties b/conf/accumulo-testing.properties
index 4f0e749..6ef5855 100644
--- a/conf/accumulo-testing.properties
+++ b/conf/accumulo-testing.properties
@@ -82,6 +82,9 @@ test.ci.ingest.pause.wait.max=180
 test.ci.ingest.pause.duration.min=60
 # Maximum pause duration (in seconds)
 test.ci.ingest.pause.duration.max=120
+# The probability (between 0.0 and 1.0) that a set of entries will be deleted during continuous ingest
+# To disable deletes, set probability to 0
+test.ci.ingest.delete.probability=0.1
 
 # Batch walker
 # 
diff --git a/src/main/java/org/apache/accumulo/testing/TestProps.java b/src/main/java/org/apache/accumulo/testing/TestProps.java
index c4c8948..e7801d3 100644
--- a/src/main/java/org/apache/accumulo/testing/TestProps.java
+++ b/src/main/java/org/apache/accumulo/testing/TestProps.java
@@ -94,6 +94,8 @@ public class TestProps {
   public static final String CI_INGEST_PAUSE_DURATION_MAX = CI_INGEST + "pause.duration.max";
   // Amount of data to write before flushing. Pause checks are only done after flush.
   public static final String CI_INGEST_FLUSH_ENTRIES = CI_INGEST + "entries.flush";
+  // The probability (between 0.0 and 1.0) that a set of entries will be deleted during continuous ingest
+  public static final String CI_INGEST_DELETE_PROBABILITY = CI_INGEST + "delete.probability";
 
   /** Batch Walker **/
   // Sleep time between batch scans (in ms)
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
index 66f5152..459102f 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
@@ -74,6 +74,14 @@ public class ContinuousIngest {
 return (rand.nextInt(max - min) + min);
   }
 
+  private static float getDeleteProbability(Properties props) {
+    String stringValue = props.getProperty(TestProps.CI_INGEST_DELETE_PROBABILITY);
+    float prob = Float.parseFloat(stringValue);
+    Preconditions.checkArgument(prob >= 0.0 && prob <= 1.0,
+        "Delete probability should be between 0.0 and 1.0");
+    return prob;
+  }
+
   private static int getFlushEntries(Properties props) {
     return Integer.parseInt(props.getProperty(TestProps.CI_INGEST_FLUSH_ENTRIES, "100"));
   }
@@ -128,11 +136,8 @@ public class ContinuousIngest {
   // always want to point back to flushed data. This way the previous item should
   // always exist in accumulo when verifying data. To do this make insert N point
   // back to the row from insert (N - flushInterval). The array below is used to keep
-  // track of this.
-  long[] prevRows = new long[flushInterval];
-  long[] firstRows = new long[flushInterval];
-  int[] firstColFams = new int[flushInterval];
-  int[] firstColQuals = new int[flushInterval];
+  // track of all inserts.
+  MutationInfo[][] nodeMap = new MutationInfo[maxDepth][flushInterval];
 
   long lastFlushTime = System.currentTimeMillis();
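
The MutationInfo class referenced in nodeMap above is added by this commit but does not appear in this excerpt. Assuming it is simply a small holder matching the new MutationInfo(rowLong, cf, cq) call further down, a minimal sketch could look like:

// Illustrative sketch only; the real MutationInfo in the patch may differ.
// It records where an earlier insert went so that later inserts (and deletes)
// can point back to that row/column.
public class MutationInfo {
  final long row;
  final int cf;
  final int cq;

  public MutationInfo(long row, int cf, int cq) {
    this.row = row;
    this.cf = cf;
    this.cq = cq;
  }
}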
 
@@ -149,20 +154,21 @@ public class ContinuousIngest {
 log.info("INGESTING for " + pauseWaitSec + "s");
   }
 
+  final float deleteProbability = getDeleteProbability(testProps);
+  log.info("DELETES will occur with a probability of {}",
+  String.format("%.02f", deleteProbability));
+
   out: while (true) {
-// generate first set of nodes
 ColumnVisibility cv = getVisibility(r);
 
+// generate first set of nodes
 for (int index = 0; index < flushInterval; index++) {
   long rowLong = genLong(rowMin, rowMax, r);
-  prevRows[index] = rowLong;
-  firstRows[index] = rowLong;
 
   int cf = r.nextInt(maxColF);
   int cq = r.nextInt(maxColQ);
 
-  firstColFams[index] = cf;
-  firstColQuals[index] = cq;
+  nodeMap[0][index] = new MutationInfo(rowLong, cf, cq);
 
   Mutation m = genMutation(rowLong, cf, cq, cv, 

[accumulo] branch main updated: Create page in Monitor for external compactions (#2358)

2021-11-19 Thread mmiller
This is an automated email from the ASF dual-hosted git repository.

mmiller pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
 new 2bce893  Create page in Monitor for external compactions (#2358)
2bce893 is described below

commit 2bce8939145b49a765f998f5dbe2b6242a64e3e6
Author: Mike Miller 
AuthorDate: Fri Nov 19 10:42:48 2021 -0500

Create page in Monitor for external compactions (#2358)

* Create multiple new classes for displaying 3 different tables of data
in the new external compaction page in the monitor
* Create 3 new ajax endpoints in ECResource
* Modify Compactor and ExternalCompactionUtil to return Optional for the
compaction coordinator instead of null
* Add check for compaction coordinator to Monitor.fetchData()
* New ExternalCompactionProgressIT for testing progress
* Use new bootstrap panel and badges for coordinator info
* Closes #2290

Co-authored-by: Dom G. <47725857+domgargu...@users.noreply.github.com>
---
 .../util/compaction/ExternalCompactionUtil.java|  10 +-
 .../org/apache/accumulo/compactor/Compactor.java   |   8 +-
 .../java/org/apache/accumulo/monitor/Monitor.java  |  89 +
 .../compactions/external/CompactionInputFile.java  |  38 
 .../rest/compactions/external/CompactorInfo.java   |  33 
 .../rest/compactions/external/Compactors.java  |  40 
 .../rest/compactions/external/CoordinatorInfo.java |  41 +
 .../rest/compactions/external/ECResource.java  |  62 +++
 .../external/ExternalCompactionInfo.java   |  60 ++
 .../compactions/external/RunningCompactions.java   |  39 
 .../compactions/external/RunningCompactorInfo.java | 133 ++
 .../org/apache/accumulo/monitor/view/WebViews.java |  25 +++
 .../org/apache/accumulo/monitor/resources/js/ec.js | 201 +
 .../org/apache/accumulo/monitor/templates/ec.ftl   |  80 
 .../apache/accumulo/monitor/templates/navbar.ftl   |   1 +
 .../compaction/ExternalCompactionProgressIT.java   | 166 +
 .../compaction/ExternalCompactionTestUtils.java|  42 +++--
 17 files changed, 1044 insertions(+), 24 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
index a5538b6..24dff44 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/compaction/ExternalCompactionUtil.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -93,19 +94,18 @@ public class ExternalCompactionUtil {
 
   /**
    *
-   * @return null if Coordinator node not found, else HostAndPort
+   * @return Optional HostAndPort of Coordinator node if found
    */
-  public static HostAndPort findCompactionCoordinator(ClientContext context) {
+  public static Optional<HostAndPort> findCompactionCoordinator(ClientContext context) {
     final String lockPath = context.getZooKeeperRoot() + Constants.ZCOORDINATOR_LOCK;
     try {
       var zk = ZooSession.getAnonymousSession(context.getZooKeepers(),
           context.getZooKeepersSessionTimeOut());
       byte[] address = ServiceLock.getLockData(zk, ServiceLock.path(lockPath));
       if (null == address) {
-        return null;
+        return Optional.empty();
       }
-      String coordinatorAddress = new String(address);
-      return HostAndPort.fromString(coordinatorAddress);
+      return Optional.of(HostAndPort.fromString(new String(address)));
     } catch (KeeperException | InterruptedException e) {
       throw new RuntimeException(e);
     }
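
As a usage note, the Optional return lets callers handle a missing coordinator explicitly instead of checking for null. A minimal caller sketch, assuming a ClientContext named context and the same imports as the file above:

// Hypothetical caller; not part of this patch.
var coordinator = ExternalCompactionUtil.findCompactionCoordinator(context)
    .orElseThrow(() -> new IllegalStateException("CompactionCoordinator not found in ZooKeeper"));

The Compactor change in the next diff takes the equivalent route, checking isEmpty() before throwing a TTransportException.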
diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index 898177e..6565280 100644
--- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -474,12 +474,12 @@ public class Compactor extends AbstractServer implements CompactorService.Iface
    *   when unable to get client
    */
   protected CompactionCoordinatorService.Client getCoordinatorClient() throws TTransportException {
-    HostAndPort coordinatorHost = ExternalCompactionUtil.findCompactionCoordinator(getContext());
-    if (null == coordinatorHost) {
+    var coordinatorHost = ExternalCompactionUtil.findCompactionCoordinator(getContext());
+    if (coordinatorHost.isEmpty()) {
       throw new TTransportException("Unable to get CompactionCoordinator