This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch postgresql
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 17b5e2bd2f9bafe619358d465efa9447327470ca
Author: hung phan <[email protected]>
AuthorDate: Fri Jan 12 17:49:21 2024 +0700

    JAMES-2586 Implement PostgresMessageIdMapper
---
 .../PostgresMailboxSessionMapperFactory.java       |  12 +-
 .../mail/MailboxDeleteDuringUpdateException.java   |  23 ++
 .../postgres/mail/PostgresMessageIdMapper.java     | 274 +++++++++++++++++++++
 .../postgres/mail/PostgresMessageMapper.java       |   7 +-
 .../mail/dao/PostgresMailboxMessageDAO.java        |  68 +++--
 .../postgres/mail/PostgresMapperProvider.java      |  39 ++-
 .../postgres/mail/PostgresMessageIdMapperTest.java |  45 ++++
 7 files changed, 440 insertions(+), 28 deletions(-)

diff --git 
a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/PostgresMailboxSessionMapperFactory.java
 
b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/PostgresMailboxSessionMapperFactory.java
index 7d78d275f4..7f54b43501 100644
--- 
a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/PostgresMailboxSessionMapperFactory.java
+++ 
b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/PostgresMailboxSessionMapperFactory.java
@@ -29,11 +29,14 @@ import org.apache.james.blob.api.BlobStore;
 import org.apache.james.mailbox.MailboxSession;
 import org.apache.james.mailbox.postgres.mail.PostgresAnnotationMapper;
 import org.apache.james.mailbox.postgres.mail.PostgresMailboxMapper;
+import org.apache.james.mailbox.postgres.mail.PostgresMessageIdMapper;
 import org.apache.james.mailbox.postgres.mail.PostgresMessageMapper;
 import org.apache.james.mailbox.postgres.mail.PostgresModSeqProvider;
 import org.apache.james.mailbox.postgres.mail.PostgresUidProvider;
 import org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxAnnotationDAO;
 import org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxDAO;
+import org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxMessageDAO;
+import org.apache.james.mailbox.postgres.mail.dao.PostgresMessageDAO;
 import org.apache.james.mailbox.postgres.user.PostgresSubscriptionDAO;
 import org.apache.james.mailbox.postgres.user.PostgresSubscriptionMapper;
 import org.apache.james.mailbox.store.MailboxSessionMapperFactory;
@@ -45,7 +48,6 @@ import org.apache.james.mailbox.store.mail.MessageIdMapper;
 import org.apache.james.mailbox.store.mail.MessageMapper;
 import org.apache.james.mailbox.store.user.SubscriptionMapper;
 
-
 public class PostgresMailboxSessionMapperFactory extends 
