This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 92db805f3323022587acfde16500aa08c95790be
Author: Benoit TELLIER <btell...@linagora.com>
AuthorDate: Mon Nov 4 12:45:13 2024 +0100

    JAMES-4090 Locally forced disconnection for IMAP / SMTP
---
 .../apache/james/imapserver/netty/IMAPServer.java  | 18 ++++++++++++++++
 .../netty/ImapChannelUpstreamHandler.java          | 15 ++++++++++++--
 .../james/imapserver/netty/IMAPServerTest.java     | 11 ++++++++++
 .../protocols/lib/netty/CertificateReloadable.java |  6 ++++++
 .../apache/james/lmtpserver/netty/LMTPServer.java  | 10 ++++++++-
 .../netty/SMTPChannelInboundHandler.java           | 12 +++++------
 .../apache/james/smtpserver/netty/SMTPServer.java  | 24 +++++++++++++++++++++-
 .../protocols/webadmin/ProtocolServerRoutes.java   | 10 +++++++++
 8 files changed, 96 insertions(+), 10 deletions(-)

diff --git 
a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/IMAPServer.java
 
b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/IMAPServer.java
index f4b71e8679..b929758a56 100644
--- 
a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/IMAPServer.java
+++ 
b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/IMAPServer.java
@@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.commons.configuration2.HierarchicalConfiguration;
 import org.apache.commons.configuration2.ex.ConfigurationException;
 import org.apache.commons.configuration2.tree.ImmutableNode;
+import org.apache.james.core.Username;
 import org.apache.james.imap.api.ConnectionCheck;
 import org.apache.james.imap.api.ImapConfiguration;
 import org.apache.james.imap.api.ImapConstants;
@@ -53,11 +54,15 @@ import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 
+import io.netty.buffer.Unpooled;
 import io.netty.channel.Channel;
+import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.channel.ChannelPipeline;
+import io.netty.channel.group.DefaultChannelGroup;
 import io.netty.handler.codec.haproxy.HAProxyMessageDecoder;
 import io.netty.handler.stream.ChunkedWriteHandler;
