JAMES-2583 new JoiningBlobStore

Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/b48c4915
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/b48c4915
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/b48c4915

Branch: refs/heads/master
Commit: b48c491588f396981d679e6cdd80bb91334f6f7f
Parents: 97a5eb2
Author: tran tien duc <dt...@linagora.com>
Authored: Fri Nov 2 17:33:13 2018 +0700
Committer: Antoine Duprat <adup...@linagora.com>
Committed: Wed Nov 7 14:12:19 2018 +0100

----------------------------------------------------------------------
 pom.xml                                         |   5 +
 server/blob/blob-joining/pom.xml                |  81 +++++++
 .../james/blob/joining/JoiningBlobStore.java    | 109 +++++++++
 .../blob/joining/JoiningBlobStoreTest.java      | 223 +++++++++++++++++++
 server/blob/pom.xml                             |   1 +
 5 files changed, 419 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/b48c4915/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 52f9740..f303f8f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1067,6 +1067,11 @@
             </dependency>
             <dependency>
                 <groupId>${james.groupId}</groupId>
+                <artifactId>blob-joining</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>${james.groupId}</groupId>
                 <artifactId>blob-objectstorage</artifactId>
                 <version>${project.version}</version>
             </dependency>

http://git-wip-us.apache.org/repos/asf/james-project/blob/b48c4915/server/blob/blob-joining/pom.xml
----------------------------------------------------------------------
diff --git a/server/blob/blob-joining/pom.xml b/server/blob/blob-joining/pom.xml
new file mode 100644
index 0000000..74349dd
--- /dev/null
+++ b/server/blob/blob-joining/pom.xml
@@ -0,0 +1,81 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements. See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership. The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied. See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>james-server-blob</artifactId>
+        <groupId>org.apache.james</groupId>
+        <version>3.2.0-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>blob-joining</artifactId>
+    <packaging>jar</packaging>
+
+    <name>Apache James :: Server :: Blob :: Joining Blob Storages</name>
+    <description>
+        An implementation of BlobStore which relies on a primary and a secondary BlobStore, in that order, for reading,
+        and uses the primary BlobStore for writing.
+    </description>
+
+    <dependencies>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>blob-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>blob-api</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>blob-memory</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>james-server-testing</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>james-server-util</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-engine</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.platform</groupId>
+            <artifactId>junit-platform-launcher</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+    
+</project>