MailboxSessionMapperFactory implements AttachmentMapperFactory {
 
     private final PostgresExecutor.Factory executorFactory;
@@ -82,7 +84,13 @@ public class PostgresMailboxSessionMapperFactory extends 
MailboxSessionMapperFac
 
     @Override
     public MessageIdMapper createMessageIdMapper(MailboxSession session) {
-        throw new NotImplementedException("not implemented");
+        return new PostgresMessageIdMapper(new 
PostgresMailboxDAO(executorFactory.create(session.getUser().getDomainPart())),
+            new 
PostgresMessageDAO(executorFactory.create(session.getUser().getDomainPart()), 
blobIdFactory),
+            new 
PostgresMailboxMessageDAO(executorFactory.create(session.getUser().getDomainPart())),
+            getModSeqProvider(session),
+            blobStore,
+            blobIdFactory,
+            clock);
     }
 
     @Override
diff --git 
a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/MailboxDeleteDuringUpdateException.java
 
b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/MailboxDeleteDuringUpdateException.java
new file mode 100644
index 0000000000..e738905441
--- /dev/null
+++ 
b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/MailboxDeleteDuringUpdateException.java
@@ -0,0 +1,23 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.postgres.mail;
+
+public class MailboxDeleteDuringUpdateException extends Exception {
+}
diff --git 
a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageIdMapper.java
 
b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageIdMapper.java
new file mode 100644
index 0000000000..b3233f8345
--- /dev/null
+++ 
b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageIdMapper.java
@@ -0,0 +1,274 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.postgres.mail;
+
+import static org.apache.james.blob.api.BlobStore.StoragePolicy.LOW_COST;
+import static org.apache.james.blob.api.BlobStore.StoragePolicy.SIZE_BASED;
+import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.BODY_BLOB_ID;
+import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.HEADER_CONTENT;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.time.Clock;
+import java.util.Collection;
+import java.util.Date;
+import java.util.List;
+import java.util.function.Function;
+
+import javax.mail.Flags;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.james.backends.postgres.utils.PostgresUtils;
+import org.apache.james.blob.api.BlobId;
+import org.apache.james.blob.api.BlobStore;
+import org.apache.james.mailbox.MessageManager;
+import org.apache.james.mailbox.MessageUid;
+import org.apache.james.mailbox.ModSeq;
+import org.apache.james.mailbox.exception.MailboxException;
+import org.apache.james.mailbox.exception.MailboxNotFoundException;
+import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
+import org.apache.james.mailbox.model.Content;
+import org.apache.james.mailbox.model.HeaderAndBodyByteContent;
+import org.apache.james.mailbox.model.Mailbox;
+import org.apache.james.mailbox.model.MailboxId;
+import org.apache.james.mailbox.model.MessageId;
+import org.apache.james.mailbox.model.UpdatedFlags;
+import org.apache.james.mailbox.postgres.PostgresMailboxId;
+import org.apache.james.mailbox.postgres.PostgresMessageId;
+import org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxDAO;
+import org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxMessageDAO;
+import org.apache.james.mailbox.postgres.mail.dao.PostgresMessageDAO;
+import org.apache.james.mailbox.store.FlagsUpdateCalculator;
+import org.apache.james.mailbox.store.MailboxReactorUtils;
+import org.apache.james.mailbox.store.mail.MessageIdMapper;
+import org.apache.james.mailbox.store.mail.MessageMapper;
+import org.apache.james.mailbox.store.mail.model.MailboxMessage;
+import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
+import org.apache.james.util.ReactorUtils;
+import org.jooq.Record;
+import org.reactivestreams.Publisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableListMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.io.ByteSource;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class PostgresMessageIdMapper implements MessageIdMapper {
+    private static final Function<MailboxMessage, ByteSource> 
MESSAGE_BODY_CONTENT_LOADER = (mailboxMessage) -> new ByteSource() {
+        @Override
+        public InputStream openStream() {
+            try {
+                return mailboxMessage.getBodyContent();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        @Override
+        public long size() {
+            return mailboxMessage.getBodyOctets();
+        }
+    };
+
+    public static final int NUM_RETRIES = 5;
+    public static final Logger LOGGER = 
LoggerFactory.getLogger(PostgresMessageIdMapper.class);
+
+    private final PostgresMailboxDAO mailboxDAO;
+    private final PostgresMessageDAO messageDAO;
+    private final PostgresMailboxMessageDAO mailboxMessageDAO;
+    private final PostgresModSeqProvider modSeqProvider;
+    private final BlobStore blobStore;
+    private final BlobId.Factory blobIdFactory;
+    private final Clock clock;
+
+    public PostgresMessageIdMapper(PostgresMailboxDAO mailboxDAO, 
PostgresMessageDAO messageDAO,
+                                   PostgresMailboxMessageDAO 
mailboxMessageDAO, PostgresModSeqProvider modSeqProvider,
+                                   BlobStore blobStore, BlobId.Factory 
blobIdFactory,
+                                   Clock clock) {
+        this.mailboxDAO = mailboxDAO;
+        this.messageDAO = messageDAO;
+        this.mailboxMessageDAO = mailboxMessageDAO;
+        this.modSeqProvider = modSeqProvider;
+        this.blobStore = blobStore;
+        this.blobIdFactory = blobIdFactory;
+        this.clock = clock;
+    }
+
+    @Override
+    public List<MailboxMessage> find(Collection<MessageId> messageIds, 
MessageMapper.FetchType fetchType) {
+        return findReactive(messageIds, fetchType)
+            .collectList()
+            .block();
+    }
+
+    @Override
+    public Publisher<ComposedMessageIdWithMetaData> findMetadata(MessageId 
messageId) {
+        return 
mailboxMessageDAO.findMetadataByMessageId(PostgresMessageId.class.cast(messageId));
+    }
+
+    @Override
+    public Flux<MailboxMessage> findReactive(Collection<MessageId> messageIds, 
MessageMapper.FetchType fetchType) {
+        return 
mailboxMessageDAO.findMessagesByMessageIds(messageIds.stream().map(PostgresMessageId.class::cast).collect(ImmutableList.toImmutableList()),
+                fetchType)
+            .flatMap(messageBuilderAndRecord -> {
+                SimpleMailboxMessage.Builder messageBuilder = 
messageBuilderAndRecord.getLeft();
+                if (fetchType == MessageMapper.FetchType.FULL) {
+                    return 
retrieveFullContent(messageBuilderAndRecord.getRight())
+                        .map(headerAndBodyContent -> 
messageBuilder.content(headerAndBodyContent).build());
+                }
+                return Mono.just(messageBuilder.build());
+            }, ReactorUtils.DEFAULT_CONCURRENCY);
+    }
+
+    @Override
+    public List<MailboxId> findMailboxes(MessageId messageId) {
+        return 
mailboxMessageDAO.findMailboxes(PostgresMessageId.class.cast(messageId))
+            .collect(ImmutableList.toImmutableList())
+            .block();
+    }
+
+    @Override
+    public void save(MailboxMessage mailboxMessage) throws MailboxException {
+        PostgresMailboxId mailboxId = 
PostgresMailboxId.class.cast(mailboxMessage.getMailboxId());
+        mailboxMessage.setSaveDate(Date.from(clock.instant()));
+        MailboxReactorUtils.block(mailboxDAO.findMailboxById(mailboxId)
+            .switchIfEmpty(Mono.error(() -> new 
MailboxNotFoundException(mailboxId)))
+            .then(saveBodyContent(mailboxMessage))
+            .flatMap(blobId -> messageDAO.insert(mailboxMessage, 
blobId.asString())
+                
.onErrorResume(PostgresUtils.UNIQUE_CONSTRAINT_VIOLATION_PREDICATE, e -> 
Mono.empty()))
+            .then(mailboxMessageDAO.insert(mailboxMessage)));
+    }
+
+    @Override
+    public void copyInMailbox(MailboxMessage mailboxMessage, Mailbox mailbox) 
throws MailboxException {
+        MailboxReactorUtils.block(copyInMailboxReactive(mailboxMessage, 
mailbox));
+    }
+
+    @Override
+    public Mono<Void> copyInMailboxReactive(MailboxMessage mailboxMessage, 
Mailbox mailbox) {
+        mailboxMessage.setSaveDate(Date.from(clock.instant()));
+        PostgresMailboxId mailboxId = (PostgresMailboxId) 
mailbox.getMailboxId();
+        return mailboxMessageDAO.insert(mailboxMessage, mailboxId)
+            
.onErrorResume(PostgresUtils.UNIQUE_CONSTRAINT_VIOLATION_PREDICATE, e -> 
Mono.empty());
+    }
+
+    @Override
+    public void delete(MessageId messageId) {
+        mailboxMessageDAO.deleteByMessageId((PostgresMessageId) 
messageId).block();
+    }
+
+    @Override
+    public void delete(MessageId messageId, Collection<MailboxId> mailboxIds) {
+        mailboxMessageDAO.deleteByMessageIdAndMailboxIds((PostgresMessageId) 
messageId,
+            
mailboxIds.stream().map(PostgresMailboxId.class::cast).collect(ImmutableList.toImmutableList())).block();
+    }
+
+    @Override
+    public Mono<Multimap<MailboxId, UpdatedFlags>> setFlags(MessageId 
messageId, List<MailboxId> mailboxIds, Flags newState, 
MessageManager.FlagsUpdateMode updateMode) {
+        return Flux.fromIterable(mailboxIds)
+            .distinct()
+            .map(PostgresMailboxId.class::cast)
+            .concatMap(mailboxId -> flagsUpdateWithRetry(newState, updateMode, 
mailboxId, messageId))
+            
.collect(ImmutableListMultimap.toImmutableListMultimap(Pair::getLeft, 
Pair::getRight));
+    }
+
+    private Flux<Pair<MailboxId, UpdatedFlags>> flagsUpdateWithRetry(Flags 
newState, MessageManager.FlagsUpdateMode updateMode, MailboxId mailboxId, 
MessageId messageId) {
+        return updateFlags(mailboxId, messageId, newState, updateMode)
+            .retry(NUM_RETRIES)
+            .onErrorResume(MailboxDeleteDuringUpdateException.class, e -> {
+                LOGGER.info("Mailbox {} was deleted during flag update", 
mailboxId);
+                return Mono.empty();
+            })
+            .flatMapIterable(Function.identity())
+            .map(pair -> buildUpdatedFlags(pair.getRight(), pair.getLeft()));
+    }
+
+    private Pair<MailboxId, UpdatedFlags> 
buildUpdatedFlags(ComposedMessageIdWithMetaData composedMessageIdWithMetaData, 
Flags oldFlags) {
+        return 
Pair.of(composedMessageIdWithMetaData.getComposedMessageId().getMailboxId(),
+            UpdatedFlags.builder()
+                
.uid(composedMessageIdWithMetaData.getComposedMessageId().getUid())
+                
.messageId(composedMessageIdWithMetaData.getComposedMessageId().getMessageId())
+                .modSeq(composedMessageIdWithMetaData.getModSeq())
+                .oldFlags(oldFlags)
+                .newFlags(composedMessageIdWithMetaData.getFlags())
+                .build());
+    }
+
+    private Mono<List<Pair<Flags, ComposedMessageIdWithMetaData>>> 
updateFlags(MailboxId mailboxId, MessageId messageId, Flags newState, 
MessageManager.FlagsUpdateMode updateMode) {
+        PostgresMailboxId postgresMailboxId = (PostgresMailboxId) mailboxId;
+        PostgresMessageId postgresMessageId = (PostgresMessageId) messageId;
+        return mailboxMessageDAO.findMetadataByMessageId(postgresMessageId, 
postgresMailboxId)
+            .flatMap(oldComposedId -> updateFlags(newState, updateMode, 
postgresMailboxId, oldComposedId), ReactorUtils.DEFAULT_CONCURRENCY)
+            .switchIfEmpty(Mono.error(MailboxDeleteDuringUpdateException::new))
+            .collectList();
+    }
+
+    private Mono<Pair<Flags, ComposedMessageIdWithMetaData>> updateFlags(Flags 
newState, MessageManager.FlagsUpdateMode updateMode, PostgresMailboxId 
mailboxId, ComposedMessageIdWithMetaData oldComposedId) {
+        FlagsUpdateCalculator flagsUpdateCalculator = new 
FlagsUpdateCalculator(newState, updateMode);
+        Flags newFlags = 
flagsUpdateCalculator.buildNewFlags(oldComposedId.getFlags());
+        if (identicalFlags(oldComposedId, newFlags)) {
+            return Mono.just(Pair.of(oldComposedId.getFlags(), oldComposedId));
+        } else {
+            return modSeqProvider.nextModSeqReactive(mailboxId)
+                .flatMap(newModSeq -> updateFlags(mailboxId, 
flagsUpdateCalculator, newModSeq, oldComposedId.getComposedMessageId().getUid())
+                    .map(flags -> Pair.of(oldComposedId.getFlags(), new 
ComposedMessageIdWithMetaData(
+                        oldComposedId.getComposedMessageId(),
+                        flags,
+                        newModSeq,
+                        oldComposedId.getThreadId()))));
+        }
+    }
+
+    private Mono<Flags> updateFlags(PostgresMailboxId mailboxId, 
FlagsUpdateCalculator flagsUpdateCalculator, ModSeq newModSeq, MessageUid uid) {
+
+        switch (flagsUpdateCalculator.getMode()) {
+            case ADD:
+                return mailboxMessageDAO.addFlags(mailboxId, uid, 
flagsUpdateCalculator.providedFlags(), newModSeq);
+            case REMOVE:
+                return mailboxMessageDAO.removeFlags(mailboxId, uid, 
flagsUpdateCalculator.providedFlags(), newModSeq);
+            case REPLACE:
+                return mailboxMessageDAO.replaceFlags(mailboxId, uid, 
flagsUpdateCalculator.providedFlags(), newModSeq);
+            default:
+                return Mono.error(() -> new RuntimeException("Unknown 
MessageRange type " + flagsUpdateCalculator.getMode()));
+        }
+    }
+
+    private boolean identicalFlags(ComposedMessageIdWithMetaData 
oldComposedId, Flags newFlags) {
+        return oldComposedId.getFlags().equals(newFlags);
+    }
+
+    private Mono<Content> retrieveFullContent(Record messageRecord) {
+        byte[] headerBytes = messageRecord.get(HEADER_CONTENT);
+        return Mono.from(blobStore.readBytes(blobStore.getDefaultBucketName(),
+                blobIdFactory.from(messageRecord.get(BODY_BLOB_ID)),
+                SIZE_BASED))
+            .map(bodyBytes -> new HeaderAndBodyByteContent(headerBytes, 
bodyBytes));
+    }
+
+    private Mono<BlobId> saveBodyContent(MailboxMessage message) {
+        return Mono.fromCallable(() -> 
MESSAGE_BODY_CONTENT_LOADER.apply(message))
+            .flatMap(bodyByteSource -> 
Mono.from(blobStore.save(blobStore.getDefaultBucketName(), bodyByteSource, 
LOW_COST)));
+    }
+}
diff --git 
a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMapper.java
 
b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMapper.java
index 6c45e89432..7d4385995e 100644
--- 
a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMapper.java
+++ 
b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMapper.java
@@ -39,6 +39,7 @@ import javax.mail.Flags;
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.james.backends.postgres.utils.PostgresExecutor;
+import org.apache.james.backends.postgres.utils.PostgresUtils;
 import org.apache.james.blob.api.BlobId;
 import org.apache.james.blob.api.BlobStore;
 import org.apache.james.mailbox.ApplicableFlagBuilder;
@@ -64,6 +65,7 @@ import org.apache.james.mailbox.store.MailboxReactorUtils;
 import org.apache.james.mailbox.store.mail.MessageMapper;
 import org.apache.james.mailbox.store.mail.model.MailboxMessage;
 import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
+import org.apache.james.util.ReactorUtils;
 import org.apache.james.util.streams.Limit;
 import org.jooq.Record;
 
@@ -138,7 +140,7 @@ public class PostgresMessageMapper implements MessageMapper 
{
                     SimpleMailboxMessage.Builder messageBuilder = 
messageBuilderAndRecord.getLeft();
                     return 
retrieveFullContent(messageBuilderAndRecord.getRight())
                         .map(headerAndBodyContent -> 
messageBuilder.content(headerAndBodyContent).build());
-                })
+                }, ReactorUtils.DEFAULT_CONCURRENCY)
                 .sort(Comparator.comparing(MailboxMessage::getUid))
                 .map(message -> message);
         } else {
@@ -278,7 +280,8 @@ public class PostgresMessageMapper implements MessageMapper 
{
             })
             .flatMap(this::setNewUidAndModSeq)
             .then(saveBodyContent(message)
-                .flatMap(bodyBlobId -> messageDAO.insert(message, 
bodyBlobId.asString())))
+                .flatMap(bodyBlobId -> messageDAO.insert(message, 
bodyBlobId.asString())
+                    
.onErrorResume(PostgresUtils.UNIQUE_CONSTRAINT_VIOLATION_PREDICATE, e -> 
Mono.empty())))
             .then(Mono.defer(() -> mailboxMessageDAO.insert(message)))
             .then(Mono.fromCallable(message::metaData));
     }
