This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new c8c8b83b5b JAMES-4140 Implement adaptative, session-scoped, throttling 
for IMAP … (#2766)
c8c8b83b5b is described below

commit c8c8b83b5b9dd66dee1b54d29d1fb4318b8a6d27
Author: Benoit TELLIER <btell...@linagora.com>
AuthorDate: Fri Jul 25 10:06:11 2025 +0200

    JAMES-4140 Implement adaptative, session-scoped, throttling for IMAP … 
(#2766)
---
 docs/modules/servers/partials/configure/imap.adoc  |  42 +++++-
 .../imapserver/netty/IMAPCommandsThrottler.java    | 155 +++++++++++++++++++++
 .../apache/james/imapserver/netty/IMAPServer.java  |   8 ++
 .../netty/IMAPCommandsThrottlerTest.java           |  85 +++++++++++
 .../src/test/resources/commandsThrottling.xml      |  14 ++
 5 files changed, 302 insertions(+), 2 deletions(-)

diff --git a/docs/modules/servers/partials/configure/imap.adoc 
b/docs/modules/servers/partials/configure/imap.adoc
index 0d3084c121..af6911fe43 100644
--- a/docs/modules/servers/partials/configure/imap.adoc
+++ b/docs/modules/servers/partials/configure/imap.adoc
@@ -161,11 +161,11 @@ It uses the Keycloak OIDC provider, but usage of similar 
technologies is definit
 
 == Traffic Shaping
 
-James ships optional 
link:https://netty.io/4.0/api/io/netty/handler/traffic/ChannelTrafficShapingHandler.html[Netty
 built in Traffic Shaping] that can be optionally configured.
+James ships an optional 
link:https://netty.io/4.0/api/io/netty/handler/traffic/ChannelTrafficShapingHandler.html[Netty
 built in Traffic Shaping] that can be optionally configured.
 
 This enables both:
  - Record per channel bandwidth consumption
- - Allows defining per channel bandwidth limit, which helps at fairness and 
maintaining a good quality of service.
+ - Allows defining per channel bandwidth limit, which helps at fairness and 
maintaining a good quality of service in terms of incoming/outgoing bandwidth.
 
 Example:
 
@@ -185,6 +185,44 @@ Those tags maps to the corresponding Netty argument.
 
 If omitted no traffic handle is added to the channel pipeline.
 
+== IMAP command throttler
+
+James ships an optional IMAP command throttler aimed at slowing down 
lower-quality clients that generate a high
+volume of requests. It allows per command granularity and is applied at the 
scope of an IMAP session.
+
+The user can declare the list of commands on which throttling needs to be 
tracked and for each:
+
+ - `thresholdCount`: below this number of occurrences, no throttling is 
applied. Integer.
+ - `additionalDelayPerOperation`: delay added per observed operation once the 
threshold is reached. The delay is cumulative and thus
+ keeps increasing with each further operation, up to `maxDelay`. Duration.
+ - `observationPeriod`: the count of observed commands is reset after this 
period thus stopping delays. Duration.
+ - `maxDelay`: maximum value the client will be delayed for. Duration.
+
+Sample configuration:
+
+....
+<imapserver>
+  <!-- ... -->
+  <perSessionCommandThrottling>
+    <select>
+      <thresholdCount>25</thresholdCount>
+      <additionalDelayPerOperation>2ms</additionalDelayPerOperation>
+      <observationPeriod>10m</observationPeriod>
+      <maxDelay>1s</maxDelay>
+    </select>
+    <append>
+      <thresholdCount>5</thresholdCount>
+      <additionalDelayPerOperation>10ms</additionalDelayPerOperation>
+      <observationPeriod>5m</observationPeriod>
+      <maxDelay>2s</maxDelay>
+    </append>
+  </perSessionCommandThrottling>
+</imapserver>
+....
+
+Note that commands are delayed prior to their execution and thus are not subject to 
the IMAP upper concurrency limit until
+they are executed.
+
 == Extending IMAP
 
 IMAP decoders, processors and encoder can be customized. 
