This is an automated email from the ASF dual-hosted git repository.

domgarguilo pushed a commit to branch zipfianIngest
in repository https://gitbox.apache.org/repos/asf/accumulo-testing.git

commit f0f6abbb214485064b38975f3041170ff5e84423
Author: Dom Garguilo <domgargu...@apache.org>
AuthorDate: Fri Apr 12 13:58:21 2024 -0400

    Add zipfian distribution option to continuous ingest
---
 conf/accumulo-testing.properties                   |  8 ++++
 .../testing/continuous/ContinuousIngest.java       | 51 +++++++++++++++++++++-
 2 files changed, 58 insertions(+), 1 deletion(-)

diff --git a/conf/accumulo-testing.properties b/conf/accumulo-testing.properties
index 93c3227..e871277 100644
--- a/conf/accumulo-testing.properties
+++ b/conf/accumulo-testing.properties
@@ -89,6 +89,14 @@ test.ci.ingest.pause.duration.max=120
 # The probability (between 0.0 and 1.0) that a set of entries will be deleted during continuous ingest
 # To disable deletes, set probability to 0.0
 test.ci.ingest.delete.probability=0.1
+# Enables a Zipfian distribution for value size. If set to true, random bytes are inserted into each value, with the number of bytes drawn from a Zipfian distribution.
+test.ci.ingest.zipfian.enabled=true
+# Minimum number of random bytes to insert into the value when the Zipfian distribution is enabled
+test.ci.ingest.zipfian.min.size=0
+# Maximum number of random bytes to insert into the value when the Zipfian distribution is enabled
+test.ci.ingest.zipfian.max.size=10000
+# Exponent of the Zipfian distribution; larger values skew the inserted size more strongly toward the minimum
+test.ci.ingest.zipfian.exponent=1.5
 
 # Batch walker
 # ------------
diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
index 1bb32a5..bb93ca7 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousIngest.java
@@ -38,6 +38,7 @@ import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.security.ColumnVisibility;
 import org.apache.accumulo.testing.TestProps;
 import org.apache.accumulo.testing.util.FastFormat;
+import org.apache.commons.math3.random.RandomDataGenerator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,6 +57,13 @@ public class ContinuousIngest {
   private static int pauseMin;
   private static int pauseMax;
 
+  private static boolean zipfianEnabled;
+  private static int minSize;
+  private static int maxSize;
+  private static double exponent;
+
+  private static RandomDataGenerator rnd;
+
   private static ColumnVisibility getVisibility(Random rand) {
     return visibilities.get(rand.nextInt(visibilities.size()));
   }
@@ -173,6 +181,18 @@ public class ContinuousIngest {
     log.info("DELETES will occur with a probability of {}",
         String.format("%.02f", deleteProbability));
 
+    zipfianEnabled = 
Boolean.parseBoolean(testProps.getProperty("test.ci.ingest.zipfian.enabled"));
+
+    if (zipfianEnabled) {
+      minSize = 
Integer.parseInt(testProps.getProperty("test.ci.ingest.zipfian.min.size"));
+      maxSize = 
Integer.parseInt(testProps.getProperty("test.ci.ingest.zipfian.max.size"));
+      exponent = 
Double.parseDouble(testProps.getProperty("test.ci.ingest.zipfian.exponent"));
+      rnd = new RandomDataGenerator();
+
+      log.info("Zipfian distribution enabled with min size: {}, max size: {}, 
exponent: {}",
+          minSize, maxSize, exponent);
+    }
+
     try (BatchWriter bw = client.createBatchWriter(tableName)) {
       out: while (true) {
         ColumnVisibility cv = getVisibility(random);
@@ -317,18 +337,37 @@ public class ContinuousIngest {
 
   public static byte[] createValue(byte[] ingestInstanceId, long 
entriesWritten, byte[] prevRow,
       Checksum cksum) {
-    int dataLen = ingestInstanceId.length + 16 + (prevRow == null ? 0 : 
prevRow.length) + 3;
+    final int numOfSeparators = 4;
+    int dataLen =
+        ingestInstanceId.length + 16 + (prevRow == null ? 0 : prevRow.length) 
+ numOfSeparators;
     if (cksum != null)
       dataLen += 8;
+
+    int zipfLength = 0;
+    if (zipfianEnabled) {
+      // add the length of the zipfian data to the value
+      int range = maxSize - minSize;
+      zipfLength = rnd.nextZipf(range, exponent) + minSize;
+      dataLen += zipfLength;
+    }
+
     byte[] val = new byte[dataLen];
+
+    // add the ingest instance id to the value
     System.arraycopy(ingestInstanceId, 0, val, 0, ingestInstanceId.length);
     int index = ingestInstanceId.length;
+
     val[index++] = ':';
+
+    // add the count of entries written to the value
     int added = FastFormat.toZeroPaddedString(val, index, entriesWritten, 16, 
16, EMPTY_BYTES);
     if (added != 16)
       throw new RuntimeException(" " + added);
     index += 16;
+
     val[index++] = ':';
+
+    // add the previous row to the value
     if (prevRow != null) {
       System.arraycopy(prevRow, 0, val, index, prevRow.length);
       index += prevRow.length;
@@ -336,6 +375,16 @@ public class ContinuousIngest {
 
     val[index++] = ':';
 
+    if (zipfianEnabled) {
+      // add random data to the value of length zipfLength
+      for (int i = 0; i < zipfLength; i++) {
+        val[index++] = (byte) rnd.nextInt(0, 256);
+      }
+
+      val[index++] = ':';
+    }
+
+    // add the checksum to the value
     if (cksum != null) {
       cksum.update(val, 0, index);
       cksum.getValue();

Reply via email to