This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit a8dcf2192885b4c523cca85d69b06194a98301fe Author: Matthieu Baechler <[email protected]> AuthorDate: Fri Mar 13 11:43:33 2020 +0100 JAMES-3144 Reactify ScrollSearch --- .../james/backends/es/search/ScrolledSearch.java | 75 +++++++++++++--------- 1 file changed, 43 insertions(+), 32 deletions(-) diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/search/ScrolledSearch.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/search/ScrolledSearch.java index 10146ea..cf8a83b 100644 --- a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/search/ScrolledSearch.java +++ b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/search/ScrolledSearch.java @@ -21,6 +21,7 @@ package org.apache.james.backends.es.search; import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import org.apache.james.backends.es.ReactorElasticSearchClient; import org.elasticsearch.action.search.ClearScrollRequest; @@ -31,11 +32,14 @@ import org.elasticsearch.client.RequestOptions; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.search.SearchHit; -import reactor.core.Disposable; +import com.github.fge.lambdas.Throwing; + import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; public class ScrolledSearch { + private static final TimeValue TIMEOUT = TimeValue.timeValueMinutes(1); private final ReactorElasticSearchClient client; @@ -46,52 +50,59 @@ public class ScrolledSearch { this.searchRequest = searchRequest; } - public Flux<SearchHit> searchHits() { return searchResponses() - .flatMap(searchResponse -> Flux.fromArray(searchResponse.getHits().getHits())); + .concatMap(searchResponse -> Flux.just(searchResponse.getHits().getHits())); } public Flux<SearchResponse> searchResponses() { - return ensureClosing(Flux.from(startScrolling(searchRequest)) - .expand(this::nextResponse)); - } + return Flux.push(sink -> { + AtomicReference<Optional<String>> scrollId = new AtomicReference<>(Optional.empty()); + sink.onRequest(numberOfRequestedElements -> next(sink, scrollId, numberOfRequestedElements)); - private Mono<SearchResponse> startScrolling(SearchRequest searchRequest) { - return client.search(searchRequest, RequestOptions.DEFAULT); + sink.onDispose(() -> close(scrollId)); + }); } - public Mono<SearchResponse> nextResponse(SearchResponse previous) { - if (allSearchResponsesConsumed(previous)) { - return Mono.empty(); + private void next(FluxSink<SearchResponse> sink, AtomicReference<Optional<String>> scrollId, long numberOfRequestedElements) { + if (numberOfRequestedElements <= 0) { + return; } - return client.scroll( - new SearchScrollRequest() - .scrollId(previous.getScrollId()) - .scroll(TIMEOUT), - RequestOptions.DEFAULT); - } + Consumer<SearchResponse> onResponse = searchResponse -> { + scrollId.set(Optional.of(searchResponse.getScrollId())); + sink.next(searchResponse); - private boolean allSearchResponsesConsumed(SearchResponse searchResponse) { - return searchResponse.getHits().getHits().length == 0; - } + boolean noHitsLeft = searchResponse.getHits().getHits().length == 0; + if (noHitsLeft) { + sink.complete(); + } else { + next(sink, scrollId, numberOfRequestedElements - 1); + } + }; - private Flux<SearchResponse> ensureClosing(Flux<SearchResponse> origin) { - AtomicReference<SearchResponse> latest = new AtomicReference<>(); - return origin - .doOnNext(latest::set) - .doOnTerminate(close(latest)); - } + Consumer<Throwable> onFailure = sink::error; - public Runnable close(AtomicReference<SearchResponse> latest) { - return () -> Optional.ofNullable(latest.getAndSet(null)).map(this::clearScroll); + buildRequest(scrollId.get()) + .subscribe(onResponse, onFailure); } - private Disposable clearScroll(SearchResponse current) { - ClearScrollRequest clearScrollRequest = new ClearScrollRequest(); - clearScrollRequest.addScrollId(current.getScrollId()); + private Mono<SearchResponse> buildRequest(Optional<String> scrollId) { + return scrollId.map(id -> + client.scroll( + new SearchScrollRequest() + .scrollId(scrollId.get()) + .scroll(TIMEOUT), + RequestOptions.DEFAULT)) + .orElseGet(() -> client.search(searchRequest, RequestOptions.DEFAULT)); + } - return client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT).subscribe(); + public void close(AtomicReference<Optional<String>> scrollId) { + scrollId.get().map(id -> { + ClearScrollRequest clearScrollRequest = new ClearScrollRequest(); + clearScrollRequest.addScrollId(id); + return clearScrollRequest; + }).ifPresent(Throwing.<ClearScrollRequest>consumer(clearScrollRequest -> client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT).subscribe()).sneakyThrow()); } + } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
