This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch postgresql in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 8ee446e10a6442b78c3c20eb1491101810f817f6 Author: TungTV <vtt...@linagora.com> AuthorDate: Thu Nov 28 09:05:41 2024 +0700 JAMES-2586 - Postgres mailbox messages support batch/pagination for some methods & introduce eager fetch mode --- .../backends/postgres/utils/PostgresExecutor.java | 43 +++- .../backends/postgres/utils/PostgresUtils.java | 12 + .../james/backends/postgres/PostgresExtension.java | 4 +- .../postgres/mail/dao/PostgresMailboxDAO.java | 19 +- .../mail/dao/PostgresMailboxMessageDAO.java | 286 +++++++++++++-------- .../postgres/PostgresMailboxManagerTest.java | 42 +++ .../mail/dao/PostgresMailboxMessageDAOTest.java | 215 ++++++++++++++++ .../sample-configuration/jvm.properties | 5 +- 8 files changed, 504 insertions(+), 122 deletions(-) diff --git a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresExecutor.java b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresExecutor.java index aaa3fadf61..cbe00f8023 100644 --- a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresExecutor.java +++ b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresExecutor.java @@ -62,6 +62,7 @@ public class PostgresExecutor { public static final Duration MIN_BACKOFF = Duration.ofMillis(1); private static final Logger LOGGER = LoggerFactory.getLogger(PostgresExecutor.class); private static final String JOOQ_TIMEOUT_ERROR_LOG = "Time out executing Postgres query. May need to check either jOOQ reactive issue or Postgres DB performance."; + public static final boolean EAGER_FETCH = true; public static class Factory { @@ -125,16 +126,40 @@ public class PostgresExecutor { } public Flux<Record> executeRows(Function<DSLContext, Flux<Record>> queryFunction) { + return executeRows(queryFunction, !EAGER_FETCH); + } + + /** + * @param isEagerFetch + * Because an R2DBC postgres connection can only execute one query at a time, when `isEagerFetch` is set to `true`, all elements are fetched and stored in a list (using the `.collectList` API). + * <p> + * This approach allows the connection to early complete its current query and return to the pool, helping to avoid Reactor pipeline hanging caused by depleting the available connection pool. + * <p> + * It is recommended to set `isEagerFetch` to `true` when the number of elements to fetch is small, such as the number of mailboxes for a single user. + * <p> + * In cases where the number of elements is large, consider using pagination-based queries. + * <p> + * Reference: + * - <a href="https://github.com/apache/james-project/pull/2514#issuecomment-2490348291">james-project</a> + * - <a href="https://github.com/pgjdbc/r2dbc-postgresql/issues/650">r2dbc-postgresql</a> + */ + public Flux<Record> executeRows(Function<DSLContext, Flux<Record>> queryFunction, boolean isEagerFetch) { return Flux.from(metricFactory.decoratePublisherWithTimerMetric("postgres-execution", Flux.usingWhen(getConnection(domain), - connection -> dslContext(connection) - .flatMapMany(queryFunction) - .timeout(postgresConfiguration.getJooqReactiveTimeout()) - .doOnError(TimeoutException.class, e -> LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e)) - .collectList() - .flatMapIterable(list -> list) // Mitigation fix for https://github.com/jOOQ/jOOQ/issues/16556 - .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF) - .filter(preparedStatementConflictException())), + connection -> { + Flux<Record> recordFlux = dslContext(connection) + .flatMapMany(queryFunction) + .timeout(postgresConfiguration.getJooqReactiveTimeout()) + .doOnError(TimeoutException.class, e -> LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e)) + .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF) + .filter(preparedStatementConflictException())); + + if (isEagerFetch) { + return recordFlux.collectList().flatMapIterable(list -> list); + } else { + return recordFlux; + } + }, jamesPostgresConnectionFactory::closeConnection))); } @@ -145,8 +170,6 @@ public class PostgresExecutor { .flatMapMany(queryFunction) .timeout(postgresConfiguration.getJooqReactiveTimeout()) .doOnError(TimeoutException.class, e -> LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e)) - .collectList() - .flatMapIterable(list -> list) // The convert Flux -> Mono<List> -> Flux to avoid a hanging issue. See: https://github.com/jOOQ/jOOQ/issues/16055 .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF) .filter(preparedStatementConflictException())), jamesPostgresConnectionFactory::closeConnection))); diff --git a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresUtils.java b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresUtils.java index 9f8b075c14..9a6310e00e 100644 --- a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresUtils.java +++ b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresUtils.java @@ -19,13 +19,25 @@ package org.apache.james.backends.postgres.utils; +import java.util.Optional; import java.util.function.Predicate; import org.jooq.exception.DataAccessException; +import com.google.common.base.Preconditions; + public class PostgresUtils { private static final String UNIQUE_CONSTRAINT_VIOLATION_MESSAGE = "duplicate key value violates unique constraint"; public static final Predicate<Throwable> UNIQUE_CONSTRAINT_VIOLATION_PREDICATE = throwable -> throwable instanceof DataAccessException && throwable.getMessage().contains(UNIQUE_CONSTRAINT_VIOLATION_MESSAGE); + + public static final int QUERY_BATCH_SIZE_DEFAULT_VALUE = 5000; + public static final int QUERY_BATCH_SIZE = Optional.ofNullable(System.getProperty("james.postgresql.query.batch.size")) + .map(Integer::valueOf) + .map(batchSize -> { + Preconditions.checkArgument(batchSize > 0, "james.postgresql.query.batch.size must be positive"); + return batchSize; + }) + .orElse(QUERY_BATCH_SIZE_DEFAULT_VALUE); } diff --git a/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PostgresExtension.java b/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PostgresExtension.java index dc304746f6..85159832a6 100644 --- a/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PostgresExtension.java +++ b/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PostgresExtension.java @@ -50,8 +50,8 @@ import reactor.core.publisher.Mono; public class PostgresExtension implements GuiceModuleTestExtension { public enum PoolSize { - SMALL(1, 2), - LARGE(10, 20); + SMALL(10, 20), + LARGE(20, 40); private final int min; private final int max; diff --git a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxDAO.java b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxDAO.java index 4c443deed9..616e4bd1e2 100644 --- a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxDAO.java +++ b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxDAO.java @@ -53,9 +53,11 @@ import org.apache.james.mailbox.model.MailboxId; import org.apache.james.mailbox.model.MailboxPath; import org.apache.james.mailbox.model.UidValidity; import org.apache.james.mailbox.model.search.MailboxQuery; +import org.apache.james.mailbox.model.search.Wildcard; import org.apache.james.mailbox.postgres.PostgresMailboxId; import org.apache.james.mailbox.postgres.mail.PostgresMailbox; import org.apache.james.mailbox.store.MailboxExpressionBackwardCompatibility; +import org.jooq.Condition; import org.jooq.Record; import org.jooq.impl.DSL; import org.jooq.impl.DefaultDataType; @@ -202,15 +204,20 @@ public class PostgresMailboxDAO { public Flux<PostgresMailbox> findMailboxWithPathLike(MailboxQuery.UserBound query) { String pathLike = MailboxExpressionBackwardCompatibility.getPathLike(query); + Function<MailboxQuery.UserBound, Condition> getQueryCondition = mailboxQuery -> { + Condition baseCondition = USER_NAME.eq(mailboxQuery.getFixedUser().asString()) + .and(MAILBOX_NAMESPACE.eq(mailboxQuery.getFixedNamespace())); + + if (Wildcard.INSTANCE.equals(mailboxQuery.getMailboxNameExpression())) { + return baseCondition; + } + return baseCondition.and(MAILBOX_NAME.like(pathLike)); + }; return postgresExecutor.executeRows(dsl -> Flux.from(dsl.selectFrom(TABLE_NAME) - .where(MAILBOX_NAME.like(pathLike) - .and(USER_NAME.eq(query.getFixedUser().asString())) - .and(MAILBOX_NAMESPACE.eq(query.getFixedNamespace()))))) + .where(getQueryCondition.apply(query))), PostgresExecutor.EAGER_FETCH) .map(RECORD_TO_POSTGRES_MAILBOX_FUNCTION) - .filter(query::matches) - .collectList() - .flatMapIterable(Function.identity()); + .filter(query::matches); } public Mono<Boolean> hasChildren(Mailbox mailbox, char delimiter) { diff --git a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxMessageDAO.java b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxMessageDAO.java index f40dc4e5a2..e2055a1297 100644 --- a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxMessageDAO.java +++ b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxMessageDAO.java @@ -19,11 +19,11 @@ package org.apache.james.mailbox.postgres.mail.dao; - import static org.apache.james.backends.postgres.PostgresCommons.DATE_TO_LOCAL_DATE_TIME; import static org.apache.james.backends.postgres.PostgresCommons.IN_CLAUSE_MAX_SIZE; import static org.apache.james.backends.postgres.PostgresCommons.UNNEST_FIELD; import static org.apache.james.backends.postgres.PostgresCommons.tableField; +import static org.apache.james.backends.postgres.utils.PostgresExecutor.EAGER_FETCH; import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.INTERNAL_DATE; import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.SIZE; import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageToMailboxTable.IS_ANSWERED; @@ -61,6 +61,7 @@ import jakarta.mail.Flags; import org.apache.commons.lang3.tuple.Pair; import org.apache.james.backends.postgres.utils.PostgresExecutor; +import org.apache.james.backends.postgres.utils.PostgresUtils; import org.apache.james.core.Domain; import org.apache.james.mailbox.MessageUid; import org.apache.james.mailbox.ModSeq; @@ -80,15 +81,11 @@ import org.jooq.Condition; import org.jooq.DSLContext; import org.jooq.Name; import org.jooq.Record; -import org.jooq.Record1; -import org.jooq.SelectFinalStep; -import org.jooq.SelectSeekStep1; import org.jooq.SortField; import org.jooq.TableOnConditionStep; import org.jooq.UpdateConditionStep; import org.jooq.UpdateSetStep; import org.jooq.impl.DSL; -import org.jooq.util.postgres.PostgresDSL; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; @@ -117,15 +114,7 @@ public class PostgresMailboxMessageDAO { public static final SortField<Long> DEFAULT_SORT_ORDER_BY = MESSAGE_UID.asc(); - private static SelectFinalStep<Record1<Long>> selectMessageUidByMailboxIdAndExtraConditionQuery(PostgresMailboxId mailboxId, Condition extraCondition, Limit limit, DSLContext dslContext) { - SelectSeekStep1<Record1<Long>, Long> queryWithoutLimit = dslContext.select(MESSAGE_UID) - .from(TABLE_NAME) - .where(MAILBOX_ID.eq((mailboxId.asUuid()))) - .and(extraCondition) - .orderBy(MESSAGE_UID.asc()); - return limit.getLimit().map(limitValue -> (SelectFinalStep<Record1<Long>>) queryWithoutLimit.limit(limitValue)) - .orElse(queryWithoutLimit); - } + private static final int QUERY_BATCH_SIZE = PostgresUtils.QUERY_BATCH_SIZE; private final PostgresExecutor postgresExecutor; @@ -134,60 +123,66 @@ public class PostgresMailboxMessageDAO { } public Mono<MessageUid> findFirstUnseenMessageUid(PostgresMailboxId mailboxId) { - return postgresExecutor.executeRow(dslContext -> Mono.from(selectMessageUidByMailboxIdAndExtraConditionQuery(mailboxId, - IS_SEEN.eq(false), Limit.limit(1), dslContext))) + return postgresExecutor.executeRow(dslContext -> Flux.from(dslContext.select(MESSAGE_UID) + .from(TABLE_NAME) + .where(MAILBOX_ID.eq((mailboxId.asUuid()))) + .and(IS_SEEN.eq(false)) + .orderBy(DEFAULT_SORT_ORDER_BY) + .limit(1))) .map(RECORD_TO_MESSAGE_UID_FUNCTION); } public Flux<MessageUid> listUnseen(PostgresMailboxId mailboxId) { - return postgresExecutor.executeRows(dslContext -> Flux.from(selectMessageUidByMailboxIdAndExtraConditionQuery(mailboxId, - IS_SEEN.eq(false), Limit.unlimited(), dslContext))) + return postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select(MESSAGE_UID) + .from(TABLE_NAME) + .where(MAILBOX_ID.eq((mailboxId.asUuid()))) + .and(IS_SEEN.eq(false)) + .orderBy(DEFAULT_SORT_ORDER_BY)), EAGER_FETCH) .map(RECORD_TO_MESSAGE_UID_FUNCTION); } public Flux<MessageUid> listUnseen(PostgresMailboxId mailboxId, MessageRange range) { - switch (range.getType()) { - case ALL: - return listUnseen(mailboxId); - case FROM: - return postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select(MESSAGE_UID) - .from(TABLE_NAME) - .where(MAILBOX_ID.eq(mailboxId.asUuid())) - .and(IS_SEEN.eq(false)) - .and(MESSAGE_UID.greaterOrEqual(range.getUidFrom().asLong())) - .orderBy(DEFAULT_SORT_ORDER_BY))) - .map(RECORD_TO_MESSAGE_UID_FUNCTION); - case RANGE: - return postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select(MESSAGE_UID) - .from(TABLE_NAME) - .where(MAILBOX_ID.eq(mailboxId.asUuid())) - .and(IS_SEEN.eq(false)) - .and(MESSAGE_UID.greaterOrEqual(range.getUidFrom().asLong())) - .and(MESSAGE_UID.lessOrEqual(range.getUidTo().asLong())) - .orderBy(DEFAULT_SORT_ORDER_BY))) - .map(RECORD_TO_MESSAGE_UID_FUNCTION); - case ONE: - return postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select(MESSAGE_UID) - .from(TABLE_NAME) - .where(MAILBOX_ID.eq(mailboxId.asUuid())) - .and(IS_SEEN.eq(false)) - .and(MESSAGE_UID.eq(range.getUidFrom().asLong())) - .orderBy(DEFAULT_SORT_ORDER_BY))) - .map(RECORD_TO_MESSAGE_UID_FUNCTION); - default: - throw new RuntimeException("Unsupported range type " + range.getType()); - } + return switch (range.getType()) { + case ALL -> listUnseen(mailboxId); + case FROM -> postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select(MESSAGE_UID) + .from(TABLE_NAME) + .where(MAILBOX_ID.eq(mailboxId.asUuid())) + .and(IS_SEEN.eq(false)) + .and(MESSAGE_UID.greaterOrEqual(range.getUidFrom().asLong())) + .orderBy(DEFAULT_SORT_ORDER_BY)), EAGER_FETCH) + .map(RECORD_TO_MESSAGE_UID_FUNCTION); + case RANGE -> postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select(MESSAGE_UID) + .from(TABLE_NAME) + .where(MAILBOX_ID.eq(mailboxId.asUuid())) + .and(IS_SEEN.eq(false)) + .and(MESSAGE_UID.greaterOrEqual(range.getUidFrom().asLong())) + .and(MESSAGE_UID.lessOrEqual(range.getUidTo().asLong())) + .orderBy(DEFAULT_SORT_ORDER_BY)), EAGER_FETCH) + .map(RECORD_TO_MESSAGE_UID_FUNCTION); + case ONE -> postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select(MESSAGE_UID) + .from(TABLE_NAME) + .where(MAILBOX_ID.eq(mailboxId.asUuid())) + .and(IS_SEEN.eq(false)) + .and(MESSAGE_UID.eq(range.getUidFrom().asLong())) + .orderBy(DEFAULT_SORT_ORDER_BY)), EAGER_FETCH) + .map(RECORD_TO_MESSAGE_UID_FUNCTION); + }; } public Flux<MessageUid> findAllRecentMessageUid(PostgresMailboxId mailboxId) { - return postgresExecutor.executeRows(dslContext -> Flux.from(selectMessageUidByMailboxIdAndExtraConditionQuery(mailboxId, - IS_RECENT.eq(true), Limit.unlimited(), dslContext))) + return postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select(MESSAGE_UID) + .from(TABLE_NAME) + .where(MAILBOX_ID.eq((mailboxId.asUuid()))) + .and(IS_RECENT.eq(true)) + .orderBy(DEFAULT_SORT_ORDER_BY)), EAGER_FETCH) .map(RECORD_TO_MESSAGE_UID_FUNCTION); } public Flux<MessageUid> listAllMessageUid(PostgresMailboxId mailboxId) { - return postgresExecutor.executeRows(dslContext -> Flux.from(selectMessageUidByMailboxIdAndExtraConditionQuery(mailboxId, - DSL.noCondition(), Limit.unlimited(), dslContext))) + return postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select(MESSAGE_UID) + .from(TABLE_NAME) + .where(MAILBOX_ID.eq((mailboxId.asUuid()))) + .orderBy(DEFAULT_SORT_ORDER_BY)), EAGER_FETCH) .map(RECORD_TO_MESSAGE_UID_FUNCTION); } @@ -199,12 +194,12 @@ public class PostgresMailboxMessageDAO { } private Flux<MessageUid> doListUids(PostgresMailboxId mailboxId, MessageRange range) { - return postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select(MESSAGE_UID) + return postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select(MESSAGE_UID) .from(TABLE_NAME) .where(MAILBOX_ID.eq(mailboxId.asUuid())) .and(MESSAGE_UID.greaterOrEqual(range.getUidFrom().asLong())) .and(MESSAGE_UID.lessOrEqual(range.getUidTo().asLong())) - .orderBy(DEFAULT_SORT_ORDER_BY))) + .orderBy(DEFAULT_SORT_ORDER_BY)), EAGER_FETCH) .map(RECORD_TO_MESSAGE_UID_FUNCTION); } @@ -238,7 +233,9 @@ public class PostgresMailboxMessageDAO { return postgresExecutor.executeDeleteAndReturnList(dslContext -> dslContext.deleteFrom(TABLE_NAME) .where(MAILBOX_ID.eq(mailboxId.asUuid())) .returning(MESSAGE_ID)) - .map(record -> PostgresMessageId.Factory.of(record.get(MESSAGE_ID))); + .map(record -> PostgresMessageId.Factory.of(record.get(MESSAGE_ID))) + .collectList() + .flatMapMany(Flux::fromIterable); } public Mono<Void> deleteByMessageIdAndMailboxIds(PostgresMessageId messageId, Collection<PostgresMailboxId> mailboxIds) { @@ -273,31 +270,66 @@ public class PostgresMailboxMessageDAO { } public Flux<Pair<SimpleMailboxMessage.Builder, Record>> findMessagesByMailboxId(PostgresMailboxId mailboxId, Limit limit, MessageMapper.FetchType fetchType) { - PostgresMailboxMessageFetchStrategy fetchStrategy = FETCH_TYPE_TO_FETCH_STRATEGY.apply(fetchType); - Function<DSLContext, SelectSeekStep1<Record, Long>> queryWithoutLimit = dslContext -> dslContext.select(fetchStrategy.fetchFields()) - .from(MESSAGES_JOIN_MAILBOX_MESSAGES_CONDITION_STEP) - .where(MAILBOX_ID.eq(mailboxId.asUuid())) - .orderBy(DEFAULT_SORT_ORDER_BY); + if (limit.isUnlimited()) { + return Flux.defer(() -> findMessagesByMailboxIdBatch(mailboxId, fetchType, Optional.empty(), QUERY_BATCH_SIZE)) + .expand(messages -> { + if (messages.isEmpty() || messages.size() < QUERY_BATCH_SIZE) { + return Mono.empty(); + } + return findMessagesByMailboxIdBatch(mailboxId, fetchType, Optional.of(messages.getLast().getRight().get(MESSAGE_UID)), QUERY_BATCH_SIZE); + }) + .flatMapIterable(Function.identity()); + } else { + return findMessagesByMailboxIdBatch(mailboxId, fetchType, Optional.empty(), limit.getLimit().get()) + .flatMapIterable(Function.identity()); + } + } - return postgresExecutor.executeRows(dslContext -> limit.getLimit() - .map(limitValue -> Flux.from(queryWithoutLimit.andThen(step -> step.limit(limitValue)).apply(dslContext))) - .orElse(Flux.from(queryWithoutLimit.apply(dslContext)))) - .map(record -> Pair.of(fetchStrategy.toMessageBuilder().apply(record), record)); + private Mono<List<Pair<SimpleMailboxMessage.Builder, Record>>> findMessagesByMailboxIdBatch(PostgresMailboxId mailboxId, MessageMapper.FetchType fetchType, + Optional<Long> messageUidFrom, int batchSize) { + PostgresMailboxMessageFetchStrategy fetchStrategy = FETCH_TYPE_TO_FETCH_STRATEGY.apply(fetchType); + return postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select(fetchStrategy.fetchFields()) + .from(MESSAGES_JOIN_MAILBOX_MESSAGES_CONDITION_STEP) + .where(MAILBOX_ID.eq(mailboxId.asUuid())) + .and(messageUidFrom.map(MESSAGE_UID::greaterThan).orElseGet(DSL::noCondition)) + .orderBy(MESSAGE_UID.asc()) + .limit(batchSize))) + .map(record -> Pair.of(fetchStrategy.toMessageBuilder().apply(record), record)) + .collectList() + .switchIfEmpty(Mono.just(ImmutableList.of())); } public Flux<Pair<SimpleMailboxMessage.Builder, Record>> findMessagesByMailboxIdAndBetweenUIDs(PostgresMailboxId mailboxId, MessageUid from, MessageUid to, Limit limit, FetchType fetchType) { - PostgresMailboxMessageFetchStrategy fetchStrategy = FETCH_TYPE_TO_FETCH_STRATEGY.apply(fetchType); - Function<DSLContext, SelectSeekStep1<Record, Long>> queryWithoutLimit = dslContext -> dslContext.select(fetchStrategy.fetchFields()) - .from(MESSAGES_JOIN_MAILBOX_MESSAGES_CONDITION_STEP) - .where(MAILBOX_ID.eq(mailboxId.asUuid())) - .and(MESSAGE_UID.greaterOrEqual(from.asLong())) - .and(MESSAGE_UID.lessOrEqual(to.asLong())) - .orderBy(DEFAULT_SORT_ORDER_BY); + if (limit.isUnlimited()) { + return Flux.defer(() -> findMessagesByMailboxIdAndBetweenUIDsBatch(mailboxId, MESSAGE_UID.greaterOrEqual(from.asLong()), to, fetchType, QUERY_BATCH_SIZE)) + .expand(messages -> { + if (messages.isEmpty() || messages.size() < QUERY_BATCH_SIZE) { + return Mono.empty(); + } + MessageUid messageUidFrom = MessageUid.of(messages.getLast().getRight().get(MESSAGE_UID)); + return findMessagesByMailboxIdAndBetweenUIDsBatch(mailboxId, MESSAGE_UID.greaterThan(messageUidFrom.asLong()), to, fetchType, QUERY_BATCH_SIZE); + }) + .flatMapIterable(Function.identity()); + } else { + return findMessagesByMailboxIdAndBetweenUIDsBatch(mailboxId, MESSAGE_UID.greaterOrEqual(from.asLong()), to, fetchType, limit.getLimit().get()) + .flatMapIterable(Function.identity()); + } + } - return postgresExecutor.executeRows(dslContext -> limit.getLimit() - .map(limitValue -> Flux.from(queryWithoutLimit.andThen(step -> step.limit(limitValue)).apply(dslContext))) - .orElse(Flux.from(queryWithoutLimit.apply(dslContext)))) - .map(record -> Pair.of(fetchStrategy.toMessageBuilder().apply(record), record)); + private Mono<List<Pair<SimpleMailboxMessage.Builder, Record>>> findMessagesByMailboxIdAndBetweenUIDsBatch(PostgresMailboxId mailboxId, Condition messageUidFromCondition, + MessageUid to, + FetchType fetchType, int batchSize) { + PostgresMailboxMessageFetchStrategy fetchStrategy = FETCH_TYPE_TO_FETCH_STRATEGY.apply(fetchType); + return postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select(fetchStrategy.fetchFields()) + .from(MESSAGES_JOIN_MAILBOX_MESSAGES_CONDITION_STEP) + .where(MAILBOX_ID.eq(mailboxId.asUuid())) + .and(messageUidFromCondition) + .and(MESSAGE_UID.lessOrEqual(to.asLong())) + .orderBy(MESSAGE_UID.asc()) + .limit(batchSize))) + .map(record -> Pair.of(fetchStrategy.toMessageBuilder().apply(record), record)) + .collectList() + .switchIfEmpty(Mono.just(ImmutableList.of())); } public Mono<Pair<SimpleMailboxMessage.Builder, Record>> findMessageByMailboxIdAndUid(PostgresMailboxId mailboxId, MessageUid uid, FetchType fetchType) { @@ -310,17 +342,35 @@ public class PostgresMailboxMessageDAO { } public Flux<Pair<SimpleMailboxMessage.Builder, Record>> findMessagesByMailboxIdAndAfterUID(PostgresMailboxId mailboxId, MessageUid from, Limit limit, FetchType fetchType) { - PostgresMailboxMessageFetchStrategy fetchStrategy = FETCH_TYPE_TO_FETCH_STRATEGY.apply(fetchType); - Function<DSLContext, SelectSeekStep1<Record, Long>> queryWithoutLimit = dslContext -> dslContext.select(fetchStrategy.fetchFields()) - .from(MESSAGES_JOIN_MAILBOX_MESSAGES_CONDITION_STEP) - .where(MAILBOX_ID.eq(mailboxId.asUuid())) - .and(MESSAGE_UID.greaterOrEqual(from.asLong())) - .orderBy(DEFAULT_SORT_ORDER_BY); + if (limit.isUnlimited()) { + return Flux.defer(() -> findMessagesByMailboxIdAndAfterUIDBatch(mailboxId, MESSAGE_UID.greaterOrEqual(from.asLong()), fetchType, QUERY_BATCH_SIZE)) + .expand(messages -> { + if (messages.isEmpty() || messages.size() < QUERY_BATCH_SIZE) { + return Mono.empty(); + } + MessageUid messageUidFrom = MessageUid.of(messages.getLast().getRight().get(MESSAGE_UID)); + return findMessagesByMailboxIdAndAfterUIDBatch(mailboxId, MESSAGE_UID.greaterThan(messageUidFrom.asLong()), fetchType, QUERY_BATCH_SIZE); + }) + .flatMapIterable(Function.identity()); + } else { + return findMessagesByMailboxIdAndAfterUIDBatch(mailboxId, MESSAGE_UID.greaterOrEqual(from.asLong()), fetchType, limit.getLimit().get()) + .flatMapIterable(Function.identity()); + } + } - return postgresExecutor.executeRows(dslContext -> limit.getLimit() - .map(limitValue -> Flux.from(queryWithoutLimit.andThen(step -> step.limit(limitValue)).apply(dslContext))) - .orElse(Flux.from(queryWithoutLimit.apply(dslContext)))) - .map(record -> Pair.of(fetchStrategy.toMessageBuilder().apply(record), record)); + private Mono<List<Pair<SimpleMailboxMessage.Builder, Record>>> findMessagesByMailboxIdAndAfterUIDBatch(PostgresMailboxId mailboxId, + Condition messageUidFromCondition, + FetchType fetchType, int batchSize) { + PostgresMailboxMessageFetchStrategy fetchStrategy = FETCH_TYPE_TO_FETCH_STRATEGY.apply(fetchType); + return postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select(fetchStrategy.fetchFields()) + .from(MESSAGES_JOIN_MAILBOX_MESSAGES_CONDITION_STEP) + .where(MAILBOX_ID.eq(mailboxId.asUuid())) + .and(messageUidFromCondition) + .orderBy(MESSAGE_UID.asc()) + .limit(batchSize))) + .map(record -> Pair.of(fetchStrategy.toMessageBuilder().apply(record), record)) + .collectList() + .switchIfEmpty(Mono.just(ImmutableList.of())); } public Flux<SimpleMailboxMessage.Builder> findMessagesByMailboxIdAndUIDs(PostgresMailboxId mailboxId, List<MessageUid> uids) { @@ -333,7 +383,7 @@ public class PostgresMailboxMessageDAO { .from(MESSAGES_JOIN_MAILBOX_MESSAGES_CONDITION_STEP) .where(MAILBOX_ID.eq(mailboxId.asUuid())) .and(MESSAGE_UID.in(uidsToFetch.stream().map(MessageUid::asLong).toArray(Long[]::new))) - .orderBy(DEFAULT_SORT_ORDER_BY))) + .orderBy(DEFAULT_SORT_ORDER_BY)), EAGER_FETCH) .map(fetchStrategy.toMessageBuilder()); if (uids.size() <= IN_CLAUSE_MAX_SIZE) { @@ -349,7 +399,7 @@ public class PostgresMailboxMessageDAO { .from(TABLE_NAME) .where(MAILBOX_ID.eq(mailboxId.asUuid())) .and(IS_DELETED.eq(true)) - .orderBy(DEFAULT_SORT_ORDER_BY))) + .orderBy(DEFAULT_SORT_ORDER_BY)), EAGER_FETCH) .map(RECORD_TO_MESSAGE_UID_FUNCTION); } @@ -360,7 +410,7 @@ public class PostgresMailboxMessageDAO { .and(IS_DELETED.eq(true)) .and(MESSAGE_UID.greaterOrEqual(from.asLong())) .and(MESSAGE_UID.lessOrEqual(to.asLong())) - .orderBy(DEFAULT_SORT_ORDER_BY))) + .orderBy(DEFAULT_SORT_ORDER_BY)), EAGER_FETCH) .map(RECORD_TO_MESSAGE_UID_FUNCTION); } @@ -370,7 +420,7 @@ public class PostgresMailboxMessageDAO { .where(MAILBOX_ID.eq(mailboxId.asUuid())) .and(IS_DELETED.eq(true)) .and(MESSAGE_UID.greaterOrEqual(from.asLong())) - .orderBy(DEFAULT_SORT_ORDER_BY))) + .orderBy(DEFAULT_SORT_ORDER_BY)), EAGER_FETCH) .map(RECORD_TO_MESSAGE_UID_FUNCTION); } @@ -389,8 +439,8 @@ public class PostgresMailboxMessageDAO { .where(MAILBOX_ID.eq(mailboxId.asUuid())) .and(MESSAGE_UID.greaterOrEqual(range.getUidFrom().asLong())) .and(MESSAGE_UID.lessOrEqual(range.getUidTo().asLong())) - .orderBy(DEFAULT_SORT_ORDER_BY))) - .filter(record -> !record.get(IS_DELETED)) + .and(IS_DELETED.eq(false)) + .orderBy(DEFAULT_SORT_ORDER_BY)), EAGER_FETCH) .map(RECORD_TO_MESSAGE_UID_FUNCTION); } @@ -401,22 +451,52 @@ public class PostgresMailboxMessageDAO { } public Flux<ComposedMessageIdWithMetaData> findMessagesMetadata(PostgresMailboxId mailboxId, MessageRange range) { + return Flux.defer(() -> findMessagesMetadataBatch(mailboxId, range.getUidTo(), MESSAGE_UID.greaterOrEqual(range.getUidFrom().asLong()), QUERY_BATCH_SIZE)) + .expand(messages -> { + if (messages.isEmpty() || messages.size() < QUERY_BATCH_SIZE) { + return Mono.empty(); + } + MessageUid messageUidFrom = messages.getLast().getComposedMessageId().getUid(); + return findMessagesMetadataBatch(mailboxId, range.getUidTo(), MESSAGE_UID.greaterThan(messageUidFrom.asLong()), QUERY_BATCH_SIZE); + }) + .flatMapIterable(Function.identity()); + } + + private Mono<List<ComposedMessageIdWithMetaData>> findMessagesMetadataBatch(PostgresMailboxId mailboxId, MessageUid messageUidTo, Condition messageUidFromCondition, int batchSize) { return postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select() .from(TABLE_NAME) .where(MAILBOX_ID.eq(mailboxId.asUuid())) - .and(MESSAGE_UID.greaterOrEqual(range.getUidFrom().asLong())) - .and(MESSAGE_UID.lessOrEqual(range.getUidTo().asLong())) - .orderBy(DEFAULT_SORT_ORDER_BY))) - .map(RECORD_TO_COMPOSED_MESSAGE_ID_WITH_META_DATA_FUNCTION); + .and(messageUidFromCondition) + .and(MESSAGE_UID.lessOrEqual(messageUidTo.asLong())) + .orderBy(MESSAGE_UID.asc()) + .limit(batchSize))) + .map(RECORD_TO_COMPOSED_MESSAGE_ID_WITH_META_DATA_FUNCTION) + .collectList() + .switchIfEmpty(Mono.just(ImmutableList.of())); } public Flux<ComposedMessageIdWithMetaData> findAllRecentMessageMetadata(PostgresMailboxId mailboxId) { + return Flux.defer(() -> findAllRecentMessageMetadataBatch(mailboxId, Optional.empty(), QUERY_BATCH_SIZE)) + .expand(messages -> { + if (messages.isEmpty() || messages.size() < QUERY_BATCH_SIZE) { + return Mono.empty(); + } + return findAllRecentMessageMetadataBatch(mailboxId, Optional.of(messages.getLast().getComposedMessageId().getUid()), QUERY_BATCH_SIZE); + }) + .flatMapIterable(Function.identity()); + } + + private Mono<List<ComposedMessageIdWithMetaData>> findAllRecentMessageMetadataBatch(PostgresMailboxId mailboxId, Optional<MessageUid> messageUidFrom, int batchSize) { return postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select() .from(TABLE_NAME) .where(MAILBOX_ID.eq(mailboxId.asUuid())) .and(IS_RECENT.eq(true)) - .orderBy(DEFAULT_SORT_ORDER_BY))) - .map(RECORD_TO_COMPOSED_MESSAGE_ID_WITH_META_DATA_FUNCTION); + .and(messageUidFrom.map(messageUid -> MESSAGE_UID.greaterThan(messageUid.asLong())).orElseGet(DSL::noCondition)) + .orderBy(MESSAGE_UID.asc()) + .limit(batchSize))) + .map(RECORD_TO_COMPOSED_MESSAGE_ID_WITH_META_DATA_FUNCTION) + .collectList() + .switchIfEmpty(Mono.just(ImmutableList.of())); } public Mono<Flags> replaceFlags(PostgresMailboxId mailboxId, MessageUid uid, Flags newFlags, ModSeq newModSeq) { @@ -536,7 +616,7 @@ public class PostgresMailboxMessageDAO { } public Mono<Void> insert(MailboxMessage mailboxMessage) { - return insert(mailboxMessage, PostgresMailboxId.class.cast(mailboxMessage.getMailboxId())); + return insert(mailboxMessage, (PostgresMailboxId) mailboxMessage.getMailboxId()); } public Mono<Void> insert(MailboxMessage mailboxMessage, PostgresMailboxId mailboxId) { @@ -561,7 +641,7 @@ public class PostgresMailboxMessageDAO { public Flux<MailboxId> findMailboxes(PostgresMessageId messageId) { return postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select(MAILBOX_ID) .from(TABLE_NAME) - .where(MESSAGE_ID.eq(messageId.asUuid())))) + .where(MESSAGE_ID.eq(messageId.asUuid()))), EAGER_FETCH) .map(record -> PostgresMailboxId.of(record.get(MAILBOX_ID))); } @@ -574,14 +654,14 @@ public class PostgresMailboxMessageDAO { return postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select(fetchStrategy.fetchFields()) .from(MESSAGES_JOIN_MAILBOX_MESSAGES_CONDITION_STEP) .where(DSL.field(TABLE_NAME.getName() + "." + MESSAGE_ID.getName()) - .in(messageIds.stream().map(PostgresMessageId::asUuid).collect(ImmutableList.toImmutableList()))))) + .in(messageIds.stream().map(PostgresMessageId::asUuid).collect(ImmutableList.toImmutableList())))), EAGER_FETCH) .map(record -> Pair.of(fetchStrategy.toMessageBuilder().apply(record), record)); } public Flux<ComposedMessageIdWithMetaData> findMetadataByMessageId(PostgresMessageId messageId) { return postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select() .from(TABLE_NAME) - .where(MESSAGE_ID.eq(messageId.asUuid())))) + .where(MESSAGE_ID.eq(messageId.asUuid()))), EAGER_FETCH) .map(RECORD_TO_COMPOSED_MESSAGE_ID_WITH_META_DATA_FUNCTION); } @@ -589,7 +669,7 @@ public class PostgresMailboxMessageDAO { return postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select() .from(TABLE_NAME) .where(MESSAGE_ID.eq(messageId.asUuid())) - .and(MAILBOX_ID.eq(mailboxId.asUuid())))) + .and(MAILBOX_ID.eq(mailboxId.asUuid()))), EAGER_FETCH) .map(RECORD_TO_COMPOSED_MESSAGE_ID_WITH_META_DATA_FUNCTION); } diff --git a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/PostgresMailboxManagerTest.java b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/PostgresMailboxManagerTest.java index f7d3436214..3acf0faf7f 100644 --- a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/PostgresMailboxManagerTest.java +++ b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/PostgresMailboxManagerTest.java @@ -18,17 +18,34 @@ ****************************************************************/ package org.apache.james.mailbox.postgres; +import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; + +import java.nio.charset.StandardCharsets; +import java.util.List; import java.util.Optional; +import jakarta.mail.Flags; + import org.apache.james.backends.postgres.PostgresExtension; +import org.apache.james.core.Username; import org.apache.james.events.EventBus; import org.apache.james.mailbox.MailboxManagerTest; +import org.apache.james.mailbox.MailboxSession; +import org.apache.james.mailbox.MessageManager; +import org.apache.james.mailbox.MessageUid; import org.apache.james.mailbox.SubscriptionManager; +import org.apache.james.mailbox.model.MailboxPath; +import org.apache.james.mailbox.model.MessageRange; import org.apache.james.mailbox.store.PreDeletionHooks; import org.apache.james.mailbox.store.StoreSubscriptionManager; import org.apache.james.metrics.tests.RecordingMetricFactory; +import org.apache.james.mime4j.dom.Message; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + class PostgresMailboxManagerTest extends MailboxManagerTest<PostgresMailboxManager> { @RegisterExtension static PostgresExtension postgresExtension = PostgresExtension.withoutRowLevelSecurity(PostgresMailboxAggregateModule.MODULE); @@ -53,4 +70,29 @@ class PostgresMailboxManagerTest extends MailboxManagerTest<PostgresMailboxManag protected EventBus retrieveEventBus(PostgresMailboxManager mailboxManager) { return mailboxManager.getEventBus(); } + + @Test + void expungeMessageShouldCorrectWhenALotOfMessages() throws Exception { + // Given a mailbox with 6000 messages + Username username = Username.of("tung"); + PostgresMailboxManager postgresMailboxManager = mailboxManager.get(); + MailboxSession session = postgresMailboxManager.createSystemSession(username); + postgresMailboxManager.createMailbox(MailboxPath.inbox(username), session).get(); + MessageManager inboxManager = postgresMailboxManager.getMailbox(MailboxPath.inbox(session), session); + + int totalMessages = 6000; + Flux.range(0, totalMessages) + .flatMap(i -> Mono.fromCallable(() -> inboxManager.appendMessage(MessageManager.AppendCommand.builder().build(Message.Builder.of() + .setSubject("test" + i) + .setBody("testmail" + i, StandardCharsets.UTF_8)), session)), 100) + .collectList().block(); + // When expunge all messages + inboxManager.setFlags(new Flags(Flags.Flag.DELETED), MessageManager.FlagsUpdateMode.ADD, MessageRange.all(), session); + + List<MessageUid> expungeList = inboxManager.expungeReactive(MessageRange.all(), session) + .collectList().block(); + + // Then all messages are expunged + assertThat(expungeList).hasSize(totalMessages); + } } diff --git a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxMessageDAOTest.java b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxMessageDAOTest.java new file mode 100644 index 0000000000..36785cc3d6 --- /dev/null +++ b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxMessageDAOTest.java @@ -0,0 +1,215 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailbox.postgres.mail.dao; + +import static org.apache.james.mailbox.store.mail.MessageMapper.FetchType.METADATA; +import static org.assertj.core.api.Assertions.assertThat; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.UUID; + +import jakarta.mail.Flags; + +import org.apache.james.backends.postgres.PostgresExtension; +import org.apache.james.backends.postgres.PostgresModule; +import org.apache.james.blob.api.BlobId; +import org.apache.james.blob.api.PlainBlobId; +import org.apache.james.mailbox.MessageUid; +import org.apache.james.mailbox.ModSeq; +import org.apache.james.mailbox.model.ByteContent; +import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData; +import org.apache.james.mailbox.model.MessageId; +import org.apache.james.mailbox.model.MessageRange; +import org.apache.james.mailbox.model.ThreadId; +import org.apache.james.mailbox.postgres.PostgresMailboxId; +import org.apache.james.mailbox.postgres.PostgresMessageId; +import org.apache.james.mailbox.postgres.mail.PostgresMailboxModule; +import org.apache.james.mailbox.postgres.mail.PostgresMessageModule; +import org.apache.james.mailbox.store.mail.model.DelegatingMailboxMessage; +import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder; +import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage; +import org.apache.james.util.ReactorUtils; +import org.apache.james.util.streams.Limit; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import reactor.core.publisher.Flux; + +public class PostgresMailboxMessageDAOTest { + @RegisterExtension + static PostgresExtension postgresExtension = PostgresExtension.withoutRowLevelSecurity( + PostgresModule.aggregateModules(PostgresMailboxModule.MODULE, + PostgresMessageModule.MODULE)); + + private final MessageId.Factory messageIdFactory = new PostgresMessageId.Factory(); + private final BlobId.Factory blobIdFactory = new PlainBlobId.Factory(); + + private PostgresMailboxMessageDAO testee; + private PostgresMessageDAO messageDAO; + + @BeforeAll + static void setUpClass() { + // We set the batch size to 10 to test the batching + System.setProperty("james.postgresql.query.batch.size", "10"); + } + + @AfterAll + static void tearDownClass() { + System.clearProperty("james.postgresql.query.batch.size"); + } + + @BeforeEach + void setUp() { + testee = new PostgresMailboxMessageDAO(postgresExtension.getDefaultPostgresExecutor()); + messageDAO = new PostgresMessageDAO(postgresExtension.getDefaultPostgresExecutor(), blobIdFactory); + } + + @Test + void findAllRecentMessageMetadataShouldReturnAllMatchingEntryWhenBatchSizeIsSmallerThanAllEntries() { + // Given 100 entries + int sampleSize = 100; + PostgresMailboxId mailboxId = PostgresMailboxId.generate(); + ArrayList<MessageId> messageIds = provisionMailboxMessage(sampleSize, mailboxId); + + // When retrieve all entries + List<ComposedMessageIdWithMetaData> listResult = testee.findAllRecentMessageMetadata(mailboxId) + .collectList().block(); + + // Then return all entries + assertThat(listResult).hasSize(sampleSize); + + assertThat(listResult.stream().map(metaData -> metaData.getComposedMessageId().getMessageId()).toList()) + .containsExactly(messageIds.toArray(MessageId[]::new)); + } + + private @NotNull ArrayList<MessageId> provisionMailboxMessage(int sampleSize, PostgresMailboxId mailboxId) { + ArrayList<MessageId> messageIds = new ArrayList<>(); + Flux.range(1, sampleSize) + .map(index -> { + SimpleMailboxMessage mailboxMessage = generateSimpleMailboxMessage(index, mailboxId); + messageIds.add(mailboxMessage.getMessageId()); + return mailboxMessage; + }) + .flatMap(message -> messageDAO.insert(message, UUID.randomUUID().toString()) + .then(testee.insert(message)), ReactorUtils.DEFAULT_CONCURRENCY) + .then().block(); + return messageIds; + } + + @Test + void findMessagesByMailboxIdShouldReturnAllMatchingEntryWhenBatchSizeIsSmallerThanAllEntries() { + // Given 100 entries + int sampleSize = 100; + PostgresMailboxId mailboxId = PostgresMailboxId.generate(); + ArrayList<MessageId> messageIds = provisionMailboxMessage(sampleSize, mailboxId); + + // When retrieve all entries + List<SimpleMailboxMessage> listResult = testee.findMessagesByMailboxId(mailboxId, Limit.unlimited(), METADATA) + .map(e -> e.getKey().build()) + .collectList().block(); + + // Then return all entries + assertThat(listResult).hasSize(sampleSize); + + assertThat(listResult.stream().map(message -> message.getMessageId()).toList()) + .containsExactly(messageIds.toArray(MessageId[]::new)); + } + + @Test + void findMessagesByMailboxIdAndBetweenUIDsShouldReturnAllMatchingEntryWhenBatchSizeIsSmallerThanAllEntries() { + // Given 100 entries + int sampleSize = 100; + PostgresMailboxId mailboxId = PostgresMailboxId.generate(); + ArrayList<MessageId> messageIds = provisionMailboxMessage(sampleSize, mailboxId); + + // When retrieve all entries + List<SimpleMailboxMessage> listResult = testee.findMessagesByMailboxIdAndBetweenUIDs(mailboxId, MessageUid.of(0), MessageUid.of(sampleSize + 1), Limit.unlimited(), METADATA) + .map(e -> e.getKey().build()) + .collectList().block(); + + // Then return all entries + assertThat(listResult).hasSize(sampleSize); + + assertThat(listResult.stream().map(DelegatingMailboxMessage::getMessageId).toList()) + .containsExactly(messageIds.toArray(MessageId[]::new)); + } + + @Test + void findMessagesByMailboxIdAndAfterUIDShouldReturnAllMatchingEntryWhenBatchSizeIsSmallerThanAllEntries() { + // Given 100 entries + int sampleSize = 100; + PostgresMailboxId mailboxId = PostgresMailboxId.generate(); + ArrayList<MessageId> messageIds = provisionMailboxMessage(sampleSize, mailboxId); + + // When retrieve all entries + List<SimpleMailboxMessage> listResult = testee.findMessagesByMailboxIdAndAfterUID(mailboxId, MessageUid.of(0), Limit.unlimited(), METADATA) + .map(e -> e.getKey().build()) + .collectList().block(); + + // Then return all entries + assertThat(listResult).hasSize(sampleSize); + + assertThat(listResult.stream().map(DelegatingMailboxMessage::getMessageId).toList()) + .containsExactly(messageIds.toArray(MessageId[]::new)); + } + + @Test + void findMessagesMetadataShouldReturnAllMatchingEntryWhenBatchSizeIsSmallerThanAllEntries() { + // Given 100 entries + int sampleSize = 100; + PostgresMailboxId mailboxId = PostgresMailboxId.generate(); + ArrayList<MessageId> messageIds = provisionMailboxMessage(sampleSize, mailboxId); + + // When retrieve all entries + List<ComposedMessageIdWithMetaData> listResult = testee.findMessagesMetadata(mailboxId, MessageRange.all()) + .collectList().block(); + + // Then return all entries + assertThat(listResult).hasSize(sampleSize); + + assertThat(listResult.stream().map(metaData -> metaData.getComposedMessageId().getMessageId()).toList()) + .containsExactly(messageIds.toArray(MessageId[]::new)); + } + + private SimpleMailboxMessage generateSimpleMailboxMessage(int index, PostgresMailboxId mailboxId) { + MessageId messageId = messageIdFactory.generate(); + String messageContent = "Simple message content" + index; + return SimpleMailboxMessage.builder() + .messageId(messageId) + .threadId(ThreadId.fromBaseMessageId(messageId)) + .uid(MessageUid.of(index)) + .content(new ByteContent((messageContent.getBytes(StandardCharsets.UTF_8)))) + .size(messageContent.length()) + .internalDate(new Date()) + .bodyStartOctet(0) + .flags(new Flags(Flags.Flag.RECENT)) + .properties(new PropertyBuilder()) + .mailboxId(mailboxId) + .modseq(ModSeq.of(index)) + .build(); + } +} diff --git a/server/apps/postgres-app/sample-configuration/jvm.properties b/server/apps/postgres-app/sample-configuration/jvm.properties index 73b964c9b4..186fbe4b2c 100644 --- a/server/apps/postgres-app/sample-configuration/jvm.properties +++ b/server/apps/postgres-app/sample-configuration/jvm.properties @@ -50,4 +50,7 @@ james.jmx.credential.generation=true # Disable Remote Code Execution feature from JMX # CF https://github.com/AdoptOpenJDK/openjdk-jdk11/blob/19fb8f93c59dfd791f62d41f332db9e306bc1422/src/java.management/share/classes/com/sun/jmx/remote/security/MBeanServerAccessController.java#L646 jmx.remote.x.mlet.allow.getMBeansFromURL=false -openjpa.Multithreaded=true \ No newline at end of file +openjpa.Multithreaded=true + +# Integer. Optional, defaults to 5000. In case of large data, this argument specifies the maximum number of rows to return in a single batch set when executing query. +#query.batch.size=5000 \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For additional commands, e-mail: notifications-h...@james.apache.org