JAMES-2583 Change name from Joining to Union
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/12a26f95 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/12a26f95 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/12a26f95 Branch: refs/heads/master Commit: 12a26f9581e9cb23999cd4a66ebbb2435a8b3ab1 Parents: 4562413 Author: tran tien duc <dt...@linagora.com> Authored: Tue Nov 6 17:27:42 2018 +0700 Committer: Antoine Duprat <adup...@linagora.com> Committed: Wed Nov 7 14:12:19 2018 +0100 ---------------------------------------------------------------------- pom.xml | 10 +- server/blob/blob-joining/pom.xml | 81 ----- .../james/blob/joining/JoiningBlobStore.java | 153 --------- .../blob/joining/JoiningBlobStoreTest.java | 333 ------------------- server/blob/blob-union/pom.xml | 81 +++++ .../apache/james/blob/union/UnionBlobStore.java | 153 +++++++++ .../james/blob/union/UnionBlobStoreTest.java | 333 +++++++++++++++++++ server/blob/pom.xml | 2 +- 8 files changed, 573 insertions(+), 573 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/12a26f95/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index f303f8f..59e606e 100644 --- a/pom.xml +++ b/pom.xml @@ -1067,11 +1067,6 @@ </dependency> <dependency> <groupId>${james.groupId}</groupId> - <artifactId>blob-joining</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>${james.groupId}</groupId> <artifactId>blob-objectstorage</artifactId> <version>${project.version}</version> </dependency> @@ -1102,6 +1097,11 @@ <artifactId>blob-memory</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>${james.groupId}</groupId> + <artifactId>blob-union</artifactId> + <version>${project.version}</version> + </dependency> <dependency> 
<groupId>${james.groupId}</groupId> <artifactId>event-sourcing-core</artifactId> http://git-wip-us.apache.org/repos/asf/james-project/blob/12a26f95/server/blob/blob-joining/pom.xml ---------------------------------------------------------------------- diff --git a/server/blob/blob-joining/pom.xml b/server/blob/blob-joining/pom.xml deleted file mode 100644 index 74349dd..0000000 --- a/server/blob/blob-joining/pom.xml +++ /dev/null @@ -1,81 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <parent> - <artifactId>james-server-blob</artifactId> - <groupId>org.apache.james</groupId> - <version>3.2.0-SNAPSHOT</version> - <relativePath>../pom.xml</relativePath> - </parent> - - <artifactId>blob-joining</artifactId> - <packaging>jar</packaging> - - <name>Apache James :: Server :: Blob :: Joining Blob Storages</name> - <description> - An implementation of BlobStore which relies on a primary and a secondary BlobStore by order for reading. 
- And using primary BlobStore for writing - </description> - - <dependencies> - <dependency> - <groupId>${james.groupId}</groupId> - <artifactId>blob-api</artifactId> - </dependency> - <dependency> - <groupId>${james.groupId}</groupId> - <artifactId>blob-api</artifactId> - <type>test-jar</type> - <scope>test</scope> - </dependency> - <dependency> - <groupId>${james.groupId}</groupId> - <artifactId>blob-memory</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>${james.groupId}</groupId> - <artifactId>james-server-testing</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>${james.groupId}</groupId> - <artifactId>james-server-util</artifactId> - </dependency> - <dependency> - <groupId>org.assertj</groupId> - <artifactId>assertj-core</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.junit.jupiter</groupId> - <artifactId>junit-jupiter-engine</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.junit.platform</groupId> - <artifactId>junit-platform-launcher</artifactId> - <scope>test</scope> - </dependency> - </dependencies> - -</project> http://git-wip-us.apache.org/repos/asf/james-project/blob/12a26f95/server/blob/blob-joining/src/main/java/org/apache/james/blob/joining/JoiningBlobStore.java ---------------------------------------------------------------------- diff --git a/server/blob/blob-joining/src/main/java/org/apache/james/blob/joining/JoiningBlobStore.java b/server/blob/blob-joining/src/main/java/org/apache/james/blob/joining/JoiningBlobStore.java deleted file mode 100644 index aab4cdb..0000000 --- a/server/blob/blob-joining/src/main/java/org/apache/james/blob/joining/JoiningBlobStore.java +++ /dev/null @@ -1,153 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. 
See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. * - ****************************************************************/ - -package org.apache.james.blob.joining; - -import java.io.IOException; -import java.io.InputStream; -import java.io.PushbackInputStream; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.function.Function; -import java.util.function.Supplier; - -import org.apache.james.blob.api.BlobId; -import org.apache.james.blob.api.BlobStore; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.github.fge.lambdas.Throwing; -import com.google.common.annotations.VisibleForTesting; - -public class JoiningBlobStore implements BlobStore { - - private static final Logger LOGGER = LoggerFactory.getLogger(JoiningBlobStore.class); - private static final int UN_AVAILABLE = -1; - - private final BlobStore primaryBlobStore; - private final BlobStore secondaryBlobStore; - - @VisibleForTesting - JoiningBlobStore(BlobStore primaryBlobStore, BlobStore secondaryBlobStore) { - this.primaryBlobStore = primaryBlobStore; - this.secondaryBlobStore = secondaryBlobStore; - } - - @Override - public CompletableFuture<BlobId> save(byte[] data) { - try { - return saveToPrimaryFallbackIfFails( - primaryBlobStore.save(data), - () -> 
secondaryBlobStore.save(data)); - } catch (Exception e) { - LOGGER.error("exception directly happens while saving bytes data, fall back to secondary blob store", e); - return secondaryBlobStore.save(data); - } - } - - @Override - public CompletableFuture<BlobId> save(InputStream data) { - try { - return saveToPrimaryFallbackIfFails( - primaryBlobStore.save(data), - () -> secondaryBlobStore.save(data)); - } catch (Exception e) { - LOGGER.error("exception directly happens while saving InputStream data, fall back to secondary blob store", e); - return secondaryBlobStore.save(data); - } - } - - @Override - public CompletableFuture<byte[]> readBytes(BlobId blobId) { - try { - return readBytesFallBackIfFailsOrEmptyResult(blobId); - } catch (Exception e) { - LOGGER.error("exception directly happens while readBytes, fall back to secondary blob store", e); - return secondaryBlobStore.readBytes(blobId); - } - } - - @Override - public InputStream read(BlobId blobId) { - try { - return readFallBackIfEmptyResult(blobId); - } catch (Exception e) { - LOGGER.error("exception directly happens while read, fall back to secondary blob store", e); - return secondaryBlobStore.read(blobId); - } - } - - private InputStream readFallBackIfEmptyResult(BlobId blobId) { - return Optional.ofNullable(primaryBlobStore.read(blobId)) - .map(PushbackInputStream::new) - .filter(Throwing.predicate(this::streamHasContent).sneakyThrow()) - .<InputStream>map(Function.identity()) - .orElseGet(() -> secondaryBlobStore.read(blobId)); - } - - @VisibleForTesting - boolean streamHasContent(PushbackInputStream pushBackIS) throws IOException { - int byteRead = pushBackIS.read(); - if (byteRead != UN_AVAILABLE) { - pushBackIS.unread(byteRead); - return true; - } - return false; - } - - private CompletableFuture<byte[]> readBytesFallBackIfFailsOrEmptyResult(BlobId blobId) { - return primaryBlobStore.readBytes(blobId) - .thenApply(Optional::ofNullable) - .exceptionally(this::logAndReturnEmptyOptional) - 
.thenCompose(maybeBytes -> readFromSecondaryIfNeeded(maybeBytes, blobId)); - } - - private CompletableFuture<BlobId> saveToPrimaryFallbackIfFails( - CompletableFuture<BlobId> primarySavingOperation, - Supplier<CompletableFuture<BlobId>> fallbackSavingOperationSupplier) { - - return primarySavingOperation - .thenApply(Optional::ofNullable) - .exceptionally(this::logAndReturnEmptyOptional) - .thenCompose(maybeBlobId -> saveToSecondaryIfNeeded(maybeBlobId, fallbackSavingOperationSupplier)); - } - - private <T> Optional<T> logAndReturnEmptyOptional(Throwable throwable) { - LOGGER.error("primary completed exceptionally, fall back to second blob store", throwable); - return Optional.empty(); - } - - private CompletableFuture<BlobId> saveToSecondaryIfNeeded(Optional<BlobId> maybeBlobId, - Supplier<CompletableFuture<BlobId>> saveToSecondarySupplier) { - return maybeBlobId - .map(CompletableFuture::completedFuture) - .orElseGet(saveToSecondarySupplier); - } - - private CompletableFuture<byte[]> readFromSecondaryIfNeeded(Optional<byte[]> readFromPrimaryResult, BlobId blodId) { - return readFromPrimaryResult - .filter(this::hasContent) - .map(CompletableFuture::completedFuture) - .orElseGet(() -> secondaryBlobStore.readBytes(blodId)); - } - - private boolean hasContent(byte [] bytes) { - return bytes.length > 0; - } -} http://git-wip-us.apache.org/repos/asf/james-project/blob/12a26f95/server/blob/blob-joining/src/test/java/org/apache/james/blob/joining/JoiningBlobStoreTest.java ---------------------------------------------------------------------- diff --git a/server/blob/blob-joining/src/test/java/org/apache/james/blob/joining/JoiningBlobStoreTest.java b/server/blob/blob-joining/src/test/java/org/apache/james/blob/joining/JoiningBlobStoreTest.java deleted file mode 100644 index 329dd92..0000000 --- a/server/blob/blob-joining/src/test/java/org/apache/james/blob/joining/JoiningBlobStoreTest.java +++ /dev/null @@ -1,333 +0,0 @@ 
-/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. * - ****************************************************************/ - -package org.apache.james.blob.joining; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatCode; - -import java.io.ByteArrayInputStream; -import java.io.InputStream; -import java.io.PushbackInputStream; -import java.util.concurrent.CompletableFuture; - -import org.apache.james.blob.api.BlobId; -import org.apache.james.blob.api.BlobStore; -import org.apache.james.blob.api.BlobStoreContract; -import org.apache.james.blob.api.HashBlobId; -import org.apache.james.blob.memory.MemoryBlobStore; -import org.apache.james.util.CompletableFutureUtil; -import org.assertj.core.api.SoftAssertions; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Nested; -import org.junit.jupiter.api.Test; - -class JoiningBlobStoreTest implements BlobStoreContract { - - private static class FutureThrowingBlobStore implements BlobStore { - - @Override - public CompletableFuture<BlobId> save(byte[] data) { - return 
CompletableFutureUtil.exceptionallyFuture(new RuntimeException("not supported")); - } - - @Override - public CompletableFuture<BlobId> save(InputStream data) { - return CompletableFutureUtil.exceptionallyFuture(new RuntimeException("not supported")); - } - - @Override - public CompletableFuture<byte[]> readBytes(BlobId blobId) { - return CompletableFutureUtil.exceptionallyFuture(new RuntimeException("not supported")); - } - - @Override - public InputStream read(BlobId blobId) { - throw new RuntimeException("not supported"); - } - } - - private static class ThrowingBlobStore implements BlobStore { - - @Override - public CompletableFuture<BlobId> save(byte[] data) { - throw new RuntimeException("not supported"); - } - - @Override - public CompletableFuture<BlobId> save(InputStream data) { - throw new RuntimeException("not supported"); - } - - @Override - public CompletableFuture<byte[]> readBytes(BlobId blobId) { - throw new RuntimeException("not supported"); - } - - @Override - public InputStream read(BlobId blobId) { - throw new RuntimeException("not supported"); - } - } - - private static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory(); - private static final byte [] BLOB_CONTENT = "blob content".getBytes(); - - private MemoryBlobStore primaryBlobStore; - private MemoryBlobStore secondaryBlobStore; - private JoiningBlobStore joiningBlobStore; - - @BeforeEach - void setup() { - primaryBlobStore = new MemoryBlobStore(BLOB_ID_FACTORY); - secondaryBlobStore = new MemoryBlobStore(BLOB_ID_FACTORY); - joiningBlobStore = new JoiningBlobStore(primaryBlobStore, secondaryBlobStore); - } - - @Override - public BlobStore testee() { - return joiningBlobStore; - } - - @Override - public BlobId.Factory blobIdFactory() { - return BLOB_ID_FACTORY; - } - - @Nested - class PrimarySaveThrowsExceptionDirectly { - - @Test - void saveShouldFallBackToSecondaryWhenPrimaryGotException() throws Exception { - MemoryBlobStore secondaryBlobStore = new 
MemoryBlobStore(BLOB_ID_FACTORY); - JoiningBlobStore joiningBlobStore = new JoiningBlobStore(new ThrowingBlobStore(), secondaryBlobStore); - BlobId blobId = joiningBlobStore.save(BLOB_CONTENT).get(); - - SoftAssertions.assertSoftly(softly -> { - softly.assertThat(joiningBlobStore.read(blobId)) - .hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT)); - softly.assertThat(secondaryBlobStore.read(blobId)) - .hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT)); - }); - } - - @Test - void saveInputStreamShouldFallBackToSecondaryWhenPrimaryGotException() throws Exception { - MemoryBlobStore secondaryBlobStore = new MemoryBlobStore(BLOB_ID_FACTORY); - JoiningBlobStore joiningBlobStore = new JoiningBlobStore(new ThrowingBlobStore(), secondaryBlobStore); - BlobId blobId = joiningBlobStore.save(new ByteArrayInputStream(BLOB_CONTENT)).get(); - - SoftAssertions.assertSoftly(softly -> { - softly.assertThat(joiningBlobStore.read(blobId)) - .hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT)); - softly.assertThat(secondaryBlobStore.read(blobId)) - .hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT)); - }); - } - } - - @Nested - class PrimarySaveCompletesExceptionally { - - @Test - void saveShouldFallBackToSecondaryWhenPrimaryCompletedExceptionally() throws Exception { - MemoryBlobStore secondaryBlobStore = new MemoryBlobStore(BLOB_ID_FACTORY); - JoiningBlobStore joiningBlobStore = new JoiningBlobStore(new FutureThrowingBlobStore(), secondaryBlobStore); - BlobId blobId = joiningBlobStore.save(BLOB_CONTENT).get(); - - SoftAssertions.assertSoftly(softly -> { - softly.assertThat(joiningBlobStore.read(blobId)) - .hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT)); - softly.assertThat(secondaryBlobStore.read(blobId)) - .hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT)); - }); - } - - @Test - void saveInputStreamShouldFallBackToSecondaryWhenPrimaryCompletedExceptionally() throws Exception { - MemoryBlobStore secondaryBlobStore = new 
MemoryBlobStore(BLOB_ID_FACTORY); - JoiningBlobStore joiningBlobStore = new JoiningBlobStore(new FutureThrowingBlobStore(), secondaryBlobStore); - BlobId blobId = joiningBlobStore.save(new ByteArrayInputStream(BLOB_CONTENT)).get(); - - SoftAssertions.assertSoftly(softly -> { - softly.assertThat(joiningBlobStore.read(blobId)) - .hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT)); - softly.assertThat(secondaryBlobStore.read(blobId)) - .hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT)); - }); - } - - } - - @Nested - class PrimaryReadThrowsExceptionDirectly { - - @Test - void readShouldReturnFallbackToSecondaryWhenPrimaryGotException() throws Exception { - MemoryBlobStore secondaryBlobStore = new MemoryBlobStore(BLOB_ID_FACTORY); - JoiningBlobStore joiningBlobStore = new JoiningBlobStore(new ThrowingBlobStore(), secondaryBlobStore); - BlobId blobId = secondaryBlobStore.save(BLOB_CONTENT).get(); - - assertThat(joiningBlobStore.read(blobId)) - .hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT)); - } - - @Test - void readBytesShouldReturnFallbackToSecondaryWhenPrimaryGotException() throws Exception { - MemoryBlobStore secondaryBlobStore = new MemoryBlobStore(BLOB_ID_FACTORY); - JoiningBlobStore joiningBlobStore = new JoiningBlobStore(new ThrowingBlobStore(), secondaryBlobStore); - BlobId blobId = secondaryBlobStore.save(BLOB_CONTENT).get(); - - assertThat(joiningBlobStore.readBytes(blobId).get()) - .isEqualTo(BLOB_CONTENT); - } - - } - - @Nested - class PrimaryReadCompletesExceptionally { - - @Test - void readShouldReturnFallbackToSecondaryWhenPrimaryCompletedExceptionally() throws Exception { - MemoryBlobStore secondaryBlobStore = new MemoryBlobStore(BLOB_ID_FACTORY); - JoiningBlobStore joiningBlobStore = new JoiningBlobStore(new FutureThrowingBlobStore(), secondaryBlobStore); - BlobId blobId = secondaryBlobStore.save(BLOB_CONTENT).get(); - - assertThat(joiningBlobStore.read(blobId)) - .hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT)); - } - - 
@Test - void readBytesShouldReturnFallbackToSecondaryWhenPrimaryCompletedExceptionally() throws Exception { - MemoryBlobStore secondaryBlobStore = new MemoryBlobStore(BLOB_ID_FACTORY); - JoiningBlobStore joiningBlobStore = new JoiningBlobStore(new FutureThrowingBlobStore(), secondaryBlobStore); - BlobId blobId = secondaryBlobStore.save(BLOB_CONTENT).get(); - - assertThat(joiningBlobStore.readBytes(blobId).get()) - .isEqualTo(BLOB_CONTENT); - } - } - - - @Test - void readShouldReturnFromPrimaryWhenAvailable() throws Exception { - BlobId blobId = primaryBlobStore.save(BLOB_CONTENT).get(); - - assertThat(joiningBlobStore.read(blobId)) - .hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT)); - } - - @Test - void readShouldReturnFromSecondaryWhenPrimaryNotAvailable() throws Exception { - BlobId blobId = secondaryBlobStore.save(BLOB_CONTENT).get(); - - assertThat(joiningBlobStore.read(blobId)) - .hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT)); - } - - @Test - void readBytesShouldReturnFromPrimaryWhenAvailable() throws Exception { - BlobId blobId = primaryBlobStore.save(BLOB_CONTENT).get(); - - assertThat(joiningBlobStore.readBytes(blobId).get()) - .isEqualTo(BLOB_CONTENT); - } - - @Test - void readBytesShouldReturnFromSecondaryWhenPrimaryNotAvailable() throws Exception { - BlobId blobId = secondaryBlobStore.save(BLOB_CONTENT).get(); - - assertThat(joiningBlobStore.readBytes(blobId).get()) - .isEqualTo(BLOB_CONTENT); - } - - @Test - void saveShouldWriteToPrimary() throws Exception { - BlobId blobId = joiningBlobStore.save(BLOB_CONTENT).get(); - - assertThat(primaryBlobStore.readBytes(blobId).get()) - .isEqualTo(BLOB_CONTENT); - } - - @Test - void saveShouldNotWriteToSecondary() throws Exception { - BlobId blobId = joiningBlobStore.save(BLOB_CONTENT).get(); - - assertThat(secondaryBlobStore.readBytes(blobId).get()) - .isEmpty(); - } - - @Test - void saveInputStreamShouldWriteToPrimary() throws Exception { - BlobId blobId = joiningBlobStore.save(new 
ByteArrayInputStream(BLOB_CONTENT)).get(); - - assertThat(primaryBlobStore.readBytes(blobId).get()) - .isEqualTo(BLOB_CONTENT); - } - - @Test - void saveInputStreamShouldNotWriteToSecondary() throws Exception { - BlobId blobId = joiningBlobStore.save(new ByteArrayInputStream(BLOB_CONTENT)).get(); - - assertThat(secondaryBlobStore.readBytes(blobId).get()) - .isEmpty(); - } - - @Test - void streamHasContentShouldReturnTrueWhenStreamHasContent() throws Exception { - PushbackInputStream pushBackIS = new PushbackInputStream(new ByteArrayInputStream(BLOB_CONTENT)); - - assertThat(joiningBlobStore.streamHasContent(pushBackIS)) - .isTrue(); - } - - @Test - void streamHasContentShouldReturnFalseWhenStreamHasNoContent() throws Exception { - PushbackInputStream pushBackIS = new PushbackInputStream(new ByteArrayInputStream(new byte[0])); - - assertThat(joiningBlobStore.streamHasContent(pushBackIS)) - .isFalse(); - } - - @Test - void streamHasContentShouldNotThrowWhenStreamHasNoContent() { - PushbackInputStream pushBackIS = new PushbackInputStream(new ByteArrayInputStream(new byte[0])); - - assertThatCode(() -> joiningBlobStore.streamHasContent(pushBackIS)) - .doesNotThrowAnyException(); - } - - @Test - void streamHasContentShouldNotDrainPushBackStreamContent() throws Exception { - PushbackInputStream pushBackIS = new PushbackInputStream(new ByteArrayInputStream(BLOB_CONTENT)); - joiningBlobStore.streamHasContent(pushBackIS); - - assertThat(pushBackIS) - .hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT)); - } - - @Test - void streamHasContentShouldKeepStreamEmptyWhenStreamIsEmpty() throws Exception { - PushbackInputStream pushBackIS = new PushbackInputStream(new ByteArrayInputStream(new byte[0])); - joiningBlobStore.streamHasContent(pushBackIS); - - assertThat(pushBackIS) - .hasSameContentAs(new ByteArrayInputStream(new byte[0])); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/james-project/blob/12a26f95/server/blob/blob-union/pom.xml 
---------------------------------------------------------------------- diff --git a/server/blob/blob-union/pom.xml b/server/blob/blob-union/pom.xml new file mode 100644 index 0000000..5a6e84f --- /dev/null +++ b/server/blob/blob-union/pom.xml @@ -0,0 +1,81 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <artifactId>james-server-blob</artifactId> + <groupId>org.apache.james</groupId> + <version>3.2.0-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + + <artifactId>blob-union</artifactId> + <packaging>jar</packaging> + + <name>Apache James :: Server :: Blob :: Union Blob Storage</name> + <description> + An implementation of BlobStore which relies on a current and a legacy BlobStore by order for reading and writing + blobs with fallback mechanism. 
+ </description> + + <dependencies> + <dependency> + <groupId>${james.groupId}</groupId> + <artifactId>blob-api</artifactId> + </dependency> + <dependency> + <groupId>${james.groupId}</groupId> + <artifactId>blob-api</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>${james.groupId}</groupId> + <artifactId>blob-memory</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>${james.groupId}</groupId> + <artifactId>james-server-testing</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>${james.groupId}</groupId> + <artifactId>james-server-util</artifactId> + </dependency> + <dependency> + <groupId>org.assertj</groupId> + <artifactId>assertj-core</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter-engine</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.junit.platform</groupId> + <artifactId>junit-platform-launcher</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + +</project> http://git-wip-us.apache.org/repos/asf/james-project/blob/12a26f95/server/blob/blob-union/src/main/java/org/apache/james/blob/union/UnionBlobStore.java ---------------------------------------------------------------------- diff --git a/server/blob/blob-union/src/main/java/org/apache/james/blob/union/UnionBlobStore.java b/server/blob/blob-union/src/main/java/org/apache/james/blob/union/UnionBlobStore.java new file mode 100644 index 0000000..9d87213 --- /dev/null +++ b/server/blob/blob-union/src/main/java/org/apache/james/blob/union/UnionBlobStore.java @@ -0,0 +1,153 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. 
The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.blob.union; + +import java.io.IOException; +import java.io.InputStream; +import java.io.PushbackInputStream; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; +import java.util.function.Supplier; + +import org.apache.james.blob.api.BlobId; +import org.apache.james.blob.api.BlobStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.github.fge.lambdas.Throwing; +import com.google.common.annotations.VisibleForTesting; + +public class UnionBlobStore implements BlobStore { + + private static final Logger LOGGER = LoggerFactory.getLogger(UnionBlobStore.class); + private static final int UNAVAILABLE = -1; + + private final BlobStore currentBlobStore; + private final BlobStore legacyBlobStore; + + @VisibleForTesting + UnionBlobStore(BlobStore currentBlobStore, BlobStore legacyBlobStore) { + this.currentBlobStore = currentBlobStore; + this.legacyBlobStore = legacyBlobStore; + } + + @Override + public CompletableFuture<BlobId> save(byte[] data) { + try { + return saveToCurrentFallbackIfFails( + currentBlobStore.save(data), + () -> legacyBlobStore.save(data)); + } catch (Exception e) { + LOGGER.error("exception directly happens while saving bytes data, fall back to legacy blob store", e); + 
return legacyBlobStore.save(data); + } + } + + @Override + public CompletableFuture<BlobId> save(InputStream data) { + try { + return saveToCurrentFallbackIfFails( + currentBlobStore.save(data), + () -> legacyBlobStore.save(data)); + } catch (Exception e) { + LOGGER.error("exception directly happens while saving InputStream data, fall back to legacy blob store", e); + return legacyBlobStore.save(data); + } + } + + @Override + public CompletableFuture<byte[]> readBytes(BlobId blobId) { + try { + return readBytesFallBackIfFailsOrEmptyResult(blobId); + } catch (Exception e) { + LOGGER.error("exception directly happens while readBytes, fall back to legacy blob store", e); + return legacyBlobStore.readBytes(blobId); + } + } + + @Override + public InputStream read(BlobId blobId) { + try { + return readFallBackIfEmptyResult(blobId); + } catch (Exception e) { + LOGGER.error("exception directly happens while read, fall back to legacy blob store", e); + return legacyBlobStore.read(blobId); + } + } + + private InputStream readFallBackIfEmptyResult(BlobId blobId) { + return Optional.ofNullable(currentBlobStore.read(blobId)) + .map(PushbackInputStream::new) + .filter(Throwing.predicate(this::streamHasContent).sneakyThrow()) + .<InputStream>map(Function.identity()) + .orElseGet(() -> legacyBlobStore.read(blobId)); + } + + @VisibleForTesting + boolean streamHasContent(PushbackInputStream pushBackIS) throws IOException { + int byteRead = pushBackIS.read(); + if (byteRead != UNAVAILABLE) { + pushBackIS.unread(byteRead); + return true; + } + return false; + } + + private CompletableFuture<byte[]> readBytesFallBackIfFailsOrEmptyResult(BlobId blobId) { + return currentBlobStore.readBytes(blobId) + .thenApply(Optional::ofNullable) + .exceptionally(this::logAndReturnEmptyOptional) + .thenCompose(maybeBytes -> readFromLegacyIfNeeded(maybeBytes, blobId)); + } + + private CompletableFuture<BlobId> saveToCurrentFallbackIfFails( + CompletableFuture<BlobId> currentSavingOperation, + 
Supplier<CompletableFuture<BlobId>> fallbackSavingOperationSupplier) { + + return currentSavingOperation + .thenApply(Optional::ofNullable) + .exceptionally(this::logAndReturnEmptyOptional) + .thenCompose(maybeBlobId -> saveToLegacyIfNeeded(maybeBlobId, fallbackSavingOperationSupplier)); + } + + private <T> Optional<T> logAndReturnEmptyOptional(Throwable throwable) { + LOGGER.error("error happens from current blob store, fall back to legacy blob store", throwable); + return Optional.empty(); + } + + private CompletableFuture<BlobId> saveToLegacyIfNeeded(Optional<BlobId> maybeBlobId, + Supplier<CompletableFuture<BlobId>> saveToLegacySupplier) { + return maybeBlobId + .map(CompletableFuture::completedFuture) + .orElseGet(saveToLegacySupplier); + } + + private CompletableFuture<byte[]> readFromLegacyIfNeeded(Optional<byte[]> readFromCurrentResult, BlobId blodId) { + return readFromCurrentResult + .filter(this::hasContent) + .map(CompletableFuture::completedFuture) + .orElseGet(() -> legacyBlobStore.readBytes(blodId)); + } + + private boolean hasContent(byte [] bytes) { + return bytes.length > 0; + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/12a26f95/server/blob/blob-union/src/test/java/org/apache/james/blob/union/UnionBlobStoreTest.java ---------------------------------------------------------------------- diff --git a/server/blob/blob-union/src/test/java/org/apache/james/blob/union/UnionBlobStoreTest.java b/server/blob/blob-union/src/test/java/org/apache/james/blob/union/UnionBlobStoreTest.java new file mode 100644 index 0000000..9afa0f9 --- /dev/null +++ b/server/blob/blob-union/src/test/java/org/apache/james/blob/union/UnionBlobStoreTest.java @@ -0,0 +1,333 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. 
See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.blob.union; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.io.PushbackInputStream; +import java.util.concurrent.CompletableFuture; + +import org.apache.james.blob.api.BlobId; +import org.apache.james.blob.api.BlobStore; +import org.apache.james.blob.api.BlobStoreContract; +import org.apache.james.blob.api.HashBlobId; +import org.apache.james.blob.memory.MemoryBlobStore; +import org.apache.james.util.CompletableFutureUtil; +import org.assertj.core.api.SoftAssertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; + +/** Runs the shared BlobStoreContract against UnionBlobStore and checks its fallback from the current blob store to the legacy blob store. */ +class UnionBlobStoreTest implements BlobStoreContract { + + /** BlobStore stub whose save and readBytes return the result of CompletableFutureUtil.exceptionallyFuture (presumably an exceptionally completed future - the helper is not visible here) and whose read throws directly. */ + private static class FutureThrowingBlobStore implements BlobStore { + + @Override + public CompletableFuture<BlobId> save(byte[] data) { + return CompletableFutureUtil.exceptionallyFuture(new RuntimeException("broken everywhere")); + } + + @Override + public CompletableFuture<BlobId> save(InputStream data) { + return
CompletableFutureUtil.exceptionallyFuture(new RuntimeException("broken everywhere")); + } + + @Override + public CompletableFuture<byte[]> readBytes(BlobId blobId) { + return CompletableFutureUtil.exceptionallyFuture(new RuntimeException("broken everywhere")); + } + + @Override + public InputStream read(BlobId blobId) { + throw new RuntimeException("broken everywhere"); + } + } + + /** BlobStore stub whose every operation throws a RuntimeException synchronously. */ + private static class ThrowingBlobStore implements BlobStore { + + @Override + public CompletableFuture<BlobId> save(byte[] data) { + throw new RuntimeException("broken everywhere"); + } + + @Override + public CompletableFuture<BlobId> save(InputStream data) { + throw new RuntimeException("broken everywhere"); + } + + @Override + public CompletableFuture<byte[]> readBytes(BlobId blobId) { + throw new RuntimeException("broken everywhere"); + } + + @Override + public InputStream read(BlobId blobId) { + throw new RuntimeException("broken everywhere"); + } + } + + private static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory(); + /* NOTE(review): getBytes() without a charset argument uses the platform default encoding - confirm this is intended for the fixture. */ + private static final byte [] BLOB_CONTENT = "blob content".getBytes(); + + private MemoryBlobStore currentBlobStore; + private MemoryBlobStore legacyBlobStore; + private UnionBlobStore unionBlobStore; + + /** Builds a fresh UnionBlobStore over two new in-memory stores before each test. */ + @BeforeEach + void setup() { + currentBlobStore = new MemoryBlobStore(BLOB_ID_FACTORY); + legacyBlobStore = new MemoryBlobStore(BLOB_ID_FACTORY); + unionBlobStore = new UnionBlobStore(currentBlobStore, legacyBlobStore); + } + + @Override + public BlobStore testee() { + return unionBlobStore; + } + + @Override + public BlobId.Factory blobIdFactory() { + return BLOB_ID_FACTORY; + } + + /** Saving through a current store that throws synchronously must leave the blob readable from the legacy store. */ + @Nested + class CurrentSaveThrowsExceptionDirectly { + + @Test + void saveShouldFallBackToLegacyWhenCurrentGotException() throws Exception { + MemoryBlobStore legacyBlobStore = new MemoryBlobStore(BLOB_ID_FACTORY); + UnionBlobStore unionBlobStore = new UnionBlobStore(new ThrowingBlobStore(), legacyBlobStore); + BlobId blobId = unionBlobStore.save(BLOB_CONTENT).get();
+ + SoftAssertions.assertSoftly(softly -> { + softly.assertThat(unionBlobStore.read(blobId)) + .hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT)); + softly.assertThat(legacyBlobStore.read(blobId)) + .hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT)); + }); + } + + @Test + void saveInputStreamShouldFallBackToLegacyWhenCurrentGotException() throws Exception { + MemoryBlobStore legacyBlobStore = new MemoryBlobStore(BLOB_ID_FACTORY); + UnionBlobStore unionBlobStore = new UnionBlobStore(new ThrowingBlobStore(), legacyBlobStore); + BlobId blobId = unionBlobStore.save(new ByteArrayInputStream(BLOB_CONTENT)).get(); + + SoftAssertions.assertSoftly(softly -> { + softly.assertThat(unionBlobStore.read(blobId)) + .hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT)); + softly.assertThat(legacyBlobStore.read(blobId)) + .hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT)); + }); + } + } + + /** Saving through a current store built on FutureThrowingBlobStore must leave the blob readable from the legacy store. */ + @Nested + class CurrentSaveCompletesExceptionally { + + @Test + void saveShouldFallBackToLegacyWhenCurrentCompletedExceptionally() throws Exception { + MemoryBlobStore legacyBlobStore = new MemoryBlobStore(BLOB_ID_FACTORY); + UnionBlobStore unionBlobStore = new UnionBlobStore(new FutureThrowingBlobStore(), legacyBlobStore); + BlobId blobId = unionBlobStore.save(BLOB_CONTENT).get(); + + SoftAssertions.assertSoftly(softly -> { + softly.assertThat(unionBlobStore.read(blobId)) + .hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT)); + softly.assertThat(legacyBlobStore.read(blobId)) + .hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT)); + }); + } + + @Test + void saveInputStreamShouldFallBackToLegacyWhenCurrentCompletedExceptionally() throws Exception { + MemoryBlobStore legacyBlobStore = new MemoryBlobStore(BLOB_ID_FACTORY); + UnionBlobStore unionBlobStore = new UnionBlobStore(new FutureThrowingBlobStore(), legacyBlobStore); + BlobId blobId = unionBlobStore.save(new ByteArrayInputStream(BLOB_CONTENT)).get(); + + SoftAssertions.assertSoftly(softly -> { +
softly.assertThat(unionBlobStore.read(blobId)) + .hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT)); + softly.assertThat(legacyBlobStore.read(blobId)) + .hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT)); + }); + } + + } + + /** Reads must be answered by the legacy store when the current store throws synchronously. */ + @Nested + class CurrentReadThrowsExceptionDirectly { + + @Test + void readShouldReturnFallbackToLegacyWhenCurrentGotException() throws Exception { + MemoryBlobStore legacyBlobStore = new MemoryBlobStore(BLOB_ID_FACTORY); + UnionBlobStore unionBlobStore = new UnionBlobStore(new ThrowingBlobStore(), legacyBlobStore); + BlobId blobId = legacyBlobStore.save(BLOB_CONTENT).get(); + + assertThat(unionBlobStore.read(blobId)) + .hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT)); + } + + @Test + void readBytesShouldReturnFallbackToLegacyWhenCurrentGotException() throws Exception { + MemoryBlobStore legacyBlobStore = new MemoryBlobStore(BLOB_ID_FACTORY); + UnionBlobStore unionBlobStore = new UnionBlobStore(new ThrowingBlobStore(), legacyBlobStore); + BlobId blobId = legacyBlobStore.save(BLOB_CONTENT).get(); + + assertThat(unionBlobStore.readBytes(blobId).get()) + .isEqualTo(BLOB_CONTENT); + } + + } + + /** Reads must be answered by the legacy store when the current store is a FutureThrowingBlobStore. NOTE(review): FutureThrowingBlobStore.read throws directly, so the read test in this group takes the same path as the direct-throw group - confirm that is intended. */ + @Nested + class CurrentReadCompletesExceptionally { + + @Test + void readShouldReturnFallbackToLegacyWhenCurrentCompletedExceptionally() throws Exception { + MemoryBlobStore legacyBlobStore = new MemoryBlobStore(BLOB_ID_FACTORY); + UnionBlobStore unionBlobStore = new UnionBlobStore(new FutureThrowingBlobStore(), legacyBlobStore); + BlobId blobId = legacyBlobStore.save(BLOB_CONTENT).get(); + + assertThat(unionBlobStore.read(blobId)) + .hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT)); + } + + @Test + void readBytesShouldReturnFallbackToLegacyWhenCurrentCompletedExceptionally() throws Exception { + MemoryBlobStore legacyBlobStore = new MemoryBlobStore(BLOB_ID_FACTORY); + UnionBlobStore unionBlobStore = new UnionBlobStore(new FutureThrowingBlobStore(), legacyBlobStore); + BlobId blobId = legacyBlobStore.save(BLOB_CONTENT).get(); +
+ assertThat(unionBlobStore.readBytes(blobId).get()) + .isEqualTo(BLOB_CONTENT); + } + } + + + @Test + void readShouldReturnFromCurrentWhenAvailable() throws Exception { + BlobId blobId = currentBlobStore.save(BLOB_CONTENT).get(); + + assertThat(unionBlobStore.read(blobId)) + .hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT)); + } + + @Test + void readShouldReturnFromLegacyWhenCurrentNotAvailable() throws Exception { + BlobId blobId = legacyBlobStore.save(BLOB_CONTENT).get(); + + assertThat(unionBlobStore.read(blobId)) + .hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT)); + } + + @Test + void readBytesShouldReturnFromCurrentWhenAvailable() throws Exception { + BlobId blobId = currentBlobStore.save(BLOB_CONTENT).get(); + + assertThat(unionBlobStore.readBytes(blobId).get()) + .isEqualTo(BLOB_CONTENT); + } + + @Test + void readBytesShouldReturnFromLegacyWhenCurrentNotAvailable() throws Exception { + BlobId blobId = legacyBlobStore.save(BLOB_CONTENT).get(); + + assertThat(unionBlobStore.readBytes(blobId).get()) + .isEqualTo(BLOB_CONTENT); + } + + @Test + void saveShouldWriteToCurrent() throws Exception { + BlobId blobId = unionBlobStore.save(BLOB_CONTENT).get(); + + assertThat(currentBlobStore.readBytes(blobId).get()) + .isEqualTo(BLOB_CONTENT); + } + + /** Expects the legacy store to answer an empty byte array for the saved id, i.e. nothing was written there. */ + @Test + void saveShouldNotWriteToLegacy() throws Exception { + BlobId blobId = unionBlobStore.save(BLOB_CONTENT).get(); + + assertThat(legacyBlobStore.readBytes(blobId).get()) + .isEmpty(); + } + + @Test + void saveInputStreamShouldWriteToCurrent() throws Exception { + BlobId blobId = unionBlobStore.save(new ByteArrayInputStream(BLOB_CONTENT)).get(); + + assertThat(currentBlobStore.readBytes(blobId).get()) + .isEqualTo(BLOB_CONTENT); + } + + @Test + void saveInputStreamShouldNotWriteToLegacy() throws Exception { + BlobId blobId = unionBlobStore.save(new ByteArrayInputStream(BLOB_CONTENT)).get(); + + assertThat(legacyBlobStore.readBytes(blobId).get()) + .isEmpty(); + } + + @Test + void
streamHasContentShouldReturnTrueWhenStreamHasContent() throws Exception { + PushbackInputStream pushBackIS = new PushbackInputStream(new ByteArrayInputStream(BLOB_CONTENT)); + + assertThat(unionBlobStore.streamHasContent(pushBackIS)) + .isTrue(); + } + + @Test + void streamHasContentShouldReturnFalseWhenStreamHasNoContent() throws Exception { + PushbackInputStream pushBackIS = new PushbackInputStream(new ByteArrayInputStream(new byte[0])); + + assertThat(unionBlobStore.streamHasContent(pushBackIS)) + .isFalse(); + } + + @Test + void streamHasContentShouldNotThrowWhenStreamHasNoContent() { + PushbackInputStream pushBackIS = new PushbackInputStream(new ByteArrayInputStream(new byte[0])); + + assertThatCode(() -> unionBlobStore.streamHasContent(pushBackIS)) + .doesNotThrowAnyException(); + } + + /** After the probe the stream must still deliver the complete original content. */ + @Test + void streamHasContentShouldNotDrainPushBackStreamContent() throws Exception { + PushbackInputStream pushBackIS = new PushbackInputStream(new ByteArrayInputStream(BLOB_CONTENT)); + unionBlobStore.streamHasContent(pushBackIS); + + assertThat(pushBackIS) + .hasSameContentAs(new ByteArrayInputStream(BLOB_CONTENT)); + } + + @Test + void streamHasContentShouldKeepStreamEmptyWhenStreamIsEmpty() throws Exception { + PushbackInputStream pushBackIS = new PushbackInputStream(new ByteArrayInputStream(new byte[0])); + unionBlobStore.streamHasContent(pushBackIS); + + assertThat(pushBackIS) + .hasSameContentAs(new ByteArrayInputStream(new byte[0])); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/james-project/blob/12a26f95/server/blob/pom.xml ---------------------------------------------------------------------- diff --git a/server/blob/pom.xml b/server/blob/pom.xml index 131baa2..852d054 100644 --- a/server/blob/pom.xml +++ b/server/blob/pom.xml @@ -35,9 +35,9 @@ <modules> <module>blob-api</module> <module>blob-cassandra</module> - <module>blob-joining</module> <module>blob-memory</module> <module>blob-objectstorage</module> + <module>blob-union</module>
<module>mail-store</module> </modules> --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org