This is an automated email from the ASF dual-hosted git repository. domgarguilo pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push:
     new a432a81999 Add a concurrent splits test to SplitIT (#3415)
a432a81999 is described below

commit a432a8199923acf0eaaaf56dc893c235537c3729
Author: Dom G <domgargu...@apache.org>
AuthorDate: Tue May 30 11:52:49 2023 -0400

    Add a concurrent splits test to SplitIT (#3415)

    * add split IT with concurrent splits
    * fix potential bug in TestIngest.getSplitPoints

    ---------

    Co-authored-by: Keith Turner <ktur...@apache.org>
---
 .../java/org/apache/accumulo/test/TestIngest.java  |  8 +-
 .../apache/accumulo/test/functional/SplitIT.java   | 99 ++++++++++++++++++++++
 2 files changed, 105 insertions(+), 2 deletions(-)

diff --git a/test/src/main/java/org/apache/accumulo/test/TestIngest.java b/test/src/main/java/org/apache/accumulo/test/TestIngest.java
index 99551e411c..101cbbf073 100644
--- a/test/src/main/java/org/apache/accumulo/test/TestIngest.java
+++ b/test/src/main/java/org/apache/accumulo/test/TestIngest.java
@@ -193,10 +193,14 @@ public class TestIngest {
   public static TreeSet<Text> getSplitPoints(long start, long end, long numsplits) {
     long splitSize = (end - start) / numsplits;
 
-    long pos = start + splitSize;
-
     TreeSet<Text> splits = new TreeSet<>();
 
+    if (splitSize < 1) {
+      return splits;
+    }
+
+    long pos = start + splitSize;
+
     while (pos < end) {
       splits.add(new Text(String.format("row_%010d", pos)));
       pos += splitSize;
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java b/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java
index 3a342b78e8..33a93f40b6 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java
@@ -19,17 +19,26 @@
 package org.apache.accumulo.test.functional;
 
 import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.accumulo.test.VerifyIngest.verifyIngest;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assumptions.assumeTrue;
 
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
 
 import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
@@ -37,15 +46,18 @@ import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.admin.InstanceOperations;
 import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.client.admin.TabletHostingGoal;
 import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
 import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.harness.AccumuloClusterHarness;
 import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
@@ -61,6 +73,8 @@ import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
+
 public class SplitIT extends AccumuloClusterHarness {
   private static final Logger log = LoggerFactory.getLogger(SplitIT.class);
 
@@ -242,4 +256,89 @@ public class SplitIT extends AccumuloClusterHarness {
       assertEquals(splits1, new TreeSet<>(c.tableOperations().listSplits(tableName)));
     }
   }
+
+  @Test
+  public void concurrentSplit() throws Exception {
+    try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
+
+      final String tableName = getUniqueNames(1)[0];
+
+      log.debug("Creating table {}", tableName);
+      c.tableOperations().create(tableName);
+
+      final int numRows = 100_000;
+      log.debug("Ingesting {} rows into {}", numRows, tableName);
+      VerifyParams params = new VerifyParams(getClientProps(), tableName, numRows);
+      TestIngest.ingest(c, params);
+
+      log.debug("Verifying {} rows ingested into {}", numRows, tableName);
+      VerifyIngest.verifyIngest(c, params);
+
+      log.debug("Creating futures that add random splits to the table");
+      ExecutorService es = Executors.newFixedThreadPool(10);
+      final int totalFutures = 100;
+      final int splitsPerFuture = 4;
+      final Set<Text> totalSplits = new HashSet<>();
+      List<Callable<Void>> tasks = new ArrayList<>(totalFutures);
+      for (int i = 0; i < totalFutures; i++) {
+        final Pair<Integer,Integer> splitBounds = getRandomSplitBounds(numRows);
+        final TreeSet<Text> splits = TestIngest.getSplitPoints(splitBounds.getFirst().longValue(),
+            splitBounds.getSecond().longValue(), splitsPerFuture);
+        totalSplits.addAll(splits);
+        tasks.add(() -> {
+          c.tableOperations().addSplits(tableName, splits);
+          return null;
+        });
+      }
+
+      log.debug("Submitting futures");
+      List<Future<Void>> futures =
+          tasks.parallelStream().map(es::submit).collect(Collectors.toList());
+
+      log.debug("Waiting for futures to complete");
+      for (Future<?> f : futures) {
+        f.get();
+      }
+      es.shutdown();
+
+      log.debug("Checking that {} splits were created ", totalSplits.size());
+
+      assertEquals(totalSplits, new HashSet<>(c.tableOperations().listSplits(tableName)),
+          "Did not see expected splits");
+
+      // ELASTICITY_TODO the following could be removed after #3309. Currently scanning an ondemand
+      // table with lots of tablets will cause the test to timeout.
+      c.tableOperations().setTabletHostingGoal(tableName, new Range(), TabletHostingGoal.ALWAYS);
+
+      log.debug("Verifying {} rows ingested into {}", numRows, tableName);
+      VerifyIngest.verifyIngest(c, params);
+    }
+  }
+
+  /**
+   * Generates a pair of integers that represent the start and end of a range of splits. The start
+   * and end are randomly generated between 0 and upperBound. The start is guaranteed to be less
+   * than the end and the two bounds are guaranteed to be different values.
+   *
+   * @param upperBound the upper bound of the range of splits
+   * @return a pair of integers that represent the start and end of a range of splits
+   */
+  private Pair<Integer,Integer> getRandomSplitBounds(int upperBound) {
+    Preconditions.checkArgument(upperBound > 1, "upperBound must be greater than 1");
+
+    int start = random.nextInt(upperBound);
+    int end = random.nextInt(upperBound - 1);
+
+    // ensure start is less than end and that end is not equal to start
+    if (end >= start) {
+      end += 1;
+    } else {
+      int tmp = start;
+      start = end;
+      end = tmp;
+    }
+
+    return new Pair<>(start, end);
+  }
+
 }