http://git-wip-us.apache.org/repos/asf/james-project/blob/b48c4915/server/blob/blob-joining/src/main/java/org/apache/james/blob/joining/JoiningBlobStore.java
----------------------------------------------------------------------
diff --git a/server/blob/blob-joining/src/main/java/org/apache/james/blob/joining/JoiningBlobStore.java b/server/blob/blob-joining/src/main/java/org/apache/james/blob/joining/JoiningBlobStore.java
new file mode 100644
index 0000000..cd32a04
--- /dev/null
+++ b/server/blob/blob-joining/src/main/java/org/apache/james/blob/joining/JoiningBlobStore.java
@@ -0,0 +1,109 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.blob.joining;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.james.blob.api.BlobId;
+import org.apache.james.blob.api.BlobStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.fge.lambdas.Throwing;
+import com.google.common.annotations.VisibleForTesting;
+
+public class JoiningBlobStore implements BlobStore {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(JoiningBlobStore.class);
+
+    private final BlobStore primaryBlobStore;
+    private final BlobStore secondaryBlobStore;
+
+    @VisibleForTesting
+    JoiningBlobStore(BlobStore primaryBlobStore, BlobStore secondaryBlobStore) 
{
+        this.primaryBlobStore = primaryBlobStore;
+        this.secondaryBlobStore = secondaryBlobStore;
+    }
+
+    @Override
+    public CompletableFuture<BlobId> save(byte[] data) {
+        return primaryBlobStore.save(data);
+    }
+
+    @Override
+    public CompletableFuture<BlobId> save(InputStream data) {
+        return primaryBlobStore.save(data);
+    }
+
+    @Override
+    public CompletableFuture<byte[]> readBytes(BlobId blobId) {
+        try {
+            return readBytesFallBackIfFailsOrEmptyResult(blobId);
+        } catch (Exception e) {
+            LOGGER.error("exception directly happens while readBytes, fall 
back to secondary blob store", e);
+            return secondaryBlobStore.readBytes(blobId);
+        }
+    }
+
+    @Override
+    public InputStream read(BlobId blobId) {
+        try {
+            return readFallBackIfEmptyResult(blobId);
+        } catch (Exception e) {
+            LOGGER.error("exception directly happens while read, fall back to 
secondary blob store", e);
+            return secondaryBlobStore.read(blobId);
+        }
+    }
+
+    private InputStream readFallBackIfEmptyResult(BlobId blobId) {
+        return Optional.ofNullable(primaryBlobStore.read(blobId))
+            .map(Throwing.<InputStream, 
byte[]>function(IOUtils::toByteArray).sneakyThrow())
+            .filter(this::hasContent)
+            .<InputStream>map(ByteArrayInputStream::new)
+            .orElseGet(() -> secondaryBlobStore.read(blobId));
+    }
+
+    private CompletableFuture<byte[]> 
readBytesFallBackIfFailsOrEmptyResult(BlobId blobId) {
+        return primaryBlobStore.readBytes(blobId)
+            .thenApply(Optional::ofNullable)
+            .exceptionally(this::logAndReturnEmptyOptional)
+            .thenCompose(maybeBytes -> readFromSecondaryIfNeeded(maybeBytes, 
blobId));
+    }
+
+    private <T> Optional<T> logAndReturnEmptyOptional(Throwable throwable) {
+        LOGGER.error("primary completed exceptionally, fall back to second 
blob store", throwable);
+        return Optional.empty();
+    }
+
+    private CompletableFuture<byte[]> 
readFromSecondaryIfNeeded(Optional<byte[]> readFromPrimaryResult, BlobId 
blodId) {
+        return readFromPrimaryResult
+            .filter(this::hasContent)
+            .map(CompletableFuture::completedFuture)
+            .orElseGet(() -> secondaryBlobStore.readBytes(blodId));
+    }
+
+    private boolean hasContent(byte [] bytes) {
+        return bytes.length > 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/b48c4915/server/blob/blob-joining/src/test/java/org/apache/james/blob/joining/JoiningBlobStoreTest.java
----------------------------------------------------------------------
diff --git a/server/blob/blob-joining/src/test/java/org/apache/james/blob/joining/JoiningBlobStoreTest.java b/server/blob/blob-joining/src/test/java/org/apache/james/blob/joining/JoiningBlobStoreTest.java
new file mode 100644
index 0000000..b324cbd
--- /dev/null
+++ b/server/blob/blob-joining/src/test/java/org/apache/james/blob/joining/JoiningBlobStoreTest.java
@@ -0,0 +1,223 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.blob.joining;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.james.blob.api.BlobId;
+import org.apache.james.blob.api.BlobStore;
+import org.apache.james.blob.api.BlobStoreContract;
+import org.apache.james.blob.api.HashBlobId;
+import org.apache.james.blob.memory.MemoryBlobStore;
+import org.apache.james.util.CompletableFutureUtil;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for {@link JoiningBlobStore}: runs the shared {@link BlobStoreContract}
+ * and checks that writes go to the primary store only while reads fall back to
+ * the secondary store.
+ */
+class JoiningBlobStoreTest implements BlobStoreContract {
+
+    /** Builds the exception used by the failing stub stores below. */
+    private static RuntimeException notSupported() {
+        return new RuntimeException("not supported");
+    }
+
+    /** Stub whose future-returning operations complete exceptionally; {@code read} throws. */
+    private static class FutureThrowingBlobStore implements BlobStore {
+
+        @Override
+        public CompletableFuture<BlobId> save(byte[] data) {
+            return CompletableFutureUtil.exceptionallyFuture(notSupported());
+        }
+
+        @Override
+        public CompletableFuture<BlobId> save(InputStream data) {
+            return CompletableFutureUtil.exceptionallyFuture(notSupported());
+        }
+
+        @Override
+        public CompletableFuture<byte[]> readBytes(BlobId blobId) {
+            return CompletableFutureUtil.exceptionallyFuture(notSupported());
+        }
+
+        @Override
+        public InputStream read(BlobId blobId) {
+            throw notSupported();
+        }
+    }
+
+    /** Stub whose every operation throws synchronously. */
+    private static class ThrowingBlobStore implements BlobStore {
+
+        @Override
+        public CompletableFuture<BlobId> save(byte[] data) {
+            throw notSupported();
+        }
+
+        @Override
+        public CompletableFuture<BlobId> save(InputStream data) {
+            throw notSupported();
+        }
+
+        @Override
+        public CompletableFuture<byte[]> readBytes(BlobId blobId) {
+            throw notSupported();
+        }
+
+        @Override
+        public InputStream read(BlobId blobId) {
+            throw notSupported();
+        }
+    }
+
+    private static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory();
+    private static final byte[] BLOB_CONTENT = "blob content".getBytes();
+
+    private MemoryBlobStore primaryBlobStore;
+    private MemoryBlobStore secondaryBlobStore;
+    private JoiningBlobStore joiningBlobStore;
+
+    @BeforeEach
+    void setup() {
+        primaryBlobStore = new MemoryBlobStore(BLOB_ID_FACTORY);
+        secondaryBlobStore = new MemoryBlobStore(BLOB_ID_FACTORY);
+        joiningBlobStore = new JoiningBlobStore(primaryBlobStore, secondaryBlobStore);
+    }
+
+    @Override
+    public BlobStore testee() {
+        return joiningBlobStore;
+    }
+
+    @Override
+    public BlobId.Factory blobIdFactory() {
+        return BLOB_ID_FACTORY;
+    }
+
+    @Nested
+    class PrimaryReadThrowsExceptionDirectly {
+
+        @Test
+        void readShouldReturnFallbackToSecondaryWhenPrimaryGotException() throws Exception {
+            MemoryBlobStore fallbackStore = new MemoryBlobStore(BLOB_ID_FACTORY);
+            JoiningBlobStore storeUnderTest = new JoiningBlobStore(new ThrowingBlobStore(), fallbackStore);
+            BlobId savedId = fallbackStore.save(BLOB_CONTENT).get();
+
+            InputStream actual = storeUnderTest.read(savedId);
+
+            assertThat(actual).hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT));
+        }
+
+        @Test
+        void readBytesShouldReturnFallbackToSecondaryWhenPrimaryGotException() throws Exception {
+            MemoryBlobStore fallbackStore = new MemoryBlobStore(BLOB_ID_FACTORY);
+            JoiningBlobStore storeUnderTest = new JoiningBlobStore(new ThrowingBlobStore(), fallbackStore);
+            BlobId savedId = fallbackStore.save(BLOB_CONTENT).get();
+
+            byte[] actual = storeUnderTest.readBytes(savedId).get();
+
+            assertThat(actual).isEqualTo(BLOB_CONTENT);
+        }
+
+    }
+
+    @Nested
+    class PrimaryReadCompletesExceptionally {
+
+        @Test
+        void readShouldReturnFallbackToSecondaryWhenPrimaryCompletedExceptionally() throws Exception {
+            MemoryBlobStore fallbackStore = new MemoryBlobStore(BLOB_ID_FACTORY);
+            JoiningBlobStore storeUnderTest = new JoiningBlobStore(new FutureThrowingBlobStore(), fallbackStore);
+            BlobId savedId = fallbackStore.save(BLOB_CONTENT).get();
+
+            InputStream actual = storeUnderTest.read(savedId);
+
+            assertThat(actual).hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT));
+        }
+
+        @Test
+        void readBytesShouldReturnFallbackToSecondaryWhenPrimaryCompletedExceptionally() throws Exception {
+            MemoryBlobStore fallbackStore = new MemoryBlobStore(BLOB_ID_FACTORY);
+            JoiningBlobStore storeUnderTest = new JoiningBlobStore(new FutureThrowingBlobStore(), fallbackStore);
+            BlobId savedId = fallbackStore.save(BLOB_CONTENT).get();
+
+            byte[] actual = storeUnderTest.readBytes(savedId).get();
+
+            assertThat(actual).isEqualTo(BLOB_CONTENT);
+        }
+    }
+
+
+    @Test
+    void readShouldReturnFromPrimaryWhenAvailable() throws Exception {
+        BlobId savedId = primaryBlobStore.save(BLOB_CONTENT).get();
+
+        InputStream actual = joiningBlobStore.read(savedId);
+
+        assertThat(actual).hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT));
+    }
+
+    @Test
+    void readShouldReturnFromSecondaryWhenPrimaryNotAvailable() throws Exception {
+        BlobId savedId = secondaryBlobStore.save(BLOB_CONTENT).get();
+
+        InputStream actual = joiningBlobStore.read(savedId);
+
+        assertThat(actual).hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT));
+    }
+
+    @Test
+    void readBytesShouldReturnFromPrimaryWhenAvailable() throws Exception {
+        BlobId savedId = primaryBlobStore.save(BLOB_CONTENT).get();
+
+        byte[] actual = joiningBlobStore.readBytes(savedId).get();
+
+        assertThat(actual).isEqualTo(BLOB_CONTENT);
+    }
+
+    @Test
+    void readBytesShouldReturnFromSecondaryWhenPrimaryNotAvailable() throws Exception {
+        BlobId savedId = secondaryBlobStore.save(BLOB_CONTENT).get();
+
+        byte[] actual = joiningBlobStore.readBytes(savedId).get();
+
+        assertThat(actual).isEqualTo(BLOB_CONTENT);
+    }
+
+    @Test
+    void saveShouldWriteToPrimary() throws Exception {
+        BlobId savedId = joiningBlobStore.save(BLOB_CONTENT).get();
+
+        byte[] storedInPrimary = primaryBlobStore.readBytes(savedId).get();
+
+        assertThat(storedInPrimary).isEqualTo(BLOB_CONTENT);
+    }
+
+    @Test
+    void saveShouldNotWriteToSecondary() throws Exception {
+        BlobId savedId = joiningBlobStore.save(BLOB_CONTENT).get();
+
+        byte[] storedInSecondary = secondaryBlobStore.readBytes(savedId).get();
+
+        assertThat(storedInSecondary).isEmpty();
+    }
+
+    @Test
+    void saveInputStreamShouldWriteToPrimary() throws Exception {
+        BlobId savedId = joiningBlobStore.save(new ByteArrayInputStream(BLOB_CONTENT)).get();
+
+        byte[] storedInPrimary = primaryBlobStore.readBytes(savedId).get();
+
+        assertThat(storedInPrimary).isEqualTo(BLOB_CONTENT);
+    }
+
+    @Test
+    void saveInputStreamShouldNotWriteToSecondary() throws Exception {
+        BlobId savedId = joiningBlobStore.save(new ByteArrayInputStream(BLOB_CONTENT)).get();
+
+        byte[] storedInSecondary = secondaryBlobStore.readBytes(savedId).get();
+
+        assertThat(storedInSecondary).isEmpty();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/james-project/blob/b48c4915/server/blob/pom.xml
----------------------------------------------------------------------
diff --git a/server/blob/pom.xml b/server/blob/pom.xml
index 1b674bc..131baa2 100644
--- a/server/blob/pom.xml
+++ b/server/blob/pom.xml
@@ -35,6 +35,7 @@
     <modules>
         <module>blob-api</module>
         <module>blob-cassandra</module>
+        <module>blob-joining</module>
         <module>blob-memory</module>
         <module>blob-objectstorage</module>
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to