This is an automated email from the ASF dual-hosted git repository.

phrocker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo-testing.git


The following commit(s) were added to refs/heads/master by this push:
     new c8ef784  Adding Accumulo Availability Monitor :
c8ef784 is described below

commit c8ef7846b4426c5cb86632ade692b0ff8aa23977
Author: Tushar D <tudha...@microsoft.com>
AuthorDate: Wed Nov 13 18:17:32 2019 -0800

    Adding Accumulo Availability Monitor :
    
    Monitor.java: This file includes the core code for the scan operation. It
    randomly picks a tablet, identifies its minimum and maximum row range, and
    picks out 'distance' (configurable through opts) rows to be scanned in one
    iteration.
    
    MonitorOpts.java: This file provides all the parameters taken by the
    Monitor class. Notable options include: the scanner sleep time between
    iterations, the batch size the scanner uses internally, and either a fixed
    number of iterations or continuous mode.
    
    monitor script : This follows same pattern as cingest, rwalk, performance 
scripts.
    
    Caching Splits
    
    Schema Agnostic Healthprobe
    
    Added changes to make the health probe schema-agnostic while retaining the
    ability to scan a configurable number of rows. Rows are scanned from the
    beginning of the tablet. Tablet information is loaded once and cached for
    later iterations.
    
    Signed-off-by: Marc Parisi <phroc...@apache.org>
---
 Dockerfile                                         |   1 +
 README.md                                          |   7 +
 src/main/docker/docker-entry => bin/monitor        |  34 ++--
 src/main/docker/docker-entry                       |   3 +-
 .../accumulo/testing/healthprobe/Monitor.java      | 172 +++++++++++++++++++++
 .../accumulo/testing/healthprobe/MonitorOpts.java  |  50 ++++++
 6 files changed, 246 insertions(+), 21 deletions(-)

diff --git a/Dockerfile b/Dockerfile
index b544020..36afd92 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -43,6 +43,7 @@ COPY ./bin/build /opt/at/bin
 COPY ./bin/cingest /opt/at/bin
 COPY ./bin/rwalk /opt/at/bin
 COPY ./bin/gcs /opt/at/bin
+COPY ./bin/monitor /opt/at/bin
 COPY ./src/main/docker/docker-entry /opt/at/bin
 
 COPY ./target/accumulo-testing-shaded.jar /opt/at/
diff --git a/README.md b/README.md
index e5e4f78..eadf19d 100644
--- a/README.md
+++ b/README.md
@@ -32,6 +32,7 @@ Tests are run using the following scripts in `bin/`:
   * `performance` - Runs performance test
   * `agitator` - Runs agitator
   * `gcs` - Runs garbage collection simulation
+  * `monitor` - Runs availability monitor probe
 
 Run the scripts without arguments to view usage.
 
@@ -43,6 +44,7 @@ run in Docker:
 
   * `cingest` - All applications can be run except `verify` & `moru` which 
launch a MapReduce job.
   * `rwalk` - All modules can be run.
+  * `monitor` - All applications (currently only `readprobe`) can be run.
 
 1. To create the `accumulo-testing` docker image, make sure the following 
files exist in your clone:
 
@@ -238,6 +240,11 @@ test and produce json result files.
 There are some utilities for working with the json result files, run the 
`performance` script
 with no options to see them.
 
+## Availability Monitor
+The Monitor class verifies the availability of an Accumulo cluster by continually
+scanning rows from randomly chosen tablets and capturing timing information
+about how long those scans take.
+
 ## Automated Cluster Testing
 See the [readme.md](/test/automation/README.md).
 
diff --git a/src/main/docker/docker-entry b/bin/monitor
similarity index 63%
copy from src/main/docker/docker-entry
copy to bin/monitor
index 3b05bd5..eb606a8 100755
--- a/src/main/docker/docker-entry
+++ b/bin/monitor
@@ -16,42 +16,36 @@
 # limitations under the License.
 
 bin_dir=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )
-at_home=$( cd "$( dirname "$bin_dir" )" && pwd )
+source "${bin_dir}/build"
 
 function print_usage() {
   cat <<EOF
 
-Usage: accumulo-testing <script> (<argument>)
+Usage: monitor <application> {-o test.<prop>=<value>}
 
-  Available scripts:
+Available applications:
+
+    readprobe   Runs a probe that scans random keys and reports scan timings
 
-  cingest     Runs continuous ingest script
-  rwalk       Runs random walk script
-  gcs         Runs garbage collection simulation
 EOF
 }
 
 if [ -z "$1" ]; then
