This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 41e10c4556bf39b3161e586bc4a95db26f15b8e0 Author: Benoit Tellier <[email protected]> AuthorDate: Thu May 21 14:51:28 2020 +0700 JAMES-3184 RecomputeCurrentQuotasService RunningOptions --- ...CassandraRecomputeCurrentQuotasServiceTest.java | 5 ++-- .../quota/task/RecomputeCurrentQuotasService.java | 31 +++++++++++++++++++--- .../quota/task/RecomputeCurrentQuotasTask.java | 3 ++- .../RecomputeCurrentQuotasServiceContract.java | 21 ++++++++------- 4 files changed, 44 insertions(+), 16 deletions(-) diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/CassandraRecomputeCurrentQuotasServiceTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/CassandraRecomputeCurrentQuotasServiceTest.java index c2335c7..89912a3 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/CassandraRecomputeCurrentQuotasServiceTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/CassandraRecomputeCurrentQuotasServiceTest.java @@ -37,6 +37,7 @@ import org.apache.james.mailbox.cassandra.mail.MailboxAggregateModule; import org.apache.james.mailbox.quota.CurrentQuotaManager; import org.apache.james.mailbox.quota.UserQuotaRootResolver; import org.apache.james.mailbox.quota.task.RecomputeCurrentQuotasService; +import org.apache.james.mailbox.quota.task.RecomputeCurrentQuotasService.RunningOptions; import org.apache.james.mailbox.quota.task.RecomputeCurrentQuotasServiceContract; import org.apache.james.mailbox.store.StoreMailboxManager; import org.apache.james.mailbox.store.quota.CurrentQuotaCalculator; @@ -132,7 +133,7 @@ public class CassandraRecomputeCurrentQuotasServiceTest implements RecomputeCurr .times(1) .whenQueryStartsWith("UPDATE currentQuota SET")); - assertThat(testee().recomputeCurrentQuotas(new RecomputeCurrentQuotasService.Context()).block()) + assertThat(testee().recomputeCurrentQuotas(new RecomputeCurrentQuotasService.Context(), RunningOptions.DEFAULT).block()) .isEqualTo(Task.Result.PARTIAL); } @@ -151,7 +152,7 @@ public class CassandraRecomputeCurrentQuotasServiceTest implements RecomputeCurr .whenQueryStartsWith("UPDATE currentQuota SET")); RecomputeCurrentQuotasService.Context context = new RecomputeCurrentQuotasService.Context(); - testee().recomputeCurrentQuotas(context).block(); + testee().recomputeCurrentQuotas(context, RunningOptions.DEFAULT).block(); assertThat(context.snapshot()) .isEqualTo(new RecomputeCurrentQuotasService.Context(0L, diff --git a/mailbox/tools/quota-recompute/src/main/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasService.java b/mailbox/tools/quota-recompute/src/main/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasService.java index 31ea4bd..2d9095b 100644 --- a/mailbox/tools/quota-recompute/src/main/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasService.java +++ b/mailbox/tools/quota-recompute/src/main/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasService.java @@ -19,6 +19,7 @@ package org.apache.james.mailbox.quota.task; +import java.time.Duration; import java.util.Collection; import java.util.Objects; import java.util.concurrent.ConcurrentLinkedDeque; @@ -37,18 +38,41 @@ import org.apache.james.mailbox.store.quota.CurrentQuotaCalculator; import org.apache.james.task.Task; import org.apache.james.user.api.UsersRepository; import org.apache.james.user.api.UsersRepositoryException; +import org.apache.james.util.ReactorUtils; import org.apache.james.util.streams.Iterators; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.MoreObjects; +import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public class RecomputeCurrentQuotasService { private static final Logger LOGGER = LoggerFactory.getLogger(RecomputeCurrentQuotasService.class); + public static class RunningOptions { + public static RunningOptions withUsersPerSecond(int usersPerSecond) { + return new RunningOptions(usersPerSecond); + } + + public static final RunningOptions DEFAULT = withUsersPerSecond(1); + + private final int usersPerSecond; + + private RunningOptions(int usersPerSecond) { + Preconditions.checkArgument(usersPerSecond > 0, "'usersPerSecond' needs to be strictly positive"); + + this.usersPerSecond = usersPerSecond; + } + + public int getUsersPerSecond() { + return usersPerSecond; + } + } + public static class Context { static class Snapshot { private final long processedQuotaRootCount; @@ -138,11 +162,12 @@ public class RecomputeCurrentQuotasService { this.sessionProvider = sessionProvider; } - public Mono<Task.Result> recomputeCurrentQuotas(Context context) { + public Mono<Task.Result> recomputeCurrentQuotas(Context context, RunningOptions runningOptions) { try { Flux<Username> users = Iterators.toFlux(usersRepository.list()); - return ReactorUtils.throttle(users, Duration.ofSeconds(1), runningOptions.getUsersPerSecond()) - .flatMap(username -> recomputeUserCurrentQuotas(context, username), runningOptions.getUsersPerSecond()) + return ReactorUtils.Throttler.<Username, Task.Result>forOperation(username -> recomputeUserCurrentQuotas(context, username)) + .window(runningOptions.getUsersPerSecond(), Duration.ofSeconds(1)) + .throttle(users) .reduce(Task.Result.COMPLETED, Task::combine); } catch (UsersRepositoryException e) { LOGGER.error("Error while accessing users from repository", e); diff --git a/mailbox/tools/quota-recompute/src/main/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasTask.java b/mailbox/tools/quota-recompute/src/main/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasTask.java index 050737b..82d142d 100644 --- a/mailbox/tools/quota-recompute/src/main/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasTask.java +++ b/mailbox/tools/quota-recompute/src/main/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasTask.java @@ -26,6 +26,7 @@ import java.util.Optional; import org.apache.james.mailbox.model.QuotaRoot; import org.apache.james.mailbox.quota.task.RecomputeCurrentQuotasService.Context; import org.apache.james.mailbox.quota.task.RecomputeCurrentQuotasService.Context.Snapshot; +import org.apache.james.mailbox.quota.task.RecomputeCurrentQuotasService.RunningOptions; import org.apache.james.task.Task; import org.apache.james.task.TaskExecutionDetails; import org.apache.james.task.TaskType; @@ -77,7 +78,7 @@ public class RecomputeCurrentQuotasTask implements Task { @Override public Task.Result run() { - return service.recomputeCurrentQuotas(context) + return service.recomputeCurrentQuotas(context, RunningOptions.DEFAULT) .subscribeOn(Schedulers.elastic()) .block(); } diff --git a/mailbox/tools/quota-recompute/src/test/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasServiceContract.java b/mailbox/tools/quota-recompute/src/test/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasServiceContract.java index f5784ac..95c2fc3 100644 --- a/mailbox/tools/quota-recompute/src/test/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasServiceContract.java +++ b/mailbox/tools/quota-recompute/src/test/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasServiceContract.java @@ -37,6 +37,7 @@ import org.apache.james.mailbox.model.QuotaRoot; import org.apache.james.mailbox.quota.CurrentQuotaManager; import org.apache.james.mailbox.quota.UserQuotaRootResolver; import org.apache.james.mailbox.quota.task.RecomputeCurrentQuotasService.Context; +import org.apache.james.mailbox.quota.task.RecomputeCurrentQuotasService.RunningOptions; import org.apache.james.mime4j.dom.Message; import org.apache.james.task.Task; import org.apache.james.user.api.UsersRepository; @@ -61,7 +62,7 @@ public interface RecomputeCurrentQuotasServiceContract { @Test default void recomputeCurrentQuotasShouldReturnCompleteWhenNoData() { - assertThat(testee().recomputeCurrentQuotas(new Context()).block()) + assertThat(testee().recomputeCurrentQuotas(new Context(), RunningOptions.DEFAULT).block()) .isEqualTo(Task.Result.COMPLETED); } @@ -69,7 +70,7 @@ public interface RecomputeCurrentQuotasServiceContract { default void recomputeCurrentQuotasShouldReturnCompleteWhenUserWithNoMessage() throws Exception { usersRepository().addUser(USER_1, PASSWORD); - assertThat(testee().recomputeCurrentQuotas(new Context()).block()) + assertThat(testee().recomputeCurrentQuotas(new Context(), RunningOptions.DEFAULT).block()) .isEqualTo(Task.Result.COMPLETED); } @@ -77,7 +78,7 @@ public interface RecomputeCurrentQuotasServiceContract { default void recomputeCurrentQuotasShouldComputeEmptyQuotasWhenUserWithNoMessage() throws Exception { usersRepository().addUser(USER_1, PASSWORD); - testee().recomputeCurrentQuotas(new Context()).block(); + testee().recomputeCurrentQuotas(new Context(), RunningOptions.DEFAULT).block(); assertThat(Mono.from(currentQuotaManager().getCurrentQuotas(userQuotaRootResolver().forUser(USER_1))).block()) .isEqualTo(CurrentQuotas.emptyQuotas()); @@ -93,7 +94,7 @@ public interface RecomputeCurrentQuotasServiceContract { MessageManager messageManager = mailboxManager().getMailbox(MAILBOX_PATH, session); appendAMessageForUser(messageManager, session); - assertThat(testee().recomputeCurrentQuotas(new Context()).block()) + assertThat(testee().recomputeCurrentQuotas(new Context(), RunningOptions.DEFAULT).block()) .isEqualTo(Task.Result.COMPLETED); } @@ -107,7 +108,7 @@ public interface RecomputeCurrentQuotasServiceContract { MessageManager messageManager = mailboxManager().getMailbox(MAILBOX_PATH, session); appendAMessageForUser(messageManager, session); - testee().recomputeCurrentQuotas(new Context()).block(); + testee().recomputeCurrentQuotas(new Context(), RunningOptions.DEFAULT).block(); assertThat(Mono.from(currentQuotaManager().getCurrentQuotas(userQuotaRootResolver().forUser(USER_1))).block()) .isEqualTo(EXPECTED_QUOTAS); @@ -128,7 +129,7 @@ public interface RecomputeCurrentQuotasServiceContract { QuotaOperation operation = new QuotaOperation(quotaRoot, QuotaCountUsage.count(3L), QuotaSizeUsage.size(390L)); Mono.from(currentQuotaManager().increase(operation)).block(); - testee().recomputeCurrentQuotas(new Context()).block(); + testee().recomputeCurrentQuotas(new Context(), RunningOptions.DEFAULT).block(); assertThat(Mono.from(currentQuotaManager().getCurrentQuotas(userQuotaRootResolver().forUser(USER_1))).block()) .isEqualTo(EXPECTED_QUOTAS); @@ -137,7 +138,7 @@ public interface RecomputeCurrentQuotasServiceContract { @Test default void recomputeCurrentQuotasShouldNotUpdateContextWhenNoData() { Context context = new Context(); - testee().recomputeCurrentQuotas(context).block(); + testee().recomputeCurrentQuotas(context, RunningOptions.DEFAULT).block(); assertThat(context.snapshot()).isEqualToComparingFieldByFieldRecursively(new Context().snapshot()); } @@ -147,7 +148,7 @@ public interface RecomputeCurrentQuotasServiceContract { usersRepository().addUser(USER_1, PASSWORD); Context context = new Context(); - testee().recomputeCurrentQuotas(context).block(); + testee().recomputeCurrentQuotas(context, RunningOptions.DEFAULT).block(); assertThat(context.snapshot()) .isEqualTo(new Context(1L, ImmutableList.of()).snapshot()); @@ -164,7 +165,7 @@ public interface RecomputeCurrentQuotasServiceContract { appendAMessageForUser(messageManager, session); Context context = new Context(); - testee().recomputeCurrentQuotas(context).block(); + testee().recomputeCurrentQuotas(context, RunningOptions.DEFAULT).block(); assertThat(context.snapshot()) .isEqualTo(new Context(1L, ImmutableList.of()).snapshot()); @@ -187,7 +188,7 @@ public interface RecomputeCurrentQuotasServiceContract { Mono.from(currentQuotaManager().increase(operation)).block(); Context context = new Context(); - testee().recomputeCurrentQuotas(context).block(); + testee().recomputeCurrentQuotas(context, RunningOptions.DEFAULT).block(); assertThat(context.snapshot()) .isEqualTo(new Context(2L, ImmutableList.of()).snapshot()); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
