This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 1f2cfe37f4c2b1dec5d149f3d96cb277759c94b2
Author: Benoit Tellier <[email protected]>
AuthorDate: Thu Nov 19 13:35:37 2020 +0700

    JAMES-2884 [REFACTORING] Use Either instead of SMono for request validation
    
    This gets rid of 2 nested flatMaps
---
 .../scala/org/apache/james/jmap/mail/Email.scala   | 29 ++++++------
 .../apache/james/jmap/method/EmailGetMethod.scala  | 14 +++---
 .../james/jmap/method/EmailQueryMethod.scala       | 25 +++++------
 .../apache/james/jmap/method/EmailSetMethod.scala  | 10 ++---
 .../jmap/method/EmailSubmissionSetMethod.scala     |  6 +--
 .../james/jmap/method/MailboxGetMethod.scala       | 11 ++---
 .../james/jmap/method/MailboxQueryMethod.scala     | 18 +++-----
 .../james/jmap/method/MailboxSetMethod.scala       | 12 +++--
 .../org/apache/james/jmap/method/Method.scala      | 51 ++++++++++++----------
 .../jmap/method/VacationResponseGetMethod.scala    |  9 ++--
 .../jmap/method/VacationResponseSetMethod.scala    | 17 +++-----
 .../apache/james/jmap/routes/JMAPApiRoutes.scala   |  6 +--
 .../apache/james/jmap/routes/SessionRoutes.scala   |  2 +-
 .../apache/james/jmap/routes/SessionSupplier.scala | 10 ++---
 .../james/jmap/routes/SessionSupplierTest.scala    |  4 +-
 15 files changed, 104 insertions(+), 120 deletions(-)

diff --git 
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/mail/Email.scala
 
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/mail/Email.scala
index aa75e4e..f923fc7 100644
--- 
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/mail/Email.scala
+++ 
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/mail/Email.scala
@@ -405,10 +405,13 @@ private class 
GenericEmailViewReader[+EmailView](messageIdManager: MessageIdMana
         ids.toList.asJava,
         fetchGroup,
         mailboxSession))
-      .groupBy(_.getMessageId)
-      .flatMap(groupedFlux => groupedFlux.collectSeq().map(results => 
(groupedFlux.key(), results)))
+      .collectSeq()
+      .flatMapIterable(messages => messages.groupBy(_.getMessageId).toSet)
       .map(metadataViewFactory.toEmail(request))
