JAMES-2082 Factorize attachment loading by giving more responsibilities to 
Attachment loader

Code is then factorized between CassandraMessageMapper CassandraMessageIdMapper 
and V1ToV2Migration


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/6091e38a
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/6091e38a
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/6091e38a

Branch: refs/heads/master
Commit: 6091e38aca468b0ac6ce86a4e17868f3eeeec68c
Parents: 2a4987d
Author: benwa <btell...@linagora.com>
Authored: Thu Jul 6 13:19:03 2017 +0700
Committer: Antoine Duprat <adup...@linagora.com>
Committed: Mon Jul 10 14:23:56 2017 +0200

----------------------------------------------------------------------
 .../cassandra/mail/AttachmentLoader.java        | 30 ++++++------
 .../mail/CassandraMessageIdMapper.java          | 49 ++++++--------------
 .../cassandra/mail/CassandraMessageMapper.java  | 12 +----
 .../mail/migration/V1ToV2Migration.java         |  8 +++-
 4 files changed, 39 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/6091e38a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/AttachmentLoader.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/AttachmentLoader.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/AttachmentLoader.java
index 59933be..17c4539 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/AttachmentLoader.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/AttachmentLoader.java
@@ -18,7 +18,6 @@
  ****************************************************************/
 package org.apache.james.mailbox.cassandra.mail;
 
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -30,12 +29,14 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.apache.james.mailbox.model.Attachment;
 import org.apache.james.mailbox.model.AttachmentId;
 import org.apache.james.mailbox.model.MessageAttachment;
