This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch postgresql
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit f8b5e8c0a27377646c7fd89e8521160243acad60
Author: Tung Tran <[email protected]>
AuthorDate: Mon Mar 18 20:48:06 2024 +0700

    JAMES-2586 [REFACTORING] - Extract dedicated class for retrieving Postgres 
Message
    
    in order to remove duplicated code:
    - Introduce PostgresMessageRetriever  (= AttachmentLoader + Retrieve byte 
message content)
---
 .../mailbox/postgres/mail/AttachmentLoader.java    |  84 ------------
 .../postgres/mail/PostgresMessageIdMapper.java     |  38 +-----
 .../postgres/mail/PostgresMessageMapper.java       |  39 +-----
 .../postgres/mail/PostgresMessageRetriever.java    | 142 +++++++++++++++++++++
 4 files changed, 151 insertions(+), 152 deletions(-)

diff --git 
a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/AttachmentLoader.java
 
b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/AttachmentLoader.java
deleted file mode 100644
index 4927867ce4..0000000000
--- 
a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/AttachmentLoader.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-
-package org.apache.james.mailbox.postgres.mail;
-
-import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.ATTACHMENT_METADATA;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.james.mailbox.model.AttachmentId;
-import org.apache.james.mailbox.model.AttachmentMetadata;
-import org.apache.james.mailbox.model.MessageAttachmentMetadata;
-import org.apache.james.mailbox.postgres.mail.dto.AttachmentsDTO;
-import org.apache.james.mailbox.store.mail.MessageMapper;
-import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
-import org.jooq.Record;
-
-import com.google.common.collect.ImmutableMap;
-
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-public class AttachmentLoader {
-
-    private final PostgresAttachmentMapper attachmentMapper;
-
-    public AttachmentLoader(PostgresAttachmentMapper attachmentMapper) {
-        this.attachmentMapper = attachmentMapper;
-    }
-
-
-    public Flux<Pair<SimpleMailboxMessage.Builder, Record>> 
addAttachmentToMessage(Flux<Pair<SimpleMailboxMessage.Builder, Record>> 
findMessagePublisher, MessageMapper.FetchType fetchType) {
-        if (fetchType != MessageMapper.FetchType.FULL && fetchType != 
MessageMapper.FetchType.ATTACHMENTS_METADATA) {
-            return findMessagePublisher;
-        }
-
-        return findMessagePublisher.collectList()  // convert to list to avoid 
hanging the database connection with Jooq
-            .flatMapMany(list -> Flux.fromIterable(list)
-                .flatMapSequential(pair -> Mono.fromCallable(() -> 
toMap(pair.getRight().get(ATTACHMENT_METADATA)))
-                    .flatMap(this::getAttachments)
-                    .map(messageAttachmentMetadata -> {
-                        
pair.getLeft().addAttachments(messageAttachmentMetadata);
-                        return pair;
-                    }).switchIfEmpty(Mono.just(pair))));
-    }
-
-    private Map<AttachmentId, MessageRepresentation.AttachmentRepresentation> 
toMap(AttachmentsDTO attachmentRepresentations) {
-        return 
attachmentRepresentations.stream().collect(ImmutableMap.toImmutableMap(MessageRepresentation.AttachmentRepresentation::getAttachmentId,
 obj -> obj));
-    }
-
-    private Mono<List<MessageAttachmentMetadata>> 
getAttachments(Map<AttachmentId, 
MessageRepresentation.AttachmentRepresentation> 
mapAttachmentIdToAttachmentRepresentation) {
-        return 
Mono.fromCallable(mapAttachmentIdToAttachmentRepresentation::keySet)
-            .flatMapMany(attachmentMapper::getAttachmentsReactive)
-            .map(attachmentMetadata -> 
constructMessageAttachment(attachmentMetadata, 
mapAttachmentIdToAttachmentRepresentation.get(attachmentMetadata.getAttachmentId())))
-            .collectList();
-    }
-
-    private MessageAttachmentMetadata 
constructMessageAttachment(AttachmentMetadata attachment, 
MessageRepresentation.AttachmentRepresentation messageAttachmentRepresentation) 
{
-        return MessageAttachmentMetadata.builder()
-            .attachment(attachment)
-            .name(messageAttachmentRepresentation.getName().orElse(null))
-            .cid(messageAttachmentRepresentation.getCid())
-            .isInline(messageAttachmentRepresentation.isInline())
-            .build();
-    }
-}
diff --git 
a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageIdMapper.java
 
