This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit bc1557a13ef4e60f447f806aa3ba9e5b649027b0
Author: Benoit TELLIER <btell...@linagora.com>
AuthorDate: Thu Nov 21 11:36:00 2024 +0100

    JAMES-4093 Deliver Traffic shaping for IMAP
---
 .../apache/james/imapserver/netty/IMAPServer.java  | 37 ++++++++++++++++------
 .../netty/TrafficShapingConfiguration.java         | 19 +++++++++++
 .../src/test/resources/imapserver.xml              |  6 ++++
 3 files changed, 52 insertions(+), 10 deletions(-)

diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/IMAPServer.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/IMAPServer.java
index d96dfe53bd..e3ba88e277 100644
--- a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/IMAPServer.java
+++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/IMAPServer.java
@@ -75,6 +75,9 @@ import io.netty.channel.ChannelPipeline;
 import io.netty.channel.group.DefaultChannelGroup;
 import io.netty.handler.codec.haproxy.HAProxyMessageDecoder;
 import io.netty.handler.stream.ChunkedWriteHandler;
+import io.netty.handler.traffic.AbstractTrafficShapingHandler;
+import io.netty.handler.traffic.ChannelTrafficShapingHandler;
+import io.netty.handler.traffic.TrafficCounter;
 import io.netty.util.AttributeKey;
 import io.netty.util.concurrent.GlobalEventExecutor;
 
@@ -168,6 +171,7 @@ public class IMAPServer extends AbstractConfigurableAsyncServer implements ImapC
     private int timeout;
     private int literalSizeLimit;
     private AuthenticationConfiguration authenticationConfiguration;
+    private Optional<TrafficShapingConfiguration> trafficShaping = Optional.empty();
     private Optional<ConnectionLimitUpstreamHandler> connectionLimitUpstreamHandler = Optional.empty();
     private Optional<ConnectionPerIpLimitUpstreamHandler> connectionPerIpLimitUpstreamHandler = Optional.empty();
     private boolean ignoreIDLEUponProcessing;
@@ -211,6 +215,10 @@ public class IMAPServer extends AbstractConfigurableAsyncServer
implements ImapC
         heartbeatInterval = imapConfiguration.idleTimeIntervalAsDuration();
         reactiveThrottler = new ReactiveThrottler(gaugeRegistry, imapConfiguration.getConcurrentRequests(), imapConfiguration.getMaxQueueSize());
         processor.configure(imapConfiguration);
+        if (configuration.getKeys("trafficShaping").hasNext()) {
+            trafficShaping = Optional.ofNullable(configuration.configurationAt("trafficShaping"))
+                .map(TrafficShapingConfiguration::from);
+        }
     }
 
     @Override
@@ -318,6 +326,8 @@ public class IMAPServer extends AbstractConfigurableAsyncServer implements ImapC
                     channel.pipeline().addFirst(SSL_HANDLER, secure.sslHandler());
                 }
             }
+            trafficShaping.map(TrafficShapingConfiguration::newHandler)
+                .ifPresent(handler -> pipeline.addLast("trafficShaping",handler));
 
             pipeline.addLast(CHUNK_WRITE_HANDLER, new ChunkedWriteHandler());
 
