JAMES-2541 Use MimeMessageStore as part of CassandraMailRepository

Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/6a293665
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/6a293665
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/6a293665

Branch: refs/heads/master
Commit: 6a2936653039202607f10f7ea046e6bf749d6950
Parents: 6d0f22d
Author: Benoit Tellier <[email protected]>
Authored: Thu Sep 6 13:54:31 2018 +0700
Committer: Benoit Tellier <[email protected]>
Committed: Mon Sep 10 17:17:41 2018 +0700

----------------------------------------------------------------------
 pom.xml                                         |   5 +
 .../mailrepository-cassandra/pom.xml            |   2 +-
 .../cassandra/CassandraMailRepository.java      | 137 ++++---------------
 .../CassandraMailRepositoryProvider.java        |  11 +-
 .../cassandra/CassandraMailRepositoryTest.java  |   3 +-
 5 files changed, 44 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/6a293665/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d59b2fc..37a3e37 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1422,6 +1422,11 @@
             </dependency>
             <dependency>
                 <groupId>${james.groupId}</groupId>
+                <artifactId>james-server-mail-store</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>${james.groupId}</groupId>
                 <artifactId>james-server-onami</artifactId>
                 <version>${project.version}</version>
             </dependency>

http://git-wip-us.apache.org/repos/asf/james-project/blob/6a293665/server/mailrepository/mailrepository-cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/server/mailrepository/mailrepository-cassandra/pom.xml b/server/mailrepository/mailrepository-cassandra/pom.xml
index 19f03d0..479a90f 100644
--- a/server/mailrepository/mailrepository-cassandra/pom.xml
+++ b/server/mailrepository/mailrepository-cassandra/pom.xml
@@ -45,7 +45,7 @@
         </dependency>
         <dependency>
             <groupId>${james.groupId}</groupId>
-            <artifactId>blob-api</artifactId>
+            <artifactId>james-server-mail-store</artifactId>
         </dependency>
         <dependency>
             <groupId>${james.groupId}</groupId>

http://git-wip-us.apache.org/repos/asf/james-project/blob/6a293665/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java
----------------------------------------------------------------------
diff --git a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java
index 35d6383..d2c8735 100644
--- a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java
+++ b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepository.java
@@ -19,37 +19,25 @@
 
 package org.apache.james.mailrepository.cassandra;
 
-import static org.apache.commons.io.output.NullOutputStream.NULL_OUTPUT_STREAM;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Iterator;
-import java.util.Properties;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 
 import javax.mail.MessagingException;
-import javax.mail.Session;
 import javax.mail.internet.MimeMessage;
 
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.james.blob.api.BlobId;
-import org.apache.james.blob.api.BlobStore;
+import org.apache.james.blob.api.Store;
+import org.apache.james.blob.mail.MimeMessageStore;
 import org.apache.james.mailrepository.api.MailKey;
 import org.apache.james.mailrepository.api.MailRepository;
 import org.apache.james.mailrepository.api.MailRepositoryUrl;
-import org.apache.james.util.BodyOffsetInputStream;
-import org.apache.james.util.CompletableFutureUtil;
 import org.apache.james.util.FluentFutureStream;
 import org.apache.mailet.Mail;
 
