JAMES-2541 Use MimeMessageStore as part of CassandraMailRepository
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/6a293665 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/6a293665 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/6a293665 Branch: refs/heads/master Commit: 6a2936653039202607f10f7ea046e6bf749d6950 Parents: 6d0f22d Author: Benoit Tellier <[email protected]> Authored: Thu Sep 6 13:54:31 2018 +0700 Committer: Benoit Tellier <[email protected]> Committed: Mon Sep 10 17:17:41 2018 +0700 ---------------------------------------------------------------------- pom.xml | 5 + .../mailrepository-cassandra/pom.xml | 2 +- .../cassandra/CassandraMailRepository.java | 137 ++++--------------- .../CassandraMailRepositoryProvider.java | 11 +- .../cassandra/CassandraMailRepositoryTest.java | 3 +- 5 files changed, 44 insertions(+), 114 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/6a293665/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index d59b2fc..37a3e37 100644 --- a/pom.xml +++ b/pom.xml @@ -1422,6 +1422,11 @@ </dependency> <dependency> <groupId>${james.groupId}</groupId> + <artifactId>james-server-mail-store</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>${james.groupId}</groupId> <artifactId>james-server-onami</artifactId> <version>${project.version}</version> </dependency> http://git-wip-us.apache.org/repos/asf/james-project/blob/6a293665/server/mailrepository/mailrepository-cassandra/pom.xml ---------------------------------------------------------------------- diff --git a/server/mailrepository/mailrepository-cassandra/pom.xml b/server/mailrepository/mailrepository-cassandra/pom.xml index 19f03d0..479a90f 100644 --- a/server/mailrepository/mailrepository-cassandra/pom.xml +++ b/server/mailrepository/mailrepository-cassandra/pom.xml @@ -45,7 +45,7 @@ </dependency> <dependency> <groupId>${james.groupId}</groupId> - <artifactId>blob-api</artifactId> + <artifactId>james-server-mail-store</artifactId> </dependency> <dependency> <groupId>${james.groupId}</groupId> http://git-wip-us.apache.org/repos/asf/james-project/blob/6a293665/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java ---------------------------------------------------------------------- diff --git a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java index 35d6383..d2c8735 100644 --- a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java +++ b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java @@ -19,37 +19,25 @@ package org.apache.james.mailrepository.cassandra; -import static org.apache.commons.io.output.NullOutputStream.NULL_OUTPUT_STREAM; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.nio.ByteBuffer; import java.util.Collection; import java.util.Iterator; -import java.util.Properties; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import javax.mail.MessagingException; -import javax.mail.Session; import javax.mail.internet.MimeMessage; -import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.tuple.Pair; import org.apache.james.blob.api.BlobId; -import org.apache.james.blob.api.BlobStore; +import org.apache.james.blob.api.Store; +import org.apache.james.blob.mail.MimeMessageStore; import org.apache.james.mailrepository.api.MailKey; import org.apache.james.mailrepository.api.MailRepository; import org.apache.james.mailrepository.api.MailRepositoryUrl; -import org.apache.james.util.BodyOffsetInputStream; -import org.apache.james.util.CompletableFutureUtil; import org.apache.james.util.FluentFutureStream; import org.apache.mailet.Mail; -import com.github.fge.lambdas.Throwing; -import com.google.common.primitives.Bytes; +import com.google.common.collect.ImmutableMap; public class CassandraMailRepository implements MailRepository { @@ -57,94 +45,38 @@ public class CassandraMailRepository implements MailRepository { private final CassandraMailRepositoryKeysDAO keysDAO; private final CassandraMailRepositoryCountDAO countDAO; private final CassandraMailRepositoryMailDAO mailDAO; - private final BlobStore blobStore; + private final MimeMessageStore mimeMessageStore; - public CassandraMailRepository(MailRepositoryUrl url, CassandraMailRepositoryKeysDAO keysDAO, CassandraMailRepositoryCountDAO countDAO, CassandraMailRepositoryMailDAO mailDAO, BlobStore blobStore) { + public CassandraMailRepository(MailRepositoryUrl url, CassandraMailRepositoryKeysDAO keysDAO, + CassandraMailRepositoryCountDAO countDAO, CassandraMailRepositoryMailDAO mailDAO, + MimeMessageStore mimeMessageStore) { this.url = url; this.keysDAO = keysDAO; this.countDAO = countDAO; this.mailDAO = mailDAO; - this.blobStore = blobStore; + this.mimeMessageStore = mimeMessageStore; } @Override public MailKey store(Mail mail) throws MessagingException { - try { - MailKey mailKey = MailKey.forMail(mail); - Pair<byte[], byte[]> splitHeaderBody = splitHeaderBody(mail.getMessage()); - - CompletableFuture<Pair<BlobId, BlobId>> blobIds = CompletableFutureUtil.combine( - blobStore.save(splitHeaderBody.getLeft()), - blobStore.save(splitHeaderBody.getRight()), - Pair::of); - - blobIds.thenCompose(Throwing.function(pair -> - mailDAO.store(url, mail, pair.getLeft(), pair.getRight()))) - .thenCompose(any -> keysDAO.store(url, mailKey)) - .thenCompose(this::increaseSizeIfStored) - .join(); - return mailKey; - } catch (IOException e) { - throw new MessagingException("Exception while storing mail", e); - } - } + MailKey mailKey = MailKey.forMail(mail); - private CompletionStage<Void> increaseSizeIfStored(Boolean isStored) { - if (isStored) { - return countDAO.increment(url); - } - return CompletableFuture.completedFuture(null); - } - - private Pair<byte[], byte[]> splitHeaderBody(MimeMessage message) throws IOException, MessagingException { - byte[] messageAsArray = messageToArray(message); - int bodyStartOctet = computeBodyStartOctet(messageAsArray); - return Pair.of( - getHeaderBytes(messageAsArray, bodyStartOctet), - getBodyBytes(messageAsArray, bodyStartOctet)); - } - - private byte[] messageToArray(MimeMessage message) throws IOException, MessagingException { - ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); - message.writeTo(byteArrayOutputStream); - return byteArrayOutputStream.toByteArray(); - } + Map<Store.BlobType, BlobId> parts = mimeMessageStore.save(mail.getMessage()); - private byte[] getHeaderBytes(byte[] messageContentAsArray, int bodyStartOctet) { - ByteBuffer headerContent = ByteBuffer.wrap(messageContentAsArray, 0, bodyStartOctet); - byte[] headerBytes = new byte[bodyStartOctet]; - headerContent.get(headerBytes); - return headerBytes; - } + mailDAO.store(url, mail, parts.get(MimeMessageStore.HEADER_BLOB_TYPE), parts.get(MimeMessageStore.BODY_BLOB_TYPE)) + .thenCompose(any -> keysDAO.store(url, mailKey)) + .thenCompose(this::increaseSizeIfStored) + .join(); - private byte[] getBodyBytes(byte[] messageContentAsArray, int bodyStartOctet) { - if (bodyStartOctet < messageContentAsArray.length) { - ByteBuffer bodyContent = ByteBuffer.wrap(messageContentAsArray, - bodyStartOctet, - messageContentAsArray.length - bodyStartOctet); - byte[] bodyBytes = new byte[messageContentAsArray.length - bodyStartOctet]; - bodyContent.get(bodyBytes); - return bodyBytes; - } else { - return new byte[] {}; - } + return mailKey; } - private int computeBodyStartOctet(byte[] messageAsArray) throws IOException { - try (BodyOffsetInputStream bodyOffsetInputStream = - new BodyOffsetInputStream(new ByteArrayInputStream(messageAsArray))) { - consume(bodyOffsetInputStream); - - if (bodyOffsetInputStream.getBodyStartOffset() == -1) { - return 0; - } - return (int) bodyOffsetInputStream.getBodyStartOffset(); + private CompletionStage<Void> increaseSizeIfStored(Boolean isStored) { + if (isStored) { + return countDAO.increment(url); } - } - - private void consume(InputStream in) throws IOException { - IOUtils.copy(in, NULL_OUTPUT_STREAM); + return CompletableFuture.completedFuture(null); } @Override @@ -156,30 +88,21 @@ public class CassandraMailRepository implements MailRepository { @Override public Mail retrieve(MailKey key) { - return CompletableFutureUtil - .unwrap(mailDAO.read(url, key) - .thenApply(optional -> optional.map(this::toMail))) + return mailDAO.read(url, key) + .thenApply(optional -> optional.map(this::toMail)) .join() .orElse(null); } - private CompletableFuture<Mail> toMail(CassandraMailRepositoryMailDAO.MailDTO mailDTO) { - return CompletableFutureUtil.combine( - blobStore.readBytes(mailDTO.getHeaderBlobId()), - blobStore.readBytes(mailDTO.getBodyBlobId()), - Bytes::concat) - .thenApply(this::toMimeMessage) - .thenApply(mimeMessage -> mailDTO.getMailBuilder() - .mimeMessage(mimeMessage) - .build()); - } + private Mail toMail(CassandraMailRepositoryMailDAO.MailDTO mailDTO) { + MimeMessage mimeMessage = mimeMessageStore + .read(ImmutableMap.of( + MimeMessageStore.HEADER_BLOB_TYPE, mailDTO.getHeaderBlobId(), + MimeMessageStore.BODY_BLOB_TYPE, mailDTO.getBodyBlobId())); - private MimeMessage toMimeMessage(byte[] bytes) { - try { - return new MimeMessage(Session.getInstance(new Properties()), new ByteArrayInputStream(bytes)); - } catch (MessagingException e) { - throw new RuntimeException(e); - } + return mailDTO.getMailBuilder() + .mimeMessage(mimeMessage) + .build(); } @Override http://git-wip-us.apache.org/repos/asf/james-project/blob/6a293665/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryProvider.java ---------------------------------------------------------------------- diff --git a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryProvider.java b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryProvider.java index 1eb71bb..77d8ca4 100644 --- a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryProvider.java +++ b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryProvider.java @@ -21,7 +21,7 @@ package org.apache.james.mailrepository.cassandra; import javax.inject.Inject; -import org.apache.james.blob.api.BlobStore; +import org.apache.james.blob.mail.MimeMessageStore; import org.apache.james.mailrepository.api.MailRepository; import org.apache.james.mailrepository.api.MailRepositoryProvider; import org.apache.james.mailrepository.api.MailRepositoryUrl; @@ -30,14 +30,15 @@ public class CassandraMailRepositoryProvider implements MailRepositoryProvider { private final CassandraMailRepositoryKeysDAO keysDAO; private final CassandraMailRepositoryCountDAO countDAO; private final CassandraMailRepositoryMailDAO mailDAO; - private final BlobStore blobStore; + private final MimeMessageStore mimeMessageStore; @Inject - public CassandraMailRepositoryProvider(CassandraMailRepositoryKeysDAO keysDAO, CassandraMailRepositoryCountDAO countDAO, CassandraMailRepositoryMailDAO mailDAO, BlobStore blobStore) { + public CassandraMailRepositoryProvider(CassandraMailRepositoryKeysDAO keysDAO, CassandraMailRepositoryCountDAO countDAO, + CassandraMailRepositoryMailDAO mailDAO, MimeMessageStore mimeMessageStore) { this.keysDAO = keysDAO; this.countDAO = countDAO; this.mailDAO = mailDAO; - this.blobStore = blobStore; + this.mimeMessageStore = mimeMessageStore; } @Override @@ -47,6 +48,6 @@ public class CassandraMailRepositoryProvider implements MailRepositoryProvider { @Override public MailRepository provide(MailRepositoryUrl url) { - return new CassandraMailRepository(url, keysDAO, countDAO, mailDAO, blobStore); + return new CassandraMailRepository(url, keysDAO, countDAO, mailDAO, mimeMessageStore); } } http://git-wip-us.apache.org/repos/asf/james-project/blob/6a293665/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryTest.java ---------------------------------------------------------------------- diff --git a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryTest.java b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryTest.java index 7101376..8151a2a 100644 --- a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryTest.java +++ b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryTest.java @@ -26,6 +26,7 @@ import org.apache.james.backends.cassandra.utils.CassandraUtils; import org.apache.james.blob.api.HashBlobId; import org.apache.james.blob.cassandra.CassandraBlobModule; import org.apache.james.blob.cassandra.CassandraBlobsDAO; +import org.apache.james.blob.mail.MimeMessageStore; import org.apache.james.mailrepository.MailRepositoryContract; import org.apache.james.mailrepository.api.MailRepository; import org.apache.james.mailrepository.api.MailRepositoryUrl; @@ -61,7 +62,7 @@ class CassandraMailRepositoryTest implements MailRepositoryContract { CassandraBlobsDAO blobsDAO = new CassandraBlobsDAO(cassandra.getConf()); cassandraMailRepository = new CassandraMailRepository(URL, - keysDAO, countDAO, mailDAO, blobsDAO); + keysDAO, countDAO, mailDAO, new MimeMessageStore(blobsDAO)); } @AfterEach --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
