This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch postgresql
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 8ee446e10a6442b78c3c20eb1491101810f817f6
Author: TungTV <vtt...@linagora.com>
AuthorDate: Thu Nov 28 09:05:41 2024 +0700

    JAMES-2586 - Postgres mailbox messages support batch/pagination for some methods & introduce eager fetch mode
---
 .../backends/postgres/utils/PostgresExecutor.java  |  43 +++-
 .../backends/postgres/utils/PostgresUtils.java     |  12 +
 .../james/backends/postgres/PostgresExtension.java |   4 +-
 .../postgres/mail/dao/PostgresMailboxDAO.java      |  19 +-
 .../mail/dao/PostgresMailboxMessageDAO.java        | 286 +++++++++++++--------
 .../postgres/PostgresMailboxManagerTest.java       |  42 +++
 .../mail/dao/PostgresMailboxMessageDAOTest.java    | 215 ++++++++++++++++
 .../sample-configuration/jvm.properties            |   5 +-
 8 files changed, 504 insertions(+), 122 deletions(-)

diff --git 
a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresExecutor.java
 
b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresExecutor.java
index aaa3fadf61..cbe00f8023 100644
--- 
a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresExecutor.java
+++ 
b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresExecutor.java
@@ -62,6 +62,7 @@ public class PostgresExecutor {
     public static final Duration MIN_BACKOFF = Duration.ofMillis(1);
     private static final Logger LOGGER = 
LoggerFactory.getLogger(PostgresExecutor.class);
     private static final String JOOQ_TIMEOUT_ERROR_LOG = "Time out executing 
Postgres query. May need to check either jOOQ reactive issue or Postgres DB 
performance.";
+    public static final boolean EAGER_FETCH = true;
 
     public static class Factory {
 
@@ -125,16 +126,40 @@ public class PostgresExecutor {
     }
 
     public Flux<Record> executeRows(Function<DSLContext, Flux<Record>> 
queryFunction) {
+        return executeRows(queryFunction, !EAGER_FETCH);
+    }
+
+    /**
+     * @param isEagerFetch
+     * Because an R2DBC postgres connection can only execute one query at a 
time, when `isEagerFetch` is set to `true`, all elements are fetched and stored 
in a list (using the `.collectList` API).
+     * <p>
+     * This approach allows the connection to early complete its current query 
and return to the pool, helping to avoid Reactor pipeline hanging caused by 
depleting the available connection pool.
+     * <p>
+     * It is recommended to set `isEagerFetch` to `true` when the number of 
elements to fetch is small, such as the number of mailboxes for a single user.
+     * <p>
+     * In cases where the number of elements is large, consider using 
pagination-based queries.
+     * <p>
+     * Reference:
+     * - <a 
href="https://github.com/apache/james-project/pull/2514#issuecomment-2490348291">james-project</a>
+     * - <a 
href="https://github.com/pgjdbc/r2dbc-postgresql/issues/650">r2dbc-postgresql</a>
+     */
+    public Flux<Record> executeRows(Function<DSLContext, Flux<Record>> 
queryFunction, boolean isEagerFetch) {
         return 
Flux.from(metricFactory.decoratePublisherWithTimerMetric("postgres-execution",
             Flux.usingWhen(getConnection(domain),
-                connection -> dslContext(connection)
-                    .flatMapMany(queryFunction)
-                    .timeout(postgresConfiguration.getJooqReactiveTimeout())
-                    .doOnError(TimeoutException.class, e -> 
LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e))
-                    .collectList()
-                    .flatMapIterable(list -> list) // Mitigation fix for 
https://github.com/jOOQ/jOOQ/issues/16556
-                    .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF)
-                        .filter(preparedStatementConflictException())),
+                connection -> {
+                    Flux<Record> recordFlux = dslContext(connection)
+                        .flatMapMany(queryFunction)
+                        
.timeout(postgresConfiguration.getJooqReactiveTimeout())
+                        .doOnError(TimeoutException.class, e -> 
LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e))
+                        .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, 
MIN_BACKOFF)
+                            .filter(preparedStatementConflictException()));
+
+                    if (isEagerFetch) {
+                        return recordFlux.collectList().flatMapIterable(list 
-> list);
+                    } else {
+                        return recordFlux;
+                    }
+                },
                 jamesPostgresConnectionFactory::closeConnection)));
     }
 
@@ -145,8 +170,6 @@ public class PostgresExecutor {
                     .flatMapMany(queryFunction)
                     .timeout(postgresConfiguration.getJooqReactiveTimeout())
                     .doOnError(TimeoutException.class, e -> 
LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e))
-                    .collectList()
-                    .flatMapIterable(list -> list) // The convert Flux -> 
Mono<List> -> Flux to avoid a hanging issue. See: 
https://github.com/jOOQ/jOOQ/issues/16055
                     .retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF)
                         .filter(preparedStatementConflictException())),
                 jamesPostgresConnectionFactory::closeConnection)));
diff --git 
a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresUtils.java
 
b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresUtils.java
index 9f8b075c14..9a6310e00e 100644
--- 
a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresUtils.java
+++ 
b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/utils/PostgresUtils.java
@@ -19,13 +19,25 @@
 
 package org.apache.james.backends.postgres.utils;
 
+import java.util.Optional;
 import java.util.function.Predicate;
 
 import org.jooq.exception.DataAccessException;
 
+import com.google.common.base.Preconditions;
+
 public class PostgresUtils {
     private static final String UNIQUE_CONSTRAINT_VIOLATION_MESSAGE = 
"duplicate key value violates unique constraint";
 
     public static final Predicate<Throwable> 
UNIQUE_CONSTRAINT_VIOLATION_PREDICATE =
         throwable -> throwable instanceof DataAccessException && 
throwable.getMessage().contains(UNIQUE_CONSTRAINT_VIOLATION_MESSAGE);
+
+    public static final int QUERY_BATCH_SIZE_DEFAULT_VALUE = 5000;
+    public static final int QUERY_BATCH_SIZE = 
Optional.ofNullable(System.getProperty("james.postgresql.query.batch.size"))
+        .map(Integer::valueOf)
+        .map(batchSize -> {
+            Preconditions.checkArgument(batchSize > 0, 
"james.postgresql.query.batch.size must be positive");
+            return batchSize;
+        })
+        .orElse(QUERY_BATCH_SIZE_DEFAULT_VALUE);
 }
diff --git 
a/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PostgresExtension.java
 