b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageIdMapper.java
index 01b7304eda..961b51fb53 100644
--- 
a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageIdMapper.java
+++ 
b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageIdMapper.java
@@ -20,9 +20,6 @@
 package org.apache.james.mailbox.postgres.mail;
 
 import static org.apache.james.blob.api.BlobStore.StoragePolicy.LOW_COST;
-import static org.apache.james.blob.api.BlobStore.StoragePolicy.SIZE_BASED;
-import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.BODY_BLOB_ID;
-import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.HEADER_CONTENT;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -44,8 +41,6 @@ import org.apache.james.mailbox.ModSeq;
 import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.exception.MailboxNotFoundException;
 import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
-import org.apache.james.mailbox.model.Content;
-import org.apache.james.mailbox.model.HeaderAndBodyByteContent;
 import org.apache.james.mailbox.model.Mailbox;
 import org.apache.james.mailbox.model.MailboxId;
 import org.apache.james.mailbox.model.MessageId;
@@ -100,9 +95,8 @@ public class PostgresMessageIdMapper implements 
MessageIdMapper {
     private final PostgresMailboxMessageDAO mailboxMessageDAO;
     private final PostgresModSeqProvider modSeqProvider;
     private final BlobStore blobStore;
-    private final BlobId.Factory blobIdFactory;
     private final Clock clock;
-    private final AttachmentLoader attachmentLoader;
+    private final PostgresMessageRetriever messageRetriever;
 
     public PostgresMessageIdMapper(PostgresMailboxDAO mailboxDAO,
                                    PostgresMessageDAO messageDAO,
@@ -117,9 +111,8 @@ public class PostgresMessageIdMapper implements 
MessageIdMapper {
         this.mailboxMessageDAO = mailboxMessageDAO;
         this.modSeqProvider = modSeqProvider;
         this.blobStore = blobStore;
-        this.blobIdFactory = blobIdFactory;
         this.clock = clock;
-        this.attachmentLoader = new AttachmentLoader(attachmentMapper);;
+        this.messageRetriever = new PostgresMessageRetriever(blobStore, 
blobIdFactory, attachmentMapper);
     }
 
     @Override
@@ -136,23 +129,8 @@ public class PostgresMessageIdMapper implements 
MessageIdMapper {
 
     @Override
     public Flux<MailboxMessage> findReactive(Collection<MessageId> messageIds, 
MessageMapper.FetchType fetchType) {
-        Flux<Pair<SimpleMailboxMessage.Builder, Record>> fetchMessagePublisher 
=
-            
mailboxMessageDAO.findMessagesByMessageIds(messageIds.stream().map(PostgresMessageId.class::cast).collect(ImmutableList.toImmutableList()),
 fetchType)
-                .transform(pairFlux -> 
attachmentLoader.addAttachmentToMessage(pairFlux, fetchType));
-
-        if (fetchType == MessageMapper.FetchType.FULL) {
-            return fetchMessagePublisher
-                .flatMapSequential(messageBuilderAndRecord -> {
-                    SimpleMailboxMessage.Builder messageBuilder = 
messageBuilderAndRecord.getLeft();
-                    return 
retrieveFullContent(messageBuilderAndRecord.getRight())
-                        .map(headerAndBodyContent -> 
messageBuilder.content(headerAndBodyContent).build());
-                }, ReactorUtils.DEFAULT_CONCURRENCY)
-                .map(message -> message);
-        } else {
-            return fetchMessagePublisher
-                .map(messageBuilderAndBlobId -> 
messageBuilderAndBlobId.getLeft()
-                    .build());
-        }
+        Flux<Pair<SimpleMailboxMessage.Builder, Record>> fetchMessagePublisher 
= 
mailboxMessageDAO.findMessagesByMessageIds(messageIds.stream().map(PostgresMessageId.class::cast).collect(ImmutableList.toImmutableList()),
 fetchType);
+        return messageRetriever.get(fetchType, fetchMessagePublisher);
     }
 
     @Override
@@ -272,14 +250,6 @@ public class PostgresMessageIdMapper implements 
MessageIdMapper {
         return oldComposedId.getFlags().equals(newFlags);
     }
 
-    private Mono<Content> retrieveFullContent(Record messageRecord) {
-        byte[] headerBytes = messageRecord.get(HEADER_CONTENT);
-        return Mono.from(blobStore.readBytes(blobStore.getDefaultBucketName(),
-                blobIdFactory.from(messageRecord.get(BODY_BLOB_ID)),
-                SIZE_BASED))
-            .map(bodyBytes -> new HeaderAndBodyByteContent(headerBytes, 
bodyBytes));
-    }
-
     private Mono<BlobId> saveBodyContent(MailboxMessage message) {
         return Mono.fromCallable(() -> 
MESSAGE_BODY_CONTENT_LOADER.apply(message))
             .flatMap(bodyByteSource -> 
Mono.from(blobStore.save(blobStore.getDefaultBucketName(), bodyByteSource, 
LOW_COST)));
diff --git 
a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMapper.java
 
b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMapper.java
index ef9bb0afe9..ff00ee4e2f 100644
--- 
a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMapper.java
+++ 
b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMapper.java
@@ -20,9 +20,6 @@
 package org.apache.james.mailbox.postgres.mail;
 
 import static org.apache.james.blob.api.BlobStore.StoragePolicy.LOW_COST;
-import static org.apache.james.blob.api.BlobStore.StoragePolicy.SIZE_BASED;
-import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.BODY_BLOB_ID;
-import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.HEADER_CONTENT;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -48,8 +45,6 @@ import org.apache.james.mailbox.ModSeq;
 import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.model.ComposedMessageId;
 import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
-import org.apache.james.mailbox.model.Content;
-import org.apache.james.mailbox.model.HeaderAndBodyByteContent;
 import org.apache.james.mailbox.model.Mailbox;
 import org.apache.james.mailbox.model.MailboxCounters;
 import org.apache.james.mailbox.model.MessageMetaData;
@@ -65,7 +60,6 @@ import org.apache.james.mailbox.store.MailboxReactorUtils;
 import org.apache.james.mailbox.store.mail.MessageMapper;
 import org.apache.james.mailbox.store.mail.model.MailboxMessage;
 import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
-import org.apache.james.util.ReactorUtils;
 import org.apache.james.util.streams.Limit;
 import org.jooq.Record;
 
@@ -100,8 +94,7 @@ public class PostgresMessageMapper implements MessageMapper {
     private final PostgresUidProvider uidProvider;
     private final BlobStore blobStore;
     private final Clock clock;
-    private final BlobId.Factory blobIdFactory;
-    private final AttachmentLoader attachmentLoader;
+    private final PostgresMessageRetriever messageRetriever;
 
     public PostgresMessageMapper(PostgresExecutor postgresExecutor,
                                  PostgresModSeqProvider modSeqProvider,
@@ -116,8 +109,8 @@ public class PostgresMessageMapper implements MessageMapper 
{
         this.uidProvider = uidProvider;
         this.blobStore = blobStore;
         this.clock = clock;
-        this.blobIdFactory = blobIdFactory;
-        this.attachmentLoader = new AttachmentLoader(new 
PostgresAttachmentMapper(new PostgresAttachmentDAO(postgresExecutor, 
blobIdFactory), blobStore));
+        PostgresAttachmentMapper attachmentMapper = new 
PostgresAttachmentMapper(new PostgresAttachmentDAO(postgresExecutor, 
blobIdFactory), blobStore);
+        this.messageRetriever = new PostgresMessageRetriever(blobStore, 
blobIdFactory, attachmentMapper);
     }
 
 
@@ -135,22 +128,8 @@ public class PostgresMessageMapper implements 
MessageMapper {
 
     @Override
     public Flux<MailboxMessage> findInMailboxReactive(Mailbox mailbox, 
MessageRange messageRange, FetchType fetchType, int limitAsInt) {
-        Flux<Pair<SimpleMailboxMessage.Builder, Record>> fetchMessagePublisher 
= fetchMessageWithoutFullContent(mailbox, messageRange, fetchType, limitAsInt)
-            .transform(pairFlux -> 
attachmentLoader.addAttachmentToMessage(pairFlux, fetchType));
-
-        if (fetchType == FetchType.FULL) {
-            return fetchMessagePublisher
-                .flatMapSequential(messageBuilderAndRecord -> {
-                    SimpleMailboxMessage.Builder messageBuilder = 
messageBuilderAndRecord.getLeft();
-                    return 
retrieveFullContent(messageBuilderAndRecord.getRight())
-                        .map(headerAndBodyContent -> 
messageBuilder.content(headerAndBodyContent).build());
-                }, ReactorUtils.DEFAULT_CONCURRENCY)
-                .map(message -> message);
-        } else {
-            return fetchMessagePublisher
-                .map(messageBuilderAndBlobId -> 
messageBuilderAndBlobId.getLeft()
-                    .build());
-        }
+        Flux<Pair<SimpleMailboxMessage.Builder, Record>> fetchMessagePublisher 
= fetchMessageWithoutFullContent(mailbox, messageRange, fetchType, limitAsInt);
+        return messageRetriever.get(fetchType, fetchMessagePublisher);
     }
 
     private Flux<Pair<SimpleMailboxMessage.Builder, Record>> 
fetchMessageWithoutFullContent(Mailbox mailbox, MessageRange messageRange, 
FetchType fetchType, int limitAsInt) {
@@ -173,14 +152,6 @@ public class PostgresMessageMapper implements 
MessageMapper {
             });
     }
 
-    private Mono<Content> retrieveFullContent(Record messageRecord) {
-        byte[] headerBytes = messageRecord.get(HEADER_CONTENT);
-        return Mono.from(blobStore.readBytes(blobStore.getDefaultBucketName(),
-                blobIdFactory.from(messageRecord.get(BODY_BLOB_ID)),
-                SIZE_BASED))
-            .map(bodyBytes -> new HeaderAndBodyByteContent(headerBytes, 
bodyBytes));
-    }
-
     @Override
     public List<MessageUid> retrieveMessagesMarkedForDeletion(Mailbox mailbox, 
MessageRange messageRange) {
         return retrieveMessagesMarkedForDeletionReactive(mailbox, messageRange)
diff --git 
a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageRetriever.java
 
b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageRetriever.java
new file mode 100644
index 0000000000..b415b780f2
--- /dev/null
+++ 
b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageRetriever.java
@@ -0,0 +1,142 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.postgres.mail;
+
+import static org.apache.james.blob.api.BlobStore.StoragePolicy.SIZE_BASED;
+import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.ATTACHMENT_METADATA;
+import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.BODY_BLOB_ID;
+import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.HEADER_CONTENT;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.james.blob.api.BlobId;
+import org.apache.james.blob.api.BlobStore;
+import org.apache.james.mailbox.model.AttachmentId;
+import org.apache.james.mailbox.model.AttachmentMetadata;
+import org.apache.james.mailbox.model.Content;
+import org.apache.james.mailbox.model.HeaderAndBodyByteContent;
+import org.apache.james.mailbox.model.MessageAttachmentMetadata;
+import org.apache.james.mailbox.postgres.mail.dto.AttachmentsDTO;
+import org.apache.james.mailbox.store.mail.MessageMapper;
+import org.apache.james.mailbox.store.mail.model.MailboxMessage;
+import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
+import org.apache.james.util.ReactorUtils;
+import org.jooq.Record;
+
+import com.google.common.collect.ImmutableMap;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class PostgresMessageRetriever {
+
+    interface PartRetriever {
+
+        boolean isApplicable(MessageMapper.FetchType fetchType);
+
+        Flux<Pair<SimpleMailboxMessage.Builder, Record>> 
doRetrieve(Flux<Pair<SimpleMailboxMessage.Builder, Record>> chain);
+    }
+
+    class AttachmentPartRetriever implements PartRetriever {
+
+        @Override
+        public boolean isApplicable(MessageMapper.FetchType fetchType) {
+            return fetchType == MessageMapper.FetchType.FULL || fetchType == 
MessageMapper.FetchType.ATTACHMENTS_METADATA;
+        }
+
+        @Override
+        public Flux<Pair<SimpleMailboxMessage.Builder, Record>> 
doRetrieve(Flux<Pair<SimpleMailboxMessage.Builder, Record>> chain) {
+            return chain.collectList()  // convert to list to avoid hanging 
the database connection with Jooq
+                .flatMapMany(list -> Flux.fromIterable(list)
+                    .flatMapSequential(pair -> Mono.fromCallable(() -> 
toMap(pair.getRight().get(ATTACHMENT_METADATA)))
+                        .flatMap(this::getAttachments)
+                        .map(messageAttachmentMetadata -> {
+                            
pair.getLeft().addAttachments(messageAttachmentMetadata);
+                            return pair;
+                        }).switchIfEmpty(Mono.just(pair))));
+        }
+
+        private Map<AttachmentId, 
MessageRepresentation.AttachmentRepresentation> toMap(AttachmentsDTO 
attachmentRepresentations) {
+            return 
attachmentRepresentations.stream().collect(ImmutableMap.toImmutableMap(MessageRepresentation.AttachmentRepresentation::getAttachmentId,
 obj -> obj));
+        }
+
+        private Mono<List<MessageAttachmentMetadata>> 
getAttachments(Map<AttachmentId, 
MessageRepresentation.AttachmentRepresentation> 
mapAttachmentIdToAttachmentRepresentation) {
+            return 
Mono.fromCallable(mapAttachmentIdToAttachmentRepresentation::keySet)
+                .flatMapMany(attachmentMapper::getAttachmentsReactive)
+                .map(attachmentMetadata -> 
constructMessageAttachment(attachmentMetadata, 
mapAttachmentIdToAttachmentRepresentation.get(attachmentMetadata.getAttachmentId())))
+                .collectList();
+        }
+
+        private MessageAttachmentMetadata 
constructMessageAttachment(AttachmentMetadata attachment, 
MessageRepresentation.AttachmentRepresentation messageAttachmentRepresentation) 
{
+            return MessageAttachmentMetadata.builder()
+                .attachment(attachment)
+                .name(messageAttachmentRepresentation.getName().orElse(null))
+                .cid(messageAttachmentRepresentation.getCid())
+                .isInline(messageAttachmentRepresentation.isInline())
+                .build();
+        }
+    }
+
+    class BlobContentPartRetriever implements PartRetriever {
+
+        @Override
+        public boolean isApplicable(MessageMapper.FetchType fetchType) {
+            return fetchType == MessageMapper.FetchType.FULL;
+        }
+
+        @Override
+        public Flux<Pair<SimpleMailboxMessage.Builder, Record>> 
doRetrieve(Flux<Pair<SimpleMailboxMessage.Builder, Record>> chain) {
+            return chain
+                .flatMapSequential(pair -> retrieveFullContent(pair.getRight())
+                    .map(headerAndBodyContent -> 
Pair.of(pair.getLeft().content(headerAndBodyContent), pair.getRight())),
+                    ReactorUtils.DEFAULT_CONCURRENCY);
+        }
+
+        private Mono<Content> retrieveFullContent(Record messageRecord) {
+            return 
Mono.from(blobStore.readBytes(blobStore.getDefaultBucketName(),
+                    blobIdFactory.from(messageRecord.get(BODY_BLOB_ID)),
+                    SIZE_BASED))
+                .map(bodyBytes -> new 
HeaderAndBodyByteContent(messageRecord.get(HEADER_CONTENT), bodyBytes));
+        }
+    }
+
+    private final BlobStore blobStore;
+    private final BlobId.Factory blobIdFactory;
+    private final PostgresAttachmentMapper attachmentMapper;
+    private final List<PartRetriever> partRetrievers = List.of(new 
AttachmentPartRetriever(), new BlobContentPartRetriever());
+
+    public PostgresMessageRetriever(BlobStore blobStore,
+                                    BlobId.Factory blobIdFactory,
+                                    PostgresAttachmentMapper attachmentMapper) 
{
+        this.blobStore = blobStore;
+        this.blobIdFactory = blobIdFactory;
+        this.attachmentMapper = attachmentMapper;
+    }
+
+    public Flux<MailboxMessage> get(MessageMapper.FetchType fetchType, 
Flux<Pair<SimpleMailboxMessage.Builder, Record>> initialFlux) {
+        return Flux.fromIterable(partRetrievers)
+            .filter(partRetriever -> partRetriever.isApplicable(fetchType))
+            .reduce(initialFlux, (flux, partRetriever) -> 
partRetriever.doRetrieve(flux))
+            .flatMapMany(flux -> flux)
+            .map(pair -> pair.getLeft().build());
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to