This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 66dc43c67c463be0c26a771fae80cd321c2c84c4 Author: Benoit Tellier <[email protected]> AuthorDate: Sat May 15 10:12:09 2021 +0700 [REFACTORING] JMAP draft setMessages update: do not block for outbox reading --- .../james/mailbox/SystemMailboxesProvider.java | 2 +- .../draft/methods/SetMessagesUpdateProcessor.java | 37 +++++++++++----------- 2 files changed, 20 insertions(+), 19 deletions(-) diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/SystemMailboxesProvider.java b/mailbox/api/src/main/java/org/apache/james/mailbox/SystemMailboxesProvider.java index 58c0f57..53a9410 100644 --- a/mailbox/api/src/main/java/org/apache/james/mailbox/SystemMailboxesProvider.java +++ b/mailbox/api/src/main/java/org/apache/james/mailbox/SystemMailboxesProvider.java @@ -27,7 +27,7 @@ import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; public interface SystemMailboxesProvider { - Publisher<MessageManager> getMailboxByRole(Role aRole, Username username) throws MailboxException; + Publisher<MessageManager> getMailboxByRole(Role aRole, Username username); default MessageManager findMailbox(Role role, Username username) throws MailboxException { return Flux.from(getMailboxByRole(role, username)) diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesUpdateProcessor.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesUpdateProcessor.java index 9f4c176..82c8f57 100644 --- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesUpdateProcessor.java +++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesUpdateProcessor.java @@ -66,7 +66,6 @@ import org.apache.james.mailbox.model.MessageMoves; import org.apache.james.mailbox.model.MessageRange; import org.apache.james.mailbox.model.MessageResult; import org.apache.james.metrics.api.MetricFactory; -import org.apache.james.metrics.api.TimeMetric; import org.apache.james.rrt.api.CanSendFrom; import org.apache.james.server.core.MailImpl; import org.apache.james.util.StreamUtils; @@ -80,8 +79,9 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Multimap; -import io.vavr.control.Try; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; public class SetMessagesUpdateProcessor implements SetMessagesProcessor { @@ -121,19 +121,21 @@ public class SetMessagesUpdateProcessor implements SetMessagesProcessor { } @Override - public SetMessagesResponse process(SetMessagesRequest request, MailboxSession mailboxSession) { - TimeMetric timeMetric = metricFactory.timer(JMAP_PREFIX + "SetMessagesUpdateProcessor"); - SetMessagesResponse.Builder responseBuilder = SetMessagesResponse.builder(); - Try.ofCallable(() -> listMailboxIdsForRole(mailboxSession, Role.OUTBOX)) - .map(outboxes -> { - prepareResponse(request, mailboxSession, responseBuilder, outboxes); - return null; - }) - .onFailure(e -> request.buildUpdatePatches(updatePatchConverter) - .forEach((id, patch) -> prepareResponseIfCantReadOutboxes(responseBuilder, e, id, patch))); - - timeMetric.stopAndPublish(); - return responseBuilder.build(); + public Mono<SetMessagesResponse> processReactive(SetMessagesRequest request, MailboxSession mailboxSession) { + return Mono.from(metricFactory.decoratePublisherWithTimerMetricLogP99(JMAP_PREFIX + "SetMessagesUpdateProcessor", + listMailboxIdsForRole(mailboxSession, Role.OUTBOX) + .flatMap(outboxIds -> Mono.fromCallable(() -> { + SetMessagesResponse.Builder responseBuilder = SetMessagesResponse.builder(); + prepareResponse(request, mailboxSession, responseBuilder, outboxIds); + return responseBuilder.build(); + }).subscribeOn(Schedulers.elastic())) + .onErrorResume(e -> + Mono.fromCallable(() -> { + SetMessagesResponse.Builder responseBuilder = SetMessagesResponse.builder(); + request.buildUpdatePatches(updatePatchConverter) + .forEach((id, patch) -> prepareResponseIfCantReadOutboxes(responseBuilder, e, id, patch)); + return responseBuilder.build(); + }).subscribeOn(Schedulers.elastic())))); } private void prepareResponseIfCantReadOutboxes(SetMessagesResponse.Builder responseBuilder, Throwable e, MessageId id, UpdateMessagePatch patch) { @@ -406,13 +408,12 @@ public class SetMessagesUpdateProcessor implements SetMessagesProcessor { .collect(Guavate.toImmutableSet()); } - private boolean isTargetingOutbox(Set<MailboxId> outboxes, Set<MailboxId> targetMailboxIds) throws MailboxException { + private boolean isTargetingOutbox(Set<MailboxId> outboxes, Set<MailboxId> targetMailboxIds) { return targetMailboxIds.stream().anyMatch(outboxes::contains); } - private Set<MailboxId> listMailboxIdsForRole(MailboxSession session, Role role) throws MailboxException { + private Mono<Set<MailboxId>> listMailboxIdsForRole(MailboxSession session, Role role) { return Flux.from(systemMailboxesProvider.getMailboxByRole(role, session.getUser())) - .toStream() .map(MessageManager::getId) .collect(Guavate.toImmutableSet()); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
