This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit e77a6b87c3e7ac469ddbb081863d6b110c555dcb Author: Benoit Tellier <[email protected]> AuthorDate: Thu May 16 17:27:33 2019 +0700 JAMES-2765 Backend-ES v6 should still include ScrollIterable --- .../james/backends/es/v6/IndexCreationFactory.java | 2 +- .../backends/es/v6/search/ListenerToFuture.java | 51 ++++++ .../backends/es/v6/search/ScrollIterable.java | 91 ++++++++++ .../backends/es/v6/search/ScrollIterableTest.java | 197 +++++++++++++++++++++ 4 files changed, 340 insertions(+), 1 deletion(-) diff --git a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/IndexCreationFactory.java b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/IndexCreationFactory.java index 2cba4ad..111bbcf 100644 --- a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/IndexCreationFactory.java +++ b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/IndexCreationFactory.java @@ -42,7 +42,7 @@ import com.google.common.collect.ImmutableList; public class IndexCreationFactory { - static class AliasSpecificationStep { + public static class AliasSpecificationStep { private final int nbShards; private final int nbReplica; private final IndexName indexName; diff --git a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/search/ListenerToFuture.java b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/search/ListenerToFuture.java new file mode 100644 index 0000000..1ae43b5 --- /dev/null +++ b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/search/ListenerToFuture.java @@ -0,0 +1,51 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.backends.es.v6.search; + +import java.util.concurrent.CompletableFuture; + +import org.elasticsearch.action.ActionListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ListenerToFuture<T> implements ActionListener<T> { + private static final Logger LOGGER = LoggerFactory.getLogger(ListenerToFuture.class); + + private CompletableFuture<T> future; + + public ListenerToFuture() { + this.future = new CompletableFuture<>(); + } + + @Override + public void onResponse(T t) { + future.complete(t); + } + + @Override + public void onFailure(Exception e) { + LOGGER.warn("Error while waiting ElasticSearch query execution: ", e); + future.completeExceptionally(e); + } + + public CompletableFuture<T> getFuture() { + return future; + } +} diff --git a/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/search/ScrollIterable.java b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/search/ScrollIterable.java new file mode 100644 index 0000000..a5e4a70 --- /dev/null +++ b/backends-common/elasticsearch-v6/src/main/java/org/apache/james/backends/es/v6/search/ScrollIterable.java @@ -0,0 +1,91 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.backends.es.v6.search; + +import java.util.Iterator; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Stream; + +import org.apache.james.util.streams.Iterators; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchScrollRequest; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.common.unit.TimeValue; + +public class ScrollIterable implements Iterable<SearchResponse> { + private static final TimeValue TIMEOUT = new TimeValue(60000); + + private final RestHighLevelClient client; + private final SearchRequest searchRequest; + + public ScrollIterable(RestHighLevelClient client, SearchRequest searchRequest) { + this.client = client; + this.searchRequest = searchRequest; + } + + @Override + public Iterator<SearchResponse> iterator() { + return new ScrollIterator(client, searchRequest); + } + + public Stream<SearchResponse> stream() { + return Iterators.toStream(iterator()); + } + + public static class ScrollIterator implements Iterator<SearchResponse> { + private final RestHighLevelClient client; + private CompletableFuture<SearchResponse> searchResponseFuture; + + ScrollIterator(RestHighLevelClient client, SearchRequest searchRequest) { + this.client = client; + ListenerToFuture<SearchResponse> listener = new ListenerToFuture<>(); + client.searchAsync(searchRequest, RequestOptions.DEFAULT, listener); + + this.searchResponseFuture = listener.getFuture(); + } + + @Override + public boolean hasNext() { + SearchResponse join = searchResponseFuture.join(); + return !allSearchResponsesConsumed(join); + } + + @Override + public SearchResponse next() { + SearchResponse result = searchResponseFuture.join(); + ListenerToFuture<SearchResponse> listener = new ListenerToFuture<>(); + client.scrollAsync( + new SearchScrollRequest() + .scrollId(result.getScrollId()) + .scroll(TIMEOUT), + RequestOptions.DEFAULT, + listener); + searchResponseFuture = listener.getFuture(); + return result; + } + + private boolean allSearchResponsesConsumed(SearchResponse searchResponse) { + return searchResponse.getHits().getHits().length == 0; + } + } + +} diff --git a/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/search/ScrollIterableTest.java b/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/search/ScrollIterableTest.java new file mode 100644 index 0000000..c9709b1 --- /dev/null +++ b/backends-common/elasticsearch-v6/src/test/java/org/apache/james/backends/es/v6/search/ScrollIterableTest.java @@ -0,0 +1,197 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.backends.es.v6.search; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +import org.apache.james.backends.es.v6.ClientProvider; +import org.apache.james.backends.es.v6.DockerElasticSearchRule; +import org.apache.james.backends.es.v6.ElasticSearchConfiguration; +import org.apache.james.backends.es.v6.IndexCreationFactory; +import org.apache.james.backends.es.v6.IndexName; +import org.apache.james.backends.es.v6.ReadAliasName; +import org.apache.james.backends.es.v6.TypeName; +import org.awaitility.Duration; +import org.awaitility.core.ConditionFactory; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +public class ScrollIterableTest { + + private static final TimeValue TIMEOUT = new TimeValue(6000); + private static final int SIZE = 2; + private static final String MESSAGE = "message"; + private static final IndexName INDEX_NAME = new IndexName("index"); + private static final ReadAliasName ALIAS_NAME = new ReadAliasName("alias"); + private static final TypeName TYPE_NAME = new TypeName("messages"); + + private static final ConditionFactory WAIT_CONDITION = await().timeout(Duration.FIVE_SECONDS); + + @Rule + public DockerElasticSearchRule elasticSearch = new DockerElasticSearchRule(); + private ClientProvider clientProvider; + + @Before + public void setUp() { + clientProvider = elasticSearch.clientProvider(); + new IndexCreationFactory(ElasticSearchConfiguration.DEFAULT_CONFIGURATION) + .useIndex(INDEX_NAME) + .addAlias(ALIAS_NAME) + .createIndexAndAliases(clientProvider.get()); + elasticSearch.awaitForElasticSearch(); + } + + @Test + public void scrollIterableShouldWorkWhenEmpty() throws Exception { + try (RestHighLevelClient client = clientProvider.get()) { + SearchRequest searchRequest = new SearchRequest(INDEX_NAME.getValue()) + .types(TYPE_NAME.getValue()) + .scroll(TIMEOUT) + .source(new SearchSourceBuilder() + .query(QueryBuilders.matchAllQuery()) + .size(SIZE)); + + assertThat(new ScrollIterable(client, searchRequest)) + .isEmpty(); + } + } + + @Test + public void scrollIterableShouldWorkWhenOneElement() throws Exception { + try (RestHighLevelClient client = clientProvider.get()) { + String id = "1"; + client.index(new IndexRequest(INDEX_NAME.getValue(), TYPE_NAME.getValue(), id) + .source(MESSAGE, "Sample message"), + RequestOptions.DEFAULT); + + elasticSearch.awaitForElasticSearch(); + WAIT_CONDITION.untilAsserted(() -> hasIdsInIndex(client, id)); + + SearchRequest searchRequest = new SearchRequest(INDEX_NAME.getValue()) + .types(TYPE_NAME.getValue()) + .scroll(TIMEOUT) + .source(new SearchSourceBuilder() + .query(QueryBuilders.matchAllQuery()) + .size(SIZE)); + + assertThat(convertToIdList(new ScrollIterable(client, searchRequest))) + .containsOnly(id); + } + } + + @Test + public void scrollIterableShouldWorkWhenSizeElement() throws Exception { + try (RestHighLevelClient client = clientProvider.get()) { + String id1 = "1"; + client.index(new IndexRequest(INDEX_NAME.getValue(), TYPE_NAME.getValue(), id1) + .source(MESSAGE, "Sample message"), + RequestOptions.DEFAULT); + + String id2 = "2"; + client.index(new IndexRequest(INDEX_NAME.getValue(), TYPE_NAME.getValue(), id2) + .source(MESSAGE, "Sample message"), + RequestOptions.DEFAULT); + + elasticSearch.awaitForElasticSearch(); + WAIT_CONDITION.untilAsserted(() -> hasIdsInIndex(client, id1, id2)); + + SearchRequest searchRequest = new SearchRequest(INDEX_NAME.getValue()) + .types(TYPE_NAME.getValue()) + .scroll(TIMEOUT) + .source(new SearchSourceBuilder() + .query(QueryBuilders.matchAllQuery()) + .size(SIZE)); + + assertThat(convertToIdList(new ScrollIterable(client, searchRequest))) + .containsOnly(id1, id2); + } + } + + @Test + public void scrollIterableShouldWorkWhenMoreThanSizeElement() throws Exception { + try (RestHighLevelClient client = clientProvider.get()) { + String id1 = "1"; + client.index(new IndexRequest(INDEX_NAME.getValue(), TYPE_NAME.getValue(), id1) + .source(MESSAGE, "Sample message"), + RequestOptions.DEFAULT); + + String id2 = "2"; + client.index(new IndexRequest(INDEX_NAME.getValue(), TYPE_NAME.getValue(), id2) + .source(MESSAGE, "Sample message"), + RequestOptions.DEFAULT); + + String id3 = "3"; + client.index(new IndexRequest(INDEX_NAME.getValue(), TYPE_NAME.getValue(), id3) + .source(MESSAGE, "Sample message"), + RequestOptions.DEFAULT); + + elasticSearch.awaitForElasticSearch(); + WAIT_CONDITION.untilAsserted(() -> hasIdsInIndex(client, id1, id2, id3)); + + SearchRequest searchRequest = new SearchRequest(INDEX_NAME.getValue()) + .types(TYPE_NAME.getValue()) + .scroll(TIMEOUT) + .source(new SearchSourceBuilder() + .query(QueryBuilders.matchAllQuery()) + .size(SIZE)); + + assertThat(convertToIdList(new ScrollIterable(client, searchRequest))) + .containsOnly(id1, id2, id3); + } + } + + private List<String> convertToIdList(ScrollIterable scrollIterable) { + return scrollIterable.stream() + .flatMap(searchResponse -> Arrays.stream(searchResponse.getHits().getHits())) + .map(SearchHit::getId) + .collect(Collectors.toList()); + } + + private void hasIdsInIndex(RestHighLevelClient client, String... ids) throws IOException { + SearchRequest searchRequest = new SearchRequest(INDEX_NAME.getValue()) + .types(TYPE_NAME.getValue()) + .scroll(TIMEOUT) + .source(new SearchSourceBuilder() + .query(QueryBuilders.matchAllQuery())); + + SearchHit[] hits = client.search(searchRequest, RequestOptions.DEFAULT) + .getHits() + .getHits(); + + assertThat(hits) + .extracting(SearchHit::getId) + .contains(ids); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
