This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit ffb22b9e2c0d00f3f9f3d07d1d3c70d4fc9518ec Author: Benoit Tellier <[email protected]> AuthorDate: Thu May 21 14:42:25 2020 +0700 JAMES-3184 ReactorUtils::throttle --- .../java/org/apache/james/util/ReactorUtils.java | 9 ++++++++ .../org/apache/james/util/ReactorUtilsTest.java | 27 +++++++++++++++++++++- .../jmap/MessageFastViewProjectionCorrector.java | 10 ++------ 3 files changed, 37 insertions(+), 9 deletions(-) diff --git a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java index a477355..bc4bc61 100644 --- a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java +++ b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java @@ -22,6 +22,7 @@ import java.io.Closeable; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; +import java.time.Duration; import java.util.Iterator; import java.util.Optional; import java.util.function.BiConsumer; @@ -32,10 +33,18 @@ import reactor.core.publisher.Mono; import reactor.core.publisher.Signal; import reactor.core.publisher.SynchronousSink; import reactor.util.context.Context; +import reactor.util.function.Tuple2; public class ReactorUtils { public static final String MDC_KEY_PREFIX = "MDC-"; + private static final Duration DELAY = Duration.ZERO; + + public static <T> Flux<T> throttle(Flux<T> flux, Duration windowDuration, int windowMaxSize) { + return flux.windowTimeout(windowMaxSize, windowDuration) + .zipWith(Flux.interval(DELAY, windowDuration)) + .flatMap(Tuple2::getT1); + } public static <T> Mono<T> executeAndEmpty(Runnable runnable) { return Mono.fromRunnable(runnable).then(Mono.empty()); diff --git a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java index 1ef5184..c0e6819 100644 --- a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java +++ b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java @@ -25,9 +25,10 @@ import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.IntStream; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.RandomStringUtils; @@ -35,6 +36,8 @@ import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.slf4j.MDC; +import com.github.steveash.guavate.Guavate; +import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableList; import com.google.common.primitives.Bytes; @@ -44,6 +47,28 @@ import reactor.core.scheduler.Schedulers; class ReactorUtilsTest { static final int BUFFER_SIZE = 5; + + @Nested + class Throttling { + @Test + void throttleShouldApplyMaxSize() { + int windowMaxSize = 3; + Duration windowDuration = Duration.ofMillis(100); + + Stopwatch stopwatch = Stopwatch.createUnstarted(); + + Flux<Integer> originalFlux = Flux.range(0, 10); + ImmutableList<Long> windowMembership = ReactorUtils.throttle(originalFlux, windowDuration, windowMaxSize) + .doOnSubscribe(signal -> stopwatch.start()) + .map(i -> stopwatch.elapsed(TimeUnit.MILLISECONDS)) + .map(i -> i / 100) + .collect(Guavate.toImmutableList()) + .block(); + + assertThat(windowMembership) + .containsExactly(0L, 0L, 0L, 1L, 1L, 1L, 2L, 2L, 2L, 3L); + } + } @Nested class ExecuteAndEmpty { diff --git a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/MessageFastViewProjectionCorrector.java b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/MessageFastViewProjectionCorrector.java index 95767f5..cc8bb06 100644 --- a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/MessageFastViewProjectionCorrector.java +++ b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/MessageFastViewProjectionCorrector.java @@ -44,6 +44,7 @@ import org.apache.james.task.Task; import org.apache.james.task.Task.Result; import org.apache.james.user.api.UsersRepository; import org.apache.james.user.api.UsersRepositoryException; +import org.apache.james.util.ReactorUtils; import org.apache.james.util.streams.Iterators; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,7 +54,6 @@ import com.google.common.base.Preconditions; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.util.function.Tuple2; public class MessageFastViewProjectionCorrector { private static final Logger LOGGER = LoggerFactory.getLogger(MessageFastViewProjectionCorrector.class); @@ -216,18 +216,12 @@ public class MessageFastViewProjectionCorrector { } private Mono<Result> correctProjection(Flux<ProjectionEntry> entries, RunningOptions runningOptions, Progress progress) { - return throttleWithRate(entries, runningOptions) + return ReactorUtils.throttle(entries, PERIOD, runningOptions.getMessagesPerSecond()) .flatMap(entry -> correctProjection(entry, progress)) .reduce(Task::combine) .switchIfEmpty(Mono.just(Result.COMPLETED)); } - private Flux<ProjectionEntry> throttleWithRate(Flux<ProjectionEntry> entries, RunningOptions runningOptions) { - return entries.windowTimeout(runningOptions.getMessagesPerSecond(), Duration.ofSeconds(1)) - .zipWith(Flux.interval(DELAY, PERIOD)) - .flatMap(Tuple2::getT1); - } - private Flux<MailboxMetaData> listUsersMailboxes(MailboxSession session) throws MailboxException { return Flux.fromIterable(mailboxManager.search(MailboxQuery.privateMailboxesBuilder(session).build(), session)); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
