JAMES-2082 Factorize attachment loading by giving more responsibilities to Attachment loader
Code is then factorized between CassandraMessageMapper CassandraMessageIdMapper and V1ToV2Migration Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/6091e38a Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/6091e38a Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/6091e38a Branch: refs/heads/master Commit: 6091e38aca468b0ac6ce86a4e17868f3eeeec68c Parents: 2a4987d Author: benwa <btell...@linagora.com> Authored: Thu Jul 6 13:19:03 2017 +0700 Committer: Antoine Duprat <adup...@linagora.com> Committed: Mon Jul 10 14:23:56 2017 +0200 ---------------------------------------------------------------------- .../cassandra/mail/AttachmentLoader.java | 30 ++++++------ .../mail/CassandraMessageIdMapper.java | 49 ++++++-------------- .../cassandra/mail/CassandraMessageMapper.java | 12 +---- .../mail/migration/V1ToV2Migration.java | 8 +++- 4 files changed, 39 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/6091e38a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/AttachmentLoader.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/AttachmentLoader.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/AttachmentLoader.java index 59933be..17c4539 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/AttachmentLoader.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/AttachmentLoader.java @@ -18,7 +18,6 @@ ****************************************************************/ package org.apache.james.mailbox.cassandra.mail; -import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; @@ -30,12 +29,14 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.james.mailbox.model.Attachment; import org.apache.james.mailbox.model.AttachmentId; import org.apache.james.mailbox.model.MessageAttachment; +import org.apache.james.mailbox.store.mail.MessageMapper; import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage; import org.apache.james.util.FluentFutureStream; import org.apache.james.util.OptionalConverter; import com.github.steveash.guavate.Guavate; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; public class AttachmentLoader { @@ -46,21 +47,24 @@ public class AttachmentLoader { this.attachmentMapper = attachmentMapper; } - public CompletableFuture<Stream<SimpleMailboxMessage>> toMailboxMessageWithAttachments( - CompletableFuture<Stream<Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>>>> messageRepresentations) { + public CompletableFuture<Stream<SimpleMailboxMessage>> addAttachmentToMessages(Stream<Pair<MessageWithoutAttachment, + Stream<MessageAttachmentRepresentation>>> messageRepresentations, MessageMapper.FetchType fetchType) { - return FluentFutureStream.of(messageRepresentations) - .thenComposeOnAll(pair -> getAttachments(pair.getRight().collect(Guavate.toImmutableList())) - .thenApply(attachments -> Pair.of(pair.getLeft(), attachments))) - .map(pair -> - pair.getLeft() - .toMailboxMessage(pair.getRight() - .stream() - .collect(Guavate.toImmutableList()))) - .completableFuture(); + if (fetchType == MessageMapper.FetchType.Body || fetchType == MessageMapper.FetchType.Full) { + return FluentFutureStream.<SimpleMailboxMessage> of( + messageRepresentations + .map(pair -> getAttachments(pair.getRight().collect(Guavate.toImmutableList())) + .thenApply(attachments -> pair.getLeft().toMailboxMessage(attachments)))) + .completableFuture(); + } else { + return CompletableFuture.completedFuture(messageRepresentations + .map(pair -> pair.getLeft() + .toMailboxMessage(ImmutableList.of()))); + } } - public CompletableFuture<Collection<MessageAttachment>> getAttachments(List<MessageAttachmentRepresentation> attachmentRepresentations) { + @VisibleForTesting + CompletableFuture<List<MessageAttachment>> getAttachments(List<MessageAttachmentRepresentation> attachmentRepresentations) { CompletableFuture<Map<AttachmentId, Attachment>> attachmentsByIdFuture = attachmentsById(attachmentRepresentations.stream() .map(MessageAttachmentRepresentation::getAttachmentId) http://git-wip-us.apache.org/repos/asf/james-project/blob/6091e38a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java index b94ffa6..ef5f954 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java @@ -36,13 +36,12 @@ import org.apache.james.mailbox.MailboxSession; import org.apache.james.mailbox.MessageManager; import org.apache.james.mailbox.cassandra.ids.CassandraId; import org.apache.james.mailbox.cassandra.ids.CassandraMessageId; -import org.apache.james.mailbox.cassandra.mail.utils.Limit; import org.apache.james.mailbox.cassandra.mail.migration.V1ToV2Migration; +import org.apache.james.mailbox.cassandra.mail.utils.Limit; import org.apache.james.mailbox.exception.MailboxException; import org.apache.james.mailbox.model.ComposedMessageId; import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData; import org.apache.james.mailbox.model.MailboxId; -import org.apache.james.mailbox.model.MessageAttachment; import org.apache.james.mailbox.model.MessageId; import org.apache.james.mailbox.model.UpdatedFlags; import org.apache.james.mailbox.store.FlagsUpdateCalculator; @@ -54,12 +53,9 @@ import org.apache.james.mailbox.store.mail.model.MailboxMessage; import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage; import org.apache.james.util.CompletableFutureUtil; import org.apache.james.util.FluentFutureStream; -import org.apache.james.util.OptionalConverter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.github.fge.lambdas.Throwing; -import com.github.fge.lambdas.functions.FunctionChainer; import com.github.steveash.guavate.Guavate; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; @@ -111,14 +107,9 @@ public class CassandraMessageIdMapper implements MessageIdMapper { .thenApply(stream -> stream.flatMap(Function.identity())) .thenApply(stream -> stream.collect(Guavate.toImmutableList())) .thenCompose(composedMessageIds -> retrieveMessages(fetchType, composedMessageIds)) - .thenCompose(stream -> CompletableFutureUtil.allOf( - stream.map(pair -> mailboxExists(pair.getLeft()) - .thenApply(b -> Optional.of(pair).filter(any -> b))))) - .thenApply(stream -> stream.flatMap(OptionalConverter::toStream)) - .thenApply(stream -> stream.map(loadAttachments(fetchType))) - .thenCompose(CompletableFutureUtil::allOf) + .thenCompose(stream -> attachmentLoader.addAttachmentToMessages(stream, fetchType)) + .thenCompose(this::filterMessagesWIthExistingMailbox) .join() - .map(toMailboxMessages()) .sorted(Comparator.comparing(MailboxMessage::getUid)); } @@ -130,38 +121,26 @@ public class CassandraMessageIdMapper implements MessageIdMapper { .completableFuture()); } - private CompletableFuture<Boolean> mailboxExists(MessageWithoutAttachment messageWithoutAttachment) { - CassandraId cassandraId = (CassandraId) messageWithoutAttachment.getMailboxId(); + private CompletableFuture<Stream<SimpleMailboxMessage>> filterMessagesWIthExistingMailbox(Stream<SimpleMailboxMessage> stream) { + return FluentFutureStream.of(stream.map(this::mailboxExists)) + .flatMap(m -> m) + .completableFuture(); + } + + private CompletableFuture<Stream<SimpleMailboxMessage>> mailboxExists(SimpleMailboxMessage message) { + CassandraId cassandraId = (CassandraId) message.getMailboxId(); return mailboxDAO.retrieveMailbox(cassandraId) .thenApply(optional -> { if (!optional.isPresent()) { LOGGER.info("Mailbox {} have been deleted but message {} is still attached to it.", cassandraId, - messageWithoutAttachment.getMessageId()); - return false; + message.getMailboxId()); + return Stream.empty(); } - return true; + return Stream.of(message); }); } - private Function<Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>>, - CompletableFuture<Pair<MessageWithoutAttachment, Stream<MessageAttachment>>>> - loadAttachments(FetchType fetchType) { - if (fetchType == FetchType.Full || fetchType == FetchType.Body) { - return pair -> attachmentLoader - .getAttachments(pair.getRight().collect(Guavate.toImmutableList())) - .thenApply(attachments -> Pair.of(pair.getLeft(), attachments.stream())); - } else { - return pair -> CompletableFuture.completedFuture(Pair.of(pair.getLeft(), Stream.of())); - } - } - - private FunctionChainer<Pair<MessageWithoutAttachment, Stream<MessageAttachment>>, SimpleMailboxMessage> toMailboxMessages() { - return Throwing.function(pair -> pair.getLeft() - .toMailboxMessage(pair.getRight() - .collect(Guavate.toImmutableList()))); - } - @Override public List<MailboxId> findMailboxes(MessageId messageId) { return imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.empty()).join() http://git-wip-us.apache.org/repos/asf/james-project/blob/6091e38a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java index de07b8c..4b52806 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java @@ -193,16 +193,8 @@ public class CassandraMessageMapper implements MessageMapper { CompletableFuture<Stream<Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>>>> messageRepresentations = retrieveMessagesAndDoMigrationIfNeeded(messageIds, fetchType, limit); - if (fetchType == FetchType.Body || fetchType == FetchType.Full) { - return attachmentLoader.toMailboxMessageWithAttachments(messageRepresentations); - } else { - return FluentFutureStream.of(messageRepresentations) - .map(pair -> - pair - .getLeft() - .toMailboxMessage(ImmutableList.of())) - .completableFuture(); - } + return messageRepresentations + .thenCompose(stream -> attachmentLoader.addAttachmentToMessages(stream, fetchType)); } http://git-wip-us.apache.org/repos/asf/james-project/blob/6091e38a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java index 84d21a6..ea14568 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java @@ -36,10 +36,14 @@ import org.apache.james.mailbox.cassandra.mail.MessageWithoutAttachment; import org.apache.james.mailbox.exception.MailboxException; import org.apache.james.mailbox.store.mail.MessageMapper; import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.collect.ImmutableList; public class V1ToV2Migration { + private static final Logger LOGGER = LoggerFactory.getLogger(V1ToV2Migration.class); + private final CassandraMessageDAO messageDAOV1; private final CassandraMessageDAOV2 messageDAOV2; private final AttachmentLoader attachmentLoader; @@ -64,8 +68,7 @@ public class V1ToV2Migration { } private CompletableFuture<Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>>> performV1ToV2Migration(Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>> messageV1) { - return attachmentLoader.toMailboxMessageWithAttachments( - CompletableFuture.completedFuture(Stream.of(messageV1))) + return attachmentLoader.addAttachmentToMessages(Stream.of(messageV1), MessageMapper.FetchType.Full) .thenApply(stream -> stream.findAny().get()) .thenCompose(this::saveInV2FromV1) .thenCompose(this::deleteInV1) @@ -83,6 +86,7 @@ public class V1ToV2Migration { try { return messageDAOV2.save(message).thenApply(any -> Optional.of(message)); } catch (MailboxException e) { + LOGGER.error("Exception while saving message during migration", e); return CompletableFuture.completedFuture(Optional.<SimpleMailboxMessage>empty()); } } --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org