JAMES-2525 changes ObjectStore#read to return an InputStream. The original method is renamed to `readBytes`, and all existing uses are changed to follow the rename.
The new method is implemented in the CassandraBlobsDAO and delegates to the existing implementation. Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/86979704 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/86979704 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/86979704 Branch: refs/heads/master Commit: 86979704d7467a9910dd571655e54bd29aaef128 Parents: 23cbb7b Author: Jean Helou <[email protected]> Authored: Mon Aug 20 17:09:39 2018 +0200 Committer: Jean Helou <[email protected]> Committed: Thu Aug 23 14:18:49 2018 +0200 ---------------------------------------------------------------------- .../mail/CassandraAttachmentMapper.java | 2 +- .../cassandra/mail/CassandraMessageDAO.java | 2 +- .../migration/AttachmentV2MigrationTest.java | 4 +- .../org/apache/james/blob/api/ObjectStore.java | 5 +- .../james/blob/api/ObjectStoreException.java | 31 ++++++++++ .../james/blob/api/ObjectStoreContract.java | 61 +++++++++++++++++--- .../james/blob/cassandra/CassandraBlobsDAO.java | 31 +++++++++- .../blob/cassandra/CassandraBlobsDAOTest.java | 4 +- .../cassandra/CassandraMailRepository.java | 4 +- 9 files changed, 125 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/86979704/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java index 678b10b..444ec85 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java +++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java @@ -87,7 +87,7 @@ public class CassandraAttachmentMapper implements AttachmentMapper { return CompletableFuture.completedFuture(Optional.empty()); } DAOAttachment daoAttachment = daoAttachmentOptional.get(); - return objectStore.read(daoAttachment.getBlobId()) + return objectStore.readBytes(daoAttachment.getBlobId()) .thenApply(bytes -> Optional.of(daoAttachment.toAttachment(bytes))); } http://git-wip-us.apache.org/repos/asf/james-project/blob/86979704/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java index 8fe0284..a451adb 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java @@ -372,7 +372,7 @@ public class CassandraMessageDAO { } private CompletableFuture<byte[]> getFieldContent(String field, Row row) { - return objectStore.read(blobIdFactory.from(row.getString(field))); + return objectStore.readBytes(blobIdFactory.from(row.getString(field))); } public static MessageResult notFound(ComposedMessageIdWithMetaData id) { http://git-wip-us.apache.org/repos/asf/james-project/blob/86979704/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java index 26bcf94..8a45267 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/AttachmentV2MigrationTest.java @@ -130,9 +130,9 @@ public class AttachmentV2MigrationTest { .contains(CassandraAttachmentDAOV2.from(attachment1, BLOB_ID_FACTORY.forPayload(attachment1.getBytes()))); assertThat(attachmentDAOV2.getAttachment(ATTACHMENT_ID_2).join()) .contains(CassandraAttachmentDAOV2.from(attachment2, BLOB_ID_FACTORY.forPayload(attachment2.getBytes()))); - assertThat(blobsDAO.read(BLOB_ID_FACTORY.forPayload(attachment1.getBytes())).join()) + assertThat(blobsDAO.readBytes(BLOB_ID_FACTORY.forPayload(attachment1.getBytes())).join()) .isEqualTo(attachment1.getBytes()); - assertThat(blobsDAO.read(BLOB_ID_FACTORY.forPayload(attachment2.getBytes())).join()) + assertThat(blobsDAO.readBytes(BLOB_ID_FACTORY.forPayload(attachment2.getBytes())).join()) .isEqualTo(attachment2.getBytes()); } http://git-wip-us.apache.org/repos/asf/james-project/blob/86979704/server/blob/blob-api/src/main/java/org/apache/james/blob/api/ObjectStore.java ---------------------------------------------------------------------- diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/ObjectStore.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/ObjectStore.java index c2bd88a..7582a2c 100644 --- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/ObjectStore.java +++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/ObjectStore.java @@ -18,11 +18,14 @@ ****************************************************************/ package org.apache.james.blob.api; +import java.io.InputStream; import java.util.concurrent.CompletableFuture; public interface ObjectStore { CompletableFuture<BlobId> 
save(byte[] data); - CompletableFuture<byte[]> read(BlobId blobId); + CompletableFuture<byte[]> readBytes(BlobId blobId); + + InputStream read(BlobId blobId); } http://git-wip-us.apache.org/repos/asf/james-project/blob/86979704/server/blob/blob-api/src/main/java/org/apache/james/blob/api/ObjectStoreException.java ---------------------------------------------------------------------- diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/ObjectStoreException.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/ObjectStoreException.java new file mode 100644 index 0000000..624a8e0 --- /dev/null +++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/ObjectStoreException.java @@ -0,0 +1,31 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. 
* + ****************************************************************/ + +package org.apache.james.blob.api; + +public class ObjectStoreException extends RuntimeException { + + public ObjectStoreException(String message) { + super(message); + } + + public ObjectStoreException(String message, Throwable cause) { + super(message,cause); + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/86979704/server/blob/blob-api/src/test/java/org/apache/james/blob/api/ObjectStoreContract.java ---------------------------------------------------------------------- diff --git a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/ObjectStoreContract.java b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/ObjectStoreContract.java index bd4bd8e..bb51e14 100644 --- a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/ObjectStoreContract.java +++ b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/ObjectStoreContract.java @@ -21,9 +21,12 @@ package org.apache.james.blob.api; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import java.io.ByteArrayInputStream; import java.io.IOException; +import java.io.InputStream; import java.nio.charset.StandardCharsets; +import org.apache.commons.io.IOUtils; import org.junit.jupiter.api.Test; import com.google.common.base.Strings; @@ -44,7 +47,7 @@ public interface ObjectStoreContract { default void saveShouldSaveEmptyData() throws Exception { BlobId blobId = testee().save(new byte[]{}).join(); - byte[] bytes = testee().read(blobId).join(); + byte[] bytes = testee().readBytes(blobId).join(); assertThat(new String(bytes, StandardCharsets.UTF_8)).isEmpty(); } @@ -57,39 +60,79 @@ public interface ObjectStoreContract { } @Test - default void readShouldBeEmptyWhenNoExisting() throws IOException { - byte[] bytes = testee().read(blobIdFactory().from("unknown")).join(); + default void readBytesShouldBeEmptyWhenNoExisting() throws 
IOException { + byte[] bytes = testee().readBytes(blobIdFactory().from("unknown")).join(); assertThat(bytes).isEmpty(); } @Test - default void readShouldReturnSavedData() throws IOException { + default void readBytesShouldReturnSavedData() throws IOException { BlobId blobId = testee().save("toto".getBytes(StandardCharsets.UTF_8)).join(); - byte[] bytes = testee().read(blobId).join(); + byte[] bytes = testee().readBytes(blobId).join(); assertThat(new String(bytes, StandardCharsets.UTF_8)).isEqualTo("toto"); } @Test - default void readShouldReturnLongSavedData() throws IOException { + default void readBytesShouldReturnLongSavedData() throws IOException { String longString = Strings.repeat("0123456789\n", 1000); BlobId blobId = testee().save(longString.getBytes(StandardCharsets.UTF_8)).join(); - byte[] bytes = testee().read(blobId).join(); + byte[] bytes = testee().readBytes(blobId).join(); assertThat(new String(bytes, StandardCharsets.UTF_8)).isEqualTo(longString); } @Test - default void readShouldReturnBigSavedData() throws IOException { + default void readBytesShouldReturnBigSavedData() throws IOException { // 12 MB of text String bigString = Strings.repeat("0123456789\r\n", 1024 * 1024); BlobId blobId = testee().save(bigString.getBytes(StandardCharsets.UTF_8)).join(); - byte[] bytes = testee().read(blobId).join(); + byte[] bytes = testee().readBytes(blobId).join(); assertThat(new String(bytes, StandardCharsets.UTF_8)).isEqualTo(bigString); } + + @Test + default void readShouldBeEmptyWhenNoExistingStream() throws IOException { + InputStream stream = testee().read(blobIdFactory().from("unknown")); + + assertThat(stream.read()).isEqualTo(IOUtils.EOF); + } + + @Test + default void readShouldReturnSavedData() throws IOException { + byte[] bytes = "toto".getBytes(StandardCharsets.UTF_8); + BlobId blobId = testee().save(bytes).join(); + + InputStream read = testee().read(blobId); + + assertThat(read).hasSameContentAs(new ByteArrayInputStream(bytes)); + } + + @Test + 
default void readShouldReturnLongSavedData() throws IOException { + String longString = Strings.repeat("0123456789\n", 1000); + byte[] bytes = longString.getBytes(StandardCharsets.UTF_8); + BlobId blobId = testee().save(bytes).join(); + + InputStream read = testee().read(blobId); + + assertThat(read).hasSameContentAs(new ByteArrayInputStream(bytes)); + } + + @Test + default void readShouldReturnBigSavedData() throws IOException { + // 12 MB of text + String bigString = Strings.repeat("0123456789\r\n", 1024 * 1024); + byte[] bytes = bigString.getBytes(StandardCharsets.UTF_8); + BlobId blobId = testee().save(bytes).join(); + + InputStream read = testee().read(blobId); + + assertThat(read).hasSameContentAs(new ByteArrayInputStream(bytes)); + } } http://git-wip-us.apache.org/repos/asf/james-project/blob/86979704/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobsDAO.java ---------------------------------------------------------------------- diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobsDAO.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobsDAO.java index 7c8eda8..3436d73 100644 --- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobsDAO.java +++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobsDAO.java @@ -24,7 +24,11 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.eq; import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto; import static com.datastax.driver.core.querybuilder.QueryBuilder.select; +import java.io.IOException; +import java.io.InputStream; import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.Pipe; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.stream.IntStream; @@ -37,6 +41,7 @@ import 
org.apache.james.backends.cassandra.init.configuration.CassandraConfigura import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor; import org.apache.james.blob.api.BlobId; import org.apache.james.blob.api.ObjectStore; +import org.apache.james.blob.api.ObjectStoreException; import org.apache.james.blob.cassandra.BlobTable.BlobParts; import org.apache.james.blob.cassandra.utils.DataChunker; import org.apache.james.util.FluentFutureStream; @@ -47,6 +52,8 @@ import org.slf4j.LoggerFactory; import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; +import com.github.fge.lambdas.Throwing; +import com.github.fge.lambdas.consumers.ConsumerChainer; import com.github.steveash.guavate.Guavate; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -149,7 +156,7 @@ public class CassandraBlobsDAO implements ObjectStore { } @Override - public CompletableFuture<byte[]> read(BlobId blobId) { + public CompletableFuture<byte[]> readBytes(BlobId blobId) { return cassandraAsyncExecutor.executeSingleRow( select.bind() .setString(BlobTable.ID, blobId.asString())) @@ -209,4 +216,26 @@ public class CassandraBlobsDAO implements ObjectStore { this.row = row; } } + + @Override + public InputStream read(BlobId blobId) { + try { + Pipe pipe = Pipe.open(); + ConsumerChainer<ByteBuffer> consumer = Throwing.consumer( + bytes -> { + try (Pipe.SinkChannel sink = pipe.sink()) { + sink.write(bytes); + } + } + ); + readBytes(blobId) + .thenApply(ByteBuffer::wrap) + .thenAccept(consumer.sneakyThrow()); + return Channels.newInputStream(pipe.source()); + } catch (IOException cause) { + throw new ObjectStoreException( + "Failed to convert CompletableFuture<byte[]> to InputStream", + cause); + } + } } http://git-wip-us.apache.org/repos/asf/james-project/blob/86979704/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobsDAOTest.java 
---------------------------------------------------------------------- diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobsDAOTest.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobsDAOTest.java index 07af72f..cb7db4d 100644 --- a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobsDAOTest.java +++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/CassandraBlobsDAOTest.java @@ -84,11 +84,11 @@ public class CassandraBlobsDAOTest implements ObjectStoreContract { } @Test - public void readShouldReturnSplitSavedDataByChunk() throws IOException { + public void readBytesShouldReturnSplitSavedDataByChunk() throws IOException { String longString = Strings.repeat("0123456789\n", MULTIPLE_CHUNK_SIZE); BlobId blobId = testee.save(longString.getBytes(StandardCharsets.UTF_8)).join(); - byte[] bytes = testee.read(blobId).join(); + byte[] bytes = testee.readBytes(blobId).join(); assertThat(new String(bytes, StandardCharsets.UTF_8)).isEqualTo(longString); } http://git-wip-us.apache.org/repos/asf/james-project/blob/86979704/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java ---------------------------------------------------------------------- diff --git a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java index c2e449d..9074481 100644 --- a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java +++ b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java @@ -158,8 +158,8 @@ public class 
CassandraMailRepository implements MailRepository { public CompletableFuture<Mail> toMail(CassandraMailRepositoryMailDAO.MailDTO mailDTO) { return CompletableFutureUtil.combine( - objectStore.read(mailDTO.getHeaderBlobId()), - objectStore.read(mailDTO.getBodyBlobId()), + objectStore.readBytes(mailDTO.getHeaderBlobId()), + objectStore.readBytes(mailDTO.getBodyBlobId()), Bytes::concat) .thenApply(this::toMimeMessage) .thenApply(mimeMessage -> mailDTO.getMailBuilder() --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
