This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 2a33a003a4a1d7e8f985a8e266e6538c46817a90 Author: Benoit Tellier <[email protected]> AuthorDate: Fri Jan 28 11:19:48 2022 +0700 JAMES-3693 Support connections to a Redis Cluster Changeset tested manually --- .../rate/limiter/redis/RedisRateLimiter.scala | 12 +++++++--- .../redis/RedisRateLimiterConfiguration.scala | 27 ++++++++++++++++++++-- 2 files changed, 34 insertions(+), 5 deletions(-) diff --git a/server/mailet/rate-limiter-redis/src/main/scala/org/apache/james/rate/limiter/redis/RedisRateLimiter.scala b/server/mailet/rate-limiter-redis/src/main/scala/org/apache/james/rate/limiter/redis/RedisRateLimiter.scala index 6673d9c..e0ecaa3 100644 --- a/server/mailet/rate-limiter-redis/src/main/scala/org/apache/james/rate/limiter/redis/RedisRateLimiter.scala +++ b/server/mailet/rate-limiter-redis/src/main/scala/org/apache/james/rate/limiter/redis/RedisRateLimiter.scala @@ -19,9 +19,10 @@ package org.apache.james.rate.limiter.redis -import es.moki.ratelimitj.core.limiter.request.{ReactiveRequestRateLimiter, RequestLimitRule} -import es.moki.ratelimitj.redis.request.{RedisRateLimiterFactory => RateLimitjFactory} +import es.moki.ratelimitj.core.limiter.request.{AbstractRequestRateLimiterFactory, ReactiveRequestRateLimiter, RequestLimitRule} +import es.moki.ratelimitj.redis.request.{RedisClusterRateLimiterFactory, RedisSlidingWindowRequestRateLimiter, RedisRateLimiterFactory => RedisSingleInstanceRateLimitjFactory} import io.lettuce.core.RedisClient +import io.lettuce.core.cluster.RedisClusterClient import org.apache.james.rate.limiter.api.Increment.Increment import org.apache.james.rate.limiter.api.{AcceptableRate, RateExceeded, RateLimiter, RateLimiterFactory, RateLimitingKey, RateLimitingResult, Rule, Rules} import org.reactivestreams.Publisher @@ -30,7 +31,12 @@ import reactor.core.scala.publisher.SMono import scala.jdk.CollectionConverters._ class RedisRateLimiterFactory(redisConfiguration: RedisRateLimiterConfiguration) extends RateLimiterFactory { - val rateLimitjFactory: RateLimitjFactory = new RateLimitjFactory(RedisClient.create(redisConfiguration.redisURI)) + val rateLimitjFactory: AbstractRequestRateLimiterFactory[RedisSlidingWindowRequestRateLimiter] = + if (redisConfiguration.redisURI.value.size > 1) { + new RedisClusterRateLimiterFactory(RedisClusterClient.create(redisConfiguration.redisURI.value.asJava)) + } else { + new RedisSingleInstanceRateLimitjFactory(RedisClient.create(redisConfiguration.redisURI.value.last)) + } override def withSpecification(rules: Rules): RateLimiter = { val requestRateLimiter: ReactiveRequestRateLimiter = rateLimitjFactory.getInstanceReactive(rules.rules diff --git a/server/mailet/rate-limiter-redis/src/main/scala/org/apache/james/rate/limiter/redis/RedisRateLimiterConfiguration.scala b/server/mailet/rate-limiter-redis/src/main/scala/org/apache/james/rate/limiter/redis/RedisRateLimiterConfiguration.scala index aa30400..7a84e19 100644 --- a/server/mailet/rate-limiter-redis/src/main/scala/org/apache/james/rate/limiter/redis/RedisRateLimiterConfiguration.scala +++ b/server/mailet/rate-limiter-redis/src/main/scala/org/apache/james/rate/limiter/redis/RedisRateLimiterConfiguration.scala @@ -22,7 +22,11 @@ package org.apache.james.rate.limiter.redis import java.time.Duration import com.google.common.base.Preconditions +import eu.timepit.refined +import eu.timepit.refined.api.Refined +import eu.timepit.refined.collection.NonEmpty import io.lettuce.core.RedisURI +import org.apache.james.rate.limiter.redis.RedisUris.RedisUris import org.apache.mailet.MailetConfig object RedisRateLimiterConfiguration { @@ -31,8 +35,27 @@ object RedisRateLimiterConfiguration { def from(redisUri: String, windowPrecision: Option[Duration] = None): RedisRateLimiterConfiguration = { Preconditions.checkArgument(redisUri != null && !redisUri.isBlank) - RedisRateLimiterConfiguration(RedisURI.create(redisUri), windowPrecision) + RedisRateLimiterConfiguration(RedisUris.from(redisUri), windowPrecision) } } -case class RedisRateLimiterConfiguration(redisURI: RedisURI, windowPrecision: Option[Duration] = None) +object RedisUris { + type RedisUrisConstraint = NonEmpty + type RedisUris = List[RedisURI] Refined RedisUrisConstraint + + def validate(value: List[RedisURI]): Either[IllegalArgumentException, RedisUris] = + refined.refineV[RedisUrisConstraint](value) match { + case Right(value) => Right(value) + case Left(error) => Left(new IllegalArgumentException(error)) + } + + def liftOrThrow(value: List[RedisURI]): RedisUris = + validate(value) match { + case Right(value) => value + case Left(error) => throw error + } + + def from(value: String): RedisUris = liftOrThrow(value.split(',').toList.map(RedisURI.create)) +} + +case class RedisRateLimiterConfiguration(redisURI: RedisUris, windowPrecision: Option[Duration] = None) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
