JAMES-2583 completely fall back to secondary

when primary fails, completes exceptionally, or returns empty


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/45624137
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/45624137
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/45624137

Branch: refs/heads/master
Commit: 45624137336239289014e9f441732ac28bb64893
Parents: aa064ef
Author: tran tien duc <dt...@linagora.com>
Authored: Mon Nov 5 13:28:50 2018 +0700
Committer: Antoine Duprat <adup...@linagora.com>
Committed: Wed Nov 7 14:12:19 2018 +0100

----------------------------------------------------------------------
 .../james/blob/joining/JoiningBlobStore.java    | 36 ++++++++++-
 .../blob/joining/JoiningBlobStoreTest.java      | 66 ++++++++++++++++++++
 2 files changed, 100 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/45624137/server/blob/blob-joining/src/main/java/org/apache/james/blob/joining/JoiningBlobStore.java
----------------------------------------------------------------------
diff --git a/server/blob/blob-joining/src/main/java/org/apache/james/blob/joining/JoiningBlobStore.java b/server/blob/blob-joining/src/main/java/org/apache/james/blob/joining/JoiningBlobStore.java
index 4548007..aab4cdb 100644
--- a/server/blob/blob-joining/src/main/java/org/apache/james/blob/joining/JoiningBlobStore.java
+++ b/server/blob/blob-joining/src/main/java/org/apache/james/blob/joining/JoiningBlobStore.java
@@ -25,6 +25,7 @@ import java.io.PushbackInputStream;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
+import java.util.function.Supplier;
 
 import org.apache.james.blob.api.BlobId;
 import org.apache.james.blob.api.BlobStore;