diff --git 
a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxMessageDAO.java
 
b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxMessageDAO.java
index a267dfc3aa..61e48ea637 100644
--- 
a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxMessageDAO.java
+++ 
b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxMessageDAO.java
@@ -49,6 +49,7 @@ import static 
org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxMessageD
 import static 
org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxMessageDAOUtils.RECORD_TO_MESSAGE_METADATA_FUNCTION;
 import static 
org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxMessageDAOUtils.RECORD_TO_MESSAGE_UID_FUNCTION;
 
+import java.util.Collection;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicReference;
@@ -64,6 +65,7 @@ import org.apache.james.core.Domain;
 import org.apache.james.mailbox.MessageUid;
 import org.apache.james.mailbox.ModSeq;
 import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
+import org.apache.james.mailbox.model.MailboxId;
 import org.apache.james.mailbox.model.MessageMetaData;
 import org.apache.james.mailbox.model.MessageRange;
 import org.apache.james.mailbox.postgres.PostgresMailboxId;
@@ -88,6 +90,7 @@ import org.jooq.UpdateSetStep;
 import org.jooq.impl.DSL;
 import org.jooq.util.postgres.PostgresDSL;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 
 import reactor.core.publisher.Flux;
@@ -235,6 +238,17 @@ public class PostgresMailboxMessageDAO {
             .map(record -> 
PostgresMessageId.Factory.of(record.get(MESSAGE_ID)));
     }
 
