This is an automated email from the ASF dual-hosted git repository.

matthieu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 19147b082fa5ff18afb4109255c72eecb5a128a5
Author: Matthieu Baechler <matth...@apache.org>
AuthorDate: Thu Jan 9 11:04:35 2020 +0100

    JAMES-3028 add Delete and Bucket contract for DumbBlobStore
---
 .../org/apache/james/blob/api/DumbBlobStore.java   |   6 +
 .../blob/api/BucketDumbBlobStoreContract.java      | 173 ++++++++++
 .../blob/api/DeleteDumbBlobStoreContract.java      | 233 +++++++++++++
 .../james/blob/api/DumbBlobStoreContract.java      | 368 +--------------------
 .../james/blob/api/DumbBlobStoreFixture.java       |  37 +++
 ...act.java => ReadSaveDumbBlobStoreContract.java} |  40 ++-
 .../james/blob/memory/MemoryDumbBlobStore.java     |  14 +-
 7 files changed, 486 insertions(+), 385 deletions(-)

diff --git 
a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/DumbBlobStore.java
 
b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/DumbBlobStore.java
index 97dbc11..b19161a 100644
--- 
a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/DumbBlobStore.java
+++ 
b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/DumbBlobStore.java
@@ -79,8 +79,10 @@ public interface DumbBlobStore {
 
     /**
      * Remove a Blob based on its BucketName and its BlobId.
+     * This operation should be atomic
      *
      * @return a successful Mono if the Blob is deleted or did not exist
+     * (either the blob doesn't exist in the bucket or the bucket itself 
doesn't exist)
      *  otherwise an IOObjectStoreException in its error channel
      */
     Mono<Void> delete(BucketName bucketName, BlobId blobId);
@@ -88,6 +90,10 @@ public interface DumbBlobStore {
     /**
      * Remove a bucket based on its BucketName
      *
+     * Deleting a bucket is not guaranteed to be atomic nor isolated.
+     * Saving or reading blobs concurrently of bucket deletion can lead
+     * to an inconsistent state.
+     *
      * @return a successful Mono if the bucket is deleted or did not exist
      *  otherwise an IOObjectStoreException in its error channel
      */
diff --git 
a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BucketDumbBlobStoreContract.java
 
b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BucketDumbBlobStoreContract.java
new file mode 100644
index 0000000..cbf8543
--- /dev/null
+++ 
b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BucketDumbBlobStoreContract.java
@@ -0,0 +1,173 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.blob.api;
+
+import static 
org.apache.james.blob.api.DumbBlobStoreFixture.CUSTOM_BUCKET_NAME;
+import static 
org.apache.james.blob.api.DumbBlobStoreFixture.OTHER_TEST_BLOB_ID;
+import static org.apache.james.blob.api.DumbBlobStoreFixture.SHORT_BYTEARRAY;
+import static org.apache.james.blob.api.DumbBlobStoreFixture.SHORT_STRING;
+import static org.apache.james.blob.api.DumbBlobStoreFixture.TEST_BLOB_ID;
+import static org.apache.james.blob.api.DumbBlobStoreFixture.TEST_BUCKET_NAME;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.io.ByteArrayInputStream;
+import java.time.Duration;
+
+import org.apache.james.util.concurrency.ConcurrentTestRunner;
+import org.junit.jupiter.api.Test;
+
+public interface BucketDumbBlobStoreContract {
+
+    DumbBlobStore testee();
+
+    @Test
+    default void deleteBucketShouldThrowWhenNullBucketName() {
+        DumbBlobStore store = testee();
+
+        assertThatThrownBy(() -> store.deleteBucket(null).block())
+            .isInstanceOf(NullPointerException.class);
+    }
+
+    @Test
+    default void deleteBucketShouldDeleteExistingBucketWithItsData() {
+        DumbBlobStore store = testee();
+
+        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block();
+        store.deleteBucket(TEST_BUCKET_NAME).block();
+
+        assertThatThrownBy(() -> store.read(TEST_BUCKET_NAME, TEST_BLOB_ID))
+            .isInstanceOf(ObjectNotFoundException.class);
+    }
+
+    @Test
+    default void deleteBucketShouldBeIdempotent() {
+        DumbBlobStore store = testee();
+
+        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block();
+        store.deleteBucket(TEST_BUCKET_NAME).block();
+
+        assertThatCode(() -> store.deleteBucket(TEST_BUCKET_NAME).block())
+            .doesNotThrowAnyException();
+    }
+
+    @Test
+    default void saveBytesShouldThrowWhenNullBucketName() {
+        DumbBlobStore store = testee();
+
+        assertThatThrownBy(() -> store.save(null, TEST_BLOB_ID, 
SHORT_BYTEARRAY).block())
+            .isInstanceOf(NullPointerException.class);
+    }
+
+    @Test
+    default void saveStringShouldThrowWhenNullBucketName() {
+        DumbBlobStore store = testee();
+
+        assertThatThrownBy(() -> store.save(null, TEST_BLOB_ID, 
SHORT_STRING).block())
+            .isInstanceOf(NullPointerException.class);
+    }
+
+    @Test
+    default void saveInputStreamShouldThrowWhenNullBucketName() {
+        DumbBlobStore store = testee();
+
+        assertThatThrownBy(() -> store.save(null, TEST_BLOB_ID, new 
ByteArrayInputStream(SHORT_BYTEARRAY)).block())
+            .isInstanceOf(NullPointerException.class);
+    }
+
+    @Test
+    default void readShouldThrowWhenNullBucketName() {
+        DumbBlobStore store = testee();
+
+        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block();
+        assertThatThrownBy(() -> store.read(null, TEST_BLOB_ID))
+            .isInstanceOf(NullPointerException.class);
+    }
+
+    @Test
+    default void readBytesShouldThrowWhenNullBucketName() {
+        DumbBlobStore store = testee();
+
+        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block();
+        assertThatThrownBy(() -> store.readBytes(null, TEST_BLOB_ID).block())
+            .isInstanceOf(NullPointerException.class);
+    }
+
+    @Test
+    default void readStreamShouldThrowWhenBucketDoesNotExist() {
+        DumbBlobStore store = testee();
+
+        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block();
+        assertThatThrownBy(() -> store.read(CUSTOM_BUCKET_NAME, TEST_BLOB_ID))
+            .isInstanceOf(ObjectNotFoundException.class);
+    }
+
+    @Test
+    default void readBytesShouldThrowWhenBucketDoesNotExist() {
+        DumbBlobStore store = testee();
+
+        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block();
+
+        assertThatThrownBy(() -> store.readBytes(CUSTOM_BUCKET_NAME, 
TEST_BLOB_ID).block())
+            .isInstanceOf(ObjectNotFoundException.class);
+    }
+
+    @Test
+    default void shouldBeAbleToSaveDataInMultipleBuckets() {
+        DumbBlobStore store = testee();
+
+        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block();
+        store.save(CUSTOM_BUCKET_NAME, OTHER_TEST_BLOB_ID, 
SHORT_BYTEARRAY).block();
+
+        byte[] bytesDefault = store.readBytes(TEST_BUCKET_NAME, 
TEST_BLOB_ID).block();
+        byte[] bytesCustom = store.readBytes(CUSTOM_BUCKET_NAME, 
OTHER_TEST_BLOB_ID).block();
+
+        assertThat(bytesDefault).isEqualTo(bytesCustom);
+    }
+
+    @Test
+    default void saveConcurrentlyWithNonPreExistingBucketShouldNotFail() 
throws Exception {
+        DumbBlobStore store = testee();
+
+        ConcurrentTestRunner.builder()
+            .operation(((threadNumber, step) ->
+                store.save(
+                    TEST_BUCKET_NAME,
+                    new TestBlobId("id-" + threadNumber + step),
+                    SHORT_STRING + threadNumber + step).block()))
+            .threadCount(10)
+            .operationCount(10)
+            .runSuccessfullyWithin(Duration.ofMinutes(1));
+    }
+
+    @Test
+    default void deleteBucketConcurrentlyShouldNotFail() throws Exception {
+        DumbBlobStore store = testee();
+
+        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block();
+
+        ConcurrentTestRunner.builder()
+            .operation(((threadNumber, step) -> 
store.deleteBucket(TEST_BUCKET_NAME).block()))
+            .threadCount(10)
+            .operationCount(10)
+            .runSuccessfullyWithin(Duration.ofMinutes(1));
+    }
+}
diff --git 
a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DeleteDumbBlobStoreContract.java
 
b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DeleteDumbBlobStoreContract.java
new file mode 100644
index 0000000..a3c2a25
--- /dev/null
+++ 
b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DeleteDumbBlobStoreContract.java
@@ -0,0 +1,233 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.blob.api;
+
+import static 
org.apache.james.blob.api.DumbBlobStoreFixture.CUSTOM_BUCKET_NAME;
+import static org.apache.james.blob.api.DumbBlobStoreFixture.ELEVEN_KILOBYTES;
+import static 
org.apache.james.blob.api.DumbBlobStoreFixture.OTHER_TEST_BLOB_ID;
+import static org.apache.james.blob.api.DumbBlobStoreFixture.SHORT_BYTEARRAY;
+import static org.apache.james.blob.api.DumbBlobStoreFixture.TEST_BLOB_ID;
+import static org.apache.james.blob.api.DumbBlobStoreFixture.TEST_BUCKET_NAME;
+import static org.apache.james.blob.api.DumbBlobStoreFixture.TWELVE_MEGABYTES;
+import static 
org.apache.james.blob.api.DumbBlobStoreFixture.TWELVE_MEGABYTES_STRING;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Function;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.james.util.concurrency.ConcurrentTestRunner;
+import org.junit.jupiter.api.Test;
+import org.reactivestreams.Publisher;
+
+import reactor.core.publisher.Mono;
+
+public interface DeleteDumbBlobStoreContract  {
+
+    DumbBlobStore testee();
+
+    @Test
+    default void deleteShouldNotThrowWhenBlobDoesNotExist() {
+        DumbBlobStore store = testee();
+
+        assertThatCode(() -> store.delete(TEST_BUCKET_NAME, 
TEST_BLOB_ID).block())
+            .doesNotThrowAnyException();
+    }
+
+    @Test
+    default void deleteShouldNotThrowWhenBucketDoesNotExist() {
+        DumbBlobStore store = testee();
+
+        assertThatCode(() -> 
store.delete(BucketName.of("not_existing_bucket_name"), TEST_BLOB_ID).block())
+            .doesNotThrowAnyException();
+    }
+
+    @Test
+    default void deleteShouldDeleteExistingBlobData() {
+        DumbBlobStore store = testee();
+
+        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID,  SHORT_BYTEARRAY).block();
+        store.delete(TEST_BUCKET_NAME, TEST_BLOB_ID).block();
+
+        assertThatThrownBy(() -> store.read(TEST_BUCKET_NAME, TEST_BLOB_ID))
+            .isInstanceOf(ObjectStoreException.class);
+    }
+
+    @Test
+    default void deleteShouldBeIdempotent() {
+        DumbBlobStore store = testee();
+
+        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block();
+        store.delete(TEST_BUCKET_NAME, TEST_BLOB_ID).block();
+
+        assertThatCode(() -> store.delete(TEST_BUCKET_NAME, 
TEST_BLOB_ID).block())
+            .doesNotThrowAnyException();
+    }
+
+    @Test
+    default void deleteShouldNotDeleteOtherBlobs() {
+        DumbBlobStore store = testee();
+
+        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block();
+        store.save(TEST_BUCKET_NAME, OTHER_TEST_BLOB_ID, 
ELEVEN_KILOBYTES).block();
+
+        store.delete(TEST_BUCKET_NAME, TEST_BLOB_ID).block();
+
+        InputStream read = store.read(TEST_BUCKET_NAME, OTHER_TEST_BLOB_ID);
+
+        assertThat(read).hasSameContentAs(new 
ByteArrayInputStream(ELEVEN_KILOBYTES));
+    }
+
+    @Test
+    default void deleteConcurrentlyShouldNotFail() throws Exception {
+        DumbBlobStore store = testee();
+
+        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, TWELVE_MEGABYTES).block();
+
+        ConcurrentTestRunner.builder()
+            .operation(((threadNumber, step) -> store.delete(TEST_BUCKET_NAME, 
TEST_BLOB_ID).block()))
+            .threadCount(10)
+            .operationCount(10)
+            .runSuccessfullyWithin(Duration.ofMinutes(1));
+    }
+
+    @Test
+    default void deleteShouldThrowWhenNullBucketName() {
+        DumbBlobStore store = testee();
+        assertThatThrownBy(() -> store.delete(null, TEST_BLOB_ID).block())
+            .isInstanceOf(NullPointerException.class);
+    }
+
+    @Test
+    default void deleteShouldNotDeleteFromOtherBucket() {
+        DumbBlobStore store = testee();
+
+        store.save(CUSTOM_BUCKET_NAME, OTHER_TEST_BLOB_ID, "custom").block();
+        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block();
+
+        store.delete(CUSTOM_BUCKET_NAME, OTHER_TEST_BLOB_ID).block();
+
+        InputStream read = store.read(TEST_BUCKET_NAME, TEST_BLOB_ID);
+
+        assertThat(read).hasSameContentAs(new 
ByteArrayInputStream(SHORT_BYTEARRAY));
+    }
+
+    @Test
+    default void deleteShouldNotDeleteFromOtherBucketWhenSameBlobId() {
+        DumbBlobStore store = testee();
+
+        store.save(CUSTOM_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block();
+        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block();
+
+        store.delete(TEST_BUCKET_NAME, TEST_BLOB_ID).block();
+
+        InputStream read = store.read(CUSTOM_BUCKET_NAME, TEST_BLOB_ID);
+
+        assertThat(read).hasSameContentAs(new 
ByteArrayInputStream(SHORT_BYTEARRAY));
+    }
+
+    @Test
+    default void readShouldNotReadPartiallyWhenDeletingConcurrentlyBigBlob() 
throws Exception {
+        DumbBlobStore store = testee();
+
+        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, TWELVE_MEGABYTES).block();
+
+        ConcurrentTestRunner.builder()
+            .operation(((threadNumber, step) -> {
+                try {
+                    InputStream read = store.read(TEST_BUCKET_NAME, 
TEST_BLOB_ID);
+
+                    String string = IOUtils.toString(read, 
StandardCharsets.UTF_8);
+                    if (!string.equals(TWELVE_MEGABYTES_STRING)) {
+                        throw new RuntimeException("Should not read partial 
blob when an other thread is deleting it. Size : " + string.length());
+                    }
+                } catch (ObjectStoreException exception) {
+                    // normal behavior here
+                }
+
+                store.delete(TEST_BUCKET_NAME, TEST_BLOB_ID).block();
+            }))
+            .threadCount(10)
+            .operationCount(10)
+            .runSuccessfullyWithin(Duration.ofMinutes(3));
+    }
+
+    @Test
+    default void 
readBytesShouldNotReadPartiallyWhenDeletingConcurrentlyBigBlob() throws 
Exception {
+        DumbBlobStore store = testee();
+
+        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, TWELVE_MEGABYTES).block();
+
+        ConcurrentTestRunner.builder()
+            .operation(((threadNumber, step) -> {
+                try {
+                    byte[] read = store.readBytes(TEST_BUCKET_NAME, 
TEST_BLOB_ID).block();
+                    String string = IOUtils.toString(read, 
StandardCharsets.UTF_8.displayName());
+                    if (!string.equals(TWELVE_MEGABYTES_STRING)) {
+                        throw new RuntimeException("Should not read partial 
blob when an other thread is deleting it. Size : " + string.length());
+                    }
+                } catch (ObjectStoreException exception) {
+                    // normal behavior here
+                }
+
+                store.delete(TEST_BUCKET_NAME, TEST_BLOB_ID).block();
+            }))
+            .threadCount(10)
+            .operationCount(10)
+            .runSuccessfullyWithin(Duration.ofMinutes(3));
+    }
+
+    @Test
+    default void mixingSaveReadAndDeleteShouldReturnConsistentState() throws 
ExecutionException, InterruptedException {
+        ConcurrentTestRunner.builder()
+            .reactorOperation((thread, iteration) -> 
getConcurrentMixedOperation())
+            .threadCount(10)
+            .operationCount(100)
+            .runSuccessfullyWithin(Duration.ofMinutes(2));
+    }
+
+    default Publisher<Void> getConcurrentMixedOperation() {
+        switch (ThreadLocalRandom.current().nextInt(3)) {
+            case 0:
+                return testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, 
TWELVE_MEGABYTES);
+            case 1:
+                return testee().delete(TEST_BUCKET_NAME, TEST_BLOB_ID);
+            default:
+                return checkConcurrentMixedOperation();
+        }
+    }
+
+    default Mono<Void> checkConcurrentMixedOperation() {
+        return Mono
+            .fromCallable(() ->
+                testee().read(TEST_BUCKET_NAME, TEST_BLOB_ID))
+            .doOnNext(inputStream -> 
assertThat(inputStream).hasSameContentAs(new 
ByteArrayInputStream(TWELVE_MEGABYTES)))
+            .doOnError(throwable -> 
assertThat(throwable).isInstanceOf(ObjectNotFoundException.class))
+            .onErrorResume(throwable -> Mono.empty())
+            .then();
+    }
+}
diff --git 
a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DumbBlobStoreContract.java
 
