This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 85be29a36caf5621af395b249daff3963a438387 Author: Benoit Tellier <[email protected]> AuthorDate: Fri May 22 15:35:46 2020 +0700 JAMES-3184 Back-pressure simplest implementation for throttling --- .../quota/task/RecomputeCurrentQuotasService.java | 5 +- .../java/org/apache/james/util/ReactorUtils.java | 42 +++++++++++++-- .../org/apache/james/util/ReactorUtilsTest.java | 61 +++++++++++++++++++++- .../jmap/MessageFastViewProjectionCorrector.java | 5 +- 4 files changed, 102 insertions(+), 11 deletions(-) diff --git a/mailbox/tools/quota-recompute/src/main/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasService.java b/mailbox/tools/quota-recompute/src/main/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasService.java index dbb5078..31ea4bd 100644 --- a/mailbox/tools/quota-recompute/src/main/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasService.java +++ b/mailbox/tools/quota-recompute/src/main/java/org/apache/james/mailbox/quota/task/RecomputeCurrentQuotasService.java @@ -140,8 +140,9 @@ public class RecomputeCurrentQuotasService { public Mono<Task.Result> recomputeCurrentQuotas(Context context) { try { - return Iterators.toFlux(usersRepository.list()) - .concatMap(username -> recomputeUserCurrentQuotas(context, username)) + Flux<Username> users = Iterators.toFlux(usersRepository.list()); + return ReactorUtils.throttle(users, Duration.ofSeconds(1), runningOptions.getUsersPerSecond()) + .flatMap(username -> recomputeUserCurrentQuotas(context, username), runningOptions.getUsersPerSecond()) .reduce(Task.Result.COMPLETED, Task::combine); } catch (UsersRepositoryException e) { LOGGER.error("Error while accessing users from repository", e); diff --git a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java index bc4bc61..a7bed0e 100644 --- a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java +++ b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java @@ -27,6 +27,11 @@ import java.util.Iterator; import java.util.Optional; import java.util.function.BiConsumer; import java.util.function.Consumer; +import java.util.function.Function; + +import org.reactivestreams.Publisher; + +import com.google.common.base.Preconditions; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -38,12 +43,39 @@ import reactor.util.function.Tuple2; public class ReactorUtils { public static final String MDC_KEY_PREFIX = "MDC-"; - private static final Duration DELAY = Duration.ZERO; - public static <T> Flux<T> throttle(Flux<T> flux, Duration windowDuration, int windowMaxSize) { - return flux.windowTimeout(windowMaxSize, windowDuration) - .zipWith(Flux.interval(DELAY, windowDuration)) - .flatMap(Tuple2::getT1); + public static class Throttler<T, U> { + private static final Duration DELAY = Duration.ZERO; + + public static <T, U> RequiresWindowingParameters<T, U> forOperation(Function<T, Publisher<U>> operation) { + return (maxSize, duration) -> new Throttler<>(operation, maxSize, duration); + } + + @FunctionalInterface + public interface RequiresWindowingParameters<T, U> { + Throttler<T, U> window(int maxSize, Duration duration); + } + + private Throttler(Function<T, Publisher<U>> operation, int windowMaxSize, Duration windowDuration) { + Preconditions.checkArgument(windowMaxSize > 0, "'windowMaxSize' must be strictly positive"); + Preconditions.checkArgument(!windowDuration.isNegative(), "'windowDuration' must be strictly positive"); + Preconditions.checkArgument(!windowDuration.isZero(), "'windowDuration' must be strictly positive"); + + this.operation = operation; + this.windowMaxSize = windowMaxSize; + this.windowDuration = windowDuration; + } + + private final Function<T, Publisher<U>> operation; + private final int windowMaxSize; + private final Duration windowDuration; + + public Flux<U> throttle(Flux<T> flux) { + return flux.windowTimeout(windowMaxSize, windowDuration) + .zipWith(Flux.interval(DELAY, windowDuration)) + .flatMap(Tuple2::getT1) + .flatMap(operation, windowMaxSize); + } } public static <T> Mono<T> executeAndEmpty(Runnable runnable) { diff --git a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java index c0e6819..984c165 100644 --- a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java +++ b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java @@ -19,6 +19,7 @@ package org.apache.james.util; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -29,11 +30,14 @@ import java.time.Duration; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.RandomStringUtils; +import org.apache.james.util.ReactorUtils.Throttler; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; +import org.reactivestreams.Publisher; import org.slf4j.MDC; import com.github.steveash.guavate.Guavate; @@ -51,6 +55,34 @@ class ReactorUtilsTest { @Nested class Throttling { @Test + void windowShouldThrowWhenMaxSizeIsNegative() { + assertThatThrownBy(() -> Throttler.forOperation(Mono::just) + .window(-1, Duration.ofSeconds(1))) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + void windowShouldThrowWhenMaxSizeIsZero() { + assertThatThrownBy(() -> Throttler.forOperation(Mono::just) + .window(0, Duration.ofSeconds(1))) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + void windowShouldThrowWhenDurationIsNegative() { + assertThatThrownBy(() -> Throttler.forOperation(Mono::just) + .window(1, Duration.ofSeconds(-1))) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + void windowShouldThrowWhenDurationIsZero() { + assertThatThrownBy(() -> Throttler.forOperation(Mono::just) + .window(1, Duration.ZERO)) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test void throttleShouldApplyMaxSize() { int windowMaxSize = 3; Duration windowDuration = Duration.ofMillis(100); @@ -58,9 +90,11 @@ class ReactorUtilsTest { Stopwatch stopwatch = Stopwatch.createUnstarted(); Flux<Integer> originalFlux = Flux.range(0, 10); - ImmutableList<Long> windowMembership = ReactorUtils.throttle(originalFlux, windowDuration, windowMaxSize) + ImmutableList<Long> windowMembership = Throttler.<Integer, Long>forOperation( + i -> Mono.fromCallable(() -> stopwatch.elapsed(TimeUnit.MILLISECONDS))) + .window(windowMaxSize, windowDuration) + .throttle(originalFlux) .doOnSubscribe(signal -> stopwatch.start()) - .map(i -> stopwatch.elapsed(TimeUnit.MILLISECONDS)) .map(i -> i / 100) .collect(Guavate.toImmutableList()) .block(); @@ -68,6 +102,29 @@ class ReactorUtilsTest { assertThat(windowMembership) .containsExactly(0L, 0L, 0L, 1L, 1L, 1L, 2L, 2L, 2L, 3L); } + + @Test + void throttleDownStreamConcurrencyShouldNotExceedWindowMaxSize() { + int windowMaxSize = 3; + Duration windowDuration = Duration.ofMillis(100); + + AtomicInteger ongoingProcessing = new AtomicInteger(); + + Flux<Integer> originalFlux = Flux.range(0, 10); + Function<Integer, Publisher<Integer>> longRunningOperation = + any -> Mono.fromCallable(ongoingProcessing::incrementAndGet) + .flatMap(i -> Mono.delay(windowDuration.multipliedBy(2)).thenReturn(i)) + .flatMap(i -> Mono.fromRunnable(ongoingProcessing::decrementAndGet).thenReturn(i)); + + ImmutableList<Integer> ongoingProcessingUponComputationStart = Throttler.forOperation(longRunningOperation) + .window(windowMaxSize, windowDuration) + .throttle(originalFlux) + .collect(Guavate.toImmutableList()) + .block(); + + assertThat(ongoingProcessingUponComputationStart) + .allSatisfy(processingCount -> assertThat(processingCount).isLessThanOrEqualTo(windowMaxSize)); + } } @Nested diff --git a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/MessageFastViewProjectionCorrector.java b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/MessageFastViewProjectionCorrector.java index cc8bb06..bd851f7 100644 --- a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/MessageFastViewProjectionCorrector.java +++ b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/MessageFastViewProjectionCorrector.java @@ -216,8 +216,9 @@ public class MessageFastViewProjectionCorrector { } private Mono<Result> correctProjection(Flux<ProjectionEntry> entries, RunningOptions runningOptions, Progress progress) { - return ReactorUtils.throttle(entries, PERIOD, runningOptions.getMessagesPerSecond()) - .flatMap(entry -> correctProjection(entry, progress)) + return ReactorUtils.Throttler.<ProjectionEntry, Result>forOperation(entry -> correctProjection(entry, progress)) + .window(runningOptions.getMessagesPerSecond(), PERIOD) + .throttle(entries) .reduce(Task::combine) .switchIfEmpty(Mono.just(Result.COMPLETED)); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
