chibenwa commented on code in PR #2514:
URL: https://github.com/apache/james-project/pull/2514#discussion_r1853892580


##########
mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/search/AllSearchOverrideTest.java:
##########
@@ -51,7 +51,7 @@ public class AllSearchOverrideTest {
     @BeforeEach
     void setUp() {
         postgresMessageDAO = new 
PostgresMessageDAO(postgresExtension.getDefaultPostgresExecutor(), new 
PlainBlobId.Factory());
-        postgresMailboxMessageDAO = new 
PostgresMailboxMessageDAO(postgresExtension.getDefaultPostgresExecutor());
+        postgresMailboxMessageDAO = new 
PostgresMailboxMessageDAO(postgresExtension.getDefaultPostgresExecutor(), 
postgresExtension.getExecutorFactory().queryBatchSize());

Review Comment:
   For all search overrides we only load a list of UIDs, so I think 
.collectList would be better. At least we could start with it.



##########
mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxMessageDAO.java:
##########
@@ -117,78 +115,110 @@ public PostgresMailboxMessageDAO create(Optional<Domain> 
domain) {
 
     public static final SortField<Long> DEFAULT_SORT_ORDER_BY = 
MESSAGE_UID.asc();
 
-    private static SelectFinalStep<Record1<Long>> 
selectMessageUidByMailboxIdAndExtraConditionQuery(PostgresMailboxId mailboxId, 
Condition extraCondition, Limit limit, DSLContext dslContext) {
-        SelectSeekStep1<Record1<Long>, Long> queryWithoutLimit = 
dslContext.select(MESSAGE_UID)
-            .from(TABLE_NAME)
-            .where(MAILBOX_ID.eq((mailboxId.asUuid())))
-            .and(extraCondition)
-            .orderBy(MESSAGE_UID.asc());
-        return limit.getLimit().map(limitValue -> 
(SelectFinalStep<Record1<Long>>) queryWithoutLimit.limit(limitValue))
-            .orElse(queryWithoutLimit);
-    }
-
     private final PostgresExecutor postgresExecutor;
+    private final int queryBatchSize;
 
-    public PostgresMailboxMessageDAO(PostgresExecutor postgresExecutor) {
+    public PostgresMailboxMessageDAO(PostgresExecutor postgresExecutor,
+                                     int queryBatchSize) {
         this.postgresExecutor = postgresExecutor;
+        this.queryBatchSize = queryBatchSize;
     }
 
     public Mono<MessageUid> findFirstUnseenMessageUid(PostgresMailboxId 
mailboxId) {
-        return postgresExecutor.executeRow(dslContext -> 
Mono.from(selectMessageUidByMailboxIdAndExtraConditionQuery(mailboxId,
-                IS_SEEN.eq(false), Limit.limit(1), dslContext)))
+        return postgresExecutor.executeRow(dslContext -> 
Flux.from(dslContext.select(MESSAGE_UID)
+                .from(TABLE_NAME)
+                .where(MAILBOX_ID.eq((mailboxId.asUuid())))
+                .and(IS_SEEN.eq(false))
+                .orderBy(DEFAULT_SORT_ORDER_BY)
+                .limit(1)))
             .map(RECORD_TO_MESSAGE_UID_FUNCTION);
     }
 
     public Flux<MessageUid> listUnseen(PostgresMailboxId mailboxId) {
-        return postgresExecutor.executeRows(dslContext -> 
Flux.from(selectMessageUidByMailboxIdAndExtraConditionQuery(mailboxId,
-                IS_SEEN.eq(false), Limit.unlimited(), dslContext)))
-            .map(RECORD_TO_MESSAGE_UID_FUNCTION);
+        return Flux.range(0, OFFSET_UNLIMITED)
+            .concatMap(offsetIndex -> listUnseenBatch(mailboxId, offsetIndex * 
queryBatchSize, queryBatchSize))
+            .takeUntil(List::isEmpty)
+            .flatMapIterable(Function.identity());
+    }
+
+    private Mono<List<MessageUid>> listUnseenBatch(PostgresMailboxId 
mailboxId, int offset, int batchSize) {
+        return postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select(MESSAGE_UID)

Review Comment:
   For UID-only queries, let's use .collectList and not do application-side batching.



##########
mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxMessageDAO.java:
##########
@@ -117,78 +115,110 @@ public PostgresMailboxMessageDAO create(Optional<Domain> 
domain) {
 
     public static final SortField<Long> DEFAULT_SORT_ORDER_BY = 
MESSAGE_UID.asc();
 
-    private static SelectFinalStep<Record1<Long>> 
selectMessageUidByMailboxIdAndExtraConditionQuery(PostgresMailboxId mailboxId, 
Condition extraCondition, Limit limit, DSLContext dslContext) {
-        SelectSeekStep1<Record1<Long>, Long> queryWithoutLimit = 
dslContext.select(MESSAGE_UID)
-            .from(TABLE_NAME)
-            .where(MAILBOX_ID.eq((mailboxId.asUuid())))
-            .and(extraCondition)
-            .orderBy(MESSAGE_UID.asc());
-        return limit.getLimit().map(limitValue -> 
(SelectFinalStep<Record1<Long>>) queryWithoutLimit.limit(limitValue))
-            .orElse(queryWithoutLimit);
-    }
-
     private final PostgresExecutor postgresExecutor;
+    private final int queryBatchSize;
 
-    public PostgresMailboxMessageDAO(PostgresExecutor postgresExecutor) {
+    public PostgresMailboxMessageDAO(PostgresExecutor postgresExecutor,
+                                     int queryBatchSize) {
         this.postgresExecutor = postgresExecutor;
+        this.queryBatchSize = queryBatchSize;
     }
 
     public Mono<MessageUid> findFirstUnseenMessageUid(PostgresMailboxId 
mailboxId) {
-        return postgresExecutor.executeRow(dslContext -> 
Mono.from(selectMessageUidByMailboxIdAndExtraConditionQuery(mailboxId,
-                IS_SEEN.eq(false), Limit.limit(1), dslContext)))
+        return postgresExecutor.executeRow(dslContext -> 
Flux.from(dslContext.select(MESSAGE_UID)
+                .from(TABLE_NAME)
+                .where(MAILBOX_ID.eq((mailboxId.asUuid())))
+                .and(IS_SEEN.eq(false))
+                .orderBy(DEFAULT_SORT_ORDER_BY)
+                .limit(1)))
             .map(RECORD_TO_MESSAGE_UID_FUNCTION);
     }
 
     public Flux<MessageUid> listUnseen(PostgresMailboxId mailboxId) {
-        return postgresExecutor.executeRows(dslContext -> 
Flux.from(selectMessageUidByMailboxIdAndExtraConditionQuery(mailboxId,
-                IS_SEEN.eq(false), Limit.unlimited(), dslContext)))
-            .map(RECORD_TO_MESSAGE_UID_FUNCTION);
+        return Flux.range(0, OFFSET_UNLIMITED)
+            .concatMap(offsetIndex -> listUnseenBatch(mailboxId, offsetIndex * 
queryBatchSize, queryBatchSize))
+            .takeUntil(List::isEmpty)
+            .flatMapIterable(Function.identity());
+    }
+
+    private Mono<List<MessageUid>> listUnseenBatch(PostgresMailboxId 
mailboxId, int offset, int batchSize) {
+        return postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select(MESSAGE_UID)
+                .from(TABLE_NAME)
+                .where(MAILBOX_ID.eq((mailboxId.asUuid())))
+                .and(IS_SEEN.eq(false))
+                .orderBy(DEFAULT_SORT_ORDER_BY)
+                .limit(batchSize)
+                .offset(offset)))
+            .map(RECORD_TO_MESSAGE_UID_FUNCTION)
+            .collectList()
+            .switchIfEmpty(Mono.just(ImmutableList.of()));
     }
 
     public Flux<MessageUid> listUnseen(PostgresMailboxId mailboxId, 
MessageRange range) {
-        switch (range.getType()) {
-            case ALL:
-                return listUnseen(mailboxId);
-            case FROM:
-                return postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select(MESSAGE_UID)
-                        .from(TABLE_NAME)
-                        .where(MAILBOX_ID.eq(mailboxId.asUuid()))
-                        .and(IS_SEEN.eq(false))
-                        
.and(MESSAGE_UID.greaterOrEqual(range.getUidFrom().asLong()))
-                        .orderBy(DEFAULT_SORT_ORDER_BY)))
-                    .map(RECORD_TO_MESSAGE_UID_FUNCTION);
-            case RANGE:
-                return postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select(MESSAGE_UID)
-                        .from(TABLE_NAME)
-                        .where(MAILBOX_ID.eq(mailboxId.asUuid()))
-                        .and(IS_SEEN.eq(false))
-                        
.and(MESSAGE_UID.greaterOrEqual(range.getUidFrom().asLong()))
-                        
.and(MESSAGE_UID.lessOrEqual(range.getUidTo().asLong()))
-                        .orderBy(DEFAULT_SORT_ORDER_BY)))
-                    .map(RECORD_TO_MESSAGE_UID_FUNCTION);
-            case ONE:
-                return postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select(MESSAGE_UID)
-                        .from(TABLE_NAME)
-                        .where(MAILBOX_ID.eq(mailboxId.asUuid()))
-                        .and(IS_SEEN.eq(false))
-                        .and(MESSAGE_UID.eq(range.getUidFrom().asLong()))
-                        .orderBy(DEFAULT_SORT_ORDER_BY)))
-                    .map(RECORD_TO_MESSAGE_UID_FUNCTION);
-            default:
-                throw new RuntimeException("Unsupported range type " + 
range.getType());
-        }
+        return switch (range.getType()) {
+            case ALL -> listUnseen(mailboxId);
+            case FROM -> postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select(MESSAGE_UID)
+                    .from(TABLE_NAME)
+                    .where(MAILBOX_ID.eq(mailboxId.asUuid()))
+                    .and(IS_SEEN.eq(false))
+                    
.and(MESSAGE_UID.greaterOrEqual(range.getUidFrom().asLong()))
+                    .orderBy(DEFAULT_SORT_ORDER_BY)), EAGER_FETCH)
+                .map(RECORD_TO_MESSAGE_UID_FUNCTION);
+            case RANGE -> postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select(MESSAGE_UID)
+                    .from(TABLE_NAME)
+                    .where(MAILBOX_ID.eq(mailboxId.asUuid()))
+                    .and(IS_SEEN.eq(false))
+                    
.and(MESSAGE_UID.greaterOrEqual(range.getUidFrom().asLong()))
+                    .and(MESSAGE_UID.lessOrEqual(range.getUidTo().asLong()))
+                    .orderBy(DEFAULT_SORT_ORDER_BY)), EAGER_FETCH)
+                .map(RECORD_TO_MESSAGE_UID_FUNCTION);
+            case ONE -> postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select(MESSAGE_UID)
+                    .from(TABLE_NAME)
+                    .where(MAILBOX_ID.eq(mailboxId.asUuid()))
+                    .and(IS_SEEN.eq(false))
+                    .and(MESSAGE_UID.eq(range.getUidFrom().asLong()))
+                    .orderBy(DEFAULT_SORT_ORDER_BY)), EAGER_FETCH)
+                .map(RECORD_TO_MESSAGE_UID_FUNCTION);
+        };
     }
 
     public Flux<MessageUid> findAllRecentMessageUid(PostgresMailboxId 
mailboxId) {

Review Comment:
   Same



##########
mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxMessageDAO.java:
##########
@@ -345,33 +425,66 @@ public Flux<SimpleMailboxMessage.Builder> 
findMessagesByMailboxIdAndUIDs(Postgre
     }
 
     public Flux<MessageUid> findDeletedMessagesByMailboxId(PostgresMailboxId 
mailboxId) {
+        return Flux.range(0, OFFSET_UNLIMITED)

Review Comment:
   Idem: we are selecting UIDs only, so let's rather use .collectList?



##########
mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxMessageDAO.java:
##########
@@ -117,78 +115,110 @@ public PostgresMailboxMessageDAO create(Optional<Domain> 
domain) {
 
     public static final SortField<Long> DEFAULT_SORT_ORDER_BY = 
MESSAGE_UID.asc();
 
-    private static SelectFinalStep<Record1<Long>> 
selectMessageUidByMailboxIdAndExtraConditionQuery(PostgresMailboxId mailboxId, 
Condition extraCondition, Limit limit, DSLContext dslContext) {
-        SelectSeekStep1<Record1<Long>, Long> queryWithoutLimit = 
dslContext.select(MESSAGE_UID)
-            .from(TABLE_NAME)
-            .where(MAILBOX_ID.eq((mailboxId.asUuid())))
-            .and(extraCondition)
-            .orderBy(MESSAGE_UID.asc());
-        return limit.getLimit().map(limitValue -> 
(SelectFinalStep<Record1<Long>>) queryWithoutLimit.limit(limitValue))
-            .orElse(queryWithoutLimit);
-    }
-
     private final PostgresExecutor postgresExecutor;
+    private final int queryBatchSize;
 
-    public PostgresMailboxMessageDAO(PostgresExecutor postgresExecutor) {
+    public PostgresMailboxMessageDAO(PostgresExecutor postgresExecutor,
+                                     int queryBatchSize) {
         this.postgresExecutor = postgresExecutor;
+        this.queryBatchSize = queryBatchSize;
     }
 
     public Mono<MessageUid> findFirstUnseenMessageUid(PostgresMailboxId 
mailboxId) {
-        return postgresExecutor.executeRow(dslContext -> 
Mono.from(selectMessageUidByMailboxIdAndExtraConditionQuery(mailboxId,
-                IS_SEEN.eq(false), Limit.limit(1), dslContext)))
+        return postgresExecutor.executeRow(dslContext -> 
Flux.from(dslContext.select(MESSAGE_UID)
+                .from(TABLE_NAME)
+                .where(MAILBOX_ID.eq((mailboxId.asUuid())))
+                .and(IS_SEEN.eq(false))
+                .orderBy(DEFAULT_SORT_ORDER_BY)
+                .limit(1)))
             .map(RECORD_TO_MESSAGE_UID_FUNCTION);
     }
 
     public Flux<MessageUid> listUnseen(PostgresMailboxId mailboxId) {
-        return postgresExecutor.executeRows(dslContext -> 
Flux.from(selectMessageUidByMailboxIdAndExtraConditionQuery(mailboxId,
-                IS_SEEN.eq(false), Limit.unlimited(), dslContext)))
-            .map(RECORD_TO_MESSAGE_UID_FUNCTION);
+        return Flux.range(0, OFFSET_UNLIMITED)
+            .concatMap(offsetIndex -> listUnseenBatch(mailboxId, offsetIndex * 
queryBatchSize, queryBatchSize))
+            .takeUntil(List::isEmpty)
+            .flatMapIterable(Function.identity());
+    }
+
+    private Mono<List<MessageUid>> listUnseenBatch(PostgresMailboxId 
mailboxId, int offset, int batchSize) {
+        return postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select(MESSAGE_UID)
+                .from(TABLE_NAME)
+                .where(MAILBOX_ID.eq((mailboxId.asUuid())))
+                .and(IS_SEEN.eq(false))
+                .orderBy(DEFAULT_SORT_ORDER_BY)
+                .limit(batchSize)
+                .offset(offset)))
+            .map(RECORD_TO_MESSAGE_UID_FUNCTION)
+            .collectList()
+            .switchIfEmpty(Mono.just(ImmutableList.of()));
     }
 
     public Flux<MessageUid> listUnseen(PostgresMailboxId mailboxId, 
MessageRange range) {
-        switch (range.getType()) {
-            case ALL:
-                return listUnseen(mailboxId);
-            case FROM:
-                return postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select(MESSAGE_UID)
-                        .from(TABLE_NAME)
-                        .where(MAILBOX_ID.eq(mailboxId.asUuid()))
-                        .and(IS_SEEN.eq(false))
-                        
.and(MESSAGE_UID.greaterOrEqual(range.getUidFrom().asLong()))
-                        .orderBy(DEFAULT_SORT_ORDER_BY)))
-                    .map(RECORD_TO_MESSAGE_UID_FUNCTION);
-            case RANGE:
-                return postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select(MESSAGE_UID)
-                        .from(TABLE_NAME)
-                        .where(MAILBOX_ID.eq(mailboxId.asUuid()))
-                        .and(IS_SEEN.eq(false))
-                        
.and(MESSAGE_UID.greaterOrEqual(range.getUidFrom().asLong()))
-                        
.and(MESSAGE_UID.lessOrEqual(range.getUidTo().asLong()))
-                        .orderBy(DEFAULT_SORT_ORDER_BY)))
-                    .map(RECORD_TO_MESSAGE_UID_FUNCTION);
-            case ONE:
-                return postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select(MESSAGE_UID)
-                        .from(TABLE_NAME)
-                        .where(MAILBOX_ID.eq(mailboxId.asUuid()))
-                        .and(IS_SEEN.eq(false))
-                        .and(MESSAGE_UID.eq(range.getUidFrom().asLong()))
-                        .orderBy(DEFAULT_SORT_ORDER_BY)))
-                    .map(RECORD_TO_MESSAGE_UID_FUNCTION);
-            default:
-                throw new RuntimeException("Unsupported range type " + 
range.getType());
-        }
+        return switch (range.getType()) {
+            case ALL -> listUnseen(mailboxId);
+            case FROM -> postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select(MESSAGE_UID)
+                    .from(TABLE_NAME)
+                    .where(MAILBOX_ID.eq(mailboxId.asUuid()))
+                    .and(IS_SEEN.eq(false))
+                    
.and(MESSAGE_UID.greaterOrEqual(range.getUidFrom().asLong()))
+                    .orderBy(DEFAULT_SORT_ORDER_BY)), EAGER_FETCH)
+                .map(RECORD_TO_MESSAGE_UID_FUNCTION);
+            case RANGE -> postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select(MESSAGE_UID)
+                    .from(TABLE_NAME)
+                    .where(MAILBOX_ID.eq(mailboxId.asUuid()))
+                    .and(IS_SEEN.eq(false))
+                    
.and(MESSAGE_UID.greaterOrEqual(range.getUidFrom().asLong()))
+                    .and(MESSAGE_UID.lessOrEqual(range.getUidTo().asLong()))
+                    .orderBy(DEFAULT_SORT_ORDER_BY)), EAGER_FETCH)
+                .map(RECORD_TO_MESSAGE_UID_FUNCTION);
+            case ONE -> postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select(MESSAGE_UID)
+                    .from(TABLE_NAME)
+                    .where(MAILBOX_ID.eq(mailboxId.asUuid()))
+                    .and(IS_SEEN.eq(false))
+                    .and(MESSAGE_UID.eq(range.getUidFrom().asLong()))
+                    .orderBy(DEFAULT_SORT_ORDER_BY)), EAGER_FETCH)
+                .map(RECORD_TO_MESSAGE_UID_FUNCTION);
+        };
     }
 
     public Flux<MessageUid> findAllRecentMessageUid(PostgresMailboxId 
mailboxId) {
-        return postgresExecutor.executeRows(dslContext -> 
Flux.from(selectMessageUidByMailboxIdAndExtraConditionQuery(mailboxId,
-                IS_RECENT.eq(true), Limit.unlimited(), dslContext)))
-            .map(RECORD_TO_MESSAGE_UID_FUNCTION);
+        return Flux.range(0, OFFSET_UNLIMITED)
+            .concatMap(offsetIndex -> findAllRecentMessageUidBatch(mailboxId, 
offsetIndex * queryBatchSize, queryBatchSize))
+            .takeUntil(List::isEmpty)
+            .flatMapIterable(Function.identity());
+    }
+
+    private Mono<List<MessageUid>> 
findAllRecentMessageUidBatch(PostgresMailboxId mailboxId, int offset, int 
batchSize) {
+        return postgresExecutor.executeRows(dslContext -> 
Flux.from(dslContext.select(MESSAGE_UID)
+                .from(TABLE_NAME)
+                .where(MAILBOX_ID.eq((mailboxId.asUuid())))
+                .and(IS_RECENT.eq(true))
+                .orderBy(DEFAULT_SORT_ORDER_BY)
+                .limit(batchSize)
+                .offset(offset)))
+            .map(RECORD_TO_MESSAGE_UID_FUNCTION)
+            .collectList()
+            .switchIfEmpty(Mono.just(ImmutableList.of()));
     }
 
     public Flux<MessageUid> listAllMessageUid(PostgresMailboxId mailboxId) {
-        return postgresExecutor.executeRows(dslContext -> 
Flux.from(selectMessageUidByMailboxIdAndExtraConditionQuery(mailboxId,
-                DSL.noCondition(), Limit.unlimited(), dslContext)))
-            .map(RECORD_TO_MESSAGE_UID_FUNCTION);
+        return Flux.range(0, OFFSET_UNLIMITED)
+            .concatMap(offsetIndex -> listAllMessageUidBatch(mailboxId, 
offsetIndex * queryBatchSize, queryBatchSize))
+            .takeUntil(List::isEmpty)
+            .flatMapIterable(Function.identity());
+    }
+
+    private Mono<List<MessageUid>> listAllMessageUidBatch(PostgresMailboxId 
mailboxId, int offset, int batchSize) {

Review Comment:
   Same



##########
backends-common/postgres/src/main/java/org/apache/james/backends/postgres/PostgresConfiguration.java:
##########
@@ -59,6 +59,8 @@ public class PostgresConfiguration {
     public static final String SSL_MODE_DEFAULT_VALUE = "allow";
     public static final String JOOQ_REACTIVE_TIMEOUT = "jooq.reactive.timeout";
     public static final Duration JOOQ_REACTIVE_TIMEOUT_DEFAULT_VALUE = 
Duration.ofSeconds(10);
+    public static final String QUERY_BATCH_SIZE = "query.batch.size";
+    public static final int QUERY_BATCH_SIZE_DEFAULT_VALUE = 100;

Review Comment:
   Let's do at least 5000 (the Cassandra batch size).



##########
mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxDAO.java:
##########
@@ -202,11 +204,18 @@ public Flux<PostgresMailbox> 
findMailboxByIds(List<PostgresMailboxId> mailboxIds
 
     public Flux<PostgresMailbox> 
findMailboxWithPathLike(MailboxQuery.UserBound query) {
         String pathLike = 
MailboxExpressionBackwardCompatibility.getPathLike(query);
+        Function<MailboxQuery.UserBound, Condition> getQueryCondition = 
mailboxQuery -> {

Review Comment:
   I expect the count of mailboxes per user to be small: a few thousand at most.
   
   Let's just .collectList?



##########
server/blob/blob-postgres/src/main/java/org/apache/james/blob/postgres/PostgresBlobStoreDAO.java:
##########
@@ -151,9 +165,20 @@ public Flux<BucketName> listBuckets() {
 
     @Override
     public Flux<BlobId> listBlobs(BucketName bucketName) {
+        return Flux.range(0, OFFSET_UNLIMITED)
+            .concatMap(offsetIndex -> listBlobsBatch(bucketName, offsetIndex * 
queryBatchSize, queryBatchSize))
+            .takeUntil(List::isEmpty)
+            .flatMapIterable(Function.identity());
+    }
+
+    private Mono<List<BlobId>> listBlobsBatch(BucketName bucketName,  int 
offset, int batchSize) {
         return postgresExecutor.executeRows(dsl -> 
Flux.from(dsl.select(BLOB_ID)
                 .from(TABLE_NAME)
-                .where(BUCKET_NAME.eq(bucketName.asString()))))
-            .map(record -> blobIdFactory.parse(record.get(BLOB_ID)));
+                .where(BUCKET_NAME.eq(bucketName.asString()))
+                .limit(batchSize)
+                .offset(offset)))

Review Comment:
   We are missing an ordering (ORDER BY) to ensure the stability of the 
transformation: without it, LIMIT/OFFSET batches are not guaranteed to be 
consistent from one query to the next.
   
   Also here we have a massive issue: the use case is to delete as we iterate. 
This means that, without snapshot reads, the writes would impact the reads and 
cause entries to be skipped.
   
   (I predict GC with more blobs than the batch size NOT to work well.)
   
   We are a bit stuck here. I'd say only small-size installs would be okay 
storing blobs in PG, so we *could* get over it by just doing a collectList - 
serious deployments with millions of blobs will choose S3 anyway.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org
For additional commands, e-mail: notifications-h...@james.apache.org

Reply via email to