This is an automated email from the ASF dual-hosted git repository.

matthieu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 28808b8c73505a120b1f6302e465c8d5685c9142
Author: Rémi KOWALSKI <rkowal...@linagora.com>
AuthorDate: Thu Jan 9 17:45:02 2020 +0100

    JAMES-3028 Relax contract for dumb blob api: blobid is considered as unique
---
 .../org/apache/james/blob/api/DumbBlobStore.java   |  1 +
 .../blob/api/DeleteDumbBlobStoreContract.java      |  2 +-
 .../blob/api/ReadSaveDumbBlobStoreContract.java    | 91 +++++++++++-----------
 3 files changed, 49 insertions(+), 45 deletions(-)

diff --git 
a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/DumbBlobStore.java
 
b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/DumbBlobStore.java
index b19161a..79305d1 100644
--- 
a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/DumbBlobStore.java
+++ 
b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/DumbBlobStore.java
@@ -51,6 +51,7 @@ public interface DumbBlobStore {
      * Save the blob with the provided blob id, and overwrite the previous 
blob with the same id if it already exists
      * The bucket is created if it not already exists.
      * This operation should be atomic and isolated
+     * Two blobs having the same blobId must have the same content
      * @return an empty Mono when the save succeed,
      *  otherwise an IOObjectStoreException in its error channel
      */
diff --git 
a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DeleteDumbBlobStoreContract.java
 
b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DeleteDumbBlobStoreContract.java
index e7d4a40..afd8f0e 100644
--- 
a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DeleteDumbBlobStoreContract.java
+++ 
b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DeleteDumbBlobStoreContract.java
@@ -187,7 +187,7 @@ public interface DeleteDumbBlobStoreContract  {
                     if (!string.equals(TWELVE_MEGABYTES_STRING)) {
                         throw new RuntimeException("Should not read partial 
blob when an other thread is deleting it. Size : " + string.length());
                     }
-                } catch (ObjectStoreException exception) {
+                } catch (ObjectNotFoundException exception) {
                     // normal behavior here
                 }
 
diff --git 
a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/ReadSaveDumbBlobStoreContract.java
 
b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/ReadSaveDumbBlobStoreContract.java
index 98b589a..c6ee9ab 100644
--- 
a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/ReadSaveDumbBlobStoreContract.java
+++ 
b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/ReadSaveDumbBlobStoreContract.java
@@ -36,10 +36,14 @@ import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.concurrent.ExecutionException;
+import java.util.stream.Stream;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.james.util.concurrency.ConcurrentTestRunner;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import com.google.common.io.ByteSource;
 import reactor.core.publisher.Mono;
@@ -213,38 +217,40 @@ public interface ReadSaveDumbBlobStoreContract {
         assertThat(read).hasSameContentAs(new 
ByteArrayInputStream(TWELVE_MEGABYTES));
     }
 
-    @Test
-    default void saveBytesShouldOverwritePreviousData() {
+    @ParameterizedTest
+    @MethodSource("blobs")
+    default void saveBytesShouldBeIdempotent(byte[] bytes) {
         DumbBlobStore store = testee();
-        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ELEVEN_KILOBYTES).block();
-        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block();
+        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, bytes).block();
+        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, bytes).block();
 
         byte[] read = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block();
 
-        assertThat(read).isEqualTo(SHORT_BYTEARRAY);
+        assertThat(read).isEqualTo(bytes);
     }
 
-    @Test
-    default void saveByteSourceShouldOverwritePreviousData() {
+    @ParameterizedTest
+    @MethodSource("blobs")
+    default void saveByteSourceShouldBeIdempotent(byte[] bytes) {
         DumbBlobStore store = testee();
-        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, 
ByteSource.wrap(ELEVEN_KILOBYTES)).block();
-        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, 
ByteSource.wrap(SHORT_BYTEARRAY)).block();
+        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, 
ByteSource.wrap(bytes)).block();
+        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, 
ByteSource.wrap(bytes)).block();
 
         byte[] read = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block();
 
-        assertThat(read).isEqualTo(SHORT_BYTEARRAY);
+        assertThat(read).isEqualTo(bytes);
     }
 
-
-    @Test
-    default void saveInputStreamShouldOverwritePreviousData() {
+    @ParameterizedTest
+    @MethodSource("blobs")
+    default void saveInputStreamShouldBeIdempotent(byte[] bytes) {
         DumbBlobStore store = testee();
-        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, 
ByteSource.wrap(ELEVEN_KILOBYTES)).block();
-        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, new 
ByteArrayInputStream(SHORT_BYTEARRAY)).block();
+        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, 
ByteSource.wrap(bytes)).block();
+        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, new 
ByteArrayInputStream(bytes)).block();
 
         byte[] read = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block();
 
-        assertThat(read).isEqualTo(SHORT_BYTEARRAY);
+        assertThat(read).isEqualTo(bytes);
     }
 
     @Test
@@ -303,61 +309,58 @@ public interface ReadSaveDumbBlobStoreContract {
             .isInstanceOf(IOObjectStoreException.class);
     }
 
