This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit f36ea102f8d67a538bdfc8d9afb4698b1ed95510
Author: LanKhuat <[email protected]>
AuthorDate: Thu May 28 11:12:38 2020 +0700

    JAMES-3184 Use ReactorUtils Throttler
---
 .../mail/task/SolveMessageInconsistenciesService.java  | 18 +++++++-----------
 1 file changed, 7 insertions(+), 11 deletions(-)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.java
index e25d6f3..89a90ea 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.java
@@ -38,6 +38,7 @@ import org.apache.james.mailbox.cassandra.mail.CassandraMessageIdToImapUidDAO;
 import org.apache.james.mailbox.model.ComposedMessageId;
 import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
 import org.apache.james.task.Task;
+import org.apache.james.util.ReactorUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,7 +48,6 @@ import com.google.common.collect.ImmutableList;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
-import reactor.util.function.Tuple2;
 
 public class SolveMessageInconsistenciesService {
 
@@ -424,18 +424,13 @@ public class SolveMessageInconsistenciesService {
     }
 
     private Flux<Task.Result> fixInconsistenciesInImapUid(Context context, RunningOptions runningOptions) {
-        return throttle(messageIdToImapUidDAO.retrieveAllMessages(), runningOptions)
+        return ReactorUtils.Throttler.forOperation(this::detectInconsistencyInImapUid)
+            .window(runningOptions.getMessagesPerSecond(), PERIOD)
+            .throttle(messageIdToImapUidDAO.retrieveAllMessages())
             .doOnNext(any -> context.incrementProcessedImapUidEntries())
-            .flatMap(this::detectInconsistencyInImapUid)
             .flatMap(inconsistency -> inconsistency.fix(context, messageIdToImapUidDAO, messageIdDAO));
     }
 
-    private Flux<ComposedMessageIdWithMetaData> throttle(Flux<ComposedMessageIdWithMetaData> messages, RunningOptions runningOptions) {
-        return messages.windowTimeout(runningOptions.getMessagesPerSecond(), Duration.ofSeconds(1))
-            .zipWith(Flux.interval(DELAY, PERIOD))
-            .flatMap(Tuple2::getT1);
-    }
-
     private Mono<Inconsistency> detectInconsistencyInImapUid(ComposedMessageIdWithMetaData message) {
         return compareWithMessageIdRecord(message)
             .onErrorResume(error -> Mono.just(new FailedToRetrieveRecord(message)));
@@ -474,9 +469,10 @@ public class SolveMessageInconsistenciesService {
     }
 
     private Flux<Task.Result> fixInconsistenciesInMessageId(Context context, RunningOptions runningOptions) {
-        return throttle(messageIdDAO.retrieveAllMessages(), runningOptions)
+        return ReactorUtils.Throttler.forOperation(this::detectInconsistencyInMessageId)
+            .window(runningOptions.getMessagesPerSecond(), PERIOD)
+            .throttle(messageIdDAO.retrieveAllMessages())
             .doOnNext(any -> context.incrementMessageIdEntries())
-            .flatMap(this::detectInconsistencyInMessageId)
             .flatMap(inconsistency -> inconsistency.fix(context, messageIdToImapUidDAO, messageIdDAO));
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to