vttranlina commented on code in PR #2514: URL: https://github.com/apache/james-project/pull/2514#discussion_r1853409698
########## mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxMessageDAO.java: ########## @@ -117,78 +116,107 @@ public PostgresMailboxMessageDAO create(Optional<Domain> domain) { public static final SortField<Long> DEFAULT_SORT_ORDER_BY = MESSAGE_UID.asc(); - private static SelectFinalStep<Record1<Long>> selectMessageUidByMailboxIdAndExtraConditionQuery(PostgresMailboxId mailboxId, Condition extraCondition, Limit limit, DSLContext dslContext) { - SelectSeekStep1<Record1<Long>, Long> queryWithoutLimit = dslContext.select(MESSAGE_UID) - .from(TABLE_NAME) - .where(MAILBOX_ID.eq((mailboxId.asUuid()))) - .and(extraCondition) - .orderBy(MESSAGE_UID.asc()); - return limit.getLimit().map(limitValue -> (SelectFinalStep<Record1<Long>>) queryWithoutLimit.limit(limitValue)) - .orElse(queryWithoutLimit); - } - private final PostgresExecutor postgresExecutor; public PostgresMailboxMessageDAO(PostgresExecutor postgresExecutor) { this.postgresExecutor = postgresExecutor; } public Mono<MessageUid> findFirstUnseenMessageUid(PostgresMailboxId mailboxId) { - return postgresExecutor.executeRow(dslContext -> Mono.from(selectMessageUidByMailboxIdAndExtraConditionQuery(mailboxId, - IS_SEEN.eq(false), Limit.limit(1), dslContext))) + return postgresExecutor.executeRow(dslContext -> Flux.from(dslContext.select(MESSAGE_UID) + .from(TABLE_NAME) + .where(MAILBOX_ID.eq((mailboxId.asUuid()))) + .and(IS_SEEN.eq(false)) + .orderBy(DEFAULT_SORT_ORDER_BY) + .limit(1))) .map(RECORD_TO_MESSAGE_UID_FUNCTION); } public Flux<MessageUid> listUnseen(PostgresMailboxId mailboxId) { - return postgresExecutor.executeRows(dslContext -> Flux.from(selectMessageUidByMailboxIdAndExtraConditionQuery(mailboxId, - IS_SEEN.eq(false), Limit.unlimited(), dslContext))) - .map(RECORD_TO_MESSAGE_UID_FUNCTION); + return Flux.range(0, OFFSET_UNLIMITED) + .concatMap(offsetIndex -> listUnseenBatch(mailboxId, offsetIndex * BATCH_SIZE, BATCH_SIZE)) + .takeUntil(List::isEmpty) + 
.flatMapIterable(Function.identity()); + } + + private Mono<List<MessageUid>> listUnseenBatch(PostgresMailboxId mailboxId, int offset, int batchSize) { + return postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select(MESSAGE_UID) + .from(TABLE_NAME) + .where(MAILBOX_ID.eq((mailboxId.asUuid()))) + .and(IS_SEEN.eq(false)) + .orderBy(DEFAULT_SORT_ORDER_BY) + .limit(batchSize) + .offset(offset))) + .map(RECORD_TO_MESSAGE_UID_FUNCTION) + .collectList() Review Comment: Note: we still need `collectList` in each small batch. If not, the issue https://github.com/apache/james-project/pull/2514#issuecomment-2490210967 still happens. TL;DR: if we don't want to use `collectList`, the batchSize must be < `32` with Reactor's default configuration (IMO, 32 is quite small for a batch size).  -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For additional commands, e-mail: notifications-h...@james.apache.org