@@ -50,12 +51,26 @@ public class JoiningBlobStore implements BlobStore {
 
     @Override
     public CompletableFuture<BlobId> save(byte[] data) {
-        return primaryBlobStore.save(data);
+        try {
+            return saveToPrimaryFallbackIfFails(
+                primaryBlobStore.save(data),
+                () -> secondaryBlobStore.save(data));
+        } catch (Exception e) {
+            LOGGER.error("exception directly happens while saving bytes data, 
fall back to secondary blob store", e);
+            return secondaryBlobStore.save(data);
+        }
     }
 
     @Override
     public CompletableFuture<BlobId> save(InputStream data) {
-        return primaryBlobStore.save(data);
+        try {
+            return saveToPrimaryFallbackIfFails(
+                primaryBlobStore.save(data),
+                () -> secondaryBlobStore.save(data));
+        } catch (Exception e) {
+            LOGGER.error("exception directly happens while saving InputStream 
data, fall back to secondary blob store", e);
+            return secondaryBlobStore.save(data);
+        }
     }
 
     @Override
@@ -103,11 +118,28 @@ public class JoiningBlobStore implements BlobStore {
             .thenCompose(maybeBytes -> readFromSecondaryIfNeeded(maybeBytes, 
blobId));
     }
 
+    private CompletableFuture<BlobId> saveToPrimaryFallbackIfFails(
+        CompletableFuture<BlobId> primarySavingOperation,
+        Supplier<CompletableFuture<BlobId>> fallbackSavingOperationSupplier) {
+
+        return primarySavingOperation
+            .thenApply(Optional::ofNullable)
+            .exceptionally(this::logAndReturnEmptyOptional)
+            .thenCompose(maybeBlobId -> saveToSecondaryIfNeeded(maybeBlobId, 
fallbackSavingOperationSupplier));
+    }
+
     private <T> Optional<T> logAndReturnEmptyOptional(Throwable throwable) {
         LOGGER.error("primary completed exceptionally, fall back to second 
blob store", throwable);
         return Optional.empty();
     }
 
+    private CompletableFuture<BlobId> saveToSecondaryIfNeeded(Optional<BlobId> 
maybeBlobId,
+                                                              
Supplier<CompletableFuture<BlobId>> saveToSecondarySupplier) {
+        return maybeBlobId
+            .map(CompletableFuture::completedFuture)
+            .orElseGet(saveToSecondarySupplier);
+    }
+
     private CompletableFuture<byte[]> 
readFromSecondaryIfNeeded(Optional<byte[]> readFromPrimaryResult, BlobId 
blodId) {
         return readFromPrimaryResult
             .filter(this::hasContent)

http://git-wip-us.apache.org/repos/asf/james-project/blob/45624137/server/blob/blob-joining/src/test/java/org/apache/james/blob/joining/JoiningBlobStoreTest.java
----------------------------------------------------------------------
diff --git a/server/blob/blob-joining/src/test/java/org/apache/james/blob/joining/JoiningBlobStoreTest.java b/server/blob/blob-joining/src/test/java/org/apache/james/blob/joining/JoiningBlobStoreTest.java
index a26dff5..329dd92 100644
--- a/server/blob/blob-joining/src/test/java/org/apache/james/blob/joining/JoiningBlobStoreTest.java
+++ b/server/blob/blob-joining/src/test/java/org/apache/james/blob/joining/JoiningBlobStoreTest.java
@@ -33,6 +33,7 @@ import org.apache.james.blob.api.BlobStoreContract;
 import org.apache.james.blob.api.HashBlobId;
 import org.apache.james.blob.memory.MemoryBlobStore;
 import org.apache.james.util.CompletableFutureUtil;
+import org.assertj.core.api.SoftAssertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
@@ -110,6 +111,71 @@ class JoiningBlobStoreTest implements BlobStoreContract {
     }
 
     @Nested
+    class PrimarySaveThrowsExceptionDirectly {
+
+        @Test
+        void saveShouldFallBackToSecondaryWhenPrimaryGotException() throws 
Exception {
+            MemoryBlobStore secondaryBlobStore = new 
MemoryBlobStore(BLOB_ID_FACTORY);
+            JoiningBlobStore joiningBlobStore = new JoiningBlobStore(new 
ThrowingBlobStore(), secondaryBlobStore);
+            BlobId blobId = joiningBlobStore.save(BLOB_CONTENT).get();
+
+            SoftAssertions.assertSoftly(softly -> {
+                softly.assertThat(joiningBlobStore.read(blobId))
+                    .hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT));
+                softly.assertThat(secondaryBlobStore.read(blobId))
+                    .hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT));
+            });
+        }
+
+        @Test
+        void saveInputStreamShouldFallBackToSecondaryWhenPrimaryGotException() 
throws Exception {
+            MemoryBlobStore secondaryBlobStore = new 
MemoryBlobStore(BLOB_ID_FACTORY);
+            JoiningBlobStore joiningBlobStore = new JoiningBlobStore(new 
ThrowingBlobStore(), secondaryBlobStore);
+            BlobId blobId = joiningBlobStore.save(new 
ByteArrayInputStream(BLOB_CONTENT)).get();
+
+            SoftAssertions.assertSoftly(softly -> {
+                softly.assertThat(joiningBlobStore.read(blobId))
+                    .hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT));
+                softly.assertThat(secondaryBlobStore.read(blobId))
+                    .hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT));
+            });
+        }
+    }
+
+    @Nested
+    class PrimarySaveCompletesExceptionally {
+
+        @Test
+        void saveShouldFallBackToSecondaryWhenPrimaryCompletedExceptionally() 
throws Exception {
+            MemoryBlobStore secondaryBlobStore = new 
MemoryBlobStore(BLOB_ID_FACTORY);
+            JoiningBlobStore joiningBlobStore = new JoiningBlobStore(new 
FutureThrowingBlobStore(), secondaryBlobStore);
+            BlobId blobId = joiningBlobStore.save(BLOB_CONTENT).get();
+
+            SoftAssertions.assertSoftly(softly -> {
+                softly.assertThat(joiningBlobStore.read(blobId))
+                    .hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT));
+                softly.assertThat(secondaryBlobStore.read(blobId))
+                    .hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT));
+            });
+        }
+
+        @Test
+        void 
saveInputStreamShouldFallBackToSecondaryWhenPrimaryCompletedExceptionally() 
throws Exception {
+            MemoryBlobStore secondaryBlobStore = new 
MemoryBlobStore(BLOB_ID_FACTORY);
+            JoiningBlobStore joiningBlobStore = new JoiningBlobStore(new 
FutureThrowingBlobStore(), secondaryBlobStore);
+            BlobId blobId = joiningBlobStore.save(new 
ByteArrayInputStream(BLOB_CONTENT)).get();
+
+            SoftAssertions.assertSoftly(softly -> {
+                softly.assertThat(joiningBlobStore.read(blobId))
+                    .hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT));
+                softly.assertThat(secondaryBlobStore.read(blobId))
+                    .hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT));
+            });
+        }
+
+    }
+
+    @Nested
     class PrimaryReadThrowsExceptionDirectly {
 
         @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to