This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit da19351ec92eecb9d2eedc0b80d5539c46ecd5fa Author: Benoit Tellier <[email protected]> AuthorDate: Sat May 1 22:19:23 2021 +0700 JAMES-3575 MessageIdManager::delete should be reactive End to end reactive chain for JMAP RFC-8621 deletes... --- .../org/apache/james/mailbox/MessageIdManager.java | 8 +++-- .../james/vault/DeletedMessageVaultHookTest.java | 6 ++-- .../james/mailbox/store/StoreMessageIdManager.java | 38 +++++++++++++--------- .../store/AbstractCombinationManagerTest.java | 6 +++- .../AbstractMessageIdManagerSideEffectTest.java | 7 ++-- .../store/AbstractMessageIdManagerStorageTest.java | 2 +- .../methods/SetMessagesDestructionProcessor.java | 7 +++- .../jmap/method/EmailSetDeletePerformer.scala | 2 +- 8 files changed, 51 insertions(+), 25 deletions(-) diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java index 26a53cb..6fef5d4 100644 --- a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java +++ b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java @@ -38,6 +38,8 @@ import org.reactivestreams.Publisher; import com.google.common.collect.ImmutableList; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; public interface MessageIdManager { default Publisher<ComposedMessageIdWithMetaData> messageMetadata(MessageId id, MailboxSession session) { @@ -62,7 +64,7 @@ public interface MessageIdManager { DeleteResult delete(MessageId messageId, List<MailboxId> mailboxIds, MailboxSession mailboxSession) throws MailboxException; - DeleteResult delete(List<MessageId> messageId, MailboxSession mailboxSession) throws MailboxException; + Publisher<DeleteResult> delete(List<MessageId> messageId, MailboxSession mailboxSession) throws MailboxException; void setInMailboxes(MessageId messageId, Collection<MailboxId> mailboxIds, MailboxSession mailboxSession) throws MailboxException; @@ -71,7 +73,9 @@ public interface MessageIdManager { } default DeleteResult delete(MessageId messageId, MailboxSession mailboxSession) throws MailboxException { - return delete(ImmutableList.of(messageId), mailboxSession); + return Mono.from(delete(ImmutableList.of(messageId), mailboxSession)) + .subscribeOn(Schedulers.elastic()) + .block(); } } diff --git a/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageVaultHookTest.java b/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageVaultHookTest.java index 7251a98..79e919d 100644 --- a/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageVaultHookTest.java +++ b/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageVaultHookTest.java @@ -65,6 +65,8 @@ import com.github.steveash.guavate.Guavate; import com.google.common.collect.ImmutableList; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; class DeletedMessageVaultHookTest { @@ -159,7 +161,7 @@ class DeletedMessageVaultHookTest { MessageId messageId = composedId.getMessageId(); long messageSize = messageSize(messageManager, composedId); - messageIdManager.delete(ImmutableList.of(messageId), aliceSession); + messageIdManager.delete(messageId, aliceSession); DeletedMessage deletedMessage = buildDeletedMessage(ImmutableList.of(aliceMailbox), messageId, ALICE, messageSize); assertThat(Flux.from(messageVault.search(ALICE, Query.ALL)).blockFirst()) @@ -175,7 +177,7 @@ class DeletedMessageVaultHookTest { .mapToObj(Throwing.intFunction(i -> appendMessage(messageManager).getMessageId())) .collect(Guavate.toImmutableList()); - assertThatCode(() -> messageIdManager.delete(ids, aliceSession)) + assertThatCode(() -> Mono.from(messageIdManager.delete(ids, aliceSession)).subscribeOn(Schedulers.elastic()).block()) .doesNotThrowAnyException(); } diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java index d34b4a7..116e91b 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java @@ -187,10 +187,14 @@ public class StoreMessageIdManager implements MessageIdManager { } private ImmutableSet<MailboxId> getAllowedMailboxIds(MailboxSession mailboxSession, Stream<MailboxId> idList, Right... rights) throws MailboxException { - return MailboxReactorUtils.block(Flux.fromStream(idList) + return MailboxReactorUtils.block(getAllowedMailboxIdsReactive(mailboxSession, idList, rights)); + } + + private Mono<ImmutableSet<MailboxId>> getAllowedMailboxIdsReactive(MailboxSession mailboxSession, Stream<MailboxId> idList, Right... rights) { + return Flux.fromStream(idList) .distinct() .filterWhen(hasRightsOnMailboxReactive(mailboxSession, rights), DEFAULT_CONCURRENCY) - .collect(Guavate.toImmutableSet())); + .collect(Guavate.toImmutableSet()); } @Override @@ -215,14 +219,21 @@ public class StoreMessageIdManager implements MessageIdManager { } @Override - public DeleteResult delete(List<MessageId> messageIds, MailboxSession mailboxSession) throws MailboxException { + public Mono<DeleteResult> delete(List<MessageId> messageIds, MailboxSession mailboxSession) throws MailboxException { MessageIdMapper messageIdMapper = mailboxSessionMapperFactory.getMessageIdMapper(mailboxSession); - List<MailboxMessage> messageList = messageIdMapper.find(messageIds, MessageMapper.FetchType.Metadata); - ImmutableSet<MailboxId> allowedMailboxIds = getAllowedMailboxIds(mailboxSession, messageList - .stream() - .map(MailboxMessage::getMailboxId), Right.DeleteMessages); + return messageIdMapper.findReactive(messageIds, MessageMapper.FetchType.Metadata) + .collectList() + .flatMap(messageList -> + getAllowedMailboxIdsReactive(mailboxSession, + messageList + .stream() + .map(MailboxMessage::getMailboxId), + Right.DeleteMessages) + .flatMap(allowedMailboxIds -> deleteInAllowedMailboxes(messageIds, mailboxSession, messageIdMapper, messageList, allowedMailboxIds))); + } + private Mono<DeleteResult> deleteInAllowedMailboxes(List<MessageId> messageIds, MailboxSession mailboxSession, MessageIdMapper messageIdMapper, List<MailboxMessage> messageList, ImmutableSet<MailboxId> allowedMailboxIds) { List<MailboxMessage> accessibleMessages = messageList.stream() .filter(message -> allowedMailboxIds.contains(message.getMailboxId())) .collect(Guavate.toImmutableList()); @@ -232,14 +243,11 @@ public class StoreMessageIdManager implements MessageIdManager { .collect(Guavate.toImmutableSet()); Sets.SetView<MessageId> nonAccessibleMessages = Sets.difference(ImmutableSet.copyOf(messageIds), accessibleMessageIds); - deleteWithPreHooks(messageIdMapper, accessibleMessages, mailboxSession) - .subscribeOn(Schedulers.elastic()) - .block(); - - return DeleteResult.builder() - .addDestroyed(accessibleMessageIds) - .addNotFound(nonAccessibleMessages) - .build(); + return deleteWithPreHooks(messageIdMapper, accessibleMessages, mailboxSession) + .thenReturn(DeleteResult.builder() + .addDestroyed(accessibleMessageIds) + .addNotFound(nonAccessibleMessages) + .build()); } private Mono<Void> deleteWithPreHooks(MessageIdMapper messageIdMapper, List<MailboxMessage> messageList, MailboxSession mailboxSession) { diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractCombinationManagerTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractCombinationManagerTest.java index bdddff1..56c1ba3 100644 --- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractCombinationManagerTest.java +++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractCombinationManagerTest.java @@ -53,6 +53,8 @@ import org.junit.jupiter.api.Test; import com.google.common.collect.ImmutableList; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; public abstract class AbstractCombinationManagerTest { @@ -534,7 +536,9 @@ public abstract class AbstractCombinationManagerTest { .appendMessage(MessageManager.AppendCommand.from(mailContent), session) .getId().getMessageId(); - messageIdManager.delete(ImmutableList.of(messageId1, messageId2), session); + Mono.from(messageIdManager.delete(ImmutableList.of(messageId1, messageId2), session)) + .subscribeOn(Schedulers.elastic()) + .block(); SearchQuery searchQuery = SearchQuery.of(SearchQuery.all()); assertThat(Flux.from(messageManager1.search(searchQuery, session)).toStream()).isEmpty(); diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractMessageIdManagerSideEffectTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractMessageIdManagerSideEffectTest.java index 7a1d087..a6011de 100644 --- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractMessageIdManagerSideEffectTest.java +++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractMessageIdManagerSideEffectTest.java @@ -83,6 +83,7 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSortedMap; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; public abstract class AbstractMessageIdManagerSideEffectTest { private static final Quota<QuotaCountLimit, QuotaCountUsage> OVER_QUOTA = Quota.<QuotaCountLimit, QuotaCountUsage>builder() @@ -169,7 +170,9 @@ public abstract class AbstractMessageIdManagerSideEffectTest { MessageMetaData simpleMessageMetaData2 = messageResult2.messageMetaData(); eventBus.register(eventCollector); - messageIdManager.delete(ImmutableList.of(messageId1, messageId2), session); + Mono.from(messageIdManager.delete(ImmutableList.of(messageId1, messageId2), session)) + .subscribeOn(Schedulers.elastic()) + .block(); AbstractListAssert<?, List<? extends Expunged>, Expunged, ObjectAssert<Expunged>> events = assertThat(eventCollector.getEvents()) @@ -523,7 +526,7 @@ public abstract class AbstractMessageIdManagerSideEffectTest { MessageId messageId = testingData.createNotUsedMessageId(); eventBus.register(eventCollector); - messageIdManager.delete(ImmutableList.of(messageId), session); + messageIdManager.delete(messageId, session); assertThat(eventCollector.getEvents()).isEmpty(); } diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractMessageIdManagerStorageTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractMessageIdManagerStorageTest.java index d9beb5b..a154ae9 100644 --- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractMessageIdManagerStorageTest.java +++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractMessageIdManagerStorageTest.java @@ -363,7 +363,7 @@ public abstract class AbstractMessageIdManagerStorageTest { void deleteAllShouldReturnNotDeleteWhenDeletingOnOtherSession() throws Exception { MessageId messageId = testingData.persist(bobMailbox1.getMailboxId(), messageUid1, FLAGS, bobSession); - messageIdManager.delete(ImmutableList.of(messageId), aliceSession); + messageIdManager.delete(messageId, aliceSession); assertThat(messageIdManager.getMessage(messageId, FetchGroup.MINIMAL, bobSession)).hasSize(1); } diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesDestructionProcessor.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesDestructionProcessor.java index a961a73..5a90a6b 100644 --- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesDestructionProcessor.java +++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesDestructionProcessor.java @@ -40,6 +40,9 @@ import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + public class SetMessagesDestructionProcessor implements SetMessagesProcessor { private static final Logger LOGGER = LoggerFactory.getLogger(SetMessagesCreationProcessor.class); @@ -70,7 +73,9 @@ public class SetMessagesDestructionProcessor implements SetMessagesProcessor { if (toBeDestroyed.isEmpty()) { return Stream.empty(); } - DeleteResult deleteResult = messageIdManager.delete(toBeDestroyed, mailboxSession); + DeleteResult deleteResult = Mono.from(messageIdManager.delete(toBeDestroyed, mailboxSession)) + .subscribeOn(Schedulers.elastic()) + .block(); Stream<SetMessagesResponse> destroyed = deleteResult.getDestroyed().stream() .map(messageId -> SetMessagesResponse.builder().destroyed(messageId).build()); diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetDeletePerformer.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetDeletePerformer.scala index 14f106a..6908f7c 100644 --- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetDeletePerformer.scala +++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetDeletePerformer.scala @@ -85,7 +85,7 @@ class EmailSetDeletePerformer @Inject()(messageIdManager: MessageIdManager, case _ => None } - SMono.fromCallable(() => messageIdManager.delete(messageIds.toList.asJava, mailboxSession)) + SMono(messageIdManager.delete(messageIds.toList.asJava, mailboxSession)) .map(DestroyResult.from) .subscribeOn(Schedulers.elastic()) .onErrorResume(e => SMono.just(messageIds.map(id => DestroyFailure(EmailSet.asUnparsed(id), e)))) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
