This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit bc1557a13ef4e60f447f806aa3ba9e5b649027b0
Author: Benoit TELLIER <btell...@linagora.com>
AuthorDate: Thu Nov 21 11:36:00 2024 +0100

    JAMES-4093 Deliver Traffic shaping for IMAP
---
 .../apache/james/imapserver/netty/IMAPServer.java  | 37 ++++++++++++++++------
 .../netty/TrafficShapingConfiguration.java         | 19 +++++++++++
 .../src/test/resources/imapserver.xml              |  6 ++++
 3 files changed, 52 insertions(+), 10 deletions(-)

diff --git 
a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/IMAPServer.java
 
b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/IMAPServer.java
index d96dfe53bd..e3ba88e277 100644
--- 
a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/IMAPServer.java
+++ 
b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/IMAPServer.java
@@ -75,6 +75,9 @@ import io.netty.channel.ChannelPipeline;
 import io.netty.channel.group.DefaultChannelGroup;
 import io.netty.handler.codec.haproxy.HAProxyMessageDecoder;
 import io.netty.handler.stream.ChunkedWriteHandler;
+import io.netty.handler.traffic.AbstractTrafficShapingHandler;
+import io.netty.handler.traffic.ChannelTrafficShapingHandler;
+import io.netty.handler.traffic.TrafficCounter;
 import io.netty.util.AttributeKey;
 import io.netty.util.concurrent.GlobalEventExecutor;
 
@@ -168,6 +171,7 @@ public class IMAPServer extends 
AbstractConfigurableAsyncServer implements ImapC
     private int timeout;
     private int literalSizeLimit;
     private AuthenticationConfiguration authenticationConfiguration;
+    private Optional<TrafficShapingConfiguration> trafficShaping = 
Optional.empty();
     private Optional<ConnectionLimitUpstreamHandler> 
connectionLimitUpstreamHandler = Optional.empty();
     private Optional<ConnectionPerIpLimitUpstreamHandler> 
connectionPerIpLimitUpstreamHandler = Optional.empty();
     private boolean ignoreIDLEUponProcessing;
@@ -211,6 +215,10 @@ public class IMAPServer extends 
AbstractConfigurableAsyncServer implements ImapC
         heartbeatInterval = imapConfiguration.idleTimeIntervalAsDuration();
         reactiveThrottler = new ReactiveThrottler(gaugeRegistry, 
imapConfiguration.getConcurrentRequests(), imapConfiguration.getMaxQueueSize());
         processor.configure(imapConfiguration);
+        if (configuration.getKeys("trafficShaping").hasNext()) {
+            trafficShaping = 
Optional.ofNullable(configuration.configurationAt("trafficShaping"))
+                .map(TrafficShapingConfiguration::from);
+        }
     }
 
     @Override
@@ -318,6 +326,8 @@ public class IMAPServer extends 
AbstractConfigurableAsyncServer implements ImapC
                         channel.pipeline().addFirst(SSL_HANDLER, 
secure.sslHandler());
                     }
                 }
+                trafficShaping.map(TrafficShapingConfiguration::newHandler)
+                    .ifPresent(handler -> 
pipeline.addLast("trafficShaping",handler));
 
                 pipeline.addLast(CHUNK_WRITE_HANDLER, new 
ChunkedWriteHandler());
 