b/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PostgresExtension.java
index dc304746f6..85159832a6 100644
--- 
a/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PostgresExtension.java
+++ 
b/backends-common/postgres/src/test/java/org/apache/james/backends/postgres/PostgresExtension.java
@@ -50,8 +50,8 @@ import reactor.core.publisher.Mono;
 
 public class PostgresExtension implements GuiceModuleTestExtension {
     public enum PoolSize {
-        SMALL(1, 2),
-        LARGE(10, 20);
+        SMALL(10, 20),
+        LARGE(20, 40);
 
         private final int min;
         private final int max;
diff --git 
a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxDAO.java
 
b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxDAO.java
index 4c443deed9..616e4bd1e2 100644
--- 
a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxDAO.java
+++ 
b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxDAO.java
@@ -53,9 +53,11 @@ import org.apache.james.mailbox.model.MailboxId;
 import org.apache.james.mailbox.model.MailboxPath;
 import org.apache.james.mailbox.model.UidValidity;
 import org.apache.james.mailbox.model.search.MailboxQuery;
+import org.apache.james.mailbox.model.search.Wildcard;
 import org.apache.james.mailbox.postgres.PostgresMailboxId;
 import org.apache.james.mailbox.postgres.mail.PostgresMailbox;
 import org.apache.james.mailbox.store.MailboxExpressionBackwardCompatibility;
+import org.jooq.Condition;
 import org.jooq.Record;
 import org.jooq.impl.DSL;
 import org.jooq.impl.DefaultDataType;
@@ -202,15 +204,20 @@ public class PostgresMailboxDAO {
 
     public Flux<PostgresMailbox> 
findMailboxWithPathLike(MailboxQuery.UserBound query) {
         String pathLike = 
MailboxExpressionBackwardCompatibility.getPathLike(query);
+        Function<MailboxQuery.UserBound, Condition> getQueryCondition = 
mailboxQuery -> {
+            Condition baseCondition = 
USER_NAME.eq(mailboxQuery.getFixedUser().asString())
+                .and(MAILBOX_NAMESPACE.eq(mailboxQuery.getFixedNamespace()));
+
+            if 
(Wildcard.INSTANCE.equals(mailboxQuery.getMailboxNameExpression())) {
+                return baseCondition;
+            }
+            return baseCondition.and(MAILBOX_NAME.like(pathLike));
+        };
 
         return postgresExecutor.executeRows(dsl -> 
Flux.from(dsl.selectFrom(TABLE_NAME)
-            .where(MAILBOX_NAME.like(pathLike)
-                .and(USER_NAME.eq(query.getFixedUser().asString()))
-                .and(MAILBOX_NAMESPACE.eq(query.getFixedNamespace())))))
+                .where(getQueryCondition.apply(query))), 
PostgresExecutor.EAGER_FETCH)
             .map(RECORD_TO_POSTGRES_MAILBOX_FUNCTION)
-            .filter(query::matches)
-            .collectList()
-            .flatMapIterable(Function.identity());
+            .filter(query::matches);
     }
 
     public Mono<Boolean> hasChildren(Mailbox mailbox, char delimiter) {
diff --git 
a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxMessageDAO.java
 
b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxMessageDAO.java
index f40dc4e5a2..e2055a1297 100644
--- 
a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxMessageDAO.java
+++ 
b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxMessageDAO.java
@@ -19,11 +19,11 @@
 
 package org.apache.james.mailbox.postgres.mail.dao;
 
-
 import static 
org.apache.james.backends.postgres.PostgresCommons.DATE_TO_LOCAL_DATE_TIME;
 import static 
org.apache.james.backends.postgres.PostgresCommons.IN_CLAUSE_MAX_SIZE;
 import static org.apache.james.backends.postgres.PostgresCommons.UNNEST_FIELD;
 import static org.apache.james.backends.postgres.PostgresCommons.tableField;
+import static 
org.apache.james.backends.postgres.utils.PostgresExecutor.EAGER_FETCH;
 import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.INTERNAL_DATE;
 import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.SIZE;
 import static 
org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageToMailboxTable.IS_ANSWERED;
@@ -61,6 +61,7 @@ import jakarta.mail.Flags;
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.james.backends.postgres.utils.PostgresExecutor;
+import org.apache.james.backends.postgres.utils.PostgresUtils;
 import org.apache.james.core.Domain;
 import org.apache.james.mailbox.MessageUid;
 import org.apache.james.mailbox.ModSeq;
@@ -80,15 +81,11 @@ import org.jooq.Condition;
 import org.jooq.DSLContext;
 import org.jooq.Name;
 import org.jooq.Record;
-import org.jooq.Record1;
-import org.jooq.SelectFinalStep;
-import org.jooq.SelectSeekStep1;
 import org.jooq.SortField;
 import org.jooq.TableOnConditionStep;
 import org.jooq.UpdateConditionStep;
 import org.jooq.UpdateSetStep;
 import org.jooq.impl.DSL;
-import org.jooq.util.postgres.PostgresDSL;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
@@ -117,15 +114,7 @@ public class PostgresMailboxMessageDAO {
 
     public static final SortField<Long> DEFAULT_SORT_ORDER_BY = 
MESSAGE_UID.asc();
 
-    private static SelectFinalStep<Record1<Long>> 
selectMessageUidByMailboxIdAndExtraConditionQuery(PostgresMailboxId mailboxId, 
Condition extraCondition, Limit limit, DSLContext dslContext) {
-        SelectSeekStep1<Record1<Long>, Long> queryWithoutLimit = 
dslContext.select(MESSAGE_UID)
-            .from(TABLE_NAME)
-            .where(MAILBOX_ID.eq((mailboxId.asUuid())))
-            .and(extraCondition)
-            .orderBy(MESSAGE_UID.asc());
-        return limit.getLimit().map(limitValue -> 
(SelectFinalStep<Record1<Long>>) queryWithoutLimit.limit(limitValue))
-            .orElse(queryWithoutLimit);
-    }
+    private static final int QUERY_BATCH_SIZE = PostgresUtils.QUERY_BATCH_SIZE;
 
     private final PostgresExecutor postgresExecutor;
 
@@ -134,60 +123,66 @@ public class PostgresMailboxMessageDAO {
     }
 
     public Mono<MessageUid> findFirstUnseenMessageUid(PostgresMailboxId 
mailboxId) {
-        return postgresExecutor.executeRow(dslContext -> 
Mono.from(selectMessageUidByMailboxIdAndExtraConditionQuery(mailboxId,
-                IS_SEEN.eq(false), Limit.limit(1), dslContext)))
+        return postgresExecutor.executeRow(dslContext -> 
Flux.from(dslContext.select(MESSAGE_UID)
+                .from(TABLE_NAME)
+                .where(MAILBOX_ID.eq((mailboxId.asUuid())))
+                .and(IS_SEEN.eq(false))
+                .orderBy(DEFAULT_SORT_ORDER_BY)
+                .limit(1)))
             .map(RECORD_TO_MESSAGE_UID_FUNCTION);
     }
 
     public Flux<MessageUid> listUnseen(PostgresMailboxId mailboxId) {
-        return postgresExecutor.executeRows(dslContext -> 
Flux.from(selectMessageUidByMailboxIdAndExtraConditionQuery(mailboxId,
-                IS_SEEN.eq(false), Limit.unlimited(), dslContext)))
+        return postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select(MESSAGE_UID)
+                .from(TABLE_NAME)
+                .where(MAILBOX_ID.eq((mailboxId.asUuid())))
+                .and(IS_SEEN.eq(false))
+                .orderBy(DEFAULT_SORT_ORDER_BY)), EAGER_FETCH)
             .map(RECORD_TO_MESSAGE_UID_FUNCTION);
     }
 
     public Flux<MessageUid> listUnseen(PostgresMailboxId mailboxId, 
MessageRange range) {
-        switch (range.getType()) {
-            case ALL:
-                return listUnseen(mailboxId);
-            case FROM:
-                return postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select(MESSAGE_UID)
-                        .from(TABLE_NAME)
-                        .where(MAILBOX_ID.eq(mailboxId.asUuid()))
-                        .and(IS_SEEN.eq(false))
-                        
.and(MESSAGE_UID.greaterOrEqual(range.getUidFrom().asLong()))
-                        .orderBy(DEFAULT_SORT_ORDER_BY)))
-                    .map(RECORD_TO_MESSAGE_UID_FUNCTION);
-            case RANGE:
-                return postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select(MESSAGE_UID)
-                        .from(TABLE_NAME)
-                        .where(MAILBOX_ID.eq(mailboxId.asUuid()))
-                        .and(IS_SEEN.eq(false))
-                        
.and(MESSAGE_UID.greaterOrEqual(range.getUidFrom().asLong()))
-                        
.and(MESSAGE_UID.lessOrEqual(range.getUidTo().asLong()))
-                        .orderBy(DEFAULT_SORT_ORDER_BY)))
-                    .map(RECORD_TO_MESSAGE_UID_FUNCTION);
-            case ONE:
-                return postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select(MESSAGE_UID)
-                        .from(TABLE_NAME)
-                        .where(MAILBOX_ID.eq(mailboxId.asUuid()))
-                        .and(IS_SEEN.eq(false))
-                        .and(MESSAGE_UID.eq(range.getUidFrom().asLong()))
-                        .orderBy(DEFAULT_SORT_ORDER_BY)))
-                    .map(RECORD_TO_MESSAGE_UID_FUNCTION);
-            default:
-                throw new RuntimeException("Unsupported range type " + 
range.getType());
-        }
+        return switch (range.getType()) {
+            case ALL -> listUnseen(mailboxId);
+            case FROM -> postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select(MESSAGE_UID)
+                    .from(TABLE_NAME)
+                    .where(MAILBOX_ID.eq(mailboxId.asUuid()))
+                    .and(IS_SEEN.eq(false))
+                    
.and(MESSAGE_UID.greaterOrEqual(range.getUidFrom().asLong()))
+                    .orderBy(DEFAULT_SORT_ORDER_BY)), EAGER_FETCH)
+                .map(RECORD_TO_MESSAGE_UID_FUNCTION);
+            case RANGE -> postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select(MESSAGE_UID)
+                    .from(TABLE_NAME)
+                    .where(MAILBOX_ID.eq(mailboxId.asUuid()))
+                    .and(IS_SEEN.eq(false))
+                    
.and(MESSAGE_UID.greaterOrEqual(range.getUidFrom().asLong()))
+                    .and(MESSAGE_UID.lessOrEqual(range.getUidTo().asLong()))
+                    .orderBy(DEFAULT_SORT_ORDER_BY)), EAGER_FETCH)
+                .map(RECORD_TO_MESSAGE_UID_FUNCTION);
+            case ONE -> postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select(MESSAGE_UID)
+                    .from(TABLE_NAME)
+                    .where(MAILBOX_ID.eq(mailboxId.asUuid()))
+                    .and(IS_SEEN.eq(false))
+                    .and(MESSAGE_UID.eq(range.getUidFrom().asLong()))
+                    .orderBy(DEFAULT_SORT_ORDER_BY)), EAGER_FETCH)
+                .map(RECORD_TO_MESSAGE_UID_FUNCTION);
+        };
     }
 
     public Flux<MessageUid> findAllRecentMessageUid(PostgresMailboxId 
mailboxId) {
-        return postgresExecutor.executeRows(dslContext -> 
Flux.from(selectMessageUidByMailboxIdAndExtraConditionQuery(mailboxId,
-                IS_RECENT.eq(true), Limit.unlimited(), dslContext)))
+        return postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select(MESSAGE_UID)
+                .from(TABLE_NAME)
+                .where(MAILBOX_ID.eq((mailboxId.asUuid())))
+                .and(IS_RECENT.eq(true))
+                .orderBy(DEFAULT_SORT_ORDER_BY)), EAGER_FETCH)
             .map(RECORD_TO_MESSAGE_UID_FUNCTION);
     }
 
     public Flux<MessageUid> listAllMessageUid(PostgresMailboxId mailboxId) {
-        return postgresExecutor.executeRows(dslContext -> 
Flux.from(selectMessageUidByMailboxIdAndExtraConditionQuery(mailboxId,
-                DSL.noCondition(), Limit.unlimited(), dslContext)))
+        return postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select(MESSAGE_UID)
+                .from(TABLE_NAME)
+                .where(MAILBOX_ID.eq((mailboxId.asUuid())))
+                .orderBy(DEFAULT_SORT_ORDER_BY)), EAGER_FETCH)
             .map(RECORD_TO_MESSAGE_UID_FUNCTION);
     }
 
