This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 3943d2f09c08b78ee082e62f0696133f1df3c25a Author: Benoit Tellier <[email protected]> AuthorDate: Tue Apr 14 10:28:19 2020 +0700 JAMES-3149 reactify more getMailboxes --- .../cassandra/mail/CassandraMessageMapper.java | 16 ++------- .../james/mailbox/store/StoreMailboxManager.java | 40 ++++++++-------------- .../james/mailbox/store/mail/MessageMapper.java | 10 ++---- .../store/mail/model/MessageMapperTest.java | 18 ---------- 4 files changed, 20 insertions(+), 64 deletions(-) diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java index 7d189c2..7faa9cc 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java @@ -20,7 +20,6 @@ package org.apache.james.mailbox.cassandra.mail; import java.time.Duration; -import java.util.Collection; import java.util.Comparator; import java.util.Iterator; import java.util.List; @@ -62,7 +61,6 @@ import com.google.common.collect.ImmutableList; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; public class CassandraMessageMapper implements MessageMapper { public static final Logger LOGGER = LoggerFactory.getLogger(CassandraMessageMapper.class); @@ -123,10 +121,11 @@ public class CassandraMessageMapper implements MessageMapper { @Override public MailboxCounters getMailboxCounters(Mailbox mailbox) { - return getMailboxCountersAsMono(mailbox).block(); + return getMailboxCountersReactive(mailbox).block(); } - private Mono<MailboxCounters> getMailboxCountersAsMono(Mailbox mailbox) { + @Override + public Mono<MailboxCounters> getMailboxCountersReactive(Mailbox mailbox) { CassandraId mailboxId = (CassandraId) mailbox.getMailboxId(); return mailboxCounterDAO.retrieveMailboxCounters(mailboxId) .defaultIfEmpty(MailboxCounters.builder() @@ -137,15 +136,6 @@ public class CassandraMessageMapper implements MessageMapper { } @Override - public List<MailboxCounters> getMailboxCounters(Collection<Mailbox> mailboxes) { - return Flux.fromIterable(mailboxes) - .publishOn(Schedulers.elastic()) - .concatMap(this::getMailboxCountersAsMono) - .toStream() - .collect(Guavate.toImmutableList()); - } - - @Override public void delete(Mailbox mailbox, MailboxMessage message) { deleteAsFuture(message) .block(); diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java index 76f8bfb..e9ad7da 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java @@ -22,7 +22,6 @@ package org.apache.james.mailbox.store; import static org.apache.james.mailbox.store.mail.AbstractMessageMapper.UNLIMITED; import java.util.ArrayList; -import java.util.Collection; import java.util.EnumSet; import java.util.List; import java.util.Optional; @@ -87,7 +86,6 @@ import org.slf4j.LoggerFactory; import com.github.fge.lambdas.Throwing; import com.github.steveash.guavate.Guavate; -import com.google.common.base.Functions; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; @@ -593,26 +591,23 @@ public class StoreMailboxManager implements MailboxManager { @Override public List<MailboxMetaData> search(MailboxQuery mailboxExpression, MailboxSession session) throws MailboxException { - return searchMailboxesMetadata(mailboxExpression, session, Right.Lookup); + return searchMailboxesMetadata(mailboxExpression, session, Right.Lookup) + .collect(Guavate.toImmutableList()) + .block(); } - private List<MailboxMetaData> searchMailboxesMetadata(MailboxQuery mailboxQuery, MailboxSession session, Right right) throws MailboxException { - List<Mailbox> mailboxes = searchMailboxes(mailboxQuery, session, right).collectList().block(); - - ImmutableMap<MailboxId, MailboxCounters> counters = getMailboxCounters(mailboxes, session) - .stream() - .collect(Guavate.toImmutableMap( - MailboxCounters::getMailboxId, - Functions.identity())); + private Flux<MailboxMetaData> searchMailboxesMetadata(MailboxQuery mailboxQuery, MailboxSession session, Right right) throws MailboxException { + Mono<List<Mailbox>> mailboxesMono = searchMailboxes(mailboxQuery, session, right).collectList(); + MessageMapper messageMapper = mailboxSessionMapperFactory.getMessageMapper(session); - return mailboxes - .stream() - .filter(mailboxQuery::matches) - .map(Throwing.<Mailbox, MailboxMetaData>function( - mailbox -> toMailboxMetadata(session, mailboxes, mailbox, retrieveCounters(counters, mailbox))) - .sneakyThrow()) - .sorted(MailboxMetaData.COMPARATOR) - .collect(Guavate.toImmutableList()); + return mailboxesMono + .flatMapMany(mailboxes -> Flux.fromIterable(mailboxes) + .filter(mailboxQuery::matches) + .flatMap(mailbox -> messageMapper.getMailboxCountersReactive(mailbox) + .map(Throwing.<MailboxCounters, MailboxMetaData>function( + counters -> toMailboxMetadata(session, mailboxes, mailbox, counters)) + .sneakyThrow()))) + .sort(MailboxMetaData.COMPARATOR); } private Flux<Mailbox> searchMailboxes(MailboxQuery mailboxQuery, MailboxSession session, Right right) throws MailboxException { @@ -823,11 +818,4 @@ public class StoreMailboxManager implements MailboxManager { Mailbox mailbox = mapper.findMailboxByPathBlocking(mailboxPath); return mapper.hasChildren(mailbox, session.getPathDelimiter()); } - - private List<MailboxCounters> getMailboxCounters(Collection<Mailbox> mailboxes, MailboxSession session) throws MailboxException { - MessageMapper messageMapper = mailboxSessionMapperFactory.getMessageMapper(session); - return messageMapper.getMailboxCounters(mailboxes.stream() - .filter(Throwing.<Mailbox>predicate(mailbox -> storeRightManager.hasRight(mailbox, Right.Read, session)).sneakyThrow()) - .collect(Guavate.toImmutableList())); - } } diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageMapper.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageMapper.java index c0d34d4..d7c8428 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageMapper.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageMapper.java @@ -18,7 +18,6 @@ ****************************************************************/ package org.apache.james.mailbox.store.mail; -import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -39,8 +38,7 @@ import org.apache.james.mailbox.store.mail.model.MailboxMessage; import org.apache.james.mailbox.store.mail.model.Property; import org.apache.james.mailbox.store.transaction.Mapper; -import com.github.fge.lambdas.Throwing; -import com.github.steveash.guavate.Guavate; +import reactor.core.publisher.Mono; /** * Maps {@link MailboxMessage} in a {@link org.apache.james.mailbox.MessageManager}. A {@link MessageMapper} has a lifecycle from the start of a request @@ -73,10 +71,8 @@ public interface MessageMapper extends Mapper { MailboxCounters getMailboxCounters(Mailbox mailbox) throws MailboxException; - default List<MailboxCounters> getMailboxCounters(Collection<Mailbox> mailboxes) throws MailboxException { - return mailboxes.stream() - .map(Throwing.<Mailbox, MailboxCounters>function(this::getMailboxCounters).sneakyThrow()) - .collect(Guavate.toImmutableList()); + default Mono<MailboxCounters> getMailboxCountersReactive(Mailbox mailbox) { + return Mono.fromCallable(() -> getMailboxCounters(mailbox)); } /** diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageMapperTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageMapperTest.java index ae67a36..66ee7bf 100644 --- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageMapperTest.java +++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageMapperTest.java @@ -43,7 +43,6 @@ import org.apache.james.mailbox.MessageUid; import org.apache.james.mailbox.ModSeq; import org.apache.james.mailbox.exception.MailboxException; import org.apache.james.mailbox.model.Mailbox; -import org.apache.james.mailbox.model.MailboxCounters; import org.apache.james.mailbox.model.MailboxPath; import org.apache.james.mailbox.model.MessageId; import org.apache.james.mailbox.model.MessageMetaData; @@ -128,23 +127,6 @@ public abstract class MessageMapperTest { } @Test - void getMailboxCountersShouldReturnStoredValue() throws MailboxException { - saveMessages(); - assertThat(messageMapper.getMailboxCounters(ImmutableList.of(benwaInboxMailbox, benwaWorkMailbox))) - .containsExactlyInAnyOrder( - MailboxCounters.builder() - .mailboxId(benwaInboxMailbox.getMailboxId()) - .count(5) - .unseen(5) - .build(), - MailboxCounters.builder() - .mailboxId(benwaWorkMailbox.getMailboxId()) - .count(1) - .unseen(1) - .build()); - } - - @Test void mailboxCountShouldBeDecrementedAfterAMessageDelete() throws MailboxException { saveMessages(); messageMapper.delete(benwaInboxMailbox, message1); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
