This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 2402634b5cb32f11d80dd78e6b996a5aac9a22d1 Author: Benoit Tellier <[email protected]> AuthorDate: Sat May 15 09:56:42 2021 +0700 [REFACTORING] JMAP draft setMessages destroy is now fully reactive --- .../org/apache/james/mailbox/MessageIdManager.java | 4 +- .../james/mailbox/store/StoreMessageIdManager.java | 2 +- .../methods/SetMessagesDestructionProcessor.java | 74 ++++++++++------------ .../jmap/draft/methods/SetMessagesProcessor.java | 4 +- 4 files changed, 39 insertions(+), 45 deletions(-) diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java index 218819a..272d94c 100644 --- a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java +++ b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java @@ -66,7 +66,7 @@ public interface MessageIdManager { DeleteResult delete(MessageId messageId, List<MailboxId> mailboxIds, MailboxSession mailboxSession) throws MailboxException; - Publisher<DeleteResult> delete(List<MessageId> messageId, MailboxSession mailboxSession) throws MailboxException; + Publisher<DeleteResult> delete(List<MessageId> messageId, MailboxSession mailboxSession); void setInMailboxes(MessageId messageId, Collection<MailboxId> mailboxIds, MailboxSession mailboxSession) throws MailboxException; @@ -76,7 +76,7 @@ public interface MessageIdManager { return getMessages(ImmutableList.of(messageId), fetchGroup, mailboxSession); } - default DeleteResult delete(MessageId messageId, MailboxSession mailboxSession) throws MailboxException { + default DeleteResult delete(MessageId messageId, MailboxSession mailboxSession) { return Mono.from(delete(ImmutableList.of(messageId), mailboxSession)) .subscribeOn(Schedulers.elastic()) .block(); diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java index 11ca8ff..47d44a0 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java @@ -223,7 +223,7 @@ public class StoreMessageIdManager implements MessageIdManager { } @Override - public Mono<DeleteResult> delete(List<MessageId> messageIds, MailboxSession mailboxSession) throws MailboxException { + public Mono<DeleteResult> delete(List<MessageId> messageIds, MailboxSession mailboxSession) { MessageIdMapper messageIdMapper = mailboxSessionMapperFactory.getMessageIdMapper(mailboxSession); return messageIdMapper.findReactive(messageIds, MessageMapper.FetchType.Metadata) diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesDestructionProcessor.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesDestructionProcessor.java index 5a90a6b..2564aa7 100644 --- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesDestructionProcessor.java +++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesDestructionProcessor.java @@ -22,29 +22,27 @@ package org.apache.james.jmap.draft.methods; import static org.apache.james.jmap.draft.methods.Method.JMAP_PREFIX; import java.util.List; -import java.util.stream.Stream; import javax.inject.Inject; +import org.apache.commons.lang3.tuple.Pair; import org.apache.james.jmap.draft.model.SetError; import org.apache.james.jmap.draft.model.SetMessagesRequest; import org.apache.james.jmap.draft.model.SetMessagesResponse; import org.apache.james.mailbox.MailboxSession; import org.apache.james.mailbox.MessageIdManager; -import org.apache.james.mailbox.exception.MailboxException; -import org.apache.james.mailbox.model.DeleteResult; import org.apache.james.mailbox.model.MessageId; import org.apache.james.metrics.api.MetricFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.github.steveash.guavate.Guavate; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; public class SetMessagesDestructionProcessor implements SetMessagesProcessor { - private static final Logger LOGGER = LoggerFactory.getLogger(SetMessagesCreationProcessor.class); private final MessageIdManager messageIdManager; @@ -58,44 +56,38 @@ public class SetMessagesDestructionProcessor implements SetMessagesProcessor { } @Override - public SetMessagesResponse process(SetMessagesRequest request, MailboxSession mailboxSession) { - return metricFactory.decorateSupplierWithTimerMetric(JMAP_PREFIX + "SetMessageDestructionProcessor", - () -> delete(request.getDestroy(), mailboxSession) - .reduce(SetMessagesResponse.builder(), - SetMessagesResponse.Builder::accumulator, - SetMessagesResponse.Builder::combiner) - .build()); + public Mono<SetMessagesResponse> processReactive(SetMessagesRequest request, MailboxSession mailboxSession) { + return Mono.from(metricFactory.decoratePublisherWithTimerMetricLogP99(JMAP_PREFIX + "SetMessageDestructionProcessor", + delete(request.getDestroy(), mailboxSession))); } - - private Stream<SetMessagesResponse> delete(List<MessageId> toBeDestroyed, MailboxSession mailboxSession) { - try { - if (toBeDestroyed.isEmpty()) { - return Stream.empty(); - } - DeleteResult deleteResult = Mono.from(messageIdManager.delete(toBeDestroyed, mailboxSession)) - .subscribeOn(Schedulers.elastic()) - .block(); - - Stream<SetMessagesResponse> destroyed = deleteResult.getDestroyed().stream() - .map(messageId -> SetMessagesResponse.builder().destroyed(messageId).build()); - Stream<SetMessagesResponse> notFound = deleteResult.getNotFound().stream() - .map(messageId -> SetMessagesResponse.builder().notDestroyed(messageId, - SetError.builder() - .type(SetError.Type.NOT_FOUND) - .description("The message " + messageId.serialize() + " can't be found") - .build()) - .build()); - return Stream.concat(destroyed, notFound); - } catch (MailboxException e) { - LOGGER.error("An error occurred when deleting a message", e); - return toBeDestroyed.stream() - .map(messageId -> SetMessagesResponse.builder().notDestroyed(messageId, - SetError.builder() - .type(SetError.Type.ERROR) - .description("An error occurred while deleting messages " + messageId.serialize()) - .build()) - .build()); + private Mono<SetMessagesResponse> delete(List<MessageId> toBeDestroyed, MailboxSession mailboxSession) { + if (toBeDestroyed.isEmpty()) { + return Mono.just(SetMessagesResponse.builder().build()); } + return Mono.from(messageIdManager.delete(toBeDestroyed, mailboxSession)) + .map(deleteResult -> SetMessagesResponse.builder() + .destroyed(ImmutableList.copyOf(deleteResult.getDestroyed())) + .notDestroyed(deleteResult.getNotFound().stream() + .map(messageId -> Pair.of(messageId, + SetError.builder() + .type(SetError.Type.NOT_FOUND) + .description("The message " + messageId.serialize() + " can't be found") + .build())) + .collect(Guavate.toImmutableMap(Pair::getKey, Pair::getValue))) + .build()) + .onErrorResume(e -> { + LOGGER.error("An error occurred when deleting a message", e); + return Mono.just( + SetMessagesResponse.builder() + .notDestroyed(toBeDestroyed.stream() + .map(messageId -> Pair.of(messageId, + SetError.builder() + .type(SetError.Type.ERROR) + .description("An error occurred while deleting messages " + messageId.serialize()) + .build())) + .collect(Guavate.toImmutableMap(Pair::getKey, Pair::getValue))) + .build()); + }); } } diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesProcessor.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesProcessor.java index 787d9f8..0d34019 100644 --- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesProcessor.java +++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesProcessor.java @@ -27,7 +27,9 @@ import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; public interface SetMessagesProcessor { - SetMessagesResponse process(SetMessagesRequest request, MailboxSession mailboxSession); + default SetMessagesResponse process(SetMessagesRequest request, MailboxSession mailboxSession) { + return processReactive(request, mailboxSession).block(); + } default Mono<SetMessagesResponse> processReactive(SetMessagesRequest request, MailboxSession mailboxSession) { return Mono.fromCallable(() -> process(request, mailboxSession)) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
