This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 3048a9f990e3f6f80e22b8790be87681916019bb Author: Benoit Tellier <btell...@linagora.com> AuthorDate: Sun May 23 10:39:41 2021 +0700 [PERFORMANCE] Reactify ReferenceUpdater --- .../james/jmap/draft/methods/ReferenceUpdater.java | 55 ++++++++++++---------- .../methods/SetMessagesCreationProcessor.java | 5 +- .../draft/methods/SetMessagesUpdateProcessor.java | 10 ++-- 3 files changed, 39 insertions(+), 31 deletions(-) diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/ReferenceUpdater.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/ReferenceUpdater.java index 17c946a..a965e67 100644 --- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/ReferenceUpdater.java +++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/ReferenceUpdater.java @@ -19,7 +19,6 @@ package org.apache.james.jmap.draft.methods; -import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.Optional; @@ -34,7 +33,6 @@ import org.apache.james.mailbox.MessageManager.FlagsUpdateMode; import org.apache.james.mailbox.exception.MailboxException; import org.apache.james.mailbox.model.Header; import org.apache.james.mailbox.model.Headers; -import org.apache.james.mailbox.model.MailboxId; import org.apache.james.mailbox.model.MessageId; import org.apache.james.mailbox.model.MultimailboxesSearchQuery; import org.apache.james.mailbox.model.SearchQuery; @@ -48,6 +46,7 @@ import com.github.steveash.guavate.Guavate; import com.google.common.collect.Iterables; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; public class ReferenceUpdater { public static final String X_FORWARDED_ID_HEADER = "X-Forwarded-Message-Id"; @@ -64,45 +63,49 @@ public class ReferenceUpdater { this.mailboxManager = mailboxManager; } - public void updateReferences(Headers headers, MailboxSession session) throws MailboxException { + public Mono<Void> updateReferences(Headers headers, MailboxSession session) throws MailboxException { Map<String, String> headersAsMap = Iterators.toStream(headers.headers()) .collect(Guavate.toImmutableMap(Header::getName, Header::getValue)); - updateReferences(headersAsMap, session); + return updateReferences(headersAsMap, session); } - public void updateReferences(Map<String, String> headers, MailboxSession session) throws MailboxException { + public Mono<Void> updateReferences(Map<String, String> headers, MailboxSession session) throws MailboxException { Optional<String> inReplyToId = Optional.ofNullable(headers.get(RFC2822Headers.IN_REPLY_TO)); Optional<String> forwardedId = Optional.ofNullable(headers.get(X_FORWARDED_ID_HEADER)); - inReplyToId.ifPresent(Throwing.consumer((String id) -> updateAnswered(id, session)).sneakyThrow()); - forwardedId.ifPresent(Throwing.consumer((String id) -> updateForwarded(id, session)).sneakyThrow()); + + return inReplyToId.map(Throwing.function((String id) -> updateAnswered(id, session)).sneakyThrow()).orElse(Mono.empty()) + .then(forwardedId.map((Throwing.function((String id) -> updateForwarded(id, session)).sneakyThrow())).orElse(Mono.empty())); } - private void updateAnswered(String messageId, MailboxSession session) throws MailboxException { - updateFlag(messageId, session, new Flags(Flags.Flag.ANSWERED)); + private Mono<Void> updateAnswered(String messageId, MailboxSession session) throws MailboxException { + return updateFlag(messageId, session, new Flags(Flags.Flag.ANSWERED)); } - private void updateForwarded(String messageId, MailboxSession session) throws MailboxException { - updateFlag(messageId, session, FORWARDED_FLAG); + private Mono<Void> updateForwarded(String messageId, MailboxSession session) throws MailboxException { + return updateFlag(messageId, session, FORWARDED_FLAG); } - private void updateFlag(String messageId, MailboxSession session, Flags flag) throws MailboxException { + private Mono<Void> updateFlag(String messageId, MailboxSession session, Flags flag) throws MailboxException { int limit = 2; MultimailboxesSearchQuery searchByRFC822MessageId = MultimailboxesSearchQuery .from(SearchQuery.of(SearchQuery.mimeMessageID(messageId))) .build(); - List<MessageId> references = Flux.from(mailboxManager.search(searchByRFC822MessageId, session, limit)) - .collectList().block(); - try { - MessageId reference = Iterables.getOnlyElement(references); - List<MailboxId> mailboxIds = Flux.from(messageIdManager.messageMetadata(reference, session)) - .map(metaData -> metaData.getComposedMessageId().getMailboxId()) - .collect(Guavate.toImmutableList()) - .block(); - messageIdManager.setFlags(flag, FlagsUpdateMode.ADD, reference, mailboxIds, session); - } catch (NoSuchElementException e) { - logger.info("Unable to find a message with this Mime Message Id: " + messageId); - } catch (IllegalArgumentException e) { - logger.info("Too many messages are matching this Mime Message Id: " + messageId); - } + return Flux.from(mailboxManager.search(searchByRFC822MessageId, session, limit)) + .collectList() + .flatMap(references -> { + MessageId reference = Iterables.getOnlyElement(references); + return Flux.from(messageIdManager.messageMetadata(reference, session)) + .map(metaData -> metaData.getComposedMessageId().getMailboxId()) + .collect(Guavate.toImmutableList()) + .flatMap(mailboxIds -> Mono.from(messageIdManager.setFlagsReactive(flag, FlagsUpdateMode.ADD, reference, mailboxIds, session))); + }) + .onErrorResume(NoSuchElementException.class, e -> { + logger.info("Unable to find a message with this Mime Message Id: " + messageId); + return Mono.empty(); + }) + .onErrorResume(IllegalArgumentException.class, e -> { + logger.info("Too many messages are matching this Mime Message Id: " + messageId); + return Mono.empty(); + }); } } diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesCreationProcessor.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesCreationProcessor.java index 1c505fd..fc1ca2b 100644 --- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesCreationProcessor.java +++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesCreationProcessor.java @@ -293,8 +293,9 @@ public class SetMessagesCreationProcessor implements SetMessagesProcessor { MetaDataWithContent newMessage = messageAppender.appendMessageInMailboxes(entry, toMailboxIds(entry), session); MessageFullView jmapMessage = messageFullViewFactory.fromMetaDataWithContent(newMessage).block(); Envelope envelope = EnvelopeUtils.fromMessage(jmapMessage); - messageSender.sendMessage(newMessage, envelope, session).block(); - referenceUpdater.updateReferences(entry.getValue().getHeaders(), session); + messageSender.sendMessage(newMessage, envelope, session) + .then(referenceUpdater.updateReferences(entry.getValue().getHeaders(), session)) + .block(); return new ValueWithId.MessageWithId(entry.getCreationId(), jmapMessage); } diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesUpdateProcessor.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesUpdateProcessor.java index 2cc67b1..f92e91c 100644 --- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesUpdateProcessor.java +++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesUpdateProcessor.java @@ -36,6 +36,7 @@ import javax.mail.MessagingException; import javax.mail.Session; import javax.mail.internet.MimeMessage; +import org.apache.commons.lang3.tuple.Pair; import org.apache.james.core.Username; import org.apache.james.jmap.draft.exceptions.InvalidOutboxMoveException; import org.apache.james.jmap.draft.model.Keyword; @@ -70,6 +71,7 @@ import org.apache.james.util.StreamUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.github.fge.lambdas.Throwing; import com.github.steveash.guavate.Guavate; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; @@ -333,10 +335,12 @@ public class SetMessagesUpdateProcessor implements SetMessagesProcessor { .asOptional() .map(Username::fromMailAddress); assertUserCanSendFrom(mailboxSession.getUser(), fromUser); - messageSender.sendMessage(messageId, mail, mailboxSession).block(); - referenceUpdater.updateReferences(messageToSend.getHeaders(), mailboxSession); - return SetMessagesResponse.builder(); + return Pair.of(messageToSend, mail); }).subscribeOn(Schedulers.elastic())) + .flatMap(Throwing.<Pair<MessageResult, MailImpl>, Mono<SetMessagesResponse.Builder>>function( + pair -> messageSender.sendMessage(messageId, pair.getRight(), mailboxSession) + .then(referenceUpdater.updateReferences(pair.getKey().getHeaders(), mailboxSession)) + .thenReturn(SetMessagesResponse.builder())).sneakyThrow()) .switchIfEmpty(Mono.just(addMessageIdNotFoundToResponse(messageId))); } return Mono.just(SetMessagesResponse.builder()); --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For additional commands, e-mail: notifications-h...@james.apache.org