This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push:
new 70dee40530 JAMES-3811 Ability to cancel IMAP request execution upon
closed connections (#1267)
70dee40530 is described below
commit 70dee405303b83c79c786d267b6eb4523f247e0e
Author: Benoit TELLIER <[email protected]>
AuthorDate: Wed Oct 26 09:17:01 2022 +0700
JAMES-3811 Ability to cancel IMAP request execution upon closed connections
(#1267)
---
.../james/imapserver/netty/ImapChannelUpstreamHandler.java | 12 +++++++++++-
.../org/apache/james/imapserver/netty/NettyConstants.java | 2 ++
2 files changed, 13 insertions(+), 1 deletion(-)
diff --git
a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java
b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java
index 2bb3731940..b0dd0e66f6 100644
---
a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java
+++
b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java
@@ -57,6 +57,8 @@ import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.TooLongFrameException;
+import io.netty.util.Attribute;
+import reactor.core.Disposable;
import reactor.core.publisher.Mono;
/**
@@ -210,6 +212,7 @@ public class ImapChannelUpstreamHandler extends
ChannelInboundHandlerAdapter imp
// remove the stored attribute for the channel to free up resources
// See JAMES-1195
ImapSession imapSession =
ctx.channel().attr(IMAP_SESSION_ATTRIBUTE_KEY).getAndSet(null);
+ Disposable disposableAttribute =
ctx.channel().attr(REQUEST_IN_FLIGHT_ATTRIBUTE_KEY).getAndSet(null);
Optional.ofNullable(imapSession)
.map(ImapSession::logout)
@@ -221,6 +224,7 @@ public class ImapChannelUpstreamHandler extends
ChannelInboundHandlerAdapter imp
.subscribe(any -> {
}, ctx::fireExceptionCaught);
+
Optional.ofNullable(disposableAttribute).ifPresent(Disposable::dispose);
}
}
@@ -272,6 +276,9 @@ public class ImapChannelUpstreamHandler extends
ChannelInboundHandlerAdapter imp
// logout on error not sure if that is the best way to handle it
final ImapSession imapSession =
ctx.channel().attr(IMAP_SESSION_ATTRIBUTE_KEY).get();
+
Optional.ofNullable(ctx.channel().attr(REQUEST_IN_FLIGHT_ATTRIBUTE_KEY).getAndSet(null))
+ .ifPresent(Disposable::dispose);
+
Optional.ofNullable(imapSession)
.map(ImapSession::logout)
.orElse(Mono.empty())
@@ -298,12 +305,13 @@ public class ImapChannelUpstreamHandler extends
ChannelInboundHandlerAdapter imp
public void channelRead(ChannelHandlerContext ctx, Object msg) {
imapCommandsMetric.increment();
ImapSession session =
ctx.channel().attr(IMAP_SESSION_ATTRIBUTE_KEY).get();
+ Attribute<Disposable> disposableAttribute =
ctx.channel().attr(REQUEST_IN_FLIGHT_ATTRIBUTE_KEY);
ImapResponseComposer response = new ImapResponseComposerImpl(new
ChannelImapResponseWriter(ctx.channel()));
ImapMessage message = (ImapMessage) msg;
beforeIDLEUponProcessing(ctx);
ResponseEncoder responseEncoder = new ResponseEncoder(encoder,
response);
- reactiveThrottler.throttle(
+ Disposable disposable = reactiveThrottler.throttle(
processor.processReactive(message, responseEncoder, session)
.doOnEach(Throwing.consumer(signal -> {
if (session.getState() == ImapSessionState.LOGOUT) {
@@ -335,12 +343,14 @@ public class ImapChannelUpstreamHandler extends
ChannelInboundHandlerAdapter imp
if (signal.hasError()) {
ctx.fireExceptionCaught(signal.getThrowable());
}
+ disposableAttribute.set(null);
ctx.fireChannelReadComplete();
}))
.contextWrite(ReactorUtils.context("imap", mdc(ctx))), message)
// Manage throttling errors
.doOnError(ctx::fireExceptionCaught)
.subscribe();
+ disposableAttribute.set(disposable);
}
private void beforeIDLEUponProcessing(ChannelHandlerContext ctx) {
diff --git
a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyConstants.java
b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyConstants.java
index 3a62cb0931..0db9543968 100644
---
a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyConstants.java
+++
b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/NettyConstants.java
@@ -23,6 +23,7 @@ import java.util.Map;
import org.apache.james.imap.api.process.ImapSession;
import io.netty.util.AttributeKey;
+import reactor.core.Disposable;
/**
@@ -40,6 +41,7 @@ public interface NettyConstants {
String HEARTBEAT_HANDLER = "heartbeatHandler";
AttributeKey<ImapSession> IMAP_SESSION_ATTRIBUTE_KEY =
AttributeKey.valueOf("ImapSession");
+ AttributeKey<Disposable> REQUEST_IN_FLIGHT_ATTRIBUTE_KEY =
AttributeKey.valueOf("requestInFlight");
AttributeKey<Map<String, Object>> FRAME_DECODE_ATTACHMENT_ATTRIBUTE_KEY =
AttributeKey.valueOf("FrameDecoderMap");
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]