This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new 7acaac94f2 [ENHANCEMENT] MDN/send shall be reactive
7acaac94f2 is described below

commit 7acaac94f2cae728abffb5cd5dde393679c04f3c
Author: Benoit TELLIER <[email protected]>
AuthorDate: Thu May 16 12:00:28 2024 +0200

    [ENHANCEMENT] MDN/send shall be reactive
---
 .../rfc8621/contract/MDNSendMethodContract.scala   | 12 +++--
 .../apache/james/jmap/method/MDNSendMethod.scala   | 59 ++++++++++++----------
 2 files changed, 39 insertions(+), 32 deletions(-)

diff --git a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/MDNSendMethodContract.scala b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/MDNSendMethodContract.scala
index e029aa9299..ce4ac317cb 100644
--- a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/MDNSendMethodContract.scala
+++ b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/MDNSendMethodContract.scala
@@ -860,6 +860,7 @@ trait MDNSendMethodContract {
         .build(buildOriginalMessage("2")))
       .getMessageId
 
+    val serializedRandomId = randomMessageId.serialize()
     val request: String =
       s"""{
          |  "using": [
@@ -891,7 +892,7 @@ trait MDNSendMethodContract {
          |            }
          |          },
          |          "k1548": {
-         |            "forEmailId": "${randomMessageId.serialize()}",
+         |            "forEmailId": "$serializedRandomId",
          |            "disposition": {
          |              "actionMode": "manual-action",
          |              "sendingMode": "mdn-sent-manually",
@@ -957,7 +958,7 @@ trait MDNSendMethodContract {
                    |                    },
                    |                    "k1548": {
                    |                        "type": "notFound",
-                   |                        "description": "The reference 
\\"forEmailId\\" cannot be found."
+                   |                        "description": "The reference 
\\"forEmailId\\" $serializedRandomId cannot be found for user [email protected]."
                    |                    }
                    |                }
                    |            },
@@ -1177,6 +1178,7 @@ trait MDNSendMethodContract {
     val mailboxProbe: MailboxProbeImpl = 
server.getProbe(classOf[MailboxProbeImpl])
     mailboxProbe.createMailbox(path)
 
+    val randomMessageId1 = randomMessageId
     val request: String =
       s"""{
          |  "using": [
@@ -1192,7 +1194,7 @@ trait MDNSendMethodContract {
          |        "identityId": "$IDENTITY_ID",
          |        "send": {
          |          "k1546": {
-         |            "forEmailId": "${randomMessageId.serialize()}",
+         |            "forEmailId": "${randomMessageId1.serialize()}",
          |            "disposition": {
          |              "actionMode": "manual-action",
          |              "sendingMode": "mdn-sent-manually",
@@ -1225,10 +1227,10 @@ trait MDNSendMethodContract {
 
     assertThatJson(response)
       .inPath("methodResponses[0][1].notSent")
-      .isEqualTo("""{
+      .isEqualTo(s"""{
                    |    "k1546": {
                    |        "type": "notFound",
-                   |        "description": "The reference \"forEmailId\" 
cannot be found."
+                   |        "description": "The reference \\\"forEmailId\\\" 
${randomMessageId1.serialize()} cannot be found for user [email protected]."
                    |    }
                    |}""".stripMargin)
   }
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/MDNSendMethod.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/MDNSendMethod.scala
index acfa6240b5..a6a541cb99 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/MDNSendMethod.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/MDNSendMethod.scala
@@ -19,6 +19,7 @@
 
 package org.apache.james.jmap.method
 
+import com.google.common.collect.ImmutableList
 import eu.timepit.refined.auto._
 import jakarta.annotation.PreDestroy
 import jakarta.inject.Inject
@@ -47,7 +48,6 @@ import org.apache.james.mime4j.stream.MimeConfig
 import org.apache.james.queue.api.MailQueueFactory.SPOOL
 import org.apache.james.queue.api.{MailQueue, MailQueueFactory}
 import org.apache.james.server.core.MailImpl
-import org.apache.james.util.ReactorUtils
 import org.apache.mailet.{Attribute, AttributeValue}
 import play.api.libs.json.{JsError, JsObject, JsSuccess}
 import reactor.core.scala.publisher.{SFlux, SMono}
@@ -114,26 +114,34 @@ class MDNSendMethod @Inject()(serializer: MDNSerializer,
   private def create(identity: Identity,
                      request: MDNSendRequest,
                      session: MailboxSession,
-                     processingContext: ProcessingContext): 
SMono[(MDNSendResults, ProcessingContext)] =
-    SFlux.fromIterable(request.send.view)
-      .fold(MDNSendResults.empty -> processingContext) {
-        (acc: (MDNSendResults, ProcessingContext), elem: (MDNSendCreationId, 
JsObject)) => {
-          val (mdnSendId, jsObject) = elem
-          val (creationResult, updatedProcessingContext) = 
createMDNSend(session, identity, mdnSendId, jsObject, acc._2)
-          (MDNSendResults.merge(acc._1, creationResult) -> 
updatedProcessingContext)
-        }
+                     processingContext: ProcessingContext): 
SMono[(MDNSendResults, ProcessingContext)] = {
+    val list = request.send.view.toList
+    SFlux.just((list, MDNSendResults.empty, processingContext))
+      .expand {
+        case (head :: tail, result, context) =>
+          createMDNSend(session, identity, head._1, head._2, context)
+            .map {
+              case (newResult, newContext) => (tail, 
MDNSendResults.merge(result, newResult), newContext)
+            }
+        case _ =>
+          SMono.empty
       }
-      .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER)
+      .last()
+      .map {
+        case (_, result, context) => (result, context)
+      }
+  }
 
   private def createMDNSend(session: MailboxSession,
                             identity: Identity,
                             mdnSendCreationId: MDNSendCreationId,
                             jsObject: JsObject,
-                            processingContext: ProcessingContext): 
(MDNSendResults, ProcessingContext) =
+                            processingContext: ProcessingContext): 
SMono[(MDNSendResults, ProcessingContext)] =
     parseMDNRequest(jsObject)
+      .fold(e => SMono.error(e), request => SMono.just(request))
       .flatMap(createRequest => sendMDN(session, identity, mdnSendCreationId, 
createRequest))
-      .fold(error => (MDNSendResults.notSent(mdnSendCreationId, error) -> 
processingContext),
-        creation => MDNSendResults.sent(creation) -> processingContext)
+      .map(creation => MDNSendResults.sent(creation) -> processingContext)
+      .onErrorResume(e => SMono.just(MDNSendResults.notSent(mdnSendCreationId, 
e) -> processingContext))
 
   private def parseMDNRequest(jsObject: JsObject): 
Either[MDNSendRequestInvalidException, MDNSendCreateRequest] =
     MDNSendCreateRequest.validateProperties(jsObject)
@@ -145,13 +153,15 @@ class MDNSendMethod @Inject()(serializer: MDNSerializer,
   private def sendMDN(session: MailboxSession,
                       identity: Identity,
                       mdnSendCreationId: MDNSendCreationId,
-                      requestEntry: MDNSendCreateRequest): Either[Throwable, 
MDNSendCreateSuccess] =
+                      requestEntry: MDNSendCreateRequest): 
SMono[MDNSendCreateSuccess] =
     for {
       mdnRelatedMessageResult <- retrieveRelatedMessageResult(session, 
requestEntry)
       mdnRelatedMessageResultAlready <- 
validateMDNNotAlreadySent(mdnRelatedMessageResult)
+        .fold(e => SMono.error(e), result => SMono.just(result))
       messageRelated = parseAsMessage(mdnRelatedMessageResultAlready)
       mailAndResponseAndId <- buildMailAndResponse(identity, 
session.getUser.asString(), requestEntry, messageRelated, session)
-      _ <- Try(enqueue(mailAndResponseAndId._1)).toEither
+        .fold(e => SMono.error(e), result => SMono.just(result))
+      _ <- 
enqueue(mailAndResponseAndId._1).`then`(SMono.just(mailAndResponseAndId._1))
     } yield {
       MDNSendCreateSuccess(
         mdnCreationId = mdnSendCreationId,
@@ -159,19 +169,14 @@ class MDNSendMethod @Inject()(serializer: MDNSerializer,
         forEmailId = mdnRelatedMessageResultAlready.getMessageId)
     }
 
-  private def enqueue(mail: MailImpl): Unit = try {
-    queue.enQueue(mail)
-  } finally {
-    LifecycleUtil.dispose(mail)
-  }
-
-  private def retrieveRelatedMessageResult(session: MailboxSession, 
requestEntry: MDNSendCreateRequest): Either[MDNSendNotFoundException, 
MessageResult] =
-    messageIdManager.getMessage(requestEntry.forEmailId.originalMessageId, 
FetchGroup.FULL_CONTENT, session)
-      .asScala
-      .toList
-      .headOption
-      .toRight(MDNSendNotFoundException("The reference \"forEmailId\" cannot 
be found."))
+  private def enqueue(mail: MailImpl): SMono[Unit] =
+    SMono(queue.enqueueReactive(mail))
+      .doFinally(_ =>  LifecycleUtil.dispose(mail))
+      .`then`()
 
+  private def retrieveRelatedMessageResult(session: MailboxSession, 
requestEntry: MDNSendCreateRequest): SMono[MessageResult] =
+    
SMono(messageIdManager.getMessagesReactive(ImmutableList.of(requestEntry.forEmailId.originalMessageId),
 FetchGroup.FULL_CONTENT, session))
+      .switchIfEmpty(SMono.error(MDNSendNotFoundException(s"The reference 
\"forEmailId\" ${requestEntry.forEmailId.originalMessageId.serialize()} cannot 
be found for user ${session.getUser.asString()}.")))
 
   private def validateMDNNotAlreadySent(relatedMessageResult: MessageResult): 
Either[MDNSendAlreadySentException, MessageResult] =
     if (relatedMessageResult.getFlags.contains(MDN_ALREADY_SENT_FLAG)) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to