This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit cb1179e5d9e4ea7f3810bbc7266406b7de51389a
Author: Benoit Tellier <[email protected]>
AuthorDate: Tue Apr 14 10:59:30 2020 +0700

    JAMES-3149 Reactive getMailboxes
---
 .../org/apache/james/mailbox/MailboxManager.java   |   4 +
 .../james/mailbox/store/StoreMailboxManager.java   |  22 ++++-
 .../jmap/draft/methods/GetMailboxesMethod.java     | 109 +++++++++++----------
 .../jmap/draft/methods/GetMailboxesMethodTest.java |   4 +
 4 files changed, 84 insertions(+), 55 deletions(-)

diff --git 
a/mailbox/api/src/main/java/org/apache/james/mailbox/MailboxManager.java 
b/mailbox/api/src/main/java/org/apache/james/mailbox/MailboxManager.java
index 9f31163..d5ac9d7 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/MailboxManager.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/MailboxManager.java
@@ -36,6 +36,8 @@ import 
org.apache.james.mailbox.model.MultimailboxesSearchQuery;
 import org.apache.james.mailbox.model.search.MailboxQuery;
 import org.reactivestreams.Publisher;
 
+import reactor.core.publisher.Flux;
+
 /**
  * <p>
  * Central MailboxManager which creates, lists, provides, renames and deletes
@@ -243,6 +245,8 @@ public interface MailboxManager extends RequestAware, 
RightManager, MailboxAnnot
      */
     List<MailboxMetaData> search(MailboxQuery expression, MailboxSession 
session) throws MailboxException;
 
