This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit a3f253e92c920635d25a99e775831acaf664fb7d Author: Benoit Tellier <[email protected]> AuthorDate: Wed Apr 12 13:46:31 2023 +0700 [PERF] CassandraMessageDAOV3: save headers on boundedElastic Those are blocking operations for large files, and were performed on the Cassandra driver event loop. --- .../cassandra/mail/CassandraMessageDAOV3.java | 47 +++++++++++----------- .../cassandra/mail/CassandraMessageMapper.java | 9 ++--- 2 files changed, 26 insertions(+), 30 deletions(-) diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java index 575c8852fd..fe45b058e8 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java @@ -70,7 +70,6 @@ import org.apache.james.blob.api.BlobId; import org.apache.james.blob.api.BlobStore; import org.apache.james.mailbox.cassandra.ids.CassandraMessageId; import org.apache.james.mailbox.cassandra.table.CassandraMessageV3Table.Attachments; -import org.apache.james.mailbox.exception.MailboxException; import org.apache.james.mailbox.model.AttachmentId; import org.apache.james.mailbox.model.ByteContent; import org.apache.james.mailbox.model.Cid; @@ -99,6 +98,7 @@ import com.google.common.io.ByteSource; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; import reactor.util.function.Tuple2; public class CassandraMessageDAOV3 { @@ -178,7 +178,7 @@ public class CassandraMessageDAOV3 { .build()); } - public Mono<Tuple2<BlobId, BlobId>> save(MailboxMessage message) throws MailboxException { + public Mono<Tuple2<BlobId, BlobId>> save(MailboxMessage message) { return saveContent(message) .flatMap(pair -> cassandraAsyncExecutor.executeVoid(boundWriteStatement(message, pair)).thenReturn(pair)); } @@ -214,32 +214,31 @@ public class CassandraMessageDAOV3 { } - private Mono<Tuple2<BlobId, BlobId>> saveContent(MailboxMessage message) throws MailboxException { - try { - byte[] headerContent = IOUtils.toByteArray(message.getHeaderContent(), message.getHeaderOctets()); - ByteSource bodyByteSource = new ByteSource() { - @Override - public InputStream openStream() { - try { - return message.getBodyContent(); - } catch (IOException e) { - throw new RuntimeException(e); + private Mono<Tuple2<BlobId, BlobId>> saveContent(MailboxMessage message) { + return Mono.fromCallable(() -> IOUtils.toByteArray(message.getHeaderContent(), message.getHeaderOctets())) + .subscribeOn(Schedulers.boundedElastic()) + .flatMap(headerContent -> { + ByteSource bodyByteSource = new ByteSource() { + @Override + public InputStream openStream() { + try { + return message.getBodyContent(); + } catch (IOException e) { + throw new RuntimeException(e); + } } - } - @Override - public long size() { - return message.getBodyOctets(); - } - }; + @Override + public long size() { + return message.getBodyOctets(); + } + }; - Mono<BlobId> headerFuture = Mono.from(blobStore.save(blobStore.getDefaultBucketName(), headerContent, SIZE_BASED)); - Mono<BlobId> bodyFuture = Mono.from(blobStore.save(blobStore.getDefaultBucketName(), bodyByteSource, LOW_COST)); + Mono<BlobId> headerFuture = Mono.from(blobStore.save(blobStore.getDefaultBucketName(), headerContent, SIZE_BASED)); + Mono<BlobId> bodyFuture = Mono.from(blobStore.save(blobStore.getDefaultBucketName(), bodyByteSource, LOW_COST)); - return headerFuture.zipWith(bodyFuture); - } catch (IOException e) { - throw new MailboxException("Error saving mail content", e); - } + return headerFuture.zipWith(bodyFuture); + }); } private BoundStatement boundWriteStatement(MailboxMessage message, Tuple2<BlobId, BlobId> pair) { diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java index 5f6924ae0b..04ed2cab13 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java @@ -75,7 +75,6 @@ import org.apache.james.util.streams.Limit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.github.fge.lambdas.Throwing; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Streams; @@ -405,10 +404,8 @@ public class CassandraMessageMapper implements MessageMapper { CassandraId mailboxId = (CassandraId) mailbox.getMailboxId(); return addUidAndModseqAndSaveDate(message, mailboxId) - .flatMap(Throwing.function((MailboxMessage messageWithUidAndModSeq) -> - save(mailbox, messageWithUidAndModSeq) - .thenReturn(messageWithUidAndModSeq))) - .map(MailboxMessage::metaData); + .flatMap(messageWithUidAndModSeq -> save(mailbox, messageWithUidAndModSeq) + .thenReturn(messageWithUidAndModSeq.metaData())); } private Mono<MailboxMessage> addUidAndModseqAndSaveDate(MailboxMessage message, CassandraId mailboxId) { @@ -620,7 +617,7 @@ public class CassandraMessageMapper implements MessageMapper { .map(MailboxMessage::metaData); } - private Mono<Void> save(Mailbox mailbox, MailboxMessage message) throws MailboxException { + private Mono<Void> save(Mailbox mailbox, MailboxMessage message) { CassandraId mailboxId = (CassandraId) mailbox.getMailboxId(); return messageDAOV3.save(message) .flatMap(headerAndBodyBlobIds -> insertIds(message, mailboxId, headerAndBodyBlobIds.getT1())); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
