This is an automated email from the ASF dual-hosted git repository. matthieu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 01701901c636ccbda91cf3c747f07a17fc197914 Author: Matthieu Baechler <matth...@apache.org> AuthorDate: Thu Jan 9 14:29:57 2020 +0100 JAMES-3028 refactor ReadSaveDumbBlobStoreContract to use randomlyDistributedReactorOperations --- .../blob/api/ReadSaveDumbBlobStoreContract.java | 37 +++++++++++----------- 1 file changed, 18 insertions(+), 19 deletions(-) diff --git a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/ReadSaveDumbBlobStoreContract.java b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/ReadSaveDumbBlobStoreContract.java index 665a5d3..98b589a 100644 --- a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/ReadSaveDumbBlobStoreContract.java +++ b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/ReadSaveDumbBlobStoreContract.java @@ -36,13 +36,10 @@ import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ThreadLocalRandom; -import java.util.function.Function; import org.apache.commons.io.IOUtils; import org.apache.james.util.concurrency.ConcurrentTestRunner; import org.junit.jupiter.api.Test; -import org.reactivestreams.Publisher; import com.google.common.io.ByteSource; import reactor.core.publisher.Mono; @@ -310,7 +307,12 @@ public interface ReadSaveDumbBlobStoreContract { default void concurrentSaveBytesShouldReturnConsistentValues() throws ExecutionException, InterruptedException { testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block(); ConcurrentTestRunner.builder() - .reactorOperation((thread, iteration) -> getConcurrentOperation(bytes -> testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, bytes))) + .randomlyDistributedReactorOperations( + (threadNumber, step) -> testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY), + (threadNumber, step) -> testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, ELEVEN_KILOBYTES), + (threadNumber, step) -> testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, TWELVE_MEGABYTES), + (threadNumber, step) -> checkConcurrentSaveOperation() + ) .threadCount(10) .operationCount(100) .runSuccessfullyWithin(Duration.ofMinutes(2)); @@ -320,7 +322,12 @@ public interface ReadSaveDumbBlobStoreContract { default void concurrentSaveInputStreamShouldReturnConsistentValues() throws ExecutionException, InterruptedException { testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block(); ConcurrentTestRunner.builder() - .reactorOperation((thread, iteration) -> getConcurrentOperation(bytes -> testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, new ByteArrayInputStream(bytes)))) + .randomlyDistributedReactorOperations( + (threadNumber, step) -> testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, new ByteArrayInputStream(SHORT_BYTEARRAY)), + (threadNumber, step) -> testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, new ByteArrayInputStream(ELEVEN_KILOBYTES)), + (threadNumber, step) -> testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, new ByteArrayInputStream(TWELVE_MEGABYTES)), + (threadNumber, step) -> checkConcurrentSaveOperation() + ) .threadCount(10) .operationCount(100) .runSuccessfullyWithin(Duration.ofMinutes(2)); @@ -330,25 +337,17 @@ public interface ReadSaveDumbBlobStoreContract { default void concurrentSaveByteSourceShouldReturnConsistentValues() throws ExecutionException, InterruptedException { testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block(); ConcurrentTestRunner.builder() - .reactorOperation((thread, iteration) -> getConcurrentOperation(bytes -> testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(bytes)))) + .randomlyDistributedReactorOperations( + (threadNumber, step) -> testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(SHORT_BYTEARRAY)), + (threadNumber, step) -> testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(ELEVEN_KILOBYTES)), + (threadNumber, step) -> testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(TWELVE_MEGABYTES)), + (threadNumber, step) -> checkConcurrentSaveOperation() + ) .threadCount(10) .operationCount(100) .runSuccessfullyWithin(Duration.ofMinutes(2)); } - default Publisher<Void> getConcurrentOperation(Function<byte[], Mono<Void>> save) { - switch (ThreadLocalRandom.current().nextInt(4)) { - case 0: - return save.apply(SHORT_BYTEARRAY); - case 1: - return save.apply(ELEVEN_KILOBYTES); - case 2: - return save.apply(TWELVE_MEGABYTES); - default: - return checkConcurrentSaveOperation(); - } - } - default Mono<Void> checkConcurrentSaveOperation() { return Mono .fromCallable(() -> --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org