+import org.apache.james.mailbox.store.mail.MessageMapper;
 import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
 import org.apache.james.util.FluentFutureStream;
 import org.apache.james.util.OptionalConverter;
 
 import com.github.steveash.guavate.Guavate;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 
 public class AttachmentLoader {
@@ -46,21 +47,24 @@ public class AttachmentLoader {
         this.attachmentMapper = attachmentMapper;
     }
 
-    public CompletableFuture<Stream<SimpleMailboxMessage>> 
toMailboxMessageWithAttachments(
-                CompletableFuture<Stream<Pair<MessageWithoutAttachment, 
Stream<MessageAttachmentRepresentation>>>> messageRepresentations) {
+    public CompletableFuture<Stream<SimpleMailboxMessage>> 
addAttachmentToMessages(Stream<Pair<MessageWithoutAttachment,
+            Stream<MessageAttachmentRepresentation>>> messageRepresentations, 
MessageMapper.FetchType fetchType) {
 
-        return FluentFutureStream.of(messageRepresentations)
-            .thenComposeOnAll(pair -> 
getAttachments(pair.getRight().collect(Guavate.toImmutableList()))
-                    .thenApply(attachments -> Pair.of(pair.getLeft(), 
attachments)))
-            .map(pair ->
-                pair.getLeft()
-                    .toMailboxMessage(pair.getRight()
-                        .stream()
-                        .collect(Guavate.toImmutableList())))
-            .completableFuture();
+        if (fetchType == MessageMapper.FetchType.Body || fetchType == 
MessageMapper.FetchType.Full) {
+            return FluentFutureStream.<SimpleMailboxMessage> of(
+                messageRepresentations
+                    .map(pair -> 
getAttachments(pair.getRight().collect(Guavate.toImmutableList()))
+                        .thenApply(attachments -> 
pair.getLeft().toMailboxMessage(attachments))))
+                .completableFuture();
+        } else {
+            return CompletableFuture.completedFuture(messageRepresentations
+                .map(pair -> pair.getLeft()
+                    .toMailboxMessage(ImmutableList.of())));
+        }
     }
 
-    public CompletableFuture<Collection<MessageAttachment>> 
getAttachments(List<MessageAttachmentRepresentation> attachmentRepresentations) 
{
+    @VisibleForTesting
+    CompletableFuture<List<MessageAttachment>> 
getAttachments(List<MessageAttachmentRepresentation> attachmentRepresentations) 
{
         CompletableFuture<Map<AttachmentId, Attachment>> attachmentsByIdFuture 
=
             attachmentsById(attachmentRepresentations.stream()
                 .map(MessageAttachmentRepresentation::getAttachmentId)

http://git-wip-us.apache.org/repos/asf/james-project/blob/6091e38a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
index b94ffa6..ef5f954 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
@@ -36,13 +36,12 @@ import org.apache.james.mailbox.MailboxSession;
 import org.apache.james.mailbox.MessageManager;
 import org.apache.james.mailbox.cassandra.ids.CassandraId;
 import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
-import org.apache.james.mailbox.cassandra.mail.utils.Limit;
 import org.apache.james.mailbox.cassandra.mail.migration.V1ToV2Migration;
+import org.apache.james.mailbox.cassandra.mail.utils.Limit;
 import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.model.ComposedMessageId;
 import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
 import org.apache.james.mailbox.model.MailboxId;
-import org.apache.james.mailbox.model.MessageAttachment;
 import org.apache.james.mailbox.model.MessageId;
 import org.apache.james.mailbox.model.UpdatedFlags;
 import org.apache.james.mailbox.store.FlagsUpdateCalculator;
@@ -54,12 +53,9 @@ import 
org.apache.james.mailbox.store.mail.model.MailboxMessage;
 import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
 import org.apache.james.util.CompletableFutureUtil;
 import org.apache.james.util.FluentFutureStream;
-import org.apache.james.util.OptionalConverter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.github.fge.lambdas.Throwing;
-import com.github.fge.lambdas.functions.FunctionChainer;
 import com.github.steveash.guavate.Guavate;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
@@ -111,14 +107,9 @@ public class CassandraMessageIdMapper implements 
MessageIdMapper {
             .thenApply(stream -> stream.flatMap(Function.identity()))
             .thenApply(stream -> stream.collect(Guavate.toImmutableList()))
             .thenCompose(composedMessageIds -> retrieveMessages(fetchType, 
composedMessageIds))
-            .thenCompose(stream -> CompletableFutureUtil.allOf(
-                stream.map(pair -> mailboxExists(pair.getLeft())
-                    .thenApply(b -> Optional.of(pair).filter(any -> b)))))
-            .thenApply(stream -> stream.flatMap(OptionalConverter::toStream))
-            .thenApply(stream -> stream.map(loadAttachments(fetchType)))
-            .thenCompose(CompletableFutureUtil::allOf)
+            .thenCompose(stream -> 
attachmentLoader.addAttachmentToMessages(stream, fetchType))
+            .thenCompose(this::filterMessagesWIthExistingMailbox)
             .join()
-            .map(toMailboxMessages())
             .sorted(Comparator.comparing(MailboxMessage::getUid));
     }
 
@@ -130,38 +121,26 @@ public class CassandraMessageIdMapper implements 
MessageIdMapper {
                         .completableFuture());
     }
 
-    private CompletableFuture<Boolean> mailboxExists(MessageWithoutAttachment 
messageWithoutAttachment) {
-        CassandraId cassandraId = (CassandraId) 
messageWithoutAttachment.getMailboxId();
+    private CompletableFuture<Stream<SimpleMailboxMessage>> 
filterMessagesWIthExistingMailbox(Stream<SimpleMailboxMessage> stream) {
+        return FluentFutureStream.of(stream.map(this::mailboxExists))
+            .flatMap(m -> m)
+            .completableFuture();
+    }
+
+    private CompletableFuture<Stream<SimpleMailboxMessage>> 
mailboxExists(SimpleMailboxMessage message) {
+        CassandraId cassandraId = (CassandraId) message.getMailboxId();
         return mailboxDAO.retrieveMailbox(cassandraId)
             .thenApply(optional -> {
                 if (!optional.isPresent()) {
                     LOGGER.info("Mailbox {} have been deleted but message {} 
is still attached to it.",
                         cassandraId,
-                        messageWithoutAttachment.getMessageId());
-                    return false;
+                        message.getMailboxId());
+                    return Stream.empty();
                 }
-                return true;
+                return Stream.of(message);
             });
     }
 
-    private Function<Pair<MessageWithoutAttachment, 
Stream<MessageAttachmentRepresentation>>,
-                     CompletableFuture<Pair<MessageWithoutAttachment, 
Stream<MessageAttachment>>>>
-                     loadAttachments(FetchType fetchType) {
-        if (fetchType == FetchType.Full || fetchType == FetchType.Body) {
-            return pair -> attachmentLoader
-                
.getAttachments(pair.getRight().collect(Guavate.toImmutableList()))
-                .thenApply(attachments -> Pair.of(pair.getLeft(), 
attachments.stream()));
-        } else {
-            return pair -> 
CompletableFuture.completedFuture(Pair.of(pair.getLeft(), Stream.of()));
-        }
-    }
-
-    private FunctionChainer<Pair<MessageWithoutAttachment, 
Stream<MessageAttachment>>, SimpleMailboxMessage> toMailboxMessages() {
-        return Throwing.function(pair -> pair.getLeft()
-            .toMailboxMessage(pair.getRight()
-                .collect(Guavate.toImmutableList())));
-    }
-
     @Override
     public List<MailboxId> findMailboxes(MessageId messageId) {
         return imapUidDAO.retrieve((CassandraMessageId) messageId, 
Optional.empty()).join()

http://git-wip-us.apache.org/repos/asf/james-project/blob/6091e38a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index de07b8c..4b52806 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -193,16 +193,8 @@ public class CassandraMessageMapper implements 
MessageMapper {
         CompletableFuture<Stream<Pair<MessageWithoutAttachment, 
Stream<MessageAttachmentRepresentation>>>>
             messageRepresentations = 
retrieveMessagesAndDoMigrationIfNeeded(messageIds, fetchType, limit);
 
-        if (fetchType == FetchType.Body || fetchType == FetchType.Full) {
-            return 
attachmentLoader.toMailboxMessageWithAttachments(messageRepresentations);
-        } else {
-            return FluentFutureStream.of(messageRepresentations)
-                .map(pair ->
-                    pair
-                        .getLeft()
-                        .toMailboxMessage(ImmutableList.of()))
-                .completableFuture();
-        }
+        return messageRepresentations
+            .thenCompose(stream -> 
attachmentLoader.addAttachmentToMessages(stream, fetchType));
     }
 
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/6091e38a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java
index 84d21a6..ea14568 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java
@@ -36,10 +36,14 @@ import 
org.apache.james.mailbox.cassandra.mail.MessageWithoutAttachment;
 import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.store.mail.MessageMapper;
 import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.ImmutableList;
 
 public class V1ToV2Migration {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(V1ToV2Migration.class);
+
     private final CassandraMessageDAO messageDAOV1;
     private final CassandraMessageDAOV2 messageDAOV2;
     private final AttachmentLoader attachmentLoader;
@@ -64,8 +68,7 @@ public class V1ToV2Migration {
     }
 
     private CompletableFuture<Pair<MessageWithoutAttachment, 
Stream<MessageAttachmentRepresentation>>> 
performV1ToV2Migration(Pair<MessageWithoutAttachment, 
Stream<MessageAttachmentRepresentation>> messageV1) {
-        return attachmentLoader.toMailboxMessageWithAttachments(
-            CompletableFuture.completedFuture(Stream.of(messageV1)))
+        return attachmentLoader.addAttachmentToMessages(Stream.of(messageV1), 
MessageMapper.FetchType.Full)
             .thenApply(stream -> stream.findAny().get())
             .thenCompose(this::saveInV2FromV1)
             .thenCompose(this::deleteInV1)
@@ -83,6 +86,7 @@ public class V1ToV2Migration {
         try {
             return messageDAOV2.save(message).thenApply(any -> 
Optional.of(message));
         } catch (MailboxException e) {
+            LOGGER.error("Exception while saving message during migration", e);
             return 
CompletableFuture.completedFuture(Optional.<SimpleMailboxMessage>empty());
         }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to