This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit da19351ec92eecb9d2eedc0b80d5539c46ecd5fa
Author: Benoit Tellier <[email protected]>
AuthorDate: Sat May 1 22:19:23 2021 +0700

    JAMES-3575 MessageIdManager::delete should be reactive
    
    End-to-end reactive chain for JMAP RFC-8621 deletes.
---
 .../org/apache/james/mailbox/MessageIdManager.java |  8 +++--
 .../james/vault/DeletedMessageVaultHookTest.java   |  6 ++--
 .../james/mailbox/store/StoreMessageIdManager.java | 38 +++++++++++++---------
 .../store/AbstractCombinationManagerTest.java      |  6 +++-
 .../AbstractMessageIdManagerSideEffectTest.java    |  7 ++--
 .../store/AbstractMessageIdManagerStorageTest.java |  2 +-
 .../methods/SetMessagesDestructionProcessor.java   |  7 +++-
 .../jmap/method/EmailSetDeletePerformer.scala      |  2 +-
 8 files changed, 51 insertions(+), 25 deletions(-)

diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java
index 26a53cb..6fef5d4 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java
@@ -38,6 +38,8 @@ import org.reactivestreams.Publisher;
 import com.google.common.collect.ImmutableList;
 
 import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
 public interface MessageIdManager {
     default Publisher<ComposedMessageIdWithMetaData> messageMetadata(MessageId 
id, MailboxSession session) {
@@ -62,7 +64,7 @@ public interface MessageIdManager {
 
     DeleteResult delete(MessageId messageId, List<MailboxId> mailboxIds, 
MailboxSession mailboxSession) throws MailboxException;
 
-    DeleteResult delete(List<MessageId> messageId, MailboxSession 
mailboxSession) throws MailboxException;
+    Publisher<DeleteResult> delete(List<MessageId> messageId, MailboxSession 
mailboxSession) throws MailboxException;
 
     void setInMailboxes(MessageId messageId, Collection<MailboxId> mailboxIds, 
MailboxSession mailboxSession) throws MailboxException;
 
@@ -71,7 +73,9 @@ public interface MessageIdManager {
     }
 
     default DeleteResult delete(MessageId messageId, MailboxSession 
mailboxSession) throws MailboxException {
-        return delete(ImmutableList.of(messageId), mailboxSession);
+        return Mono.from(delete(ImmutableList.of(messageId), mailboxSession))
+            .subscribeOn(Schedulers.elastic())
+            .block();
     }
 
 }
diff --git a/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageVaultHookTest.java b/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageVaultHookTest.java
index 7251a98..79e919d 100644
--- a/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageVaultHookTest.java
+++ b/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageVaultHookTest.java
@@ -65,6 +65,8 @@ import com.github.steveash.guavate.Guavate;
 import com.google.common.collect.ImmutableList;
 
 import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
 class DeletedMessageVaultHookTest {
 
@@ -159,7 +161,7 @@ class DeletedMessageVaultHookTest {
         MessageId messageId = composedId.getMessageId();
         long messageSize = messageSize(messageManager, composedId);
 
-        messageIdManager.delete(ImmutableList.of(messageId), aliceSession);
+        messageIdManager.delete(messageId, aliceSession);
 
         DeletedMessage deletedMessage = 
buildDeletedMessage(ImmutableList.of(aliceMailbox), messageId, ALICE, 
messageSize);
         assertThat(Flux.from(messageVault.search(ALICE, 
Query.ALL)).blockFirst())
@@ -175,7 +177,7 @@ class DeletedMessageVaultHookTest {
             .mapToObj(Throwing.intFunction(i -> 
appendMessage(messageManager).getMessageId()))
             .collect(Guavate.toImmutableList());
 
-        assertThatCode(() -> messageIdManager.delete(ids, aliceSession))
+        assertThatCode(() -> Mono.from(messageIdManager.delete(ids, 
aliceSession)).subscribeOn(Schedulers.elastic()).block())
             .doesNotThrowAnyException();
     }
 
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
index d34b4a7..116e91b 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
@@ -187,10 +187,14 @@ public class StoreMessageIdManager implements 
MessageIdManager {
     }
 
     private ImmutableSet<MailboxId> getAllowedMailboxIds(MailboxSession 
mailboxSession, Stream<MailboxId> idList, Right... rights) throws 
MailboxException {
-        return MailboxReactorUtils.block(Flux.fromStream(idList)
+        return 
MailboxReactorUtils.block(getAllowedMailboxIdsReactive(mailboxSession, idList, 
rights));
+    }
+
+    private Mono<ImmutableSet<MailboxId>> 
getAllowedMailboxIdsReactive(MailboxSession mailboxSession, Stream<MailboxId> 
idList, Right... rights) {
+        return Flux.fromStream(idList)
             .distinct()
             .filterWhen(hasRightsOnMailboxReactive(mailboxSession, rights), 
DEFAULT_CONCURRENCY)
-            .collect(Guavate.toImmutableSet()));
+            .collect(Guavate.toImmutableSet());
     }
 
     @Override
@@ -215,14 +219,21 @@ public class StoreMessageIdManager implements 
MessageIdManager {
     }
 
     @Override
-    public DeleteResult delete(List<MessageId> messageIds, MailboxSession 
mailboxSession) throws MailboxException {
+    public Mono<DeleteResult> delete(List<MessageId> messageIds, 
MailboxSession mailboxSession) throws MailboxException {
         MessageIdMapper messageIdMapper = 
mailboxSessionMapperFactory.getMessageIdMapper(mailboxSession);
 
-        List<MailboxMessage> messageList = messageIdMapper.find(messageIds, 
MessageMapper.FetchType.Metadata);
-        ImmutableSet<MailboxId> allowedMailboxIds = 
getAllowedMailboxIds(mailboxSession, messageList
-            .stream()
-            .map(MailboxMessage::getMailboxId), Right.DeleteMessages);
+        return messageIdMapper.findReactive(messageIds, 
MessageMapper.FetchType.Metadata)
+            .collectList()
+            .flatMap(messageList ->
+                getAllowedMailboxIdsReactive(mailboxSession,
+                    messageList
+                        .stream()
+                        .map(MailboxMessage::getMailboxId),
+                    Right.DeleteMessages)
+                .flatMap(allowedMailboxIds -> 
deleteInAllowedMailboxes(messageIds, mailboxSession, messageIdMapper, 
messageList, allowedMailboxIds)));
+    }
 
+    private Mono<DeleteResult> deleteInAllowedMailboxes(List<MessageId> 
messageIds, MailboxSession mailboxSession, MessageIdMapper messageIdMapper, 
List<MailboxMessage> messageList, ImmutableSet<MailboxId> allowedMailboxIds) {
         List<MailboxMessage> accessibleMessages = messageList.stream()
             .filter(message -> 
allowedMailboxIds.contains(message.getMailboxId()))
             .collect(Guavate.toImmutableList());
@@ -232,14 +243,11 @@ public class StoreMessageIdManager implements 
MessageIdManager {
             .collect(Guavate.toImmutableSet());
         Sets.SetView<MessageId> nonAccessibleMessages = 
Sets.difference(ImmutableSet.copyOf(messageIds), accessibleMessageIds);
 
-        deleteWithPreHooks(messageIdMapper, accessibleMessages, mailboxSession)
-            .subscribeOn(Schedulers.elastic())
-            .block();
-
-        return DeleteResult.builder()
-            .addDestroyed(accessibleMessageIds)
-            .addNotFound(nonAccessibleMessages)
-            .build();
+        return deleteWithPreHooks(messageIdMapper, accessibleMessages, 
mailboxSession)
+            .thenReturn(DeleteResult.builder()
+                .addDestroyed(accessibleMessageIds)
+                .addNotFound(nonAccessibleMessages)
+                .build());
     }
 
     private Mono<Void> deleteWithPreHooks(MessageIdMapper messageIdMapper, 
List<MailboxMessage> messageList, MailboxSession mailboxSession) {
diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractCombinationManagerTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractCombinationManagerTest.java
index bdddff1..56c1ba3 100644
--- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractCombinationManagerTest.java
+++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractCombinationManagerTest.java
@@ -53,6 +53,8 @@ import org.junit.jupiter.api.Test;
 import com.google.common.collect.ImmutableList;
 
 import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
 public abstract class AbstractCombinationManagerTest {
 
@@ -534,7 +536,9 @@ public abstract class AbstractCombinationManagerTest {
             .appendMessage(MessageManager.AppendCommand.from(mailContent), 
session)
             .getId().getMessageId();
 
-        messageIdManager.delete(ImmutableList.of(messageId1, messageId2), 
session);
+        Mono.from(messageIdManager.delete(ImmutableList.of(messageId1, 
messageId2), session))
+            .subscribeOn(Schedulers.elastic())
+            .block();
 
         SearchQuery searchQuery = SearchQuery.of(SearchQuery.all());
         assertThat(Flux.from(messageManager1.search(searchQuery, 
session)).toStream()).isEmpty();
diff --git 
a/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractMessageIdManagerSideEffectTest.java
 
b/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractMessageIdManagerSideEffectTest.java
index 7a1d087..a6011de 100644
--- 
a/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractMessageIdManagerSideEffectTest.java
+++ 
b/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractMessageIdManagerSideEffectTest.java
@@ -83,6 +83,7 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.ImmutableSortedMap;
 
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
 public abstract class AbstractMessageIdManagerSideEffectTest {
     private static final Quota<QuotaCountLimit, QuotaCountUsage> OVER_QUOTA = 
Quota.<QuotaCountLimit, QuotaCountUsage>builder()
@@ -169,7 +170,9 @@ public abstract class 
AbstractMessageIdManagerSideEffectTest {
         MessageMetaData simpleMessageMetaData2 = 
messageResult2.messageMetaData();
 
         eventBus.register(eventCollector);
-        messageIdManager.delete(ImmutableList.of(messageId1, messageId2), 
session);
+        Mono.from(messageIdManager.delete(ImmutableList.of(messageId1, 
messageId2), session))
+            .subscribeOn(Schedulers.elastic())
+            .block();
 
         AbstractListAssert<?, List<? extends Expunged>, Expunged, 
ObjectAssert<Expunged>> events =
             assertThat(eventCollector.getEvents())
@@ -523,7 +526,7 @@ public abstract class 
AbstractMessageIdManagerSideEffectTest {
         MessageId messageId = testingData.createNotUsedMessageId();
 
         eventBus.register(eventCollector);
-        messageIdManager.delete(ImmutableList.of(messageId), session);
+        messageIdManager.delete(messageId, session);
 
         assertThat(eventCollector.getEvents()).isEmpty();
     }
diff --git 
a/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractMessageIdManagerStorageTest.java
 
b/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractMessageIdManagerStorageTest.java
index d9beb5b..a154ae9 100644
--- 
a/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractMessageIdManagerStorageTest.java
+++ 
b/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractMessageIdManagerStorageTest.java
@@ -363,7 +363,7 @@ public abstract class AbstractMessageIdManagerStorageTest {
     void deleteAllShouldReturnNotDeleteWhenDeletingOnOtherSession() throws 
Exception {
         MessageId messageId = testingData.persist(bobMailbox1.getMailboxId(), 
messageUid1, FLAGS, bobSession);
 
-        messageIdManager.delete(ImmutableList.of(messageId), aliceSession);
+        messageIdManager.delete(messageId, aliceSession);
 
         assertThat(messageIdManager.getMessage(messageId, FetchGroup.MINIMAL, 
bobSession)).hasSize(1);
     }
diff --git 
a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesDestructionProcessor.java
 
b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesDestructionProcessor.java
index a961a73..5a90a6b 100644
--- 
a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesDestructionProcessor.java
+++ 
b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesDestructionProcessor.java
@@ -40,6 +40,9 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
 public class SetMessagesDestructionProcessor implements SetMessagesProcessor {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(SetMessagesCreationProcessor.class);
@@ -70,7 +73,9 @@ public class SetMessagesDestructionProcessor implements 
SetMessagesProcessor {
             if (toBeDestroyed.isEmpty()) {
                 return Stream.empty();
             }
-            DeleteResult deleteResult = messageIdManager.delete(toBeDestroyed, 
mailboxSession);
+            DeleteResult deleteResult = 
Mono.from(messageIdManager.delete(toBeDestroyed, mailboxSession))
+                .subscribeOn(Schedulers.elastic())
+                .block();
 
             Stream<SetMessagesResponse> destroyed = 
deleteResult.getDestroyed().stream()
                 .map(messageId -> 
SetMessagesResponse.builder().destroyed(messageId).build());
diff --git 
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetDeletePerformer.scala
 
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetDeletePerformer.scala
index 14f106a..6908f7c 100644
--- 
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetDeletePerformer.scala
+++ 
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetDeletePerformer.scala
@@ -85,7 +85,7 @@ class EmailSetDeletePerformer @Inject()(messageIdManager: 
MessageIdManager,
         case _ => None
       }
 
-      SMono.fromCallable(() => 
messageIdManager.delete(messageIds.toList.asJava, mailboxSession))
+      SMono(messageIdManager.delete(messageIds.toList.asJava, mailboxSession))
         .map(DestroyResult.from)
         .subscribeOn(Schedulers.elastic())
         .onErrorResume(e => SMono.just(messageIds.map(id => 
DestroyFailure(EmailSet.asUnparsed(id), e))))

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to