This is an automated email from the ASF dual-hosted git repository. matthieu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 19147b082fa5ff18afb4109255c72eecb5a128a5 Author: Matthieu Baechler <matth...@apache.org> AuthorDate: Thu Jan 9 11:04:35 2020 +0100 JAMES-3028 add Delete and Bucket contract for DumbBlobStore --- .../org/apache/james/blob/api/DumbBlobStore.java | 6 + .../blob/api/BucketDumbBlobStoreContract.java | 173 ++++++++++ .../blob/api/DeleteDumbBlobStoreContract.java | 233 +++++++++++++ .../james/blob/api/DumbBlobStoreContract.java | 368 +-------------------- .../james/blob/api/DumbBlobStoreFixture.java | 37 +++ ...act.java => ReadSaveDumbBlobStoreContract.java} | 40 ++- .../james/blob/memory/MemoryDumbBlobStore.java | 14 +- 7 files changed, 486 insertions(+), 385 deletions(-) diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/DumbBlobStore.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/DumbBlobStore.java index 97dbc11..b19161a 100644 --- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/DumbBlobStore.java +++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/DumbBlobStore.java @@ -79,8 +79,10 @@ public interface DumbBlobStore { /** * Remove a Blob based on its BucketName and its BlobId. + * This operation should be atomic * * @return a successful Mono if the Blob is deleted or did not exist + * (either the blob doesn't exist in the bucket or the bucket itself doesn't exist) * otherwise an IOObjectStoreException in its error channel */ Mono<Void> delete(BucketName bucketName, BlobId blobId); @@ -88,6 +90,10 @@ public interface DumbBlobStore { /** * Remove a bucket based on its BucketName * + * Deleting a bucket is not guaranteed to be atomic nor isolated. + * Saving or reading blobs concurrently of bucket deletion can lead + * to an inconsistent state. + * * @return a successful Mono if the bucket is deleted or did not exist * otherwise an IOObjectStoreException in its error channel */ diff --git a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BucketDumbBlobStoreContract.java b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BucketDumbBlobStoreContract.java new file mode 100644 index 0000000..cbf8543 --- /dev/null +++ b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BucketDumbBlobStoreContract.java @@ -0,0 +1,173 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.blob.api; + +import static org.apache.james.blob.api.DumbBlobStoreFixture.CUSTOM_BUCKET_NAME; +import static org.apache.james.blob.api.DumbBlobStoreFixture.OTHER_TEST_BLOB_ID; +import static org.apache.james.blob.api.DumbBlobStoreFixture.SHORT_BYTEARRAY; +import static org.apache.james.blob.api.DumbBlobStoreFixture.SHORT_STRING; +import static org.apache.james.blob.api.DumbBlobStoreFixture.TEST_BLOB_ID; +import static org.apache.james.blob.api.DumbBlobStoreFixture.TEST_BUCKET_NAME; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.io.ByteArrayInputStream; +import java.time.Duration; + +import org.apache.james.util.concurrency.ConcurrentTestRunner; +import org.junit.jupiter.api.Test; + +public interface BucketDumbBlobStoreContract { + + DumbBlobStore testee(); + + @Test + default void deleteBucketShouldThrowWhenNullBucketName() { + DumbBlobStore store = testee(); + + assertThatThrownBy(() -> store.deleteBucket(null).block()) + .isInstanceOf(NullPointerException.class); + } + + @Test + default void deleteBucketShouldDeleteExistingBucketWithItsData() { + DumbBlobStore store = testee(); + + store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block(); + store.deleteBucket(TEST_BUCKET_NAME).block(); + + assertThatThrownBy(() -> store.read(TEST_BUCKET_NAME, TEST_BLOB_ID)) + .isInstanceOf(ObjectNotFoundException.class); + } + + @Test + default void deleteBucketShouldBeIdempotent() { + DumbBlobStore store = testee(); + + store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block(); + store.deleteBucket(TEST_BUCKET_NAME).block(); + + assertThatCode(() -> store.deleteBucket(TEST_BUCKET_NAME).block()) + .doesNotThrowAnyException(); + } + + @Test + default void saveBytesShouldThrowWhenNullBucketName() { + DumbBlobStore store = testee(); + + assertThatThrownBy(() -> store.save(null, TEST_BLOB_ID, SHORT_BYTEARRAY).block()) + .isInstanceOf(NullPointerException.class); + } + + @Test + default void saveStringShouldThrowWhenNullBucketName() { + DumbBlobStore store = testee(); + + assertThatThrownBy(() -> store.save(null, TEST_BLOB_ID, SHORT_STRING).block()) + .isInstanceOf(NullPointerException.class); + } + + @Test + default void saveInputStreamShouldThrowWhenNullBucketName() { + DumbBlobStore store = testee(); + + assertThatThrownBy(() -> store.save(null, TEST_BLOB_ID, new ByteArrayInputStream(SHORT_BYTEARRAY)).block()) + .isInstanceOf(NullPointerException.class); + } + + @Test + default void readShouldThrowWhenNullBucketName() { + DumbBlobStore store = testee(); + + store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block(); + assertThatThrownBy(() -> store.read(null, TEST_BLOB_ID)) + .isInstanceOf(NullPointerException.class); + } + + @Test + default void readBytesShouldThrowWhenNullBucketName() { + DumbBlobStore store = testee(); + + store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block(); + assertThatThrownBy(() -> store.readBytes(null, TEST_BLOB_ID).block()) + .isInstanceOf(NullPointerException.class); + } + + @Test + default void readStreamShouldThrowWhenBucketDoesNotExist() { + DumbBlobStore store = testee(); + + store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block(); + assertThatThrownBy(() -> store.read(CUSTOM_BUCKET_NAME, TEST_BLOB_ID)) + .isInstanceOf(ObjectNotFoundException.class); + } + + @Test + default void readBytesShouldThrowWhenBucketDoesNotExist() { + DumbBlobStore store = testee(); + + store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block(); + + assertThatThrownBy(() -> store.readBytes(CUSTOM_BUCKET_NAME, TEST_BLOB_ID).block()) + .isInstanceOf(ObjectNotFoundException.class); + } + + @Test + default void shouldBeAbleToSaveDataInMultipleBuckets() { + DumbBlobStore store = testee(); + + store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block(); + store.save(CUSTOM_BUCKET_NAME, OTHER_TEST_BLOB_ID, SHORT_BYTEARRAY).block(); + + byte[] bytesDefault = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block(); + byte[] bytesCustom = store.readBytes(CUSTOM_BUCKET_NAME, OTHER_TEST_BLOB_ID).block(); + + assertThat(bytesDefault).isEqualTo(bytesCustom); + } + + @Test + default void saveConcurrentlyWithNonPreExistingBucketShouldNotFail() throws Exception { + DumbBlobStore store = testee(); + + ConcurrentTestRunner.builder() + .operation(((threadNumber, step) -> + store.save( + TEST_BUCKET_NAME, + new TestBlobId("id-" + threadNumber + step), + SHORT_STRING + threadNumber + step).block())) + .threadCount(10) + .operationCount(10) + .runSuccessfullyWithin(Duration.ofMinutes(1)); + } + + @Test + default void deleteBucketConcurrentlyShouldNotFail() throws Exception { + DumbBlobStore store = testee(); + + store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block(); + + ConcurrentTestRunner.builder() + .operation(((threadNumber, step) -> store.deleteBucket(TEST_BUCKET_NAME).block())) + .threadCount(10) + .operationCount(10) + .runSuccessfullyWithin(Duration.ofMinutes(1)); + } +} diff --git a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DeleteDumbBlobStoreContract.java b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DeleteDumbBlobStoreContract.java new file mode 100644 index 0000000..a3c2a25 --- /dev/null +++ b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DeleteDumbBlobStoreContract.java @@ -0,0 +1,233 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.blob.api; + +import static org.apache.james.blob.api.DumbBlobStoreFixture.CUSTOM_BUCKET_NAME; +import static org.apache.james.blob.api.DumbBlobStoreFixture.ELEVEN_KILOBYTES; +import static org.apache.james.blob.api.DumbBlobStoreFixture.OTHER_TEST_BLOB_ID; +import static org.apache.james.blob.api.DumbBlobStoreFixture.SHORT_BYTEARRAY; +import static org.apache.james.blob.api.DumbBlobStoreFixture.TEST_BLOB_ID; +import static org.apache.james.blob.api.DumbBlobStoreFixture.TEST_BUCKET_NAME; +import static org.apache.james.blob.api.DumbBlobStoreFixture.TWELVE_MEGABYTES; +import static org.apache.james.blob.api.DumbBlobStoreFixture.TWELVE_MEGABYTES_STRING; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Function; + +import org.apache.commons.io.IOUtils; +import org.apache.james.util.concurrency.ConcurrentTestRunner; +import org.junit.jupiter.api.Test; +import org.reactivestreams.Publisher; + +import reactor.core.publisher.Mono; + +public interface DeleteDumbBlobStoreContract { + + DumbBlobStore testee(); + + @Test + default void deleteShouldNotThrowWhenBlobDoesNotExist() { + DumbBlobStore store = testee(); + + assertThatCode(() -> store.delete(TEST_BUCKET_NAME, TEST_BLOB_ID).block()) + .doesNotThrowAnyException(); + } + + @Test + default void deleteShouldNotThrowWhenBucketDoesNotExist() { + DumbBlobStore store = testee(); + + assertThatCode(() -> store.delete(BucketName.of("not_existing_bucket_name"), TEST_BLOB_ID).block()) + .doesNotThrowAnyException(); + } + + @Test + default void deleteShouldDeleteExistingBlobData() { + DumbBlobStore store = testee(); + + store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block(); + store.delete(TEST_BUCKET_NAME, TEST_BLOB_ID).block(); + + assertThatThrownBy(() -> store.read(TEST_BUCKET_NAME, TEST_BLOB_ID)) + .isInstanceOf(ObjectStoreException.class); + } + + @Test + default void deleteShouldBeIdempotent() { + DumbBlobStore store = testee(); + + store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block(); + store.delete(TEST_BUCKET_NAME, TEST_BLOB_ID).block(); + + assertThatCode(() -> store.delete(TEST_BUCKET_NAME, TEST_BLOB_ID).block()) + .doesNotThrowAnyException(); + } + + @Test + default void deleteShouldNotDeleteOtherBlobs() { + DumbBlobStore store = testee(); + + store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block(); + store.save(TEST_BUCKET_NAME, OTHER_TEST_BLOB_ID, ELEVEN_KILOBYTES).block(); + + store.delete(TEST_BUCKET_NAME, TEST_BLOB_ID).block(); + + InputStream read = store.read(TEST_BUCKET_NAME, OTHER_TEST_BLOB_ID); + + assertThat(read).hasSameContentAs(new ByteArrayInputStream(ELEVEN_KILOBYTES)); + } + + @Test + default void deleteConcurrentlyShouldNotFail() throws Exception { + DumbBlobStore store = testee(); + + store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, TWELVE_MEGABYTES).block(); + + ConcurrentTestRunner.builder() + .operation(((threadNumber, step) -> store.delete(TEST_BUCKET_NAME, TEST_BLOB_ID).block())) + .threadCount(10) + .operationCount(10) + .runSuccessfullyWithin(Duration.ofMinutes(1)); + } + + @Test + default void deleteShouldThrowWhenNullBucketName() { + DumbBlobStore store = testee(); + assertThatThrownBy(() -> store.delete(null, TEST_BLOB_ID).block()) + .isInstanceOf(NullPointerException.class); + } + + @Test + default void deleteShouldNotDeleteFromOtherBucket() { + DumbBlobStore store = testee(); + + store.save(CUSTOM_BUCKET_NAME, OTHER_TEST_BLOB_ID, "custom").block(); + store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block(); + + store.delete(CUSTOM_BUCKET_NAME, OTHER_TEST_BLOB_ID).block(); + + InputStream read = store.read(TEST_BUCKET_NAME, TEST_BLOB_ID); + + assertThat(read).hasSameContentAs(new ByteArrayInputStream(SHORT_BYTEARRAY)); + } + + @Test + default void deleteShouldNotDeleteFromOtherBucketWhenSameBlobId() { + DumbBlobStore store = testee(); + + store.save(CUSTOM_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block(); + store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block(); + + store.delete(TEST_BUCKET_NAME, TEST_BLOB_ID).block(); + + InputStream read = store.read(CUSTOM_BUCKET_NAME, TEST_BLOB_ID); + + assertThat(read).hasSameContentAs(new ByteArrayInputStream(SHORT_BYTEARRAY)); + } + + @Test + default void readShouldNotReadPartiallyWhenDeletingConcurrentlyBigBlob() throws Exception { + DumbBlobStore store = testee(); + + store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, TWELVE_MEGABYTES).block(); + + ConcurrentTestRunner.builder() + .operation(((threadNumber, step) -> { + try { + InputStream read = store.read(TEST_BUCKET_NAME, TEST_BLOB_ID); + + String string = IOUtils.toString(read, StandardCharsets.UTF_8); + if (!string.equals(TWELVE_MEGABYTES_STRING)) { + throw new RuntimeException("Should not read partial blob when an other thread is deleting it. Size : " + string.length()); + } + } catch (ObjectStoreException exception) { + // normal behavior here + } + + store.delete(TEST_BUCKET_NAME, TEST_BLOB_ID).block(); + })) + .threadCount(10) + .operationCount(10) + .runSuccessfullyWithin(Duration.ofMinutes(3)); + } + + @Test + default void readBytesShouldNotReadPartiallyWhenDeletingConcurrentlyBigBlob() throws Exception { + DumbBlobStore store = testee(); + + store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, TWELVE_MEGABYTES).block(); + + ConcurrentTestRunner.builder() + .operation(((threadNumber, step) -> { + try { + byte[] read = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block(); + String string = IOUtils.toString(read, StandardCharsets.UTF_8.displayName()); + if (!string.equals(TWELVE_MEGABYTES_STRING)) { + throw new RuntimeException("Should not read partial blob when an other thread is deleting it. Size : " + string.length()); + } + } catch (ObjectStoreException exception) { + // normal behavior here + } + + store.delete(TEST_BUCKET_NAME, TEST_BLOB_ID).block(); + })) + .threadCount(10) + .operationCount(10) + .runSuccessfullyWithin(Duration.ofMinutes(3)); + } + + @Test + default void mixingSaveReadAndDeleteShouldReturnConsistentState() throws ExecutionException, InterruptedException { + ConcurrentTestRunner.builder() + .reactorOperation((thread, iteration) -> getConcurrentMixedOperation()) + .threadCount(10) + .operationCount(100) + .runSuccessfullyWithin(Duration.ofMinutes(2)); + } + + default Publisher<Void> getConcurrentMixedOperation() { + switch (ThreadLocalRandom.current().nextInt(3)) { + case 0: + return testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, TWELVE_MEGABYTES); + case 1: + return testee().delete(TEST_BUCKET_NAME, TEST_BLOB_ID); + default: + return checkConcurrentMixedOperation(); + } + } + + default Mono<Void> checkConcurrentMixedOperation() { + return Mono + .fromCallable(() -> + testee().read(TEST_BUCKET_NAME, TEST_BLOB_ID)) + .doOnNext(inputStream -> assertThat(inputStream).hasSameContentAs(new ByteArrayInputStream(TWELVE_MEGABYTES))) + .doOnError(throwable -> assertThat(throwable).isInstanceOf(ObjectNotFoundException.class)) + .onErrorResume(throwable -> Mono.empty()) + .then(); + } +} diff --git a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DumbBlobStoreContract.java b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DumbBlobStoreContract.java index 844ac69..9dc1575 100644 --- a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DumbBlobStoreContract.java +++ b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DumbBlobStoreContract.java @@ -16,373 +16,7 @@ * specific language governing permissions and limitations * * under the License. * ****************************************************************/ - package org.apache.james.blob.api; -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatCode; -import static org.assertj.core.api.Assertions.assertThatThrownBy; - -import java.io.ByteArrayInputStream; -import java.io.FilterInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.nio.charset.StandardCharsets; -import java.time.Duration; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ThreadLocalRandom; -import java.util.function.Function; - -import org.apache.commons.io.IOUtils; -import org.apache.james.util.concurrency.ConcurrentTestRunner; -import org.junit.jupiter.api.Test; -import org.reactivestreams.Publisher; - -import com.google.common.base.Strings; -import com.google.common.io.ByteSource; -import reactor.core.publisher.Mono; - -public interface DumbBlobStoreContract { - - BucketName TEST_BUCKET_NAME = BucketName.of("my-test-bucket"); - BlobId TEST_BLOB_ID = new TestBlobId("test-blob-id"); - String SHORT_STRING = "toto"; - byte[] EMPTY_BYTEARRAY = {}; - byte[] SHORT_BYTEARRAY = SHORT_STRING.getBytes(StandardCharsets.UTF_8); - byte[] ELEVEN_KILOBYTES = Strings.repeat("0123456789\n", 1000).getBytes(StandardCharsets.UTF_8); - byte[] TWELVE_MEGABYTES = Strings.repeat("0123456789\r\n", 1024 * 1024).getBytes(StandardCharsets.UTF_8); - - DumbBlobStore testee(); - - @Test - default void saveShouldThrowWhenNullData() { - DumbBlobStore store = testee(); - - assertThatThrownBy(() -> store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, (byte[]) null).block()) - .isInstanceOf(NullPointerException.class); - } - - @Test - default void saveShouldThrowWhenNullString() { - DumbBlobStore store = testee(); - - assertThatThrownBy(() -> store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, (String) null).block()) - .isInstanceOf(NullPointerException.class); - } - - - @Test - default void saveShouldThrowWhenNullInputStream() { - DumbBlobStore store = testee(); - - assertThatThrownBy(() -> store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, (InputStream) null).block()) - .isInstanceOf(NullPointerException.class); - } - - @Test - default void saveShouldSaveEmptyData() { - DumbBlobStore store = testee(); - - store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, EMPTY_BYTEARRAY).block(); - byte[] bytes = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block(); - - assertThat(bytes).isEmpty(); - } - - @Test - default void saveShouldSaveEmptyString() { - DumbBlobStore store = testee(); - - store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, "").block(); - - byte[] bytes = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block(); - - assertThat(new String(bytes, StandardCharsets.UTF_8)).isEmpty(); - } - - @Test - default void saveShouldSaveEmptyInputStream() { - DumbBlobStore store = testee(); - - store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, new ByteArrayInputStream(EMPTY_BYTEARRAY)).block(); - - byte[] bytes = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block(); - - assertThat(bytes).isEmpty(); - } - - @Test - default void saveShouldSaveEmptyByteSource() { - DumbBlobStore store = testee(); - - store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.empty()).block(); - - byte[] bytes = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block(); - - assertThat(bytes).isEmpty(); - } - - @Test - default void readBytesShouldThrowWhenNotExisting() { - DumbBlobStore store = testee(); - - assertThatThrownBy(() -> store.readBytes(TEST_BUCKET_NAME, new TestBlobId("unknown")).block()) - .isExactlyInstanceOf(ObjectNotFoundException.class); - } - - @Test - default void readBytesShouldReturnSavedData() { - DumbBlobStore store = testee(); - - store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block(); - - byte[] bytes = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block(); - - assertThat(bytes).isEqualTo(SHORT_BYTEARRAY); - } - - @Test - default void readBytesShouldReturnLongSavedData() { - DumbBlobStore store = testee(); - - store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ELEVEN_KILOBYTES).block(); - - byte[] bytes = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block(); - - assertThat(bytes).isEqualTo(ELEVEN_KILOBYTES); - } - - @Test - default void readBytesShouldReturnBigSavedData() { - DumbBlobStore store = testee(); - - store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, TWELVE_MEGABYTES).block(); - - byte[] bytes = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block(); - - assertThat(bytes).isEqualTo(TWELVE_MEGABYTES); - } - - @Test - default void readShouldThrowWhenNotExistingStream() { - DumbBlobStore store = testee(); - - assertThatThrownBy(() -> store.read(TEST_BUCKET_NAME, new TestBlobId("unknown"))) - .isInstanceOf(ObjectNotFoundException.class); - } - - @Test - default void saveShouldCreateBucket() { - DumbBlobStore store = testee(); - BucketName nonExisting = BucketName.of("non-existing-bucket"); - store.save(nonExisting, TEST_BLOB_ID, SHORT_BYTEARRAY).block(); - - //read for a non-existing bucket would throw - assertThatCode(() -> store.read(nonExisting, TEST_BLOB_ID)) - .doesNotThrowAnyException(); - } - - @Test - default void readShouldReturnSavedData() { - DumbBlobStore store = testee(); - store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block(); - - InputStream read = store.read(TEST_BUCKET_NAME, TEST_BLOB_ID); - - assertThat(read).hasSameContentAs(new ByteArrayInputStream(SHORT_BYTEARRAY)); - } - - @Test - default void readShouldReturnLongSavedData() { - DumbBlobStore store = testee(); - store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ELEVEN_KILOBYTES).block(); - - InputStream read = store.read(TEST_BUCKET_NAME, TEST_BLOB_ID); - - assertThat(read).hasSameContentAs(new ByteArrayInputStream(ELEVEN_KILOBYTES)); - } - - @Test - default void readShouldReturnBigSavedData() { - DumbBlobStore store = testee(); - store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, TWELVE_MEGABYTES).block(); - - InputStream read = store.read(TEST_BUCKET_NAME, TEST_BLOB_ID); - - assertThat(read).hasSameContentAs(new ByteArrayInputStream(TWELVE_MEGABYTES)); - } - - @Test - default void saveBytesShouldOverwritePreviousData() { - DumbBlobStore store = testee(); - store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ELEVEN_KILOBYTES).block(); - store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block(); - - byte[] read = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block(); - - assertThat(read).isEqualTo(SHORT_BYTEARRAY); - } - - @Test - default void saveByteSourceShouldOverwritePreviousData() { - DumbBlobStore store = testee(); - store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(ELEVEN_KILOBYTES)).block(); - store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(SHORT_BYTEARRAY)).block(); - - byte[] read = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block(); - - assertThat(read).isEqualTo(SHORT_BYTEARRAY); - } - - - @Test - default void saveInputStreamShouldOverwritePreviousData() { - DumbBlobStore store = testee(); - store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(ELEVEN_KILOBYTES)).block(); - store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, new ByteArrayInputStream(SHORT_BYTEARRAY)).block(); - - byte[] read = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block(); - - assertThat(read).isEqualTo(SHORT_BYTEARRAY); - } - - @Test - default void saveInputStreamShouldNotOverwritePreviousDataOnFailingInputStream() { - DumbBlobStore store = testee(); - - store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(ELEVEN_KILOBYTES)).block(); - store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, getThrowingInputStream()) - .onErrorResume(throwable -> Mono.empty()) - .block(); - - byte[] read = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block(); - - assertThat(read).isEqualTo(ELEVEN_KILOBYTES); - } - - @Test - default void saveByteSourceShouldNotOverwritePreviousDataOnFailingInputStream() { - DumbBlobStore store = testee(); - - store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(ELEVEN_KILOBYTES)).block(); - store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, new ByteSource() { - @Override - public InputStream openStream() throws IOException { - return getThrowingInputStream(); - } - }) - .onErrorResume(throwable -> Mono.empty()) - .block(); - - byte[] read = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block(); - - assertThat(read).isEqualTo(ELEVEN_KILOBYTES); - } - - @Test - default void saveByteSourceShouldThrowOnIOException() { - DumbBlobStore store = testee(); - - assertThatThrownBy(() -> store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, new ByteSource() { - @Override - public InputStream openStream() throws IOException { - return getThrowingInputStream(); - } - }) - .block()) - .isInstanceOf(IOObjectStoreException.class); - } - - @Test - default void saveInputStreamShouldThrowOnIOException() { - DumbBlobStore store = testee(); - - assertThatThrownBy(() -> store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, getThrowingInputStream()) - .block()) - .isInstanceOf(IOObjectStoreException.class); - } - - @Test - default void concurrentSaveBytesShouldReturnConsistentValues() throws ExecutionException, InterruptedException { - testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block(); - ConcurrentTestRunner.builder() - .reactorOperation((thread, iteration) -> getConcurrentOperation(bytes -> testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, bytes))) - .threadCount(10) - .operationCount(100) - .runSuccessfullyWithin(Duration.ofMinutes(2)); - } - - @Test - default void concurrentSaveInputStreamShouldReturnConsistentValues() throws ExecutionException, InterruptedException { - testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block(); - ConcurrentTestRunner.builder() - .reactorOperation((thread, iteration) -> getConcurrentOperation(bytes -> testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, new ByteArrayInputStream(bytes)))) - .threadCount(10) - .operationCount(100) - .runSuccessfullyWithin(Duration.ofMinutes(2)); - } - - @Test - default void concurrentSaveByteSourceShouldReturnConsistentValues() throws ExecutionException, InterruptedException { - testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block(); - ConcurrentTestRunner.builder() - .reactorOperation((thread, iteration) -> getConcurrentOperation(bytes -> testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(bytes)))) - .threadCount(10) - .operationCount(100) - .runSuccessfullyWithin(Duration.ofMinutes(2)); - } - - default Publisher<Void> getConcurrentOperation(Function<byte[], Mono<Void>> save) { - switch (ThreadLocalRandom.current().nextInt(4)) { - case 0: - return save.apply(SHORT_BYTEARRAY); - case 1: - return save.apply(ELEVEN_KILOBYTES); - case 2: - return save.apply(TWELVE_MEGABYTES); - default: - return checkConcurrentSaveOperation(); - } - } - - default Mono<Void> checkConcurrentSaveOperation() { - return Mono - .fromCallable(() -> - testee().read(TEST_BUCKET_NAME, TEST_BLOB_ID)) - .flatMap(inputstream -> Mono.fromCallable(() -> IOUtils.toByteArray(inputstream))) - .doOnNext(inputStream -> assertThat(inputStream).isIn( - SHORT_BYTEARRAY, - ELEVEN_KILOBYTES, - TWELVE_MEGABYTES - )) - .then(); - } - - default FilterInputStream getThrowingInputStream() { - return new FilterInputStream(new ByteArrayInputStream(TWELVE_MEGABYTES)) { - int failingThreshold = 5; - int alreadyRead = 0; - - @Override - public int read() throws IOException { - if (alreadyRead < failingThreshold) { - alreadyRead++; - return super.read(); - } else { - throw new IOException("error on read"); - } - } - - @Override - public int read(byte[] b, int off, int len) throws IOException { - int value = read(); - if (value != -1) { - b[off] = (byte) value; - } - return value; - } - - }; - } - +public interface DumbBlobStoreContract extends ReadSaveDumbBlobStoreContract, DeleteDumbBlobStoreContract, BucketDumbBlobStoreContract { } diff --git a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DumbBlobStoreFixture.java b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DumbBlobStoreFixture.java new file mode 100644 index 0000000..ec9e91f --- /dev/null +++ b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DumbBlobStoreFixture.java @@ -0,0 +1,37 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ +package org.apache.james.blob.api; + +import java.nio.charset.StandardCharsets; + +import com.google.common.base.Strings; + +public interface DumbBlobStoreFixture { + BucketName TEST_BUCKET_NAME = BucketName.of("my-test-bucket"); + BucketName CUSTOM_BUCKET_NAME = BucketName.of("custom"); + BlobId TEST_BLOB_ID = new TestBlobId("test-blob-id"); + BlobId OTHER_TEST_BLOB_ID = new TestBlobId("other-test-blob-id"); + String SHORT_STRING = "toto"; + byte[] EMPTY_BYTEARRAY = {}; + byte[] SHORT_BYTEARRAY = SHORT_STRING.getBytes(StandardCharsets.UTF_8); + byte[] ELEVEN_KILOBYTES = Strings.repeat("2103456789\n", 1000).getBytes(StandardCharsets.UTF_8); + String TWELVE_MEGABYTES_STRING = Strings.repeat("7893456789\r\n", 1024 * 1024); + byte[] TWELVE_MEGABYTES = TWELVE_MEGABYTES_STRING.getBytes(StandardCharsets.UTF_8); + +} diff --git a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DumbBlobStoreContract.java b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/ReadSaveDumbBlobStoreContract.java similarity index 91% copy from server/blob/blob-api/src/test/java/org/apache/james/blob/api/DumbBlobStoreContract.java copy to server/blob/blob-api/src/test/java/org/apache/james/blob/api/ReadSaveDumbBlobStoreContract.java index 844ac69..665a5d3 100644 --- a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DumbBlobStoreContract.java +++ b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/ReadSaveDumbBlobStoreContract.java @@ -19,6 +19,12 @@ package org.apache.james.blob.api; +import static org.apache.james.blob.api.DumbBlobStoreFixture.ELEVEN_KILOBYTES; +import static org.apache.james.blob.api.DumbBlobStoreFixture.EMPTY_BYTEARRAY; +import static org.apache.james.blob.api.DumbBlobStoreFixture.SHORT_BYTEARRAY; +import static org.apache.james.blob.api.DumbBlobStoreFixture.TEST_BLOB_ID; +import static org.apache.james.blob.api.DumbBlobStoreFixture.TEST_BUCKET_NAME; +import static org.apache.james.blob.api.DumbBlobStoreFixture.TWELVE_MEGABYTES; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -38,19 +44,10 @@ import org.apache.james.util.concurrency.ConcurrentTestRunner; import org.junit.jupiter.api.Test; import org.reactivestreams.Publisher; -import com.google.common.base.Strings; import com.google.common.io.ByteSource; import reactor.core.publisher.Mono; -public interface DumbBlobStoreContract { - - BucketName TEST_BUCKET_NAME = BucketName.of("my-test-bucket"); - BlobId TEST_BLOB_ID = new TestBlobId("test-blob-id"); - String SHORT_STRING = "toto"; - byte[] EMPTY_BYTEARRAY = {}; - byte[] SHORT_BYTEARRAY = SHORT_STRING.getBytes(StandardCharsets.UTF_8); - byte[] ELEVEN_KILOBYTES = Strings.repeat("0123456789\n", 1000).getBytes(StandardCharsets.UTF_8); - byte[] TWELVE_MEGABYTES = Strings.repeat("0123456789\r\n", 1024 * 1024).getBytes(StandardCharsets.UTF_8); +public interface ReadSaveDumbBlobStoreContract { DumbBlobStore testee(); @@ -70,7 +67,6 @@ public interface DumbBlobStoreContract { .isInstanceOf(NullPointerException.class); } - @Test default void saveShouldThrowWhenNullInputStream() { DumbBlobStore store = testee(); @@ -80,6 +76,14 @@ public interface DumbBlobStoreContract { } @Test + default void saveShouldThrowWhenNullByteSource() { + DumbBlobStore store = testee(); + + assertThatThrownBy(() -> store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, (ByteSource) null).block()) + .isInstanceOf(NullPointerException.class); + } + + @Test default void saveShouldSaveEmptyData() { DumbBlobStore store = testee(); @@ -164,7 +168,7 @@ public interface DumbBlobStoreContract { } @Test - default void readShouldThrowWhenNotExistingStream() { + default void readStreamShouldThrowWhenNotExisting() { DumbBlobStore store = testee(); assertThatThrownBy(() -> store.read(TEST_BUCKET_NAME, new TestBlobId("unknown"))) @@ -375,11 +379,15 @@ public interface DumbBlobStoreContract { @Override public int read(byte[] b, int off, int len) throws IOException { - int value = read(); - if (value != -1) { - b[off] = (byte) value; + int remaining = b.length - off; + int toRead = Math.min(remaining, len); + for (int i = 0; i < toRead; i ++) { + int value = read(); + if (value != -1) { + b[off] = (byte) value; + } } - return value; + return toRead; } }; diff --git a/server/blob/blob-memory/src/main/java/org/apache/james/blob/memory/MemoryDumbBlobStore.java b/server/blob/blob-memory/src/main/java/org/apache/james/blob/memory/MemoryDumbBlobStore.java index 48e0d78..16a0a7b 100644 --- a/server/blob/blob-memory/src/main/java/org/apache/james/blob/memory/MemoryDumbBlobStore.java +++ b/server/blob/blob-memory/src/main/java/org/apache/james/blob/memory/MemoryDumbBlobStore.java @@ -30,6 +30,7 @@ import org.apache.james.blob.api.DumbBlobStore; import org.apache.james.blob.api.IOObjectStoreException; import org.apache.james.blob.api.ObjectNotFoundException; +import com.google.common.base.Preconditions; import com.google.common.collect.HashBasedTable; import com.google.common.collect.Table; import com.google.common.io.ByteSource; @@ -92,11 +93,20 @@ public class MemoryDumbBlobStore implements DumbBlobStore { @Override public Mono<Void> delete(BucketName bucketName, BlobId blobId) { - return null; + Preconditions.checkNotNull(bucketName); + return Mono.fromRunnable(() -> { + synchronized (blobs) { + blobs.remove(bucketName, blobId); + } + }); } @Override public Mono<Void> deleteBucket(BucketName bucketName) { - return null; + return Mono.fromRunnable(() -> { + synchronized (blobs) { + blobs.row(bucketName).clear(); + } + }); } } --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org