+    public Mono<Void> deleteByMessageIdAndMailboxIds(PostgresMessageId 
messageId, Collection<PostgresMailboxId> mailboxIds) {
+        return postgresExecutor.executeVoid(dslContext -> 
Mono.from(dslContext.deleteFrom(TABLE_NAME)
+            .where(MESSAGE_ID.eq(messageId.asUuid()))
+            
.and(MAILBOX_ID.in(mailboxIds.stream().map(PostgresMailboxId::asUuid).collect(ImmutableList.toImmutableList())))));
+    }
+
+    public Mono<Void> deleteByMessageId(PostgresMessageId messageId) {
+        return postgresExecutor.executeVoid(dslContext -> 
Mono.from(dslContext.deleteFrom(TABLE_NAME)
+            .where(MESSAGE_ID.eq(messageId.asUuid()))));
+    }
+
     public Mono<Integer> countTotalMessagesByMailboxId(PostgresMailboxId 
mailboxId) {
         return postgresExecutor.executeCount(dslContext -> 
Mono.from(dslContext.selectCount()
             .from(TABLE_NAME)
@@ -388,22 +402,6 @@ public class PostgresMailboxMessageDAO {
             .map(RECORD_TO_COMPOSED_MESSAGE_ID_WITH_META_DATA_FUNCTION);
     }
 
-    public Flux<ComposedMessageIdWithMetaData> 
findMessagesMetadata(PostgresMailboxId mailboxId, List<MessageUid> messageUids) 
{
-        Function<List<MessageUid>, Flux<ComposedMessageIdWithMetaData>> 
queryPublisherFunction = uidsToFetch -> postgresExecutor.executeRows(dslContext 
-> Flux.from(dslContext.select()
-                .from(TABLE_NAME)
-                .where(MAILBOX_ID.eq(mailboxId.asUuid()))
-                
.and(MESSAGE_UID.in(uidsToFetch.stream().map(MessageUid::asLong).toArray(Long[]::new)))
-                .orderBy(DEFAULT_SORT_ORDER_BY)))
-            .map(RECORD_TO_COMPOSED_MESSAGE_ID_WITH_META_DATA_FUNCTION);
-
-        if (messageUids.size() <= IN_CLAUSE_MAX_SIZE) {
-            return queryPublisherFunction.apply(messageUids);
-        } else {
-            return Flux.fromIterable(Iterables.partition(messageUids, 
IN_CLAUSE_MAX_SIZE))
-                .flatMap(queryPublisherFunction);
-        }
-    }
-
     public Flux<ComposedMessageIdWithMetaData> 
findAllRecentMessageMetadata(PostgresMailboxId mailboxId) {
         return postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select()
                 .from(TABLE_NAME)
@@ -527,8 +525,12 @@ public class PostgresMailboxMessageDAO {
     }
 
     public Mono<Void> insert(MailboxMessage mailboxMessage) {
+        return insert(mailboxMessage, 
PostgresMailboxId.class.cast(mailboxMessage.getMailboxId()));
+    }
+
+    public Mono<Void> insert(MailboxMessage mailboxMessage, PostgresMailboxId 
mailboxId) {
         return postgresExecutor.executeVoid(dslContext -> 
Mono.from(dslContext.insertInto(TABLE_NAME)
-            .set(MAILBOX_ID, ((PostgresMailboxId) 
mailboxMessage.getMailboxId()).asUuid())
+            .set(MAILBOX_ID, mailboxId.asUuid())
             .set(MESSAGE_UID, mailboxMessage.getUid().asLong())
             .set(MOD_SEQ, mailboxMessage.getModSeq().asLong())
             .set(MESSAGE_ID, ((PostgresMessageId) 
mailboxMessage.getMessageId()).asUuid())
@@ -545,4 +547,36 @@ public class PostgresMailboxMessageDAO {
             .set(SAVE_DATE, 
mailboxMessage.getSaveDate().map(DATE_TO_LOCAL_DATE_TIME).orElse(null))));
     }
 
+    public Flux<MailboxId> findMailboxes(PostgresMessageId messageId) {
+        return postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select(MAILBOX_ID)
+                .from(TABLE_NAME)
+                .where(MESSAGE_ID.eq(messageId.asUuid()))))
+            .map(record -> PostgresMailboxId.of(record.get(MAILBOX_ID)));
+    }
+
+    public Flux<Pair<SimpleMailboxMessage.Builder, Record>> 
findMessagesByMessageIds(Collection<PostgresMessageId> messageIds, 
MessageMapper.FetchType fetchType) {
+        PostgresMailboxMessageFetchStrategy fetchStrategy = 
FETCH_TYPE_TO_FETCH_STRATEGY.apply(fetchType);
+
+        return postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select(fetchStrategy.fetchFields())
+                .from(MESSAGES_JOIN_MAILBOX_MESSAGES_CONDITION_STEP)
+                .where(DSL.field(TABLE_NAME.getName() + "." + 
MESSAGE_ID.getName())
+                    
.in(messageIds.stream().map(PostgresMessageId::asUuid).collect(ImmutableList.toImmutableList())))))
+            .map(record -> 
Pair.of(fetchStrategy.toMessageBuilder().apply(record), record));
+    }
+
+    public Flux<ComposedMessageIdWithMetaData> 
findMetadataByMessageId(PostgresMessageId messageId) {
+        return postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select()
+                .from(TABLE_NAME)
+                .where(MESSAGE_ID.eq(messageId.asUuid()))))
+            .map(RECORD_TO_COMPOSED_MESSAGE_ID_WITH_META_DATA_FUNCTION);
+    }
+
+    public Flux<ComposedMessageIdWithMetaData> 
findMetadataByMessageId(PostgresMessageId messageId, PostgresMailboxId 
mailboxId) {
+        return postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select()
+                .from(TABLE_NAME)
+                .where(MESSAGE_ID.eq(messageId.asUuid()))
+                .and(MAILBOX_ID.eq(mailboxId.asUuid()))))
+            .map(RECORD_TO_COMPOSED_MESSAGE_ID_WITH_META_DATA_FUNCTION);
+    }
+
 }
