This is an automated email from the ASF dual-hosted git repository.

domgarguilo pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo-testing.git
The following commit(s) were added to refs/heads/main by this push: new c2c2408 Fix failing ScanExecutorPT (#163) c2c2408 is described below commit c2c240848a02bc25e400d8f2b812638e02448d46 Author: Dom G <47725857+domgargu...@users.noreply.github.com> AuthorDate: Tue Oct 5 10:30:11 2021 -0400 Fix failing ScanExecutorPT (#163) * replaced outdated hinting Co-authored-by: Keith Turner <ktur...@apache.org> Co-authored-by: Jeffrey Manno <jeffreymann...@gmail.com> --- .../testing/performance/tests/ScanExecutorPT.java | 72 +++++++++++----------- 1 file changed, 37 insertions(+), 35 deletions(-) diff --git a/src/main/java/org/apache/accumulo/testing/performance/tests/ScanExecutorPT.java b/src/main/java/org/apache/accumulo/testing/performance/tests/ScanExecutorPT.java index 687308d..9554e5a 100644 --- a/src/main/java/org/apache/accumulo/testing/performance/tests/ScanExecutorPT.java +++ b/src/main/java/org/apache/accumulo/testing/performance/tests/ScanExecutorPT.java @@ -23,7 +23,6 @@ import java.util.LongSummaryStatistics; import java.util.Map; import java.util.Map.Entry; import java.util.Random; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -44,7 +43,6 @@ import org.apache.accumulo.testing.performance.util.TestData; import org.apache.accumulo.testing.performance.util.TestExecutor; import org.apache.hadoop.io.Text; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; public class ScanExecutorPT implements PerformanceTest { @@ -59,10 +57,10 @@ public class ScanExecutorPT implements PerformanceTest { private static final String SCAN_EXECUTOR_THREADS = "2"; private static final String SCAN_PRIORITIZER = "org.apache.accumulo.core.spi.scan.HintScanPrioritizer"; - private static final String TEST_DESC = "Scan Executor Test. Test running lots of short scans " - + "while long scans are running in the background. 
Each short scan reads a random row and " + private static final String TEST_DESC = "Scan Executor Test. Test running lots of short scans " + + "while long scans are running in the background. Each short scan reads a random row and " + "family. Using execution hints, short scans are randomly either given a high priority or " - + "a dedicated executor. If the scan prioritizer or dispatcher is not working properly, " + + "a dedicated executor. If the scan prioritizer or dispatcher is not working properly, " + "then the short scans will be orders of magnitude slower."; @Override @@ -71,14 +69,16 @@ public class ScanExecutorPT implements PerformanceTest { siteCfg.put(Property.TSERV_SCAN_MAX_OPENFILES.getKey(), "200"); siteCfg.put(Property.TSERV_MINTHREADS.getKey(), "200"); - siteCfg.put(Property.TSERV_SCAN_EXECUTORS_PREFIX.getKey() + "se1.threads", - SCAN_EXECUTOR_THREADS); - siteCfg.put(Property.TSERV_SCAN_EXECUTORS_PREFIX.getKey() + "se1.prioritizer", - SCAN_PRIORITIZER); - siteCfg.put(Property.TSERV_SCAN_EXECUTORS_PREFIX.getKey() + "se2.threads", - SCAN_EXECUTOR_THREADS); - siteCfg.put(Property.TSERV_SCAN_EXECUTORS_PREFIX.getKey() + "se2.prioritizer", - SCAN_PRIORITIZER); + + final String tservSEprefix = Property.TSERV_SCAN_EXECUTORS_PREFIX.getKey(); + + siteCfg.put(tservSEprefix + "se1.threads", SCAN_EXECUTOR_THREADS); + siteCfg.put(tservSEprefix + "se1.prioritizer", SCAN_PRIORITIZER); + siteCfg.put(tservSEprefix + "se1.prioritizer.opts.priority.short", "1"); + siteCfg.put(tservSEprefix + "se1.prioritizer.opts.priority.long", "2"); + + siteCfg.put(tservSEprefix + "se2.threads", SCAN_EXECUTOR_THREADS); + siteCfg.put(tservSEprefix + "se2.prioritizer", SCAN_PRIORITIZER); return new SystemConfiguration().setAccumuloConfig(siteCfg); } @@ -86,28 +86,29 @@ public class ScanExecutorPT implements PerformanceTest { @Override public Report runTest(Environment env) throws Exception { - String tableName = "scept"; + AccumuloClient client = env.getClient(); + + final String 
tableName = "scept"; Map<String,String> props = new HashMap<>(); props.put(Property.TABLE_SCAN_DISPATCHER_OPTS.getKey() + "executor", "se1"); - props.put(Property.TABLE_SCAN_DISPATCHER_OPTS.getKey() + "heed_hints", "true"); + props.put(Property.TABLE_SCAN_DISPATCHER_OPTS.getKey() + "executor.dedicated", "se2"); props.put(Property.TABLE_BLOCKCACHE_ENABLED.getKey(), "true"); - env.getClient().tableOperations().create(tableName, - new NewTableConfiguration().setProperties(props)); + client.tableOperations().create(tableName, new NewTableConfiguration().setProperties(props)); long t1 = System.currentTimeMillis(); - TestData.generate(env.getClient(), tableName, NUM_ROWS, NUM_FAMS, NUM_QUALS); + TestData.generate(client, tableName, NUM_ROWS, NUM_FAMS, NUM_QUALS); long t2 = System.currentTimeMillis(); - env.getClient().tableOperations().compact(tableName, null, null, true, true); + client.tableOperations().compact(tableName, null, null, true, true); long t3 = System.currentTimeMillis(); AtomicBoolean stop = new AtomicBoolean(false); - TestExecutor<Long> longScans = startLongScans(env, tableName, stop); + TestExecutor<Long> longScans = startLongScans(client, tableName, stop); - LongSummaryStatistics shortStats1 = runShortScans(env, tableName, 50000); - LongSummaryStatistics shortStats2 = runShortScans(env, tableName, 100000); + LongSummaryStatistics shortStats1 = runShortScans(client, tableName, 50000); + LongSummaryStatistics shortStats2 = runShortScans(client, tableName, 100000); stop.set(true); long t4 = System.currentTimeMillis(); @@ -121,21 +122,21 @@ public class ScanExecutorPT implements PerformanceTest { final String ms = TimeUnit.MILLISECONDS.toString(); builder.id("sexec").description(TEST_DESC); - builder.info("write", NUM_ROWS * NUM_FAMS * NUM_QUALS, t2 - t1, "entries/sec", + builder.info("write_rate", NUM_ROWS * NUM_FAMS * NUM_QUALS, t2 - t1, "entries/sec", "Data write rate"); - builder.info("compact", NUM_ROWS * NUM_FAMS * NUM_QUALS, t3 - t2, "entries/sec", + 
builder.info("compact_rate", NUM_ROWS * NUM_FAMS * NUM_QUALS, t3 - t2, "entries/sec", "Compact rate"); builder.info("short_times1", shortStats1, ms, "Duration of each short scan from first run."); builder.info("short_times2", shortStats2, ms, "Duration of each short scan from second run."); - builder.result("short", shortStats2.getAverage(), ms, + builder.result("short_average", shortStats2.getAverage(), ms, "Average duration of short scans from second run."); builder.info("long_counts", longStats, "entries read", "Entries read by each long scan threads"); - builder.info("long", longStats.getSum(), (t4 - t3), "entries/sec", + builder.info("long_rate", longStats.getSum(), (t4 - t3), "entries/sec", "Combined rate of all long scans"); builder.parameter("short_threads", NUM_SHORT_SCANS_THREADS, "Threads used to run short scans."); builder.parameter("long_threads", NUM_LONG_SCANS, - "Threads running long scans. Each thread repeatedly scans entire table for duration of test."); + "Threads running long scans. 
Each thread repeatedly scans entire table for duration of test."); builder.parameter("rows", NUM_ROWS, "Rows in test table"); builder.parameter("families", NUM_FAMS, "Families per row in test table"); builder.parameter("qualifiers", NUM_QUALS, "Qualifiers per family in test table"); @@ -178,11 +179,11 @@ public class ScanExecutorPT implements PerformanceTest { return count; } - private LongSummaryStatistics runShortScans(Environment env, String tableName, int numScans) - throws InterruptedException, ExecutionException { + private LongSummaryStatistics runShortScans(AccumuloClient client, String tableName, + int numScans) { - Map<String,String> execHints = ImmutableMap.of("executor", "se2"); - Map<String,String> prioHints = ImmutableMap.of("priority", "1"); + Map<String,String> execHints = Map.of("scan_type", "dedicated"); + Map<String,String> prioHints = Map.of("scan_type", "short"); try (TestExecutor<Long> executor = new TestExecutor<>(NUM_SHORT_SCANS_THREADS)) { Random rand = new Random(); @@ -193,20 +194,21 @@ public class ScanExecutorPT implements PerformanceTest { // scans have a 20% chance of getting dedicated thread pool and 80% chance of getting high // priority Map<String,String> hints = rand.nextInt(10) <= 1 ? 
execHints : prioHints; - executor.submit(() -> scan(tableName, env.getClient(), row, fam, hints)); + executor.submit(() -> scan(tableName, client, row, fam, hints)); } return executor.stream().mapToLong(l -> l).summaryStatistics(); } } - private TestExecutor<Long> startLongScans(Environment env, String tableName, AtomicBoolean stop) { - Map<String,String> hints = ImmutableMap.of("priority", "2"); + private TestExecutor<Long> startLongScans(AccumuloClient client, String tableName, + AtomicBoolean stop) { + Map<String,String> hints = Map.of("scan_type", "long"); TestExecutor<Long> longScans = new TestExecutor<>(NUM_LONG_SCANS); for (int i = 0; i < NUM_LONG_SCANS; i++) { - longScans.submit(() -> scan(tableName, env.getClient(), stop, hints)); + longScans.submit(() -> scan(tableName, client, stop, hints)); } return longScans; }