This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 1dc33d60c286cf360d8b83c61211fce9554fa33c
Author: Benoit TELLIER <btell...@linagora.com>
AuthorDate: Sun Nov 10 14:40:09 2024 +0100

    [FIX] Always schedule execution of new IMAP requests to the event loop
    
    This prevents them from being executed outside of the event loop, e.g.
    when ReactiveThrottler piggybacks the execution on DB threads.
    
    Running outside of the event loop means writes are not immediate,
    which drastically changes the behaviour of the IMAP server and leads
    to countless bugs.
---
 .../imapserver/netty/ImapChannelUpstreamHandler.java    |  6 ++++--
 .../james/imapserver/netty/ReactiveThrottler.java       | 17 +++++++++++++----
 2 files changed, 17 insertions(+), 6 deletions(-)

diff --git 
a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java
 
b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java
index e639784794..1af387038f 100644
--- 
a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java
+++ 
b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java
@@ -445,11 +445,13 @@ public class ImapChannelUpstreamHandler extends 
ChannelInboundHandlerAdapter imp
                     ctx.fireChannelReadComplete();
                     if (signal.isOnComplete() || signal.isOnError()) {
                         if (waitingMessage != null && signal.isOnComplete()) {
-                            channelRead(ctx, waitingMessage);
+                            ctx.channel().eventLoop().execute(
+                                () -> channelRead(ctx, waitingMessage));
                         }
                     }
                 }))
-                .contextWrite(ReactorUtils.context("imap", mdc(session))), 
message)
+                .contextWrite(ReactorUtils.context("imap", mdc(session))), 
message,
+                runnable -> ctx.channel().eventLoop().execute(runnable))
             // Manage throttling errors
             .doOnError(ctx::fireExceptionCaught)
             .doFinally(Throwing.consumer(any -> {
diff --git 
a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java
 
b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java
index 9ada14ec27..0b4e2d66c4 100644
--- 
a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java
+++ 
b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java
@@ -25,11 +25,13 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
 
 import org.apache.james.imap.api.ImapMessage;
 import org.apache.james.metrics.api.GaugeRegistry;
 import org.reactivestreams.Publisher;
 
+import com.google.common.annotations.VisibleForTesting;
 import reactor.core.Disposable;
 import reactor.core.publisher.Mono;
 import reactor.core.publisher.Sinks;
@@ -38,10 +40,12 @@ import reactor.core.scheduler.Schedulers;
 public class ReactiveThrottler {
     private static class TaskHolder {
         private final Publisher<Void> task;
+        private final Consumer<Runnable> ctx;
         private final AtomicReference<Disposable> disposable = new 
AtomicReference<>();
 
-        private TaskHolder(Publisher<Void> task) {
+        private TaskHolder(Publisher<Void> task, Consumer<Runnable> ctx) {
             this.task = task;
+            this.ctx = ctx;
         }
     }
 
@@ -75,15 +79,20 @@ public class ReactiveThrottler {
 
         sink.asFlux()
             .subscribeOn(Schedulers.parallel())
-            .subscribe(taskHolder -> {
+            .subscribe(taskHolder -> taskHolder.ctx.accept(() -> {
                     Disposable disposable = Mono.from(taskHolder.task)
                         .doFinally(any -> onRequestDone())
                         .subscribe();
                     taskHolder.disposable.set(disposable);
-                });
+                }));
     }
 
+    @VisibleForTesting
     public Mono<Void> throttle(Publisher<Void> task, ImapMessage imapMessage) {
+        return throttle(task, imapMessage, Runnable::run);
+    }
+
+    public Mono<Void> throttle(Publisher<Void> task, ImapMessage imapMessage, 
Consumer<Runnable> ctx) {
         if (maxConcurrentRequests < 0) {
             return Mono.from(task);
         }
@@ -104,7 +113,7 @@ public class ReactiveThrottler {
                     }
                     return Mono.from(task);
                 })
-                .then(Mono.fromRunnable(() -> 
one.emitEmpty(Sinks.EmitFailureHandler.FAIL_FAST))));
+                .then(Mono.fromRunnable(() -> 
one.emitEmpty(Sinks.EmitFailureHandler.FAIL_FAST))), ctx);
             queue.add(taskHolder);
             // Let the caller await task completion
             return one.asMono()


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org
For additional commands, e-mail: notifications-h...@james.apache.org

Reply via email to