This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 85be29a36caf5621af395b249daff3963a438387
Author: Benoit Tellier <[email protected]>
AuthorDate: Fri May 22 15:35:46 2020 +0700

    JAMES-3184 Back-pressure simplest implementation for throttling
---
 .../quota/task/RecomputeCurrentQuotasService.java  |  5 +-
 .../java/org/apache/james/util/ReactorUtils.java   | 42 +++++++++++++--
 .../org/apache/james/util/ReactorUtilsTest.java    | 61 +++++++++++++++++++++-
 .../jmap/MessageFastViewProjectionCorrector.java   |  5 +-
 4 files changed, 102 insertions(+), 11 deletions(-)

diff --git a/mailbox/tools/quota-recompute/src/main/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasService.java b/mailbox/tools/quota-recompute/src/main/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasService.java
index dbb5078..31ea4bd 100644
--- a/mailbox/tools/quota-recompute/src/main/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasService.java
+++ b/mailbox/tools/quota-recompute/src/main/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasService.java
@@ -140,8 +140,9 @@ public class RecomputeCurrentQuotasService {
 
     public Mono<Task.Result> recomputeCurrentQuotas(Context context) {
         try {
-            return Iterators.toFlux(usersRepository.list())
-                .concatMap(username -> recomputeUserCurrentQuotas(context, username))
+            Flux<Username> users = Iterators.toFlux(usersRepository.list());
+            return ReactorUtils.throttle(users, Duration.ofSeconds(1), runningOptions.getUsersPerSecond())
+                .flatMap(username -> recomputeUserCurrentQuotas(context, username), runningOptions.getUsersPerSecond())
                 .reduce(Task.Result.COMPLETED, Task::combine);
         } catch (UsersRepositoryException e) {
             LOGGER.error("Error while accessing users from repository", e);
diff --git a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
index bc4bc61..a7bed0e 100644
--- a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
+++ b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
@@ -27,6 +27,11 @@ import java.util.Iterator;
 import java.util.Optional;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
+import java.util.function.Function;
+
+import org.reactivestreams.Publisher;
+
+import com.google.common.base.Preconditions;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -38,12 +43,39 @@ import reactor.util.function.Tuple2;
 public class ReactorUtils {
 
     public static final String MDC_KEY_PREFIX = "MDC-";
-    private static final Duration DELAY = Duration.ZERO;
 
-    public static <T> Flux<T> throttle(Flux<T> flux, Duration windowDuration, int windowMaxSize) {
-        return flux.windowTimeout(windowMaxSize, windowDuration)
-            .zipWith(Flux.interval(DELAY, windowDuration))
-            .flatMap(Tuple2::getT1);
+    public static class Throttler<T, U> {
+        private static final Duration DELAY = Duration.ZERO;
+
+        public static <T, U> RequiresWindowingParameters<T, U> forOperation(Function<T, Publisher<U>> operation) {
+            return (maxSize, duration) -> new Throttler<>(operation, maxSize, duration);
+        }
+
+        @FunctionalInterface
+        public interface RequiresWindowingParameters<T, U> {
+            Throttler<T, U> window(int maxSize, Duration duration);
+        }
+
+        private Throttler(Function<T, Publisher<U>> operation, int windowMaxSize, Duration windowDuration) {
+            Preconditions.checkArgument(windowMaxSize > 0, "'windowMaxSize' must be strictly positive");
+            Preconditions.checkArgument(!windowDuration.isNegative(), "'windowDuration' must be strictly positive");
+            Preconditions.checkArgument(!windowDuration.isZero(), "'windowDuration' must be strictly positive");
+
+            this.operation = operation;
+            this.windowMaxSize = windowMaxSize;
+            this.windowDuration = windowDuration;
+        }
+
+        private final Function<T, Publisher<U>> operation;
+        private final int windowMaxSize;
+        private final Duration windowDuration;
+
+        public Flux<U> throttle(Flux<T> flux) {
+            return flux.windowTimeout(windowMaxSize, windowDuration)
+                .zipWith(Flux.interval(DELAY, windowDuration))
+                .flatMap(Tuple2::getT1)
+                .flatMap(operation, windowMaxSize);
+        }
     }
 
     public static <T> Mono<T> executeAndEmpty(Runnable runnable) {
diff --git a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
index c0e6819..984c165 100644
--- a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
+++ b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
@@ -19,6 +19,7 @@
 package org.apache.james.util;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
@@ -29,11 +30,14 @@ import java.time.Duration;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.james.util.ReactorUtils.Throttler;
 import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
+import org.reactivestreams.Publisher;
 import org.slf4j.MDC;
 
 import com.github.steveash.guavate.Guavate;
@@ -51,6 +55,34 @@ class ReactorUtilsTest {
     @Nested
     class Throttling {
         @Test
+        void windowShouldThrowWhenMaxSizeIsNegative() {
+            assertThatThrownBy(() -> Throttler.forOperation(Mono::just)
+                    .window(-1, Duration.ofSeconds(1)))
+                .isInstanceOf(IllegalArgumentException.class);
+        }
+
+        @Test
+        void windowShouldThrowWhenMaxSizeIsZero() {
+            assertThatThrownBy(() -> Throttler.forOperation(Mono::just)
+                    .window(0, Duration.ofSeconds(1)))
+                .isInstanceOf(IllegalArgumentException.class);
+        }
+
+        @Test
+        void windowShouldThrowWhenDurationIsNegative() {
+            assertThatThrownBy(() -> Throttler.forOperation(Mono::just)
+                    .window(1, Duration.ofSeconds(-1)))
+                .isInstanceOf(IllegalArgumentException.class);
+        }
+
+        @Test
+        void windowShouldThrowWhenDurationIsZero() {
+            assertThatThrownBy(() -> Throttler.forOperation(Mono::just)
+                    .window(1, Duration.ZERO))
+                .isInstanceOf(IllegalArgumentException.class);
+        }
+
+        @Test
         void throttleShouldApplyMaxSize() {
             int windowMaxSize = 3;
             Duration windowDuration = Duration.ofMillis(100);
@@ -58,9 +90,11 @@ class ReactorUtilsTest {
             Stopwatch stopwatch = Stopwatch.createUnstarted();
 
             Flux<Integer> originalFlux = Flux.range(0, 10);
-            ImmutableList<Long> windowMembership = ReactorUtils.throttle(originalFlux, windowDuration, windowMaxSize)
+            ImmutableList<Long> windowMembership = Throttler.<Integer, Long>forOperation(
+                    i -> Mono.fromCallable(() -> stopwatch.elapsed(TimeUnit.MILLISECONDS)))
+                .window(windowMaxSize, windowDuration)
+                .throttle(originalFlux)
                 .doOnSubscribe(signal -> stopwatch.start())
-                .map(i -> stopwatch.elapsed(TimeUnit.MILLISECONDS))
                 .map(i -> i / 100)
                 .collect(Guavate.toImmutableList())
                 .block();
@@ -68,6 +102,29 @@ class ReactorUtilsTest {
             assertThat(windowMembership)
                 .containsExactly(0L, 0L, 0L, 1L, 1L, 1L, 2L, 2L, 2L, 3L);
         }
+
+        @Test
+        void throttleDownStreamConcurrencyShouldNotExceedWindowMaxSize() {
+            int windowMaxSize = 3;
+            Duration windowDuration = Duration.ofMillis(100);
+
+            AtomicInteger ongoingProcessing = new AtomicInteger();
+
+            Flux<Integer> originalFlux = Flux.range(0, 10);
+            Function<Integer, Publisher<Integer>> longRunningOperation =
+                any -> Mono.fromCallable(ongoingProcessing::incrementAndGet)
+                    .flatMap(i -> Mono.delay(windowDuration.multipliedBy(2)).thenReturn(i))
+                    .flatMap(i -> Mono.fromRunnable(ongoingProcessing::decrementAndGet).thenReturn(i));
+
+            ImmutableList<Integer> ongoingProcessingUponComputationStart = Throttler.forOperation(longRunningOperation)
+                .window(windowMaxSize, windowDuration)
+                .throttle(originalFlux)
+                .collect(Guavate.toImmutableList())
+                .block();
+
+            assertThat(ongoingProcessingUponComputationStart)
+                .allSatisfy(processingCount -> assertThat(processingCount).isLessThanOrEqualTo(windowMaxSize));
+        }
     }
 
     @Nested
diff --git a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/MessageFastViewProjectionCorrector.java b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/MessageFastViewProjectionCorrector.java
index cc8bb06..bd851f7 100644
--- a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/MessageFastViewProjectionCorrector.java
+++ b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/MessageFastViewProjectionCorrector.java
@@ -216,8 +216,9 @@ public class MessageFastViewProjectionCorrector {
     }
 
     private Mono<Result> correctProjection(Flux<ProjectionEntry> entries, RunningOptions runningOptions, Progress progress) {
-        return ReactorUtils.throttle(entries, PERIOD, runningOptions.getMessagesPerSecond())
-            .flatMap(entry -> correctProjection(entry, progress))
+        return ReactorUtils.Throttler.<ProjectionEntry, Result>forOperation(entry -> correctProjection(entry, progress))
+            .window(runningOptions.getMessagesPerSecond(), PERIOD)
+            .throttle(entries)
             .reduce(Task::combine)
             .switchIfEmpty(Mono.just(Result.COMPLETED));
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to