This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit f49b5b06a7a01af46d921a276b95beb334289c14
Author: Benoit Tellier <[email protected]>
AuthorDate: Fri Mar 20 10:53:46 2020 +0700

    JAMES-3105 Limit concurrency of mailbox counters recomputation
---
 .../cassandra/mail/task/RecomputeMailboxCountersService.java | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/RecomputeMailboxCountersService.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/RecomputeMailboxCountersService.java
index b057c93..e14d16e 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/RecomputeMailboxCountersService.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/RecomputeMailboxCountersService.java
@@ -52,6 +52,9 @@
 public class RecomputeMailboxCountersService {
     private static final Logger LOGGER = LoggerFactory.getLogger(RecomputeMailboxCountersService.class);
 
+    private static final int MAILBOX_CONCURRENCY = 2;
+    private static final int MESSAGE_CONCURRENCY = 8;
+
     private static class Counter {
         private final CassandraId mailboxId;
         private final AtomicLong total;
@@ -162,7 +165,7 @@ public class RecomputeMailboxCountersService {
 
     Mono<Result> recomputeMailboxCounters(Context context) {
         return mailboxDAO.retrieveAllMailboxes()
-            .flatMap(mailbox -> recomputeMailboxCounter(context, mailbox))
+            .flatMap(mailbox -> recomputeMailboxCounter(context, mailbox), MAILBOX_CONCURRENCY)
             .reduce(Result.COMPLETED, Task::combine)
             .onErrorResume(e -> {
                 LOGGER.error("Error listing mailboxes", e);
@@ -175,7 +178,7 @@ public class RecomputeMailboxCountersService {
         Counter counter = new Counter(mailboxId);
 
         return imapUidToMessageIdDAO.retrieveMessages(mailboxId, MessageRange.all())
-            .flatMap(message -> latestMetadata(mailboxId, message))
+            .flatMap(message -> latestMetadata(mailboxId, message), MESSAGE_CONCURRENCY)
             .doOnNext(counter::process)
             .then(Mono.defer(() -> counterDAO.resetCounters(counter.snapshot())))
             .then(Mono.just(Result.COMPLETED))

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
