This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 7bd038358341b4cabf70b4baa41f3dfc68cc6786 Author: Matthieu Baechler <matth...@apache.org> AuthorDate: Thu Jan 9 11:45:41 2020 +0100 JAMES-3025 introduce randomlyDistributed operations in ConcurrentTestRunner --- .../util/concurrency/ConcurrentTestRunner.java | 27 ++++++- .../util/concurrency/ConcurrentTestRunnerTest.java | 88 ++++++++++++++++++++++ 2 files changed, 114 insertions(+), 1 deletion(-) diff --git a/server/container/util/src/main/java/org/apache/james/util/concurrency/ConcurrentTestRunner.java b/server/container/util/src/main/java/org/apache/james/util/concurrency/ConcurrentTestRunner.java index ed1e2bd..a47fe94 100644 --- a/server/container/util/src/main/java/org/apache/james/util/concurrency/ConcurrentTestRunner.java +++ b/server/container/util/src/main/java/org/apache/james/util/concurrency/ConcurrentTestRunner.java @@ -25,6 +25,7 @@ import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Optional; +import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -39,7 +40,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; - import reactor.core.publisher.Mono; public class ConcurrentTestRunner implements Closeable { @@ -53,6 +53,31 @@ public class ConcurrentTestRunner implements Closeable { default RequireThreadCount reactorOperation(ReactorOperation reactorOperation) { return operation(reactorOperation.blocking()); } + + default RequireThreadCount randomlyDistributedOperations(ConcurrentOperation firstOperation, ConcurrentOperation... operations) { + Random random = createReproductibleRandom(); + ConcurrentOperation aggregateOperation = (threadNumber, step) -> selectRandomOperation(random, firstOperation, operations).execute(threadNumber, step); + return operation(aggregateOperation); + } + + default RequireThreadCount randomlyDistributedReactorOperations(ReactorOperation firstReactorOperation, ReactorOperation... reactorOperations) { + Random random = createReproductibleRandom(); + ReactorOperation aggregateOperation = (threadNumber, step) -> selectRandomOperation(random, firstReactorOperation, reactorOperations).execute(threadNumber, step); + return reactorOperation(aggregateOperation); + } + + default Random createReproductibleRandom() { + return new Random(2134); + } + + default <OperationT> OperationT selectRandomOperation(Random random, OperationT firstReactorOperation, OperationT... reactorOperations) { + int whichAction = random.nextInt(reactorOperations.length + 1); + if (whichAction == 0) { + return firstReactorOperation; + } else { + return reactorOperations[whichAction - 1]; + } + } } @FunctionalInterface diff --git a/server/container/util/src/test/java/org/apache/james/util/concurrency/ConcurrentTestRunnerTest.java b/server/container/util/src/test/java/org/apache/james/util/concurrency/ConcurrentTestRunnerTest.java index b40077e..39c1d20 100644 --- a/server/container/util/src/test/java/org/apache/james/util/concurrency/ConcurrentTestRunnerTest.java +++ b/server/container/util/src/test/java/org/apache/james/util/concurrency/ConcurrentTestRunnerTest.java @@ -26,12 +26,19 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.io.Closeable; import java.io.IOException; import java.time.Duration; +import java.util.IntSummaryStatistics; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.IntStream; +import java.util.stream.Stream; import org.junit.jupiter.api.Test; +import reactor.core.publisher.Mono; + class ConcurrentTestRunnerTest { private static final ConcurrentTestRunner.ConcurrentOperation NOOP = (threadNumber, step) -> { }; private static final Duration DEFAULT_AWAIT_TIME = Duration.ofMillis(100); @@ -231,4 +238,85 @@ class ConcurrentTestRunnerTest { assertThat(queue).containsOnly("0:0", "0:1", "1:0", "1:1"); } + + @Test + void runRandomlyDistributedOperationsShouldRunAllOperations() throws ExecutionException, InterruptedException { + AtomicBoolean firstOperationRun = new AtomicBoolean(false); + AtomicBoolean secondOperationRun = new AtomicBoolean(false); + AtomicBoolean thirdOperationRun = new AtomicBoolean(false); + ConcurrentTestRunner.builder() + .randomlyDistributedOperations( + (threadNumber, step) -> firstOperationRun.set(true), + (threadNumber, step) -> secondOperationRun.set(true), + (threadNumber, step) -> thirdOperationRun.set(true)) + .threadCount(10) + .operationCount(10) + .runSuccessfullyWithin(Duration.ofMinutes(1)); + + assertThat(Stream.of(firstOperationRun, secondOperationRun, thirdOperationRun).map(AtomicBoolean::get)).containsOnly(true); + } + + @Test + void runRandomlyDistributedOperationsShouldRunAllOperationsEvenly() throws ExecutionException, InterruptedException { + AtomicInteger firstOperationRuns = new AtomicInteger(0); + AtomicInteger secondOperationRuns = new AtomicInteger(0); + AtomicInteger thirdOperationRuns = new AtomicInteger(0); + int threadCount = 10; + int operationCount = 1000; + ConcurrentTestRunner.builder() + .randomlyDistributedOperations( + (threadNumber, step) -> firstOperationRuns.incrementAndGet(), + (threadNumber, step) -> secondOperationRuns.incrementAndGet(), + (threadNumber, step) -> thirdOperationRuns.incrementAndGet()) + .threadCount(threadCount) + .operationCount(operationCount) + .runSuccessfullyWithin(Duration.ofMinutes(1)); + + IntSummaryStatistics statistics = IntStream.of(firstOperationRuns.get(), secondOperationRuns.get(), thirdOperationRuns.get()).summaryStatistics(); + int min = statistics.getMin(); + int max = statistics.getMax(); + + assertThat(max - min).isLessThan((threadCount * operationCount) * 5 / 100); + } + + @Test + void runRandomlyDistributedReactorOperationsShouldRunAllOperations() throws ExecutionException, InterruptedException { + AtomicBoolean firstOperationRun = new AtomicBoolean(false); + AtomicBoolean secondOperationRun = new AtomicBoolean(false); + AtomicBoolean thirdOperationRun = new AtomicBoolean(false); + ConcurrentTestRunner.builder() + .randomlyDistributedReactorOperations( + (threadNumber, step) -> Mono.fromRunnable(() -> firstOperationRun.set(true)), + (threadNumber, step) -> Mono.fromRunnable(() -> secondOperationRun.set(true)), + (threadNumber, step) -> Mono.fromRunnable(() -> thirdOperationRun.set(true))) + .threadCount(10) + .operationCount(10) + .runSuccessfullyWithin(Duration.ofMinutes(1)); + + assertThat(Stream.of(firstOperationRun, secondOperationRun, thirdOperationRun).map(AtomicBoolean::get)).containsOnly(true); + } + + @Test + void runRandomlyDistributedReactorOperationsShouldRunAllOperationsEvenly() throws ExecutionException, InterruptedException { + AtomicInteger firstOperationRuns = new AtomicInteger(0); + AtomicInteger secondOperationRuns = new AtomicInteger(0); + AtomicInteger thirdOperationRuns = new AtomicInteger(0); + int threadCount = 10; + int operationCount = 1000; + ConcurrentTestRunner.builder() + .randomlyDistributedReactorOperations( + (threadNumber, step) -> Mono.fromRunnable(firstOperationRuns::incrementAndGet), + (threadNumber, step) -> Mono.fromRunnable(secondOperationRuns::incrementAndGet), + (threadNumber, step) -> Mono.fromRunnable(thirdOperationRuns::incrementAndGet)) + .threadCount(threadCount) + .operationCount(operationCount) + .runSuccessfullyWithin(Duration.ofMinutes(1)); + + IntSummaryStatistics statistics = IntStream.of(firstOperationRuns.get(), secondOperationRuns.get(), thirdOperationRuns.get()).summaryStatistics(); + int min = statistics.getMin(); + int max = statistics.getMax(); + + assertThat(max - min).isLessThan((threadCount * operationCount) * 5 / 100); + } + } --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org