-    @Test
-    default void concurrentSaveBytesShouldReturnConsistentValues() throws 
ExecutionException, InterruptedException {
-        testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block();
+    static Stream<Arguments> blobs() {
+        return Stream.of(SHORT_BYTEARRAY, ELEVEN_KILOBYTES, TWELVE_MEGABYTES)
+            .map(Arguments::of);
+    }
+
+    @ParameterizedTest
+    @MethodSource("blobs")
+    default void concurrentSaveBytesShouldReturnConsistentValues(byte[] bytes) 
throws ExecutionException, InterruptedException {
+        testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, bytes).block();
         ConcurrentTestRunner.builder()
             .randomlyDistributedReactorOperations(
-                (threadNumber, step) -> testee().save(TEST_BUCKET_NAME, 
TEST_BLOB_ID, SHORT_BYTEARRAY),
-                (threadNumber, step) -> testee().save(TEST_BUCKET_NAME, 
TEST_BLOB_ID, ELEVEN_KILOBYTES),
-                (threadNumber, step) -> testee().save(TEST_BUCKET_NAME, 
TEST_BLOB_ID, TWELVE_MEGABYTES),
-                (threadNumber, step) -> checkConcurrentSaveOperation()
+                (threadNumber, step) -> testee().save(TEST_BUCKET_NAME, 
TEST_BLOB_ID, bytes),
+                (threadNumber, step) -> checkConcurrentSaveOperation(bytes)
             )
             .threadCount(10)
             .operationCount(100)
             .runSuccessfullyWithin(Duration.ofMinutes(2));
     }
 
-    @Test
-    default void concurrentSaveInputStreamShouldReturnConsistentValues() 
throws ExecutionException, InterruptedException {
-        testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block();
+    @ParameterizedTest
+    @MethodSource("blobs")
+    default void concurrentSaveInputStreamShouldReturnConsistentValues(byte[] 
bytes) throws ExecutionException, InterruptedException {
+        testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, bytes).block();
         ConcurrentTestRunner.builder()
             .randomlyDistributedReactorOperations(
-                (threadNumber, step) -> testee().save(TEST_BUCKET_NAME, 
TEST_BLOB_ID, new ByteArrayInputStream(SHORT_BYTEARRAY)),
-                (threadNumber, step) -> testee().save(TEST_BUCKET_NAME, 
TEST_BLOB_ID, new ByteArrayInputStream(ELEVEN_KILOBYTES)),
-                (threadNumber, step) -> testee().save(TEST_BUCKET_NAME, 
TEST_BLOB_ID, new ByteArrayInputStream(TWELVE_MEGABYTES)),
-                (threadNumber, step) -> checkConcurrentSaveOperation()
+                (threadNumber, step) -> testee().save(TEST_BUCKET_NAME, 
TEST_BLOB_ID, new ByteArrayInputStream(bytes)),
+                (threadNumber, step) -> checkConcurrentSaveOperation(bytes)
             )
             .threadCount(10)
             .operationCount(100)
             .runSuccessfullyWithin(Duration.ofMinutes(2));
     }
 
-    @Test
-    default void concurrentSaveByteSourceShouldReturnConsistentValues() throws 
ExecutionException, InterruptedException {
-        testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block();
+    @ParameterizedTest
+    @MethodSource("blobs")
+    default void concurrentSaveByteSourceShouldReturnConsistentValues(byte[] 
bytes) throws ExecutionException, InterruptedException {
+        testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, bytes).block();
         ConcurrentTestRunner.builder()
             .randomlyDistributedReactorOperations(
-                (threadNumber, step) -> testee().save(TEST_BUCKET_NAME, 
TEST_BLOB_ID, ByteSource.wrap(SHORT_BYTEARRAY)),
-                (threadNumber, step) -> testee().save(TEST_BUCKET_NAME, 
TEST_BLOB_ID, ByteSource.wrap(ELEVEN_KILOBYTES)),
-                (threadNumber, step) -> testee().save(TEST_BUCKET_NAME, 
TEST_BLOB_ID, ByteSource.wrap(TWELVE_MEGABYTES)),
-                (threadNumber, step) -> checkConcurrentSaveOperation()
+                (threadNumber, step) -> testee().save(TEST_BUCKET_NAME, 
TEST_BLOB_ID, ByteSource.wrap(bytes)),
+                (threadNumber, step) -> checkConcurrentSaveOperation(bytes)
             )
             .threadCount(10)
             .operationCount(100)
             .runSuccessfullyWithin(Duration.ofMinutes(2));
     }
 
-    default Mono<Void> checkConcurrentSaveOperation() {
+    default Mono<Void> checkConcurrentSaveOperation(byte[] bytes) {
         return Mono
             .fromCallable(() ->
                 testee().read(TEST_BUCKET_NAME, TEST_BLOB_ID))
-            .flatMap(inputstream -> Mono.fromCallable(() -> 
IOUtils.toByteArray(inputstream)))
-            .doOnNext(inputStream -> assertThat(inputStream).isIn(
-                SHORT_BYTEARRAY,
-                ELEVEN_KILOBYTES,
-                TWELVE_MEGABYTES
-            ))
+            .doOnNext(inputStream -> 
assertThat(inputStream).hasSameContentAs(new ByteArrayInputStream(bytes)))
             .then();
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to