This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch postgresql
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit ef3003a756147756bf7e45fc66745893286c2123
Author: Tung Tran <[email protected]>
AuthorDate: Mon Mar 18 18:05:33 2024 +0700

    JAMES-2586 Avoid sorting PG messages
---
 .../james/mailbox/postgres/mail/AttachmentLoader.java  | 18 ++++++++----------
 .../mailbox/postgres/mail/PostgresMessageIdMapper.java |  9 ++++-----
 .../mailbox/postgres/mail/PostgresMessageMapper.java   |  8 +++-----
 3 files changed, 15 insertions(+), 20 deletions(-)

diff --git a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/AttachmentLoader.java b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/AttachmentLoader.java
index 874d463c2c..4927867ce4 100644
--- a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/AttachmentLoader.java
+++ b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/AttachmentLoader.java
@@ -31,7 +31,6 @@ import org.apache.james.mailbox.model.MessageAttachmentMetadata;
 import org.apache.james.mailbox.postgres.mail.dto.AttachmentsDTO;
 import org.apache.james.mailbox.store.mail.MessageMapper;
 import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
-import org.apache.james.util.ReactorUtils;
 import org.jooq.Record;
 
 import com.google.common.collect.ImmutableMap;
@@ -49,19 +48,18 @@ public class AttachmentLoader {
 
 
     public Flux<Pair<SimpleMailboxMessage.Builder, Record>> addAttachmentToMessage(Flux<Pair<SimpleMailboxMessage.Builder, Record>> findMessagePublisher, MessageMapper.FetchType fetchType) {
-        return findMessagePublisher.flatMap(pair -> {
-            if (fetchType == MessageMapper.FetchType.FULL || fetchType == 
MessageMapper.FetchType.ATTACHMENTS_METADATA) {
-                return Mono.fromCallable(() -> 
pair.getRight().get(ATTACHMENT_METADATA))
-                    .map(e -> toMap((AttachmentsDTO) e))
+        if (fetchType != MessageMapper.FetchType.FULL && fetchType != MessageMapper.FetchType.ATTACHMENTS_METADATA) {
+            return findMessagePublisher;
+        }
+
+        return findMessagePublisher.collectList()  // convert to list to avoid hanging the database connection with Jooq
+            .flatMapMany(list -> Flux.fromIterable(list)
+                .flatMapSequential(pair -> Mono.fromCallable(() -> 
toMap(pair.getRight().get(ATTACHMENT_METADATA)))
                     .flatMap(this::getAttachments)
                     .map(messageAttachmentMetadata -> {
                         
pair.getLeft().addAttachments(messageAttachmentMetadata);
                         return pair;
-                    }).switchIfEmpty(Mono.just(pair));
-            } else {
-                return Mono.just(pair);
-            }
-        }, ReactorUtils.DEFAULT_CONCURRENCY);
+                    }).switchIfEmpty(Mono.just(pair))));
     }
 
     private Map<AttachmentId, MessageRepresentation.AttachmentRepresentation> 
toMap(AttachmentsDTO attachmentRepresentations) {
diff --git a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageIdMapper.java b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageIdMapper.java
index e9df32ae4a..01b7304eda 100644
--- a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageIdMapper.java
+++ b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageIdMapper.java
@@ -28,7 +28,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.time.Clock;
 import java.util.Collection;
-import java.util.Comparator;
 import java.util.Date;
 import java.util.List;
 import java.util.function.Function;
@@ -137,17 +136,17 @@ public class PostgresMessageIdMapper implements MessageIdMapper {
 
     @Override
     public Flux<MailboxMessage> findReactive(Collection<MessageId> messageIds, 
MessageMapper.FetchType fetchType) {
-        Flux<Pair<SimpleMailboxMessage.Builder, Record>> 
fetchMessageWithoutFullContentPublisher = 
mailboxMessageDAO.findMessagesByMessageIds(messageIds.stream().map(PostgresMessageId.class::cast).collect(ImmutableList.toImmutableList()),
 fetchType);
-        Flux<Pair<SimpleMailboxMessage.Builder, Record>> fetchMessagePublisher 
= 
attachmentLoader.addAttachmentToMessage(fetchMessageWithoutFullContentPublisher,
 fetchType);
+        Flux<Pair<SimpleMailboxMessage.Builder, Record>> fetchMessagePublisher 
=
+            
mailboxMessageDAO.findMessagesByMessageIds(messageIds.stream().map(PostgresMessageId.class::cast).collect(ImmutableList.toImmutableList()),
 fetchType)
+                .transform(pairFlux -> 
attachmentLoader.addAttachmentToMessage(pairFlux, fetchType));
 
         if (fetchType == MessageMapper.FetchType.FULL) {
             return fetchMessagePublisher
-                .flatMap(messageBuilderAndRecord -> {
+                .flatMapSequential(messageBuilderAndRecord -> {
                     SimpleMailboxMessage.Builder messageBuilder = 
messageBuilderAndRecord.getLeft();
                     return 
retrieveFullContent(messageBuilderAndRecord.getRight())
                         .map(headerAndBodyContent -> 
messageBuilder.content(headerAndBodyContent).build());
                 }, ReactorUtils.DEFAULT_CONCURRENCY)
-                .sort(Comparator.comparing(MailboxMessage::getUid))
                 .map(message -> message);
         } else {
             return fetchMessagePublisher
diff --git a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMapper.java b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMapper.java
index 324c244a38..ef9bb0afe9 100644
--- a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMapper.java
+++ b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMapper.java
@@ -27,7 +27,6 @@ import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.Messa
 import java.io.IOException;
 import java.io.InputStream;
 import java.time.Clock;
-import java.util.Comparator;
 import java.util.Date;
 import java.util.Iterator;
 import java.util.List;
@@ -136,17 +135,16 @@ public class PostgresMessageMapper implements MessageMapper {
 
     @Override
     public Flux<MailboxMessage> findInMailboxReactive(Mailbox mailbox, 
MessageRange messageRange, FetchType fetchType, int limitAsInt) {
-        Flux<Pair<SimpleMailboxMessage.Builder, Record>> 
fetchMessageWithoutFullContentPublisher = 
fetchMessageWithoutFullContent(mailbox, messageRange, fetchType, limitAsInt);
-        Flux<Pair<SimpleMailboxMessage.Builder, Record>> fetchMessagePublisher 
= 
attachmentLoader.addAttachmentToMessage(fetchMessageWithoutFullContentPublisher,
 fetchType);
+        Flux<Pair<SimpleMailboxMessage.Builder, Record>> fetchMessagePublisher 
= fetchMessageWithoutFullContent(mailbox, messageRange, fetchType, limitAsInt)
+            .transform(pairFlux -> 
attachmentLoader.addAttachmentToMessage(pairFlux, fetchType));
 
         if (fetchType == FetchType.FULL) {
             return fetchMessagePublisher
-                .flatMap(messageBuilderAndRecord -> {
+                .flatMapSequential(messageBuilderAndRecord -> {
                     SimpleMailboxMessage.Builder messageBuilder = 
messageBuilderAndRecord.getLeft();
                     return 
retrieveFullContent(messageBuilderAndRecord.getRight())
                         .map(headerAndBodyContent -> 
messageBuilder.content(headerAndBodyContent).build());
                 }, ReactorUtils.DEFAULT_CONCURRENCY)
-                .sort(Comparator.comparing(MailboxMessage::getUid))
                 .map(message -> message);
         } else {
             return fetchMessagePublisher


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to