@@ -199,12 +194,12 @@ public class PostgresMailboxMessageDAO {
     }
 
     private Flux<MessageUid> doListUids(PostgresMailboxId mailboxId, 
MessageRange range) {
-        return postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select(MESSAGE_UID)
+        return  postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select(MESSAGE_UID)
                 .from(TABLE_NAME)
                 .where(MAILBOX_ID.eq(mailboxId.asUuid()))
                 .and(MESSAGE_UID.greaterOrEqual(range.getUidFrom().asLong()))
                 .and(MESSAGE_UID.lessOrEqual(range.getUidTo().asLong()))
-                .orderBy(DEFAULT_SORT_ORDER_BY)))
+                .orderBy(DEFAULT_SORT_ORDER_BY)), EAGER_FETCH)
             .map(RECORD_TO_MESSAGE_UID_FUNCTION);
     }
 
@@ -238,7 +233,9 @@ public class PostgresMailboxMessageDAO {
         return postgresExecutor.executeDeleteAndReturnList(dslContext -> 
dslContext.deleteFrom(TABLE_NAME)
                 .where(MAILBOX_ID.eq(mailboxId.asUuid()))
                 .returning(MESSAGE_ID))
-            .map(record -> 
PostgresMessageId.Factory.of(record.get(MESSAGE_ID)));
+            .map(record -> 
PostgresMessageId.Factory.of(record.get(MESSAGE_ID)))
+            .collectList()
+            .flatMapMany(Flux::fromIterable);
     }
 
     public Mono<Void> deleteByMessageIdAndMailboxIds(PostgresMessageId 
messageId, Collection<PostgresMailboxId> mailboxIds) {
@@ -273,31 +270,66 @@ public class PostgresMailboxMessageDAO {
     }
 
     public Flux<Pair<SimpleMailboxMessage.Builder, Record>> 
findMessagesByMailboxId(PostgresMailboxId mailboxId, Limit limit, 
MessageMapper.FetchType fetchType) {
-        PostgresMailboxMessageFetchStrategy fetchStrategy = 
FETCH_TYPE_TO_FETCH_STRATEGY.apply(fetchType);
-        Function<DSLContext, SelectSeekStep1<Record, Long>> queryWithoutLimit 
= dslContext -> dslContext.select(fetchStrategy.fetchFields())
-            .from(MESSAGES_JOIN_MAILBOX_MESSAGES_CONDITION_STEP)
-            .where(MAILBOX_ID.eq(mailboxId.asUuid()))
-            .orderBy(DEFAULT_SORT_ORDER_BY);
+        if (limit.isUnlimited()) {
+            return Flux.defer(() -> findMessagesByMailboxIdBatch(mailboxId, 
fetchType, Optional.empty(), QUERY_BATCH_SIZE))
+                .expand(messages -> {
+                    if (messages.isEmpty() || messages.size() < 
QUERY_BATCH_SIZE) {
+                        return Mono.empty();
+                    }
+                    return findMessagesByMailboxIdBatch(mailboxId, fetchType, 
Optional.of(messages.getLast().getRight().get(MESSAGE_UID)), QUERY_BATCH_SIZE);
+                })
+                .flatMapIterable(Function.identity());
+        } else {
+            return findMessagesByMailboxIdBatch(mailboxId, fetchType, 
Optional.empty(), limit.getLimit().get())
+                .flatMapIterable(Function.identity());
+        }
+    }
 
-        return postgresExecutor.executeRows(dslContext -> limit.getLimit()
-                .map(limitValue -> Flux.from(queryWithoutLimit.andThen(step -> 
step.limit(limitValue)).apply(dslContext)))
-                .orElse(Flux.from(queryWithoutLimit.apply(dslContext))))
-            .map(record -> 
Pair.of(fetchStrategy.toMessageBuilder().apply(record), record));
+    private Mono<List<Pair<SimpleMailboxMessage.Builder, Record>>> 
findMessagesByMailboxIdBatch(PostgresMailboxId mailboxId, 
MessageMapper.FetchType fetchType,
+                                                                               
                 Optional<Long> messageUidFrom, int batchSize) {
+        PostgresMailboxMessageFetchStrategy fetchStrategy = 
FETCH_TYPE_TO_FETCH_STRATEGY.apply(fetchType);
+        return postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select(fetchStrategy.fetchFields())
+                .from(MESSAGES_JOIN_MAILBOX_MESSAGES_CONDITION_STEP)
+                .where(MAILBOX_ID.eq(mailboxId.asUuid()))
+                
.and(messageUidFrom.map(MESSAGE_UID::greaterThan).orElseGet(DSL::noCondition))
+                .orderBy(MESSAGE_UID.asc())
+                .limit(batchSize)))
+            .map(record -> 
Pair.of(fetchStrategy.toMessageBuilder().apply(record), record))
+            .collectList()
+            .switchIfEmpty(Mono.just(ImmutableList.of()));
     }
 
     public Flux<Pair<SimpleMailboxMessage.Builder, Record>> 