-  echo "ERROR: <script> needs to be set"
+  echo "ERROR: <application> needs to be set"
   print_usage
   exit 1
 fi
 
-if [ -z "$HADOOP_HOME" ]; then
-  echo "HADOOP_HOME must be set!"
-  exit 1
-fi
-
+ci_package="org.apache.accumulo.testing.healthprobe"
 case "$1" in
-  cingest|rwalk|gcs)
-    "${at_home}"/bin/"$1" "${@:2}"
-    ;;
-  -h|help)
-    print_usage
-    exit 1
+  readprobe)
+    ci_main="${ci_package}.Monitor"
     ;;
   *)
-    echo "ERROR - unknown <script>: $1"
+    echo "Unknown application: $1"
+    print_usage
     exit 1
-    ;;
 esac
+
+export 
CLASSPATH="$TEST_JAR_PATH:$HADOOP_API_JAR:$HADOOP_RUNTIME_JAR:$CLASSPATH"
+java $JAVA_OPTS -DINSTRUMENTATIONKEY="$INSTRUMENTATIONKEY" 
-Dlog4j.configuration="file:$TEST_LOG4J" "$ci_main" "${@:2}" "-c" 
"$ACCUMULO_CLIENT_PROPS"
\ No newline at end of file
diff --git a/src/main/docker/docker-entry b/src/main/docker/docker-entry
index 3b05bd5..f1e3ab1 100755
--- a/src/main/docker/docker-entry
+++ b/src/main/docker/docker-entry
@@ -28,6 +28,7 @@ Usage: accumulo-testing <script> (<argument>)
   cingest     Runs continuous ingest script
   rwalk       Runs random walk script
   gcs         Runs garbage collection simulation
+  monitor     Runs a readprobe for monitoring read timings
 EOF
 }
 
@@ -43,7 +44,7 @@ if [ -z "$HADOOP_HOME" ]; then
 fi
 
 case "$1" in
-  cingest|rwalk|gcs)
+  cingest|rwalk|gcs|monitor)
     "${at_home}"/bin/"$1" "${@:2}"
     ;;
   -h|help)
