This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 134a0e0f891dae8f3d76e9a31ee088f5f6f0232f Author: Benoit Tellier <btell...@linagora.com> AuthorDate: Sun May 23 11:52:45 2021 +0700 [PERFORMANCE] Reactify SetMessagesCreationProcessor Blocking calls are still made to check message attachments and append the message. --- .../exceptions/AttachmentsNotFoundException.java | 2 +- .../exceptions/MessageHasNoMailboxException.java | 24 -- .../jmap/draft/methods/AttachmentChecker.java | 18 +- .../methods/SetMessagesCreationProcessor.java | 319 ++++++++++----------- .../jmap/draft/send/PostDequeueDecorator.java | 2 + .../exception/MailShouldBeInOutboxException.java | 1 - .../jmap/draft/methods/AttachmentCheckerTest.java | 10 +- .../methods/SetMessagesCreationProcessorTest.java | 42 +-- .../jmap/draft/send/PostDequeueDecoratorTest.java | 25 +- 9 files changed, 217 insertions(+), 226 deletions(-) diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/exceptions/AttachmentsNotFoundException.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/exceptions/AttachmentsNotFoundException.java index be0bf0a..3b7abfa 100644 --- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/exceptions/AttachmentsNotFoundException.java +++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/exceptions/AttachmentsNotFoundException.java @@ -25,7 +25,7 @@ import org.apache.james.jmap.draft.model.BlobId; import com.google.common.collect.ImmutableList; -public class AttachmentsNotFoundException extends Exception { +public class AttachmentsNotFoundException extends RuntimeException { private List<BlobId> attachmentIds; diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/exceptions/MessageHasNoMailboxException.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/exceptions/MessageHasNoMailboxException.java deleted file mode 100644 index db184c8..0000000 --- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/exceptions/MessageHasNoMailboxException.java +++ /dev/null @@ -1,24 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. * - ****************************************************************/ - -package org.apache.james.jmap.draft.exceptions; - -public class MessageHasNoMailboxException extends RuntimeException { - -} diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/AttachmentChecker.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/AttachmentChecker.java index a65076e..9f2d104 100644 --- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/AttachmentChecker.java +++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/AttachmentChecker.java @@ -35,6 +35,9 @@ import com.github.fge.lambdas.Throwing; import com.github.fge.lambdas.predicates.ThrowingPredicate; import com.github.steveash.guavate.Guavate; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + public class AttachmentChecker { private final AttachmentManager attachmentManager; @@ -43,12 +46,15 @@ public class AttachmentChecker { this.attachmentManager = attachmentManager; } - public void assertAttachmentsExist(ValueWithId.CreationMessageEntry entry, MailboxSession session) throws AttachmentsNotFoundException, MailboxException { - List<Attachment> attachments = entry.getValue().getAttachments(); - List<BlobId> notFounds = listAttachmentsNotFound(attachments, session); - if (!notFounds.isEmpty()) { - throw new AttachmentsNotFoundException(notFounds); - } + public Mono<Void> assertAttachmentsExist(ValueWithId.CreationMessageEntry entry, MailboxSession session) { + return Mono.fromRunnable(Throwing.runnable(() -> { + List<Attachment> attachments = entry.getValue().getAttachments(); + List<BlobId> notFounds = listAttachmentsNotFound(attachments, session); + if (!notFounds.isEmpty()) { + throw new AttachmentsNotFoundException(notFounds); + } + })).subscribeOn(Schedulers.elastic()) + .then(); } private List<BlobId> listAttachmentsNotFound(List<Attachment> attachments, MailboxSession session) throws MailboxException { diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesCreationProcessor.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesCreationProcessor.java index 7c876ce..3e8a3d5 100644 --- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesCreationProcessor.java +++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesCreationProcessor.java @@ -35,7 +35,6 @@ import org.apache.james.jmap.draft.exceptions.AttachmentsNotFoundException; import org.apache.james.jmap.draft.exceptions.InvalidDraftKeywordsException; import org.apache.james.jmap.draft.exceptions.InvalidMailboxForCreationException; import org.apache.james.jmap.draft.exceptions.MailboxNotOwnedException; -import org.apache.james.jmap.draft.exceptions.MessageHasNoMailboxException; import org.apache.james.jmap.draft.methods.ValueWithId.CreationMessageEntry; import org.apache.james.jmap.draft.methods.ValueWithId.MessageWithId; import org.apache.james.jmap.draft.model.CreationMessage; @@ -50,7 +49,6 @@ import org.apache.james.jmap.draft.model.SetMessagesResponse; import org.apache.james.jmap.draft.model.SetMessagesResponse.Builder; import org.apache.james.jmap.draft.model.message.view.MessageFullView; import org.apache.james.jmap.draft.model.message.view.MessageFullViewFactory; -import org.apache.james.jmap.draft.model.message.view.MessageFullViewFactory.MetaDataWithContent; import org.apache.james.mailbox.MailboxManager; import org.apache.james.mailbox.MailboxSession; import org.apache.james.mailbox.MessageManager; @@ -61,20 +59,20 @@ import org.apache.james.mailbox.exception.MailboxNotFoundException; import org.apache.james.mailbox.exception.OverQuotaException; import org.apache.james.mailbox.model.MailboxId; import org.apache.james.metrics.api.MetricFactory; -import org.apache.james.metrics.api.TimeMetric; import org.apache.james.rrt.api.CanSendFrom; import org.apache.james.server.core.Envelope; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.github.fge.lambdas.Throwing; -import com.github.fge.lambdas.functions.FunctionChainer; import com.github.steveash.guavate.Guavate; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Splitter; import com.google.common.collect.ImmutableList; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; public class SetMessagesCreationProcessor implements SetMessagesProcessor { @@ -116,104 +114,92 @@ public class SetMessagesCreationProcessor implements SetMessagesProcessor { } @Override - public SetMessagesResponse process(SetMessagesRequest request, MailboxSession mailboxSession) { - TimeMetric timeMetric = metricFactory.timer(JMAP_PREFIX + "SetMessageCreationProcessor"); + public Mono<SetMessagesResponse> processReactive(SetMessagesRequest request, MailboxSession mailboxSession) { + return Mono.from(metricFactory.decoratePublisherWithTimerMetric(JMAP_PREFIX + "SetMessageCreationProcessor", + Flux.fromIterable(request.getCreate()) + .flatMap(create -> handleCreate(create, mailboxSession)) + .reduce(Builder::mergeWith) + .switchIfEmpty(Mono.just(SetMessagesResponse.builder())) + .map(Builder::build))); + } - SetMessagesResponse result = request.getCreate() - .stream() - .map(create -> handleCreate(create, mailboxSession)) - .reduce(SetMessagesResponse.builder(), Builder::mergeWith) - .build(); + private Mono<Builder> handleCreate(CreationMessageEntry create, MailboxSession mailboxSession) { + List<MailboxId> mailboxIds = toMailboxIds(create); - timeMetric.stopAndPublish(); - return result; - } + if (mailboxIds.isEmpty()) { + return Mono.just(SetMessagesResponse.builder().notCreated(create.getCreationId(), + SetError.builder() + .type(SetError.Type.INVALID_PROPERTIES) + .properties(MessageProperty.mailboxIds) + .description("Message needs to be in at least one mailbox") + .build())); + } - private Builder handleCreate(CreationMessageEntry create, MailboxSession mailboxSession) { - try { - List<MailboxId> mailboxIds = toMailboxIds(create); - assertAtLeastOneMailbox(mailboxIds); - assertIsUserOwnerOfMailboxes(mailboxIds, mailboxSession); - return performCreate(create, mailboxSession); - } catch (MailboxSendingNotAllowedException e) { - LOG.debug("{} is not allowed to send a mail using {} identity", e.getConnectedUser().asString(), e.getFromField()); + return assertIsUserOwnerOfMailboxes(mailboxIds, mailboxSession) + .then(performCreate(create, mailboxSession)) + .onErrorResume(MailboxSendingNotAllowedException.class, e -> { + LOG.debug("{} is not allowed to send a mail using {} identity", e.getConnectedUser().asString(), e.getFromField()); - return SetMessagesResponse.builder().notCreated(create.getCreationId(), + return Mono.just(SetMessagesResponse.builder().notCreated(create.getCreationId(), SetError.builder() .type(SetError.Type.INVALID_PROPERTIES) .properties(MessageProperty.from) .description("Invalid 'from' field. One accepted value is " + - e.getConnectedUser().asString()) - .build()); - - } catch (InvalidDraftKeywordsException e) { - return SetMessagesResponse.builder().notCreated(create.getCreationId(), + e.getConnectedUser().asString()) + .build())); + }) + .onErrorResume(InvalidDraftKeywordsException.class, e -> Mono.just(SetMessagesResponse.builder().notCreated(create.getCreationId(), SetError.builder() .type(SetError.Type.INVALID_PROPERTIES) .properties(MessageProperty.keywords) .description(e.getMessage()) - .build()); - - } catch (AttachmentsNotFoundException e) { - return SetMessagesResponse.builder().notCreated(create.getCreationId(), - SetMessagesError.builder() - .type(SetError.Type.INVALID_PROPERTIES) - .properties(MessageProperty.attachments) - .attachmentsNotFound(e.getAttachmentIds()) - .description("Attachment not found") - .build()); - - } catch (InvalidMailboxForCreationException e) { - return SetMessagesResponse.builder().notCreated(create.getCreationId(), - SetError.builder() - .type(SetError.Type.INVALID_PROPERTIES) - .properties(MessageProperty.mailboxIds) - .description("Message creation is only supported in mailboxes with role Draft and Outbox") - .build()); - - } catch (MessageHasNoMailboxException e) { - return SetMessagesResponse.builder().notCreated(create.getCreationId(), - SetError.builder() - .type(SetError.Type.INVALID_PROPERTIES) - .properties(MessageProperty.mailboxIds) - .description("Message needs to be in at least one mailbox") - .build()); - - } catch (MailboxInvalidMessageCreationException e) { - return SetMessagesResponse.builder().notCreated(create.getCreationId(), - buildSetErrorFromValidationResult(create.getValue().validate())); - - } catch (MailboxNotFoundException e) { - return SetMessagesResponse.builder().notCreated(create.getCreationId(), - SetError.builder() - .type(SetError.Type.ERROR) - .description(e.getMessage()) - .build()); - - } catch (MailboxNotOwnedException e) { - LOG.error("Appending message in an unknown mailbox", e); - return SetMessagesResponse.builder().notCreated(create.getCreationId(), + .build()))) + .onErrorResume(AttachmentsNotFoundException.class, e -> Mono.just(SetMessagesResponse.builder().notCreated(create.getCreationId(), + SetMessagesError.builder() + .type(SetError.Type.INVALID_PROPERTIES) + .properties(MessageProperty.attachments) + .attachmentsNotFound(e.getAttachmentIds()) + .description("Attachment not found") + .build()))) + .onErrorResume(InvalidMailboxForCreationException.class, e -> Mono.just(SetMessagesResponse.builder().notCreated(create.getCreationId(), + SetError.builder() + .type(SetError.Type.INVALID_PROPERTIES) + .properties(MessageProperty.mailboxIds) + .description("Message creation is only supported in mailboxes with role Draft and Outbox") + .build()))) + .onErrorResume(MailboxInvalidMessageCreationException.class, e -> Mono.just(SetMessagesResponse.builder().notCreated(create.getCreationId(), + buildSetErrorFromValidationResult(create.getValue().validate())))) + .onErrorResume(MailboxNotFoundException.class, e -> Mono.just(SetMessagesResponse.builder().notCreated(create.getCreationId(), + SetError.builder() + .type(SetError.Type.ERROR) + .description(e.getMessage()) + .build()))) + .onErrorResume(MailboxNotOwnedException.class, e -> Mono.just(SetMessagesResponse.builder().notCreated(create.getCreationId(), SetError.builder() .type(SetError.Type.ERROR) .properties(MessageProperty.mailboxIds) .description("MailboxId invalid") - .build()); - - } catch (OverQuotaException e) { - return SetMessagesResponse.builder().notCreated(create.getCreationId(), + .build()))) + .onErrorResume(OverQuotaException.class, e -> Mono.just(SetMessagesResponse.builder().notCreated(create.getCreationId(), SetError.builder() .type(SetError.Type.MAX_QUOTA_REACHED) .description(e.getMessage()) - .build()); - - } catch (MailboxException | MessagingException | IOException e) { - LOG.error("Unexpected error while creating message", e); - return SetMessagesResponse.builder().notCreated(create.getCreationId(), - SetError.builder() - .type(SetError.Type.ERROR) - .description("unexpected error") - .build()); - } + .build()))) + .onErrorResume(MailboxException.class, e -> Mono.just(SetMessagesResponse.builder().notCreated(create.getCreationId(), + SetError.builder() + .type(SetError.Type.ERROR) + .description("unexpected error") + .build()))) + .onErrorResume(MessagingException.class, e -> Mono.just(SetMessagesResponse.builder().notCreated(create.getCreationId(), + SetError.builder() + .type(SetError.Type.ERROR) + .description("unexpected error") + .build()))) + .onErrorResume(IOException.class, e -> Mono.just(SetMessagesResponse.builder().notCreated(create.getCreationId(), + SetError.builder() + .type(SetError.Type.ERROR) + .description("unexpected error") + .build()))); } private ImmutableList<MailboxId> toMailboxIds(CreationMessageEntry create) { @@ -224,95 +210,100 @@ public class SetMessagesCreationProcessor implements SetMessagesProcessor { .collect(Guavate.toImmutableList()); } - private Builder performCreate(CreationMessageEntry entry, MailboxSession session) - throws MailboxException, MessagingException, AttachmentsNotFoundException, IOException { - - if (isAppendToMailboxWithRole(Role.OUTBOX, entry.getValue(), session)) { - return sendMailViaOutbox(entry, session); - } else if (entry.getValue().isDraft()) { - assertNoOutbox(entry, session); - return saveDraft(entry, session); - } else { - if (isAppendToMailboxWithRole(Role.DRAFTS, entry.getValue(), session)) { - throw new InvalidDraftKeywordsException("A draft message should be flagged as Draft"); - } - throw new InvalidMailboxForCreationException("The only implemented feature is sending via outbox and draft saving"); - } - } - - private void assertNoOutbox(CreationMessageEntry entry, MailboxSession session) throws MailboxException { - if (isTargettingAMailboxWithRole(Role.OUTBOX, entry.getValue(), session)) { - throw new InvalidMailboxForCreationException("Mailbox ids can combine Outbox with other mailbox"); - } + private Mono<Builder> performCreate(CreationMessageEntry entry, MailboxSession session) { + return isAppendToMailboxWithRole(Role.OUTBOX, entry.getValue(), session) + .flatMap(isAppendToMailboxWithRole -> { + if (isAppendToMailboxWithRole) { + return sendMailViaOutbox(entry, session); + } else if (entry.getValue().isDraft()) { + return assertNoOutbox(entry, session) + .then(saveDraft(entry, session)); + } else { + return isAppendToMailboxWithRole(Role.DRAFTS, entry.getValue(), session) + .handle((isAppendedToDraft, sink) -> { + if (isAppendedToDraft) { + sink.error(new InvalidDraftKeywordsException("A draft message should be flagged as Draft")); + } else { + sink.error(new InvalidMailboxForCreationException("The only implemented feature is sending via outbox and draft saving")); + } + }); + } + }); } - private void assertAtLeastOneMailbox(List<MailboxId> mailboxIds) throws MailboxException { - if (mailboxIds.isEmpty()) { - throw new MessageHasNoMailboxException(); - } + private Mono<Void> assertNoOutbox(CreationMessageEntry entry, MailboxSession session) { + return isTargettingAMailboxWithRole(Role.OUTBOX, entry.getValue(), session) + .handle((targetsOutbox, sink) -> { + if (targetsOutbox) { + sink.error(new InvalidMailboxForCreationException("Mailbox ids can combine Outbox with other mailbox")); + } + }); } - private Builder sendMailViaOutbox(CreationMessageEntry entry, MailboxSession session) - throws AttachmentsNotFoundException, MailboxException, MessagingException, IOException { - - validateArguments(entry, session); - MessageWithId created = handleOutboxMessages(entry, session); - return SetMessagesResponse.builder().created(created.getCreationId(), created.getValue()); + private Mono<Builder> sendMailViaOutbox(CreationMessageEntry entry, MailboxSession session) { + return validateArguments(entry, session) + .then(handleOutboxMessages(entry, session) + .map(created -> SetMessagesResponse.builder().created(created.getCreationId(), created.getValue()))); } - private Builder saveDraft(CreationMessageEntry entry, MailboxSession session) - throws AttachmentsNotFoundException, MailboxException, MessagingException, IOException { - - attachmentChecker.assertAttachmentsExist(entry, session); - MessageWithId created = handleDraftMessages(entry, session); - return SetMessagesResponse.builder().created(created.getCreationId(), created.getValue()); + private Mono<Builder> saveDraft(CreationMessageEntry entry, MailboxSession session) { + return attachmentChecker.assertAttachmentsExist(entry, session) + .then(handleDraftMessages(entry, session) + .map(created -> SetMessagesResponse.builder().created(created.getCreationId(), created.getValue()))); } - private void validateArguments(CreationMessageEntry entry, MailboxSession session) throws MailboxInvalidMessageCreationException, AttachmentsNotFoundException, MailboxException { + private Mono<Void> validateArguments(CreationMessageEntry entry, MailboxSession session) { CreationMessage message = entry.getValue(); if (!message.isValid()) { - throw new MailboxInvalidMessageCreationException(); + return Mono.error(new MailboxInvalidMessageCreationException()); } - attachmentChecker.assertAttachmentsExist(entry, session); + return attachmentChecker.assertAttachmentsExist(entry, session); } - @VisibleForTesting void assertIsUserOwnerOfMailboxes(List<MailboxId> mailboxIds, MailboxSession session) throws MailboxNotOwnedException { - if (!allMailboxOwned(mailboxIds, session)) { - throw new MailboxNotOwnedException(); - } + @VisibleForTesting Mono<Void> assertIsUserOwnerOfMailboxes(List<MailboxId> mailboxIds, MailboxSession session) { + return allMailboxOwned(mailboxIds, session) + .handle((allOwned, sink) -> { + if (!allOwned) { + sink.error(new MailboxNotOwnedException()); + } + }); } - private boolean allMailboxOwned(List<MailboxId> mailboxIds, MailboxSession session) { - FunctionChainer<MailboxId, MessageManager> findMailbox = Throwing.function(mailboxId -> mailboxManager.getMailbox(mailboxId, session)); - return mailboxIds.stream() - .map(findMailbox.sneakyThrow()) + private Mono<Boolean> allMailboxOwned(List<MailboxId> mailboxIds, MailboxSession session) { + return Flux.fromIterable(mailboxIds) + .concatMap(id -> mailboxManager.getMailboxReactive(id, session)) .map(Throwing.function(MessageManager::getMailboxPath)) - .allMatch(path -> path.belongsTo(session)); + .all(path -> path.belongsTo(session)); } - private MessageWithId handleOutboxMessages(CreationMessageEntry entry, MailboxSession session) throws MailboxException, MessagingException, IOException { - assertUserCanSendFrom(session.getUser(), entry.getValue().getFrom()); - MetaDataWithContent newMessage = messageAppender.appendMessageInMailboxes(entry, toMailboxIds(entry), session); - MessageFullView jmapMessage = messageFullViewFactory.fromMetaDataWithContent(newMessage).block(); - Envelope envelope = EnvelopeUtils.fromMessage(jmapMessage); - messageSender.sendMessage(newMessage, envelope, session) - .then(referenceUpdater.updateReferences(entry.getValue().getHeaders(), session)) - .block(); - return new ValueWithId.MessageWithId(entry.getCreationId(), jmapMessage); + private Mono<MessageWithId> handleOutboxMessages(CreationMessageEntry entry, MailboxSession session) { + return assertUserCanSendFrom(session.getUser(), entry.getValue().getFrom()) + .then(Mono.fromCallable(() -> messageAppender.appendMessageInMailboxes(entry, toMailboxIds(entry), session)) + .subscribeOn(Schedulers.elastic()) + .flatMap(newMessage -> + messageFullViewFactory.fromMetaDataWithContent(newMessage) + .flatMap(Throwing.function((MessageFullView jmapMessage) -> { + Envelope envelope = EnvelopeUtils.fromMessage(jmapMessage); + return messageSender.sendMessage(newMessage, envelope, session) + .then(referenceUpdater.updateReferences(entry.getValue().getHeaders(), session)) + .thenReturn(new ValueWithId.MessageWithId(entry.getCreationId(), jmapMessage)); + }).sneakyThrow()))); } @VisibleForTesting - void assertUserCanSendFrom(Username connectedUser, Optional<DraftEmailer> from) throws MailboxSendingNotAllowedException { - - Optional<Username> maybeFromUser = from.flatMap(DraftEmailer::getEmail) - .map(Username::of); - - if (!canSendMailUsingIdentity(connectedUser, maybeFromUser)) { - String allowedSender = connectedUser.asString(); - throw new MailboxSendingNotAllowedException(connectedUser, maybeFromUser); - } else { - LOG.debug("{} is allowed to send a mail using {} identity", connectedUser.asString(), from); - } + Mono<Void> assertUserCanSendFrom(Username connectedUser, Optional<DraftEmailer> from) { + return Mono.fromRunnable(Throwing.runnable(() -> { + Optional<Username> maybeFromUser = from.flatMap(DraftEmailer::getEmail) + .map(Username::of); + + if (!canSendMailUsingIdentity(connectedUser, maybeFromUser)) { + String allowedSender = connectedUser.asString(); + throw new MailboxSendingNotAllowedException(connectedUser, maybeFromUser); + } else { + LOG.debug("{} is allowed to send a mail using {} identity", connectedUser.asString(), from); + } + }).sneakyThrow()).subscribeOn(Schedulers.elastic()) + .then(); } private boolean canSendMailUsingIdentity(Username connectedUser, Optional<Username> maybeFromUser) { @@ -321,28 +312,28 @@ public class SetMessagesCreationProcessor implements SetMessagesProcessor { .isPresent(); } - private MessageWithId handleDraftMessages(CreationMessageEntry entry, MailboxSession session) throws MailboxException, MessagingException, IOException { - MetaDataWithContent newMessage = messageAppender.appendMessageInMailboxes(entry, toMailboxIds(entry), session); - MessageFullView jmapMessage = messageFullViewFactory.fromMetaDataWithContent(newMessage).block(); - return new ValueWithId.MessageWithId(entry.getCreationId(), jmapMessage); + private Mono<MessageWithId> handleDraftMessages(CreationMessageEntry entry, MailboxSession session) { + return Mono.fromCallable(() -> messageAppender.appendMessageInMailboxes(entry, toMailboxIds(entry), session)) + .subscribeOn(Schedulers.elastic()) + .flatMap(messageFullViewFactory::fromMetaDataWithContent) + .map(jmapMessage -> new ValueWithId.MessageWithId(entry.getCreationId(), jmapMessage)); } - private boolean isAppendToMailboxWithRole(Role role, CreationMessage entry, MailboxSession mailboxSession) throws MailboxException { + private Mono<Boolean> isAppendToMailboxWithRole(Role role, CreationMessage entry, MailboxSession mailboxSession) { return getMailboxWithRole(mailboxSession, role) - .map(entry::isOnlyIn) - .orElse(false); + .map(entry::isOnlyIn) + .switchIfEmpty(Mono.just(false)); } - private boolean isTargettingAMailboxWithRole(Role role, CreationMessage entry, MailboxSession mailboxSession) throws MailboxException { + private Mono<Boolean> isTargettingAMailboxWithRole(Role role, CreationMessage entry, MailboxSession mailboxSession) { return getMailboxWithRole(mailboxSession, role) - .map(entry::isIn) - .orElse(false); + .map(entry::isIn) + .switchIfEmpty(Mono.just(false)); } - private Optional<MessageManager> getMailboxWithRole(MailboxSession mailboxSession, Role role) throws MailboxException { + private Mono<MessageManager> getMailboxWithRole(MailboxSession mailboxSession, Role role) { return Flux.from(systemMailboxesProvider.getMailboxByRole(role, mailboxSession.getUser())) - .toStream() - .findFirst(); + .next(); } private SetError buildSetErrorFromValidationResult(List<ValidationResult> validationErrors) { diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/send/PostDequeueDecorator.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/send/PostDequeueDecorator.java index 6a222c3..0f02550 100644 --- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/send/PostDequeueDecorator.java +++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/send/PostDequeueDecorator.java @@ -31,6 +31,7 @@ import org.apache.james.mailbox.MessageIdManager; import org.apache.james.mailbox.MessageManager; import org.apache.james.mailbox.Role; import org.apache.james.mailbox.SystemMailboxesProvider; +import org.apache.james.mailbox.exception.MailboxRoleNotFoundException; import org.apache.james.mailbox.model.FetchGroup; import org.apache.james.mailbox.model.MailboxId; import org.apache.james.mailbox.model.MessageId; @@ -134,6 +135,7 @@ public class PostDequeueDecorator extends MailQueueItemDecorator { private void moveFromOutboxToSentWithSeenFlag(MessageId messageId, MailboxSession mailboxSession) { assertMessageBelongsToOutbox(messageId, mailboxSession) .then(getSentMailboxId(mailboxSession) + .switchIfEmpty(Mono.error(new MailboxRoleNotFoundException(Role.SENT))) .flatMap(sentMailboxId -> Mono.from(messageIdManager.setInMailboxesReactive(messageId, ImmutableList.of(sentMailboxId), mailboxSession)) diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/send/exception/MailShouldBeInOutboxException.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/send/exception/MailShouldBeInOutboxException.java index f831bed..2edec9f 100644 --- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/send/exception/MailShouldBeInOutboxException.java +++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/send/exception/MailShouldBeInOutboxException.java @@ -19,7 +19,6 @@ package org.apache.james.jmap.draft.send.exception; import org.apache.james.mailbox.model.MessageId; -import org.apache.james.queue.api.MailQueue.MailQueueException; public class MailShouldBeInOutboxException extends RuntimeException { diff --git a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/AttachmentCheckerTest.java b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/AttachmentCheckerTest.java index 79e34f7..620ef87 100644 --- a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/AttachmentCheckerTest.java +++ b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/AttachmentCheckerTest.java @@ -75,7 +75,7 @@ public class AttachmentCheckerTest { Attachment.builder().size(12L).type("image/jpeg").blobId(unknownBlobId).build()) .build() ), - session)) + session).block()) .isInstanceOf(AttachmentsNotFoundException.class); } @@ -92,7 +92,7 @@ public class AttachmentCheckerTest { Attachment.builder().size(12L).type("image/jpeg").blobId(blobId).build()) .build() ), - session); + session).block(); } @Test @@ -113,7 +113,7 @@ public class AttachmentCheckerTest { Attachment.builder().size(23L).type("image/git").blobId(unknownBlobId2).build()) .build() ), - session)) + session).block()) .isInstanceOf(AttachmentsNotFoundException.class) .matches(e -> ((AttachmentsNotFoundException)e).getAttachmentIds().containsAll(ImmutableSet.of(unknownBlobId1, unknownBlobId2))); } @@ -136,7 +136,7 @@ public class AttachmentCheckerTest { Attachment.builder().size(23L).type("image/git").blobId(blobId2).build()) .build() ), - session); + session).block(); } @Test @@ -157,7 +157,7 @@ public class AttachmentCheckerTest { Attachment.builder().size(23L).type("image/git").blobId(unknownBlobId2).build()) .build() ), - session)) + session).block()) .isInstanceOf(AttachmentsNotFoundException.class) .matches(e -> ((AttachmentsNotFoundException)e).getAttachmentIds() .containsAll(ImmutableSet.of(unknownBlobId2))); diff --git a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/SetMessagesCreationProcessorTest.java b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/SetMessagesCreationProcessorTest.java index 6e4c162..d06c545 100644 --- a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/SetMessagesCreationProcessorTest.java +++ b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/SetMessagesCreationProcessorTest.java @@ -94,6 +94,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; public class SetMessagesCreationProcessorTest { private static final Username USER = Username.of("u...@example.com"); @@ -163,6 +164,7 @@ public class SetMessagesCreationProcessorTest { messageIdManager, new MemoryMessageFastViewProjection(new RecordingMetricFactory())); mockedMailSpool = mock(MailSpool.class); + when(mockedMailSpool.send(any(), any())).thenReturn(Mono.empty()); mockedAttachmentManager = mock(AttachmentManager.class); mockedMailboxManager = mock(MailboxManager.class); mockedMailboxIdFactory = mock(Factory.class); @@ -190,6 +192,8 @@ public class SetMessagesCreationProcessorTest { .thenReturn(OUTBOX_ID); when(mockedMailboxManager.getMailbox(OUTBOX_ID, session)) .thenReturn(outbox); + when(mockedMailboxManager.getMailboxReactive(OUTBOX_ID, session)) + .thenReturn(Mono.just(outbox)); when(outbox.getId()).thenReturn(OUTBOX_ID); when(outbox.getMailboxPath()).thenReturn(MailboxPath.forUser(USER, OUTBOX)); @@ -344,6 +348,8 @@ public class SetMessagesCreationProcessorTest { .build(); when(mockedMailboxManager.getMailbox(any(MailboxId.class), any())) .thenReturn(drafts); + when(mockedMailboxManager.getMailboxReactive(any(MailboxId.class), any())) + .thenReturn(Mono.just(drafts)); when(mockedMailboxIdFactory.fromString(anyString())).thenReturn(DRAFTS_ID); sut.process(createMessageInDrafts, session); @@ -360,7 +366,7 @@ public class SetMessagesCreationProcessorTest { when(mockedMailboxIdFactory.fromString(mailboxId.serialize())) .thenReturn(mailboxId); - assertThatThrownBy(() -> sut.assertIsUserOwnerOfMailboxes(ImmutableList.of(mailboxId), session)); + assertThatThrownBy(() -> sut.assertIsUserOwnerOfMailboxes(ImmutableList.of(mailboxId), session).block()); } @Test @@ -374,36 +380,36 @@ public class SetMessagesCreationProcessorTest { when(mailbox.getMailboxPath()) .thenThrow(new MailboxException()); - assertThatThrownBy(() -> sut.assertIsUserOwnerOfMailboxes(ImmutableList.of(mailboxId), session)); + assertThatThrownBy(() -> sut.assertIsUserOwnerOfMailboxes(ImmutableList.of(mailboxId), session).block()); } @Test public void assertIsUserOwnerOfMailboxesShouldThrowWhenUserIsNotTheOwnerOfTheMailbox() throws Exception { InMemoryId mailboxId = InMemoryId.of(6789); MessageManager mailbox = mock(MessageManager.class); - when(mockedMailboxManager.getMailbox(mailboxId, session)) - .thenReturn(mailbox); + when(mockedMailboxManager.getMailboxReactive(mailboxId, session)) + .thenReturn(Mono.just(mailbox)); when(mockedMailboxIdFactory.fromString(mailboxId.serialize())) .thenReturn(mailboxId); when(mailbox.getMailboxPath()) .thenReturn(MailboxPath.forUser(Username.of("otheru...@example.com"), mailboxId.serialize())); - assertThatThrownBy(() -> sut.assertIsUserOwnerOfMailboxes(ImmutableList.of(mailboxId), session)) - .isInstanceOf(MailboxNotOwnedException.class); + assertThatThrownBy(() -> sut.assertIsUserOwnerOfMailboxes(ImmutableList.of(mailboxId), session).block()) + .hasCauseInstanceOf(MailboxNotOwnedException.class); } @Test public void assertIsUserOwnerOfMailboxesShouldNotThrowWhenUserIsTheOwnerOfTheMailbox() throws Exception { InMemoryId mailboxId = InMemoryId.of(6789); MessageManager mailbox = mock(MessageManager.class); - when(mockedMailboxManager.getMailbox(mailboxId, session)) - .thenReturn(mailbox); + when(mockedMailboxManager.getMailboxReactive(mailboxId, session)) + .thenReturn(Mono.just(mailbox)); when(mockedMailboxIdFactory.fromString(mailboxId.serialize())) .thenReturn(mailboxId); when(mailbox.getMailboxPath()) .thenReturn(MailboxPath.forUser(USER, mailboxId.serialize())); - sut.assertIsUserOwnerOfMailboxes(ImmutableList.of(mailboxId), session); + sut.assertIsUserOwnerOfMailboxes(ImmutableList.of(mailboxId), session).block(); } @Test @@ -414,8 +420,8 @@ public class SetMessagesCreationProcessorTest { .email("ot...@example.com") .build(); - assertThatThrownBy(() -> sut.assertUserCanSendFrom(USER, Optional.of(sender))) - .isInstanceOf(MailboxSendingNotAllowedException.class); + assertThatThrownBy(() -> sut.assertUserCanSendFrom(USER, Optional.of(sender)).block()) + .hasCauseInstanceOf(MailboxSendingNotAllowedException.class); } @Test @@ -426,7 +432,7 @@ public class SetMessagesCreationProcessorTest { .email(USER.asString()) .build(); - assertThatCode(() -> sut.assertUserCanSendFrom(USER, Optional.of(sender))) + assertThatCode(() -> sut.assertUserCanSendFrom(USER, Optional.of(sender)).block()) .doesNotThrowAnyException(); } @@ -440,8 +446,8 @@ public class SetMessagesCreationProcessorTest { recipientRewriteTable.addAliasMapping(MappingSource.fromUser("alias", "example.com"), OTHER_USER.asString()); - assertThatThrownBy(() -> sut.assertUserCanSendFrom(USER, Optional.of(sender))) - .isInstanceOf(MailboxSendingNotAllowedException.class); + assertThatThrownBy(() -> sut.assertUserCanSendFrom(USER, Optional.of(sender)).block()) + .hasCauseInstanceOf(MailboxSendingNotAllowedException.class); } @Test @@ -454,7 +460,7 @@ public class SetMessagesCreationProcessorTest { recipientRewriteTable.addAliasMapping(MappingSource.fromUser("alias", "example.com"), USER.asString()); - assertThatCode(() -> sut.assertUserCanSendFrom(USER, Optional.of(sender))) + assertThatCode(() -> sut.assertUserCanSendFrom(USER, Optional.of(sender)).block()) .doesNotThrowAnyException(); } @@ -468,7 +474,7 @@ public class SetMessagesCreationProcessorTest { recipientRewriteTable.addMapping(MappingSource.fromDomain(Domain.of("other.org")), Mapping.domainAlias(Domain.of("example.com"))); - assertThatCode(() -> sut.assertUserCanSendFrom(USER, Optional.of(sender))) + assertThatCode(() -> sut.assertUserCanSendFrom(USER, Optional.of(sender)).block()) .doesNotThrowAnyException(); } @@ -482,8 +488,8 @@ public class SetMessagesCreationProcessorTest { recipientRewriteTable.addGroupMapping(MappingSource.fromUser("group", "example.com"), USER.asString()); - assertThatThrownBy(() -> sut.assertUserCanSendFrom(USER, Optional.of(sender))) - .isInstanceOf(MailboxSendingNotAllowedException.class); + assertThatThrownBy(() -> sut.assertUserCanSendFrom(USER, Optional.of(sender)).block()) + .hasCauseInstanceOf(MailboxSendingNotAllowedException.class); } public static class TestSystemMailboxesProvider implements SystemMailboxesProvider { diff --git a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/send/PostDequeueDecoratorTest.java b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/send/PostDequeueDecoratorTest.java index 8366beb..27d411d 100644 --- a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/send/PostDequeueDecoratorTest.java +++ b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/send/PostDequeueDecoratorTest.java @@ -19,6 +19,7 @@ package org.apache.james.jmap.draft.send; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; @@ -64,6 +65,9 @@ import org.junit.Test; import com.google.common.collect.ImmutableList; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + public class PostDequeueDecoratorTest { private static final String OUTBOX = DefaultMailboxes.OUTBOX; private static final String SENT = DefaultMailboxes.SENT; @@ -271,19 +275,24 @@ public class PostDequeueDecoratorTest { ImmutableList<MessageResult> allMessages = ImmutableList.copyOf(messageManager.getMessages(MessageRange.all(), FetchGroup.MINIMAL, mailboxSession)); - when(messageIdManager.getMessage(eq(messageId.getMessageId()), eq(FetchGroup.MINIMAL), any(MailboxSession.class))).thenReturn(allMessages); + when(messageIdManager.getMessagesReactive(any(), eq(FetchGroup.MINIMAL), any(MailboxSession.class))) + .thenReturn(Flux.fromIterable(allMessages)); + when(messageIdManager.setInMailboxesReactive(eq(messageId.getMessageId()), eq(ImmutableList.of(sentMailboxId)), any(MailboxSession.class))) + .thenReturn(Mono.empty()); + when(messageIdManager.setFlagsReactive(eq(new Flags(Flag.SEEN)), eq(MessageManager.FlagsUpdateMode.ADD), eq(messageId.getMessageId()), eq(ImmutableList.of(sentMailboxId)), any(MailboxSession.class))) + .thenReturn(Mono.empty()); testee.done(true); testee.done(true); - verify(messageIdManager, times(1)).getMessage(eq(messageId.getMessageId()), eq(FetchGroup.MINIMAL), any(MailboxSession.class)); - verify(messageIdManager, times(1)).setInMailboxes(eq(messageId.getMessageId()), eq(ImmutableList.of(sentMailboxId)), any(MailboxSession.class)); - verify(messageIdManager, times(1)).setFlags(eq(new Flags(Flag.SEEN)), eq(MessageManager.FlagsUpdateMode.ADD), eq(messageId.getMessageId()), eq(ImmutableList.of(sentMailboxId)), any(MailboxSession.class)); + verify(messageIdManager, times(1)).getMessagesReactive(any(), eq(FetchGroup.MINIMAL), any(MailboxSession.class)); + verify(messageIdManager, times(1)).setInMailboxesReactive(eq(messageId.getMessageId()), eq(ImmutableList.of(sentMailboxId)), any(MailboxSession.class)); + verify(messageIdManager, times(1)).setFlagsReactive(eq(new Flags(Flag.SEEN)), eq(MessageManager.FlagsUpdateMode.ADD), eq(messageId.getMessageId()), eq(ImmutableList.of(sentMailboxId)), any(MailboxSession.class)); verifyNoMoreInteractions(messageIdManager); } - @Test(expected = MailQueue.MailQueueException.class) + @Test public void doneShouldThrowWhenMailboxException() throws Exception { MessageIdManager messageIdManager = mock(MessageIdManager.class); testee = new PostDequeueDecorator(mockedMailQueueItem, mailboxManager, new InMemoryMessageId.Factory(), @@ -297,9 +306,11 @@ public class PostDequeueDecoratorTest { mail.setAttribute(messageIdAttribute(messageId.getMessageId().serialize())); mail.setAttribute(USERNAME_ATTRIBUTE); - when(messageIdManager.getMessage(eq(messageId.getMessageId()), eq(FetchGroup.MINIMAL), any(MailboxSession.class))).thenThrow(MailboxException.class); + when(messageIdManager.getMessagesReactive(any(), eq(FetchGroup.MINIMAL), any(MailboxSession.class))) + .thenReturn(Flux.error(new MailboxException())); - testee.done(true); + assertThatThrownBy(() -> testee.done(true)) + .hasCauseInstanceOf(MailboxException.class); } private Attribute messageIdAttribute(String value) { --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For additional commands, e-mail: notifications-h...@james.apache.org