This is an automated email from the ASF dual-hosted git repository. phrocker pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/accumulo-testing.git
The following commit(s) were added to refs/heads/master by this push: new c8ef784 Adding Accumulo Availability Monitor : c8ef784 is described below commit c8ef7846b4426c5cb86632ade692b0ff8aa23977 Author: Tushar D <tudha...@microsoft.com> AuthorDate: Wed Nov 13 18:17:32 2019 -0800 Adding Accumulo Availability Monitor : Monitor.java: This file includes the core code for the scan operation, which randomly picks a tablet server, identifies its minimum and maximum row range, and picks out 'distance' (configurable through opts) rows to be scanned in one iteration. MonitorOpts.java: This file is used to provide all the parameters that are taken by the Monitor class. Interesting options include: Ability to set the scanner sleep time between iterations. Setting the batch size that the scanner uses internally. Setting the number of iterations or continuous mode. monitor script: This follows the same pattern as the cingest, rwalk, and performance scripts. Caching Splits Schema Agnostic Healthprobe Added Changes to make the healthprobe schema agnostic, while retaining the functionality to scan a configurable number of rows. Rows are scanned from the beginning of the tablet. Tablet information is loaded once and cached for later iterations. 
Signed-off-by: Marc Parisi <phroc...@apache.org> --- Dockerfile | 1 + README.md | 7 + src/main/docker/docker-entry => bin/monitor | 34 ++-- src/main/docker/docker-entry | 3 +- .../accumulo/testing/healthprobe/Monitor.java | 172 +++++++++++++++++++++ .../accumulo/testing/healthprobe/MonitorOpts.java | 50 ++++++ 6 files changed, 246 insertions(+), 21 deletions(-) diff --git a/Dockerfile b/Dockerfile index b544020..36afd92 100644 --- a/Dockerfile +++ b/Dockerfile @@ -43,6 +43,7 @@ COPY ./bin/build /opt/at/bin COPY ./bin/cingest /opt/at/bin COPY ./bin/rwalk /opt/at/bin COPY ./bin/gcs /opt/at/bin +COPY ./bin/monitor /opt/at/bin COPY ./src/main/docker/docker-entry /opt/at/bin COPY ./target/accumulo-testing-shaded.jar /opt/at/ diff --git a/README.md b/README.md index e5e4f78..eadf19d 100644 --- a/README.md +++ b/README.md @@ -32,6 +32,7 @@ Tests are run using the following scripts in `bin/`: * `performance` - Runs performance test * `agitator` - Runs agitator * `gcs` - Runs garbage collection simultation + * `monitor` - Runs availability monitor probe Run the scripts without arguments to view usage. @@ -43,6 +44,7 @@ run in Docker: * `cingest` - All applications can be run except `verify` & `moru` which launch a MapReduce job. * `rwalk` - All modules can be run. + * `monitor` - All modules can be run. 1. To create the `accumulo-testing` docker image, make sure the following files exist in your clone: @@ -238,6 +240,11 @@ test and produce json result files. There are some utilities for working with the json result files, run the `performance` script with no options to see them. +## Availability Monitor +Monitor class aims at verifying availability of overall accumulo cluster by continually doing +scans of random values across various tablet servers and capturing timing +information related to how long such scans take. + ## Automated Cluster Testing See the [readme.md](/test/automation/README.md). 
diff --git a/src/main/docker/docker-entry b/bin/monitor similarity index 63% copy from src/main/docker/docker-entry copy to bin/monitor index 3b05bd5..eb606a8 100755 --- a/src/main/docker/docker-entry +++ b/bin/monitor @@ -16,42 +16,36 @@ # limitations under the License. bin_dir=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd ) -at_home=$( cd "$( dirname "$bin_dir" )" && pwd ) +source "${bin_dir}/build" function print_usage() { cat <<EOF -Usage: accumulo-testing <script> (<argument>) +Usage: monitor <application> {-o test.<prop>=<value>} - Available scripts: +Available applications: + + readprobe Runs a probe that scans random keys and reports scan timings - cingest Runs continuous ingest script - rwalk Runs random walk script - gcs Runs garbage collection simulation EOF } if [ -z "$1" ]; then - echo "ERROR: <script> needs to be set" + echo "ERROR: <application> needs to be set" print_usage exit 1 fi -if [ -z "$HADOOP_HOME" ]; then - echo "HADOOP_HOME must be set!" - exit 1 -fi - +ci_package="org.apache.accumulo.testing.healthprobe" case "$1" in - cingest|rwalk|gcs) - "${at_home}"/bin/"$1" "${@:2}" - ;; - -h|help) - print_usage - exit 1 + readprobe) + ci_main="${ci_package}.Monitor" ;; *) - echo "ERROR - unknown <script>: $1" + echo "Unknown application: $1" + print_usage exit 1 - ;; esac + +export CLASSPATH="$TEST_JAR_PATH:$HADOOP_API_JAR:$HADOOP_RUNTIME_JAR:$CLASSPATH" +java $JAVA_OPTS -DINSTRUMENTATIONKEY="$INSTRUMENTATIONKEY" -Dlog4j.configuration="file:$TEST_LOG4J" "$ci_main" "${@:2}" "-c" "$ACCUMULO_CLIENT_PROPS" \ No newline at end of file diff --git a/src/main/docker/docker-entry b/src/main/docker/docker-entry index 3b05bd5..f1e3ab1 100755 --- a/src/main/docker/docker-entry +++ b/src/main/docker/docker-entry @@ -28,6 +28,7 @@ Usage: accumulo-testing <script> (<argument>) cingest Runs continuous ingest script rwalk Runs random walk script gcs Runs garbage collection simulation + monitor Runs a readprobe for monitoring read timings EOF } @@ -43,7 +44,7 @@ if 
[ -z "$HADOOP_HOME" ]; then fi case "$1" in - cingest|rwalk|gcs) + cingest|rwalk|gcs|monitor) "${at_home}"/bin/"$1" "${@:2}" ;; -h|help) diff --git a/src/main/java/org/apache/accumulo/testing/healthprobe/Monitor.java b/src/main/java/org/apache/accumulo/testing/healthprobe/Monitor.java new file mode 100644 index 0000000..a1d097e --- /dev/null +++ b/src/main/java/org/apache/accumulo/testing/healthprobe/Monitor.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.testing.healthprobe; + +import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map.Entry; +import java.util.Random; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.RowIterator; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.admin.Locations; +import org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.TabletId; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.MDC; + +public class Monitor { + + private static final Logger log = LoggerFactory.getLogger(Monitor.class); + static long distance = 1l; + static List<TabletId> tablets; + static boolean cacheTablets = true; + + public static void main(String[] args) throws Exception { + MonitorOpts opts = new MonitorOpts(); + opts.parseArgs(Monitor.class.getName(), args); + distance = opts.distance; + Authorizations auth = new Authorizations(); + if (opts.auth != "") { + auth = new Authorizations(opts.auth); + } + + try (AccumuloClient client = Accumulo.newClient().from(opts.getClientProps()).build(); + Scanner scanner = client.createScanner(opts.tableName, auth)) { + if (opts.isolate) { + scanner.enableIsolation(); + } + int scannerSleepMs = opts.sleep_ms; + LoopControl scanning_condition = opts.continuous ? 
new ContinuousLoopControl() + : new IterativeLoopControl(opts.scan_iterations); + + while (scanning_condition.keepScanning()) { + + Random tablet_index_generator = new Random(); + TabletId pickedTablet = pickTablet(client.tableOperations(), opts.tableName, + tablet_index_generator); + Range range = pickedTablet.toRange(); + scanner.setRange(range); + + if (opts.batch_size > 0) { + scanner.setBatchSize(opts.batch_size); + } + try { + long startTime = System.nanoTime(); + long count = consume(scanner, opts.distance); + long stopTime = System.nanoTime(); + MDC.put("StartTime", String.valueOf(startTime)); + MDC.put("TabletId", String.valueOf(pickedTablet)); + MDC.put("TableName", String.valueOf(opts.tableName)); + MDC.put("TotalTime", String.valueOf((stopTime - startTime))); + MDC.put("StartRow", String.valueOf(range.getStartKey())); + MDC.put("EndRow", String.valueOf(range.getEndKey())); + MDC.put("TotalRecords", String.valueOf(count)); + + log.info("SCN starttime={} tabletindex={} tablename={} totaltime={} totalrecords={}", + startTime, tablet_index_generator, opts.tableName, (stopTime - startTime), count); + if (scannerSleepMs > 0) { + sleepUninterruptibly(scannerSleepMs, TimeUnit.MILLISECONDS); + } + } catch (Exception e) { + log.error(String.format( + "Exception while scanning range %s. 
Check the state of Accumulo for errors.", range), + e); + } + } + } + } + + public static int consume(Iterable<Entry<Key,Value>> scanner, long numberOfRows) { + RowIterator rowIter = new RowIterator(scanner); + int count = 0; + + while (rowIter.hasNext()) { + Iterator<Entry<Key,Value>> itr = rowIter.next(); + count++; + if (count >= numberOfRows) { + break; + } + } + return count; + } + + public static TabletId pickTablet(TableOperations tops, String table, Random r) + throws TableNotFoundException, AccumuloSecurityException, AccumuloException { + + if (cacheTablets) { + Locations locations = tops.locate(table, Collections.singleton(new Range())); + tablets = new ArrayList<TabletId>(locations.groupByTablet().keySet()); + cacheTablets = false; + } + int index = r.nextInt(tablets.size()); + TabletId tabletId = tablets.get(index); + return tabletId; + } + + /* + * These interfaces + implementations are used to determine how many times the scanner should look + * up a random tablet and scan it. 
+ */ + static interface LoopControl { + public boolean keepScanning(); + } + + // Does a finite number of iterations + static class IterativeLoopControl implements LoopControl { + private final int max; + private int current; + + public IterativeLoopControl(int max) { + this.max = max; + this.current = 0; + } + + @Override + public boolean keepScanning() { + if (current < max) { + ++current; + return true; + } else { + return false; + } + } + } + + // Does an infinite number of iterations + static class ContinuousLoopControl implements LoopControl { + @Override + public boolean keepScanning() { + return true; + } + } +} diff --git a/src/main/java/org/apache/accumulo/testing/healthprobe/MonitorOpts.java b/src/main/java/org/apache/accumulo/testing/healthprobe/MonitorOpts.java new file mode 100644 index 0000000..18a3002 --- /dev/null +++ b/src/main/java/org/apache/accumulo/testing/healthprobe/MonitorOpts.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.testing.healthprobe; + +import org.apache.accumulo.testing.cli.ClientOpts; + +import com.beust.jcommander.Parameter; + +class MonitorOpts extends ClientOpts { + @Parameter(names = {"-t", "--table"}, description = "table to use") + String tableName = ""; + + @Parameter(names = "--isolate", + description = "true to turn on scan isolation, false to turn off. default is false.") + boolean isolate = false; + + @Parameter(names = "--num-iterations", description = "number of scan iterations") + int scan_iterations = 1024; + + @Parameter(names = "--continuous", + description = "continuously scan the table. note that this overrides --num-iterations") + boolean continuous = false; + + @Parameter(names = "--scan-batch-size", description = "scanner batch size") + int batch_size = 5; + + @Parameter(names = "--scanner-sleep-ms", description = "scanner sleep interval in ms") + int sleep_ms = 60000; + + @Parameter(names = "--num-of-rows-per-iteration", + description = "Number of rows scanned together in one iteration of scanner consumption") + long distance = 10l; + + @Parameter(names = "--auth", description = "Provide a auth that can access all/most rows") + String auth = ""; +}