This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 7c8d8b19443879fc967ffaf679eca28258e68c08 Author: Benoit Tellier <[email protected]> AuthorDate: Fri Apr 17 08:18:03 2020 +0700 JAMES-3117 Reactive ElasticSearch healthCHeck --- .../james/backends/es/ElasticSearchHealthCheck.java | 17 +++++++---------- .../james/backends/es/ReactorElasticSearchClient.java | 12 +++++++----- 2 files changed, 14 insertions(+), 15 deletions(-) diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchHealthCheck.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchHealthCheck.java index 89037e7..4dded30 100644 --- a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchHealthCheck.java +++ b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchHealthCheck.java @@ -30,11 +30,12 @@ import org.apache.james.core.healthcheck.HealthCheck; import org.apache.james.core.healthcheck.Result; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; -import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.Requests; import com.google.common.annotations.VisibleForTesting; +import reactor.core.publisher.Mono; + public class ElasticSearchHealthCheck implements HealthCheck { private static final ComponentName COMPONENT_NAME = new ComponentName("ElasticSearch Backend"); @@ -54,20 +55,16 @@ public class ElasticSearchHealthCheck implements HealthCheck { } @Override - public Result check() { + public Mono<Result> checkReactive() { String[] indices = indexNames.stream() .map(IndexName::getValue) .toArray(String[]::new); ClusterHealthRequest request = Requests.clusterHealthRequest(indices); - try { - ClusterHealthResponse response = client.cluster() - .health(request, RequestOptions.DEFAULT); - - return toHealthCheckResult(response); - } catch (IOException e) { - return Result.unhealthy(COMPONENT_NAME, "Error while contacting cluster", e); - } + return client.health(request) + .map(this::toHealthCheckResult) + .onErrorResume(IOException.class, e -> Mono.just( + Result.unhealthy(COMPONENT_NAME, "Error while contacting cluster", e))); } @VisibleForTesting diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ReactorElasticSearchClient.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ReactorElasticSearchClient.java index 3b0c18b..4885b08 100644 --- a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ReactorElasticSearchClient.java +++ b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ReactorElasticSearchClient.java @@ -23,6 +23,8 @@ import java.io.IOException; import java.util.function.Consumer; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.storedscripts.DeleteStoredScriptRequest; import org.elasticsearch.action.admin.cluster.storedscripts.DeleteStoredScriptResponse; import org.elasticsearch.action.admin.cluster.storedscripts.GetStoredScriptRequest; @@ -45,7 +47,6 @@ import org.elasticsearch.action.search.MultiSearchResponse; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchScrollRequest; -import org.elasticsearch.client.ClusterClient; import org.elasticsearch.client.IndicesClient; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestClient; @@ -76,10 +77,6 @@ public class ReactorElasticSearchClient implements AutoCloseable { return toReactor(listener -> client.clearScrollAsync(clearScrollRequest, options, listener)); } - public ClusterClient cluster() { - return client.cluster(); - } - public DeleteResponse delete(DeleteRequest deleteRequest, RequestOptions options) throws IOException { return client.delete(deleteRequest, options); } @@ -141,6 +138,11 @@ public class ReactorElasticSearchClient implements AutoCloseable { return toReactor(listener -> client.searchAsync(searchRequest, options, listener)); } + public Mono<ClusterHealthResponse> health(ClusterHealthRequest request) { + return toReactor(listener -> client.cluster() + .healthAsync(request, RequestOptions.DEFAULT, listener)); + } + public Mono<SearchTemplateResponse> searchTemplate(SearchTemplateRequest searchTemplateRequest, RequestOptions options) { return toReactor(listener -> client.searchTemplateAsync(searchTemplateRequest, options, listener)); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