-import com.github.fge.lambdas.Throwing;
-import com.google.common.primitives.Bytes;
+import com.google.common.collect.ImmutableMap;
 
 public class CassandraMailRepository implements MailRepository {
 
@@ -57,94 +45,38 @@ public class CassandraMailRepository implements MailRepository {
     private final CassandraMailRepositoryKeysDAO keysDAO;
     private final CassandraMailRepositoryCountDAO countDAO;
     private final CassandraMailRepositoryMailDAO mailDAO;
-    private final BlobStore blobStore;
+    private final MimeMessageStore mimeMessageStore;
 
-    public CassandraMailRepository(MailRepositoryUrl url, CassandraMailRepositoryKeysDAO keysDAO, CassandraMailRepositoryCountDAO countDAO, CassandraMailRepositoryMailDAO mailDAO, BlobStore blobStore) {
+    public CassandraMailRepository(MailRepositoryUrl url, CassandraMailRepositoryKeysDAO keysDAO,
+                                   CassandraMailRepositoryCountDAO countDAO, CassandraMailRepositoryMailDAO mailDAO,
+                                   MimeMessageStore mimeMessageStore) {
         this.url = url;
         this.keysDAO = keysDAO;
         this.countDAO = countDAO;
         this.mailDAO = mailDAO;
-        this.blobStore = blobStore;
+        this.mimeMessageStore = mimeMessageStore;
     }
 
     @Override
     public MailKey store(Mail mail) throws MessagingException {
-        try {
-            MailKey mailKey = MailKey.forMail(mail);
-            Pair<byte[], byte[]> splitHeaderBody = splitHeaderBody(mail.getMessage());
-
-            CompletableFuture<Pair<BlobId, BlobId>> blobIds = CompletableFutureUtil.combine(
-                blobStore.save(splitHeaderBody.getLeft()),
-                blobStore.save(splitHeaderBody.getRight()),
-                Pair::of);
-
-            blobIds.thenCompose(Throwing.function(pair ->
-                mailDAO.store(url, mail, pair.getLeft(), pair.getRight())))
-                .thenCompose(any -> keysDAO.store(url, mailKey))
-                .thenCompose(this::increaseSizeIfStored)
-                .join();
-            return mailKey;
-        } catch (IOException e) {
-            throw new MessagingException("Exception while storing mail", e);
-        }
-    }
+        MailKey mailKey = MailKey.forMail(mail);
 
-    private CompletionStage<Void> increaseSizeIfStored(Boolean isStored) {
-        if (isStored) {
-            return countDAO.increment(url);
-        }
-        return CompletableFuture.completedFuture(null);
-    }
-
-    private Pair<byte[], byte[]> splitHeaderBody(MimeMessage message) throws IOException, MessagingException {
-        byte[] messageAsArray = messageToArray(message);
-        int bodyStartOctet = computeBodyStartOctet(messageAsArray);
 
-        return Pair.of(
-            getHeaderBytes(messageAsArray, bodyStartOctet),
-            getBodyBytes(messageAsArray, bodyStartOctet));
-    }
-
-    private byte[] messageToArray(MimeMessage message) throws IOException, MessagingException {
-        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
-        message.writeTo(byteArrayOutputStream);
-        return byteArrayOutputStream.toByteArray();
-    }
+        Map<Store.BlobType, BlobId> parts = mimeMessageStore.save(mail.getMessage());
 
-    private byte[] getHeaderBytes(byte[] messageContentAsArray, int bodyStartOctet) {
-        ByteBuffer headerContent = ByteBuffer.wrap(messageContentAsArray, 0, bodyStartOctet);
-        byte[] headerBytes = new byte[bodyStartOctet];
-        headerContent.get(headerBytes);
-        return headerBytes;
-    }
+        mailDAO.store(url, mail, parts.get(MimeMessageStore.HEADER_BLOB_TYPE), parts.get(MimeMessageStore.BODY_BLOB_TYPE))
+            .thenCompose(any -> keysDAO.store(url, mailKey))
+            .thenCompose(this::increaseSizeIfStored)
+            .join();
 
-    private byte[] getBodyBytes(byte[] messageContentAsArray, int bodyStartOctet) {
-        if (bodyStartOctet < messageContentAsArray.length) {
-            ByteBuffer bodyContent = ByteBuffer.wrap(messageContentAsArray,
-                bodyStartOctet,
-                messageContentAsArray.length - bodyStartOctet);
-            byte[] bodyBytes = new byte[messageContentAsArray.length - bodyStartOctet];
-            bodyContent.get(bodyBytes);
-            return bodyBytes;
-        } else {
-            return new byte[] {};
-        }
+        return mailKey;
     }
 
-    private int computeBodyStartOctet(byte[] messageAsArray) throws IOException {
-        try (BodyOffsetInputStream bodyOffsetInputStream =
-                 new BodyOffsetInputStream(new ByteArrayInputStream(messageAsArray))) {
-            consume(bodyOffsetInputStream);
-
-            if (bodyOffsetInputStream.getBodyStartOffset() == -1) {
-                return 0;
-            }
-            return (int) bodyOffsetInputStream.getBodyStartOffset();
+    private CompletionStage<Void> increaseSizeIfStored(Boolean isStored) {
+        if (isStored) {
+            return countDAO.increment(url);
         }
-    }
-
-    private void consume(InputStream in) throws IOException {
-        IOUtils.copy(in, NULL_OUTPUT_STREAM);
+        return CompletableFuture.completedFuture(null);
     }
 
     @Override
@@ -156,30 +88,21 @@ public class CassandraMailRepository implements MailRepository {
 
     @Override
     public Mail retrieve(MailKey key) {
-        return CompletableFutureUtil
-            .unwrap(mailDAO.read(url, key)
-                .thenApply(optional -> optional.map(this::toMail)))
+        return mailDAO.read(url, key)
+                .thenApply(optional -> optional.map(this::toMail))
             .join()
             .orElse(null);
     }
 
-    private CompletableFuture<Mail> toMail(CassandraMailRepositoryMailDAO.MailDTO mailDTO) {
-        return CompletableFutureUtil.combine(
-            blobStore.readBytes(mailDTO.getHeaderBlobId()),
-            blobStore.readBytes(mailDTO.getBodyBlobId()),
-            Bytes::concat)
-            .thenApply(this::toMimeMessage)
-            .thenApply(mimeMessage -> mailDTO.getMailBuilder()
-                .mimeMessage(mimeMessage)
-                .build());
-    }
+    private Mail toMail(CassandraMailRepositoryMailDAO.MailDTO mailDTO) {
+        MimeMessage mimeMessage = mimeMessageStore
+            .read(ImmutableMap.of(
+                MimeMessageStore.HEADER_BLOB_TYPE, mailDTO.getHeaderBlobId(),
+                MimeMessageStore.BODY_BLOB_TYPE, mailDTO.getBodyBlobId()));
 
-    private MimeMessage toMimeMessage(byte[] bytes) {
-        try {
-            return new MimeMessage(Session.getInstance(new Properties()), new ByteArrayInputStream(bytes));
-        } catch (MessagingException e) {
-            throw new RuntimeException(e);
-        }
+        return mailDTO.getMailBuilder()
+            .mimeMessage(mimeMessage)
+            .build();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/james-project/blob/6a293665/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryProvider.java
----------------------------------------------------------------------
diff --git a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryProvider.java b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryProvider.java
index 1eb71bb..77d8ca4 100644
--- a/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryProvider.java
+++ b/server/mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryProvider.java
@@ -21,7 +21,7 @@ package org.apache.james.mailrepository.cassandra;
 
 import javax.inject.Inject;
 
-import org.apache.james.blob.api.BlobStore;
+import org.apache.james.blob.mail.MimeMessageStore;
 import org.apache.james.mailrepository.api.MailRepository;
 import org.apache.james.mailrepository.api.MailRepositoryProvider;
 import org.apache.james.mailrepository.api.MailRepositoryUrl;
@@ -30,14 +30,15 @@ public class CassandraMailRepositoryProvider implements MailRepositoryProvider {
     private final CassandraMailRepositoryKeysDAO keysDAO;
     private final CassandraMailRepositoryCountDAO countDAO;
     private final CassandraMailRepositoryMailDAO mailDAO;
-    private final BlobStore blobStore;
+    private final MimeMessageStore mimeMessageStore;
 
     @Inject
-    public CassandraMailRepositoryProvider(CassandraMailRepositoryKeysDAO keysDAO, CassandraMailRepositoryCountDAO countDAO, CassandraMailRepositoryMailDAO mailDAO, BlobStore blobStore) {
+    public CassandraMailRepositoryProvider(CassandraMailRepositoryKeysDAO keysDAO, CassandraMailRepositoryCountDAO countDAO,
+                                           CassandraMailRepositoryMailDAO mailDAO, MimeMessageStore mimeMessageStore) {
         this.keysDAO = keysDAO;
         this.countDAO = countDAO;
         this.mailDAO = mailDAO;
-        this.blobStore = blobStore;
+        this.mimeMessageStore = mimeMessageStore;
     }
 
     @Override
@@ -47,6 +48,6 @@ public class CassandraMailRepositoryProvider implements MailRepositoryProvider {
 
     @Override
     public MailRepository provide(MailRepositoryUrl url) {
-        return new CassandraMailRepository(url, keysDAO, countDAO, mailDAO, blobStore);
+        return new CassandraMailRepository(url, keysDAO, countDAO, mailDAO, mimeMessageStore);
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/6a293665/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryTest.java
----------------------------------------------------------------------
diff --git a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryTest.java b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryTest.java
index 7101376..8151a2a 100644
--- a/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryTest.java
+++ b/server/mailrepository/mailrepository-cassandra/src/test/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryTest.java
@@ -26,6 +26,7 @@ import org.apache.james.backends.cassandra.utils.CassandraUtils;
 import org.apache.james.blob.api.HashBlobId;
 import org.apache.james.blob.cassandra.CassandraBlobModule;
 import org.apache.james.blob.cassandra.CassandraBlobsDAO;
+import org.apache.james.blob.mail.MimeMessageStore;
 import org.apache.james.mailrepository.MailRepositoryContract;
 import org.apache.james.mailrepository.api.MailRepository;
 import org.apache.james.mailrepository.api.MailRepositoryUrl;
@@ -61,7 +62,7 @@ class CassandraMailRepositoryTest implements MailRepositoryContract {
         CassandraBlobsDAO blobsDAO = new CassandraBlobsDAO(cassandra.getConf());
 
         cassandraMailRepository = new CassandraMailRepository(URL,
-            keysDAO, countDAO, mailDAO, blobsDAO);
+            keysDAO, countDAO, mailDAO, new MimeMessageStore(blobsDAO));
     }
 
     @AfterEach


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to