This is an automated email from the ASF dual-hosted git repository. matthieu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit b0f44ad0996133092f9d62b1b7d731d9187bd320 Author: Matthieu Baechler <matth...@apache.org> AuthorDate: Wed Jan 8 17:45:53 2020 +0100 JAMES-3028 Define a DumbBlobStore interface and a read/save Contract testsuite --- .../java/org/apache/james/blob/api/BlobStore.java | 8 +- .../org/apache/james/blob/api/DumbBlobStore.java | 95 +++++ ...{BlobStore.java => IOObjectStoreException.java} | 32 +- .../james/blob/api/DumbBlobStoreContract.java | 388 +++++++++++++++++++++ .../james/blob/memory/MemoryDumbBlobStore.java | 102 ++++++ .../blob/memory/MemoryDumbBlobStoreTest.java} | 38 +- 6 files changed, 608 insertions(+), 55 deletions(-) diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java index 59414d5..458b354 100644 --- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java +++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java @@ -35,14 +35,14 @@ public interface BlobStore { Mono<BlobId> save(BucketName bucketName, InputStream data, StoragePolicy storagePolicy); - Mono<byte[]> readBytes(BucketName bucketName, BlobId blobId); - - InputStream read(BucketName bucketName, BlobId blobId); - default Mono<BlobId> save(BucketName bucketName, String data, StoragePolicy storagePolicy) { return save(bucketName, data.getBytes(StandardCharsets.UTF_8), storagePolicy); } + Mono<byte[]> readBytes(BucketName bucketName, BlobId blobId); + + InputStream read(BucketName bucketName, BlobId blobId); + BucketName getDefaultBucketName(); Mono<Void> deleteBucket(BucketName bucketName); diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/DumbBlobStore.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/DumbBlobStore.java new file mode 100644 index 0000000..97dbc11 --- /dev/null +++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/DumbBlobStore.java @@ -0,0 +1,95 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.blob.api; + +import java.io.InputStream; +import java.nio.charset.StandardCharsets; + +import com.google.common.io.ByteSource; + +import reactor.core.publisher.Mono; + +public interface DumbBlobStore { + + /** + * Reads a Blob based on its BucketName and its BlobId. + * + * @throws ObjectNotFoundException when the blobId or the bucket is not found + * @throws IOObjectStoreException when an unexpected IO error occurs + */ + InputStream read(BucketName bucketName, BlobId blobId) throws IOObjectStoreException, ObjectNotFoundException; + + + /** + * Reads a Blob based on its BucketName and its BlobId + * + * @return a Mono containing the content of the blob or + * an ObjectNotFoundException in its error channel when the blobId or the bucket is not found + * or an IOObjectStoreException when an unexpected IO error occurs + */ + Mono<byte[]> readBytes(BucketName bucketName, BlobId blobId); + + + /** + * Save the blob with the provided blob id, and overwrite the previous blob with the same id if it already exists + * The bucket is created if it not already exists. + * This operation should be atomic and isolated + * @return an empty Mono when the save succeed, + * otherwise an IOObjectStoreException in its error channel + */ + Mono<Void> save(BucketName bucketName, BlobId blobId, byte[] data); + + /** + * @see #save(BucketName, BlobId, byte[]) + * + * The InputStream should be closed after the call to this method + */ + Mono<Void> save(BucketName bucketName, BlobId blobId, InputStream inputStream); + + /** + * @see #save(BucketName, BlobId, byte[]) + */ + Mono<Void> save(BucketName bucketName, BlobId blobId, ByteSource content); + + /** + * @see #save(BucketName, BlobId, byte[]) + * + * The String is stored as UTF-8. + */ + default Mono<Void> save(BucketName bucketName, BlobId blobId, String data) { + return save(bucketName, blobId, data.getBytes(StandardCharsets.UTF_8)); + } + + /** + * Remove a Blob based on its BucketName and its BlobId. + * + * @return a successful Mono if the Blob is deleted or did not exist + * otherwise an IOObjectStoreException in its error channel + */ + Mono<Void> delete(BucketName bucketName, BlobId blobId); + + /** + * Remove a bucket based on its BucketName + * + * @return a successful Mono if the bucket is deleted or did not exist + * otherwise an IOObjectStoreException in its error channel + */ + Mono<Void> deleteBucket(BucketName bucketName); +} diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/IOObjectStoreException.java similarity index 58% copy from server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java copy to server/blob/blob-api/src/main/java/org/apache/james/blob/api/IOObjectStoreException.java index 59414d5..b36d138 100644 --- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java +++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/IOObjectStoreException.java @@ -16,36 +16,16 @@ * specific language governing permissions and limitations * * under the License. * ****************************************************************/ -package org.apache.james.blob.api; - -import java.io.InputStream; -import java.nio.charset.StandardCharsets; -import reactor.core.publisher.Mono; +package org.apache.james.blob.api; -public interface BlobStore { +public class IOObjectStoreException extends ObjectStoreException { - enum StoragePolicy { - SIZE_BASED, - LOW_COST, - HIGH_PERFORMANCE + public IOObjectStoreException(String message) { + super(message); } - Mono<BlobId> save(BucketName bucketName, byte[] data, StoragePolicy storagePolicy); - - Mono<BlobId> save(BucketName bucketName, InputStream data, StoragePolicy storagePolicy); - - Mono<byte[]> readBytes(BucketName bucketName, BlobId blobId); - - InputStream read(BucketName bucketName, BlobId blobId); - - default Mono<BlobId> save(BucketName bucketName, String data, StoragePolicy storagePolicy) { - return save(bucketName, data.getBytes(StandardCharsets.UTF_8), storagePolicy); + public IOObjectStoreException(String message, Throwable cause) { + super(message, cause); } - - BucketName getDefaultBucketName(); - - Mono<Void> deleteBucket(BucketName bucketName); - - Mono<Void> delete(BucketName bucketName, BlobId blobId); } diff --git a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DumbBlobStoreContract.java b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DumbBlobStoreContract.java new file mode 100644 index 0000000..844ac69 --- /dev/null +++ b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/DumbBlobStoreContract.java @@ -0,0 +1,388 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.blob.api; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.io.ByteArrayInputStream; +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Function; + +import org.apache.commons.io.IOUtils; +import org.apache.james.util.concurrency.ConcurrentTestRunner; +import org.junit.jupiter.api.Test; +import org.reactivestreams.Publisher; + +import com.google.common.base.Strings; +import com.google.common.io.ByteSource; +import reactor.core.publisher.Mono; + +public interface DumbBlobStoreContract { + + BucketName TEST_BUCKET_NAME = BucketName.of("my-test-bucket"); + BlobId TEST_BLOB_ID = new TestBlobId("test-blob-id"); + String SHORT_STRING = "toto"; + byte[] EMPTY_BYTEARRAY = {}; + byte[] SHORT_BYTEARRAY = SHORT_STRING.getBytes(StandardCharsets.UTF_8); + byte[] ELEVEN_KILOBYTES = Strings.repeat("0123456789\n", 1000).getBytes(StandardCharsets.UTF_8); + byte[] TWELVE_MEGABYTES = Strings.repeat("0123456789\r\n", 1024 * 1024).getBytes(StandardCharsets.UTF_8); + + DumbBlobStore testee(); + + @Test + default void saveShouldThrowWhenNullData() { + DumbBlobStore store = testee(); + + assertThatThrownBy(() -> store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, (byte[]) null).block()) + .isInstanceOf(NullPointerException.class); + } + + @Test + default void saveShouldThrowWhenNullString() { + DumbBlobStore store = testee(); + + assertThatThrownBy(() -> store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, (String) null).block()) + .isInstanceOf(NullPointerException.class); + } + + + @Test + default void saveShouldThrowWhenNullInputStream() { + DumbBlobStore store = testee(); + + assertThatThrownBy(() -> store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, (InputStream) null).block()) + .isInstanceOf(NullPointerException.class); + } + + @Test + default void saveShouldSaveEmptyData() { + DumbBlobStore store = testee(); + + store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, EMPTY_BYTEARRAY).block(); + byte[] bytes = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block(); + + assertThat(bytes).isEmpty(); + } + + @Test + default void saveShouldSaveEmptyString() { + DumbBlobStore store = testee(); + + store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, "").block(); + + byte[] bytes = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block(); + + assertThat(new String(bytes, StandardCharsets.UTF_8)).isEmpty(); + } + + @Test + default void saveShouldSaveEmptyInputStream() { + DumbBlobStore store = testee(); + + store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, new ByteArrayInputStream(EMPTY_BYTEARRAY)).block(); + + byte[] bytes = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block(); + + assertThat(bytes).isEmpty(); + } + + @Test + default void saveShouldSaveEmptyByteSource() { + DumbBlobStore store = testee(); + + store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.empty()).block(); + + byte[] bytes = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block(); + + assertThat(bytes).isEmpty(); + } + + @Test + default void readBytesShouldThrowWhenNotExisting() { + DumbBlobStore store = testee(); + + assertThatThrownBy(() -> store.readBytes(TEST_BUCKET_NAME, new TestBlobId("unknown")).block()) + .isExactlyInstanceOf(ObjectNotFoundException.class); + } + + @Test + default void readBytesShouldReturnSavedData() { + DumbBlobStore store = testee(); + + store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block(); + + byte[] bytes = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block(); + + assertThat(bytes).isEqualTo(SHORT_BYTEARRAY); + } + + @Test + default void readBytesShouldReturnLongSavedData() { + DumbBlobStore store = testee(); + + store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ELEVEN_KILOBYTES).block(); + + byte[] bytes = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block(); + + assertThat(bytes).isEqualTo(ELEVEN_KILOBYTES); + } + + @Test + default void readBytesShouldReturnBigSavedData() { + DumbBlobStore store = testee(); + + store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, TWELVE_MEGABYTES).block(); + + byte[] bytes = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block(); + + assertThat(bytes).isEqualTo(TWELVE_MEGABYTES); + } + + @Test + default void readShouldThrowWhenNotExistingStream() { + DumbBlobStore store = testee(); + + assertThatThrownBy(() -> store.read(TEST_BUCKET_NAME, new TestBlobId("unknown"))) + .isInstanceOf(ObjectNotFoundException.class); + } + + @Test + default void saveShouldCreateBucket() { + DumbBlobStore store = testee(); + BucketName nonExisting = BucketName.of("non-existing-bucket"); + store.save(nonExisting, TEST_BLOB_ID, SHORT_BYTEARRAY).block(); + + //read for a non-existing bucket would throw + assertThatCode(() -> store.read(nonExisting, TEST_BLOB_ID)) + .doesNotThrowAnyException(); + } + + @Test + default void readShouldReturnSavedData() { + DumbBlobStore store = testee(); + store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block(); + + InputStream read = store.read(TEST_BUCKET_NAME, TEST_BLOB_ID); + + assertThat(read).hasSameContentAs(new ByteArrayInputStream(SHORT_BYTEARRAY)); + } + + @Test + default void readShouldReturnLongSavedData() { + DumbBlobStore store = testee(); + store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ELEVEN_KILOBYTES).block(); + + InputStream read = store.read(TEST_BUCKET_NAME, TEST_BLOB_ID); + + assertThat(read).hasSameContentAs(new ByteArrayInputStream(ELEVEN_KILOBYTES)); + } + + @Test + default void readShouldReturnBigSavedData() { + DumbBlobStore store = testee(); + store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, TWELVE_MEGABYTES).block(); + + InputStream read = store.read(TEST_BUCKET_NAME, TEST_BLOB_ID); + + assertThat(read).hasSameContentAs(new ByteArrayInputStream(TWELVE_MEGABYTES)); + } + + @Test + default void saveBytesShouldOverwritePreviousData() { + DumbBlobStore store = testee(); + store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ELEVEN_KILOBYTES).block(); + store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block(); + + byte[] read = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block(); + + assertThat(read).isEqualTo(SHORT_BYTEARRAY); + } + + @Test + default void saveByteSourceShouldOverwritePreviousData() { + DumbBlobStore store = testee(); + store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(ELEVEN_KILOBYTES)).block(); + store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(SHORT_BYTEARRAY)).block(); + + byte[] read = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block(); + + assertThat(read).isEqualTo(SHORT_BYTEARRAY); + } + + + @Test + default void saveInputStreamShouldOverwritePreviousData() { + DumbBlobStore store = testee(); + store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(ELEVEN_KILOBYTES)).block(); + store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, new ByteArrayInputStream(SHORT_BYTEARRAY)).block(); + + byte[] read = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block(); + + assertThat(read).isEqualTo(SHORT_BYTEARRAY); + } + + @Test + default void saveInputStreamShouldNotOverwritePreviousDataOnFailingInputStream() { + DumbBlobStore store = testee(); + + store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(ELEVEN_KILOBYTES)).block(); + store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, getThrowingInputStream()) + .onErrorResume(throwable -> Mono.empty()) + .block(); + + byte[] read = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block(); + + assertThat(read).isEqualTo(ELEVEN_KILOBYTES); + } + + @Test + default void saveByteSourceShouldNotOverwritePreviousDataOnFailingInputStream() { + DumbBlobStore store = testee(); + + store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(ELEVEN_KILOBYTES)).block(); + store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, new ByteSource() { + @Override + public InputStream openStream() throws IOException { + return getThrowingInputStream(); + } + }) + .onErrorResume(throwable -> Mono.empty()) + .block(); + + byte[] read = store.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID).block(); + + assertThat(read).isEqualTo(ELEVEN_KILOBYTES); + } + + @Test + default void saveByteSourceShouldThrowOnIOException() { + DumbBlobStore store = testee(); + + assertThatThrownBy(() -> store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, new ByteSource() { + @Override + public InputStream openStream() throws IOException { + return getThrowingInputStream(); + } + }) + .block()) + .isInstanceOf(IOObjectStoreException.class); + } + + @Test + default void saveInputStreamShouldThrowOnIOException() { + DumbBlobStore store = testee(); + + assertThatThrownBy(() -> store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, getThrowingInputStream()) + .block()) + .isInstanceOf(IOObjectStoreException.class); + } + + @Test + default void concurrentSaveBytesShouldReturnConsistentValues() throws ExecutionException, InterruptedException { + testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block(); + ConcurrentTestRunner.builder() + .reactorOperation((thread, iteration) -> getConcurrentOperation(bytes -> testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, bytes))) + .threadCount(10) + .operationCount(100) + .runSuccessfullyWithin(Duration.ofMinutes(2)); + } + + @Test + default void concurrentSaveInputStreamShouldReturnConsistentValues() throws ExecutionException, InterruptedException { + testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block(); + ConcurrentTestRunner.builder() + .reactorOperation((thread, iteration) -> getConcurrentOperation(bytes -> testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, new ByteArrayInputStream(bytes)))) + .threadCount(10) + .operationCount(100) + .runSuccessfullyWithin(Duration.ofMinutes(2)); + } + + @Test + default void concurrentSaveByteSourceShouldReturnConsistentValues() throws ExecutionException, InterruptedException { + testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY).block(); + ConcurrentTestRunner.builder() + .reactorOperation((thread, iteration) -> getConcurrentOperation(bytes -> testee().save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(bytes)))) + .threadCount(10) + .operationCount(100) + .runSuccessfullyWithin(Duration.ofMinutes(2)); + } + + default Publisher<Void> getConcurrentOperation(Function<byte[], Mono<Void>> save) { + switch (ThreadLocalRandom.current().nextInt(4)) { + case 0: + return save.apply(SHORT_BYTEARRAY); + case 1: + return save.apply(ELEVEN_KILOBYTES); + case 2: + return save.apply(TWELVE_MEGABYTES); + default: + return checkConcurrentSaveOperation(); + } + } + + default Mono<Void> checkConcurrentSaveOperation() { + return Mono + .fromCallable(() -> + testee().read(TEST_BUCKET_NAME, TEST_BLOB_ID)) + .flatMap(inputstream -> Mono.fromCallable(() -> IOUtils.toByteArray(inputstream))) + .doOnNext(inputStream -> assertThat(inputStream).isIn( + SHORT_BYTEARRAY, + ELEVEN_KILOBYTES, + TWELVE_MEGABYTES + )) + .then(); + } + + default FilterInputStream getThrowingInputStream() { + return new FilterInputStream(new ByteArrayInputStream(TWELVE_MEGABYTES)) { + int failingThreshold = 5; + int alreadyRead = 0; + + @Override + public int read() throws IOException { + if (alreadyRead < failingThreshold) { + alreadyRead++; + return super.read(); + } else { + throw new IOException("error on read"); + } + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + int value = read(); + if (value != -1) { + b[off] = (byte) value; + } + return value; + } + + }; + } + +} diff --git a/server/blob/blob-memory/src/main/java/org/apache/james/blob/memory/MemoryDumbBlobStore.java b/server/blob/blob-memory/src/main/java/org/apache/james/blob/memory/MemoryDumbBlobStore.java new file mode 100644 index 0000000..48e0d78 --- /dev/null +++ b/server/blob/blob-memory/src/main/java/org/apache/james/blob/memory/MemoryDumbBlobStore.java @@ -0,0 +1,102 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.blob.memory; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; + +import org.apache.commons.io.IOUtils; +import org.apache.james.blob.api.BlobId; +import org.apache.james.blob.api.BucketName; +import org.apache.james.blob.api.DumbBlobStore; +import org.apache.james.blob.api.IOObjectStoreException; +import org.apache.james.blob.api.ObjectNotFoundException; + +import com.google.common.collect.HashBasedTable; +import com.google.common.collect.Table; +import com.google.common.io.ByteSource; + +import reactor.core.publisher.Mono; + +public class MemoryDumbBlobStore implements DumbBlobStore { + + private final Table<BucketName, BlobId, byte[]> blobs; + + public MemoryDumbBlobStore() { + blobs = HashBasedTable.create(); + } + + @Override + public InputStream read(BucketName bucketName, BlobId blobId) throws IOObjectStoreException, ObjectNotFoundException { + return readBytes(bucketName, blobId) + .map(ByteArrayInputStream::new) + .block(); + } + + @Override + public Mono<byte[]> readBytes(BucketName bucketName, BlobId blobId) { + return Mono.fromCallable(() -> blobs.get(bucketName, blobId)) + .switchIfEmpty(Mono.error(() -> new ObjectNotFoundException(String.format("blob '%s' not found in bucket '%s'", blobId.asString(), bucketName.asString())))); + } + + @Override + public Mono<Void> save(BucketName bucketName, BlobId blobId, byte[] data) { + return Mono.fromRunnable(() -> { + synchronized (blobs) { + blobs.put(bucketName, blobId, data); + } + }); + } + + @Override + public Mono<Void> save(BucketName bucketName, BlobId blobId, InputStream inputStream) { + return Mono.fromCallable(() -> { + try { + return IOUtils.toByteArray(inputStream); + } catch (IOException e) { + throw new IOObjectStoreException("IOException occured", e); + } + }) + .flatMap(bytes -> save(bucketName, blobId, bytes)); + } + + @Override + public Mono<Void> save(BucketName bucketName, BlobId blobId, ByteSource content) { + return Mono.fromCallable(() -> { + try { + return content.read(); + } catch (IOException e) { + throw new IOObjectStoreException("IOException occured", e); + } + }) + .flatMap(bytes -> save(bucketName, blobId, bytes)); + } + + @Override + public Mono<Void> delete(BucketName bucketName, BlobId blobId) { + return null; + } + + @Override + public Mono<Void> deleteBucket(BucketName bucketName) { + return null; + } +} diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java b/server/blob/blob-memory/src/test/java/org/apache/james/blob/memory/MemoryDumbBlobStoreTest.java similarity index 57% copy from server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java copy to server/blob/blob-memory/src/test/java/org/apache/james/blob/memory/MemoryDumbBlobStoreTest.java index 59414d5..eb326f4 100644 --- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java +++ b/server/blob/blob-memory/src/test/java/org/apache/james/blob/memory/MemoryDumbBlobStoreTest.java @@ -16,36 +16,24 @@ * specific language governing permissions and limitations * * under the License. * ****************************************************************/ -package org.apache.james.blob.api; -import java.io.InputStream; -import java.nio.charset.StandardCharsets; +package org.apache.james.blob.memory; -import reactor.core.publisher.Mono; +import org.apache.james.blob.api.DumbBlobStore; +import org.apache.james.blob.api.DumbBlobStoreContract; +import org.junit.jupiter.api.BeforeEach; -public interface BlobStore { +class MemoryDumbBlobStoreTest implements DumbBlobStoreContract { - enum StoragePolicy { - SIZE_BASED, - LOW_COST, - HIGH_PERFORMANCE - } - - Mono<BlobId> save(BucketName bucketName, byte[] data, StoragePolicy storagePolicy); - - Mono<BlobId> save(BucketName bucketName, InputStream data, StoragePolicy storagePolicy); - - Mono<byte[]> readBytes(BucketName bucketName, BlobId blobId); + private MemoryDumbBlobStore blobStore; - InputStream read(BucketName bucketName, BlobId blobId); - - default Mono<BlobId> save(BucketName bucketName, String data, StoragePolicy storagePolicy) { - return save(bucketName, data.getBytes(StandardCharsets.UTF_8), storagePolicy); + @BeforeEach + void setUp() { + blobStore = new MemoryDumbBlobStore(); } - BucketName getDefaultBucketName(); - - Mono<Void> deleteBucket(BucketName bucketName); - - Mono<Void> delete(BucketName bucketName, BlobId blobId); + @Override + public DumbBlobStore testee() { + return blobStore; + } } --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org