xref:customization:imap.adoc[Read more].
diff --git 
a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/IMAPCommandsThrottler.java
 
b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/IMAPCommandsThrottler.java
new file mode 100644
index 0000000000..53e2074f00
--- /dev/null
+++ 
b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/IMAPCommandsThrottler.java
@@ -0,0 +1,155 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.imapserver.netty;
+
+import static 
org.apache.james.imapserver.netty.NettyConstants.IMAP_SESSION_ATTRIBUTE_KEY;
+
+import java.time.Duration;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.configuration2.HierarchicalConfiguration;
+import org.apache.commons.configuration2.ImmutableHierarchicalConfiguration;
+import org.apache.commons.configuration2.tree.ImmutableNode;
+import org.apache.james.imap.api.message.request.ImapRequest;
+import org.apache.james.imap.api.process.ImapSession;
+import org.apache.james.imap.processor.IdProcessor;
+import org.apache.james.util.DurationParser;
+import org.apache.james.util.MDCStructuredLogger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ImmutableMap;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import it.unimi.dsi.fastutil.Pair;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
+/**
+ * Netty inbound handler slowing down IMAP sessions that repeat the same command too often.
+ *
+ * <p>For each command name present in the {@link ThrottlerConfiguration}, a per-session counter
+ * tracks occurrences. Once the counter reaches the configured threshold, the request is handed
+ * to the next handler only after a delay growing with the number of occurrences, capped by the
+ * configured maximum. Messages that are not {@link ImapRequest}s, and commands without a
+ * configuration entry, are forwarded immediately.</p>
+ */
+public class IMAPCommandsThrottler extends ChannelInboundHandlerAdapter {
+    private static final Logger LOGGER = LoggerFactory.getLogger(IMAPCommandsThrottler.class);
+
+    /**
+     * Throttling settings of a single IMAP command.
+     *
+     * @param thresholdCount number of occurrences below which no delay is applied
+     * @param additionalDelayPerOperation delay contributed by each counted occurrence
+     * @param observationPeriod duration after which the occurrence count starts over
+     * @param maxDelay upper bound of the delay applied to one command
+     */
+    public record ThrottlerConfigurationEntry(
+        int thresholdCount,
+        Duration additionalDelayPerOperation,
+        Duration observationPeriod,
+        Duration maxDelay) {
+
+        /**
+         * Reads an entry from the configuration node of one command.
+         *
+         * @param configuration node holding the four compulsory properties
+         * @return the parsed entry
+         * @throws IllegalArgumentException when one of the properties is missing
+         */
+        public static ThrottlerConfigurationEntry from(ImmutableHierarchicalConfiguration configuration) {
+            return new ThrottlerConfigurationEntry(
+                Integer.parseInt(compulsory(configuration, "thresholdCount")),
+                DurationParser.parse(compulsory(configuration, "additionalDelayPerOperation")),
+                DurationParser.parse(compulsory(configuration, "observationPeriod")),
+                DurationParser.parse(compulsory(configuration, "maxDelay")));
+        }
+
+        /** Returns the raw value of a property, rejecting the configuration when it is absent. */
+        private static String compulsory(ImmutableHierarchicalConfiguration configuration, String property) {
+            return Optional.ofNullable(configuration.getString(property, null))
+                .orElseThrow(() -> new IllegalArgumentException(property + " is compulsory for ThrottlerConfigurationEntry"));
+        }
+
+        /**
+         * Computes the delay to apply to a command.
+         *
+         * @param occurrenceCount number of occurrences already counted in the current observation period
+         * @return 0 below the threshold, otherwise {@code occurrenceCount * additionalDelayPerOperation}
+         *         in milliseconds, capped by {@code maxDelay}
+         */
+        long delayMSFor(long occurrenceCount) {
+            if (occurrenceCount < thresholdCount) {
+                return 0;
+            }
+
+            return Math.min(maxDelay.toMillis(), occurrenceCount * additionalDelayPerOperation.toMillis());
+        }
+    }
+
+    /**
+     * Throttling settings indexed by upper-cased IMAP command name.
+     *
+     * @param entryMap command name (upper case) to its throttling entry
+     */
+    public record ThrottlerConfiguration(Map<String, ThrottlerConfigurationEntry> entryMap) {
+        /**
+         * Builds the configuration from a node whose children are named after IMAP commands.
+         *
+         * @param configuration the {@code perSessionCommandThrottling} node
+         * @return one entry per child node, keyed by the upper-cased node name
+         */
+        public static ThrottlerConfiguration from(HierarchicalConfiguration<ImmutableNode> configuration) {
+            return new ThrottlerConfiguration(configuration.getNodeModel()
+                .getNodeHandler()
+                .getRootNode()
+                .getChildren()
+                .stream()
+                .map(key -> Pair.of(key.getNodeName().toUpperCase(Locale.US),
+                    ThrottlerConfigurationEntry.from(configuration.immutableConfigurationAt(key.getNodeName()))))
+                .collect(ImmutableMap.toImmutableMap(Pair::key, Pair::value)));
+        }
+    }
+
+    /**
+     * Occurrence counter together with the instant (as given by {@link System#nanoTime()}) at
+     * which its observation period started.
+     */
+    private record CounterWindow(AtomicLong counter, long startedAtNanos) {
+        /** Tells whether the observation period elapsed, meaning the counter must start over. */
+        boolean isExpired(Duration observationPeriod, long nowNanos) {
+            return Duration.ofNanos(nowNanos - startedAtNanos).compareTo(observationPeriod) >= 0;
+        }
+    }
+
+    private final ThrottlerConfiguration configuration;
+
+    public IMAPCommandsThrottler(ThrottlerConfiguration configuration) {
+        this.configuration = configuration;
+    }
+
+    /**
+     * Forwards the message to the next handler, possibly after a delay when it is an
+     * {@link ImapRequest} whose command has a throttling entry.
+     */
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) {
+        if (msg instanceof ImapRequest imapRequest) {
+            String key = imapRequest.getCommand().getName().toUpperCase(Locale.US);
+            Optional.ofNullable(configuration.entryMap().get(key))
+                .ifPresentOrElse(configurationEntry -> throttle(ctx, msg, imapRequest, configurationEntry),
+                    () -> ctx.fireChannelRead(msg));
+        } else {
+            ctx.fireChannelRead(msg);
+        }
+    }
+
+    /** Counts the command against its session counter and forwards it, delayed when required. */
+    private static void throttle(ChannelHandlerContext ctx, Object msg, ImapRequest imapRequest, ThrottlerConfigurationEntry configurationEntry) {
+        // Channel#attr returns the Attribute holder: the session itself is obtained through get().
+        ImapSession session = (ImapSession) ctx.channel().attr(IMAP_SESSION_ATTRIBUTE_KEY).get();
+        if (session == null) {
+            // No session bound to the channel: nothing to count against, forward untouched.
+            ctx.fireChannelRead(msg);
+            return;
+        }
+
+        AtomicLong atomicLong = retrieveAssociatedCounter(imapRequest, session, configurationEntry);
+        Duration delay = Duration.ofMillis(configurationEntry.delayMSFor(atomicLong.getAndIncrement()));
+
+        if (delay.isPositive()) {
+            logDelay(imapRequest, session, delay);
+
+            Mono.delay(delay)
+                .then(Mono.fromRunnable(() -> ctx.fireChannelRead(msg)))
+                .subscribeOn(Schedulers.parallel())
+                .subscribe();
+        } else {
+            ctx.fireChannelRead(msg);
+        }
+    }
+
+    /**
+     * Returns the occurrence counter of the command for this session, creating it on first use.
+     *
+     * <p>The counter is stored as a session attribute along with the start of its observation
+     * period. Expiry is checked upon each retrieval, so the count starts over after every
+     * observation period and not only after the first one.</p>
+     */
+    private static AtomicLong retrieveAssociatedCounter(ImapRequest imapRequest, ImapSession session, ThrottlerConfigurationEntry entry) {
+        String key = "imap-applicative-traffic-shaper-counter-" + imapRequest.getCommand().getName();
+        long now = System.nanoTime();
+        return Optional.ofNullable(session.getAttribute(key))
+            .filter(CounterWindow.class::isInstance)
+            .map(CounterWindow.class::cast)
+            .filter(window -> !window.isExpired(entry.observationPeriod(), now))
+            .orElseGet(() -> {
+                CounterWindow fresh = new CounterWindow(new AtomicLong(0), now);
+                session.setAttribute(key, fresh);
+                return fresh;
+            })
+            .counter();
+    }
+
+    /** Logs, with the session user and user agent as structured fields, that a command got delayed. */
+    private static void logDelay(ImapRequest imapRequest, ImapSession session, Duration delay) {
+        // NOTE(review): the user name is presumably null before authentication (to be confirmed
+        // against ImapSession); an empty field is logged in that case rather than failing.
+        String username = Optional.ofNullable(session.getUserName())
+            .map(name -> name.asString())
+            .orElse("");
+
+        MDCStructuredLogger.forLogger(LOGGER)
+            .field("username", username)
+            .field("userAgent", Optional.ofNullable(session.getAttribute(IdProcessor.USER_AGENT))
+                .filter(String.class::isInstance)
+                .map(String.class::cast)
+                .orElse(""))
+            .log(logger -> logger.info("Delayed command {} on an IMAP session. Delay {} ms",
+                imapRequest.getCommand().getName(),
+                delay.toMillis()));
+    }
+}
diff --git 
a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/IMAPServer.java
 
