This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 2402634b5cb32f11d80dd78e6b996a5aac9a22d1
Author: Benoit Tellier <[email protected]>
AuthorDate: Sat May 15 09:56:42 2021 +0700

    [REFACTORING] JMAP draft setMessages destroy is now fully reactive
---
 .../org/apache/james/mailbox/MessageIdManager.java |  4 +-
 .../james/mailbox/store/StoreMessageIdManager.java |  2 +-
 .../methods/SetMessagesDestructionProcessor.java   | 74 ++++++++++------------
 .../jmap/draft/methods/SetMessagesProcessor.java   |  4 +-
 4 files changed, 39 insertions(+), 45 deletions(-)

diff --git 
a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java 
b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java
index 218819a..272d94c 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageIdManager.java
@@ -66,7 +66,7 @@ public interface MessageIdManager {
 
     DeleteResult delete(MessageId messageId, List<MailboxId> mailboxIds, 
MailboxSession mailboxSession) throws MailboxException;
 
-    Publisher<DeleteResult> delete(List<MessageId> messageId, MailboxSession 
mailboxSession) throws MailboxException;
+    Publisher<DeleteResult> delete(List<MessageId> messageId, MailboxSession 
mailboxSession);
 
     void setInMailboxes(MessageId messageId, Collection<MailboxId> mailboxIds, 
MailboxSession mailboxSession) throws MailboxException;
 
@@ -76,7 +76,7 @@ public interface MessageIdManager {
         return getMessages(ImmutableList.of(messageId), fetchGroup, 
mailboxSession);
     }
 
-    default DeleteResult delete(MessageId messageId, MailboxSession 
mailboxSession) throws MailboxException {
+    default DeleteResult delete(MessageId messageId, MailboxSession 
mailboxSession) {
         return Mono.from(delete(ImmutableList.of(messageId), mailboxSession))
             .subscribeOn(Schedulers.elastic())
             .block();
diff --git 
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
 
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
index 11ca8ff..47d44a0 100644
--- 
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
+++ 
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
@@ -223,7 +223,7 @@ public class StoreMessageIdManager implements 
MessageIdManager {
     }
 
     @Override
-    public Mono<DeleteResult> delete(List<MessageId> messageIds, 
MailboxSession mailboxSession) throws MailboxException {
+    public Mono<DeleteResult> delete(List<MessageId> messageIds, 
MailboxSession mailboxSession) {
         MessageIdMapper messageIdMapper = 
mailboxSessionMapperFactory.getMessageIdMapper(mailboxSession);
 
         return messageIdMapper.findReactive(messageIds, 
MessageMapper.FetchType.Metadata)
diff --git 
a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesDestructionProcessor.java
 
b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesDestructionProcessor.java
index 5a90a6b..2564aa7 100644
--- 
a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesDestructionProcessor.java
+++ 
b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesDestructionProcessor.java
@@ -22,29 +22,27 @@ package org.apache.james.jmap.draft.methods;
 import static org.apache.james.jmap.draft.methods.Method.JMAP_PREFIX;
 
 import java.util.List;
-import java.util.stream.Stream;
 
 import javax.inject.Inject;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.james.jmap.draft.model.SetError;
 import org.apache.james.jmap.draft.model.SetMessagesRequest;
 import org.apache.james.jmap.draft.model.SetMessagesResponse;
 import org.apache.james.mailbox.MailboxSession;
 import org.apache.james.mailbox.MessageIdManager;
-import org.apache.james.mailbox.exception.MailboxException;
-import org.apache.james.mailbox.model.DeleteResult;
 import org.apache.james.mailbox.model.MessageId;
 import org.apache.james.metrics.api.MetricFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.github.steveash.guavate.Guavate;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
 
 import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Schedulers;
 
 public class SetMessagesDestructionProcessor implements SetMessagesProcessor {
-
     private static final Logger LOGGER = 
LoggerFactory.getLogger(SetMessagesCreationProcessor.class);
 
     private final MessageIdManager messageIdManager;
@@ -58,44 +56,38 @@ public class SetMessagesDestructionProcessor implements 
SetMessagesProcessor {
     }
 
     @Override
-    public SetMessagesResponse process(SetMessagesRequest request, 
MailboxSession mailboxSession) {
-        return metricFactory.decorateSupplierWithTimerMetric(JMAP_PREFIX + 
"SetMessageDestructionProcessor",
-            () -> delete(request.getDestroy(), mailboxSession)
-                .reduce(SetMessagesResponse.builder(),
-                    SetMessagesResponse.Builder::accumulator,
-                    SetMessagesResponse.Builder::combiner)
-                .build());
+    public Mono<SetMessagesResponse> processReactive(SetMessagesRequest 
request, MailboxSession mailboxSession) {
+        return 
Mono.from(metricFactory.decoratePublisherWithTimerMetricLogP99(JMAP_PREFIX + 
"SetMessageDestructionProcessor",
+            delete(request.getDestroy(), mailboxSession)));
     }
 
-
-    private Stream<SetMessagesResponse> delete(List<MessageId> toBeDestroyed, 
MailboxSession mailboxSession) {
-        try {
-            if (toBeDestroyed.isEmpty()) {
-                return Stream.empty();
-            }
-            DeleteResult deleteResult = 
Mono.from(messageIdManager.delete(toBeDestroyed, mailboxSession))
-                .subscribeOn(Schedulers.elastic())
-                .block();
-
-            Stream<SetMessagesResponse> destroyed = 
deleteResult.getDestroyed().stream()
-                .map(messageId -> 
SetMessagesResponse.builder().destroyed(messageId).build());
-            Stream<SetMessagesResponse> notFound = 
deleteResult.getNotFound().stream()
-                .map(messageId -> 
SetMessagesResponse.builder().notDestroyed(messageId,
-                    SetError.builder()
-                        .type(SetError.Type.NOT_FOUND)
-                        .description("The message " + messageId.serialize() + 
" can't be found")
-                        .build())
-                    .build());
-            return Stream.concat(destroyed, notFound);
-        } catch (MailboxException e) {
-            LOGGER.error("An error occurred when deleting a message", e);
-            return toBeDestroyed.stream()
-                .map(messageId -> 
SetMessagesResponse.builder().notDestroyed(messageId,
-                    SetError.builder()
-                        .type(SetError.Type.ERROR)
-                        .description("An error occurred while deleting 
messages " + messageId.serialize())
-                        .build())
-                    .build());
+    private Mono<SetMessagesResponse> delete(List<MessageId> toBeDestroyed, 
MailboxSession mailboxSession) {
+        if (toBeDestroyed.isEmpty()) {
+            return Mono.just(SetMessagesResponse.builder().build());
         }
+        return Mono.from(messageIdManager.delete(toBeDestroyed, 
mailboxSession))
+            .map(deleteResult -> SetMessagesResponse.builder()
+                .destroyed(ImmutableList.copyOf(deleteResult.getDestroyed()))
+                .notDestroyed(deleteResult.getNotFound().stream()
+                    .map(messageId -> Pair.of(messageId,
+                        SetError.builder()
+                            .type(SetError.Type.NOT_FOUND)
+                            .description("The message " + 
messageId.serialize() + " can't be found")
+                            .build()))
+                    .collect(Guavate.toImmutableMap(Pair::getKey, 
Pair::getValue)))
+                .build())
+            .onErrorResume(e -> {
+                LOGGER.error("An error occurred when deleting a message", e);
+                return Mono.just(
+                    SetMessagesResponse.builder()
+                        .notDestroyed(toBeDestroyed.stream()
+                            .map(messageId -> Pair.of(messageId,
+                                SetError.builder()
+                                    .type(SetError.Type.ERROR)
+                                    .description("An error occurred while 
deleting messages " + messageId.serialize())
+                                    .build()))
+                            .collect(Guavate.toImmutableMap(Pair::getKey, 
Pair::getValue)))
+                        .build());
+            });
     }
 }
diff --git 
a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesProcessor.java
 
b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesProcessor.java
index 787d9f8..0d34019 100644
--- 
a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesProcessor.java
+++ 
b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/SetMessagesProcessor.java
@@ -27,7 +27,9 @@ import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 
 public interface SetMessagesProcessor {
-    SetMessagesResponse process(SetMessagesRequest request, MailboxSession 
mailboxSession);
+    default SetMessagesResponse process(SetMessagesRequest request, 
MailboxSession mailboxSession) {
+        return processReactive(request, mailboxSession).block();
+    }
 
     default Mono<SetMessagesResponse> processReactive(SetMessagesRequest 
request, MailboxSession mailboxSession) {
         return Mono.fromCallable(() -> process(request, mailboxSession))

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to