This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch postgresql in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 17b5e2bd2f9bafe619358d465efa9447327470ca Author: hung phan <[email protected]> AuthorDate: Fri Jan 12 17:49:21 2024 +0700 JAMES-2586 Implement PostgresMessageIdMapper --- .../PostgresMailboxSessionMapperFactory.java | 12 +- .../mail/MailboxDeleteDuringUpdateException.java | 23 ++ .../postgres/mail/PostgresMessageIdMapper.java | 274 +++++++++++++++++++++ .../postgres/mail/PostgresMessageMapper.java | 7 +- .../mail/dao/PostgresMailboxMessageDAO.java | 68 +++-- .../postgres/mail/PostgresMapperProvider.java | 39 ++- .../postgres/mail/PostgresMessageIdMapperTest.java | 45 ++++ 7 files changed, 440 insertions(+), 28 deletions(-) diff --git a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/PostgresMailboxSessionMapperFactory.java b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/PostgresMailboxSessionMapperFactory.java index 7d78d275f4..7f54b43501 100644 --- a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/PostgresMailboxSessionMapperFactory.java +++ b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/PostgresMailboxSessionMapperFactory.java @@ -29,11 +29,14 @@ import org.apache.james.blob.api.BlobStore; import org.apache.james.mailbox.MailboxSession; import org.apache.james.mailbox.postgres.mail.PostgresAnnotationMapper; import org.apache.james.mailbox.postgres.mail.PostgresMailboxMapper; +import org.apache.james.mailbox.postgres.mail.PostgresMessageIdMapper; import org.apache.james.mailbox.postgres.mail.PostgresMessageMapper; import org.apache.james.mailbox.postgres.mail.PostgresModSeqProvider; import org.apache.james.mailbox.postgres.mail.PostgresUidProvider; import org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxAnnotationDAO; import org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxDAO; +import org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxMessageDAO; +import org.apache.james.mailbox.postgres.mail.dao.PostgresMessageDAO; import org.apache.james.mailbox.postgres.user.PostgresSubscriptionDAO; import org.apache.james.mailbox.postgres.user.PostgresSubscriptionMapper; import org.apache.james.mailbox.store.MailboxSessionMapperFactory; @@ -45,7 +48,6 @@ import org.apache.james.mailbox.store.mail.MessageIdMapper; import org.apache.james.mailbox.store.mail.MessageMapper; import org.apache.james.mailbox.store.user.SubscriptionMapper; - public class PostgresMailboxSessionMapperFactory extends MailboxSessionMapperFactory implements AttachmentMapperFactory { private final PostgresExecutor.Factory executorFactory; @@ -82,7 +84,13 @@ public class PostgresMailboxSessionMapperFactory extends MailboxSessionMapperFac @Override public MessageIdMapper createMessageIdMapper(MailboxSession session) { - throw new NotImplementedException("not implemented"); + return new PostgresMessageIdMapper(new PostgresMailboxDAO(executorFactory.create(session.getUser().getDomainPart())), + new PostgresMessageDAO(executorFactory.create(session.getUser().getDomainPart()), blobIdFactory), + new PostgresMailboxMessageDAO(executorFactory.create(session.getUser().getDomainPart())), + getModSeqProvider(session), + blobStore, + blobIdFactory, + clock); } @Override diff --git a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/MailboxDeleteDuringUpdateException.java b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/MailboxDeleteDuringUpdateException.java new file mode 100644 index 0000000000..e738905441 --- /dev/null +++ b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/MailboxDeleteDuringUpdateException.java @@ -0,0 +1,23 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailbox.postgres.mail; + +public class MailboxDeleteDuringUpdateException extends Exception { +} diff --git a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageIdMapper.java b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageIdMapper.java new file mode 100644 index 0000000000..b3233f8345 --- /dev/null +++ b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageIdMapper.java @@ -0,0 +1,274 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailbox.postgres.mail; + +import static org.apache.james.blob.api.BlobStore.StoragePolicy.LOW_COST; +import static org.apache.james.blob.api.BlobStore.StoragePolicy.SIZE_BASED; +import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.BODY_BLOB_ID; +import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.HEADER_CONTENT; + +import java.io.IOException; +import java.io.InputStream; +import java.time.Clock; +import java.util.Collection; +import java.util.Date; +import java.util.List; +import java.util.function.Function; + +import javax.mail.Flags; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.james.backends.postgres.utils.PostgresUtils; +import org.apache.james.blob.api.BlobId; +import org.apache.james.blob.api.BlobStore; +import org.apache.james.mailbox.MessageManager; +import org.apache.james.mailbox.MessageUid; +import org.apache.james.mailbox.ModSeq; +import org.apache.james.mailbox.exception.MailboxException; +import org.apache.james.mailbox.exception.MailboxNotFoundException; +import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData; +import org.apache.james.mailbox.model.Content; +import org.apache.james.mailbox.model.HeaderAndBodyByteContent; +import org.apache.james.mailbox.model.Mailbox; +import org.apache.james.mailbox.model.MailboxId; +import org.apache.james.mailbox.model.MessageId; +import org.apache.james.mailbox.model.UpdatedFlags; +import org.apache.james.mailbox.postgres.PostgresMailboxId; +import org.apache.james.mailbox.postgres.PostgresMessageId; +import org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxDAO; +import org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxMessageDAO; +import org.apache.james.mailbox.postgres.mail.dao.PostgresMessageDAO; +import org.apache.james.mailbox.store.FlagsUpdateCalculator; +import org.apache.james.mailbox.store.MailboxReactorUtils; +import org.apache.james.mailbox.store.mail.MessageIdMapper; +import org.apache.james.mailbox.store.mail.MessageMapper; +import org.apache.james.mailbox.store.mail.model.MailboxMessage; +import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage; +import org.apache.james.util.ReactorUtils; +import org.jooq.Record; +import org.reactivestreams.Publisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableListMultimap; +import com.google.common.collect.Multimap; +import com.google.common.io.ByteSource; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public class PostgresMessageIdMapper implements MessageIdMapper { + private static final Function<MailboxMessage, ByteSource> MESSAGE_BODY_CONTENT_LOADER = (mailboxMessage) -> new ByteSource() { + @Override + public InputStream openStream() { + try { + return mailboxMessage.getBodyContent(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public long size() { + return mailboxMessage.getBodyOctets(); + } + }; + + public static final int NUM_RETRIES = 5; + public static final Logger LOGGER = LoggerFactory.getLogger(PostgresMessageIdMapper.class); + + private final PostgresMailboxDAO mailboxDAO; + private final PostgresMessageDAO messageDAO; + private final PostgresMailboxMessageDAO mailboxMessageDAO; + private final PostgresModSeqProvider modSeqProvider; + private final BlobStore blobStore; + private final BlobId.Factory blobIdFactory; + private final Clock clock; + + public PostgresMessageIdMapper(PostgresMailboxDAO mailboxDAO, PostgresMessageDAO messageDAO, + PostgresMailboxMessageDAO mailboxMessageDAO, PostgresModSeqProvider modSeqProvider, + BlobStore blobStore, BlobId.Factory blobIdFactory, + Clock clock) { + this.mailboxDAO = mailboxDAO; + this.messageDAO = messageDAO; + this.mailboxMessageDAO = mailboxMessageDAO; + this.modSeqProvider = modSeqProvider; + this.blobStore = blobStore; + this.blobIdFactory = blobIdFactory; + this.clock = clock; + } + + @Override + public List<MailboxMessage> find(Collection<MessageId> messageIds, MessageMapper.FetchType fetchType) { + return findReactive(messageIds, fetchType) + .collectList() + .block(); + } + + @Override + public Publisher<ComposedMessageIdWithMetaData> findMetadata(MessageId messageId) { + return mailboxMessageDAO.findMetadataByMessageId(PostgresMessageId.class.cast(messageId)); + } + + @Override + public Flux<MailboxMessage> findReactive(Collection<MessageId> messageIds, MessageMapper.FetchType fetchType) { + return mailboxMessageDAO.findMessagesByMessageIds(messageIds.stream().map(PostgresMessageId.class::cast).collect(ImmutableList.toImmutableList()), + fetchType) + .flatMap(messageBuilderAndRecord -> { + SimpleMailboxMessage.Builder messageBuilder = messageBuilderAndRecord.getLeft(); + if (fetchType == MessageMapper.FetchType.FULL) { + return retrieveFullContent(messageBuilderAndRecord.getRight()) + .map(headerAndBodyContent -> messageBuilder.content(headerAndBodyContent).build()); + } + return Mono.just(messageBuilder.build()); + }, ReactorUtils.DEFAULT_CONCURRENCY); + } + + @Override + public List<MailboxId> findMailboxes(MessageId messageId) { + return mailboxMessageDAO.findMailboxes(PostgresMessageId.class.cast(messageId)) + .collect(ImmutableList.toImmutableList()) + .block(); + } + + @Override + public void save(MailboxMessage mailboxMessage) throws MailboxException { + PostgresMailboxId mailboxId = PostgresMailboxId.class.cast(mailboxMessage.getMailboxId()); + mailboxMessage.setSaveDate(Date.from(clock.instant())); + MailboxReactorUtils.block(mailboxDAO.findMailboxById(mailboxId) + .switchIfEmpty(Mono.error(() -> new MailboxNotFoundException(mailboxId))) + .then(saveBodyContent(mailboxMessage)) + .flatMap(blobId -> messageDAO.insert(mailboxMessage, blobId.asString()) + .onErrorResume(PostgresUtils.UNIQUE_CONSTRAINT_VIOLATION_PREDICATE, e -> Mono.empty())) + .then(mailboxMessageDAO.insert(mailboxMessage))); + } + + @Override + public void copyInMailbox(MailboxMessage mailboxMessage, Mailbox mailbox) throws MailboxException { + MailboxReactorUtils.block(copyInMailboxReactive(mailboxMessage, mailbox)); + } + + @Override + public Mono<Void> copyInMailboxReactive(MailboxMessage mailboxMessage, Mailbox mailbox) { + mailboxMessage.setSaveDate(Date.from(clock.instant())); + PostgresMailboxId mailboxId = (PostgresMailboxId) mailbox.getMailboxId(); + return mailboxMessageDAO.insert(mailboxMessage, mailboxId) + .onErrorResume(PostgresUtils.UNIQUE_CONSTRAINT_VIOLATION_PREDICATE, e -> Mono.empty()); + } + + @Override + public void delete(MessageId messageId) { + mailboxMessageDAO.deleteByMessageId((PostgresMessageId) messageId).block(); + } + + @Override + public void delete(MessageId messageId, Collection<MailboxId> mailboxIds) { + mailboxMessageDAO.deleteByMessageIdAndMailboxIds((PostgresMessageId) messageId, + mailboxIds.stream().map(PostgresMailboxId.class::cast).collect(ImmutableList.toImmutableList())).block(); + } + + @Override + public Mono<Multimap<MailboxId, UpdatedFlags>> setFlags(MessageId messageId, List<MailboxId> mailboxIds, Flags newState, MessageManager.FlagsUpdateMode updateMode) { + return Flux.fromIterable(mailboxIds) + .distinct() + .map(PostgresMailboxId.class::cast) + .concatMap(mailboxId -> flagsUpdateWithRetry(newState, updateMode, mailboxId, messageId)) + .collect(ImmutableListMultimap.toImmutableListMultimap(Pair::getLeft, Pair::getRight)); + } + + private Flux<Pair<MailboxId, UpdatedFlags>> flagsUpdateWithRetry(Flags newState, MessageManager.FlagsUpdateMode updateMode, MailboxId mailboxId, MessageId messageId) { + return updateFlags(mailboxId, messageId, newState, updateMode) + .retry(NUM_RETRIES) + .onErrorResume(MailboxDeleteDuringUpdateException.class, e -> { + LOGGER.info("Mailbox {} was deleted during flag update", mailboxId); + return Mono.empty(); + }) + .flatMapIterable(Function.identity()) + .map(pair -> buildUpdatedFlags(pair.getRight(), pair.getLeft())); + } + + private Pair<MailboxId, UpdatedFlags> buildUpdatedFlags(ComposedMessageIdWithMetaData composedMessageIdWithMetaData, Flags oldFlags) { + return Pair.of(composedMessageIdWithMetaData.getComposedMessageId().getMailboxId(), + UpdatedFlags.builder() + .uid(composedMessageIdWithMetaData.getComposedMessageId().getUid()) + .messageId(composedMessageIdWithMetaData.getComposedMessageId().getMessageId()) + .modSeq(composedMessageIdWithMetaData.getModSeq()) + .oldFlags(oldFlags) + .newFlags(composedMessageIdWithMetaData.getFlags()) + .build()); + } + + private Mono<List<Pair<Flags, ComposedMessageIdWithMetaData>>> updateFlags(MailboxId mailboxId, MessageId messageId, Flags newState, MessageManager.FlagsUpdateMode updateMode) { + PostgresMailboxId postgresMailboxId = (PostgresMailboxId) mailboxId; + PostgresMessageId postgresMessageId = (PostgresMessageId) messageId; + return mailboxMessageDAO.findMetadataByMessageId(postgresMessageId, postgresMailboxId) + .flatMap(oldComposedId -> updateFlags(newState, updateMode, postgresMailboxId, oldComposedId), ReactorUtils.DEFAULT_CONCURRENCY) + .switchIfEmpty(Mono.error(MailboxDeleteDuringUpdateException::new)) + .collectList(); + } + + private Mono<Pair<Flags, ComposedMessageIdWithMetaData>> updateFlags(Flags newState, MessageManager.FlagsUpdateMode updateMode, PostgresMailboxId mailboxId, ComposedMessageIdWithMetaData oldComposedId) { + FlagsUpdateCalculator flagsUpdateCalculator = new FlagsUpdateCalculator(newState, updateMode); + Flags newFlags = flagsUpdateCalculator.buildNewFlags(oldComposedId.getFlags()); + if (identicalFlags(oldComposedId, newFlags)) { + return Mono.just(Pair.of(oldComposedId.getFlags(), oldComposedId)); + } else { + return modSeqProvider.nextModSeqReactive(mailboxId) + .flatMap(newModSeq -> updateFlags(mailboxId, flagsUpdateCalculator, newModSeq, oldComposedId.getComposedMessageId().getUid()) + .map(flags -> Pair.of(oldComposedId.getFlags(), new ComposedMessageIdWithMetaData( + oldComposedId.getComposedMessageId(), + flags, + newModSeq, + oldComposedId.getThreadId())))); + } + } + + private Mono<Flags> updateFlags(PostgresMailboxId mailboxId, FlagsUpdateCalculator flagsUpdateCalculator, ModSeq newModSeq, MessageUid uid) { + + switch (flagsUpdateCalculator.getMode()) { + case ADD: + return mailboxMessageDAO.addFlags(mailboxId, uid, flagsUpdateCalculator.providedFlags(), newModSeq); + case REMOVE: + return mailboxMessageDAO.removeFlags(mailboxId, uid, flagsUpdateCalculator.providedFlags(), newModSeq); + case REPLACE: + return mailboxMessageDAO.replaceFlags(mailboxId, uid, flagsUpdateCalculator.providedFlags(), newModSeq); + default: + return Mono.error(() -> new RuntimeException("Unknown MessageRange type " + flagsUpdateCalculator.getMode())); + } + } + + private boolean identicalFlags(ComposedMessageIdWithMetaData oldComposedId, Flags newFlags) { + return oldComposedId.getFlags().equals(newFlags); + } + + private Mono<Content> retrieveFullContent(Record messageRecord) { + byte[] headerBytes = messageRecord.get(HEADER_CONTENT); + return Mono.from(blobStore.readBytes(blobStore.getDefaultBucketName(), + blobIdFactory.from(messageRecord.get(BODY_BLOB_ID)), + SIZE_BASED)) + .map(bodyBytes -> new HeaderAndBodyByteContent(headerBytes, bodyBytes)); + } + + private Mono<BlobId> saveBodyContent(MailboxMessage message) { + return Mono.fromCallable(() -> MESSAGE_BODY_CONTENT_LOADER.apply(message)) + .flatMap(bodyByteSource -> Mono.from(blobStore.save(blobStore.getDefaultBucketName(), bodyByteSource, LOW_COST))); + } +} diff --git a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMapper.java b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMapper.java index 6c45e89432..7d4385995e 100644 --- a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMapper.java +++ b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMapper.java @@ -39,6 +39,7 @@ import javax.mail.Flags; import org.apache.commons.lang3.tuple.Pair; import org.apache.james.backends.postgres.utils.PostgresExecutor; +import org.apache.james.backends.postgres.utils.PostgresUtils; import org.apache.james.blob.api.BlobId; import org.apache.james.blob.api.BlobStore; import org.apache.james.mailbox.ApplicableFlagBuilder; @@ -64,6 +65,7 @@ import org.apache.james.mailbox.store.MailboxReactorUtils; import org.apache.james.mailbox.store.mail.MessageMapper; import org.apache.james.mailbox.store.mail.model.MailboxMessage; import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage; +import org.apache.james.util.ReactorUtils; import org.apache.james.util.streams.Limit; import org.jooq.Record; @@ -138,7 +140,7 @@ public class PostgresMessageMapper implements MessageMapper { SimpleMailboxMessage.Builder messageBuilder = messageBuilderAndRecord.getLeft(); return retrieveFullContent(messageBuilderAndRecord.getRight()) .map(headerAndBodyContent -> messageBuilder.content(headerAndBodyContent).build()); - }) + }, ReactorUtils.DEFAULT_CONCURRENCY) .sort(Comparator.comparing(MailboxMessage::getUid)) .map(message -> message); } else { @@ -278,7 +280,8 @@ public class PostgresMessageMapper implements MessageMapper { }) .flatMap(this::setNewUidAndModSeq) .then(saveBodyContent(message) - .flatMap(bodyBlobId -> messageDAO.insert(message, bodyBlobId.asString()))) + .flatMap(bodyBlobId -> messageDAO.insert(message, bodyBlobId.asString()) + .onErrorResume(PostgresUtils.UNIQUE_CONSTRAINT_VIOLATION_PREDICATE, e -> Mono.empty()))) .then(Mono.defer(() -> mailboxMessageDAO.insert(message))) .then(Mono.fromCallable(message::metaData)); } diff --git a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxMessageDAO.java b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxMessageDAO.java index a267dfc3aa..61e48ea637 100644 --- a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxMessageDAO.java +++ b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxMessageDAO.java @@ -49,6 +49,7 @@ import static org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxMessageD import static org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxMessageDAOUtils.RECORD_TO_MESSAGE_METADATA_FUNCTION; import static org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxMessageDAOUtils.RECORD_TO_MESSAGE_UID_FUNCTION; +import java.util.Collection; import java.util.List; import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; @@ -64,6 +65,7 @@ import org.apache.james.core.Domain; import org.apache.james.mailbox.MessageUid; import org.apache.james.mailbox.ModSeq; import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData; +import org.apache.james.mailbox.model.MailboxId; import org.apache.james.mailbox.model.MessageMetaData; import org.apache.james.mailbox.model.MessageRange; import org.apache.james.mailbox.postgres.PostgresMailboxId; @@ -88,6 +90,7 @@ import org.jooq.UpdateSetStep; import org.jooq.impl.DSL; import org.jooq.util.postgres.PostgresDSL; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import reactor.core.publisher.Flux; @@ -235,6 +238,17 @@ public class PostgresMailboxMessageDAO { .map(record -> PostgresMessageId.Factory.of(record.get(MESSAGE_ID))); } + public Mono<Void> deleteByMessageIdAndMailboxIds(PostgresMessageId messageId, Collection<PostgresMailboxId> mailboxIds) { + return postgresExecutor.executeVoid(dslContext -> Mono.from(dslContext.deleteFrom(TABLE_NAME) + .where(MESSAGE_ID.eq(messageId.asUuid())) + .and(MAILBOX_ID.in(mailboxIds.stream().map(PostgresMailboxId::asUuid).collect(ImmutableList.toImmutableList()))))); + } + + public Mono<Void> deleteByMessageId(PostgresMessageId messageId) { + return postgresExecutor.executeVoid(dslContext -> Mono.from(dslContext.deleteFrom(TABLE_NAME) + .where(MESSAGE_ID.eq(messageId.asUuid())))); + } + public Mono<Integer> countTotalMessagesByMailboxId(PostgresMailboxId mailboxId) { return postgresExecutor.executeCount(dslContext -> Mono.from(dslContext.selectCount() .from(TABLE_NAME) @@ -388,22 +402,6 @@ public class PostgresMailboxMessageDAO { .map(RECORD_TO_COMPOSED_MESSAGE_ID_WITH_META_DATA_FUNCTION); } - public Flux<ComposedMessageIdWithMetaData> findMessagesMetadata(PostgresMailboxId mailboxId, List<MessageUid> messageUids) { - Function<List<MessageUid>, Flux<ComposedMessageIdWithMetaData>> queryPublisherFunction = uidsToFetch -> postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select() - .from(TABLE_NAME) - .where(MAILBOX_ID.eq(mailboxId.asUuid())) - .and(MESSAGE_UID.in(uidsToFetch.stream().map(MessageUid::asLong).toArray(Long[]::new))) - .orderBy(DEFAULT_SORT_ORDER_BY))) - .map(RECORD_TO_COMPOSED_MESSAGE_ID_WITH_META_DATA_FUNCTION); - - if (messageUids.size() <= IN_CLAUSE_MAX_SIZE) { - return queryPublisherFunction.apply(messageUids); - } else { - return Flux.fromIterable(Iterables.partition(messageUids, IN_CLAUSE_MAX_SIZE)) - .flatMap(queryPublisherFunction); - } - } - public Flux<ComposedMessageIdWithMetaData> findAllRecentMessageMetadata(PostgresMailboxId mailboxId) { return postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select() .from(TABLE_NAME) @@ -527,8 +525,12 @@ public class PostgresMailboxMessageDAO { } public Mono<Void> insert(MailboxMessage mailboxMessage) { + return insert(mailboxMessage, PostgresMailboxId.class.cast(mailboxMessage.getMailboxId())); + } + + public Mono<Void> insert(MailboxMessage mailboxMessage, PostgresMailboxId mailboxId) { return postgresExecutor.executeVoid(dslContext -> Mono.from(dslContext.insertInto(TABLE_NAME) - .set(MAILBOX_ID, ((PostgresMailboxId) mailboxMessage.getMailboxId()).asUuid()) + .set(MAILBOX_ID, mailboxId.asUuid()) .set(MESSAGE_UID, mailboxMessage.getUid().asLong()) .set(MOD_SEQ, mailboxMessage.getModSeq().asLong()) .set(MESSAGE_ID, ((PostgresMessageId) mailboxMessage.getMessageId()).asUuid()) @@ -545,4 +547,36 @@ public class PostgresMailboxMessageDAO { .set(SAVE_DATE, mailboxMessage.getSaveDate().map(DATE_TO_LOCAL_DATE_TIME).orElse(null)))); } + public Flux<MailboxId> findMailboxes(PostgresMessageId messageId) { + return postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select(MAILBOX_ID) + .from(TABLE_NAME) + .where(MESSAGE_ID.eq(messageId.asUuid())))) + .map(record -> PostgresMailboxId.of(record.get(MAILBOX_ID))); + } + + public Flux<Pair<SimpleMailboxMessage.Builder, Record>> findMessagesByMessageIds(Collection<PostgresMessageId> messageIds, MessageMapper.FetchType fetchType) { + PostgresMailboxMessageFetchStrategy fetchStrategy = FETCH_TYPE_TO_FETCH_STRATEGY.apply(fetchType); + + return postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select(fetchStrategy.fetchFields()) + .from(MESSAGES_JOIN_MAILBOX_MESSAGES_CONDITION_STEP) + .where(DSL.field(TABLE_NAME.getName() + "." + MESSAGE_ID.getName()) + .in(messageIds.stream().map(PostgresMessageId::asUuid).collect(ImmutableList.toImmutableList()))))) + .map(record -> Pair.of(fetchStrategy.toMessageBuilder().apply(record), record)); + } + + public Flux<ComposedMessageIdWithMetaData> findMetadataByMessageId(PostgresMessageId messageId) { + return postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select() + .from(TABLE_NAME) + .where(MESSAGE_ID.eq(messageId.asUuid())))) + .map(RECORD_TO_COMPOSED_MESSAGE_ID_WITH_META_DATA_FUNCTION); + } + + public Flux<ComposedMessageIdWithMetaData> findMetadataByMessageId(PostgresMessageId messageId, PostgresMailboxId mailboxId) { + return postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select() + .from(TABLE_NAME) + .where(MESSAGE_ID.eq(messageId.asUuid())) + .and(MAILBOX_ID.eq(mailboxId.asUuid())))) + .map(RECORD_TO_COMPOSED_MESSAGE_ID_WITH_META_DATA_FUNCTION); + } + } diff --git a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMapperProvider.java b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMapperProvider.java index c4705bf259..ebd3a51cf0 100644 --- a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMapperProvider.java +++ b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMapperProvider.java @@ -31,20 +31,27 @@ import org.apache.james.blob.api.HashBlobId; import org.apache.james.blob.memory.MemoryBlobStoreDAO; import org.apache.james.mailbox.MessageUid; import org.apache.james.mailbox.ModSeq; +import org.apache.james.mailbox.exception.MailboxException; import org.apache.james.mailbox.model.Mailbox; import org.apache.james.mailbox.model.MailboxId; import org.apache.james.mailbox.model.MessageId; import org.apache.james.mailbox.postgres.PostgresMailboxId; import org.apache.james.mailbox.postgres.PostgresMessageId; import org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxDAO; +import org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxMessageDAO; +import org.apache.james.mailbox.postgres.mail.dao.PostgresMessageDAO; import org.apache.james.mailbox.store.mail.AttachmentMapper; import org.apache.james.mailbox.store.mail.MailboxMapper; import org.apache.james.mailbox.store.mail.MessageIdMapper; import org.apache.james.mailbox.store.mail.MessageMapper; +import org.apache.james.mailbox.store.mail.UidProvider; import org.apache.james.mailbox.store.mail.model.MapperProvider; +import org.apache.james.mailbox.store.mail.model.MessageUidProvider; import org.apache.james.server.blob.deduplication.DeDuplicationBlobStore; import org.apache.james.utils.UpdatableTickingClock; +import org.testcontainers.utility.ThrowingFunction; +import com.github.fge.lambdas.Throwing; import com.google.common.collect.ImmutableList; public class PostgresMapperProvider implements MapperProvider { @@ -54,6 +61,7 @@ public class PostgresMapperProvider implements MapperProvider { private final UpdatableTickingClock updatableTickingClock; private final BlobStore blobStore; private final BlobId.Factory blobIdFactory; + private final UidProvider messageUidProvider; public PostgresMapperProvider(PostgresExtension postgresExtension) { this.postgresExtension = postgresExtension; @@ -61,11 +69,13 @@ public class PostgresMapperProvider implements MapperProvider { this.messageIdFactory = new PostgresMessageId.Factory(); this.blobIdFactory = new HashBlobId.Factory(); this.blobStore = new DeDuplicationBlobStore(new MemoryBlobStoreDAO(), BucketName.DEFAULT, blobIdFactory); + this.messageUidProvider = new PostgresUidProvider(new PostgresMailboxDAO(postgresExtension.getPostgresExecutor())); } @Override public List<Capabilities> getSupportedCapabilities() { - return ImmutableList.of(Capabilities.ANNOTATION, Capabilities.MAILBOX, Capabilities.MESSAGE, Capabilities.MOVE, Capabilities.ATTACHMENT, Capabilities.THREAD_SAFE_FLAGS_UPDATE); + return ImmutableList.of(Capabilities.ANNOTATION, Capabilities.MAILBOX, Capabilities.MESSAGE, Capabilities.MOVE, + Capabilities.ATTACHMENT, Capabilities.THREAD_SAFE_FLAGS_UPDATE, Capabilities.UNIQUE_MESSAGE_ID); } @Override @@ -91,7 +101,12 @@ public class PostgresMapperProvider implements MapperProvider { @Override public MessageIdMapper createMessageIdMapper() { - throw new NotImplementedException("not implemented"); + PostgresMailboxDAO mailboxDAO = new PostgresMailboxDAO(postgresExtension.getPostgresExecutor()); + return new PostgresMessageIdMapper(mailboxDAO, + new PostgresMessageDAO(postgresExtension.getPostgresExecutor(), blobIdFactory), + new PostgresMailboxMessageDAO(postgresExtension.getPostgresExecutor()), + new PostgresModSeqProvider(mailboxDAO), + blobStore, blobIdFactory, updatableTickingClock); } @Override @@ -105,18 +120,28 @@ public class PostgresMapperProvider implements MapperProvider { } @Override - public MessageUid generateMessageUid() { - throw new NotImplementedException("not implemented"); + public MessageUid generateMessageUid(Mailbox mailbox) { + try { + return messageUidProvider.nextUid(mailbox); + } catch (MailboxException e) { + throw new RuntimeException(e); + } } @Override public ModSeq generateModSeq(Mailbox mailbox) { - throw new NotImplementedException("not implemented"); + try { + return new PostgresModSeqProvider(new PostgresMailboxDAO(postgresExtension.getPostgresExecutor())) + .nextModSeq(mailbox); + } catch (MailboxException e) { + throw new RuntimeException(e); + } } @Override public ModSeq highestModSeq(Mailbox mailbox) { - throw new NotImplementedException("not implemented"); + return new PostgresModSeqProvider(new PostgresMailboxDAO(postgresExtension.getPostgresExecutor())) + .highestModSeq(mailbox); } @Override @@ -132,4 +157,4 @@ public class PostgresMapperProvider implements MapperProvider { public UpdatableTickingClock getUpdatableTickingClock() { return updatableTickingClock; } -} +} \ No newline at end of file diff --git a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMessageIdMapperTest.java b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMessageIdMapperTest.java new file mode 100644 index 0000000000..873e7b6633 --- /dev/null +++ b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMessageIdMapperTest.java @@ -0,0 +1,45 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailbox.postgres.mail; + +import org.apache.james.backends.postgres.PostgresExtension; +import org.apache.james.mailbox.postgres.PostgresMailboxAggregateModule; +import org.apache.james.mailbox.store.mail.model.MapperProvider; +import org.apache.james.mailbox.store.mail.model.MessageIdMapperTest; +import org.apache.james.utils.UpdatableTickingClock; +import org.junit.jupiter.api.extension.RegisterExtension; + +public class PostgresMessageIdMapperTest extends MessageIdMapperTest { + @RegisterExtension + static PostgresExtension postgresExtension = PostgresExtension.withoutRowLevelSecurity(PostgresMailboxAggregateModule.MODULE); + + private PostgresMapperProvider postgresMapperProvider; + + @Override + protected MapperProvider provideMapper() { + postgresMapperProvider = new PostgresMapperProvider(postgresExtension); + return postgresMapperProvider; + } + + @Override + protected UpdatableTickingClock updatableTickingClock() { + return postgresMapperProvider.getUpdatableTickingClock(); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
