Repository: james-project Updated Branches: refs/heads/master e85f41a1e -> eeafbf4b5
JAMES-2624 don't rely on empty content for error handling Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/eeafbf4b Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/eeafbf4b Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/eeafbf4b Branch: refs/heads/master Commit: eeafbf4b53d8091eaee0e1ff765784afadf2a349 Parents: e85f41a Author: Matthieu Baechler <[email protected]> Authored: Mon Dec 10 17:37:00 2018 +0100 Committer: Raphael Ouazana <[email protected]> Committed: Mon Dec 17 14:49:46 2018 +0100 ---------------------------------------------------------------------- .../james/blob/api/BlobStoreContract.java | 85 +++++++++----------- .../james/blob/cassandra/CassandraBlobsDAO.java | 33 ++++---- .../james/blob/memory/MemoryBlobStore.java | 15 +++- .../objectstorage/ObjectStorageBlobsDAO.java | 2 +- .../apache/james/blob/union/UnionBlobStore.java | 5 -- .../james/blob/union/UnionBlobStoreTest.java | 9 ++- 6 files changed, 76 insertions(+), 73 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/eeafbf4b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BlobStoreContract.java ---------------------------------------------------------------------- diff --git a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BlobStoreContract.java b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BlobStoreContract.java index 40376a1..cfc441d 100644 --- a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BlobStoreContract.java +++ b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BlobStoreContract.java @@ -23,36 +23,39 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.io.ByteArrayInputStream; -import java.io.IOException; import 
java.io.InputStream; import java.nio.charset.StandardCharsets; -import org.apache.commons.io.IOUtils; import org.junit.jupiter.api.Test; import com.google.common.base.Strings; public interface BlobStoreContract { + byte[] EMPTY_BYTEARRAY = {}; + byte[] SHORT_BYTEARRAY = "toto".getBytes(StandardCharsets.UTF_8); + byte[] ELEVEN_KILOBYTES = Strings.repeat("0123456789\n", 1000).getBytes(StandardCharsets.UTF_8); + byte[] TWELVE_MEGABYTES = Strings.repeat("0123456789\r\n", 1024 * 1024).getBytes(StandardCharsets.UTF_8); + BlobStore testee(); BlobId.Factory blobIdFactory(); @Test - default void saveShouldReturnEmptyWhenNullData() throws Exception { + default void saveShouldThrowWhenNullData() { assertThatThrownBy(() -> testee().save((byte[]) null)) .isInstanceOf(NullPointerException.class); } @Test - default void saveShouldReturnEmptyWhenNullInputStream() throws Exception { + default void saveShouldThrowWhenNullInputStream() { assertThatThrownBy(() -> testee().save((InputStream) null)) .isInstanceOf(NullPointerException.class); } @Test - default void saveShouldSaveEmptyData() throws Exception { - BlobId blobId = testee().save(new byte[]{}).join(); + default void saveShouldSaveEmptyData() { + BlobId blobId = testee().save(EMPTY_BYTEARRAY).join(); byte[] bytes = testee().readBytes(blobId).join(); @@ -60,8 +63,8 @@ public interface BlobStoreContract { } @Test - default void saveShouldSaveEmptyInputStream() throws Exception { - BlobId blobId = testee().save(new ByteArrayInputStream(new byte[]{})).join(); + default void saveShouldSaveEmptyInputStream() { + BlobId blobId = testee().save(new ByteArrayInputStream(EMPTY_BYTEARRAY)).join(); byte[] bytes = testee().readBytes(blobId).join(); @@ -69,94 +72,84 @@ public interface BlobStoreContract { } @Test - default void saveShouldReturnBlobId() throws Exception { - BlobId blobId = testee().save("toto".getBytes(StandardCharsets.UTF_8)).join(); + default void saveShouldReturnBlobId() { + BlobId blobId = 
testee().save(SHORT_BYTEARRAY).join(); assertThat(blobId).isEqualTo(blobIdFactory().from("31f7a65e315586ac198bd798b6629ce4903d0899476d5741a9f32e2e521b6a66")); } @Test - default void saveShouldReturnBlobIdOfInputStream() throws Exception { + default void saveShouldReturnBlobIdOfInputStream() { BlobId blobId = - testee().save(new ByteArrayInputStream("toto".getBytes(StandardCharsets.UTF_8))).join(); + testee().save(new ByteArrayInputStream(SHORT_BYTEARRAY)).join(); assertThat(blobId).isEqualTo(blobIdFactory().from("31f7a65e315586ac198bd798b6629ce4903d0899476d5741a9f32e2e521b6a66")); } @Test - default void readBytesShouldBeEmptyWhenNoExisting() throws IOException { - byte[] bytes = testee().readBytes(blobIdFactory().from("unknown")).join(); - - assertThat(bytes).isEmpty(); + default void readBytesShouldThrowWhenNoExisting() { + assertThatThrownBy(() -> testee().readBytes(blobIdFactory().from("unknown")).join()) + .hasCauseInstanceOf(ObjectStoreException.class); } @Test - default void readBytesShouldReturnSavedData() throws IOException { - BlobId blobId = testee().save("toto".getBytes(StandardCharsets.UTF_8)).join(); + default void readBytesShouldReturnSavedData() { + BlobId blobId = testee().save(SHORT_BYTEARRAY).join(); byte[] bytes = testee().readBytes(blobId).join(); - assertThat(new String(bytes, StandardCharsets.UTF_8)).isEqualTo("toto"); + assertThat(bytes).isEqualTo(SHORT_BYTEARRAY); } @Test - default void readBytesShouldReturnLongSavedData() throws IOException { - String longString = Strings.repeat("0123456789\n", 1000); - BlobId blobId = testee().save(longString.getBytes(StandardCharsets.UTF_8)).join(); + default void readBytesShouldReturnLongSavedData() { + BlobId blobId = testee().save(ELEVEN_KILOBYTES).join(); byte[] bytes = testee().readBytes(blobId).join(); - assertThat(new String(bytes, StandardCharsets.UTF_8)).isEqualTo(longString); + assertThat(bytes).isEqualTo(ELEVEN_KILOBYTES); } @Test - default void readBytesShouldReturnBigSavedData() throws 
IOException { - // 12 MB of text - String bigString = Strings.repeat("0123456789\r\n", 1024 * 1024); - BlobId blobId = testee().save(bigString.getBytes(StandardCharsets.UTF_8)).join(); + default void readBytesShouldReturnBigSavedData() { + BlobId blobId = testee().save(TWELVE_MEGABYTES).join(); byte[] bytes = testee().readBytes(blobId).join(); - assertThat(new String(bytes, StandardCharsets.UTF_8)).isEqualTo(bigString); + assertThat(bytes).isEqualTo(TWELVE_MEGABYTES); } @Test - default void readShouldBeEmptyWhenNoExistingStream() throws IOException { - InputStream stream = testee().read(blobIdFactory().from("unknown")); - - assertThat(stream.read()).isEqualTo(IOUtils.EOF); + default void readShouldThrowWhenNoExistingStream() { + assertThatThrownBy(() -> testee().read(blobIdFactory().from("unknown"))) + .isInstanceOf(ObjectStoreException.class); } @Test - default void readShouldReturnSavedData() throws IOException { - byte[] bytes = "toto".getBytes(StandardCharsets.UTF_8); - BlobId blobId = testee().save(bytes).join(); + default void readShouldReturnSavedData() { + BlobId blobId = testee().save(SHORT_BYTEARRAY).join(); InputStream read = testee().read(blobId); - assertThat(read).hasSameContentAs(new ByteArrayInputStream(bytes)); + assertThat(read).hasSameContentAs(new ByteArrayInputStream(SHORT_BYTEARRAY)); } @Test - default void readShouldReturnLongSavedData() throws IOException { - String longString = Strings.repeat("0123456789\n", 1000); - byte[] bytes = longString.getBytes(StandardCharsets.UTF_8); - BlobId blobId = testee().save(bytes).join(); + default void readShouldReturnLongSavedData() { + BlobId blobId = testee().save(ELEVEN_KILOBYTES).join(); InputStream read = testee().read(blobId); - assertThat(read).hasSameContentAs(new ByteArrayInputStream(bytes)); + assertThat(read).hasSameContentAs(new ByteArrayInputStream(ELEVEN_KILOBYTES)); } @Test - default void readShouldReturnBigSavedData() throws IOException { + default void readShouldReturnBigSavedData() { // 
12 MB of text - String bigString = Strings.repeat("0123456789\r\n", 1024 * 1024); - byte[] bytes = bigString.getBytes(StandardCharsets.UTF_8); - BlobId blobId = testee().save(bytes).join(); + BlobId blobId = testee().save(TWELVE_MEGABYTES).join(); InputStream read = testee().read(blobId); - assertThat(read).hasSameContentAs(new ByteArrayInputStream(bytes)); + assertThat(read).hasSameContentAs(new ByteArrayInputStream(TWELVE_MEGABYTES)); } } http://git-wip-us.apache.org/repos/asf/james-project/blob/eeafbf4b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobsDAO.java ---------------------------------------------------------------------- diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobsDAO.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobsDAO.java index 557e927..e079176 100644 --- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobsDAO.java +++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobsDAO.java @@ -31,6 +31,7 @@ import java.nio.channels.Channels; import java.nio.channels.Pipe; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -159,24 +160,21 @@ public class CassandraBlobsDAO implements BlobStore { @Override public CompletableFuture<byte[]> readBytes(BlobId blobId) { - return cassandraAsyncExecutor.executeSingleRow( - select.bind() - .setString(BlobTable.ID, blobId.asString())) - .thenCompose(row -> toDataParts(row, blobId)) + CompletableFuture<Row> futureRow = cassandraAsyncExecutor + .executeSingleRow( + select.bind() + .setString(BlobTable.ID, blobId.asString())) + .thenApply(x -> x.orElseThrow(() -> new ObjectStoreException(String.format("Could not retrieve blob metadata for %s", blobId)))); + return 
toDataParts(futureRow.join(), blobId) .thenApply(this::concatenateDataParts); } - private CompletableFuture<Stream<BlobPart>> toDataParts(Optional<Row> blobRowOptional, BlobId blobId) { - return blobRowOptional.map(blobRow -> { - int numOfChunk = blobRow.getInt(BlobTable.NUMBER_OF_CHUNK); - return FluentFutureStream.of( - IntStream.range(0, numOfChunk) - .mapToObj(position -> readPart(blobId, position))) - .completableFuture(); - }).orElseGet(() -> { - LOGGER.warn("Could not retrieve blob metadata for {}", blobId); - return CompletableFuture.completedFuture(Stream.empty()); - }); + private CompletableFuture<Stream<BlobPart>> toDataParts(Row blobRow, BlobId blobId) { + int numOfChunk = blobRow.getInt(BlobTable.NUMBER_OF_CHUNK); + return FluentFutureStream.of( + IntStream.range(0, numOfChunk) + .mapToObj(position -> readPart(blobId, position))) + .completableFuture(); } private byte[] concatenateDataParts(Stream<BlobPart> blobParts) { @@ -234,6 +232,11 @@ public class CassandraBlobsDAO implements BlobStore { .thenApply(ByteBuffer::wrap) .thenAccept(consumer.sneakyThrow()); return Channels.newInputStream(pipe.source()); + } catch (CompletionException e) { + if (e.getCause() instanceof ObjectStoreException) { + throw (ObjectStoreException)(e.getCause()); + } + throw new RuntimeException(e); } catch (IOException cause) { throw new ObjectStoreException( "Failed to convert CompletableFuture<byte[]> to InputStream", http://git-wip-us.apache.org/repos/asf/james-project/blob/eeafbf4b/server/blob/blob-memory/src/main/java/org/apache/james/blob/memory/MemoryBlobStore.java ---------------------------------------------------------------------- diff --git a/server/blob/blob-memory/src/main/java/org/apache/james/blob/memory/MemoryBlobStore.java b/server/blob/blob-memory/src/main/java/org/apache/james/blob/memory/MemoryBlobStore.java index ec3be33..20c1fef 100644 --- a/server/blob/blob-memory/src/main/java/org/apache/james/blob/memory/MemoryBlobStore.java +++ 
b/server/blob/blob-memory/src/main/java/org/apache/james/blob/memory/MemoryBlobStore.java @@ -22,12 +22,15 @@ package org.apache.james.blob.memory; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Supplier; import org.apache.commons.io.IOUtils; import org.apache.james.blob.api.BlobId; import org.apache.james.blob.api.BlobStore; +import org.apache.james.blob.api.ObjectStoreException; import com.google.common.base.Preconditions; @@ -63,7 +66,14 @@ public class MemoryBlobStore implements BlobStore { @Override public CompletableFuture<byte[]> readBytes(BlobId blobId) { - return CompletableFuture.completedFuture(retrieveStoredValue(blobId)); + try { + return CompletableFuture.completedFuture(retrieveStoredValue(blobId)); + } catch (ObjectStoreException e) { + Supplier<byte[]> throwing = () -> { + throw e; + }; + return CompletableFuture.supplyAsync(throwing); + } } @Override @@ -72,6 +82,7 @@ public class MemoryBlobStore implements BlobStore { } private byte[] retrieveStoredValue(BlobId blobId) { - return blobs.getOrDefault(blobId, new byte[]{}); + return Optional.ofNullable(blobs.get(blobId)) + .orElseThrow(() -> new ObjectStoreException("unable to find blob with id " + blobId)); } } http://git-wip-us.apache.org/repos/asf/james-project/blob/eeafbf4b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAO.java ---------------------------------------------------------------------- diff --git a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAO.java b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAO.java index 1409aaf..9c2ebe2 100644 --- 
a/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAO.java +++ b/server/blob/blob-objectstorage/src/main/java/org/apache/james/blob/objectstorage/ObjectStorageBlobsDAO.java @@ -137,7 +137,7 @@ public class ObjectStorageBlobsDAO implements BlobStore { if (blob != null) { return payloadCodec.read(blob.getPayload()); } else { - return EMPTY_STREAM; + throw new ObjectStoreException("fail to load blob with id " + blobId); } } catch (IOException cause) { throw new ObjectStoreException( http://git-wip-us.apache.org/repos/asf/james-project/blob/eeafbf4b/server/blob/blob-union/src/main/java/org/apache/james/blob/union/UnionBlobStore.java ---------------------------------------------------------------------- diff --git a/server/blob/blob-union/src/main/java/org/apache/james/blob/union/UnionBlobStore.java b/server/blob/blob-union/src/main/java/org/apache/james/blob/union/UnionBlobStore.java index e18e564..c69367f 100644 --- a/server/blob/blob-union/src/main/java/org/apache/james/blob/union/UnionBlobStore.java +++ b/server/blob/blob-union/src/main/java/org/apache/james/blob/union/UnionBlobStore.java @@ -172,15 +172,10 @@ public class UnionBlobStore implements BlobStore { private CompletableFuture<byte[]> readFromLegacyIfNeeded(Optional<byte[]> readFromCurrentResult, BlobId blodId) { return readFromCurrentResult - .filter(this::hasContent) .map(CompletableFuture::completedFuture) .orElseGet(() -> legacyBlobStore.readBytes(blodId)); } - private boolean hasContent(byte [] bytes) { - return bytes.length > 0; - } - @Override public String toString() { return MoreObjects.toStringHelper(this) http://git-wip-us.apache.org/repos/asf/james-project/blob/eeafbf4b/server/blob/blob-union/src/test/java/org/apache/james/blob/union/UnionBlobStoreTest.java ---------------------------------------------------------------------- diff --git a/server/blob/blob-union/src/test/java/org/apache/james/blob/union/UnionBlobStoreTest.java 
b/server/blob/blob-union/src/test/java/org/apache/james/blob/union/UnionBlobStoreTest.java index d5a6977..b614ed7 100644 --- a/server/blob/blob-union/src/test/java/org/apache/james/blob/union/UnionBlobStoreTest.java +++ b/server/blob/blob-union/src/test/java/org/apache/james/blob/union/UnionBlobStoreTest.java @@ -35,6 +35,7 @@ import org.apache.james.blob.api.BlobId; import org.apache.james.blob.api.BlobStore; import org.apache.james.blob.api.BlobStoreContract; import org.apache.james.blob.api.HashBlobId; +import org.apache.james.blob.api.ObjectStoreException; import org.apache.james.blob.memory.MemoryBlobStore; import org.apache.james.util.CompletableFutureUtil; import org.apache.james.util.StreamUtils; @@ -387,8 +388,8 @@ class UnionBlobStoreTest implements BlobStoreContract { void saveShouldNotWriteToLegacy() throws Exception { BlobId blobId = unionBlobStore.save(BLOB_CONTENT).get(); - assertThat(legacyBlobStore.readBytes(blobId).get()) - .isEmpty(); + assertThatThrownBy(() -> legacyBlobStore.readBytes(blobId).join()) + .hasCauseInstanceOf(ObjectStoreException.class); } @Test @@ -403,8 +404,8 @@ class UnionBlobStoreTest implements BlobStoreContract { void saveInputStreamShouldNotWriteToLegacy() throws Exception { BlobId blobId = unionBlobStore.save(new ByteArrayInputStream(BLOB_CONTENT)).get(); - assertThat(legacyBlobStore.readBytes(blobId).get()) - .isEmpty(); + assertThatThrownBy(() -> legacyBlobStore.readBytes(blobId).join()) + .hasCauseInstanceOf(ObjectStoreException.class); } @Test --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