+    Flux<MailboxMetaData> searchReactive(MailboxQuery expression, 
MailboxSession session);
+
     /**
      * Searches for messages matching the given query.
      * 
diff --git 
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
 
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
index e9ad7da..f32d83e 100644
--- 
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
+++ 
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java
@@ -596,6 +596,15 @@ public class StoreMailboxManager implements MailboxManager 
{
             .block();
     }
 
+    @Override
+    public Flux<MailboxMetaData> searchReactive(MailboxQuery expression, 
MailboxSession session) {
+        try {
+            return searchMailboxesMetadata(expression, session, Right.Lookup);
+        } catch (MailboxException e) {
+            return Flux.error(e);
+        }
+    }
+
     private Flux<MailboxMetaData> searchMailboxesMetadata(MailboxQuery 
mailboxQuery, MailboxSession session, Right right) throws MailboxException {
         Mono<List<Mailbox>> mailboxesMono = searchMailboxes(mailboxQuery, 
session, right).collectList();
         MessageMapper messageMapper = 
mailboxSessionMapperFactory.getMessageMapper(session);
@@ -603,13 +612,24 @@ public class StoreMailboxManager implements 
MailboxManager {
         return mailboxesMono
             .flatMapMany(mailboxes -> Flux.fromIterable(mailboxes)
                 .filter(mailboxQuery::matches)
-                .flatMap(mailbox -> 
messageMapper.getMailboxCountersReactive(mailbox)
+                .flatMap(mailbox -> retrieveCounters(messageMapper, mailbox, 
session)
                     .map(Throwing.<MailboxCounters, MailboxMetaData>function(
                         counters -> toMailboxMetadata(session, mailboxes, 
mailbox, counters))
                         .sneakyThrow())))
             .sort(MailboxMetaData.COMPARATOR);
     }
 
+    private Mono<MailboxCounters> retrieveCounters(MessageMapper 
messageMapper, Mailbox mailbox, MailboxSession session) {
+        return messageMapper.getMailboxCountersReactive(mailbox)
+            .filter(Throwing.<MailboxCounters>predicate(counter -> 
storeRightManager.hasRight(mailbox, Right.Read, session)).sneakyThrow())
+            .switchIfEmpty(Mono.just(MailboxCounters
+                .builder()
+                .mailboxId(mailbox.getMailboxId())
+                .count(0)
+                .unseen(0)
+                .build()));
+    }
+
     private Flux<Mailbox> searchMailboxes(MailboxQuery mailboxQuery, 
MailboxSession session, Right right) throws MailboxException {
         MailboxMapper mailboxMapper = 
mailboxSessionMapperFactory.getMailboxMapper(session);
         Flux<Mailbox> baseMailboxes = 
mailboxMapper.findMailboxWithPathLike(toSingleUserQuery(mailboxQuery, session));
diff --git 
a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetMailboxesMethod.java
 
b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetMailboxesMethod.java
index da0ad86..f74b243 100644
--- 
a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetMailboxesMethod.java
+++ 
b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetMailboxesMethod.java
@@ -19,11 +19,12 @@
 
 package org.apache.james.jmap.draft.methods;
 
+import static org.apache.james.util.ReactorUtils.context;
+
 import java.util.Comparator;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
-import java.util.stream.Stream;
 
 import javax.inject.Inject;
 
@@ -33,11 +34,9 @@ import org.apache.james.jmap.draft.model.MailboxFactory;
 import org.apache.james.jmap.draft.model.MailboxProperty;
 import org.apache.james.jmap.draft.model.MethodCallId;
 import org.apache.james.jmap.draft.model.mailbox.Mailbox;
-import org.apache.james.jmap.draft.utils.quotas.QuotaLoader;
 import 
org.apache.james.jmap.draft.utils.quotas.QuotaLoaderWithDefaultPreloaded;
 import org.apache.james.mailbox.MailboxManager;
 import org.apache.james.mailbox.MailboxSession;
-import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.model.MailboxId;
 import org.apache.james.mailbox.model.MailboxMetaData;
 import org.apache.james.mailbox.model.search.MailboxQuery;
@@ -45,21 +44,24 @@ import org.apache.james.mailbox.quota.QuotaManager;
 import org.apache.james.mailbox.quota.QuotaRootResolver;
 import org.apache.james.metrics.api.MetricFactory;
 import org.apache.james.util.MDCBuilder;
-import org.apache.james.util.OptionalUtils;
 
 import com.github.fge.lambdas.Throwing;
-import com.github.steveash.guavate.Guavate;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
 public class GetMailboxesMethod implements Method {
 
     private static final Method.Request.Name METHOD_NAME = 
Method.Request.name("getMailboxes");
     private static final Method.Response.Name RESPONSE_NAME = 
Method.Response.name("mailboxes");
     private static final Optional<List<MailboxMetaData>> NO_PRELOADED_METADATA 
= Optional.empty();
+    private static final String ACTION = "GET_MAILBOXES";
 
     private final MailboxManager mailboxManager;
     private final MailboxFactory mailboxFactory;
@@ -88,83 +90,82 @@ public class GetMailboxesMethod implements Method {
     }
 
     @Override
-    public Stream<JmapResponse> processToStream(JmapRequest request, 
MethodCallId methodCallId, MailboxSession mailboxSession) {
+    public Flux<JmapResponse> process(JmapRequest request, MethodCallId 
methodCallId, MailboxSession mailboxSession) {
         Preconditions.checkArgument(request instanceof GetMailboxesRequest);
         GetMailboxesRequest mailboxesRequest = (GetMailboxesRequest) request;
 
+        return metricFactory.runPublishingTimerMetricLogP99(JMAP_PREFIX + 
METHOD_NAME.getName(),
+            () -> process(methodCallId, mailboxSession, mailboxesRequest)
+            .subscriberContext(context(ACTION, mdc(mailboxesRequest))));
+    }
+
+    private MDCBuilder mdc(GetMailboxesRequest mailboxesRequest) {
         return MDCBuilder.create()
-            .addContext(MDCBuilder.ACTION, "GET_MAILBOXES")
+            .addContext(MDCBuilder.ACTION, ACTION)
             .addContext("accountId", mailboxesRequest.getAccountId())
             .addContext("mailboxIds", mailboxesRequest.getIds())
-            .addContext("properties", mailboxesRequest.getProperties())
-            .wrapArround(
-                () -> metricFactory.runPublishingTimerMetricLogP99(JMAP_PREFIX 
+ METHOD_NAME.getName(),
-                    () -> process(methodCallId, mailboxSession, 
mailboxesRequest)))
-            .get();
+            .addContext("properties", mailboxesRequest.getProperties());
     }
 
-    private Stream<JmapResponse> process(MethodCallId methodCallId, 
MailboxSession mailboxSession, GetMailboxesRequest mailboxesRequest) {
-        return Stream.of(
-            JmapResponse.builder().methodCallId(methodCallId)
-                .response(getMailboxesResponse(mailboxesRequest, 
mailboxSession))
+    private Flux<JmapResponse> process(MethodCallId methodCallId, 
MailboxSession mailboxSession, GetMailboxesRequest mailboxesRequest) {
+        return Flux.from(getMailboxesResponse(mailboxesRequest, mailboxSession)
+            .map(response -> JmapResponse.builder().methodCallId(methodCallId)
+                .response(response)
                 
.properties(mailboxesRequest.getProperties().map(this::ensureContainsId))
                 .responseName(RESPONSE_NAME)
-                .build());
+                .build()));
     }
 
     private Set<MailboxProperty> ensureContainsId(Set<MailboxProperty> input) {
         return Sets.union(input, 
ImmutableSet.of(MailboxProperty.ID)).immutableCopy();
     }
 
-    private GetMailboxesResponse getMailboxesResponse(GetMailboxesRequest 
mailboxesRequest, MailboxSession mailboxSession) {
-        GetMailboxesResponse.Builder builder = GetMailboxesResponse.builder();
-        try {
-            Optional<ImmutableList<MailboxId>> mailboxIds = 
mailboxesRequest.getIds();
-            List<Mailbox> mailboxes = retrieveMailboxes(mailboxIds, 
mailboxSession)
-                .sorted(Comparator.comparing(Mailbox::getSortOrder))
-                .collect(Guavate.toImmutableList());
-            return builder.addAll(mailboxes).build();
-        } catch (MailboxException e) {
-            throw new RuntimeException(e);
-        }
+    private Mono<GetMailboxesResponse> 
getMailboxesResponse(GetMailboxesRequest mailboxesRequest, MailboxSession 
mailboxSession) {
+        Optional<ImmutableList<MailboxId>> mailboxIds = 
mailboxesRequest.getIds();
+        return retrieveMailboxes(mailboxIds, mailboxSession)
+            .sort(Comparator.comparing(Mailbox::getSortOrder))
+            .reduce(GetMailboxesResponse.builder(), 
GetMailboxesResponse.Builder::add)
+            .map(GetMailboxesResponse.Builder::build);
     }
 
-    private Stream<Mailbox> 
retrieveMailboxes(Optional<ImmutableList<MailboxId>> mailboxIds, MailboxSession 
mailboxSession) throws MailboxException {
+    private Flux<Mailbox> retrieveMailboxes(Optional<ImmutableList<MailboxId>> 
mailboxIds, MailboxSession mailboxSession) {
         return mailboxIds
             .map(ids -> retrieveSpecificMailboxes(mailboxSession, ids))
             .orElseGet(Throwing.supplier(() -> 
retrieveAllMailboxes(mailboxSession)).sneakyThrow());
     }
 
 
-    private Stream<Mailbox> retrieveSpecificMailboxes(MailboxSession 
mailboxSession, ImmutableList<MailboxId> mailboxIds) {
-        return mailboxIds
-            .stream()
-            .map(mailboxId -> mailboxFactory.builder()
-                .id(mailboxId)
-                .session(mailboxSession)
-                .usingPreloadedMailboxesMetadata(NO_PRELOADED_METADATA)
-                .build()
-            )
-            .flatMap(OptionalUtils::toStream);
+    private Flux<Mailbox> retrieveSpecificMailboxes(MailboxSession 
mailboxSession, ImmutableList<MailboxId> mailboxIds) {
+        return Flux.fromIterable(mailboxIds)
+            .flatMap(mailboxId -> Mono.fromCallable(() ->
+                mailboxFactory.builder()
+                    .id(mailboxId)
+                    .session(mailboxSession)
+                    .usingPreloadedMailboxesMetadata(NO_PRELOADED_METADATA)
+                    .build())
+                .subscribeOn(Schedulers.elastic()))
+            .handle((element, sink) -> element.ifPresent(sink::next));
     }
 
-    private Stream<Mailbox> retrieveAllMailboxes(MailboxSession 
mailboxSession) throws MailboxException {
-        List<MailboxMetaData> userMailboxes = 
getAllMailboxesMetaData(mailboxSession);
-        QuotaLoader quotaLoader = new 
QuotaLoaderWithDefaultPreloaded(quotaRootResolver, quotaManager, 
mailboxSession);
-
-        return userMailboxes
-            .stream()
-            .map(mailboxMetaData -> mailboxFactory.builder()
-                .mailboxMetadata(mailboxMetaData)
-                .session(mailboxSession)
-                .usingPreloadedMailboxesMetadata(Optional.of(userMailboxes))
-                .quotaLoader(quotaLoader)
-                .build())
-            .flatMap(OptionalUtils::toStream);
+    private Flux<Mailbox> retrieveAllMailboxes(MailboxSession mailboxSession) {
+        Mono<List<MailboxMetaData>> userMailboxesMono = 
getAllMailboxesMetaData(mailboxSession).collectList();
+        Mono<QuotaLoaderWithDefaultPreloaded> quotaLoaderMono = 
Mono.fromCallable(() ->
+            new QuotaLoaderWithDefaultPreloaded(quotaRootResolver, 
quotaManager, mailboxSession))
+            .subscribeOn(Schedulers.elastic());
+
+        return userMailboxesMono.zipWith(quotaLoaderMono)
+            .flatMapMany(
+                tuple -> Flux.fromIterable(tuple.getT1())
+                    .flatMap(mailboxMetaData -> 
Mono.justOrEmpty(mailboxFactory.builder()
+                        .mailboxMetadata(mailboxMetaData)
+                        .session(mailboxSession)
+                        
.usingPreloadedMailboxesMetadata(Optional.of(tuple.getT1()))
+                        .quotaLoader(tuple.getT2())
+                        .build())));
     }
 
-    private List<MailboxMetaData> getAllMailboxesMetaData(MailboxSession 
mailboxSession) throws MailboxException {
-        return mailboxManager.search(
+    private Flux<MailboxMetaData> getAllMailboxesMetaData(MailboxSession 
mailboxSession) {
+        return mailboxManager.searchReactive(
             MailboxQuery.builder()
                 .matchesAllMailboxNames()
                 .build(),
diff --git 
a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/GetMailboxesMethodTest.java
 
b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/GetMailboxesMethodTest.java
index f0b9cca..f51721b 100644
--- 
a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/GetMailboxesMethodTest.java
+++ 
b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/GetMailboxesMethodTest.java
@@ -57,6 +57,8 @@ import org.junit.Test;
 
 import com.google.common.collect.ImmutableList;
 
+import reactor.core.publisher.Flux;
+
 public class GetMailboxesMethodTest {
 
     private static final Username USERNAME = 
Username.of("[email protected]");
@@ -106,6 +108,8 @@ public class GetMailboxesMethodTest {
             .thenReturn(ImmutableList.of(new MailboxPath("namespace", 
Username.of("user"), "name")));
         when(mockedMailboxManager.getMailbox(any(MailboxPath.class), any()))
             .thenThrow(new MailboxException());
+        when(mockedMailboxManager.searchReactive(any(), any()))
+            .thenReturn(Flux.empty());
         GetMailboxesMethod testee = new 
GetMailboxesMethod(mockedMailboxManager, quotaRootResolver, quotaManager, 
mailboxFactory, new DefaultMetricFactory());
 
         GetMailboxesRequest getMailboxesRequest = GetMailboxesRequest.builder()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to