This is an automated email from the ASF dual-hosted git repository.

domgarguilo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo-testing.git


The following commit(s) were added to refs/heads/main by this push:
     new 7a91acc  Continuous Ingest improvements (#168)
7a91acc is described below

commit 7a91acc7ce12afe907d6a01adc784dc3aa553e16
Author: Dom G <47725857+domgargu...@users.noreply.github.com>
AuthorDate: Wed Dec 8 10:04:43 2021 -0500

    Continuous Ingest improvements (#168)
    
    * consolidated for-loops
    * improved logs
    * avoided parsing properties multiple times
    * moved AccumuloClient and BatchWriter into try-with-resources
    * created member variables to reuse values & simplify methods
    * renamed variables and made some final
---
 conf/accumulo-testing.properties                   |   2 +-
 .../testing/continuous/ContinuousIngest.java       | 222 ++++++++++-----------
 2 files changed, 107 insertions(+), 117 deletions(-)

diff --git a/conf/accumulo-testing.properties b/conf/accumulo-testing.properties
index 6ef5855..aceaef7 100644
--- a/conf/accumulo-testing.properties
+++ b/conf/accumulo-testing.properties
@@ -83,7 +83,7 @@ test.ci.ingest.pause.duration.min=60
 # Maximum pause duration (in seconds)
 test.ci.ingest.pause.duration.max=120
 # The probability (between 0.0 and 1.0) that a set of entries will be deleted during continuous ingest
-# To disable deletes, set probability to 0
+# To disable deletes, set probability to 0.0
 test.ci.ingest.delete.probability=0.1
 
 # Batch walker
diff --git 
a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java 
b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
index 459102f..73eb545 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
@@ -17,10 +17,6 @@
 package org.apache.accumulo.testing.continuous;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
-import static 
org.apache.accumulo.testing.TestProps.CI_INGEST_PAUSE_DURATION_MAX;
-import static 
org.apache.accumulo.testing.TestProps.CI_INGEST_PAUSE_DURATION_MIN;
-import static org.apache.accumulo.testing.TestProps.CI_INGEST_PAUSE_WAIT_MAX;
-import static org.apache.accumulo.testing.TestProps.CI_INGEST_PAUSE_WAIT_MIN;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -54,6 +50,9 @@ public class ContinuousIngest {
   private static List<ColumnVisibility> visibilities;
   private static long lastPauseNs;
   private static long pauseWaitSec;
+  private static boolean pauseEnabled;
+  private static int pauseMin;
+  private static int pauseMax;
 
   private static ColumnVisibility getVisibility(Random rand) {
     return visibilities.get(rand.nextInt(visibilities.size()));
@@ -64,14 +63,11 @@ public class ContinuousIngest {
     return Boolean.parseBoolean(value);
   }
 
-  private static int getPause(Properties props, Random rand, String minProp, 
String maxProp) {
-    int min = Integer.parseInt(props.getProperty(minProp));
-    int max = Integer.parseInt(props.getProperty(maxProp));
-    Preconditions.checkState(max >= min && min > 0);
-    if (max == min) {
-      return min;
+  private static int getPause(Random rand) {
+    if (pauseMax == pauseMin) {
+      return pauseMin;
     }
-    return (rand.nextInt(max - min) + min);
+    return (rand.nextInt(pauseMax - pauseMin) + pauseMin);
   }
 
   private static float getDeleteProbability(Properties props) {
@@ -86,51 +82,48 @@ public class ContinuousIngest {
     return 
Integer.parseInt(props.getProperty(TestProps.CI_INGEST_FLUSH_ENTRIES, 
"1000000"));
   }
 
-  private static void pauseCheck(Properties props, Random rand) throws 
InterruptedException {
-    if (pauseEnabled(props)) {
+  private static void pauseCheck(Random rand) throws InterruptedException {
+    if (pauseEnabled) {
       long elapsedNano = System.nanoTime() - lastPauseNs;
       if (elapsedNano > (TimeUnit.SECONDS.toNanos(pauseWaitSec))) {
-        long pauseDurationSec = getPause(props, rand, 
CI_INGEST_PAUSE_DURATION_MIN,
-            CI_INGEST_PAUSE_DURATION_MAX);
-        log.info("PAUSING for " + pauseDurationSec + "s");
+        long pauseDurationSec = getPause(rand);
+        log.info("PAUSING for {}s", pauseDurationSec);
         Thread.sleep(TimeUnit.SECONDS.toMillis(pauseDurationSec));
         lastPauseNs = System.nanoTime();
-        pauseWaitSec = getPause(props, rand, CI_INGEST_PAUSE_WAIT_MIN, 
CI_INGEST_PAUSE_WAIT_MAX);
-        log.info("INGESTING for " + pauseWaitSec + "s");
+        pauseWaitSec = getPause(rand);
+        log.info("INGESTING for {}s", pauseWaitSec);
       }
     }
   }
 
   public static void main(String[] args) throws Exception {
 
-    try (ContinuousEnv env = new ContinuousEnv(args)) {
+    try (ContinuousEnv env = new ContinuousEnv(args);
+        AccumuloClient client = env.getAccumuloClient()) {
 
-      visibilities = 
parseVisibilities(env.getTestProperty(TestProps.CI_INGEST_VISIBILITIES));
+      final long rowMin = env.getRowMin();
+      final long rowMax = env.getRowMax();
+      Preconditions.checkState(0 <= rowMin && rowMin <= rowMax,
+          "Bad rowMin/rowMax, must conform to: 0 <= rowMin <= rowMax");
 
-      long rowMin = env.getRowMin();
-      long rowMax = env.getRowMax();
-      if (rowMin < 0 || rowMax < 0 || rowMax <= rowMin) {
-        throw new IllegalArgumentException("bad min and max");
-      }
-
-      AccumuloClient client = env.getAccumuloClient();
       String tableName = env.getAccumuloTableName();
       if (!client.tableOperations().exists(tableName)) {
         throw new TableNotFoundException(null, tableName,
             "Consult the README and create the table before starting ingest.");
       }
 
-      BatchWriter bw = client.createBatchWriter(tableName);
-
-      Random r = new Random();
+      Random rand = new Random();
 
       byte[] ingestInstanceId = UUID.randomUUID().toString().getBytes(UTF_8);
+      log.info("Ingest instance ID: {} current time: {}ms", new 
String(ingestInstanceId, UTF_8),
+          System.currentTimeMillis());
 
-      log.info(String.format("UUID %d %s", System.currentTimeMillis(),
-          new String(ingestInstanceId, UTF_8)));
+      Properties testProps = env.getTestProperties();
 
-      long count = 0;
-      final int flushInterval = getFlushEntries(env.getTestProperties());
+      long entriesWritten = 0L;
+      long entriesDeleted = 0L;
+      final int flushInterval = getFlushEntries(testProps);
+      log.info("A flush will occur after every {} entries written", 
flushInterval);
       final int maxDepth = 25;
 
       // always want to point back to flushed data. This way the previous item 
should
@@ -141,99 +134,97 @@ public class ContinuousIngest {
 
       long lastFlushTime = System.currentTimeMillis();
 
-      int maxColF = env.getMaxColF();
-      int maxColQ = env.getMaxColQ();
-      boolean checksum = 
Boolean.parseBoolean(env.getTestProperty(TestProps.CI_INGEST_CHECKSUM));
-      long numEntries = 
Long.parseLong(env.getTestProperty(TestProps.CI_INGEST_CLIENT_ENTRIES));
+      final int maxColF = env.getMaxColF();
+      final int maxColQ = env.getMaxColQ();
+      final boolean checksum = Boolean
+          .parseBoolean(testProps.getProperty(TestProps.CI_INGEST_CHECKSUM));
+      final long numEntries = Long
+          
.parseLong(testProps.getProperty(TestProps.CI_INGEST_CLIENT_ENTRIES));
+      log.info("Total entries to be written: {}", numEntries);
 
-      Properties testProps = env.getTestProperties();
-      if (pauseEnabled(testProps)) {
+      visibilities = 
parseVisibilities(testProps.getProperty(TestProps.CI_INGEST_VISIBILITIES));
+
+      pauseEnabled = pauseEnabled(testProps);
+
+      pauseMin = 
Integer.parseInt(testProps.getProperty(TestProps.CI_INGEST_PAUSE_WAIT_MIN));
+      pauseMax = 
Integer.parseInt(testProps.getProperty(TestProps.CI_INGEST_PAUSE_WAIT_MAX));
+      Preconditions.checkState(0 < pauseMin && pauseMin <= pauseMax,
+          "Bad pause wait min/max, must conform to: 0 < min <= max");
+
+      if (pauseEnabled) {
         lastPauseNs = System.nanoTime();
-        pauseWaitSec = getPause(testProps, r, CI_INGEST_PAUSE_WAIT_MIN, 
CI_INGEST_PAUSE_WAIT_MAX);
+        pauseWaitSec = getPause(rand);
         log.info("PAUSING enabled");
-        log.info("INGESTING for " + pauseWaitSec + "s");
+        log.info("INGESTING for {}s", pauseWaitSec);
       }
 
       final float deleteProbability = getDeleteProbability(testProps);
       log.info("DELETES will occur with a probability of {}",
           String.format("%.02f", deleteProbability));
 
-      out: while (true) {
-        ColumnVisibility cv = getVisibility(r);
+      try (BatchWriter bw = client.createBatchWriter(tableName)) {
+        out: while (true) {
+          ColumnVisibility cv = getVisibility(rand);
 
-        // generate first set of nodes
-        for (int index = 0; index < flushInterval; index++) {
-          long rowLong = genLong(rowMin, rowMax, r);
+          // generate sets of nodes that link to the previous set of nodes
+          for (int depth = 0; depth < maxDepth; depth++) {
+            for (int index = 0; index < flushInterval; index++) {
+              long rowLong = genLong(rowMin, rowMax, rand);
 
-          int cf = r.nextInt(maxColF);
-          int cq = r.nextInt(maxColQ);
+              byte[] prevRow = depth == 0 ? null : genRow(nodeMap[depth - 
1][index].row);
 
-          nodeMap[0][index] = new MutationInfo(rowLong, cf, cq);
+              int cfInt = rand.nextInt(maxColF);
+              int cqInt = rand.nextInt(maxColQ);
 
-          Mutation m = genMutation(rowLong, cf, cq, cv, ingestInstanceId, 
count, null, checksum);
-          count++;
-          bw.addMutation(m);
-        }
+              nodeMap[depth][index] = new MutationInfo(rowLong, cfInt, cqInt);
+              Mutation m = genMutation(rowLong, cfInt, cqInt, cv, 
ingestInstanceId, entriesWritten,
+                  prevRow, checksum);
+              entriesWritten++;
+              bw.addMutation(m);
+            }
 
-        lastFlushTime = flush(bw, count, flushInterval, lastFlushTime);
-        if (count >= numEntries)
-          break out;
-
-        // generate subsequent sets of nodes that link to previous set of nodes
-        for (int depth = 1; depth < maxDepth; depth++) {
-          for (int index = 0; index < flushInterval; index++) {
-            long rowLong = genLong(rowMin, rowMax, r);
-            byte[] prevRow = genRow(nodeMap[depth - 1][index].row);
-            int cfInt = r.nextInt(maxColF);
-            int cqInt = r.nextInt(maxColQ);
-            nodeMap[depth][index] = new MutationInfo(rowLong, cfInt, cqInt);
-            Mutation m = genMutation(rowLong, cfInt, cqInt, cv, 
ingestInstanceId, count, prevRow,
-                checksum);
-            count++;
-            bw.addMutation(m);
+            lastFlushTime = flush(bw, entriesWritten, entriesDeleted, 
lastFlushTime);
+            if (entriesWritten >= numEntries)
+              break out;
+            pauseCheck(rand);
           }
 
-          lastFlushTime = flush(bw, count, flushInterval, lastFlushTime);
-          if (count >= numEntries)
-            break out;
-          pauseCheck(testProps, r);
-        }
-
-        // random chance that the entries will be deleted
-        boolean delete = r.nextFloat() < deleteProbability;
-
-        // if the previously written entries are scheduled to be deleted
-        if (delete) {
-          log.info("Deleting last portion of written entries");
-          // add delete mutations in the reverse order in which they were 
written
-          for (int depth = nodeMap.length - 1; depth >= 0; depth--) {
-            for (int index = nodeMap[depth].length - 1; index >= 0; index--) {
-              MutationInfo currentNode = nodeMap[depth][index];
-              Mutation m = new Mutation(genRow(currentNode.row));
-              m.putDelete(genCol(currentNode.cf), genCol(currentNode.cq));
+          // random chance that the entries will be deleted
+          final boolean delete = rand.nextFloat() < deleteProbability;
+
+          // if the previously written entries are scheduled to be deleted
+          if (delete) {
+            log.info("Deleting last portion of written entries");
+            // add delete mutations in the reverse order in which they were 
written
+            for (int depth = nodeMap.length - 1; depth >= 0; depth--) {
+              for (int index = nodeMap[depth].length - 1; index >= 0; index--) 
{
+                MutationInfo currentNode = nodeMap[depth][index];
+                Mutation m = new Mutation(genRow(currentNode.row));
+                m.putDelete(genCol(currentNode.cf), genCol(currentNode.cq));
+                entriesDeleted++;
+                bw.addMutation(m);
+              }
+              lastFlushTime = flush(bw, entriesWritten, entriesDeleted, 
lastFlushTime);
+              pauseCheck(rand);
+            }
+          } else {
+            // create one big linked list, this makes all the first inserts 
point to something
+            for (int index = 0; index < flushInterval - 1; index++) {
+              MutationInfo firstEntry = nodeMap[0][index];
+              MutationInfo lastEntry = nodeMap[maxDepth - 1][index + 1];
+              Mutation m = genMutation(firstEntry.row, firstEntry.cf, 
firstEntry.cq, cv,
+                  ingestInstanceId, entriesWritten, genRow(lastEntry.row), 
checksum);
+              entriesWritten++;
               bw.addMutation(m);
             }
-            lastFlushTime = flush(bw, count, flushInterval, lastFlushTime);
-            pauseCheck(testProps, r);
+            lastFlushTime = flush(bw, entriesWritten, entriesDeleted, 
lastFlushTime);
           }
-        } else {
-          // create one big linked list, this makes all the first inserts 
point to something
-          for (int index = 0; index < flushInterval - 1; index++) {
-            MutationInfo firstEntry = nodeMap[0][index];
-            MutationInfo lastEntry = nodeMap[maxDepth - 1][index + 1];
-            Mutation m = genMutation(firstEntry.row, firstEntry.cf, 
firstEntry.cq, cv,
-                ingestInstanceId, count, genRow(lastEntry.row), checksum);
-            count++;
-            bw.addMutation(m);
-          }
-          lastFlushTime = flush(bw, count, flushInterval, lastFlushTime);
-        }
 
-        if (count >= numEntries)
-          break out;
-        pauseCheck(testProps, r);
+          if (entriesWritten >= numEntries)
+            break out;
+          pauseCheck(rand);
+        }
       }
-      bw.close();
     }
   }
 
@@ -263,19 +254,18 @@ public class ContinuousIngest {
     return vis;
   }
 
-  private static long flush(BatchWriter bw, long count, final int 
flushInterval, long lastFlushTime)
-      throws MutationsRejectedException {
+  private static long flush(BatchWriter bw, long entriesWritten, long 
entriesDeleted,
+      long lastFlushTime) throws MutationsRejectedException {
     long t1 = System.currentTimeMillis();
     bw.flush();
     long t2 = System.currentTimeMillis();
-    log.info(String.format("FLUSH %d %d %d %d %d", t2, (t2 - lastFlushTime), 
(t2 - t1), count,
-        flushInterval));
-    lastFlushTime = t2;
-    return lastFlushTime;
+    log.info("FLUSH - duration: {}ms, since last flush: {}ms, total written: 
{}, total deleted: {}",
+        (t2 - t1), (t2 - lastFlushTime), entriesWritten, entriesDeleted);
+    return t2;
   }
 
   public static Mutation genMutation(long rowLong, int cfInt, int cqInt, 
ColumnVisibility cv,
-      byte[] ingestInstanceId, long count, byte[] prevRow, boolean checksum) {
+      byte[] ingestInstanceId, long entriesWritten, byte[] prevRow, boolean 
checksum) {
     // Adler32 is supposed to be faster, but according to wikipedia is not
     // good for small data.... so used CRC32 instead
     CRC32 cksum = null;
@@ -295,7 +285,7 @@ public class ContinuousIngest {
 
     Mutation m = new Mutation(rowString);
 
-    m.put(cfString, cqString, cv, createValue(ingestInstanceId, count, 
prevRow, cksum));
+    m.put(cfString, cqString, cv, createValue(ingestInstanceId, 
entriesWritten, prevRow, cksum));
     return m;
   }
 
@@ -315,7 +305,7 @@ public class ContinuousIngest {
     return FastFormat.toZeroPaddedString(rowLong, 16, 16, EMPTY_BYTES);
   }
 
-  public static byte[] createValue(byte[] ingestInstanceId, long count, byte[] 
prevRow,
+  public static byte[] createValue(byte[] ingestInstanceId, long 
entriesWritten, byte[] prevRow,
       Checksum cksum) {
     int dataLen = ingestInstanceId.length + 16 + (prevRow == null ? 0 : 
prevRow.length) + 3;
     if (cksum != null)
@@ -324,7 +314,7 @@ public class ContinuousIngest {
     System.arraycopy(ingestInstanceId, 0, val, 0, ingestInstanceId.length);
     int index = ingestInstanceId.length;
     val[index++] = ':';
-    int added = FastFormat.toZeroPaddedString(val, index, count, 16, 16, 
EMPTY_BYTES);
+    int added = FastFormat.toZeroPaddedString(val, index, entriesWritten, 16, 
16, EMPTY_BYTES);
     if (added != 16)
       throw new RuntimeException(" " + added);
     index += 16;

Reply via email to