b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DumbBlobStoreContract.java
index 844ac69..9dc1575 100644
--- 
a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DumbBlobStoreContract.java
+++ 
b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DumbBlobStoreContract.java
@@ -16,373 +16,7 @@
  * specific language governing permissions and limitations      *
  * under the License.                                           *
  ****************************************************************/
-
 package org.apache.james.blob.api;
 
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatCode;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-
-import java.io.ByteArrayInputStream;
-import java.io.FilterInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
-import java.time.Duration;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.function.Function;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.james.util.concurrency.ConcurrentTestRunner;
-import org.junit.jupiter.api.Test;
-import org.reactivestreams.Publisher;
-
-import com.google.common.base.Strings;
-import com.google.common.io.ByteSource;
-import reactor.core.publisher.Mono;
-
-public interface DumbBlobStoreContract {
-
-    BucketName TEST_BUCKET_NAME = BucketName.of("my-test-bucket");
-    BlobId TEST_BLOB_ID = new TestBlobId("test-blob-id");
-    String SHORT_STRING = "toto";
-    byte[] EMPTY_BYTEARRAY = {};
-    byte[] SHORT_BYTEARRAY = SHORT_STRING.getBytes(StandardCharsets.UTF_8);
-    byte[] ELEVEN_KILOBYTES = Strings.repeat("0123456789\n", 
1000).getBytes(StandardCharsets.UTF_8);
-    byte[] TWELVE_MEGABYTES = Strings.repeat("0123456789\r\n", 1024 * 
1024).getBytes(StandardCharsets.UTF_8);
-
-    DumbBlobStore testee();
-
-    @Test
-    default void saveShouldThrowWhenNullData() {
-        DumbBlobStore store = testee();
-
-        assertThatThrownBy(() -> store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, 
(byte[]) null).block())
-            .isInstanceOf(NullPointerException.class);
-    }
-
-    @Test
-    default void saveShouldThrowWhenNullString() {
-        DumbBlobStore store = testee();
-
-        assertThatThrownBy(() -> store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, 
(String) null).block())
-            .isInstanceOf(NullPointerException.class);
-    }
-
-
-    @Test
-    default void saveShouldThrowWhenNullInputStream() {
-        DumbBlobStore store = testee();
-
-        assertThatThrownBy(() -> store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, 
(InputStream) null).block())
-            .isInstanceOf(NullPointerException.class);
-    }
-
-    @Test
-    default void saveShouldSaveEmptyData() {
-        DumbBlobStore store = testee();
-
-        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, EMPTY_BYTEARRAY).block();
-        byte[] bytes = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block();
-
-        assertThat(bytes).isEmpty();
-    }
-
-    @Test
-    default void saveShouldSaveEmptyString() {
-        DumbBlobStore store = testee();
-
-        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, "").block();
-
-        byte[] bytes = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block();
-
-        assertThat(new String(bytes, StandardCharsets.UTF_8)).isEmpty();
-    }
-
-    @Test
-    default void saveShouldSaveEmptyInputStream() {
-        DumbBlobStore store = testee();
-
-        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, new 
ByteArrayInputStream(EMPTY_BYTEARRAY)).block();
-
-        byte[] bytes = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block();
-
-        assertThat(bytes).isEmpty();
-    }
-
-    @Test
-    default void saveShouldSaveEmptyByteSource() {
-        DumbBlobStore store = testee();
-
-        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.empty()).block();
-
-        byte[] bytes = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block();
-
-        assertThat(bytes).isEmpty();
-    }
-
-    @Test
-    default void readBytesShouldThrowWhenNotExisting() {
-        DumbBlobStore store = testee();
-
-        assertThatThrownBy(() -> store.readBytes(TEST_BUCKET_NAME, new 
TestBlobId("unknown")).block())
-            .isExactlyInstanceOf(ObjectNotFoundException.class);
-    }
-
-    @Test
-    default void readBytesShouldReturnSavedData() {
-        DumbBlobStore store = testee();
-
-        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block();
-
-        byte[] bytes = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block();
-
-        assertThat(bytes).isEqualTo(SHORT_BYTEARRAY);
-    }
-
-    @Test
-    default void readBytesShouldReturnLongSavedData() {
-        DumbBlobStore store = testee();
-
-        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ELEVEN_KILOBYTES).block();
-
-        byte[] bytes = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block();
-
-        assertThat(bytes).isEqualTo(ELEVEN_KILOBYTES);
-    }
-
-    @Test
-    default void readBytesShouldReturnBigSavedData() {
-        DumbBlobStore store = testee();
-
-        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, TWELVE_MEGABYTES).block();
-
-        byte[] bytes = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block();
-
-        assertThat(bytes).isEqualTo(TWELVE_MEGABYTES);
-    }
-
-    @Test
-    default void readShouldThrowWhenNotExistingStream() {
-        DumbBlobStore store = testee();
-
-        assertThatThrownBy(() -> store.read(TEST_BUCKET_NAME, new 
TestBlobId("unknown")))
-            .isInstanceOf(ObjectNotFoundException.class);
-    }
-
-    @Test
-    default void saveShouldCreateBucket() {
-        DumbBlobStore store = testee();
-        BucketName nonExisting = BucketName.of("non-existing-bucket");
-        store.save(nonExisting, TEST_BLOB_ID, SHORT_BYTEARRAY).block();
-
-        //read for a non-existing bucket would throw
-        assertThatCode(() -> store.read(nonExisting, TEST_BLOB_ID))
-            .doesNotThrowAnyException();
-    }
-
-    @Test
-    default void readShouldReturnSavedData() {
-        DumbBlobStore store = testee();
-        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block();
-
-        InputStream read = store.read(TEST_BUCKET_NAME, TEST_BLOB_ID);
-
-        assertThat(read).hasSameContentAs(new 
ByteArrayInputStream(SHORT_BYTEARRAY));
-    }
-
-    @Test
-    default void readShouldReturnLongSavedData() {
-        DumbBlobStore store = testee();
-        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ELEVEN_KILOBYTES).block();
-
-        InputStream read = store.read(TEST_BUCKET_NAME, TEST_BLOB_ID);
-
-        assertThat(read).hasSameContentAs(new 
ByteArrayInputStream(ELEVEN_KILOBYTES));
-    }
-
-    @Test
-    default void readShouldReturnBigSavedData() {
-        DumbBlobStore store = testee();
-        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, TWELVE_MEGABYTES).block();
-
-        InputStream read = store.read(TEST_BUCKET_NAME, TEST_BLOB_ID);
-
-        assertThat(read).hasSameContentAs(new 
ByteArrayInputStream(TWELVE_MEGABYTES));
-    }
-
-    @Test
-    default void saveBytesShouldOverwritePreviousData() {
-        DumbBlobStore store = testee();
-        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ELEVEN_KILOBYTES).block();
-        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block();
-
-        byte[] read = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block();
-
-        assertThat(read).isEqualTo(SHORT_BYTEARRAY);
-    }
-
-    @Test
-    default void saveByteSourceShouldOverwritePreviousData() {
-        DumbBlobStore store = testee();
-        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, 
ByteSource.wrap(ELEVEN_KILOBYTES)).block();
-        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, 
ByteSource.wrap(SHORT_BYTEARRAY)).block();
-
-        byte[] read = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block();
-
-        assertThat(read).isEqualTo(SHORT_BYTEARRAY);
-    }
-
-
-    @Test
-    default void saveInputStreamShouldOverwritePreviousData() {
-        DumbBlobStore store = testee();
-        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, 
ByteSource.wrap(ELEVEN_KILOBYTES)).block();
-        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, new 
ByteArrayInputStream(SHORT_BYTEARRAY)).block();
-
-        byte[] read = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block();
-
-        assertThat(read).isEqualTo(SHORT_BYTEARRAY);
-    }
-
-    @Test
-    default void 
saveInputStreamShouldNotOverwritePreviousDataOnFailingInputStream() {
-        DumbBlobStore store = testee();
-
-        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, 
ByteSource.wrap(ELEVEN_KILOBYTES)).block();
-        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, getThrowingInputStream())
-            .onErrorResume(throwable -> Mono.empty())
-            .block();
-
-        byte[] read = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block();
-
-        assertThat(read).isEqualTo(ELEVEN_KILOBYTES);
-    }
-
-    @Test
-    default void 
saveByteSourceShouldNotOverwritePreviousDataOnFailingInputStream() {
-        DumbBlobStore store = testee();
-
-        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, 
ByteSource.wrap(ELEVEN_KILOBYTES)).block();
-        store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, new ByteSource() {
-            @Override
-            public InputStream openStream() throws IOException {
-                return getThrowingInputStream();
-            }
-        })
-            .onErrorResume(throwable -> Mono.empty())
-            .block();
-
-        byte[] read = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block();
-
-        assertThat(read).isEqualTo(ELEVEN_KILOBYTES);
-    }
-
-    @Test
-    default void saveByteSourceShouldThrowOnIOException() {
-        DumbBlobStore store = testee();
-
-        assertThatThrownBy(() -> store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, 
new ByteSource() {
-                @Override
-                public InputStream openStream() throws IOException {
-                    return getThrowingInputStream();
-                }
-            })
-            .block())
-        .isInstanceOf(IOObjectStoreException.class);
-    }
-
-    @Test
-    default void saveInputStreamShouldThrowOnIOException() {
-        DumbBlobStore store = testee();
-
-        assertThatThrownBy(() -> store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, 
getThrowingInputStream())
-            .block())
-            .isInstanceOf(IOObjectStoreException.class);
-    }
-
-    @Test
-    default void concurrentSaveBytesShouldReturnConsistentValues() throws 
ExecutionException, InterruptedException {
-        testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block();
-        ConcurrentTestRunner.builder()
-            .reactorOperation((thread, iteration) -> 
getConcurrentOperation(bytes -> testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, 
bytes)))
-            .threadCount(10)
-            .operationCount(100)
-            .runSuccessfullyWithin(Duration.ofMinutes(2));
-    }
-
-    @Test
-    default void concurrentSaveInputStreamShouldReturnConsistentValues() 
throws ExecutionException, InterruptedException {
-        testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block();
-        ConcurrentTestRunner.builder()
-            .reactorOperation((thread, iteration) -> 
getConcurrentOperation(bytes -> testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, 
new ByteArrayInputStream(bytes))))
-            .threadCount(10)
-            .operationCount(100)
-            .runSuccessfullyWithin(Duration.ofMinutes(2));
-    }
-
-    @Test
-    default void concurrentSaveByteSourceShouldReturnConsistentValues() throws 
ExecutionException, InterruptedException {
-        testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block();
-        ConcurrentTestRunner.builder()
-            .reactorOperation((thread, iteration) -> 
getConcurrentOperation(bytes -> testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, 
ByteSource.wrap(bytes))))
-            .threadCount(10)
-            .operationCount(100)
-            .runSuccessfullyWithin(Duration.ofMinutes(2));
-    }
-
-    default Publisher<Void> getConcurrentOperation(Function<byte[], 
Mono<Void>> save) {
-        switch (ThreadLocalRandom.current().nextInt(4)) {
-            case 0:
-                return save.apply(SHORT_BYTEARRAY);
-            case 1:
-                return save.apply(ELEVEN_KILOBYTES);
-            case 2:
-                return save.apply(TWELVE_MEGABYTES);
-            default:
-                return checkConcurrentSaveOperation();
-        }
-    }
-
-    default Mono<Void> checkConcurrentSaveOperation() {
-        return Mono
-            .fromCallable(() ->
-                testee().read(TEST_BUCKET_NAME, TEST_BLOB_ID))
-            .flatMap(inputstream -> Mono.fromCallable(() -> 
IOUtils.toByteArray(inputstream)))
-            .doOnNext(inputStream -> assertThat(inputStream).isIn(
-                SHORT_BYTEARRAY,
-                ELEVEN_KILOBYTES,
-                TWELVE_MEGABYTES
-            ))
-            .then();
-    }
-
-    default FilterInputStream getThrowingInputStream() {
-        return new FilterInputStream(new 
ByteArrayInputStream(TWELVE_MEGABYTES)) {
-            int failingThreshold = 5;
-            int alreadyRead = 0;
-
-            @Override
-            public int read() throws IOException {
-                if (alreadyRead < failingThreshold) {
-                    alreadyRead++;
-                    return super.read();
-                } else {
-                    throw new IOException("error on read");
-                }
-            }
-
-            @Override
-            public int read(byte[] b, int off, int len) throws IOException {
-                int value = read();
-                if (value != -1) {
-                    b[off] = (byte) value;
-                }
-                return value;
-            }
-
-        };
-    }
-
+public interface DumbBlobStoreContract extends ReadSaveDumbBlobStoreContract, 
DeleteDumbBlobStoreContract, BucketDumbBlobStoreContract {
 }
