[java client] Few ITClient improvements

ITClient has been flaky for a while now, mostly due to the "Row count
regressed" issue. I fixed it by using snapshot timestamps, which made me
refactor how we build scanners, which in turn made me add a new counting
method in BaseKuduTest.

I continued running the test and saw other issues. Some unchecked errors were
not killing the test, so I added an UncaughtExceptionHandler. I also saw
invalid scanner sequence ID errors, which are expected given how this test
runs, but they were killing the test. Finally, I converted some plain
Exceptions into KuduExceptions, which gives us access to their Status.

Change-Id: I3b5ddca26b66e9fc1f737aaacf98df340f0b9024
Reviewed-on: http://gerrit.cloudera.org:8080/4489
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <a...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/0ce5ba59
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/0ce5ba59
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/0ce5ba59

Branch: refs/heads/master
Commit: 0ce5ba594412de4365625485ea7b3c1ee21bf28d
Parents: 8fc75a5
Author: Jean-Daniel Cryans <jdcry...@apache.org>
Authored: Tue Sep 20 18:20:01 2016 -0700
Committer: Jean-Daniel Cryans <jdcry...@apache.org>
Committed: Thu Sep 22 00:33:27 2016 +0000

----------------------------------------------------------------------
 .../org/apache/kudu/client/BaseKuduTest.java    | 19 +++--
 .../java/org/apache/kudu/client/ITClient.java   | 75 +++++++++++++++-----
 2 files changed, 68 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/0ce5ba59/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
----------------------------------------------------------------------
diff --git 
a/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java 
b/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
index 41015a4..8989f50 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
@@ -140,13 +140,17 @@ public class BaseKuduTest {
       data.addCallbacks(cb, defaultErrorCB);
       data.join(DEFAULT_SLEEP);
     }
-
-    Deferred<RowResultIterator> closer = scanner.close();
-    closer.addCallbacks(cb, defaultErrorCB);
-    closer.join(DEFAULT_SLEEP);
     return counter.get();
   }
 