findMessagesByMailboxIdAndBetweenUIDs(PostgresMailboxId mailboxId, MessageUid 
from, MessageUid to, Limit limit, FetchType fetchType) {
-        PostgresMailboxMessageFetchStrategy fetchStrategy = 
FETCH_TYPE_TO_FETCH_STRATEGY.apply(fetchType);
-        Function<DSLContext, SelectSeekStep1<Record, Long>> queryWithoutLimit 
= dslContext -> dslContext.select(fetchStrategy.fetchFields())
-            .from(MESSAGES_JOIN_MAILBOX_MESSAGES_CONDITION_STEP)
-            .where(MAILBOX_ID.eq(mailboxId.asUuid()))
-            .and(MESSAGE_UID.greaterOrEqual(from.asLong()))
-            .and(MESSAGE_UID.lessOrEqual(to.asLong()))
-            .orderBy(DEFAULT_SORT_ORDER_BY);
+        if (limit.isUnlimited()) {
+            return Flux.defer(() -> 
findMessagesByMailboxIdAndBetweenUIDsBatch(mailboxId, 
MESSAGE_UID.greaterOrEqual(from.asLong()), to, fetchType, QUERY_BATCH_SIZE))
+                .expand(messages -> {
+                    if (messages.isEmpty() || messages.size() < 
QUERY_BATCH_SIZE) {
+                        return Mono.empty();
+                    }
+                    MessageUid messageUidFrom = 
MessageUid.of(messages.getLast().getRight().get(MESSAGE_UID));
+                    return 
findMessagesByMailboxIdAndBetweenUIDsBatch(mailboxId, 
MESSAGE_UID.greaterThan(messageUidFrom.asLong()), to, fetchType, 
QUERY_BATCH_SIZE);
+                })
+                .flatMapIterable(Function.identity());
+        } else {
+            return findMessagesByMailboxIdAndBetweenUIDsBatch(mailboxId, 
MESSAGE_UID.greaterOrEqual(from.asLong()), to, fetchType, 
limit.getLimit().get())
+                .flatMapIterable(Function.identity());
+        }
+    }
 
-        return postgresExecutor.executeRows(dslContext -> limit.getLimit()
-                .map(limitValue -> Flux.from(queryWithoutLimit.andThen(step -> 
step.limit(limitValue)).apply(dslContext)))
-                .orElse(Flux.from(queryWithoutLimit.apply(dslContext))))
-            .map(record -> 
Pair.of(fetchStrategy.toMessageBuilder().apply(record), record));
+    private Mono<List<Pair<SimpleMailboxMessage.Builder, Record>>> 
findMessagesByMailboxIdAndBetweenUIDsBatch(PostgresMailboxId mailboxId, 
Condition messageUidFromCondition,
+                                                                               
                               MessageUid to,
+                                                                               
                               FetchType fetchType, int batchSize) {
+        PostgresMailboxMessageFetchStrategy fetchStrategy = 
FETCH_TYPE_TO_FETCH_STRATEGY.apply(fetchType);
+        return postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select(fetchStrategy.fetchFields())
+                .from(MESSAGES_JOIN_MAILBOX_MESSAGES_CONDITION_STEP)
+                .where(MAILBOX_ID.eq(mailboxId.asUuid()))
+                .and(messageUidFromCondition)
+                .and(MESSAGE_UID.lessOrEqual(to.asLong()))
+                .orderBy(MESSAGE_UID.asc())
+                .limit(batchSize)))
+            .map(record -> 
Pair.of(fetchStrategy.toMessageBuilder().apply(record), record))
+            .collectList()
+            .switchIfEmpty(Mono.just(ImmutableList.of()));
     }
 
     public Mono<Pair<SimpleMailboxMessage.Builder, Record>> 
