This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 92db805f3323022587acfde16500aa08c95790be Author: Benoit TELLIER <btell...@linagora.com> AuthorDate: Mon Nov 4 12:45:13 2024 +0100 JAMES-4090 Locally forced disconnection for IMAP / SMTP --- .../apache/james/imapserver/netty/IMAPServer.java | 18 ++++++++++++++++ .../netty/ImapChannelUpstreamHandler.java | 15 ++++++++++++-- .../james/imapserver/netty/IMAPServerTest.java | 11 ++++++++++ .../protocols/lib/netty/CertificateReloadable.java | 6 ++++++ .../apache/james/lmtpserver/netty/LMTPServer.java | 10 ++++++++- .../netty/SMTPChannelInboundHandler.java | 12 +++++------ .../apache/james/smtpserver/netty/SMTPServer.java | 24 +++++++++++++++++++++- .../protocols/webadmin/ProtocolServerRoutes.java | 10 +++++++++ 8 files changed, 96 insertions(+), 10 deletions(-) diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/IMAPServer.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/IMAPServer.java index f4b71e8679..b929758a56 100644 --- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/IMAPServer.java +++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/IMAPServer.java @@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.configuration2.HierarchicalConfiguration; import org.apache.commons.configuration2.ex.ConfigurationException; import org.apache.commons.configuration2.tree.ImmutableNode; +import org.apache.james.core.Username; import org.apache.james.imap.api.ConnectionCheck; import org.apache.james.imap.api.ImapConfiguration; import org.apache.james.imap.api.ImapConstants; @@ -53,11 +54,15 @@ import com.google.common.base.Splitter; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import io.netty.buffer.Unpooled; import io.netty.channel.Channel; +import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelInboundHandlerAdapter; import 
io.netty.channel.ChannelPipeline; +import io.netty.channel.group.DefaultChannelGroup; import io.netty.handler.codec.haproxy.HAProxyMessageDecoder; import io.netty.handler.stream.ChunkedWriteHandler; +import io.netty.util.concurrent.GlobalEventExecutor; /** @@ -138,6 +143,7 @@ public class IMAPServer extends AbstractConfigurableAsyncServer implements ImapC private final ImapMetrics imapMetrics; private final GaugeRegistry gaugeRegistry; private final Set<ConnectionCheck> connectionChecks; + private final DefaultChannelGroup imapChannelGroup; private String hello; private boolean compress; @@ -152,6 +158,7 @@ public class IMAPServer extends AbstractConfigurableAsyncServer implements ImapC private Duration heartbeatInterval; private ReactiveThrottler reactiveThrottler; + public IMAPServer(ImapDecoder decoder, ImapEncoder encoder, ImapProcessor processor, ImapMetrics imapMetrics, GaugeRegistry gaugeRegistry, Set<ConnectionCheck> connectionChecks) { this.processor = processor; this.encoder = encoder; @@ -159,6 +166,7 @@ public class IMAPServer extends AbstractConfigurableAsyncServer implements ImapC this.imapMetrics = imapMetrics; this.gaugeRegistry = gaugeRegistry; this.connectionChecks = connectionChecks; + this.imapChannelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); } @Override @@ -189,6 +197,15 @@ public class IMAPServer extends AbstractConfigurableAsyncServer implements ImapC processor.configure(imapConfiguration); } + public void logout(Username user) { + imapChannelGroup.stream() + .filter(channel -> Optional.ofNullable(channel.attr(IMAP_SESSION_ATTRIBUTE_KEY).get()) + .flatMap(session -> Optional.ofNullable(session.getUserName())) + .map(user::equals) + .orElse(false)) + .forEach(channel -> channel.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE)); + } + private static Integer parseLiteralSizeLimit(HierarchicalConfiguration<ImmutableNode> configuration) { return 
Optional.ofNullable(configuration.getString("literalSizeLimit", null)) .map(Size::parse) @@ -315,6 +332,7 @@ public class IMAPServer extends AbstractConfigurableAsyncServer implements ImapC .heartbeatInterval(heartbeatInterval) .ignoreIDLEUponProcessing(ignoreIDLEUponProcessing) .proxyRequired(proxyRequired) + .imapChannelGroup(imapChannelGroup) .build(); } diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java index 1af387038f..4274e1cdab 100644 --- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java +++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java @@ -65,6 +65,7 @@ import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.group.ChannelGroup; import io.netty.handler.codec.DecoderException; import io.netty.handler.codec.TooLongFrameException; import io.netty.handler.ssl.NotSslRecordException; @@ -93,6 +94,7 @@ public class ImapChannelUpstreamHandler extends ChannelInboundHandlerAdapter imp private ReactiveThrottler reactiveThrottler; private Set<ConnectionCheck> connectionChecks; private boolean proxyRequired; + private ChannelGroup imapChannelGroup; public ImapChannelUpstreamHandlerBuilder reactiveThrottler(ReactiveThrottler reactiveThrottler) { this.reactiveThrottler = reactiveThrottler; @@ -154,8 +156,13 @@ public class ImapChannelUpstreamHandler extends ChannelInboundHandlerAdapter imp return this; } + public ImapChannelUpstreamHandlerBuilder imapChannelGroup(ChannelGroup imapChannelGroup) { + this.imapChannelGroup = imapChannelGroup; + return this; + } + public ImapChannelUpstreamHandler build() 
{ - return new ImapChannelUpstreamHandler(hello, processor, encoder, compress, secure, imapMetrics, authenticationConfiguration, ignoreIDLEUponProcessing, (int) heartbeatInterval.toSeconds(), reactiveThrottler, connectionChecks, proxyRequired); + return new ImapChannelUpstreamHandler(hello, processor, encoder, compress, secure, imapMetrics, authenticationConfiguration, ignoreIDLEUponProcessing, (int) heartbeatInterval.toSeconds(), reactiveThrottler, connectionChecks, proxyRequired, imapChannelGroup); } } @@ -181,11 +188,12 @@ public class ImapChannelUpstreamHandler extends ChannelInboundHandlerAdapter imp private final ReactiveThrottler reactiveThrottler; private final Set<ConnectionCheck> connectionChecks; private final boolean proxyRequired; + private final ChannelGroup imapChannelGroup; public ImapChannelUpstreamHandler(String hello, ImapProcessor processor, ImapEncoder encoder, boolean compress, Encryption secure, ImapMetrics imapMetrics, AuthenticationConfiguration authenticationConfiguration, boolean ignoreIDLEUponProcessing, int heartbeatIntervalSeconds, ReactiveThrottler reactiveThrottler, - Set<ConnectionCheck> connectionChecks, boolean proxyRequired) { + Set<ConnectionCheck> connectionChecks, boolean proxyRequired, ChannelGroup imapChannelGroup) { this.hello = hello; this.processor = processor; this.encoder = encoder; @@ -199,10 +207,12 @@ public class ImapChannelUpstreamHandler extends ChannelInboundHandlerAdapter imp this.reactiveThrottler = reactiveThrottler; this.connectionChecks = connectionChecks; this.proxyRequired = proxyRequired; + this.imapChannelGroup = imapChannelGroup; } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { + imapChannelGroup.add(ctx.channel()); SessionId sessionId = SessionId.generate(); ImapSession imapsession = new NettyImapSession(ctx.channel(), secure, compress, authenticationConfiguration.isSSLRequired(), authenticationConfiguration.isPlainAuthEnabled(), sessionId, @@ -262,6 +272,7 @@ public 
class ImapChannelUpstreamHandler extends ChannelInboundHandlerAdapter imp public void channelInactive(ChannelHandlerContext ctx) throws Exception { // remove the stored attribute for the channel to free up resources // See JAMES-1195 + imapChannelGroup.remove(ctx.channel()); ImapSession imapSession = ctx.channel().attr(IMAP_SESSION_ATTRIBUTE_KEY).getAndSet(null); try (Closeable closeable = mdc(imapSession).build()) { InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress(); diff --git a/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/IMAPServerTest.java b/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/IMAPServerTest.java index 300b19acc5..f2896fbaf2 100644 --- a/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/IMAPServerTest.java +++ b/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/IMAPServerTest.java @@ -248,6 +248,17 @@ class IMAPServerTest { .append("INBOX", SMALL_MESSAGE)); } + @Test + void logoutShouldDisconnectUser() throws Exception { + testIMAPClient.connect("127.0.0.1", port) + .login(USER.asString(), USER_PASS); + + imapServer.logout(USER); + + assertThatThrownBy(() -> testIMAPClient + .append("INBOX", SMALL_MESSAGE)); + } + @Test void allowConnectWithUnbannedIp() throws IOException { imapServer.getConnectionChecks().stream() diff --git a/server/protocols/protocols-library/src/main/java/org/apache/james/protocols/lib/netty/CertificateReloadable.java b/server/protocols/protocols-library/src/main/java/org/apache/james/protocols/lib/netty/CertificateReloadable.java index 79656203f5..212cceedca 100644 --- a/server/protocols/protocols-library/src/main/java/org/apache/james/protocols/lib/netty/CertificateReloadable.java +++ b/server/protocols/protocols-library/src/main/java/org/apache/james/protocols/lib/netty/CertificateReloadable.java @@ -20,6 +20,8 @@ package org.apache.james.protocols.lib.netty; import 
java.util.stream.Stream; +import org.apache.james.core.Username; + public interface CertificateReloadable { interface Factory { Stream<? extends CertificateReloadable> certificatesReloadable(); @@ -28,4 +30,8 @@ public interface CertificateReloadable { void reloadSSLCertificate() throws Exception; int getPort(); + + default void logout(Username user) { + + } } diff --git a/server/protocols/protocols-lmtp/src/main/java/org/apache/james/lmtpserver/netty/LMTPServer.java b/server/protocols/protocols-lmtp/src/main/java/org/apache/james/lmtpserver/netty/LMTPServer.java index faa5751462..9eb8f12264 100644 --- a/server/protocols/protocols-lmtp/src/main/java/org/apache/james/lmtpserver/netty/LMTPServer.java +++ b/server/protocols/protocols-lmtp/src/main/java/org/apache/james/lmtpserver/netty/LMTPServer.java @@ -33,6 +33,7 @@ import org.apache.james.protocols.lib.netty.AbstractProtocolAsyncServer; import org.apache.james.protocols.lmtp.LMTPConfiguration; import org.apache.james.protocols.netty.AbstractChannelPipelineFactory; import org.apache.james.protocols.netty.ChannelHandlerFactory; +import org.apache.james.protocols.netty.Encryption; import org.apache.james.protocols.netty.LineDelimiterBasedChannelHandlerFactory; import org.apache.james.protocols.smtp.SMTPProtocol; import org.apache.james.smtpserver.ExtendedSMTPSession; @@ -41,6 +42,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.group.ChannelGroup; +import io.netty.channel.group.DefaultChannelGroup; +import io.netty.util.concurrent.GlobalEventExecutor; public class LMTPServer extends AbstractProtocolAsyncServer implements LMTPServerMBean { private static final Logger LOGGER = LoggerFactory.getLogger(LMTPServer.class); @@ -52,10 +56,12 @@ public class LMTPServer extends AbstractProtocolAsyncServer implements LMTPServe private long maxMessageSize = 0; private final LMTPConfigurationImpl lmtpConfig = new LMTPConfigurationImpl(); 
private final LMTPMetricsImpl lmtpMetrics; + private final ChannelGroup lmtpChannelGroup; private String lmtpGreeting; public LMTPServer(LMTPMetricsImpl lmtpMetrics) { this.lmtpMetrics = lmtpMetrics; + this.lmtpChannelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); } @Override @@ -155,7 +161,9 @@ public class LMTPServer extends AbstractProtocolAsyncServer implements LMTPServe return new ExtendedSMTPSession(lmtpConfig, transport); } }; - return new SMTPChannelInboundHandler(transport, lmtpMetrics); + boolean proxyRequired = true; + Encryption noEncryption = null; + return new SMTPChannelInboundHandler(transport, noEncryption, !proxyRequired, lmtpMetrics, lmtpChannelGroup); } @Override diff --git a/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPChannelInboundHandler.java b/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPChannelInboundHandler.java index d232a217e4..e8e37020bd 100644 --- a/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPChannelInboundHandler.java +++ b/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPChannelInboundHandler.java @@ -31,6 +31,7 @@ import org.apache.james.smtpserver.ExtendedSMTPSession; import org.apache.james.smtpserver.SMTPConstants; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.group.ChannelGroup; /** * {@link BasicChannelInboundHandler} which is used by the SMTPServer @@ -38,18 +39,15 @@ import io.netty.channel.ChannelHandlerContext; public class SMTPChannelInboundHandler extends BasicChannelInboundHandler { private final SmtpMetrics smtpMetrics; + private final ChannelGroup smtpChannelGroup; - public SMTPChannelInboundHandler(Protocol protocol, Encryption encryption, boolean proxyRequired, SmtpMetrics smtpMetrics) { + public SMTPChannelInboundHandler(Protocol protocol, Encryption encryption, boolean proxyRequired, SmtpMetrics smtpMetrics, ChannelGroup 
smtpChannelGroup) { super(new SMTPMDCContextFactory(), protocol, encryption, proxyRequired); this.smtpMetrics = smtpMetrics; + this.smtpChannelGroup = smtpChannelGroup; this.resultHandlers.add(recordCommandCount(smtpMetrics)); } - public SMTPChannelInboundHandler(Protocol protocol, SmtpMetrics smtpMetrics) { - super(new SMTPMDCContextFactory(), protocol); - this.smtpMetrics = smtpMetrics; - this.resultHandlers.add(recordCommandCount(smtpMetrics)); - } private ProtocolHandlerResultHandler recordCommandCount(SmtpMetrics smtpMetrics) { return (session, response, executionTime, handler) -> { @@ -60,12 +58,14 @@ public class SMTPChannelInboundHandler extends BasicChannelInboundHandler { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { + smtpChannelGroup.add(ctx.channel()); super.channelActive(ctx); smtpMetrics.getConnectionMetric().increment(); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { + smtpChannelGroup.remove(ctx.channel()); super.channelInactive(ctx); smtpMetrics.getConnectionMetric().decrement(); } diff --git a/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPServer.java b/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPServer.java index 8dc271aaef..5ed535b7d3 100644 --- a/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPServer.java +++ b/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPServer.java @@ -18,6 +18,7 @@ ****************************************************************/ package org.apache.james.smtpserver.netty; +import static org.apache.james.protocols.netty.BasicChannelInboundHandler.SESSION_ATTRIBUTE_KEY; import static org.apache.james.smtpserver.netty.SMTPServer.AuthenticationAnnounceMode.ALWAYS; import static org.apache.james.smtpserver.netty.SMTPServer.AuthenticationAnnounceMode.NEVER; @@ -32,6 +33,7 @@ import jakarta.inject.Inject; 
import org.apache.commons.configuration2.HierarchicalConfiguration; import org.apache.commons.configuration2.ex.ConfigurationException; import org.apache.commons.configuration2.tree.ImmutableNode; +import org.apache.james.core.Username; import org.apache.james.dnsservice.api.DNSService; import org.apache.james.dnsservice.library.netmatcher.NetMatcher; import org.apache.james.protocols.api.OidcSASLConfiguration; @@ -44,6 +46,7 @@ import org.apache.james.protocols.netty.AllButStartTlsLineChannelHandlerFactory; import org.apache.james.protocols.netty.ChannelHandlerFactory; import org.apache.james.protocols.smtp.SMTPConfiguration; import org.apache.james.protocols.smtp.SMTPProtocol; +import org.apache.james.protocols.smtp.SMTPSession; import org.apache.james.smtpserver.CoreCmdHandlerLoader; import org.apache.james.smtpserver.ExtendedSMTPSession; import org.apache.james.smtpserver.jmx.JMXHandlersLoader; @@ -53,7 +56,12 @@ import org.slf4j.LoggerFactory; import com.google.common.collect.ImmutableSet; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.group.DefaultChannelGroup; +import io.netty.util.concurrent.GlobalEventExecutor; + /** * NIO SMTPServer which use Netty @@ -184,6 +192,7 @@ public class SMTPServer extends AbstractProtocolAsyncServer implements SMTPServe */ private final SMTPConfiguration theConfigData = new SMTPHandlerConfigurationDataImpl(); private final SmtpMetrics smtpMetrics; + private final DefaultChannelGroup smtpChannelGroup; private Set<String> disabledFeatures = ImmutableSet.of(); private boolean addressBracketsEnforcement = true; @@ -195,6 +204,7 @@ public class SMTPServer extends AbstractProtocolAsyncServer implements SMTPServe public SMTPServer(SmtpMetrics smtpMetrics) { this.smtpMetrics = smtpMetrics; + this.smtpChannelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); } @Inject @@ -392,7 +402,7 @@ public class SMTPServer 
extends AbstractProtocolAsyncServer implements SMTPServe @Override protected ChannelInboundHandlerAdapter createCoreHandler() { - return new SMTPChannelInboundHandler(transport, getEncryption(), proxyRequired, smtpMetrics); + return new SMTPChannelInboundHandler(transport, getEncryption(), proxyRequired, smtpMetrics, smtpChannelGroup); } @Override @@ -413,4 +423,16 @@ public class SMTPServer extends AbstractProtocolAsyncServer implements SMTPServe public AuthenticationAnnounceMode getAuthRequired() { return authenticationConfiguration.getAuthenticationAnnounceMode(); } + + @Override + public void logout(Username user) { + smtpChannelGroup.stream() + .filter(channel -> { + if (channel.attr(SESSION_ATTRIBUTE_KEY).get() instanceof SMTPSession smtpSession) { + return user.equals(smtpSession.getUsername()); + } + return false; + }) + .forEach(channel -> channel.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE)); + } } diff --git a/server/protocols/webadmin/webadmin-protocols/src/main/java/org/apache/james/protocols/webadmin/ProtocolServerRoutes.java b/server/protocols/webadmin/webadmin-protocols/src/main/java/org/apache/james/protocols/webadmin/ProtocolServerRoutes.java index ae6cde7188..00aebd1528 100644 --- a/server/protocols/webadmin/webadmin-protocols/src/main/java/org/apache/james/protocols/webadmin/ProtocolServerRoutes.java +++ b/server/protocols/webadmin/webadmin-protocols/src/main/java/org/apache/james/protocols/webadmin/ProtocolServerRoutes.java @@ -24,6 +24,7 @@ import java.util.Set; import jakarta.inject.Inject; +import org.apache.james.core.Username; import org.apache.james.protocols.lib.netty.CertificateReloadable; import org.apache.james.util.Port; import org.apache.james.webadmin.Routes; @@ -74,6 +75,15 @@ public class ProtocolServerRoutes implements Routes { return Responses.returnNoContent(response); }); + + service.delete(SERVERS + "/users/:user", (request, response) -> { + Username username = 
Username.of(request.params("user")); + servers.stream() + .flatMap(CertificateReloadable.Factory::certificatesReloadable) + .forEach(s -> s.logout(username)); + + return Responses.returnNoContent(response); + }); } private Predicate<CertificateReloadable> filters(Request request) { --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For additional commands, e-mail: notifications-h...@james.apache.org