This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit df8c6e7adc7d870cbd2518275a2f4dc6edc4b7fa Author: Benoit Tellier <[email protected]> AuthorDate: Sat May 15 12:41:55 2021 +0700 [REFACTORING] SetMessagesUpdateProcessor: Reactify updates --- .../org/apache/james/mailbox/MessageIdManager.java | 2 +- .../inmemory/mail/InMemoryMessageIdMapper.java | 6 +- .../james/mailbox/store/mail/MessageIdMapper.java | 4 +- .../draft/methods/SetMessagesUpdateProcessor.java | 130 ++++++++++----------- .../jmap/draft/methods/SetMailboxesMethodTest.java | 12 +- .../methods/SetMessagesUpdateProcessorTest.java | 4 +- 6 files changed, 81 insertions(+), 77 deletions(-) diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java index 272d94c..4bb6060 100644 --- a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java +++ b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java @@ -52,7 +52,7 @@ public interface MessageIdManager { void setFlags(Flags newState, FlagsUpdateMode replace, MessageId messageId, List<MailboxId> mailboxIds, MailboxSession mailboxSession) throws MailboxException; - Publisher<Void> setFlagsReactive(Flags newState, FlagsUpdateMode replace, MessageId messageId, List<MailboxId> mailboxIds, MailboxSession mailboxSession) throws MailboxException; + Publisher<Void> setFlagsReactive(Flags newState, FlagsUpdateMode replace, MessageId messageId, List<MailboxId> mailboxIds, MailboxSession mailboxSession); List<MessageResult> getMessages(Collection<MessageId> messageIds, FetchGroup minimal, MailboxSession mailboxSession) throws MailboxException; diff --git a/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java b/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java index 8be1835..569b1ae 100644 --- a/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java +++ b/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java @@ -53,6 +53,7 @@ import com.google.common.collect.Multimap; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; public class InMemoryMessageIdMapper implements MessageIdMapper { private final MailboxMapper mailboxMapper; @@ -135,14 +136,15 @@ public class InMemoryMessageIdMapper implements MessageIdMapper { @Override public Mono<Multimap<MailboxId, UpdatedFlags>> setFlags(MessageId messageId, List<MailboxId> mailboxIds, Flags newState, FlagsUpdateMode updateMode) { - return Mono.fromCallable(() -> find(ImmutableList.of(messageId), MessageMapper.FetchType.Metadata) + return Mono.<Multimap<MailboxId, UpdatedFlags>>fromCallable(() -> find(ImmutableList.of(messageId), MessageMapper.FetchType.Metadata) .stream() .filter(message -> mailboxIds.contains(message.getMailboxId())) .map(updateMessage(newState, updateMode)) .distinct() .collect(Guavate.toImmutableListMultimap( Pair::getKey, - Pair::getValue))); + Pair::getValue))) + .subscribeOn(Schedulers.elastic()); } private Function<MailboxMessage, Pair<MailboxId, UpdatedFlags>> updateMessage(Flags newState, FlagsUpdateMode updateMode) { diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageIdMapper.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageIdMapper.java index 02740cd..4af711b 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageIdMapper.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageIdMapper.java @@ -40,6 +40,7 @@ import com.google.common.collect.Multimap; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; public interface MessageIdMapper { @@ -58,7 +59,8 @@ public interface MessageIdMapper { void copyInMailbox(MailboxMessage mailboxMessage, Mailbox mailbox) throws MailboxException; default Mono<Void> copyInMailboxReactive(MailboxMessage mailboxMessage, Mailbox mailbox) { - return Mono.fromRunnable(Throwing.runnable(() -> copyInMailbox(mailboxMessage, mailbox)).sneakyThrow()); + return Mono.<Void>fromRunnable(Throwing.runnable(() -> copyInMailbox(mailboxMessage, mailbox)).sneakyThrow()) + .subscribeOn(Schedulers.elastic()); } void delete(MessageId messageId); diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesUpdateProcessor.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesUpdateProcessor.java index fea0e2a..5884cab 100644 --- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesUpdateProcessor.java +++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesUpdateProcessor.java @@ -221,7 +221,7 @@ public class SetMessagesUpdateProcessor implements SetMessagesProcessor { .reduce(SetMessagesResponse.Builder::mergeWith) .orElse(SetMessagesResponse.builder()); } - }) .reduce(SetMessagesResponse.Builder::mergeWith) + }).reduce(SetMessagesResponse.Builder::mergeWith) .orElse(SetMessagesResponse.builder()); } else { return messages.keySet().stream() @@ -270,7 +270,7 @@ public class SetMessagesUpdateProcessor implements SetMessagesProcessor { .reduce(SetMessagesResponse.Builder::mergeWith) .orElse(SetMessagesResponse.builder()); } - }) .reduce(SetMessagesResponse.Builder::mergeWith) + }).reduce(SetMessagesResponse.Builder::mergeWith) .orElse(SetMessagesResponse.builder()); } else { return messages.keySet().stream() @@ -280,74 +280,73 @@ public class SetMessagesUpdateProcessor implements SetMessagesProcessor { } } - private SetMessagesResponse.Builder update(Set<MailboxId> outboxes, MessageId messageId, UpdateMessagePatch updateMessagePatch, MailboxSession mailboxSession, Multimap<MessageId, ComposedMessageIdWithMetaData> metadata) { + private Mono<SetMessagesResponse.Builder> update(Set<MailboxId> outboxes, MessageId messageId, UpdateMessagePatch updateMessagePatch, MailboxSession mailboxSession, Multimap<MessageId, ComposedMessageIdWithMetaData> metadata) { try { - SetMessagesResponse.Builder builder = SetMessagesResponse.builder(); List<ComposedMessageIdWithMetaData> messages = Optional.ofNullable(metadata.get(messageId)) .map(ImmutableList::copyOf) .orElse(ImmutableList.of()); assertValidUpdate(messages, updateMessagePatch, outboxes); if (messages.isEmpty()) { - builder.mergeWith(addMessageIdNotFoundToResponse(messageId)); + return Mono.just(SetMessagesResponse.builder().mergeWith(addMessageIdNotFoundToResponse(messageId))); } else { - setInMailboxes(messageId, updateMessagePatch, mailboxSession); - Optional<MailboxException> updateError = messages.stream() - .flatMap(message -> updateFlags(messageId, updateMessagePatch, mailboxSession, message)) - .findAny(); - if (updateError.isPresent()) { - builder.mergeWith(handleMessageUpdateException(messageId, updateError.get())); - } else { - builder.updated(ImmutableList.of(messageId)); - } - builder.mergeWith(sendMessageWhenOutboxInTargetMailboxIds(outboxes, messageId, updateMessagePatch, mailboxSession)); + return setInMailboxes(messageId, updateMessagePatch, mailboxSession) + .then(Flux.fromIterable(messages) + .flatMap(message -> updateFlags(messageId, updateMessagePatch, mailboxSession, message)) + .then()) + .then(Mono.just(SetMessagesResponse.builder().updated(ImmutableList.of(messageId)))) + .flatMap(builder -> sendMessageWhenOutboxInTargetMailboxIds(outboxes, messageId, updateMessagePatch, mailboxSession) + .map(builder::mergeWith)) + .onErrorResume(OverQuotaException.class, e -> Mono.just(SetMessagesResponse.builder().notUpdated(messageId, + SetError.builder() + .type(SetError.Type.MAX_QUOTA_REACHED) + .description(e.getMessage()) + .build()))) + .onErrorResume(MailboxException.class, e -> Mono.just(handleMessageUpdateException(messageId, e))) + .onErrorResume(IOException.class, e -> Mono.just(handleMessageUpdateException(messageId, e))) + .onErrorResume(MessagingException.class, e -> Mono.just(handleMessageUpdateException(messageId, e))) + .onErrorResume(IllegalArgumentException.class, e -> { + ValidationResult invalidPropertyKeywords = ValidationResult.builder() + .property(MessageProperties.MessageProperty.keywords.asFieldName()) + .message(e.getMessage()) + .build(); + + return Mono.just(handleInvalidRequest(messageId, ImmutableList.of(invalidPropertyKeywords), updateMessagePatch)); + }); } - return builder; } catch (InvalidOutboxMoveException e) { ValidationResult invalidPropertyMailboxIds = ValidationResult.builder() .property(MessageProperties.MessageProperty.mailboxIds.asFieldName()) .message(e.getMessage()) .build(); - return handleInvalidRequest(messageId, ImmutableList.of(invalidPropertyMailboxIds), updateMessagePatch); - } catch (OverQuotaException e) { - return SetMessagesResponse.builder().notUpdated(messageId, - SetError.builder() - .type(SetError.Type.MAX_QUOTA_REACHED) - .description(e.getMessage()) - .build()); - } catch (MailboxException | IOException | MessagingException e) { - return handleMessageUpdateException(messageId, e); - } catch (IllegalArgumentException e) { - ValidationResult invalidPropertyKeywords = ValidationResult.builder() - .property(MessageProperties.MessageProperty.keywords.asFieldName()) - .message(e.getMessage()) - .build(); - - return handleInvalidRequest(messageId, ImmutableList.of(invalidPropertyKeywords), updateMessagePatch); + return Mono.just(handleInvalidRequest(messageId, ImmutableList.of(invalidPropertyMailboxIds), updateMessagePatch)); } } - private SetMessagesResponse.Builder sendMessageWhenOutboxInTargetMailboxIds(Set<MailboxId> outboxes, MessageId messageId, UpdateMessagePatch updateMessagePatch, MailboxSession mailboxSession) throws MailboxException, MessagingException, IOException { + private Mono<SetMessagesResponse.Builder> sendMessageWhenOutboxInTargetMailboxIds(Set<MailboxId> outboxes, MessageId messageId, UpdateMessagePatch updateMessagePatch, MailboxSession mailboxSession) { if (isTargetingOutbox(outboxes, listTargetMailboxIds(updateMessagePatch))) { - Optional<MessageResult> maybeMessageToSend = - messageIdManager.getMessage(messageId, FetchGroup.FULL_CONTENT, mailboxSession) - .stream() - .findFirst(); - if (maybeMessageToSend.isPresent()) { - MessageResult messageToSend = maybeMessageToSend.get(); - MailImpl mail = buildMailFromMessage(messageToSend); - Optional<Username> fromUser = mail.getMaybeSender() - .asOptional() - .map(Username::fromMailAddress); - assertUserCanSendFrom(mailboxSession.getUser(), fromUser); - messageSender.sendMessage(messageId, mail, mailboxSession); - referenceUpdater.updateReferences(messageToSend.getHeaders(), mailboxSession); - } else { - return addMessageIdNotFoundToResponse(messageId); - } + return Mono.fromCallable(() -> { + Optional<MessageResult> maybeMessageToSend = + messageIdManager.getMessage(messageId, FetchGroup.FULL_CONTENT, mailboxSession) + .stream() + .findFirst(); + if (maybeMessageToSend.isPresent()) { + MessageResult messageToSend = maybeMessageToSend.get(); + MailImpl mail = buildMailFromMessage(messageToSend); + Optional<Username> fromUser = mail.getMaybeSender() + .asOptional() + .map(Username::fromMailAddress); + assertUserCanSendFrom(mailboxSession.getUser(), fromUser); + messageSender.sendMessage(messageId, mail, mailboxSession); + referenceUpdater.updateReferences(messageToSend.getHeaders(), mailboxSession); + return SetMessagesResponse.builder(); + } else { + return addMessageIdNotFoundToResponse(messageId); + } + }).subscribeOn(Schedulers.elastic()); } - return SetMessagesResponse.builder(); + return Mono.just(SetMessagesResponse.builder()); } @VisibleForTesting @@ -430,20 +429,16 @@ public class SetMessagesUpdateProcessor implements SetMessagesProcessor { .collect(Guavate.toImmutableSet()); } - private Stream<MailboxException> updateFlags(MessageId messageId, UpdateMessagePatch updateMessagePatch, MailboxSession mailboxSession, ComposedMessageIdWithMetaData message) { - try { - if (!updateMessagePatch.isFlagsIdentity()) { - messageIdManager.setFlags( - updateMessagePatch.applyToState(message.getFlags()), - FlagsUpdateMode.REPLACE, messageId, ImmutableList.of(message.getComposedMessageId().getMailboxId()), mailboxSession); - } - return Stream.of(); - } catch (MailboxException e) { - return Stream.of(e); + private Mono<Void> updateFlags(MessageId messageId, UpdateMessagePatch updateMessagePatch, MailboxSession mailboxSession, ComposedMessageIdWithMetaData message) { + if (!updateMessagePatch.isFlagsIdentity()) { + return Mono.from(messageIdManager.setFlagsReactive( + updateMessagePatch.applyToState(message.getFlags()), + FlagsUpdateMode.REPLACE, messageId, ImmutableList.of(message.getComposedMessageId().getMailboxId()), mailboxSession)); } + return Mono.empty(); } - private void setInMailboxes(MessageId messageId, UpdateMessagePatch updateMessagePatch, MailboxSession mailboxSession) throws MailboxException { + private Mono<Void> setInMailboxes(MessageId messageId, UpdateMessagePatch updateMessagePatch, MailboxSession mailboxSession) { Optional<List<String>> serializedMailboxIds = updateMessagePatch.getMailboxIds(); if (serializedMailboxIds.isPresent()) { List<MailboxId> mailboxIds = serializedMailboxIds.get() @@ -451,17 +446,18 @@ public class SetMessagesUpdateProcessor implements SetMessagesProcessor { .map(mailboxIdFactory::fromString) .collect(Guavate.toImmutableList()); - messageIdManager.setInMailboxes(messageId, mailboxIds, mailboxSession); + return Mono.from(messageIdManager.setInMailboxesReactive(messageId, mailboxIds, mailboxSession)); } + return Mono.empty(); } private SetMessagesResponse.Builder addMessageIdNotFoundToResponse(MessageId messageId) { return SetMessagesResponse.builder().notUpdated(ImmutableMap.of(messageId, - SetError.builder() - .type(SetError.Type.NOT_FOUND) - .properties(ImmutableSet.of(MessageProperties.MessageProperty.id)) - .description("message not found") - .build())); + SetError.builder() + .type(SetError.Type.NOT_FOUND) + .properties(ImmutableSet.of(MessageProperties.MessageProperty.id)) + .description("message not found") + .build())); } private SetMessagesResponse.Builder handleMessageUpdateException(MessageId messageId, diff --git a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/SetMailboxesMethodTest.java b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/SetMailboxesMethodTest.java index ea0ce18..23f2f21 100644 --- a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/SetMailboxesMethodTest.java +++ b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/SetMailboxesMethodTest.java @@ -42,6 +42,8 @@ import org.junit.Test; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import reactor.core.publisher.Mono; + public class SetMailboxesMethodTest { private static final ImmutableSet<SetMailboxesProcessor> NO_PROCESSOR = ImmutableSet.of(); @@ -106,11 +108,12 @@ public class SetMailboxesMethodTest { MailboxSession session = mock(MailboxSession.class); SetMailboxesProcessor creatorProcessor = mock(SetMailboxesProcessor.class); - when(creatorProcessor.process(creationRequest, session)).thenReturn(creationResponse); + when(creatorProcessor.processReactive(creationRequest, session)).thenReturn(Mono.just(creationResponse)); Stream<JmapResponse> actual = new SetMailboxesMethod(ImmutableSet.of(creatorProcessor), TIME_METRIC_FACTORY) - .processToStream(creationRequest, MethodCallId.of("methodCallId"), session); + .process(creationRequest, MethodCallId.of("methodCallId"), session) + .toStream(); assertThat(actual).contains(jmapResponse); } @@ -129,11 +132,12 @@ public class SetMailboxesMethodTest { MailboxSession session = mock(MailboxSession.class); SetMailboxesProcessor destructorProcessor = mock(SetMailboxesProcessor.class); - when(destructorProcessor.process(destructionRequest, session)).thenReturn(destructionResponse); + when(destructorProcessor.processReactive(destructionRequest, session)).thenReturn(Mono.just(destructionResponse)); Stream<JmapResponse> actual = new SetMailboxesMethod(ImmutableSet.of(destructorProcessor), TIME_METRIC_FACTORY) - .processToStream(destructionRequest, MethodCallId.of("methodCallId"), session); + .process(destructionRequest, MethodCallId.of("methodCallId"), session) + .toStream(); assertThat(actual).contains(jmapResponse); } diff --git a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/SetMessagesUpdateProcessorTest.java b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/SetMessagesUpdateProcessorTest.java index 5ce3a4b..9c643cd 100644 --- a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/SetMessagesUpdateProcessorTest.java +++ b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/SetMessagesUpdateProcessorTest.java @@ -215,7 +215,7 @@ public class SetMessagesUpdateProcessorTest { public void processShouldReturnEmptyUpdatedWhenRequestHasEmptyUpdate() { SetMessagesRequest requestWithEmptyUpdate = SetMessagesRequest.builder().build(); - SetMessagesResponse result = sut.process(requestWithEmptyUpdate, null); + SetMessagesResponse result = sut.process(requestWithEmptyUpdate, session); assertThat(result.getUpdated()).isEmpty(); assertThat(result.getNotUpdated()).isEmpty(); @@ -253,7 +253,7 @@ public class SetMessagesUpdateProcessorTest { .build(); // When - SetMessagesResponse result = sut.process(requestWithInvalidUpdate, null); + SetMessagesResponse result = sut.process(requestWithInvalidUpdate, session); // Then assertThat(result.getNotUpdated()).describedAs("NotUpdated should not be empty").isNotEmpty(); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