findMessageByMailboxIdAndUid(PostgresMailboxId mailboxId, MessageUid uid, 
FetchType fetchType) {
@@ -310,17 +342,35 @@ public class PostgresMailboxMessageDAO {
     }
 
     public Flux<Pair<SimpleMailboxMessage.Builder, Record>> 
findMessagesByMailboxIdAndAfterUID(PostgresMailboxId mailboxId, MessageUid 
from, Limit limit, FetchType fetchType) {
-        PostgresMailboxMessageFetchStrategy fetchStrategy = 
FETCH_TYPE_TO_FETCH_STRATEGY.apply(fetchType);
-        Function<DSLContext, SelectSeekStep1<Record, Long>> queryWithoutLimit 
= dslContext -> dslContext.select(fetchStrategy.fetchFields())
-            .from(MESSAGES_JOIN_MAILBOX_MESSAGES_CONDITION_STEP)
-            .where(MAILBOX_ID.eq(mailboxId.asUuid()))
-            .and(MESSAGE_UID.greaterOrEqual(from.asLong()))
-            .orderBy(DEFAULT_SORT_ORDER_BY);
+        if (limit.isUnlimited()) {
+            return Flux.defer(() -> 
findMessagesByMailboxIdAndAfterUIDBatch(mailboxId, 
MESSAGE_UID.greaterOrEqual(from.asLong()), fetchType, QUERY_BATCH_SIZE))
+                .expand(messages -> {
+                    if (messages.isEmpty() || messages.size() < 
QUERY_BATCH_SIZE) {
+                        return Mono.empty();
+                    }
+                    MessageUid messageUidFrom = 
MessageUid.of(messages.getLast().getRight().get(MESSAGE_UID));
+                    return findMessagesByMailboxIdAndAfterUIDBatch(mailboxId, 
MESSAGE_UID.greaterThan(messageUidFrom.asLong()), fetchType, QUERY_BATCH_SIZE);
+                })
+                .flatMapIterable(Function.identity());
+        } else {
+            return findMessagesByMailboxIdAndAfterUIDBatch(mailboxId, 
MESSAGE_UID.greaterOrEqual(from.asLong()), fetchType, limit.getLimit().get())
+                .flatMapIterable(Function.identity());
+        }
+    }
 
-        return postgresExecutor.executeRows(dslContext -> limit.getLimit()
-                .map(limitValue -> Flux.from(queryWithoutLimit.andThen(step -> 
step.limit(limitValue)).apply(dslContext)))
-                .orElse(Flux.from(queryWithoutLimit.apply(dslContext))))
-            .map(record -> 
Pair.of(fetchStrategy.toMessageBuilder().apply(record), record));
+    private Mono<List<Pair<SimpleMailboxMessage.Builder, Record>>> 
findMessagesByMailboxIdAndAfterUIDBatch(PostgresMailboxId mailboxId,
+                                                                               
                            Condition messageUidFromCondition,
+                                                                               
                            FetchType fetchType, int batchSize) {
+        PostgresMailboxMessageFetchStrategy fetchStrategy = 
FETCH_TYPE_TO_FETCH_STRATEGY.apply(fetchType);
+        return postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select(fetchStrategy.fetchFields())
+                .from(MESSAGES_JOIN_MAILBOX_MESSAGES_CONDITION_STEP)
+                .where(MAILBOX_ID.eq(mailboxId.asUuid()))
+                .and(messageUidFromCondition)
+                .orderBy(MESSAGE_UID.asc())
+                .limit(batchSize)))
+            .map(record -> 
Pair.of(fetchStrategy.toMessageBuilder().apply(record), record))
+            .collectList()
+            .switchIfEmpty(Mono.just(ImmutableList.of()));
     }
 
     public Flux<SimpleMailboxMessage.Builder> 
findMessagesByMailboxIdAndUIDs(PostgresMailboxId mailboxId, List<MessageUid> 
uids) {
@@ -333,7 +383,7 @@ public class PostgresMailboxMessageDAO {
                     .from(MESSAGES_JOIN_MAILBOX_MESSAGES_CONDITION_STEP)
                     .where(MAILBOX_ID.eq(mailboxId.asUuid()))
                     
.and(MESSAGE_UID.in(uidsToFetch.stream().map(MessageUid::asLong).toArray(Long[]::new)))
-                    .orderBy(DEFAULT_SORT_ORDER_BY)))
+                    .orderBy(DEFAULT_SORT_ORDER_BY)), EAGER_FETCH)
                 .map(fetchStrategy.toMessageBuilder());
 
         if (uids.size() <= IN_CLAUSE_MAX_SIZE) {
@@ -349,7 +399,7 @@ public class PostgresMailboxMessageDAO {
                 .from(TABLE_NAME)
                 .where(MAILBOX_ID.eq(mailboxId.asUuid()))
                 .and(IS_DELETED.eq(true))
-                .orderBy(DEFAULT_SORT_ORDER_BY)))
+                .orderBy(DEFAULT_SORT_ORDER_BY)), EAGER_FETCH)
             .map(RECORD_TO_MESSAGE_UID_FUNCTION);
     }
 
@@ -360,7 +410,7 @@ public class PostgresMailboxMessageDAO {
                 .and(IS_DELETED.eq(true))
                 .and(MESSAGE_UID.greaterOrEqual(from.asLong()))
                 .and(MESSAGE_UID.lessOrEqual(to.asLong()))
-                .orderBy(DEFAULT_SORT_ORDER_BY)))
+                .orderBy(DEFAULT_SORT_ORDER_BY)), EAGER_FETCH)
             .map(RECORD_TO_MESSAGE_UID_FUNCTION);
     }
 
@@ -370,7 +420,7 @@ public class PostgresMailboxMessageDAO {
                 .where(MAILBOX_ID.eq(mailboxId.asUuid()))
                 .and(IS_DELETED.eq(true))
                 .and(MESSAGE_UID.greaterOrEqual(from.asLong()))
-                .orderBy(DEFAULT_SORT_ORDER_BY)))
+                .orderBy(DEFAULT_SORT_ORDER_BY)), EAGER_FETCH)
             .map(RECORD_TO_MESSAGE_UID_FUNCTION);
     }
 
@@ -389,8 +439,8 @@ public class PostgresMailboxMessageDAO {
                 .where(MAILBOX_ID.eq(mailboxId.asUuid()))
                 .and(MESSAGE_UID.greaterOrEqual(range.getUidFrom().asLong()))
                 .and(MESSAGE_UID.lessOrEqual(range.getUidTo().asLong()))
-                .orderBy(DEFAULT_SORT_ORDER_BY)))
-            .filter(record -> !record.get(IS_DELETED))
+                .and(IS_DELETED.eq(false))
+                .orderBy(DEFAULT_SORT_ORDER_BY)), EAGER_FETCH)
             .map(RECORD_TO_MESSAGE_UID_FUNCTION);
     }
 
