This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 134a0e0f891dae8f3d76e9a31ee088f5f6f0232f
Author: Benoit Tellier <btell...@linagora.com>
AuthorDate: Sun May 23 11:52:45 2021 +0700

    [PERFORMANCE] Reactify SetMessagesCreationProcessor
    
    Blocking calls are still made to check message attachments and to
    append the message; they are wrapped in Monos subscribed on the
    elastic scheduler.
---
 .../exceptions/AttachmentsNotFoundException.java   |   2 +-
 .../exceptions/MessageHasNoMailboxException.java   |  24 --
 .../jmap/draft/methods/AttachmentChecker.java      |  18 +-
 .../methods/SetMessagesCreationProcessor.java      | 319 ++++++++++-----------
 .../jmap/draft/send/PostDequeueDecorator.java      |   2 +
 .../exception/MailShouldBeInOutboxException.java   |   1 -
 .../jmap/draft/methods/AttachmentCheckerTest.java  |  10 +-
 .../methods/SetMessagesCreationProcessorTest.java  |  42 +--
 .../jmap/draft/send/PostDequeueDecoratorTest.java  |  25 +-
 9 files changed, 217 insertions(+), 226 deletions(-)

diff --git 
a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/exceptions/AttachmentsNotFoundException.java
 
b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/exceptions/AttachmentsNotFoundException.java
index be0bf0a..3b7abfa 100644
--- 
a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/exceptions/AttachmentsNotFoundException.java
+++ 
b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/exceptions/AttachmentsNotFoundException.java
@@ -25,7 +25,7 @@ import org.apache.james.jmap.draft.model.BlobId;
 
 import com.google.common.collect.ImmutableList;
 
