chibenwa commented on code in PR #1671:
URL: https://github.com/apache/james-project/pull/1671#discussion_r1295394799
##########
server/mailet/rate-limiter-redis/src/main/scala/org/apache/james/rate/limiter/redis/RedisHealthCheck.scala:
##########
@@ -20,33 +20,43 @@
package org.apache.james.rate.limiter.redis
import java.time.Duration
-import java.util.concurrent.TimeUnit
-import eu.timepit.refined.auto._
+import io.lettuce.core.api.StatefulConnection
+import io.lettuce.core.cluster.RedisClusterClient
+import io.lettuce.core.{RedisClient, RedisURI}
import javax.inject.Inject
import org.apache.james.core.healthcheck.{ComponentName, HealthCheck, Result}
-import org.apache.james.rate.limiter.api.{RateLimitingKey, Rule, Rules}
import org.reactivestreams.Publisher
+import reactor.core.publisher.Mono
import reactor.core.scala.publisher.SMono
-import scala.concurrent.duration.FiniteDuration
+import scala.jdk.CollectionConverters._
-sealed class RedisHealthCheckKey() extends RateLimitingKey {
- override def asString(): String = "RedisHealthCheck"
-}
-
-class RedisHealthCheck @Inject()(redisRateLimiterFactory:
RedisRateLimiterFactory) extends HealthCheck {
+class RedisHealthCheck @Inject()(redisConfiguration:
RedisRateLimiterConfiguration) extends HealthCheck {
private val redisComponent: ComponentName = new ComponentName("Redis")
- private val redisHealthcheckKey: RedisHealthCheckKey = new
RedisHealthCheckKey
- private val flexibleRateLimitRule = Rules(Seq(Rule(100000L,
Duration.ofSeconds(60))))
- private val redisHealthcheckRateLimiter =
redisRateLimiterFactory.withSpecification(flexibleRateLimitRule, None)
- private val healthcheckTimeout = new FiniteDuration(3, TimeUnit.SECONDS)
+ private val healthcheckTimeout = Duration.ofSeconds(3)
override def componentName(): ComponentName = redisComponent
override def check(): Publisher[Result] =
-
SMono.fromPublisher(redisHealthcheckRateLimiter.rateLimit(redisHealthcheckKey,
1))
- .timeout(healthcheckTimeout)
+ connectRedis()
.`then`(SMono.just(Result.healthy(redisComponent)))
- .onErrorResume(e => SMono.just(Result.unhealthy(redisComponent, "Can not
connect to Redis.", e)))
+ .onErrorResume(_ => SMono.just(Result.degraded(redisComponent, "Can not
connect to Redis.")))
+
+ private def connectRedis(): SMono[StatefulConnection[String, String]] =
+ if (redisConfiguration.isCluster) {
+ val redisUris = redisConfiguration.redisURI.value.asJava
+ redisUris.forEach(redisUri => redisUri.setTimeout(healthcheckTimeout))
+
+ SMono.fromPublisher(Mono.using(() =>
RedisClusterClient.create(redisUris),
+ (redisClusterClient: RedisClusterClient) => Mono.fromCallable(() =>
redisClusterClient.connect()),
+ (redisClusterClient: RedisClusterClient) =>
redisClusterClient.close()))
+ } else {
+ val redisUri: RedisURI = redisConfiguration.redisURI.value.last
+ redisUri.setTimeout(healthcheckTimeout)
+
+ SMono.fromPublisher(Mono.using(() => RedisClient.create(redisUri),
+ (redisClient: RedisClient) => Mono.fromCallable(() =>
redisClient.connect()),
Review Comment:
Is connect blocking? If yes, is there a way to do in on a non blocking
fashion?
If impossible please do this on boundedElastic.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]