@@ -401,22 +451,52 @@ public class PostgresMailboxMessageDAO {
     }
 
     public Flux<ComposedMessageIdWithMetaData> 
findMessagesMetadata(PostgresMailboxId mailboxId, MessageRange range) {
+        return Flux.defer(() -> findMessagesMetadataBatch(mailboxId, 
range.getUidTo(), MESSAGE_UID.greaterOrEqual(range.getUidFrom().asLong()), 
QUERY_BATCH_SIZE))
+            .expand(messages -> {
+                if (messages.isEmpty() || messages.size() < QUERY_BATCH_SIZE) {
+                    return Mono.empty();
+                }
+                MessageUid messageUidFrom = 
messages.getLast().getComposedMessageId().getUid();
+                return findMessagesMetadataBatch(mailboxId, range.getUidTo(), 
MESSAGE_UID.greaterThan(messageUidFrom.asLong()), QUERY_BATCH_SIZE);
+            })
+            .flatMapIterable(Function.identity());
+    }
+
+    private Mono<List<ComposedMessageIdWithMetaData>> 
findMessagesMetadataBatch(PostgresMailboxId mailboxId, MessageUid messageUidTo, 
Condition messageUidFromCondition, int batchSize) {
         return postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select()
                 .from(TABLE_NAME)
                 .where(MAILBOX_ID.eq(mailboxId.asUuid()))
-                .and(MESSAGE_UID.greaterOrEqual(range.getUidFrom().asLong()))
-                .and(MESSAGE_UID.lessOrEqual(range.getUidTo().asLong()))
-                .orderBy(DEFAULT_SORT_ORDER_BY)))
-            .map(RECORD_TO_COMPOSED_MESSAGE_ID_WITH_META_DATA_FUNCTION);
+                .and(messageUidFromCondition)
+                .and(MESSAGE_UID.lessOrEqual(messageUidTo.asLong()))
+                .orderBy(MESSAGE_UID.asc())
+                .limit(batchSize)))
+            .map(RECORD_TO_COMPOSED_MESSAGE_ID_WITH_META_DATA_FUNCTION)
+            .collectList()
+            .switchIfEmpty(Mono.just(ImmutableList.of()));
     }
 
     public Flux<ComposedMessageIdWithMetaData> 
findAllRecentMessageMetadata(PostgresMailboxId mailboxId) {
+        return Flux.defer(() -> findAllRecentMessageMetadataBatch(mailboxId, 
Optional.empty(), QUERY_BATCH_SIZE))
+            .expand(messages -> {
+                if (messages.isEmpty() || messages.size() < QUERY_BATCH_SIZE) {
+                    return Mono.empty();
+                }
+                return findAllRecentMessageMetadataBatch(mailboxId, 
Optional.of(messages.getLast().getComposedMessageId().getUid()), 
QUERY_BATCH_SIZE);
+            })
+            .flatMapIterable(Function.identity());
+    }
+
+    private Mono<List<ComposedMessageIdWithMetaData>> 
findAllRecentMessageMetadataBatch(PostgresMailboxId mailboxId, 
Optional<MessageUid> messageUidFrom, int batchSize) {
         return postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select()
                 .from(TABLE_NAME)
                 .where(MAILBOX_ID.eq(mailboxId.asUuid()))
                 .and(IS_RECENT.eq(true))
-                .orderBy(DEFAULT_SORT_ORDER_BY)))
-            .map(RECORD_TO_COMPOSED_MESSAGE_ID_WITH_META_DATA_FUNCTION);
+                .and(messageUidFrom.map(messageUid -> 
MESSAGE_UID.greaterThan(messageUid.asLong())).orElseGet(DSL::noCondition))
+                .orderBy(MESSAGE_UID.asc())
+                .limit(batchSize)))
+            .map(RECORD_TO_COMPOSED_MESSAGE_ID_WITH_META_DATA_FUNCTION)
+            .collectList()
+            .switchIfEmpty(Mono.just(ImmutableList.of()));
     }
 
     public Mono<Flags> replaceFlags(PostgresMailboxId mailboxId, MessageUid 
uid, Flags newFlags, ModSeq newModSeq) {
@@ -536,7 +616,7 @@ public class PostgresMailboxMessageDAO {
     }
 
     public Mono<Void> insert(MailboxMessage mailboxMessage) {
-        return insert(mailboxMessage, 
PostgresMailboxId.class.cast(mailboxMessage.getMailboxId()));
+        return insert(mailboxMessage, (PostgresMailboxId) 
mailboxMessage.getMailboxId());
     }
 
     public Mono<Void> insert(MailboxMessage mailboxMessage, PostgresMailboxId 
mailboxId) {
@@ -561,7 +641,7 @@ public class PostgresMailboxMessageDAO {
     public Flux<MailboxId> findMailboxes(PostgresMessageId messageId) {
         return postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select(MAILBOX_ID)
                 .from(TABLE_NAME)
-                .where(MESSAGE_ID.eq(messageId.asUuid()))))
+                .where(MESSAGE_ID.eq(messageId.asUuid()))), EAGER_FETCH)
             .map(record -> PostgresMailboxId.of(record.get(MAILBOX_ID)));
     }
 
@@ -574,14 +654,14 @@ public class PostgresMailboxMessageDAO {
         return postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select(fetchStrategy.fetchFields())
                 .from(MESSAGES_JOIN_MAILBOX_MESSAGES_CONDITION_STEP)
                 .where(DSL.field(TABLE_NAME.getName() + "." + 
MESSAGE_ID.getName())
-                    
.in(messageIds.stream().map(PostgresMessageId::asUuid).collect(ImmutableList.toImmutableList())))))
+                    
.in(messageIds.stream().map(PostgresMessageId::asUuid).collect(ImmutableList.toImmutableList())))),
 EAGER_FETCH)
             .map(record -> 
Pair.of(fetchStrategy.toMessageBuilder().apply(record), record));
     }
 
     public Flux<ComposedMessageIdWithMetaData> 
findMetadataByMessageId(PostgresMessageId messageId) {
         return postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select()
                 .from(TABLE_NAME)
-                .where(MESSAGE_ID.eq(messageId.asUuid()))))
+                .where(MESSAGE_ID.eq(messageId.asUuid()))), EAGER_FETCH)
             .map(RECORD_TO_COMPOSED_MESSAGE_ID_WITH_META_DATA_FUNCTION);
     }
 
@@ -589,7 +669,7 @@ public class PostgresMailboxMessageDAO {
         return postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select()
                 .from(TABLE_NAME)
                 .where(MESSAGE_ID.eq(messageId.asUuid()))
-                .and(MAILBOX_ID.eq(mailboxId.asUuid()))))
+                .and(MAILBOX_ID.eq(mailboxId.asUuid()))), EAGER_FETCH)
             .map(RECORD_TO_COMPOSED_MESSAGE_ID_WITH_META_DATA_FUNCTION);
     }
 