-public class AttachmentsNotFoundException extends Exception {
+public class AttachmentsNotFoundException extends RuntimeException {
     
     private List<BlobId> attachmentIds;
 
diff --git 
a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/exceptions/MessageHasNoMailboxException.java
 
b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/exceptions/MessageHasNoMailboxException.java
deleted file mode 100644
index db184c8..0000000
--- 
a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/exceptions/MessageHasNoMailboxException.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-
-package org.apache.james.jmap.draft.exceptions;
-
-public class MessageHasNoMailboxException extends RuntimeException {
-    
-}
diff --git 
a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/AttachmentChecker.java
 
b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/AttachmentChecker.java
index a65076e..9f2d104 100644
--- 
a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/AttachmentChecker.java
+++ 
b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/AttachmentChecker.java
@@ -35,6 +35,9 @@ import com.github.fge.lambdas.Throwing;
 import com.github.fge.lambdas.predicates.ThrowingPredicate;
 import com.github.steveash.guavate.Guavate;
 
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
 public class AttachmentChecker {
     private final AttachmentManager attachmentManager;
 
@@ -43,12 +46,15 @@ public class AttachmentChecker {
         this.attachmentManager = attachmentManager;
     }
 
-    public void assertAttachmentsExist(ValueWithId.CreationMessageEntry entry, 
MailboxSession session) throws AttachmentsNotFoundException, MailboxException {
-        List<Attachment> attachments = entry.getValue().getAttachments();
-        List<BlobId> notFounds = listAttachmentsNotFound(attachments, session);
-        if (!notFounds.isEmpty()) {
-            throw new AttachmentsNotFoundException(notFounds);
-        }
+    public Mono<Void> assertAttachmentsExist(ValueWithId.CreationMessageEntry 
entry, MailboxSession session) {
+        return Mono.fromRunnable(Throwing.runnable(() -> {
+            List<Attachment> attachments = entry.getValue().getAttachments();
+            List<BlobId> notFounds = listAttachmentsNotFound(attachments, 
session);
+            if (!notFounds.isEmpty()) {
+                throw new AttachmentsNotFoundException(notFounds);
+            }
+        })).subscribeOn(Schedulers.elastic())
+            .then();
     }
 
     private List<BlobId> listAttachmentsNotFound(List<Attachment> attachments, 
MailboxSession session) throws MailboxException {
diff --git 
a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesCreationProcessor.java
 
b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesCreationProcessor.java
index 7c876ce..3e8a3d5 100644
--- 
a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesCreationProcessor.java
+++ 
b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesCreationProcessor.java
@@ -35,7 +35,6 @@ import 
org.apache.james.jmap.draft.exceptions.AttachmentsNotFoundException;
 import org.apache.james.jmap.draft.exceptions.InvalidDraftKeywordsException;
 import 
org.apache.james.jmap.draft.exceptions.InvalidMailboxForCreationException;
 import org.apache.james.jmap.draft.exceptions.MailboxNotOwnedException;
-import org.apache.james.jmap.draft.exceptions.MessageHasNoMailboxException;
 import org.apache.james.jmap.draft.methods.ValueWithId.CreationMessageEntry;
 import org.apache.james.jmap.draft.methods.ValueWithId.MessageWithId;
 import org.apache.james.jmap.draft.model.CreationMessage;
@@ -50,7 +49,6 @@ import org.apache.james.jmap.draft.model.SetMessagesResponse;
 import org.apache.james.jmap.draft.model.SetMessagesResponse.Builder;
 import org.apache.james.jmap.draft.model.message.view.MessageFullView;
 import org.apache.james.jmap.draft.model.message.view.MessageFullViewFactory;
-import 
org.apache.james.jmap.draft.model.message.view.MessageFullViewFactory.MetaDataWithContent;
 import org.apache.james.mailbox.MailboxManager;
 import org.apache.james.mailbox.MailboxSession;
 import org.apache.james.mailbox.MessageManager;
@@ -61,20 +59,20 @@ import 
org.apache.james.mailbox.exception.MailboxNotFoundException;
 import org.apache.james.mailbox.exception.OverQuotaException;
 import org.apache.james.mailbox.model.MailboxId;
 import org.apache.james.metrics.api.MetricFactory;
-import org.apache.james.metrics.api.TimeMetric;
 import org.apache.james.rrt.api.CanSendFrom;
 import org.apache.james.server.core.Envelope;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.github.fge.lambdas.Throwing;
-import com.github.fge.lambdas.functions.FunctionChainer;
 import com.github.steveash.guavate.Guavate;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableList;
 
 import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
 
 public class SetMessagesCreationProcessor implements SetMessagesProcessor {
@@ -116,104 +114,92 @@ public class SetMessagesCreationProcessor implements 
SetMessagesProcessor {
     }
 
     @Override
-    public SetMessagesResponse process(SetMessagesRequest request, 
MailboxSession mailboxSession) {
-        TimeMetric timeMetric = metricFactory.timer(JMAP_PREFIX + 
"SetMessageCreationProcessor");
+    public Mono<SetMessagesResponse> processReactive(SetMessagesRequest 
request, MailboxSession mailboxSession) {
+        return 
Mono.from(metricFactory.decoratePublisherWithTimerMetric(JMAP_PREFIX + 
"SetMessageCreationProcessor",
+            Flux.fromIterable(request.getCreate())
+                .flatMap(create -> handleCreate(create, mailboxSession))
+                .reduce(Builder::mergeWith)
+                .switchIfEmpty(Mono.just(SetMessagesResponse.builder()))
+                .map(Builder::build)));
+    }
 
-        SetMessagesResponse result = request.getCreate()
-            .stream()
-            .map(create -> handleCreate(create, mailboxSession))
-            .reduce(SetMessagesResponse.builder(), Builder::mergeWith)
-            .build();
+    private Mono<Builder> handleCreate(CreationMessageEntry create, 
MailboxSession mailboxSession) {
+        List<MailboxId> mailboxIds = toMailboxIds(create);
 
-        timeMetric.stopAndPublish();
-        return result;
-    }
+        if (mailboxIds.isEmpty()) {
+            return 
Mono.just(SetMessagesResponse.builder().notCreated(create.getCreationId(),
+                SetError.builder()
+                    .type(SetError.Type.INVALID_PROPERTIES)
+                    .properties(MessageProperty.mailboxIds)
+                    .description("Message needs to be in at least one mailbox")
+                    .build()));
+        }
 
-    private Builder handleCreate(CreationMessageEntry create, MailboxSession 
mailboxSession) {
-        try {
-            List<MailboxId> mailboxIds = toMailboxIds(create);
-            assertAtLeastOneMailbox(mailboxIds);
-            assertIsUserOwnerOfMailboxes(mailboxIds, mailboxSession);
-            return performCreate(create, mailboxSession);
-        } catch (MailboxSendingNotAllowedException e) {
-            LOG.debug("{} is not allowed to send a mail using {} identity", 
e.getConnectedUser().asString(), e.getFromField());
+        return assertIsUserOwnerOfMailboxes(mailboxIds, mailboxSession)
+            .then(performCreate(create, mailboxSession))
+            .onErrorResume(MailboxSendingNotAllowedException.class, e -> {
+                LOG.debug("{} is not allowed to send a mail using {} 
identity", e.getConnectedUser().asString(), e.getFromField());
 
-            return 
SetMessagesResponse.builder().notCreated(create.getCreationId(),
+                return 
Mono.just(SetMessagesResponse.builder().notCreated(create.getCreationId(),
                     SetError.builder()
                         .type(SetError.Type.INVALID_PROPERTIES)
                         .properties(MessageProperty.from)
                         .description("Invalid 'from' field. One accepted value 
is " +
-                                e.getConnectedUser().asString())
-                        .build());
-
-        } catch (InvalidDraftKeywordsException e) {
-            return 
SetMessagesResponse.builder().notCreated(create.getCreationId(),
+                            e.getConnectedUser().asString())
+                        .build()));
+            })
+            .onErrorResume(InvalidDraftKeywordsException.class, e -> 
Mono.just(SetMessagesResponse.builder().notCreated(create.getCreationId(),
                 SetError.builder()
                     .type(SetError.Type.INVALID_PROPERTIES)
                     .properties(MessageProperty.keywords)
                     .description(e.getMessage())
-                    .build());
-
-        } catch (AttachmentsNotFoundException e) {
-            return 
SetMessagesResponse.builder().notCreated(create.getCreationId(),
-                    SetMessagesError.builder()
-                        .type(SetError.Type.INVALID_PROPERTIES)
-                        .properties(MessageProperty.attachments)
-                        .attachmentsNotFound(e.getAttachmentIds())
-                        .description("Attachment not found")
-                        .build());
-
-        } catch (InvalidMailboxForCreationException e) {
-            return 
SetMessagesResponse.builder().notCreated(create.getCreationId(),
-                    SetError.builder()
-                        .type(SetError.Type.INVALID_PROPERTIES)
-                        .properties(MessageProperty.mailboxIds)
-                        .description("Message creation is only supported in 
mailboxes with role Draft and Outbox")
-                        .build());
-
-        } catch (MessageHasNoMailboxException e) {
-            return 
SetMessagesResponse.builder().notCreated(create.getCreationId(),
-                    SetError.builder()
-                        .type(SetError.Type.INVALID_PROPERTIES)
-                        .properties(MessageProperty.mailboxIds)
-                        .description("Message needs to be in at least one 
mailbox")
-                        .build());
-
-        } catch (MailboxInvalidMessageCreationException e) {
-            return 
SetMessagesResponse.builder().notCreated(create.getCreationId(),
-                    
buildSetErrorFromValidationResult(create.getValue().validate()));
-
-        } catch (MailboxNotFoundException e) {
-            return 
SetMessagesResponse.builder().notCreated(create.getCreationId(),
-                    SetError.builder()
-                        .type(SetError.Type.ERROR)
-                        .description(e.getMessage())
-                        .build());
-
-        } catch (MailboxNotOwnedException e) {
-            LOG.error("Appending message in an unknown mailbox", e);
-            return 
SetMessagesResponse.builder().notCreated(create.getCreationId(),
+                    .build())))
+            .onErrorResume(AttachmentsNotFoundException.class, e -> 
Mono.just(SetMessagesResponse.builder().notCreated(create.getCreationId(),
+                SetMessagesError.builder()
+                    .type(SetError.Type.INVALID_PROPERTIES)
+                    .properties(MessageProperty.attachments)
+                    .attachmentsNotFound(e.getAttachmentIds())
+                    .description("Attachment not found")
+                    .build())))
+            .onErrorResume(InvalidMailboxForCreationException.class, e -> 
Mono.just(SetMessagesResponse.builder().notCreated(create.getCreationId(),
+                SetError.builder()
+                    .type(SetError.Type.INVALID_PROPERTIES)
+                    .properties(MessageProperty.mailboxIds)
+                    .description("Message creation is only supported in 
mailboxes with role Draft and Outbox")
+                    .build())))
+            .onErrorResume(MailboxInvalidMessageCreationException.class, e -> 
Mono.just(SetMessagesResponse.builder().notCreated(create.getCreationId(),
+                
buildSetErrorFromValidationResult(create.getValue().validate()))))
+            .onErrorResume(MailboxNotFoundException.class, e -> 
Mono.just(SetMessagesResponse.builder().notCreated(create.getCreationId(),
+                SetError.builder()
+                    .type(SetError.Type.ERROR)
+                    .description(e.getMessage())
+                    .build())))
+            .onErrorResume(MailboxNotOwnedException.class, e -> 
Mono.just(SetMessagesResponse.builder().notCreated(create.getCreationId(),
                 SetError.builder()
                     .type(SetError.Type.ERROR)
                     .properties(MessageProperty.mailboxIds)
                     .description("MailboxId invalid")
-                    .build());
-
-        } catch (OverQuotaException e) {
-            return 
SetMessagesResponse.builder().notCreated(create.getCreationId(),
+                    .build())))
+            .onErrorResume(OverQuotaException.class, e -> 
Mono.just(SetMessagesResponse.builder().notCreated(create.getCreationId(),
                 SetError.builder()
                     .type(SetError.Type.MAX_QUOTA_REACHED)
                     .description(e.getMessage())
-                    .build());
-
-        } catch (MailboxException | MessagingException | IOException e) {
-            LOG.error("Unexpected error while creating message", e);
-            return 
SetMessagesResponse.builder().notCreated(create.getCreationId(),
-                    SetError.builder()
-                        .type(SetError.Type.ERROR)
-                        .description("unexpected error")
-                        .build());
-        }
+                    .build())))
+            .onErrorResume(MailboxException.class, e -> 
Mono.just(SetMessagesResponse.builder().notCreated(create.getCreationId(),
+                SetError.builder()
+                    .type(SetError.Type.ERROR)
+                    .description("unexpected error")
+                    .build())))
+            .onErrorResume(MessagingException.class, e -> 
Mono.just(SetMessagesResponse.builder().notCreated(create.getCreationId(),
+                SetError.builder()
+                    .type(SetError.Type.ERROR)
+                    .description("unexpected error")
+                    .build())))
+            .onErrorResume(IOException.class, e -> 
Mono.just(SetMessagesResponse.builder().notCreated(create.getCreationId(),
+                SetError.builder()
+                    .type(SetError.Type.ERROR)
+                    .description("unexpected error")
+                    .build())));
     }
 
     private ImmutableList<MailboxId> toMailboxIds(CreationMessageEntry create) 
{
@@ -224,95 +210,100 @@ public class SetMessagesCreationProcessor implements 
SetMessagesProcessor {
             .collect(Guavate.toImmutableList());
     }
 
-    private Builder performCreate(CreationMessageEntry entry, MailboxSession 
session)
-        throws MailboxException, MessagingException, 
AttachmentsNotFoundException, IOException {
-
-        if (isAppendToMailboxWithRole(Role.OUTBOX, entry.getValue(), session)) 
{
-            return sendMailViaOutbox(entry, session);
-        } else if (entry.getValue().isDraft()) {
-            assertNoOutbox(entry, session);
-            return saveDraft(entry, session);
-        } else {
-            if (isAppendToMailboxWithRole(Role.DRAFTS, entry.getValue(), 
session)) {
-                throw new InvalidDraftKeywordsException("A draft message 
should be flagged as Draft");
-            }
-            throw new InvalidMailboxForCreationException("The only implemented 
feature is sending via outbox and draft saving");
-        }
-    }
-
-    private void assertNoOutbox(CreationMessageEntry entry, MailboxSession 
session) throws MailboxException {
-        if (isTargettingAMailboxWithRole(Role.OUTBOX, entry.getValue(), 
session)) {
-            throw new InvalidMailboxForCreationException("Mailbox ids can 
combine Outbox with other mailbox");
-        }
+    private Mono<Builder> performCreate(CreationMessageEntry entry, 
MailboxSession session) {
+        return isAppendToMailboxWithRole(Role.OUTBOX, entry.getValue(), 
session)
+            .flatMap(isAppendToMailboxWithRole -> {
+                if (isAppendToMailboxWithRole) {
+                    return sendMailViaOutbox(entry, session);
+                } else if (entry.getValue().isDraft()) {
+                    return assertNoOutbox(entry, session)
+                        .then(saveDraft(entry, session));
+                } else {
+                    return isAppendToMailboxWithRole(Role.DRAFTS, 
entry.getValue(), session)
+                        .handle((isAppendedToDraft, sink) -> {
+                            if (isAppendedToDraft) {
+                                sink.error(new 
InvalidDraftKeywordsException("A draft message should be flagged as Draft"));
+                            } else {
+                                sink.error(new 
InvalidMailboxForCreationException("The only implemented feature is sending via 
outbox and draft saving"));
+                            }
+                        });
+                }
+            });
     }
 
-    private void assertAtLeastOneMailbox(List<MailboxId> mailboxIds) throws 
MailboxException {
-        if (mailboxIds.isEmpty()) {
-            throw new MessageHasNoMailboxException();
-        }
+    private Mono<Void> assertNoOutbox(CreationMessageEntry entry, 
MailboxSession session) {
+        return isTargettingAMailboxWithRole(Role.OUTBOX, entry.getValue(), 
session)
+            .handle((targetsOutbox, sink) -> {
+                if (targetsOutbox) {
+                    sink.error(new InvalidMailboxForCreationException("Mailbox 
ids can combine Outbox with other mailbox"));
+                }
+            });
     }
 
-    private Builder sendMailViaOutbox(CreationMessageEntry entry, 
MailboxSession session)
-        throws AttachmentsNotFoundException, MailboxException, 
MessagingException, IOException {
-
-        validateArguments(entry, session);
-        MessageWithId created = handleOutboxMessages(entry, session);
-        return SetMessagesResponse.builder().created(created.getCreationId(), 
created.getValue());
+    private Mono<Builder> sendMailViaOutbox(CreationMessageEntry entry, 
MailboxSession session) {
+        return validateArguments(entry, session)
+            .then(handleOutboxMessages(entry, session)
+                .map(created -> 
SetMessagesResponse.builder().created(created.getCreationId(), 
created.getValue())));
     }
 
-    private Builder saveDraft(CreationMessageEntry entry, MailboxSession 
session)
-        throws AttachmentsNotFoundException, MailboxException, 
MessagingException, IOException {
-
-        attachmentChecker.assertAttachmentsExist(entry, session);
-        MessageWithId created = handleDraftMessages(entry, session);
-        return SetMessagesResponse.builder().created(created.getCreationId(), 
created.getValue());
+    private Mono<Builder> saveDraft(CreationMessageEntry entry, MailboxSession 
session) {
+        return attachmentChecker.assertAttachmentsExist(entry, session)
+            .then(handleDraftMessages(entry, session)
+                .map(created -> 
SetMessagesResponse.builder().created(created.getCreationId(), 
created.getValue())));
     }
 
-    private void validateArguments(CreationMessageEntry entry, MailboxSession 
session) throws MailboxInvalidMessageCreationException, 
AttachmentsNotFoundException, MailboxException {
+    private Mono<Void> validateArguments(CreationMessageEntry entry, 
MailboxSession session) {
         CreationMessage message = entry.getValue();
         if (!message.isValid()) {
-            throw new MailboxInvalidMessageCreationException();
+            return Mono.error(new MailboxInvalidMessageCreationException());
         }
-        attachmentChecker.assertAttachmentsExist(entry, session);
+        return attachmentChecker.assertAttachmentsExist(entry, session);
     }
 
-    @VisibleForTesting void assertIsUserOwnerOfMailboxes(List<MailboxId> 
mailboxIds, MailboxSession session) throws MailboxNotOwnedException {
-        if (!allMailboxOwned(mailboxIds, session)) {
-            throw new MailboxNotOwnedException();
-        }
+    @VisibleForTesting Mono<Void> assertIsUserOwnerOfMailboxes(List<MailboxId> 
mailboxIds, MailboxSession session) {
+        return allMailboxOwned(mailboxIds, session)
+            .handle((allOwned, sink) -> {
+                if (!allOwned) {
+                    sink.error(new MailboxNotOwnedException());
+                }
+            });
     }
 
-    private boolean allMailboxOwned(List<MailboxId> mailboxIds, MailboxSession 
session) {
-        FunctionChainer<MailboxId, MessageManager> findMailbox = 
Throwing.function(mailboxId -> mailboxManager.getMailbox(mailboxId, session));
-        return mailboxIds.stream()
-            .map(findMailbox.sneakyThrow())
+    private Mono<Boolean> allMailboxOwned(List<MailboxId> mailboxIds, 
MailboxSession session) {
+        return Flux.fromIterable(mailboxIds)
+            .concatMap(id ->  mailboxManager.getMailboxReactive(id, session))
             .map(Throwing.function(MessageManager::getMailboxPath))
-            .allMatch(path -> path.belongsTo(session));
+            .all(path -> path.belongsTo(session));
     }
 
-    private MessageWithId handleOutboxMessages(CreationMessageEntry entry, 
MailboxSession session) throws MailboxException, MessagingException, 
IOException {
-        assertUserCanSendFrom(session.getUser(), entry.getValue().getFrom());
-        MetaDataWithContent newMessage = 
messageAppender.appendMessageInMailboxes(entry, toMailboxIds(entry), session);
-        MessageFullView jmapMessage = 
messageFullViewFactory.fromMetaDataWithContent(newMessage).block();
-        Envelope envelope = EnvelopeUtils.fromMessage(jmapMessage);
-        messageSender.sendMessage(newMessage, envelope, session)
-            
.then(referenceUpdater.updateReferences(entry.getValue().getHeaders(), session))
-            .block();
-        return new ValueWithId.MessageWithId(entry.getCreationId(), 
jmapMessage);
+    private Mono<MessageWithId> handleOutboxMessages(CreationMessageEntry 
entry, MailboxSession session) {
+        return assertUserCanSendFrom(session.getUser(), 
entry.getValue().getFrom())
+            .then(Mono.fromCallable(() -> 
messageAppender.appendMessageInMailboxes(entry, toMailboxIds(entry), session))
+                .subscribeOn(Schedulers.elastic())
+                .flatMap(newMessage ->
+                    messageFullViewFactory.fromMetaDataWithContent(newMessage)
+                        .flatMap(Throwing.function((MessageFullView 
jmapMessage) -> {
+                            Envelope envelope = 
EnvelopeUtils.fromMessage(jmapMessage);
+                            return messageSender.sendMessage(newMessage, 
envelope, session)
+                                
.then(referenceUpdater.updateReferences(entry.getValue().getHeaders(), session))
+                                .thenReturn(new 
ValueWithId.MessageWithId(entry.getCreationId(), jmapMessage));
+                        }).sneakyThrow())));
     }
 
     @VisibleForTesting
-    void assertUserCanSendFrom(Username connectedUser, Optional<DraftEmailer> 
from) throws MailboxSendingNotAllowedException {
-
-        Optional<Username> maybeFromUser = from.flatMap(DraftEmailer::getEmail)
-            .map(Username::of);
-
-        if (!canSendMailUsingIdentity(connectedUser, maybeFromUser)) {
-            String allowedSender = connectedUser.asString();
-            throw new MailboxSendingNotAllowedException(connectedUser, 
maybeFromUser);
-        } else {
-            LOG.debug("{} is allowed to send a mail using {} identity", 
connectedUser.asString(), from);
-        }
+    Mono<Void> assertUserCanSendFrom(Username connectedUser, 
Optional<DraftEmailer> from) {
+        return Mono.fromRunnable(Throwing.runnable(() -> {
+            Optional<Username> maybeFromUser = 
from.flatMap(DraftEmailer::getEmail)
+                .map(Username::of);
+
+            if (!canSendMailUsingIdentity(connectedUser, maybeFromUser)) {
+                String allowedSender = connectedUser.asString();
+                throw new MailboxSendingNotAllowedException(connectedUser, 
maybeFromUser);
+            } else {
+                LOG.debug("{} is allowed to send a mail using {} identity", 
connectedUser.asString(), from);
+            }
+        }).sneakyThrow()).subscribeOn(Schedulers.elastic())
+            .then();
     }
 
     private boolean canSendMailUsingIdentity(Username connectedUser, 
Optional<Username> maybeFromUser) {
@@ -321,28 +312,28 @@ public class SetMessagesCreationProcessor implements 
SetMessagesProcessor {
                 .isPresent();
     }
 
-    private MessageWithId handleDraftMessages(CreationMessageEntry entry, 
MailboxSession session) throws MailboxException, MessagingException, 
IOException {
-        MetaDataWithContent newMessage = 
messageAppender.appendMessageInMailboxes(entry, toMailboxIds(entry), session);
-        MessageFullView jmapMessage = 
messageFullViewFactory.fromMetaDataWithContent(newMessage).block();
-        return new ValueWithId.MessageWithId(entry.getCreationId(), 
jmapMessage);
+    private Mono<MessageWithId> handleDraftMessages(CreationMessageEntry 
entry, MailboxSession session) {
+        return Mono.fromCallable(() -> 
messageAppender.appendMessageInMailboxes(entry, toMailboxIds(entry), session))
+            .subscribeOn(Schedulers.elastic())
+            .flatMap(messageFullViewFactory::fromMetaDataWithContent)
+            .map(jmapMessage -> new 
ValueWithId.MessageWithId(entry.getCreationId(), jmapMessage));
     }
 
-    private boolean isAppendToMailboxWithRole(Role role, CreationMessage 
entry, MailboxSession mailboxSession) throws MailboxException {
+    private Mono<Boolean> isAppendToMailboxWithRole(Role role, CreationMessage 
entry, MailboxSession mailboxSession) {
         return getMailboxWithRole(mailboxSession, role)
-                .map(entry::isOnlyIn)
-                .orElse(false);
+            .map(entry::isOnlyIn)
+            .switchIfEmpty(Mono.just(false));
     }
 
-    private boolean isTargettingAMailboxWithRole(Role role, CreationMessage 
entry, MailboxSession mailboxSession) throws MailboxException {
+    private Mono<Boolean> isTargettingAMailboxWithRole(Role role, 
CreationMessage entry, MailboxSession mailboxSession) {
         return getMailboxWithRole(mailboxSession, role)
-                .map(entry::isIn)
-                .orElse(false);
+            .map(entry::isIn)
+            .switchIfEmpty(Mono.just(false));
     }
 
-    private Optional<MessageManager> getMailboxWithRole(MailboxSession 
mailboxSession, Role role) throws MailboxException {
+    private Mono<MessageManager> getMailboxWithRole(MailboxSession 
mailboxSession, Role role) {
         return Flux.from(systemMailboxesProvider.getMailboxByRole(role, 
mailboxSession.getUser()))
-            .toStream()
-            .findFirst();
+            .next();
     }
 
     private SetError buildSetErrorFromValidationResult(List<ValidationResult> 
validationErrors) {
diff --git 
a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/send/PostDequeueDecorator.java
 
b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/send/PostDequeueDecorator.java
index 6a222c3..0f02550 100644
--- 
a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/send/PostDequeueDecorator.java
+++ 
b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/send/PostDequeueDecorator.java
@@ -31,6 +31,7 @@ import org.apache.james.mailbox.MessageIdManager;
 import org.apache.james.mailbox.MessageManager;
 import org.apache.james.mailbox.Role;
 import org.apache.james.mailbox.SystemMailboxesProvider;
+import org.apache.james.mailbox.exception.MailboxRoleNotFoundException;
 import org.apache.james.mailbox.model.FetchGroup;
 import org.apache.james.mailbox.model.MailboxId;
 import org.apache.james.mailbox.model.MessageId;
@@ -134,6 +135,7 @@ public class PostDequeueDecorator extends 
MailQueueItemDecorator {
     private void moveFromOutboxToSentWithSeenFlag(MessageId messageId, 
MailboxSession mailboxSession) {
         assertMessageBelongsToOutbox(messageId, mailboxSession)
             .then(getSentMailboxId(mailboxSession)
+                .switchIfEmpty(Mono.error(new 
MailboxRoleNotFoundException(Role.SENT)))
                 .flatMap(sentMailboxId ->
                         
Mono.from(messageIdManager.setInMailboxesReactive(messageId,
                             ImmutableList.of(sentMailboxId), mailboxSession))
diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/send/exception/MailShouldBeInOutboxException.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/send/exception/MailShouldBeInOutboxException.java
index f831bed..2edec9f 100644
--- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/send/exception/MailShouldBeInOutboxException.java
+++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/send/exception/MailShouldBeInOutboxException.java
@@ -19,7 +19,6 @@
 package org.apache.james.jmap.draft.send.exception;
 
 import org.apache.james.mailbox.model.MessageId;
-import org.apache.james.queue.api.MailQueue.MailQueueException;
 
 public class MailShouldBeInOutboxException extends RuntimeException {
 
diff --git a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/AttachmentCheckerTest.java b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/AttachmentCheckerTest.java
index 79e34f7..620ef87 100644
--- a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/AttachmentCheckerTest.java
+++ b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/AttachmentCheckerTest.java
@@ -75,7 +75,7 @@ public class AttachmentCheckerTest {
                     
Attachment.builder().size(12L).type("image/jpeg").blobId(unknownBlobId).build())
                     .build()
             ),
-            session))
+            session).block())
             .isInstanceOf(AttachmentsNotFoundException.class);
     }
 
@@ -92,7 +92,7 @@ public class AttachmentCheckerTest {
                     
Attachment.builder().size(12L).type("image/jpeg").blobId(blobId).build())
                     .build()
             ),
-            session);
+            session).block();
     }
 
     @Test
@@ -113,7 +113,7 @@ public class AttachmentCheckerTest {
                     
Attachment.builder().size(23L).type("image/git").blobId(unknownBlobId2).build())
                     .build()
             ),
-            session))
+            session).block())
             .isInstanceOf(AttachmentsNotFoundException.class)
             .matches(e -> 
((AttachmentsNotFoundException)e).getAttachmentIds().containsAll(ImmutableSet.of(unknownBlobId1,
 unknownBlobId2)));
     }
@@ -136,7 +136,7 @@ public class AttachmentCheckerTest {
                     
Attachment.builder().size(23L).type("image/git").blobId(blobId2).build())
                     .build()
             ),
-            session);
+            session).block();
     }
 
     @Test
@@ -157,7 +157,7 @@ public class AttachmentCheckerTest {
                     
Attachment.builder().size(23L).type("image/git").blobId(unknownBlobId2).build())
                     .build()
             ),
-            session))
+            session).block())
             .isInstanceOf(AttachmentsNotFoundException.class)
             .matches(e -> ((AttachmentsNotFoundException)e).getAttachmentIds()
                 .containsAll(ImmutableSet.of(unknownBlobId2)));
diff --git a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/SetMessagesCreationProcessorTest.java b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/SetMessagesCreationProcessorTest.java
index 6e4c162..d06c545 100644
--- a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/SetMessagesCreationProcessorTest.java
+++ b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/SetMessagesCreationProcessorTest.java
@@ -94,6 +94,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 
 import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 
 public class SetMessagesCreationProcessorTest {
     private static final Username USER = Username.of("user@example.com");
@@ -163,6 +164,7 @@ public class SetMessagesCreationProcessorTest {
             messageIdManager,
             new MemoryMessageFastViewProjection(new RecordingMetricFactory()));
         mockedMailSpool = mock(MailSpool.class);
+        when(mockedMailSpool.send(any(), any())).thenReturn(Mono.empty());
         mockedAttachmentManager = mock(AttachmentManager.class);
         mockedMailboxManager = mock(MailboxManager.class);
         mockedMailboxIdFactory = mock(Factory.class);
@@ -190,6 +192,8 @@ public class SetMessagesCreationProcessorTest {
             .thenReturn(OUTBOX_ID);
         when(mockedMailboxManager.getMailbox(OUTBOX_ID, session))
             .thenReturn(outbox);
+        when(mockedMailboxManager.getMailboxReactive(OUTBOX_ID, session))
+            .thenReturn(Mono.just(outbox));
         
         when(outbox.getId()).thenReturn(OUTBOX_ID);
         when(outbox.getMailboxPath()).thenReturn(MailboxPath.forUser(USER, 
OUTBOX));
@@ -344,6 +348,8 @@ public class SetMessagesCreationProcessorTest {
                 .build();
         when(mockedMailboxManager.getMailbox(any(MailboxId.class), any()))
             .thenReturn(drafts);
+        when(mockedMailboxManager.getMailboxReactive(any(MailboxId.class), 
any()))
+            .thenReturn(Mono.just(drafts));
         
when(mockedMailboxIdFactory.fromString(anyString())).thenReturn(DRAFTS_ID);
         
         sut.process(createMessageInDrafts, session);
@@ -360,7 +366,7 @@ public class SetMessagesCreationProcessorTest {
         when(mockedMailboxIdFactory.fromString(mailboxId.serialize()))
             .thenReturn(mailboxId);
 
-        assertThatThrownBy(() -> 
sut.assertIsUserOwnerOfMailboxes(ImmutableList.of(mailboxId), session));
+        assertThatThrownBy(() -> 
sut.assertIsUserOwnerOfMailboxes(ImmutableList.of(mailboxId), session).block());
     }
 
     @Test
@@ -374,36 +380,36 @@ public class SetMessagesCreationProcessorTest {
         when(mailbox.getMailboxPath())
             .thenThrow(new MailboxException());
 
-        assertThatThrownBy(() -> 
sut.assertIsUserOwnerOfMailboxes(ImmutableList.of(mailboxId), session));
+        assertThatThrownBy(() -> 
sut.assertIsUserOwnerOfMailboxes(ImmutableList.of(mailboxId), session).block());
     }
 
     @Test
     public void 
assertIsUserOwnerOfMailboxesShouldThrowWhenUserIsNotTheOwnerOfTheMailbox() 
throws Exception {
         InMemoryId mailboxId = InMemoryId.of(6789);
         MessageManager mailbox = mock(MessageManager.class);
-        when(mockedMailboxManager.getMailbox(mailboxId, session))
-            .thenReturn(mailbox);
+        when(mockedMailboxManager.getMailboxReactive(mailboxId, session))
+            .thenReturn(Mono.just(mailbox));
         when(mockedMailboxIdFactory.fromString(mailboxId.serialize()))
             .thenReturn(mailboxId);
         when(mailbox.getMailboxPath())
             
.thenReturn(MailboxPath.forUser(Username.of("otheruser@example.com"), 
mailboxId.serialize()));
 
-        assertThatThrownBy(() -> 
sut.assertIsUserOwnerOfMailboxes(ImmutableList.of(mailboxId), session))
-            .isInstanceOf(MailboxNotOwnedException.class);
+        assertThatThrownBy(() -> 
sut.assertIsUserOwnerOfMailboxes(ImmutableList.of(mailboxId), session).block())
+            .hasCauseInstanceOf(MailboxNotOwnedException.class);
     }
 
     @Test
     public void 
assertIsUserOwnerOfMailboxesShouldNotThrowWhenUserIsTheOwnerOfTheMailbox() 
throws Exception {
         InMemoryId mailboxId = InMemoryId.of(6789);
         MessageManager mailbox = mock(MessageManager.class);
-        when(mockedMailboxManager.getMailbox(mailboxId, session))
-            .thenReturn(mailbox);
+        when(mockedMailboxManager.getMailboxReactive(mailboxId, session))
+            .thenReturn(Mono.just(mailbox));
         when(mockedMailboxIdFactory.fromString(mailboxId.serialize()))
             .thenReturn(mailboxId);
         when(mailbox.getMailboxPath())
             .thenReturn(MailboxPath.forUser(USER, mailboxId.serialize()));
 
-        sut.assertIsUserOwnerOfMailboxes(ImmutableList.of(mailboxId), session);
+        sut.assertIsUserOwnerOfMailboxes(ImmutableList.of(mailboxId), 
session).block();
     }
 
     @Test
@@ -414,8 +420,8 @@ public class SetMessagesCreationProcessorTest {
             .email("other@example.com")
             .build();
 
-        assertThatThrownBy(() -> sut.assertUserCanSendFrom(USER, 
Optional.of(sender)))
-            .isInstanceOf(MailboxSendingNotAllowedException.class);
+        assertThatThrownBy(() -> sut.assertUserCanSendFrom(USER, 
Optional.of(sender)).block())
+            .hasCauseInstanceOf(MailboxSendingNotAllowedException.class);
     }
 
     @Test
@@ -426,7 +432,7 @@ public class SetMessagesCreationProcessorTest {
             .email(USER.asString())
             .build();
 
-        assertThatCode(() -> sut.assertUserCanSendFrom(USER, 
Optional.of(sender)))
+        assertThatCode(() -> sut.assertUserCanSendFrom(USER, 
Optional.of(sender)).block())
             .doesNotThrowAnyException();
     }
 
@@ -440,8 +446,8 @@ public class SetMessagesCreationProcessorTest {
 
         recipientRewriteTable.addAliasMapping(MappingSource.fromUser("alias", 
"example.com"), OTHER_USER.asString());
 
-        assertThatThrownBy(() -> sut.assertUserCanSendFrom(USER, 
Optional.of(sender)))
-            .isInstanceOf(MailboxSendingNotAllowedException.class);
+        assertThatThrownBy(() -> sut.assertUserCanSendFrom(USER, 
Optional.of(sender)).block())
+            .hasCauseInstanceOf(MailboxSendingNotAllowedException.class);
     }
 
     @Test
@@ -454,7 +460,7 @@ public class SetMessagesCreationProcessorTest {
 
         recipientRewriteTable.addAliasMapping(MappingSource.fromUser("alias", 
"example.com"), USER.asString());
 
-        assertThatCode(() -> sut.assertUserCanSendFrom(USER, 
Optional.of(sender)))
+        assertThatCode(() -> sut.assertUserCanSendFrom(USER, 
Optional.of(sender)).block())
             .doesNotThrowAnyException();
     }
 
@@ -468,7 +474,7 @@ public class SetMessagesCreationProcessorTest {
 
            
recipientRewriteTable.addMapping(MappingSource.fromDomain(Domain.of("other.org")),
 Mapping.domainAlias(Domain.of("example.com")));
 
-        assertThatCode(() -> sut.assertUserCanSendFrom(USER, 
Optional.of(sender)))
+        assertThatCode(() -> sut.assertUserCanSendFrom(USER, 
Optional.of(sender)).block())
             .doesNotThrowAnyException();
     }
 
@@ -482,8 +488,8 @@ public class SetMessagesCreationProcessorTest {
 
         recipientRewriteTable.addGroupMapping(MappingSource.fromUser("group", 
"example.com"), USER.asString());
 
-        assertThatThrownBy(() -> sut.assertUserCanSendFrom(USER, 
Optional.of(sender)))
-            .isInstanceOf(MailboxSendingNotAllowedException.class);
+        assertThatThrownBy(() -> sut.assertUserCanSendFrom(USER, 
Optional.of(sender)).block())
+            .hasCauseInstanceOf(MailboxSendingNotAllowedException.class);
     }
 
     public static class TestSystemMailboxesProvider implements 
SystemMailboxesProvider {
diff --git a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/send/PostDequeueDecoratorTest.java b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/send/PostDequeueDecoratorTest.java
index 8366beb..27d411d 100644
--- a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/send/PostDequeueDecoratorTest.java
+++ b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/send/PostDequeueDecoratorTest.java
@@ -19,6 +19,7 @@
 package org.apache.james.jmap.draft.send;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
@@ -64,6 +65,9 @@ import org.junit.Test;
 
 import com.google.common.collect.ImmutableList;
 
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
 public class PostDequeueDecoratorTest {
     private static final String OUTBOX = DefaultMailboxes.OUTBOX;
     private static final String SENT = DefaultMailboxes.SENT;
@@ -271,19 +275,24 @@ public class PostDequeueDecoratorTest {
 
         ImmutableList<MessageResult> allMessages = 
ImmutableList.copyOf(messageManager.getMessages(MessageRange.all(), 
FetchGroup.MINIMAL, mailboxSession));
 
-        when(messageIdManager.getMessage(eq(messageId.getMessageId()), 
eq(FetchGroup.MINIMAL), any(MailboxSession.class))).thenReturn(allMessages);
+        when(messageIdManager.getMessagesReactive(any(), 
eq(FetchGroup.MINIMAL), any(MailboxSession.class)))
+            .thenReturn(Flux.fromIterable(allMessages));
+        
when(messageIdManager.setInMailboxesReactive(eq(messageId.getMessageId()), 
eq(ImmutableList.of(sentMailboxId)), any(MailboxSession.class)))
+            .thenReturn(Mono.empty());
+        when(messageIdManager.setFlagsReactive(eq(new Flags(Flag.SEEN)), 
eq(MessageManager.FlagsUpdateMode.ADD), eq(messageId.getMessageId()), 
eq(ImmutableList.of(sentMailboxId)), any(MailboxSession.class)))
+            .thenReturn(Mono.empty());
 
         testee.done(true);
         testee.done(true);
 
-        verify(messageIdManager, 
times(1)).getMessage(eq(messageId.getMessageId()), eq(FetchGroup.MINIMAL), 
any(MailboxSession.class));
-        verify(messageIdManager, 
times(1)).setInMailboxes(eq(messageId.getMessageId()), 
eq(ImmutableList.of(sentMailboxId)), any(MailboxSession.class));
-        verify(messageIdManager, times(1)).setFlags(eq(new Flags(Flag.SEEN)), 
eq(MessageManager.FlagsUpdateMode.ADD), eq(messageId.getMessageId()), 
eq(ImmutableList.of(sentMailboxId)), any(MailboxSession.class));
+        verify(messageIdManager, times(1)).getMessagesReactive(any(), 
eq(FetchGroup.MINIMAL), any(MailboxSession.class));
+        verify(messageIdManager, 
times(1)).setInMailboxesReactive(eq(messageId.getMessageId()), 
eq(ImmutableList.of(sentMailboxId)), any(MailboxSession.class));
+        verify(messageIdManager, times(1)).setFlagsReactive(eq(new 
Flags(Flag.SEEN)), eq(MessageManager.FlagsUpdateMode.ADD), 
eq(messageId.getMessageId()), eq(ImmutableList.of(sentMailboxId)), 
any(MailboxSession.class));
 
         verifyNoMoreInteractions(messageIdManager);
     }
 
-    @Test(expected = MailQueue.MailQueueException.class)
+    @Test
     public void doneShouldThrowWhenMailboxException() throws Exception {
         MessageIdManager messageIdManager = mock(MessageIdManager.class);
         testee = new PostDequeueDecorator(mockedMailQueueItem, mailboxManager, 
new InMemoryMessageId.Factory(),
@@ -297,9 +306,11 @@ public class PostDequeueDecoratorTest {
         
mail.setAttribute(messageIdAttribute(messageId.getMessageId().serialize()));
         mail.setAttribute(USERNAME_ATTRIBUTE);
 
-        when(messageIdManager.getMessage(eq(messageId.getMessageId()), 
eq(FetchGroup.MINIMAL), 
any(MailboxSession.class))).thenThrow(MailboxException.class);
+        when(messageIdManager.getMessagesReactive(any(), 
eq(FetchGroup.MINIMAL), any(MailboxSession.class)))
+            .thenReturn(Flux.error(new MailboxException()));
 
-        testee.done(true);
+        assertThatThrownBy(() -> testee.done(true))
+            .hasCauseInstanceOf(MailboxException.class);
     }
 
     private Attribute messageIdAttribute(String  value) {

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org

Reply via email to