b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/IMAPServer.java
index 115ea2dd08..b66129f04c 100644
--- 
a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/IMAPServer.java
+++ 
b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/IMAPServer.java
@@ -177,6 +177,7 @@ public class IMAPServer extends 
AbstractConfigurableAsyncServer implements ImapC
     private Optional<TrafficShapingConfiguration> trafficShaping = 
Optional.empty();
     private Optional<ConnectionLimitUpstreamHandler> 
connectionLimitUpstreamHandler = Optional.empty();
     private Optional<ConnectionPerIpLimitUpstreamHandler> 
connectionPerIpLimitUpstreamHandler = Optional.empty();
+    private Optional<IMAPCommandsThrottler.ThrottlerConfiguration> 
throttlerConfiguration = Optional.empty();
     private boolean ignoreIDLEUponProcessing;
     private Duration heartbeatInterval;
     private ReactiveThrottler reactiveThrottler;
@@ -222,6 +223,10 @@ public class IMAPServer extends 
AbstractConfigurableAsyncServer implements ImapC
             trafficShaping = 
Optional.ofNullable(configuration.configurationAt("trafficShaping"))
                 .map(TrafficShapingConfiguration::from);
         }
+        if (configuration.getKeys("perSessionCommandThrottling").hasNext()) {
+            throttlerConfiguration = 
Optional.ofNullable(configuration.configurationAt("perSessionCommandThrottling"))
+                .map(IMAPCommandsThrottler.ThrottlerConfiguration::from);
+        }
     }
 
     @Override
