This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 6736651799a5dd6350770753cd853057c9c2ed8c
Author: Benoit Tellier <[email protected]>
AuthorDate: Wed Nov 11 11:58:27 2020 +0700

    JAMES-3432 UploadRoutes error handling and schedulers
    
    Required to pass on Distributed James
---
 .../apache/james/jmap/routes/UploadRoutes.scala    | 33 ++++++++++------------
 1 file changed, 15 insertions(+), 18 deletions(-)

diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/UploadRoutes.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/UploadRoutes.scala
index 9bb9f78..2f084c6 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/UploadRoutes.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/UploadRoutes.scala
@@ -19,7 +19,7 @@
 
 package org.apache.james.jmap.routes
 
-import java.io.{IOException, InputStream, UncheckedIOException}
+import java.io.InputStream
 import java.util.stream
 import java.util.stream.Stream
 
@@ -51,6 +51,8 @@ import reactor.core.scala.publisher.SMono
 import reactor.core.scheduler.Schedulers
 import reactor.netty.http.server.{HttpServerRequest, HttpServerResponse}
 
+case class TooBigUploadException() extends RuntimeException
+
 object UploadRoutes {
   val LOGGER: Logger = LoggerFactory.getLogger(classOf[DownloadRoutes])
 
@@ -96,23 +98,23 @@ class UploadRoutes @Inject()(@Named(InjectionKeys.RFC_8621) val authenticator: A
   def post(request: HttpServerRequest, response: HttpServerResponse): Mono[Void] = {
     request.requestHeaders.get(CONTENT_TYPE) match {
       case contentType => SMono.fromPublisher(
-          authenticator.authenticate(request))
-          .flatMap(session => post(request, response, ContentType.of(contentType), session))
-          .onErrorResume {
-            case e: UnauthorizedException => SMono.fromPublisher(handleAuthenticationFailure(response, LOGGER, e))
-            case e: Throwable => SMono.fromPublisher(handleInternalError(response, LOGGER, e))
-          }
-          .asJava().`then`()
+        authenticator.authenticate(request))
+        .flatMap(session => post(request, response, ContentType.of(contentType), session))
+        .onErrorResume {
+          case e: TooBigUploadException => SMono.fromPublisher(response.status(BAD_REQUEST).sendString(SMono.just("Attempt to upload exceed max size")).`then`())
+          case e: UnauthorizedException => SMono.fromPublisher(handleAuthenticationFailure(response, LOGGER, e))
+          case e: Throwable => SMono.fromPublisher(handleInternalError(response, LOGGER, e))
+        }
+        .asJava()
+        .subscribeOn(Schedulers.elastic())
+        .`then`()
       case _ => response.status(BAD_REQUEST).send
     }
   }
 
-  private def handleUncheckedIOException(response: HttpServerResponse, e: UncheckedIOException) =
-    response.status(BAD_REQUEST).sendString(SMono.just(e.getMessage))
-
   def post(request: HttpServerRequest, response: HttpServerResponse, contentType: ContentType, session: MailboxSession): SMono[Void] = {
     Id.validate(request.param(accountIdParam)) match {
-      case Right(id: Id) => {
+      case Right(id: Id) =>
         val targetAccountId: AccountId = AccountId(id)
         AccountId.from(session.getUser).map(accountId => accountId.equals(targetAccountId))
           .fold[SMono[Void]](
@@ -120,11 +122,9 @@ class UploadRoutes @Inject()(@Named(InjectionKeys.RFC_8621) val authenticator: A
             value => if (value) {
               SMono.fromCallable(() => ReactorUtils.toInputStream(request.receive.asByteBuffer))
               .flatMap(content => handle(targetAccountId, contentType, content, session, response))
-              .subscribeOn(Schedulers.elastic())
             } else {
               SMono.raiseError(new UnauthorizedException("Attempt to upload in another account"))
             })
-      }
 
       case Left(throwable: Throwable) => SMono.raiseError(throwable)
     }
@@ -135,16 +135,13 @@ class UploadRoutes @Inject()(@Named(InjectionKeys.RFC_8621) val authenticator: A
 
     SMono.fromCallable(() => new LimitedInputStream(content, maxSize) {
       override def raiseError(max: Long, count: Long): Unit = if (count > max) {
-        throw new IOException("Attempt to upload exceed max size")
+        throw TooBigUploadException()
       }})
       .flatMap(uploadContent(accountId, contentType, _, mailboxSession))
       .flatMap(uploadResponse => SMono.fromPublisher(response
               .header(CONTENT_TYPE, uploadResponse.`type`.asString())
               .status(CREATED)
               .sendString(SMono.just(serializer.serialize(uploadResponse).toString()))))
-      .onErrorResume {
-        case e: UncheckedIOException => SMono.fromPublisher(handleUncheckedIOException(response, e))
-      }
   }
 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to