+import io.netty.util.concurrent.GlobalEventExecutor;
 
 
 /**
@@ -138,6 +143,7 @@ public class IMAPServer extends 
AbstractConfigurableAsyncServer implements ImapC
     private final ImapMetrics imapMetrics;
     private final GaugeRegistry gaugeRegistry;
     private final Set<ConnectionCheck> connectionChecks;
+    private final DefaultChannelGroup imapChannelGroup;
 
     private String hello;
     private boolean compress;
@@ -152,6 +158,7 @@ public class IMAPServer extends 
AbstractConfigurableAsyncServer implements ImapC
     private Duration heartbeatInterval;
     private ReactiveThrottler reactiveThrottler;
 
+
     public IMAPServer(ImapDecoder decoder, ImapEncoder encoder, ImapProcessor 
processor, ImapMetrics imapMetrics, GaugeRegistry gaugeRegistry, 
Set<ConnectionCheck> connectionChecks) {
         this.processor = processor;
         this.encoder = encoder;
@@ -159,6 +166,7 @@ public class IMAPServer extends 
AbstractConfigurableAsyncServer implements ImapC
         this.imapMetrics = imapMetrics;
         this.gaugeRegistry = gaugeRegistry;
         this.connectionChecks = connectionChecks;
+        this.imapChannelGroup = new 
DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
     }
 
     @Override
@@ -189,6 +197,15 @@ public class IMAPServer extends 
AbstractConfigurableAsyncServer implements ImapC
         processor.configure(imapConfiguration);
     }
 
+    public void logout(Username user) {
+        imapChannelGroup.stream()
+            .filter(channel -> 
Optional.ofNullable(channel.attr(IMAP_SESSION_ATTRIBUTE_KEY).get())
+                .flatMap(session -> Optional.ofNullable(session.getUserName()))
+                .map(user::equals)
+                .orElse(false))
+            .forEach(channel -> 
channel.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE));
+    }
+
     private static Integer 
parseLiteralSizeLimit(HierarchicalConfiguration<ImmutableNode> configuration) {
         return Optional.ofNullable(configuration.getString("literalSizeLimit", 
null))
             .map(Size::parse)
@@ -315,6 +332,7 @@ public class IMAPServer extends 
AbstractConfigurableAsyncServer implements ImapC
             .heartbeatInterval(heartbeatInterval)
             .ignoreIDLEUponProcessing(ignoreIDLEUponProcessing)
             .proxyRequired(proxyRequired)
+            .imapChannelGroup(imapChannelGroup)
             .build();
     }
 
diff --git 
a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java
 
b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java
index 1af387038f..4274e1cdab 100644
--- 
a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java
+++ 
b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java
@@ -65,6 +65,7 @@ import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.group.ChannelGroup;
 import io.netty.handler.codec.DecoderException;
 import io.netty.handler.codec.TooLongFrameException;
 import io.netty.handler.ssl.NotSslRecordException;
@@ -93,6 +94,7 @@ public class ImapChannelUpstreamHandler extends 
ChannelInboundHandlerAdapter imp
         private ReactiveThrottler reactiveThrottler;
         private Set<ConnectionCheck> connectionChecks;
         private boolean proxyRequired;
+        private ChannelGroup imapChannelGroup;
 
         public ImapChannelUpstreamHandlerBuilder 
reactiveThrottler(ReactiveThrottler reactiveThrottler) {
             this.reactiveThrottler = reactiveThrottler;
@@ -154,8 +156,13 @@ public class ImapChannelUpstreamHandler extends 
ChannelInboundHandlerAdapter imp
             return this;
         }
 
+        public ImapChannelUpstreamHandlerBuilder imapChannelGroup(ChannelGroup 
imapChannelGroup) {
+            this.imapChannelGroup = imapChannelGroup;
+            return this;
+        }
+
         public ImapChannelUpstreamHandler build() {
-            return new ImapChannelUpstreamHandler(hello, processor, encoder, 
compress, secure, imapMetrics, authenticationConfiguration, 
ignoreIDLEUponProcessing, (int) heartbeatInterval.toSeconds(), 
reactiveThrottler, connectionChecks, proxyRequired);
+            return new ImapChannelUpstreamHandler(hello, processor, encoder, 
compress, secure, imapMetrics, authenticationConfiguration, 
ignoreIDLEUponProcessing, (int) heartbeatInterval.toSeconds(), 
reactiveThrottler, connectionChecks, proxyRequired, imapChannelGroup);
         }
     }
 
@@ -181,11 +188,12 @@ public class ImapChannelUpstreamHandler extends 
ChannelInboundHandlerAdapter imp
     private final ReactiveThrottler reactiveThrottler;
     private final Set<ConnectionCheck> connectionChecks;
     private final boolean proxyRequired;
+    private final ChannelGroup imapChannelGroup;
 
     public ImapChannelUpstreamHandler(String hello, ImapProcessor processor, 
ImapEncoder encoder, boolean compress,
                                       Encryption secure, ImapMetrics 
imapMetrics, AuthenticationConfiguration authenticationConfiguration,
                                       boolean ignoreIDLEUponProcessing, int 
heartbeatIntervalSeconds, ReactiveThrottler reactiveThrottler,
-                                      Set<ConnectionCheck> connectionChecks, 
boolean proxyRequired) {
+                                      Set<ConnectionCheck> connectionChecks, 
boolean proxyRequired, ChannelGroup imapChannelGroup) {
         this.hello = hello;
         this.processor = processor;
         this.encoder = encoder;
@@ -199,10 +207,12 @@ public class ImapChannelUpstreamHandler extends 
ChannelInboundHandlerAdapter imp
         this.reactiveThrottler = reactiveThrottler;
         this.connectionChecks = connectionChecks;
         this.proxyRequired = proxyRequired;
+        this.imapChannelGroup = imapChannelGroup;
     }
 
     @Override
     public void channelActive(ChannelHandlerContext ctx) throws Exception {
+        imapChannelGroup.add(ctx.channel());
         SessionId sessionId = SessionId.generate();
         ImapSession imapsession = new NettyImapSession(ctx.channel(), secure, 
compress, authenticationConfiguration.isSSLRequired(),
             authenticationConfiguration.isPlainAuthEnabled(), sessionId,
@@ -262,6 +272,7 @@ public class ImapChannelUpstreamHandler extends 
ChannelInboundHandlerAdapter imp
     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
         // remove the stored attribute for the channel to free up resources
         // See JAMES-1195
+        imapChannelGroup.remove(ctx.channel());
         ImapSession imapSession = 
ctx.channel().attr(IMAP_SESSION_ATTRIBUTE_KEY).getAndSet(null);
         try (Closeable closeable = mdc(imapSession).build()) {
             InetSocketAddress address = (InetSocketAddress) 
ctx.channel().remoteAddress();
diff --git 
a/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/IMAPServerTest.java
 
b/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/IMAPServerTest.java
index 300b19acc5..f2896fbaf2 100644
--- 
a/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/IMAPServerTest.java
+++ 
b/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/IMAPServerTest.java
@@ -248,6 +248,17 @@ class IMAPServerTest {
                 .append("INBOX", SMALL_MESSAGE));
         }
 
+        @Test
+        void logoutShouldDisconnectUser() throws Exception {
+            testIMAPClient.connect("127.0.0.1", port)
+                .login(USER.asString(), USER_PASS);
+
+            imapServer.logout(USER);
+
+            assertThatThrownBy(() -> testIMAPClient
+                .append("INBOX", SMALL_MESSAGE));
+        }
+
         @Test
         void allowConnectWithUnbannedIp() throws IOException {
             imapServer.getConnectionChecks().stream()
diff --git 
a/server/protocols/protocols-library/src/main/java/org/apache/james/protocols/lib/netty/CertificateReloadable.java
 
b/server/protocols/protocols-library/src/main/java/org/apache/james/protocols/lib/netty/CertificateReloadable.java
index 79656203f5..212cceedca 100644
--- 
a/server/protocols/protocols-library/src/main/java/org/apache/james/protocols/lib/netty/CertificateReloadable.java
+++ 
b/server/protocols/protocols-library/src/main/java/org/apache/james/protocols/lib/netty/CertificateReloadable.java
@@ -20,6 +20,8 @@ package org.apache.james.protocols.lib.netty;
 
 import java.util.stream.Stream;
 
+import org.apache.james.core.Username;
+
 public interface CertificateReloadable {
     interface Factory {
         Stream<? extends CertificateReloadable> certificatesReloadable();
@@ -28,4 +30,8 @@ public interface CertificateReloadable {
     void reloadSSLCertificate() throws Exception;
 
     int getPort();
+
+    default void logout(Username user) {
+
+    }
 }
diff --git 
a/server/protocols/protocols-lmtp/src/main/java/org/apache/james/lmtpserver/netty/LMTPServer.java
 
b/server/protocols/protocols-lmtp/src/main/java/org/apache/james/lmtpserver/netty/LMTPServer.java
index faa5751462..9eb8f12264 100644
--- 
a/server/protocols/protocols-lmtp/src/main/java/org/apache/james/lmtpserver/netty/LMTPServer.java
+++ 
b/server/protocols/protocols-lmtp/src/main/java/org/apache/james/lmtpserver/netty/LMTPServer.java
@@ -33,6 +33,7 @@ import 
org.apache.james.protocols.lib.netty.AbstractProtocolAsyncServer;
 import org.apache.james.protocols.lmtp.LMTPConfiguration;
 import org.apache.james.protocols.netty.AbstractChannelPipelineFactory;
 import org.apache.james.protocols.netty.ChannelHandlerFactory;
+import org.apache.james.protocols.netty.Encryption;
 import 
org.apache.james.protocols.netty.LineDelimiterBasedChannelHandlerFactory;
 import org.apache.james.protocols.smtp.SMTPProtocol;
 import org.apache.james.smtpserver.ExtendedSMTPSession;
@@ -41,6 +42,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.util.concurrent.GlobalEventExecutor;
 
 public class LMTPServer extends AbstractProtocolAsyncServer implements 
LMTPServerMBean {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(LMTPServer.class);
@@ -52,10 +56,12 @@ public class LMTPServer extends AbstractProtocolAsyncServer 
implements LMTPServe
     private long maxMessageSize = 0;
     private final LMTPConfigurationImpl lmtpConfig = new 
LMTPConfigurationImpl();
     private final LMTPMetricsImpl lmtpMetrics;
+    private final ChannelGroup lmtpChannelGroup;
     private String lmtpGreeting;
 
     public LMTPServer(LMTPMetricsImpl lmtpMetrics) {
         this.lmtpMetrics = lmtpMetrics;
+        this.lmtpChannelGroup = new 
DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
     }
 
     @Override
@@ -155,7 +161,9 @@ public class LMTPServer extends AbstractProtocolAsyncServer 
implements LMTPServe
                 return new ExtendedSMTPSession(lmtpConfig, transport);
             }
         };
-        return new SMTPChannelInboundHandler(transport, lmtpMetrics);
+        boolean proxyRequired = true;
+        Encryption noEncryption = null;
+        return new SMTPChannelInboundHandler(transport, noEncryption, 
!proxyRequired, lmtpMetrics, lmtpChannelGroup);
     }
 
     @Override
diff --git 
a/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPChannelInboundHandler.java
 
b/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPChannelInboundHandler.java
index d232a217e4..e8e37020bd 100644
--- 
a/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPChannelInboundHandler.java
+++ 
b/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPChannelInboundHandler.java
@@ -31,6 +31,7 @@ import org.apache.james.smtpserver.ExtendedSMTPSession;
 import org.apache.james.smtpserver.SMTPConstants;
 
 import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.group.ChannelGroup;
 
 /**
  * {@link BasicChannelInboundHandler} which is used by the SMTPServer
@@ -38,18 +39,15 @@ import io.netty.channel.ChannelHandlerContext;
 public class SMTPChannelInboundHandler extends BasicChannelInboundHandler {
 
     private final SmtpMetrics smtpMetrics;
+    private final ChannelGroup smtpChannelGroup;
 
-    public SMTPChannelInboundHandler(Protocol protocol, Encryption encryption, 
boolean proxyRequired, SmtpMetrics smtpMetrics) {
+    public SMTPChannelInboundHandler(Protocol protocol, Encryption encryption, 
boolean proxyRequired, SmtpMetrics smtpMetrics, ChannelGroup smtpChannelGroup) {
         super(new SMTPMDCContextFactory(), protocol, encryption, 
proxyRequired);
         this.smtpMetrics = smtpMetrics;
+        this.smtpChannelGroup = smtpChannelGroup;
         this.resultHandlers.add(recordCommandCount(smtpMetrics));
     }
 
-    public SMTPChannelInboundHandler(Protocol protocol, SmtpMetrics 
smtpMetrics) {
-        super(new SMTPMDCContextFactory(), protocol);
-        this.smtpMetrics = smtpMetrics;
-        this.resultHandlers.add(recordCommandCount(smtpMetrics));
-    }
 
     private ProtocolHandlerResultHandler recordCommandCount(SmtpMetrics 
smtpMetrics) {
         return (session, response, executionTime, handler) -> {
@@ -60,12 +58,14 @@ public class SMTPChannelInboundHandler extends 
BasicChannelInboundHandler {
 
     @Override
     public void channelActive(ChannelHandlerContext ctx) throws Exception {
+        smtpChannelGroup.add(ctx.channel());
         super.channelActive(ctx);
         smtpMetrics.getConnectionMetric().increment();
     }
 
     @Override
     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+        smtpChannelGroup.remove(ctx.channel());
         super.channelInactive(ctx);
         smtpMetrics.getConnectionMetric().decrement();
     }
diff --git 
a/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPServer.java
 
b/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPServer.java
index 8dc271aaef..5ed535b7d3 100644
--- 
a/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPServer.java
+++ 
b/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPServer.java
@@ -18,6 +18,7 @@
  ****************************************************************/
 package org.apache.james.smtpserver.netty;
 
