This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push:
new 7acaac94f2 [ENHANCEMENT] MDN/send shall be reactive
7acaac94f2 is described below
commit 7acaac94f2cae728abffb5cd5dde393679c04f3c
Author: Benoit TELLIER <[email protected]>
AuthorDate: Thu May 16 12:00:28 2024 +0200
[ENHANCEMENT] MDN/send shall be reactive
---
.../rfc8621/contract/MDNSendMethodContract.scala | 12 +++--
.../apache/james/jmap/method/MDNSendMethod.scala | 59 ++++++++++++----------
2 files changed, 39 insertions(+), 32 deletions(-)
diff --git a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/MDNSendMethodContract.scala b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/MDNSendMethodContract.scala
index e029aa9299..ce4ac317cb 100644
--- a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/MDNSendMethodContract.scala
+++ b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/MDNSendMethodContract.scala
@@ -860,6 +860,7 @@ trait MDNSendMethodContract {
.build(buildOriginalMessage("2")))
.getMessageId
+ val serializedRandomId = randomMessageId.serialize()
val request: String =
s"""{
| "using": [
@@ -891,7 +892,7 @@ trait MDNSendMethodContract {
| }
| },
| "k1548": {
- | "forEmailId": "${randomMessageId.serialize()}",
+ | "forEmailId": "$serializedRandomId",
| "disposition": {
| "actionMode": "manual-action",
| "sendingMode": "mdn-sent-manually",
@@ -957,7 +958,7 @@ trait MDNSendMethodContract {
| },
| "k1548": {
| "type": "notFound",
- | "description": "The reference \\"forEmailId\\" cannot be found."
+ | "description": "The reference \\"forEmailId\\" $serializedRandomId cannot be found for user [email protected]."
| }
| }
| },
@@ -1177,6 +1178,7 @@ trait MDNSendMethodContract {
val mailboxProbe: MailboxProbeImpl =
server.getProbe(classOf[MailboxProbeImpl])
mailboxProbe.createMailbox(path)
+ val randomMessageId1 = randomMessageId
val request: String =
s"""{
| "using": [
@@ -1192,7 +1194,7 @@ trait MDNSendMethodContract {
| "identityId": "$IDENTITY_ID",
| "send": {
| "k1546": {
- | "forEmailId": "${randomMessageId.serialize()}",
+ | "forEmailId": "${randomMessageId1.serialize()}",
| "disposition": {
| "actionMode": "manual-action",
| "sendingMode": "mdn-sent-manually",
@@ -1225,10 +1227,10 @@ trait MDNSendMethodContract {
assertThatJson(response)
.inPath("methodResponses[0][1].notSent")
- .isEqualTo("""{
+ .isEqualTo(s"""{
| "k1546": {
| "type": "notFound",
- | "description": "The reference \"forEmailId\" cannot be found."
+ | "description": "The reference \\\"forEmailId\\\" ${randomMessageId1.serialize()} cannot be found for user [email protected]."
| }
|}""".stripMargin)
}
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/MDNSendMethod.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/MDNSendMethod.scala
index acfa6240b5..a6a541cb99 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/MDNSendMethod.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/MDNSendMethod.scala
@@ -19,6 +19,7 @@
package org.apache.james.jmap.method
+import com.google.common.collect.ImmutableList
import eu.timepit.refined.auto._
import jakarta.annotation.PreDestroy
import jakarta.inject.Inject
@@ -47,7 +48,6 @@ import org.apache.james.mime4j.stream.MimeConfig
import org.apache.james.queue.api.MailQueueFactory.SPOOL
import org.apache.james.queue.api.{MailQueue, MailQueueFactory}
import org.apache.james.server.core.MailImpl
-import org.apache.james.util.ReactorUtils
import org.apache.mailet.{Attribute, AttributeValue}
import play.api.libs.json.{JsError, JsObject, JsSuccess}
import reactor.core.scala.publisher.{SFlux, SMono}
@@ -114,26 +114,34 @@ class MDNSendMethod @Inject()(serializer: MDNSerializer,
private def create(identity: Identity,
request: MDNSendRequest,
session: MailboxSession,
- processingContext: ProcessingContext):
SMono[(MDNSendResults, ProcessingContext)] =
- SFlux.fromIterable(request.send.view)
- .fold(MDNSendResults.empty -> processingContext) {
- (acc: (MDNSendResults, ProcessingContext), elem: (MDNSendCreationId,
JsObject)) => {
- val (mdnSendId, jsObject) = elem
- val (creationResult, updatedProcessingContext) =
createMDNSend(session, identity, mdnSendId, jsObject, acc._2)
- (MDNSendResults.merge(acc._1, creationResult) ->
updatedProcessingContext)
- }
+ processingContext: ProcessingContext):
SMono[(MDNSendResults, ProcessingContext)] = {
+ val list = request.send.view.toList
+ SFlux.just((list, MDNSendResults.empty, processingContext))
+ .expand {
+ case (head :: tail, result, context) =>
+ createMDNSend(session, identity, head._1, head._2, context)
+ .map {
+ case (newResult, newContext) => (tail,
MDNSendResults.merge(result, newResult), newContext)
+ }
+ case _ =>
+ SMono.empty
}
- .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER)
+ .last()
+ .map {
+ case (_, result, context) => (result, context)
+ }
+ }
private def createMDNSend(session: MailboxSession,
identity: Identity,
mdnSendCreationId: MDNSendCreationId,
jsObject: JsObject,
- processingContext: ProcessingContext):
(MDNSendResults, ProcessingContext) =
+ processingContext: ProcessingContext):
SMono[(MDNSendResults, ProcessingContext)] =
parseMDNRequest(jsObject)
+ .fold(e => SMono.error(e), request => SMono.just(request))
.flatMap(createRequest => sendMDN(session, identity, mdnSendCreationId,
createRequest))
- .fold(error => (MDNSendResults.notSent(mdnSendCreationId, error) ->
processingContext),
- creation => MDNSendResults.sent(creation) -> processingContext)
+ .map(creation => MDNSendResults.sent(creation) -> processingContext)
+ .onErrorResume(e => SMono.just(MDNSendResults.notSent(mdnSendCreationId,
e) -> processingContext))
private def parseMDNRequest(jsObject: JsObject):
Either[MDNSendRequestInvalidException, MDNSendCreateRequest] =
MDNSendCreateRequest.validateProperties(jsObject)
@@ -145,13 +153,15 @@ class MDNSendMethod @Inject()(serializer: MDNSerializer,
private def sendMDN(session: MailboxSession,
identity: Identity,
mdnSendCreationId: MDNSendCreationId,
- requestEntry: MDNSendCreateRequest): Either[Throwable,
MDNSendCreateSuccess] =
+ requestEntry: MDNSendCreateRequest):
SMono[MDNSendCreateSuccess] =
for {
mdnRelatedMessageResult <- retrieveRelatedMessageResult(session,
requestEntry)
mdnRelatedMessageResultAlready <-
validateMDNNotAlreadySent(mdnRelatedMessageResult)
+ .fold(e => SMono.error(e), result => SMono.just(result))
messageRelated = parseAsMessage(mdnRelatedMessageResultAlready)
mailAndResponseAndId <- buildMailAndResponse(identity,
session.getUser.asString(), requestEntry, messageRelated, session)
- _ <- Try(enqueue(mailAndResponseAndId._1)).toEither
+ .fold(e => SMono.error(e), result => SMono.just(result))
+ _ <-
enqueue(mailAndResponseAndId._1).`then`(SMono.just(mailAndResponseAndId._1))
} yield {
MDNSendCreateSuccess(
mdnCreationId = mdnSendCreationId,
@@ -159,19 +169,14 @@ class MDNSendMethod @Inject()(serializer: MDNSerializer,
forEmailId = mdnRelatedMessageResultAlready.getMessageId)
}
- private def enqueue(mail: MailImpl): Unit = try {
- queue.enQueue(mail)
- } finally {
- LifecycleUtil.dispose(mail)
- }
-
- private def retrieveRelatedMessageResult(session: MailboxSession,
requestEntry: MDNSendCreateRequest): Either[MDNSendNotFoundException,
MessageResult] =
- messageIdManager.getMessage(requestEntry.forEmailId.originalMessageId,
FetchGroup.FULL_CONTENT, session)
- .asScala
- .toList
- .headOption
- .toRight(MDNSendNotFoundException("The reference \"forEmailId\" cannot be found."))
+ private def enqueue(mail: MailImpl): SMono[Unit] =
+ SMono(queue.enqueueReactive(mail))
+ .doFinally(_ => LifecycleUtil.dispose(mail))
+ .`then`()
+ private def retrieveRelatedMessageResult(session: MailboxSession,
requestEntry: MDNSendCreateRequest): SMono[MessageResult] =
+
SMono(messageIdManager.getMessagesReactive(ImmutableList.of(requestEntry.forEmailId.originalMessageId),
FetchGroup.FULL_CONTENT, session))
+ .switchIfEmpty(SMono.error(MDNSendNotFoundException(s"The reference \"forEmailId\" ${requestEntry.forEmailId.originalMessageId.serialize()} cannot be found for user ${session.getUser.asString()}.")))
private def validateMDNNotAlreadySent(relatedMessageResult: MessageResult):
Either[MDNSendAlreadySentException, MessageResult] =
if (relatedMessageResult.getFlags.contains(MDN_ALREADY_SENT_FLAG)) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]