This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit c2ca11b5eabfc23cea02685c1e2d70e9b96e23c8 Author: Benoit Tellier <[email protected]> AuthorDate: Sun May 2 07:38:13 2021 +0700 JAMES-3575 Reactify EmailSubmission/set --- .../SetMessagesOutboxFlagUpdateTest.java | 9 ++++ .../jmap/method/EmailSubmissionSetMethod.scala | 60 ++++++++++++---------- .../java/org/apache/james/queue/api/MailQueue.java | 2 + .../james/queue/file/FileCacheableMailQueue.java | 6 +++ .../james/queue/jms/JMSCacheableMailQueue.java | 6 +++ .../james/queue/memory/MemoryMailQueueFactory.java | 6 +++ .../org/apache/james/queue/rabbitmq/Enqueuer.java | 7 ++- .../james/queue/rabbitmq/RabbitMQMailQueue.java | 15 ++++-- 8 files changed, 76 insertions(+), 35 deletions(-) diff --git a/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/java/org/apache/james/jmap/draft/methods/integration/SetMessagesOutboxFlagUpdateTest.java b/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/java/org/apache/james/jmap/draft/methods/integration/SetMessagesOutboxFlagUpdateTest.java index bf21d1e..1dd258b 100644 --- a/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/java/org/apache/james/jmap/draft/methods/integration/SetMessagesOutboxFlagUpdateTest.java +++ b/server/protocols/jmap-draft-integration-testing/jmap-draft-integration-testing-common/src/test/java/org/apache/james/jmap/draft/methods/integration/SetMessagesOutboxFlagUpdateTest.java @@ -55,12 +55,16 @@ import org.apache.mailet.Mail; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.reactivestreams.Publisher; + +import com.github.fge.lambdas.Throwing; import io.restassured.RestAssured; import io.restassured.builder.RequestSpecBuilder; import io.restassured.http.ContentType; import io.restassured.parsing.Parser; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; public abstract class SetMessagesOutboxFlagUpdateTest { private static final Username USERNAME = Username.of("username@" + DOMAIN); @@ -94,6 +98,11 @@ public abstract class SetMessagesOutboxFlagUpdateTest { } @Override + public Publisher<Void> enqueueReactive(Mail mail) { + return Mono.fromRunnable(Throwing.runnable(() -> enQueue(mail)).sneakyThrow()); + } + + @Override public void enQueue(Mail mail) { } diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSubmissionSetMethod.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSubmissionSetMethod.scala index 4c9edd3..f7c03a5 100644 --- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSubmissionSetMethod.scala +++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSubmissionSetMethod.scala @@ -199,30 +199,38 @@ class EmailSubmissionSetMethod @Inject()(serializer: EmailSubmissionSetSerialize private def create(request: EmailSubmissionSetRequest, session: MailboxSession, processingContext: ProcessingContext): SMono[(CreationResults, ProcessingContext)] = - SFlux.fromIterable(request.create - .getOrElse(Map.empty) - .view) - .fold((CreationResults(Nil), processingContext)) { - (acc : (CreationResults, ProcessingContext), elem: (EmailSubmissionCreationId, JsObject)) => { + SFlux.fromIterable( + request.create + .getOrElse(Map.empty) + .view) + .fold[SMono[(CreationResults, ProcessingContext)]](SMono.just((CreationResults(Nil), processingContext))) { + (acc: SMono[(CreationResults, ProcessingContext)], elem: (EmailSubmissionCreationId, JsObject)) => { val (emailSubmissionCreationId, jsObject) = elem - val (creationResult, updatedProcessingContext) = createSubmission(session, emailSubmissionCreationId, jsObject, acc._2) - (CreationResults(acc._1.created :+ creationResult), updatedProcessingContext) + acc.flatMap { + case (creationResults, processingContext) => + createSubmission(session, emailSubmissionCreationId, jsObject, processingContext) + .map { + case (created, updatedProcessingContext) => CreationResults(creationResults.created :+ created) -> updatedProcessingContext + } + .switchIfEmpty(SMono.error(new RuntimeException("I should not be empty"))) + }.cache() } } + .flatMap(x => x) .subscribeOn(Schedulers.elastic()) private def createSubmission(mailboxSession: MailboxSession, emailSubmissionCreationId: EmailSubmissionCreationId, jsObject: JsObject, - processingContext: ProcessingContext): (CreationResult, ProcessingContext) = + processingContext: ProcessingContext): SMono[(CreationResult, ProcessingContext)] = parseCreate(jsObject) - .flatMap(emailSubmissionCreationRequest => sendEmail(mailboxSession, emailSubmissionCreationRequest)) + .fold(e => SMono.error(e), sendEmail(mailboxSession, _)) .map { case (creationResponse, messageId) => - (creationResponse, messageId, recordCreationIdInProcessingContext(emailSubmissionCreationId, processingContext, creationResponse.id)) + CreationSuccess(emailSubmissionCreationId, creationResponse, messageId) -> + recordCreationIdInProcessingContext(emailSubmissionCreationId, processingContext, creationResponse.id) } - .fold(e => (CreationFailure(emailSubmissionCreationId, e), processingContext), - creation => CreationSuccess(emailSubmissionCreationId, creation._1, creation._2) -> creation._3) + .onErrorResume(e => SMono.just((CreationFailure(emailSubmissionCreationId, e), processingContext))) private def parseCreate(jsObject: JsObject): Either[EmailSubmissionCreationParseException, EmailSubmissionCreationRequest] = EmailSubmissionCreationRequest.validateProperties(jsObject) @@ -240,18 +248,16 @@ class EmailSubmissionSetMethod @Inject()(serializer: EmailSubmissionSetSerialize } private def sendEmail(mailboxSession: MailboxSession, - request: EmailSubmissionCreationRequest): Either[Throwable, (EmailSubmissionCreationResponse, MessageId)] = { - val result = for { - message <- messageIdManager.getMessage(request.emailId, FetchGroup.FULL_CONTENT, mailboxSession) - .asScala - .toList - .headOption - .toRight(MessageNotFoundException(request.emailId)) + request: EmailSubmissionCreationRequest): SMono[(EmailSubmissionCreationResponse, MessageId)] = + for { + message <- SFlux(messageIdManager.getMessagesReactive(List(request.emailId).asJava, FetchGroup.FULL_CONTENT, mailboxSession)) + .next + .switchIfEmpty(SMono.error(MessageNotFoundException(request.emailId))) submissionId = EmailSubmissionId.generate - message <- toMimeMessage(submissionId.value, message).toEither - envelope <- resolveEnvelope(message, request.envelope).toEither - validation <- validate(mailboxSession)(message, envelope).toEither - mail <- Try({ + message <- SMono.fromTry(toMimeMessage(submissionId.value, message)) + envelope <- SMono.fromTry(resolveEnvelope(message, request.envelope)) + validation <- SMono.fromTry(validate(mailboxSession)(message, envelope)) + mail <- SMono.fromCallable(() => { val mailImpl = MailImpl.builder() .name(submissionId.value) .addRecipients(envelope.rcptTo.map(_.email).asJava) @@ -260,13 +266,11 @@ class EmailSubmissionSetMethod @Inject()(serializer: EmailSubmissionSetSerialize .build() mailImpl.setMessageNoCopy(message) mailImpl - }).toEither - enqueue <- Try(queue.enQueue(mail)).toEither + }) + enqueue <- SMono(queue.enqueueReactive(mail)).`then`(SMono.just(submissionId)) } yield { - EmailSubmissionCreationResponse(submissionId) + EmailSubmissionCreationResponse(submissionId) -> request.emailId } - result.map(response => (response, request.emailId)) - } private def toMimeMessage(name: String, message: MessageResult): Try[MimeMessageWrapper] = { val source = MessageMimeMessageSource(name, message) diff --git a/server/queue/queue-api/src/main/java/org/apache/james/queue/api/MailQueue.java b/server/queue/queue-api/src/main/java/org/apache/james/queue/api/MailQueue.java index 38b8c3f..a39990c 100644 --- a/server/queue/queue-api/src/main/java/org/apache/james/queue/api/MailQueue.java +++ b/server/queue/queue-api/src/main/java/org/apache/james/queue/api/MailQueue.java @@ -105,6 +105,8 @@ public interface MailQueue extends Closeable { */ void enQueue(Mail mail) throws MailQueueException; + Publisher<Void> enqueueReactive(Mail mail); + /** * Dequeue the next ready-to-process Mail of the queue. This method will * block until a Mail is ready and then process the operation. diff --git a/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileCacheableMailQueue.java b/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileCacheableMailQueue.java index d8b0676..146ddb2 100644 --- a/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileCacheableMailQueue.java +++ b/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileCacheableMailQueue.java @@ -61,6 +61,7 @@ import org.apache.mailet.AttributeName; import org.apache.mailet.AttributeUtils; import org.apache.mailet.AttributeValue; import org.apache.mailet.Mail; +import org.reactivestreams.Publisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -121,6 +122,11 @@ public class FileCacheableMailQueue implements ManageableMailQueue { return queueName; } + @Override + public Publisher<Void> enqueueReactive(Mail mail) { + return Mono.fromRunnable(Throwing.runnable(() -> enQueue(mail)).sneakyThrow()); + } + private void init() throws IOException { for (int i = 1; i <= SPLITCOUNT; i++) { diff --git a/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSCacheableMailQueue.java b/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSCacheableMailQueue.java index 56b374d..e60e9fa 100644 --- a/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSCacheableMailQueue.java +++ b/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSCacheableMailQueue.java @@ -74,6 +74,7 @@ import org.apache.mailet.AttributeUtils; import org.apache.mailet.AttributeValue; import org.apache.mailet.Mail; import org.apache.mailet.PerRecipientHeaders; +import org.reactivestreams.Publisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -305,6 +306,11 @@ public class JMSCacheableMailQueue implements ManageableMailQueue, JMSSupport, M enQueue(mail, NO_DELAY, TimeUnit.MILLISECONDS); } + @Override + public Publisher<Void> enqueueReactive(Mail mail) { + return Mono.fromRunnable(Throwing.runnable(() -> enQueue(mail)).sneakyThrow()); + } + /** * Produce the mail to the JMS Queue */ diff --git a/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java b/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java index 025a024..a941b91 100644 --- a/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java +++ b/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java @@ -48,6 +48,7 @@ import org.apache.james.queue.api.MailQueueName; import org.apache.james.queue.api.ManageableMailQueue; import org.apache.james.server.core.MailImpl; import org.apache.mailet.Mail; +import org.reactivestreams.Publisher; import org.threeten.extra.Temporals; import com.github.fge.lambdas.Throwing; @@ -127,6 +128,11 @@ public class MemoryMailQueueFactory implements MailQueueFactory<MemoryMailQueueF } } + @Override + public Publisher<Void> enqueueReactive(Mail mail) { + return Mono.fromRunnable(Throwing.runnable(() -> enQueue(mail)).sneakyThrow()); + } + private ZonedDateTime calculateNextDelivery(Duration delay) { if (!delay.isNegative()) { try { diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java index 13db251..82fbc18 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java @@ -68,9 +68,9 @@ class Enqueuer { this.enqueueMetric = metricFactory.generate(ENQUEUED_METRIC_NAME_PREFIX + name.asString()); } - void enQueue(Mail mail) throws MailQueue.MailQueueException { + Mono<Void> enQueue(Mail mail) throws MailQueue.MailQueueException { EnqueueId enqueueId = EnqueueId.generate(); - saveMail(mail) + return saveMail(mail) .map(partIds -> new MailReference(enqueueId, mail, partIds)) .flatMap(Throwing.<MailReference, Mono<Void>>function(mailReference -> { EnqueuedItem enqueuedItem = toEnqueuedItems(mailReference); @@ -79,8 +79,7 @@ class Enqueuer { publishReferenceToRabbit(mailReference)) .then(); }).sneakyThrow()) - .thenEmpty(Mono.fromRunnable(enqueueMetric::increment)) - .block(); + .thenEmpty(Mono.fromRunnable(enqueueMetric::increment)); } Mono<Void> reQueue(CassandraMailQueueBrowser.CassandraMailQueueItemView item) { diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java index 52c8660..cd233fb 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java @@ -31,10 +31,10 @@ import org.apache.james.queue.rabbitmq.view.api.DeleteCondition; import org.apache.james.queue.rabbitmq.view.api.MailQueueView; import org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueBrowser; import org.apache.mailet.Mail; +import org.reactivestreams.Publisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.github.fge.lambdas.Throwing; import com.google.common.base.Function; import com.google.common.base.MoreObjects; @@ -83,8 +83,17 @@ public class RabbitMQMailQueue implements ManageableMailQueue { @Override public void enQueue(Mail mail) { - metricFactory.runPublishingTimerMetric(ENQUEUED_TIMER_METRIC_NAME_PREFIX + name.asString(), - Throwing.runnable(() -> enqueuer.enQueue(mail)).sneakyThrow()); + Mono.from(enqueueReactive(mail)).block(); + } + + @Override + public Publisher<Void> enqueueReactive(Mail mail) { + try { + return metricFactory.decoratePublisherWithTimerMetric(ENQUEUED_TIMER_METRIC_NAME_PREFIX + name.asString(), + enqueuer.enQueue(mail)); + } catch (MailQueueException e) { + return Mono.error(e); + } } @Override --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