+import static 
org.apache.james.protocols.netty.BasicChannelInboundHandler.SESSION_ATTRIBUTE_KEY;
 import static 
org.apache.james.smtpserver.netty.SMTPServer.AuthenticationAnnounceMode.ALWAYS;
 import static 
org.apache.james.smtpserver.netty.SMTPServer.AuthenticationAnnounceMode.NEVER;
 
@@ -32,6 +33,7 @@ import jakarta.inject.Inject;
 import org.apache.commons.configuration2.HierarchicalConfiguration;
 import org.apache.commons.configuration2.ex.ConfigurationException;
 import org.apache.commons.configuration2.tree.ImmutableNode;
+import org.apache.james.core.Username;
 import org.apache.james.dnsservice.api.DNSService;
 import org.apache.james.dnsservice.library.netmatcher.NetMatcher;
 import org.apache.james.protocols.api.OidcSASLConfiguration;
@@ -44,6 +46,7 @@ import 
org.apache.james.protocols.netty.AllButStartTlsLineChannelHandlerFactory;
 import org.apache.james.protocols.netty.ChannelHandlerFactory;
 import org.apache.james.protocols.smtp.SMTPConfiguration;
 import org.apache.james.protocols.smtp.SMTPProtocol;
+import org.apache.james.protocols.smtp.SMTPSession;
 import org.apache.james.smtpserver.CoreCmdHandlerLoader;
 import org.apache.james.smtpserver.ExtendedSMTPSession;
 import org.apache.james.smtpserver.jmx.JMXHandlersLoader;
