This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push:
new 9c4a58d5e7 JAMES-4019 ReactiveThrottler should handle better
cancellation (#2104)
9c4a58d5e7 is described below
commit 9c4a58d5e73877e28570f138b7199b32b1b6b461
Author: Benoit TELLIER <[email protected]>
AuthorDate: Tue Mar 12 06:57:27 2024 +0100
JAMES-4019 ReactiveThrottler should handle better cancellation (#2104)
---
.../james/imapserver/netty/ReactiveThrottler.java | 46 ++++++++++++---
.../imapserver/netty/ReactiveThrottlerTest.java | 67 ++++++++++++++++++++++
2 files changed, 106 insertions(+), 7 deletions(-)
diff --git
a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java
b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java
index 05998122ae..09b1b9d01d 100644
---
a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java
+++
b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java
@@ -19,18 +19,31 @@
package org.apache.james.imapserver.netty;
+import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.james.imap.api.ImapMessage;
import org.apache.james.metrics.api.GaugeRegistry;
import org.reactivestreams.Publisher;
+import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
public class ReactiveThrottler {
+ private static class TaskHolder {
+ private final Publisher<Void> task;
+ private final AtomicReference<Disposable> disposable = new
AtomicReference<>();
+
+ private TaskHolder(Publisher<Void> task) {
+ this.task = task;
+ }
+ }
+
public static class RejectedException extends RuntimeException {
private final ImapMessage imapMessage;
@@ -48,7 +61,7 @@ public class ReactiveThrottler {
private final int maxQueueSize;
// In flight + executing
private final AtomicInteger concurrentRequests = new AtomicInteger(0);
- private final Queue<Publisher<Void>> queue = new ConcurrentLinkedQueue<>();
+ private final Queue<TaskHolder> queue = new ConcurrentLinkedQueue<>();
public ReactiveThrottler(GaugeRegistry gaugeRegistry, int
maxConcurrentRequests, int maxQueueSize) {
gaugeRegistry.register("imap.request.queue.size", () ->
Math.max(concurrentRequests.get() - maxConcurrentRequests, 0));
@@ -69,11 +82,27 @@ public class ReactiveThrottler {
.doFinally(any -> onRequestDone());
} else if (requestNumber <= maxQueueSize + maxConcurrentRequests) {
// Queue the request for later
+ AtomicBoolean cancelled = new AtomicBoolean(false);
Sinks.One<Void> one = Sinks.one();
- queue.add(Mono.from(task)
+ TaskHolder taskHolder = new
TaskHolder(Mono.fromCallable(cancelled::get)
+ .flatMap(cancel -> {
+ if (cancel) {
+ return Mono.empty();
+ }
+ return Mono.from(task);
+ })
.then(Mono.fromRunnable(() ->
one.emitEmpty(Sinks.EmitFailureHandler.FAIL_FAST))));
+ queue.add(taskHolder);
// Let the caller await task completion
- return one.asMono();
+ return one.asMono()
+ .doOnCancel(() -> {
+ cancelled.set(true);
+
Optional.ofNullable(taskHolder.disposable.get()).ifPresent(Disposable::dispose);
+ boolean removed = queue.remove(taskHolder);
+ if (removed) {
+ concurrentRequests.decrementAndGet();
+ }
+ });
} else {
concurrentRequests.decrementAndGet();
@@ -86,12 +115,15 @@ public class ReactiveThrottler {
}
private void onRequestDone() {
- concurrentRequests.decrementAndGet();
- Publisher<Void> throttled = queue.poll();
+ concurrentRequests.getAndDecrement();
+ TaskHolder throttled = queue.poll();
if (throttled != null) {
- Mono.from(throttled)
- .doFinally(any -> onRequestDone())
+ Disposable disposable = Mono.from(throttled.task)
+ .doFinally(any -> {
+ onRequestDone();
+ })
.subscribe();
+ throttled.disposable.set(disposable);
}
}
}
diff --git
a/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/ReactiveThrottlerTest.java
b/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/ReactiveThrottlerTest.java
index c11ab260a8..e1adafa9d5 100644
---
a/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/ReactiveThrottlerTest.java
+++
b/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/ReactiveThrottlerTest.java
@@ -24,6 +24,7 @@ import static
org.assertj.core.api.Assertions.assertThatThrownBy;
import java.time.Duration;
import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -34,8 +35,10 @@ import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
import org.testcontainers.shaded.org.awaitility.Awaitility;
+import com.github.fge.lambdas.Throwing;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
class ReactiveThrottlerTest {
@@ -69,6 +72,70 @@ class ReactiveThrottlerTest {
assertThat(executed.get()).isFalse();
}
+ @RepeatedTest(10)
+ void shouldPropagateCancel() throws Exception {
+ ReactiveThrottler testee = new ReactiveThrottler(new
NoopGaugeRegistry(), 2, 5);
+ CountDownLatch latch = new CountDownLatch(1);
+
+ // Given a throttler
+
+ // When I submit many tasks task - they will get queued
+ AtomicBoolean executed = new AtomicBoolean(false);
+ Disposable disposable1 =
Mono.from(testee.throttle(Mono.fromRunnable(Throwing.runnable(latch::await)).subscribeOn(Schedulers.boundedElastic()).then(),
NO_IMAP_MESSAGE)).subscribe();
+ Disposable disposable2 =
Mono.from(testee.throttle(Mono.fromRunnable(Throwing.runnable(latch::await)).subscribeOn(Schedulers.boundedElastic()).then(),
NO_IMAP_MESSAGE)).subscribe();
+ Disposable disposable3 =
Mono.from(testee.throttle(Mono.fromRunnable(Throwing.runnable(latch::await)).subscribeOn(Schedulers.boundedElastic()).then(),
NO_IMAP_MESSAGE)).subscribe();
+ Disposable disposable4 =
Mono.from(testee.throttle(Mono.fromRunnable(Throwing.runnable(latch::await)).subscribeOn(Schedulers.boundedElastic()).then(),
NO_IMAP_MESSAGE)).subscribe();
+ Disposable disposable5 =
Mono.from(testee.throttle(Mono.fromRunnable(Throwing.runnable(latch::await)).subscribeOn(Schedulers.boundedElastic()).then(),
NO_IMAP_MESSAGE)).subscribe();
+ Disposable disposable6 =
Mono.from(testee.throttle(Mono.fromRunnable(Throwing.runnable(latch::await)).subscribeOn(Schedulers.boundedElastic()).then(),
NO_IMAP_MESSAGE)).subscribe();
+ Disposable disposable7 =
Mono.from(testee.throttle(Mono.fromRunnable(Throwing.runnable(latch::await)).subscribeOn(Schedulers.boundedElastic()).then(),
NO_IMAP_MESSAGE)).subscribe();
+
+ disposable7.dispose();
+ disposable6.dispose();
+ disposable5.dispose();
+ disposable4.dispose();
+ disposable3.dispose();
+ disposable2.dispose();
+ disposable1.dispose();
+ Thread.sleep(200);
+
+ Mono.from(testee.throttle(Mono.fromRunnable(() ->
executed.getAndSet(true)), NO_IMAP_MESSAGE)).block();
+
+ // Then that task is not executed straight away
+ assertThat(executed.get()).isTrue();
+ }
+
+ @RepeatedTest(10)
+ void shouldPropagateCancelInReverseOrder() throws Exception {
+ ReactiveThrottler testee = new ReactiveThrottler(new
NoopGaugeRegistry(), 2, 5);
+ CountDownLatch latch = new CountDownLatch(1);
+
+ // Given a throttler
+
+ // When I submit many tasks task - they will get queued
+ AtomicBoolean executed = new AtomicBoolean(false);
+ Disposable disposable1 =
Mono.from(testee.throttle(Mono.fromRunnable(Throwing.runnable(latch::await)).subscribeOn(Schedulers.boundedElastic()).then(),
NO_IMAP_MESSAGE)).subscribe();
+ Disposable disposable2 =
Mono.from(testee.throttle(Mono.fromRunnable(Throwing.runnable(latch::await)).subscribeOn(Schedulers.boundedElastic()).then(),
NO_IMAP_MESSAGE)).subscribe();
+ Disposable disposable3 =
Mono.from(testee.throttle(Mono.fromRunnable(Throwing.runnable(latch::await)).subscribeOn(Schedulers.boundedElastic()).then(),
NO_IMAP_MESSAGE)).subscribe();
+ Disposable disposable4 =
Mono.from(testee.throttle(Mono.fromRunnable(Throwing.runnable(latch::await)).subscribeOn(Schedulers.boundedElastic()).then(),
NO_IMAP_MESSAGE)).subscribe();
+ Disposable disposable5 =
Mono.from(testee.throttle(Mono.fromRunnable(Throwing.runnable(latch::await)).subscribeOn(Schedulers.boundedElastic()).then(),
NO_IMAP_MESSAGE)).subscribe();
+ Disposable disposable6 =
Mono.from(testee.throttle(Mono.fromRunnable(Throwing.runnable(latch::await)).subscribeOn(Schedulers.boundedElastic()).then(),
NO_IMAP_MESSAGE)).subscribe();
+ Disposable disposable7 =
Mono.from(testee.throttle(Mono.fromRunnable(Throwing.runnable(latch::await)).subscribeOn(Schedulers.boundedElastic()).then(),
NO_IMAP_MESSAGE)).subscribe();
+
+ disposable1.dispose();
+ disposable2.dispose();
+ disposable3.dispose();
+ disposable4.dispose();
+ disposable5.dispose();
+ disposable6.dispose();
+ disposable7.dispose();
+ Thread.sleep(200);
+
+ Mono.from(testee.throttle(Mono.fromRunnable(() ->
executed.getAndSet(true)), NO_IMAP_MESSAGE)).block();
+
+ // Then that task is not executed straight away
+ assertThat(executed.get()).isTrue();
+ }
+
@Test
void throttleShouldEventuallyExecuteQueuedTasks() {
// Given a throttler
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]