mikewalch closed pull request #14: Update batch example to use new Connector builder
URL: https://github.com/apache/accumulo-examples/pull/14
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/README.md b/README.md
index c4450eb..dbea091 100644
--- a/README.md
+++ b/README.md
@@ -34,10 +34,11 @@ Before running any of the examples, the following steps must be performed.
         git clone https://github.com/apache/accumulo-examples.git
         mvn clean package

-4. Specify Accumulo connection information. All examples read connection information from a
-   properties file. Copy the template and edit it.
+4. Specify Accumulo connection information in `conf/accumulo-client.properties`. Some old examples
+   still read connection information from an examples.conf file so that should also be configured.

         cd accumulo-examples
+        nano conf/accumulo-client.properties
         cp examples.conf.template examples.conf
         nano examples.conf

diff --git a/docs/batch.md b/docs/batch.md
index 19acf84..305a732 100644
--- a/docs/batch.md
+++ b/docs/batch.md
@@ -16,42 +16,40 @@ limitations under the License.
 -->
 # Apache Accumulo Batch Writing and Scanning Example

-This tutorial uses the following Java classes:
-
- * [SequentialBatchWriter.java] - writes mutations with sequential rows and random values
- * [RandomBatchWriter.java] - used by SequentialBatchWriter to generate random values
- * [RandomBatchScanner.java] - reads random rows and verifies their values
-
 This is an example of how to use the BatchWriter and BatchScanner.

-First, you must ensure that the user you are running with (i.e `myuser` below) has the
-`exampleVis` authorization.
-
-    $ accumulo shell -u root -e "setauths -u myuser -s exampleVis"
-
-Second, you must create the table, batchtest1, ahead of time.
-
-    $ accumulo shell -u root -e "createtable batchtest1"
-
-The command below adds 10000 entries with random 50 bytes values to Accumulo.
+This tutorial uses the following Java classes.

-    $ ./bin/runex client.SequentialBatchWriter -c ./examples.conf -t batchtest1 --start 0 --num 10000 --size 50 --batchMemory 20M --batchLatency 500 --batchThreads 20 --vis exampleVis
-
-The command below will do 100 random queries.
-
-    $ ./bin/runex client.RandomBatchScanner -c ./examples.conf -t batchtest1 --num 100 --min 0 --max 10000 --size 50 --scanThreads 20 --auths exampleVis
-
-    07 11:33:11,103 [client.CountingVerifyingReceiver] INFO : Generating 100 random queries...
-    07 11:33:11,112 [client.CountingVerifyingReceiver] INFO : finished
-    07 11:33:11,260 [client.CountingVerifyingReceiver] INFO : 694.44 lookups/sec 0.14 secs
-
-    07 11:33:11,260 [client.CountingVerifyingReceiver] INFO : num results : 100
-
-    07 11:33:11,364 [client.CountingVerifyingReceiver] INFO : Generating 100 random queries...
-    07 11:33:11,370 [client.CountingVerifyingReceiver] INFO : finished
-    07 11:33:11,416 [client.CountingVerifyingReceiver] INFO : 2173.91 lookups/sec 0.05 secs
+ * [SequentialBatchWriter.java] - writes mutations with sequential rows and random values
+ * [RandomBatchScanner.java] - reads random rows and verifies their values

-    07 11:33:11,416 [client.CountingVerifyingReceiver] INFO : num results : 100
+Run `SequentialBatchWriter` to add 10000 entries with random 50 bytes values to Accumulo.
+
+    $ ./bin/runex client.SequentialBatchWriter
+
+Verify data was ingested by scanning the table using the Accumulo shell:
+
+    $ accumulo shell
+    root@instance> table batch
+    root@instance batch> scan
+
+Run `RandomBatchScanner` to perform 1000 random queries and verify the results.
+
+    $ ./bin/runex client.RandomBatchScanner
+    16:04:05,950 [examples.client.RandomBatchScanner] INFO : Generating 1000 random ranges for BatchScanner to read
+    16:04:06,020 [examples.client.RandomBatchScanner] INFO : Reading ranges using BatchScanner
+    16:04:06,283 [examples.client.RandomBatchScanner] TRACE: 100 lookups
+    16:04:06,290 [examples.client.RandomBatchScanner] TRACE: 200 lookups
+    16:04:06,294 [examples.client.RandomBatchScanner] TRACE: 300 lookups
+    16:04:06,297 [examples.client.RandomBatchScanner] TRACE: 400 lookups
+    16:04:06,301 [examples.client.RandomBatchScanner] TRACE: 500 lookups
+    16:04:06,304 [examples.client.RandomBatchScanner] TRACE: 600 lookups
+    16:04:06,307 [examples.client.RandomBatchScanner] TRACE: 700 lookups
+    16:04:06,309 [examples.client.RandomBatchScanner] TRACE: 800 lookups
+    16:04:06,316 [examples.client.RandomBatchScanner] TRACE: 900 lookups
+    16:04:06,320 [examples.client.RandomBatchScanner] TRACE: 1000 lookups
+    16:04:06,330 [examples.client.RandomBatchScanner] INFO : Scan finished! 3246.75 lookups/sec, 0.31 secs, 1000 results
+    16:04:06,331 [examples.client.RandomBatchScanner] INFO : All expected rows were scanned

 [SequentialBatchWriter.java]: ../src/main/java/org/apache/accumulo/examples/client/SequentialBatchWriter.java
 [RandomBatchWriter.java]: ../src/main/java/org/apache/accumulo/examples/client/RandomBatchWriter.java
diff --git a/src/main/java/org/apache/accumulo/examples/client/CountingVerifyingReceiver.java b/src/main/java/org/apache/accumulo/examples/client/CountingVerifyingReceiver.java
index 51fc370..ac5eb11 100644
--- a/src/main/java/org/apache/accumulo/examples/client/CountingVerifyingReceiver.java
+++ b/src/main/java/org/apache/accumulo/examples/client/CountingVerifyingReceiver.java
@@ -35,9 +35,9 @@

   long count = 0;
   int expectedValueSize = 0;
-  HashMap<Text,Boolean> expectedRows;
+  HashMap<String,Boolean> expectedRows;

-  CountingVerifyingReceiver(HashMap<Text,Boolean> expectedRows, int expectedValueSize) {
+  CountingVerifyingReceiver(HashMap<String,Boolean> expectedRows, int expectedValueSize) {
     this.expectedRows = expectedRows;
     this.expectedValueSize = expectedValueSize;
   }
@@ -56,7 +56,7 @@ public void receive(Key key, Value value) {
     if (!expectedRows.containsKey(key.getRow())) {
       log.error("Got unexpected key " + key);
     } else {
-      expectedRows.put(key.getRow(), true);
+      expectedRows.put(key.getRow().toString(), true);
     }

     count++;
diff --git a/src/main/java/org/apache/accumulo/examples/client/RandomBatchScanner.java b/src/main/java/org/apache/accumulo/examples/client/RandomBatchScanner.java
index ac32827..9a88d39 100644
--- a/src/main/java/org/apache/accumulo/examples/client/RandomBatchScanner.java
+++ b/src/main/java/org/apache/accumulo/examples/client/RandomBatchScanner.java
@@ -16,179 +16,102 @@
  */
 package org.apache.accumulo.examples.client;

+import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.accumulo.examples.client.RandomBatchWriter.abs;

+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map.Entry;
 import java.util.Random;
-import java.util.concurrent.TimeUnit;

 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.BatchScanner;
 import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.TableExistsException;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.examples.cli.BatchScannerOpts;
-import org.apache.accumulo.examples.cli.ClientOnRequiredTable;
-import org.apache.hadoop.io.Text;
+import org.apache.accumulo.core.security.Authorizations;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-import com.beust.jcommander.Parameter;
-
 /**
  * Simple example for reading random batches of data from Accumulo.
  */
 public class RandomBatchScanner {
-  private static final Logger log = LoggerFactory.getLogger(RandomBatchScanner.class);
-
-  /**
-   * Generate a number of ranges, each covering a single random row.
-   *
-   * @param num
-   *          the number of ranges to generate
-   * @param min
-   *          the minimum row that will be generated
-   * @param max
-   *          the maximum row that will be generated
-   * @param r
-   *          a random number generator
-   * @param ranges
-   *          a set in which to store the generated ranges
-   * @param expectedRows
-   *          a map in which to store the rows covered by the ranges (initially mapped to false)
-   */
-  static void generateRandomQueries(int num, long min, long max, Random r, HashSet<Range> ranges, HashMap<Text,Boolean> expectedRows) {
-    log.info(String.format("Generating %,d random queries...", num));
-    while (ranges.size() < num) {
-      long rowid = (abs(r.nextLong()) % (max - min)) + min;

-      Text row1 = new Text(String.format("row_%010d", rowid));
+  private static final Logger log = LoggerFactory.getLogger(RandomBatchScanner.class);

-      Range range = new Range(new Text(row1));
-      ranges.add(range);
-      expectedRows.put(row1, false);
+  public static void main(String[] args) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
+    Connector connector = Connector.builder().usingProperties("conf/accumulo-client.properties").build();
+    try {
+      connector.tableOperations().create("batch");
+    } catch (TableExistsException e) {
+      // ignore
     }

-    log.info("finished");
-  }
-
-  /**
-   * Prints a count of the number of rows mapped to false.
-   *
-   * @return boolean indicating "were all the rows found?"
-   */
-  private static boolean checkAllRowsFound(HashMap<Text,Boolean> expectedRows) {
-    int count = 0;
-    boolean allFound = true;
-    for (Entry<Text,Boolean> entry : expectedRows.entrySet())
-      if (!entry.getValue())
-        count++;
-
-    if (count > 0) {
-      log.warn("Did not find " + count + " rows");
-      allFound = false;
+    int totalLookups = 1000;
+    int totalEntries = 10000;
+    Random r = new Random();
+    HashSet<Range> ranges = new HashSet<>();
+    HashMap<String,Boolean> expectedRows = new HashMap<>();
+    log.info("Generating {} random ranges for BatchScanner to read", totalLookups);
+    while (ranges.size() < totalLookups) {
+      long rowId = abs(r.nextLong()) % totalEntries;
+      String row = String.format("row_%010d", rowId);
+      ranges.add(new Range(row));
+      expectedRows.put(row, false);
     }
-    return allFound;
-  }
-
-  /**
-   * Generates a number of random queries, verifies that the key/value pairs returned were in the queried ranges and that the values were generated by
-   * {@link RandomBatchWriter#createValue(long, int)}. Prints information about the results.
-   *
-   * @param num
-   *          the number of queries to generate
-   * @param min
-   *          the min row to query
-   * @param max
-   *          the max row to query
-   * @param evs
-   *          the expected size of the values
-   * @param r
-   *          a random number generator
-   * @param tsbr
-   *          a batch scanner
-   * @return boolean indicating "did the queries go fine?"
-   */
-  static boolean doRandomQueries(int num, long min, long max, int evs, Random r, BatchScanner tsbr) {
-
-    HashSet<Range> ranges = new HashSet<>(num);
-    HashMap<Text,Boolean> expectedRows = new java.util.HashMap<>();
-
-    generateRandomQueries(num, min, max, r, ranges, expectedRows);
-
-    tsbr.setRanges(ranges);
-
-    CountingVerifyingReceiver receiver = new CountingVerifyingReceiver(expectedRows, evs);

     long t1 = System.currentTimeMillis();
-
-    for (Entry<Key,Value> entry : tsbr) {
-      receiver.receive(entry.getKey(), entry.getValue());
+    long lookups = 0;
+
+    log.info("Reading ranges using BatchScanner");
+    try (BatchScanner scan = connector.createBatchScanner("batch", Authorizations.EMPTY, 20)) {
+      scan.setRanges(ranges);
+      for (Entry<Key, Value> entry : scan) {
+        Key key = entry.getKey();
+        Value value = entry.getValue();
+        String row = key.getRow().toString();
+        long rowId = Integer.parseInt(row.split("_")[1]);
+
+        Value expectedValue = SequentialBatchWriter.createValue(rowId);
+
+        if (!Arrays.equals(expectedValue.get(), value.get())) {
+          log.error("Unexpected value for key: {} expected: {} actual: {}", key,
+              new String(expectedValue.get(), UTF_8), new String(value.get(), UTF_8));
+        }
+
+        if (!expectedRows.containsKey(key.getRow().toString())) {
+          log.error("Encountered unexpected key: {} ", key);
+        } else {
+          expectedRows.put(key.getRow().toString(), true);
+        }
+
+        lookups++;
+        if (lookups % 100 == 0) {
+          log.trace("{} lookups", lookups);
+        }
+      }
     }

     long t2 = System.currentTimeMillis();
+    log.info(String.format("Scan finished! %6.2f lookups/sec, %.2f secs, %d results",
+        lookups / ((t2 - t1) / 1000.0), ((t2 - t1) / 1000.0), lookups));

-    log.info(String.format("%6.2f lookups/sec %6.2f secs%n", num / ((t2 - t1) / 1000.0), ((t2 - t1) / 1000.0)));
-    log.info(String.format("num results : %,d%n", receiver.count));
-
-    return checkAllRowsFound(expectedRows);
-  }
-
-  public static class Opts extends ClientOnRequiredTable {
-    @Parameter(names = "--min", description = "miniumum row that will be generated")
-    long min = 0;
-    @Parameter(names = "--max", description = "maximum ow that will be generated")
-    long max = 0;
-    @Parameter(names = "--num", required = true, description = "number of ranges to generate")
-    int num = 0;
-    @Parameter(names = "--size", required = true, description = "size of the value to write")
-    int size = 0;
-    @Parameter(names = "--seed", description = "seed for pseudo-random number generator")
-    Long seed = null;
-  }
-
-  /**
-   * Scans over a specified number of entries to Accumulo using a {@link BatchScanner}. Completes scans twice to compare times for a fresh query with those for
-   * a repeated query which has cached metadata and connections already established.
-   */
-  public static void main(String[] args) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
-    Opts opts = new Opts();
-    BatchScannerOpts bsOpts = new BatchScannerOpts();
-    opts.parseArgs(RandomBatchScanner.class.getName(), args, bsOpts);
-
-    Connector connector = opts.getConnector();
-    BatchScanner batchReader = connector.createBatchScanner(opts.getTableName(), opts.auths, bsOpts.scanThreads);
-    batchReader.setTimeout(bsOpts.scanTimeout, TimeUnit.MILLISECONDS);
-
-    Random r;
-    if (opts.seed == null)
-      r = new Random();
-    else
-      r = new Random(opts.seed);
-
-    // do one cold
-    boolean status = doRandomQueries(opts.num, opts.min, opts.max, opts.size, r, batchReader);
-
-    System.gc();
-    System.gc();
-    System.gc();
-
-    if (opts.seed == null)
-      r = new Random();
-    else
-      r = new Random(opts.seed);
-
-    // do one hot (connections already established, metadata table cached)
-    status = status && doRandomQueries(opts.num, opts.min, opts.max, opts.size, r, batchReader);
-
-    batchReader.close();
-    if (!status) {
+    int count = 0;
+    for (Entry<String,Boolean> entry : expectedRows.entrySet()) {
+      if (!entry.getValue()) {
+        count++;
+      }
+    }
+    if (count > 0) {
+      log.warn("Did not find {} rows", count);
       System.exit(1);
     }
+    log.info("All expected rows were scanned");
   }
 }
diff --git a/src/main/java/org/apache/accumulo/examples/client/SequentialBatchWriter.java b/src/main/java/org/apache/accumulo/examples/client/SequentialBatchWriter.java
index 9b57739..6f630f9 100644
--- a/src/main/java/org/apache/accumulo/examples/client/SequentialBatchWriter.java
+++ b/src/main/java/org/apache/accumulo/examples/client/SequentialBatchWriter.java
@@ -16,53 +16,63 @@
  */
 package org.apache.accumulo.examples.client;

+import java.util.Random;
+
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.TableExistsException;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.security.ColumnVisibility;
-import org.apache.accumulo.examples.cli.BatchWriterOpts;
-import org.apache.accumulo.examples.cli.ClientOnRequiredTable;
+import org.apache.accumulo.core.data.Value;

-import com.beust.jcommander.Parameter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 /**
  * Simple example for writing random data in sequential order to Accumulo.
  */
 public class SequentialBatchWriter {

-  static class Opts extends ClientOnRequiredTable {
-    @Parameter(names = "--start")
-    long start = 0;
-    @Parameter(names = "--num", required = true)
-    long num = 0;
-    @Parameter(names = "--size", required = true, description = "size of the value to write")
-    int valueSize = 0;
-    @Parameter(names = "--vis", converter = VisibilityConverter.class)
-    ColumnVisibility vis = new ColumnVisibility();
+  private static final Logger log = LoggerFactory.getLogger(SequentialBatchWriter.class);
+
+  public static Value createValue(long rowId) {
+    Random r = new Random(rowId);
+    byte value[] = new byte[50];
+
+    r.nextBytes(value);
+
+    // transform to printable chars
+    for (int j = 0; j < value.length; j++) {
+      value[j] = (byte) (((0xff & value[j]) % 92) + ' ');
+    }
+
+    return new Value(value);
   }

   /**
-   * Writes a specified number of entries to Accumulo using a {@link BatchWriter}. The rows of the entries will be sequential starting at a specified number.
-   * The column families will be "foo" and column qualifiers will be "1". The values will be random byte arrays of a specified size.
+   * Writes 1000 entries to Accumulo using a {@link BatchWriter}. The rows of the entries will be sequential starting from 0.
+   * The column families will be "foo" and column qualifiers will be "1". The values will be random 50 byte arrays.
    */
-  public static void main(String[] args) throws AccumuloException, AccumuloSecurityException, TableNotFoundException, MutationsRejectedException {
-    Opts opts = new Opts();
-    BatchWriterOpts bwOpts = new BatchWriterOpts();
-    opts.parseArgs(SequentialBatchWriter.class.getName(), args, bwOpts);
-    Connector connector = opts.getConnector();
-    BatchWriter bw = connector.createBatchWriter(opts.getTableName(), bwOpts.getBatchWriterConfig());
-
-    long end = opts.start + opts.num;
-
-    for (long i = opts.start; i < end; i++) {
-      Mutation m = RandomBatchWriter.createMutation(i, opts.valueSize, opts.vis);
-      bw.addMutation(m);
+  public static void main(String[] args) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
+    Connector connector = Connector.builder().usingProperties("conf/accumulo-client.properties").build();
+    try {
+      connector.tableOperations().create("batch");
+    } catch (TableExistsException e) {
+      // ignore
     }

-    bw.close();
+    try (BatchWriter bw = connector.createBatchWriter("batch")) {
+      for (int i = 0; i < 10000; i++) {
+        Mutation m = new Mutation(String.format("row_%010d", i));
+        // create a random value that is a function of row id for verification purposes
+        m.put("foo", "1", createValue(i));
+        bw.addMutation(m);
+        if (i % 1000 == 0) {
+          log.trace("wrote {} entries", i);
+        }
+      }
+    }
   }
 }

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services
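
The verification scheme in this diff makes each value a deterministic function of its row id, so the scanner can regenerate the expected bytes and compare them with what it reads. That idea can be sketched without an Accumulo dependency. What follows is a minimal, hypothetical illustration rather than code from the PR: the class name `DeterministicValueSketch` and the helpers `rowName`, `rowId` and `verify` are invented for this sketch, a raw `byte[]` stands in for Accumulo's `Value`, and `Long.parseLong` is used where the diff uses `Integer.parseInt`.

```java
import java.util.Arrays;
import java.util.Random;

public class DeterministicValueSketch {

  // Same byte transformation as createValue in the diff, but returning a raw
  // byte[] (assumption: this stands in for Accumulo's Value type).
  static byte[] createValue(long rowId) {
    Random r = new Random(rowId); // seeded by the row id, so the bytes are reproducible
    byte[] value = new byte[50];
    r.nextBytes(value);
    for (int j = 0; j < value.length; j++) {
      // fold each byte into the printable ASCII range 32..123
      value[j] = (byte) (((0xff & value[j]) % 92) + ' ');
    }
    return value;
  }

  // Hypothetical helper: builds the row key in the format used by the diff.
  static String rowName(long rowId) {
    return String.format("row_%010d", rowId);
  }

  // Hypothetical helper: recovers the numeric id from a row key.
  static long rowId(String row) {
    return Long.parseLong(row.split("_")[1]);
  }

  // Hypothetical helper: true when the stored bytes match the regenerated value.
  static boolean verify(String row, byte[] stored) {
    return Arrays.equals(createValue(rowId(row)), stored);
  }

  public static void main(String[] args) {
    String row = rowName(42);
    byte[] stored = createValue(42);
    System.out.println(row + " verifies: " + verify(row, stored));

    stored[0] ^= 1; // simulate a corrupted value
    System.out.println(row + " verifies after corruption: " + verify(row, stored));
  }
}
```

Because the expected value is derived from the row id alone, the reader in this sketch needs no copy of what the writer produced; that is also why the scanner in the diff can run as a separate program from the writer.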