@@ -53,7 +56,12 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.ImmutableSet;
 
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.util.concurrent.GlobalEventExecutor;
+
 
 /**
  * NIO SMTPServer which use Netty
@@ -184,6 +192,7 @@ public class SMTPServer extends AbstractProtocolAsyncServer 
implements SMTPServe
      */
     private final SMTPConfiguration theConfigData = new 
SMTPHandlerConfigurationDataImpl();
     private final SmtpMetrics smtpMetrics;
+    private final DefaultChannelGroup smtpChannelGroup;
     private Set<String> disabledFeatures = ImmutableSet.of();
 
     private boolean addressBracketsEnforcement = true;
@@ -195,6 +204,7 @@ public class SMTPServer extends AbstractProtocolAsyncServer 
implements SMTPServe
 
     public SMTPServer(SmtpMetrics smtpMetrics) {
         this.smtpMetrics = smtpMetrics;
+        this.smtpChannelGroup = new 
DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
     }
 
     @Inject
@@ -392,7 +402,7 @@ public class SMTPServer extends AbstractProtocolAsyncServer 
implements SMTPServe
 
     @Override
     protected ChannelInboundHandlerAdapter createCoreHandler() {
-        return new SMTPChannelInboundHandler(transport, getEncryption(), 
proxyRequired, smtpMetrics);
+        return new SMTPChannelInboundHandler(transport, getEncryption(), 
proxyRequired, smtpMetrics, smtpChannelGroup);
     }
 
     @Override
