This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit c2ca11b5eabfc23cea02685c1e2d70e9b96e23c8
Author: Benoit Tellier <[email protected]>
AuthorDate: Sun May 2 07:38:13 2021 +0700

    JAMES-3575 Reactify EmailSubmission/set
---
 .../SetMessagesOutboxFlagUpdateTest.java           |  9 ++++
 .../jmap/method/EmailSubmissionSetMethod.scala     | 60 ++++++++++++----------
 .../java/org/apache/james/queue/api/MailQueue.java |  2 +
 .../james/queue/file/FileCacheableMailQueue.java   |  6 +++
 .../james/queue/jms/JMSCacheableMailQueue.java     |  6 +++
 .../james/queue/memory/MemoryMailQueueFactory.java |  6 +++
 .../org/apache/james/queue/rabbitmq/Enqueuer.java  |  7 ++-
 .../james/queue/rabbitmq/RabbitMQMailQueue.java    | 15 ++++--
 8 files changed, 76 insertions(+), 35 deletions(-)

diff --git a/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/java/org/apache/james/jmap/draft/methods/integration/SetMessagesOutboxFlagUpdateTest.java b/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/java/org/apache/james/jmap/draft/methods/integration/SetMessagesOutboxFlagUpdateTest.java
index bf21d1e..1dd258b 100644
--- a/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/java/org/apache/james/jmap/draft/methods/integration/SetMessagesOutboxFlagUpdateTest.java
+++ b/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/java/org/apache/james/jmap/draft/methods/integration/SetMessagesOutboxFlagUpdateTest.java
@@ -55,12 +55,16 @@ import org.apache.mailet.Mail;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.reactivestreams.Publisher;
+
+import com.github.fge.lambdas.Throwing;
 
 import io.restassured.RestAssured;
 import io.restassured.builder.RequestSpecBuilder;
 import io.restassured.http.ContentType;
 import io.restassured.parsing.Parser;
 import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 
 public abstract class SetMessagesOutboxFlagUpdateTest {
     private static final Username USERNAME = Username.of("username@" + DOMAIN);
@@ -94,6 +98,11 @@ public abstract class SetMessagesOutboxFlagUpdateTest {
                 }
 
                 @Override
+                public Publisher<Void> enqueueReactive(Mail mail) {
+                    return Mono.fromRunnable(Throwing.runnable(() -> enQueue(mail)).sneakyThrow());
+                }
+
+                @Override
                 public void enQueue(Mail mail) {
 
                 }
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSubmissionSetMethod.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSubmissionSetMethod.scala
index 4c9edd3..f7c03a5 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSubmissionSetMethod.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSubmissionSetMethod.scala
@@ -199,30 +199,38 @@ class EmailSubmissionSetMethod @Inject()(serializer: EmailSubmissionSetSerialize
   private def create(request: EmailSubmissionSetRequest,
                      session: MailboxSession,
                      processingContext: ProcessingContext): SMono[(CreationResults, ProcessingContext)] =
-    SFlux.fromIterable(request.create
-      .getOrElse(Map.empty)
-      .view)
-      .fold((CreationResults(Nil), processingContext)) {
-        (acc : (CreationResults, ProcessingContext), elem: (EmailSubmissionCreationId, JsObject)) => {
+    SFlux.fromIterable(
+      request.create
+        .getOrElse(Map.empty)
+        .view)
+      .fold[SMono[(CreationResults, ProcessingContext)]](SMono.just((CreationResults(Nil), processingContext))) {
+        (acc: SMono[(CreationResults, ProcessingContext)], elem: (EmailSubmissionCreationId, JsObject)) => {
           val (emailSubmissionCreationId, jsObject) = elem
-          val (creationResult, updatedProcessingContext) = createSubmission(session, emailSubmissionCreationId, jsObject, acc._2)
-          (CreationResults(acc._1.created :+ creationResult), updatedProcessingContext)
+          acc.flatMap {
+            case (creationResults, processingContext) =>
+              createSubmission(session, emailSubmissionCreationId, jsObject, processingContext)
+                .map {
+                  case (created, updatedProcessingContext) => CreationResults(creationResults.created :+ created) -> updatedProcessingContext
+                }
+              .switchIfEmpty(SMono.error(new RuntimeException("I should not be empty")))
+          }.cache()
         }
       }
+      .flatMap(x => x)
       .subscribeOn(Schedulers.elastic())
 
   private def createSubmission(mailboxSession: MailboxSession,
                             emailSubmissionCreationId: EmailSubmissionCreationId,
                             jsObject: JsObject,
-                            processingContext: ProcessingContext): (CreationResult, ProcessingContext) =
+                            processingContext: ProcessingContext): SMono[(CreationResult, ProcessingContext)] =
     parseCreate(jsObject)
-      .flatMap(emailSubmissionCreationRequest => sendEmail(mailboxSession, emailSubmissionCreationRequest))
+      .fold(e => SMono.error(e), sendEmail(mailboxSession, _))
       .map {
         case (creationResponse, messageId) =>
-          (creationResponse, messageId, recordCreationIdInProcessingContext(emailSubmissionCreationId, processingContext, creationResponse.id))
+          CreationSuccess(emailSubmissionCreationId, creationResponse, messageId) ->
+            recordCreationIdInProcessingContext(emailSubmissionCreationId, processingContext, creationResponse.id)
       }
-      .fold(e => (CreationFailure(emailSubmissionCreationId, e), processingContext),
-        creation => CreationSuccess(emailSubmissionCreationId, creation._1, creation._2) -> creation._3)
+      .onErrorResume(e => SMono.just((CreationFailure(emailSubmissionCreationId, e), processingContext)))
 
   private def parseCreate(jsObject: JsObject): Either[EmailSubmissionCreationParseException, EmailSubmissionCreationRequest] =
     EmailSubmissionCreationRequest.validateProperties(jsObject)
@@ -240,18 +248,16 @@ class EmailSubmissionSetMethod @Inject()(serializer: EmailSubmissionSetSerialize
     }
 
   private def sendEmail(mailboxSession: MailboxSession,
-                        request: EmailSubmissionCreationRequest): Either[Throwable, (EmailSubmissionCreationResponse, MessageId)] = {
-   val result = for {
-      message <- messageIdManager.getMessage(request.emailId, FetchGroup.FULL_CONTENT, mailboxSession)
-        .asScala
-        .toList
-        .headOption
-        .toRight(MessageNotFoundException(request.emailId))
+                        request: EmailSubmissionCreationRequest): SMono[(EmailSubmissionCreationResponse, MessageId)] =
+   for {
+      message <- SFlux(messageIdManager.getMessagesReactive(List(request.emailId).asJava, FetchGroup.FULL_CONTENT, mailboxSession))
+        .next
+        .switchIfEmpty(SMono.error(MessageNotFoundException(request.emailId)))
       submissionId = EmailSubmissionId.generate
-      message <- toMimeMessage(submissionId.value, message).toEither
-      envelope <- resolveEnvelope(message, request.envelope).toEither
-      validation <- validate(mailboxSession)(message, envelope).toEither
-      mail <- Try({
+      message <- SMono.fromTry(toMimeMessage(submissionId.value, message))
+      envelope <- SMono.fromTry(resolveEnvelope(message, request.envelope))
+      validation <- SMono.fromTry(validate(mailboxSession)(message, envelope))
+      mail <- SMono.fromCallable(() => {
         val mailImpl = MailImpl.builder()
           .name(submissionId.value)
           .addRecipients(envelope.rcptTo.map(_.email).asJava)
@@ -260,13 +266,11 @@ class EmailSubmissionSetMethod @Inject()(serializer: EmailSubmissionSetSerialize
           .build()
         mailImpl.setMessageNoCopy(message)
         mailImpl
-      }).toEither
-      enqueue <- Try(queue.enQueue(mail)).toEither
+      })
+      enqueue <- SMono(queue.enqueueReactive(mail)).`then`(SMono.just(submissionId))
     } yield {
-      EmailSubmissionCreationResponse(submissionId)
+      EmailSubmissionCreationResponse(submissionId) -> request.emailId
     }
-     result.map(response => (response, request.emailId))
-  }
 
   private def toMimeMessage(name: String, message: MessageResult): Try[MimeMessageWrapper] = {
     val source = MessageMimeMessageSource(name, message)
diff --git a/server/queue/queue-api/src/main/java/org/apache/james/queue/api/MailQueue.java b/server/queue/queue-api/src/main/java/org/apache/james/queue/api/MailQueue.java
index 38b8c3f..a39990c 100644
--- a/server/queue/queue-api/src/main/java/org/apache/james/queue/api/MailQueue.java
+++ b/server/queue/queue-api/src/main/java/org/apache/james/queue/api/MailQueue.java
@@ -105,6 +105,8 @@ public interface MailQueue extends Closeable {
      */
     void enQueue(Mail mail) throws MailQueueException;
 
+    Publisher<Void> enqueueReactive(Mail mail);
+
     /**
      * Dequeue the next ready-to-process Mail of the queue. This method will
      * block until a Mail is ready and then process the operation.
diff --git a/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileCacheableMailQueue.java b/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileCacheableMailQueue.java
index d8b0676..146ddb2 100644
--- a/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileCacheableMailQueue.java
+++ b/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileCacheableMailQueue.java
@@ -61,6 +61,7 @@ import org.apache.mailet.AttributeName;
 import org.apache.mailet.AttributeUtils;
 import org.apache.mailet.AttributeValue;
 import org.apache.mailet.Mail;
+import org.reactivestreams.Publisher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -121,6 +122,11 @@ public class FileCacheableMailQueue implements ManageableMailQueue {
         return queueName;
     }
 
+    @Override
+    public Publisher<Void> enqueueReactive(Mail mail) {
+        return Mono.fromRunnable(Throwing.runnable(() -> enQueue(mail)).sneakyThrow());
+    }
+
     private void init() throws IOException {
 
         for (int i = 1; i <= SPLITCOUNT; i++) {
diff --git a/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSCacheableMailQueue.java b/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSCacheableMailQueue.java
index 56b374d..e60e9fa 100644
--- a/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSCacheableMailQueue.java
+++ b/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSCacheableMailQueue.java
@@ -74,6 +74,7 @@ import org.apache.mailet.AttributeUtils;
 import org.apache.mailet.AttributeValue;
 import org.apache.mailet.Mail;
 import org.apache.mailet.PerRecipientHeaders;
+import org.reactivestreams.Publisher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -305,6 +306,11 @@ public class JMSCacheableMailQueue implements ManageableMailQueue, JMSSupport, M
         enQueue(mail, NO_DELAY, TimeUnit.MILLISECONDS);
     }
 
+    @Override
+    public Publisher<Void> enqueueReactive(Mail mail) {
+        return Mono.fromRunnable(Throwing.runnable(() -> enQueue(mail)).sneakyThrow());
+    }
+
     /**
      * Produce the mail to the JMS Queue
      */
diff --git a/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java b/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java
index 025a024..a941b91 100644
--- a/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java
+++ b/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java
@@ -48,6 +48,7 @@ import org.apache.james.queue.api.MailQueueName;
 import org.apache.james.queue.api.ManageableMailQueue;
 import org.apache.james.server.core.MailImpl;
 import org.apache.mailet.Mail;
+import org.reactivestreams.Publisher;
 import org.threeten.extra.Temporals;
 
 import com.github.fge.lambdas.Throwing;
@@ -127,6 +128,11 @@ public class MemoryMailQueueFactory implements MailQueueFactory<MemoryMailQueueF
             }
         }
 
+        @Override
+        public Publisher<Void> enqueueReactive(Mail mail) {
+            return Mono.fromRunnable(Throwing.runnable(() -> enQueue(mail)).sneakyThrow());
+        }
+
         private ZonedDateTime calculateNextDelivery(Duration delay) {
             if (!delay.isNegative()) {
                 try {
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
index 13db251..82fbc18 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
@@ -68,9 +68,9 @@ class Enqueuer {
         this.enqueueMetric = metricFactory.generate(ENQUEUED_METRIC_NAME_PREFIX + name.asString());
     }
 
-    void enQueue(Mail mail) throws MailQueue.MailQueueException {
+    Mono<Void> enQueue(Mail mail) throws MailQueue.MailQueueException {
         EnqueueId enqueueId = EnqueueId.generate();
-        saveMail(mail)
+        return saveMail(mail)
             .map(partIds -> new MailReference(enqueueId, mail, partIds))
             .flatMap(Throwing.<MailReference, Mono<Void>>function(mailReference -> {
                 EnqueuedItem enqueuedItem = toEnqueuedItems(mailReference);
@@ -79,8 +79,7 @@ class Enqueuer {
                         publishReferenceToRabbit(mailReference))
                         .then();
             }).sneakyThrow())
-            .thenEmpty(Mono.fromRunnable(enqueueMetric::increment))
-            .block();
+            .thenEmpty(Mono.fromRunnable(enqueueMetric::increment));
     }
 
     Mono<Void> reQueue(CassandraMailQueueBrowser.CassandraMailQueueItemView item) {
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
index 52c8660..cd233fb 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
@@ -31,10 +31,10 @@ import org.apache.james.queue.rabbitmq.view.api.DeleteCondition;
 import org.apache.james.queue.rabbitmq.view.api.MailQueueView;
 import org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueBrowser;
 import org.apache.mailet.Mail;
+import org.reactivestreams.Publisher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.github.fge.lambdas.Throwing;
 import com.google.common.base.Function;
 import com.google.common.base.MoreObjects;
 
@@ -83,8 +83,17 @@ public class RabbitMQMailQueue implements ManageableMailQueue {
 
     @Override
     public void enQueue(Mail mail) {
-        metricFactory.runPublishingTimerMetric(ENQUEUED_TIMER_METRIC_NAME_PREFIX + name.asString(),
-            Throwing.runnable(() -> enqueuer.enQueue(mail)).sneakyThrow());
+        Mono.from(enqueueReactive(mail)).block();
+    }
+
+    @Override
+    public Publisher<Void> enqueueReactive(Mail mail) {
+        try {
+            return metricFactory.decoratePublisherWithTimerMetric(ENQUEUED_TIMER_METRIC_NAME_PREFIX + name.asString(),
+                enqueuer.enQueue(mail));
+        } catch (MailQueueException e) {
+            return Mono.error(e);
+        }
     }
 
     @Override

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to