-      .flatMap(SMono.fromTry(_))
+      .handle[T]((aTry, sink) => aTry match {
+        case Success(value) => sink.next(value)
+        case Failure(e) => sink.error(e)
+      })
 }
 
 private class EmailMetadataViewFactory @Inject()(zoneIdProvider: 
ZoneIdProvider) extends EmailViewFactory[EmailMetadataView] {
@@ -574,16 +577,13 @@ private class EmailFastViewReader 
@Inject()(messageIdManager: MessageIdManager,
                                             fullViewFactory: 
EmailFullViewFactory) extends EmailViewReader[EmailView] {
   private val fullReader: GenericEmailViewReader[EmailFullView] = new 
GenericEmailViewReader[EmailFullView](messageIdManager, FULL_CONTENT, 
fullViewFactory)
 
-  override def read[T >: EmailView](ids: Seq[MessageId], request: 
EmailGetRequest, mailboxSession: MailboxSession): SFlux[T] = {
+  override def read[T >: EmailView](ids: Seq[MessageId], request: 
EmailGetRequest, mailboxSession: MailboxSession): SFlux[T] =
     SMono.fromPublisher(messageFastViewProjection.retrieve(ids.asJava))
       .map(_.asScala.toMap)
-      .flatMapMany(fastViews => SFlux.fromIterable(ids)
-        .map(id => fastViews.get(id)
-          .map(FastViewAvailable(id, _))
-          .getOrElse(FastViewUnavailable(id))))
-      .collectSeq()
+      .map(fastViews => ids.map(id => fastViews.get(id)
+        .map(FastViewAvailable(id, _))
+        .getOrElse(FastViewUnavailable(id))))
       .flatMapMany(results => toEmailViews(results, request, mailboxSession))
-  }
 
   private def toEmailViews[T >: EmailView](results: Seq[FastViewResult], 
request: EmailGetRequest, mailboxSession: MailboxSession): SFlux[T] = {
     val availables: Seq[FastViewAvailable] = results.flatMap {
@@ -618,10 +618,13 @@ private class EmailFastViewReader 
@Inject()(messageIdManager: MessageIdManager,
     val ids: Seq[MessageId] = fastViews.map(_.id)
 
     SFlux.fromPublisher(messageIdManager.getMessagesReactive(ids.asJava, 
HEADERS, mailboxSession))
-      .groupBy(_.getMessageId)
-      .flatMap(groupedFlux => groupedFlux.collectSeq().map(results => 
(groupedFlux.key(), results)))
+      .collectSeq()
+      .flatMapIterable(messages => messages.groupBy(_.getMessageId).toSet)
       .map(x => toEmail(request)(x, fastViewsAsMap(x._1)))
-      .flatMap(SMono.fromTry(_))
+      .handle[EmailView]((aTry, sink) => aTry match {
+        case Success(value) => sink.next(value)
+        case Failure(e) => sink.error(e)
+      })
   }
 
   private def toEmail(request: EmailGetRequest)(message: (MessageId, 
Seq[MessageResult]), fastView: MessageFastViewPrecomputedProperties): 
Try[EmailView] = {
diff --git 
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailGetMethod.scala
 
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailGetMethod.scala
index f228e54..e95d711 100644
--- 
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailGetMethod.scala
+++ 
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailGetMethod.scala
@@ -89,7 +89,11 @@ class EmailGetMethod @Inject() (readerFactory: 
EmailViewReaderFactory,
     }).map(invocationResult => InvocationWithContext(invocationResult, 
invocation.processingContext))
   }
 
-  override def getRequest(mailboxSession: MailboxSession, invocation: 
Invocation): SMono[EmailGetRequest] = asEmailGetRequest(invocation.arguments)
+  override def getRequest(mailboxSession: MailboxSession, invocation: 
Invocation): Either[IllegalArgumentException, EmailGetRequest] =
+    EmailGetSerializer.deserializeEmailGetRequest(invocation.arguments.value) 
match {
+      case JsSuccess(emailGetRequest, _) => Right(emailGetRequest)
+      case errors: JsError => Left(new 
IllegalArgumentException(ResponseSerializer.serialize(errors).toString))
+    }
 
   private def computeResponseInvocation(request: EmailGetRequest, invocation: 
Invocation, mailboxSession: MailboxSession): SMono[Invocation] =
     validateProperties(request)
@@ -133,12 +137,6 @@ class EmailGetMethod @Inject() (readerFactory: 
EmailViewReaderFactory,
         }
     }
 
-  private def asEmailGetRequest(arguments: Arguments): SMono[EmailGetRequest] =
-    EmailGetSerializer.deserializeEmailGetRequest(arguments.value) match {
-      case JsSuccess(emailGetRequest, _) => SMono.just(emailGetRequest)
-      case errors: JsError => SMono.raiseError(new 
IllegalArgumentException(ResponseSerializer.serialize(errors).toString))
-    }
-
   private def getEmails(request: EmailGetRequest, mailboxSession: 
MailboxSession): SMono[EmailGetResponse] =
     request.ids match {
       case None => SMono.raiseError(new IllegalArgumentException("ids can not 
be ommited for email/get"))
@@ -181,7 +179,7 @@ class EmailGetMethod @Inject() (readerFactory: 
EmailViewReaderFactory,
         .read(ids, request, mailboxSession)
         .collectMap(_.metadata.id)
 
-    foundResultsMono.flatMapMany(foundResults => SFlux.fromIterable(ids)
+    foundResultsMono.flatMapIterable(foundResults => ids
       .map(id => foundResults.get(id)
         .map(EmailGetResults.found)
         .getOrElse(EmailGetResults.notFound(id))))
diff --git 
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailQueryMethod.scala
 
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailQueryMethod.scala
index ae7d822..e218397 100644
--- 
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailQueryMethod.scala
+++ 
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailQueryMethod.scala
@@ -78,7 +78,18 @@ class EmailQueryMethod @Inject() (serializer: 
EmailQuerySerializer,
     validation.fold(SMono.raiseError, res => res)
   }
 
-  override def getRequest(mailboxSession: MailboxSession, invocation: 
Invocation): SMono[EmailQueryRequest] = 
asEmailQueryRequest(invocation.arguments)
+  override def getRequest(mailboxSession: MailboxSession, invocation: 
Invocation): Either[Exception, EmailQueryRequest] =
+    serializer.deserializeEmailQueryRequest(invocation.arguments.value) match {
+      case JsSuccess(emailQueryRequest, _) => 
validateRequestParameters(emailQueryRequest)
+      case errors: JsError => Left(new 
IllegalArgumentException(ResponseSerializer.serialize(errors).toString))
+    }
+
+  private def validateRequestParameters(request: EmailQueryRequest): 
Either[Exception, EmailQueryRequest] =
+    (request.anchor, request.anchorOffset) match {
+      case (Some(anchor), _) => 
Left(UnsupportedRequestParameterException("anchor"))
+      case (_, Some(anchorOffset)) => 
Left(UnsupportedRequestParameterException("anchorOffset"))
+      case _ => Right(request)
+    }
 
   private def executeQuery(session: MailboxSession, request: 
EmailQueryRequest, searchQuery: MultimailboxesSearchQuery, position: Position, 
limit: Limit): SMono[EmailQueryResponse] = {
     val ids: SMono[Seq[MessageId]] = request match {
@@ -167,16 +178,4 @@ class EmailQueryMethod @Inject() (serializer: 
EmailQuerySerializer,
       .map(MailboxFilter.buildQuery(request, _, capabilities, session))
   }
 
-  private def asEmailQueryRequest(arguments: Arguments): 
SMono[EmailQueryRequest] =
-    serializer.deserializeEmailQueryRequest(arguments.value) match {
-      case JsSuccess(emailQueryRequest, _) => 
validateRequestParameters(emailQueryRequest)
-      case errors: JsError => SMono.raiseError(new 
IllegalArgumentException(ResponseSerializer.serialize(errors).toString))
-    }
-
-  private def validateRequestParameters(request: EmailQueryRequest): 
SMono[EmailQueryRequest] =
-    (request.anchor, request.anchorOffset) match {
-      case (Some(anchor), _) => 
SMono.raiseError(UnsupportedRequestParameterException("anchor"))
-      case (_, Some(anchorOffset)) => 
SMono.raiseError(UnsupportedRequestParameterException("anchorOffset"))
-      case _ => SMono.just(request)
-    }
 }
\ No newline at end of file
diff --git 
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetMethod.scala
 
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetMethod.scala
index ffe0679..4a69b2c 100644
--- 
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetMethod.scala
+++ 
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetMethod.scala
@@ -70,11 +70,9 @@ class EmailSetMethod @Inject()(serializer: 
EmailSetSerializer,
         }))
   }
 
-  override def getRequest(mailboxSession: MailboxSession, invocation: 
Invocation): SMono[EmailSetRequest] = asEmailSetRequest(invocation.arguments)
-
-  private def asEmailSetRequest(arguments: Arguments): SMono[EmailSetRequest] =
-    serializer.deserialize(arguments.value) match {
-      case JsSuccess(emailSetRequest, _) => SMono.just(emailSetRequest)
-      case errors: JsError => SMono.raiseError(new 
IllegalArgumentException(ResponseSerializer.serialize(errors).toString))
+  override def getRequest(mailboxSession: MailboxSession, invocation: 
Invocation): Either[IllegalArgumentException, EmailSetRequest] =
+    serializer.deserialize(invocation.arguments.value) match {
+      case JsSuccess(emailSetRequest, _) => Right(emailSetRequest)
+      case errors: JsError => Left(new 
IllegalArgumentException(ResponseSerializer.serialize(errors).toString))
     }
 }
\ No newline at end of file
diff --git 
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSubmissionSetMethod.scala
 
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSubmissionSetMethod.scala
index 34ce804..905d1c9 100644
--- 
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSubmissionSetMethod.scala
+++ 
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSubmissionSetMethod.scala
@@ -147,10 +147,10 @@ class EmailSubmissionSetMethod @Inject()(serializer: 
EmailSubmissionSetSerialize
         request = request.implicitEmailSetRequest))
   }
 
-  override def getRequest(mailboxSession: MailboxSession, invocation: 
Invocation): SMono[EmailSubmissionSetRequest] =
+  override def getRequest(mailboxSession: MailboxSession, invocation: 
Invocation): Either[IllegalArgumentException, EmailSubmissionSetRequest] =
     
serializer.deserializeEmailSubmissionSetRequest(invocation.arguments.value) 
match {
-      case JsSuccess(emailSubmissionSetRequest, _) => 
SMono.just(emailSubmissionSetRequest)
-      case errors: JsError => SMono.raiseError(new 
IllegalArgumentException(ResponseSerializer.serialize(errors).toString))
+      case JsSuccess(emailSubmissionSetRequest, _) => 
Right(emailSubmissionSetRequest)
+      case errors: JsError => Left(new 
IllegalArgumentException(ResponseSerializer.serialize(errors).toString))
     }
 
   private def create(request: EmailSubmissionSetRequest,
diff --git 
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/MailboxGetMethod.scala
 
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/MailboxGetMethod.scala
index cd036cb..58ff174 100644
--- 
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/MailboxGetMethod.scala
+++ 
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/MailboxGetMethod.scala
@@ -89,13 +89,10 @@ class MailboxGetMethod @Inject() (serializer: 
MailboxSerializer,
 
   }
 
-  override def getRequest(mailboxSession: MailboxSession, invocation: 
Invocation): SMono[MailboxGetRequest] = 
asMailboxGetRequest(invocation.arguments)
-
-  private def asMailboxGetRequest(arguments: Arguments): 
SMono[MailboxGetRequest] = {
-    serializer.deserializeMailboxGetRequest(arguments.value) match {
-      case JsSuccess(mailboxGetRequest, _) => SMono.just(mailboxGetRequest)
-      case errors: JsError => SMono.raiseError(new 
IllegalArgumentException(ResponseSerializer.serialize(errors).toString))
-    }
+  override def getRequest(mailboxSession: MailboxSession, invocation: 
Invocation): Either[IllegalArgumentException, MailboxGetRequest] =
+    serializer.deserializeMailboxGetRequest(invocation.arguments.value) match {
+    case JsSuccess(mailboxGetRequest, _) => Right(mailboxGetRequest)
+    case errors: JsError => Left(new 
IllegalArgumentException(ResponseSerializer.serialize(errors).toString))
   }
 
   private def getMailboxes(capabilities: Set[CapabilityIdentifier],
diff --git 
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/MailboxQueryMethod.scala
 
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/MailboxQueryMethod.scala
index 123e7aa..b19a8af 100644
--- 
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/MailboxQueryMethod.scala
+++ 
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/MailboxQueryMethod.scala
@@ -35,7 +35,7 @@ import reactor.core.scheduler.Schedulers
 class MailboxQueryMethod @Inject()(systemMailboxesProvider: 
SystemMailboxesProvider,
                                    val metricFactory: MetricFactory,
                                    val sessionSupplier: SessionSupplier) 
extends MethodRequiringAccountId[MailboxQueryRequest] {
-  override val methodName = MethodName("Mailbox/query")
+  override val methodName: MethodName = MethodName("Mailbox/query")
   override val requiredCapabilities: Set[CapabilityIdentifier] = 
Set(JMAP_CORE, JMAP_MAIL)
 
   override def doProcess(capabilities: Set[CapabilityIdentifier], invocation: 
InvocationWithContext, mailboxSession: MailboxSession, request: 
MailboxQueryRequest): SMono[InvocationWithContext] = {
@@ -51,9 +51,13 @@ class MailboxQueryMethod @Inject()(systemMailboxesProvider: 
SystemMailboxesProvi
       .map(invocationResult => InvocationWithContext(invocationResult, 
invocation.processingContext))
   }
 
-  override def getRequest(mailboxSession: MailboxSession, invocation: 
Invocation): SMono[MailboxQueryRequest] = 
asMailboxQueryRequest(invocation.arguments)
+  override def getRequest(mailboxSession: MailboxSession, invocation: 
Invocation): Either[IllegalArgumentException, MailboxQueryRequest] =
+    MailboxQuerySerializer.deserialize(invocation.arguments.value) match {
+      case JsSuccess(emailQueryRequest, _) => Right(emailQueryRequest)
+      case errors: JsError => Left(new 
IllegalArgumentException(ResponseSerializer.serialize(errors).toString))
+    }
 
-  private def processRequest(mailboxSession: MailboxSession, invocation: 
Invocation, request: MailboxQueryRequest): SMono[Invocation] = {
+  private def processRequest(mailboxSession: MailboxSession, invocation: 
Invocation, request: MailboxQueryRequest): SMono[Invocation] =
     
SFlux.fromPublisher(systemMailboxesProvider.getMailboxByRole(request.filter.role,
 mailboxSession.getUser))
       .map(_.getId)
       .collectSeq()
@@ -65,12 +69,4 @@ class MailboxQueryMethod @Inject()(systemMailboxesProvider: 
SystemMailboxesProvi
         limit = Some(Limit.default)))
       .map(response => Invocation(methodName = methodName, arguments = 
Arguments(MailboxQuerySerializer.serialize(response)), methodCallId = 
invocation.methodCallId))
       .subscribeOn(Schedulers.elastic())
-  }
-
-  private def asMailboxQueryRequest(arguments: Arguments): 
SMono[MailboxQueryRequest] =
-    MailboxQuerySerializer.deserialize(arguments.value) match {
-      case JsSuccess(emailQueryRequest, _) => SMono.just(emailQueryRequest)
-      case errors: JsError => SMono.raiseError(new 
IllegalArgumentException(ResponseSerializer.serialize(errors).toString))
-    }
-
 }
\ No newline at end of file
diff --git 
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/MailboxSetMethod.scala
 
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/MailboxSetMethod.scala
index 723c208..6b6952c 100644
--- 
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/MailboxSetMethod.scala
+++ 
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/MailboxSetMethod.scala
@@ -57,7 +57,11 @@ class MailboxSetMethod @Inject()(serializer: 
MailboxSerializer,
     updateResults <- updatePerformer.updateMailboxes(mailboxSession, request, 
capabilities)
   } yield InvocationWithContext(createResponse(capabilities, 
invocation.invocation, request, creationResultsWithUpdatedProcessingContext._1, 
deletionResults, updateResults), creationResultsWithUpdatedProcessingContext._2)
 
-  override def getRequest(mailboxSession: MailboxSession, invocation: 
Invocation): SMono[MailboxSetRequest] = 
asMailboxSetRequest(invocation.arguments)
+  override def getRequest(mailboxSession: MailboxSession, invocation: 
Invocation): Either[IllegalArgumentException, MailboxSetRequest] =
+    serializer.deserializeMailboxSetRequest(invocation.arguments.value) match {
+      case JsSuccess(mailboxSetRequest, _) => Right(mailboxSetRequest)
+      case errors: JsError => Left(new 
IllegalArgumentException(ResponseSerializer.serialize(errors).toString))
+    }
 
   private def createResponse(capabilities: Set[CapabilityIdentifier],
                              invocation: Invocation,
@@ -81,10 +85,4 @@ class MailboxSetMethod @Inject()(serializer: 
MailboxSerializer,
       invocation.methodCallId)
   }
 
-  private def asMailboxSetRequest(arguments: Arguments): 
SMono[MailboxSetRequest] = {
-    serializer.deserializeMailboxSetRequest(arguments.value) match {
-      case JsSuccess(mailboxSetRequest, _) => SMono.just(mailboxSetRequest)
-      case errors: JsError => SMono.raiseError(new 
IllegalArgumentException(ResponseSerializer.serialize(errors).toString))
-    }
-  }
 }
diff --git 
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/Method.scala
 
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/Method.scala
index 7a4cdff..c2663a4 100644
--- 
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/Method.scala
+++ 
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/Method.scala
@@ -27,7 +27,9 @@ import org.apache.james.mailbox.MailboxSession
 import org.apache.james.mailbox.exception.MailboxNotFoundException
 import org.apache.james.metrics.api.MetricFactory
 import org.reactivestreams.Publisher
-import reactor.core.scala.publisher.SMono
+import reactor.core.scala.publisher.{SFlux, SMono}
+
+case class AccountNotFoundException(invocation: Invocation) extends 
IllegalArgumentException
 
 case class InvocationWithContext(invocation: Invocation, processingContext: 
ProcessingContext) {
   def recordInvocation: InvocationWithContext = 
InvocationWithContext(invocation, 
processingContext.recordInvocation(invocation))
@@ -51,47 +53,50 @@ trait MethodRequiringAccountId[REQUEST <: WithAccountId] 
extends Method {
   def sessionSupplier: SessionSupplier
 
   override def process(capabilities: Set[CapabilityIdentifier], invocation: 
InvocationWithContext, mailboxSession: MailboxSession): 
Publisher[InvocationWithContext] = {
-    val result = getRequest(mailboxSession, invocation.invocation)
-      .flatMapMany(request => {
-        validateAccountId(request.accountId, mailboxSession, sessionSupplier, 
invocation.invocation)
-          .flatMapMany {
-            case Right(_) => doProcess(capabilities, invocation, 
mailboxSession, request)
-            case Left(errorInvocation) => 
SMono.just(InvocationWithContext(errorInvocation, invocation.processingContext))
-          }
-      })
-      .onErrorResume {
-        case e: UnsupportedRequestParameterException => 
SMono.just(InvocationWithContext(Invocation.error(
+    val either: Either[Exception, Publisher[InvocationWithContext]] = for {
+      request <- getRequest(mailboxSession, invocation.invocation)
+      _ <- validateAccountId(request.accountId, mailboxSession, 
sessionSupplier, invocation.invocation)
+    } yield {
+      doProcess(capabilities, invocation, mailboxSession, request)
+    }
+
+    val result: SFlux[InvocationWithContext] = 
SFlux.fromPublisher(either.fold(e => 
SFlux.raiseError[InvocationWithContext](e), r => r))
+      .onErrorResume[InvocationWithContext] {
+        case e: AccountNotFoundException => SFlux.just[InvocationWithContext] 
(InvocationWithContext(e.invocation, invocation.processingContext))
+        case e: UnsupportedRequestParameterException => 
SFlux.just[InvocationWithContext] (InvocationWithContext(Invocation.error(
           ErrorCode.InvalidArguments,
           s"The following parameter ${e.unsupportedParam} is syntactically 
valid, but is not supported by the server.",
           invocation.invocation.methodCallId), invocation.processingContext))
-        case e: UnsupportedSortException => 
SMono.just(InvocationWithContext(Invocation.error(
+        case e: UnsupportedSortException => SFlux.just[InvocationWithContext] 
(InvocationWithContext(Invocation.error(
           ErrorCode.UnsupportedSort,
           s"The sort ${e.unsupportedSort} is syntactically valid, but it 
includes a property the server does not support sorting on or a collation 
method it does not recognise.",
           invocation.invocation.methodCallId), invocation.processingContext))
-        case e: UnsupportedFilterException => 
SMono.just(InvocationWithContext(Invocation.error(
+        case e: UnsupportedFilterException => 
SFlux.just[InvocationWithContext] (InvocationWithContext(Invocation.error(
           ErrorCode.UnsupportedFilter,
           s"The filter ${e.unsupportedFilter} is syntactically valid, but the 
server cannot process it. If the filter was the result of a user’s search 
input, the client SHOULD suggest that the user simplify their search.",
           invocation.invocation.methodCallId), invocation.processingContext))
-        case e: UnsupportedNestingException => 
SMono.just(InvocationWithContext(Invocation.error(
+        case e: UnsupportedNestingException => 
SFlux.just[InvocationWithContext] (InvocationWithContext(Invocation.error(
           ErrorCode.UnsupportedFilter,
           description = e.message,
           invocation.invocation.methodCallId), invocation.processingContext))
-        case e: IllegalArgumentException => 
SMono.just(InvocationWithContext(Invocation.error(ErrorCode.InvalidArguments, 
e.getMessage, invocation.invocation.methodCallId), 
invocation.processingContext))
-        case e: MailboxNotFoundException => 
SMono.just(InvocationWithContext(Invocation.error(ErrorCode.InvalidArguments, 
e.getMessage, invocation.invocation.methodCallId), 
invocation.processingContext))
-        case e: Throwable => SMono.raiseError(e)
+        case e: IllegalArgumentException => SFlux.just[InvocationWithContext] 
(InvocationWithContext(Invocation.error(ErrorCode.InvalidArguments, 
e.getMessage, invocation.invocation.methodCallId), 
invocation.processingContext))
+        case e: MailboxNotFoundException => SFlux.just[InvocationWithContext] 
(InvocationWithContext(Invocation.error(ErrorCode.InvalidArguments, 
e.getMessage, invocation.invocation.methodCallId), 
invocation.processingContext))
+        case e: Throwable => SFlux.raiseError[InvocationWithContext] (e)
       }
 
     metricFactory.decoratePublisherWithTimerMetricLogP99(JMAP_RFC8621_PREFIX + 
methodName.value, result)
   }
 
-  private def validateAccountId(accountId: AccountId, mailboxSession: 
MailboxSession, sessionSupplier: SessionSupplier, invocation: Invocation): 
SMono[Either[Invocation, Session]] = {
+  private def validateAccountId(accountId: AccountId, mailboxSession: 
MailboxSession, sessionSupplier: SessionSupplier, invocation: Invocation): 
Either[IllegalArgumentException, Session] =
     sessionSupplier.generate(mailboxSession.getUser)
-      .filter(session => session.accounts.map(_.accountId).contains(accountId))
-      .map(session => Right[Invocation, 
Session](session).asInstanceOf[Either[Invocation, Session]])
-      .switchIfEmpty(SMono.just(Left[Invocation, 
Session](Invocation.error(ErrorCode.AccountNotFound, invocation.methodCallId))))
-  }
+      .flatMap(session =>
+        if (session.accounts.map(_.accountId).contains(accountId)) {
+          Right(session)
+        } else {
+          
Left(AccountNotFoundException(Invocation.error(ErrorCode.AccountNotFound, 
invocation.methodCallId)))
+        })
 
   def doProcess(capabilities: Set[CapabilityIdentifier], invocation: 
InvocationWithContext, mailboxSession: MailboxSession, request: REQUEST): 
Publisher[InvocationWithContext]
 
-  def getRequest(mailboxSession: MailboxSession, invocation: Invocation): 
SMono[REQUEST]
+  def getRequest(mailboxSession: MailboxSession, invocation: Invocation): 
Either[Exception, REQUEST]
 }
diff --git 
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/VacationResponseGetMethod.scala
 
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/VacationResponseGetMethod.scala
index 1c8d780..926f5e0 100644
--- 
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/VacationResponseGetMethod.scala
+++ 
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/VacationResponseGetMethod.scala
@@ -82,11 +82,10 @@ class VacationResponseGetMethod 
@Inject()(vacationRepository: VacationRepository
     }
   }
 
-  override def getRequest(mailboxSession: MailboxSession, invocation: 
Invocation): SMono[VacationResponseGetRequest] = 
asVacationResponseGetRequest(invocation.arguments)
-
-  private def asVacationResponseGetRequest(arguments: Arguments): 
SMono[VacationResponseGetRequest] = 
VacationSerializer.deserializeVacationResponseGetRequest(arguments.value) match 
{
-      case JsSuccess(vacationResponseGetRequest, _) => 
SMono.just(vacationResponseGetRequest)
-      case errors: JsError => SMono.raiseError(new 
IllegalArgumentException(ResponseSerializer.serialize(errors).toString))
+  override def getRequest(mailboxSession: MailboxSession, invocation: 
Invocation): Either[IllegalArgumentException, VacationResponseGetRequest] =
+    
VacationSerializer.deserializeVacationResponseGetRequest(invocation.arguments.value)
 match {
+      case JsSuccess(vacationResponseGetRequest, _) => 
Right(vacationResponseGetRequest)
+      case errors: JsError => Left(new 
IllegalArgumentException(ResponseSerializer.serialize(errors).toString))
     }
 
   private def handleRequestValidationErrors(exception: Exception, 
methodCallId: MethodCallId): SMono[Invocation] = exception match {
diff --git 
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/VacationResponseSetMethod.scala
 
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/VacationResponseSetMethod.scala
index ce0dd34..56bd506 100644
--- 
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/VacationResponseSetMethod.scala
+++ 
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/VacationResponseSetMethod.scala
@@ -81,7 +81,11 @@ class VacationResponseSetMethod 
@Inject()(vacationRepository: VacationRepository
       .map(InvocationWithContext(_, invocation.processingContext))
   }
 
-  override def getRequest(mailboxSession: MailboxSession, invocation: 
Invocation): SMono[VacationResponseSetRequest] = 
asVacationResponseSetRequest(invocation.arguments)
+  override def getRequest(mailboxSession: MailboxSession, invocation: 
Invocation): Either[IllegalArgumentException, VacationResponseSetRequest] =
+    
VacationSerializer.deserializeVacationResponseSetRequest(invocation.arguments.value)
 match {
+      case JsSuccess(vacationResponseSetRequest, _) => 
Right(vacationResponseSetRequest)
+      case errors: JsError => Left(new 
IllegalArgumentException(ResponseSerializer.serialize(errors).toString))
+    }
 
   private def update(mailboxSession: MailboxSession, 
vacationResponseSetRequest: VacationResponseSetRequest): 
SMono[VacationResponseUpdateResults] = {
     SFlux.fromIterable(vacationResponseSetRequest.parsePatch()
@@ -103,16 +107,7 @@ class VacationResponseSetMethod 
@Inject()(vacationRepository: VacationRepository
       vacationRepository.modifyVacation(toVacationAccountId(mailboxSession), 
validatedPatch))
       .`then`(SMono.just(VacationResponseUpdateSuccess))
 
-  private def toVacationAccountId(mailboxSession: MailboxSession): AccountId = 
{
-    AccountId.fromUsername(mailboxSession.getUser)
-  }
-
-  private def asVacationResponseSetRequest(arguments: Arguments): 
SMono[VacationResponseSetRequest] = {
-    VacationSerializer.deserializeVacationResponseSetRequest(arguments.value) 
match {
-      case JsSuccess(vacationResponseSetRequest, _) => 
SMono.just(vacationResponseSetRequest)
-      case errors: JsError => SMono.raiseError(new 
IllegalArgumentException(ResponseSerializer.serialize(errors).toString))
-    }
-  }
+  private def toVacationAccountId(mailboxSession: MailboxSession): AccountId = 
AccountId.fromUsername(mailboxSession.getUser)
 
   private def createResponse(invocation: Invocation,
                              vacationResponseSetRequest: 
VacationResponseSetRequest,
diff --git 
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/JMAPApiRoutes.scala
 
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/JMAPApiRoutes.scala
index af001c3..da67b15 100644
--- 
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/JMAPApiRoutes.scala
+++ 
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/JMAPApiRoutes.scala
@@ -159,12 +159,12 @@ class JMAPApiRoutes (val authenticator: Authenticator,
   }
 
   private def processMethodWithMatchName(capabilities: 
Set[CapabilityIdentifier], invocation: InvocationWithContext, mailboxSession: 
MailboxSession): SFlux[InvocationWithContext] =
-    SMono.justOrEmpty(methodsByName.get(invocation.invocation.methodName))
-      .flatMapMany(method => validateCapabilities(capabilities, 
method.requiredCapabilities)
+    methodsByName.get(invocation.invocation.methodName)
+      .map(method => validateCapabilities(capabilities, method.requiredCapabilities)
         .fold(e => SFlux.just(InvocationWithContext(Invocation.error(ErrorCode.UnknownMethod, e.description, invocation.invocation.methodCallId), invocation.processingContext)),
           _ => SFlux.fromPublisher(method.process(capabilities, invocation, mailboxSession))))
+      .getOrElse(SFlux.just(InvocationWithContext(Invocation.error(ErrorCode.UnknownMethod, invocation.invocation.methodCallId), invocation.processingContext)))
       .onErrorResume(throwable => 
SMono.just(InvocationWithContext(Invocation.error(ErrorCode.ServerFail, 
throwable.getMessage, invocation.invocation.methodCallId), 
invocation.processingContext)))
-      
.switchIfEmpty(SFlux.just(InvocationWithContext(Invocation.error(ErrorCode.UnknownMethod,
 invocation.invocation.methodCallId), invocation.processingContext)))
 
   private def validateCapabilities(capabilities: Set[CapabilityIdentifier], 
requiredCapabilities: Set[CapabilityIdentifier]): 
Either[MissingCapabilityException, Unit] = {
     val missingCapabilities = requiredCapabilities -- capabilities
diff --git 
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/SessionRoutes.scala
 
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/SessionRoutes.scala
index 4c20cea..81f5cc2 100644
--- 
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/SessionRoutes.scala
+++ 
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/SessionRoutes.scala
@@ -54,7 +54,7 @@ class SessionRoutes @Inject() (@Named(InjectionKeys.RFC_8621) 
val authenticator:
   private val generateSession: JMAPRoute.Action =
     (request, response) => 
SMono.fromPublisher(authenticator.authenticate(request))
       .map(_.getUser)
-      .flatMap(sessionSupplier.generate)
+      .flatMap(username => sessionSupplier.generate(username).fold(SMono.raiseError[Session], SMono.just[Session]))
       .flatMap(session => sendRespond(session, response))
       .onErrorResume(throwable => SMono.fromPublisher(errorHandling(throwable, 
response)))
       .subscribeOn(Schedulers.elastic())
diff --git 
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/SessionSupplier.scala
 
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/SessionSupplier.scala
index 760d95c..c2af050 100644
--- 
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/SessionSupplier.scala
+++ 
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/SessionSupplier.scala
@@ -28,7 +28,7 @@ import reactor.core.scala.publisher.SMono
 class SessionSupplier @Inject() (val configuration: JmapRfc8621Configuration){
   private val maxSizeUpload = configuration.maxUploadSize
 
-  def generate(username: Username): SMono[Session] = {
+  def generate(username: Username): Either[IllegalArgumentException, Session] =
     accounts(username)
       .map(account => Session(
         DefaultCapabilities.supported(maxSizeUpload),
@@ -39,13 +39,9 @@ class SessionSupplier @Inject() (val configuration: 
JmapRfc8621Configuration){
         downloadUrl = configuration.downloadUrl,
         uploadUrl = configuration.uploadUrl,
         eventSourceUrl = configuration.eventSourceUrl))
-  }
 
-  private def accounts(username: Username): SMono[Account] = SMono.defer(() =>
-    Account.from(username, IsPersonal(true), IsReadOnly(false), 
DefaultCapabilities.supported(maxSizeUpload).toSet) match {
-      case Left(ex: IllegalArgumentException) => SMono.raiseError(ex)
-      case Right(account: Account) => SMono.just(account)
-    })
+  private def accounts(username: Username): Either[IllegalArgumentException, Account] =
+    Account.from(username, IsPersonal(true), IsReadOnly(false), DefaultCapabilities.supported(maxSizeUpload).toSet)
 
   private def primaryAccounts(accountId: AccountId): Map[CapabilityIdentifier, 
AccountId] =
     DefaultCapabilities.supported(maxSizeUpload).toSet
diff --git 
a/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/routes/SessionSupplierTest.scala
 
b/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/routes/SessionSupplierTest.scala
index 01e53ce..83b7790 100644
--- 
a/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/routes/SessionSupplierTest.scala
+++ 
b/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/routes/SessionSupplierTest.scala
@@ -33,11 +33,11 @@ class SessionSupplierTest extends AnyWordSpec with Matchers 
{
 
   "generate" should {
     "return correct username" in {
-      new SessionSupplier(JmapRfc8621Configuration.LOCALHOST_CONFIGURATION).generate(USERNAME).block().username should equal(USERNAME)
+      new SessionSupplier(JmapRfc8621Configuration.LOCALHOST_CONFIGURATION).generate(USERNAME).toOption.get.username should equal(USERNAME)
     }
 
     "return correct account" which {
-      val accounts = new 
SessionSupplier(JmapRfc8621Configuration.LOCALHOST_CONFIGURATION).generate(USERNAME).block().accounts
+      val accounts = new 
SessionSupplier(JmapRfc8621Configuration.LOCALHOST_CONFIGURATION).generate(USERNAME).toOption.get.accounts
 
       "has size" in {
         accounts should have size 1


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to