@@ -413,4 +423,16 @@ public class SMTPServer extends 
AbstractProtocolAsyncServer implements SMTPServe
     public AuthenticationAnnounceMode getAuthRequired() {
         return authenticationConfiguration.getAuthenticationAnnounceMode();
     }
+
+    @Override
+    public void logout(Username user) {
+        smtpChannelGroup.stream()
+            .filter(channel -> {
+                if (channel.attr(SESSION_ATTRIBUTE_KEY).get() instanceof 
SMTPSession smtpSession) {
+                    return user.equals(smtpSession.getUsername());
+                }
+                return false;
+            })
+            .forEach(channel -> 
channel.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE));
+    }
 }
diff --git 
a/server/protocols/webadmin/webadmin-protocols/src/main/java/org/apache/james/protocols/webadmin/ProtocolServerRoutes.java
 
b/server/protocols/webadmin/webadmin-protocols/src/main/java/org/apache/james/protocols/webadmin/ProtocolServerRoutes.java
index ae6cde7188..00aebd1528 100644
--- 
a/server/protocols/webadmin/webadmin-protocols/src/main/java/org/apache/james/protocols/webadmin/ProtocolServerRoutes.java
+++ 
b/server/protocols/webadmin/webadmin-protocols/src/main/java/org/apache/james/protocols/webadmin/ProtocolServerRoutes.java
@@ -24,6 +24,7 @@ import java.util.Set;
 
 import jakarta.inject.Inject;
 
+import org.apache.james.core.Username;
 import org.apache.james.protocols.lib.netty.CertificateReloadable;
 import org.apache.james.util.Port;
 import org.apache.james.webadmin.Routes;
@@ -74,6 +75,15 @@ public class ProtocolServerRoutes implements Routes {
 
             return Responses.returnNoContent(response);
         });
+
+        service.delete(SERVERS + "/users/:user", (request, response) -> {
+            Username username = Username.of(request.params("user"));
+            servers.stream()
+                .flatMap(CertificateReloadable.Factory::certificatesReloadable)
+                .forEach(s -> s.logout(username));
+
+            return Responses.returnNoContent(response);
+        });
     }
 
     private Predicate<CertificateReloadable> filters(Request request) {


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org
For additional commands, e-mail: notifications-h...@james.apache.org

Reply via email to