This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch 3.8.x in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 660068a3e3da2e0ed604201d76f22ba2d8f18308 Author: Benoit Tellier <[email protected]> AuthorDate: Fri Jun 30 21:51:29 2023 +0700 JAMES-3924 Review CassandraMailQueueBrowser parallelism --- .../rabbitmq/view/cassandra/CassandraMailQueueBrowser.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java index 64f85be0b1..9c1241351d 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java @@ -134,7 +134,7 @@ public class CassandraMailQueueBrowser { Flux<EnqueuedItemWithSlicingContext> browseReferences(MailQueueName queueName, Instant browseStart) { return allSlicesStartingAt(browseStart) - .flatMapSequential(slice -> browseSlice(queueName, slice), 4); + .concatMap(slice -> browseSlice(queueName, slice)); } private Mono<Pair<EnqueuedItem, Mail>> toMailFuture(EnqueuedItemWithSlicingContext enqueuedItemWithSlicingContext) { @@ -156,15 +156,14 @@ public class CassandraMailQueueBrowser { } private Flux<EnqueuedItemWithSlicingContext> browseSlice(MailQueueName queueName, Slice slice) { - return - allBucketIds() - .concatMap(bucketId -> browseBucket(queueName, slice, bucketId), DEFAULT_CONCURRENCY) - .sort(Comparator.comparing(enqueuedMail -> enqueuedMail.getEnqueuedItem().getEnqueuedTime())); + return allBucketIds() + .concatMap(bucketId -> browseBucket(queueName, slice, bucketId), 4) + .sort(Comparator.comparing(enqueuedMail -> enqueuedMail.getEnqueuedItem().getEnqueuedTime())); } private Flux<EnqueuedItemWithSlicingContext> browseBucket(MailQueueName queueName, Slice slice, BucketId bucketId) { return 
enqueuedMailsDao.selectEnqueuedMails(queueName, slice, bucketId) - .filterWhen(mailReference -> deletedMailsDao.isStillEnqueued(queueName, mailReference.getEnqueuedItem().getEnqueueId()), DEFAULT_CONCURRENCY); + .filterWhen(mailReference -> deletedMailsDao.isStillEnqueued(queueName, mailReference.getEnqueuedItem().getEnqueueId()), 4); } private Flux<Slice> allSlicesStartingAt(Instant browseStart) { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
