This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new 71bb4ab  JAMES-3693 Redis implementation RateLimiter (with configuration) (#848)
71bb4ab is described below

commit 71bb4ab68e2da3d025a2af842d054bbbaff74aff
Author: vttran <[email protected]>
AuthorDate: Fri Jan 28 09:36:30 2022 +0700

    JAMES-3693 Redis implementation RateLimiter (with configuration) (#848)
---
 pom.xml                                            | 12 ++++-
 server/mailet/rate-limiter-redis/pom.xml           | 47 +++++++++++++++++
 .../rate/limiter/redis/RedisRateLimiter.scala      | 59 ++++++++++++++++++++++
 .../redis/RedisRateLimiterConfiguration.scala      | 38 ++++++++++++++
 .../org/apache/james/rate/limiter/DockerRedis.java |  4 ++
 .../apache/james/rate/limiter/RedisExtension.java  |  5 ++
 .../james/rate/limiter/RedisRateLimiterTest.scala  | 42 +++++++++++++++
 7 files changed, 206 insertions(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index a11748c..0826b2b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1767,7 +1767,17 @@
                 <artifactId>james-server-queue-rabbitmq</artifactId>
                 <version>${project.version}</version>
             </dependency>
-
+            <dependency>
+                <groupId>${james.groupId}</groupId>
+                <artifactId>james-server-rate-limiter</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>${james.groupId}</groupId>
+                <artifactId>james-server-rate-limiter</artifactId>
+                <version>${project.version}</version>
+                <type>test-jar</type>
+            </dependency>
             <dependency>
                 <groupId>${james.groupId}</groupId>
                 <artifactId>james-server-spring</artifactId>
diff --git a/server/mailet/rate-limiter-redis/pom.xml b/server/mailet/rate-limiter-redis/pom.xml
index 4fe8dae..61ae706 100644
--- a/server/mailet/rate-limiter-redis/pom.xml
+++ b/server/mailet/rate-limiter-redis/pom.xml
@@ -33,23 +33,70 @@
     <dependencies>
         <dependency>
             <groupId>${james.groupId}</groupId>
+            <artifactId>apache-mailet-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>apache-mailet-base</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
             <artifactId>james-server-guice-common</artifactId>
             <type>test-jar</type>
             <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>${james.groupId}</groupId>
+            <artifactId>james-server-rate-limiter</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>james-server-rate-limiter</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
             <artifactId>testing-base</artifactId>
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>es.moki.ratelimitj</groupId>
+            <artifactId>ratelimitj-redis</artifactId>
+            <version>0.7.0</version>
+        </dependency>
+        <dependency>
+            <groupId>eu.timepit</groupId>
+            <artifactId>refined_${scala.base}</artifactId>
+        </dependency>
+        <dependency>
             <groupId>io.lettuce</groupId>
             <artifactId>lettuce-core</artifactId>
         </dependency>
         <dependency>
+            <groupId>io.projectreactor</groupId>
+            <artifactId>reactor-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.projectreactor</groupId>
+            <artifactId>reactor-scala-extensions_${scala.base}</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.testcontainers</groupId>
             <artifactId>testcontainers</artifactId>
             <scope>test</scope>
         </dependency>
     </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>net.alchim31.maven</groupId>
+                <artifactId>scala-maven-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
 </project>
diff --git a/server/mailet/rate-limiter-redis/src/main/scala/org/apache/james/rate/limiter/redis/RedisRateLimiter.scala b/server/mailet/rate-limiter-redis/src/main/scala/org/apache/james/rate/limiter/redis/RedisRateLimiter.scala
new file mode 100644
index 0000000..6673d9c
--- /dev/null
+++ b/server/mailet/rate-limiter-redis/src/main/scala/org/apache/james/rate/limiter/redis/RedisRateLimiter.scala
@@ -0,0 +1,59 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ * http://www.apache.org/licenses/LICENSE-2.0                   *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.rate.limiter.redis
+
+import es.moki.ratelimitj.core.limiter.request.{ReactiveRequestRateLimiter, 
RequestLimitRule}
+import es.moki.ratelimitj.redis.request.{RedisRateLimiterFactory => 
RateLimitjFactory}
+import io.lettuce.core.RedisClient
+import org.apache.james.rate.limiter.api.Increment.Increment
+import org.apache.james.rate.limiter.api.{AcceptableRate, RateExceeded, 
RateLimiter, RateLimiterFactory, RateLimitingKey, RateLimitingResult, Rule, 
Rules}
+import org.reactivestreams.Publisher
+import reactor.core.scala.publisher.SMono
+
+import scala.jdk.CollectionConverters._
+
+class RedisRateLimiterFactory(redisConfiguration: 
RedisRateLimiterConfiguration) extends RateLimiterFactory {
+  val rateLimitjFactory: RateLimitjFactory = new 
RateLimitjFactory(RedisClient.create(redisConfiguration.redisURI))
+
+  override def withSpecification(rules: Rules): RateLimiter = {
+    val requestRateLimiter: ReactiveRequestRateLimiter = 
rateLimitjFactory.getInstanceReactive(rules.rules
+      .map(convert)
+      .map(withPrecision)
+      .toSet.asJava)
+    RedisRateLimiter(requestRateLimiter)
+  }
+
+  private def withPrecision(rule: RequestLimitRule): RequestLimitRule =
+    redisConfiguration.windowPrecision
+      .map(rule.withPrecision)
+      .getOrElse(rule)
+
+  private def convert(rule: Rule): RequestLimitRule = 
RequestLimitRule.of(rule.duration, rule.quantity.value)
+
+}
+
+case class RedisRateLimiter(limiter: ReactiveRequestRateLimiter) extends 
RateLimiter {
+  override def rateLimit(key: RateLimitingKey, increaseQuantity: Increment): 
Publisher[RateLimitingResult] =
+    
SMono.fromPublisher(limiter.overLimitWhenIncrementedReactive(key.asString(), 
increaseQuantity.value))
+      .filter(isOverLimit => !isOverLimit)
+      .map(_ => AcceptableRate)
+      .switchIfEmpty(SMono.just(RateExceeded))
+
+}
diff --git a/server/mailet/rate-limiter-redis/src/main/scala/org/apache/james/rate/limiter/redis/RedisRateLimiterConfiguration.scala b/server/mailet/rate-limiter-redis/src/main/scala/org/apache/james/rate/limiter/redis/RedisRateLimiterConfiguration.scala
new file mode 100644
index 0000000..aa30400
--- /dev/null
+++ b/server/mailet/rate-limiter-redis/src/main/scala/org/apache/james/rate/limiter/redis/RedisRateLimiterConfiguration.scala
@@ -0,0 +1,38 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ * http://www.apache.org/licenses/LICENSE-2.0                   *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.rate.limiter.redis
+
+import java.time.Duration
+
+import com.google.common.base.Preconditions
+import io.lettuce.core.RedisURI
+import org.apache.mailet.MailetConfig
+
+object RedisRateLimiterConfiguration {
+  def from(mailetConfig: MailetConfig): RedisRateLimiterConfiguration =
+    from(mailetConfig.getInitParameter("redisURL"))
+
+  def from(redisUri: String, windowPrecision: Option[Duration] = None): 
RedisRateLimiterConfiguration = {
+    Preconditions.checkArgument(redisUri != null && !redisUri.isBlank)
+    RedisRateLimiterConfiguration(RedisURI.create(redisUri), windowPrecision)
+  }
+}
+
+case class RedisRateLimiterConfiguration(redisURI: RedisURI, windowPrecision: 
Option[Duration] = None)
diff --git a/server/mailet/rate-limiter-redis/src/test/java/org/apache/james/rate/limiter/DockerRedis.java b/server/mailet/rate-limiter-redis/src/test/java/org/apache/james/rate/limiter/DockerRedis.java
index ec70ab0..3dfcb5d 100644
--- a/server/mailet/rate-limiter-redis/src/test/java/org/apache/james/rate/limiter/DockerRedis.java
+++ b/server/mailet/rate-limiter-redis/src/test/java/org/apache/james/rate/limiter/DockerRedis.java
@@ -71,4 +71,8 @@ public class DockerRedis {
         return RedisClient.create(redisURI().toString())
             .connect().sync();
     }
+
+    public void flushAll() {
+        createClient().flushall();
+    }
 }
diff --git a/server/mailet/rate-limiter-redis/src/test/java/org/apache/james/rate/limiter/RedisExtension.java b/server/mailet/rate-limiter-redis/src/test/java/org/apache/james/rate/limiter/RedisExtension.java
index fde4b70..bf0db74 100644
--- a/server/mailet/rate-limiter-redis/src/test/java/org/apache/james/rate/limiter/RedisExtension.java
+++ b/server/mailet/rate-limiter-redis/src/test/java/org/apache/james/rate/limiter/RedisExtension.java
@@ -37,6 +37,11 @@ public class RedisExtension implements GuiceModuleTestExtension {
         DOCKER_REDIS_SINGLETON.stop();
     }
 
+    @Override
+    public void beforeEach(ExtensionContext extensionContext) throws Exception 
{
+        DOCKER_REDIS_SINGLETON.flushAll();
+    }
+
     public DockerRedis dockerRedis() {
         return DOCKER_REDIS_SINGLETON;
     }
diff --git a/server/mailet/rate-limiter-redis/src/test/java/org/apache/james/rate/limiter/RedisRateLimiterTest.scala b/server/mailet/rate-limiter-redis/src/test/java/org/apache/james/rate/limiter/RedisRateLimiterTest.scala
new file mode 100644
index 0000000..2e9949e
--- /dev/null
+++ b/server/mailet/rate-limiter-redis/src/test/java/org/apache/james/rate/limiter/RedisRateLimiterTest.scala
@@ -0,0 +1,42 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ * http://www.apache.org/licenses/LICENSE-2.0                   *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.rate.limiter
+
+import java.time.Duration
+
+import org.apache.james.rate.limiter.api.{RateLimiterContract, 
RateLimiterFactory}
+import org.apache.james.rate.limiter.redis.{RedisRateLimiterConfiguration, 
RedisRateLimiterFactory}
+import org.junit.jupiter.api.BeforeEach
+import org.junit.jupiter.api.extension.ExtendWith
+
+@ExtendWith(Array(classOf[RedisExtension]))
+class RedisRateLimiterTest extends RateLimiterContract {
+
+  var redisRateLimiterConfiguration: RedisRateLimiterConfiguration = _
+
+  @BeforeEach
+  def setup(redis: DockerRedis): Unit = {
+    redisRateLimiterConfiguration = 
RedisRateLimiterConfiguration.from(redis.redisURI().toString, 
Some(Duration.ofSeconds(1)))
+  }
+
+  override def testee(): RateLimiterFactory = new 
RedisRateLimiterFactory(redisRateLimiterConfiguration)
+
+  override def sleep(duration: Duration): Unit = 
Thread.sleep(duration.toMillis)
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to