diff --git 
a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/PostgresMailboxManagerTest.java
 
b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/PostgresMailboxManagerTest.java
index f7d3436214..3acf0faf7f 100644
--- 
a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/PostgresMailboxManagerTest.java
+++ 
b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/PostgresMailboxManagerTest.java
@@ -18,17 +18,34 @@
  ****************************************************************/
 package org.apache.james.mailbox.postgres;
 
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
 import java.util.Optional;
 
+import jakarta.mail.Flags;
+
 import org.apache.james.backends.postgres.PostgresExtension;
+import org.apache.james.core.Username;
 import org.apache.james.events.EventBus;
 import org.apache.james.mailbox.MailboxManagerTest;
+import org.apache.james.mailbox.MailboxSession;
+import org.apache.james.mailbox.MessageManager;
+import org.apache.james.mailbox.MessageUid;
 import org.apache.james.mailbox.SubscriptionManager;
+import org.apache.james.mailbox.model.MailboxPath;
+import org.apache.james.mailbox.model.MessageRange;
 import org.apache.james.mailbox.store.PreDeletionHooks;
 import org.apache.james.mailbox.store.StoreSubscriptionManager;
 import org.apache.james.metrics.tests.RecordingMetricFactory;
+import org.apache.james.mime4j.dom.Message;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
 class PostgresMailboxManagerTest extends 
MailboxManagerTest<PostgresMailboxManager> {
     @RegisterExtension
     static PostgresExtension postgresExtension = 
PostgresExtension.withoutRowLevelSecurity(PostgresMailboxAggregateModule.MODULE);
@@ -53,4 +70,29 @@ class PostgresMailboxManagerTest extends 
MailboxManagerTest<PostgresMailboxManag
     protected EventBus retrieveEventBus(PostgresMailboxManager mailboxManager) 
{
         return mailboxManager.getEventBus();
     }
+
+    @Test
+    void expungeMessageShouldCorrectWhenALotOfMessages() throws Exception {
+        // Given a mailbox with 6000 messages
+        Username username = Username.of("tung");
+        PostgresMailboxManager postgresMailboxManager = mailboxManager.get();
+        MailboxSession session = 
postgresMailboxManager.createSystemSession(username);
+        postgresMailboxManager.createMailbox(MailboxPath.inbox(username), 
session).get();
+        MessageManager inboxManager = 
postgresMailboxManager.getMailbox(MailboxPath.inbox(session), session);
+
+        int totalMessages = 6000;
+        Flux.range(0, totalMessages)
+            .flatMap(i -> Mono.fromCallable(() -> 
inboxManager.appendMessage(MessageManager.AppendCommand.builder().build(Message.Builder.of()
+                .setSubject("test" + i)
+                .setBody("testmail" + i, StandardCharsets.UTF_8)), session)), 
100)
+            .collectList().block();
+        // When expunging all messages
+        inboxManager.setFlags(new Flags(Flags.Flag.DELETED), 
MessageManager.FlagsUpdateMode.ADD, MessageRange.all(), session);
+
+        List<MessageUid> expungeList = 
inboxManager.expungeReactive(MessageRange.all(), session)
+            .collectList().block();
+
+        // Then all messages are expunged
+        assertThat(expungeList).hasSize(totalMessages);
+    }
 }
diff --git 
a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxMessageDAOTest.java
 
