This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 2a33a003a4a1d7e8f985a8e266e6538c46817a90
Author: Benoit Tellier <[email protected]>
AuthorDate: Fri Jan 28 11:19:48 2022 +0700

    JAMES-3693 Support connections to a Redis Cluster
    
    Changeset tested manually
---
 .../rate/limiter/redis/RedisRateLimiter.scala      | 12 +++++++---
 .../redis/RedisRateLimiterConfiguration.scala      | 27 ++++++++++++++++++++--
 2 files changed, 34 insertions(+), 5 deletions(-)

diff --git a/server/mailet/rate-limiter-redis/src/main/scala/org/apache/james/rate/limiter/redis/RedisRateLimiter.scala b/server/mailet/rate-limiter-redis/src/main/scala/org/apache/james/rate/limiter/redis/RedisRateLimiter.scala
index 6673d9c..e0ecaa3 100644
--- a/server/mailet/rate-limiter-redis/src/main/scala/org/apache/james/rate/limiter/redis/RedisRateLimiter.scala
+++ b/server/mailet/rate-limiter-redis/src/main/scala/org/apache/james/rate/limiter/redis/RedisRateLimiter.scala
@@ -19,9 +19,10 @@
 
 package org.apache.james.rate.limiter.redis
 
-import es.moki.ratelimitj.core.limiter.request.{ReactiveRequestRateLimiter, RequestLimitRule}
-import es.moki.ratelimitj.redis.request.{RedisRateLimiterFactory => RateLimitjFactory}
+import es.moki.ratelimitj.core.limiter.request.{AbstractRequestRateLimiterFactory, ReactiveRequestRateLimiter, RequestLimitRule}
+import es.moki.ratelimitj.redis.request.{RedisClusterRateLimiterFactory, RedisSlidingWindowRequestRateLimiter, RedisRateLimiterFactory => RedisSingleInstanceRateLimitjFactory}
 import io.lettuce.core.RedisClient
+import io.lettuce.core.cluster.RedisClusterClient
 import org.apache.james.rate.limiter.api.Increment.Increment
 import org.apache.james.rate.limiter.api.{AcceptableRate, RateExceeded, RateLimiter, RateLimiterFactory, RateLimitingKey, RateLimitingResult, Rule, Rules}
 import org.reactivestreams.Publisher
@@ -30,7 +31,12 @@ import reactor.core.scala.publisher.SMono
 import scala.jdk.CollectionConverters._
 
 class RedisRateLimiterFactory(redisConfiguration: RedisRateLimiterConfiguration) extends RateLimiterFactory {
-  val rateLimitjFactory: RateLimitjFactory = new RateLimitjFactory(RedisClient.create(redisConfiguration.redisURI))
+  val rateLimitjFactory: AbstractRequestRateLimiterFactory[RedisSlidingWindowRequestRateLimiter] =
+    if (redisConfiguration.redisURI.value.size > 1) {
+      new RedisClusterRateLimiterFactory(RedisClusterClient.create(redisConfiguration.redisURI.value.asJava))
+    } else {
+      new RedisSingleInstanceRateLimitjFactory(RedisClient.create(redisConfiguration.redisURI.value.last))
+    }
 
   override def withSpecification(rules: Rules): RateLimiter = {
     val requestRateLimiter: ReactiveRequestRateLimiter = rateLimitjFactory.getInstanceReactive(rules.rules
diff --git a/server/mailet/rate-limiter-redis/src/main/scala/org/apache/james/rate/limiter/redis/RedisRateLimiterConfiguration.scala b/server/mailet/rate-limiter-redis/src/main/scala/org/apache/james/rate/limiter/redis/RedisRateLimiterConfiguration.scala
index aa30400..7a84e19 100644
--- a/server/mailet/rate-limiter-redis/src/main/scala/org/apache/james/rate/limiter/redis/RedisRateLimiterConfiguration.scala
+++ b/server/mailet/rate-limiter-redis/src/main/scala/org/apache/james/rate/limiter/redis/RedisRateLimiterConfiguration.scala
@@ -22,7 +22,11 @@ package org.apache.james.rate.limiter.redis
 import java.time.Duration
 
 import com.google.common.base.Preconditions
+import eu.timepit.refined
+import eu.timepit.refined.api.Refined
+import eu.timepit.refined.collection.NonEmpty
 import io.lettuce.core.RedisURI
+import org.apache.james.rate.limiter.redis.RedisUris.RedisUris
 import org.apache.mailet.MailetConfig
 
 object RedisRateLimiterConfiguration {
@@ -31,8 +35,27 @@ object RedisRateLimiterConfiguration {
 
   def from(redisUri: String, windowPrecision: Option[Duration] = None): RedisRateLimiterConfiguration = {
     Preconditions.checkArgument(redisUri != null && !redisUri.isBlank)
-    RedisRateLimiterConfiguration(RedisURI.create(redisUri), windowPrecision)
+    RedisRateLimiterConfiguration(RedisUris.from(redisUri), windowPrecision)
   }
 }
 
-case class RedisRateLimiterConfiguration(redisURI: RedisURI, windowPrecision: Option[Duration] = None)
+object RedisUris {
+  type RedisUrisConstraint = NonEmpty
+  type RedisUris = List[RedisURI] Refined RedisUrisConstraint
+
+  def validate(value: List[RedisURI]): Either[IllegalArgumentException, RedisUris] =
+    refined.refineV[RedisUrisConstraint](value) match {
+      case Right(value) => Right(value)
+      case Left(error) => Left(new IllegalArgumentException(error))
+    }
+
+  def liftOrThrow(value: List[RedisURI]): RedisUris =
+    validate(value) match {
+      case Right(value) => value
+      case Left(error) => throw error
+    }
+
+  def from(value: String): RedisUris = liftOrThrow(value.split(',').toList.map(RedisURI.create))
+}
+
+case class RedisRateLimiterConfiguration(redisURI: RedisUris, windowPrecision: Option[Duration] = None)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to