This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch postgresql in repository https://gitbox.apache.org/repos/asf/james-project.git
commit f8b5e8c0a27377646c7fd89e8521160243acad60 Author: Tung Tran <[email protected]> AuthorDate: Mon Mar 18 20:48:06 2024 +0700 JAMES-2586 [REFACTORING] - Extract dedicated class for retrieving Postgres Message in order to remove duplicated code: - Introduce PostgresMessageRetriever (= AttachmentLoader + Retrieve byte message content) --- .../mailbox/postgres/mail/AttachmentLoader.java | 84 ------------ .../postgres/mail/PostgresMessageIdMapper.java | 38 +----- .../postgres/mail/PostgresMessageMapper.java | 39 +----- .../postgres/mail/PostgresMessageRetriever.java | 142 +++++++++++++++++++++ 4 files changed, 151 insertions(+), 152 deletions(-) diff --git a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/AttachmentLoader.java b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/AttachmentLoader.java deleted file mode 100644 index 4927867ce4..0000000000 --- a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/AttachmentLoader.java +++ /dev/null @@ -1,84 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. * - ****************************************************************/ - -package org.apache.james.mailbox.postgres.mail; - -import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.ATTACHMENT_METADATA; - -import java.util.List; -import java.util.Map; - -import org.apache.commons.lang3.tuple.Pair; -import org.apache.james.mailbox.model.AttachmentId; -import org.apache.james.mailbox.model.AttachmentMetadata; -import org.apache.james.mailbox.model.MessageAttachmentMetadata; -import org.apache.james.mailbox.postgres.mail.dto.AttachmentsDTO; -import org.apache.james.mailbox.store.mail.MessageMapper; -import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage; -import org.jooq.Record; - -import com.google.common.collect.ImmutableMap; - -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -public class AttachmentLoader { - - private final PostgresAttachmentMapper attachmentMapper; - - public AttachmentLoader(PostgresAttachmentMapper attachmentMapper) { - this.attachmentMapper = attachmentMapper; - } - - - public Flux<Pair<SimpleMailboxMessage.Builder, Record>> addAttachmentToMessage(Flux<Pair<SimpleMailboxMessage.Builder, Record>> findMessagePublisher, MessageMapper.FetchType fetchType) { - if (fetchType != MessageMapper.FetchType.FULL && fetchType != MessageMapper.FetchType.ATTACHMENTS_METADATA) { - return findMessagePublisher; - } - - return findMessagePublisher.collectList() // convert to list to avoid hanging the database connection with Jooq - .flatMapMany(list -> Flux.fromIterable(list) - .flatMapSequential(pair -> Mono.fromCallable(() -> toMap(pair.getRight().get(ATTACHMENT_METADATA))) - .flatMap(this::getAttachments) - .map(messageAttachmentMetadata -> { - pair.getLeft().addAttachments(messageAttachmentMetadata); - return pair; - }).switchIfEmpty(Mono.just(pair)))); - } - - private Map<AttachmentId, MessageRepresentation.AttachmentRepresentation> toMap(AttachmentsDTO attachmentRepresentations) { - return attachmentRepresentations.stream().collect(ImmutableMap.toImmutableMap(MessageRepresentation.AttachmentRepresentation::getAttachmentId, obj -> obj)); - } - - private Mono<List<MessageAttachmentMetadata>> getAttachments(Map<AttachmentId, MessageRepresentation.AttachmentRepresentation> mapAttachmentIdToAttachmentRepresentation) { - return Mono.fromCallable(mapAttachmentIdToAttachmentRepresentation::keySet) - .flatMapMany(attachmentMapper::getAttachmentsReactive) - .map(attachmentMetadata -> constructMessageAttachment(attachmentMetadata, mapAttachmentIdToAttachmentRepresentation.get(attachmentMetadata.getAttachmentId()))) - .collectList(); - } - - private MessageAttachmentMetadata constructMessageAttachment(AttachmentMetadata attachment, MessageRepresentation.AttachmentRepresentation messageAttachmentRepresentation) { - return MessageAttachmentMetadata.builder() - .attachment(attachment) - .name(messageAttachmentRepresentation.getName().orElse(null)) - .cid(messageAttachmentRepresentation.getCid()) - .isInline(messageAttachmentRepresentation.isInline()) - .build(); - } -} diff --git a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageIdMapper.java b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageIdMapper.java index 01b7304eda..961b51fb53 100644 --- a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageIdMapper.java +++ b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageIdMapper.java @@ -20,9 +20,6 @@ package org.apache.james.mailbox.postgres.mail; import static org.apache.james.blob.api.BlobStore.StoragePolicy.LOW_COST; -import static org.apache.james.blob.api.BlobStore.StoragePolicy.SIZE_BASED; -import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.BODY_BLOB_ID; -import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.HEADER_CONTENT; import java.io.IOException; import java.io.InputStream; @@ -44,8 +41,6 @@ import org.apache.james.mailbox.ModSeq; import org.apache.james.mailbox.exception.MailboxException; import org.apache.james.mailbox.exception.MailboxNotFoundException; import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData; -import org.apache.james.mailbox.model.Content; -import org.apache.james.mailbox.model.HeaderAndBodyByteContent; import org.apache.james.mailbox.model.Mailbox; import org.apache.james.mailbox.model.MailboxId; import org.apache.james.mailbox.model.MessageId; @@ -100,9 +95,8 @@ public class PostgresMessageIdMapper implements MessageIdMapper { private final PostgresMailboxMessageDAO mailboxMessageDAO; private final PostgresModSeqProvider modSeqProvider; private final BlobStore blobStore; - private final BlobId.Factory blobIdFactory; private final Clock clock; - private final AttachmentLoader attachmentLoader; + private final PostgresMessageRetriever messageRetriever; public PostgresMessageIdMapper(PostgresMailboxDAO mailboxDAO, PostgresMessageDAO messageDAO, @@ -117,9 +111,8 @@ public class PostgresMessageIdMapper implements MessageIdMapper { this.mailboxMessageDAO = mailboxMessageDAO; this.modSeqProvider = modSeqProvider; this.blobStore = blobStore; - this.blobIdFactory = blobIdFactory; this.clock = clock; - this.attachmentLoader = new AttachmentLoader(attachmentMapper);; + this.messageRetriever = new PostgresMessageRetriever(blobStore, blobIdFactory, attachmentMapper); } @Override @@ -136,23 +129,8 @@ public class PostgresMessageIdMapper implements MessageIdMapper { @Override public Flux<MailboxMessage> findReactive(Collection<MessageId> messageIds, MessageMapper.FetchType fetchType) { - Flux<Pair<SimpleMailboxMessage.Builder, Record>> fetchMessagePublisher = - mailboxMessageDAO.findMessagesByMessageIds(messageIds.stream().map(PostgresMessageId.class::cast).collect(ImmutableList.toImmutableList()), fetchType) - .transform(pairFlux -> attachmentLoader.addAttachmentToMessage(pairFlux, fetchType)); - - if (fetchType == MessageMapper.FetchType.FULL) { - return fetchMessagePublisher - .flatMapSequential(messageBuilderAndRecord -> { - SimpleMailboxMessage.Builder messageBuilder = messageBuilderAndRecord.getLeft(); - return retrieveFullContent(messageBuilderAndRecord.getRight()) - .map(headerAndBodyContent -> messageBuilder.content(headerAndBodyContent).build()); - }, ReactorUtils.DEFAULT_CONCURRENCY) - .map(message -> message); - } else { - return fetchMessagePublisher - .map(messageBuilderAndBlobId -> messageBuilderAndBlobId.getLeft() - .build()); - } + Flux<Pair<SimpleMailboxMessage.Builder, Record>> fetchMessagePublisher = mailboxMessageDAO.findMessagesByMessageIds(messageIds.stream().map(PostgresMessageId.class::cast).collect(ImmutableList.toImmutableList()), fetchType); + return messageRetriever.get(fetchType, fetchMessagePublisher); } @Override @@ -272,14 +250,6 @@ public class PostgresMessageIdMapper implements MessageIdMapper { return oldComposedId.getFlags().equals(newFlags); } - private Mono<Content> retrieveFullContent(Record messageRecord) { - byte[] headerBytes = messageRecord.get(HEADER_CONTENT); - return Mono.from(blobStore.readBytes(blobStore.getDefaultBucketName(), - blobIdFactory.from(messageRecord.get(BODY_BLOB_ID)), - SIZE_BASED)) - .map(bodyBytes -> new HeaderAndBodyByteContent(headerBytes, bodyBytes)); - } - private Mono<BlobId> saveBodyContent(MailboxMessage message) { return Mono.fromCallable(() -> MESSAGE_BODY_CONTENT_LOADER.apply(message)) .flatMap(bodyByteSource -> Mono.from(blobStore.save(blobStore.getDefaultBucketName(), bodyByteSource, LOW_COST))); diff --git a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMapper.java b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMapper.java index ef9bb0afe9..ff00ee4e2f 100644 --- a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMapper.java +++ b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMapper.java @@ -20,9 +20,6 @@ package org.apache.james.mailbox.postgres.mail; import static org.apache.james.blob.api.BlobStore.StoragePolicy.LOW_COST; -import static org.apache.james.blob.api.BlobStore.StoragePolicy.SIZE_BASED; -import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.BODY_BLOB_ID; -import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.HEADER_CONTENT; import java.io.IOException; import java.io.InputStream; @@ -48,8 +45,6 @@ import org.apache.james.mailbox.ModSeq; import org.apache.james.mailbox.exception.MailboxException; import org.apache.james.mailbox.model.ComposedMessageId; import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData; -import org.apache.james.mailbox.model.Content; -import org.apache.james.mailbox.model.HeaderAndBodyByteContent; import org.apache.james.mailbox.model.Mailbox; import org.apache.james.mailbox.model.MailboxCounters; import org.apache.james.mailbox.model.MessageMetaData; @@ -65,7 +60,6 @@ import org.apache.james.mailbox.store.MailboxReactorUtils; import org.apache.james.mailbox.store.mail.MessageMapper; import org.apache.james.mailbox.store.mail.model.MailboxMessage; import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage; -import org.apache.james.util.ReactorUtils; import org.apache.james.util.streams.Limit; import org.jooq.Record; @@ -100,8 +94,7 @@ public class PostgresMessageMapper implements MessageMapper { private final PostgresUidProvider uidProvider; private final BlobStore blobStore; private final Clock clock; - private final BlobId.Factory blobIdFactory; - private final AttachmentLoader attachmentLoader; + private final PostgresMessageRetriever messageRetriever; public PostgresMessageMapper(PostgresExecutor postgresExecutor, PostgresModSeqProvider modSeqProvider, @@ -116,8 +109,8 @@ public class PostgresMessageMapper implements MessageMapper { this.uidProvider = uidProvider; this.blobStore = blobStore; this.clock = clock; - this.blobIdFactory = blobIdFactory; - this.attachmentLoader = new AttachmentLoader(new PostgresAttachmentMapper(new PostgresAttachmentDAO(postgresExecutor, blobIdFactory), blobStore)); + PostgresAttachmentMapper attachmentMapper = new PostgresAttachmentMapper(new PostgresAttachmentDAO(postgresExecutor, blobIdFactory), blobStore); + this.messageRetriever = new PostgresMessageRetriever(blobStore, blobIdFactory, attachmentMapper); } @@ -135,22 +128,8 @@ public class PostgresMessageMapper implements MessageMapper { @Override public Flux<MailboxMessage> findInMailboxReactive(Mailbox mailbox, MessageRange messageRange, FetchType fetchType, int limitAsInt) { - Flux<Pair<SimpleMailboxMessage.Builder, Record>> fetchMessagePublisher = fetchMessageWithoutFullContent(mailbox, messageRange, fetchType, limitAsInt) - .transform(pairFlux -> attachmentLoader.addAttachmentToMessage(pairFlux, fetchType)); - - if (fetchType == FetchType.FULL) { - return fetchMessagePublisher - .flatMapSequential(messageBuilderAndRecord -> { - SimpleMailboxMessage.Builder messageBuilder = messageBuilderAndRecord.getLeft(); - return retrieveFullContent(messageBuilderAndRecord.getRight()) - .map(headerAndBodyContent -> messageBuilder.content(headerAndBodyContent).build()); - }, ReactorUtils.DEFAULT_CONCURRENCY) - .map(message -> message); - } else { - return fetchMessagePublisher - .map(messageBuilderAndBlobId -> messageBuilderAndBlobId.getLeft() - .build()); - } + Flux<Pair<SimpleMailboxMessage.Builder, Record>> fetchMessagePublisher = fetchMessageWithoutFullContent(mailbox, messageRange, fetchType, limitAsInt); + return messageRetriever.get(fetchType, fetchMessagePublisher); } private Flux<Pair<SimpleMailboxMessage.Builder, Record>> fetchMessageWithoutFullContent(Mailbox mailbox, MessageRange messageRange, FetchType fetchType, int limitAsInt) { @@ -173,14 +152,6 @@ public class PostgresMessageMapper implements MessageMapper { }); } - private Mono<Content> retrieveFullContent(Record messageRecord) { - byte[] headerBytes = messageRecord.get(HEADER_CONTENT); - return Mono.from(blobStore.readBytes(blobStore.getDefaultBucketName(), - blobIdFactory.from(messageRecord.get(BODY_BLOB_ID)), - SIZE_BASED)) - .map(bodyBytes -> new HeaderAndBodyByteContent(headerBytes, bodyBytes)); - } - @Override public List<MessageUid> retrieveMessagesMarkedForDeletion(Mailbox mailbox, MessageRange messageRange) { return retrieveMessagesMarkedForDeletionReactive(mailbox, messageRange) diff --git a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageRetriever.java b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageRetriever.java new file mode 100644 index 0000000000..b415b780f2 --- /dev/null +++ b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageRetriever.java @@ -0,0 +1,142 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailbox.postgres.mail; + +import static org.apache.james.blob.api.BlobStore.StoragePolicy.SIZE_BASED; +import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.ATTACHMENT_METADATA; +import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.BODY_BLOB_ID; +import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.HEADER_CONTENT; + +import java.util.List; +import java.util.Map; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.james.blob.api.BlobId; +import org.apache.james.blob.api.BlobStore; +import org.apache.james.mailbox.model.AttachmentId; +import org.apache.james.mailbox.model.AttachmentMetadata; +import org.apache.james.mailbox.model.Content; +import org.apache.james.mailbox.model.HeaderAndBodyByteContent; +import org.apache.james.mailbox.model.MessageAttachmentMetadata; +import org.apache.james.mailbox.postgres.mail.dto.AttachmentsDTO; +import org.apache.james.mailbox.store.mail.MessageMapper; +import org.apache.james.mailbox.store.mail.model.MailboxMessage; +import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage; +import org.apache.james.util.ReactorUtils; +import org.jooq.Record; + +import com.google.common.collect.ImmutableMap; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public class PostgresMessageRetriever { + + interface PartRetriever { + + boolean isApplicable(MessageMapper.FetchType fetchType); + + Flux<Pair<SimpleMailboxMessage.Builder, Record>> doRetrieve(Flux<Pair<SimpleMailboxMessage.Builder, Record>> chain); + } + + class AttachmentPartRetriever implements PartRetriever { + + @Override + public boolean isApplicable(MessageMapper.FetchType fetchType) { + return fetchType == MessageMapper.FetchType.FULL || fetchType == MessageMapper.FetchType.ATTACHMENTS_METADATA; + } + + @Override + public Flux<Pair<SimpleMailboxMessage.Builder, Record>> doRetrieve(Flux<Pair<SimpleMailboxMessage.Builder, Record>> chain) { + return chain.collectList() // convert to list to avoid hanging the database connection with Jooq + .flatMapMany(list -> Flux.fromIterable(list) + .flatMapSequential(pair -> Mono.fromCallable(() -> toMap(pair.getRight().get(ATTACHMENT_METADATA))) + .flatMap(this::getAttachments) + .map(messageAttachmentMetadata -> { + pair.getLeft().addAttachments(messageAttachmentMetadata); + return pair; + }).switchIfEmpty(Mono.just(pair)))); + } + + private Map<AttachmentId, MessageRepresentation.AttachmentRepresentation> toMap(AttachmentsDTO attachmentRepresentations) { + return attachmentRepresentations.stream().collect(ImmutableMap.toImmutableMap(MessageRepresentation.AttachmentRepresentation::getAttachmentId, obj -> obj)); + } + + private Mono<List<MessageAttachmentMetadata>> getAttachments(Map<AttachmentId, MessageRepresentation.AttachmentRepresentation> mapAttachmentIdToAttachmentRepresentation) { + return Mono.fromCallable(mapAttachmentIdToAttachmentRepresentation::keySet) + .flatMapMany(attachmentMapper::getAttachmentsReactive) + .map(attachmentMetadata -> constructMessageAttachment(attachmentMetadata, mapAttachmentIdToAttachmentRepresentation.get(attachmentMetadata.getAttachmentId()))) + .collectList(); + } + + private MessageAttachmentMetadata constructMessageAttachment(AttachmentMetadata attachment, MessageRepresentation.AttachmentRepresentation messageAttachmentRepresentation) { + return MessageAttachmentMetadata.builder() + .attachment(attachment) + .name(messageAttachmentRepresentation.getName().orElse(null)) + .cid(messageAttachmentRepresentation.getCid()) + .isInline(messageAttachmentRepresentation.isInline()) + .build(); + } + } + + class BlobContentPartRetriever implements PartRetriever { + + @Override + public boolean isApplicable(MessageMapper.FetchType fetchType) { + return fetchType == MessageMapper.FetchType.FULL; + } + + @Override + public Flux<Pair<SimpleMailboxMessage.Builder, Record>> doRetrieve(Flux<Pair<SimpleMailboxMessage.Builder, Record>> chain) { + return chain + .flatMapSequential(pair -> retrieveFullContent(pair.getRight()) + .map(headerAndBodyContent -> Pair.of(pair.getLeft().content(headerAndBodyContent), pair.getRight())), + ReactorUtils.DEFAULT_CONCURRENCY); + } + + private Mono<Content> retrieveFullContent(Record messageRecord) { + return Mono.from(blobStore.readBytes(blobStore.getDefaultBucketName(), + blobIdFactory.from(messageRecord.get(BODY_BLOB_ID)), + SIZE_BASED)) + .map(bodyBytes -> new HeaderAndBodyByteContent(messageRecord.get(HEADER_CONTENT), bodyBytes)); + } + } + + private final BlobStore blobStore; + private final BlobId.Factory blobIdFactory; + private final PostgresAttachmentMapper attachmentMapper; + private final List<PartRetriever> partRetrievers = List.of(new AttachmentPartRetriever(), new BlobContentPartRetriever()); + + public PostgresMessageRetriever(BlobStore blobStore, + BlobId.Factory blobIdFactory, + PostgresAttachmentMapper attachmentMapper) { + this.blobStore = blobStore; + this.blobIdFactory = blobIdFactory; + this.attachmentMapper = attachmentMapper; + } + + public Flux<MailboxMessage> get(MessageMapper.FetchType fetchType, Flux<Pair<SimpleMailboxMessage.Builder, Record>> initialFlux) { + return Flux.fromIterable(partRetrievers) + .filter(partRetriever -> partRetriever.isApplicable(fetchType)) + .reduce(initialFlux, (flux, partRetriever) -> partRetriever.doRetrieve(flux)) + .flatMapMany(flux -> flux) + .map(pair -> pair.getLeft().build()); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