b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxMessageDAOTest.java
new file mode 100644
index 0000000000..36785cc3d6
--- /dev/null
+++ 
b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxMessageDAOTest.java
@@ -0,0 +1,215 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.postgres.mail.dao;
+
+import static 
org.apache.james.mailbox.store.mail.MessageMapper.FetchType.METADATA;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.UUID;
+
+import jakarta.mail.Flags;
+
+import org.apache.james.backends.postgres.PostgresExtension;
+import org.apache.james.backends.postgres.PostgresModule;
+import org.apache.james.blob.api.BlobId;
+import org.apache.james.blob.api.PlainBlobId;
+import org.apache.james.mailbox.MessageUid;
+import org.apache.james.mailbox.ModSeq;
+import org.apache.james.mailbox.model.ByteContent;
+import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
+import org.apache.james.mailbox.model.MessageId;
+import org.apache.james.mailbox.model.MessageRange;
+import org.apache.james.mailbox.model.ThreadId;
+import org.apache.james.mailbox.postgres.PostgresMailboxId;
+import org.apache.james.mailbox.postgres.PostgresMessageId;
+import org.apache.james.mailbox.postgres.mail.PostgresMailboxModule;
+import org.apache.james.mailbox.postgres.mail.PostgresMessageModule;
+import org.apache.james.mailbox.store.mail.model.DelegatingMailboxMessage;
+import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder;
+import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
+import org.apache.james.util.ReactorUtils;
+import org.apache.james.util.streams.Limit;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import reactor.core.publisher.Flux;
+
+public class PostgresMailboxMessageDAOTest {
+    @RegisterExtension
+    static PostgresExtension postgresExtension = 
PostgresExtension.withoutRowLevelSecurity(
+        PostgresModule.aggregateModules(PostgresMailboxModule.MODULE,
+            PostgresMessageModule.MODULE));
+
+    private final MessageId.Factory messageIdFactory = new 
PostgresMessageId.Factory();
+    private final BlobId.Factory blobIdFactory = new PlainBlobId.Factory();
+
+    private PostgresMailboxMessageDAO testee;
+    private PostgresMessageDAO messageDAO;
+
+    @BeforeAll
+    static void setUpClass() {
+        // We set the batch size to 10 to test the batching
+        System.setProperty("james.postgresql.query.batch.size", "10");
+    }
+
+    @AfterAll
+    static void tearDownClass() {
+        System.clearProperty("james.postgresql.query.batch.size");
+    }
+
+    @BeforeEach
+    void setUp() {
+        testee = new 
PostgresMailboxMessageDAO(postgresExtension.getDefaultPostgresExecutor());
+        messageDAO = new 
PostgresMessageDAO(postgresExtension.getDefaultPostgresExecutor(), 
blobIdFactory);
+    }
+
+    @Test
+    void 
findAllRecentMessageMetadataShouldReturnAllMatchingEntryWhenBatchSizeIsSmallerThanAllEntries()
 {
+        // Given 100 entries
+        int sampleSize = 100;
+        PostgresMailboxId mailboxId = PostgresMailboxId.generate();
+        ArrayList<MessageId> messageIds = provisionMailboxMessage(sampleSize, 
mailboxId);
+
+        // When retrieving all entries
+        List<ComposedMessageIdWithMetaData> listResult = 
testee.findAllRecentMessageMetadata(mailboxId)
+            .collectList().block();
+
+        // Then return all entries
+        assertThat(listResult).hasSize(sampleSize);
+
+        assertThat(listResult.stream().map(metaData -> 
metaData.getComposedMessageId().getMessageId()).toList())
+            .containsExactly(messageIds.toArray(MessageId[]::new));
+    }
+
+    private @NotNull ArrayList<MessageId> provisionMailboxMessage(int 
sampleSize, PostgresMailboxId mailboxId) {
+        ArrayList<MessageId> messageIds = new ArrayList<>();
+        Flux.range(1, sampleSize)
+            .map(index -> {
+                SimpleMailboxMessage mailboxMessage = 
generateSimpleMailboxMessage(index, mailboxId);
+                messageIds.add(mailboxMessage.getMessageId());
+                return mailboxMessage;
+            })
+            .flatMap(message -> messageDAO.insert(message, 
UUID.randomUUID().toString())
+                .then(testee.insert(message)), 
ReactorUtils.DEFAULT_CONCURRENCY)
+            .then().block();
+        return messageIds;
+    }
+
+    @Test
+    void 
findMessagesByMailboxIdShouldReturnAllMatchingEntryWhenBatchSizeIsSmallerThanAllEntries()
 {
+        // Given 100 entries
+        int sampleSize = 100;
+        PostgresMailboxId mailboxId = PostgresMailboxId.generate();
+        ArrayList<MessageId> messageIds = provisionMailboxMessage(sampleSize, 
mailboxId);
+
+        // When retrieving all entries
+        List<SimpleMailboxMessage> listResult = 
testee.findMessagesByMailboxId(mailboxId, Limit.unlimited(), METADATA)
+            .map(e -> e.getKey().build())
+            .collectList().block();
+
+        // Then return all entries
+        assertThat(listResult).hasSize(sampleSize);
+
+        assertThat(listResult.stream().map(message -> 
message.getMessageId()).toList())
+            .containsExactly(messageIds.toArray(MessageId[]::new));
+    }
+
+    @Test
+    void 
findMessagesByMailboxIdAndBetweenUIDsShouldReturnAllMatchingEntryWhenBatchSizeIsSmallerThanAllEntries()
 {
+        // Given 100 entries
+        int sampleSize = 100;
+        PostgresMailboxId mailboxId = PostgresMailboxId.generate();
+        ArrayList<MessageId> messageIds = provisionMailboxMessage(sampleSize, 
mailboxId);
+
+        // When retrieving all entries
+        List<SimpleMailboxMessage> listResult = 
testee.findMessagesByMailboxIdAndBetweenUIDs(mailboxId, MessageUid.of(0), 
MessageUid.of(sampleSize + 1), Limit.unlimited(), METADATA)
+            .map(e -> e.getKey().build())
+            .collectList().block();
+
+        // Then return all entries
+        assertThat(listResult).hasSize(sampleSize);
+
+        
assertThat(listResult.stream().map(DelegatingMailboxMessage::getMessageId).toList())
+            .containsExactly(messageIds.toArray(MessageId[]::new));
+    }
+
+    @Test
+    void 
findMessagesByMailboxIdAndAfterUIDShouldReturnAllMatchingEntryWhenBatchSizeIsSmallerThanAllEntries()
 {
+        // Given 100 entries
+        int sampleSize = 100;
+        PostgresMailboxId mailboxId = PostgresMailboxId.generate();
+        ArrayList<MessageId> messageIds = provisionMailboxMessage(sampleSize, 
mailboxId);
+
+        // When retrieving all entries
+        List<SimpleMailboxMessage> listResult = 
testee.findMessagesByMailboxIdAndAfterUID(mailboxId, MessageUid.of(0), 
Limit.unlimited(), METADATA)
+            .map(e -> e.getKey().build())
+            .collectList().block();
+
+        // Then return all entries
+        assertThat(listResult).hasSize(sampleSize);
+
+        
assertThat(listResult.stream().map(DelegatingMailboxMessage::getMessageId).toList())
+            .containsExactly(messageIds.toArray(MessageId[]::new));
+    }
+
+    @Test
+    void 
findMessagesMetadataShouldReturnAllMatchingEntryWhenBatchSizeIsSmallerThanAllEntries()
 {
+        // Given 100 entries
+        int sampleSize = 100;
+        PostgresMailboxId mailboxId = PostgresMailboxId.generate();
+        ArrayList<MessageId> messageIds = provisionMailboxMessage(sampleSize, 
mailboxId);
+
+        // When retrieving all entries
+        List<ComposedMessageIdWithMetaData> listResult = 
testee.findMessagesMetadata(mailboxId, MessageRange.all())
+            .collectList().block();
+
+        // Then return all entries
+        assertThat(listResult).hasSize(sampleSize);
+
+        assertThat(listResult.stream().map(metaData -> 
metaData.getComposedMessageId().getMessageId()).toList())
+            .containsExactly(messageIds.toArray(MessageId[]::new));
+    }
+
+    private SimpleMailboxMessage generateSimpleMailboxMessage(int index, 
PostgresMailboxId mailboxId) {
+        MessageId messageId = messageIdFactory.generate();
+        String messageContent = "Simple message content" + index;
+        return SimpleMailboxMessage.builder()
+            .messageId(messageId)
+            .threadId(ThreadId.fromBaseMessageId(messageId))
+            .uid(MessageUid.of(index))
+            .content(new 
ByteContent((messageContent.getBytes(StandardCharsets.UTF_8))))
+            .size(messageContent.length())
+            .internalDate(new Date())
+            .bodyStartOctet(0)
+            .flags(new Flags(Flags.Flag.RECENT))
+            .properties(new PropertyBuilder())
+            .mailboxId(mailboxId)
+            .modseq(ModSeq.of(index))
+            .build();
+    }
+}
diff --git a/server/apps/postgres-app/sample-configuration/jvm.properties 
b/server/apps/postgres-app/sample-configuration/jvm.properties
index 73b964c9b4..186fbe4b2c 100644
--- a/server/apps/postgres-app/sample-configuration/jvm.properties
+++ b/server/apps/postgres-app/sample-configuration/jvm.properties
@@ -50,4 +50,7 @@ james.jmx.credential.generation=true
 # Disable Remote Code Execution feature from JMX
 # CF 
https://github.com/AdoptOpenJDK/openjdk-jdk11/blob/19fb8f93c59dfd791f62d41f332db9e306bc1422/src/java.management/share/classes/com/sun/jmx/remote/security/MBeanServerAccessController.java#L646
 jmx.remote.x.mlet.allow.getMBeansFromURL=false
-openjpa.Multithreaded=true
\ No newline at end of file
+openjpa.Multithreaded=true
+
+# Integer. Optional, defaults to 5000. For queries over large data sets, this property specifies the maximum number of rows returned in a single batch when executing the query.
+#james.postgresql.query.batch.size=5000
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org
For additional commands, e-mail: notifications-h...@james.apache.org

Reply via email to