This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push: new 45a836fae5 feat(managesieve): add support for proxy protocol in managesieve 45a836fae5 is described below commit 45a836fae5988bd77516c66373a50b8210e32455 Author: Felix Auringer <felix.aurin...@protonmail.com> AuthorDate: Sun Sep 14 20:15:45 2025 +0200 feat(managesieve): add support for proxy protocol in managesieve --- .../netty/ManageSieveChannelUpstreamHandler.java | 26 ++++++++++++++++ .../netty/ManageSieveMDCContext.java | 21 ++++++++++--- .../managesieveserver/netty/ManageSieveServer.java | 36 ++++++++++++---------- .../managesieveserver/netty/NettyConstants.java | 3 +- 4 files changed, 63 insertions(+), 23 deletions(-) diff --git a/server/protocols/protocols-managesieve/src/main/java/org/apache/james/managesieveserver/netty/ManageSieveChannelUpstreamHandler.java b/server/protocols/protocols-managesieve/src/main/java/org/apache/james/managesieveserver/netty/ManageSieveChannelUpstreamHandler.java index 2a1098b041..444d426351 100644 --- a/server/protocols/protocols-managesieve/src/main/java/org/apache/james/managesieveserver/netty/ManageSieveChannelUpstreamHandler.java +++ b/server/protocols/protocols-managesieve/src/main/java/org/apache/james/managesieveserver/netty/ManageSieveChannelUpstreamHandler.java @@ -27,6 +27,7 @@ import org.apache.james.managesieve.api.SessionTerminatedException; import org.apache.james.managesieve.transcode.ManageSieveProcessor; import org.apache.james.managesieve.transcode.NotEnoughDataException; import org.apache.james.managesieve.util.SettableSession; +import org.apache.james.protocols.api.ProxyInformation; import org.apache.james.protocols.netty.Encryption; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,6 +39,8 @@ import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.codec.TooLongFrameException; +import io.netty.handler.codec.haproxy.HAProxyMessage; +import io.netty.handler.codec.haproxy.HAProxyProxiedProtocol; @ChannelHandler.Sharable public class ManageSieveChannelUpstreamHandler extends ChannelInboundHandlerAdapter { @@ -64,6 +67,10 @@ public class ManageSieveChannelUpstreamHandler extends ChannelInboundHandlerAdap public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ChannelManageSieveResponseWriter attachment = ctx.channel().attr(NettyConstants.RESPONSE_WRITER_ATTRIBUTE_KEY).get(); try (Closeable closeable = ManageSieveMDCContext.from(ctx)) { + if (msg instanceof HAProxyMessage) { + handleHAProxyMessage(ctx, (HAProxyMessage) msg); + return; + } String request = attachment.cumulate((String) msg); if (request.isEmpty() || request.startsWith("\r\n")) { return; @@ -87,6 +94,25 @@ public class ManageSieveChannelUpstreamHandler extends ChannelInboundHandlerAdap } } + private void handleHAProxyMessage(ChannelHandlerContext ctx, HAProxyMessage haproxyMsg) throws Exception { + try { + if (haproxyMsg.proxiedProtocol().equals(HAProxyProxiedProtocol.TCP4) || haproxyMsg.proxiedProtocol().equals(HAProxyProxiedProtocol.TCP6)) { + ProxyInformation proxyInformation = new ProxyInformation( + new InetSocketAddress(haproxyMsg.sourceAddress(), haproxyMsg.sourcePort()), + new InetSocketAddress(haproxyMsg.destinationAddress(), haproxyMsg.destinationPort())); + ctx.channel().attr(NettyConstants.PROXY_INFO).set(proxyInformation); + + LOGGER.info("Connection from {} runs through {} proxy", haproxyMsg.sourceAddress(), haproxyMsg.destinationAddress()); + } else { + throw new IllegalArgumentException("Only TCP4/TCP6 are supported when using PROXY protocol."); + } + + super.channelReadComplete(ctx); + } finally { + haproxyMsg.release(); + } + } + @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { try (Closeable closeable = ManageSieveMDCContext.from(ctx)) { diff --git a/server/protocols/protocols-managesieve/src/main/java/org/apache/james/managesieveserver/netty/ManageSieveMDCContext.java b/server/protocols/protocols-managesieve/src/main/java/org/apache/james/managesieveserver/netty/ManageSieveMDCContext.java index c04210d4ef..cababf75c7 100644 --- a/server/protocols/protocols-managesieve/src/main/java/org/apache/james/managesieveserver/netty/ManageSieveMDCContext.java +++ b/server/protocols/protocols-managesieve/src/main/java/org/apache/james/managesieveserver/netty/ManageSieveMDCContext.java @@ -26,19 +26,30 @@ import java.util.Optional; import org.apache.james.core.Username; import org.apache.james.managesieve.api.Session; +import org.apache.james.protocols.api.ProxyInformation; import org.apache.james.util.MDCBuilder; import io.netty.channel.ChannelHandlerContext; public class ManageSieveMDCContext { public static Closeable from(ChannelHandlerContext ctx) { - return MDCBuilder.create() + MDCBuilder builder = MDCBuilder.create() .addToContext(from(ctx.channel().attr(NettyConstants.SESSION_ATTRIBUTE_KEY).get())) .addToContext(MDCBuilder.PROTOCOL, "MANAGE-SIEVE") - .addToContext(MDCBuilder.IP, retrieveIp(ctx)) - .addToContext(MDCBuilder.HOST, retrieveHost(ctx)) - .addToContext(MDCBuilder.SESSION_ID, ctx.channel().id().asShortText()) - .build(); + .addToContext(MDCBuilder.SESSION_ID, ctx.channel().id().asShortText()); + addRemoteInformation(ctx, builder); + return builder.build(); + } + + private static void addRemoteInformation(ChannelHandlerContext ctx, MDCBuilder builder) { + Optional<ProxyInformation> proxyInformation = Optional.ofNullable(ctx.channel().attr(NettyConstants.PROXY_INFO).get()); + if (proxyInformation.isPresent()) { + builder.addToContext(MDCBuilder.IP, proxyInformation.get().getSource().getAddress().getHostAddress()); + builder.addToContext(MDCBuilder.HOST, proxyInformation.get().getSource().getHostName()); + } else { + builder.addToContext(MDCBuilder.IP, retrieveIp(ctx)); + builder.addToContext(MDCBuilder.HOST, retrieveHost(ctx)); + } } private static String retrieveIp(ChannelHandlerContext ctx) { diff --git a/server/protocols/protocols-managesieve/src/main/java/org/apache/james/managesieveserver/netty/ManageSieveServer.java b/server/protocols/protocols-managesieve/src/main/java/org/apache/james/managesieveserver/netty/ManageSieveServer.java index 93ab6ab22c..114b082217 100644 --- a/server/protocols/protocols-managesieve/src/main/java/org/apache/james/managesieveserver/netty/ManageSieveServer.java +++ b/server/protocols/protocols-managesieve/src/main/java/org/apache/james/managesieveserver/netty/ManageSieveServer.java @@ -19,10 +19,6 @@ package org.apache.james.managesieveserver.netty; - -import static org.apache.james.protocols.netty.HandlerConstants.CONNECTION_LIMIT_HANDLER; -import static org.apache.james.protocols.netty.HandlerConstants.CONNECTION_LIMIT_PER_IP_HANDLER; - import java.util.Optional; import org.apache.commons.configuration2.HierarchicalConfiguration; @@ -36,12 +32,14 @@ import org.apache.james.protocols.netty.ChannelHandlerFactory; import org.apache.james.protocols.netty.ConnectionLimitUpstreamHandler; import org.apache.james.protocols.netty.ConnectionPerIpLimitUpstreamHandler; import org.apache.james.protocols.netty.Encryption; +import org.apache.james.protocols.netty.HandlerConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.netty.channel.Channel; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelPipeline; +import io.netty.handler.codec.haproxy.HAProxyMessageDecoder; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.handler.stream.ChunkedWriteHandler; @@ -51,11 +49,6 @@ public class ManageSieveServer extends AbstractConfigurableAsyncServer implement private static final Logger LOGGER = LoggerFactory.getLogger(ManageSieveServer.class); - static final String SSL_HANDLER = "sslHandler"; - static final String FRAMER = "framer"; - static final String CORE_HANDLER = "coreHandler"; - static final String CHUNK_WRITE_HANDLER = "chunkWriteHandler"; - private final int maxLineLength; private final ManageSieveProcessor manageSieveProcessor; private Optional<ConnectionLimitUpstreamHandler> connectionLimitUpstreamHandler = Optional.empty(); @@ -102,25 +95,34 @@ public class ManageSieveServer extends AbstractConfigurableAsyncServer implement @Override public void initChannel(Channel channel) { ChannelPipeline pipeline = channel.pipeline(); + + if (proxyRequired) { + pipeline.addLast(HandlerConstants.PROXY_HANDLER, new HAProxyMessageDecoder()); + } + + // See also AbstractSSLAwareChannelPipelineFactory. Encryption secure = getEncryption(); - if (secure != null && !secure.isStartTLS()) { - pipeline.addFirst(SSL_HANDLER, secure.sslHandler()); + if (secure != null && secure.supportsEncryption() && !secure.isStartTLS()) { + if (proxyRequired && proxyFirst) { + pipeline.addAfter(HandlerConstants.PROXY_HANDLER, HandlerConstants.SSL_HANDLER, secure.sslHandler()); + } else { + pipeline.addFirst(HandlerConstants.SSL_HANDLER, secure.sslHandler()); + } } - connectionLimitUpstreamHandler.ifPresent(handler -> pipeline.addLast(CONNECTION_LIMIT_HANDLER, handler)); - connectionPerIpLimitUpstreamHandler.ifPresent(handler -> pipeline.addLast(CONNECTION_LIMIT_PER_IP_HANDLER, handler)); + connectionLimitUpstreamHandler.ifPresent(handler -> pipeline.addLast(HandlerConstants.CONNECTION_LIMIT_HANDLER, handler)); + connectionPerIpLimitUpstreamHandler.ifPresent(handler -> pipeline.addLast(HandlerConstants.CONNECTION_LIMIT_PER_IP_HANDLER, handler)); // Add the text line decoder which limit the max line length, // don't strip the delimiter and use CRLF as delimiter // Use a SwitchableDelimiterBasedFrameDecoder, see JAMES-1436 - pipeline.addLast(getExecutorGroup(), FRAMER, getFrameHandlerFactory().create(pipeline)); - pipeline.addLast(getExecutorGroup(), CHUNK_WRITE_HANDLER, new ChunkedWriteHandler()); + pipeline.addLast(getExecutorGroup(), HandlerConstants.FRAMER, getFrameHandlerFactory().create(pipeline)); + pipeline.addLast(getExecutorGroup(), HandlerConstants.CHUNK_HANDLER, new ChunkedWriteHandler()); pipeline.addLast(getExecutorGroup(), "stringDecoder", new StringDecoder(CharsetUtil.UTF_8)); - pipeline.addLast(getExecutorGroup(), CORE_HANDLER, createHandler()); + pipeline.addLast(getExecutorGroup(), HandlerConstants.CORE_HANDLER, createHandler()); pipeline.addLast(getExecutorGroup(), "stringEncoder", new StringEncoder(CharsetUtil.UTF_8)); } - }; } diff --git a/server/protocols/protocols-managesieve/src/main/java/org/apache/james/managesieveserver/netty/NettyConstants.java b/server/protocols/protocols-managesieve/src/main/java/org/apache/james/managesieveserver/netty/NettyConstants.java index 9e52d9f18b..d966e796e8 100644 --- a/server/protocols/protocols-managesieve/src/main/java/org/apache/james/managesieveserver/netty/NettyConstants.java +++ b/server/protocols/protocols-managesieve/src/main/java/org/apache/james/managesieveserver/netty/NettyConstants.java @@ -19,6 +19,7 @@ package org.apache.james.managesieveserver.netty; import org.apache.james.managesieve.api.Session; +import org.apache.james.protocols.api.ProxyInformation; import io.netty.util.AttributeKey; @@ -27,6 +28,6 @@ import io.netty.util.AttributeKey; */ public interface NettyConstants { AttributeKey<ChannelManageSieveResponseWriter> RESPONSE_WRITER_ATTRIBUTE_KEY = AttributeKey.valueOf("ResponseWriter"); + AttributeKey<ProxyInformation> PROXY_INFO = AttributeKey.valueOf("ProxyInfo"); AttributeKey<Session> SESSION_ATTRIBUTE_KEY = AttributeKey.valueOf("Session"); - } --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For additional commands, e-mail: notifications-h...@james.apache.org