This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit ffb22b9e2c0d00f3f9f3d07d1d3c70d4fc9518ec
Author: Benoit Tellier <[email protected]>
AuthorDate: Thu May 21 14:42:25 2020 +0700

    JAMES-3184 ReactorUtils::throttle
---
 .../java/org/apache/james/util/ReactorUtils.java   |  9 ++++++++
 .../org/apache/james/util/ReactorUtilsTest.java    | 27 +++++++++++++++++++++-
 .../jmap/MessageFastViewProjectionCorrector.java   | 10 ++------
 3 files changed, 37 insertions(+), 9 deletions(-)

diff --git a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
index a477355..bc4bc61 100644
--- a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
+++ b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
@@ -22,6 +22,7 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
+import java.time.Duration;
 import java.util.Iterator;
 import java.util.Optional;
 import java.util.function.BiConsumer;
@@ -32,10 +33,18 @@ import reactor.core.publisher.Mono;
 import reactor.core.publisher.Signal;
 import reactor.core.publisher.SynchronousSink;
 import reactor.util.context.Context;
+import reactor.util.function.Tuple2;
 
 public class ReactorUtils {
 
     public static final String MDC_KEY_PREFIX = "MDC-";
+    private static final Duration DELAY = Duration.ZERO;
+
+    public static <T> Flux<T> throttle(Flux<T> flux, Duration windowDuration, 
int windowMaxSize) {
+        return flux.windowTimeout(windowMaxSize, windowDuration)
+            .zipWith(Flux.interval(DELAY, windowDuration))
+            .flatMap(Tuple2::getT1);
+    }
 
     public static <T> Mono<T> executeAndEmpty(Runnable runnable) {
         return Mono.fromRunnable(runnable).then(Mono.empty());
diff --git a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
index 1ef5184..c0e6819 100644
--- a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
+++ b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
@@ -25,9 +25,10 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
+import java.time.Duration;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.IntStream;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.RandomStringUtils;
@@ -35,6 +36,8 @@ import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
 import org.slf4j.MDC;
 
+import com.github.steveash.guavate.Guavate;
+import com.google.common.base.Stopwatch;
 import com.google.common.collect.ImmutableList;
 import com.google.common.primitives.Bytes;
 
@@ -44,6 +47,28 @@ import reactor.core.scheduler.Schedulers;
 
 class ReactorUtilsTest {
     static final int BUFFER_SIZE = 5;
+    
+    @Nested
+    class Throttling {
+        @Test
+        void throttleShouldApplyMaxSize() {
+            int windowMaxSize = 3;
+            Duration windowDuration = Duration.ofMillis(100);
+
+            Stopwatch stopwatch = Stopwatch.createUnstarted();
+
+            Flux<Integer> originalFlux = Flux.range(0, 10);
+            ImmutableList<Long> windowMembership = 
ReactorUtils.throttle(originalFlux, windowDuration, windowMaxSize)
+                .doOnSubscribe(signal -> stopwatch.start())
+                .map(i -> stopwatch.elapsed(TimeUnit.MILLISECONDS))
+                .map(i -> i / 100)
+                .collect(Guavate.toImmutableList())
+                .block();
+
+            assertThat(windowMembership)
+                .containsExactly(0L, 0L, 0L, 1L, 1L, 1L, 2L, 2L, 2L, 3L);
+        }
+    }
 
     @Nested
     class ExecuteAndEmpty {
diff --git a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/MessageFastViewProjectionCorrector.java b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/MessageFastViewProjectionCorrector.java
index 95767f5..cc8bb06 100644
--- a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/MessageFastViewProjectionCorrector.java
+++ b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/MessageFastViewProjectionCorrector.java
@@ -44,6 +44,7 @@ import org.apache.james.task.Task;
 import org.apache.james.task.Task.Result;
 import org.apache.james.user.api.UsersRepository;
 import org.apache.james.user.api.UsersRepositoryException;
+import org.apache.james.util.ReactorUtils;
 import org.apache.james.util.streams.Iterators;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,7 +54,6 @@ import com.google.common.base.Preconditions;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
-import reactor.util.function.Tuple2;
 
 public class MessageFastViewProjectionCorrector {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(MessageFastViewProjectionCorrector.class);
@@ -216,18 +216,12 @@ public class MessageFastViewProjectionCorrector {
     }
 
     private Mono<Result> correctProjection(Flux<ProjectionEntry> entries, 
RunningOptions runningOptions, Progress progress) {
-        return throttleWithRate(entries, runningOptions)
+        return ReactorUtils.throttle(entries, PERIOD, 
runningOptions.getMessagesPerSecond())
             .flatMap(entry -> correctProjection(entry, progress))
             .reduce(Task::combine)
             .switchIfEmpty(Mono.just(Result.COMPLETED));
     }
 
-    private Flux<ProjectionEntry> throttleWithRate(Flux<ProjectionEntry> 
entries, RunningOptions runningOptions) {
-        return entries.windowTimeout(runningOptions.getMessagesPerSecond(), 
Duration.ofSeconds(1))
-            .zipWith(Flux.interval(DELAY, PERIOD))
-            .flatMap(Tuple2::getT1);
-    }
-
     private Flux<MailboxMetaData> listUsersMailboxes(MailboxSession session) 
throws MailboxException {
         return 
Flux.fromIterable(mailboxManager.search(MailboxQuery.privateMailboxesBuilder(session).build(),
 session));
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to