This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 1dc33d60c286cf360d8b83c61211fce9554fa33c Author: Benoit TELLIER <btell...@linagora.com> AuthorDate: Sun Nov 10 14:40:09 2024 +0100 [FIX] Always schedule execution of new IMAP requests to the event loop This prevents them to be executed outside of the event loop, eg upon ReactiveThrottler piggy backing the execution on DB threads. Running outside of the event loop means writes are not immediate and changes drastically the behaviour of the IMAP server, leading to countless bugs. --- .../imapserver/netty/ImapChannelUpstreamHandler.java | 6 ++++-- .../james/imapserver/netty/ReactiveThrottler.java | 17 +++++++++++++---- 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java index e639784794..1af387038f 100644 --- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java +++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java @@ -445,11 +445,13 @@ public class ImapChannelUpstreamHandler extends ChannelInboundHandlerAdapter imp ctx.fireChannelReadComplete(); if (signal.isOnComplete() || signal.isOnError()) { if (waitingMessage != null && signal.isOnComplete()) { - channelRead(ctx, waitingMessage); + ctx.channel().eventLoop().execute( + () -> channelRead(ctx, waitingMessage)); } } })) - .contextWrite(ReactorUtils.context("imap", mdc(session))), message) + .contextWrite(ReactorUtils.context("imap", mdc(session))), message, + runnable -> ctx.channel().eventLoop().execute(runnable)) // Manage throttling errors .doOnError(ctx::fireExceptionCaught) .doFinally(Throwing.consumer(any -> { diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java index 9ada14ec27..0b4e2d66c4 100644 --- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java +++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ReactiveThrottler.java @@ -25,11 +25,13 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import org.apache.james.imap.api.ImapMessage; import org.apache.james.metrics.api.GaugeRegistry; import org.reactivestreams.Publisher; +import com.google.common.annotations.VisibleForTesting; import reactor.core.Disposable; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; @@ -38,10 +40,12 @@ import reactor.core.scheduler.Schedulers; public class ReactiveThrottler { private static class TaskHolder { private final Publisher<Void> task; + private final Consumer<Runnable> ctx; private final AtomicReference<Disposable> disposable = new AtomicReference<>(); - private TaskHolder(Publisher<Void> task) { + private TaskHolder(Publisher<Void> task, Consumer<Runnable> ctx) { this.task = task; + this.ctx = ctx; } } @@ -75,15 +79,20 @@ public class ReactiveThrottler { sink.asFlux() .subscribeOn(Schedulers.parallel()) - .subscribe(taskHolder -> { + .subscribe(taskHolder -> taskHolder.ctx.accept(() -> { Disposable disposable = Mono.from(taskHolder.task) .doFinally(any -> onRequestDone()) .subscribe(); taskHolder.disposable.set(disposable); - }); + })); } + @VisibleForTesting public Mono<Void> throttle(Publisher<Void> task, ImapMessage imapMessage) { + return throttle(task, imapMessage, Runnable::run); + } + + public Mono<Void> throttle(Publisher<Void> task, ImapMessage imapMessage, Consumer<Runnable> ctx) { if (maxConcurrentRequests < 0) { return Mono.from(task); } @@ -104,7 +113,7 @@ public class ReactiveThrottler { } return Mono.from(task); }) - .then(Mono.fromRunnable(() -> one.emitEmpty(Sinks.EmitFailureHandler.FAIL_FAST)))); + .then(Mono.fromRunnable(() -> one.emitEmpty(Sinks.EmitFailureHandler.FAIL_FAST))), ctx); queue.add(taskHolder); // Let the caller await task completion return one.asMono() --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For additional commands, e-mail: notifications-h...@james.apache.org