diff --git 
a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DumbBlobStoreFixture.java
 
b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DumbBlobStoreFixture.java
new file mode 100644
index 0000000..ec9e91f
--- /dev/null
+++ 
b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DumbBlobStoreFixture.java
@@ -0,0 +1,37 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.blob.api;
+
+import java.nio.charset.StandardCharsets;
+
+import com.google.common.base.Strings;
+
+public interface DumbBlobStoreFixture {
+    BucketName TEST_BUCKET_NAME = BucketName.of("my-test-bucket");
+    BucketName CUSTOM_BUCKET_NAME = BucketName.of("custom");
+    BlobId TEST_BLOB_ID = new TestBlobId("test-blob-id");
+    BlobId OTHER_TEST_BLOB_ID = new TestBlobId("other-test-blob-id");
+    String SHORT_STRING = "toto";
+    byte[] EMPTY_BYTEARRAY = {};
+    byte[] SHORT_BYTEARRAY = SHORT_STRING.getBytes(StandardCharsets.UTF_8);
+    byte[] ELEVEN_KILOBYTES = Strings.repeat("2103456789\n", 
1000).getBytes(StandardCharsets.UTF_8);
+    String TWELVE_MEGABYTES_STRING = Strings.repeat("7893456789\r\n", 1024 * 
1024);
+    byte[] TWELVE_MEGABYTES = 
TWELVE_MEGABYTES_STRING.getBytes(StandardCharsets.UTF_8);
+
+}
diff --git 
a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DumbBlobStoreContract.java
 
