This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 48a698e6afc12cce27ba0a6eb7d31cd3df9f5cf0
Author: Benoit Tellier <[email protected]>
AuthorDate: Sun Apr 12 10:05:37 2020 +0700

    [REFACTORING] Avoid intermediate collect when retrieving messages using JMAP
---
 .../apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java | 8 ++++++--
 .../james/mailbox/cassandra/mail/CassandraMessageIdMapper.java   | 6 ++----
 .../james/mailbox/cassandra/mail/CassandraMessageMapper.java     | 3 +--
 3 files changed, 9 insertions(+), 8 deletions(-)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
index 621a685..d861ab9 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
@@ -237,11 +237,15 @@ public class CassandraMessageDAO {
             .collect(Guavate.toImmutableList());
     }

+    public Mono<MessageResult> retrieveMessage(ComposedMessageIdWithMetaData id, FetchType fetchType) {
+        return retrieveRow(id, fetchType)
+            .flatMap(resultSet -> message(resultSet, id, fetchType));
+    }
+
     public Flux<MessageResult> retrieveMessages(List<ComposedMessageIdWithMetaData> messageIds, FetchType fetchType, Limit limit) {
         return Flux.fromStream(limit.applyOnStream(messageIds.stream().distinct()))
             .publishOn(Schedulers.elastic())
-            .flatMap(id -> retrieveRow(id, fetchType)
-                .flatMap(resultSet -> message(resultSet, id, fetchType)), configuration.getMessageReadChunkSize());
+            .flatMap(id -> retrieveMessage(id, fetchType), configuration.getMessageReadChunkSize());
     }

     private Mono<ResultSet> retrieveRow(ComposedMessageIdWithMetaData messageId, FetchType fetchType) {
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
index 107e37b..8e07fda 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
@@ -45,7 +45,6 @@ import org.apache.james.mailbox.store.mail.MessageMapper.FetchType;
 import org.apache.james.mailbox.store.mail.ModSeqProvider;
 import org.apache.james.mailbox.store.mail.model.MailboxMessage;
 import org.apache.james.util.FunctionalUtils;
-import org.apache.james.util.streams.Limit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -96,11 +95,10 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
         return Flux.fromStream(messageIds.stream())
             .publishOn(Schedulers.elastic())
             .flatMap(messageId -> imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.empty()), cassandraConfiguration.getMessageReadChunkSize())
-            .collectList()
-            .flatMapMany(composedMessageIds -> messageDAO.retrieveMessages(composedMessageIds, fetchType, Limit.unlimited()))
+            .flatMap(composedMessageId -> messageDAO.retrieveMessage(composedMessageId, fetchType), cassandraConfiguration.getMessageReadChunkSize())
             .filter(CassandraMessageDAO.MessageResult::isFound)
             .map(CassandraMessageDAO.MessageResult::message)
-            .flatMap(messageRepresentation -> attachmentLoader.addAttachmentToMessage(messageRepresentation, fetchType))
+            .flatMap(messageRepresentation -> attachmentLoader.addAttachmentToMessage(messageRepresentation, fetchType), cassandraConfiguration.getMessageReadChunkSize())
             .groupBy(MailboxMessage::getMailboxId)
             .flatMap(this::keepMessageIfMailboxExists)
             .collectSortedList(Comparator.comparing(MailboxMessage::getUid))
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index 7faa9cc..7f7a0b7 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -220,8 +220,7 @@ public class CassandraMessageMapper implements MessageMapper {
     private Flux<SimpleMailboxMessage> expungeOne(CassandraId mailboxId, MessageUid messageUid) {
         return retrieveComposedId(mailboxId, messageUid)
             .flatMap(idWithMetadata -> deleteUsingMailboxId(idWithMetadata).thenReturn(idWithMetadata))
-            .flatMapMany(idWithMetadata ->
-                messageDAO.retrieveMessages(ImmutableList.of(idWithMetadata), FetchType.Metadata, Limit.unlimited()))
+            .flatMapMany(idWithMetadata -> messageDAO.retrieveMessage(idWithMetadata, FetchType.Metadata))
             .filter(CassandraMessageDAO.MessageResult::isFound)
             .map(CassandraMessageDAO.MessageResult::message)
             .map(pair -> pair.getKey().toMailboxMessage(ImmutableList.of()));

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
