This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 12105c553b8ada9604927315f541f57c2f4f7755 Author: Tung Tran <[email protected]> AuthorDate: Mon May 13 11:53:36 2024 +0700 JAMES-3693 Refactor RedisConfiguration class Change to interface with 3 impl: - StandaloneRedisConfiguration - MasterReplicaRedisConfiguration - ClusterRedisConfiguration --- .../james/backends/redis/RedisConfiguration.scala | 123 +++++++++++++++------ .../james/backends/redis/RedisHealthCheck.scala | 27 +++-- .../backends/redis/RedisClusterExtension.java | 5 +- .../backends/redis/RedisConfigurationTest.scala | 23 +--- .../james/backends/redis/RedisExtension.java | 4 +- .../redis/RedisMasterReplicaExtension.java | 23 ++-- .../redis/RedisMasterReplicaHealthCheckTest.scala | 11 +- .../redis/RedisStandaloneHealthCheckTest.scala | 2 +- .../rate/limiter/redis/RedisRateLimiter.scala | 32 +++--- .../james/rate/limiter/RedisRateLimiterTest.scala | 4 +- ...sRateLimiterWithMasterReplicaTopologyTest.scala | 5 +- 11 files changed, 149 insertions(+), 110 deletions(-) diff --git a/backends-common/redis/src/main/java/org/apache/james/backends/redis/RedisConfiguration.scala b/backends-common/redis/src/main/java/org/apache/james/backends/redis/RedisConfiguration.scala index 0cdf80a971..537774b6f0 100644 --- a/backends-common/redis/src/main/java/org/apache/james/backends/redis/RedisConfiguration.scala +++ b/backends-common/redis/src/main/java/org/apache/james/backends/redis/RedisConfiguration.scala @@ -19,12 +19,13 @@ package org.apache.james.backends.redis -import com.google.common.base.Preconditions +import com.google.common.base.{MoreObjects, Preconditions} import eu.timepit.refined import eu.timepit.refined.api.Refined import eu.timepit.refined.collection.NonEmpty -import io.lettuce.core.RedisURI +import io.lettuce.core.{ReadFrom, RedisURI} import org.apache.commons.configuration2.Configuration +import org.apache.james.backends.redis.RedisConfiguration.{CLUSTER_TOPOLOGY, MASTER_REPLICA_TOPOLOGY, STANDALONE_TOPOLOGY} import org.apache.james.backends.redis.RedisUris.RedisUris import org.slf4j.{Logger, LoggerFactory} @@ -35,36 +36,21 @@ object RedisConfiguration { val LOGGER: Logger = LoggerFactory.getLogger(classOf[RedisConfiguration]) - def from(config: Configuration): RedisConfiguration = { - val configuration = from(config.getStringArray("redisURL"), - config.getString("redis.topology", STANDALONE_TOPOLOGY) match { - case STANDALONE_TOPOLOGY => Standalone - case CLUSTER_TOPOLOGY => Cluster - case MASTER_REPLICA_TOPOLOGY => MasterReplica - case _ => throw new NotImplementedError() - }, - Option(config.getInteger("redis.ioThreads", null)).map(Integer2int), - Option(config.getInteger("redis.workerThreads", null)).map(Integer2int)) - - LOGGER.info("Redis was loaded with configuration: \n" + - "redisURL: {}\n" + - "redisTopology: {}\n" + - "redis.ioThreads: {}\n" + - "redis.workerThreads: {}", configuration.redisURI.value.map(_.toString).mkString(";"), - configuration.redisTopology, configuration.ioThreads, configuration.workerThreads) - - configuration - } + def redisIoThreadsFrom(config: Configuration): Option[Int] = Option(config.getInteger("redis.ioThreads", null)).map(Integer2int) - def from(redisUri: String, redisTopology: RedisTopology, ioThreads: Option[Int], workerThreads: Option[Int]): RedisConfiguration = - from(Array(redisUri), redisTopology, ioThreads, workerThreads) + def redisWorkerThreadsFrom(config: Configuration): Option[Int] = Option(config.getInteger("redis.workerThreads", null)).map(Integer2int) - def from(redisUris: Array[String], redisTopology: RedisTopology, ioThreads: Option[Int], workerThreads: Option[Int]): RedisConfiguration = { - Preconditions.checkArgument(redisUris != null && redisUris.length > 0) - RedisConfiguration(RedisUris.from(redisUris), redisTopology, ioThreads, workerThreads) - } + def from(config: Configuration): RedisConfiguration = { + val redisConfiguration: RedisConfiguration = config.getString("redis.topology", STANDALONE_TOPOLOGY) match { + case STANDALONE_TOPOLOGY => StandaloneRedisConfiguration.from(config) + case CLUSTER_TOPOLOGY => ClusterRedisConfiguration.from(config) + case MASTER_REPLICA_TOPOLOGY => MasterReplicaRedisConfiguration.from(config) + case _ => throw new NotImplementedError() + } - def from(redisUri: String, redisTopology: RedisTopology): RedisConfiguration = from(redisUri, redisTopology, None, None) + LOGGER.info(s"Configured Redis with: ${redisConfiguration.asString}") + redisConfiguration + } } object RedisUris { @@ -88,12 +74,81 @@ object RedisUris { def from(value: Array[String]): RedisUris = liftOrThrow(value.toList.map(RedisURI.create)) } -sealed trait RedisTopology +trait RedisConfiguration { + def ioThreads: Option[Int] + + def workerThreads: Option[Int] + + def asString: String +} + +object StandaloneRedisConfiguration { + def from(config: Configuration): StandaloneRedisConfiguration = from( + config.getString("redisURL"), + RedisConfiguration.redisIoThreadsFrom(config), + RedisConfiguration.redisWorkerThreadsFrom(config)) + + def from(redisUri: String): StandaloneRedisConfiguration = from(redisUri, None, None) + + def from(redisUri: String, ioThreads: Option[Int] = None, workerThreads: Option[Int] = None): StandaloneRedisConfiguration = + StandaloneRedisConfiguration(RedisURI.create(redisUri), ioThreads, workerThreads) +} + +case class StandaloneRedisConfiguration(redisURI: RedisURI, ioThreads: Option[Int], workerThreads: Option[Int]) extends RedisConfiguration { + override def asString: String = MoreObjects.toStringHelper(this) + .add("topology", STANDALONE_TOPOLOGY) + .add("redisURI", redisURI.toString) + .add("redis.ioThreads", ioThreads) + .add("redis.workerThreads", workerThreads) + .toString +} + +object MasterReplicaRedisConfiguration { + def from(config: Configuration): MasterReplicaRedisConfiguration = + from(config.getStringArray("redisURL"), + Option(config.getString("redis.readFrom", null)).map(ReadFrom.valueOf).getOrElse(ReadFrom.MASTER), + RedisConfiguration.redisIoThreadsFrom(config), + RedisConfiguration.redisWorkerThreadsFrom(config)) + + def from(redisUris: Array[String], + readFrom: ReadFrom, + ioThreads: Option[Int] = None, + workerThreads: Option[Int] = None): MasterReplicaRedisConfiguration = { + Preconditions.checkArgument(redisUris != null && redisUris.length > 0) + MasterReplicaRedisConfiguration(RedisUris.from(redisUris), + readFrom, + ioThreads, workerThreads) + } +} -case object Standalone extends RedisTopology +case class MasterReplicaRedisConfiguration(redisURI: RedisUris, readFrom: ReadFrom, ioThreads: Option[Int], workerThreads: Option[Int]) extends RedisConfiguration { + override def asString: String = MoreObjects.toStringHelper(this) + .add("topology", MASTER_REPLICA_TOPOLOGY) + .add("redisURI", redisURI.value.map(_.toString).mkString(";")) + .add("redis.ioThreads", ioThreads) + .add("redis.workerThreads", workerThreads) + .toString +} -case object Cluster extends RedisTopology +object ClusterRedisConfiguration { + def from(config: Configuration): ClusterRedisConfiguration = + from(config.getStringArray("redisURL"), + RedisConfiguration.redisIoThreadsFrom(config), + RedisConfiguration.redisWorkerThreadsFrom(config)) -case object MasterReplica extends RedisTopology + def from(redisUris: Array[String], + ioThreads: Option[Int] = None, + workerThreads: Option[Int] = None): ClusterRedisConfiguration = { + Preconditions.checkArgument(redisUris != null && redisUris.length > 0) + ClusterRedisConfiguration(RedisUris.from(redisUris), ioThreads, workerThreads) + } +} -case class RedisConfiguration(redisURI: RedisUris, redisTopology: RedisTopology, ioThreads: Option[Int], workerThreads:Option[Int]) +case class ClusterRedisConfiguration(redisURI: RedisUris, ioThreads: Option[Int], workerThreads: Option[Int]) extends RedisConfiguration { + override def asString: String = MoreObjects.toStringHelper(this) + .add("topology", CLUSTER_TOPOLOGY) + .add("redisURI", redisURI.value.map(_.toString).mkString(";")) + .add("redis.ioThreads", ioThreads) + .add("redis.workerThreads", workerThreads) + .toString +} \ No newline at end of file diff --git a/backends-common/redis/src/main/java/org/apache/james/backends/redis/RedisHealthCheck.scala b/backends-common/redis/src/main/java/org/apache/james/backends/redis/RedisHealthCheck.scala index d149f3f060..5cdfe5a9c1 100644 --- a/backends-common/redis/src/main/java/org/apache/james/backends/redis/RedisHealthCheck.scala +++ b/backends-common/redis/src/main/java/org/apache/james/backends/redis/RedisHealthCheck.scala @@ -46,10 +46,11 @@ object RedisHealthCheck { class RedisHealthCheck @Inject()(redisConfiguration: RedisConfiguration) extends HealthCheck { private val healthcheckTimeout: Duration = Duration.ofSeconds(3) - private val healthcheckPerform: RedisHealthcheckPerform = redisConfiguration.redisTopology match { - case Cluster => new RedisClusterHealthCheckPerform(redisConfiguration, healthcheckTimeout) - case MasterReplica => new RedisMasterReplicaHealthCheckPerform(redisConfiguration, healthcheckTimeout) - case Standalone => new RedisStandaloneHealthCheckPerform(redisConfiguration, healthcheckTimeout) + private val healthcheckPerform: RedisHealthcheckPerform = redisConfiguration match { + case standaloneConfiguration: StandaloneRedisConfiguration => new RedisStandaloneHealthCheckPerform(standaloneConfiguration, healthcheckTimeout) + case clusterConfiguration: ClusterRedisConfiguration => new RedisClusterHealthCheckPerform(clusterConfiguration, healthcheckTimeout) + case masterReplicaConfiguration: MasterReplicaRedisConfiguration => new RedisMasterReplicaHealthCheckPerform(masterReplicaConfiguration, healthcheckTimeout) + case _ => throw new NotImplementedError() } override def componentName(): ComponentName = redisComponent @@ -68,7 +69,7 @@ sealed trait RedisHealthcheckPerform { def close(): Unit } -class RedisClusterHealthCheckPerform(val redisConfiguration: RedisConfiguration, +class RedisClusterHealthCheckPerform(val redisConfiguration: ClusterRedisConfiguration, val healthcheckTimeout: Duration) extends RedisHealthcheckPerform { private val CLUSTER_STATUS_OK: String = "ok" @@ -98,17 +99,15 @@ class RedisClusterHealthCheckPerform(val redisConfiguration: RedisConfiguration, .subscribe() } -class RedisStandaloneHealthCheckPerform(val redisConfiguration: RedisConfiguration, +class RedisStandaloneHealthCheckPerform(val redisConfiguration: StandaloneRedisConfiguration, val healthcheckTimeout: Duration) extends RedisHealthcheckPerform { private val PING_SUCCESS_RESPONSE = "PONG" - private val redisClient: RedisClient = RedisClient.create( - redisConfiguration.redisURI.value - .map(rURI => { - rURI.setTimeout(healthcheckTimeout) - rURI - }).last) + private val redisClient: RedisClient = { + redisConfiguration.redisURI.setTimeout(healthcheckTimeout) + RedisClient.create(redisConfiguration.redisURI) + } private val redisCommand: RedisReactiveCommands[String, String] = redisClient.connect().reactive() @@ -126,8 +125,8 @@ class RedisStandaloneHealthCheckPerform(val redisConfiguration: RedisConfigurati } -class RedisMasterReplicaHealthCheckPerform(val redisConfiguration: RedisConfiguration, - val healthcheckTimeout: Duration) extends RedisHealthcheckPerform { +class RedisMasterReplicaHealthCheckPerform(val redisConfiguration: MasterReplicaRedisConfiguration, + val healthcheckTimeout: Duration) extends RedisHealthcheckPerform { private val PING_SUCCESS_RESPONSE = "PONG" diff --git a/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisClusterExtension.java b/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisClusterExtension.java index 4090faa567..315b5031dc 100644 --- a/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisClusterExtension.java +++ b/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisClusterExtension.java @@ -61,12 +61,11 @@ public class RedisClusterExtension implements GuiceModuleTestExtension { super(c); } - public RedisConfiguration getRedisConfiguration() { - return RedisConfiguration.from(this.stream() + public ClusterRedisConfiguration getRedisConfiguration() { + return ClusterRedisConfiguration.from(this.stream() .map(redisURIFunction()) .map(URI::toString) .toArray(String[]::new), - Cluster$.MODULE$, OptionConverters.toScala(Optional.empty()), OptionConverters.toScala(Optional.empty())); } diff --git a/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisConfigurationTest.scala b/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisConfigurationTest.scala index 2638525331..f56e61c577 100644 --- a/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisConfigurationTest.scala +++ b/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisConfigurationTest.scala @@ -19,7 +19,7 @@ package org.apache.james.backends.redis -import io.lettuce.core.RedisURI +import io.lettuce.core.{ReadFrom, RedisURI} import org.apache.commons.configuration2.PropertiesConfiguration import org.apache.commons.configuration2.convert.DefaultListDelimiterHandler import org.scalatest.flatspec.AnyFlatSpec @@ -35,12 +35,7 @@ class RedisConfigurationTest extends AnyFlatSpec with Matchers { config.addProperty("redis.workerThreads", 32) val redisConfig = RedisConfiguration.from(config) - - redisConfig.redisURI.value should have length 1 - redisConfig.redisURI.value should contain theSameElementsAs List(RedisURI.create("redis://localhost:6379")) - redisConfig.redisTopology shouldEqual MasterReplica - redisConfig.ioThreads shouldEqual Some(16) - redisConfig.workerThreads shouldEqual Some(32) + redisConfig shouldEqual MasterReplicaRedisConfiguration.from(Array("redis://localhost:6379"), ReadFrom.MASTER, Some(16), Some(32)) } it should "parse multiple Redis URIs from config" in { @@ -52,12 +47,7 @@ class RedisConfigurationTest extends AnyFlatSpec with Matchers { config.addProperty("redis.workerThreads", 32) val redisConfig = RedisConfiguration.from(config) - - redisConfig.redisURI.value should have length 2 - redisConfig.redisURI.value should contain theSameElementsAs List(RedisURI.create("redis://localhost:6379"), RedisURI.create("redis://localhost:6380")) - redisConfig.redisTopology shouldEqual Cluster - redisConfig.ioThreads shouldEqual Some(16) - redisConfig.workerThreads shouldEqual Some(32) + redisConfig shouldEqual ClusterRedisConfiguration(RedisUris.liftOrThrow(List(RedisURI.create("redis://localhost:6379"), RedisURI.create("redis://localhost:6380"))), Some(16), Some(32)) } it should "use default values for missing config values" in { @@ -65,12 +55,7 @@ class RedisConfigurationTest extends AnyFlatSpec with Matchers { config.addProperty("redisURL", "redis://localhost:6379") val redisConfig = RedisConfiguration.from(config) - - redisConfig.redisURI.value should have length 1 - redisConfig.redisURI.value should contain theSameElementsAs List(RedisURI.create("redis://localhost:6379")) - redisConfig.redisTopology shouldEqual Standalone - redisConfig.ioThreads shouldEqual None - redisConfig.workerThreads shouldEqual None + redisConfig shouldEqual StandaloneRedisConfiguration(RedisURI.create("redis://localhost:6379"), None, None) } it should "throw exception for invalid Redis URI" in { diff --git a/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisExtension.java b/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisExtension.java index 4936ef2a0b..d90e636269 100644 --- a/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisExtension.java +++ b/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisExtension.java @@ -53,8 +53,8 @@ public class RedisExtension implements GuiceModuleTestExtension { return new AbstractModule() { @Provides @Singleton - public RedisConfiguration provideConfig() { - return RedisConfiguration.from(dockerRedis().redisURI().toString(), Standalone$.MODULE$); + public RedisConfiguration provideConfig() { + return StandaloneRedisConfiguration.from(dockerRedis().redisURI().toString()); } }; } diff --git a/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisMasterReplicaExtension.java b/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisMasterReplicaExtension.java index 2f6cf2c02a..22c3ec206a 100644 --- a/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisMasterReplicaExtension.java +++ b/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisMasterReplicaExtension.java @@ -50,22 +50,23 @@ import com.google.inject.AbstractModule; import com.google.inject.Module; import com.google.inject.Provides; +import io.lettuce.core.ReadFrom; import scala.Function2; import scala.jdk.javaapi.OptionConverters; public class RedisMasterReplicaExtension implements GuiceModuleTestExtension { - public static class RedisClusterContainer extends ArrayList<GenericContainer> { - public RedisClusterContainer(Collection<? extends GenericContainer> c) { + public static class RedisMasterReplicaContainer extends ArrayList<GenericContainer> { + public RedisMasterReplicaContainer(Collection<? extends GenericContainer> c) { super(c); } - public RedisConfiguration getRedisConfiguration() { - return RedisConfiguration.from(this.stream() + public MasterReplicaRedisConfiguration getRedisConfiguration() { + return MasterReplicaRedisConfiguration.from(this.stream() .map(redisURIFunction()) .map(URI::toString) .toArray(String[]::new), - MasterReplica$.MODULE$, + ReadFrom.MASTER, OptionConverters.toScala(Optional.empty()), OptionConverters.toScala(Optional.empty())); } @@ -101,7 +102,7 @@ public class RedisMasterReplicaExtension implements GuiceModuleTestExtension { static final GenericContainer redis2 = redisContainerSupplier.apply("redis2", true); static final GenericContainer redis3 = redisContainerSupplier.apply("redis3", true); - private RedisClusterContainer redisClusterContainer; + private RedisMasterReplicaContainer redisMasterReplicaContainer; private final Network network; public RedisMasterReplicaExtension() { @@ -120,7 +121,7 @@ public class RedisMasterReplicaExtension implements GuiceModuleTestExtension { redis1.start(); redis2.start(); redis3.start(); - redisClusterContainer = new RedisClusterContainer(List.of(redis1, redis2, redis3)); + redisMasterReplicaContainer = new RedisMasterReplicaContainer(List.of(redis1, redis2, redis3)); } @Override @@ -134,7 +135,7 @@ public class RedisMasterReplicaExtension implements GuiceModuleTestExtension { @Override public void beforeEach(ExtensionContext extensionContext) throws Exception { - redisClusterContainer.forEach(Throwing.consumer(container -> container.execInContainer("redis-cli", "flushall"))); + redisMasterReplicaContainer.forEach(Throwing.consumer(container -> container.execInContainer("redis-cli", "flushall"))); } @Override @@ -143,19 +144,19 @@ public class RedisMasterReplicaExtension implements GuiceModuleTestExtension { @Provides @Singleton public RedisConfiguration provideRedisConfiguration() { - return redisClusterContainer.getRedisConfiguration(); + return redisMasterReplicaContainer.getRedisConfiguration(); } }; } @Override public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException { - return parameterContext.getParameter().getType() == RedisClusterContainer.class; + return parameterContext.getParameter().getType() == RedisMasterReplicaContainer.class; } @Override public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException { - return new RedisClusterContainer(List.of(redis1, redis2, redis3)); + return new RedisMasterReplicaContainer(List.of(redis1, redis2, redis3)); } private static Function<GenericContainer, URI> redisURIFunction() { diff --git a/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisMasterReplicaHealthCheckTest.scala b/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisMasterReplicaHealthCheckTest.scala index cda4a2686b..12b49a151d 100644 --- a/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisMasterReplicaHealthCheckTest.scala +++ b/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisMasterReplicaHealthCheckTest.scala @@ -20,8 +20,7 @@ package org.apache.james.backends.redis import java.util.concurrent.TimeUnit - -import org.apache.james.backends.redis.RedisMasterReplicaExtension.RedisClusterContainer +import org.apache.james.backends.redis.RedisMasterReplicaExtension.RedisMasterReplicaContainer import org.assertj.core.api.Assertions.assertThat import org.awaitility.Awaitility import org.junit.jupiter.api.extension.ExtendWith @@ -33,12 +32,12 @@ class RedisMasterReplicaHealthCheckTest { var redisHealthCheck: RedisHealthCheck = _ @BeforeEach - def setup(redis: RedisClusterContainer): Unit = { + def setup(redis: RedisMasterReplicaContainer): Unit = { redisHealthCheck = new RedisHealthCheck(redis.getRedisConfiguration) } @AfterEach - def afterEach(redis: RedisClusterContainer): Unit = { + def afterEach(redis: RedisMasterReplicaContainer): Unit = { redis.unPauseOne(); } @@ -50,7 +49,7 @@ class RedisMasterReplicaHealthCheckTest { } @Test - def checkShouldReturnDegradedWhenRedisIsDown(redis: RedisClusterContainer): Unit = { + def checkShouldReturnDegradedWhenRedisIsDown(redis: RedisMasterReplicaContainer): Unit = { redis.pauseOne() Awaitility.await() @@ -60,7 +59,7 @@ class RedisMasterReplicaHealthCheckTest { } @Test - def checkShouldReturnHealthyWhenRedisIsRecovered(redis: RedisClusterContainer): Unit = { + def checkShouldReturnHealthyWhenRedisIsRecovered(redis: RedisMasterReplicaContainer): Unit = { redis.pauseOne() redis.unPauseOne() diff --git a/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisStandaloneHealthCheckTest.scala b/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisStandaloneHealthCheckTest.scala index c450ec482b..0b8de209b0 100644 --- a/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisStandaloneHealthCheckTest.scala +++ b/backends-common/redis/src/test/java/org/apache/james/backends/redis/RedisStandaloneHealthCheckTest.scala @@ -33,7 +33,7 @@ class RedisStandaloneHealthCheckTest { @BeforeEach def setup(redis: DockerRedis): Unit = { - val redisConfiguration: RedisConfiguration = RedisConfiguration.from(redis.redisURI().toString, Standalone) + val redisConfiguration: StandaloneRedisConfiguration = StandaloneRedisConfiguration.from(redis.redisURI().toString) redisHealthCheck = new RedisHealthCheck(redisConfiguration) } diff --git a/server/mailet/rate-limiter-redis/src/main/scala/org/apache/james/rate/limiter/redis/RedisRateLimiter.scala b/server/mailet/rate-limiter-redis/src/main/scala/org/apache/james/rate/limiter/redis/RedisRateLimiter.scala index 25514136d5..027e24f2f2 100644 --- a/server/mailet/rate-limiter-redis/src/main/scala/org/apache/james/rate/limiter/redis/RedisRateLimiter.scala +++ b/server/mailet/rate-limiter-redis/src/main/scala/org/apache/james/rate/limiter/redis/RedisRateLimiter.scala @@ -28,7 +28,7 @@ import io.lettuce.core.RedisClient import io.lettuce.core.cluster.RedisClusterClient import io.lettuce.core.resource.ClientResources import jakarta.inject.Inject -import org.apache.james.backends.redis.{Cluster, MasterReplica, RedisConfiguration, Standalone} +import org.apache.james.backends.redis.{ClusterRedisConfiguration, MasterReplicaRedisConfiguration, RedisConfiguration, StandaloneRedisConfiguration} import org.apache.james.rate.limiter.api.Increment.Increment import org.apache.james.rate.limiter.api.{AcceptableRate, RateExceeded, RateLimiter, RateLimiterFactory, RateLimitingKey, RateLimitingResult, Rule, Rules} import org.apache.james.util.concurrent.NamedThreadFactory @@ -52,21 +52,21 @@ class RedisRateLimiterModule() extends AbstractModule { } class RedisRateLimiterFactory @Inject()(redisConfiguration: RedisConfiguration) extends RateLimiterFactory { - val rateLimitjFactory: AbstractRequestRateLimiterFactory[RedisSlidingWindowRequestRateLimiter] = { - redisConfiguration.redisTopology match { - case Cluster => - val resourceBuilder = ClientResources.builder() - .threadFactoryProvider(poolName => NamedThreadFactory.withName(s"redis-driver-$poolName")) - redisConfiguration.ioThreads.foreach(value => resourceBuilder.ioThreadPoolSize(value)) - redisConfiguration.workerThreads.foreach(value =>resourceBuilder.computationThreadPoolSize(value)) - new RedisClusterRateLimiterFactory(RedisClusterClient.create(resourceBuilder.build(), - redisConfiguration.redisURI.value.asJava)) - case MasterReplica => - new RedisMasterReplicaRateLimiterFactory(RedisClient.create(redisConfiguration.redisURI.value.last), redisConfiguration.redisURI.value.asJava) - case Standalone => - new RedisSingleInstanceRateLimitjFactory(RedisClient.create(redisConfiguration.redisURI.value.last)) - case _ => throw new NotImplementedError() - } + val rateLimitjFactory: AbstractRequestRateLimiterFactory[RedisSlidingWindowRequestRateLimiter] = redisConfiguration match { + case standaloneConfiguration: StandaloneRedisConfiguration => new RedisSingleInstanceRateLimitjFactory(RedisClient.create(standaloneConfiguration.redisURI)) + + case clusterRedisConfiguration: ClusterRedisConfiguration => + val resourceBuilder: ClientResources.Builder = ClientResources.builder() + .threadFactoryProvider(poolName => NamedThreadFactory.withName(s"redis-driver-$poolName")) + redisConfiguration.ioThreads.foreach(value => resourceBuilder.ioThreadPoolSize(value)) + redisConfiguration.workerThreads.foreach(value => resourceBuilder.computationThreadPoolSize(value)) + new RedisClusterRateLimiterFactory(RedisClusterClient.create(resourceBuilder.build(), clusterRedisConfiguration.redisURI.value.asJava)) + + case masterReplicaRedisConfiguration: MasterReplicaRedisConfiguration => new RedisMasterReplicaRateLimiterFactory( + RedisClient.create(masterReplicaRedisConfiguration.redisURI.value.last), + masterReplicaRedisConfiguration.redisURI.value.asJava) + + case _ => throw new NotImplementedError() } override def withSpecification(rules: Rules, precision: Option[Duration]): RateLimiter = diff --git a/server/mailet/rate-limiter-redis/src/test/java/org/apache/james/rate/limiter/RedisRateLimiterTest.scala b/server/mailet/rate-limiter-redis/src/test/java/org/apache/james/rate/limiter/RedisRateLimiterTest.scala index 79a5fd52a0..19d39b8aa6 100644 --- a/server/mailet/rate-limiter-redis/src/test/java/org/apache/james/rate/limiter/RedisRateLimiterTest.scala +++ b/server/mailet/rate-limiter-redis/src/test/java/org/apache/james/rate/limiter/RedisRateLimiterTest.scala @@ -19,9 +19,9 @@ package org.apache.james.rate.limiter -import org.apache.james.backends.redis.{DockerRedis, RedisConfiguration, RedisExtension, Standalone} import java.time.Duration +import org.apache.james.backends.redis.{RedisConfiguration, DockerRedis, RedisExtension, StandaloneRedisConfiguration} import org.apache.james.rate.limiter.api.{RateLimiterContract, RateLimiterFactory} import org.apache.james.rate.limiter.redis.RedisRateLimiterFactory import org.junit.jupiter.api.BeforeEach @@ -34,7 +34,7 @@ class RedisRateLimiterTest extends RateLimiterContract { @BeforeEach def setup(redis: DockerRedis): Unit = { - redisRateLimiterConfiguration = RedisConfiguration.from(redis.redisURI().toString, Standalone) + redisRateLimiterConfiguration = StandaloneRedisConfiguration.from(redis.redisURI().toString) } override def testee(): RateLimiterFactory = new RedisRateLimiterFactory(redisRateLimiterConfiguration) diff --git a/server/mailet/rate-limiter-redis/src/test/java/org/apache/james/rate/limiter/RedisRateLimiterWithMasterReplicaTopologyTest.scala b/server/mailet/rate-limiter-redis/src/test/java/org/apache/james/rate/limiter/RedisRateLimiterWithMasterReplicaTopologyTest.scala index ee0c33e15e..36987d75d7 100644 --- a/server/mailet/rate-limiter-redis/src/test/java/org/apache/james/rate/limiter/RedisRateLimiterWithMasterReplicaTopologyTest.scala +++ b/server/mailet/rate-limiter-redis/src/test/java/org/apache/james/rate/limiter/RedisRateLimiterWithMasterReplicaTopologyTest.scala @@ -23,6 +23,7 @@ import java.time.Duration import eu.timepit.refined.auto._ import org.apache.james.backends.redis.RedisMasterReplicaExtension +import org.apache.james.backends.redis.RedisMasterReplicaExtension.RedisMasterReplicaContainer import org.apache.james.rate.limiter.RedisRateLimiterWithMasterReplicaTopologyTest.{RULES, SLIDING_WIDOW_PRECISION} import org.apache.james.rate.limiter.api.{AcceptableRate, RateLimitingKey, RateLimitingResult, Rule, Rules} import org.apache.james.rate.limiter.redis.RedisRateLimiterFactory @@ -41,12 +42,12 @@ class RedisRateLimiterWithMasterReplicaTopologyTest { var rateLimiterFactory: RedisRateLimiterFactory = _ @BeforeEach - def setup(redisClusterContainer: RedisMasterReplicaExtension.RedisClusterContainer): Unit = { + def setup(redisClusterContainer: RedisMasterReplicaContainer): Unit = { rateLimiterFactory = new RedisRateLimiterFactory(redisClusterContainer.getRedisConfiguration) } @Test - def test(redisClusterContainer: RedisMasterReplicaExtension.RedisClusterContainer): Unit = { + def test(redisClusterContainer: RedisMasterReplicaContainer): Unit = { val rateLimiterFactory = new RedisRateLimiterFactory(redisClusterContainer.getRedisConfiguration) val rateLimiter = rateLimiterFactory.withSpecification(RULES, SLIDING_WIDOW_PRECISION) val actual: RateLimitingResult = SMono(rateLimiter.rateLimit(TestKey("key1"), 4)).block() --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
