JAMES-2583 completely fallback to secondary when primary fails, completed exceptionally or returns empty
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/45624137 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/45624137 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/45624137 Branch: refs/heads/master Commit: 45624137336239289014e9f441732ac28bb64893 Parents: aa064ef Author: tran tien duc <dt...@linagora.com> Authored: Mon Nov 5 13:28:50 2018 +0700 Committer: Antoine Duprat <adup...@linagora.com> Committed: Wed Nov 7 14:12:19 2018 +0100 ---------------------------------------------------------------------- .../james/blob/joining/JoiningBlobStore.java | 36 ++++++++++- .../blob/joining/JoiningBlobStoreTest.java | 66 ++++++++++++++++++++ 2 files changed, 100 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/45624137/server/blob/blob-joining/src/main/java/org/apache/james/blob/joining/JoiningBlobStore.java ---------------------------------------------------------------------- diff --git a/server/blob/blob-joining/src/main/java/org/apache/james/blob/joining/JoiningBlobStore.java b/server/blob/blob-joining/src/main/java/org/apache/james/blob/joining/JoiningBlobStore.java index 4548007..aab4cdb 100644 --- a/server/blob/blob-joining/src/main/java/org/apache/james/blob/joining/JoiningBlobStore.java +++ b/server/blob/blob-joining/src/main/java/org/apache/james/blob/joining/JoiningBlobStore.java @@ -25,6 +25,7 @@ import java.io.PushbackInputStream; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.function.Function; +import java.util.function.Supplier; import org.apache.james.blob.api.BlobId; import org.apache.james.blob.api.BlobStore; @@ -50,12 +51,26 @@ public class JoiningBlobStore implements BlobStore { @Override public CompletableFuture<BlobId> save(byte[] data) { - return primaryBlobStore.save(data); + try { + return saveToPrimaryFallbackIfFails( + primaryBlobStore.save(data), + () -> secondaryBlobStore.save(data)); + } catch (Exception e) { + LOGGER.error("exception directly happens while saving bytes data, fall back to secondary blob store", e); + return secondaryBlobStore.save(data); + } } @Override public CompletableFuture<BlobId> save(InputStream data) { - return primaryBlobStore.save(data); + try { + return saveToPrimaryFallbackIfFails( + primaryBlobStore.save(data), + () -> secondaryBlobStore.save(data)); + } catch (Exception e) { + LOGGER.error("exception directly happens while saving InputStream data, fall back to secondary blob store", e); + return secondaryBlobStore.save(data); + } } @Override @@ -103,11 +118,28 @@ public class JoiningBlobStore implements BlobStore { .thenCompose(maybeBytes -> readFromSecondaryIfNeeded(maybeBytes, blobId)); } + private CompletableFuture<BlobId> saveToPrimaryFallbackIfFails( + CompletableFuture<BlobId> primarySavingOperation, + Supplier<CompletableFuture<BlobId>> fallbackSavingOperationSupplier) { + + return primarySavingOperation + .thenApply(Optional::ofNullable) + .exceptionally(this::logAndReturnEmptyOptional) + .thenCompose(maybeBlobId -> saveToSecondaryIfNeeded(maybeBlobId, fallbackSavingOperationSupplier)); + } + private <T> Optional<T> logAndReturnEmptyOptional(Throwable throwable) { LOGGER.error("primary completed exceptionally, fall back to second blob store", throwable); return Optional.empty(); } + private CompletableFuture<BlobId> saveToSecondaryIfNeeded(Optional<BlobId> maybeBlobId, + Supplier<CompletableFuture<BlobId>> saveToSecondarySupplier) { + return maybeBlobId + .map(CompletableFuture::completedFuture) + .orElseGet(saveToSecondarySupplier); + } + private CompletableFuture<byte[]> readFromSecondaryIfNeeded(Optional<byte[]> readFromPrimaryResult, BlobId blodId) { return readFromPrimaryResult .filter(this::hasContent) http://git-wip-us.apache.org/repos/asf/james-project/blob/45624137/server/blob/blob-joining/src/test/java/org/apache/james/blob/joining/JoiningBlobStoreTest.java ---------------------------------------------------------------------- diff --git a/server/blob/blob-joining/src/test/java/org/apache/james/blob/joining/JoiningBlobStoreTest.java b/server/blob/blob-joining/src/test/java/org/apache/james/blob/joining/JoiningBlobStoreTest.java index a26dff5..329dd92 100644 --- a/server/blob/blob-joining/src/test/java/org/apache/james/blob/joining/JoiningBlobStoreTest.java +++ b/server/blob/blob-joining/src/test/java/org/apache/james/blob/joining/JoiningBlobStoreTest.java @@ -33,6 +33,7 @@ import org.apache.james.blob.api.BlobStoreContract; import org.apache.james.blob.api.HashBlobId; import org.apache.james.blob.memory.MemoryBlobStore; import org.apache.james.util.CompletableFutureUtil; +import org.assertj.core.api.SoftAssertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; @@ -110,6 +111,71 @@ class JoiningBlobStoreTest implements BlobStoreContract { } @Nested + class PrimarySaveThrowsExceptionDirectly { + + @Test + void saveShouldFallBackToSecondaryWhenPrimaryGotException() throws Exception { + MemoryBlobStore secondaryBlobStore = new MemoryBlobStore(BLOB_ID_FACTORY); + JoiningBlobStore joiningBlobStore = new JoiningBlobStore(new ThrowingBlobStore(), secondaryBlobStore); + BlobId blobId = joiningBlobStore.save(BLOB_CONTENT).get(); + + SoftAssertions.assertSoftly(softly -> { + softly.assertThat(joiningBlobStore.read(blobId)) + .hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT)); + softly.assertThat(secondaryBlobStore.read(blobId)) + .hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT)); + }); + } + + @Test + void saveInputStreamShouldFallBackToSecondaryWhenPrimaryGotException() throws Exception { + MemoryBlobStore secondaryBlobStore = new MemoryBlobStore(BLOB_ID_FACTORY); + JoiningBlobStore joiningBlobStore = new JoiningBlobStore(new ThrowingBlobStore(), secondaryBlobStore); + BlobId blobId = joiningBlobStore.save(new ByteArrayInputStream(BLOB_CONTENT)).get(); + + SoftAssertions.assertSoftly(softly -> { + softly.assertThat(joiningBlobStore.read(blobId)) + .hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT)); + softly.assertThat(secondaryBlobStore.read(blobId)) + .hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT)); + }); + } + } + + @Nested + class PrimarySaveCompletesExceptionally { + + @Test + void saveShouldFallBackToSecondaryWhenPrimaryCompletedExceptionally() throws Exception { + MemoryBlobStore secondaryBlobStore = new MemoryBlobStore(BLOB_ID_FACTORY); + JoiningBlobStore joiningBlobStore = new JoiningBlobStore(new FutureThrowingBlobStore(), secondaryBlobStore); + BlobId blobId = joiningBlobStore.save(BLOB_CONTENT).get(); + + SoftAssertions.assertSoftly(softly -> { + softly.assertThat(joiningBlobStore.read(blobId)) + .hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT)); + softly.assertThat(secondaryBlobStore.read(blobId)) + .hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT)); + }); + } + + @Test + void saveInputStreamShouldFallBackToSecondaryWhenPrimaryCompletedExceptionally() throws Exception { + MemoryBlobStore secondaryBlobStore = new MemoryBlobStore(BLOB_ID_FACTORY); + JoiningBlobStore joiningBlobStore = new JoiningBlobStore(new FutureThrowingBlobStore(), secondaryBlobStore); + BlobId blobId = joiningBlobStore.save(new ByteArrayInputStream(BLOB_CONTENT)).get(); + + SoftAssertions.assertSoftly(softly -> { + softly.assertThat(joiningBlobStore.read(blobId)) + .hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT)); + softly.assertThat(secondaryBlobStore.read(blobId)) + .hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT)); + }); + } + + } + + @Nested class PrimaryReadThrowsExceptionDirectly { @Test --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org