This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 41e10c4556bf39b3161e586bc4a95db26f15b8e0
Author: Benoit Tellier <[email protected]>
AuthorDate: Thu May 21 14:51:28 2020 +0700

    JAMES-3184 RecomputeCurrentQuotasService RunningOptions
---
 ...CassandraRecomputeCurrentQuotasServiceTest.java |  5 ++--
 .../quota/task/RecomputeCurrentQuotasService.java  | 31 +++++++++++++++++++---
 .../quota/task/RecomputeCurrentQuotasTask.java     |  3 ++-
 .../RecomputeCurrentQuotasServiceContract.java     | 21 ++++++++-------
 4 files changed, 44 insertions(+), 16 deletions(-)

diff --git 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/CassandraRecomputeCurrentQuotasServiceTest.java
 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/CassandraRecomputeCurrentQuotasServiceTest.java
index c2335c7..89912a3 100644
--- 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/CassandraRecomputeCurrentQuotasServiceTest.java
+++ 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/task/CassandraRecomputeCurrentQuotasServiceTest.java
@@ -37,6 +37,7 @@ import 
org.apache.james.mailbox.cassandra.mail.MailboxAggregateModule;
 import org.apache.james.mailbox.quota.CurrentQuotaManager;
 import org.apache.james.mailbox.quota.UserQuotaRootResolver;
 import org.apache.james.mailbox.quota.task.RecomputeCurrentQuotasService;
+import 
org.apache.james.mailbox.quota.task.RecomputeCurrentQuotasService.RunningOptions;
 import 
org.apache.james.mailbox.quota.task.RecomputeCurrentQuotasServiceContract;
 import org.apache.james.mailbox.store.StoreMailboxManager;
 import org.apache.james.mailbox.store.quota.CurrentQuotaCalculator;
@@ -132,7 +133,7 @@ public class CassandraRecomputeCurrentQuotasServiceTest 
implements RecomputeCurr
             .times(1)
             .whenQueryStartsWith("UPDATE currentQuota SET"));
 
-        assertThat(testee().recomputeCurrentQuotas(new 
RecomputeCurrentQuotasService.Context()).block())
+        assertThat(testee().recomputeCurrentQuotas(new 
RecomputeCurrentQuotasService.Context(), RunningOptions.DEFAULT).block())
             .isEqualTo(Task.Result.PARTIAL);
     }
 
@@ -151,7 +152,7 @@ public class CassandraRecomputeCurrentQuotasServiceTest 
implements RecomputeCurr
             .whenQueryStartsWith("UPDATE currentQuota SET"));
 
         RecomputeCurrentQuotasService.Context context = new 
