JAMES-2583 new JoiningBlobStore
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/b48c4915 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/b48c4915 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/b48c4915 Branch: refs/heads/master Commit: b48c491588f396981d679e6cdd80bb91334f6f7f Parents: 97a5eb2 Author: tran tien duc <dt...@linagora.com> Authored: Fri Nov 2 17:33:13 2018 +0700 Committer: Antoine Duprat <adup...@linagora.com> Committed: Wed Nov 7 14:12:19 2018 +0100 ---------------------------------------------------------------------- pom.xml | 5 + server/blob/blob-joining/pom.xml | 81 +++++++ .../james/blob/joining/JoiningBlobStore.java | 109 +++++++++ .../blob/joining/JoiningBlobStoreTest.java | 223 +++++++++++++++++++ server/blob/pom.xml | 1 + 5 files changed, 419 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/b48c4915/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 52f9740..f303f8f 100644 --- a/pom.xml +++ b/pom.xml @@ -1067,6 +1067,11 @@ </dependency> <dependency> <groupId>${james.groupId}</groupId> + <artifactId>blob-joining</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>${james.groupId}</groupId> <artifactId>blob-objectstorage</artifactId> <version>${project.version}</version> </dependency> http://git-wip-us.apache.org/repos/asf/james-project/blob/b48c4915/server/blob/blob-joining/pom.xml ---------------------------------------------------------------------- diff --git a/server/blob/blob-joining/pom.xml b/server/blob/blob-joining/pom.xml new file mode 100644 index 0000000..74349dd --- /dev/null +++ b/server/blob/blob-joining/pom.xml @@ -0,0 +1,81 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <artifactId>james-server-blob</artifactId> + <groupId>org.apache.james</groupId> + <version>3.2.0-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <artifactId>blob-joining</artifactId> + <packaging>jar</packaging> + + <name>Apache James :: Server :: Blob :: Joining Blob Storages</name> + <description> + An implementation of BlobStore which relies on a primary and a secondary BlobStore by order for reading. + And using primary BlobStore for writing + </description> + + <dependencies> + <dependency> + <groupId>${james.groupId}</groupId> + <artifactId>blob-api</artifactId> + </dependency> + <dependency> + <groupId>${james.groupId}</groupId> + <artifactId>blob-api</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>${james.groupId}</groupId> + <artifactId>blob-memory</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>${james.groupId}</groupId> + <artifactId>james-server-testing</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>${james.groupId}</groupId> + <artifactId>james-server-util</artifactId> + </dependency> + <dependency> + <groupId>org.assertj</groupId> + <artifactId>assertj-core</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter-engine</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.junit.platform</groupId> + <artifactId>junit-platform-launcher</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + +</project> http://git-wip-us.apache.org/repos/asf/james-project/blob/b48c4915/server/blob/blob-joining/src/main/java/org/apache/james/blob/joining/JoiningBlobStore.java ---------------------------------------------------------------------- diff --git a/server/blob/blob-joining/src/main/java/org/apache/james/blob/joining/JoiningBlobStore.java b/server/blob/blob-joining/src/main/java/org/apache/james/blob/joining/JoiningBlobStore.java new file mode 100644 index 0000000..cd32a04 --- /dev/null +++ b/server/blob/blob-joining/src/main/java/org/apache/james/blob/joining/JoiningBlobStore.java @@ -0,0 +1,109 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.blob.joining; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +import org.apache.commons.io.IOUtils; +import org.apache.james.blob.api.BlobId; +import org.apache.james.blob.api.BlobStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.github.fge.lambdas.Throwing; +import com.google.common.annotations.VisibleForTesting; + +public class JoiningBlobStore implements BlobStore { + + private static final Logger LOGGER = LoggerFactory.getLogger(JoiningBlobStore.class); + + private final BlobStore primaryBlobStore; + private final BlobStore secondaryBlobStore; + + @VisibleForTesting + JoiningBlobStore(BlobStore primaryBlobStore, BlobStore secondaryBlobStore) { + this.primaryBlobStore = primaryBlobStore; + this.secondaryBlobStore = secondaryBlobStore; + } + + @Override + public CompletableFuture<BlobId> save(byte[] data) { + return primaryBlobStore.save(data); + } + + @Override + public CompletableFuture<BlobId> save(InputStream data) { + return primaryBlobStore.save(data); + } + + @Override + public CompletableFuture<byte[]> readBytes(BlobId blobId) { + try { + return readBytesFallBackIfFailsOrEmptyResult(blobId); + } catch (Exception e) { + LOGGER.error("exception directly happens while readBytes, fall back to secondary blob store", e); + return secondaryBlobStore.readBytes(blobId); + } + } + + @Override + public InputStream read(BlobId blobId) { + try { + return readFallBackIfEmptyResult(blobId); + } catch (Exception e) { + LOGGER.error("exception directly happens while read, fall back to secondary blob store", e); + return secondaryBlobStore.read(blobId); + } + } + + private InputStream readFallBackIfEmptyResult(BlobId blobId) { + return Optional.ofNullable(primaryBlobStore.read(blobId)) + .map(Throwing.<InputStream, byte[]>function(IOUtils::toByteArray).sneakyThrow()) + .filter(this::hasContent) + .<InputStream>map(ByteArrayInputStream::new) + .orElseGet(() -> secondaryBlobStore.read(blobId)); + } + + private CompletableFuture<byte[]> readBytesFallBackIfFailsOrEmptyResult(BlobId blobId) { + return primaryBlobStore.readBytes(blobId) + .thenApply(Optional::ofNullable) + .exceptionally(this::logAndReturnEmptyOptional) + .thenCompose(maybeBytes -> readFromSecondaryIfNeeded(maybeBytes, blobId)); + } + + private <T> Optional<T> logAndReturnEmptyOptional(Throwable throwable) { + LOGGER.error("primary completed exceptionally, fall back to second blob store", throwable); + return Optional.empty(); + } + + private CompletableFuture<byte[]> readFromSecondaryIfNeeded(Optional<byte[]> readFromPrimaryResult, BlobId blodId) { + return readFromPrimaryResult + .filter(this::hasContent) + .map(CompletableFuture::completedFuture) + .orElseGet(() -> secondaryBlobStore.readBytes(blodId)); + } + + private boolean hasContent(byte [] bytes) { + return bytes.length > 0; + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/b48c4915/server/blob/blob-joining/src/test/java/org/apache/james/blob/joining/JoiningBlobStoreTest.java ---------------------------------------------------------------------- diff --git a/server/blob/blob-joining/src/test/java/org/apache/james/blob/joining/JoiningBlobStoreTest.java b/server/blob/blob-joining/src/test/java/org/apache/james/blob/joining/JoiningBlobStoreTest.java new file mode 100644 index 0000000..b324cbd --- /dev/null +++ b/server/blob/blob-joining/src/test/java/org/apache/james/blob/joining/JoiningBlobStoreTest.java @@ -0,0 +1,223 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.blob.joining; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.util.concurrent.CompletableFuture; + +import org.apache.james.blob.api.BlobId; +import org.apache.james.blob.api.BlobStore; +import org.apache.james.blob.api.BlobStoreContract; +import org.apache.james.blob.api.HashBlobId; +import org.apache.james.blob.memory.MemoryBlobStore; +import org.apache.james.util.CompletableFutureUtil; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; + +class JoiningBlobStoreTest implements BlobStoreContract { + + private static class FutureThrowingBlobStore implements BlobStore { + + @Override + public CompletableFuture<BlobId> save(byte[] data) { + return CompletableFutureUtil.exceptionallyFuture(new RuntimeException("not supported")); + } + + @Override + public CompletableFuture<BlobId> save(InputStream data) { + return CompletableFutureUtil.exceptionallyFuture(new RuntimeException("not supported")); + } + + @Override + public CompletableFuture<byte[]> readBytes(BlobId blobId) { + return CompletableFutureUtil.exceptionallyFuture(new RuntimeException("not supported")); + } + + @Override + public InputStream read(BlobId blobId) { + throw new RuntimeException("not supported"); + } + } + + private static class ThrowingBlobStore implements BlobStore { + + @Override + public CompletableFuture<BlobId> save(byte[] data) { + throw new RuntimeException("not supported"); + } + + @Override + public CompletableFuture<BlobId> save(InputStream data) { + throw new RuntimeException("not supported"); + } + + @Override + public CompletableFuture<byte[]> readBytes(BlobId blobId) { + throw new RuntimeException("not supported"); + } + + @Override + public InputStream read(BlobId blobId) { + throw new RuntimeException("not supported"); + } + } + + private static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory(); + private static final byte [] BLOB_CONTENT = "blob content".getBytes(); + + private MemoryBlobStore primaryBlobStore; + private MemoryBlobStore secondaryBlobStore; + private JoiningBlobStore joiningBlobStore; + + @BeforeEach + void setup() { + primaryBlobStore = new MemoryBlobStore(BLOB_ID_FACTORY); + secondaryBlobStore = new MemoryBlobStore(BLOB_ID_FACTORY); + joiningBlobStore = new JoiningBlobStore(primaryBlobStore, secondaryBlobStore); + } + + @Override + public BlobStore testee() { + return joiningBlobStore; + } + + @Override + public BlobId.Factory blobIdFactory() { + return BLOB_ID_FACTORY; + } + + @Nested + class PrimaryReadThrowsExceptionDirectly { + + @Test + void readShouldReturnFallbackToSecondaryWhenPrimaryGotException() throws Exception { + MemoryBlobStore secondaryBlobStore = new MemoryBlobStore(BLOB_ID_FACTORY); + JoiningBlobStore joiningBlobStore = new JoiningBlobStore(new ThrowingBlobStore(), secondaryBlobStore); + BlobId blobId = secondaryBlobStore.save(BLOB_CONTENT).get(); + + assertThat(joiningBlobStore.read(blobId)) + .hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT)); + } + + @Test + void readBytesShouldReturnFallbackToSecondaryWhenPrimaryGotException() throws Exception { + MemoryBlobStore secondaryBlobStore = new MemoryBlobStore(BLOB_ID_FACTORY); + JoiningBlobStore joiningBlobStore = new JoiningBlobStore(new ThrowingBlobStore(), secondaryBlobStore); + BlobId blobId = secondaryBlobStore.save(BLOB_CONTENT).get(); + + assertThat(joiningBlobStore.readBytes(blobId).get()) + .isEqualTo(BLOB_CONTENT); + } + + } + + @Nested + class PrimaryReadCompletesExceptionally { + + @Test + void readShouldReturnFallbackToSecondaryWhenPrimaryCompletedExceptionally() throws Exception { + MemoryBlobStore secondaryBlobStore = new MemoryBlobStore(BLOB_ID_FACTORY); + JoiningBlobStore joiningBlobStore = new JoiningBlobStore(new FutureThrowingBlobStore(), secondaryBlobStore); + BlobId blobId = secondaryBlobStore.save(BLOB_CONTENT).get(); + + assertThat(joiningBlobStore.read(blobId)) + .hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT)); + } + + @Test + void readBytesShouldReturnFallbackToSecondaryWhenPrimaryCompletedExceptionally() throws Exception { + MemoryBlobStore secondaryBlobStore = new MemoryBlobStore(BLOB_ID_FACTORY); + JoiningBlobStore joiningBlobStore = new JoiningBlobStore(new FutureThrowingBlobStore(), secondaryBlobStore); + BlobId blobId = secondaryBlobStore.save(BLOB_CONTENT).get(); + + assertThat(joiningBlobStore.readBytes(blobId).get()) + .isEqualTo(BLOB_CONTENT); + } + } + + + @Test + void readShouldReturnFromPrimaryWhenAvailable() throws Exception { + BlobId blobId = primaryBlobStore.save(BLOB_CONTENT).get(); + + assertThat(joiningBlobStore.read(blobId)) + .hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT)); + } + + @Test + void readShouldReturnFromSecondaryWhenPrimaryNotAvailable() throws Exception { + BlobId blobId = secondaryBlobStore.save(BLOB_CONTENT).get(); + + assertThat(joiningBlobStore.read(blobId)) + .hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT)); + } + + @Test + void readBytesShouldReturnFromPrimaryWhenAvailable() throws Exception { + BlobId blobId = primaryBlobStore.save(BLOB_CONTENT).get(); + + assertThat(joiningBlobStore.readBytes(blobId).get()) + .isEqualTo(BLOB_CONTENT); + } + + @Test + void readBytesShouldReturnFromSecondaryWhenPrimaryNotAvailable() throws Exception { + BlobId blobId = secondaryBlobStore.save(BLOB_CONTENT).get(); + + assertThat(joiningBlobStore.readBytes(blobId).get()) + .isEqualTo(BLOB_CONTENT); + } + + @Test + void saveShouldWriteToPrimary() throws Exception { + BlobId blobId = joiningBlobStore.save(BLOB_CONTENT).get(); + + assertThat(primaryBlobStore.readBytes(blobId).get()) + .isEqualTo(BLOB_CONTENT); + } + + @Test + void saveShouldNotWriteToSecondary() throws Exception { + BlobId blobId = joiningBlobStore.save(BLOB_CONTENT).get(); + + assertThat(secondaryBlobStore.readBytes(blobId).get()) + .isEmpty(); + } + + @Test + void saveInputStreamShouldWriteToPrimary() throws Exception { + BlobId blobId = joiningBlobStore.save(new ByteArrayInputStream(BLOB_CONTENT)).get(); + + assertThat(primaryBlobStore.readBytes(blobId).get()) + .isEqualTo(BLOB_CONTENT); + } + + @Test + void saveInputStreamShouldNotWriteToSecondary() throws Exception { + BlobId blobId = joiningBlobStore.save(new ByteArrayInputStream(BLOB_CONTENT)).get(); + + assertThat(secondaryBlobStore.readBytes(blobId).get()) + .isEmpty(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/james-project/blob/b48c4915/server/blob/pom.xml ---------------------------------------------------------------------- diff --git a/server/blob/pom.xml b/server/blob/pom.xml index 1b674bc..131baa2 100644 --- a/server/blob/pom.xml +++ b/server/blob/pom.xml @@ -35,6 +35,7 @@ <modules> <module>blob-api</module> <module>blob-cassandra</module> + <module>blob-joining</module> <module>blob-memory</module> <module>blob-objectstorage</module> --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org