@@ -337,6 +342,9 @@ public class IMAPServer extends 
AbstractConfigurableAsyncServer implements ImapC
                 pipeline.addLast(REQUEST_DECODER, new 
ImapRequestFrameDecoder(decoder, inMemorySizeLimit,
                     literalSizeLimit, maxLineLength));
 
+                throttlerConfiguration.map(IMAPCommandsThrottler::new)
+                    .ifPresent(handler -> pipeline.addLast("commandThrottler", 
handler));
+
                 pipeline.addLast(CORE_HANDLER, createCoreHandler());
             }
 
diff --git 
a/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/IMAPCommandsThrottlerTest.java
 
b/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/IMAPCommandsThrottlerTest.java
new file mode 100644
index 0000000000..32fc769b1b
--- /dev/null
+++ 
b/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/IMAPCommandsThrottlerTest.java
@@ -0,0 +1,85 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.imapserver.netty;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.time.Duration;
+
+import org.apache.commons.configuration2.HierarchicalConfiguration;
+import org.apache.commons.configuration2.tree.ImmutableNode;
+import 
org.apache.james.imapserver.netty.IMAPCommandsThrottler.ThrottlerConfiguration;
+import 
org.apache.james.imapserver.netty.IMAPCommandsThrottler.ThrottlerConfigurationEntry;
+import org.apache.james.protocols.lib.mock.ConfigLoader;
+import org.apache.james.util.ClassLoaderUtils;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * Covers the parsing of the throttling configuration and the delay computation of
+ * {@link ThrottlerConfigurationEntry}.
+ */
+class IMAPCommandsThrottlerTest {
+    /** Entry matching the {@code select} element of commandsThrottling.xml. */
+    private static ThrottlerConfigurationEntry selectEntry() {
+        return new ThrottlerConfigurationEntry(25, Duration.ofMillis(2), Duration.ofMinutes(10), Duration.ofSeconds(1));
+    }
+
+    @Nested
+    class ConfigTest {
+        @Test
+        void shouldLoad() throws Exception {
+            HierarchicalConfiguration<ImmutableNode> config =
+                ConfigLoader.getConfig(ClassLoaderUtils.getSystemResourceAsSharedStream("commandsThrottling.xml"));
+
+            ThrottlerConfiguration expected = new ThrottlerConfiguration(
+                ImmutableMap.of(
+                    "SELECT", selectEntry(),
+                    "APPEND", new ThrottlerConfigurationEntry(5, Duration.ofMillis(10), Duration.ofMinutes(5), Duration.ofSeconds(2))));
+
+            assertThat(ThrottlerConfiguration.from(config))
+                .isEqualTo(expected);
+        }
+    }
+
+    @Nested
+    class DelayTest {
+        @Test
+        void shouldNotDelayWhenBelowThreshold() {
+            long delay = selectEntry().delayMSFor(24);
+
+            assertThat(delay).isZero();
+        }
+
+        @Test
+        void shouldDelayWhenThreshold() {
+            long delay = selectEntry().delayMSFor(25);
+
+            assertThat(delay).isEqualTo(50);
+        }
+
+        @Test
+        void shouldAdditionalDelayWhenAboveThreshold() {
+            long delay = selectEntry().delayMSFor(26);
+
+            assertThat(delay).isEqualTo(52);
+        }
+
+        @Test
+        void shouldNotExceedMaximumDelay() {
+            long delay = selectEntry().delayMSFor(2600);
+
+            assertThat(delay).isEqualTo(1000);
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/server/protocols/protocols-imap4/src/test/resources/commandsThrottling.xml 
b/server/protocols/protocols-imap4/src/test/resources/commandsThrottling.xml
new file mode 100644
index 0000000000..dc4f57afb4
--- /dev/null
+++ b/server/protocols/protocols-imap4/src/test/resources/commandsThrottling.xml
@@ -0,0 +1,14 @@
+<perSessionCommandThrottling>
+    <select>
+        <thresholdCount>25</thresholdCount>
+        <additionalDelayPerOperation>2ms</additionalDelayPerOperation>
+        <observationPeriod>10m</observationPeriod>
+        <maxDelay>1s</maxDelay>
+    </select>
+    <append>
+        <thresholdCount>5</thresholdCount>
+        <additionalDelayPerOperation>10ms</additionalDelayPerOperation>
+        <observationPeriod>5m</observationPeriod>
+        <maxDelay>2s</maxDelay>
+    </append>
+</perSessionCommandThrottling>
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org
For additional commands, e-mail: notifications-h...@james.apache.org

Reply via email to