@@ -378,6 +388,8 @@ public class IMAPServer extends 
AbstractConfigurableAsyncServer implements ImapC
         return imapChannelGroup.stream()
             .map(channel -> {
                 Optional<ImapSession> imapSession = 
Optional.ofNullable(channel.attr(IMAP_SESSION_ATTRIBUTE_KEY).get());
+                Optional<TrafficCounter> trafficCounter = 
Optional.ofNullable(channel.pipeline().get(ChannelTrafficShapingHandler.class))
+                    .map(AbstractTrafficShapingHandler::trafficCounter);
                 return new ConnectionDescription(
                     "IMAP",
                     jmxName,
@@ -388,22 +400,27 @@ public class IMAPServer extends 
AbstractConfigurableAsyncServer implements ImapC
                     channel.isWritable(),
                     imapSession.map(ImapSession::isTLSActive).orElse(false),
                     imapSession.flatMap(session -> 
Optional.ofNullable(session.getUserName())),
-                    ImmutableMap.of(
-                        "isCompressed", 
Boolean.toString(imapSession.map(ImapSession::isCompressionActive).orElse(false)),
-                        "selectedMailbox", imapSession.flatMap(session -> 
Optional.ofNullable(session.getSelected()))
+                    ImmutableMap.<String, String>builder()
+                        .put("isCompressed", 
Boolean.toString(imapSession.map(ImapSession::isCompressionActive).orElse(false)))
+                        .put("selectedMailbox", imapSession.flatMap(session -> 
Optional.ofNullable(session.getSelected()))
                             .map(SelectedMailbox::getMailboxId)
                             .map(MailboxId::serialize)
-                            .orElse(""),
-                        "isIdling", 
Boolean.toString(imapSession.flatMap(session -> 
Optional.ofNullable(session.getSelected()))
+                            .orElse(""))
+                        .put("isIdling", 
Boolean.toString(imapSession.flatMap(session -> 
Optional.ofNullable(session.getSelected()))
                             .map(SelectedMailbox::isIdling)
-                            .orElse(false)),
-                        "requestCount", 
Long.toString(Optional.ofNullable(channel.attr(REQUEST_COUNTER))
+                            .orElse(false)))
+                        .put("requestCount", 
Long.toString(Optional.ofNullable(channel.attr(REQUEST_COUNTER))
                             .flatMap(attribute -> 
Optional.ofNullable(attribute.get()))
                             .map(AtomicLong::get)
-                            .orElse(0L)),
-                        "userAgent", imapSession.flatMap(s -> 
Optional.ofNullable(s.getAttribute("userAgent")))
+                            .orElse(0L)))
+                        .put("userAgent", imapSession.flatMap(s -> 
Optional.ofNullable(s.getAttribute("userAgent")))
                             .map(Object::toString)
-                            .orElse("")));
+                            .orElse(""))
+                        .put("cumulativeWrittenBytes", 
Long.toString(trafficCounter.map(TrafficCounter::cumulativeWrittenBytes).orElse(0L)))
+                        .put("cumulativeReadBytes", 
Long.toString(trafficCounter.map(TrafficCounter::cumulativeReadBytes).orElse(0L)))
+                        .put("liveReadThroughputBytePerSecond", 
Long.toString(trafficCounter.map(TrafficCounter::lastReadThroughput).orElse(0L)))
+                        .put("liveWriteThroughputBytePerSecond", 
Long.toString(trafficCounter.map(TrafficCounter::lastWriteThroughput).orElse(0L)))
+                        .build());
             });
     }
 
diff --git 
a/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/TrafficShapingConfiguration.java
 
b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/TrafficShapingConfiguration.java
new file mode 100644
index 0000000000..b87a1faee3
--- /dev/null
+++ 
b/server/protocols/protocols-imap4/src/main/java/org/apache/james/imapserver/netty/TrafficShapingConfiguration.java
@@ -0,0 +1,19 @@
+package org.apache.james.imapserver.netty;
+
+import org.apache.commons.configuration2.Configuration;
+
+import io.netty.handler.traffic.ChannelTrafficShapingHandler;
+
+public record TrafficShapingConfiguration(long writeLimit, long readLimit, 
long checkInterval, long maxTime) {
+    static TrafficShapingConfiguration from(Configuration configuration) {
+        return new TrafficShapingConfiguration(
+            configuration.getLong("writeTrafficPerSecond", 0),
+            configuration.getLong("readTrafficPerSecond", 0),
+            configuration.getLong("checkInterval", 30),
+            configuration.getLong("maxDelays", 30));
+    }
+
+    public ChannelTrafficShapingHandler newHandler(){
+        return new ChannelTrafficShapingHandler(writeLimit, readLimit, 
checkInterval, maxTime);
+    }
+}
diff --git 
a/server/protocols/webadmin-integration-test/memory-webadmin-integration-test/src/test/resources/imapserver.xml
 
b/server/protocols/webadmin-integration-test/memory-webadmin-integration-test/src/test/resources/imapserver.xml
index ec86ad6ed7..8f376f58f5 100644
--- 
a/server/protocols/webadmin-integration-test/memory-webadmin-integration-test/src/test/resources/imapserver.xml
+++ 
b/server/protocols/webadmin-integration-test/memory-webadmin-integration-test/src/test/resources/imapserver.xml
@@ -37,6 +37,12 @@ under the License.
         <connectionLimitPerIP>0</connectionLimitPerIP>
         <plainAuthDisallowed>false</plainAuthDisallowed>
         <gracefulShutdown>false</gracefulShutdown>
+        <trafficShaping>
+            <writeTrafficPerSecond>0</writeTrafficPerSecond>
+            <readTrafficPerSecond>0</readTrafficPerSecond>
+            <checkInterval>1</checkInterval>
+            <maxDelays>30</maxDelays>
+        </trafficShaping>
     </imapserver>
     <imapserver enabled="true">
         <jmxName>imapserver-ssl</jmxName>


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org
For additional commands, e-mail: notifications-h...@james.apache.org

Reply via email to