b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/ReadSaveDumbBlobStoreContract.java
similarity index 91%
copy from 
server/blob/blob-api/src/test/java/org/apache/james/blob/api/DumbBlobStoreContract.java
copy to 
server/blob/blob-api/src/test/java/org/apache/james/blob/api/ReadSaveDumbBlobStoreContract.java
index 844ac69..665a5d3 100644
--- 
a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DumbBlobStoreContract.java
+++ 
b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/ReadSaveDumbBlobStoreContract.java
@@ -19,6 +19,12 @@
 
 package org.apache.james.blob.api;
 
+import static org.apache.james.blob.api.DumbBlobStoreFixture.ELEVEN_KILOBYTES;
+import static org.apache.james.blob.api.DumbBlobStoreFixture.EMPTY_BYTEARRAY;
+import static org.apache.james.blob.api.DumbBlobStoreFixture.SHORT_BYTEARRAY;
+import static org.apache.james.blob.api.DumbBlobStoreFixture.TEST_BLOB_ID;
+import static org.apache.james.blob.api.DumbBlobStoreFixture.TEST_BUCKET_NAME;
+import static org.apache.james.blob.api.DumbBlobStoreFixture.TWELVE_MEGABYTES;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -38,19 +44,10 @@ import 
org.apache.james.util.concurrency.ConcurrentTestRunner;
 import org.junit.jupiter.api.Test;
 import org.reactivestreams.Publisher;
 