diff --git 
a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMapperProvider.java
 
b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMapperProvider.java
index c4705bf259..ebd3a51cf0 100644
--- 
a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMapperProvider.java
+++ 
b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMapperProvider.java
@@ -31,20 +31,27 @@ import org.apache.james.blob.api.HashBlobId;
 import org.apache.james.blob.memory.MemoryBlobStoreDAO;
 import org.apache.james.mailbox.MessageUid;
 import org.apache.james.mailbox.ModSeq;
+import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.model.Mailbox;
 import org.apache.james.mailbox.model.MailboxId;
 import org.apache.james.mailbox.model.MessageId;
 import org.apache.james.mailbox.postgres.PostgresMailboxId;
 import org.apache.james.mailbox.postgres.PostgresMessageId;
 import org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxDAO;
+import org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxMessageDAO;
+import org.apache.james.mailbox.postgres.mail.dao.PostgresMessageDAO;
 import org.apache.james.mailbox.store.mail.AttachmentMapper;
 import org.apache.james.mailbox.store.mail.MailboxMapper;
 import org.apache.james.mailbox.store.mail.MessageIdMapper;
 import org.apache.james.mailbox.store.mail.MessageMapper;
+import org.apache.james.mailbox.store.mail.UidProvider;
 import org.apache.james.mailbox.store.mail.model.MapperProvider;
