This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit a8dcf2192885b4c523cca85d69b06194a98301fe
Author: Matthieu Baechler <[email protected]>
AuthorDate: Fri Mar 13 11:43:33 2020 +0100

    JAMES-3144 Reactify ScrollSearch
---
 .../james/backends/es/search/ScrolledSearch.java   | 75 +++++++++++++---------
 1 file changed, 43 insertions(+), 32 deletions(-)

diff --git 
a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/search/ScrolledSearch.java
 
b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/search/ScrolledSearch.java
index 10146ea..cf8a83b 100644
--- 
a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/search/ScrolledSearch.java
+++ 
b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/search/ScrolledSearch.java
@@ -21,6 +21,7 @@ package org.apache.james.backends.es.search;
 
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
 
 import org.apache.james.backends.es.ReactorElasticSearchClient;
 import org.elasticsearch.action.search.ClearScrollRequest;
@@ -31,11 +32,14 @@ import org.elasticsearch.client.RequestOptions;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.search.SearchHit;
 
-import reactor.core.Disposable;
+import com.github.fge.lambdas.Throwing;
+
 import reactor.core.publisher.Flux;
+import reactor.core.publisher.FluxSink;
 import reactor.core.publisher.Mono;
 
 public class ScrolledSearch {
+
     private static final TimeValue TIMEOUT = TimeValue.timeValueMinutes(1);
 
     private final ReactorElasticSearchClient client;
@@ -46,52 +50,59 @@ public class ScrolledSearch {
         this.searchRequest = searchRequest;
     }
 
-
     public Flux<SearchHit> searchHits() {
         return searchResponses()
-            .flatMap(searchResponse -> 
Flux.fromArray(searchResponse.getHits().getHits()));
+            .concatMap(searchResponse -> 
Flux.just(searchResponse.getHits().getHits()));
     }
 
     public Flux<SearchResponse> searchResponses() {
-        return ensureClosing(Flux.from(startScrolling(searchRequest))
-            .expand(this::nextResponse));
-    }
+        return Flux.push(sink -> {
+            AtomicReference<Optional<String>> scrollId = new 
AtomicReference<>(Optional.empty());
+            sink.onRequest(numberOfRequestedElements -> next(sink, scrollId, 
numberOfRequestedElements));
 
-    private Mono<SearchResponse> startScrolling(SearchRequest searchRequest) {
-        return client.search(searchRequest, RequestOptions.DEFAULT);
+            sink.onDispose(() -> close(scrollId));
+        });
     }
 
-    public Mono<SearchResponse> nextResponse(SearchResponse previous) {
-        if (allSearchResponsesConsumed(previous)) {
-            return Mono.empty();
+    private void next(FluxSink<SearchResponse> sink, 
AtomicReference<Optional<String>> scrollId, long numberOfRequestedElements) {
+        if (numberOfRequestedElements <= 0) {
+            return;
         }
 
-        return client.scroll(
-            new SearchScrollRequest()
-                .scrollId(previous.getScrollId())
-                .scroll(TIMEOUT),
-            RequestOptions.DEFAULT);
-    }
+        Consumer<SearchResponse> onResponse = searchResponse -> {
+            scrollId.set(Optional.of(searchResponse.getScrollId()));
+            sink.next(searchResponse);
 
-    private boolean allSearchResponsesConsumed(SearchResponse searchResponse) {
-        return searchResponse.getHits().getHits().length == 0;
-    }
+            boolean noHitsLeft = searchResponse.getHits().getHits().length == 
0;
+            if (noHitsLeft) {
+                sink.complete();
+            } else {
+                next(sink, scrollId, numberOfRequestedElements - 1);
+            }
+        };
 
-    private Flux<SearchResponse> ensureClosing(Flux<SearchResponse> origin) {
-        AtomicReference<SearchResponse> latest = new AtomicReference<>();
-        return origin
-            .doOnNext(latest::set)
-            .doOnTerminate(close(latest));
-    }
+        Consumer<Throwable> onFailure = sink::error;
 
-    public Runnable close(AtomicReference<SearchResponse> latest) {
-        return () -> 
Optional.ofNullable(latest.getAndSet(null)).map(this::clearScroll);
+        buildRequest(scrollId.get())
+            .subscribe(onResponse, onFailure);
     }
 
-    private Disposable clearScroll(SearchResponse current) {
-        ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
-        clearScrollRequest.addScrollId(current.getScrollId());
+    private Mono<SearchResponse> buildRequest(Optional<String> scrollId) {
+        return scrollId.map(id ->
+            client.scroll(
+                new SearchScrollRequest()
+                    .scrollId(scrollId.get())
+                    .scroll(TIMEOUT),
+                RequestOptions.DEFAULT))
+            .orElseGet(() -> client.search(searchRequest, 
RequestOptions.DEFAULT));
+    }
 
-        return client.clearScroll(clearScrollRequest, 
RequestOptions.DEFAULT).subscribe();
+    public void close(AtomicReference<Optional<String>> scrollId) {
+        scrollId.get().map(id -> {
+                ClearScrollRequest clearScrollRequest = new 
ClearScrollRequest();
+                clearScrollRequest.addScrollId(id);
+                return clearScrollRequest;
+            
}).ifPresent(Throwing.<ClearScrollRequest>consumer(clearScrollRequest -> 
client.clearScroll(clearScrollRequest, 
RequestOptions.DEFAULT).subscribe()).sneakyThrow());
     }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to