+  protected static int countRowsInScan(KuduScanner scanner) throws 
KuduException {
+    int counter = 0;
+    while (scanner.hasMoreRows()) {
+      counter += scanner.nextRows().getNumRows();
+    }
+    return counter;
+  }
+
   /**
    * Scans the table and returns the number of rows.
    * @param table the table
@@ -155,17 +159,12 @@ public class BaseKuduTest {
    */
   protected long countRowsInTable(KuduTable table, KuduPredicate... predicates)
       throws KuduException {
-    long count = 0;
     KuduScanner.KuduScannerBuilder scanBuilder = 
syncClient.newScannerBuilder(table);
     for (KuduPredicate predicate : predicates) {
       scanBuilder.addPredicate(predicate);
     }
     scanBuilder.setProjectedColumnIndexes(ImmutableList.<Integer>of());
-    KuduScanner scanner = scanBuilder.build();
-    while (scanner.hasMoreRows()) {
-      count += scanner.nextRows().getNumRows();
-    }
-    return count;
+    return countRowsInScan(scanBuilder.build());
   }
 
   protected List<String> scanTableToStrings(KuduTable table,

http://git-wip-us.apache.org/repos/asf/kudu/blob/0ce5ba59/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java
----------------------------------------------------------------------
diff --git 
a/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java 
b/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java
index 10cf231..c482a86 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java
@@ -60,6 +60,8 @@ public class ITClient extends BaseKuduTest {
   private static KuduTable table;
   private static long runtimeInSeconds;
 
+  private volatile long sharedWriteTimestamp;
+
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
 
@@ -89,6 +91,9 @@ public class ITClient extends BaseKuduTest {
 
   @Test(timeout = TEST_TIMEOUT_SECONDS)
   public void test() throws Exception {
+
+    UncaughtExceptionHandler uncaughtExceptionHandler = new 
UncaughtExceptionHandler();
+
     ArrayList<Thread> threads = new ArrayList<>();
     Thread chaosThread = new Thread(new ChaosThread());
     Thread writerThread = new Thread(new WriterThread());
@@ -99,6 +104,7 @@ public class ITClient extends BaseKuduTest {
     threads.add(scannerThread);
 
     for (Thread thread : threads) {
+      thread.setUncaughtExceptionHandler(uncaughtExceptionHandler);
       thread.start();
     }
 
@@ -235,6 +241,7 @@ public class ITClient extends BaseKuduTest {
 
     @Override
     public void run() {
+      
session.setExternalConsistencyMode(ExternalConsistencyMode.CLIENT_PROPAGATED);
       while (KEEP_RUNNING_LATCH.getCount() > 0) {
         try {
           OperationResponse resp = 
session.apply(createBasicSchemaInsert(table, currentRowKey));
@@ -279,6 +286,13 @@ public class ITClient extends BaseKuduTest {
             " returned this error: " + resp.getRowError(), null);
         return true;
       }
+
+      if (resp == null) {
+        return false;
+      }
+
+      sharedWriteTimestamp = resp.getWriteTimestampRaw();
+
       return false;
     }
   }
@@ -299,8 +313,11 @@ public class ITClient extends BaseKuduTest {
 
         boolean shouldContinue;
 
-        // Always scan until we find rows.
-        if (lastRowCount == 0 || random.nextBoolean()) {
+        // First check if we've written at least one row.
+        if (sharedWriteTimestamp == 0) {
+          shouldContinue = true;
+        } else if (lastRowCount == 0 || // Need to full scan once before 
random reading
+            random.nextBoolean()) {
           shouldContinue = fullScan();
         } else {
           shouldContinue = randomGet();
@@ -322,14 +339,16 @@ public class ITClient extends BaseKuduTest {
     }
 
     /**
-     * Reads a row at random that it knows to exist (smaller than 
lastRowCount).
-     * @return
+     * Reads a row at random that should exist (smaller than lastRowCount).
+     * @return true if the get was successful, false if there was an error
      */
     private boolean randomGet() {
       int key = random.nextInt(lastRowCount);
       KuduPredicate predicate = KuduPredicate.newComparisonPredicate(
           table.getSchema().getColumnByIndex(0), 
KuduPredicate.ComparisonOp.EQUAL, key);
-      KuduScanner scanner = 
localClient.newScannerBuilder(table).addPredicate(predicate).build();
+      KuduScanner scanner = getScannerBuilder()
+          .addPredicate(predicate)
+          .build();
 
       List<RowResult> results = new ArrayList<>();
       while (scanner.hasMoreRows()) {
@@ -338,7 +357,7 @@ public class ITClient extends BaseKuduTest {
           for (RowResult row : ite) {
             results.add(row);
           }
-        } catch (Exception e) {
+        } catch (KuduException e) {
           return checkAndReportError("Got error while getting row " + key, e);
         }
       }
@@ -357,25 +376,33 @@ public class ITClient extends BaseKuduTest {
     }
 
     /**
-     * Rusn a full table scan and updates the lastRowCount.
-     * @return
+     * Runs a full table scan and updates the lastRowCount.
+     * @return true if the full scan was successful, false if there was an 
error
      */
     private boolean fullScan() {
-      AsyncKuduScanner scannerBuilder = 
localAsyncClient.newScannerBuilder(table).build();
+      KuduScanner scanner = getScannerBuilder().build();
       try {
-        int rowCount = countRowsInScan(scannerBuilder);
+        int rowCount = countRowsInScan(scanner);
         if (rowCount < lastRowCount) {
           reportError("Row count regressed: " + rowCount + " < " + 
lastRowCount, null);
           return false;
         }
-        lastRowCount = rowCount;
-        LOG.info("New row count {}", lastRowCount);
-      } catch (Exception e) {
-        checkAndReportError("Got error while row counting", e);
+        if (rowCount > lastRowCount) {
+          lastRowCount = rowCount;
+          LOG.info("New row count {}", lastRowCount);
+        }
+      } catch (KuduException e) {
+        return checkAndReportError("Got error while row counting", e);
       }
       return true;
     }
 
+    private KuduScanner.KuduScannerBuilder getScannerBuilder() {
+      return localClient.newScannerBuilder(table)
+          .readMode(AsyncKuduScanner.ReadMode.READ_AT_SNAPSHOT)
+          .snapshotTimestampRaw(sharedWriteTimestamp);
+    }
+
     /**
      * Checks the passed exception contains "Scanner not found". If it does 
then it returns true,
      * else it reports the error and returns false.
@@ -384,12 +411,28 @@ public class ITClient extends BaseKuduTest {
      * @param e the exception to check
      * @return true if the scanner failed because it wasn't false, otherwise 
false
      */
-    private boolean checkAndReportError(String message, Exception e) {
-      if (!e.getCause().getMessage().contains("Scanner not found")) {
+    private boolean checkAndReportError(String message, KuduException e) {
+      // Do nasty things, expect nasty results. The scanners are a bit too 
happy to retry TS
+      // disconnections so we might end up retrying a scanner on a node that 
restarted, or we might
+      // get disconnected just after sending an RPC so when we reconnect to 
the same TS we might get
+      // the "Invalid call sequence ID" message.
+      if (!e.getStatus().isNotFound() &&
+          !e.getStatus().getMessage().contains("Invalid call sequence ID")) {
         reportError(message, e);
         return false;
       }
       return true;
     }
   }
+
+  class UncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
+
+    @Override
+    public void uncaughtException(Thread t, Throwable e) {
+      // Only report an error if we're still running, else we'll spam the log.
+      if (KEEP_RUNNING_LATCH.getCount() != 0) {
+        reportError("Uncaught exception", new Exception(e));
+      }
+    }
+  }
 }
\ No newline at end of file

Reply via email to