+import org.apache.james.mailbox.store.mail.model.MessageUidProvider;
 import org.apache.james.server.blob.deduplication.DeDuplicationBlobStore;
 import org.apache.james.utils.UpdatableTickingClock;
+import org.testcontainers.utility.ThrowingFunction;
 
+import com.github.fge.lambdas.Throwing;
 import com.google.common.collect.ImmutableList;
 
 public class PostgresMapperProvider implements MapperProvider {
@@ -54,6 +61,7 @@ public class PostgresMapperProvider implements MapperProvider 
{
     private final UpdatableTickingClock updatableTickingClock;
     private final BlobStore blobStore;
     private final BlobId.Factory blobIdFactory;
+    private final UidProvider messageUidProvider;
 
     public PostgresMapperProvider(PostgresExtension postgresExtension) {
         this.postgresExtension = postgresExtension;
@@ -61,11 +69,13 @@ public class PostgresMapperProvider implements 
MapperProvider {
         this.messageIdFactory = new PostgresMessageId.Factory();
         this.blobIdFactory = new HashBlobId.Factory();
         this.blobStore = new DeDuplicationBlobStore(new MemoryBlobStoreDAO(), 
BucketName.DEFAULT, blobIdFactory);
+        this.messageUidProvider = new PostgresUidProvider(new 
PostgresMailboxDAO(postgresExtension.getPostgresExecutor()));
     }
 
     @Override
     public List<Capabilities> getSupportedCapabilities() {
-        return ImmutableList.of(Capabilities.ANNOTATION, Capabilities.MAILBOX, 
Capabilities.MESSAGE, Capabilities.MOVE, Capabilities.ATTACHMENT, 
Capabilities.THREAD_SAFE_FLAGS_UPDATE);
+        return ImmutableList.of(Capabilities.ANNOTATION, Capabilities.MAILBOX, 
Capabilities.MESSAGE, Capabilities.MOVE,
+            Capabilities.ATTACHMENT, Capabilities.THREAD_SAFE_FLAGS_UPDATE, 
Capabilities.UNIQUE_MESSAGE_ID);
     }
 
     @Override
@@ -91,7 +101,12 @@ public class PostgresMapperProvider implements 
MapperProvider {
 
     @Override
     public MessageIdMapper createMessageIdMapper() {
-        throw new NotImplementedException("not implemented");
+        PostgresMailboxDAO mailboxDAO = new 
PostgresMailboxDAO(postgresExtension.getPostgresExecutor());
+        return new PostgresMessageIdMapper(mailboxDAO,
+            new PostgresMessageDAO(postgresExtension.getPostgresExecutor(), 
blobIdFactory),
+            new 
PostgresMailboxMessageDAO(postgresExtension.getPostgresExecutor()),
+            new PostgresModSeqProvider(mailboxDAO),
+            blobStore, blobIdFactory, updatableTickingClock);
     }
 
     @Override
@@ -105,18 +120,28 @@ public class PostgresMapperProvider implements 
MapperProvider {
     }
 
     @Override
-    public MessageUid generateMessageUid() {
-        throw new NotImplementedException("not implemented");
+    public MessageUid generateMessageUid(Mailbox mailbox) {
+        try {
+            return messageUidProvider.nextUid(mailbox);
+        } catch (MailboxException e) {
+            throw new RuntimeException(e);
+        }
     }
 
     @Override
     public ModSeq generateModSeq(Mailbox mailbox) {
-        throw new NotImplementedException("not implemented");
+        try {
+            return new PostgresModSeqProvider(new 
PostgresMailboxDAO(postgresExtension.getPostgresExecutor()))
+                .nextModSeq(mailbox);
+        } catch (MailboxException e) {
+            throw new RuntimeException(e);
+        }
     }
 
     @Override
     public ModSeq highestModSeq(Mailbox mailbox) {
-        throw new NotImplementedException("not implemented");
+        return new PostgresModSeqProvider(new 
PostgresMailboxDAO(postgresExtension.getPostgresExecutor()))
+            .highestModSeq(mailbox);
     }
 
     @Override
@@ -132,4 +157,4 @@ public class PostgresMapperProvider implements 
MapperProvider {
     public UpdatableTickingClock getUpdatableTickingClock() {
         return updatableTickingClock;
     }
-}
+}
\ No newline at end of file
diff --git 
a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMessageIdMapperTest.java
 
b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMessageIdMapperTest.java
new file mode 100644
index 0000000000..873e7b6633
--- /dev/null
+++ 
b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMessageIdMapperTest.java
@@ -0,0 +1,45 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.postgres.mail;
+
+import org.apache.james.backends.postgres.PostgresExtension;
+import org.apache.james.mailbox.postgres.PostgresMailboxAggregateModule;
+import org.apache.james.mailbox.store.mail.model.MapperProvider;
+import org.apache.james.mailbox.store.mail.model.MessageIdMapperTest;
+import org.apache.james.utils.UpdatableTickingClock;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+public class PostgresMessageIdMapperTest extends MessageIdMapperTest {
+    @RegisterExtension
+    static PostgresExtension postgresExtension = 
PostgresExtension.withoutRowLevelSecurity(PostgresMailboxAggregateModule.MODULE);
+
+    private PostgresMapperProvider postgresMapperProvider;
+
+    @Override
+    protected MapperProvider provideMapper() {
+        postgresMapperProvider = new PostgresMapperProvider(postgresExtension);
+        return postgresMapperProvider;
+    }
+
+    @Override
+    protected UpdatableTickingClock updatableTickingClock() {
+        return postgresMapperProvider.getUpdatableTickingClock();
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to