This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit f49b5b06a7a01af46d921a276b95beb334289c14
Author: Benoit Tellier <[email protected]>
AuthorDate: Fri Mar 20 10:53:46 2020 +0700

    JAMES-3105 Limit concurrency of mailbox counters recomputation
---
 .../cassandra/mail/task/RecomputeMailboxCountersService.java       | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/RecomputeMailboxCountersService.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/RecomputeMailboxCountersService.java
index b057c93..e14d16e 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/RecomputeMailboxCountersService.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/RecomputeMailboxCountersService.java
@@ -52,6 +52,9 @@ public class RecomputeMailboxCountersService {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(RecomputeMailboxCountersService.class);
 
+    private static final int MAILBOX_CONCURRENCY = 2;
+    private static final int MESSAGE_CONCURRENCY = 8;
+
     private static class Counter {
         private final CassandraId mailboxId;
         private final AtomicLong total;
@@ -162,7 +165,7 @@ public class RecomputeMailboxCountersService {
 
     Mono<Result> recomputeMailboxCounters(Context context) {
         return mailboxDAO.retrieveAllMailboxes()
-            .flatMap(mailbox -> recomputeMailboxCounter(context, mailbox))
+            .flatMap(mailbox -> recomputeMailboxCounter(context, mailbox), MAILBOX_CONCURRENCY)
             .reduce(Result.COMPLETED, Task::combine)
             .onErrorResume(e -> {
                 LOGGER.error("Error listing mailboxes", e);
@@ -175,7 +178,7 @@ public class RecomputeMailboxCountersService {
         Counter counter = new Counter(mailboxId);
 
         return imapUidToMessageIdDAO.retrieveMessages(mailboxId, MessageRange.all())
-            .flatMap(message -> latestMetadata(mailboxId, message))
+            .flatMap(message -> latestMetadata(mailboxId, message), MESSAGE_CONCURRENCY)
             .doOnNext(counter::process)
             .then(Mono.defer(() -> counterDAO.resetCounters(counter.snapshot())))
             .then(Mono.just(Result.COMPLETED))


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to