This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit cb1179e5d9e4ea7f3810bbc7266406b7de51389a Author: Benoit Tellier <[email protected]> AuthorDate: Tue Apr 14 10:59:30 2020 +0700 JAMES-3149 Reactive getMailboxes --- .../org/apache/james/mailbox/MailboxManager.java | 4 + .../james/mailbox/store/StoreMailboxManager.java | 22 ++++- .../jmap/draft/methods/GetMailboxesMethod.java | 109 +++++++++++---------- .../jmap/draft/methods/GetMailboxesMethodTest.java | 4 + 4 files changed, 84 insertions(+), 55 deletions(-) diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/MailboxManager.java b/mailbox/api/src/main/java/org/apache/james/mailbox/MailboxManager.java index 9f31163..d5ac9d7 100644 --- a/mailbox/api/src/main/java/org/apache/james/mailbox/MailboxManager.java +++ b/mailbox/api/src/main/java/org/apache/james/mailbox/MailboxManager.java @@ -36,6 +36,8 @@ import org.apache.james.mailbox.model.MultimailboxesSearchQuery; import org.apache.james.mailbox.model.search.MailboxQuery; import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; + /** * <p> * Central MailboxManager which creates, lists, provides, renames and deletes @@ -243,6 +245,8 @@ public interface MailboxManager extends RequestAware, RightManager, MailboxAnnot */ List<MailboxMetaData> search(MailboxQuery expression, MailboxSession session) throws MailboxException; + Flux<MailboxMetaData> searchReactive(MailboxQuery expression, MailboxSession session); + /** * Searches for messages matching the given query. * diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java index e9ad7da..f32d83e 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java @@ -596,6 +596,15 @@ public class StoreMailboxManager implements MailboxManager { .block(); } + @Override + public Flux<MailboxMetaData> searchReactive(MailboxQuery expression, MailboxSession session) { + try { + return searchMailboxesMetadata(expression, session, Right.Lookup); + } catch (MailboxException e) { + return Flux.error(e); + } + } + private Flux<MailboxMetaData> searchMailboxesMetadata(MailboxQuery mailboxQuery, MailboxSession session, Right right) throws MailboxException { Mono<List<Mailbox>> mailboxesMono = searchMailboxes(mailboxQuery, session, right).collectList(); MessageMapper messageMapper = mailboxSessionMapperFactory.getMessageMapper(session); @@ -603,13 +612,24 @@ public class StoreMailboxManager implements MailboxManager { return mailboxesMono .flatMapMany(mailboxes -> Flux.fromIterable(mailboxes) .filter(mailboxQuery::matches) - .flatMap(mailbox -> messageMapper.getMailboxCountersReactive(mailbox) + .flatMap(mailbox -> retrieveCounters(messageMapper, mailbox, session) .map(Throwing.<MailboxCounters, MailboxMetaData>function( counters -> toMailboxMetadata(session, mailboxes, mailbox, counters)) .sneakyThrow()))) .sort(MailboxMetaData.COMPARATOR); } + private Mono<MailboxCounters> retrieveCounters(MessageMapper messageMapper, Mailbox mailbox, MailboxSession session) { + return messageMapper.getMailboxCountersReactive(mailbox) + .filter(Throwing.<MailboxCounters>predicate(counter -> storeRightManager.hasRight(mailbox, Right.Read, session)).sneakyThrow()) + .switchIfEmpty(Mono.just(MailboxCounters + .builder() + .mailboxId(mailbox.getMailboxId()) + .count(0) + .unseen(0) + .build())); + } + private Flux<Mailbox> searchMailboxes(MailboxQuery mailboxQuery, MailboxSession session, Right right) throws MailboxException { MailboxMapper mailboxMapper = mailboxSessionMapperFactory.getMailboxMapper(session); Flux<Mailbox> baseMailboxes = mailboxMapper.findMailboxWithPathLike(toSingleUserQuery(mailboxQuery, session)); diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetMailboxesMethod.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetMailboxesMethod.java index da0ad86..f74b243 100644 --- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetMailboxesMethod.java +++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetMailboxesMethod.java @@ -19,11 +19,12 @@ package org.apache.james.jmap.draft.methods; +import static org.apache.james.util.ReactorUtils.context; + import java.util.Comparator; import java.util.List; import java.util.Optional; import java.util.Set; -import java.util.stream.Stream; import javax.inject.Inject; @@ -33,11 +34,9 @@ import org.apache.james.jmap.draft.model.MailboxFactory; import org.apache.james.jmap.draft.model.MailboxProperty; import org.apache.james.jmap.draft.model.MethodCallId; import org.apache.james.jmap.draft.model.mailbox.Mailbox; -import org.apache.james.jmap.draft.utils.quotas.QuotaLoader; import org.apache.james.jmap.draft.utils.quotas.QuotaLoaderWithDefaultPreloaded; import org.apache.james.mailbox.MailboxManager; import org.apache.james.mailbox.MailboxSession; -import org.apache.james.mailbox.exception.MailboxException; import org.apache.james.mailbox.model.MailboxId; import org.apache.james.mailbox.model.MailboxMetaData; import org.apache.james.mailbox.model.search.MailboxQuery; @@ -45,21 +44,24 @@ import org.apache.james.mailbox.quota.QuotaManager; import org.apache.james.mailbox.quota.QuotaRootResolver; import org.apache.james.metrics.api.MetricFactory; import org.apache.james.util.MDCBuilder; -import org.apache.james.util.OptionalUtils; import com.github.fge.lambdas.Throwing; -import com.github.steveash.guavate.Guavate; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + public class GetMailboxesMethod implements Method { private static final Method.Request.Name METHOD_NAME = Method.Request.name("getMailboxes"); private static final Method.Response.Name RESPONSE_NAME = Method.Response.name("mailboxes"); private static final Optional<List<MailboxMetaData>> NO_PRELOADED_METADATA = Optional.empty(); + private static final String ACTION = "GET_MAILBOXES"; private final MailboxManager mailboxManager; private final MailboxFactory mailboxFactory; @@ -88,83 +90,82 @@ public class GetMailboxesMethod implements Method { } @Override - public Stream<JmapResponse> processToStream(JmapRequest request, MethodCallId methodCallId, MailboxSession mailboxSession) { + public Flux<JmapResponse> process(JmapRequest request, MethodCallId methodCallId, MailboxSession mailboxSession) { Preconditions.checkArgument(request instanceof GetMailboxesRequest); GetMailboxesRequest mailboxesRequest = (GetMailboxesRequest) request; + return metricFactory.runPublishingTimerMetricLogP99(JMAP_PREFIX + METHOD_NAME.getName(), + () -> process(methodCallId, mailboxSession, mailboxesRequest) + .subscriberContext(context(ACTION, mdc(mailboxesRequest)))); + } + + private MDCBuilder mdc(GetMailboxesRequest mailboxesRequest) { return MDCBuilder.create() - .addContext(MDCBuilder.ACTION, "GET_MAILBOXES") + .addContext(MDCBuilder.ACTION, ACTION) .addContext("accountId", mailboxesRequest.getAccountId()) .addContext("mailboxIds", mailboxesRequest.getIds()) - .addContext("properties", mailboxesRequest.getProperties()) - .wrapArround( - () -> metricFactory.runPublishingTimerMetricLogP99(JMAP_PREFIX + METHOD_NAME.getName(), - () -> process(methodCallId, mailboxSession, mailboxesRequest))) - .get(); + .addContext("properties", mailboxesRequest.getProperties()); } - private Stream<JmapResponse> process(MethodCallId methodCallId, MailboxSession mailboxSession, GetMailboxesRequest mailboxesRequest) { - return Stream.of( - JmapResponse.builder().methodCallId(methodCallId) - .response(getMailboxesResponse(mailboxesRequest, mailboxSession)) + private Flux<JmapResponse> process(MethodCallId methodCallId, MailboxSession mailboxSession, GetMailboxesRequest mailboxesRequest) { + return Flux.from(getMailboxesResponse(mailboxesRequest, mailboxSession) + .map(response -> JmapResponse.builder().methodCallId(methodCallId) + .response(response) .properties(mailboxesRequest.getProperties().map(this::ensureContainsId)) .responseName(RESPONSE_NAME) - .build()); + .build())); } private Set<MailboxProperty> ensureContainsId(Set<MailboxProperty> input) { return Sets.union(input, ImmutableSet.of(MailboxProperty.ID)).immutableCopy(); } - private GetMailboxesResponse getMailboxesResponse(GetMailboxesRequest mailboxesRequest, MailboxSession mailboxSession) { - GetMailboxesResponse.Builder builder = GetMailboxesResponse.builder(); - try { - Optional<ImmutableList<MailboxId>> mailboxIds = mailboxesRequest.getIds(); - List<Mailbox> mailboxes = retrieveMailboxes(mailboxIds, mailboxSession) - .sorted(Comparator.comparing(Mailbox::getSortOrder)) - .collect(Guavate.toImmutableList()); - return builder.addAll(mailboxes).build(); - } catch (MailboxException e) { - throw new RuntimeException(e); - } + private Mono<GetMailboxesResponse> getMailboxesResponse(GetMailboxesRequest mailboxesRequest, MailboxSession mailboxSession) { + Optional<ImmutableList<MailboxId>> mailboxIds = mailboxesRequest.getIds(); + return retrieveMailboxes(mailboxIds, mailboxSession) + .sort(Comparator.comparing(Mailbox::getSortOrder)) + .reduce(GetMailboxesResponse.builder(), GetMailboxesResponse.Builder::add) + .map(GetMailboxesResponse.Builder::build); } - private Stream<Mailbox> retrieveMailboxes(Optional<ImmutableList<MailboxId>> mailboxIds, MailboxSession mailboxSession) throws MailboxException { + private Flux<Mailbox> retrieveMailboxes(Optional<ImmutableList<MailboxId>> mailboxIds, MailboxSession mailboxSession) { return mailboxIds .map(ids -> retrieveSpecificMailboxes(mailboxSession, ids)) .orElseGet(Throwing.supplier(() -> retrieveAllMailboxes(mailboxSession)).sneakyThrow()); } - private Stream<Mailbox> retrieveSpecificMailboxes(MailboxSession mailboxSession, ImmutableList<MailboxId> mailboxIds) { - return mailboxIds - .stream() - .map(mailboxId -> mailboxFactory.builder() - .id(mailboxId) - .session(mailboxSession) - .usingPreloadedMailboxesMetadata(NO_PRELOADED_METADATA) - .build() - ) - .flatMap(OptionalUtils::toStream); + private Flux<Mailbox> retrieveSpecificMailboxes(MailboxSession mailboxSession, ImmutableList<MailboxId> mailboxIds) { + return Flux.fromIterable(mailboxIds) + .flatMap(mailboxId -> Mono.fromCallable(() -> + mailboxFactory.builder() + .id(mailboxId) + .session(mailboxSession) + .usingPreloadedMailboxesMetadata(NO_PRELOADED_METADATA) + .build()) + .subscribeOn(Schedulers.elastic())) + .handle((element, sink) -> element.ifPresent(sink::next)); } - private Stream<Mailbox> retrieveAllMailboxes(MailboxSession mailboxSession) throws MailboxException { - List<MailboxMetaData> userMailboxes = getAllMailboxesMetaData(mailboxSession); - QuotaLoader quotaLoader = new QuotaLoaderWithDefaultPreloaded(quotaRootResolver, quotaManager, mailboxSession); - - return userMailboxes - .stream() - .map(mailboxMetaData -> mailboxFactory.builder() - .mailboxMetadata(mailboxMetaData) - .session(mailboxSession) - .usingPreloadedMailboxesMetadata(Optional.of(userMailboxes)) - .quotaLoader(quotaLoader) - .build()) - .flatMap(OptionalUtils::toStream); + private Flux<Mailbox> retrieveAllMailboxes(MailboxSession mailboxSession) { + Mono<List<MailboxMetaData>> userMailboxesMono = getAllMailboxesMetaData(mailboxSession).collectList(); + Mono<QuotaLoaderWithDefaultPreloaded> quotaLoaderMono = Mono.fromCallable(() -> + new QuotaLoaderWithDefaultPreloaded(quotaRootResolver, quotaManager, mailboxSession)) + .subscribeOn(Schedulers.elastic()); + + return userMailboxesMono.zipWith(quotaLoaderMono) + .flatMapMany( + tuple -> Flux.fromIterable(tuple.getT1()) + .flatMap(mailboxMetaData -> Mono.justOrEmpty(mailboxFactory.builder() + .mailboxMetadata(mailboxMetaData) + .session(mailboxSession) + .usingPreloadedMailboxesMetadata(Optional.of(tuple.getT1())) + .quotaLoader(tuple.getT2()) + .build()))); } - private List<MailboxMetaData> getAllMailboxesMetaData(MailboxSession mailboxSession) throws MailboxException { - return mailboxManager.search( + private Flux<MailboxMetaData> getAllMailboxesMetaData(MailboxSession mailboxSession) { + return mailboxManager.searchReactive( MailboxQuery.builder() .matchesAllMailboxNames() .build(), diff --git a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/GetMailboxesMethodTest.java b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/GetMailboxesMethodTest.java index f0b9cca..f51721b 100644 --- a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/GetMailboxesMethodTest.java +++ b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/GetMailboxesMethodTest.java @@ -57,6 +57,8 @@ import org.junit.Test; import com.google.common.collect.ImmutableList; +import reactor.core.publisher.Flux; + public class GetMailboxesMethodTest { private static final Username USERNAME = Username.of("[email protected]"); @@ -106,6 +108,8 @@ public class GetMailboxesMethodTest { .thenReturn(ImmutableList.of(new MailboxPath("namespace", Username.of("user"), "name"))); when(mockedMailboxManager.getMailbox(any(MailboxPath.class), any())) .thenThrow(new MailboxException()); + when(mockedMailboxManager.searchReactive(any(), any())) + .thenReturn(Flux.empty()); GetMailboxesMethod testee = new GetMailboxesMethod(mockedMailboxManager, quotaRootResolver, quotaManager, mailboxFactory, new DefaultMetricFactory()); GetMailboxesRequest getMailboxesRequest = GetMailboxesRequest.builder() --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