-import com.google.common.base.Strings;
 import com.google.common.io.ByteSource;
 import reactor.core.publisher.Mono;
 
-public interface DumbBlobStoreContract {
-
-    BucketName TEST_BUCKET_NAME = BucketName.of("my-test-bucket");
-    BlobId TEST_BLOB_ID = new TestBlobId("test-blob-id");
-    String SHORT_STRING = "toto";
-    byte[] EMPTY_BYTEARRAY = {};
-    byte[] SHORT_BYTEARRAY = SHORT_STRING.getBytes(StandardCharsets.UTF_8);
-    byte[] ELEVEN_KILOBYTES = Strings.repeat("0123456789\n", 
1000).getBytes(StandardCharsets.UTF_8);
-    byte[] TWELVE_MEGABYTES = Strings.repeat("0123456789\r\n", 1024 * 
1024).getBytes(StandardCharsets.UTF_8);
+public interface ReadSaveDumbBlobStoreContract {
 
     DumbBlobStore testee();
 
@@ -70,7 +67,6 @@ public interface DumbBlobStoreContract {
             .isInstanceOf(NullPointerException.class);
     }
 
-
     @Test
     default void saveShouldThrowWhenNullInputStream() {
         DumbBlobStore store = testee();
@@ -80,6 +76,14 @@ public interface DumbBlobStoreContract {
     }
 
     @Test
+    default void saveShouldThrowWhenNullByteSource() {
+        DumbBlobStore store = testee();
+
+        assertThatThrownBy(() -> store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, 
(ByteSource) null).block())
+            .isInstanceOf(NullPointerException.class);
+    }
+
+    @Test
     default void saveShouldSaveEmptyData() {
         DumbBlobStore store = testee();
 
@@ -164,7 +168,7 @@ public interface DumbBlobStoreContract {
     }
 
     @Test
-    default void readShouldThrowWhenNotExistingStream() {
+    default void readStreamShouldThrowWhenNotExisting() {
         DumbBlobStore store = testee();
 
         assertThatThrownBy(() -> store.read(TEST_BUCKET_NAME, new 
TestBlobId("unknown")))
@@ -375,11 +379,15 @@ public interface DumbBlobStoreContract {
 
             @Override
             public int read(byte[] b, int off, int len) throws IOException {
-                int value = read();
-                if (value != -1) {
-                    b[off] = (byte) value;
+                int remaining = b.length - off;
+                int toRead = Math.min(remaining, len);
+                for (int i = 0; i < toRead; i ++) {
+                    int value = read();
+                    if (value != -1) {
+                        b[off] = (byte) value;
+                    }
                 }
-                return value;
+                return toRead;
             }
 
         };
diff --git 
a/server/blob/blob-memory/src/main/java/org/apache/james/blob/memory/MemoryDumbBlobStore.java
 
b/server/blob/blob-memory/src/main/java/org/apache/james/blob/memory/MemoryDumbBlobStore.java
index 48e0d78..16a0a7b 100644
--- 
a/server/blob/blob-memory/src/main/java/org/apache/james/blob/memory/MemoryDumbBlobStore.java
+++ 
b/server/blob/blob-memory/src/main/java/org/apache/james/blob/memory/MemoryDumbBlobStore.java
@@ -30,6 +30,7 @@ import org.apache.james.blob.api.DumbBlobStore;
 import org.apache.james.blob.api.IOObjectStoreException;
 import org.apache.james.blob.api.ObjectNotFoundException;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.HashBasedTable;
 import com.google.common.collect.Table;
 import com.google.common.io.ByteSource;
@@ -92,11 +93,20 @@ public class MemoryDumbBlobStore implements DumbBlobStore {
 
     @Override
     public Mono<Void> delete(BucketName bucketName, BlobId blobId) {
-        return null;
+        Preconditions.checkNotNull(bucketName);
+        return Mono.fromRunnable(() -> {
+            synchronized (blobs) {
+                blobs.remove(bucketName, blobId);
+            }
+        });
     }
 
     @Override
     public Mono<Void> deleteBucket(BucketName bucketName) {
-        return null;
+        return Mono.fromRunnable(() -> {
+            synchronized (blobs) {
+                blobs.row(bucketName).clear();
+            }
+        });
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to