This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit bde8a850b1536d992b296d51a855f3e6e5bb73e2 Author: Benoit Tellier <[email protected]> AuthorDate: Sat May 1 22:48:36 2021 +0700 JAMES-3575 Reactive single message move for JMAP RFC-8621 --- .../org/apache/james/mailbox/MessageIdManager.java | 2 + .../james/mailbox/store/StoreMessageIdManager.java | 44 ++++++++++++---------- .../jmap/method/EmailSetUpdatePerformer.scala | 4 +- 3 files changed, 28 insertions(+), 22 deletions(-) diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java index abb0f8f..218819a 100644 --- a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java +++ b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java @@ -70,6 +70,8 @@ public interface MessageIdManager { void setInMailboxes(MessageId messageId, Collection<MailboxId> mailboxIds, MailboxSession mailboxSession) throws MailboxException; + Publisher<Void> setInMailboxesReactive(MessageId messageId, Collection<MailboxId> mailboxIds, MailboxSession mailboxSession); + default List<MessageResult> getMessage(MessageId messageId, FetchGroup fetchGroup, MailboxSession mailboxSession) throws MailboxException { return getMessages(ImmutableList.of(messageId), fetchGroup, mailboxSession); } diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java index 7dfc8ae..a7fa33b 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java @@ -286,25 +286,29 @@ public class StoreMessageIdManager implements MessageIdManager { @Override public void setInMailboxes(MessageId messageId, Collection<MailboxId> targetMailboxIds, MailboxSession mailboxSession) throws MailboxException { - List<MailboxMessage> currentMailboxMessages = findRelatedMailboxMessages(messageId, mailboxSession); - - MailboxReactorUtils.block(messageMovesWithMailbox(MessageMoves.builder() - .targetMailboxIds(targetMailboxIds) - .previousMailboxIds(toMailboxIds(currentMailboxMessages)) - .build(), mailboxSession) - .flatMap(Throwing.<MessageMovesWithMailbox, Mono<Void>>function(messageMove -> { - MessageMovesWithMailbox refined = messageMove.filterPrevious(hasRightsOnMailbox(mailboxSession, Right.Read)); + MailboxReactorUtils.block(setInMailboxesReactive(messageId, targetMailboxIds, mailboxSession) + .subscribeOn(Schedulers.elastic())); + } - if (messageMove.getPreviousMailboxes().isEmpty()) { - LOGGER.info("Tried to access {} not accessible for {}", messageId, mailboxSession.getUser().asString()); + @Override + public Mono<Void> setInMailboxesReactive(MessageId messageId, Collection<MailboxId> targetMailboxIds, MailboxSession mailboxSession) { + return findRelatedMailboxMessages(messageId, mailboxSession) + .flatMap(currentMailboxMessages -> messageMovesWithMailbox(MessageMoves.builder() + .targetMailboxIds(targetMailboxIds) + .previousMailboxIds(toMailboxIds(currentMailboxMessages)) + .build(), mailboxSession) + .flatMap(Throwing.<MessageMovesWithMailbox, Mono<Void>>function(messageMove -> { + MessageMovesWithMailbox refined = messageMove.filterPrevious(hasRightsOnMailbox(mailboxSession, Right.Read)); + + if (messageMove.getPreviousMailboxes().isEmpty()) { + LOGGER.info("Tried to access {} not accessible for {}", messageId, mailboxSession.getUser().asString()); + return Mono.empty(); + } + if (refined.isChange()) { + return applyMessageMoves(mailboxSession, currentMailboxMessages, refined); + } return Mono.empty(); - } - if (refined.isChange()) { - return applyMessageMoves(mailboxSession, currentMailboxMessages, refined); - } - return Mono.empty(); - }).sneakyThrow()) - .subscribeOn(Schedulers.elastic())); + }).sneakyThrow())); } public void setInMailboxesNoCheck(MessageId messageId, MailboxId targetMailboxId, MailboxSession mailboxSession) throws MailboxException { @@ -325,11 +329,11 @@ public class StoreMessageIdManager implements MessageIdManager { .subscribeOn(Schedulers.elastic())); } - private List<MailboxMessage> findRelatedMailboxMessages(MessageId messageId, MailboxSession mailboxSession) throws MailboxException { + private Mono<List<MailboxMessage>> findRelatedMailboxMessages(MessageId messageId, MailboxSession mailboxSession) { MessageIdMapper messageIdMapper = mailboxSessionMapperFactory.getMessageIdMapper(mailboxSession); - return MailboxReactorUtils.block(messageIdMapper.findReactive(ImmutableList.of(messageId), MessageMapper.FetchType.Metadata) - .collect(Guavate.toImmutableList())); + return messageIdMapper.findReactive(ImmutableList.of(messageId), MessageMapper.FetchType.Metadata) + .collect(Guavate.toImmutableList()); } private Mono<Void> applyMessageMoves(MailboxSession mailboxSession, List<MailboxMessage> currentMailboxMessages, MessageMovesWithMailbox messageMoves) throws MailboxNotFoundException { diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetUpdatePerformer.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetUpdatePerformer.scala index c69517f..409ecaf 100644 --- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetUpdatePerformer.scala +++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetUpdatePerformer.scala @@ -219,11 +219,11 @@ class EmailSetUpdatePerformer @Inject() (serializer: EmailSetSerializer, if (targetIds.equals(mailboxIds)) { SMono.just[EmailUpdateResult](EmailUpdateSuccess(messageId)) } else { - SMono.fromCallable(() => messageIdManager.setInMailboxes(messageId, targetIds.value.asJava, session)) - .subscribeOn(Schedulers.elastic()) + SMono(messageIdManager.setInMailboxesReactive(messageId, targetIds.value.asJava, session)) .`then`(SMono.just[EmailUpdateResult](EmailUpdateSuccess(messageId))) .onErrorResume(e => SMono.just[EmailUpdateResult](EmailUpdateFailure(EmailSet.asUnparsed(messageId), e))) .switchIfEmpty(SMono.just[EmailUpdateResult](EmailUpdateSuccess(messageId))) + .subscribeOn(Schedulers.elastic()) } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
