This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit bde8a850b1536d992b296d51a855f3e6e5bb73e2
Author: Benoit Tellier <[email protected]>
AuthorDate: Sat May 1 22:48:36 2021 +0700

    JAMES-3575 Reactive single message move for JMAP RFC-8621
---
 .../org/apache/james/mailbox/MessageIdManager.java |  2 +
 .../james/mailbox/store/StoreMessageIdManager.java | 44 ++++++++++++----------
 .../jmap/method/EmailSetUpdatePerformer.scala      |  4 +-
 3 files changed, 28 insertions(+), 22 deletions(-)

diff --git 
a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java 
b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java
index abb0f8f..218819a 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java
@@ -70,6 +70,8 @@ public interface MessageIdManager {
 
     void setInMailboxes(MessageId messageId, Collection<MailboxId> mailboxIds, 
MailboxSession mailboxSession) throws MailboxException;
 
+    Publisher<Void> setInMailboxesReactive(MessageId messageId, 
Collection<MailboxId> mailboxIds, MailboxSession mailboxSession);
+
     default List<MessageResult> getMessage(MessageId messageId, FetchGroup 
fetchGroup, MailboxSession mailboxSession) throws MailboxException {
         return getMessages(ImmutableList.of(messageId), fetchGroup, 
mailboxSession);
     }
diff --git 
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
 
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
index 7dfc8ae..a7fa33b 100644
--- 
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
+++ 
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
@@ -286,25 +286,29 @@ public class StoreMessageIdManager implements 
MessageIdManager {
 
     @Override
     public void setInMailboxes(MessageId messageId, Collection<MailboxId> 
targetMailboxIds, MailboxSession mailboxSession) throws MailboxException {
-        List<MailboxMessage> currentMailboxMessages = 
findRelatedMailboxMessages(messageId, mailboxSession);
-
-        
MailboxReactorUtils.block(messageMovesWithMailbox(MessageMoves.builder()
-            .targetMailboxIds(targetMailboxIds)
-            .previousMailboxIds(toMailboxIds(currentMailboxMessages))
-            .build(), mailboxSession)
-            .flatMap(Throwing.<MessageMovesWithMailbox, 
Mono<Void>>function(messageMove -> {
-                MessageMovesWithMailbox refined = 
messageMove.filterPrevious(hasRightsOnMailbox(mailboxSession, Right.Read));
+        MailboxReactorUtils.block(setInMailboxesReactive(messageId, 
targetMailboxIds, mailboxSession)
+            .subscribeOn(Schedulers.elastic()));
+    }
 
-                if (messageMove.getPreviousMailboxes().isEmpty()) {
-                    LOGGER.info("Tried to access {} not accessible for {}", 
messageId, mailboxSession.getUser().asString());
+    @Override
+    public Mono<Void> setInMailboxesReactive(MessageId messageId, 
Collection<MailboxId> targetMailboxIds, MailboxSession mailboxSession) {
+        return findRelatedMailboxMessages(messageId, mailboxSession)
+            .flatMap(currentMailboxMessages -> 
messageMovesWithMailbox(MessageMoves.builder()
+                .targetMailboxIds(targetMailboxIds)
+                .previousMailboxIds(toMailboxIds(currentMailboxMessages))
+                .build(), mailboxSession)
+                .flatMap(Throwing.<MessageMovesWithMailbox, 
Mono<Void>>function(messageMove -> {
+                    MessageMovesWithMailbox refined = 
messageMove.filterPrevious(hasRightsOnMailbox(mailboxSession, Right.Read));
+
+                    if (messageMove.getPreviousMailboxes().isEmpty()) {
+                        LOGGER.info("Tried to access {} not accessible for 
{}", messageId, mailboxSession.getUser().asString());
+                        return Mono.empty();
+                    }
+                    if (refined.isChange()) {
+                        return applyMessageMoves(mailboxSession, 
currentMailboxMessages, refined);
+                    }
                     return Mono.empty();
-                }
-                if (refined.isChange()) {
-                    return applyMessageMoves(mailboxSession, 
currentMailboxMessages, refined);
-                }
-                return Mono.empty();
-            }).sneakyThrow())
-            .subscribeOn(Schedulers.elastic()));
+                }).sneakyThrow()));
     }
 
     public void setInMailboxesNoCheck(MessageId messageId, MailboxId 
targetMailboxId, MailboxSession mailboxSession) throws MailboxException {
@@ -325,11 +329,11 @@ public class StoreMessageIdManager implements 
MessageIdManager {
             .subscribeOn(Schedulers.elastic()));
     }
 
-    private List<MailboxMessage> findRelatedMailboxMessages(MessageId 
messageId, MailboxSession mailboxSession) throws MailboxException {
+    private Mono<List<MailboxMessage>> findRelatedMailboxMessages(MessageId 
messageId, MailboxSession mailboxSession) {
         MessageIdMapper messageIdMapper = 
mailboxSessionMapperFactory.getMessageIdMapper(mailboxSession);
 
-        return 
MailboxReactorUtils.block(messageIdMapper.findReactive(ImmutableList.of(messageId),
 MessageMapper.FetchType.Metadata)
-            .collect(Guavate.toImmutableList()));
+        return messageIdMapper.findReactive(ImmutableList.of(messageId), 
MessageMapper.FetchType.Metadata)
+            .collect(Guavate.toImmutableList());
     }
 
     private Mono<Void> applyMessageMoves(MailboxSession mailboxSession, 
List<MailboxMessage> currentMailboxMessages, MessageMovesWithMailbox 
messageMoves) throws MailboxNotFoundException {
diff --git 
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetUpdatePerformer.scala
 
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetUpdatePerformer.scala
index c69517f..409ecaf 100644
--- 
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetUpdatePerformer.scala
+++ 
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetUpdatePerformer.scala
@@ -219,11 +219,11 @@ class EmailSetUpdatePerformer @Inject() (serializer: 
EmailSetSerializer,
     if (targetIds.equals(mailboxIds)) {
       SMono.just[EmailUpdateResult](EmailUpdateSuccess(messageId))
     } else {
-      SMono.fromCallable(() => messageIdManager.setInMailboxes(messageId, 
targetIds.value.asJava, session))
-        .subscribeOn(Schedulers.elastic())
+      SMono(messageIdManager.setInMailboxesReactive(messageId, 
targetIds.value.asJava, session))
         .`then`(SMono.just[EmailUpdateResult](EmailUpdateSuccess(messageId)))
         .onErrorResume(e => 
SMono.just[EmailUpdateResult](EmailUpdateFailure(EmailSet.asUnparsed(messageId),
 e)))
         
.switchIfEmpty(SMono.just[EmailUpdateResult](EmailUpdateSuccess(messageId)))
+        .subscribeOn(Schedulers.elastic())
     }
   }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to