This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a commit to branch 1.9
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/1.9 by this push:
     new 2b8243b  Fix #1308 - Refactor fate concurrency IT (#1414)
2b8243b is described below

commit 2b8243b72eace4e1fcba0c77077981a84401b8ea
Author: EdColeman <d...@etcoleman.com>
AuthorDate: Thu Nov 7 17:30:32 2019 -0500

    Fix #1308 - Refactor fate concurrency IT (#1414)
    
    fix error that caused test failure.
---
 .../test/functional/FateConcurrencyIT.java         | 316 +++++--------------
 .../org/apache/accumulo/test/util/SlowOps.java     | 347 +++++++++++++++++++++
 2 files changed, 419 insertions(+), 244 deletions(-)

diff --git a/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java b/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java
index 863e3c4..c3a4d79 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java
@@ -16,7 +16,6 @@
  */
 package org.apache.accumulo.test.functional;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -24,9 +23,7 @@ import static org.junit.Assert.fail;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -35,28 +32,19 @@ import java.util.concurrent.TimeUnit;
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.IteratorSetting;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.TableExistsException;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.impl.Tables;
 import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.master.state.tables.TableState;
-import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.fate.AdminUtil;
 import org.apache.accumulo.fate.ZooStore;
 import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
 import org.apache.accumulo.harness.AccumuloClusterHarness;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriterFactory;
-import org.apache.hadoop.io.Text;
+import org.apache.accumulo.test.util.SlowOps;
 import org.apache.zookeeper.KeeperException;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -90,10 +78,9 @@ public class FateConcurrencyIT extends AccumuloClusterHarness {
 
   private String secret;
 
-  // Test development only. When true, multiple tables, multiple compactions will be
-  // used during the test run which simulates transient condition that was causing
-  // the test to fail..
-  private boolean runMultipleCompactions = false;
+  private long maxWait;
+
+  private SlowOps slowOps;
 
   @Before
   public void setup() {
@@ -104,7 +91,9 @@ public class FateConcurrencyIT extends AccumuloClusterHarness {
 
     secret = cluster.getSiteConfiguration().get(Property.INSTANCE_SECRET);
 
-    createData(tableName);
+    maxWait = defaultTimeoutSeconds() <= 0 ? 60_000 : ((defaultTimeoutSeconds() * 1000) / 2);
+
+    slowOps = new SlowOps(connector, tableName, maxWait, 1);
   }
 
   @AfterClass
@@ -161,8 +150,7 @@ public class FateConcurrencyIT extends AccumuloClusterHarness {
 
     // launch a full table compaction with the slow iterator to ensure table lock is acquired and
     // held by the compaction
-
-    Future<?> compactTask = startCompactTask();
+    slowOps.startCompactTask();
 
     // try to set online while fate transaction is in progress - before ACCUMULO-4574 this would
     // block
@@ -178,8 +166,7 @@ public class FateConcurrencyIT extends AccumuloClusterHarness {
 
     assertEquals("verify table is still online", TableState.ONLINE, 
getTableState(tableName));
 
-    assertTrue("verify compaction still running and fate transaction still 
exists",
-        blockUntilCompactionRunning(tableName));
+    assertTrue("Find FATE operation for table", findFate(tableName));
 
     // test complete, cancel compaction and move on.
     connector.tableOperations().cancelCompaction(tableName);
@@ -193,8 +180,31 @@ public class FateConcurrencyIT extends AccumuloClusterHarness {
         TimeUnit.MILLISECONDS.convert(timing3.runningTime(), TimeUnit.NANOSECONDS));
 
     // block if compaction still running
-    compactTask.get();
+    slowOps.blockWhileCompactionRunning();
+
+  }
+
+  private boolean findFate(String aTableName) {
 
+    for (int retry = 0; retry < 5; retry++) {
+
+      try {
+        boolean found = lookupFateInZookeeper(aTableName);
+        log.trace("Try {}: Fate in zk for table {} : {}", retry, aTableName, 
found);
+        if (found) {
+          log.trace("found for {}", aTableName);
+          return true;
+        } else {
+          Thread.sleep(150);
+        }
+      } catch (InterruptedException ex) {
+        Thread.currentThread().interrupt();
+        return false;
+      } catch (Exception ex) {
+        log.debug("Find fate failed for table name {} with exception, will 
retry", aTableName, ex);
+      }
+    }
+    return false;
   }
 
   /**
@@ -208,12 +218,6 @@ public class FateConcurrencyIT extends AccumuloClusterHarness {
     Instance instance = connector.getInstance();
     String tableId;
 
-    // for development testing - force transient condition that was failing this test so that
-    // we know if multiple compactions are running, they are properly handled by the test code.
-    if (runMultipleCompactions) {
-      runMultipleCompactions();
-    }
-
     try {
 
       assertEquals("verify table online after created", TableState.ONLINE,
@@ -228,7 +232,7 @@ public class FateConcurrencyIT extends AccumuloClusterHarness {
           String.format("Table %s does not exist, failing test", tableName));
     }
 
-    Future<?> compactTask = startCompactTask();
+    slowOps.startCompactTask();
 
     AdminUtil.FateStatus withLocks = null;
     List<AdminUtil.TransactionStatus> noLocks = null;
@@ -300,114 +304,19 @@ public class FateConcurrencyIT extends AccumuloClusterHarness {
       connector.tableOperations().cancelCompaction(tableName);
 
       // block if compaction still running
-      compactTask.get();
-
-    } catch (InterruptedException ex) {
-      Thread.currentThread().interrupt();
-    } catch (TableNotFoundException | AccumuloSecurityException | AccumuloException
-        | ExecutionException ex) {
-      log.debug("Could not cancel compaction", ex);
-    }
-  }
-
-  /**
-   * This method was helpful for debugging a condition that was causing transient test failures.
-   * This forces a condition that the test should be able to handle. This method is not needed
-   * during normal testing, it was kept to aid future test development / troubleshooting if other
-   * transient failures occur.
-   */
-  private void runMultipleCompactions() {
-
-    for (int i = 0; i < 4; i++) {
-
-      String aTableName = getUniqueNames(1)[0] + "_" + i;
-
-      createData(aTableName);
-
-      log.debug("Table: {}", aTableName);
-
-      pool.submit(new SlowCompactionRunner(aTableName));
-
-      assertTrue("verify that compaction running and fate transaction exists",
-          blockUntilCompactionRunning(aTableName));
+      boolean cancelled = slowOps.blockWhileCompactionRunning();
+      log.debug("Cancel completed successfully: {}", cancelled);
 
+    } catch (TableNotFoundException | AccumuloSecurityException | AccumuloException ex) {
+      log.debug("Could not cancel compaction due to exception", ex);
     }
   }
 
   /**
-   * Create and run a slow running compaction task. The method will block until the compaction has
-   * been started.
-   *
-   * @return a reference to the running compaction task.
-   */
-  private Future<?> startCompactTask() {
-    Future<?> compactTask = pool.submit(new SlowCompactionRunner(tableName));
-    assertTrue("verify that compaction running and fate transaction exists",
-        blockUntilCompactionRunning(tableName));
-    return compactTask;
-  }
-
-  /**
-   * Blocks current thread until compaction is running.
-   *
-   * @return true if compaction and associate fate found.
-   */
-  private boolean blockUntilCompactionRunning(final String tableName) {
-
-    long maxWait = defaultTimeoutSeconds() <= 0 ? 60_000 : ((defaultTimeoutSeconds() * 1000) / 2);
-
-    long startWait = System.currentTimeMillis();
-
-    List<String> tservers = connector.instanceOperations().getTabletServers();
-
-    /*
-     * wait for compaction to start on table - The compaction will acquire a fate transaction lock
-     * that used to block a subsequent online command while the fate transaction lock was held.
-     */
-    while (System.currentTimeMillis() < (startWait + maxWait)) {
-
-      try {
-
-        int runningCompactions = 0;
-
-        for (String tserver : tservers) {
-          runningCompactions += connector.instanceOperations().getActiveCompactions(tserver).size();
-          log.trace("tserver {}, running compactions {}", tservers, runningCompactions);
-        }
-
-        if (runningCompactions > 0) {
-          // Validate that there is a compaction fate transaction - otherwise test is invalid.
-          if (findFate(tableName)) {
-            return true;
-          }
-        }
-
-      } catch (AccumuloSecurityException | AccumuloException ex) {
-        throw new IllegalStateException("failed to get active compactions, 
test fails.", ex);
-      } catch (KeeperException ex) {
-        log.trace("Saw possible transient zookeeper error");
-      }
-
-      try {
-        Thread.sleep(250);
-      } catch (InterruptedException ex) {
-        // reassert interrupt
-        Thread.currentThread().interrupt();
-      }
-    }
-
-    log.debug("Could not find compaction for {} after {} seconds", tableName,
-        TimeUnit.MILLISECONDS.toSeconds(maxWait));
-
-    return false;
-
-  }
-
-  /**
    * Checks fates in zookeeper looking for transaction associated with a compaction as a double
    * check that the test will be valid because the running compaction does have a fate transaction
    * lock.
-   *
+   * <p>
    * This method throws can throw either IllegalStateException (failed) or a Zookeeper exception.
    * Throwing the Zookeeper exception allows for retries if desired to handle transient zookeeper
    * issues.
@@ -418,7 +327,7 @@ public class FateConcurrencyIT extends AccumuloClusterHarness {
    * @throws KeeperException
    *           if a zookeeper error occurred - allows for retries.
    */
-  private boolean findFate(final String tableName) throws KeeperException {
+  private boolean lookupFateInZookeeper(final String tableName) throws KeeperException {
 
     Instance instance = connector.getInstance();
     AdminUtil<String> admin = new AdminUtil<>(false);
@@ -497,61 +406,6 @@ public class FateConcurrencyIT extends AccumuloClusterHarness {
   }
 
   /**
-   * Create the provided table and populate with some data using a batch writer. The table is
-   * scanned to ensure it was populated as expected.
-   *
-   * @param tableName
-   *          the name of the table
-   */
-  private void createData(final String tableName) {
-
-    try {
-
-      // create table.
-      connector.tableOperations().create(tableName);
-      BatchWriter bw = connector.createBatchWriter(tableName, new BatchWriterConfig());
-
-      // populate
-      for (int i = 0; i < NUM_ROWS; i++) {
-        Mutation m = new Mutation(new Text(String.format("%05d", i)));
-        m.put(new Text("col" + ((i % 3) + 1)), new Text("qual"), new 
Value("junk".getBytes(UTF_8)));
-        bw.addMutation(m);
-      }
-      bw.close();
-
-      long startTimestamp = System.nanoTime();
-
-      int count = scanCount(tableName);
-
-      log.trace("Scan time for {} rows {} ms", NUM_ROWS, TimeUnit.MILLISECONDS
-          .convert((System.nanoTime() - startTimestamp), TimeUnit.NANOSECONDS));
-
-      if (count != NUM_ROWS) {
-        throw new IllegalStateException(
-            String.format("Number of rows %1$d does not match expected %2$d", 
count, NUM_ROWS));
-      }
-    } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException
-        | TableExistsException ex) {
-      throw new IllegalStateException("Create data failed with exception", ex);
-    }
-  }
-
-  private int scanCount(String tableName) throws TableNotFoundException {
-
-    Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY);
-    int count = 0;
-    for (Map.Entry<Key,Value> elt : scanner) {
-      String expected = String.format("%05d", count);
-      assert (elt.getKey().getRow().toString().equals(expected));
-      count++;
-    }
-
-    scanner.close();
-
-    return count;
-  }
-
-  /**
    * Provides timing information for online operation.
    */
   private static class OnlineOpTiming {
@@ -614,74 +468,48 @@ public class FateConcurrencyIT extends AccumuloClusterHarness {
   }
 
   /**
-   * Instance to create / run a compaction using a slow iterator.
+   * Concurrency testing - ensure that tests are valid if multiple compactions are running. For
+   * development testing - force transient condition that was failing this test so that we know if
+   * multiple compactions are running, they are properly handled by the test code and the tests are
+   * valid.
    */
-  private class SlowCompactionRunner implements Runnable {
-
-    private final String tableName;
+  @Test
+  public void multipleCompactions() {
 
-    /**
-     * Create an instance of this class.
-     *
-     * @param tableName
-     *          the name of the table that will be compacted with the slow iterator.
-     */
-    SlowCompactionRunner(final String tableName) {
-      this.tableName = tableName;
-    }
+    int tableCount = 4;
 
-    @Override
-    public void run() {
+    List<SlowOps> tables = new ArrayList<>();
 
-      long startTimestamp = System.nanoTime();
+    for (int i = 0; i < tableCount; i++) {
+      String uniqueName = getUniqueNames(1)[0] + "_" + i;
+      SlowOps gen = new SlowOps(connector, uniqueName, maxWait, tableCount);
+      tables.add(gen);
+      gen.startCompactTask();
+    }
 
-      IteratorSetting slow = new IteratorSetting(30, "slow", SlowIterator.class);
-      SlowIterator.setSleepTime(slow, SLOW_SCAN_SLEEP_MS);
+    int foundCount = 0;
 
-      List<IteratorSetting> compactIterators = new ArrayList<>();
-      compactIterators.add(slow);
+    for (SlowOps t : tables) {
+      log.debug("Look for fate {}", t.getTableName());
+      if (findFate(t.getTableName())) {
+        log.debug("Found fate {}", t.getTableName());
+        foundCount++;
+      }
+    }
 
-      log.trace("Slow iterator {}", slow.toString());
+    assertEquals(tableCount, foundCount);
 
+    for (SlowOps t : tables) {
       try {
-
-        log.trace("Start compaction");
-
-        connector.tableOperations().compact(tableName, new Text("0"), new Text("z"),
-            compactIterators, true, true);
-
-        log.trace("Compaction wait is complete");
-
-        log.trace("Slow compaction of {} rows took {} ms", NUM_ROWS, 
TimeUnit.MILLISECONDS
-            .convert((System.nanoTime() - startTimestamp), 
TimeUnit.NANOSECONDS));
-
-        // validate that number of rows matches expected.
-
-        startTimestamp = System.nanoTime();
-
-        // validate expected data created and exists in table.
-
-        int count = scanCount(tableName);
-
-        log.trace("After compaction, scan time for {} rows {} ms", NUM_ROWS, 
TimeUnit.MILLISECONDS
-            .convert((System.nanoTime() - startTimestamp), 
TimeUnit.NANOSECONDS));
-
-        if (count != NUM_ROWS) {
-          throw new IllegalStateException(
-              String.format("After compaction, number of rows %1$d does not 
match expected %2$d",
-                  count, NUM_ROWS));
-        }
-
-      } catch (TableNotFoundException ex) {
-        throw new IllegalStateException("test failed, table " + tableName + " 
does not exist", ex);
-      } catch (AccumuloSecurityException ex) {
-        throw new IllegalStateException(
-            "test failed, could not add iterator due to security exception", 
ex);
-      } catch (AccumuloException ex) {
-        // test cancels compaction on complete, so ignore it as an exception.
-        if (!ex.getMessage().contains("Compaction canceled")) {
-          throw new IllegalStateException("test failed with an Accumulo 
exception", ex);
+        connector.tableOperations().cancelCompaction(t.getTableName());
+        // block if compaction still running
+        boolean cancelled = t.blockWhileCompactionRunning();
+        if (!cancelled) {
+          log.info("Failed to cancel compaction during multiple compaction 
test clean-up for {}",
+              t.getTableName());
         }
+      } catch (AccumuloSecurityException | TableNotFoundException | AccumuloException ex) {
+        log.debug("Exception thrown during multiple table test clean-up", ex);
       }
     }
   }
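
With this refactor, the test only drives the shared SlowOps helper (added below): construct it for the test table, start the slow compaction, exercise the online operation while the FATE transaction holds the table lock, then cancel and drain. A minimal sketch of that pattern, assuming a connected Connector; the driver method, class name, and 30-second budget are illustrative, while the SlowOps and tableOperations calls mirror the patch:

    import java.util.concurrent.TimeUnit;

    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.test.util.SlowOps;

    public class SlowOpsUsageSketch {

      // Illustrative driver; in the real IT, connector, tableName, and maxWait come from the harness.
      static void runSlowCompactionScenario(Connector connector, String tableName) throws Exception {

        long maxWait = TimeUnit.SECONDS.toMillis(30); // assumed timeout budget

        // Creates and populates the table; numParallelExpected = 1 for a single slow compaction.
        SlowOps slowOps = new SlowOps(connector, tableName, maxWait, 1);

        // Submits the SlowIterator-based compaction and blocks until it is seen running.
        slowOps.startCompactTask();

        // ... exercise the operation under test while the FATE transaction holds the table lock ...

        // Tear down: cancel the compaction, then wait for the background task to finish.
        connector.tableOperations().cancelCompaction(tableName);
        boolean finished = slowOps.blockWhileCompactionRunning();
        if (!finished) {
          throw new IllegalStateException("slow compaction did not terminate after cancel");
        }
      }
    }
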
diff --git a/test/src/main/java/org/apache/accumulo/test/util/SlowOps.java b/test/src/main/java/org/apache/accumulo/test/util/SlowOps.java
new file mode 100644
index 0000000..bd51990
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/util/SlowOps.java
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.util;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.ActiveCompaction;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.test.functional.SlowIterator;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Common methods for performing operations that deliberately take some period of time so that
+ * tests can interact while the operations are in progress.
+ */
+public class SlowOps {
+
+  private static final Logger log = LoggerFactory.getLogger(SlowOps.class);
+
+  private static final String TSERVER_COMPACTION_MAJOR_CONCURRENT_MAX =
+      "tserver.compaction.major.concurrent.max";
+
+  private static final long SLOW_SCAN_SLEEP_MS = 250L;
+  private static final int NUM_DATA_ROWS = 1000;
+
+  private final Connector connector;
+  private final String tableName;
+  private final long maxWait;
+
+  // private final int numRows = DEFAULT_NUM_DATA_ROWS;
+
+  private static final ExecutorService pool = Executors.newCachedThreadPool();
+
+  private Future<?> compactTask = null;
+
+  private SlowOps(final Connector connector, final String tableName, final long maxWait) {
+
+    this.connector = connector;
+    this.tableName = tableName;
+    this.maxWait = maxWait;
+
+    createData();
+  }
+
+  public SlowOps(final Connector connector, final String tableName, final long maxWait,
+      final int numParallelExpected) {
+
+    this(connector, tableName, maxWait);
+
+    setExpectedCompactions(numParallelExpected);
+
+  }
+
+  public void setExpectedCompactions(final int numParallelExpected) {
+
+    final int target = numParallelExpected + 1;
+
+    Map<String,String> sysConfig;
+
+    try {
+
+      sysConfig = connector.instanceOperations().getSystemConfiguration();
+
+      int current = Integer.parseInt(sysConfig.get("tserver.compaction.major.concurrent.max"));
+
+      if (current < target) {
+        connector.instanceOperations().setProperty(TSERVER_COMPACTION_MAJOR_CONCURRENT_MAX,
+            Integer.toString(target));
+
+        sysConfig = connector.instanceOperations().getSystemConfiguration();
+
+      }
+
+      Integer.parseInt(sysConfig.get(TSERVER_COMPACTION_MAJOR_CONCURRENT_MAX));
+
+    } catch (AccumuloException | AccumuloSecurityException | NumberFormatException ex) {
+      throw new IllegalStateException("Could not set parallel compaction limit to " + target, ex);
+    }
+  }
+
+  public String getTableName() {
+    return tableName;
+  }
+
+  private void createData() {
+
+    try {
+
+      // create table.
+      connector.tableOperations().create(tableName);
+
+      log.info("Created table id: {}, name \'{}\'",
+          connector.tableOperations().tableIdMap().get(tableName), tableName);
+
+      try (BatchWriter bw = connector.createBatchWriter(tableName, new BatchWriterConfig())) {
+        // populate
+        for (int i = 0; i < NUM_DATA_ROWS; i++) {
+          Mutation m = new Mutation(new Text(String.format("%05d", i)));
+          m.put(new Text("col" + ((i % 3) + 1)), new Text("qual"),
+              new Value("junk".getBytes(UTF_8)));
+          bw.addMutation(m);
+        }
+      }
+
+      verifyRows();
+
+    } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException
+        | TableExistsException ex) {
+      throw new IllegalStateException("Create data failed with exception", ex);
+    }
+  }
+
+  private void verifyRows() {
+
+    long startTimestamp = System.nanoTime();
+
+    int count = scanCount();
+
+    log.trace("Scan time for {} rows {} ms", NUM_DATA_ROWS,
+        TimeUnit.MILLISECONDS.convert((System.nanoTime() - startTimestamp), TimeUnit.NANOSECONDS));
+
+    if (count != NUM_DATA_ROWS) {
+      throw new IllegalStateException(
+          String.format("Number of rows %1$d does not match expected %2$d", 
count, NUM_DATA_ROWS));
+    }
+  }
+
+  private int scanCount() {
+    try (Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY)) {
+
+      int count = 0;
+
+      for (Map.Entry<Key,Value> elt : scanner) {
+        String expected = String.format("%05d", count);
+        assert (elt.getKey().getRow().toString().equals(expected));
+        count++;
+      }
+      return count;
+    } catch (TableNotFoundException ex) {
+      log.debug("cannot verify row count, table \'{}\' does not exist", 
tableName);
+      throw new IllegalStateException(ex);
+    }
+  }
+
+  /**
+   * Create and run a slow running compaction task. The method will block until the compaction has
+   * been started. The compaction should be cancelled using Accumulo tableOps, and then the caller
+   * can use blockWhileCompactionRunning() on the instance of this class.
+   */
+  public void startCompactTask() {
+
+    compactTask = pool.submit(new SlowCompactionRunner());
+
+    if (!blockUntilCompactionRunning()) {
+      throw new IllegalStateException("Compaction could not be started for " + 
tableName);
+    }
+  }
+
+  /**
+   * Instance to create / run a compaction using a slow iterator.
+   */
+  private class SlowCompactionRunner implements Runnable {
+
+    SlowCompactionRunner() {}
+
+    @Override
+    public void run() {
+
+      long startTimestamp = System.nanoTime();
+
+      IteratorSetting slow = new IteratorSetting(30, "slow", SlowIterator.class);
+      SlowIterator.setSleepTime(slow, SLOW_SCAN_SLEEP_MS);
+
+      List<IteratorSetting> compactIterators = new ArrayList<>();
+      compactIterators.add(slow);
+
+      log.trace("Starting slow operation using iterator: {}", slow);
+
+      int retry = 0;
+      boolean completed = false;
+
+      while (!completed && retry++ < 5) {
+
+        try {
+          log.info("Starting compaction.  Attempt {}", retry);
+          connector.tableOperations().compact(tableName, null, null, compactIterators, true, true);
+          completed = true;
+        } catch (Throwable ex) {
+          // test cancels compaction on complete, so ignore it as an exception.
+          if (ex.getMessage().contains("Compaction canceled")) {
+            return;
+          }
+          log.info("Exception thrown while waiting for compaction - will 
retry", ex);
+          try {
+            Thread.sleep(10_000 * retry);
+          } catch (InterruptedException iex) {
+            Thread.currentThread().interrupt();
+            return;
+          }
+        }
+      }
+      log.debug("Compaction wait is complete");
+
+      log.trace("Slow compaction of {} rows took {} ms", NUM_DATA_ROWS, 
TimeUnit.MILLISECONDS
+          .convert((System.nanoTime() - startTimestamp), 
TimeUnit.NANOSECONDS));
+
+      // validate that number of rows matches expected.
+
+      startTimestamp = System.nanoTime();
+
+      // validate expected data created and exists in table.
+
+      int count = scanCount();
+
+      log.trace("After compaction, scan time for {} rows {} ms", NUM_DATA_ROWS,
+          TimeUnit.MILLISECONDS.convert((System.nanoTime() - startTimestamp),
+              TimeUnit.NANOSECONDS));
+
+      if (count != NUM_DATA_ROWS) {
+        throw new IllegalStateException(
+            String.format("After compaction, number of rows %1$d does not 
match expected %2$d",
+                count, NUM_DATA_ROWS));
+      }
+    }
+  }
+
+  /**
+   * Blocks current thread until compaction is running.
+   *
+   * @return true if compaction and associated fate found.
+   */
+  private boolean blockUntilCompactionRunning() {
+
+    long startWait = System.currentTimeMillis();
+
+    List<String> tservers = connector.instanceOperations().getTabletServers();
+
+    /*
+     * wait for compaction to start on table - The compaction will acquire a fate transaction lock
+     * that used to block a subsequent online command while the fate transaction lock was held.
+     */
+    while (System.currentTimeMillis() < (startWait + maxWait)) {
+
+      try {
+
+        List<ActiveCompaction> activeCompactions = new ArrayList<>();
+
+        for (String tserver : tservers) {
+          List<ActiveCompaction> ac = connector.instanceOperations().getActiveCompactions(tserver);
+          activeCompactions.addAll(ac);
+          // runningCompactions += ac.size();
+          log.trace("tserver {}, running compactions {}", tservers, ac.size());
+        }
+
+        if (!activeCompactions.isEmpty()) {
+          try {
+            for (ActiveCompaction compaction : activeCompactions) {
+              log.debug("Compaction running for {}", compaction.getTable());
+              if (compaction.getTable().compareTo(tableName) == 0) {
+                return true;
+              }
+            }
+          } catch (TableNotFoundException ex) {
+            log.trace("Compaction found for unknown table {}", 
activeCompactions);
+          }
+        }
+      } catch (AccumuloSecurityException | AccumuloException ex) {
+        throw new IllegalStateException("failed to get active compactions, 
test fails.", ex);
+      }
+
+      try {
+        Thread.sleep(3_000);
+      } catch (InterruptedException ex) {
+        // reassert interrupt
+        Thread.currentThread().interrupt();
+      }
+    }
+
+    log.debug("Could not find compaction for {} after {} seconds", tableName,
+        TimeUnit.MILLISECONDS.toSeconds(maxWait));
+
+    return false;
+
+  }
+
+  /**
+   * Will block as long as the underlying compaction task is running. This method is intended to be
+   * used when the compaction is cancelled via table operation cancel method - when the cancel
+   * command completes, the running task will terminate and then this method will return.
+   *
+   * @return true if the task returned.
+   */
+  public boolean blockWhileCompactionRunning() {
+
+    try {
+      if (compactTask == null) {
+        throw new IllegalStateException(
+            "Compaction task has not been started - call startCompactionTask() 
before blocking");
+      }
+      compactTask.get();
+      return true;
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+      return false;
+    } catch (ExecutionException ex) {
+      return false;
+    }
+  }
+
+}
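
Passing the expected parallelism into the SlowOps constructor lets setExpectedCompactions raise tserver.compaction.major.concurrent.max so that several slow compactions can run at once, which is what the new multipleCompactions test relies on. A condensed sketch of that multi-table pattern, with the same caveats as the sketch above (the driver method and base table name are illustrative; only the SlowOps and tableOperations calls mirror the code in this patch):

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.test.util.SlowOps;

    public class MultiCompactionSketch {

      // Illustrative: start several slow compactions concurrently, then cancel and drain them.
      static void runConcurrentCompactions(Connector connector, String baseName, long maxWaitMillis)
          throws Exception {

        int tableCount = 4;
        List<SlowOps> tables = new ArrayList<>();

        for (int i = 0; i < tableCount; i++) {
          // numParallelExpected = tableCount bumps tserver.compaction.major.concurrent.max
          // to tableCount + 1 if the current setting is lower.
          SlowOps ops = new SlowOps(connector, baseName + "_" + i, maxWaitMillis, tableCount);
          tables.add(ops);
          ops.startCompactTask(); // returns once the compaction is observed running
        }

        // ... a test would now verify each table has a FATE transaction holding its lock ...

        for (SlowOps ops : tables) {
          connector.tableOperations().cancelCompaction(ops.getTableName());
          ops.blockWhileCompactionRunning(); // wait for the background compaction task to exit
        }
      }
    }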