diff --git a/src/main/java/org/apache/accumulo/testing/healthprobe/Monitor.java 
b/src/main/java/org/apache/accumulo/testing/healthprobe/Monitor.java
new file mode 100644
index 0000000..a1d097e
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/testing/healthprobe/Monitor.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.healthprobe;
+
+import static 
com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.RowIterator;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.Locations;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.TabletId;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+
+public class Monitor {
+
+  private static final Logger log = LoggerFactory.getLogger(Monitor.class);
+  static long distance = 1l;
+  static List<TabletId> tablets;
+  static boolean cacheTablets = true;
+
+  public static void main(String[] args) throws Exception {
+    MonitorOpts opts = new MonitorOpts();
+    opts.parseArgs(Monitor.class.getName(), args);
+    distance = opts.distance;
+    Authorizations auth = new Authorizations();
+    if (opts.auth != "") {
+      auth = new Authorizations(opts.auth);
+    }
+
+    try (AccumuloClient client = 
Accumulo.newClient().from(opts.getClientProps()).build();
+        Scanner scanner = client.createScanner(opts.tableName, auth)) {
+      if (opts.isolate) {
+        scanner.enableIsolation();
+      }
+      int scannerSleepMs = opts.sleep_ms;
+      LoopControl scanning_condition = opts.continuous ? new 
ContinuousLoopControl()
+          : new IterativeLoopControl(opts.scan_iterations);
+
+      while (scanning_condition.keepScanning()) {
+
+        Random tablet_index_generator = new Random();
+        TabletId pickedTablet = pickTablet(client.tableOperations(), 
opts.tableName,
+            tablet_index_generator);
+        Range range = pickedTablet.toRange();
+        scanner.setRange(range);
+
+        if (opts.batch_size > 0) {
+          scanner.setBatchSize(opts.batch_size);
+        }
+        try {
+          long startTime = System.nanoTime();
+          long count = consume(scanner, opts.distance);
+          long stopTime = System.nanoTime();
+          MDC.put("StartTime", String.valueOf(startTime));
+          MDC.put("TabletId", String.valueOf(pickedTablet));
+          MDC.put("TableName", String.valueOf(opts.tableName));
+          MDC.put("TotalTime", String.valueOf((stopTime - startTime)));
+          MDC.put("StartRow", String.valueOf(range.getStartKey()));
+          MDC.put("EndRow", String.valueOf(range.getEndKey()));
+          MDC.put("TotalRecords", String.valueOf(count));
+
+          log.info("SCN starttime={} tabletindex={} tablename={} totaltime={} 
totalrecords={}",
+              startTime, tablet_index_generator, opts.tableName, (stopTime - 
startTime), count);
+          if (scannerSleepMs > 0) {
+            sleepUninterruptibly(scannerSleepMs, TimeUnit.MILLISECONDS);
+          }
+        } catch (Exception e) {
+          log.error(String.format(
+              "Exception while scanning range %s. Check the state of Accumulo 
for errors.", range),
+              e);
+        }
+      }
+    }
+  }
+
+  public static int consume(Iterable<Entry<Key,Value>> scanner, long 
numberOfRows) {
+    RowIterator rowIter = new RowIterator(scanner);
+    int count = 0;
+
+    while (rowIter.hasNext()) {
+      Iterator<Entry<Key,Value>> itr = rowIter.next();
+      count++;
+      if (count >= numberOfRows) {
+        break;
+      }
+    }
+    return count;
+  }
+
+  public static TabletId pickTablet(TableOperations tops, String table, Random 
r)
+      throws TableNotFoundException, AccumuloSecurityException, 
AccumuloException {
+
+    if (cacheTablets) {
+      Locations locations = tops.locate(table, Collections.singleton(new 
Range()));
+      tablets = new ArrayList<TabletId>(locations.groupByTablet().keySet());
+      cacheTablets = false;
+    }
+    int index = r.nextInt(tablets.size());
+    TabletId tabletId = tablets.get(index);
+    return tabletId;
+  }
+
+  /*
+   * These interfaces + implementations are used to determine how many times 
the scanner should look
+   * up a random tablet and scan it.
+   */
+  static interface LoopControl {
+    public boolean keepScanning();
+  }
+
+  // Does a finite number of iterations
+  static class IterativeLoopControl implements LoopControl {
+    private final int max;
+    private int current;
+
+    public IterativeLoopControl(int max) {
+      this.max = max;
+      this.current = 0;
+    }
+
+    @Override
+    public boolean keepScanning() {
+      if (current < max) {
+        ++current;
+        return true;
+      } else {
+        return false;
+      }
+    }
+  }
+
+  // Does an infinite number of iterations
+  static class ContinuousLoopControl implements LoopControl {
+    @Override
+    public boolean keepScanning() {
+      return true;
+    }
+  }
+}
diff --git 
a/src/main/java/org/apache/accumulo/testing/healthprobe/MonitorOpts.java 
b/src/main/java/org/apache/accumulo/testing/healthprobe/MonitorOpts.java
new file mode 100644
index 0000000..18a3002
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/testing/healthprobe/MonitorOpts.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.healthprobe;
+
+import org.apache.accumulo.testing.cli.ClientOpts;
+
+import com.beust.jcommander.Parameter;
+
+class MonitorOpts extends ClientOpts {
+  @Parameter(names = {"-t", "--table"}, description = "table to use")
+  String tableName = "";
+
+  @Parameter(names = "--isolate",
+      description = "true to turn on scan isolation, false to turn off. 
default is false.")
+  boolean isolate = false;
+
+  @Parameter(names = "--num-iterations", description = "number of scan 
iterations")
+  int scan_iterations = 1024;
+
+  @Parameter(names = "--continuous",
+      description = "continuously scan the table. note that this overrides 
--num-iterations")
+  boolean continuous = false;
+
+  @Parameter(names = "--scan-batch-size", description = "scanner batch size")
+  int batch_size = 5;
+
+  @Parameter(names = "--scanner-sleep-ms", description = "scanner sleep 
interval in ms")
+  int sleep_ms = 60000;
+
+  @Parameter(names = "--num-of-rows-per-iteration",
+      description = "Number of rows scanned together in one iteration of 
scanner consumption")
+  long distance = 10l;
+
+  @Parameter(names = "--auth", description = "Provide a auth that can access 
all/most rows")
+  String auth = "";
+}

Reply via email to