This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 42fc513c5be8547d139b85210268b52f5941dfa2
Author: Benoit Tellier <[email protected]>
AuthorDate: Sat May 1 22:38:48 2021 +0700

    JAMES-3575 Reactive single flag update for JMAP RFC-8621
---
 .../main/java/org/apache/james/mailbox/MessageIdManager.java   |  2 ++
 .../org/apache/james/mailbox/store/StoreMessageIdManager.java  | 10 +++++++---
 .../org/apache/james/jmap/method/EmailSetUpdatePerformer.scala |  5 ++---
 3 files changed, 11 insertions(+), 6 deletions(-)

diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java
index 6fef5d4..abb0f8f 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java
@@ -52,6 +52,8 @@ public interface MessageIdManager {
 
     void setFlags(Flags newState, FlagsUpdateMode replace, MessageId messageId, List<MailboxId> mailboxIds, MailboxSession mailboxSession) throws MailboxException;
 
+    Publisher<Void> setFlagsReactive(Flags newState, FlagsUpdateMode replace, MessageId messageId, List<MailboxId> mailboxIds, MailboxSession mailboxSession) throws MailboxException;
+
     List<MessageResult> getMessages(Collection<MessageId> messageIds, FetchGroup minimal, MailboxSession mailboxSession) throws MailboxException;
 
     default Publisher<MessageResult> getMessagesReactive(Collection<MessageId> messageIds, FetchGroup minimal, MailboxSession mailboxSession) {
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
index 5031573..7dfc8ae 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
@@ -120,12 +120,17 @@ public class StoreMessageIdManager implements MessageIdManager {
 
     @Override
     public void setFlags(Flags newState, MessageManager.FlagsUpdateMode replace, MessageId messageId, List<MailboxId> mailboxIds, MailboxSession mailboxSession) throws MailboxException {
+        MailboxReactorUtils.block(setFlagsReactive(newState, replace, messageId, mailboxIds, mailboxSession));
+    }
+
+    @Override
+    public Mono<Void> setFlagsReactive(Flags newState, MessageManager.FlagsUpdateMode replace, MessageId messageId, List<MailboxId> mailboxIds, MailboxSession mailboxSession) {
         MessageIdMapper messageIdMapper = mailboxSessionMapperFactory.getMessageIdMapper(mailboxSession);
         MailboxMapper mailboxMapper = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession);
 
         int concurrency = 4;
 
-        MailboxReactorUtils.block(Flux.fromIterable(mailboxIds)
+        return Flux.fromIterable(mailboxIds)
             .flatMap(mailboxMapper::findMailboxById, concurrency)
             .collect(Guavate.toImmutableList())
             .flatMap(Throwing.<List<Mailbox>, Mono<Void>>function(targetMailboxes -> {
@@ -135,8 +140,7 @@ public class StoreMessageIdManager implements MessageIdManager {
                     .flatMapIterable(updatedFlags -> updatedFlags.asMap().entrySet())
                     .concatMap(entry -> dispatchFlagsChange(mailboxSession, entry.getKey(), ImmutableList.copyOf(entry.getValue()), targetMailboxes))
                     .then();
-            }).sneakyThrow())
-            .subscribeOn(Schedulers.elastic()));
+            }).sneakyThrow());
     }
 
     @Override
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetUpdatePerformer.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetUpdatePerformer.scala
index 3e42e69..c69517f 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetUpdatePerformer.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetUpdatePerformer.scala
@@ -235,10 +235,9 @@ class EmailSetUpdatePerformer @Inject() (serializer: EmailSetSerializer,
     if (newFlags.equals(originalFlags)) {
       SMono.just[EmailUpdateResult](EmailUpdateSuccess(messageId))
     } else {
-      SMono.fromCallable(() =>
-        messageIdManager.setFlags(newFlags, FlagsUpdateMode.REPLACE, messageId, ImmutableList.copyOf(mailboxIds.value.asJavaCollection), session))
-        .subscribeOn(Schedulers.elastic())
+      SMono(messageIdManager.setFlagsReactive(newFlags, FlagsUpdateMode.REPLACE, messageId, ImmutableList.copyOf(mailboxIds.value.asJavaCollection), session))
         .`then`(SMono.just[EmailUpdateResult](EmailUpdateSuccess(messageId)))
+        .subscribeOn(Schedulers.elastic())
     }
   }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
