This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push: new c8c8b83b5b JAMES-4140 Implement adaptative, session-scoped, throttling for IMAP … (#2766) c8c8b83b5b is described below commit c8c8b83b5b9dd66dee1b54d29d1fb4318b8a6d27 Author: Benoit TELLIER <btell...@linagora.com> AuthorDate: Fri Jul 25 10:06:11 2025 +0200 JAMES-4140 Implement adaptative, session-scoped, throttling for IMAP … (#2766) --- docs/modules/servers/partials/configure/imap.adoc | 42 +++++- .../imapserver/netty/IMAPCommandsThrottler.java | 155 +++++++++++++++++++++ .../apache/james/imapserver/netty/IMAPServer.java | 8 ++ .../netty/IMAPCommandsThrottlerTest.java | 85 +++++++++++ .../src/test/resources/commandsThrottling.xml | 14 ++ 5 files changed, 302 insertions(+), 2 deletions(-) diff --git a/docs/modules/servers/partials/configure/imap.adoc b/docs/modules/servers/partials/configure/imap.adoc index 0d3084c121..af6911fe43 100644 --- a/docs/modules/servers/partials/configure/imap.adoc +++ b/docs/modules/servers/partials/configure/imap.adoc @@ -161,11 +161,11 @@ It uses the Keycloak OIDC provider, but usage of similar technologies is definit == Traffic Shaping -James ships optional link:https://netty.io/4.0/api/io/netty/handler/traffic/ChannelTrafficShapingHandler.html[Netty built in Traffic Shaping] that can be optionally configured. +James ships an optional link:https://netty.io/4.0/api/io/netty/handler/traffic/ChannelTrafficShapingHandler.html[Netty built in Traffic Shaping] that can be optionally configured. This enables both: - Record per channel bandwidth consumption - - Allows defining per channel bandwidth limit, which helps at fairness and maintaining a good quality of service. + - Allows defining per channel bandwidth limit, which helps at fairness and maintaining a good quality of service in terms of incoming/outgoing bandwidth. Example: @@ -185,6 +185,44 @@ Those tags maps to the corresponding Netty argument. If omitted no traffic handle is added to the channel pipeline. +== IMAP command throttler + +James ships an optional IMAP command throttler aimed at slowing down lower-quality clients that generate a high +volume of requests. It allows per command granularity and is applied at the scope of an IMAP session. + +The user can declare the list of commands on which throttling needs to be tracked and for each: + + - `thresholdCount`: below this number of occurrence, no throttling is applied. Integer. + - `additionalDelayPerOperation`: delay to be applied when exceeding the threshold. The delay is cumulative and thus + would always increase. Duration. + - `observationPeriod`: the count of observed commands is reset after this period thus stopping delays. Duration. + - `maxDelay`: maximum value the client will be delayed for. + +Sample configuration: + +.... +<imapserver> + <!-- ... --> + <perSessionCommandThrottling> + <select> + <thresholdCount>25</thresholdCount> + <additionalDelayPerOperation>2ms</additionalDelayPerOperation> + <observationPeriod>10m</observationPeriod> + <maxDelay>1s</maxDelay> + </select> + <append> + <thresholdCount>5</thresholdCount> + <additionalDelayPerOperation>10ms</additionalDelayPerOperation> + <observationPeriod>5m</observationPeriod> + <maxDelay>2s</maxDelay> + </append> + </perSessionCommandThrottling> +</imapserver> +.... + +Note that commands are delayed prior the execution and thus are not subject to the IMAP upper concurrency limit until +they are executed. + == Extending IMAP IMAP decoders, processors and encoder can be customized. xref:customization:imap.adoc[Read more]. diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/IMAPCommandsThrottler.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/IMAPCommandsThrottler.java new file mode 100644 index 0000000000..53e2074f00 --- /dev/null +++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/IMAPCommandsThrottler.java @@ -0,0 +1,155 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.imapserver.netty; + +import static org.apache.james.imapserver.netty.NettyConstants.IMAP_SESSION_ATTRIBUTE_KEY; + +import java.time.Duration; +import java.util.Locale; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.configuration2.HierarchicalConfiguration; +import org.apache.commons.configuration2.ImmutableHierarchicalConfiguration; +import org.apache.commons.configuration2.tree.ImmutableNode; +import org.apache.james.imap.api.message.request.ImapRequest; +import org.apache.james.imap.api.process.ImapSession; +import org.apache.james.imap.processor.IdProcessor; +import org.apache.james.util.DurationParser; +import org.apache.james.util.MDCStructuredLogger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.ImmutableMap; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import it.unimi.dsi.fastutil.Pair; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + +public class IMAPCommandsThrottler extends ChannelInboundHandlerAdapter { + private static final Logger LOGGER = LoggerFactory.getLogger(IMAPCommandsThrottler.class); + + public record ThrottlerConfigurationEntry( + int thresholdCount, + Duration additionalDelayPerOperation, + Duration observationPeriod, + Duration maxDelay) { + + public static ThrottlerConfigurationEntry from(ImmutableHierarchicalConfiguration configuration) { + return new ThrottlerConfigurationEntry( + Optional.ofNullable(configuration.getString("thresholdCount", null)) + .map(Integer::parseInt) + .orElseThrow(() -> new IllegalArgumentException("thresholdCount in compulsory for ThrottlerConfigurationEntry")), + Optional.ofNullable(configuration.getString("additionalDelayPerOperation", null)) + .map(DurationParser::parse) + .orElseThrow(() -> new IllegalArgumentException("additionalDelayPerOperation in compulsory for ThrottlerConfigurationEntry")), + Optional.ofNullable(configuration.getString("observationPeriod", null)) + .map(DurationParser::parse) + .orElseThrow(() -> new IllegalArgumentException("observationPeriod in compulsory for ThrottlerConfigurationEntry")), + Optional.ofNullable(configuration.getString("maxDelay", null)) + .map(DurationParser::parse) + .orElseThrow(() -> new IllegalArgumentException("maxDelay in compulsory for ThrottlerConfigurationEntry"))); + } + + long delayMSFor(long occurrenceCount) { + if (occurrenceCount < thresholdCount) { + return 0; + } + + return Math.min(maxDelay.toMillis(), occurrenceCount * additionalDelayPerOperation.toMillis()); + } + } + + public record ThrottlerConfiguration(Map<String, ThrottlerConfigurationEntry> entryMap) { + public static ThrottlerConfiguration from(HierarchicalConfiguration<ImmutableNode> configuration) { + return new ThrottlerConfiguration(configuration.getNodeModel() + .getNodeHandler() + .getRootNode() + .getChildren() + .stream() + .map(key -> Pair.of(key.getNodeName().toUpperCase(Locale.US), ThrottlerConfigurationEntry.from(configuration.immutableConfigurationAt(key.getNodeName())))) + .collect(ImmutableMap.toImmutableMap(Pair::key, Pair::value))); + } + } + + private final ThrottlerConfiguration configuration; + + public IMAPCommandsThrottler(ThrottlerConfiguration configuration) { + this.configuration = configuration; + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + if (msg instanceof ImapRequest imapRequest) { + String key = imapRequest.getCommand().getName().toUpperCase(Locale.US); + Optional.ofNullable(configuration.entryMap().get(key)) + .ifPresentOrElse(configurationEntry -> throttle(ctx, msg, imapRequest, configurationEntry), + () -> ctx.fireChannelRead(msg)); + } else { + ctx.fireChannelRead(msg); + } + } + + private static void throttle(ChannelHandlerContext ctx, Object msg, ImapRequest imapRequest, ThrottlerConfigurationEntry configurationEntry) { + ImapSession session = (ImapSession) ctx.channel().attr(IMAP_SESSION_ATTRIBUTE_KEY); + + AtomicLong atomicLong = retrieveAssociatedCounter(imapRequest, session, configurationEntry); + Duration delay = Duration.ofMillis(configurationEntry.delayMSFor(atomicLong.getAndIncrement())); + + if (delay.isPositive()) { + logDelay(imapRequest, session, delay); + + Mono.delay(delay) + .then(Mono.fromRunnable(() -> ctx.fireChannelRead(msg))) + .subscribeOn(Schedulers.parallel()) + .subscribe(); + } else { + ctx.fireChannelRead(msg); + } + } + + private static AtomicLong retrieveAssociatedCounter(ImapRequest imapRequest, ImapSession session, ThrottlerConfigurationEntry entry) { + String key = "imap-applicative-traffic-shaper-counter-" + imapRequest.getCommand().getName(); + return Optional.ofNullable(session.getAttribute(key)) + .filter(AtomicLong.class::isInstance) + .map(AtomicLong.class::cast) + .orElseGet(() -> { + AtomicLong res = new AtomicLong(0); + session.setAttribute(key, res); + session.schedule(() -> session.setAttribute(key, new AtomicLong(0)), entry.observationPeriod()); + return res; + }); + } + + private static void logDelay(ImapRequest imapRequest, ImapSession session, Duration delay) { + MDCStructuredLogger.forLogger(LOGGER) + .field("username", session.getUserName().asString()) + .field("userAgent", Optional.ofNullable(session.getAttribute(IdProcessor.USER_AGENT)) + .filter(String.class::isInstance) + .map(String.class::cast) + .orElse("")) + .log(logger -> logger.info("Delayed command {} on an IMAP session. Delay {} ms", + imapRequest.getCommand().getName(), + delay.toMillis())); + } +} diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/IMAPServer.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/IMAPServer.java index 115ea2dd08..b66129f04c 100644 --- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/IMAPServer.java +++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/IMAPServer.java @@ -177,6 +177,7 @@ public class IMAPServer extends AbstractConfigurableAsyncServer implements ImapC private Optional<TrafficShapingConfiguration> trafficShaping = Optional.empty(); private Optional<ConnectionLimitUpstreamHandler> connectionLimitUpstreamHandler = Optional.empty(); private Optional<ConnectionPerIpLimitUpstreamHandler> connectionPerIpLimitUpstreamHandler = Optional.empty(); + private Optional<IMAPCommandsThrottler.ThrottlerConfiguration> throttlerConfiguration = Optional.empty(); private boolean ignoreIDLEUponProcessing; private Duration heartbeatInterval; private ReactiveThrottler reactiveThrottler; @@ -222,6 +223,10 @@ public class IMAPServer extends AbstractConfigurableAsyncServer implements ImapC trafficShaping = Optional.ofNullable(configuration.configurationAt("trafficShaping")) .map(TrafficShapingConfiguration::from); } + if (configuration.getKeys("perSessionCommandThrottling").hasNext()) { + throttlerConfiguration = Optional.ofNullable(configuration.configurationAt("perSessionCommandThrottling")) + .map(IMAPCommandsThrottler.ThrottlerConfiguration::from); + } } @Override @@ -337,6 +342,9 @@ public class IMAPServer extends AbstractConfigurableAsyncServer implements ImapC pipeline.addLast(REQUEST_DECODER, new ImapRequestFrameDecoder(decoder, inMemorySizeLimit, literalSizeLimit, maxLineLength)); + throttlerConfiguration.map(IMAPCommandsThrottler::new) + .ifPresent(handler -> pipeline.addLast("commandThrottler", handler)); + pipeline.addLast(CORE_HANDLER, createCoreHandler()); } diff --git a/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/IMAPCommandsThrottlerTest.java b/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/IMAPCommandsThrottlerTest.java new file mode 100644 index 0000000000..32fc769b1b --- /dev/null +++ b/server/protocols/protocols-imap4/src/test/java/org/apache/james/imapserver/netty/IMAPCommandsThrottlerTest.java @@ -0,0 +1,85 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.imapserver.netty; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.time.Duration; + +import org.apache.commons.configuration2.HierarchicalConfiguration; +import org.apache.commons.configuration2.tree.ImmutableNode; +import org.apache.james.imapserver.netty.IMAPCommandsThrottler.ThrottlerConfiguration; +import org.apache.james.imapserver.netty.IMAPCommandsThrottler.ThrottlerConfigurationEntry; +import org.apache.james.protocols.lib.mock.ConfigLoader; +import org.apache.james.util.ClassLoaderUtils; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; + +import com.google.common.collect.ImmutableMap; + +class IMAPCommandsThrottlerTest { + @Nested + class ConfigTest { + @Test + void shouldLoad() throws Exception { + HierarchicalConfiguration<ImmutableNode> config = ConfigLoader.getConfig(ClassLoaderUtils.getSystemResourceAsSharedStream("commandsThrottling.xml")); + + var selectEntry = new ThrottlerConfigurationEntry(25, Duration.ofMillis(2), Duration.ofMinutes(10), Duration.ofSeconds(1)); + var appendEntry = new ThrottlerConfigurationEntry(5, Duration.ofMillis(10), Duration.ofMinutes(5), Duration.ofSeconds(2)); + + assertThat(ThrottlerConfiguration.from(config)) + .isEqualTo(new ThrottlerConfiguration( + ImmutableMap.of( + "SELECT", selectEntry, + "APPEND", appendEntry))); + } + } + + @Nested + class DelayTest { + @Test + void shouldNotDelayWhenBelowThreshold() { + var selectEntry = new ThrottlerConfigurationEntry(25, Duration.ofMillis(2), Duration.ofMinutes(10), Duration.ofSeconds(1)); + + assertThat(selectEntry.delayMSFor(24)).isZero(); + } + + @Test + void shouldDelayWhenThreshold() { + var selectEntry = new ThrottlerConfigurationEntry(25, Duration.ofMillis(2), Duration.ofMinutes(10), Duration.ofSeconds(1)); + + assertThat(selectEntry.delayMSFor(25)).isEqualTo(50); + } + + @Test + void shouldAdditionalDelayWhenAboveThreshold() { + var selectEntry = new ThrottlerConfigurationEntry(25, Duration.ofMillis(2), Duration.ofMinutes(10), Duration.ofSeconds(1)); + + assertThat(selectEntry.delayMSFor(26)).isEqualTo(52); + } + + @Test + void shouldNotExceedMaximumDelay() { + var selectEntry = new ThrottlerConfigurationEntry(25, Duration.ofMillis(2), Duration.ofMinutes(10), Duration.ofSeconds(1)); + + assertThat(selectEntry.delayMSFor(2600)).isEqualTo(1000); + } + } +} \ No newline at end of file diff --git a/server/protocols/protocols-imap4/src/test/resources/commandsThrottling.xml b/server/protocols/protocols-imap4/src/test/resources/commandsThrottling.xml new file mode 100644 index 0000000000..dc4f57afb4 --- /dev/null +++ b/server/protocols/protocols-imap4/src/test/resources/commandsThrottling.xml @@ -0,0 +1,14 @@ +<perSessionCommandThrottling> + <select> + <thresholdCount>25</thresholdCount> + <additionalDelayPerOperation>2ms</additionalDelayPerOperation> + <observationPeriod>10m</observationPeriod> + <maxDelay>1s</maxDelay> + </select> + <append> + <thresholdCount>5</thresholdCount> + <additionalDelayPerOperation>10ms</additionalDelayPerOperation> + <observationPeriod>5m</observationPeriod> + <maxDelay>2s</maxDelay> + </append> +</perSessionCommandThrottling> \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For additional commands, e-mail: notifications-h...@james.apache.org