This is an automated email from the ASF dual-hosted git repository.

hqtran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 86e4c4272c27c3060d5b109aa793255cc2d5556f
Author: Quan Tran <hqt...@linagora.com>
AuthorDate: Mon Mar 31 14:50:29 2025 +0700

    JAMES-4124 Rate limiting tests with Kvrocks Sentinel
---
 .../limiter/KvrocksSentinelRateLimiterTest.scala   | 100 +++++++++++++++++++++
 1 file changed, 100 insertions(+)

diff --git 
a/server/mailet/rate-limiter-redis/src/test/java/org/apache/james/rate/limiter/KvrocksSentinelRateLimiterTest.scala
 
b/server/mailet/rate-limiter-redis/src/test/java/org/apache/james/rate/limiter/KvrocksSentinelRateLimiterTest.scala
new file mode 100644
index 0000000000..f50ba54918
--- /dev/null
+++ 
b/server/mailet/rate-limiter-redis/src/test/java/org/apache/james/rate/limiter/KvrocksSentinelRateLimiterTest.scala
@@ -0,0 +1,100 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ * http://www.apache.org/licenses/LICENSE-2.0                   *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.rate.limiter
+
+import java.time.Duration
+import java.util.UUID
+import java.util.concurrent.TimeUnit
+
+import eu.timepit.refined.auto._
+import org.apache.james.backends.redis.KvrocksSentinelExtension.KvrocksSentinel
+import org.apache.james.backends.redis.{KvrocksSentinelExtension, 
RedisClientFactory}
+import org.apache.james.rate.limiter.KvrocksSentinelRateLimiterTest.{RULES, 
SLIDING_WIDOW_PRECISION}
+import org.apache.james.rate.limiter.api._
+import org.apache.james.rate.limiter.redis.RedisRateLimiterFactory
+import org.apache.james.server.core.filesystem.FileSystemImpl
+import org.assertj.core.api.Assertions.{assertThat, assertThatCode}
+import org.awaitility.Awaitility
+import org.junit.jupiter.api.extension.ExtendWith
+import org.junit.jupiter.api.{AfterEach, Test}
+import reactor.core.scala.publisher.SMono
+
+object KvrocksSentinelRateLimiterTest {
+  val SLIDING_WIDOW_PRECISION: Option[Duration] = Some(Duration.ofSeconds(1))
+  val RULES: Rules = Rules(Seq(Rule(4L, Duration.ofSeconds(2))))
+}
+
+@ExtendWith(Array(classOf[KvrocksSentinelExtension]))
+class KvrocksSentinelRateLimiterTest {
+
+  @AfterEach
+  def afterEach(kvrocksSentinel: KvrocksSentinel): Unit = {
+    kvrocksSentinel.redisSentinelContainerList().unPauseFirstNode()
+    kvrocksSentinel.kvrocksMasterReplicaContainerList().unPauseMasterNode()
+  }
+
+  @Test
+  def rateLimitShouldBeAcceptableWhenLimitIsAcceptable(kvrocksSentinel: 
KvrocksSentinel): Unit = {
+    val rateLimiterFactory: RedisRateLimiterFactory = new 
RedisRateLimiterFactory(kvrocksSentinel.redisSentinelContainerList.getRedisConfiguration,
+      new RedisClientFactory(FileSystemImpl.forTesting()))
+    val rateLimiter = rateLimiterFactory.withSpecification(RULES, 
SLIDING_WIDOW_PRECISION)
+    val actual: RateLimitingResult = SMono(rateLimiter.rateLimit(TestKey("key" 
+ UUID.randomUUID().toString), 4)).block()
+    assertThat(actual).isEqualTo(AcceptableRate)
+  }
+
+  @Test
+  def rateLimitShouldWorkNormallyWhenLimitExceeded(kvrocksSentinel: 
KvrocksSentinel): Unit = {
+    val rateLimiterFactory: RedisRateLimiterFactory = new 
RedisRateLimiterFactory(kvrocksSentinel.redisSentinelContainerList.getRedisConfiguration,
+      new RedisClientFactory(FileSystemImpl.forTesting()))
+    val rateLimiter = rateLimiterFactory.withSpecification(RULES, 
SLIDING_WIDOW_PRECISION)
+    val actual: RateLimitingResult = SMono(rateLimiter.rateLimit(TestKey("key" 
+ UUID.randomUUID().toString), 5)).block()
+    assertThat(actual).isEqualTo(RateExceeded)
+  }
+
+  @Test
+  def rateLimitShouldWorkNormallyAfterFailoverComplete(kvrocksSentinel: 
KvrocksSentinel): Unit = {
+    val rateLimiterFactory: RedisRateLimiterFactory = new 
RedisRateLimiterFactory(kvrocksSentinel.redisSentinelContainerList.getRedisConfiguration,
+      new RedisClientFactory(FileSystemImpl.forTesting()))
+    val rateLimiter = rateLimiterFactory.withSpecification(RULES, 
SLIDING_WIDOW_PRECISION)
+
+    // Before failover, the rate limit should be working normally
+    assertThat(SMono(rateLimiter.rateLimit(TestKey("key" + 
UUID.randomUUID().toString), 5)).block())
+      .isEqualTo(RateExceeded)
+
+    // Pause first sentinel node
+    kvrocksSentinel.redisSentinelContainerList().pauseFirstNode()
+    // Give stop master node
+    kvrocksSentinel.kvrocksMasterReplicaContainerList().pauseMasterNode()
+    // Sleep for a while to let sentinel detect the failover. Here is 5 seconds
+    Thread.sleep(5000)
+
+    // After failover, the rate limit should be working normally
+    Awaitility.await()
+      .pollInterval(2, TimeUnit.SECONDS)
+      .atMost(100, TimeUnit.SECONDS)
+      .untilAsserted(() => assertThatCode(() => 
SMono(rateLimiter.rateLimit(TestKey("key" + UUID.randomUUID().toString), 
1)).block())
+        .doesNotThrowAnyException())
+
+    assertThat(SMono(rateLimiter.rateLimit(TestKey("key" + 
UUID.randomUUID().toString), 10)).block())
+      .isNotNull
+    assertThat(SMono(rateLimiter.rateLimit(TestKey("key" + 
UUID.randomUUID().toString), 3)).block())
+      .isEqualTo(AcceptableRate)
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org
For additional commands, e-mail: notifications-h...@james.apache.org

Reply via email to