This is an automated email from the ASF dual-hosted git repository.

domgarguilo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo-testing.git

The following commit(s) were added to refs/heads/main by this push:
     new 7a91acc  Continuous Ingest improvements (#168)
7a91acc is described below

commit 7a91acc7ce12afe907d6a01adc784dc3aa553e16
Author: Dom G <47725857+domgargu...@users.noreply.github.com>
AuthorDate: Wed Dec 8 10:04:43 2021 -0500

    Continuous Ingest improvements (#168)

    * consolidated for-loops
    * improved logs
    * avoided parsing properties multiple times
    * move AccumuloClient and BatchWriter into try-with-resources
    * created member variables to reuse values & simplify methods
    * renamed variables and made some final
---
 conf/accumulo-testing.properties                   |   2 +-
 .../testing/continuous/ContinuousIngest.java       | 222 ++++++++++-----------
 2 files changed, 107 insertions(+), 117 deletions(-)

diff --git a/conf/accumulo-testing.properties b/conf/accumulo-testing.properties
index 6ef5855..aceaef7 100644
--- a/conf/accumulo-testing.properties
+++ b/conf/accumulo-testing.properties
@@ -83,7 +83,7 @@ test.ci.ingest.pause.duration.min=60
 # Maximum pause duration (in seconds)
 test.ci.ingest.pause.duration.max=120
 # The probability (between 0.0 and 1.0) that a set of entries will be deleted during continuous ingest
-# To disable deletes, set probability to 0
+# To disable deletes, set probability to 0.0
 test.ci.ingest.delete.probability=0.1
 
 # Batch walker
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
index 459102f..73eb545 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
@@ -17,10 +17,6 @@ package org.apache.accumulo.testing.continuous;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.accumulo.testing.TestProps.CI_INGEST_PAUSE_DURATION_MAX;
-import static org.apache.accumulo.testing.TestProps.CI_INGEST_PAUSE_DURATION_MIN;
-import static org.apache.accumulo.testing.TestProps.CI_INGEST_PAUSE_WAIT_MAX;
-import static org.apache.accumulo.testing.TestProps.CI_INGEST_PAUSE_WAIT_MIN;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -54,6 +50,9 @@ public class ContinuousIngest {
   private static List<ColumnVisibility> visibilities;
   private static long lastPauseNs;
   private static long pauseWaitSec;
+  private static boolean pauseEnabled;
+  private static int pauseMin;
+  private static int pauseMax;
 
   private static ColumnVisibility getVisibility(Random rand) {
     return visibilities.get(rand.nextInt(visibilities.size()));
@@ -64,14 +63,11 @@ public class ContinuousIngest {
     return Boolean.parseBoolean(value);
   }
 
-  private static int getPause(Properties props, Random rand, String minProp, String maxProp) {
-    int min = Integer.parseInt(props.getProperty(minProp));
-    int max = Integer.parseInt(props.getProperty(maxProp));
-    Preconditions.checkState(max >= min && min > 0);
-    if (max == min) {
-      return min;
+  private static int getPause(Random rand) {
+    if (pauseMax == pauseMin) {
+      return pauseMin;
     }
-    return (rand.nextInt(max - min) + min);
+    return (rand.nextInt(pauseMax - pauseMin) + pauseMin);
   }
 
   private static float getDeleteProbability(Properties props) {
@@ -86,51 +82,48 @@ public class ContinuousIngest {
     return Integer.parseInt(props.getProperty(TestProps.CI_INGEST_FLUSH_ENTRIES, "1000000"));
   }
 
-  private static void pauseCheck(Properties props, Random rand) throws InterruptedException {
-    if (pauseEnabled(props)) {
+  private static void pauseCheck(Random rand) throws InterruptedException {
+    if (pauseEnabled) {
       long elapsedNano = System.nanoTime() - lastPauseNs;
       if (elapsedNano > (TimeUnit.SECONDS.toNanos(pauseWaitSec))) {
-        long pauseDurationSec = getPause(props, rand, CI_INGEST_PAUSE_DURATION_MIN,
-            CI_INGEST_PAUSE_DURATION_MAX);
-        log.info("PAUSING for " + pauseDurationSec + "s");
+        long pauseDurationSec = getPause(rand);
+        log.info("PAUSING for {}s", pauseDurationSec);
         Thread.sleep(TimeUnit.SECONDS.toMillis(pauseDurationSec));
         lastPauseNs = System.nanoTime();
-        pauseWaitSec = getPause(props, rand, CI_INGEST_PAUSE_WAIT_MIN, CI_INGEST_PAUSE_WAIT_MAX);
-        log.info("INGESTING for " + pauseWaitSec + "s");
+        pauseWaitSec = getPause(rand);
+        log.info("INGESTING for {}s", pauseWaitSec);
       }
     }
   }
 
   public static void main(String[] args) throws Exception {
-    try (ContinuousEnv env = new ContinuousEnv(args)) {
+    try (ContinuousEnv env = new ContinuousEnv(args);
+        AccumuloClient client = env.getAccumuloClient()) {
 
-      visibilities = parseVisibilities(env.getTestProperty(TestProps.CI_INGEST_VISIBILITIES));
+      final long rowMin = env.getRowMin();
+      final long rowMax = env.getRowMax();
+      Preconditions.checkState(0 <= rowMin && rowMin <= rowMax,
+          "Bad rowMin/rowMax, must conform to: 0 <= rowMin <= rowMax");
 
-      long rowMin = env.getRowMin();
-      long rowMax = env.getRowMax();
-      if (rowMin < 0 || rowMax < 0 || rowMax <= rowMin) {
-        throw new IllegalArgumentException("bad min and max");
-      }
-
-      AccumuloClient client = env.getAccumuloClient();
       String tableName = env.getAccumuloTableName();
       if (!client.tableOperations().exists(tableName)) {
         throw new TableNotFoundException(null, tableName,
            "Consult the README and create the table before starting ingest.");
       }
 
-      BatchWriter bw = client.createBatchWriter(tableName);
-
-      Random r = new Random();
+      Random rand = new Random();
 
       byte[] ingestInstanceId = UUID.randomUUID().toString().getBytes(UTF_8);
+      log.info("Ingest instance ID: {} current time: {}ms", new String(ingestInstanceId, UTF_8),
+          System.currentTimeMillis());
 
-      log.info(String.format("UUID %d %s", System.currentTimeMillis(),
-          new String(ingestInstanceId, UTF_8)));
+      Properties testProps = env.getTestProperties();
 
-      long count = 0;
-      final int flushInterval = getFlushEntries(env.getTestProperties());
+      long entriesWritten = 0L;
+      long entriesDeleted = 0L;
+      final int flushInterval = getFlushEntries(testProps);
+      log.info("A flush will occur after every {} entries written", flushInterval);
       final int maxDepth = 25;
       // always want to point back to flushed data. This way the previous item should
@@ -141,99 +134,97 @@ public class ContinuousIngest {
 
       long lastFlushTime = System.currentTimeMillis();
 
-      int maxColF = env.getMaxColF();
-      int maxColQ = env.getMaxColQ();
-      boolean checksum = Boolean.parseBoolean(env.getTestProperty(TestProps.CI_INGEST_CHECKSUM));
-      long numEntries = Long.parseLong(env.getTestProperty(TestProps.CI_INGEST_CLIENT_ENTRIES));
+      final int maxColF = env.getMaxColF();
+      final int maxColQ = env.getMaxColQ();
+      final boolean checksum = Boolean
+          .parseBoolean(testProps.getProperty(TestProps.CI_INGEST_CHECKSUM));
+      final long numEntries = Long
+          .parseLong(testProps.getProperty(TestProps.CI_INGEST_CLIENT_ENTRIES));
+      log.info("Total entries to be written: {}", numEntries);
 
-      Properties testProps = env.getTestProperties();
-      if (pauseEnabled(testProps)) {
+      visibilities = parseVisibilities(testProps.getProperty(TestProps.CI_INGEST_VISIBILITIES));
+
+      pauseEnabled = pauseEnabled(testProps);
+
+      pauseMin = Integer.parseInt(testProps.getProperty(TestProps.CI_INGEST_PAUSE_WAIT_MIN));
+      pauseMax = Integer.parseInt(testProps.getProperty(TestProps.CI_INGEST_PAUSE_WAIT_MAX));
+      Preconditions.checkState(0 < pauseMin && pauseMin <= pauseMax,
+          "Bad pause wait min/max, must conform to: 0 < min <= max");
+
+      if (pauseEnabled) {
         lastPauseNs = System.nanoTime();
-        pauseWaitSec = getPause(testProps, r, CI_INGEST_PAUSE_WAIT_MIN, CI_INGEST_PAUSE_WAIT_MAX);
+        pauseWaitSec = getPause(rand);
         log.info("PAUSING enabled");
-        log.info("INGESTING for " + pauseWaitSec + "s");
+        log.info("INGESTING for {}s", pauseWaitSec);
       }
 
       final float deleteProbability = getDeleteProbability(testProps);
       log.info("DELETES will occur with a probability of {}",
          String.format("%.02f", deleteProbability));
 
-      out: while (true) {
-        ColumnVisibility cv = getVisibility(r);
+      try (BatchWriter bw = client.createBatchWriter(tableName)) {
+        out: while (true) {
+          ColumnVisibility cv = getVisibility(rand);
 
-        // generate first set of nodes
-        for (int index = 0; index < flushInterval; index++) {
-          long rowLong = genLong(rowMin, rowMax, r);
+          // generate sets nodes that link to previous set of nodes
+          for (int depth = 0; depth < maxDepth; depth++) {
+            for (int index = 0; index < flushInterval; index++) {
+              long rowLong = genLong(rowMin, rowMax, rand);
 
-          int cf = r.nextInt(maxColF);
-          int cq = r.nextInt(maxColQ);
+              byte[] prevRow = depth == 0 ? null : genRow(nodeMap[depth - 1][index].row);
 
-          nodeMap[0][index] = new MutationInfo(rowLong, cf, cq);
+              int cfInt = rand.nextInt(maxColF);
+              int cqInt = rand.nextInt(maxColQ);
 
-          Mutation m = genMutation(rowLong, cf, cq, cv, ingestInstanceId, count, null, checksum);
-          count++;
-          bw.addMutation(m);
-        }
+              nodeMap[depth][index] = new MutationInfo(rowLong, cfInt, cqInt);
+              Mutation m = genMutation(rowLong, cfInt, cqInt, cv, ingestInstanceId, entriesWritten,
+                  prevRow, checksum);
+              entriesWritten++;
+              bw.addMutation(m);
+            }
-        lastFlushTime = flush(bw, count, flushInterval, lastFlushTime);
-        if (count >= numEntries)
-          break out;
-
-        // generate subsequent sets of nodes that link to previous set of nodes
-        for (int depth = 1; depth < maxDepth; depth++) {
-          for (int index = 0; index < flushInterval; index++) {
-            long rowLong = genLong(rowMin, rowMax, r);
-            byte[] prevRow = genRow(nodeMap[depth - 1][index].row);
-            int cfInt = r.nextInt(maxColF);
-            int cqInt = r.nextInt(maxColQ);
-            nodeMap[depth][index] = new MutationInfo(rowLong, cfInt, cqInt);
-            Mutation m = genMutation(rowLong, cfInt, cqInt, cv, ingestInstanceId, count, prevRow,
-                checksum);
-            count++;
-            bw.addMutation(m);
+            lastFlushTime = flush(bw, entriesWritten, entriesDeleted, lastFlushTime);
+            if (entriesWritten >= numEntries)
+              break out;
+            pauseCheck(rand);
           }
-          lastFlushTime = flush(bw, count, flushInterval, lastFlushTime);
-          if (count >= numEntries)
-            break out;
-          pauseCheck(testProps, r);
-        }
-
-        // random chance that the entries will be deleted
-        boolean delete = r.nextFloat() < deleteProbability;
-
-        // if the previously written entries are scheduled to be deleted
-        if (delete) {
-          log.info("Deleting last portion of written entries");
-          // add delete mutations in the reverse order in which they were written
-          for (int depth = nodeMap.length - 1; depth >= 0; depth--) {
-            for (int index = nodeMap[depth].length - 1; index >= 0; index--) {
-              MutationInfo currentNode = nodeMap[depth][index];
-              Mutation m = new Mutation(genRow(currentNode.row));
-              m.putDelete(genCol(currentNode.cf), genCol(currentNode.cq));
+          // random chance that the entries will be deleted
+          final boolean delete = rand.nextFloat() < deleteProbability;
+
+          // if the previously written entries are scheduled to be deleted
+          if (delete) {
+            log.info("Deleting last portion of written entries");
+            // add delete mutations in the reverse order in which they were written
+            for (int depth = nodeMap.length - 1; depth >= 0; depth--) {
+              for (int index = nodeMap[depth].length - 1; index >= 0; index--) {
+                MutationInfo currentNode = nodeMap[depth][index];
+                Mutation m = new Mutation(genRow(currentNode.row));
+                m.putDelete(genCol(currentNode.cf), genCol(currentNode.cq));
+                entriesDeleted++;
+                bw.addMutation(m);
+              }
+              lastFlushTime = flush(bw, entriesWritten, entriesDeleted, lastFlushTime);
+              pauseCheck(rand);
+            }
+          } else {
+            // create one big linked list, this makes all the first inserts point to something
+            for (int index = 0; index < flushInterval - 1; index++) {
+              MutationInfo firstEntry = nodeMap[0][index];
+              MutationInfo lastEntry = nodeMap[maxDepth - 1][index + 1];
+              Mutation m = genMutation(firstEntry.row, firstEntry.cf, firstEntry.cq, cv,
+                  ingestInstanceId, entriesWritten, genRow(lastEntry.row), checksum);
+              entriesWritten++;
               bw.addMutation(m);
             }
-            lastFlushTime = flush(bw, count, flushInterval, lastFlushTime);
-            pauseCheck(testProps, r);
+            lastFlushTime = flush(bw, entriesWritten, entriesDeleted, lastFlushTime);
          }
-        } else {
-          // create one big linked list, this makes all the first inserts point to something
-          for (int index = 0; index < flushInterval - 1; index++) {
-            MutationInfo firstEntry = nodeMap[0][index];
-            MutationInfo lastEntry = nodeMap[maxDepth - 1][index + 1];
-            Mutation m = genMutation(firstEntry.row, firstEntry.cf, firstEntry.cq, cv,
-                ingestInstanceId, count, genRow(lastEntry.row), checksum);
-            count++;
-            bw.addMutation(m);
-          }
-          lastFlushTime = flush(bw, count, flushInterval, lastFlushTime);
-        }
-        if (count >= numEntries)
-          break out;
-        pauseCheck(testProps, r);
+          if (entriesWritten >= numEntries)
+            break out;
+          pauseCheck(rand);
+        }
       }
-      bw.close();
     }
   }
@@ -263,19 +254,18 @@ public class ContinuousIngest {
     return vis;
   }
 
-  private static long flush(BatchWriter bw, long count, final int flushInterval, long lastFlushTime)
-      throws MutationsRejectedException {
+  private static long flush(BatchWriter bw, long entriesWritten, long entriesDeleted,
+      long lastFlushTime) throws MutationsRejectedException {
     long t1 = System.currentTimeMillis();
     bw.flush();
    long t2 = System.currentTimeMillis();
-    log.info(String.format("FLUSH %d %d %d %d %d", t2, (t2 - lastFlushTime), (t2 - t1), count,
-        flushInterval));
-    lastFlushTime = t2;
-    return lastFlushTime;
+    log.info("FLUSH - duration: {}ms, since last flush: {}ms, total written: {}, total deleted: {}",
+        (t2 - t1), (t2 - lastFlushTime), entriesWritten, entriesDeleted);
+    return t2;
   }
 
   public static Mutation genMutation(long rowLong, int cfInt, int cqInt, ColumnVisibility cv,
-      byte[] ingestInstanceId, long count, byte[] prevRow, boolean checksum) {
+      byte[] ingestInstanceId, long entriesWritten, byte[] prevRow, boolean checksum) {
     // Adler32 is supposed to be faster, but according to wikipedia is not
     // good for small data.... so used CRC32 instead
     CRC32 cksum = null;
@@ -295,7 +285,7 @@ public class ContinuousIngest {
 
     Mutation m = new Mutation(rowString);
 
-    m.put(cfString, cqString, cv, createValue(ingestInstanceId, count, prevRow, cksum));
+    m.put(cfString, cqString, cv, createValue(ingestInstanceId, entriesWritten, prevRow, cksum));
     return m;
   }
 
@@ -315,7 +305,7 @@ public class ContinuousIngest {
     return FastFormat.toZeroPaddedString(rowLong, 16, 16, EMPTY_BYTES);
   }
 
-  public static byte[] createValue(byte[] ingestInstanceId, long count, byte[] prevRow,
+  public static byte[] createValue(byte[] ingestInstanceId, long entriesWritten, byte[] prevRow,
      Checksum cksum) {
     int dataLen = ingestInstanceId.length + 16 + (prevRow == null ? 0 : prevRow.length) + 3;
     if (cksum != null)
@@ -324,7 +314,7 @@ public class ContinuousIngest {
     System.arraycopy(ingestInstanceId, 0, val, 0, ingestInstanceId.length);
     int index = ingestInstanceId.length;
     val[index++] = ':';
-    int added = FastFormat.toZeroPaddedString(val, index, count, 16, 16, EMPTY_BYTES);
+    int added = FastFormat.toZeroPaddedString(val, index, entriesWritten, 16, 16, EMPTY_BYTES);
    if (added != 16)
      throw new RuntimeException(" " + added);
    index += 16;
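
[Editor's note] The headline change above is that AccumuloClient and BatchWriter now both live in
try-with-resources blocks, which is why the explicit bw.close() call disappears from main(). As a
rough, stand-alone sketch of that resource pattern (the client properties path, the "ci" table
name, and the class name are placeholders here, not the real ContinuousEnv wiring):

    import static java.nio.charset.StandardCharsets.UTF_8;

    import org.apache.accumulo.core.client.Accumulo;
    import org.apache.accumulo.core.client.AccumuloClient;
    import org.apache.accumulo.core.client.BatchWriter;
    import org.apache.accumulo.core.data.Mutation;

    public class IngestResourceSketch {
      public static void main(String[] args) throws Exception {
        String table = "ci"; // placeholder table name
        try (AccumuloClient client = Accumulo.newClient()
            .from("/path/to/accumulo-client.properties") // placeholder client config
            .build()) {
          // Open the writer only after cheap setup/validation, mirroring the new main():
          // the writer is closed automatically even if ingest throws.
          try (BatchWriter bw = client.createBatchWriter(table)) {
            Mutation m = new Mutation("row_0000000000000000");
            m.put("cf".getBytes(UTF_8), "cq".getBytes(UTF_8), "value".getBytes(UTF_8));
            bw.addMutation(m);
            bw.flush(); // periodic flush() is what drives the FLUSH log line
          } // bw.close() happens here automatically
        }   // client.close() happens here automatically
      }
    }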