This is an automated email from the ASF dual-hosted git repository.

matthieu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 28808b8c73505a120b1f6302e465c8d5685c9142 Author: Rémi KOWALSKI <rkowal...@linagora.com> AuthorDate: Thu Jan 9 17:45:02 2020 +0100 JAMES-3028 Relax contract for dumb blob api: blobid is considered as unique --- .../org/apache/james/blob/api/DumbBlobStore.java | 1 + .../blob/api/DeleteDumbBlobStoreContract.java | 2 +- .../blob/api/ReadSaveDumbBlobStoreContract.java | 91 +++++++++++----------- 3 files changed, 49 insertions(+), 45 deletions(-) diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/DumbBlobStore.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/DumbBlobStore.java index b19161a..79305d1 100644 --- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/DumbBlobStore.java +++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/DumbBlobStore.java @@ -51,6 +51,7 @@ public interface DumbBlobStore { * Save the blob with the provided blob id, and overwrite the previous blob with the same id if it already exists * The bucket is created if it not already exists. * This operation should be atomic and isolated + * Two blobs having the same blobId must have the same content * @return an empty Mono when the save succeed, * otherwise an IOObjectStoreException in its error channel */ diff --git a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DeleteDumbBlobStoreContract.java b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DeleteDumbBlobStoreContract.java index e7d4a40..afd8f0e 100644 --- a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DeleteDumbBlobStoreContract.java +++ b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DeleteDumbBlobStoreContract.java @@ -187,7 +187,7 @@ public interface DeleteDumbBlobStoreContract { if (!string.equals(TWELVE_MEGABYTES_STRING)) { throw new RuntimeException("Should not read partial blob when an other thread is deleting it. 
Size : " + string.length()); } - } catch (ObjectStoreException exception) { + } catch (ObjectNotFoundException exception) { // normal behavior here } diff --git a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/ReadSaveDumbBlobStoreContract.java b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/ReadSaveDumbBlobStoreContract.java index 98b589a..c6ee9ab 100644 --- a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/ReadSaveDumbBlobStoreContract.java +++ b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/ReadSaveDumbBlobStoreContract.java @@ -36,10 +36,14 @@ import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.concurrent.ExecutionException; +import java.util.stream.Stream; import org.apache.commons.io.IOUtils; import org.apache.james.util.concurrency.ConcurrentTestRunner; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import com.google.common.io.ByteSource; import reactor.core.publisher.Mono; @@ -213,38 +217,40 @@ public interface ReadSaveDumbBlobStoreContract { assertThat(read).hasSameContentAs(new ByteArrayInputStream(TWELVE_MEGABYTES)); } - @Test - default void saveBytesShouldOverwritePreviousData() { + @ParameterizedTest + @MethodSource("blobs") + default void saveBytesShouldBeIdempotent(byte[] bytes) { DumbBlobStore store = testee(); - store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ELEVEN_KILOBYTES).block(); - store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block(); + store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, bytes).block(); + store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, bytes).block(); byte[] read = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block(); - assertThat(read).isEqualTo(SHORT_BYTEARRAY); + assertThat(read).isEqualTo(bytes); } - @Test - default void 
saveByteSourceShouldOverwritePreviousData() { + @ParameterizedTest + @MethodSource("blobs") + default void saveByteSourceShouldBeIdempotent(byte[] bytes) { DumbBlobStore store = testee(); - store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(ELEVEN_KILOBYTES)).block(); - store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(SHORT_BYTEARRAY)).block(); + store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(bytes)).block(); + store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(bytes)).block(); byte[] read = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block(); - assertThat(read).isEqualTo(SHORT_BYTEARRAY); + assertThat(read).isEqualTo(bytes); } - - @Test - default void saveInputStreamShouldOverwritePreviousData() { + @ParameterizedTest + @MethodSource("blobs") + default void saveInputStreamShouldBeIdempotent(byte[] bytes) { DumbBlobStore store = testee(); - store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(ELEVEN_KILOBYTES)).block(); - store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, new ByteArrayInputStream(SHORT_BYTEARRAY)).block(); + store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(bytes)).block(); + store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, new ByteArrayInputStream(bytes)).block(); byte[] read = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block(); - assertThat(read).isEqualTo(SHORT_BYTEARRAY); + assertThat(read).isEqualTo(bytes); } @Test @@ -303,61 +309,58 @@ public interface ReadSaveDumbBlobStoreContract { .isInstanceOf(IOObjectStoreException.class); } - @Test - default void concurrentSaveBytesShouldReturnConsistentValues() throws ExecutionException, InterruptedException { - testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block(); + static Stream<Arguments> blobs() { + return Stream.of(SHORT_BYTEARRAY, ELEVEN_KILOBYTES, TWELVE_MEGABYTES) + .map(Arguments::of); + } + + @ParameterizedTest + @MethodSource("blobs") + default void concurrentSaveBytesShouldReturnConsistentValues(byte[] bytes) throws 
ExecutionException, InterruptedException { + testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, bytes).block(); ConcurrentTestRunner.builder() .randomlyDistributedReactorOperations( - (threadNumber, step) -> testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY), - (threadNumber, step) -> testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, ELEVEN_KILOBYTES), - (threadNumber, step) -> testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, TWELVE_MEGABYTES), - (threadNumber, step) -> checkConcurrentSaveOperation() + (threadNumber, step) -> testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, bytes), + (threadNumber, step) -> checkConcurrentSaveOperation(bytes) ) .threadCount(10) .operationCount(100) .runSuccessfullyWithin(Duration.ofMinutes(2)); } - @Test - default void concurrentSaveInputStreamShouldReturnConsistentValues() throws ExecutionException, InterruptedException { - testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block(); + @ParameterizedTest + @MethodSource("blobs") + default void concurrentSaveInputStreamShouldReturnConsistentValues(byte[] bytes) throws ExecutionException, InterruptedException { + testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, bytes).block(); ConcurrentTestRunner.builder() .randomlyDistributedReactorOperations( - (threadNumber, step) -> testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, new ByteArrayInputStream(SHORT_BYTEARRAY)), - (threadNumber, step) -> testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, new ByteArrayInputStream(ELEVEN_KILOBYTES)), - (threadNumber, step) -> testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, new ByteArrayInputStream(TWELVE_MEGABYTES)), - (threadNumber, step) -> checkConcurrentSaveOperation() + (threadNumber, step) -> testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, new ByteArrayInputStream(bytes)), + (threadNumber, step) -> checkConcurrentSaveOperation(bytes) ) .threadCount(10) .operationCount(100) .runSuccessfullyWithin(Duration.ofMinutes(2)); } - @Test - default void concurrentSaveByteSourceShouldReturnConsistentValues() 
throws ExecutionException, InterruptedException { - testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block(); + @ParameterizedTest + @MethodSource("blobs") + default void concurrentSaveByteSourceShouldReturnConsistentValues(byte[] bytes) throws ExecutionException, InterruptedException { + testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, bytes).block(); ConcurrentTestRunner.builder() .randomlyDistributedReactorOperations( - (threadNumber, step) -> testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(SHORT_BYTEARRAY)), - (threadNumber, step) -> testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(ELEVEN_KILOBYTES)), - (threadNumber, step) -> testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(TWELVE_MEGABYTES)), - (threadNumber, step) -> checkConcurrentSaveOperation() + (threadNumber, step) -> testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(bytes)), + (threadNumber, step) -> checkConcurrentSaveOperation(bytes) ) .threadCount(10) .operationCount(100) .runSuccessfullyWithin(Duration.ofMinutes(2)); } - default Mono<Void> checkConcurrentSaveOperation() { + default Mono<Void> checkConcurrentSaveOperation(byte[] bytes) { return Mono .fromCallable(() -> testee().read(TEST_BUCKET_NAME, TEST_BLOB_ID)) - .flatMap(inputstream -> Mono.fromCallable(() -> IOUtils.toByteArray(inputstream))) - .doOnNext(inputStream -> assertThat(inputStream).isIn( - SHORT_BYTEARRAY, - ELEVEN_KILOBYTES, - TWELVE_MEGABYTES - )) + .doOnNext(inputStream -> assertThat(inputStream).hasSameContentAs(new ByteArrayInputStream(bytes))) .then(); } --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org