This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit a3fed7cc07c5591e582a00702ef667bcdacc6586 Author: Benoit Tellier <[email protected]> AuthorDate: Tue Mar 14 18:18:27 2023 +0700 [FIX] Reactify attachments This avoids blocking calls into a parallel thread, or into a Cassandra driver thread... --- .../james/mailbox/AttachmentContentLoader.java | 7 ++++++ .../apache/james/mailbox/AttachmentManager.java | 9 ++++++++ .../cassandra/mail/CassandraAttachmentMapper.java | 14 +++++++++++ .../mailbox/store/StoreAttachmentManager.java | 24 +++++++++++++++++++ .../james/mailbox/store/mail/AttachmentMapper.java | 10 ++++++++ .../james/jmap/draft/methods/BlobManagerImpl.java | 27 ++++++++++++++++++---- .../org/apache/james/jmap/draft/model/Blob.java | 12 ++++++++++ .../org/apache/james/jmap/http/DownloadRoutes.java | 6 +++-- .../jmap/draft/methods/BlobManagerImplTest.java | 5 ++-- .../apache/james/jmap/routes/DownloadRoutes.scala | 5 ++-- 10 files changed, 108 insertions(+), 11 deletions(-) diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/AttachmentContentLoader.java b/mailbox/api/src/main/java/org/apache/james/mailbox/AttachmentContentLoader.java index 9536c722a1..5417a56197 100644 --- a/mailbox/api/src/main/java/org/apache/james/mailbox/AttachmentContentLoader.java +++ b/mailbox/api/src/main/java/org/apache/james/mailbox/AttachmentContentLoader.java @@ -24,9 +24,16 @@ import java.io.InputStream; import org.apache.james.mailbox.exception.AttachmentNotFoundException; import org.apache.james.mailbox.model.AttachmentMetadata; +import org.apache.james.util.ReactorUtils; + +import reactor.core.publisher.Mono; public interface AttachmentContentLoader { InputStream load(AttachmentMetadata attachment, MailboxSession mailboxSession) throws IOException, AttachmentNotFoundException; + default Mono<InputStream> loadReactive(AttachmentMetadata attachment, MailboxSession mailboxSession) { + return Mono.fromCallable(() -> load(attachment, mailboxSession)) + .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER); + } } diff 
--git a/mailbox/api/src/main/java/org/apache/james/mailbox/AttachmentManager.java b/mailbox/api/src/main/java/org/apache/james/mailbox/AttachmentManager.java index 5ed6de1ad3..2387ce722c 100644 --- a/mailbox/api/src/main/java/org/apache/james/mailbox/AttachmentManager.java +++ b/mailbox/api/src/main/java/org/apache/james/mailbox/AttachmentManager.java @@ -28,6 +28,8 @@ import org.apache.james.mailbox.exception.MailboxException; import org.apache.james.mailbox.model.AttachmentId; import org.apache.james.mailbox.model.AttachmentMetadata; +import reactor.core.publisher.Mono; + public interface AttachmentManager extends AttachmentContentLoader { boolean exists(AttachmentId attachmentId, MailboxSession session) throws MailboxException; @@ -38,9 +40,16 @@ public interface AttachmentManager extends AttachmentContentLoader { InputStream loadAttachmentContent(AttachmentId attachmentId, MailboxSession mailboxSession) throws AttachmentNotFoundException, IOException; + Mono<InputStream> loadAttachmentContentReactive(AttachmentId attachmentId, MailboxSession mailboxSession); + @Override default InputStream load(AttachmentMetadata attachment, MailboxSession mailboxSession) throws IOException, AttachmentNotFoundException { return loadAttachmentContent(attachment.getAttachmentId(), mailboxSession); } + @Override + default Mono<InputStream> loadReactive(AttachmentMetadata attachment, MailboxSession mailboxSession) { + return loadAttachmentContentReactive(attachment.getAttachmentId(), mailboxSession); + } + } diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java index a1200efc53..b1b3e16fb0 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java @@ 
-72,6 +72,13 @@ public class CassandraAttachmentMapper implements AttachmentMapper { .orElseThrow(() -> new AttachmentNotFoundException(attachmentId.getId())); } + @Override + public Mono<AttachmentMetadata> getAttachmentReactive(AttachmentId attachmentId) { + Preconditions.checkArgument(attachmentId != null); + return getAttachmentInternal(attachmentId) + .switchIfEmpty(Mono.error(() -> new AttachmentNotFoundException(attachmentId.getId()))); + } + @Override public List<AttachmentMetadata> getAttachments(Collection<AttachmentId> attachmentIds) { Preconditions.checkArgument(attachmentIds != null); @@ -89,6 +96,13 @@ public class CassandraAttachmentMapper implements AttachmentMapper { .orElseThrow(() -> new AttachmentNotFoundException(attachmentId.toString())); } + @Override + public Mono<InputStream> loadAttachmentContentReactive(AttachmentId attachmentId) { + return attachmentDAOV2.getAttachment(attachmentId, messageIdFallback(attachmentId)) + .flatMap(daoAttachment -> Mono.from(blobStore.readReactive(blobStore.getDefaultBucketName(), daoAttachment.getBlobId(), LOW_COST))) + .switchIfEmpty(Mono.error(() -> new AttachmentNotFoundException(attachmentId.toString()))); + } + private Mono<CassandraMessageId> messageIdFallback(AttachmentId attachmentId) { return attachmentMessageIdDAO.getOwnerMessageIds(attachmentId) .map(CassandraMessageId.class::cast) diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreAttachmentManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreAttachmentManager.java index c095c9a0e4..b11ee9eccc 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreAttachmentManager.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreAttachmentManager.java @@ -38,6 +38,8 @@ import org.apache.james.mailbox.store.mail.AttachmentMapperFactory; import com.google.common.collect.ImmutableList; +import reactor.core.publisher.Mono; + public class StoreAttachmentManager 
implements AttachmentManager { private final AttachmentMapperFactory attachmentMapperFactory; private final MessageIdManager messageIdManager; @@ -55,10 +57,21 @@ public class StoreAttachmentManager implements AttachmentManager { return exists(attachment, session); } + public Mono<Boolean> existsReactive(AttachmentId attachmentId, MailboxSession session) { + return attachmentMapperFactory.getAttachmentMapper(session) + .getAttachmentReactive(attachmentId) + .flatMap(attachment -> existsReactive(attachment, session)); + } + public boolean exists(AttachmentMetadata attachment, MailboxSession session) throws MailboxException { return !messageIdManager.accessibleMessages(ImmutableList.of(attachment.getMessageId()), session).isEmpty(); } + public Mono<Boolean> existsReactive(AttachmentMetadata attachment, MailboxSession session) { + return Mono.from(messageIdManager.accessibleMessagesReactive(ImmutableList.of(attachment.getMessageId()), session)) + .map(accessibleMessages -> !accessibleMessages.isEmpty()); + } + @Override public AttachmentMetadata getAttachment(AttachmentId attachmentId, MailboxSession mailboxSession) throws MailboxException, AttachmentNotFoundException { AttachmentMetadata attachment = attachmentMapperFactory.getAttachmentMapper(mailboxSession).getAttachment(attachmentId); @@ -94,4 +107,15 @@ public class StoreAttachmentManager implements AttachmentManager { } return attachmentMapperFactory.getAttachmentMapper(mailboxSession).loadAttachmentContent(attachmentId); } + + @Override + public Mono<InputStream> loadAttachmentContentReactive(AttachmentId attachmentId, MailboxSession mailboxSession) { + return existsReactive(attachmentId, mailboxSession) + .flatMap(exist -> { + if (!exist) { + return Mono.error(new AttachmentNotFoundException(attachmentId.getId())); + } + return attachmentMapperFactory.getAttachmentMapper(mailboxSession).loadAttachmentContentReactive(attachmentId); + }); + } } diff --git 
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/AttachmentMapper.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/AttachmentMapper.java index 10967da25c..f566aa5f49 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/AttachmentMapper.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/AttachmentMapper.java @@ -39,8 +39,18 @@ public interface AttachmentMapper extends Mapper { InputStream loadAttachmentContent(AttachmentId attachmentId) throws AttachmentNotFoundException, IOException; + default Mono<InputStream> loadAttachmentContentReactive(AttachmentId attachmentId) { + return Mono.fromCallable(() -> loadAttachmentContent(attachmentId)) + .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER); + } + AttachmentMetadata getAttachment(AttachmentId attachmentId) throws AttachmentNotFoundException; + default Mono<AttachmentMetadata> getAttachmentReactive(AttachmentId attachmentId) { + return Mono.fromCallable(() -> getAttachment(attachmentId)) + .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER); + } + List<AttachmentMetadata> getAttachments(Collection<AttachmentId> attachmentIds); List<MessageAttachmentMetadata> storeAttachments(Collection<ParsedAttachment> attachments, MessageId ownerMessageId) throws MailboxException; diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/BlobManagerImpl.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/BlobManagerImpl.java index c3e1d6d34e..5d7211d4b5 100644 --- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/BlobManagerImpl.java +++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/BlobManagerImpl.java @@ -19,6 +19,8 @@ package org.apache.james.jmap.draft.methods; +import java.io.IOException; +import java.io.InputStream; import java.util.Collection; import java.util.List; import java.util.Optional; @@ -156,11 +158,26 @@ 
public class BlobManagerImpl implements BlobManager { BlobId blobId = BlobId.of(attachment.getAttachmentId()); return Blob.builder() .id(blobId) - .payload(() -> { - try { - return attachmentManager.loadAttachmentContent(attachment.getAttachmentId(), mailboxSession); - } catch (AttachmentNotFoundException e) { - throw new BlobNotFoundException(blobId, e); + .payload(new Blob.InputStreamSupplier() { + @Override + public InputStream load() throws IOException, BlobNotFoundException { + try { + return loadReactive().block(); + } catch (RuntimeException e) { + if (e.getCause() instanceof IOException) { + throw (IOException) e.getCause(); + } + if (e.getCause() instanceof BlobNotFoundException) { + throw (BlobNotFoundException) e.getCause(); + } + throw e; + } + } + + @Override + public Mono<InputStream> loadReactive() { + return attachmentManager.loadAttachmentContentReactive(attachment.getAttachmentId(), mailboxSession) + .onErrorResume(AttachmentNotFoundException.class, e -> Mono.error(new BlobNotFoundException(blobId, e))); } }) .size(attachment.getSize()) diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/Blob.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/Blob.java index a4bae5353c..9e718f1ebb 100644 --- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/Blob.java +++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/model/Blob.java @@ -25,11 +25,14 @@ import java.util.Objects; import org.apache.james.jmap.draft.exceptions.BlobNotFoundException; import org.apache.james.mailbox.model.ContentType; +import org.apache.james.util.ReactorUtils; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; +import reactor.core.publisher.Mono; + public class Blob { @FunctionalInterface @@ -40,6 +43,11 @@ public class Blob { * The caller is responsible of closing it. 
*/ InputStream load() throws IOException, BlobNotFoundException; + + default Mono<InputStream> loadReactive() { + return Mono.fromCallable(this::load) + .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER); + } } public static class Builder { @@ -111,6 +119,10 @@ public class Blob { return payload.load(); } + public Mono<InputStream> getStreamReactive() { + return payload.loadReactive(); + } + public long getSize() { return size; } diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/http/DownloadRoutes.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/http/DownloadRoutes.java index 9c55521dd9..40231a7850 100644 --- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/http/DownloadRoutes.java +++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/http/DownloadRoutes.java @@ -213,11 +213,13 @@ public class DownloadRoutes implements JMAPRoutes { String blobId = downloadPath.getBlobId(); return Mono.from(blobManager.retrieve(ImmutableList.of(BlobId.of(blobId)), mailboxSession)) + .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER) .switchIfEmpty(Mono.error(() -> new BlobNotFoundException(BlobId.of(blobId)))) .flatMap(blob -> Mono.usingWhen( - Mono.fromCallable(blob::getStream), + blob.getStreamReactive(), stream -> downloadBlob(downloadPath.getName(), response, blob.getSize(), blob.getContentType(), stream), - stream -> Mono.fromRunnable(Throwing.runnable(stream::close).sneakyThrow()))) + stream -> Mono.fromRunnable(Throwing.runnable(stream::close).sneakyThrow())) + .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER)) .onErrorResume(BlobNotFoundException.class, e -> { LOGGER.info("Attachment '{}' not found", blobId, e); return response.status(NOT_FOUND).send(); diff --git a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/BlobManagerImplTest.java b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/BlobManagerImplTest.java index 1b412653ac..5fdf8d096e 
100644 --- a/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/BlobManagerImplTest.java +++ b/server/protocols/jmap-draft/src/test/java/org/apache/james/jmap/draft/methods/BlobManagerImplTest.java @@ -63,6 +63,7 @@ import com.github.fge.lambdas.Throwing; import com.google.common.collect.ImmutableList; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; class BlobManagerImplTest { static final String ID = "abc"; @@ -97,8 +98,8 @@ class BlobManagerImplTest { .size(BYTES.length) .type(CONTENT_TYPE) .build()); - when(attachmentManager.loadAttachmentContent(ATTACHMENT_ID, session)) - .thenReturn(new ByteArrayInputStream(BYTES)); + when(attachmentManager.loadAttachmentContentReactive(ATTACHMENT_ID, session)) + .thenReturn(Mono.just(new ByteArrayInputStream(BYTES))); Blob blob = blobManager.retrieve(BLOB_ID_ATTACHMENT, session); diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/DownloadRoutes.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/DownloadRoutes.scala index 8b3446bd55..148111a084 100644 --- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/DownloadRoutes.scala +++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/DownloadRoutes.scala @@ -178,8 +178,9 @@ class AttachmentBlobResolver @Inject()(val attachmentManager: AttachmentManager) AttachmentId.from(blobId.value.value) match { case attachmentId: AttachmentId => Try(attachmentManager.getAttachment(attachmentId, mailboxSession)) match { - case Success(attachmentMetadata) => Applicable( - SMono.fromCallable(() => AttachmentBlob(attachmentMetadata, attachmentManager.load(attachmentMetadata, mailboxSession)))) + case Success(attachmentMetadata) => + Applicable(SMono(attachmentManager.loadReactive(attachmentMetadata, mailboxSession)) + .map(content => AttachmentBlob(attachmentMetadata, content))) case Failure(_) => NonApplicable } case _ => 
NonApplicable --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
