This is an automated email from the ASF dual-hosted git repository. domgarguilo pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo-testing.git
The following commit(s) were added to refs/heads/main by this push: new 7453c37 Misc. improvements to randomwalk code (#223) 7453c37 is described below commit 7453c37b89e6bcc794285ff80afacf3b19197d9e Author: Dom G <domgargu...@apache.org> AuthorDate: Mon Sep 19 17:39:48 2022 -0400 Misc. improvements to randomwalk code (#223) --- .../apache/accumulo/testing/randomwalk/Module.java | 27 +++---- .../accumulo/testing/randomwalk/RandWalkEnv.java | 5 +- .../testing/randomwalk/bulk/BulkImportTest.java | 4 +- .../testing/randomwalk/bulk/BulkPlusOne.java | 49 ++++++------ .../accumulo/testing/randomwalk/bulk/Merge.java | 2 +- .../testing/randomwalk/bulk/SelectiveBulkTest.java | 16 ++-- .../testing/randomwalk/bulk/SelectiveQueueing.java | 8 +- .../accumulo/testing/randomwalk/bulk/Setup.java | 6 +- .../accumulo/testing/randomwalk/bulk/Split.java | 11 +-- .../accumulo/testing/randomwalk/bulk/Verify.java | 27 ++++--- .../testing/randomwalk/concurrent/AddSplits.java | 2 +- .../testing/randomwalk/concurrent/BatchScan.java | 17 ++-- .../testing/randomwalk/concurrent/BatchWrite.java | 25 +++--- .../testing/randomwalk/concurrent/BulkImport.java | 6 +- .../randomwalk/concurrent/ChangePermissions.java | 2 +- .../randomwalk/concurrent/CheckPermission.java | 4 +- .../testing/randomwalk/concurrent/CloneTable.java | 4 +- .../randomwalk/concurrent/ConcurrentFixture.java | 6 +- .../testing/randomwalk/concurrent/Config.java | 24 +++--- .../testing/randomwalk/concurrent/DeleteRange.java | 8 +- .../testing/randomwalk/concurrent/DeleteTable.java | 2 +- .../testing/randomwalk/concurrent/ListSplits.java | 2 +- .../randomwalk/concurrent/OfflineTable.java | 2 +- .../testing/randomwalk/concurrent/RenameTable.java | 2 +- .../testing/randomwalk/concurrent/ScanTable.java | 3 +- .../testing/randomwalk/conditional/Compact.java | 7 +- .../testing/randomwalk/conditional/Flush.java | 7 +- .../testing/randomwalk/conditional/Init.java | 21 ++--- .../testing/randomwalk/conditional/Merge.java | 6 +- .../testing/randomwalk/conditional/Setup.java | 11 ++- .../testing/randomwalk/conditional/Split.java | 6 +- .../testing/randomwalk/conditional/Transfer.java | 4 +- .../testing/randomwalk/conditional/Utils.java | 15 +++- .../testing/randomwalk/conditional/Verify.java | 11 ++- .../testing/randomwalk/image/ImageFixture.java | 19 ++--- .../testing/randomwalk/image/ScanMeta.java | 78 +++++++------------ .../accumulo/testing/randomwalk/image/TableOp.java | 5 +- .../accumulo/testing/randomwalk/image/Verify.java | 49 ++++++------ .../testing/randomwalk/multitable/BulkImport.java | 33 ++++---- .../testing/randomwalk/multitable/CopyTable.java | 14 ++-- .../testing/randomwalk/multitable/CreateTable.java | 14 ++-- .../randomwalk/multitable/OfflineTable.java | 2 +- .../testing/randomwalk/multitable/Write.java | 2 +- .../testing/randomwalk/sequential/BatchVerify.java | 9 +-- .../randomwalk/sequential/MapRedVerify.java | 24 +++--- .../randomwalk/sequential/SequentialFixture.java | 10 ++- .../testing/randomwalk/shard/BulkInsert.java | 91 +++++++++++----------- .../testing/randomwalk/shard/CloneIndex.java | 2 +- .../testing/randomwalk/shard/CompactFilter.java | 28 +++---- .../accumulo/testing/randomwalk/shard/Delete.java | 8 +- .../testing/randomwalk/shard/DeleteSomeDocs.java | 52 ++++++------- .../testing/randomwalk/shard/DeleteWord.java | 28 +++---- .../testing/randomwalk/shard/ExportIndex.java | 61 ++++++++------- .../accumulo/testing/randomwalk/shard/Flush.java | 11 +-- .../accumulo/testing/randomwalk/shard/Grep.java | 51 ++++++------ .../accumulo/testing/randomwalk/shard/Insert.java | 19 ++--- .../accumulo/testing/randomwalk/shard/Merge.java | 5 +- .../accumulo/testing/randomwalk/shard/Reindex.java | 29 +++---- .../accumulo/testing/randomwalk/shard/Search.java | 79 ++++++++----------- .../testing/randomwalk/shard/ShardFixture.java | 14 ++-- .../accumulo/testing/randomwalk/shard/Split.java | 4 +- .../testing/randomwalk/shard/VerifyIndex.java | 48 ++++++------ 62 files changed, 551 insertions(+), 590 deletions(-) diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/Module.java b/src/main/java/org/apache/accumulo/testing/randomwalk/Module.java index 8e85e20..929b2a8 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/Module.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/Module.java @@ -57,7 +57,7 @@ public class Module extends Node { private static class Dummy extends Node { - String name; + final String name; Dummy(String name) { this.name = name; @@ -100,7 +100,7 @@ public class Module extends Node { Node target; String targetId; - String id; + final String id; Alias(String id) { target = null; @@ -143,7 +143,6 @@ public class Module extends Node { private final List<Edge> edges = new ArrayList<>(); private int totalWeight = 0; - private Random rand = new Random(); /** * Adds a neighbor node and weight of edge @@ -163,10 +162,10 @@ public class Module extends Node { * * @return Node or null if no edges */ - private String randomNeighbor() throws Exception { + private String randomNeighbor() { String nodeId = null; - rand = new Random(); + Random rand = new Random(); int randNum = rand.nextInt(totalWeight) + 1; int sum = 0; @@ -212,11 +211,8 @@ public class Module extends Node { else maxSec = Integer.parseInt(initProps.getProperty("maxSec", "0")); - if ((prop = initProps.getProperty("teardown")) == null || prop.equals("true") - || prop.equals("")) - teardown = true; - else - teardown = false; + teardown = (prop = initProps.getProperty("teardown")) == null || prop.equals("true") + || prop.equals(""); if (fixture != null) { fixture.setUp(state, env); @@ -386,7 +382,7 @@ public class Module extends Node { Thread timer = null; final int time = 5 * 1000 * 60; - AtomicBoolean runningLong = new AtomicBoolean(false); + final AtomicBoolean runningLong = new AtomicBoolean(false); long systemTime; /** @@ -549,16 +545,13 @@ public class Module extends Node { Properties initProps = new Properties(); String attr = initEl.getAttribute("maxHops"); - if (attr != null) - initProps.setProperty("maxHops", attr); + initProps.setProperty("maxHops", attr); attr = initEl.getAttribute("maxSec"); - if (attr != null) - initProps.setProperty("maxSec", attr); + initProps.setProperty("maxSec", attr); attr = initEl.getAttribute("teardown"); - if (attr != null) - initProps.setProperty("teardown", attr); + initProps.setProperty("teardown", attr); localProps.put("_init", initProps); // parse all nodes diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/RandWalkEnv.java b/src/main/java/org/apache/accumulo/testing/randomwalk/RandWalkEnv.java index e3ed805..9dd311c 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/RandWalkEnv.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/RandWalkEnv.java @@ -16,8 +16,6 @@ */ package org.apache.accumulo.testing.randomwalk; -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.MultiTableBatchWriter; import org.apache.accumulo.testing.TestEnv; import org.slf4j.Logger; @@ -48,8 +46,7 @@ public class RandWalkEnv extends TestEnv { * @throws NumberFormatException * if any configuration property cannot be parsed */ - public MultiTableBatchWriter getMultiTableBatchWriter() - throws AccumuloException, AccumuloSecurityException { + public MultiTableBatchWriter getMultiTableBatchWriter() { if (mtbw == null) { mtbw = getAccumuloClient().createMultiTableBatchWriter(); } diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkImportTest.java b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkImportTest.java index 36dae32..c1782dc 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkImportTest.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkImportTest.java @@ -59,7 +59,7 @@ public abstract class BulkImportTest extends BulkTest { log.debug("Waiting 30s before continuing"); try { Thread.sleep(30 * 1000); - } catch (InterruptedException e) {} + } catch (InterruptedException ignored) {} return; } else { @@ -78,7 +78,7 @@ public abstract class BulkImportTest extends BulkTest { } } - private boolean shouldQueueMoreImports(State state, RandWalkEnv env) throws Exception { + private boolean shouldQueueMoreImports(State state, RandWalkEnv env) { // Only selectively import when it's BulkPlusOne. If we did a // BulkPlusOne, // we must also do a BulkMinusOne to keep the table consistent diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkPlusOne.java b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkPlusOne.java index 55d81c1..872d58c 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkPlusOne.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkPlusOne.java @@ -18,10 +18,12 @@ package org.apache.accumulo.testing.randomwalk.bulk; import java.util.ArrayList; import java.util.List; -import java.util.Random; import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; import org.apache.accumulo.core.client.IteratorSetting.Column; import org.apache.accumulo.core.client.rfile.RFile; @@ -41,13 +43,11 @@ public class BulkPlusOne extends BulkImportTest { public static final int COLS = 10; public static final int HEX_SIZE = (int) Math.ceil(Math.log(LOTS) / Math.log(16)); public static final String FMT = "r%0" + HEX_SIZE + "x"; - public static final List<Column> COLNAMES = new ArrayList<>(); public static final Text CHECK_COLUMN_FAMILY = new Text("cf"); - static { - for (int i = 0; i < COLS; i++) { - COLNAMES.add(new Column(CHECK_COLUMN_FAMILY, new Text(String.format("%03d", i)))); - } - } + public static final List<Column> COLNAMES = IntStream.range(0, COLS) + .mapToObj(i -> String.format("%03d", i)).map(Text::new) + .map(t -> new Column(CHECK_COLUMN_FAMILY, t)).collect(Collectors.toList()); + public static final Text MARKER_CF = new Text("marker"); static final AtomicLong counter = new AtomicLong(); @@ -55,19 +55,16 @@ public class BulkPlusOne extends BulkImportTest { static void bulkLoadLots(Logger log, State state, RandWalkEnv env, Value value) throws Exception { final FileSystem fs = (FileSystem) state.get("fs"); - final Path dir = new Path(fs.getUri() + "/tmp", "bulk_" + UUID.randomUUID().toString()); + final Path dir = new Path(fs.getUri() + "/tmp", "bulk_" + UUID.randomUUID()); log.debug("Bulk loading from {}", dir); - final Random rand = state.getRandom(); - final int parts = rand.nextInt(10) + 1; + final int parts = env.getRandom().nextInt(10) + 1; - TreeSet<Integer> startRows = new TreeSet<>(); + TreeSet<Integer> startRows = Stream.generate(() -> env.getRandom().nextInt(LOTS)).limit(parts) + .collect(Collectors.toCollection(TreeSet::new)); startRows.add(0); - while (startRows.size() < parts) - startRows.add(rand.nextInt(LOTS)); - List<String> printRows = new ArrayList<>(startRows.size()); - for (Integer row : startRows) - printRows.add(String.format(FMT, row)); + List<String> printRows = startRows.stream().map(row -> String.format(FMT, row)) + .collect(Collectors.toList()); String markerColumnQualifier = String.format("%07d", counter.incrementAndGet()); log.debug("preparing bulk files with start rows " + printRows + " last row " @@ -80,18 +77,18 @@ public class BulkPlusOne extends BulkImportTest { String fileName = dir + "/" + String.format("part_%d.rf", i); log.debug("Creating {}", fileName); - RFileWriter writer = RFile.newWriter().to(fileName).withFileSystem(fs).build(); - writer.startDefaultLocalityGroup(); - int start = rows.get(i); - int end = rows.get(i + 1); - for (int j = start; j < end; j++) { - Text row = new Text(String.format(FMT, j)); - for (Column col : COLNAMES) { - writer.append(new Key(row, col.getColumnFamily(), col.getColumnQualifier()), value); + try (RFileWriter writer = RFile.newWriter().to(fileName).withFileSystem(fs).build()) { + writer.startDefaultLocalityGroup(); + int start = rows.get(i); + int end = rows.get(i + 1); + for (int j = start; j < end; j++) { + Text row = new Text(String.format(FMT, j)); + for (Column col : COLNAMES) { + writer.append(new Key(row, col.getColumnFamily(), col.getColumnQualifier()), value); + } + writer.append(new Key(row, MARKER_CF, new Text(markerColumnQualifier)), ONE); } - writer.append(new Key(row, MARKER_CF, new Text(markerColumnQualifier)), ONE); } - writer.close(); } env.getAccumuloClient().tableOperations().importDirectory(dir.toString()) .to(Setup.getTableName()).tableTime(true); diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Merge.java b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Merge.java index 1ffc764..f182e1e 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Merge.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Merge.java @@ -40,7 +40,7 @@ public class Merge extends SelectiveBulkTest { public static Text getRandomRow(Random rand) { return new Text( - String.format(BulkPlusOne.FMT, (rand.nextLong() & 0x7fffffffffffffffl) % BulkPlusOne.LOTS)); + String.format(BulkPlusOne.FMT, (rand.nextLong() & 0x7fffffffffffffffL) % BulkPlusOne.LOTS)); } public static Text[] getRandomTabletRange(State state) { diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/SelectiveBulkTest.java b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/SelectiveBulkTest.java index 371b131..2ae503d 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/SelectiveBulkTest.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/SelectiveBulkTest.java @@ -16,6 +16,8 @@ */ package org.apache.accumulo.testing.randomwalk.bulk; +import static java.util.concurrent.TimeUnit.SECONDS; + import java.util.Properties; import org.apache.accumulo.testing.randomwalk.RandWalkEnv; @@ -31,14 +33,14 @@ public abstract class SelectiveBulkTest extends BulkTest { public void visit(State state, RandWalkEnv env, Properties props) throws Exception { if (SelectiveQueueing.shouldQueueOperation(state, env)) { super.visit(state, env, props); - } else { - log.debug("Skipping queueing of " + getClass().getSimpleName() - + " because of excessive queued tasks already"); - log.debug("Waiting 30 seconds before continuing"); - try { - Thread.sleep(30 * 1000); - } catch (InterruptedException e) {} + return; } + log.debug("Skipping queueing of {} because of excessive queued tasks already", + getClass().getSimpleName()); + log.debug("Waiting 30 seconds before continuing"); + try { + Thread.sleep(SECONDS.toMillis(30)); + } catch (InterruptedException ignored) {} } } diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/SelectiveQueueing.java b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/SelectiveQueueing.java index 2efc07f..79bc30b 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/SelectiveQueueing.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/SelectiveQueueing.java @@ -25,13 +25,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Chooses whether or not an operation should be queued based on the current thread pool queue - * length and the number of available TServers. + * Chooses whether an operation should be queued based on the current thread pool queue length and + * the number of available TServers. */ public class SelectiveQueueing { private static final Logger log = LoggerFactory.getLogger(SelectiveQueueing.class); - public static boolean shouldQueueOperation(State state, RandWalkEnv env) throws Exception { + public static boolean shouldQueueOperation(State state, RandWalkEnv env) { final ThreadPoolExecutor pool = (ThreadPoolExecutor) state.get("pool"); long queuedThreads = pool.getTaskCount() - pool.getActiveCount() - pool.getCompletedTaskCount(); final AccumuloClient client = env.getAccumuloClient(); @@ -46,6 +46,6 @@ public class SelectiveQueueing { } private static boolean shouldQueue(long queuedThreads, int numTservers) { - return queuedThreads < numTservers * 50; + return queuedThreads < numTservers * 50L; } } diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Setup.java b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Setup.java index b2b6485..df92741 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Setup.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Setup.java @@ -22,6 +22,7 @@ import java.util.concurrent.ThreadPoolExecutor; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.TableExistsException; +import org.apache.accumulo.core.client.admin.NewTableConfiguration; import org.apache.accumulo.core.client.admin.TableOperations; import org.apache.accumulo.core.iterators.LongCombiner; import org.apache.accumulo.core.iterators.user.SummingCombiner; @@ -46,11 +47,10 @@ public class Setup extends Test { TableOperations tableOps = env.getAccumuloClient().tableOperations(); try { if (!tableOps.exists(getTableName())) { - tableOps.create(getTableName()); IteratorSetting is = new IteratorSetting(10, SummingCombiner.class); SummingCombiner.setEncodingType(is, LongCombiner.Type.STRING); SummingCombiner.setCombineAllColumns(is, true); - tableOps.attachIterator(getTableName(), is); + tableOps.create(getTableName(), new NewTableConfiguration().attachIterator(is)); } } catch (TableExistsException ex) { // expected if there are multiple walkers @@ -58,7 +58,7 @@ public class Setup extends Test { state.setRandom(env.getRandom()); state.set("fs", FileSystem.get(env.getHadoopConfiguration())); state.set("bulkImportSuccess", "true"); - BulkPlusOne.counter.set(0l); + BulkPlusOne.counter.set(0L); ThreadPoolExecutor e = ThreadPools.getServerThreadPools().createFixedThreadPool(MAX_POOL_SIZE, "bulkImportPool", false); state.set("pool", e); diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Split.java b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Split.java index eb5e28e..57fd9e8 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Split.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Split.java @@ -19,6 +19,8 @@ package org.apache.accumulo.testing.randomwalk.bulk; import java.util.Random; import java.util.SortedSet; import java.util.TreeSet; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.accumulo.testing.randomwalk.RandWalkEnv; import org.apache.accumulo.testing.randomwalk.State; @@ -28,13 +30,12 @@ public class Split extends SelectiveBulkTest { @Override protected void runLater(State state, RandWalkEnv env) throws Exception { - SortedSet<Text> splits = new TreeSet<>(); Random rand = state.getRandom(); int count = rand.nextInt(20); - for (int i = 0; i < count; i++) - splits.add(new Text(String.format(BulkPlusOne.FMT, - (rand.nextLong() & 0x7fffffffffffffffl) % BulkPlusOne.LOTS))); - log.info("splitting " + splits); + SortedSet<Text> splits = Stream.generate(rand::nextLong).map(l -> l & 0x7fffffffffffffffL) + .map(l -> l % BulkPlusOne.LOTS).map(l -> String.format(BulkPlusOne.FMT, l)).map(Text::new) + .limit(count).collect(Collectors.toCollection(TreeSet::new)); + log.info("splitting {}", splits); env.getAccumuloClient().tableOperations().addSplits(Setup.getTableName(), splits); log.info("split for " + splits + " finished"); } diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Verify.java b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Verify.java index 387e356..5034abb 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Verify.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/bulk/Verify.java @@ -64,18 +64,20 @@ public class Verify extends Test { String user = env.getAccumuloClient().whoami(); Authorizations auths = env.getAccumuloClient().securityOperations().getUserAuthorizations(user); - Scanner scanner = env.getAccumuloClient().createScanner(Setup.getTableName(), auths); - scanner.fetchColumnFamily(BulkPlusOne.CHECK_COLUMN_FAMILY); - for (Entry<Key,Value> entry : scanner) { - byte[] value = entry.getValue().get(); - if (!Arrays.equals(value, zero)) { - throw new Exception("Bad key at " + entry); + RowIterator rowIter; + try (Scanner scanner = env.getAccumuloClient().createScanner(Setup.getTableName(), auths)) { + scanner.fetchColumnFamily(BulkPlusOne.CHECK_COLUMN_FAMILY); + for (Entry<Key,Value> entry : scanner) { + byte[] value = entry.getValue().get(); + if (!Arrays.equals(value, zero)) { + throw new Exception("Bad key at " + entry); + } } - } - scanner.clearColumns(); - scanner.fetchColumnFamily(BulkPlusOne.MARKER_CF); - RowIterator rowIter = new RowIterator(scanner); + scanner.clearColumns(); + scanner.fetchColumnFamily(BulkPlusOne.MARKER_CF); + rowIter = new RowIterator(scanner); + } while (rowIter.hasNext()) { Iterator<Entry<Key,Value>> row = rowIter.next(); @@ -117,8 +119,9 @@ public class Verify extends Test { public static void main(String[] args) throws Exception { Opts opts = new Opts(); opts.parseArgs(Verify.class.getName(), args); - try (AccumuloClient client = Accumulo.newClient().from(opts.getClientProps()).build()) { - Scanner scanner = client.createScanner(opts.tableName, opts.auths); + try (AccumuloClient client = Accumulo.newClient().from(opts.getClientProps()).build(); + Scanner scanner = client.createScanner(opts.tableName, opts.auths)) { + scanner.fetchColumnFamily(BulkPlusOne.CHECK_COLUMN_FAMILY); Text startBadRow = null; Text lastBadRow = null; diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/AddSplits.java b/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/AddSplits.java index ab4cbce..87602b3 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/AddSplits.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/AddSplits.java @@ -43,7 +43,7 @@ public class AddSplits extends Test { TreeSet<Text> splits = new TreeSet<>(); for (int i = 0; i < rand.nextInt(10) + 1; i++) - splits.add(new Text(String.format("%016x", rand.nextLong() & 0x7fffffffffffffffl))); + splits.add(new Text(String.format("%016x", rand.nextLong() & 0x7fffffffffffffffL))); try { client.tableOperations().addSplits(tableName, splits); diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/BatchScan.java b/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/BatchScan.java index e5c1934..6a1f803 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/BatchScan.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/BatchScan.java @@ -45,25 +45,20 @@ public class BatchScan extends Test { Random rand = state.getRandom(); String tableName = state.getRandomTableName(); - try { - BatchScanner bs = client.createBatchScanner(tableName, Authorizations.EMPTY, 3); + try (BatchScanner bs = client.createBatchScanner(tableName, Authorizations.EMPTY, 3)) { List<Range> ranges = new ArrayList<>(); for (int i = 0; i < rand.nextInt(2000) + 1; i++) - ranges.add(new Range(String.format("%016x", rand.nextLong() & 0x7fffffffffffffffl))); + ranges.add(new Range(String.format("%016x", rand.nextLong() & 0x7fffffffffffffffL))); bs.setRanges(ranges); - try { - Iterator<Entry<Key,Value>> iter = bs.iterator(); - while (iter.hasNext()) - iter.next(); - } finally { - bs.close(); - } + Iterator<Entry<Key,Value>> iter = bs.iterator(); + while (iter.hasNext()) + iter.next(); log.debug("Wrote to " + tableName); } catch (TableNotFoundException e) { - log.debug("BatchScan " + tableName + " failed, doesnt exist"); + log.debug("BatchScan " + tableName + " failed, doesn't exist"); } catch (TableDeletedException tde) { log.debug("BatchScan " + tableName + " failed, table deleted"); } catch (TableOfflineException e) { diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/BatchWrite.java b/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/BatchWrite.java index fc2c6a1..a0187c7 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/BatchWrite.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/BatchWrite.java @@ -42,30 +42,29 @@ public class BatchWrite extends Test { Random rand = state.getRandom(); String tableName = state.getRandomTableName(); - try { - try (BatchWriter bw = client.createBatchWriter(tableName, new BatchWriterConfig())) { - int numRows = rand.nextInt(100000); - for (int i = 0; i < numRows; i++) { - Mutation m = new Mutation(String.format("%016x", rand.nextLong() & 0x7fffffffffffffffl)); - long val = rand.nextLong() & 0x7fffffffffffffffl; - for (int j = 0; j < 10; j++) { - m.put("cf", "cq" + j, new Value(String.format("%016x", val).getBytes(UTF_8))); - } + try (BatchWriter bw = client.createBatchWriter(tableName, new BatchWriterConfig())) { - bw.addMutation(m); + int numRows = rand.nextInt(100000); + for (int i = 0; i < numRows; i++) { + Mutation m = new Mutation(String.format("%016x", rand.nextLong() & 0x7fffffffffffffffL)); + long val = rand.nextLong() & 0x7fffffffffffffffL; + for (int j = 0; j < 10; j++) { + m.put("cf", "cq" + j, new Value(String.format("%016x", val).getBytes(UTF_8))); } + + bw.addMutation(m); } log.debug("Wrote to " + tableName); } catch (TableNotFoundException e) { - log.debug("BatchWrite " + tableName + " failed, doesnt exist"); + log.debug("BatchWrite " + tableName + " failed, table doesn't exist"); } catch (TableOfflineException e) { - log.debug("BatchWrite " + tableName + " failed, offline"); + log.debug("BatchWrite " + tableName + " failed, table offline"); } catch (MutationsRejectedException mre) { if (mre.getCause() instanceof TableDeletedException) log.debug("BatchWrite " + tableName + " failed, table deleted"); else if (mre.getCause() instanceof TableOfflineException) - log.debug("BatchWrite " + tableName + " failed, offline"); + log.debug("BatchWrite " + tableName + " failed, table offline"); else throw mre; } diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/BulkImport.java b/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/BulkImport.java index b23bde5..13fb710 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/BulkImport.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/BulkImport.java @@ -98,7 +98,7 @@ public class BulkImport extends Test { FileSystem fs = FileSystem.get(env.getHadoopConfiguration()); String bulkDir = env.getHdfsRoot() + "/tmp/concurrent_bulk/b_" - + String.format("%016x", rand.nextLong() & 0x7fffffffffffffffl); + + String.format("%016x", rand.nextLong() & 0x7fffffffffffffffL); fs.mkdirs(new Path(bulkDir)); fs.mkdirs(new Path(bulkDir + "_f")); @@ -109,12 +109,12 @@ public class BulkImport extends Test { TreeSet<Long> rows = new TreeSet<>(); int numRows = rand.nextInt(100000); for (int i = 0; i < numRows; i++) { - rows.add(rand.nextLong() & 0x7fffffffffffffffl); + rows.add(rand.nextLong() & 0x7fffffffffffffffL); } for (Long row : rows) { Mutation m = new Mutation(String.format("%016x", row)); - long val = rand.nextLong() & 0x7fffffffffffffffl; + long val = rand.nextLong() & 0x7fffffffffffffffL; for (int j = 0; j < 10; j++) { m.put("cf", "cq" + j, new Value(String.format("%016x", val).getBytes(UTF_8))); } diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/ChangePermissions.java b/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/ChangePermissions.java index 2570065..9bc1a37 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/ChangePermissions.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/ChangePermissions.java @@ -48,7 +48,7 @@ public class ChangePermissions extends Test { changeSystemPermission(client, rand, userName); else if (dice == 1) changeTablePermission(client, rand, userName, tableName); - else if (dice == 2) + else changeNamespacePermission(client, rand, userName, namespace); } catch (AccumuloSecurityException | AccumuloException ex) { log.debug("Unable to change user permissions: " + ex.getCause()); diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/CheckPermission.java b/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/CheckPermission.java index f62abee..447cc68 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/CheckPermission.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/CheckPermission.java @@ -39,7 +39,7 @@ public class CheckPermission extends Test { String namespace = state.getRandomNamespace(); try { - int dice = rand.nextInt(2); + int dice = rand.nextInt(3); if (dice == 0) { log.debug("Checking systerm permission " + userName); client.securityOperations().hasSystemPermission(userName, @@ -48,7 +48,7 @@ public class CheckPermission extends Test { log.debug("Checking table permission " + userName + " " + tableName); client.securityOperations().hasTablePermission(userName, tableName, TablePermission.values()[rand.nextInt(TablePermission.values().length)]); - } else if (dice == 2) { + } else { log.debug("Checking namespace permission " + userName + " " + namespace); client.securityOperations().hasNamespacePermission(userName, namespace, NamespacePermission.values()[rand.nextInt(NamespacePermission.values().length)]); diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/CloneTable.java b/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/CloneTable.java index bdabd2c..064a5fb 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/CloneTable.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/CloneTable.java @@ -49,10 +49,10 @@ public class CloneTable extends Test { } catch (TableNotFoundException e) { log.debug("Clone " + srcTableName + " failed, doesnt exist"); } catch (IllegalArgumentException e) { - log.debug("Clone: " + e.toString()); + log.debug("Clone: " + e); } catch (AccumuloException e) { Throwable cause = e.getCause(); - if (cause != null && cause instanceof NamespaceNotFoundException) + if (cause instanceof NamespaceNotFoundException) log.debug( "Clone: " + srcTableName + " to " + newTableName + " failed, namespace not found"); else diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/ConcurrentFixture.java b/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/ConcurrentFixture.java index 71595b2..dca4606 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/ConcurrentFixture.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/ConcurrentFixture.java @@ -28,8 +28,6 @@ import org.apache.hadoop.io.Text; /** * When multiple instance of this test suite are run, all instances will operate on the same set of * table names. - * - * */ public class ConcurrentFixture extends Fixture { @@ -57,9 +55,9 @@ public class ConcurrentFixture extends Fixture { // Having all negative values = null might be too frequent if (firstLong >= 0) - first = new Text(String.format("%016x", firstLong & 0x7fffffffffffffffl)); + first = new Text(String.format("%016x", firstLong & 0x7fffffffffffffffL)); if (secondLong >= 0) - second = new Text(String.format("%016x", secondLong & 0x7fffffffffffffffl)); + second = new Text(String.format("%016x", secondLong & 0x7fffffffffffffffL)); if (first != null && second != null && first.compareTo(second) > 0) { Text swap = first; diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/Config.java b/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/Config.java index feb3604..e7c79d2 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/Config.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/Config.java @@ -37,9 +37,9 @@ public class Config extends Test { private static final String LAST_NAMESPACE_SETTING = "lastNamespaceSetting"; static class Setting { - public Property property; - public long min; - public long max; + final public Property property; + final public long min; + final public long max; public Setting(Property property, long min, long max) { this.property = property; @@ -53,9 +53,9 @@ public class Config extends Test { } @SuppressWarnings("deprecation") - Property TSERV_READ_AHEAD_MAXCONCURRENT_deprecated = Property.TSERV_READ_AHEAD_MAXCONCURRENT; + final Property TSERV_READ_AHEAD_MAXCONCURRENT_deprecated = Property.TSERV_READ_AHEAD_MAXCONCURRENT; // @formatter:off - Setting[] settings = { + final Setting[] settings = { s(Property.TSERV_BLOOM_LOAD_MAXCONCURRENT, 1, 10), s(Property.TSERV_BULK_PROCESS_THREADS, 1, 10), s(Property.TSERV_BULK_RETRY, 1, 10), @@ -81,8 +81,7 @@ public class Config extends Test { s(Property.TSERV_SESSION_MAXIDLE, 100, 5 * 60 * 1000), s(Property.TSERV_WAL_SORT_BUFFER_SIZE, 1024 * 1024, 1024 * 1024 * 1024L), s(Property.TSERV_TABLET_SPLIT_FINDMIDPOINT_MAXOPEN, 5, 100), - s(Property.TSERV_WAL_BLOCKSIZE, 1024 * 1024, - 1024 * 1024 * 1024 * 10L), + s(Property.TSERV_WAL_BLOCKSIZE, 1024 * 1024,1024 * 1024 * 1024 * 10L), s(Property.TSERV_WORKQ_THREADS, 1, 10), s(Property.MANAGER_BULK_THREADPOOL_SIZE, 1, 10), s(Property.MANAGER_BULK_RETRIES, 1, 10), @@ -93,16 +92,13 @@ public class Config extends Test { s(Property.MANAGER_THREADCHECK, 100, 10000), s(Property.MANAGER_MINTHREADS, 1, 200),}; - Setting[] tableSettings = { + final Setting[] tableSettings = { s(Property.TABLE_MAJC_RATIO, 1, 10), - s(Property.TABLE_SPLIT_THRESHOLD, 10 * 1024, - 10L * 1024 * 1024 * 1024), + s(Property.TABLE_SPLIT_THRESHOLD, 10 * 1024, 10L * 1024 * 1024 * 1024), s(Property.TABLE_MINC_COMPACT_IDLETIME, 100, 100 * 60 * 60 * 1000L), s(Property.TABLE_SCAN_MAXMEM, 10 * 1024, 10 * 1024 * 1024), - s(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE, 10 * 1024, - 10 * 1024 * 1024L), - s(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX, 10 * 1024, - 10 * 1024 * 1024L), + s(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE, 10 * 1024, 10 * 1024 * 1024L), + s(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX, 10 * 1024, 10 * 1024 * 1024L), s(Property.TABLE_FILE_REPLICATION, 0, 5), s(Property.TABLE_FILE_MAX, 2, 50),}; // @formatter:on diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/DeleteRange.java b/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/DeleteRange.java index 965656b..8314fe4 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/DeleteRange.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/DeleteRange.java @@ -40,8 +40,8 @@ public class DeleteRange extends Test { List<Text> range = new ArrayList<>(); do { - range.add(new Text(String.format("%016x", rand.nextLong() & 0x7fffffffffffffffl))); - range.add(new Text(String.format("%016x", rand.nextLong() & 0x7fffffffffffffffl))); + range.add(new Text(String.format("%016x", rand.nextLong() & 0x7fffffffffffffffL))); + range.add(new Text(String.format("%016x", rand.nextLong() & 0x7fffffffffffffffL))); } while (range.get(0).equals(range.get(1))); Collections.sort(range); if (rand.nextInt(20) == 0) @@ -53,9 +53,9 @@ public class DeleteRange extends Test { client.tableOperations().deleteRows(tableName, range.get(0), range.get(1)); log.debug("deleted rows (" + range.get(0) + " -> " + range.get(1) + "] in " + tableName); } catch (TableNotFoundException tne) { - log.debug("deleted rows " + tableName + " failed, doesnt exist"); + log.debug("deleted rows " + tableName + " failed, table doesn't exist"); } catch (TableOfflineException toe) { - log.debug("deleted rows " + tableName + " failed, offline"); + log.debug("deleted rows " + tableName + " failed, table offline"); } } } diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/DeleteTable.java b/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/DeleteTable.java index 6a45b00..43c9edf 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/DeleteTable.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/DeleteTable.java @@ -35,7 +35,7 @@ public class DeleteTable extends Test { client.tableOperations().delete(tableName); log.debug("Deleted table " + tableName); } catch (TableNotFoundException e) { - log.debug("Delete " + tableName + " failed, doesnt exist"); + log.debug("Delete " + tableName + " failed, table doesn't exist"); } } } diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/ListSplits.java b/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/ListSplits.java index 1678bd5..3a93a27 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/ListSplits.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/ListSplits.java @@ -38,7 +38,7 @@ public class ListSplits extends Test { Collection<Text> splits = client.tableOperations().listSplits(tableName); log.debug("Table " + tableName + " had " + splits.size() + " splits"); } catch (TableNotFoundException e) { - log.debug("listSplits " + tableName + " failed, doesnt exist"); + log.debug("listSplits " + tableName + " failed, table doesn't exist"); } catch (AccumuloSecurityException ase) { log.debug("listSplits " + tableName + " failed, " + ase.getMessage()); } diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/OfflineTable.java b/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/OfflineTable.java index a3af9d5..c4a369e 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/OfflineTable.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/OfflineTable.java @@ -43,7 +43,7 @@ public class OfflineTable extends Test { client.tableOperations().online(tableName, rand.nextBoolean()); log.debug("Onlined " + tableName); } catch (TableNotFoundException tne) { - log.debug("offline or online failed " + tableName + ", doesnt exist"); + log.debug("offline or online failed " + tableName + ", doesn't exist"); } } diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/RenameTable.java b/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/RenameTable.java index 526a6c1..f780707 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/RenameTable.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/RenameTable.java @@ -67,7 +67,7 @@ public class RenameTable extends Test { log.debug("Rename " + srcTableName + " failed, doesnt exist"); } catch (IllegalArgumentException e) { - log.debug("Rename: " + e.toString()); + log.debug("Rename: " + e); } catch (AccumuloException e) { // Catch the expected failure when we try to rename a table into a // new namespace diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/ScanTable.java b/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/ScanTable.java index c5ad06e..dab5838 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/ScanTable.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/concurrent/ScanTable.java @@ -40,8 +40,7 @@ public class ScanTable extends Test { AccumuloClient client = env.getAccumuloClient(); String tableName = state.getRandomTableName(); - try { - Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY); + try (Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY)) { Iterator<Entry<Key,Value>> iter = scanner.iterator(); while (iter.hasNext()) { iter.next(); diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/conditional/Compact.java b/src/main/java/org/apache/accumulo/testing/randomwalk/conditional/Compact.java index a31d3fb..96ac8bf 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/conditional/Compact.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/conditional/Compact.java @@ -16,6 +16,8 @@ */ package org.apache.accumulo.testing.randomwalk.conditional; +import static org.apache.accumulo.testing.randomwalk.conditional.Utils.getRowFromBank; + import java.util.Properties; import java.util.Random; @@ -34,8 +36,9 @@ public class Compact extends Test { String table = state.getString("tableName"); Random rand = state.getRandom(); AccumuloClient client = env.getAccumuloClient(); - Text row1 = new Text(Utils.getBank(rand.nextInt((Integer) state.get("numBanks")))); - Text row2 = new Text(Utils.getBank(rand.nextInt((Integer) state.get("numBanks")))); + + Text row1 = getRowFromBank(rand, state); + Text row2 = getRowFromBank(rand, state); if (row1.compareTo(row2) >= 0) { row1 = null; diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/conditional/Flush.java b/src/main/java/org/apache/accumulo/testing/randomwalk/conditional/Flush.java index 86f2775..32bca57 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/conditional/Flush.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/conditional/Flush.java @@ -16,6 +16,8 @@ */ package org.apache.accumulo.testing.randomwalk.conditional; +import static org.apache.accumulo.testing.randomwalk.conditional.Utils.getRowFromBank; + import java.util.Properties; import java.util.Random; @@ -34,8 +36,9 @@ public class Flush extends Test { String table = state.getString("tableName"); Random rand = state.getRandom(); AccumuloClient client = env.getAccumuloClient(); - Text row1 = new Text(Utils.getBank(rand.nextInt((Integer) state.get("numBanks")))); - Text row2 = new Text(Utils.getBank(rand.nextInt((Integer) state.get("numBanks")))); + + Text row1 = getRowFromBank(rand, state); + Text row2 = getRowFromBank(rand, state); if (row1.compareTo(row2) >= 0) { row1 = null; diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/conditional/Init.java b/src/main/java/org/apache/accumulo/testing/randomwalk/conditional/Init.java index acc6e98..023ec20 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/conditional/Init.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/conditional/Init.java @@ -16,10 +16,12 @@ */ package org.apache.accumulo.testing.randomwalk.conditional; -import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.Properties; import java.util.TreeSet; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.apache.accumulo.core.client.ConditionalWriter; import org.apache.accumulo.core.client.ConditionalWriter.Status; @@ -38,19 +40,18 @@ public class Init extends Test { @Override public void visit(State state, RandWalkEnv env, Properties props) throws Exception { - int numBanks = (Integer) state.get("numBanks"); - int numAccts = (Integer) state.get("numAccts"); + int numBanks = state.getInteger("numBanks"); + int numAccts = state.getInteger("numAccts"); // add some splits to spread ingest out a little - TreeSet<Text> splits = new TreeSet<>(); - for (int i = 1; i < 10; i++) - splits.add(new Text(Utils.getBank((int) (numBanks * .1 * i)))); - env.getAccumuloClient().tableOperations().addSplits((String) state.get("tableName"), splits); + TreeSet<Text> splits = IntStream.range(1, 10).map(i -> (int) (numBanks * .1 * i)) + .mapToObj(Utils::getBank).map(Text::new).collect(Collectors.toCollection(TreeSet::new)); + + env.getAccumuloClient().tableOperations().addSplits(state.getString("tableName"), splits); log.info("Added splits " + splits); - ArrayList<Integer> banks = new ArrayList<>(); - for (int i = 0; i < numBanks; i++) - banks.add(i); + List<Integer> banks = IntStream.range(0, numBanks).boxed().collect(Collectors.toList()); + // shuffle for case when multiple threads are adding banks Collections.shuffle(banks, state.getRandom()); diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/conditional/Merge.java b/src/main/java/org/apache/accumulo/testing/randomwalk/conditional/Merge.java index 393dc6e..7a7d4a6 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/conditional/Merge.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/conditional/Merge.java @@ -16,6 +16,8 @@ */ package org.apache.accumulo.testing.randomwalk.conditional; +import static org.apache.accumulo.testing.randomwalk.conditional.Utils.getRowFromBank; + import java.util.Properties; import java.util.Random; @@ -34,8 +36,8 @@ public class Merge extends Test { String table = state.getString("tableName"); Random rand = state.getRandom(); AccumuloClient client = env.getAccumuloClient(); - Text row1 = new Text(Utils.getBank(rand.nextInt((Integer) state.get("numBanks")))); - Text row2 = new Text(Utils.getBank(rand.nextInt((Integer) state.get("numBanks")))); + Text row1 = getRowFromBank(rand, state); + Text row2 = getRowFromBank(rand, state); if (row1.compareTo(row2) >= 0) { row1 = null; diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/conditional/Setup.java b/src/main/java/org/apache/accumulo/testing/randomwalk/conditional/Setup.java index 801a3c7..dcc2183 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/conditional/Setup.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/conditional/Setup.java @@ -50,11 +50,16 @@ public class Setup extends Test { env.getAccumuloClient().tableOperations().setProperty(tableName, Property.TABLE_BLOCKCACHE_ENABLED.getKey(), blockCache + ""); log.debug("set " + Property.TABLE_BLOCKCACHE_ENABLED.getKey() + " " + blockCache); - } catch (TableExistsException tee) {} + } catch (TableExistsException ignored) {} - ConditionalWriter cw = env.getAccumuloClient().createConditionalWriter(tableName, + ConditionalWriter newCW = env.getAccumuloClient().createConditionalWriter(tableName, new ConditionalWriterConfig().setMaxWriteThreads(1)); - state.set("cw", cw); + ConditionalWriter previousCW = (ConditionalWriter) state.getOkIfAbsent("cw"); + state.set("cw", newCW); + // close the previous conditional writer if there is one + if (previousCW != null) { + previousCW.close(); + } } } diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/conditional/Split.java b/src/main/java/org/apache/accumulo/testing/randomwalk/conditional/Split.java index cd77682..beae0b8 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/conditional/Split.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/conditional/Split.java @@ -16,9 +16,9 @@ */ package org.apache.accumulo.testing.randomwalk.conditional; -import java.util.Arrays; import java.util.Properties; import java.util.Random; +import java.util.Set; import java.util.TreeSet; import org.apache.accumulo.core.client.AccumuloClient; @@ -36,10 +36,10 @@ public class Split extends Test { String table = state.getString("tableName"); Random rand = state.getRandom(); AccumuloClient client = env.getAccumuloClient(); - String row = Utils.getBank(rand.nextInt((Integer) state.get("numBanks"))); + Text row = Utils.getRowFromBank(rand, state); log.debug("adding split " + row); - client.tableOperations().addSplits(table, new TreeSet<>(Arrays.asList(new Text(row)))); + client.tableOperations().addSplits(table, new TreeSet<>(Set.of(row))); } } diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/conditional/Transfer.java b/src/main/java/org/apache/accumulo/testing/randomwalk/conditional/Transfer.java index f6b8544..9427d7d 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/conditional/Transfer.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/conditional/Transfer.java @@ -66,10 +66,10 @@ public class Transfer extends Test { Random rand = state.getRandom(); AccumuloClient client = env.getAccumuloClient(); - int numAccts = (Integer) state.get("numAccts"); + int numAccts = state.getInteger("numAccts"); // note: non integer exponents are slow - ZipfDistribution zdiBanks = new ZipfDistribution((Integer) state.get("numBanks"), 1); + ZipfDistribution zdiBanks = new ZipfDistribution(state.getInteger("numBanks"), 1); String bank = Utils.getBank(zdiBanks.inverseCumulativeProbability(rand.nextDouble())); ZipfDistribution zdiAccts = new ZipfDistribution(numAccts, 1); String acct1 = Utils.getAccount(zdiAccts.inverseCumulativeProbability(rand.nextDouble())); diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/conditional/Utils.java b/src/main/java/org/apache/accumulo/testing/randomwalk/conditional/Utils.java index 72058bd..9325b2f 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/conditional/Utils.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/conditional/Utils.java @@ -16,9 +16,11 @@ */ package org.apache.accumulo.testing.randomwalk.conditional; -/** - * - */ +import java.util.Random; + +import org.apache.accumulo.testing.randomwalk.State; +import org.apache.hadoop.io.Text; + public class Utils { static String getBank(int b) { @@ -32,4 +34,11 @@ public class Utils { static String getSeq(int s) { return String.format("%06d", s); } + + static Text getRowFromBank(Random rand, State state) { + Integer numBanks = state.getInteger("numBanks"); + int randomBankIndex = rand.nextInt(numBanks); + String bank = Utils.getBank(randomBankIndex); + return new Text(bank); + } } diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/conditional/Verify.java b/src/main/java/org/apache/accumulo/testing/randomwalk/conditional/Verify.java index 3189cae..9a50ed5 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/conditional/Verify.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/conditional/Verify.java @@ -20,7 +20,6 @@ import java.util.Map.Entry; import java.util.Properties; import org.apache.accumulo.core.client.AccumuloClient; -import org.apache.accumulo.core.client.IsolatedScanner; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.data.Key; @@ -42,9 +41,9 @@ public class Verify extends Test { String table = state.getString("tableName"); AccumuloClient client = env.getAccumuloClient(); - int numAccts = (Integer) state.get("numAccts"); - - for (int i = 0; i < (Integer) state.get("numBanks"); i++) + int numAccts = state.getInteger("numAccts"); + int numBanks = state.getInteger("numBanks"); + for (int i = 0; i < numBanks; i++) verifyBank(table, client, Utils.getBank(i), numAccts); } @@ -58,9 +57,9 @@ public class Verify extends Test { int min = Integer.MAX_VALUE; int max = Integer.MIN_VALUE; - // TODO do not use IsolatedScanner, just enable isolation on scanner - try (Scanner scanner = new IsolatedScanner(client.createScanner(table, Authorizations.EMPTY))) { + try (Scanner scanner = client.createScanner(table, Authorizations.EMPTY)) { + scanner.enableIsolation(); scanner.setRange(new Range(row)); IteratorSetting iterConf = new IteratorSetting(100, "cqsl", ColumnSliceFilter.class); ColumnSliceFilter.setSlice(iterConf, "bal", true, "bal", true); diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/image/ImageFixture.java b/src/main/java/org/apache/accumulo/testing/randomwalk/image/ImageFixture.java index cfcdb65..368a676 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/image/ImageFixture.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/image/ImageFixture.java @@ -18,11 +18,12 @@ package org.apache.accumulo.testing.randomwalk.image; import java.net.InetAddress; import java.util.HashMap; -import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.MultiTableBatchWriter; @@ -44,10 +45,8 @@ public class ImageFixture extends Fixture { AccumuloClient client = env.getAccumuloClient(); - SortedSet<Text> splits = new TreeSet<>(); - for (int i = 1; i < 256; i++) { - splits.add(new Text(String.format("%04x", i << 8))); - } + SortedSet<Text> splits = IntStream.range(1, 256).mapToObj(i -> String.format("%04x", i << 8)) + .map(Text::new).collect(Collectors.toCollection(TreeSet::new)); String hostname = InetAddress.getLocalHost().getHostName().replaceAll("[-.]", "_"); String pid = env.getPid(); @@ -93,14 +92,8 @@ public class ImageFixture extends Fixture { static Map<String,Set<Text>> getLocalityGroups() { Map<String,Set<Text>> groups = new HashMap<>(); - - HashSet<Text> lg1 = new HashSet<>(); - lg1.add(Write.CONTENT_COLUMN_FAMILY); - groups.put("lg1", lg1); - - HashSet<Text> lg2 = new HashSet<>(); - lg2.add(Write.META_COLUMN_FAMILY); - groups.put("lg2", lg2); + groups.put("lg1", Set.of(Write.CONTENT_COLUMN_FAMILY)); + groups.put("lg2", Set.of(Write.META_COLUMN_FAMILY)); return groups; } diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/image/ScanMeta.java b/src/main/java/org/apache/accumulo/testing/randomwalk/image/ScanMeta.java index d64023a..6947227 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/image/ScanMeta.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/image/ScanMeta.java @@ -16,20 +16,16 @@ */ package org.apache.accumulo.testing.randomwalk.image; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; +import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.Properties; import java.util.UUID; +import java.util.stream.Collectors; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.BatchScanner; import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.testing.randomwalk.RandWalkEnv; import org.apache.accumulo.testing.randomwalk.State; @@ -52,60 +48,44 @@ public class ScanMeta extends Test { AccumuloClient client = env.getAccumuloClient(); - Scanner imageScanner = client.createScanner(imageTableName, new Authorizations()); + try (Scanner imageScanner = client.createScanner(imageTableName, new Authorizations())) { - imageScanner.setRange(new Range(new Text(uuid), null)); - imageScanner.fetchColumn(Write.META_COLUMN_FAMILY, Write.SHA1_COLUMN_QUALIFIER); + imageScanner.setRange(new Range(new Text(uuid), null)); + imageScanner.fetchColumn(Write.META_COLUMN_FAMILY, Write.SHA1_COLUMN_QUALIFIER); - int minScan = Integer.parseInt(props.getProperty("minScan")); - int maxScan = Integer.parseInt(props.getProperty("maxScan")); + int minScan = Integer.parseInt(props.getProperty("minScan")); + int maxScan = Integer.parseInt(props.getProperty("maxScan")); - int numToScan = env.getRandom().nextInt(maxScan - minScan) + minScan; + int numToScan = env.getRandom().nextInt(maxScan - minScan) + minScan; - Map<Text,Text> hashes = new HashMap<>(); + Map<Text,Text> hashes = imageScanner.stream().limit(numToScan).collect(Collectors + .toMap(entry -> new Text(entry.getValue().get()), entry -> entry.getKey().getRow())); - Iterator<Entry<Key,Value>> iter = imageScanner.iterator(); + log.debug("Found " + hashes.size() + " hashes starting at " + uuid); - while (iter.hasNext() && numToScan > 0) { + if (hashes.isEmpty()) { + return; + } - Entry<Key,Value> entry = iter.next(); + // use batch scanner to verify all of these exist in index + try (BatchScanner indexScanner = client.createBatchScanner(indexTableName, + Authorizations.EMPTY, 3)) { + List<Range> ranges = hashes.keySet().stream().map(Range::new).collect(Collectors.toList()); - hashes.put(new Text(entry.getValue().get()), entry.getKey().getRow()); + indexScanner.setRanges(ranges); - numToScan--; - } - - log.debug("Found " + hashes.size() + " hashes starting at " + uuid); - - if (hashes.isEmpty()) { - return; - } - - // use batch scanner to verify all of these exist in index - BatchScanner indexScanner = client.createBatchScanner(indexTableName, Authorizations.EMPTY, 3); - ArrayList<Range> ranges = new ArrayList<>(); - for (Text row : hashes.keySet()) { - ranges.add(new Range(row)); - } + Map<Text,Text> hashes2 = indexScanner.stream().collect(Collectors + .toMap(entry -> entry.getKey().getRow(), entry -> new Text(entry.getValue().get()))); - indexScanner.setRanges(ranges); + log.debug("Looked up " + ranges.size() + " ranges, found " + hashes2.size()); - Map<Text,Text> hashes2 = new HashMap<>(); - - for (Entry<Key,Value> entry : indexScanner) - hashes2.put(entry.getKey().getRow(), new Text(entry.getValue().get())); - - log.debug("Looked up " + ranges.size() + " ranges, found " + hashes2.size()); - - if (!hashes.equals(hashes2)) { - log.error("uuids from doc table : " + hashes.values()); - log.error("uuids from index : " + hashes2.values()); - throw new Exception( - "Mismatch between document table and index " + indexTableName + " " + imageTableName); + if (!hashes.equals(hashes2)) { + log.error("uuids from doc table : " + hashes.values()); + log.error("uuids from index : " + hashes2.values()); + throw new Exception( + "Mismatch between document table and index " + indexTableName + " " + imageTableName); + } + } } - - indexScanner.close(); - } - } diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/image/TableOp.java b/src/main/java/org/apache/accumulo/testing/randomwalk/image/TableOp.java index 2d573ba..f90bc6d 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/image/TableOp.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/image/TableOp.java @@ -50,8 +50,7 @@ public class TableOp extends Test { } // choose a random action - int num = env.getRandom().nextInt(10); - if (num > 6) { + if (env.getRandom().nextInt(10) > 6) { log.debug("Retrieving info for " + tableName); tableOps.getLocalityGroups(tableName); tableOps.getProperties(tableName); @@ -65,7 +64,7 @@ public class TableOp extends Test { if (env.getRandom().nextInt(10) < 3) { Map<String,Set<Text>> groups = tableOps.getLocalityGroups(state.getString("imageTableName")); - if (groups.size() == 0) { + if (groups.isEmpty()) { log.debug("Adding locality groups to " + state.getString("imageTableName")); groups = ImageFixture.getLocalityGroups(); } else { diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/image/Verify.java b/src/main/java/org/apache/accumulo/testing/randomwalk/image/Verify.java index 6f9667f..b3c79ec 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/image/Verify.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/image/Verify.java @@ -51,43 +51,44 @@ public class Verify extends Test { AccumuloClient client = env.getAccumuloClient(); - Scanner indexScanner = client.createScanner(indexTableName, new Authorizations()); - Scanner imageScanner = client.createScanner(imageTableName, new Authorizations()); + try (Scanner indexScanner = client.createScanner(indexTableName, new Authorizations()); + Scanner imageScanner = client.createScanner(imageTableName, new Authorizations())) { - String uuid = UUID.randomUUID().toString(); + String uuid = UUID.randomUUID().toString(); - MessageDigest alg = MessageDigest.getInstance("SHA-1"); - alg.update(uuid.getBytes(UTF_8)); + MessageDigest alg = MessageDigest.getInstance("SHA-1"); + alg.update(uuid.getBytes(UTF_8)); - indexScanner.setRange(new Range(new Text(alg.digest()), null)); - indexScanner.setBatchSize(numVerifications); + indexScanner.setRange(new Range(new Text(alg.digest()), null)); + indexScanner.setBatchSize(numVerifications); - Text curRow = null; - int count = 0; - for (Entry<Key,Value> entry : indexScanner) { + Text curRow = null; + int count = 0; + for (Entry<Key,Value> entry : indexScanner) { - curRow = entry.getKey().getRow(); - String rowToVerify = entry.getValue().toString(); + curRow = entry.getKey().getRow(); + String rowToVerify = entry.getValue().toString(); - verifyRow(imageScanner, rowToVerify); + verifyRow(imageScanner, rowToVerify); - count++; - if (count == numVerifications) { - break; + count++; + if (count == numVerifications) { + break; + } } - } - if (count != numVerifications && curRow != null) { - Text lastRow = (Text) state.get("lastIndexRow"); - if (lastRow.compareTo(curRow) != 0) { - log.error("Verified only " + count + " of " + numVerifications + " - curRow " + curRow - + " lastKey " + lastRow); + if (count != numVerifications && curRow != null) { + Text lastRow = (Text) state.get("lastIndexRow"); + if (lastRow.compareTo(curRow) != 0) { + log.error("Verified only " + count + " of " + numVerifications + " - curRow " + curRow + + " lastKey " + lastRow); + } } } - int verified = ((Integer) state.get("verified")).intValue() + numVerifications; + int verified = state.getInteger("verified") + numVerifications; log.debug("Verified " + numVerifications + " - Total " + verified); - state.set("verified", Integer.valueOf(verified)); + state.set("verified", verified); } public void verifyRow(Scanner scanner, String row) throws Exception { diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/BulkImport.java b/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/BulkImport.java index e78ef1b..04d1ba9 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/BulkImport.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/BulkImport.java @@ -16,12 +16,13 @@ */ package org.apache.accumulo.testing.randomwalk.multitable; -import java.util.ArrayList; import java.util.List; import java.util.Properties; import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.apache.accumulo.core.client.IteratorSetting.Column; import org.apache.accumulo.core.client.TableNotFoundException; @@ -39,15 +40,13 @@ import org.apache.hadoop.io.Text; public class BulkImport extends Test { + public static final Text CHECK_COLUMN_FAMILY = new Text("cf"); public static final int ROWS = 1_000_000; public static final int COLS = 10; - public static final List<Column> COLNAMES = new ArrayList<>(); - public static final Text CHECK_COLUMN_FAMILY = new Text("cf"); - static { - for (int i = 0; i < COLS; i++) { - COLNAMES.add(new Column(CHECK_COLUMN_FAMILY, new Text(String.format("%03d", i)))); - } - } + public static final List<Column> COLNAMES = IntStream.range(0, COLS) + .mapToObj(i -> String.format("%03d", i)).map(Text::new) + .map(t -> new Column(CHECK_COLUMN_FAMILY, t)).collect(Collectors.toList()); + public static final Text MARKER_CF = new Text("marker"); static final AtomicLong counter = new AtomicLong(); @@ -69,7 +68,7 @@ public class BulkImport extends Test { String uuid = UUID.randomUUID().toString(); final Path dir = new Path("/tmp/bulk", uuid); - final Path fail = new Path(dir.toString() + "_fail"); + final Path fail = new Path(dir + "_fail"); final FileSystem fs = (FileSystem) state.get("fs"); fs.mkdirs(fail); final int parts = env.getRandom().nextInt(10) + 1; @@ -84,16 +83,16 @@ public class BulkImport extends Test { for (int i = 0; i < parts; i++) { String fileName = dir + "/" + String.format("part_%d.rf", i); - RFileWriter f = RFile.newWriter().to(fileName).withFileSystem(fs).build(); - f.startDefaultLocalityGroup(); - for (String r : rows) { - Text row = new Text(r); - for (Column col : COLNAMES) { - f.append(new Key(row, col.getColumnFamily(), col.getColumnQualifier()), ONE); + try (RFileWriter f = RFile.newWriter().to(fileName).withFileSystem(fs).build()) { + f.startDefaultLocalityGroup(); + for (String r : rows) { + Text row = new Text(r); + for (Column col : COLNAMES) { + f.append(new Key(row, col.getColumnFamily(), col.getColumnQualifier()), ONE); + } + f.append(new Key(row, MARKER_CF, new Text(markerColumnQualifier)), ONE); } - f.append(new Key(row, MARKER_CF, new Text(markerColumnQualifier)), ONE); } - f.close(); } log.debug("Starting {} bulk import to {}", useLegacyBulk ? "legacy" : "new", tableName); try { diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/CopyTable.java b/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/CopyTable.java index b52d181..b1da1dc 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/CopyTable.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/CopyTable.java @@ -29,13 +29,14 @@ import org.apache.hadoop.util.ToolRunner; public class CopyTable extends Test { - private final TreeSet<Text> splits; + private final NewTableConfiguration ntc; public CopyTable() { - splits = new TreeSet<>(); + TreeSet<Text> splits = new TreeSet<>(); for (int i = 1; i < 10; i++) { splits.add(new Text(Integer.toString(i))); } + this.ntc = new NewTableConfiguration().withSplits(splits); } @Override @@ -48,13 +49,13 @@ public class CopyTable extends Test { String srcTableName = tables.remove(env.getRandom().nextInt(tables.size())); - int nextId = ((Integer) state.get("nextId")).intValue(); + int nextId = state.getInteger("nextId"); String dstTableName = String.format("%s_%d", state.getString("tableNamePrefix"), nextId); if (env.getAccumuloClient().tableOperations().exists(dstTableName)) { log.debug(dstTableName + " already exists so don't copy."); nextId++; - state.set("nextId", Integer.valueOf(nextId)); + state.set("nextId", nextId); return; } @@ -65,8 +66,7 @@ public class CopyTable extends Test { log.debug("copying " + srcTableName + " to " + dstTableName); - env.getAccumuloClient().tableOperations().create(dstTableName, - new NewTableConfiguration().withSplits(splits)); + env.getAccumuloClient().tableOperations().create(dstTableName, ntc); if (ToolRunner.run(env.getHadoopConfiguration(), new CopyTool(), args) != 0) { log.error("Failed to run map/red verify"); @@ -82,6 +82,6 @@ public class CopyTable extends Test { log.debug("dropped " + srcTableName); nextId++; - state.set("nextId", Integer.valueOf(nextId)); + state.set("nextId", nextId); } } diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/CreateTable.java b/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/CreateTable.java index ec0ad96..fbdb84e 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/CreateTable.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/CreateTable.java @@ -30,29 +30,31 @@ import org.apache.hadoop.io.Text; public class CreateTable extends Test { - private final TreeSet<Text> splits; + private final NewTableConfiguration ntc; public CreateTable() { - splits = new TreeSet<>(); + TreeSet<Text> splits = new TreeSet<>(); for (int i = 1; i < 10; i++) { splits.add(new Text(Integer.toString(i))); } for (String s : "a b c d e f".split(" ")) { splits.add(new Text(s)); } + this.ntc = new NewTableConfiguration().withSplits(splits); } @Override public void visit(State state, RandWalkEnv env, Properties props) throws Exception { AccumuloClient client = env.getAccumuloClient(); - int nextId = (Integer) state.get("nextId"); + int nextId = state.getInteger("nextId"); String tableName = String.format("%s_%d", state.getString("tableNamePrefix"), nextId); try { - // Create table and add some splits to make the server's life easier - client.tableOperations().create(tableName, new NewTableConfiguration().withSplits(splits)); + // Create table with some splits to make the server's life easier + client.tableOperations().create(tableName, ntc); String tableId = client.tableOperations().tableIdMap().get(tableName); - log.debug("created table {} (id:{}) with {} splits", tableName, tableId, splits.size()); + log.debug("created table {} (id:{}) with {} splits", tableName, tableId, + ntc.getSplits().size()); @SuppressWarnings("unchecked") List<String> tables = (List<String>) state.get("tableList"); diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/OfflineTable.java b/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/OfflineTable.java index 0eac64c..a01a698 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/OfflineTable.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/OfflineTable.java @@ -31,7 +31,7 @@ public class OfflineTable extends Test { @SuppressWarnings("unchecked") List<String> tables = (List<String>) state.get("tableList"); - if (tables.size() <= 0) { + if (tables.isEmpty()) { return; } diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/Write.java b/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/Write.java index a69ea7b..2f44992 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/Write.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/multitable/Write.java @@ -50,7 +50,7 @@ public class Write extends Test { String tableName = tables.get(env.getRandom().nextInt(tables.size())); - BatchWriter bw = null; + BatchWriter bw; try { bw = env.getMultiTableBatchWriter().getBatchWriter(tableName); } catch (TableOfflineException e) { diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/sequential/BatchVerify.java b/src/main/java/org/apache/accumulo/testing/randomwalk/sequential/BatchVerify.java index ea40a30..2cd4928 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/sequential/BatchVerify.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/sequential/BatchVerify.java @@ -22,12 +22,12 @@ import java.util.Iterator; import java.util.List; import java.util.Map.Entry; import java.util.Properties; +import java.util.stream.Collectors; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.BatchScanner; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.testing.randomwalk.RandWalkEnv; import org.apache.accumulo.testing.randomwalk.State; @@ -80,15 +80,10 @@ public class BatchVerify extends Test { scanner.setRanges(ranges); - List<Key> keys = new ArrayList<>(); - for (Entry<Key,Value> entry : scanner) { - keys.add(entry.getKey()); - } + List<Key> keys = scanner.stream().map(Entry::getKey).sorted().collect(Collectors.toList()); log.debug("scan returned " + keys.size() + " rows. now verifying..."); - Collections.sort(keys); - Iterator<Key> iterator = keys.iterator(); int curKey = Integer.parseInt(iterator.next().getRow().toString()); boolean done = false; diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/sequential/MapRedVerify.java b/src/main/java/org/apache/accumulo/testing/randomwalk/sequential/MapRedVerify.java index db17deb..efb7812 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/sequential/MapRedVerify.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/sequential/MapRedVerify.java @@ -45,18 +45,21 @@ public class MapRedVerify extends Test { return; } - Scanner outputScanner = env.getAccumuloClient().createScanner(args[2], Authorizations.EMPTY); - outputScanner.setRange(new Range()); + AccumuloClient client = env.getAccumuloClient(); + int count; + try (Scanner outputScanner = client.createScanner(args[2], Authorizations.EMPTY)) { + outputScanner.setRange(new Range()); - int count = 0; - Key lastKey = null; - for (Entry<Key,Value> entry : outputScanner) { - Key current = entry.getKey(); - if (lastKey != null && lastKey.getColumnFamily().equals(current.getRow())) { - log.info(entry.getKey().toString()); - count++; + count = 0; + Key lastKey = null; + for (Entry<Key,Value> entry : outputScanner) { + Key current = entry.getKey(); + if (lastKey != null && lastKey.getColumnFamily().equals(current.getRow())) { + log.info(entry.getKey().toString()); + count++; + } + lastKey = current; } - lastKey = current; } if (count > 1) { @@ -64,7 +67,6 @@ public class MapRedVerify extends Test { } log.debug("Dropping table: " + args[2]); - AccumuloClient client = env.getAccumuloClient(); client.tableOperations().delete(args[2]); } } diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/sequential/SequentialFixture.java b/src/main/java/org/apache/accumulo/testing/randomwalk/sequential/SequentialFixture.java index 251ab36..d3a0d30 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/sequential/SequentialFixture.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/sequential/SequentialFixture.java @@ -17,11 +17,13 @@ package org.apache.accumulo.testing.randomwalk.sequential; import java.net.InetAddress; +import java.util.Map; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.MultiTableBatchWriter; import org.apache.accumulo.core.client.MutationsRejectedException; import org.apache.accumulo.core.client.TableExistsException; +import org.apache.accumulo.core.client.admin.NewTableConfiguration; import org.apache.accumulo.testing.randomwalk.Fixture; import org.apache.accumulo.testing.randomwalk.RandWalkEnv; import org.apache.accumulo.testing.randomwalk.State; @@ -41,15 +43,17 @@ public class SequentialFixture extends Fixture { System.currentTimeMillis()); state.set("seqTableName", seqTableName); + NewTableConfiguration ntc = new NewTableConfiguration() + .setProperties(Map.of("table.scan.max.memory", "1K")); + try { - client.tableOperations().create(seqTableName); + client.tableOperations().create(seqTableName, ntc); log.debug("Created table " + seqTableName + " (id:" + client.tableOperations().tableIdMap().get(seqTableName) + ")"); } catch (TableExistsException e) { log.warn("Table " + seqTableName + " already exists!"); throw e; } - client.tableOperations().setProperty(seqTableName, "table.scan.max.memory", "1K"); state.set("numWrites", 0L); state.set("totalWrites", 0L); @@ -70,7 +74,7 @@ public class SequentialFixture extends Fixture { env.resetMultiTableBatchWriter(); } - log.debug("Dropping tables: " + seqTableName); + log.debug("Dropping table: " + seqTableName); AccumuloClient client = env.getAccumuloClient(); diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/shard/BulkInsert.java b/src/main/java/org/apache/accumulo/testing/randomwalk/shard/BulkInsert.java index 1916e57..a9457e0 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/shard/BulkInsert.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/shard/BulkInsert.java @@ -40,6 +40,7 @@ import org.apache.accumulo.testing.randomwalk.RandWalkEnv; import org.apache.accumulo.testing.randomwalk.State; import org.apache.accumulo.testing.randomwalk.Test; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -49,7 +50,7 @@ import org.apache.hadoop.util.ToolRunner; public class BulkInsert extends Test { - class SeqfileBatchWriter implements BatchWriter { + static class SeqfileBatchWriter implements BatchWriter { SequenceFile.Writer writer; @@ -98,11 +99,11 @@ public class BulkInsert extends Test { @Override public void visit(State state, RandWalkEnv env, Properties props) throws Exception { - String indexTableName = (String) state.get("indexTableName"); - String dataTableName = (String) state.get("docTableName"); - int numPartitions = (Integer) state.get("numPartitions"); + String indexTableName = state.getString("indexTableName"); + String dataTableName = state.getString("docTableName"); + int numPartitions = state.getInteger("numPartitions"); Random rand = state.getRandom(); - long nextDocID = (Long) state.get("nextDocID"); + long nextDocID = state.getLong("nextDocID"); int minInsert = Integer.parseInt(props.getProperty("minInsert")); int maxInsert = Integer.parseInt(props.getProperty("maxInsert")); @@ -117,34 +118,32 @@ public class BulkInsert extends Test { fs.mkdirs(new Path(rootDir)); - BatchWriter dataWriter = new SeqfileBatchWriter(conf, fs, rootDir + "/data.seq"); - BatchWriter indexWriter = new SeqfileBatchWriter(conf, fs, rootDir + "/index.seq"); + try (BatchWriter dataWriter = new SeqfileBatchWriter(conf, fs, rootDir + "/data.seq"); + BatchWriter indexWriter = new SeqfileBatchWriter(conf, fs, rootDir + "/index.seq")) { - for (int i = 0; i < numToInsert; i++) { - String docID = Insert.insertRandomDocument(nextDocID++, dataWriter, indexWriter, - indexTableName, dataTableName, numPartitions, rand); - log.debug("Bulk inserting document " + docID); - } - - state.set("nextDocID", Long.valueOf(nextDocID)); + for (int i = 0; i < numToInsert; i++) { + String docID = Insert.insertRandomDocument(nextDocID++, dataWriter, indexWriter, + numPartitions, rand); + log.debug("Bulk inserting document " + docID); + } - dataWriter.close(); - indexWriter.close(); + state.set("nextDocID", nextDocID); + } - sort(state, env, fs, dataTableName, rootDir + "/data.seq", rootDir + "/data_bulk", + sort(env, fs, dataTableName, rootDir + "/data.seq", rootDir + "/data_bulk", rootDir + "/data_work", maxSplits); - sort(state, env, fs, indexTableName, rootDir + "/index.seq", rootDir + "/index_bulk", + sort(env, fs, indexTableName, rootDir + "/index.seq", rootDir + "/index_bulk", rootDir + "/index_work", maxSplits); - bulkImport(fs, state, env, dataTableName, rootDir, "data"); - bulkImport(fs, state, env, indexTableName, rootDir, "index"); + bulkImport(fs, env, dataTableName, rootDir, "data"); + bulkImport(fs, env, indexTableName, rootDir, "index"); fs.delete(new Path(rootDir), true); } @SuppressWarnings("deprecation") - private void bulkImport(FileSystem fs, State state, RandWalkEnv env, String tableName, - String rootDir, String prefix) throws Exception { + private void bulkImport(FileSystem fs, RandWalkEnv env, String tableName, String rootDir, + String prefix) throws Exception { while (true) { String bulkDir = rootDir + "/" + prefix + "_bulk"; String failDir = rootDir + "/" + prefix + "_failure"; @@ -154,40 +153,42 @@ public class BulkInsert extends Test { env.getAccumuloClient().tableOperations().importDirectory(tableName, bulkDir, failDir, true); FileStatus[] failures = fs.listStatus(failPath); - if (failures != null && failures.length > 0) { - log.warn("Failed to bulk import some files, retrying "); - - for (FileStatus failure : failures) { - if (!failure.getPath().getName().endsWith(".seq")) - fs.rename(failure.getPath(), new Path(new Path(bulkDir), failure.getPath().getName())); - else - log.debug("Ignoring " + failure.getPath()); - } - sleepUninterruptibly(3, TimeUnit.SECONDS); - } else + + if (failures == null || failures.length == 0) break; + + log.warn("Failed to bulk import some files, retrying "); + + for (FileStatus failure : failures) { + if (!failure.getPath().getName().endsWith(".seq")) + fs.rename(failure.getPath(), new Path(new Path(bulkDir), failure.getPath().getName())); + else + log.debug("Ignoring " + failure.getPath()); + } + sleepUninterruptibly(3, TimeUnit.SECONDS); + } } - private void sort(State state, RandWalkEnv env, FileSystem fs, String tableName, String seqFile, + private void sort(RandWalkEnv env, FileSystem fs, String tableName, String seqFile, String outputDir, String workDir, int maxSplits) throws Exception { - PrintStream out = new PrintStream( - new BufferedOutputStream(fs.create(new Path(workDir + "/splits.txt"))), false, - UTF_8.name()); + try (FSDataOutputStream fsDataOutputStream = fs.create(new Path(workDir + "/splits.txt")); + BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(fsDataOutputStream); + PrintStream printStream = new PrintStream(bufferedOutputStream, false, UTF_8)) { - AccumuloClient client = env.getAccumuloClient(); + AccumuloClient client = env.getAccumuloClient(); - Collection<Text> splits = client.tableOperations().listSplits(tableName, maxSplits); - for (Text split : splits) - out.println(Base64.getEncoder().encodeToString(split.copyBytes())); + Collection<Text> splits = client.tableOperations().listSplits(tableName, maxSplits); + for (Text split : splits) + printStream.println(Base64.getEncoder().encodeToString(split.copyBytes())); - out.close(); + SortTool sortTool = new SortTool(seqFile, outputDir, workDir + "/splits.txt", splits); - SortTool sortTool = new SortTool(seqFile, outputDir, workDir + "/splits.txt", splits); + if (ToolRunner.run(env.getHadoopConfiguration(), sortTool, new String[0]) != 0) { + throw new Exception("Failed to run map/red verify"); + } - if (ToolRunner.run(env.getHadoopConfiguration(), sortTool, new String[0]) != 0) { - throw new Exception("Failed to run map/red verify"); } } diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/shard/CloneIndex.java b/src/main/java/org/apache/accumulo/testing/randomwalk/shard/CloneIndex.java index 9daaf13..4c7b0f7 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/shard/CloneIndex.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/shard/CloneIndex.java @@ -29,7 +29,7 @@ public class CloneIndex extends Test { @Override public void visit(State state, RandWalkEnv env, Properties props) throws Exception { - String indexTableName = (String) state.get("indexTableName"); + String indexTableName = state.getString("indexTableName"); String tmpIndexTableName = indexTableName + "_tmp"; long t1 = System.currentTimeMillis(); diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/shard/CompactFilter.java b/src/main/java/org/apache/accumulo/testing/randomwalk/shard/CompactFilter.java index e4689af..bbac5ff 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/shard/CompactFilter.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/shard/CompactFilter.java @@ -42,8 +42,8 @@ public class CompactFilter extends Test { @Override public void visit(State state, RandWalkEnv env, Properties props) throws Exception { - String indexTableName = (String) state.get("indexTableName"); - String docTableName = (String) state.get("docTableName"); + String indexTableName = state.getString("indexTableName"); + String docTableName = state.getString("docTableName"); Random rand = state.getRandom(); String deleteChar = Integer.toHexString(rand.nextInt(16)) + ""; @@ -77,22 +77,22 @@ public class CompactFilter extends Test { log.debug( "Filtered documents using compaction iterators " + regex + " " + (t3) + " " + (t2 - t1)); - BatchScanner bscanner = env.getAccumuloClient().createBatchScanner(docTableName, - new Authorizations(), 10); + try (BatchScanner bscanner = env.getAccumuloClient().createBatchScanner(docTableName, + new Authorizations(), 10)) { - List<Range> ranges = new ArrayList<>(); - for (int i = 0; i < 16; i++) { - ranges.add(Range.prefix(new Text(Integer.toHexString(i) + "" + deleteChar))); - } + List<Range> ranges = new ArrayList<>(); + for (int i = 0; i < 16; i++) { + ranges.add(Range.prefix(new Text(Integer.toHexString(i) + "" + deleteChar))); + } - bscanner.setRanges(ranges); - Iterator<Entry<Key,Value>> iter = bscanner.iterator(); + bscanner.setRanges(ranges); + Iterator<Entry<Key,Value>> iter = bscanner.iterator(); - if (iter.hasNext()) { - throw new Exception("Saw unexpected document " + iter.next().getKey()); - } + if (iter.hasNext()) { + throw new Exception("Saw unexpected document " + iter.next().getKey()); + } - bscanner.close(); + } } } diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/shard/Delete.java b/src/main/java/org/apache/accumulo/testing/randomwalk/shard/Delete.java index 340a6b3..fb00b3b 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/shard/Delete.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/shard/Delete.java @@ -31,12 +31,12 @@ public class Delete extends Test { @Override public void visit(State state, RandWalkEnv env, Properties props) throws Exception { - String indexTableName = (String) state.get("indexTableName"); - String dataTableName = (String) state.get("docTableName"); - int numPartitions = (Integer) state.get("numPartitions"); + String indexTableName = state.getString("indexTableName"); + String dataTableName = state.getString("docTableName"); + int numPartitions = state.getInteger("numPartitions"); Random rand = state.getRandom(); - Entry<Key,Value> entry = Search.findRandomDocument(state, env, dataTableName, rand); + Entry<Key,Value> entry = Search.findRandomDocument(env, dataTableName, rand); if (entry == null) return; diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/shard/DeleteSomeDocs.java b/src/main/java/org/apache/accumulo/testing/randomwalk/shard/DeleteSomeDocs.java index baf4719..9cc9836 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/shard/DeleteSomeDocs.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/shard/DeleteSomeDocs.java @@ -16,10 +16,11 @@ */ package org.apache.accumulo.testing.randomwalk.shard; -import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.Properties; import java.util.Random; +import java.util.stream.Collectors; import org.apache.accumulo.core.client.BatchDeleter; import org.apache.accumulo.core.client.BatchWriterConfig; @@ -36,47 +37,44 @@ public class DeleteSomeDocs extends Test { @Override public void visit(State state, RandWalkEnv env, Properties props) throws Exception { - // delete documents that where the document id matches a given pattern - // from doc and index table - // using the batch deleter + // delete documents where the document id matches a given pattern + // from doc and index table using the batch deleter Random rand = state.getRandom(); - String indexTableName = (String) state.get("indexTableName"); - String dataTableName = (String) state.get("docTableName"); + String indexTableName = state.getString("indexTableName"); + String dataTableName = state.getString("docTableName"); - ArrayList<String> patterns = new ArrayList<>(); - - for (Object key : props.keySet()) - if (key instanceof String && ((String) key).startsWith("pattern")) - patterns.add(props.getProperty((String) key)); + List<String> patterns = props.keySet().stream().filter(key -> key instanceof String) + .map(key -> (String) key).filter(key -> key.startsWith("pattern")).map(props::getProperty) + .collect(Collectors.toList()); String pattern = patterns.get(rand.nextInt(patterns.size())); BatchWriterConfig bwc = new BatchWriterConfig(); - BatchDeleter ibd = env.getAccumuloClient().createBatchDeleter(indexTableName, - Authorizations.EMPTY, 8, bwc); - ibd.setRanges(Collections.singletonList(new Range())); - IteratorSetting iterSettings = new IteratorSetting(100, RegExFilter.class); - RegExFilter.setRegexs(iterSettings, null, null, pattern, null, false); + try (BatchDeleter ibd = env.getAccumuloClient().createBatchDeleter(indexTableName, + Authorizations.EMPTY, 8, bwc)) { + ibd.setRanges(Collections.singletonList(new Range())); + + RegExFilter.setRegexs(iterSettings, null, null, pattern, null, false); - ibd.addScanIterator(iterSettings); + ibd.addScanIterator(iterSettings); - ibd.delete(); + ibd.delete(); - ibd.close(); + } - BatchDeleter dbd = env.getAccumuloClient().createBatchDeleter(dataTableName, - Authorizations.EMPTY, 8, bwc); - dbd.setRanges(Collections.singletonList(new Range())); + try (BatchDeleter dbd = env.getAccumuloClient().createBatchDeleter(dataTableName, + Authorizations.EMPTY, 8, bwc)) { + dbd.setRanges(Collections.singletonList(new Range())); - iterSettings = new IteratorSetting(100, RegExFilter.class); - RegExFilter.setRegexs(iterSettings, pattern, null, null, null, false); + iterSettings = new IteratorSetting(100, RegExFilter.class); + RegExFilter.setRegexs(iterSettings, pattern, null, null, null, false); - dbd.addScanIterator(iterSettings); + dbd.addScanIterator(iterSettings); - dbd.delete(); + dbd.delete(); - dbd.close(); + } log.debug("Deleted documents w/ id matching '" + pattern + "'"); } diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/shard/DeleteWord.java b/src/main/java/org/apache/accumulo/testing/randomwalk/shard/DeleteWord.java index e61c4ad..807b4fb 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/shard/DeleteWord.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/shard/DeleteWord.java @@ -16,10 +16,11 @@ */ package org.apache.accumulo.testing.randomwalk.shard; -import java.util.ArrayList; +import java.util.List; import java.util.Map.Entry; import java.util.Properties; import java.util.Random; +import java.util.stream.Collectors; import org.apache.accumulo.core.client.BatchScanner; import org.apache.accumulo.core.client.BatchWriter; @@ -43,9 +44,9 @@ public class DeleteWord extends Test { @Override public void visit(State state, RandWalkEnv env, Properties props) throws Exception { - String indexTableName = (String) state.get("indexTableName"); - String docTableName = (String) state.get("docTableName"); - int numPartitions = (Integer) state.get("numPartitions"); + String indexTableName = state.getString("indexTableName"); + String docTableName = state.getString("docTableName"); + int numPartitions = state.getInteger("numPartitions"); Random rand = state.getRandom(); String wordToDelete = Insert.generateRandomWord(rand); @@ -53,16 +54,17 @@ public class DeleteWord extends Test { // use index to find all documents containing word Scanner scanner = env.getAccumuloClient().createScanner(indexTableName, Authorizations.EMPTY); scanner.fetchColumnFamily(new Text(wordToDelete)); + List<Range> documentsToDelete = scanner.stream().onClose(scanner::close).map(Entry::getKey) + .map(Key::getColumnQualifier).map(Range::new).collect(Collectors.toList()); - ArrayList<Range> documentsToDelete = new ArrayList<>(); - - for (Entry<Key,Value> entry : scanner) - documentsToDelete.add(new Range(entry.getKey().getColumnQualifier())); + if (documentsToDelete.isEmpty()) { + log.debug("No documents to delete containing " + wordToDelete); + return; + } - if (documentsToDelete.size() > 0) { - // use a batch scanner to fetch all documents - BatchScanner bscanner = env.getAccumuloClient().createBatchScanner(docTableName, - Authorizations.EMPTY, 8); + // use a batch scanner to fetch all documents + try (BatchScanner bscanner = env.getAccumuloClient().createBatchScanner(docTableName, + Authorizations.EMPTY, 8)) { bscanner.setRanges(documentsToDelete); BatchWriter ibw = env.getMultiTableBatchWriter().getBatchWriter(indexTableName); @@ -83,8 +85,6 @@ public class DeleteWord extends Test { count++; } - bscanner.close(); - env.getMultiTableBatchWriter().flush(); if (count != documentsToDelete.size()) { diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/shard/ExportIndex.java b/src/main/java/org/apache/accumulo/testing/randomwalk/shard/ExportIndex.java index 7c81e3c..aa657b5 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/shard/ExportIndex.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/shard/ExportIndex.java @@ -22,13 +22,15 @@ import java.io.BufferedReader; import java.io.InputStreamReader; import java.util.HashMap; import java.util.HashSet; -import java.util.Map.Entry; import java.util.Properties; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.testing.randomwalk.RandWalkEnv; import org.apache.accumulo.testing.randomwalk.State; import org.apache.accumulo.testing.randomwalk.Test; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; @@ -42,16 +44,16 @@ public class ExportIndex extends Test { @Override public void visit(State state, RandWalkEnv env, Properties props) throws Exception { - String indexTableName = (String) state.get("indexTableName"); + String indexTableName = state.getString("indexTableName"); String tmpIndexTableName = indexTableName + "_tmp"; - String exportDir = "/tmp/shard_export/" + indexTableName; - String copyDir = "/tmp/shard_export/" + tmpIndexTableName; + Path exportDir = new Path("/tmp/shard_export/" + indexTableName); + Path copyDir = new Path("/tmp/shard_export/" + tmpIndexTableName); FileSystem fs = FileSystem.get(env.getHadoopConfiguration()); - fs.delete(new Path("/tmp/shard_export/" + indexTableName), true); - fs.delete(new Path("/tmp/shard_export/" + tmpIndexTableName), true); + fs.delete(exportDir, true); + fs.delete(copyDir, true); // disable spits, so that splits can be compared later w/o worrying one // table splitting and the other not @@ -65,31 +67,31 @@ public class ExportIndex extends Test { long t2 = System.currentTimeMillis(); - env.getAccumuloClient().tableOperations().exportTable(indexTableName, exportDir); + env.getAccumuloClient().tableOperations().exportTable(indexTableName, exportDir.toString()); long t3 = System.currentTimeMillis(); // copy files - BufferedReader reader = new BufferedReader( - new InputStreamReader(fs.open(new Path(exportDir, "distcp.txt")), UTF_8)); - String file = null; - while ((file = reader.readLine()) != null) { - Path src = new Path(file); - Path dest = new Path(new Path(copyDir), src.getName()); - FileUtil.copy(fs, src, fs, dest, false, true, env.getHadoopConfiguration()); + try (FSDataInputStream fsDataInputStream = fs.open(new Path(exportDir, "distcp.txt")); + InputStreamReader inputStreamReader = new InputStreamReader(fsDataInputStream, UTF_8); + BufferedReader reader = new BufferedReader(inputStreamReader)) { + String file; + while ((file = reader.readLine()) != null) { + Path src = new Path(file); + Path dest = new Path(copyDir, src.getName()); + FileUtil.copy(fs, src, fs, dest, false, true, env.getHadoopConfiguration()); + } } - reader.close(); - long t4 = System.currentTimeMillis(); env.getAccumuloClient().tableOperations().online(indexTableName); - env.getAccumuloClient().tableOperations().importTable(tmpIndexTableName, copyDir); + env.getAccumuloClient().tableOperations().importTable(tmpIndexTableName, copyDir.toString()); long t5 = System.currentTimeMillis(); - fs.delete(new Path(exportDir), true); - fs.delete(new Path(copyDir), true); + fs.delete(exportDir, true); + fs.delete(copyDir, true); HashSet<Text> splits1 = new HashSet<>( env.getAccumuloClient().tableOperations().listSplits(indexTableName)); @@ -99,15 +101,8 @@ public class ExportIndex extends Test { if (!splits1.equals(splits2)) throw new Exception("Splits not equals " + indexTableName + " " + tmpIndexTableName); - HashMap<String,String> props1 = new HashMap<>(); - for (Entry<String,String> entry : env.getAccumuloClient().tableOperations() - .getProperties(indexTableName)) - props1.put(entry.getKey(), entry.getValue()); - - HashMap<String,String> props2 = new HashMap<>(); - for (Entry<String,String> entry : env.getAccumuloClient().tableOperations() - .getProperties(tmpIndexTableName)) - props2.put(entry.getKey(), entry.getValue()); + HashMap<String,String> props1 = getPropsFromTable(indexTableName, env); + HashMap<String,String> props2 = getPropsFromTable(tmpIndexTableName, env); if (!props1.equals(props2)) throw new Exception("Props not equals " + indexTableName + " " + tmpIndexTableName); @@ -123,4 +118,14 @@ public class ExportIndex extends Test { } + private static HashMap<String,String> getPropsFromTable(String tableName, RandWalkEnv env) + throws AccumuloException, TableNotFoundException { + return new HashMap<>() { + { + for (var entry : env.getAccumuloClient().tableOperations().getProperties(tableName)) + put(entry.getKey(), entry.getValue()); + } + }; + } + } diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/shard/Flush.java b/src/main/java/org/apache/accumulo/testing/randomwalk/shard/Flush.java index 741a60c..ddd6f8b 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/shard/Flush.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/shard/Flush.java @@ -27,16 +27,11 @@ public class Flush extends Test { @Override public void visit(State state, RandWalkEnv env, Properties props) throws Exception { - String indexTableName = (String) state.get("indexTableName"); - String dataTableName = (String) state.get("docTableName"); + String indexTableName = state.getString("indexTableName"); + String dataTableName = state.getString("docTableName"); Random rand = state.getRandom(); - String table; - - if (rand.nextDouble() < .5) - table = indexTableName; - else - table = dataTableName; + String table = rand.nextBoolean() ? indexTableName : dataTableName; env.getAccumuloClient().tableOperations().flush(table, null, null, true); log.debug("Flushed " + table); diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/shard/Grep.java b/src/main/java/org/apache/accumulo/testing/randomwalk/shard/Grep.java index acbbcb4..809659a 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/shard/Grep.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/shard/Grep.java @@ -43,8 +43,8 @@ public class Grep extends Test { // pick a few randoms words... grep for those words and search the index // ensure both return the same set of documents - String indexTableName = (String) state.get("indexTableName"); - String dataTableName = (String) state.get("docTableName"); + String indexTableName = state.getString("indexTableName"); + String dataTableName = state.getString("docTableName"); Random rand = state.getRandom(); Text[] words = new Text[rand.nextInt(4) + 2]; @@ -53,40 +53,41 @@ public class Grep extends Test { words[i] = new Text(Insert.generateRandomWord(rand)); } - BatchScanner bs = env.getAccumuloClient().createBatchScanner(indexTableName, - Authorizations.EMPTY, 16); - IteratorSetting ii = new IteratorSetting(20, "ii", IntersectingIterator.class.getName()); - IntersectingIterator.setColumnFamilies(ii, words); - bs.addScanIterator(ii); - bs.setRanges(Collections.singleton(new Range())); - HashSet<Text> documentsFoundInIndex = new HashSet<>(); - for (Entry<Key,Value> entry2 : bs) { - documentsFoundInIndex.add(entry2.getKey().getColumnQualifier()); + try (BatchScanner bs = env.getAccumuloClient().createBatchScanner(indexTableName, + Authorizations.EMPTY, 16)) { + IteratorSetting ii = new IteratorSetting(20, "ii", IntersectingIterator.class.getName()); + IntersectingIterator.setColumnFamilies(ii, words); + bs.addScanIterator(ii); + bs.setRanges(Collections.singleton(new Range())); + + for (Entry<Key,Value> entry : bs) { + documentsFoundInIndex.add(entry.getKey().getColumnQualifier()); + } + } - bs.close(); + HashSet<Text> documentsFoundByGrep = new HashSet<>(); - bs = env.getAccumuloClient().createBatchScanner(dataTableName, Authorizations.EMPTY, 16); + try (BatchScanner bs = env.getAccumuloClient().createBatchScanner(dataTableName, + Authorizations.EMPTY, 16)) { - for (int i = 0; i < words.length; i++) { - IteratorSetting more = new IteratorSetting(20 + i, "ii" + i, RegExFilter.class); - RegExFilter.setRegexs(more, null, null, null, "(^|(.*\\s))" + words[i] + "($|(\\s.*))", - false); - bs.addScanIterator(more); - } + for (int i = 0; i < words.length; i++) { + IteratorSetting more = new IteratorSetting(20 + i, "ii" + i, RegExFilter.class); + RegExFilter.setRegexs(more, null, null, null, "(^|(.*\\s))" + words[i] + "($|(\\s.*))", + false); + bs.addScanIterator(more); + } - bs.setRanges(Collections.singleton(new Range())); + bs.setRanges(Collections.singleton(new Range())); - HashSet<Text> documentsFoundByGrep = new HashSet<>(); + for (Entry<Key,Value> entry : bs) { + documentsFoundByGrep.add(entry.getKey().getRow()); + } - for (Entry<Key,Value> entry2 : bs) { - documentsFoundByGrep.add(entry2.getKey().getRow()); } - bs.close(); - if (!documentsFoundInIndex.equals(documentsFoundByGrep)) { throw new Exception("Set of documents found not equal for words " + Arrays.toString(words) + " " + documentsFoundInIndex + " " + documentsFoundByGrep); diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/shard/Insert.java b/src/main/java/org/apache/accumulo/testing/randomwalk/shard/Insert.java index 4b86dbf..695a378 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/shard/Insert.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/shard/Insert.java @@ -20,10 +20,7 @@ import java.util.HashSet; import java.util.Properties; import java.util.Random; -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.BatchWriter; -import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.testing.randomwalk.RandWalkEnv; @@ -38,26 +35,24 @@ public class Insert extends Test { @Override public void visit(State state, RandWalkEnv env, Properties props) throws Exception { - String indexTableName = (String) state.get("indexTableName"); - String dataTableName = (String) state.get("docTableName"); - int numPartitions = (Integer) state.get("numPartitions"); + String indexTableName = state.getString("indexTableName"); + String dataTableName = state.getString("docTableName"); + int numPartitions = state.getInteger("numPartitions"); + long nextDocID = state.getLong("nextDocID"); Random rand = state.getRandom(); - long nextDocID = (Long) state.get("nextDocID"); BatchWriter dataWriter = env.getMultiTableBatchWriter().getBatchWriter(dataTableName); BatchWriter indexWriter = env.getMultiTableBatchWriter().getBatchWriter(indexTableName); - String docID = insertRandomDocument(nextDocID++, dataWriter, indexWriter, indexTableName, - dataTableName, numPartitions, rand); + String docID = insertRandomDocument(nextDocID++, dataWriter, indexWriter, numPartitions, rand); log.debug("Inserted document " + docID); - state.set("nextDocID", Long.valueOf(nextDocID)); + state.set("nextDocID", nextDocID); } static String insertRandomDocument(long did, BatchWriter dataWriter, BatchWriter indexWriter, - String indexTableName, String dataTableName, int numPartitions, Random rand) - throws TableNotFoundException, Exception, AccumuloException, AccumuloSecurityException { + int numPartitions, Random rand) throws Exception { String doc = createDocument(rand); String docID = new StringBuilder(String.format("%016x", did)).reverse().toString(); diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/shard/Merge.java b/src/main/java/org/apache/accumulo/testing/randomwalk/shard/Merge.java index 91917ee..e6456c6 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/shard/Merge.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/shard/Merge.java @@ -30,7 +30,7 @@ public class Merge extends Test { @Override public void visit(State state, RandWalkEnv env, Properties props) throws Exception { - String indexTableName = (String) state.get("indexTableName"); + String indexTableName = state.getString("indexTableName"); Collection<Text> splits = env.getAccumuloClient().tableOperations().listSplits(indexTableName); SortedSet<Text> splitSet = new TreeSet<>(splits); @@ -40,8 +40,7 @@ public class Merge extends Test { merge.mergomatic(env.getAccumuloClient(), indexTableName, null, null, 256 * 1024 * 1024, true); splits = env.getAccumuloClient().tableOperations().listSplits(indexTableName); if (splits.size() > splitSet.size()) { - // throw an excpetion so that test will die an no further changes to - // table will occur... + // throw an exception so that test will die and no further changes to table will occur... // this way table is left as is for debugging. throw new Exception( "There are more tablets after a merge: " + splits.size() + " was " + splitSet.size()); diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/shard/Reindex.java b/src/main/java/org/apache/accumulo/testing/randomwalk/shard/Reindex.java index 11d3c61..419e00d 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/shard/Reindex.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/shard/Reindex.java @@ -34,31 +34,32 @@ public class Reindex extends Test { @Override public void visit(State state, RandWalkEnv env, Properties props) throws Exception { - String indexTableName = (String) state.get("indexTableName"); + String indexTableName = state.getString("indexTableName"); String tmpIndexTableName = indexTableName + "_tmp"; - String docTableName = (String) state.get("docTableName"); - int numPartitions = (Integer) state.get("numPartitions"); + String docTableName = state.getString("docTableName"); + int numPartitions = state.getInteger("numPartitions"); Random rand = state.getRandom(); ShardFixture.createIndexTable(this.log, state, env, "_tmp", rand); - Scanner scanner = env.getAccumuloClient().createScanner(docTableName, Authorizations.EMPTY); - BatchWriter tbw = env.getAccumuloClient().createBatchWriter(tmpIndexTableName, - new BatchWriterConfig()); - int count = 0; - for (Entry<Key,Value> entry : scanner) { - String docID = entry.getKey().getRow().toString(); - String doc = entry.getValue().toString(); + try ( + Scanner scanner = env.getAccumuloClient().createScanner(docTableName, Authorizations.EMPTY); + BatchWriter tbw = env.getAccumuloClient().createBatchWriter(tmpIndexTableName, + new BatchWriterConfig())) { - Insert.indexDocument(tbw, doc, docID, numPartitions); + for (Entry<Key,Value> entry : scanner) { + String docID = entry.getKey().getRow().toString(); + String doc = entry.getValue().toString(); - count++; - } + Insert.indexDocument(tbw, doc, docID, numPartitions); - tbw.close(); + count++; + } + + } log.debug("Reindexed " + count + " documents into " + tmpIndexTableName); diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/shard/Search.java b/src/main/java/org/apache/accumulo/testing/randomwalk/shard/Search.java index 56ee193..163cd43 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/shard/Search.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/shard/Search.java @@ -17,11 +17,12 @@ package org.apache.accumulo.testing.randomwalk.shard; import java.util.Collections; -import java.util.HashSet; -import java.util.Iterator; import java.util.Map.Entry; import java.util.Properties; import java.util.Random; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.accumulo.core.client.BatchScanner; import org.apache.accumulo.core.client.IteratorSetting; @@ -40,12 +41,12 @@ public class Search extends Test { @Override public void visit(State state, RandWalkEnv env, Properties props) throws Exception { - String indexTableName = (String) state.get("indexTableName"); - String dataTableName = (String) state.get("docTableName"); + String indexTableName = state.getString("indexTableName"); + String dataTableName = state.getString("docTableName"); Random rand = state.getRandom(); - Entry<Key,Value> entry = findRandomDocument(state, env, dataTableName, rand); + Entry<Key,Value> entry = findRandomDocument(env, dataTableName, rand); if (entry == null) return; @@ -53,56 +54,42 @@ public class Search extends Test { String doc = entry.getValue().toString(); String[] tokens = doc.split("\\W+"); - int numSearchTerms = rand.nextInt(6); - if (numSearchTerms < 2) - numSearchTerms = 2; - - HashSet<String> searchTerms = new HashSet<>(); - while (searchTerms.size() < numSearchTerms) - searchTerms.add(tokens[rand.nextInt(tokens.length)]); - - Text[] columns = new Text[searchTerms.size()]; - int index = 0; - for (String term : searchTerms) { - columns[index++] = new Text(term); - } + int numSearchTerms = rand.nextInt(4) + 2; - log.debug("Looking up terms " + searchTerms + " expect to find " + docID); + Set<String> searchTerms = Stream.generate(() -> tokens[rand.nextInt(tokens.length)]) + .limit(numSearchTerms).collect(Collectors.toSet()); - BatchScanner bs = env.getAccumuloClient().createBatchScanner(indexTableName, - Authorizations.EMPTY, 10); - IteratorSetting ii = new IteratorSetting(20, "ii", IntersectingIterator.class); - IntersectingIterator.setColumnFamilies(ii, columns); - bs.addScanIterator(ii); - bs.setRanges(Collections.singleton(new Range())); + Text[] columns = searchTerms.stream().map(Text::new).toArray(Text[]::new); - boolean sawDocID = false; + log.debug("Looking up terms " + searchTerms + " expect to find " + docID); - for (Entry<Key,Value> entry2 : bs) { - if (entry2.getKey().getColumnQualifier().equals(docID)) { - sawDocID = true; - break; - } - } + try (BatchScanner bs = env.getAccumuloClient().createBatchScanner(indexTableName, + Authorizations.EMPTY, 10)) { - bs.close(); + IteratorSetting ii = new IteratorSetting(20, "ii", IntersectingIterator.class); + IntersectingIterator.setColumnFamilies(ii, columns); + bs.addScanIterator(ii); + bs.setRanges(Collections.singleton(new Range())); - if (!sawDocID) - throw new Exception("Did not see doc " + docID + " in index. terms:" + searchTerms + " " - + indexTableName + " " + dataTableName); - } + boolean sawDocID = bs.stream().map(Entry::getKey).map(Key::getColumnQualifier) + .anyMatch(cq -> cq.equals(docID)); - static Entry<Key,Value> findRandomDocument(State state, RandWalkEnv env, String dataTableName, - Random rand) throws Exception { - Scanner scanner = env.getAccumuloClient().createScanner(dataTableName, Authorizations.EMPTY); - scanner.setBatchSize(1); - scanner.setRange(new Range(Integer.toString(rand.nextInt(0xfffffff), 16), null)); + if (!sawDocID) + throw new Exception("Did not see doc " + docID + " in index. terms:" + searchTerms + " " + + indexTableName + " " + dataTableName); + } - Iterator<Entry<Key,Value>> iter = scanner.iterator(); - if (!iter.hasNext()) - return null; + } - return iter.next(); + static Entry<Key,Value> findRandomDocument(RandWalkEnv env, String dataTableName, Random rand) + throws Exception { + try (Scanner scanner = env.getAccumuloClient().createScanner(dataTableName, + Authorizations.EMPTY)) { + scanner.setBatchSize(1); + scanner.setRange(new Range(Integer.toString(rand.nextInt(0xfffffff), 16), null)); + var entry = scanner.stream().findAny(); + return entry.orElse(null); + } } } diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/shard/ShardFixture.java b/src/main/java/org/apache/accumulo/testing/randomwalk/shard/ShardFixture.java index b967051..fa381f1 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/shard/ShardFixture.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/shard/ShardFixture.java @@ -54,7 +54,7 @@ public class ShardFixture extends Fixture { static void createIndexTable(Logger log, State state, RandWalkEnv env, String suffix, Random rand) throws Exception { AccumuloClient client = env.getAccumuloClient(); - String name = state.get("indexTableName") + suffix; + String name = state.getString("indexTableName") + suffix; NewTableConfiguration ntc = new NewTableConfiguration(); @@ -86,10 +86,10 @@ public class ShardFixture extends Fixture { String.format("ST_index_%s_%s_%d", hostname, pid, System.currentTimeMillis())); state.set("docTableName", String.format("ST_docs_%s_%s_%d", hostname, pid, System.currentTimeMillis())); - state.set("numPartitions", Integer.valueOf(numPartitions)); - state.set("cacheIndex", env.getRandom().nextDouble() < .5); + state.set("numPartitions", numPartitions); + state.set("cacheIndex", env.getRandom().nextBoolean()); state.set("rand", env.getRandom()); - state.set("nextDocID", Long.valueOf(0)); + state.set("nextDocID", 0L); AccumuloClient client = env.getAccumuloClient(); @@ -99,7 +99,7 @@ public class ShardFixture extends Fixture { NewTableConfiguration ntc = new NewTableConfiguration(); SortedSet<Text> splits = genSplits(0xff, env.getRandom().nextInt(32) + 1, "%02x"); ntc.withSplits(splits); - if (env.getRandom().nextDouble() < .5) { + if (env.getRandom().nextBoolean()) { ntc.setProperties(Map.of(Property.TABLE_BLOOM_ENABLED.getKey(), "true")); log.info("Enabling bloom filters for table {}", docTableName); } @@ -128,8 +128,8 @@ public class ShardFixture extends Fixture { log.info("Deleting index and doc tables"); - client.tableOperations().delete((String) state.get("indexTableName")); - client.tableOperations().delete((String) state.get("docTableName")); + client.tableOperations().delete(state.getString("indexTableName")); + client.tableOperations().delete(state.getString("docTableName")); log.debug("Exiting shard test"); } diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/shard/Split.java b/src/main/java/org/apache/accumulo/testing/randomwalk/shard/Split.java index 1cf381e..f578745 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/shard/Split.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/shard/Split.java @@ -29,8 +29,8 @@ public class Split extends Test { @Override public void visit(State state, RandWalkEnv env, Properties props) throws Exception { - String indexTableName = (String) state.get("indexTableName"); - int numPartitions = (Integer) state.get("numPartitions"); + String indexTableName = state.getString("indexTableName"); + int numPartitions = state.getInteger("numPartitions"); Random rand = state.getRandom(); SortedSet<Text> splitSet = ShardFixture.genSplits(numPartitions, diff --git a/src/main/java/org/apache/accumulo/testing/randomwalk/shard/VerifyIndex.java b/src/main/java/org/apache/accumulo/testing/randomwalk/shard/VerifyIndex.java index 554d541..e059388 100644 --- a/src/main/java/org/apache/accumulo/testing/randomwalk/shard/VerifyIndex.java +++ b/src/main/java/org/apache/accumulo/testing/randomwalk/shard/VerifyIndex.java @@ -34,39 +34,41 @@ public class VerifyIndex extends Test { @Override public void visit(State state, RandWalkEnv env, Properties props) throws Exception { - String indexTableName = (String) state.get("indexTableName"); + String indexTableName = state.getString("indexTableName"); String tmpIndexTableName = indexTableName + "_tmp"; // scan new and old index and verify identical - Scanner indexScanner1 = env.getAccumuloClient().createScanner(tmpIndexTableName, - Authorizations.EMPTY); - Scanner indexScanner2 = env.getAccumuloClient().createScanner(indexTableName, - Authorizations.EMPTY); + try ( + Scanner indexScanner1 = env.getAccumuloClient().createScanner(tmpIndexTableName, + Authorizations.EMPTY); + Scanner indexScanner2 = env.getAccumuloClient().createScanner(indexTableName, + Authorizations.EMPTY)) { - Iterator<Entry<Key,Value>> iter = indexScanner2.iterator(); + Iterator<Entry<Key,Value>> iter = indexScanner2.iterator(); - int count = 0; + int count = 0; - for (Entry<Key,Value> entry : indexScanner1) { - if (!iter.hasNext()) - throw new Exception("index rebuild mismatch " + entry.getKey() + " " + indexTableName); + for (Entry<Key,Value> entry : indexScanner1) { + if (!iter.hasNext()) + throw new Exception("index rebuild mismatch " + entry.getKey() + " " + indexTableName); - Key key1 = entry.getKey(); - Key key2 = iter.next().getKey(); + Key key1 = entry.getKey(); + Key key2 = iter.next().getKey(); - if (!key1.equals(key2, PartialKey.ROW_COLFAM_COLQUAL)) - throw new Exception("index rebuild mismatch " + key1 + " " + key2 + " " + indexTableName - + " " + tmpIndexTableName); - count++; - if (count % 1000 == 0) - makingProgress(); - } + if (!key1.equals(key2, PartialKey.ROW_COLFAM_COLQUAL)) + throw new Exception("index rebuild mismatch " + key1 + " " + key2 + " " + indexTableName + + " " + tmpIndexTableName); + count++; + if (count % 1000 == 0) + makingProgress(); + } - if (iter.hasNext()) - throw new Exception( - "index rebuild mismatch " + iter.next().getKey() + " " + tmpIndexTableName); + if (iter.hasNext()) + throw new Exception( + "index rebuild mismatch " + iter.next().getKey() + " " + tmpIndexTableName); - log.debug("Verified " + count + " index entries "); + log.debug("Verified " + count + " index entries "); + } env.getAccumuloClient().tableOperations().delete(indexTableName); env.getAccumuloClient().tableOperations().rename(tmpIndexTableName, indexTableName);