RecomputeCurrentQuotasService.Context();
-        testee().recomputeCurrentQuotas(context).block();
+        testee().recomputeCurrentQuotas(context, 
RunningOptions.DEFAULT).block();
 
         assertThat(context.snapshot())
             .isEqualTo(new RecomputeCurrentQuotasService.Context(0L,
diff --git a/mailbox/tools/quota-recompute/src/main/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasService.java b/mailbox/tools/quota-recompute/src/main/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasService.java
index 31ea4bd..2d9095b 100644
--- a/mailbox/tools/quota-recompute/src/main/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasService.java
+++ b/mailbox/tools/quota-recompute/src/main/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasService.java
@@ -19,6 +19,7 @@
 
 package org.apache.james.mailbox.quota.task;
 
+import java.time.Duration;
 import java.util.Collection;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentLinkedDeque;
@@ -37,18 +38,41 @@ import org.apache.james.mailbox.store.quota.CurrentQuotaCalculator;
 import org.apache.james.task.Task;
 import org.apache.james.user.api.UsersRepository;
 import org.apache.james.user.api.UsersRepositoryException;
+import org.apache.james.util.ReactorUtils;
 import org.apache.james.util.streams.Iterators;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.MoreObjects;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 public class RecomputeCurrentQuotasService {
     private static final Logger LOGGER = LoggerFactory.getLogger(RecomputeCurrentQuotasService.class);
 
+    public static class RunningOptions {
+        public static RunningOptions withUsersPerSecond(int usersPerSecond) {
+            return new RunningOptions(usersPerSecond);
+        }
+
+        public static final RunningOptions DEFAULT = withUsersPerSecond(1);
+
+        private final int usersPerSecond;
+
+        private RunningOptions(int usersPerSecond) {
+            Preconditions.checkArgument(usersPerSecond > 0, "'usersPerSecond' needs to be strictly positive");
+
+            this.usersPerSecond = usersPerSecond;
+        }
+
+        public int getUsersPerSecond() {
+            return usersPerSecond;
+        }
+    }
+
     public static class Context {
         static class Snapshot {
             private final long processedQuotaRootCount;
@@ -138,11 +162,12 @@ public class RecomputeCurrentQuotasService {
         this.sessionProvider = sessionProvider;
     }
 
-    public Mono<Task.Result> recomputeCurrentQuotas(Context context) {
+    public Mono<Task.Result> recomputeCurrentQuotas(Context context, RunningOptions runningOptions) {
         try {
             Flux<Username> users = Iterators.toFlux(usersRepository.list());
-            return ReactorUtils.throttle(users, Duration.ofSeconds(1), runningOptions.getUsersPerSecond())
-                .flatMap(username -> recomputeUserCurrentQuotas(context, username), runningOptions.getUsersPerSecond())
+            return ReactorUtils.Throttler.<Username, Task.Result>forOperation(username -> recomputeUserCurrentQuotas(context, username))
+                .window(runningOptions.getUsersPerSecond(), Duration.ofSeconds(1))
+                .throttle(users)
                 .reduce(Task.Result.COMPLETED, Task::combine);
         } catch (UsersRepositoryException e) {
             LOGGER.error("Error while accessing users from repository", e);
diff --git a/mailbox/tools/quota-recompute/src/main/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasTask.java b/mailbox/tools/quota-recompute/src/main/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasTask.java
index 050737b..82d142d 100644
--- a/mailbox/tools/quota-recompute/src/main/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasTask.java
+++ b/mailbox/tools/quota-recompute/src/main/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasTask.java
@@ -26,6 +26,7 @@ import java.util.Optional;
 import org.apache.james.mailbox.model.QuotaRoot;
 import org.apache.james.mailbox.quota.task.RecomputeCurrentQuotasService.Context;
 import org.apache.james.mailbox.quota.task.RecomputeCurrentQuotasService.Context.Snapshot;
+import org.apache.james.mailbox.quota.task.RecomputeCurrentQuotasService.RunningOptions;
 import org.apache.james.task.Task;
 import org.apache.james.task.TaskExecutionDetails;
 import org.apache.james.task.TaskType;
@@ -77,7 +78,7 @@ public class RecomputeCurrentQuotasTask implements Task {
 
     @Override
     public Task.Result run() {
-        return service.recomputeCurrentQuotas(context)
+        return service.recomputeCurrentQuotas(context, RunningOptions.DEFAULT)
             .subscribeOn(Schedulers.elastic())
             .block();
     }
diff --git 
a/mailbox/tools/quota-recompute/src/test/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasServiceContract.java
 
b/mailbox/tools/quota-recompute/src/test/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasServiceContract.java
index f5784ac..95c2fc3 100644
--- 
a/mailbox/tools/quota-recompute/src/test/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasServiceContract.java
+++ 
b/mailbox/tools/quota-recompute/src/test/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasServiceContract.java
@@ -37,6 +37,7 @@ import org.apache.james.mailbox.model.QuotaRoot;
 import org.apache.james.mailbox.quota.CurrentQuotaManager;
 import org.apache.james.mailbox.quota.UserQuotaRootResolver;
 import 
org.apache.james.mailbox.quota.task.RecomputeCurrentQuotasService.Context;
+import 
org.apache.james.mailbox.quota.task.RecomputeCurrentQuotasService.RunningOptions;
 import org.apache.james.mime4j.dom.Message;
 import org.apache.james.task.Task;
 import org.apache.james.user.api.UsersRepository;
@@ -61,7 +62,7 @@ public interface RecomputeCurrentQuotasServiceContract {
 
     @Test
     default void recomputeCurrentQuotasShouldReturnCompleteWhenNoData() {
-        assertThat(testee().recomputeCurrentQuotas(new Context()).block())
+        assertThat(testee().recomputeCurrentQuotas(new Context(), 
RunningOptions.DEFAULT).block())
             .isEqualTo(Task.Result.COMPLETED);
     }
 
@@ -69,7 +70,7 @@ public interface RecomputeCurrentQuotasServiceContract {
     default void 
recomputeCurrentQuotasShouldReturnCompleteWhenUserWithNoMessage() throws 
Exception {
         usersRepository().addUser(USER_1, PASSWORD);
 
-        assertThat(testee().recomputeCurrentQuotas(new Context()).block())
+        assertThat(testee().recomputeCurrentQuotas(new Context(), 
RunningOptions.DEFAULT).block())
             .isEqualTo(Task.Result.COMPLETED);
     }
 
@@ -77,7 +78,7 @@ public interface RecomputeCurrentQuotasServiceContract {
     default void 
recomputeCurrentQuotasShouldComputeEmptyQuotasWhenUserWithNoMessage() throws 
Exception {
         usersRepository().addUser(USER_1, PASSWORD);
 
-        testee().recomputeCurrentQuotas(new Context()).block();
+        testee().recomputeCurrentQuotas(new Context(), 
RunningOptions.DEFAULT).block();
 
         
assertThat(Mono.from(currentQuotaManager().getCurrentQuotas(userQuotaRootResolver().forUser(USER_1))).block())
             .isEqualTo(CurrentQuotas.emptyQuotas());
@@ -93,7 +94,7 @@ public interface RecomputeCurrentQuotasServiceContract {
         MessageManager messageManager = 
mailboxManager().getMailbox(MAILBOX_PATH, session);
         appendAMessageForUser(messageManager, session);
 
-        assertThat(testee().recomputeCurrentQuotas(new Context()).block())
+        assertThat(testee().recomputeCurrentQuotas(new Context(), 
RunningOptions.DEFAULT).block())
             .isEqualTo(Task.Result.COMPLETED);
     }
 
@@ -107,7 +108,7 @@ public interface RecomputeCurrentQuotasServiceContract {
         MessageManager messageManager = 
mailboxManager().getMailbox(MAILBOX_PATH, session);
         appendAMessageForUser(messageManager, session);
 
-        testee().recomputeCurrentQuotas(new Context()).block();
+        testee().recomputeCurrentQuotas(new Context(), 
RunningOptions.DEFAULT).block();
 
         
assertThat(Mono.from(currentQuotaManager().getCurrentQuotas(userQuotaRootResolver().forUser(USER_1))).block())
             .isEqualTo(EXPECTED_QUOTAS);
@@ -128,7 +129,7 @@ public interface RecomputeCurrentQuotasServiceContract {
         QuotaOperation operation = new QuotaOperation(quotaRoot, 
QuotaCountUsage.count(3L), QuotaSizeUsage.size(390L));
         Mono.from(currentQuotaManager().increase(operation)).block();
 
-        testee().recomputeCurrentQuotas(new Context()).block();
+        testee().recomputeCurrentQuotas(new Context(), 
RunningOptions.DEFAULT).block();
 
         
assertThat(Mono.from(currentQuotaManager().getCurrentQuotas(userQuotaRootResolver().forUser(USER_1))).block())
             .isEqualTo(EXPECTED_QUOTAS);
@@ -137,7 +138,7 @@ public interface RecomputeCurrentQuotasServiceContract {
     @Test
     default void recomputeCurrentQuotasShouldNotUpdateContextWhenNoData() {
         Context context = new Context();
-        testee().recomputeCurrentQuotas(context).block();
+        testee().recomputeCurrentQuotas(context, 
RunningOptions.DEFAULT).block();
 
         
assertThat(context.snapshot()).isEqualToComparingFieldByFieldRecursively(new 
Context().snapshot());
     }
@@ -147,7 +148,7 @@ public interface RecomputeCurrentQuotasServiceContract {
         usersRepository().addUser(USER_1, PASSWORD);
 
         Context context = new Context();
-        testee().recomputeCurrentQuotas(context).block();
+        testee().recomputeCurrentQuotas(context, 
RunningOptions.DEFAULT).block();
 
         assertThat(context.snapshot())
             .isEqualTo(new Context(1L, ImmutableList.of()).snapshot());
@@ -164,7 +165,7 @@ public interface RecomputeCurrentQuotasServiceContract {
         appendAMessageForUser(messageManager, session);
 
         Context context = new Context();
-        testee().recomputeCurrentQuotas(context).block();
+        testee().recomputeCurrentQuotas(context, 
RunningOptions.DEFAULT).block();
 
         assertThat(context.snapshot())
             .isEqualTo(new Context(1L, ImmutableList.of()).snapshot());
@@ -187,7 +188,7 @@ public interface RecomputeCurrentQuotasServiceContract {
         Mono.from(currentQuotaManager().increase(operation)).block();
 
         Context context = new Context();
-        testee().recomputeCurrentQuotas(context).block();
+        testee().recomputeCurrentQuotas(context, 
RunningOptions.DEFAULT).block();
 
         assertThat(context.snapshot())
             .isEqualTo(new Context(2L, ImmutableList.of()).snapshot());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to