This is an automated email from the ASF dual-hosted git repository.

matthieu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 01701901c636ccbda91cf3c747f07a17fc197914
Author: Matthieu Baechler <matth...@apache.org>
AuthorDate: Thu Jan 9 14:29:57 2020 +0100

    JAMES-3028 refactor ReadSaveDumbBlobStoreContract to use randomlyDistributedReactorOperations
---
 .../blob/api/ReadSaveDumbBlobStoreContract.java    | 37 +++++++++++-----------
 1 file changed, 18 insertions(+), 19 deletions(-)

diff --git a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/ReadSaveDumbBlobStoreContract.java b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/ReadSaveDumbBlobStoreContract.java
index 665a5d3..98b589a 100644
--- a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/ReadSaveDumbBlobStoreContract.java
+++ b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/ReadSaveDumbBlobStoreContract.java
@@ -36,13 +36,10 @@ import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.function.Function;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.james.util.concurrency.ConcurrentTestRunner;
 import org.junit.jupiter.api.Test;
-import org.reactivestreams.Publisher;
 
 import com.google.common.io.ByteSource;
 import reactor.core.publisher.Mono;
@@ -310,7 +307,12 @@ public interface ReadSaveDumbBlobStoreContract {
     default void concurrentSaveBytesShouldReturnConsistentValues() throws 
ExecutionException, InterruptedException {
         testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block();
         ConcurrentTestRunner.builder()
-            .reactorOperation((thread, iteration) -> 
getConcurrentOperation(bytes -> testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, 
bytes)))
+            .randomlyDistributedReactorOperations(
+                (threadNumber, step) -> testee().save(TEST_BUCKET_NAME, 
TEST_BLOB_ID, SHORT_BYTEARRAY),
+                (threadNumber, step) -> testee().save(TEST_BUCKET_NAME, 
TEST_BLOB_ID, ELEVEN_KILOBYTES),
+                (threadNumber, step) -> testee().save(TEST_BUCKET_NAME, 
TEST_BLOB_ID, TWELVE_MEGABYTES),
+                (threadNumber, step) -> checkConcurrentSaveOperation()
+            )
             .threadCount(10)
             .operationCount(100)
             .runSuccessfullyWithin(Duration.ofMinutes(2));
@@ -320,7 +322,12 @@ public interface ReadSaveDumbBlobStoreContract {
     default void concurrentSaveInputStreamShouldReturnConsistentValues() 
throws ExecutionException, InterruptedException {
         testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block();
         ConcurrentTestRunner.builder()
-            .reactorOperation((thread, iteration) -> 
getConcurrentOperation(bytes -> testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, 
new ByteArrayInputStream(bytes))))
+            .randomlyDistributedReactorOperations(
+                (threadNumber, step) -> testee().save(TEST_BUCKET_NAME, 
TEST_BLOB_ID, new ByteArrayInputStream(SHORT_BYTEARRAY)),
+                (threadNumber, step) -> testee().save(TEST_BUCKET_NAME, 
TEST_BLOB_ID, new ByteArrayInputStream(ELEVEN_KILOBYTES)),
+                (threadNumber, step) -> testee().save(TEST_BUCKET_NAME, 
TEST_BLOB_ID, new ByteArrayInputStream(TWELVE_MEGABYTES)),
+                (threadNumber, step) -> checkConcurrentSaveOperation()
+            )
             .threadCount(10)
             .operationCount(100)
             .runSuccessfullyWithin(Duration.ofMinutes(2));
@@ -330,25 +337,17 @@ public interface ReadSaveDumbBlobStoreContract {
     default void concurrentSaveByteSourceShouldReturnConsistentValues() throws 
ExecutionException, InterruptedException {
         testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block();
         ConcurrentTestRunner.builder()
-            .reactorOperation((thread, iteration) -> 
getConcurrentOperation(bytes -> testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, 
ByteSource.wrap(bytes))))
+            .randomlyDistributedReactorOperations(
+                (threadNumber, step) -> testee().save(TEST_BUCKET_NAME, 
TEST_BLOB_ID, ByteSource.wrap(SHORT_BYTEARRAY)),
+                (threadNumber, step) -> testee().save(TEST_BUCKET_NAME, 
TEST_BLOB_ID, ByteSource.wrap(ELEVEN_KILOBYTES)),
+                (threadNumber, step) -> testee().save(TEST_BUCKET_NAME, 
TEST_BLOB_ID, ByteSource.wrap(TWELVE_MEGABYTES)),
+                (threadNumber, step) -> checkConcurrentSaveOperation()
+            )
             .threadCount(10)
             .operationCount(100)
             .runSuccessfullyWithin(Duration.ofMinutes(2));
     }
 
-    default Publisher<Void> getConcurrentOperation(Function<byte[], 
Mono<Void>> save) {
-        switch (ThreadLocalRandom.current().nextInt(4)) {
-            case 0:
-                return save.apply(SHORT_BYTEARRAY);
-            case 1:
-                return save.apply(ELEVEN_KILOBYTES);
-            case 2:
-                return save.apply(TWELVE_MEGABYTES);
-            default:
-                return checkConcurrentSaveOperation();
-        }
-    }
-
     default Mono<Void> checkConcurrentSaveOperation() {
         return Mono
             .fromCallable(() ->


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to