This is an automated email from the ASF dual-hosted git repository. edcoleman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/master by this push: new 18320ad Merge changes from 1.9 for FateConcurrencyIT test improvement (#1106) 18320ad is described below commit 18320ad19fffe6f2ffc8a4cf88cc9196c2129158 Author: EdColeman <d...@etcoleman.com> AuthorDate: Thu Apr 18 11:53:22 2019 -0400 Merge changes from 1.9 for FateConcurrencyIT test improvement (#1106) * Improve FateConcurrencyIT to reduce transient test errors (Issue #1079). (#1082) --- .../test/functional/FateConcurrencyIT.java | 77 +++++++++++++++++++--- 1 file changed, 67 insertions(+), 10 deletions(-) diff --git a/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java b/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java index 3bcaa4e..08d66ae 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java @@ -94,6 +94,11 @@ public class FateConcurrencyIT extends AccumuloClusterHarness { private String secret; + // Test development only. When true, multiple tables, multiple compactions will be + // used during the test run which simulates the transient condition that was causing + // the test to fail. + private boolean runMultipleCompactions = false; + @Before public void setup() { accumuloClient = Accumulo.newClient().from(getClientProps()).build(); @@ -212,6 +217,12 @@ public class FateConcurrencyIT extends AccumuloClusterHarness { TableId tableId; + // for development testing - force transient condition that was failing this test so that + // we know if multiple compactions are running, they are properly handled by the test code. 
+ if (runMultipleCompactions) { + runMultipleCompactions(); + } + try { assertEquals("verify table online after created", TableState.ONLINE, @@ -228,8 +239,6 @@ public class FateConcurrencyIT extends AccumuloClusterHarness { Future<?> compactTask = startCompactTask(); - assertTrue("compaction fate transaction exits", findFate(tableName)); - AdminUtil.FateStatus withLocks = null; List<AdminUtil.TransactionStatus> noLocks = null; @@ -312,6 +321,30 @@ public class FateConcurrencyIT extends AccumuloClusterHarness { } /** + * This method was helpful for debugging a condition that was causing transient test failures. + * This forces a condition that the test should be able to handle. This method is not needed + * during normal testing, it was kept to aid future test development / troubleshooting if other + * transient failures occur. + */ + private void runMultipleCompactions() { + + for (int i = 0; i < 4; i++) { + + String aTableName = getUniqueNames(1)[0] + "_" + i; + + createData(aTableName); + + log.debug("Table: {}", aTableName); + + pool.submit(new SlowCompactionRunner(aTableName)); + + assertTrue("verify that compaction running and fate transaction exists", + blockUntilCompactionRunning(aTableName)); + + } + } + + /** * Create and run a slow running compaction task. The method will block until the compaction has * been started. * @@ -331,26 +364,39 @@ public class FateConcurrencyIT extends AccumuloClusterHarness { */ private boolean blockUntilCompactionRunning(final String tableName) { - int runningCompactions = 0; + long maxWait = defaultTimeoutSeconds() <= 0 ? 60_000 : ((defaultTimeoutSeconds() * 1000) / 2); + + long startWait = System.currentTimeMillis(); List<String> tservers = accumuloClient.instanceOperations().getTabletServers(); /* - * wait for compaction to start - The compaction will acquire a fate transaction lock that used - * to block a subsequent online command while the fate transaction lock was held. 
+ * wait for compaction to start on table - The compaction will acquire a fate transaction lock + * that used to block a subsequent online command while the fate transaction lock was held. */ - while (runningCompactions == 0) { + while (System.currentTimeMillis() < (startWait + maxWait)) { try { + int runningCompactions = 0; + for (String tserver : tservers) { runningCompactions += accumuloClient.instanceOperations().getActiveCompactions(tserver) .size(); log.trace("tserver {}, running compactions {}", tservers, runningCompactions); } + if (runningCompactions > 0) { + // Validate that there is a compaction fate transaction - otherwise test is invalid. + if (findFate(tableName)) { + return true; + } + } + } catch (AccumuloSecurityException | AccumuloException ex) { throw new IllegalStateException("failed to get active compactions, test fails.", ex); + } catch (KeeperException ex) { + log.trace("Saw possible transient zookeeper error"); } try { @@ -361,8 +407,11 @@ public class FateConcurrencyIT extends AccumuloClusterHarness { } } - // Validate that there is a compaction fate transaction - otherwise test is invalid. - return findFate(tableName); + log.debug("Could not find compaction for {} after {} seconds", tableName, + TimeUnit.MILLISECONDS.toSeconds(maxWait)); + + return false; + } /** @@ -370,9 +419,17 @@ public class FateConcurrencyIT extends AccumuloClusterHarness { * check that the test will be valid because the running compaction does have a fate transaction * lock. * + * This method can throw either IllegalStateException (failed) or a Zookeeper exception. + * Throwing the Zookeeper exception allows for retries if desired to handle transient zookeeper + * issues. + * + * @param tableName + * a table name * @return true if corresponding fate transaction found, false otherwise + * @throws KeeperException + * if a zookeeper error occurred - allows for retries. 
*/ - private boolean findFate(final String tableName) { + private boolean findFate(final String tableName) throws KeeperException { AdminUtil<String> admin = new AdminUtil<>(false); @@ -400,7 +457,7 @@ public class FateConcurrencyIT extends AccumuloClusterHarness { return true; } - } catch (KeeperException | TableNotFoundException | InterruptedException ex) { + } catch (TableNotFoundException | InterruptedException ex) { throw new IllegalStateException(ex); }