This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 8335aa3962e04210147f4fe043fe06853c982e32 Author: duc91 <[email protected]> AuthorDate: Fri Oct 23 15:03:32 2020 +0700 JAMES-3432 Add test case downloadShouldRejectWhenDownloadFromOther --- .../james/jmap/rfc8621/RFC8621MethodsModule.java | 2 +- .../jmap/rfc8621/contract/DownloadContract.scala | 20 ++++- .../jmap/rfc8621/contract/UploadContract.scala | 87 +++++++++++++++------- .../jmap/rfc8621/memory/MemoryUploadContract.java | 38 ++++++++++ .../apache/james/jmap/json/UploadSerializer.scala | 18 +++++ .../apache/james/jmap/routes/DownloadRoutes.scala | 57 ++++++++++---- .../apache/james/jmap/routes/UploadRoutes.scala | 55 ++++++++------ 7 files changed, 212 insertions(+), 65 deletions(-) diff --git a/server/container/guice/protocols/jmap/src/main/java/org/apache/james/jmap/rfc8621/RFC8621MethodsModule.java b/server/container/guice/protocols/jmap/src/main/java/org/apache/james/jmap/rfc8621/RFC8621MethodsModule.java index 13eeaed..d8c2771 100644 --- a/server/container/guice/protocols/jmap/src/main/java/org/apache/james/jmap/rfc8621/RFC8621MethodsModule.java +++ b/server/container/guice/protocols/jmap/src/main/java/org/apache/james/jmap/rfc8621/RFC8621MethodsModule.java @@ -47,8 +47,8 @@ import org.apache.james.jmap.method.VacationResponseSetMethod; import org.apache.james.jmap.method.ZoneIdProvider; import org.apache.james.jmap.model.JmapRfc8621Configuration; import org.apache.james.jmap.routes.DownloadRoutes; -import org.apache.james.jmap.routes.UploadRoutes; import org.apache.james.jmap.routes.JMAPApiRoutes; +import org.apache.james.jmap.routes.UploadRoutes; import org.apache.james.metrics.api.MetricFactory; import org.apache.james.utils.PropertiesProvider; import org.slf4j.Logger; diff --git a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/DownloadContract.scala b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/DownloadContract.scala index 5e0e753..20579b9 100644 --- a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/DownloadContract.scala +++ b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/DownloadContract.scala @@ -29,7 +29,7 @@ import org.apache.http.HttpStatus.{SC_NOT_FOUND, SC_OK, SC_UNAUTHORIZED} import org.apache.james.GuiceJamesServer import org.apache.james.jmap.http.UserCredential import org.apache.james.jmap.rfc8621.contract.DownloadContract.accountId -import org.apache.james.jmap.rfc8621.contract.Fixture.{ACCEPT_RFC8621_VERSION_HEADER, ANDRE, BOB, BOB_PASSWORD, DOMAIN, authScheme, baseRequestSpecBuilder} +import org.apache.james.jmap.rfc8621.contract.Fixture.{ACCEPT_RFC8621_VERSION_HEADER, ALICE_ACCOUNT_ID, ANDRE, BOB, BOB_PASSWORD, DOMAIN, authScheme, baseRequestSpecBuilder} import org.apache.james.mailbox.MessageManager.AppendCommand import org.apache.james.mailbox.model.MailboxACL.Right import org.apache.james.mailbox.model.{MailboxACL, MailboxPath, MessageId} @@ -153,6 +153,24 @@ trait DownloadContract { } @Test + def downloadingInOtherAccountsShouldFail(server: GuiceJamesServer): Unit = { + val path = MailboxPath.inbox(BOB) + server.getProbe(classOf[MailboxProbeImpl]).createMailbox(path) + val messageId: MessageId = server.getProbe(classOf[MailboxProbeImpl]) + .appendMessage(BOB.asString, path, AppendCommand.from( + ClassLoader.getSystemResourceAsStream("eml/multipart_simple.eml"))) + .getMessageId + + `given` + .basePath("") + .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER) + .when + .get(s"/download/$ALICE_ACCOUNT_ID/${messageId.serialize}") + .`then` + .statusCode(SC_UNAUTHORIZED) + } + + @Test def downloadPartShouldSucceedWhenDelegated(server: GuiceJamesServer): Unit = { val path = MailboxPath.inbox(ANDRE) server.getProbe(classOf[MailboxProbeImpl]).createMailbox(path) diff --git a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/UploadContract.scala b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/UploadContract.scala index 9605f44..770efcd 100644 --- a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/UploadContract.scala +++ b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/UploadContract.scala @@ -1,7 +1,26 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ package org.apache.james.jmap.rfc8621.contract import java.io.{ByteArrayInputStream, InputStream} import java.nio.charset.StandardCharsets + import io.netty.handler.codec.http.HttpHeaderNames.ACCEPT import io.restassured.RestAssured.{`given`, requestSpecification} import io.restassured.http.ContentType @@ -10,11 +29,11 @@ import org.apache.commons.io.IOUtils import org.apache.http.HttpStatus.{SC_CREATED, SC_NOT_FOUND, SC_OK, SC_UNAUTHORIZED} import org.apache.james.GuiceJamesServer import org.apache.james.jmap.http.UserCredential -import org.apache.james.jmap.rfc8621.contract.Fixture.{ACCEPT_RFC8621_VERSION_HEADER, ACCOUNT_ID, BOB, BOB_PASSWORD, DOMAIN, RFC8621_VERSION_HEADER, authScheme, baseRequestSpecBuilder} +import org.apache.james.jmap.rfc8621.contract.Fixture.{ACCEPT_RFC8621_VERSION_HEADER, ACCOUNT_ID, ALICE, ALICE_ACCOUNT_ID, ALICE_PASSWORD, BOB, BOB_PASSWORD, DOMAIN, RFC8621_VERSION_HEADER, _2_DOT_DOMAIN, authScheme, baseRequestSpecBuilder} import org.apache.james.jmap.rfc8621.contract.UploadContract.{BIG_INPUT_STREAM, VALID_INPUT_STREAM} import org.apache.james.utils.DataProbeImpl import org.assertj.core.api.Assertions.assertThat -import org.junit.jupiter.api.{BeforeEach, Test} +import org.junit.jupiter.api.{BeforeEach, Disabled, Test} import play.api.libs.json.{JsString, Json} object UploadContract { @@ -29,6 +48,8 @@ trait UploadContract { .fluent .addDomain(DOMAIN.asString) .addUser(BOB.asString, BOB_PASSWORD) + .addDomain(_2_DOT_DOMAIN.asString()) + .addUser(ALICE.asString(), ALICE_PASSWORD) requestSpecification = baseRequestSpecBuilder(server) .setAuth(authScheme(UserCredential(BOB, BOB_PASSWORD))) @@ -36,7 +57,7 @@ trait UploadContract { } @Test - def shouldUploadFileAndOnlyOwnerCanAccess(): Unit = { + def shouldUploadFileAndAllowToDownloadIt(): Unit = { val uploadResponse: String = `given` .basePath("") .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER) @@ -69,63 +90,75 @@ trait UploadContract { } @Test - def shouldRejectWhenUploadFileTooBig(): Unit = { - val response: String = `given` + def bobShouldNotBeAllowedToUploadInAliceAccount(): Unit = { + `given` .basePath("") .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER) - .contentType(ContentType.BINARY) - .body(BIG_INPUT_STREAM) + .body(VALID_INPUT_STREAM) + .when + .post(s"/upload/$ALICE_ACCOUNT_ID/") + .`then` + .statusCode(SC_UNAUTHORIZED) + } + + @Test + def aliceShouldNotAccessOrDownloadFileUploadedByBob(): Unit = { + val uploadResponse: String = `given` + .basePath("") + .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER) + .body(VALID_INPUT_STREAM) .when .post(s"/upload/$ACCOUNT_ID/") .`then` - .statusCode(SC_OK) + .statusCode(SC_CREATED) .extract .body .asString - // fixme: dont know we limit size or not? - assertThatJson(response) - .isEqualTo("Should be error") - } + val blobId: String = Json.parse(uploadResponse).\("blobId").get.asInstanceOf[JsString].value - @Test - def uploadShouldRejectWhenUnauthenticated(): Unit = { `given` - .auth() - .none() + .auth().basic(ALICE.asString(), ALICE_PASSWORD) .basePath("") .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER) - .contentType(ContentType.BINARY) - .body(VALID_INPUT_STREAM) .when - .post(s"/upload/$ACCOUNT_ID/") + .get(s"/download/$ALICE_ACCOUNT_ID/$blobId") .`then` .statusCode(SC_UNAUTHORIZED) } @Test - def uploadShouldSucceedButExpiredWhenDownload(): Unit = { - val uploadResponse: String = `given` + @Disabled("JAMES-1788 Upload size limitation needs to be contributed") + def shouldRejectWhenUploadFileTooBig(): Unit = { + val response: String = `given` .basePath("") .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER) - .body(VALID_INPUT_STREAM) + .contentType(ContentType.BINARY) + .body(BIG_INPUT_STREAM) .when .post(s"/upload/$ACCOUNT_ID/") .`then` - .statusCode(SC_CREATED) + .statusCode(SC_OK) .extract .body .asString - val blobId: String = Json.parse(uploadResponse).\("blobId").get.asInstanceOf[JsString].value + assertThatJson(response) + .isEqualTo("Should be error") + } - // fixme: dont know how to delete file with existing attachment api + @Test + def uploadShouldRejectWhenUnauthenticated(): Unit = { `given` + .auth() + .none() .basePath("") .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER) + .contentType(ContentType.BINARY) + .body(VALID_INPUT_STREAM) .when - .get(s"/download/$ACCOUNT_ID/$blobId") + .post(s"/upload/$ACCOUNT_ID/") .`then` - .statusCode(SC_NOT_FOUND) + .statusCode(SC_UNAUTHORIZED) } } diff --git a/server/protocols/jmap-rfc-8621-integration-tests/memory-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/memory/MemoryUploadContract.java b/server/protocols/jmap-rfc-8621-integration-tests/memory-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/memory/MemoryUploadContract.java new file mode 100644 index 0000000..9f55acd --- /dev/null +++ b/server/protocols/jmap-rfc-8621-integration-tests/memory-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/memory/MemoryUploadContract.java @@ -0,0 +1,38 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.jmap.rfc8621.memory; + +import static org.apache.james.MemoryJamesServerMain.IN_MEMORY_SERVER_AGGREGATE_MODULE; + +import org.apache.james.GuiceJamesServer; +import org.apache.james.JamesServerBuilder; +import org.apache.james.JamesServerExtension; +import org.apache.james.jmap.rfc8621.contract.UploadContract; +import org.apache.james.modules.TestJMAPServerModule; +import org.junit.jupiter.api.extension.RegisterExtension; + +public class MemoryUploadContract implements UploadContract { + @RegisterExtension + static JamesServerExtension testExtension = new JamesServerBuilder<>(JamesServerBuilder.defaultConfigurationProvider()) + .server(configuration -> GuiceJamesServer.forConfiguration(configuration) + .combineWith(IN_MEMORY_SERVER_AGGREGATE_MODULE) + .overrideWith(new TestJMAPServerModule())) + .build(); +} diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/UploadSerializer.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/UploadSerializer.scala index a7f8b25..3014869 100644 --- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/UploadSerializer.scala +++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/UploadSerializer.scala @@ -1,3 +1,21 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ package org.apache.james.jmap.json import org.apache.james.jmap.mail.BlobId diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/DownloadRoutes.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/DownloadRoutes.scala index f7c580e..702fce3 100644 --- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/DownloadRoutes.scala +++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/DownloadRoutes.scala @@ -36,8 +36,11 @@ import org.apache.james.jmap.http.Authenticator import org.apache.james.jmap.http.rfc8621.InjectionKeys import org.apache.james.jmap.mail.Email.Size import org.apache.james.jmap.mail.{BlobId, EmailBodyPart, PartId} +import org.apache.james.jmap.model.Id.Id +import org.apache.james.jmap.model.{AccountId, Id} import org.apache.james.jmap.routes.DownloadRoutes.{BUFFER_SIZE, LOGGER} import org.apache.james.jmap.{Endpoint, JMAPRoute, JMAPRoutes} +import org.apache.james.mailbox.exception.AttachmentNotFoundException import org.apache.james.mailbox.model.{AttachmentId, AttachmentMetadata, ContentType, FetchGroup, MessageId, MessageResult} import org.apache.james.mailbox.{AttachmentManager, MailboxSession, MessageIdManager} import org.apache.james.mime4j.codec.EncoderUtil @@ -133,10 +136,13 @@ class MessageBlobResolver @Inject()(val messageIdFactory: MessageId.Factory, class AttachmentBlobResolver @Inject()(val attachmentManager: AttachmentManager) extends BlobResolver { override def resolve(blobId: BlobId, mailboxSession: MailboxSession): BlobResolutionResult = AttachmentId.from(org.apache.james.mailbox.model.BlobId.fromString(blobId.value.value)) match { - case attachmentId: AttachmentId => Applicable( - SMono.fromCallable(() => attachmentManager.getAttachment(attachmentId, mailboxSession)) - .map((attachmentMetadata: AttachmentMetadata) => AttachmentBlob(attachmentMetadata, attachmentManager.load(attachmentMetadata, mailboxSession))) - ) + case attachmentId: AttachmentId => + Try(attachmentManager.getAttachment(attachmentId, mailboxSession)) match { + case Success(attachmentMetadata) => Applicable( + SMono.fromCallable(() => AttachmentBlob(attachmentMetadata, attachmentManager.load(attachmentMetadata, mailboxSession)))) + case Failure(_) => Applicable(SMono.raiseError(BlobNotFoundException(blobId))) + } + case _ => NonApplicable() } } @@ -205,17 +211,7 @@ class DownloadRoutes @Inject()(@Named(InjectionKeys.RFC_8621) val authenticator: private def get(request: HttpServerRequest, response: HttpServerResponse): Mono[Void] = SMono(authenticator.authenticate(request)) - .flatMap((mailboxSession: MailboxSession) => - SMono.fromTry(BlobId.of(request.param(blobIdParam))) - .flatMap(blobResolvers.resolve(_, mailboxSession)) - .flatMap(blob => downloadBlob( - optionalName = queryParam(request, nameParam), - response = response, - blobContentType = queryParam(request, contentTypeParam) - .map(ContentType.of) - .getOrElse(blob.contentType), - blob = blob)) - .`then`) + .flatMap(mailboxSession => getIfOwner(request, response, mailboxSession)) .onErrorResume { case e: UnauthorizedException => SMono.fromPublisher(handleAuthenticationFailure(response, LOGGER, e)).`then` case _: BlobNotFoundException => SMono.fromPublisher(response.status(SC_NOT_FOUND).send).`then` @@ -227,6 +223,37 @@ class DownloadRoutes @Inject()(@Named(InjectionKeys.RFC_8621) val authenticator: .asJava() .`then` + private def get(request: HttpServerRequest, response: HttpServerResponse, mailboxSession: MailboxSession): SMono[Unit] = { + SMono.fromTry(BlobId.of(request.param(blobIdParam))) + .flatMap(blobResolvers.resolve(_, mailboxSession)) + .flatMap(blob => downloadBlob( + optionalName = queryParam(request, nameParam), + response = response, + blobContentType = queryParam(request, contentTypeParam) + .map(ContentType.of) + .getOrElse(blob.contentType), + blob = blob) + .`then`()) + } + + private def getIfOwner(request: HttpServerRequest, response: HttpServerResponse, mailboxSession: MailboxSession): SMono[Unit] = { + Id.validate(request.param(accountIdParam)) match { + case Right(id: Id) => { + val targetAccountId: AccountId = AccountId(id) + AccountId.from(mailboxSession.getUser).map(accountId => accountId.equals(targetAccountId)) + .fold[SMono[Unit]]( + e => SMono.raiseError(e), + value => if (value) { + get(request, response, mailboxSession) + } else { + SMono.raiseError(new UnauthorizedException("You cannot upload to others")) + }) + } + + case Left(throwable: Throwable) => SMono.raiseError(throwable) + } + } + private def downloadBlob(optionalName: Option[String], response: HttpServerResponse, blobContentType: ContentType, diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/UploadRoutes.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/UploadRoutes.scala index f06fa51..6e37695 100644 --- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/UploadRoutes.scala +++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/UploadRoutes.scala @@ -33,7 +33,7 @@ import org.apache.james.jmap.{Endpoint, JMAPRoute, JMAPRoutes} import org.apache.james.jmap.http.Authenticator import org.apache.james.jmap.http.rfc8621.InjectionKeys import org.apache.james.jmap.mail.Email.Size -import org.apache.james.jmap.routes.UploadRoutes.{LOGGER, fromAttachment} +import org.apache.james.jmap.routes.UploadRoutes.{LOGGER, sanitizeSize} import org.apache.james.mailbox.{AttachmentManager, MailboxSession} import org.apache.james.mailbox.model.{AttachmentMetadata, ContentType} import org.apache.james.util.ReactorUtils @@ -48,6 +48,8 @@ import eu.timepit.refined.refineV import org.apache.james.jmap.exceptions.UnauthorizedException import org.apache.james.jmap.json.UploadSerializer import org.apache.james.jmap.mail.BlobId +import org.apache.james.jmap.model.{AccountId, Id} +import org.apache.james.jmap.model.Id.Id object UploadRoutes { val LOGGER: Logger = LoggerFactory.getLogger(classOf[DownloadRoutes]) @@ -63,27 +65,18 @@ object UploadRoutes { }, refinedValue => refinedValue) } - - def fromAttachment(attachmentMetadata: AttachmentMetadata): UploadResponse = - UploadResponse( - blobId = BlobId.of(attachmentMetadata.getAttachmentId.getId).get, - `type` = ContentType.of(attachmentMetadata.getType.asString), - size = sanitizeSize(attachmentMetadata.getSize), - expires = None) } -case class UploadResponse(blobId: BlobId, +case class UploadResponse(accountId: AccountId, + blobId: BlobId, `type`: ContentType, - size: Size, - expires: Option[ZonedDateTime]) + size: Size) class UploadRoutes @Inject()(@Named(InjectionKeys.RFC_8621) val authenticator: Authenticator, val attachmentManager: AttachmentManager, val serializer: UploadSerializer) extends JMAPRoutes { - class CancelledUploadException extends RuntimeException { - - } + class CancelledUploadException extends RuntimeException private val accountIdParam: String = "accountId" private val uploadURI = s"/upload/{$accountIdParam}/" @@ -113,21 +106,41 @@ class UploadRoutes @Inject()(@Named(InjectionKeys.RFC_8621) val authenticator: A } def post(request: HttpServerRequest, response: HttpServerResponse, contentType: ContentType, session: MailboxSession): SMono[Void] = { - SMono.fromCallable(() => ReactorUtils.toInputStream(request.receive.asByteBuffer)) - .flatMap(content => handle(contentType, content, session, response)) - .subscribeOn(Schedulers.elastic()) + Id.validate(request.param(accountIdParam)) match { + case Right(id: Id) => { + val targetAccountId: AccountId = AccountId(id) + AccountId.from(session.getUser).map(accountId => accountId.equals(targetAccountId)) + .fold[SMono[Void]]( + e => SMono.raiseError(e), + value => if (value) { + SMono.fromCallable(() => ReactorUtils.toInputStream(request.receive.asByteBuffer)) + .flatMap(content => handle(targetAccountId, contentType, content, session, response)) + .subscribeOn(Schedulers.elastic()) + } else { + SMono.raiseError(new UnauthorizedException("Attempt to upload in another account")) + }) + } + + case Left(throwable: Throwable) => SMono.raiseError(throwable) + } } - def handle(contentType: ContentType, content: InputStream, mailboxSession: MailboxSession, response: HttpServerResponse): SMono[Void] = - uploadContent(contentType, content, mailboxSession) + def handle(accountId: AccountId, contentType: ContentType, content: InputStream, mailboxSession: MailboxSession, response: HttpServerResponse): SMono[Void] = + uploadContent(accountId, contentType, content, mailboxSession) .flatMap(uploadResponse => SMono.fromPublisher(response .header(CONTENT_TYPE, uploadResponse.`type`.asString()) .status(CREATED) .sendString(SMono.just(serializer.serialize(uploadResponse).toString())))) - def uploadContent(contentType: ContentType, inputStream: InputStream, session: MailboxSession): SMono[UploadResponse] = + def uploadContent(accountId: AccountId, contentType: ContentType, inputStream: InputStream, session: MailboxSession): SMono[UploadResponse] = SMono .fromPublisher(attachmentManager.storeAttachment(contentType, inputStream, session)) - .map(fromAttachment) + .map(fromAttachment(_, accountId)) + private def fromAttachment(attachmentMetadata: AttachmentMetadata, accountId: AccountId): UploadResponse = + UploadResponse( + blobId = BlobId.of(attachmentMetadata.getAttachmentId.getId).get, + `type` = ContentType.of(attachmentMetadata.getType.asString), + size = sanitizeSize(attachmentMetadata.getSize), + accountId = accountId) } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
