chibenwa commented on code in PR #2514: URL: https://github.com/apache/james-project/pull/2514#discussion_r1853892580
########## mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/search/AllSearchOverrideTest.java: ########## @@ -51,7 +51,7 @@ public class AllSearchOverrideTest { @BeforeEach void setUp() { postgresMessageDAO = new PostgresMessageDAO(postgresExtension.getDefaultPostgresExecutor(), new PlainBlobId.Factory()); - postgresMailboxMessageDAO = new PostgresMailboxMessageDAO(postgresExtension.getDefaultPostgresExecutor()); + postgresMailboxMessageDAO = new PostgresMailboxMessageDAO(postgresExtension.getDefaultPostgresExecutor(), postgresExtension.getExecutorFactory().queryBatchSize()); Review Comment: for all search overrides we only load a list of uids. I think .collectList would be better. At least we could start with it. ########## mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxMessageDAO.java: ########## @@ -117,78 +115,110 @@ public PostgresMailboxMessageDAO create(Optional<Domain> domain) { public static final SortField<Long> DEFAULT_SORT_ORDER_BY = MESSAGE_UID.asc(); - private static SelectFinalStep<Record1<Long>> selectMessageUidByMailboxIdAndExtraConditionQuery(PostgresMailboxId mailboxId, Condition extraCondition, Limit limit, DSLContext dslContext) { - SelectSeekStep1<Record1<Long>, Long> queryWithoutLimit = dslContext.select(MESSAGE_UID) - .from(TABLE_NAME) - .where(MAILBOX_ID.eq((mailboxId.asUuid()))) - .and(extraCondition) - .orderBy(MESSAGE_UID.asc()); - return limit.getLimit().map(limitValue -> (SelectFinalStep<Record1<Long>>) queryWithoutLimit.limit(limitValue)) - .orElse(queryWithoutLimit); - } - private final PostgresExecutor postgresExecutor; + private final int queryBatchSize; - public PostgresMailboxMessageDAO(PostgresExecutor postgresExecutor) { + public PostgresMailboxMessageDAO(PostgresExecutor postgresExecutor, + int queryBatchSize) { this.postgresExecutor = postgresExecutor; + this.queryBatchSize = queryBatchSize; } public Mono<MessageUid> findFirstUnseenMessageUid(PostgresMailboxId 
mailboxId) { - return postgresExecutor.executeRow(dslContext -> Mono.from(selectMessageUidByMailboxIdAndExtraConditionQuery(mailboxId, - IS_SEEN.eq(false), Limit.limit(1), dslContext))) + return postgresExecutor.executeRow(dslContext -> Flux.from(dslContext.select(MESSAGE_UID) + .from(TABLE_NAME) + .where(MAILBOX_ID.eq((mailboxId.asUuid()))) + .and(IS_SEEN.eq(false)) + .orderBy(DEFAULT_SORT_ORDER_BY) + .limit(1))) .map(RECORD_TO_MESSAGE_UID_FUNCTION); } public Flux<MessageUid> listUnseen(PostgresMailboxId mailboxId) { - return postgresExecutor.executeRows(dslContext -> Flux.from(selectMessageUidByMailboxIdAndExtraConditionQuery(mailboxId, - IS_SEEN.eq(false), Limit.unlimited(), dslContext))) - .map(RECORD_TO_MESSAGE_UID_FUNCTION); + return Flux.range(0, OFFSET_UNLIMITED) + .concatMap(offsetIndex -> listUnseenBatch(mailboxId, offsetIndex * queryBatchSize, queryBatchSize)) + .takeUntil(List::isEmpty) + .flatMapIterable(Function.identity()); + } + + private Mono<List<MessageUid>> listUnseenBatch(PostgresMailboxId mailboxId, int offset, int batchSize) { + return postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select(MESSAGE_UID) Review Comment: For uid only let's .collectList and not do application side batching. 
########## mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxMessageDAO.java: ########## @@ -117,78 +115,110 @@ public PostgresMailboxMessageDAO create(Optional<Domain> domain) { public static final SortField<Long> DEFAULT_SORT_ORDER_BY = MESSAGE_UID.asc(); - private static SelectFinalStep<Record1<Long>> selectMessageUidByMailboxIdAndExtraConditionQuery(PostgresMailboxId mailboxId, Condition extraCondition, Limit limit, DSLContext dslContext) { - SelectSeekStep1<Record1<Long>, Long> queryWithoutLimit = dslContext.select(MESSAGE_UID) - .from(TABLE_NAME) - .where(MAILBOX_ID.eq((mailboxId.asUuid()))) - .and(extraCondition) - .orderBy(MESSAGE_UID.asc()); - return limit.getLimit().map(limitValue -> (SelectFinalStep<Record1<Long>>) queryWithoutLimit.limit(limitValue)) - .orElse(queryWithoutLimit); - } - private final PostgresExecutor postgresExecutor; + private final int queryBatchSize; - public PostgresMailboxMessageDAO(PostgresExecutor postgresExecutor) { + public PostgresMailboxMessageDAO(PostgresExecutor postgresExecutor, + int queryBatchSize) { this.postgresExecutor = postgresExecutor; + this.queryBatchSize = queryBatchSize; } public Mono<MessageUid> findFirstUnseenMessageUid(PostgresMailboxId mailboxId) { - return postgresExecutor.executeRow(dslContext -> Mono.from(selectMessageUidByMailboxIdAndExtraConditionQuery(mailboxId, - IS_SEEN.eq(false), Limit.limit(1), dslContext))) + return postgresExecutor.executeRow(dslContext -> Flux.from(dslContext.select(MESSAGE_UID) + .from(TABLE_NAME) + .where(MAILBOX_ID.eq((mailboxId.asUuid()))) + .and(IS_SEEN.eq(false)) + .orderBy(DEFAULT_SORT_ORDER_BY) + .limit(1))) .map(RECORD_TO_MESSAGE_UID_FUNCTION); } public Flux<MessageUid> listUnseen(PostgresMailboxId mailboxId) { - return postgresExecutor.executeRows(dslContext -> Flux.from(selectMessageUidByMailboxIdAndExtraConditionQuery(mailboxId, - IS_SEEN.eq(false), Limit.unlimited(), dslContext))) - .map(RECORD_TO_MESSAGE_UID_FUNCTION); + 
return Flux.range(0, OFFSET_UNLIMITED) + .concatMap(offsetIndex -> listUnseenBatch(mailboxId, offsetIndex * queryBatchSize, queryBatchSize)) + .takeUntil(List::isEmpty) + .flatMapIterable(Function.identity()); + } + + private Mono<List<MessageUid>> listUnseenBatch(PostgresMailboxId mailboxId, int offset, int batchSize) { + return postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select(MESSAGE_UID) + .from(TABLE_NAME) + .where(MAILBOX_ID.eq((mailboxId.asUuid()))) + .and(IS_SEEN.eq(false)) + .orderBy(DEFAULT_SORT_ORDER_BY) + .limit(batchSize) + .offset(offset))) + .map(RECORD_TO_MESSAGE_UID_FUNCTION) + .collectList() + .switchIfEmpty(Mono.just(ImmutableList.of())); } public Flux<MessageUid> listUnseen(PostgresMailboxId mailboxId, MessageRange range) { - switch (range.getType()) { - case ALL: - return listUnseen(mailboxId); - case FROM: - return postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select(MESSAGE_UID) - .from(TABLE_NAME) - .where(MAILBOX_ID.eq(mailboxId.asUuid())) - .and(IS_SEEN.eq(false)) - .and(MESSAGE_UID.greaterOrEqual(range.getUidFrom().asLong())) - .orderBy(DEFAULT_SORT_ORDER_BY))) - .map(RECORD_TO_MESSAGE_UID_FUNCTION); - case RANGE: - return postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select(MESSAGE_UID) - .from(TABLE_NAME) - .where(MAILBOX_ID.eq(mailboxId.asUuid())) - .and(IS_SEEN.eq(false)) - .and(MESSAGE_UID.greaterOrEqual(range.getUidFrom().asLong())) - .and(MESSAGE_UID.lessOrEqual(range.getUidTo().asLong())) - .orderBy(DEFAULT_SORT_ORDER_BY))) - .map(RECORD_TO_MESSAGE_UID_FUNCTION); - case ONE: - return postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select(MESSAGE_UID) - .from(TABLE_NAME) - .where(MAILBOX_ID.eq(mailboxId.asUuid())) - .and(IS_SEEN.eq(false)) - .and(MESSAGE_UID.eq(range.getUidFrom().asLong())) - .orderBy(DEFAULT_SORT_ORDER_BY))) - .map(RECORD_TO_MESSAGE_UID_FUNCTION); - default: - throw new RuntimeException("Unsupported range type " + range.getType()); - } + 
return switch (range.getType()) { + case ALL -> listUnseen(mailboxId); + case FROM -> postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select(MESSAGE_UID) + .from(TABLE_NAME) + .where(MAILBOX_ID.eq(mailboxId.asUuid())) + .and(IS_SEEN.eq(false)) + .and(MESSAGE_UID.greaterOrEqual(range.getUidFrom().asLong())) + .orderBy(DEFAULT_SORT_ORDER_BY)), EAGER_FETCH) + .map(RECORD_TO_MESSAGE_UID_FUNCTION); + case RANGE -> postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select(MESSAGE_UID) + .from(TABLE_NAME) + .where(MAILBOX_ID.eq(mailboxId.asUuid())) + .and(IS_SEEN.eq(false)) + .and(MESSAGE_UID.greaterOrEqual(range.getUidFrom().asLong())) + .and(MESSAGE_UID.lessOrEqual(range.getUidTo().asLong())) + .orderBy(DEFAULT_SORT_ORDER_BY)), EAGER_FETCH) + .map(RECORD_TO_MESSAGE_UID_FUNCTION); + case ONE -> postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select(MESSAGE_UID) + .from(TABLE_NAME) + .where(MAILBOX_ID.eq(mailboxId.asUuid())) + .and(IS_SEEN.eq(false)) + .and(MESSAGE_UID.eq(range.getUidFrom().asLong())) + .orderBy(DEFAULT_SORT_ORDER_BY)), EAGER_FETCH) + .map(RECORD_TO_MESSAGE_UID_FUNCTION); + }; } public Flux<MessageUid> findAllRecentMessageUid(PostgresMailboxId mailboxId) { Review Comment: Same ########## mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxMessageDAO.java: ########## @@ -345,33 +425,66 @@ public Flux<SimpleMailboxMessage.Builder> findMessagesByMailboxIdAndUIDs(Postgre } public Flux<MessageUid> findDeletedMessagesByMailboxId(PostgresMailboxId mailboxId) { + return Flux.range(0, OFFSET_UNLIMITED) Review Comment: Idem we are selecting uid only so let's rather .collectList ? 
########## mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxMessageDAO.java: ########## @@ -117,78 +115,110 @@ public PostgresMailboxMessageDAO create(Optional<Domain> domain) { public static final SortField<Long> DEFAULT_SORT_ORDER_BY = MESSAGE_UID.asc(); - private static SelectFinalStep<Record1<Long>> selectMessageUidByMailboxIdAndExtraConditionQuery(PostgresMailboxId mailboxId, Condition extraCondition, Limit limit, DSLContext dslContext) { - SelectSeekStep1<Record1<Long>, Long> queryWithoutLimit = dslContext.select(MESSAGE_UID) - .from(TABLE_NAME) - .where(MAILBOX_ID.eq((mailboxId.asUuid()))) - .and(extraCondition) - .orderBy(MESSAGE_UID.asc()); - return limit.getLimit().map(limitValue -> (SelectFinalStep<Record1<Long>>) queryWithoutLimit.limit(limitValue)) - .orElse(queryWithoutLimit); - } - private final PostgresExecutor postgresExecutor; + private final int queryBatchSize; - public PostgresMailboxMessageDAO(PostgresExecutor postgresExecutor) { + public PostgresMailboxMessageDAO(PostgresExecutor postgresExecutor, + int queryBatchSize) { this.postgresExecutor = postgresExecutor; + this.queryBatchSize = queryBatchSize; } public Mono<MessageUid> findFirstUnseenMessageUid(PostgresMailboxId mailboxId) { - return postgresExecutor.executeRow(dslContext -> Mono.from(selectMessageUidByMailboxIdAndExtraConditionQuery(mailboxId, - IS_SEEN.eq(false), Limit.limit(1), dslContext))) + return postgresExecutor.executeRow(dslContext -> Flux.from(dslContext.select(MESSAGE_UID) + .from(TABLE_NAME) + .where(MAILBOX_ID.eq((mailboxId.asUuid()))) + .and(IS_SEEN.eq(false)) + .orderBy(DEFAULT_SORT_ORDER_BY) + .limit(1))) .map(RECORD_TO_MESSAGE_UID_FUNCTION); } public Flux<MessageUid> listUnseen(PostgresMailboxId mailboxId) { - return postgresExecutor.executeRows(dslContext -> Flux.from(selectMessageUidByMailboxIdAndExtraConditionQuery(mailboxId, - IS_SEEN.eq(false), Limit.unlimited(), dslContext))) - .map(RECORD_TO_MESSAGE_UID_FUNCTION); + 
return Flux.range(0, OFFSET_UNLIMITED) + .concatMap(offsetIndex -> listUnseenBatch(mailboxId, offsetIndex * queryBatchSize, queryBatchSize)) + .takeUntil(List::isEmpty) + .flatMapIterable(Function.identity()); + } + + private Mono<List<MessageUid>> listUnseenBatch(PostgresMailboxId mailboxId, int offset, int batchSize) { + return postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select(MESSAGE_UID) + .from(TABLE_NAME) + .where(MAILBOX_ID.eq((mailboxId.asUuid()))) + .and(IS_SEEN.eq(false)) + .orderBy(DEFAULT_SORT_ORDER_BY) + .limit(batchSize) + .offset(offset))) + .map(RECORD_TO_MESSAGE_UID_FUNCTION) + .collectList() + .switchIfEmpty(Mono.just(ImmutableList.of())); } public Flux<MessageUid> listUnseen(PostgresMailboxId mailboxId, MessageRange range) { - switch (range.getType()) { - case ALL: - return listUnseen(mailboxId); - case FROM: - return postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select(MESSAGE_UID) - .from(TABLE_NAME) - .where(MAILBOX_ID.eq(mailboxId.asUuid())) - .and(IS_SEEN.eq(false)) - .and(MESSAGE_UID.greaterOrEqual(range.getUidFrom().asLong())) - .orderBy(DEFAULT_SORT_ORDER_BY))) - .map(RECORD_TO_MESSAGE_UID_FUNCTION); - case RANGE: - return postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select(MESSAGE_UID) - .from(TABLE_NAME) - .where(MAILBOX_ID.eq(mailboxId.asUuid())) - .and(IS_SEEN.eq(false)) - .and(MESSAGE_UID.greaterOrEqual(range.getUidFrom().asLong())) - .and(MESSAGE_UID.lessOrEqual(range.getUidTo().asLong())) - .orderBy(DEFAULT_SORT_ORDER_BY))) - .map(RECORD_TO_MESSAGE_UID_FUNCTION); - case ONE: - return postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select(MESSAGE_UID) - .from(TABLE_NAME) - .where(MAILBOX_ID.eq(mailboxId.asUuid())) - .and(IS_SEEN.eq(false)) - .and(MESSAGE_UID.eq(range.getUidFrom().asLong())) - .orderBy(DEFAULT_SORT_ORDER_BY))) - .map(RECORD_TO_MESSAGE_UID_FUNCTION); - default: - throw new RuntimeException("Unsupported range type " + range.getType()); - } + 
return switch (range.getType()) { + case ALL -> listUnseen(mailboxId); + case FROM -> postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select(MESSAGE_UID) + .from(TABLE_NAME) + .where(MAILBOX_ID.eq(mailboxId.asUuid())) + .and(IS_SEEN.eq(false)) + .and(MESSAGE_UID.greaterOrEqual(range.getUidFrom().asLong())) + .orderBy(DEFAULT_SORT_ORDER_BY)), EAGER_FETCH) + .map(RECORD_TO_MESSAGE_UID_FUNCTION); + case RANGE -> postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select(MESSAGE_UID) + .from(TABLE_NAME) + .where(MAILBOX_ID.eq(mailboxId.asUuid())) + .and(IS_SEEN.eq(false)) + .and(MESSAGE_UID.greaterOrEqual(range.getUidFrom().asLong())) + .and(MESSAGE_UID.lessOrEqual(range.getUidTo().asLong())) + .orderBy(DEFAULT_SORT_ORDER_BY)), EAGER_FETCH) + .map(RECORD_TO_MESSAGE_UID_FUNCTION); + case ONE -> postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select(MESSAGE_UID) + .from(TABLE_NAME) + .where(MAILBOX_ID.eq(mailboxId.asUuid())) + .and(IS_SEEN.eq(false)) + .and(MESSAGE_UID.eq(range.getUidFrom().asLong())) + .orderBy(DEFAULT_SORT_ORDER_BY)), EAGER_FETCH) + .map(RECORD_TO_MESSAGE_UID_FUNCTION); + }; } public Flux<MessageUid> findAllRecentMessageUid(PostgresMailboxId mailboxId) { - return postgresExecutor.executeRows(dslContext -> Flux.from(selectMessageUidByMailboxIdAndExtraConditionQuery(mailboxId, - IS_RECENT.eq(true), Limit.unlimited(), dslContext))) - .map(RECORD_TO_MESSAGE_UID_FUNCTION); + return Flux.range(0, OFFSET_UNLIMITED) + .concatMap(offsetIndex -> findAllRecentMessageUidBatch(mailboxId, offsetIndex * queryBatchSize, queryBatchSize)) + .takeUntil(List::isEmpty) + .flatMapIterable(Function.identity()); + } + + private Mono<List<MessageUid>> findAllRecentMessageUidBatch(PostgresMailboxId mailboxId, int offset, int batchSize) { + return postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select(MESSAGE_UID) + .from(TABLE_NAME) + .where(MAILBOX_ID.eq((mailboxId.asUuid()))) + .and(IS_RECENT.eq(true)) + 
.orderBy(DEFAULT_SORT_ORDER_BY) + .limit(batchSize) + .offset(offset))) + .map(RECORD_TO_MESSAGE_UID_FUNCTION) + .collectList() + .switchIfEmpty(Mono.just(ImmutableList.of())); } public Flux<MessageUid> listAllMessageUid(PostgresMailboxId mailboxId) { - return postgresExecutor.executeRows(dslContext -> Flux.from(selectMessageUidByMailboxIdAndExtraConditionQuery(mailboxId, - DSL.noCondition(), Limit.unlimited(), dslContext))) - .map(RECORD_TO_MESSAGE_UID_FUNCTION); + return Flux.range(0, OFFSET_UNLIMITED) + .concatMap(offsetIndex -> listAllMessageUidBatch(mailboxId, offsetIndex * queryBatchSize, queryBatchSize)) + .takeUntil(List::isEmpty) + .flatMapIterable(Function.identity()); + } + + private Mono<List<MessageUid>> listAllMessageUidBatch(PostgresMailboxId mailboxId, int offset, int batchSize) { Review Comment: Same ########## backends-common/postgres/src/main/java/org/apache/james/backends/postgres/PostgresConfiguration.java: ########## @@ -59,6 +59,8 @@ public class PostgresConfiguration { public static final String SSL_MODE_DEFAULT_VALUE = "allow"; public static final String JOOQ_REACTIVE_TIMEOUT = "jooq.reactive.timeout"; public static final Duration JOOQ_REACTIVE_TIMEOUT_DEFAULT_VALUE = Duration.ofSeconds(10); + public static final String QUERY_BATCH_SIZE = "query.batch.size"; + public static final int QUERY_BATCH_SIZE_DEFAULT_VALUE = 100; Review Comment: let's do at least 5000 (the Cassandra batch size) ########## mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxDAO.java: ########## @@ -202,11 +204,18 @@ public Flux<PostgresMailbox> findMailboxByIds(List<PostgresMailboxId> mailboxIds public Flux<PostgresMailbox> findMailboxWithPathLike(MailboxQuery.UserBound query) { String pathLike = MailboxExpressionBackwardCompatibility.getPathLike(query); + Function<MailboxQuery.UserBound, Condition> getQueryCondition = mailboxQuery -> { Review Comment: I expect the number of mailboxes per user to be small: a few thousand at best.
Let's just use .collectList? ########## server/blob/blob-postgres/src/main/java/org/apache/james/blob/postgres/PostgresBlobStoreDAO.java: ########## @@ -151,9 +165,20 @@ public Flux<BucketName> listBuckets() { @Override public Flux<BlobId> listBlobs(BucketName bucketName) { + return Flux.range(0, OFFSET_UNLIMITED) + .concatMap(offsetIndex -> listBlobsBatch(bucketName, offsetIndex * queryBatchSize, queryBatchSize)) + .takeUntil(List::isEmpty) + .flatMapIterable(Function.identity()); + } + + private Mono<List<BlobId>> listBlobsBatch(BucketName bucketName, int offset, int batchSize) { return postgresExecutor.executeRows(dsl -> Flux.from(dsl.select(BLOB_ID) .from(TABLE_NAME) - .where(BUCKET_NAME.eq(bucketName.asString())))) - .map(record -> blobIdFactory.parse(record.get(BLOB_ID))); + .where(BUCKET_NAME.eq(bucketName.asString())) + .limit(batchSize) + .offset(offset))) Review Comment: We are missing an ordering clause, which is needed for LIMIT/OFFSET pagination to be stable. Also, we have a massive issue here: the use case is to delete as we iterate. This means that, without snapshot reads, the writes would impact the reads and cause entries to be skipped (I predict GC with more blobs than the batch size NOT to work well). We are a bit stuck here. I'd say only small installs would be okay storing blobs in PG, so we *could* get away with just doing a collectList - serious deployments with millions of blobs will choose S3 anyway. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For additional commands, e-mail: notifications-h...@james.apache.org