This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 42fc513c5be8547d139b85210268b52f5941dfa2
Author: Benoit Tellier <[email protected]>
AuthorDate: Sat May 1 22:38:48 2021 +0700

    JAMES-3575 Reactive single flag update for JMAP RFC-8621
---
 .../main/java/org/apache/james/mailbox/MessageIdManager.java   |  2 ++
 .../org/apache/james/mailbox/store/StoreMessageIdManager.java  | 10 +++++++---
 .../org/apache/james/jmap/method/EmailSetUpdatePerformer.scala |  5 ++---
 3 files changed, 11 insertions(+), 6 deletions(-)

diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java
index 6fef5d4..abb0f8f 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java
@@ -52,6 +52,8 @@ public interface MessageIdManager {
 
     void setFlags(Flags newState, FlagsUpdateMode replace, MessageId 
messageId, List<MailboxId> mailboxIds, MailboxSession mailboxSession) throws 
MailboxException;
 
+    Publisher<Void> setFlagsReactive(Flags newState, FlagsUpdateMode replace, 
MessageId messageId, List<MailboxId> mailboxIds, MailboxSession mailboxSession) 
throws MailboxException;
+
     List<MessageResult> getMessages(Collection<MessageId> messageIds, 
FetchGroup minimal, MailboxSession mailboxSession) throws MailboxException;
 
     default Publisher<MessageResult> getMessagesReactive(Collection<MessageId> 
messageIds, FetchGroup minimal, MailboxSession mailboxSession) {
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
index 5031573..7dfc8ae 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
@@ -120,12 +120,17 @@ public class StoreMessageIdManager implements MessageIdManager {
 
     @Override
     public void setFlags(Flags newState, MessageManager.FlagsUpdateMode 
replace, MessageId messageId, List<MailboxId> mailboxIds, MailboxSession 
mailboxSession) throws MailboxException {
+        MailboxReactorUtils.block(setFlagsReactive(newState, replace, 
messageId, mailboxIds, mailboxSession));
+    }
+
+    @Override
+    public Mono<Void> setFlagsReactive(Flags newState, 
MessageManager.FlagsUpdateMode replace, MessageId messageId, List<MailboxId> 
mailboxIds, MailboxSession mailboxSession) {
         MessageIdMapper messageIdMapper = 
mailboxSessionMapperFactory.getMessageIdMapper(mailboxSession);
         MailboxMapper mailboxMapper = 
mailboxSessionMapperFactory.getMailboxMapper(mailboxSession);
 
         int concurrency = 4;
 
-        MailboxReactorUtils.block(Flux.fromIterable(mailboxIds)
+        return Flux.fromIterable(mailboxIds)
             .flatMap(mailboxMapper::findMailboxById, concurrency)
             .collect(Guavate.toImmutableList())
             .flatMap(Throwing.<List<Mailbox>, 
Mono<Void>>function(targetMailboxes -> {
@@ -135,8 +140,7 @@ public class StoreMessageIdManager implements MessageIdManager {
                     .flatMapIterable(updatedFlags -> 
updatedFlags.asMap().entrySet())
                     .concatMap(entry -> dispatchFlagsChange(mailboxSession, 
entry.getKey(), ImmutableList.copyOf(entry.getValue()), targetMailboxes))
                     .then();
-            }).sneakyThrow())
-            .subscribeOn(Schedulers.elastic()));
+            }).sneakyThrow());
     }
 
     @Override
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetUpdatePerformer.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetUpdatePerformer.scala
index 3e42e69..c69517f 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetUpdatePerformer.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetUpdatePerformer.scala
@@ -235,10 +235,9 @@ class EmailSetUpdatePerformer @Inject() (serializer: EmailSetSerializer,
     if (newFlags.equals(originalFlags)) {
       SMono.just[EmailUpdateResult](EmailUpdateSuccess(messageId))
     } else {
-      SMono.fromCallable(() =>
-        messageIdManager.setFlags(newFlags, FlagsUpdateMode.REPLACE, 
messageId, ImmutableList.copyOf(mailboxIds.value.asJavaCollection), session))
-        .subscribeOn(Schedulers.elastic())
+      SMono(messageIdManager.setFlagsReactive(newFlags, 
FlagsUpdateMode.REPLACE, messageId, 
ImmutableList.copyOf(mailboxIds.value.asJavaCollection), session))
         .`then`(SMono.just[EmailUpdateResult](EmailUpdateSuccess(messageId)))
+        .subscribeOn(Schedulers.elastic())
     }
   }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to