This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 97e06e9ccafa1d5a8869fdcc2b5ead42564a3c90 Author: Gautier DI FOLCO <[email protected]> AuthorDate: Fri Mar 13 16:05:21 2020 +0100 JAMES-3144 Use ElasticSearch reactively --- .../apache/james/backends/es/ClientProvider.java | 12 +- .../james/backends/es/DeleteByQueryPerformer.java | 17 +- .../backends/es/ElasticSearchHealthCheck.java | 5 +- .../james/backends/es/ElasticSearchIndexer.java | 67 ++++---- .../james/backends/es/IndexCreationFactory.java | 11 +- .../apache/james/backends/es/ListenerToFuture.java | 51 ------ .../james/backends/es/NodeMappingFactory.java | 7 +- .../backends/es/ReactorElasticSearchClient.java | 171 +++++++++++++++++++++ .../james/backends/es/search/ScrolledSearch.java | 113 ++++++-------- .../es/ClientProviderImplConnectionContract.java | 5 +- .../es/ElasticSearchHealthCheckConnectionTest.java | 3 +- .../backends/es/ElasticSearchIndexerTest.java | 72 +++++---- .../backends/es/IndexCreationFactoryTest.java | 3 +- .../backends/es/NodeMappingFactoryAuthTest.java | 3 +- .../james/backends/es/NodeMappingFactoryTest.java | 3 +- .../backends/es/search/ScrolledSearchTest.java | 37 +++-- .../elasticsearch/MailboxIndexCreationUtil.java | 14 +- .../ElasticSearchListeningMessageSearchIndex.java | 38 +++-- .../search/ElasticSearchSearcher.java | 23 +-- .../ElasticSearchIntegrationTest.java | 4 +- ...asticSearchListeningMessageSearchIndexTest.java | 10 +- .../search/ElasticSearchSearcherTest.java | 4 +- .../elasticsearch/ElasticSearchQuotaSearcher.java | 35 ++--- .../QuotaSearchIndexCreationUtil.java | 6 +- .../events/ElasticSearchQuotaMailboxListener.java | 8 +- ...lasticSearchQuotaSearchTestSystemExtension.java | 4 +- .../ElasticSearchQuotaMailboxListenerTest.java | 13 +- .../host/ElasticSearchHostSystem.java | 4 +- .../modules/mailbox/ElasticSearchClientModule.java | 4 +- .../mailbox/ElasticSearchMailboxModule.java | 10 +- .../mailbox/ElasticSearchQuotaSearcherModule.java | 10 +- .../modules/mailbox/ElasticSearchStartUpCheck.java | 6 +- .../test/java/org/apache/james/ESReporterTest.java | 5 +- .../routes/ElasticSearchQuotaSearchExtension.java | 4 +- 34 files changed, 450 insertions(+), 332 deletions(-) diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProvider.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProvider.java index b879a88..247ed50 100644 --- a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProvider.java +++ b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProvider.java @@ -57,7 +57,7 @@ import com.google.common.annotations.VisibleForTesting; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; -public class ClientProvider implements Provider<RestHighLevelClient> { +public class ClientProvider implements Provider<ReactorElasticSearchClient> { private static class HttpAsyncClientConfigurer { @@ -165,15 +165,17 @@ public class ClientProvider implements Provider<RestHighLevelClient> { private static final Logger LOGGER = LoggerFactory.getLogger(ClientProvider.class); private final ElasticSearchConfiguration configuration; - private final RestHighLevelClient client; + private final RestHighLevelClient elasticSearchRestHighLevelClient; private final HttpAsyncClientConfigurer httpAsyncClientConfigurer; + private final ReactorElasticSearchClient client; @Inject @VisibleForTesting ClientProvider(ElasticSearchConfiguration configuration) { this.httpAsyncClientConfigurer = new HttpAsyncClientConfigurer(configuration); this.configuration = configuration; - this.client = connect(configuration); + this.elasticSearchRestHighLevelClient = connect(configuration); + this.client = new ReactorElasticSearchClient(this.elasticSearchRestHighLevelClient); } private RestHighLevelClient connect(ElasticSearchConfiguration configuration) { @@ -206,12 +208,12 @@ public class ClientProvider implements Provider<RestHighLevelClient> { } @Override - public RestHighLevelClient get() { + public ReactorElasticSearchClient get() { return client; } @PreDestroy public void close() throws IOException { - client.close(); + elasticSearchRestHighLevelClient.close(); } } diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/DeleteByQueryPerformer.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/DeleteByQueryPerformer.java index 66ad682..5277694 100644 --- a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/DeleteByQueryPerformer.java +++ b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/DeleteByQueryPerformer.java @@ -26,7 +26,6 @@ import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.RequestOptions; -import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.search.SearchHit; @@ -34,18 +33,17 @@ import org.elasticsearch.search.builder.SearchSourceBuilder; import com.google.common.annotations.VisibleForTesting; -import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public class DeleteByQueryPerformer { private static final TimeValue TIMEOUT = new TimeValue(60000); - private final RestHighLevelClient client; + private final ReactorElasticSearchClient client; private final int batchSize; private final WriteAliasName aliasName; @VisibleForTesting - DeleteByQueryPerformer(RestHighLevelClient client, int batchSize, WriteAliasName aliasName) { + DeleteByQueryPerformer(ReactorElasticSearchClient client, int batchSize, WriteAliasName aliasName) { this.client = client; this.batchSize = batchSize; this.aliasName = aliasName; @@ -54,9 +52,10 @@ public class DeleteByQueryPerformer { public Mono<Void> perform(QueryBuilder queryBuilder, RoutingKey routingKey) { SearchRequest searchRequest = prepareSearch(queryBuilder, routingKey); - return Flux.fromStream(new ScrolledSearch(client, searchRequest).searchResponses()) - .flatMap(searchResponse -> deleteRetrievedIds(client, searchResponse, routingKey)) - .thenEmpty(Mono.empty()); + return new ScrolledSearch(client, searchRequest).searchResponses() + .filter(searchResponse -> searchResponse.getHits().getHits().length > 0) + .flatMap(searchResponse -> deleteRetrievedIds(searchResponse, routingKey)) + .then(); } private SearchRequest prepareSearch(QueryBuilder queryBuilder, RoutingKey routingKey) { @@ -73,7 +72,7 @@ public class DeleteByQueryPerformer { .size(batchSize); } - private Mono<BulkResponse> deleteRetrievedIds(RestHighLevelClient client, SearchResponse searchResponse, RoutingKey routingKey) { + private Mono<BulkResponse> deleteRetrievedIds(SearchResponse searchResponse, RoutingKey routingKey) { BulkRequest request = new BulkRequest(); for (SearchHit hit : searchResponse.getHits()) { @@ -84,6 +83,6 @@ public class DeleteByQueryPerformer { .routing(routingKey.asString())); } - return Mono.fromCallable(() -> client.bulk(request, RequestOptions.DEFAULT)); + return client.bulk(request, RequestOptions.DEFAULT); } } diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchHealthCheck.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchHealthCheck.java index 1305420..58cd813 100644 --- a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchHealthCheck.java +++ b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchHealthCheck.java @@ -32,7 +32,6 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.Requests; -import org.elasticsearch.client.RestHighLevelClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,10 +43,10 @@ public class ElasticSearchHealthCheck implements HealthCheck { private static final ComponentName COMPONENT_NAME = new ComponentName("ElasticSearch Backend"); private final Set<IndexName> indexNames; - private final RestHighLevelClient client; + private final ReactorElasticSearchClient client; @Inject - ElasticSearchHealthCheck(RestHighLevelClient client, Set<IndexName> indexNames) { + ElasticSearchHealthCheck(ReactorElasticSearchClient client, Set<IndexName> indexNames) { this.client = client; this.indexNames = indexNames; } diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchIndexer.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchIndexer.java index 99a9f80..0fcbc11 100644 --- a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchIndexer.java +++ b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ElasticSearchIndexer.java @@ -18,9 +18,7 @@ ****************************************************************/ package org.apache.james.backends.es; -import java.io.IOException; import java.util.List; -import java.util.Optional; import org.apache.commons.lang3.StringUtils; import org.elasticsearch.action.bulk.BulkRequest; @@ -30,7 +28,6 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.RequestOptions; -import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.common.ValidationException; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.query.QueryBuilder; @@ -40,23 +37,25 @@ import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import reactor.core.publisher.Mono; + public class ElasticSearchIndexer { private static final int DEBUG_MAX_LENGTH_CONTENT = 1000; private static final int DEFAULT_BATCH_SIZE = 100; private static final Logger LOGGER = LoggerFactory.getLogger(ElasticSearchIndexer.class); - private final RestHighLevelClient client; + private final ReactorElasticSearchClient client; private final AliasName aliasName; private final DeleteByQueryPerformer deleteByQueryPerformer; - public ElasticSearchIndexer(RestHighLevelClient client, + public ElasticSearchIndexer(ReactorElasticSearchClient client, WriteAliasName aliasName) { this(client, aliasName, DEFAULT_BATCH_SIZE); } @VisibleForTesting - public ElasticSearchIndexer(RestHighLevelClient client, + public ElasticSearchIndexer(ReactorElasticSearchClient client, WriteAliasName aliasName, int batchSize) { this.client = client; @@ -64,7 +63,7 @@ public class ElasticSearchIndexer { this.aliasName = aliasName; } - public IndexResponse index(DocumentId id, String content, RoutingKey routingKey) throws IOException { + public Mono<IndexResponse> index(DocumentId id, String content, RoutingKey routingKey) { checkArgument(content); logContent(id, content); return client.index(new IndexRequest(aliasName.getValue()) @@ -81,37 +80,37 @@ public class ElasticSearchIndexer { } } - public Optional<BulkResponse> update(List<UpdatedRepresentation> updatedDocumentParts, RoutingKey routingKey) throws IOException { - try { - Preconditions.checkNotNull(updatedDocumentParts); - Preconditions.checkNotNull(routingKey); - BulkRequest request = new BulkRequest(); - updatedDocumentParts.forEach(updatedDocumentPart -> request.add( - new UpdateRequest(aliasName.getValue(), - NodeMappingFactory.DEFAULT_MAPPING_NAME, - updatedDocumentPart.getId().asString()) + public Mono<BulkResponse> update(List<UpdatedRepresentation> updatedDocumentParts, RoutingKey routingKey) { + Preconditions.checkNotNull(updatedDocumentParts); + Preconditions.checkNotNull(routingKey); + BulkRequest request = new BulkRequest(); + updatedDocumentParts.forEach(updatedDocumentPart -> request.add( + new UpdateRequest(aliasName.getValue(), + NodeMappingFactory.DEFAULT_MAPPING_NAME, + updatedDocumentPart.getId().asString()) .doc(updatedDocumentPart.getUpdatedDocumentPart(), XContentType.JSON) .routing(routingKey.asString()))); - return Optional.of(client.bulk(request, RequestOptions.DEFAULT)); - } catch (ValidationException e) { - LOGGER.warn("Error while updating index", e); - return Optional.empty(); - } + + return client.bulk(request, RequestOptions.DEFAULT) + .onErrorResume(ValidationException.class, exception -> { + LOGGER.warn("Error while updating index", exception); + return Mono.empty(); + }); } - public Optional<BulkResponse> delete(List<DocumentId> ids, RoutingKey routingKey) throws IOException { - try { - BulkRequest request = new BulkRequest(); - ids.forEach(id -> request.add( - new DeleteRequest(aliasName.getValue()) - .type(NodeMappingFactory.DEFAULT_MAPPING_NAME) - .id(id.asString()) - .routing(routingKey.asString()))); - return Optional.of(client.bulk(request, RequestOptions.DEFAULT)); - } catch (ValidationException e) { - LOGGER.warn("Error while deleting index", e); - return Optional.empty(); - } + public Mono<BulkResponse> delete(List<DocumentId> ids, RoutingKey routingKey) { + BulkRequest request = new BulkRequest(); + ids.forEach(id -> request.add( + new DeleteRequest(aliasName.getValue()) + .type(NodeMappingFactory.DEFAULT_MAPPING_NAME) + .id(id.asString()) + .routing(routingKey.asString()))); + + return client.bulk(request, RequestOptions.DEFAULT) + .onErrorResume(ValidationException.class, exception -> { + LOGGER.warn("Error while deleting index", exception); + return Mono.empty(); + }); } public void deleteAllMatchingQuery(QueryBuilder queryBuilder, RoutingKey routingKey) { diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/IndexCreationFactory.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/IndexCreationFactory.java index 9d6f834..07e41e8 100644 --- a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/IndexCreationFactory.java +++ b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/IndexCreationFactory.java @@ -31,7 +31,6 @@ import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.client.RequestOptions; -import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.common.xcontent.XContentBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,7 +62,7 @@ public class IndexCreationFactory { return this; } - public RestHighLevelClient createIndexAndAliases(RestHighLevelClient client) { + public ReactorElasticSearchClient createIndexAndAliases(ReactorElasticSearchClient client) { return new IndexCreationPerformer(nbShards, nbReplica, waitForActiveShards, indexName, aliases.build()).createIndexAndAliases(client); } } @@ -83,7 +82,7 @@ public class IndexCreationFactory { this.aliases = aliases; } - public RestHighLevelClient createIndexAndAliases(RestHighLevelClient client) { + public ReactorElasticSearchClient createIndexAndAliases(ReactorElasticSearchClient client) { Preconditions.checkNotNull(indexName); try { createIndexIfNeeded(client, indexName, generateSetting(nbShards, nbReplica, waitForActiveShards)); @@ -95,7 +94,7 @@ public class IndexCreationFactory { return client; } - private void createAliasIfNeeded(RestHighLevelClient client, IndexName indexName, AliasName aliasName) throws IOException { + private void createAliasIfNeeded(ReactorElasticSearchClient client, IndexName indexName, AliasName aliasName) throws IOException { if (!aliasExist(client, aliasName)) { client.indices() .updateAliases( @@ -107,12 +106,12 @@ public class IndexCreationFactory { } } - private boolean aliasExist(RestHighLevelClient client, AliasName aliasName) throws IOException { + private boolean aliasExist(ReactorElasticSearchClient client, AliasName aliasName) throws IOException { return client.indices() .existsAlias(new GetAliasesRequest().aliases(aliasName.getValue()), RequestOptions.DEFAULT); } - private void createIndexIfNeeded(RestHighLevelClient client, IndexName indexName, XContentBuilder settings) throws IOException { + private void createIndexIfNeeded(ReactorElasticSearchClient client, IndexName indexName, XContentBuilder settings) throws IOException { try { client.indices() .create( diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ListenerToFuture.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ListenerToFuture.java deleted file mode 100644 index 1df57b0..0000000 --- a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ListenerToFuture.java +++ /dev/null @@ -1,51 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. * - ****************************************************************/ - -package org.apache.james.backends.es; - -import java.util.concurrent.CompletableFuture; - -import org.elasticsearch.action.ActionListener; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class ListenerToFuture<T> implements ActionListener<T> { - private static final Logger LOGGER = LoggerFactory.getLogger(ListenerToFuture.class); - - private CompletableFuture<T> future; - - public ListenerToFuture() { - this.future = new CompletableFuture<>(); - } - - @Override - public void onResponse(T t) { - future.complete(t); - } - - @Override - public void onFailure(Exception e) { - LOGGER.warn("Error while waiting ElasticSearch query execution: ", e); - future.completeExceptionally(e); - } - - public CompletableFuture<T> getFuture() { - return future; - } -} diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/NodeMappingFactory.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/NodeMappingFactory.java index f4359f4..25b82e7 100644 --- a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/NodeMappingFactory.java +++ b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/NodeMappingFactory.java @@ -25,7 +25,6 @@ import org.apache.http.HttpStatus; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.ResponseException; -import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.common.xcontent.XContentBuilder; public class NodeMappingFactory { @@ -55,7 +54,7 @@ public class NodeMappingFactory { public static final String SNOWBALL = "snowball"; public static final String IGNORE_ABOVE = "ignore_above"; - public static RestHighLevelClient applyMapping(RestHighLevelClient client, IndexName indexName, XContentBuilder mappingsSources) throws IOException { + public static ReactorElasticSearchClient applyMapping(ReactorElasticSearchClient client, IndexName indexName, XContentBuilder mappingsSources) throws IOException { if (!mappingAlreadyExist(client, indexName)) { createMapping(client, indexName, mappingsSources); } @@ -64,7 +63,7 @@ public class NodeMappingFactory { // ElasticSearch 6.3.2 does not support field master_timeout that is set up by 6.4.3 REST client when relying on getMapping @SuppressWarnings("deprecation") - public static boolean mappingAlreadyExist(RestHighLevelClient client, IndexName indexName) throws IOException { + public static boolean mappingAlreadyExist(ReactorElasticSearchClient client, IndexName indexName) throws IOException { try { client.getLowLevelClient().performRequest("GET", "/" + indexName.getValue() + "/_mapping/" + NodeMappingFactory.DEFAULT_MAPPING_NAME); return true; @@ -76,7 +75,7 @@ public class NodeMappingFactory { return false; } - public static void createMapping(RestHighLevelClient client, IndexName indexName, XContentBuilder mappingsSources) throws IOException { + public static void createMapping(ReactorElasticSearchClient client, IndexName indexName, XContentBuilder mappingsSources) throws IOException { client.indices().putMapping( new PutMappingRequest(indexName.getValue()) .type(NodeMappingFactory.DEFAULT_MAPPING_NAME) diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ReactorElasticSearchClient.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ReactorElasticSearchClient.java new file mode 100644 index 0000000..3b0c18b --- /dev/null +++ b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ReactorElasticSearchClient.java @@ -0,0 +1,171 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.backends.es; + +import java.io.IOException; +import java.util.function.Consumer; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.cluster.storedscripts.DeleteStoredScriptRequest; +import org.elasticsearch.action.admin.cluster.storedscripts.DeleteStoredScriptResponse; +import org.elasticsearch.action.admin.cluster.storedscripts.GetStoredScriptRequest; +import org.elasticsearch.action.admin.cluster.storedscripts.GetStoredScriptResponse; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.delete.DeleteResponse; +import org.elasticsearch.action.explain.ExplainRequest; +import org.elasticsearch.action.explain.ExplainResponse; +import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest; +import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.main.MainResponse; +import org.elasticsearch.action.search.ClearScrollRequest; +import org.elasticsearch.action.search.ClearScrollResponse; +import org.elasticsearch.action.search.MultiSearchRequest; +import org.elasticsearch.action.search.MultiSearchResponse; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchScrollRequest; +import org.elasticsearch.client.ClusterClient; +import org.elasticsearch.client.IndicesClient; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.index.rankeval.RankEvalRequest; +import org.elasticsearch.index.rankeval.RankEvalResponse; +import org.elasticsearch.script.mustache.MultiSearchTemplateRequest; +import org.elasticsearch.script.mustache.MultiSearchTemplateResponse; +import org.elasticsearch.script.mustache.SearchTemplateRequest; +import org.elasticsearch.script.mustache.SearchTemplateResponse; + +import reactor.core.publisher.Mono; +import reactor.core.publisher.MonoSink; +import reactor.core.scheduler.Schedulers; + +public class ReactorElasticSearchClient implements AutoCloseable { + private final RestHighLevelClient client; + + public ReactorElasticSearchClient(RestHighLevelClient client) { + this.client = client; + } + + public Mono<BulkResponse> bulk(BulkRequest bulkRequest, RequestOptions options) { + return toReactor(listener -> client.bulkAsync(bulkRequest, options, listener)); + } + + public Mono<ClearScrollResponse> clearScroll(ClearScrollRequest clearScrollRequest, RequestOptions options) { + return toReactor(listener -> client.clearScrollAsync(clearScrollRequest, options, listener)); + } + + public ClusterClient cluster() { + return client.cluster(); + } + + public DeleteResponse delete(DeleteRequest deleteRequest, RequestOptions options) throws IOException { + return client.delete(deleteRequest, options); + } + + public Mono<DeleteStoredScriptResponse> deleteScript(DeleteStoredScriptRequest request, RequestOptions options) { + return toReactor(listener -> client.deleteScriptAsync(request, options, listener)); + } + + public Mono<ExplainResponse> explain(ExplainRequest explainRequest, RequestOptions options) { + return toReactor(listener -> client.explainAsync(explainRequest, options, listener)); + } + + public Mono<FieldCapabilitiesResponse> fieldCaps(FieldCapabilitiesRequest fieldCapabilitiesRequest, RequestOptions options) { + return toReactor(listener -> client.fieldCapsAsync(fieldCapabilitiesRequest, options, listener)); + } + + public RestClient getLowLevelClient() { + return client.getLowLevelClient(); + } + + public Mono<GetStoredScriptResponse> getScript(GetStoredScriptRequest request, RequestOptions options) { + return toReactor(listener -> client.getScriptAsync(request, options, listener)); + } + + public Mono<IndexResponse> index(IndexRequest indexRequest, RequestOptions options) { + return toReactor(listener -> client.indexAsync(indexRequest, options, listener)); + } + + public IndicesClient indices() { + return client.indices(); + } + + public MainResponse info(RequestOptions options) throws IOException { + return client.info(options); + } + + public Mono<MultiSearchResponse> msearch(MultiSearchRequest multiSearchRequest, RequestOptions options) { + return toReactor(listener -> client.msearchAsync(multiSearchRequest, options, listener)); + } + + public Mono<MultiSearchTemplateResponse> msearchTemplate(MultiSearchTemplateRequest multiSearchTemplateRequest, RequestOptions options) { + return toReactor(listener -> client.msearchTemplateAsync(multiSearchTemplateRequest, options, listener)); + } + + public Mono<RankEvalResponse> rankEval(RankEvalRequest rankEvalRequest, RequestOptions options) { + return toReactor(listener -> client.rankEvalAsync(rankEvalRequest, options, listener)); + } + + public Mono<SearchResponse> scroll(SearchScrollRequest searchScrollRequest, RequestOptions options) { + return toReactor(listener -> client.scrollAsync(searchScrollRequest, options, listener)); + } + + @Deprecated + public Mono<SearchResponse> search(SearchRequest searchRequest) { + return toReactor(listener -> client.searchAsync(searchRequest, listener)); + } + + public Mono<SearchResponse> search(SearchRequest searchRequest, RequestOptions options) { + return toReactor(listener -> client.searchAsync(searchRequest, options, listener)); + } + + public Mono<SearchTemplateResponse> searchTemplate(SearchTemplateRequest searchTemplateRequest, RequestOptions options) { + return toReactor(listener -> client.searchTemplateAsync(searchTemplateRequest, options, listener)); + } + + @Override + public void close() throws IOException { + client.close(); + } + + private static <T> Mono<T> toReactor(Consumer<ActionListener<T>> async) { + return Mono.<T>create(sink -> async.accept(getListener(sink))) + .subscribeOn(Schedulers.elastic()); + } + + private static <T> ActionListener<T> getListener(MonoSink<T> sink) { + return new ActionListener<T>() { + @Override + public void onResponse(T t) { + sink.success(t); + } + + @Override + public void onFailure(Exception e) { + sink.error(e); + } + }; + } +} diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/search/ScrolledSearch.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/search/ScrolledSearch.java index 834f801..10146ea 100644 --- a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/search/ScrolledSearch.java +++ b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/search/ScrolledSearch.java @@ -19,94 +19,79 @@ package org.apache.james.backends.es.search; -import java.io.Closeable; -import java.io.IOException; -import java.util.Arrays; -import java.util.Iterator; -import java.util.concurrent.CompletableFuture; -import java.util.stream.Stream; - -import org.apache.james.backends.es.ListenerToFuture; -import org.apache.james.util.streams.Iterators; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.james.backends.es.ReactorElasticSearchClient; import org.elasticsearch.action.search.ClearScrollRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchScrollRequest; import org.elasticsearch.client.RequestOptions; -import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.search.SearchHit; -import com.github.fge.lambdas.Throwing; +import reactor.core.Disposable; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; public class ScrolledSearch { - private static class ScrollIterator implements Iterator<SearchResponse>, Closeable { - private final RestHighLevelClient client; - private CompletableFuture<SearchResponse> searchResponseFuture; + private static final TimeValue TIMEOUT = TimeValue.timeValueMinutes(1); - ScrollIterator(RestHighLevelClient client, SearchRequest searchRequest) { - this.client = client; - ListenerToFuture<SearchResponse> listener = new ListenerToFuture<>(); - client.searchAsync(searchRequest, RequestOptions.DEFAULT, listener); + private final ReactorElasticSearchClient client; + private final SearchRequest searchRequest; - this.searchResponseFuture = listener.getFuture(); - } + public ScrolledSearch(ReactorElasticSearchClient client, SearchRequest searchRequest) { + this.client = client; + this.searchRequest = searchRequest; + } - @Override - public void close() throws IOException { - ClearScrollRequest clearScrollRequest = new ClearScrollRequest(); - clearScrollRequest.addScrollId(searchResponseFuture.join().getScrollId()); - client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT); - } - @Override - public boolean hasNext() { - SearchResponse join = searchResponseFuture.join(); - return !allSearchResponsesConsumed(join); - } + public Flux<SearchHit> searchHits() { + return searchResponses() + .flatMap(searchResponse -> Flux.fromArray(searchResponse.getHits().getHits())); + } - @Override - public SearchResponse next() { - SearchResponse result = searchResponseFuture.join(); - ListenerToFuture<SearchResponse> listener = new ListenerToFuture<>(); - client.scrollAsync( - new SearchScrollRequest() - .scrollId(result.getScrollId()) - .scroll(TIMEOUT), - RequestOptions.DEFAULT, - listener); - searchResponseFuture = listener.getFuture(); - return result; - } + public Flux<SearchResponse> searchResponses() { + return ensureClosing(Flux.from(startScrolling(searchRequest)) + .expand(this::nextResponse)); + } - public Stream<SearchResponse> stream() { - return Iterators.toStream(this) - .onClose(Throwing.runnable(this::close)); - } + private Mono<SearchResponse> startScrolling(SearchRequest searchRequest) { + return client.search(searchRequest, RequestOptions.DEFAULT); + } - private boolean allSearchResponsesConsumed(SearchResponse searchResponse) { - return searchResponse.getHits().getHits().length == 0; + public Mono<SearchResponse> nextResponse(SearchResponse previous) { + if (allSearchResponsesConsumed(previous)) { + return Mono.empty(); } - } - private static final TimeValue TIMEOUT = TimeValue.timeValueMinutes(1); + return client.scroll( + new SearchScrollRequest() + .scrollId(previous.getScrollId()) + .scroll(TIMEOUT), + RequestOptions.DEFAULT); + } - private final RestHighLevelClient client; - private final SearchRequest searchRequest; + private boolean allSearchResponsesConsumed(SearchResponse searchResponse) { + return searchResponse.getHits().getHits().length == 0; + } - public ScrolledSearch(RestHighLevelClient client, SearchRequest searchRequest) { - this.client = client; - this.searchRequest = searchRequest; + private Flux<SearchResponse> ensureClosing(Flux<SearchResponse> origin) { + AtomicReference<SearchResponse> latest = new AtomicReference<>(); + return origin + .doOnNext(latest::set) + .doOnTerminate(close(latest)); } - public Stream<SearchHit> searchHits() { - return searchResponses() - .flatMap(searchResponse -> Arrays.stream(searchResponse.getHits().getHits())); + public Runnable close(AtomicReference<SearchResponse> latest) { + return () -> Optional.ofNullable(latest.getAndSet(null)).map(this::clearScroll); } - @SuppressWarnings("resource") - public Stream<SearchResponse> searchResponses() { - return new ScrollIterator(client, searchRequest) - .stream(); + private Disposable clearScroll(SearchResponse current) { + ClearScrollRequest clearScrollRequest = new ClearScrollRequest(); + clearScrollRequest.addScrollId(current.getScrollId()); + + return client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT).subscribe(); } } diff --git a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ClientProviderImplConnectionContract.java b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ClientProviderImplConnectionContract.java index 05a7ec4..4a4328c 100644 --- a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ClientProviderImplConnectionContract.java +++ b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ClientProviderImplConnectionContract.java @@ -25,7 +25,6 @@ import org.apache.james.backends.es.ElasticSearchClusterExtension.ElasticSearchC import org.awaitility.Awaitility; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.client.RequestOptions; -import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.junit.jupiter.api.Test; @@ -75,11 +74,11 @@ interface ClientProviderImplConnectionContract { } default boolean isConnected(ClientProvider clientProvider) { - try (RestHighLevelClient client = clientProvider.get()) { + try (ReactorElasticSearchClient client = clientProvider.get()) { client.search( new SearchRequest() .source(new SearchSourceBuilder().query(QueryBuilders.existsQuery("any"))), - RequestOptions.DEFAULT); + RequestOptions.DEFAULT).block(); return true; } catch (Exception e) { LOGGER.info("Caught exception while trying to connect", e); diff --git a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchHealthCheckConnectionTest.java b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchHealthCheckConnectionTest.java index 51e1b2e..0f5ddd6 100644 --- a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchHealthCheckConnectionTest.java +++ b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchHealthCheckConnectionTest.java @@ -22,7 +22,6 @@ import static org.assertj.core.api.Assertions.assertThat; import java.time.Duration; -import org.elasticsearch.client.RestHighLevelClient; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -38,7 +37,7 @@ class ElasticSearchHealthCheckConnectionTest { @BeforeEach void setUp() { - RestHighLevelClient client = elasticSearch.getDockerElasticSearch().clientProvider(REQUEST_TIMEOUT).get(); + ReactorElasticSearchClient client = elasticSearch.getDockerElasticSearch().clientProvider(REQUEST_TIMEOUT).get(); elasticSearchHealthCheck = new ElasticSearchHealthCheck(client, ImmutableSet.of()); } diff --git a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchIndexerTest.java b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchIndexerTest.java index c7372a5..f5f8a40 100644 --- a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchIndexerTest.java +++ b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/ElasticSearchIndexerTest.java @@ -33,7 +33,6 @@ import org.awaitility.core.ConditionFactory; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.RequestOptions; -import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.junit.jupiter.api.AfterEach; @@ -62,7 +61,7 @@ class ElasticSearchIndexerTest { @RegisterExtension public DockerElasticSearchExtension elasticSearch = new DockerElasticSearchExtension(); private ElasticSearchIndexer testee; - private RestHighLevelClient client; + private ReactorElasticSearchClient client; @BeforeEach void setup() { @@ -80,17 +79,18 @@ class ElasticSearchIndexerTest { } @Test - void indexMessageShouldWork() throws Exception { + void indexMessageShouldWork() { DocumentId documentId = DocumentId.fromString("1"); String content = "{\"message\": \"trying out Elasticsearch\"}"; - testee.index(documentId, content, useDocumentId(documentId)); + testee.index(documentId, content, useDocumentId(documentId)).block(); elasticSearch.awaitForElasticSearch(); - + SearchResponse searchResponse = client.search( new SearchRequest(INDEX_NAME.getValue()) .source(new SearchSourceBuilder().query(QueryBuilders.matchQuery("message", "trying"))), - RequestOptions.DEFAULT); + RequestOptions.DEFAULT) + .block(); assertThat(searchResponse.getHits().getTotalHits()).isEqualTo(1); } @@ -101,64 +101,66 @@ class ElasticSearchIndexerTest { } @Test - void updateMessages() throws Exception { + void updateMessages() { String content = "{\"message\": \"trying out Elasticsearch\",\"field\":\"Should be unchanged\"}"; - testee.index(DOCUMENT_ID, content, useDocumentId(DOCUMENT_ID)); + testee.index(DOCUMENT_ID, content, useDocumentId(DOCUMENT_ID)).block(); elasticSearch.awaitForElasticSearch(); - testee.update(ImmutableList.of(new UpdatedRepresentation(DOCUMENT_ID, "{\"message\": \"mastering out Elasticsearch\"}")), useDocumentId(DOCUMENT_ID)); + testee.update(ImmutableList.of(new UpdatedRepresentation(DOCUMENT_ID, "{\"message\": \"mastering out Elasticsearch\"}")), useDocumentId(DOCUMENT_ID)).block(); elasticSearch.awaitForElasticSearch(); SearchResponse searchResponse = client.search( new SearchRequest(INDEX_NAME.getValue()) .source(new SearchSourceBuilder().query(QueryBuilders.matchQuery("message", "mastering"))), - RequestOptions.DEFAULT); + RequestOptions.DEFAULT) + .block(); assertThat(searchResponse.getHits().getTotalHits()).isEqualTo(1); SearchResponse searchResponse2 = client.search( new SearchRequest(INDEX_NAME.getValue()) .source(new SearchSourceBuilder().query(QueryBuilders.matchQuery("field", "unchanged"))), - RequestOptions.DEFAULT); + RequestOptions.DEFAULT) + .block(); assertThat(searchResponse2.getHits().getTotalHits()).isEqualTo(1); } @Test void updateMessageShouldThrowWhenJsonIsNull() { assertThatThrownBy(() -> testee.update(ImmutableList.of( - new UpdatedRepresentation(DOCUMENT_ID, null)), ROUTING)) + new UpdatedRepresentation(DOCUMENT_ID, null)), ROUTING).block()) .isInstanceOf(IllegalArgumentException.class); } @Test void updateMessageShouldThrowWhenIdIsNull() { assertThatThrownBy(() -> testee.update(ImmutableList.of( - new UpdatedRepresentation(null, "{\"message\": \"mastering out Elasticsearch\"}")), ROUTING)) + new UpdatedRepresentation(null, "{\"message\": \"mastering out Elasticsearch\"}")), ROUTING).block()) .isInstanceOf(NullPointerException.class); } @Test void updateMessageShouldThrowWhenJsonIsEmpty() { assertThatThrownBy(() -> testee.update(ImmutableList.of( - new UpdatedRepresentation(DOCUMENT_ID, "")), ROUTING)) + new UpdatedRepresentation(DOCUMENT_ID, "")), ROUTING).block()) .isInstanceOf(IllegalArgumentException.class); } @Test void updateMessageShouldThrowWhenRoutingKeyIsNull() { assertThatThrownBy(() -> testee.update(ImmutableList.of( - new UpdatedRepresentation(DOCUMENT_ID, "{\"message\": \"mastering out Elasticsearch\"}")), null)) + new UpdatedRepresentation(DOCUMENT_ID, "{\"message\": \"mastering out Elasticsearch\"}")), null).block()) .isInstanceOf(NullPointerException.class); } @Test - void deleteByQueryShouldWorkOnSingleMessage() throws Exception { + void deleteByQueryShouldWorkOnSingleMessage() { DocumentId documentId = DocumentId.fromString("1:2"); String content = "{\"message\": \"trying out Elasticsearch\", \"property\":\"1\"}"; RoutingKey routingKey = useDocumentId(documentId); - testee.index(documentId, content, routingKey); + testee.index(documentId, content, routingKey).block(); elasticSearch.awaitForElasticSearch(); testee.deleteAllMatchingQuery(termQuery("property", "1"), routingKey); @@ -169,25 +171,26 @@ class ElasticSearchIndexerTest { new SearchRequest(INDEX_NAME.getValue()) .source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery())), RequestOptions.DEFAULT) + .block() .getHits().getTotalHits() == 0); } @Test - void deleteByQueryShouldWorkWhenMultipleMessages() throws Exception { + void deleteByQueryShouldWorkWhenMultipleMessages() { DocumentId documentId = DocumentId.fromString("1:1"); String content = "{\"message\": \"trying out Elasticsearch\", \"property\":\"1\"}"; - testee.index(documentId, content, ROUTING); + testee.index(documentId, content, ROUTING).block(); DocumentId documentId2 = DocumentId.fromString("1:2"); String content2 = "{\"message\": \"trying out Elasticsearch 2\", \"property\":\"1\"}"; - testee.index(documentId2, content2, ROUTING); + testee.index(documentId2, content2, ROUTING).block(); DocumentId documentId3 = DocumentId.fromString("2:3"); String content3 = "{\"message\": \"trying out Elasticsearch 3\", \"property\":\"2\"}"; - testee.index(documentId3, content3, ROUTING); + testee.index(documentId3, content3, ROUTING).block(); elasticSearch.awaitForElasticSearch(); testee.deleteAllMatchingQuery(termQuery("property", "1"), ROUTING); @@ -198,64 +201,67 @@ class ElasticSearchIndexerTest { new SearchRequest(INDEX_NAME.getValue()) .source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery())), RequestOptions.DEFAULT) + .block() .getHits().getTotalHits() == 1); } @Test - void deleteMessage() throws Exception { + void deleteMessage() { DocumentId documentId = DocumentId.fromString("1:2"); String content = "{\"message\": \"trying out Elasticsearch\"}"; - testee.index(documentId, content, useDocumentId(documentId)); + testee.index(documentId, content, useDocumentId(documentId)).block(); elasticSearch.awaitForElasticSearch(); - testee.delete(ImmutableList.of(documentId), useDocumentId(documentId)); + testee.delete(ImmutableList.of(documentId), useDocumentId(documentId)).block(); elasticSearch.awaitForElasticSearch(); SearchResponse searchResponse = client.search( new SearchRequest(INDEX_NAME.getValue()) .source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery())), - RequestOptions.DEFAULT); + RequestOptions.DEFAULT) + .block(); assertThat(searchResponse.getHits().getTotalHits()).isEqualTo(0); } @Test - void deleteShouldWorkWhenMultipleMessages() throws Exception { + void deleteShouldWorkWhenMultipleMessages() { DocumentId documentId = DocumentId.fromString("1:1"); String content = "{\"message\": \"trying out Elasticsearch\", \"mailboxId\":\"1\"}"; - testee.index(documentId, content, ROUTING); + testee.index(documentId, content, ROUTING).block(); DocumentId documentId2 = DocumentId.fromString("1:2"); String content2 = "{\"message\": \"trying out Elasticsearch 2\", \"mailboxId\":\"1\"}"; - testee.index(documentId2, content2, ROUTING); + testee.index(documentId2, content2, ROUTING).block(); DocumentId documentId3 = DocumentId.fromString("2:3"); String content3 = "{\"message\": \"trying out Elasticsearch 3\", \"mailboxId\":\"2\"}"; - testee.index(documentId3, content3, ROUTING); + testee.index(documentId3, content3, ROUTING).block(); elasticSearch.awaitForElasticSearch(); - testee.delete(ImmutableList.of(documentId, documentId3), ROUTING); + testee.delete(ImmutableList.of(documentId, documentId3), ROUTING).block(); elasticSearch.awaitForElasticSearch(); SearchResponse searchResponse = client.search( new SearchRequest(INDEX_NAME.getValue()) .source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery())), - RequestOptions.DEFAULT); + RequestOptions.DEFAULT) + .block(); assertThat(searchResponse.getHits().getTotalHits()).isEqualTo(1); } @Test void updateMessagesShouldNotThrowWhenEmptyList() { - assertThatCode(() -> testee.update(ImmutableList.of(), ROUTING)) + assertThatCode(() -> testee.update(ImmutableList.of(), ROUTING).block()) .doesNotThrowAnyException(); } @Test void deleteMessagesShouldNotThrowWhenEmptyList() { - assertThatCode(() -> testee.delete(ImmutableList.of(), ROUTING)) + assertThatCode(() -> testee.delete(ImmutableList.of(), ROUTING).block()) .doesNotThrowAnyException(); } } diff --git a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/IndexCreationFactoryTest.java b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/IndexCreationFactoryTest.java index 0862711..6ff2f7e 100644 --- a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/IndexCreationFactoryTest.java +++ b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/IndexCreationFactoryTest.java @@ -23,7 +23,6 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.io.IOException; -import org.elasticsearch.client.RestHighLevelClient; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -35,7 +34,7 @@ class IndexCreationFactoryTest { @RegisterExtension public DockerElasticSearchExtension elasticSearch = new DockerElasticSearchExtension(); - private RestHighLevelClient client; + private ReactorElasticSearchClient client; @BeforeEach void setUp() { diff --git a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/NodeMappingFactoryAuthTest.java b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/NodeMappingFactoryAuthTest.java index dddd34a..8c29fca 100644 --- a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/NodeMappingFactoryAuthTest.java +++ b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/NodeMappingFactoryAuthTest.java @@ -25,7 +25,6 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import java.io.IOException; import java.util.Optional; -import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.common.xcontent.XContentBuilder; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -42,7 +41,7 @@ class NodeMappingFactoryAuthTest { DockerAuthElasticSearchSingleton.INSTANCE, new DockerElasticSearch.WithAuth())); - private RestHighLevelClient client; + private ReactorElasticSearchClient client; @BeforeEach void setUp(ElasticSearchClusterExtension.ElasticSearchCluster esCluster) throws Exception { diff --git a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/NodeMappingFactoryTest.java b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/NodeMappingFactoryTest.java index 7d2f52e..e686473 100644 --- a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/NodeMappingFactoryTest.java +++ b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/NodeMappingFactoryTest.java @@ -24,7 +24,6 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import java.io.IOException; -import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.common.xcontent.XContentBuilder; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -38,7 +37,7 @@ class NodeMappingFactoryTest { @RegisterExtension public DockerElasticSearchExtension elasticSearch = new DockerElasticSearchExtension(); - private RestHighLevelClient client; + private ReactorElasticSearchClient client; @BeforeEach void setUp() throws Exception { diff --git a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/search/ScrolledSearchTest.java b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/search/ScrolledSearchTest.java index c4e9e2b..e07368e 100644 --- a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/search/ScrolledSearchTest.java +++ b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/search/ScrolledSearchTest.java @@ -29,13 +29,13 @@ import org.apache.james.backends.es.ElasticSearchConfiguration; import org.apache.james.backends.es.IndexCreationFactory; import org.apache.james.backends.es.IndexName; import org.apache.james.backends.es.NodeMappingFactory; +import org.apache.james.backends.es.ReactorElasticSearchClient; import org.apache.james.backends.es.ReadAliasName; import org.awaitility.Duration; import org.awaitility.core.ConditionFactory; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.client.RequestOptions; -import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; @@ -56,7 +56,7 @@ class ScrolledSearchTest { @RegisterExtension public DockerElasticSearchExtension elasticSearch = new DockerElasticSearchExtension(); - private RestHighLevelClient client; + private ReactorElasticSearchClient client; @BeforeEach void setUp() { @@ -81,7 +81,7 @@ class ScrolledSearchTest { .query(QueryBuilders.matchAllQuery()) .size(SIZE)); - assertThat(new ScrolledSearch(client, searchRequest).searchHits()) + assertThat(new ScrolledSearch(client, searchRequest).searchHits().collectList().block()) .isEmpty(); } @@ -92,7 +92,8 @@ class ScrolledSearchTest { .type(NodeMappingFactory.DEFAULT_MAPPING_NAME) .id(id) .source(MESSAGE, "Sample message"), - RequestOptions.DEFAULT); + RequestOptions.DEFAULT) + .block(); elasticSearch.awaitForElasticSearch(); WAIT_CONDITION.untilAsserted(() -> hasIdsInIndex(client, id)); @@ -103,26 +104,28 @@ class ScrolledSearchTest { .query(QueryBuilders.matchAllQuery()) .size(SIZE)); - assertThat(new ScrolledSearch(client, searchRequest).searchHits()) + assertThat(new ScrolledSearch(client, searchRequest).searchHits().collectList().block()) .extracting(SearchHit::getId) .containsOnly(id); } @Test - void scrollIterableShouldWorkWhenSizeElement() throws Exception { + void scrollIterableShouldWorkWhenSizeElement() { String id1 = "1"; client.index(new IndexRequest(INDEX_NAME.getValue()) .type(NodeMappingFactory.DEFAULT_MAPPING_NAME) .id(id1) .source(MESSAGE, "Sample message"), - RequestOptions.DEFAULT); + RequestOptions.DEFAULT) + .block(); String id2 = "2"; client.index(new IndexRequest(INDEX_NAME.getValue()) .type(NodeMappingFactory.DEFAULT_MAPPING_NAME) .id(id2) .source(MESSAGE, "Sample message"), - RequestOptions.DEFAULT); + RequestOptions.DEFAULT) + .block(); elasticSearch.awaitForElasticSearch(); WAIT_CONDITION.untilAsserted(() -> hasIdsInIndex(client, id1, id2)); @@ -133,33 +136,36 @@ class ScrolledSearchTest { .query(QueryBuilders.matchAllQuery()) .size(SIZE)); - assertThat(new ScrolledSearch(client, searchRequest).searchHits()) + assertThat(new ScrolledSearch(client, searchRequest).searchHits().collectList().block()) .extracting(SearchHit::getId) .containsOnly(id1, id2); } @Test - void scrollIterableShouldWorkWhenMoreThanSizeElement() throws Exception { + void scrollIterableShouldWorkWhenMoreThanSizeElement() { String id1 = "1"; client.index(new IndexRequest(INDEX_NAME.getValue()) .type(NodeMappingFactory.DEFAULT_MAPPING_NAME) .id(id1) .source(MESSAGE, "Sample message"), - RequestOptions.DEFAULT); + RequestOptions.DEFAULT) + .block(); String id2 = "2"; client.index(new IndexRequest(INDEX_NAME.getValue()) .type(NodeMappingFactory.DEFAULT_MAPPING_NAME) .id(id2) .source(MESSAGE, "Sample message"), - RequestOptions.DEFAULT); + RequestOptions.DEFAULT) + .block(); String id3 = "3"; client.index(new IndexRequest(INDEX_NAME.getValue()) .type(NodeMappingFactory.DEFAULT_MAPPING_NAME) .id(id3) .source(MESSAGE, "Sample message"), - RequestOptions.DEFAULT); + RequestOptions.DEFAULT) + .block(); elasticSearch.awaitForElasticSearch(); WAIT_CONDITION.untilAsserted(() -> hasIdsInIndex(client, id1, id2, id3)); @@ -170,18 +176,19 @@ class ScrolledSearchTest { .query(QueryBuilders.matchAllQuery()) .size(SIZE)); - assertThat(new ScrolledSearch(client, searchRequest).searchHits()) + assertThat(new ScrolledSearch(client, searchRequest).searchHits().collectList().block()) .extracting(SearchHit::getId) .containsOnly(id1, id2, id3); } - private void hasIdsInIndex(RestHighLevelClient client, String... ids) throws IOException { + private void hasIdsInIndex(ReactorElasticSearchClient client, String... ids) { SearchRequest searchRequest = new SearchRequest(INDEX_NAME.getValue()) .scroll(TIMEOUT) .source(new SearchSourceBuilder() .query(QueryBuilders.matchAllQuery())); SearchHit[] hits = client.search(searchRequest, RequestOptions.DEFAULT) + .block() .getHits() .getHits(); diff --git a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/MailboxIndexCreationUtil.java b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/MailboxIndexCreationUtil.java index 4b76dad..9871600 100644 --- a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/MailboxIndexCreationUtil.java +++ b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/MailboxIndexCreationUtil.java @@ -25,17 +25,17 @@ import org.apache.james.backends.es.ElasticSearchConfiguration; import org.apache.james.backends.es.IndexCreationFactory; import org.apache.james.backends.es.IndexName; import org.apache.james.backends.es.NodeMappingFactory; +import org.apache.james.backends.es.ReactorElasticSearchClient; import org.apache.james.backends.es.ReadAliasName; import org.apache.james.backends.es.WriteAliasName; -import org.elasticsearch.client.RestHighLevelClient; public class MailboxIndexCreationUtil { - public static RestHighLevelClient prepareClient(RestHighLevelClient client, - ReadAliasName readAlias, - WriteAliasName writeAlias, - IndexName indexName, - ElasticSearchConfiguration configuration) throws IOException { + public static ReactorElasticSearchClient prepareClient(ReactorElasticSearchClient client, + ReadAliasName readAlias, + WriteAliasName writeAlias, + IndexName indexName, + ElasticSearchConfiguration configuration) throws IOException { return NodeMappingFactory.applyMapping( new IndexCreationFactory(configuration) .useIndex(indexName) @@ -46,7 +46,7 @@ public class MailboxIndexCreationUtil { MailboxMappingFactory.getMappingContent()); } - public static RestHighLevelClient prepareDefaultClient(RestHighLevelClient client, ElasticSearchConfiguration configuration) throws IOException { + public static ReactorElasticSearchClient prepareDefaultClient(ReactorElasticSearchClient client, ElasticSearchConfiguration configuration) throws IOException { return prepareClient(client, MailboxElasticSearchConstants.DEFAULT_MAILBOX_READ_ALIAS, MailboxElasticSearchConstants.DEFAULT_MAILBOX_WRITE_ALIAS, diff --git a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java index c3d8595..1430111 100644 --- a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java +++ b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java @@ -52,7 +52,6 @@ import org.apache.james.mailbox.model.UpdatedFlags; import org.apache.james.mailbox.store.MailboxSessionMapperFactory; import org.apache.james.mailbox.store.mail.model.MailboxMessage; import org.apache.james.mailbox.store.search.ListeningMessageSearchIndex; -import org.apache.james.util.OptionalUtils; import org.elasticsearch.index.query.TermQueryBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,6 +62,8 @@ import com.github.steveash.guavate.Guavate; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import reactor.core.publisher.Mono; + public class ElasticSearchListeningMessageSearchIndex extends ListeningMessageSearchIndex { public static class ElasticSearchListeningMessageSearchIndexGroup extends Group { @@ -112,7 +113,8 @@ public class ElasticSearchListeningMessageSearchIndex extends ListeningMessageSe return searcher .search(ImmutableList.of(mailbox.getMailboxId()), searchQuery, noLimit) - .map(SearchResult::getMessageUid); + .map(SearchResult::getMessageUid) + .toStream(); } @Override @@ -123,15 +125,14 @@ public class ElasticSearchListeningMessageSearchIndex extends ListeningMessageSe return ImmutableList.of(); } - try (Stream<SearchResult> searchResults = searcher.search(mailboxIds, searchQuery, Optional.empty())) { - return searchResults - .peek(this::logIfNoMessageId) - .map(SearchResult::getMessageId) - .flatMap(OptionalUtils::toStream) - .distinct() - .limit(limit) - .collect(Guavate.toImmutableList()); - } + return searcher.search(mailboxIds, searchQuery, Optional.empty()) + .doOnNext(this::logIfNoMessageId) + .map(SearchResult::getMessageId) + .flatMap(Mono::justOrEmpty) + .distinct() + .take(limit) + .collect(Guavate.toImmutableList()) + .block(); } @Override @@ -144,7 +145,9 @@ public class ElasticSearchListeningMessageSearchIndex extends ListeningMessageSe String jsonContent = generateIndexedJson(mailbox, message, session); - elasticSearchIndexer.index(indexIdFor(mailbox, message.getUid()), jsonContent, routingKeyFactory.from(mailbox.getMailboxId())); + elasticSearchIndexer + .index(indexIdFor(mailbox, message.getUid()), jsonContent, routingKeyFactory.from(mailbox.getMailboxId())) + .block(); } private String generateIndexedJson(Mailbox mailbox, MailboxMessage message, MailboxSession session) throws JsonProcessingException { @@ -162,12 +165,13 @@ public class ElasticSearchListeningMessageSearchIndex extends ListeningMessageSe } @Override - public void delete(MailboxSession session, Mailbox mailbox, Collection<MessageUid> expungedUids) throws IOException { + public void delete(MailboxSession session, Mailbox mailbox, Collection<MessageUid> expungedUids) { elasticSearchIndexer .delete(expungedUids.stream() .map(uid -> indexIdFor(mailbox, uid)) .collect(Guavate.toImmutableList()), - routingKeyFactory.from(mailbox.getMailboxId())); + routingKeyFactory.from(mailbox.getMailboxId())) + .block(); } @Override @@ -181,14 +185,16 @@ public class ElasticSearchListeningMessageSearchIndex extends ListeningMessageSe } @Override - public void update(MailboxSession session, Mailbox mailbox, List<UpdatedFlags> updatedFlagsList) throws IOException { + public void update(MailboxSession session, Mailbox mailbox, List<UpdatedFlags> updatedFlagsList) { ImmutableList<UpdatedRepresentation> updates = updatedFlagsList.stream() .map(Throwing.<UpdatedFlags, UpdatedRepresentation>function( updatedFlags -> createUpdatedDocumentPartFromUpdatedFlags(mailbox, updatedFlags)) .sneakyThrow()) .collect(Guavate.toImmutableList()); - elasticSearchIndexer.update(updates, routingKeyFactory.from(mailbox.getMailboxId())); + elasticSearchIndexer + .update(updates, routingKeyFactory.from(mailbox.getMailboxId())) + .block(); } private UpdatedRepresentation createUpdatedDocumentPartFromUpdatedFlags(Mailbox mailbox, UpdatedFlags updatedFlags) throws JsonProcessingException { diff --git a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcher.java b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcher.java index 85cceb1..7dd6509 100644 --- a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcher.java +++ b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcher.java @@ -21,10 +21,10 @@ package org.apache.james.mailbox.elasticsearch.search; import java.util.Collection; import java.util.Optional; -import java.util.stream.Stream; import org.apache.james.backends.es.AliasName; import org.apache.james.backends.es.NodeMappingFactory; +import org.apache.james.backends.es.ReactorElasticSearchClient; import org.apache.james.backends.es.ReadAliasName; import org.apache.james.backends.es.RoutingKey; import org.apache.james.backends.es.search.ScrolledSearch; @@ -37,7 +37,6 @@ import org.apache.james.mailbox.model.MessageId; import org.apache.james.mailbox.model.SearchQuery; import org.apache.james.mailbox.store.search.MessageSearchIndex; import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.common.document.DocumentField; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.search.SearchHit; @@ -47,6 +46,8 @@ import org.slf4j.LoggerFactory; import com.google.common.collect.ImmutableList; +import reactor.core.publisher.Flux; + public class ElasticSearchSearcher { public static final int DEFAULT_SEARCH_SIZE = 100; private static final Logger LOGGER = LoggerFactory.getLogger(ElasticSearchSearcher.class); @@ -55,7 +56,7 @@ public class ElasticSearchSearcher { JsonMessageConstants.UID, JsonMessageConstants.MESSAGE_ID); private static final int MAX_ROUTING_KEY = 5; - private final RestHighLevelClient client; + private final ReactorElasticSearchClient client; private final QueryConverter queryConverter; private final int size; private final MailboxId.Factory mailboxIdFactory; @@ -63,7 +64,7 @@ public class ElasticSearchSearcher { private final AliasName aliasName; private final RoutingKey.Factory<MailboxId> routingKeyFactory; - public ElasticSearchSearcher(RestHighLevelClient client, QueryConverter queryConverter, int size, + public ElasticSearchSearcher(ReactorElasticSearchClient client, QueryConverter queryConverter, int size, MailboxId.Factory mailboxIdFactory, MessageId.Factory messageIdFactory, ReadAliasName aliasName, RoutingKey.Factory<MailboxId> routingKeyFactory) { this.client = client; @@ -75,14 +76,14 @@ public class ElasticSearchSearcher { this.routingKeyFactory = routingKeyFactory; } - public Stream<MessageSearchIndex.SearchResult> search(Collection<MailboxId> mailboxIds, SearchQuery query, - Optional<Integer> limit) { + public Flux<MessageSearchIndex.SearchResult> search(Collection<MailboxId> mailboxIds, SearchQuery query, + Optional<Integer> limit) { SearchRequest searchRequest = prepareSearch(mailboxIds, query, limit); - Stream<MessageSearchIndex.SearchResult> pairStream = new ScrolledSearch(client, searchRequest) + Flux<MessageSearchIndex.SearchResult> pairStream = new ScrolledSearch(client, searchRequest) .searchHits() .flatMap(this::extractContentFromHit); - return limit.map(pairStream::limit) + return limit.map(pairStream::take) .orElse(pairStream); } @@ -122,20 +123,20 @@ public class ElasticSearchSearcher { .orElse(size); } - private Stream<MessageSearchIndex.SearchResult> extractContentFromHit(SearchHit hit) { + private Flux<MessageSearchIndex.SearchResult> extractContentFromHit(SearchHit hit) { DocumentField mailboxId = hit.field(JsonMessageConstants.MAILBOX_ID); DocumentField uid = hit.field(JsonMessageConstants.UID); Optional<DocumentField> id = retrieveMessageIdField(hit); if (mailboxId != null && uid != null) { Number uidAsNumber = uid.getValue(); - return Stream.of( + return Flux.just( new MessageSearchIndex.SearchResult( id.map(field -> messageIdFactory.fromString(field.getValue())), mailboxIdFactory.fromString(mailboxId.getValue()), MessageUid.of(uidAsNumber.longValue()))); } else { LOGGER.warn("Can not extract UID, MessageID and/or MailboxId for search result {}", hit.getId()); - return Stream.empty(); + return Flux.empty(); } } diff --git a/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIntegrationTest.java b/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIntegrationTest.java index 04bc2d6..1813ef3 100644 --- a/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIntegrationTest.java +++ b/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIntegrationTest.java @@ -27,6 +27,7 @@ import java.time.ZoneId; import org.apache.james.backends.es.DockerElasticSearchExtension; import org.apache.james.backends.es.ElasticSearchIndexer; +import org.apache.james.backends.es.ReactorElasticSearchClient; import org.apache.james.mailbox.MailboxSession; import org.apache.james.mailbox.MailboxSessionUtil; import org.apache.james.mailbox.MessageManager; @@ -48,7 +49,6 @@ import org.apache.james.mailbox.tika.TikaHttpClientImpl; import org.apache.james.mailbox.tika.TikaTextExtractor; import org.apache.james.metrics.tests.RecordingMetricFactory; import org.apache.james.mime4j.dom.Message; -import org.elasticsearch.client.RestHighLevelClient; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -67,7 +67,7 @@ class ElasticSearchIntegrationTest extends AbstractMessageSearchIndexTest { DockerElasticSearchExtension elasticSearch = new DockerElasticSearchExtension(); TikaTextExtractor textExtractor; - RestHighLevelClient client; + ReactorElasticSearchClient client; @AfterEach void tearDown() throws IOException { diff --git a/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndexTest.java b/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndexTest.java index f6d1391..95b67f8 100644 --- a/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndexTest.java +++ b/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndexTest.java @@ -33,6 +33,7 @@ import javax.mail.util.SharedByteArrayInputStream; import org.apache.james.backends.es.DockerElasticSearchExtension; import org.apache.james.backends.es.ElasticSearchIndexer; +import org.apache.james.backends.es.ReactorElasticSearchClient; import org.apache.james.core.Username; import org.apache.james.mailbox.DefaultMailboxes; import org.apache.james.mailbox.MailboxSession; @@ -73,7 +74,6 @@ import org.apache.james.mailbox.store.extractor.DefaultTextExtractor; import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder; import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage; import org.awaitility.Duration; -import org.elasticsearch.client.RestHighLevelClient; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -155,7 +155,7 @@ class ElasticSearchListeningMessageSearchIndexTest { InMemoryMessageId.Factory messageIdFactory = new InMemoryMessageId.Factory(); - RestHighLevelClient client = MailboxIndexCreationUtil.prepareDefaultClient( + ReactorElasticSearchClient client = MailboxIndexCreationUtil.prepareDefaultClient( elasticSearch.getDockerElasticSearch().clientProvider().get(), elasticSearch.getDockerElasticSearch().configuration()); @@ -256,7 +256,7 @@ class ElasticSearchListeningMessageSearchIndexTest { Thread.sleep(Duration.FIVE_SECONDS.getValueInMS()); // Docker pause is asynchronous and we found no way to poll for it assertThatThrownBy(() -> testee.add(session, mailbox, MESSAGE_1)) - .isInstanceOf(IOException.class); + .hasCauseInstanceOf(IOException.class); elasticSearch.getDockerElasticSearch().unpause(); } @@ -330,7 +330,7 @@ class ElasticSearchListeningMessageSearchIndexTest { Thread.sleep(Duration.FIVE_SECONDS.getValueInMS()); // Docker pause is asynchronous and we found no way to poll for it assertThatThrownBy(() -> testee.delete(session, mailbox, Lists.newArrayList(MESSAGE_UID_1))) - .isInstanceOf(IOException.class); + .hasCauseInstanceOf(IOException.class); elasticSearch.getDockerElasticSearch().unpause(); } @@ -413,7 +413,7 @@ class ElasticSearchListeningMessageSearchIndexTest { .build(); assertThatThrownBy(() -> testee.update(session, mailbox, Lists.newArrayList(updatedFlags))) - .isInstanceOf(IOException.class); + .hasCauseInstanceOf(IOException.class); elasticSearch.getDockerElasticSearch().unpause(); } diff --git a/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcherTest.java b/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcherTest.java index f8be9a0..f7d6564 100644 --- a/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcherTest.java +++ b/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcherTest.java @@ -30,6 +30,7 @@ import java.util.stream.IntStream; import org.apache.james.backends.es.DockerElasticSearchExtension; import org.apache.james.backends.es.ElasticSearchIndexer; +import org.apache.james.backends.es.ReactorElasticSearchClient; import org.apache.james.core.Username; import org.apache.james.mailbox.MailboxSession; import org.apache.james.mailbox.MailboxSessionUtil; @@ -58,7 +59,6 @@ import org.apache.james.mailbox.tika.TikaHttpClientImpl; import org.apache.james.mailbox.tika.TikaTextExtractor; import org.apache.james.metrics.tests.RecordingMetricFactory; import org.apache.james.mime4j.dom.Message; -import org.elasticsearch.client.RestHighLevelClient; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -80,7 +80,7 @@ class ElasticSearchSearcherTest { DockerElasticSearchExtension elasticSearch = new DockerElasticSearchExtension(); TikaTextExtractor textExtractor; - RestHighLevelClient client; + ReactorElasticSearchClient client; private InMemoryMailboxManager storeMailboxManager; @BeforeEach diff --git a/mailbox/plugin/quota-search-elasticsearch/src/main/java/org/apache/james/quota/search/elasticsearch/ElasticSearchQuotaSearcher.java b/mailbox/plugin/quota-search-elasticsearch/src/main/java/org/apache/james/quota/search/elasticsearch/ElasticSearchQuotaSearcher.java index 0a1e082..8765345 100644 --- a/mailbox/plugin/quota-search-elasticsearch/src/main/java/org/apache/james/quota/search/elasticsearch/ElasticSearchQuotaSearcher.java +++ b/mailbox/plugin/quota-search-elasticsearch/src/main/java/org/apache/james/quota/search/elasticsearch/ElasticSearchQuotaSearcher.java @@ -21,13 +21,11 @@ package org.apache.james.quota.search.elasticsearch; import static org.apache.james.quota.search.elasticsearch.json.JsonMessageConstants.USER; -import java.io.IOException; -import java.util.Arrays; import java.util.List; -import java.util.stream.Stream; import org.apache.james.backends.es.AliasName; import org.apache.james.backends.es.NodeMappingFactory; +import org.apache.james.backends.es.ReactorElasticSearchClient; import org.apache.james.backends.es.ReadAliasName; import org.apache.james.backends.es.search.ScrolledSearch; import org.apache.james.core.Username; @@ -35,7 +33,6 @@ import org.apache.james.quota.search.QuotaQuery; import org.apache.james.quota.search.QuotaSearcher; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.client.RequestOptions; -import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.builder.SearchSourceBuilder; @@ -44,14 +41,16 @@ import org.elasticsearch.search.sort.SortOrder; import com.github.steveash.guavate.Guavate; +import reactor.core.publisher.Flux; + public class ElasticSearchQuotaSearcher implements QuotaSearcher { private static final TimeValue TIMEOUT = TimeValue.timeValueMinutes(1); - private final RestHighLevelClient client; + private final ReactorElasticSearchClient client; private final AliasName readAlias; private final QuotaQueryConverter quotaQueryConverter; - public ElasticSearchQuotaSearcher(RestHighLevelClient client, ReadAliasName readAlias) { + public ElasticSearchQuotaSearcher(ReactorElasticSearchClient client, ReadAliasName readAlias) { this.client = client; this.readAlias = readAlias; this.quotaQueryConverter = new QuotaQueryConverter(); @@ -60,18 +59,17 @@ public class ElasticSearchQuotaSearcher implements QuotaSearcher { @Override public List<Username> search(QuotaQuery query) { try { - try (Stream<SearchHit> searchHits = searchHits(query)) { - return searchHits - .map(SearchHit::getId) - .map(Username::of) - .collect(Guavate.toImmutableList()); - } - } catch (IOException e) { + return searchHits(query) + .map(SearchHit::getId) + .map(Username::of) + .collect(Guavate.toImmutableList()) + .block(); + } catch (Exception e) { throw new RuntimeException("Unexpected exception while executing " + query, e); } } - private Stream<SearchHit> searchHits(QuotaQuery query) throws IOException { + private Flux<SearchHit> searchHits(QuotaQuery query) { if (query.getLimit().isLimited()) { return executeSingleSearch(query); } else { @@ -79,7 +77,7 @@ public class ElasticSearchQuotaSearcher implements QuotaSearcher { } } - private Stream<SearchHit> executeSingleSearch(QuotaQuery query) throws IOException { + private Flux<SearchHit> executeSingleSearch(QuotaQuery query) { SearchSourceBuilder searchSourceBuilder = searchSourceBuilder(query) .from(query.getOffset().getValue()); query.getLimit().getValue() @@ -89,12 +87,11 @@ public class ElasticSearchQuotaSearcher implements QuotaSearcher { .types(NodeMappingFactory.DEFAULT_MAPPING_NAME) .source(searchSourceBuilder); - return Arrays.stream(client.search(searchRequest, RequestOptions.DEFAULT) - .getHits() - .getHits()); + return client.search(searchRequest, RequestOptions.DEFAULT) + .flatMapMany(searchResponse -> Flux.fromArray(searchResponse.getHits().getHits())); } - private Stream<SearchHit> executeScrolledSearch(QuotaQuery query) { + private Flux<SearchHit> executeScrolledSearch(QuotaQuery query) { return new ScrolledSearch(client, new SearchRequest(readAlias.getValue()) .types(NodeMappingFactory.DEFAULT_MAPPING_NAME) diff --git a/mailbox/plugin/quota-search-elasticsearch/src/main/java/org/apache/james/quota/search/elasticsearch/QuotaSearchIndexCreationUtil.java b/mailbox/plugin/quota-search-elasticsearch/src/main/java/org/apache/james/quota/search/elasticsearch/QuotaSearchIndexCreationUtil.java index c1918a9..cf1aad6 100644 --- a/mailbox/plugin/quota-search-elasticsearch/src/main/java/org/apache/james/quota/search/elasticsearch/QuotaSearchIndexCreationUtil.java +++ b/mailbox/plugin/quota-search-elasticsearch/src/main/java/org/apache/james/quota/search/elasticsearch/QuotaSearchIndexCreationUtil.java @@ -26,11 +26,11 @@ import org.apache.james.backends.es.ElasticSearchConfiguration; import org.apache.james.backends.es.IndexCreationFactory; import org.apache.james.backends.es.IndexName; import org.apache.james.backends.es.NodeMappingFactory; -import org.elasticsearch.client.RestHighLevelClient; +import org.apache.james.backends.es.ReactorElasticSearchClient; public class QuotaSearchIndexCreationUtil { - public static RestHighLevelClient prepareClient(RestHighLevelClient client, + public static ReactorElasticSearchClient prepareClient(ReactorElasticSearchClient client, AliasName readAlias, AliasName writeAlias, IndexName indexName, @@ -46,7 +46,7 @@ public class QuotaSearchIndexCreationUtil { QuotaRatioMappingFactory.getMappingContent()); } - public static RestHighLevelClient prepareDefaultClient(RestHighLevelClient client, ElasticSearchConfiguration configuration) throws IOException { + public static ReactorElasticSearchClient prepareDefaultClient(ReactorElasticSearchClient client, ElasticSearchConfiguration configuration) throws IOException { return prepareClient(client, QuotaRatioElasticSearchConstants.DEFAULT_QUOTA_RATIO_READ_ALIAS, QuotaRatioElasticSearchConstants.DEFAULT_QUOTA_RATIO_WRITE_ALIAS, diff --git a/mailbox/plugin/quota-search-elasticsearch/src/main/java/org/apache/james/quota/search/elasticsearch/events/ElasticSearchQuotaMailboxListener.java b/mailbox/plugin/quota-search-elasticsearch/src/main/java/org/apache/james/quota/search/elasticsearch/events/ElasticSearchQuotaMailboxListener.java index b124c18..0a80a56 100644 --- a/mailbox/plugin/quota-search-elasticsearch/src/main/java/org/apache/james/quota/search/elasticsearch/events/ElasticSearchQuotaMailboxListener.java +++ b/mailbox/plugin/quota-search-elasticsearch/src/main/java/org/apache/james/quota/search/elasticsearch/events/ElasticSearchQuotaMailboxListener.java @@ -70,9 +70,11 @@ public class ElasticSearchQuotaMailboxListener implements MailboxListener.GroupM private void handleEvent(QuotaUsageUpdatedEvent event) throws IOException { Username user = event.getUsername(); - indexer.index(toDocumentId(user), - quotaRatioToElasticSearchJson.convertToJson(event), - routingKeyFactory.from(user)); + indexer + .index(toDocumentId(user), + quotaRatioToElasticSearchJson.convertToJson(event), + routingKeyFactory.from(user)) + .block(); } private DocumentId toDocumentId(Username user) { diff --git a/mailbox/plugin/quota-search-elasticsearch/src/test/java/org/apache/james/quota/search/elasticsearch/ElasticSearchQuotaSearchTestSystemExtension.java b/mailbox/plugin/quota-search-elasticsearch/src/test/java/org/apache/james/quota/search/elasticsearch/ElasticSearchQuotaSearchTestSystemExtension.java index cc7d8b2..991c5d2 100644 --- a/mailbox/plugin/quota-search-elasticsearch/src/test/java/org/apache/james/quota/search/elasticsearch/ElasticSearchQuotaSearchTestSystemExtension.java +++ b/mailbox/plugin/quota-search-elasticsearch/src/test/java/org/apache/james/quota/search/elasticsearch/ElasticSearchQuotaSearchTestSystemExtension.java @@ -26,6 +26,7 @@ import java.io.IOException; import org.apache.james.backends.es.DockerElasticSearch; import org.apache.james.backends.es.DockerElasticSearchSingleton; import org.apache.james.backends.es.ElasticSearchIndexer; +import org.apache.james.backends.es.ReactorElasticSearchClient; import org.apache.james.dnsservice.api.DNSService; import org.apache.james.domainlist.memory.MemoryDomainList; import org.apache.james.mailbox.inmemory.manager.InMemoryIntegrationResources; @@ -34,7 +35,6 @@ import org.apache.james.quota.search.QuotaSearchTestSystem; import org.apache.james.quota.search.elasticsearch.events.ElasticSearchQuotaMailboxListener; import org.apache.james.quota.search.elasticsearch.json.QuotaRatioToElasticSearchJson; import org.apache.james.user.memory.MemoryUsersRepository; -import org.elasticsearch.client.RestHighLevelClient; import org.junit.jupiter.api.extension.AfterEachCallback; import org.junit.jupiter.api.extension.BeforeEachCallback; import org.junit.jupiter.api.extension.ExtensionContext; @@ -45,7 +45,7 @@ import org.junit.jupiter.api.extension.ParameterResolver; public class ElasticSearchQuotaSearchTestSystemExtension implements ParameterResolver, BeforeEachCallback, AfterEachCallback { private final DockerElasticSearch elasticSearch = DockerElasticSearchSingleton.INSTANCE; - private RestHighLevelClient client; + private ReactorElasticSearchClient client; @Override public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException { diff --git a/mailbox/plugin/quota-search-elasticsearch/src/test/java/org/apache/james/quota/search/elasticsearch/events/ElasticSearchQuotaMailboxListenerTest.java b/mailbox/plugin/quota-search-elasticsearch/src/test/java/org/apache/james/quota/search/elasticsearch/events/ElasticSearchQuotaMailboxListenerTest.java index ba482aa..9e5671b 100644 --- a/mailbox/plugin/quota-search-elasticsearch/src/test/java/org/apache/james/quota/search/elasticsearch/events/ElasticSearchQuotaMailboxListenerTest.java +++ b/mailbox/plugin/quota-search-elasticsearch/src/test/java/org/apache/james/quota/search/elasticsearch/events/ElasticSearchQuotaMailboxListenerTest.java @@ -30,6 +30,7 @@ import org.apache.james.backends.es.DockerElasticSearchExtension; import org.apache.james.backends.es.ElasticSearchConfiguration; import org.apache.james.backends.es.ElasticSearchIndexer; import org.apache.james.backends.es.NodeMappingFactory; +import org.apache.james.backends.es.ReactorElasticSearchClient; import org.apache.james.mailbox.events.Event; import org.apache.james.mailbox.events.Group; import org.apache.james.mailbox.quota.QuotaFixture.Counts; @@ -41,7 +42,6 @@ import org.apache.james.quota.search.elasticsearch.UserRoutingKeyFactory; import org.apache.james.quota.search.elasticsearch.json.QuotaRatioToElasticSearchJson; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.junit.jupiter.api.AfterEach; @@ -58,7 +58,7 @@ class ElasticSearchQuotaMailboxListenerTest { DockerElasticSearchExtension elasticSearch = new DockerElasticSearchExtension(); ElasticSearchQuotaMailboxListener quotaMailboxListener; - RestHighLevelClient client; + ReactorElasticSearchClient client; @BeforeEach void setUp() throws IOException { @@ -100,10 +100,11 @@ class ElasticSearchQuotaMailboxListenerTest { elasticSearch.awaitForElasticSearch(); - SearchResponse searchResponse = client.search(new SearchRequest(QuotaRatioElasticSearchConstants.DEFAULT_QUOTA_RATIO_READ_ALIAS.getValue()) - .types(NodeMappingFactory.DEFAULT_MAPPING_NAME) - .source(new SearchSourceBuilder() - .query(QueryBuilders.matchAllQuery()))); + SearchRequest searchRequest = new SearchRequest(QuotaRatioElasticSearchConstants.DEFAULT_QUOTA_RATIO_READ_ALIAS.getValue()) + .types(NodeMappingFactory.DEFAULT_MAPPING_NAME) + .source(new SearchSourceBuilder() + .query(QueryBuilders.matchAllQuery())); + SearchResponse searchResponse = client.search(searchRequest).block(); assertThat(searchResponse.getHits().getTotalHits()).isEqualTo(1); } diff --git a/mpt/impl/imap-mailbox/elasticsearch/src/test/java/org/apache/james/mpt/imapmailbox/elasticsearch/host/ElasticSearchHostSystem.java b/mpt/impl/imap-mailbox/elasticsearch/src/test/java/org/apache/james/mpt/imapmailbox/elasticsearch/host/ElasticSearchHostSystem.java index 38e1d5c..233fb26 100644 --- a/mpt/impl/imap-mailbox/elasticsearch/src/test/java/org/apache/james/mpt/imapmailbox/elasticsearch/host/ElasticSearchHostSystem.java +++ b/mpt/impl/imap-mailbox/elasticsearch/src/test/java/org/apache/james/mpt/imapmailbox/elasticsearch/host/ElasticSearchHostSystem.java @@ -27,6 +27,7 @@ import org.apache.james.backends.es.DockerElasticSearch; import org.apache.james.backends.es.DockerElasticSearchSingleton; import org.apache.james.backends.es.ElasticSearchConfiguration; import org.apache.james.backends.es.ElasticSearchIndexer; +import org.apache.james.backends.es.ReactorElasticSearchClient; import org.apache.james.core.quota.QuotaCountLimit; import org.apache.james.core.quota.QuotaSizeLimit; import org.apache.james.imap.api.process.ImapProcessor; @@ -54,7 +55,6 @@ import org.apache.james.metrics.logger.DefaultMetricFactory; import org.apache.james.mpt.api.ImapFeatures; import org.apache.james.mpt.api.ImapFeatures.Feature; import org.apache.james.mpt.host.JamesImapHostSystem; -import org.elasticsearch.client.RestHighLevelClient; public class ElasticSearchHostSystem extends JamesImapHostSystem { @@ -63,7 +63,7 @@ public class ElasticSearchHostSystem extends JamesImapHostSystem { private DockerElasticSearch dockerElasticSearch; private StoreMailboxManager mailboxManager; - private RestHighLevelClient client; + private ReactorElasticSearchClient client; @Override public void beforeTest() throws Exception { diff --git a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchClientModule.java b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchClientModule.java index 7aab691..ed9b27e 100644 --- a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchClientModule.java +++ b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchClientModule.java @@ -24,10 +24,10 @@ import java.util.Set; import org.apache.james.backends.es.ClientProvider; import org.apache.james.backends.es.ElasticSearchHealthCheck; import org.apache.james.backends.es.IndexName; +import org.apache.james.backends.es.ReactorElasticSearchClient; import org.apache.james.core.healthcheck.HealthCheck; import org.apache.james.mailbox.elasticsearch.ElasticSearchMailboxConfiguration; import org.apache.james.quota.search.elasticsearch.ElasticSearchQuotaConfiguration; -import org.elasticsearch.client.RestHighLevelClient; import com.google.common.collect.ImmutableSet; import com.google.inject.AbstractModule; @@ -41,7 +41,7 @@ public class ElasticSearchClientModule extends AbstractModule { @Override protected void configure() { bind(ClientProvider.class).in(Scopes.SINGLETON); - bind(RestHighLevelClient.class).toProvider(ClientProvider.class); + bind(ReactorElasticSearchClient.class).toProvider(ClientProvider.class); Multibinder.newSetBinder(binder(), HealthCheck.class) .addBinding() diff --git a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchMailboxModule.java b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchMailboxModule.java index bcc8148..b02c837 100644 --- a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchMailboxModule.java +++ b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchMailboxModule.java @@ -32,6 +32,7 @@ import org.apache.commons.configuration2.Configuration; import org.apache.commons.configuration2.ex.ConfigurationException; import org.apache.james.backends.es.ElasticSearchConfiguration; import org.apache.james.backends.es.ElasticSearchIndexer; +import org.apache.james.backends.es.ReactorElasticSearchClient; import org.apache.james.backends.es.RoutingKey; import org.apache.james.lifecycle.api.StartUpCheck; import org.apache.james.lifecycle.api.Startable; @@ -51,7 +52,6 @@ import org.apache.james.mailbox.store.search.MessageSearchIndex; import org.apache.james.utils.InitializationOperation; import org.apache.james.utils.InitilizationOperationBuilder; import org.apache.james.utils.PropertiesProvider; -import org.elasticsearch.client.RestHighLevelClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,12 +68,12 @@ public class ElasticSearchMailboxModule extends AbstractModule { private final ElasticSearchConfiguration configuration; private final ElasticSearchMailboxConfiguration mailboxConfiguration; - private final RestHighLevelClient client; + private final ReactorElasticSearchClient client; @Inject MailboxIndexCreator(ElasticSearchConfiguration configuration, ElasticSearchMailboxConfiguration mailboxConfiguration, - RestHighLevelClient client) { + ReactorElasticSearchClient client) { this.configuration = configuration; this.mailboxConfiguration = mailboxConfiguration; this.client = client; @@ -114,7 +114,7 @@ public class ElasticSearchMailboxModule extends AbstractModule { @Provides @Singleton @Named(MailboxElasticSearchConstants.InjectionNames.MAILBOX) - private ElasticSearchIndexer createMailboxElasticSearchIndexer(RestHighLevelClient client, + private ElasticSearchIndexer createMailboxElasticSearchIndexer(ReactorElasticSearchClient client, ElasticSearchMailboxConfiguration configuration) { return new ElasticSearchIndexer( client, @@ -123,7 +123,7 @@ public class ElasticSearchMailboxModule extends AbstractModule { @Provides @Singleton - private ElasticSearchSearcher createMailboxElasticSearchSearcher(RestHighLevelClient client, + private ElasticSearchSearcher createMailboxElasticSearchSearcher(ReactorElasticSearchClient client, QueryConverter queryConverter, MailboxId.Factory mailboxIdFactory, MessageId.Factory messageIdFactory, diff --git a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchQuotaSearcherModule.java b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchQuotaSearcherModule.java index eb4f9b4..123b02f 100644 --- a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchQuotaSearcherModule.java +++ b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchQuotaSearcherModule.java @@ -30,6 +30,7 @@ import org.apache.commons.configuration2.Configuration; import org.apache.commons.configuration2.ex.ConfigurationException; import org.apache.james.backends.es.ElasticSearchConfiguration; import org.apache.james.backends.es.ElasticSearchIndexer; +import org.apache.james.backends.es.ReactorElasticSearchClient; import org.apache.james.lifecycle.api.Startable; import org.apache.james.mailbox.events.MailboxListener; import org.apache.james.quota.search.QuotaSearcher; @@ -42,7 +43,6 @@ import org.apache.james.quota.search.elasticsearch.json.QuotaRatioToElasticSearc import org.apache.james.utils.InitializationOperation; import org.apache.james.utils.InitilizationOperationBuilder; import org.apache.james.utils.PropertiesProvider; -import org.elasticsearch.client.RestHighLevelClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,12 +57,12 @@ public class ElasticSearchQuotaSearcherModule extends AbstractModule { static class ElasticSearchQuotaIndexCreator implements Startable { private final ElasticSearchConfiguration configuration; private final ElasticSearchQuotaConfiguration quotaConfiguration; - private final RestHighLevelClient client; + private final ReactorElasticSearchClient client; @Inject ElasticSearchQuotaIndexCreator(ElasticSearchConfiguration configuration, ElasticSearchQuotaConfiguration quotaConfiguration, - RestHighLevelClient client) { + ReactorElasticSearchClient client) { this.configuration = configuration; this.quotaConfiguration = quotaConfiguration; this.client = client; @@ -88,7 +88,7 @@ public class ElasticSearchQuotaSearcherModule extends AbstractModule { @Provides @Singleton - public QuotaSearcher provideSearcher(RestHighLevelClient client, ElasticSearchQuotaConfiguration configuration) { + public QuotaSearcher provideSearcher(ReactorElasticSearchClient client, ElasticSearchQuotaConfiguration configuration) { return new ElasticSearchQuotaSearcher(client, configuration.getReadAliasQuotaRatioName()); } @@ -107,7 +107,7 @@ public class ElasticSearchQuotaSearcherModule extends AbstractModule { @Provides @Singleton - public ElasticSearchQuotaMailboxListener provideListener(RestHighLevelClient client, + public ElasticSearchQuotaMailboxListener provideListener(ReactorElasticSearchClient client, ElasticSearchQuotaConfiguration configuration) { return new ElasticSearchQuotaMailboxListener( new ElasticSearchIndexer(client, diff --git a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchStartUpCheck.java b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchStartUpCheck.java index 4bf89d7..3565b38 100644 --- a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchStartUpCheck.java +++ b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/ElasticSearchStartUpCheck.java @@ -24,10 +24,10 @@ import java.io.IOException; import javax.inject.Inject; import org.apache.james.backends.es.ElasticSearchConfiguration; +import org.apache.james.backends.es.ReactorElasticSearchClient; import org.apache.james.lifecycle.api.StartUpCheck; import org.elasticsearch.Version; import org.elasticsearch.client.RequestOptions; -import org.elasticsearch.client.RestHighLevelClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,10 +40,10 @@ public class ElasticSearchStartUpCheck implements StartUpCheck { public static final String CHECK_NAME = "ElasticSearchStartUpCheck"; - private final RestHighLevelClient client; + private final ReactorElasticSearchClient client; @Inject - private ElasticSearchStartUpCheck(RestHighLevelClient client) { + private ElasticSearchStartUpCheck(ReactorElasticSearchClient client) { this.client = client; } diff --git a/server/container/guice/cassandra-guice/src/test/java/org/apache/james/ESReporterTest.java b/server/container/guice/cassandra-guice/src/test/java/org/apache/james/ESReporterTest.java index 23e7cff..81aa4c0 100644 --- a/server/container/guice/cassandra-guice/src/test/java/org/apache/james/ESReporterTest.java +++ b/server/container/guice/cassandra-guice/src/test/java/org/apache/james/ESReporterTest.java @@ -34,6 +34,7 @@ import java.util.TimerTask; import java.util.stream.Collectors; import org.apache.commons.net.imap.IMAPClient; +import org.apache.james.backends.es.ReactorElasticSearchClient; import org.apache.james.core.Username; import org.apache.james.jmap.AccessToken; import org.apache.james.jmap.draft.JmapGuiceProbe; @@ -45,7 +46,6 @@ import org.apache.james.modules.protocols.ImapGuiceProbe; import org.apache.james.utils.DataProbeImpl; import org.awaitility.Duration; import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.junit.jupiter.api.AfterEach; @@ -152,12 +152,13 @@ class ESReporterTest { } private boolean checkMetricRecordedInElasticSearch() { - try (RestHighLevelClient client = elasticSearchExtension.getDockerES().clientProvider().get()) { + try (ReactorElasticSearchClient client = elasticSearchExtension.getDockerES().clientProvider().get()) { SearchRequest searchRequest = new SearchRequest() .source(new SearchSourceBuilder() .query(QueryBuilders.matchAllQuery())); return !Arrays.stream(client .search(searchRequest) + .block() .getHits() .getHits()) .filter(searchHit -> searchHit.getIndex().startsWith(TestDockerESMetricReporterModule.METRICS_INDEX)) diff --git a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/ElasticSearchQuotaSearchExtension.java b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/ElasticSearchQuotaSearchExtension.java index e57771f..392b859 100644 --- a/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/ElasticSearchQuotaSearchExtension.java +++ b/server/protocols/webadmin/webadmin-mailbox/src/test/java/org/apache/james/webadmin/routes/ElasticSearchQuotaSearchExtension.java @@ -27,6 +27,7 @@ import org.apache.james.backends.es.DockerElasticSearch; import org.apache.james.backends.es.DockerElasticSearchSingleton; import org.apache.james.backends.es.ElasticSearchConfiguration; import org.apache.james.backends.es.ElasticSearchIndexer; +import org.apache.james.backends.es.ReactorElasticSearchClient; import org.apache.james.dnsservice.api.DNSService; import org.apache.james.domainlist.memory.MemoryDomainList; import org.apache.james.mailbox.inmemory.manager.InMemoryIntegrationResources; @@ -39,7 +40,6 @@ import org.apache.james.quota.search.elasticsearch.UserRoutingKeyFactory; import org.apache.james.quota.search.elasticsearch.events.ElasticSearchQuotaMailboxListener; import org.apache.james.quota.search.elasticsearch.json.QuotaRatioToElasticSearchJson; import org.apache.james.user.memory.MemoryUsersRepository; -import org.elasticsearch.client.RestHighLevelClient; import org.junit.jupiter.api.extension.AfterEachCallback; import org.junit.jupiter.api.extension.BeforeEachCallback; import org.junit.jupiter.api.extension.ExtensionContext; @@ -53,7 +53,7 @@ public class ElasticSearchQuotaSearchExtension implements ParameterResolver, Bef private final DockerElasticSearch elasticSearch = DockerElasticSearchSingleton.INSTANCE; private WebAdminQuotaSearchTestSystem restQuotaSearchTestSystem; private TemporaryFolder temporaryFolder = new TemporaryFolder(); - private RestHighLevelClient client; + private ReactorElasticSearchClient client; @Override public void beforeEach(ExtensionContext context) { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
