This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 1d6495ee13977c5159cf22e70bf22a4e173f61a4 Author: Rene Cordier <rcord...@linagora.com> AuthorDate: Tue Apr 1 11:27:12 2025 +0700 [JAMES-3693] Refactor further RedisHealthCheck to simplify and reduce code --- .../james/backends/redis/RedisHealthCheck.scala | 77 ++++++---------------- 1 file changed, 21 insertions(+), 56 deletions(-) diff --git a/backends-common/redis/src/main/java/org/apache/james/backends/redis/RedisHealthCheck.scala b/backends-common/redis/src/main/java/org/apache/james/backends/redis/RedisHealthCheck.scala index 30154f154e..5ab511a8cc 100644 --- a/backends-common/redis/src/main/java/org/apache/james/backends/redis/RedisHealthCheck.scala +++ b/backends-common/redis/src/main/java/org/apache/james/backends/redis/RedisHealthCheck.scala @@ -21,16 +21,14 @@ package org.apache.james.backends.redis import java.time.Duration -import io.lettuce.core.api.reactive.RedisReactiveCommands +import io.lettuce.core.api.reactive.RedisStringReactiveCommands import io.lettuce.core.cluster.RedisClusterClient -import io.lettuce.core.cluster.api.reactive.RedisAdvancedClusterReactiveCommands import io.lettuce.core.codec.StringCodec import io.lettuce.core.masterreplica.MasterReplica import io.lettuce.core.{AbstractRedisClient, RedisClient} import jakarta.annotation.PreDestroy import jakarta.inject.Inject -import org.apache.commons.lang3.StringUtils -import org.apache.james.backends.redis.RedisHealthCheck.{healthcheckTimeout, redisComponent} +import org.apache.james.backends.redis.RedisHealthCheck.{healthCheckKey, healthCheckValue, healthcheckTimeout, redisComponent} import org.apache.james.core.healthcheck.{ComponentName, HealthCheck, Result} import org.reactivestreams.Publisher import reactor.core.publisher.Mono @@ -43,73 +41,40 @@ import scala.jdk.DurationConverters._ object RedisHealthCheck { val redisComponent: ComponentName = new ComponentName("Redis") val healthcheckTimeout: Duration = Duration.ofSeconds(3) + val healthCheckKey: String = "healthcheck" + val healthCheckValue: String = "healthy" } class RedisHealthCheck @Inject()(redisClientFactory: RedisClientFactory, redisConfiguration: RedisConfiguration) extends HealthCheck { private val rawRedisClient: AbstractRedisClient = redisClientFactory.createRawRedisClient() - private val healthcheckPerform: RedisHealthcheckPerform = rawRedisClient match { - case redisClusterClient: RedisClusterClient => new RedisClusterHealthCheckPerform(redisClusterClient) - case redisClient: RedisClient => new RedisRegularHealthCheckPerform(redisClient, redisConfiguration) - case _ => throw new NotImplementedError() - } - - override def componentName(): ComponentName = redisComponent - - override def check(): Publisher[Result] = - healthcheckPerform.check() - .onErrorResume(_ => SMono.just(Result.degraded(redisComponent, "Can not connect to Redis."))) - - @PreDestroy - def close(): Unit = - Mono.fromCompletionStage(rawRedisClient.shutdownAsync()) - .subscribeOn(Schedulers.boundedElastic()) - .subscribe() -} - -sealed trait RedisHealthcheckPerform { - def check(): SMono[Result] -} - -class RedisRegularHealthCheckPerform(val redisClient: RedisClient, redisConfiguration: RedisConfiguration) extends RedisHealthcheckPerform { - - private val PING_SUCCESS_RESPONSE = "PONG" - - private val redisCommand: RedisReactiveCommands[String, String] = redisConfiguration match { - case _: StandaloneRedisConfiguration => redisClient.connect().reactive() - case masterReplicaRedisConfiguration: MasterReplicaRedisConfiguration => MasterReplica.connect(redisClient, + private val redisCommand: RedisStringReactiveCommands[String, String] = redisConfiguration match { + case _: StandaloneRedisConfiguration => rawRedisClient.asInstanceOf[RedisClient].connect().reactive() + case _: ClusterRedisConfiguration => rawRedisClient.asInstanceOf[RedisClusterClient].connect().reactive() + case masterReplicaRedisConfiguration: MasterReplicaRedisConfiguration => MasterReplica.connect(rawRedisClient.asInstanceOf[RedisClient], StringCodec.UTF8, masterReplicaRedisConfiguration.redisURI.value.asJava) .reactive() - case sentinelRedisConfiguration: SentinelRedisConfiguration => MasterReplica.connect(redisClient, + case sentinelRedisConfiguration: SentinelRedisConfiguration => MasterReplica.connect(rawRedisClient.asInstanceOf[RedisClient], StringCodec.UTF8, sentinelRedisConfiguration.redisURI) .reactive() } - override def check(): SMono[Result] = - SMono(redisCommand.ping()) + override def componentName(): ComponentName = redisComponent + + override def check(): Publisher[Result] = + SMono(redisCommand.set(healthCheckKey, healthCheckValue) + .`then`(redisCommand.getdel(healthCheckKey))) .timeout(healthcheckTimeout.toScala) - .filter(_ == PING_SUCCESS_RESPONSE) .map(_ => Result.healthy(redisComponent)) - .switchIfEmpty(SMono.just(Result.degraded(redisComponent, "Can not PING to Redis."))) -} - -class RedisClusterHealthCheckPerform(val redisClusterClient: RedisClusterClient) extends RedisHealthcheckPerform { - - private val CLUSTER_STATUS_OK: String = "ok" + .switchIfEmpty(SMono.just(Result.degraded(redisComponent, "Can not write to Redis."))) + .onErrorResume(_ => SMono.just(Result.degraded(redisComponent, "Can not connect to Redis."))) - private val redisCommand: RedisAdvancedClusterReactiveCommands[String, String] = { - redisClusterClient.getPartitions - redisClusterClient.connect().reactive() + @PreDestroy + def close(): Unit = { + Mono.fromCompletionStage(rawRedisClient.shutdownAsync()) + .subscribeOn(Schedulers.boundedElastic()) + .subscribe() } - - override def check(): SMono[Result] = - SMono(redisCommand.clusterInfo()) - .timeout(healthcheckTimeout.toScala) - .map(clusterInfo => StringUtils.substringBetween(clusterInfo, "cluster_state:", "\n").trim) - .map { - case CLUSTER_STATUS_OK => Result.healthy(redisComponent) - case unExpectedState => Result.degraded(redisComponent, "Redis cluster state: " + unExpectedState) - } } --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For additional commands, e-mail: notifications-h...@james.apache.org