This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new 0bc550879e JAMES-2586 Reactify AttachmentBlobResolver
0bc550879e is described below

commit 0bc550879e5059ab65d8e62340ccd5d8dd431366
Author: Benoit TELLIER <btell...@linagora.com>
AuthorDate: Fri Jan 17 17:40:40 2025 +0100

    JAMES-2586 Reactify AttachmentBlobResolver
---
 .../apache/james/mailbox/AttachmentManager.java    |  2 +
 .../mailbox/store/StoreAttachmentManager.java      |  8 +++
 .../org/apache/james/modules/MailboxProbeImpl.java |  7 +++
 .../rfc8621/contract/CustomMethodContract.scala    |  6 +--
 .../jmap/rfc8621/contract/DownloadContract.scala   | 28 ++++++++++-
 .../apache/james/jmap/routes/DownloadRoutes.scala  | 58 ++++++++++++----------
 6 files changed, 80 insertions(+), 29 deletions(-)

diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/AttachmentManager.java b/mailbox/api/src/main/java/org/apache/james/mailbox/AttachmentManager.java
index 2387ce722c..c1686e4088 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/AttachmentManager.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/AttachmentManager.java
@@ -36,6 +36,8 @@ public interface AttachmentManager extends AttachmentContentLoader {
 
     AttachmentMetadata getAttachment(AttachmentId attachmentId, MailboxSession 
mailboxSession) throws MailboxException, AttachmentNotFoundException;
 
+    Mono<AttachmentMetadata> getAttachmentReactive(AttachmentId attachmentId, 
MailboxSession mailboxSession);
+
     List<AttachmentMetadata> getAttachments(List<AttachmentId> attachmentIds, 
MailboxSession mailboxSession) throws MailboxException;
 
     InputStream loadAttachmentContent(AttachmentId attachmentId, 
MailboxSession mailboxSession) throws AttachmentNotFoundException, IOException;
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreAttachmentManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreAttachmentManager.java
index 13e471afe9..570543fc5a 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreAttachmentManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreAttachmentManager.java
@@ -81,6 +81,14 @@ public class StoreAttachmentManager implements AttachmentManager {
         return attachment;
     }
 
+    @Override
+    public Mono<AttachmentMetadata> getAttachmentReactive(AttachmentId 
attachmentId, MailboxSession mailboxSession) {
+        return attachmentMapperFactory.getAttachmentMapper(mailboxSession)
+            .getAttachmentReactive(attachmentId)
+            .filterWhen(attachment -> existsReactive(attachmentId, 
mailboxSession))
+            .switchIfEmpty(Mono.error(() -> new 
AttachmentNotFoundException(attachmentId.getId())));
+    }
+
     @Override
     public List<AttachmentMetadata> getAttachments(List<AttachmentId> 
attachmentIds, MailboxSession mailboxSession) throws MailboxException {
         List<AttachmentMetadata> attachments = 
attachmentMapperFactory.getAttachmentMapper(mailboxSession)
diff --git a/server/container/guice/mailbox/src/main/java/org/apache/james/modules/MailboxProbeImpl.java b/server/container/guice/mailbox/src/main/java/org/apache/james/modules/MailboxProbeImpl.java
index b01c6ea653..7829a4c87e 100644
--- a/server/container/guice/mailbox/src/main/java/org/apache/james/modules/MailboxProbeImpl.java
+++ b/server/container/guice/mailbox/src/main/java/org/apache/james/modules/MailboxProbeImpl.java
@@ -187,6 +187,13 @@ public class MailboxProbeImpl implements GuiceProbe, MailboxProbe {
         return messageManager.appendMessage(appendCommand, 
mailboxSession).getId();
     }
 
+    public MessageManager.AppendResult 
appendMessageRetrieveAppendResult(String username, MailboxPath mailboxPath, 
MessageManager.AppendCommand appendCommand)
+            throws MailboxException {
+        MailboxSession mailboxSession = 
mailboxManager.createSystemSession(Username.of(username));
+        MessageManager messageManager = mailboxManager.getMailbox(mailboxPath, 
mailboxSession);
+        return messageManager.appendMessage(appendCommand, mailboxSession);
+    }
+
     public MessageManager.AppendResult appendMessageAndGetAppendResult(String 
username, MailboxPath mailboxPath, MessageManager.AppendCommand appendCommand)
         throws MailboxException {
         MailboxSession mailboxSession = 
mailboxManager.createSystemSession(Username.of(username));
diff --git a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/CustomMethodContract.scala b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/CustomMethodContract.scala
index 9b71b545fa..71ab030455 100644
--- a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/CustomMethodContract.scala
+++ b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/CustomMethodContract.scala
@@ -221,11 +221,11 @@ case object CustomBlob extends Blob {
 }
 
 class CustomBlobResolver extends BlobResolver {
-  override def resolve(blobId: org.apache.james.jmap.mail.BlobId, 
mailboxSession: MailboxSession): BlobResolutionResult =
+  override def resolve(blobId: org.apache.james.jmap.mail.BlobId, 
mailboxSession: MailboxSession): SMono[BlobResolutionResult] =
     if (blobId.equals(CustomBlob.blobId)) {
-      Applicable(SMono.just(CustomBlob))
+      SMono.just(Applicable(SMono.just(CustomBlob)))
     } else {
-      NonApplicable
+      SMono.just(NonApplicable)
     }
 }
 
diff --git a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/DownloadContract.scala b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/DownloadContract.scala
index 68f9fec1fb..5b2ea8b20d 100644
--- a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/DownloadContract.scala
+++ b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/DownloadContract.scala
@@ -32,7 +32,7 @@ import org.apache.james.jmap.rfc8621.contract.DownloadContract.accountId
 import 
org.apache.james.jmap.rfc8621.contract.Fixture.{ACCEPT_RFC8621_VERSION_HEADER, 
ALICE_ACCOUNT_ID, ANDRE, BOB, BOB_PASSWORD, CEDRIC, DOMAIN, authScheme, 
baseRequestSpecBuilder}
 import org.apache.james.mailbox.MessageManager.AppendCommand
 import org.apache.james.mailbox.model.MailboxACL.Right
-import org.apache.james.mailbox.model.{MailboxACL, MailboxPath, MessageId}
+import org.apache.james.mailbox.model.{AttachmentId, MailboxACL, MailboxPath, 
MessageId}
 import org.apache.james.mime4j.dom.Message
 import org.apache.james.modules.{ACLProbeImpl, MailboxProbeImpl}
 import org.apache.james.util.ClassLoaderUtils
@@ -92,6 +92,32 @@ trait DownloadContract {
       .hasContent(expectedResponse)
   }
 
+  @Test
+  def downloadMailboxAttachment(server: GuiceJamesServer): Unit = {
+    val path = MailboxPath.inbox(BOB)
+    server.getProbe(classOf[MailboxProbeImpl]).createMailbox(path)
+    val attachmentId: AttachmentId = server.getProbe(classOf[MailboxProbeImpl])
+      .appendMessageRetrieveAppendResult(BOB.asString, path, 
AppendCommand.from(
+        
ClassLoaderUtils.getSystemResourceAsSharedStream("eml/multipart_simple.eml")))
+      .getMessageAttachments
+      .get(1).getAttachmentId
+
+    val response = `given`
+        .basePath("")
+      .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER).log().all()
+    .when
+      .get(s"/download/$accountId/${attachmentId.getId()}")
+    .`then`
+      .statusCode(SC_OK)
+      .contentType("application/vnd.ms-publisher; name=\"text2\"")
+      .extract
+      .body
+      .asString
+
+    assertThat(response)
+      .isEqualTo("ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDHs8bT4T/8QymbsiAjlD1MwNIXJr/WET6+9MmuTSIYWWU94csDn9WVMzRhaAbpfnSqIx8TdUtrN/ZzX2JetPSar/bU9nXAWeiC/jPFQ1qKH4GeDrYXRLKu4T8782OrGH8Jyror97TlNXhPrjdRLEB4bQqmmZhb3HwcD8a9XzfZqlm7GRWLo1WQMGt/NpQLC7jMf4fA6/+kjzsTspxwdgL74GJqPfOXOiwgLHX8CZ6/5RyTqhT6pD3MktSNWaz/zIHPNEqf5BY9CBM1TFR5w+6MDHo0gmiIsXFEJTPnfhBvHDhSjB1RI0KxUClyYrJ4fBlUVeKfnawoVcu7YvCqF4F5 quynhnn@linagora\n")
+  }
+
   @Test
   def downloadMessageShouldFailWhenUnauthentified(server: GuiceJamesServer): 
Unit = {
     val path = MailboxPath.inbox(BOB)
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/DownloadRoutes.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/DownloadRoutes.scala
index 997957f3ba..99bc6f4951 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/DownloadRoutes.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/DownloadRoutes.scala
@@ -48,6 +48,7 @@ import org.apache.james.jmap.mail.{BlobId, MinimalEmailBodyPart}
 import org.apache.james.jmap.method.{AccountNotFoundException, ZoneIdProvider}
 import org.apache.james.jmap.routes.DownloadRoutes.{BUFFER_SIZE, LOGGER}
 import org.apache.james.jmap.{Endpoint, JMAPRoute, JMAPRoutes}
+import org.apache.james.mailbox.exception.AttachmentNotFoundException
 import org.apache.james.mailbox.model.ContentType.{MediaType, MimeType, 
SubType}
 import org.apache.james.mailbox.model._
 import org.apache.james.mailbox.{AttachmentIdFactory, AttachmentManager, 
MailboxSession, MessageIdManager}
@@ -57,10 +58,11 @@ import org.apache.james.mime4j.codec.EncoderUtil.Usage
 import org.apache.james.mime4j.dom.SingleBody
 import org.apache.james.mime4j.message.DefaultMessageWriter
 import org.apache.james.util.ReactorUtils
+import org.reactivestreams.Publisher
 import org.slf4j.{Logger, LoggerFactory}
 import play.api.libs.json.Json
 import reactor.core.publisher.Mono
-import reactor.core.scala.publisher.SMono
+import reactor.core.scala.publisher.{SFlux, SMono}
 import reactor.core.scheduler.Schedulers
 import reactor.netty.http.server.{HttpServerRequest, HttpServerResponse}
 
@@ -84,7 +86,7 @@ case class Applicable(blob: SMono[Blob]) extends 
BlobResolutionResult {
 }
 
 trait BlobResolver {
-  def resolve(blobId: BlobId, mailboxSession: MailboxSession): 
BlobResolutionResult
+  def resolve(blobId: BlobId, mailboxSession: MailboxSession): 
Publisher[BlobResolutionResult]
 }
 
 trait Blob {
@@ -143,13 +145,13 @@ case class EmailBodyPartBlob(blobId: BlobId, part: 
MinimalEmailBodyPart) extends
 
 class MessageBlobResolver @Inject()(val messageIdFactory: MessageId.Factory,
                                     val messageIdManager: MessageIdManager) 
extends BlobResolver {
-  override def resolve(blobId: BlobId, mailboxSession: MailboxSession): 
BlobResolutionResult = {
+  override def resolve(blobId: BlobId, mailboxSession: MailboxSession): 
SMono[BlobResolutionResult] = {
     Try(messageIdFactory.fromString(blobId.value.value)) match {
-      case Failure(_) => NonApplicable
-      case Success(messageId) => Applicable(SMono.fromPublisher(
+      case Failure(_) => SMono.just(NonApplicable)
+      case Success(messageId) => SMono.just(Applicable(SMono.fromPublisher(
         messageIdManager.getMessagesReactive(List(messageId).asJava, 
FetchGroup.FULL_CONTENT, mailboxSession))
         .map[Blob](MessageBlob(blobId, _))
-        .switchIfEmpty(SMono.error(BlobNotFoundException(blobId))))
+        .switchIfEmpty(SMono.error(BlobNotFoundException(blobId)))))
     }
   }
 }
@@ -157,12 +159,12 @@ class MessageBlobResolver @Inject()(val messageIdFactory: 
MessageId.Factory,
 class UploadResolver @Inject()(val uploadService: UploadService) extends 
BlobResolver {
   private val prefix = "uploads-"
 
-  override def resolve(blobId: BlobId, mailboxSession: MailboxSession): 
BlobResolutionResult = {
+  override def resolve(blobId: BlobId, mailboxSession: MailboxSession): 
SMono[BlobResolutionResult] = {
     if (!blobId.value.value.startsWith(prefix)) {
-      NonApplicable
+      SMono.just(NonApplicable)
     } else {
       val uploadIdAsString = blobId.value.value.substring(prefix.length)
-      Try(UploadId.from(uploadIdAsString)) match {
+      SMono.just(Try(UploadId.from(uploadIdAsString)) match {
         case Failure(_) => NonApplicable
         case Success(uploadId) => Applicable(
           SMono(uploadService.retrieve(uploadId, mailboxSession.getUser))
@@ -170,22 +172,23 @@ class UploadResolver @Inject()(val uploadService: 
UploadService) extends BlobRes
             .onErrorResume {
               case _: UploadNotFoundException => 
SMono.error(BlobNotFoundException(blobId))
             })
-      }
+      })
     }
   }
 }
 
 class AttachmentBlobResolver @Inject()(val attachmentManager: 
AttachmentManager, val attachmentIdFactory: AttachmentIdFactory) extends 
BlobResolver {
-  override def resolve(blobId: BlobId, mailboxSession: MailboxSession): 
BlobResolutionResult =
+  override def resolve(blobId: BlobId, mailboxSession: MailboxSession): 
SMono[BlobResolutionResult] =
     attachmentIdFactory.from(blobId.value.value) match {
       case attachmentId: StringBackedAttachmentId =>
-        Try(attachmentManager.getAttachment(attachmentId, mailboxSession)) 
match {
-          case Success(attachmentMetadata) =>
-            
Applicable(SMono(attachmentManager.loadReactive(attachmentMetadata, 
mailboxSession))
-              .map(content => AttachmentBlob(attachmentMetadata, content)))
-          case Failure(_) => NonApplicable
-        }
-      case _ => NonApplicable
+        SMono(attachmentManager.getAttachmentReactive(attachmentId, 
mailboxSession))
+          .map(attachmentMetadata => 
Applicable(SMono(attachmentManager.loadReactive(attachmentMetadata, 
mailboxSession))
+            .map(content => AttachmentBlob(attachmentMetadata, content))))
+          .onErrorResume {
+            case e: AttachmentNotFoundException =>  
SMono.just(NonApplicable.asInstanceOf[BlobResolutionResult])
+            case e => SMono.error[BlobResolutionResult](e)
+          }
+      case _ => SMono.just(NonApplicable)
     }
 }
 
@@ -207,11 +210,11 @@ class MessagePartBlobResolver @Inject()(val 
messageIdFactory: MessageId.Factory,
     case (acc, idPart) => acc.headOption.map(prefix => prefix + "_" + 
idPart).getOrElse(idPart) :: acc
   }.flatMap(s => BlobId.of(s).toOption).take(parts.size).reverse
 
-  override def resolve(blobId: BlobId, mailboxSession: MailboxSession): 
BlobResolutionResult = {
+  override def resolve(blobId: BlobId, mailboxSession: MailboxSession): 
SMono[BlobResolutionResult] = {
     asMessageAndPartIds(blobId) match {
-      case Failure(_) => NonApplicable
+      case Failure(_) => SMono.just(NonApplicable)
       case Success((messageId, blobIds)) =>
-        Applicable(SMono.fromPublisher(
+        SMono.just(Applicable(SMono.fromPublisher(
           messageIdManager.getMessagesReactive(List(messageId).asJava, 
FetchGroup.FULL_CONTENT, mailboxSession))
           .handle[MinimalEmailBodyPart] {
             case (message, sink) => MinimalEmailBodyPart.ofMessage(None, 
zoneIdSupplier.get(), BlobId.of(messageId).get, message)
@@ -227,7 +230,7 @@ class MessagePartBlobResolver @Inject()(val 
messageIdFactory: MessageId.Factory,
                 .fold(sink.error(BlobNotFoundException(blobId)))(part => 
sink.next(part))
           }
           .map[Blob](EmailBodyPartBlob(blobId, _))
-          .switchIfEmpty(SMono.error(BlobNotFoundException(blobId))))
+          .switchIfEmpty(SMono.error(BlobNotFoundException(blobId)))))
     }
   }
 }
@@ -240,9 +243,14 @@ class BlobResolvers(blobResolvers: Set[BlobResolver]) {
   }
 
   def resolve(blobId: BlobId, mailboxSession: MailboxSession): SMono[Blob] =
-    blobResolvers.flatMap(resolver => resolver.resolve(blobId, 
mailboxSession).asOption)
-      .headOption
-      .getOrElse(SMono.error(BlobNotFoundException(blobId)))
+    SFlux.fromIterable(blobResolvers)
+      .concatMap(resolver => resolver.resolve(blobId, mailboxSession))
+      .filter {
+        case NonApplicable => false
+        case _: Applicable => true
+      }
+      .concatMap(result => 
result.asOption.getOrElse(SMono.error(BlobNotFoundException(blobId))))
+      .next().switchIfEmpty(SMono.error(BlobNotFoundException(blobId)))
 }
 
 class DownloadRoutes @Inject()(@Named(InjectionKeys.RFC_8621) val 
authenticator: Authenticator,


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org
For additional commands, e-mail: notifications-h...@james.apache.org

Reply via email to