This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 6736651799a5dd6350770753cd853057c9c2ed8c Author: Benoit Tellier <[email protected]> AuthorDate: Wed Nov 11 11:58:27 2020 +0700 JAMES-3432 UploadRoutes error handling and schedulers Required to pass on Distributed James --- .../apache/james/jmap/routes/UploadRoutes.scala | 33 ++++++++++------------ 1 file changed, 15 insertions(+), 18 deletions(-) diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/UploadRoutes.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/UploadRoutes.scala index 9bb9f78..2f084c6 100644 --- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/UploadRoutes.scala +++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/UploadRoutes.scala @@ -19,7 +19,7 @@ package org.apache.james.jmap.routes -import java.io.{IOException, InputStream, UncheckedIOException} +import java.io.InputStream import java.util.stream import java.util.stream.Stream @@ -51,6 +51,8 @@ import reactor.core.scala.publisher.SMono import reactor.core.scheduler.Schedulers import reactor.netty.http.server.{HttpServerRequest, HttpServerResponse} +case class TooBigUploadException() extends RuntimeException + object UploadRoutes { val LOGGER: Logger = LoggerFactory.getLogger(classOf[DownloadRoutes]) @@ -96,23 +98,23 @@ class UploadRoutes @Inject()(@Named(InjectionKeys.RFC_8621) val authenticator: A def post(request: HttpServerRequest, response: HttpServerResponse): Mono[Void] = { request.requestHeaders.get(CONTENT_TYPE) match { case contentType => SMono.fromPublisher( - authenticator.authenticate(request)) - .flatMap(session => post(request, response, ContentType.of(contentType), session)) - .onErrorResume { - case e: UnauthorizedException => SMono.fromPublisher(handleAuthenticationFailure(response, LOGGER, e)) - case e: Throwable => SMono.fromPublisher(handleInternalError(response, LOGGER, e)) - } - .asJava().`then`() + authenticator.authenticate(request)) + .flatMap(session => post(request, response, ContentType.of(contentType), session)) + .onErrorResume { + case e: TooBigUploadException => SMono.fromPublisher(response.status(BAD_REQUEST).sendString(SMono.just("Attempt to upload exceed max size")).`then`()) + case e: UnauthorizedException => SMono.fromPublisher(handleAuthenticationFailure(response, LOGGER, e)) + case e: Throwable => SMono.fromPublisher(handleInternalError(response, LOGGER, e)) + } + .asJava() + .subscribeOn(Schedulers.elastic()) + .`then`() case _ => response.status(BAD_REQUEST).send } } - private def handleUncheckedIOException(response: HttpServerResponse, e: UncheckedIOException) = - response.status(BAD_REQUEST).sendString(SMono.just(e.getMessage)) - def post(request: HttpServerRequest, response: HttpServerResponse, contentType: ContentType, session: MailboxSession): SMono[Void] = { Id.validate(request.param(accountIdParam)) match { - case Right(id: Id) => { + case Right(id: Id) => val targetAccountId: AccountId = AccountId(id) AccountId.from(session.getUser).map(accountId => accountId.equals(targetAccountId)) .fold[SMono[Void]]( @@ -120,11 +122,9 @@ class UploadRoutes @Inject()(@Named(InjectionKeys.RFC_8621) val authenticator: A value => if (value) { SMono.fromCallable(() => ReactorUtils.toInputStream(request.receive.asByteBuffer)) .flatMap(content => handle(targetAccountId, contentType, content, session, response)) - .subscribeOn(Schedulers.elastic()) } else { SMono.raiseError(new UnauthorizedException("Attempt to upload in another account")) }) - } case Left(throwable: Throwable) => SMono.raiseError(throwable) } @@ -135,16 +135,13 @@ class UploadRoutes @Inject()(@Named(InjectionKeys.RFC_8621) val authenticator: A SMono.fromCallable(() => new LimitedInputStream(content, maxSize) { override def raiseError(max: Long, count: Long): Unit = if (count > max) { - throw new IOException("Attempt to upload exceed max size") + throw TooBigUploadException() }}) .flatMap(uploadContent(accountId, contentType, _, mailboxSession)) .flatMap(uploadResponse => SMono.fromPublisher(response .header(CONTENT_TYPE, uploadResponse.`type`.asString()) .status(CREATED) .sendString(SMono.just(serializer.serialize(uploadResponse).toString())))) - .onErrorResume { - case e: UncheckedIOException => SMono.fromPublisher(handleUncheckedIOException(response, e)) - } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
