This is an automated email from the ASF dual-hosted git repository.

domgarguilo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo-testing.git


The following commit(s) were added to refs/heads/main by this push:
     new c2c2408  Fix failing ScanExecutorPT (#163)
c2c2408 is described below

commit c2c240848a02bc25e400d8f2b812638e02448d46
Author: Dom G <47725857+domgargu...@users.noreply.github.com>
AuthorDate: Tue Oct 5 10:30:11 2021 -0400

    Fix failing ScanExecutorPT (#163)
    
    * replaced outdated hinting
    
    Co-authored-by: Keith Turner <ktur...@apache.org>
    Co-authored-by: Jeffrey Manno <jeffreymann...@gmail.com>
---
 .../testing/performance/tests/ScanExecutorPT.java  | 72 +++++++++++-----------
 1 file changed, 37 insertions(+), 35 deletions(-)

diff --git a/src/main/java/org/apache/accumulo/testing/performance/tests/ScanExecutorPT.java b/src/main/java/org/apache/accumulo/testing/performance/tests/ScanExecutorPT.java
index 687308d..9554e5a 100644
--- a/src/main/java/org/apache/accumulo/testing/performance/tests/ScanExecutorPT.java
+++ b/src/main/java/org/apache/accumulo/testing/performance/tests/ScanExecutorPT.java
@@ -23,7 +23,6 @@ import java.util.LongSummaryStatistics;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Random;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -44,7 +43,6 @@ import org.apache.accumulo.testing.performance.util.TestData;
 import org.apache.accumulo.testing.performance.util.TestExecutor;
 import org.apache.hadoop.io.Text;
 
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 
 public class ScanExecutorPT implements PerformanceTest {
@@ -59,10 +57,10 @@ public class ScanExecutorPT implements PerformanceTest {
   private static final String SCAN_EXECUTOR_THREADS = "2";
   private static final String SCAN_PRIORITIZER = "org.apache.accumulo.core.spi.scan.HintScanPrioritizer";
 
-  private static final String TEST_DESC = "Scan Executor Test.  Test running 
lots of short scans "
-      + "while long scans are running in the background.  Each short scan 
reads a random row and "
+  private static final String TEST_DESC = "Scan Executor Test. Test running 
lots of short scans "
+      + "while long scans are running in the background. Each short scan reads 
a random row and "
       + "family. Using execution hints, short scans are randomly either given 
a high priority or "
-      + "a dedicated executor.  If the scan prioritizer or dispatcher is not 
working properly, "
+      + "a dedicated executor. If the scan prioritizer or dispatcher is not 
working properly, "
       + "then the short scans will be orders of magnitude slower.";
 
   @Override
@@ -71,14 +69,16 @@ public class ScanExecutorPT implements PerformanceTest {
 
     siteCfg.put(Property.TSERV_SCAN_MAX_OPENFILES.getKey(), "200");
     siteCfg.put(Property.TSERV_MINTHREADS.getKey(), "200");
-    siteCfg.put(Property.TSERV_SCAN_EXECUTORS_PREFIX.getKey() + "se1.threads",
-        SCAN_EXECUTOR_THREADS);
-    siteCfg.put(Property.TSERV_SCAN_EXECUTORS_PREFIX.getKey() + 
"se1.prioritizer",
-        SCAN_PRIORITIZER);
-    siteCfg.put(Property.TSERV_SCAN_EXECUTORS_PREFIX.getKey() + "se2.threads",
-        SCAN_EXECUTOR_THREADS);
-    siteCfg.put(Property.TSERV_SCAN_EXECUTORS_PREFIX.getKey() + 
"se2.prioritizer",
-        SCAN_PRIORITIZER);
+
+    final String tservSEprefix = Property.TSERV_SCAN_EXECUTORS_PREFIX.getKey();
+
+    siteCfg.put(tservSEprefix + "se1.threads", SCAN_EXECUTOR_THREADS);
+    siteCfg.put(tservSEprefix + "se1.prioritizer", SCAN_PRIORITIZER);
+    siteCfg.put(tservSEprefix + "se1.prioritizer.opts.priority.short", "1");
+    siteCfg.put(tservSEprefix + "se1.prioritizer.opts.priority.long", "2");
+
+    siteCfg.put(tservSEprefix + "se2.threads", SCAN_EXECUTOR_THREADS);
+    siteCfg.put(tservSEprefix + "se2.prioritizer", SCAN_PRIORITIZER);
 
     return new SystemConfiguration().setAccumuloConfig(siteCfg);
   }
@@ -86,28 +86,29 @@ public class ScanExecutorPT implements PerformanceTest {
   @Override
   public Report runTest(Environment env) throws Exception {
 
-    String tableName = "scept";
+    AccumuloClient client = env.getClient();
+
+    final String tableName = "scept";
 
     Map<String,String> props = new HashMap<>();
     props.put(Property.TABLE_SCAN_DISPATCHER_OPTS.getKey() + "executor", 
"se1");
-    props.put(Property.TABLE_SCAN_DISPATCHER_OPTS.getKey() + "heed_hints", 
"true");
+    props.put(Property.TABLE_SCAN_DISPATCHER_OPTS.getKey() + 
"executor.dedicated", "se2");
     props.put(Property.TABLE_BLOCKCACHE_ENABLED.getKey(), "true");
 
-    env.getClient().tableOperations().create(tableName,
-        new NewTableConfiguration().setProperties(props));
+    client.tableOperations().create(tableName, new 
NewTableConfiguration().setProperties(props));
 
     long t1 = System.currentTimeMillis();
-    TestData.generate(env.getClient(), tableName, NUM_ROWS, NUM_FAMS, 
NUM_QUALS);
+    TestData.generate(client, tableName, NUM_ROWS, NUM_FAMS, NUM_QUALS);
     long t2 = System.currentTimeMillis();
-    env.getClient().tableOperations().compact(tableName, null, null, true, 
true);
+    client.tableOperations().compact(tableName, null, null, true, true);
     long t3 = System.currentTimeMillis();
 
     AtomicBoolean stop = new AtomicBoolean(false);
 
-    TestExecutor<Long> longScans = startLongScans(env, tableName, stop);
+    TestExecutor<Long> longScans = startLongScans(client, tableName, stop);
 
-    LongSummaryStatistics shortStats1 = runShortScans(env, tableName, 50000);
-    LongSummaryStatistics shortStats2 = runShortScans(env, tableName, 100000);
+    LongSummaryStatistics shortStats1 = runShortScans(client, tableName, 
50000);
+    LongSummaryStatistics shortStats2 = runShortScans(client, tableName, 
100000);
 
     stop.set(true);
     long t4 = System.currentTimeMillis();
@@ -121,21 +122,21 @@ public class ScanExecutorPT implements PerformanceTest {
     final String ms = TimeUnit.MILLISECONDS.toString();
 
     builder.id("sexec").description(TEST_DESC);
-    builder.info("write", NUM_ROWS * NUM_FAMS * NUM_QUALS, t2 - t1, 
"entries/sec",
+    builder.info("write_rate", NUM_ROWS * NUM_FAMS * NUM_QUALS, t2 - t1, 
"entries/sec",
         "Data write rate");
-    builder.info("compact", NUM_ROWS * NUM_FAMS * NUM_QUALS, t3 - t2, 
"entries/sec",
+    builder.info("compact_rate", NUM_ROWS * NUM_FAMS * NUM_QUALS, t3 - t2, 
"entries/sec",
         "Compact rate");
     builder.info("short_times1", shortStats1, ms, "Duration of each short scan 
from first run.");
     builder.info("short_times2", shortStats2, ms, "Duration of each short scan 
from second run.");
-    builder.result("short", shortStats2.getAverage(), ms,
+    builder.result("short_average", shortStats2.getAverage(), ms,
         "Average duration of short scans from second run.");
     builder.info("long_counts", longStats, "entries read",
         "Entries read by each long scan threads");
-    builder.info("long", longStats.getSum(), (t4 - t3), "entries/sec",
+    builder.info("long_rate", longStats.getSum(), (t4 - t3), "entries/sec",
         "Combined rate of all long scans");
     builder.parameter("short_threads", NUM_SHORT_SCANS_THREADS, "Threads used 
to run short scans.");
     builder.parameter("long_threads", NUM_LONG_SCANS,
-        "Threads running long scans.  Each thread repeatedly scans entire 
table for duration of test.");
+        "Threads running long scans. Each thread repeatedly scans entire table 
for duration of test.");
     builder.parameter("rows", NUM_ROWS, "Rows in test table");
     builder.parameter("families", NUM_FAMS, "Families per row in test table");
     builder.parameter("qualifiers", NUM_QUALS, "Qualifiers per family in test 
table");
@@ -178,11 +179,11 @@ public class ScanExecutorPT implements PerformanceTest {
     return count;
   }
 
-  private LongSummaryStatistics runShortScans(Environment env, String 
tableName, int numScans)
-      throws InterruptedException, ExecutionException {
+  private LongSummaryStatistics runShortScans(AccumuloClient client, String 
tableName,
+      int numScans) {
 
-    Map<String,String> execHints = ImmutableMap.of("executor", "se2");
-    Map<String,String> prioHints = ImmutableMap.of("priority", "1");
+    Map<String,String> execHints = Map.of("scan_type", "dedicated");
+    Map<String,String> prioHints = Map.of("scan_type", "short");
 
     try (TestExecutor<Long> executor = new 
TestExecutor<>(NUM_SHORT_SCANS_THREADS)) {
       Random rand = new Random();
@@ -193,20 +194,21 @@ public class ScanExecutorPT implements PerformanceTest {
         // scans have a 20% chance of getting dedicated thread pool and 80% 
chance of getting high
         // priority
         Map<String,String> hints = rand.nextInt(10) <= 1 ? execHints : 
prioHints;
-        executor.submit(() -> scan(tableName, env.getClient(), row, fam, 
hints));
+        executor.submit(() -> scan(tableName, client, row, fam, hints));
       }
 
       return executor.stream().mapToLong(l -> l).summaryStatistics();
     }
   }
 
-  private TestExecutor<Long> startLongScans(Environment env, String tableName, 
AtomicBoolean stop) {
-    Map<String,String> hints = ImmutableMap.of("priority", "2");
+  private TestExecutor<Long> startLongScans(AccumuloClient client, String 
tableName,
+      AtomicBoolean stop) {
+    Map<String,String> hints = Map.of("scan_type", "long");
 
     TestExecutor<Long> longScans = new TestExecutor<>(NUM_LONG_SCANS);
 
     for (int i = 0; i < NUM_LONG_SCANS; i++) {
-      longScans.submit(() -> scan(tableName, env.getClient(), stop, hints));
+      longScans.submit(() -> scan(tableName, client, stop, hints));
     }
     return longScans;
   }

Reply via email to