This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push: new 0bc550879e JAMES-2586 Reactify AttachmentBlobResolver 0bc550879e is described below commit 0bc550879e5059ab65d8e62340ccd5d8dd431366 Author: Benoit TELLIER <btell...@linagora.com> AuthorDate: Fri Jan 17 17:40:40 2025 +0100 JAMES-2586 Reactify AttachmentBlobResolver --- .../apache/james/mailbox/AttachmentManager.java | 2 + .../mailbox/store/StoreAttachmentManager.java | 8 +++ .../org/apache/james/modules/MailboxProbeImpl.java | 7 +++ .../rfc8621/contract/CustomMethodContract.scala | 6 +-- .../jmap/rfc8621/contract/DownloadContract.scala | 28 ++++++++++- .../apache/james/jmap/routes/DownloadRoutes.scala | 58 ++++++++++++---------- 6 files changed, 80 insertions(+), 29 deletions(-) diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/AttachmentManager.java b/mailbox/api/src/main/java/org/apache/james/mailbox/AttachmentManager.java index 2387ce722c..c1686e4088 100644 --- a/mailbox/api/src/main/java/org/apache/james/mailbox/AttachmentManager.java +++ b/mailbox/api/src/main/java/org/apache/james/mailbox/AttachmentManager.java @@ -36,6 +36,8 @@ public interface AttachmentManager extends AttachmentContentLoader { AttachmentMetadata getAttachment(AttachmentId attachmentId, MailboxSession mailboxSession) throws MailboxException, AttachmentNotFoundException; + Mono<AttachmentMetadata> getAttachmentReactive(AttachmentId attachmentId, MailboxSession mailboxSession); + List<AttachmentMetadata> getAttachments(List<AttachmentId> attachmentIds, MailboxSession mailboxSession) throws MailboxException; InputStream loadAttachmentContent(AttachmentId attachmentId, MailboxSession mailboxSession) throws AttachmentNotFoundException, IOException; diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreAttachmentManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreAttachmentManager.java index 13e471afe9..570543fc5a 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreAttachmentManager.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreAttachmentManager.java @@ -81,6 +81,14 @@ public class StoreAttachmentManager implements AttachmentManager { return attachment; } + @Override + public Mono<AttachmentMetadata> getAttachmentReactive(AttachmentId attachmentId, MailboxSession mailboxSession) { + return attachmentMapperFactory.getAttachmentMapper(mailboxSession) + .getAttachmentReactive(attachmentId) + .filterWhen(attachment -> existsReactive(attachmentId, mailboxSession)) + .switchIfEmpty(Mono.error(() -> new AttachmentNotFoundException(attachmentId.getId()))); + } + @Override public List<AttachmentMetadata> getAttachments(List<AttachmentId> attachmentIds, MailboxSession mailboxSession) throws MailboxException { List<AttachmentMetadata> attachments = attachmentMapperFactory.getAttachmentMapper(mailboxSession) diff --git a/server/container/guice/mailbox/src/main/java/org/apache/james/modules/MailboxProbeImpl.java b/server/container/guice/mailbox/src/main/java/org/apache/james/modules/MailboxProbeImpl.java index b01c6ea653..7829a4c87e 100644 --- a/server/container/guice/mailbox/src/main/java/org/apache/james/modules/MailboxProbeImpl.java +++ b/server/container/guice/mailbox/src/main/java/org/apache/james/modules/MailboxProbeImpl.java @@ -187,6 +187,13 @@ public class MailboxProbeImpl implements GuiceProbe, MailboxProbe { return messageManager.appendMessage(appendCommand, mailboxSession).getId(); } + public MessageManager.AppendResult appendMessageRetrieveAppendResult(String username, MailboxPath mailboxPath, MessageManager.AppendCommand appendCommand) + throws MailboxException { + MailboxSession mailboxSession = mailboxManager.createSystemSession(Username.of(username)); + MessageManager messageManager = mailboxManager.getMailbox(mailboxPath, mailboxSession); + return messageManager.appendMessage(appendCommand, mailboxSession); + } + public MessageManager.AppendResult appendMessageAndGetAppendResult(String username, MailboxPath mailboxPath, MessageManager.AppendCommand appendCommand) throws MailboxException { MailboxSession mailboxSession = mailboxManager.createSystemSession(Username.of(username)); diff --git a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/CustomMethodContract.scala b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/CustomMethodContract.scala index 9b71b545fa..71ab030455 100644 --- a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/CustomMethodContract.scala +++ b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/CustomMethodContract.scala @@ -221,11 +221,11 @@ case object CustomBlob extends Blob { } class CustomBlobResolver extends BlobResolver { - override def resolve(blobId: org.apache.james.jmap.mail.BlobId, mailboxSession: MailboxSession): BlobResolutionResult = + override def resolve(blobId: org.apache.james.jmap.mail.BlobId, mailboxSession: MailboxSession): SMono[BlobResolutionResult] = if (blobId.equals(CustomBlob.blobId)) { - Applicable(SMono.just(CustomBlob)) + SMono.just(Applicable(SMono.just(CustomBlob))) } else { - NonApplicable + SMono.just(NonApplicable) } } diff --git a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/DownloadContract.scala b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/DownloadContract.scala index 68f9fec1fb..5b2ea8b20d 100644 --- a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/DownloadContract.scala +++ b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/DownloadContract.scala @@ -32,7 +32,7 @@ import org.apache.james.jmap.rfc8621.contract.DownloadContract.accountId import org.apache.james.jmap.rfc8621.contract.Fixture.{ACCEPT_RFC8621_VERSION_HEADER, ALICE_ACCOUNT_ID, ANDRE, BOB, BOB_PASSWORD, CEDRIC, DOMAIN, authScheme, baseRequestSpecBuilder} import org.apache.james.mailbox.MessageManager.AppendCommand import org.apache.james.mailbox.model.MailboxACL.Right -import org.apache.james.mailbox.model.{MailboxACL, MailboxPath, MessageId} +import org.apache.james.mailbox.model.{AttachmentId, MailboxACL, MailboxPath, MessageId} import org.apache.james.mime4j.dom.Message import org.apache.james.modules.{ACLProbeImpl, MailboxProbeImpl} import org.apache.james.util.ClassLoaderUtils @@ -92,6 +92,32 @@ trait DownloadContract { .hasContent(expectedResponse) } + @Test + def downloadMailboxAttachment(server: GuiceJamesServer): Unit = { + val path = MailboxPath.inbox(BOB) + server.getProbe(classOf[MailboxProbeImpl]).createMailbox(path) + val attachmentId: AttachmentId = server.getProbe(classOf[MailboxProbeImpl]) + .appendMessageRetrieveAppendResult(BOB.asString, path, AppendCommand.from( + ClassLoaderUtils.getSystemResourceAsSharedStream("eml/multipart_simple.eml"))) + .getMessageAttachments + .get(1).getAttachmentId + + val response = `given` + .basePath("") + .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER).log().all() + .when + .get(s"/download/$accountId/${attachmentId.getId()}") + .`then` + .statusCode(SC_OK) + .contentType("application/vnd.ms-publisher; name=\"text2\"") + .extract + .body + .asString + + assertThat(response) + .isEqualTo("ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDHs8bT4T/8QymbsiAjlD1MwNIXJr/WET6+9MmuTSIYWWU94csDn9WVMzRhaAbpfnSqIx8TdUtrN/ZzX2JetPSar/bU9nXAWeiC/jPFQ1qKH4GeDrYXRLKu4T8782OrGH8Jyror97TlNXhPrjdRLEB4bQqmmZhb3HwcD8a9XzfZqlm7GRWLo1WQMGt/NpQLC7jMf4fA6/+kjzsTspxwdgL74GJqPfOXOiwgLHX8CZ6/5RyTqhT6pD3MktSNWaz/zIHPNEqf5BY9CBM1TFR5w+6MDHo0gmiIsXFEJTPnfhBvHDhSjB1RI0KxUClyYrJ4fBlUVeKfnawoVcu7YvCqF4F5 quynhnn@linagora\n") + } + @Test def downloadMessageShouldFailWhenUnauthentified(server: GuiceJamesServer): Unit = { val path = MailboxPath.inbox(BOB) diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/DownloadRoutes.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/DownloadRoutes.scala index 997957f3ba..99bc6f4951 100644 --- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/DownloadRoutes.scala +++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/DownloadRoutes.scala @@ -48,6 +48,7 @@ import org.apache.james.jmap.mail.{BlobId, MinimalEmailBodyPart} import org.apache.james.jmap.method.{AccountNotFoundException, ZoneIdProvider} import org.apache.james.jmap.routes.DownloadRoutes.{BUFFER_SIZE, LOGGER} import org.apache.james.jmap.{Endpoint, JMAPRoute, JMAPRoutes} +import org.apache.james.mailbox.exception.AttachmentNotFoundException import org.apache.james.mailbox.model.ContentType.{MediaType, MimeType, SubType} import org.apache.james.mailbox.model._ import org.apache.james.mailbox.{AttachmentIdFactory, AttachmentManager, MailboxSession, MessageIdManager} @@ -57,10 +58,11 @@ import org.apache.james.mime4j.codec.EncoderUtil.Usage import org.apache.james.mime4j.dom.SingleBody import org.apache.james.mime4j.message.DefaultMessageWriter import org.apache.james.util.ReactorUtils +import org.reactivestreams.Publisher import org.slf4j.{Logger, LoggerFactory} import play.api.libs.json.Json import reactor.core.publisher.Mono -import reactor.core.scala.publisher.SMono +import reactor.core.scala.publisher.{SFlux, SMono} import reactor.core.scheduler.Schedulers import reactor.netty.http.server.{HttpServerRequest, HttpServerResponse} @@ -84,7 +86,7 @@ case class Applicable(blob: SMono[Blob]) extends BlobResolutionResult { } trait BlobResolver { - def resolve(blobId: BlobId, mailboxSession: MailboxSession): BlobResolutionResult + def resolve(blobId: BlobId, mailboxSession: MailboxSession): Publisher[BlobResolutionResult] } trait Blob { @@ -143,13 +145,13 @@ case class EmailBodyPartBlob(blobId: BlobId, part: MinimalEmailBodyPart) extends class MessageBlobResolver @Inject()(val messageIdFactory: MessageId.Factory, val messageIdManager: MessageIdManager) extends BlobResolver { - override def resolve(blobId: BlobId, mailboxSession: MailboxSession): BlobResolutionResult = { + override def resolve(blobId: BlobId, mailboxSession: MailboxSession): SMono[BlobResolutionResult] = { Try(messageIdFactory.fromString(blobId.value.value)) match { - case Failure(_) => NonApplicable - case Success(messageId) => Applicable(SMono.fromPublisher( + case Failure(_) => SMono.just(NonApplicable) + case Success(messageId) => SMono.just(Applicable(SMono.fromPublisher( messageIdManager.getMessagesReactive(List(messageId).asJava, FetchGroup.FULL_CONTENT, mailboxSession)) .map[Blob](MessageBlob(blobId, _)) - .switchIfEmpty(SMono.error(BlobNotFoundException(blobId)))) + .switchIfEmpty(SMono.error(BlobNotFoundException(blobId))))) } } } @@ -157,12 +159,12 @@ class MessageBlobResolver @Inject()(val messageIdFactory: MessageId.Factory, class UploadResolver @Inject()(val uploadService: UploadService) extends BlobResolver { private val prefix = "uploads-" - override def resolve(blobId: BlobId, mailboxSession: MailboxSession): BlobResolutionResult = { + override def resolve(blobId: BlobId, mailboxSession: MailboxSession): SMono[BlobResolutionResult] = { if (!blobId.value.value.startsWith(prefix)) { - NonApplicable + SMono.just(NonApplicable) } else { val uploadIdAsString = blobId.value.value.substring(prefix.length) - Try(UploadId.from(uploadIdAsString)) match { + SMono.just(Try(UploadId.from(uploadIdAsString)) match { case Failure(_) => NonApplicable case Success(uploadId) => Applicable( SMono(uploadService.retrieve(uploadId, mailboxSession.getUser)) @@ -170,22 +172,23 @@ class UploadResolver @Inject()(val uploadService: UploadService) extends BlobRes .onErrorResume { case _: UploadNotFoundException => SMono.error(BlobNotFoundException(blobId)) }) - } + }) } } } class AttachmentBlobResolver @Inject()(val attachmentManager: AttachmentManager, val attachmentIdFactory: AttachmentIdFactory) extends BlobResolver { - override def resolve(blobId: BlobId, mailboxSession: MailboxSession): BlobResolutionResult = + override def resolve(blobId: BlobId, mailboxSession: MailboxSession): SMono[BlobResolutionResult] = attachmentIdFactory.from(blobId.value.value) match { case attachmentId: StringBackedAttachmentId => - Try(attachmentManager.getAttachment(attachmentId, mailboxSession)) match { - case Success(attachmentMetadata) => - Applicable(SMono(attachmentManager.loadReactive(attachmentMetadata, mailboxSession)) - .map(content => AttachmentBlob(attachmentMetadata, content))) - case Failure(_) => NonApplicable - } - case _ => NonApplicable + SMono(attachmentManager.getAttachmentReactive(attachmentId, mailboxSession)) + .map(attachmentMetadata => Applicable(SMono(attachmentManager.loadReactive(attachmentMetadata, mailboxSession)) + .map(content => AttachmentBlob(attachmentMetadata, content)))) + .onErrorResume { + case e: AttachmentNotFoundException => SMono.just(NonApplicable.asInstanceOf[BlobResolutionResult]) + case e => SMono.error[BlobResolutionResult](e) + } + case _ => SMono.just(NonApplicable) } } @@ -207,11 +210,11 @@ class MessagePartBlobResolver @Inject()(val messageIdFactory: MessageId.Factory, case (acc, idPart) => acc.headOption.map(prefix => prefix + "_" + idPart).getOrElse(idPart) :: acc }.flatMap(s => BlobId.of(s).toOption).take(parts.size).reverse - override def resolve(blobId: BlobId, mailboxSession: MailboxSession): BlobResolutionResult = { + override def resolve(blobId: BlobId, mailboxSession: MailboxSession): SMono[BlobResolutionResult] = { asMessageAndPartIds(blobId) match { - case Failure(_) => NonApplicable + case Failure(_) => SMono.just(NonApplicable) case Success((messageId, blobIds)) => - Applicable(SMono.fromPublisher( + SMono.just(Applicable(SMono.fromPublisher( messageIdManager.getMessagesReactive(List(messageId).asJava, FetchGroup.FULL_CONTENT, mailboxSession)) .handle[MinimalEmailBodyPart] { case (message, sink) => MinimalEmailBodyPart.ofMessage(None, zoneIdSupplier.get(), BlobId.of(messageId).get, message) @@ -227,7 +230,7 @@ class MessagePartBlobResolver @Inject()(val messageIdFactory: MessageId.Factory, .fold(sink.error(BlobNotFoundException(blobId)))(part => sink.next(part)) } .map[Blob](EmailBodyPartBlob(blobId, _)) - .switchIfEmpty(SMono.error(BlobNotFoundException(blobId)))) + .switchIfEmpty(SMono.error(BlobNotFoundException(blobId))))) } } } @@ -240,9 +243,14 @@ class BlobResolvers(blobResolvers: Set[BlobResolver]) { } def resolve(blobId: BlobId, mailboxSession: MailboxSession): SMono[Blob] = - blobResolvers.flatMap(resolver => resolver.resolve(blobId, mailboxSession).asOption) - .headOption - .getOrElse(SMono.error(BlobNotFoundException(blobId))) + SFlux.fromIterable(blobResolvers) + .concatMap(resolver => resolver.resolve(blobId, mailboxSession)) + .filter { + case NonApplicable => false + case _: Applicable => true + } + .concatMap(result => result.asOption.getOrElse(SMono.error(BlobNotFoundException(blobId)))) + .next().switchIfEmpty(SMono.error(BlobNotFoundException(blobId))) } class DownloadRoutes @Inject()(@Named(InjectionKeys.RFC_8621) val authenticator: Authenticator, --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For additional commands, e-mail: notifications-h...@james.apache.org