@@ -378,6 +388,8 @@ public class IMAPServer extends AbstractConfigurableAsyncServer implements ImapC
         return imapChannelGroup.stream()
             .map(channel -> {
                 Optional<ImapSession> imapSession = Optional.ofNullable(channel.attr(IMAP_SESSION_ATTRIBUTE_KEY).get());
+                Optional<TrafficCounter> trafficCounter = Optional.ofNullable(channel.pipeline().get(ChannelTrafficShapingHandler.class))
+                    .map(AbstractTrafficShapingHandler::trafficCounter);
                 return new ConnectionDescription(
                     "IMAP",
                     jmxName,
@@ -388,22 +400,27 @@ public class IMAPServer extends AbstractConfigurableAsyncServer implements ImapC
                     channel.isWritable(),
                     imapSession.map(ImapSession::isTLSActive).orElse(false),
                     imapSession.flatMap(session -> Optional.ofNullable(session.getUserName())),
-                    ImmutableMap.of(
-                        "isCompressed", Boolean.toString(imapSession.map(ImapSession::isCompressionActive).orElse(false)),
-                        "selectedMailbox", imapSession.flatMap(session -> Optional.ofNullable(session.getSelected()))
+                    ImmutableMap.<String, String>builder()
+                        .put("isCompressed", Boolean.toString(imapSession.map(ImapSession::isCompressionActive).orElse(false)))
+                        .put("selectedMailbox", imapSession.flatMap(session ->
Optional.ofNullable(session.getSelected()))
                             .map(SelectedMailbox::getMailboxId)
                             .map(MailboxId::serialize)
-                            .orElse(""),
-                        "isIdling", Boolean.toString(imapSession.flatMap(session -> Optional.ofNullable(session.getSelected()))
+                            .orElse(""))
+                        .put("isIdling", Boolean.toString(imapSession.flatMap(session -> Optional.ofNullable(session.getSelected()))
                             .map(SelectedMailbox::isIdling)
-                            .orElse(false)),
-                        "requestCount", Long.toString(Optional.ofNullable(channel.attr(REQUEST_COUNTER))
+                            .orElse(false)))
+                        .put("requestCount", Long.toString(Optional.ofNullable(channel.attr(REQUEST_COUNTER))
                             .flatMap(attribute -> Optional.ofNullable(attribute.get()))
                             .map(AtomicLong::get)
-                            .orElse(0L)),
-                        "userAgent", imapSession.flatMap(s -> Optional.ofNullable(s.getAttribute("userAgent")))
+                            .orElse(0L)))
+                        .put("userAgent", imapSession.flatMap(s -> Optional.ofNullable(s.getAttribute("userAgent")))
                             .map(Object::toString)
-                            .orElse("")));
+                            .orElse(""))
+                        .put("cumulativeWrittenBytes", Long.toString(trafficCounter.map(TrafficCounter::cumulativeWrittenBytes).orElse(0L)))
+                        .put("cumulativeReadBytes", Long.toString(trafficCounter.map(TrafficCounter::cumulativeReadBytes).orElse(0L)))
+                        .put("liveReadThroughputBytePerSecond", Long.toString(trafficCounter.map(TrafficCounter::lastReadThroughput).orElse(0L)))
+                        .put("liveWriteThroughputBytePerSecond", Long.toString(trafficCounter.map(TrafficCounter::lastWriteThroughput).orElse(0L)))
+                        .build());
             });
     }
 
diff --git a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/TrafficShapingConfiguration.java b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/TrafficShapingConfiguration.java
new file mode 100644
index 0000000000..b87a1faee3
--- /dev/null
+++ b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/TrafficShapingConfiguration.java
@@ -0,0 +1,19 @@
+package org.apache.james.imapserver.netty;
+
+import org.apache.commons.configuration2.Configuration;
+
+import io.netty.handler.traffic.ChannelTrafficShapingHandler;
+
+public record TrafficShapingConfiguration(long writeLimit, long readLimit, long checkInterval, long maxTime) {
+    static TrafficShapingConfiguration from(Configuration configuration) {
+        return new TrafficShapingConfiguration(
+            configuration.getLong("writeTrafficPerSecond", 0),
+            configuration.getLong("readTrafficPerSecond", 0),
+            configuration.getLong("checkInterval", 30),
+            configuration.getLong("maxDelays", 30));
+    }
+
+    public ChannelTrafficShapingHandler newHandler(){
+        return new ChannelTrafficShapingHandler(writeLimit, readLimit, checkInterval, maxTime);
+    }
+}
diff --git a/server/protocols/webadmin-integration-test/memory-webadmin-integration-test/src/test/resources/imapserver.xml b/server/protocols/webadmin-integration-test/memory-webadmin-integration-test/src/test/resources/imapserver.xml
index ec86ad6ed7..8f376f58f5 100644
--- a/server/protocols/webadmin-integration-test/memory-webadmin-integration-test/src/test/resources/imapserver.xml
+++ b/server/protocols/webadmin-integration-test/memory-webadmin-integration-test/src/test/resources/imapserver.xml
@@ -37,6 +37,12 @@ under the License.
         <connectionLimitPerIP>0</connectionLimitPerIP>
         <plainAuthDisallowed>false</plainAuthDisallowed>
         <gracefulShutdown>false</gracefulShutdown>
+        <trafficShaping>
+            <writeTrafficPerSecond>0</writeTrafficPerSecond>
+            <readTrafficPerSecond>0</readTrafficPerSecond>
+            <checkInterval>1</checkInterval>
+            <maxDelays>30</maxDelays>
+        </trafficShaping>
     </imapserver>
     <imapserver enabled="true">
         <jmxName>imapserver-ssl</jmxName>

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org
For additional commands, e-mail: notifications-h...@james.apache.org