[java client] A few ITClient improvements ITClient has been flaky for a while now, mostly due to the "Row count regressed" issue. I fixed it by using snapshot timestamps, which required refactoring how we build scanners, which in turn led me to add a new counting method in BaseKuduTest.
I continued running the test and saw other issues. Some unchecked errors were not killing the test, so I added an UncaughtExceptionHandler. I also saw invalid scanner sequence ID errors that are normal due to how this test runs that were killing the test. Finally, I converted some plain Exceptions into KuduExceptions which gave us access to their Status. Change-Id: I3b5ddca26b66e9fc1f737aaacf98df340f0b9024 Reviewed-on: http://gerrit.cloudera.org:8080/4489 Tested-by: Kudu Jenkins Reviewed-by: Adar Dembo <a...@cloudera.com> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/0ce5ba59 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/0ce5ba59 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/0ce5ba59 Branch: refs/heads/master Commit: 0ce5ba594412de4365625485ea7b3c1ee21bf28d Parents: 8fc75a5 Author: Jean-Daniel Cryans <jdcry...@apache.org> Authored: Tue Sep 20 18:20:01 2016 -0700 Committer: Jean-Daniel Cryans <jdcry...@apache.org> Committed: Thu Sep 22 00:33:27 2016 +0000 ---------------------------------------------------------------------- .../org/apache/kudu/client/BaseKuduTest.java | 19 +++-- .../java/org/apache/kudu/client/ITClient.java | 75 +++++++++++++++----- 2 files changed, 68 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/0ce5ba59/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java b/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java index 41015a4..8989f50 100644 --- a/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java +++ b/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java @@ -140,13 +140,17 @@ public class BaseKuduTest { data.addCallbacks(cb, 
defaultErrorCB); data.join(DEFAULT_SLEEP); } - - Deferred<RowResultIterator> closer = scanner.close(); - closer.addCallbacks(cb, defaultErrorCB); - closer.join(DEFAULT_SLEEP); return counter.get(); } + protected static int countRowsInScan(KuduScanner scanner) throws KuduException { + int counter = 0; + while (scanner.hasMoreRows()) { + counter += scanner.nextRows().getNumRows(); + } + return counter; + } + /** * Scans the table and returns the number of rows. * @param table the table @@ -155,17 +159,12 @@ public class BaseKuduTest { */ protected long countRowsInTable(KuduTable table, KuduPredicate... predicates) throws KuduException { - long count = 0; KuduScanner.KuduScannerBuilder scanBuilder = syncClient.newScannerBuilder(table); for (KuduPredicate predicate : predicates) { scanBuilder.addPredicate(predicate); } scanBuilder.setProjectedColumnIndexes(ImmutableList.<Integer>of()); - KuduScanner scanner = scanBuilder.build(); - while (scanner.hasMoreRows()) { - count += scanner.nextRows().getNumRows(); - } - return count; + return countRowsInScan(scanBuilder.build()); } protected List<String> scanTableToStrings(KuduTable table, http://git-wip-us.apache.org/repos/asf/kudu/blob/0ce5ba59/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java ---------------------------------------------------------------------- diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java b/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java index 10cf231..c482a86 100644 --- a/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java +++ b/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java @@ -60,6 +60,8 @@ public class ITClient extends BaseKuduTest { private static KuduTable table; private static long runtimeInSeconds; + private volatile long sharedWriteTimestamp; + @BeforeClass public static void setUpBeforeClass() throws Exception { @@ -89,6 +91,9 @@ public class ITClient extends BaseKuduTest { @Test(timeout 
= TEST_TIMEOUT_SECONDS) public void test() throws Exception { + + UncaughtExceptionHandler uncaughtExceptionHandler = new UncaughtExceptionHandler(); + ArrayList<Thread> threads = new ArrayList<>(); Thread chaosThread = new Thread(new ChaosThread()); Thread writerThread = new Thread(new WriterThread()); @@ -99,6 +104,7 @@ public class ITClient extends BaseKuduTest { threads.add(scannerThread); for (Thread thread : threads) { + thread.setUncaughtExceptionHandler(uncaughtExceptionHandler); thread.start(); } @@ -235,6 +241,7 @@ public class ITClient extends BaseKuduTest { @Override public void run() { + session.setExternalConsistencyMode(ExternalConsistencyMode.CLIENT_PROPAGATED); while (KEEP_RUNNING_LATCH.getCount() > 0) { try { OperationResponse resp = session.apply(createBasicSchemaInsert(table, currentRowKey)); @@ -279,6 +286,13 @@ public class ITClient extends BaseKuduTest { " returned this error: " + resp.getRowError(), null); return true; } + + if (resp == null) { + return false; + } + + sharedWriteTimestamp = resp.getWriteTimestampRaw(); + return false; } } @@ -299,8 +313,11 @@ public class ITClient extends BaseKuduTest { boolean shouldContinue; - // Always scan until we find rows. - if (lastRowCount == 0 || random.nextBoolean()) { + // First check if we've written at least one row. + if (sharedWriteTimestamp == 0) { + shouldContinue = true; + } else if (lastRowCount == 0 || // Need to full scan once before random reading + random.nextBoolean()) { shouldContinue = fullScan(); } else { shouldContinue = randomGet(); @@ -322,14 +339,16 @@ public class ITClient extends BaseKuduTest { } /** - * Reads a row at random that it knows to exist (smaller than lastRowCount). - * @return + * Reads a row at random that should exist (smaller than lastRowCount). 
+ * @return true if the get was successful, false if there was an error */ private boolean randomGet() { int key = random.nextInt(lastRowCount); KuduPredicate predicate = KuduPredicate.newComparisonPredicate( table.getSchema().getColumnByIndex(0), KuduPredicate.ComparisonOp.EQUAL, key); - KuduScanner scanner = localClient.newScannerBuilder(table).addPredicate(predicate).build(); + KuduScanner scanner = getScannerBuilder() + .addPredicate(predicate) + .build(); List<RowResult> results = new ArrayList<>(); while (scanner.hasMoreRows()) { @@ -338,7 +357,7 @@ public class ITClient extends BaseKuduTest { for (RowResult row : ite) { results.add(row); } - } catch (Exception e) { + } catch (KuduException e) { return checkAndReportError("Got error while getting row " + key, e); } } @@ -357,25 +376,33 @@ public class ITClient extends BaseKuduTest { } /** - * Rusn a full table scan and updates the lastRowCount. - * @return + * Runs a full table scan and updates the lastRowCount. + * @return true if the full scan was successful, false if there was an error */ private boolean fullScan() { - AsyncKuduScanner scannerBuilder = localAsyncClient.newScannerBuilder(table).build(); + KuduScanner scanner = getScannerBuilder().build(); try { - int rowCount = countRowsInScan(scannerBuilder); + int rowCount = countRowsInScan(scanner); if (rowCount < lastRowCount) { reportError("Row count regressed: " + rowCount + " < " + lastRowCount, null); return false; } - lastRowCount = rowCount; - LOG.info("New row count {}", lastRowCount); - } catch (Exception e) { - checkAndReportError("Got error while row counting", e); + if (rowCount > lastRowCount) { + lastRowCount = rowCount; + LOG.info("New row count {}", lastRowCount); + } + } catch (KuduException e) { + return checkAndReportError("Got error while row counting", e); } return true; } + private KuduScanner.KuduScannerBuilder getScannerBuilder() { + return localClient.newScannerBuilder(table) + 
.readMode(AsyncKuduScanner.ReadMode.READ_AT_SNAPSHOT) + .snapshotTimestampRaw(sharedWriteTimestamp); + } + /** * Checks the passed exception contains "Scanner not found". If it does then it returns true, * else it reports the error and returns false. @@ -384,12 +411,28 @@ public class ITClient extends BaseKuduTest { * @param e the exception to check * @return true if the scanner failed because it wasn't false, otherwise false */ - private boolean checkAndReportError(String message, Exception e) { - if (!e.getCause().getMessage().contains("Scanner not found")) { + private boolean checkAndReportError(String message, KuduException e) { + // Do nasty things, expect nasty results. The scanners are a bit too happy to retry TS + // disconnections so we might end up retrying a scanner on a node that restarted, or we might + // get disconnected just after sending an RPC so when we reconnect to the same TS we might get + // the "Invalid call sequence ID" message. + if (!e.getStatus().isNotFound() && + !e.getStatus().getMessage().contains("Invalid call sequence ID")) { reportError(message, e); return false; } return true; } } + + class UncaughtExceptionHandler implements Thread.UncaughtExceptionHandler { + + @Override + public void uncaughtException(Thread t, Throwable e) { + // Only report an error if we're still running, else we'll spam the log. + if (KEEP_RUNNING_LATCH.getCount() != 0) { + reportError("Uncaught exception", new Exception(e)); + } + } + } } \ No newline at end of file