This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 03729af5c0fd33433a958b3cbbaf31737a96a6f9
Author: Benoit Tellier <[email protected]>
AuthorDate: Mon Nov 2 08:21:34 2020 +0100

    JAMES-2037 Use Flux for MessageManager::search
---
 .../org/apache/james/mailbox/MessageManager.java   |   3 +-
 .../ElasticSearchListeningMessageSearchIndex.java  |   6 +-
 .../ElasticSearchIntegrationTest.java              |  24 +--
 ...asticSearchListeningMessageSearchIndexTest.java |  26 ++--
 .../lucene/search/LuceneMessageSearchIndex.java    |   5 +-
 .../LuceneMailboxMessageSearchIndexTest.java       | 106 +++++++-------
 .../james/vault/DeletedMessageVaultHookTest.java   |  12 +-
 .../james/mailbox/store/StoreMessageManager.java   |   7 +-
 .../store/search/LazyMessageSearchIndex.java       |   3 +-
 .../mailbox/store/search/MessageSearchIndex.java   |   3 +-
 .../store/search/SimpleMessageSearchIndex.java     |   5 +-
 .../store/AbstractCombinationManagerTest.java      |  12 +-
 .../search/AbstractMessageSearchIndexTest.java     | 162 +++++++++++----------
 .../imap/processor/AbstractMailboxProcessor.java   |   2 +-
 .../james/imap/processor/SearchProcessor.java      |   6 +-
 .../imap/processor/base/SelectedMailboxImpl.java   |  10 +-
 .../james/imap/processor/SearchProcessorTest.java  |   5 +-
 .../processor/base/MailboxEventAnalyserTest.java   |   5 +-
 .../processor/base/SelectedMailboxImplTest.java    |  13 +-
 .../java/org/apache/james/SearchModuleChooser.java |   3 +-
 .../org/apache/james/FakeMessageSearchIndex.java   |   3 +-
 21 files changed, 207 insertions(+), 214 deletions(-)

diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageManager.java b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageManager.java
index 6a8ab49..779d84e 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageManager.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageManager.java
@@ -30,7 +30,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.stream.Stream;
 
 import javax.mail.Flags;
 
@@ -113,7 +112,7 @@ public interface MessageManager {
      * @throws MailboxException
      *             when search fails for other reasons
      */
-    Stream<MessageUid> search(SearchQuery searchQuery, MailboxSession mailboxSession) throws MailboxException;
+    Publisher<MessageUid> search(SearchQuery searchQuery, MailboxSession mailboxSession) throws MailboxException;
 
     /**
     * Expunges messages in the given range from this mailbox by first retrieving the messages to be deleted
diff --git a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java
index 2d0df24..b6245a8 100644
--- a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java
+++ b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java
@@ -34,7 +34,6 @@ import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.stream.Stream;
 
 import javax.inject.Inject;
 import javax.inject.Named;
@@ -121,14 +120,13 @@ public class ElasticSearchListeningMessageSearchIndex extends ListeningMessageSe
     }
     
     @Override
-    public Stream<MessageUid> search(MailboxSession session, Mailbox mailbox, SearchQuery searchQuery) {
+    public Flux<MessageUid> search(MailboxSession session, Mailbox mailbox, SearchQuery searchQuery) {
         Preconditions.checkArgument(session != null, "'session' is mandatory");
         Optional<Integer> noLimit = Optional.empty();
 
         return searcher
             .search(ImmutableList.of(mailbox.getMailboxId()), searchQuery, noLimit)
-            .map(SearchResult::getMessageUid)
-            .toStream();
+            .map(SearchResult::getMessageUid);
     }
     
     @Override
diff --git a/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIntegrationTest.java b/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIntegrationTest.java
index 46eb518..5ae6e4c 100644
--- a/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIntegrationTest.java
+++ b/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIntegrationTest.java
@@ -57,6 +57,8 @@ import org.junit.jupiter.api.extension.RegisterExtension;
 
 import com.google.common.base.Strings;
 
+import reactor.core.publisher.Flux;
+
 class ElasticSearchIntegrationTest extends AbstractMessageSearchIndexTest {
 
     static final int BATCH_SIZE = 1;
@@ -137,7 +139,7 @@ class ElasticSearchIntegrationTest extends AbstractMessageSearchIndexTest {
 
         elasticSearch.awaitForElasticSearch();
 
-        assertThat(messageManager.search(SearchQuery.of(SearchQuery.address(SearchQuery.AddressType.To, recipient)), session))
+        assertThat(Flux.from(messageManager.search(SearchQuery.of(SearchQuery.address(SearchQuery.AddressType.To, recipient)), session)).toStream())
             .containsExactly(composedMessageId.getUid());
     }
 
@@ -156,7 +158,7 @@ class ElasticSearchIntegrationTest extends AbstractMessageSearchIndexTest {
 
         elasticSearch.awaitForElasticSearch();
 
-        assertThat(messageManager.search(SearchQuery.of(SearchQuery.address(SearchQuery.AddressType.To, recipient)), session))
+        assertThat(Flux.from(messageManager.search(SearchQuery.of(SearchQuery.address(SearchQuery.AddressType.To, recipient)), session)).toStream())
             .containsExactly(composedMessageId.getUid());
     }
 
@@ -175,7 +177,7 @@ class ElasticSearchIntegrationTest extends AbstractMessageSearchIndexTest {
 
         elasticSearch.awaitForElasticSearch();
 
-        assertThat(messageManager.search(SearchQuery.of(SearchQuery.bodyContains("0123456789")), session))
+        assertThat(Flux.from(messageManager.search(SearchQuery.of(SearchQuery.bodyContains("0123456789")), session)).toStream())
             .containsExactly(composedMessageId.getUid());
     }
 
@@ -194,7 +196,7 @@ class ElasticSearchIntegrationTest extends AbstractMessageSearchIndexTest {
 
         elasticSearch.awaitForElasticSearch();
 
-        assertThat(messageManager.search(SearchQuery.of(SearchQuery.bodyContains("matchMe")), session))
+        assertThat(Flux.from(messageManager.search(SearchQuery.of(SearchQuery.bodyContains("matchMe")), session)).toStream())
             .containsExactly(composedMessageId.getUid());
     }
 
@@ -214,7 +216,7 @@ class ElasticSearchIntegrationTest extends AbstractMessageSearchIndexTest {
 
         elasticSearch.awaitForElasticSearch();
 
-        assertThat(messageManager.search(SearchQuery.of(SearchQuery.bodyContains(reasonableLongTerm)), session))
+        assertThat(Flux.from(messageManager.search(SearchQuery.of(SearchQuery.bodyContains(reasonableLongTerm)), session)).toStream())
             .containsExactly(composedMessageId.getUid());
     }
 
@@ -236,7 +238,7 @@ class ElasticSearchIntegrationTest extends AbstractMessageSearchIndexTest {
 
         elasticSearch.awaitForElasticSearch();
 
-        assertThat(messageManager.search(SearchQuery.of(SearchQuery.headerExists("Custom-header")), session))
+        assertThat(Flux.from(messageManager.search(SearchQuery.of(SearchQuery.headerExists("Custom-header")), session)).toStream())
             .containsExactly(customDateHeaderMessageId.getUid(), customStringHeaderMessageId.getUid());
     }
 
@@ -258,7 +260,7 @@ class ElasticSearchIntegrationTest extends AbstractMessageSearchIndexTest {
 
         elasticSearch.awaitForElasticSearch();
 
-        assertThat(messageManager.search(SearchQuery.of(SearchQuery.all()), session))
+        assertThat(Flux.from(messageManager.search(SearchQuery.of(SearchQuery.all()), session)).toStream())
             .contains(customStringHeaderMessageId.getUid());
     }
 
@@ -290,7 +292,7 @@ class ElasticSearchIntegrationTest extends AbstractMessageSearchIndexTest {
 
         elasticSearch.awaitForElasticSearch();
 
-        assertThat(messageManager.search(SearchQuery.of(SearchQuery.address(SearchQuery.AddressType.To, "[email protected]")), session))
+        assertThat(Flux.from(messageManager.search(SearchQuery.of(SearchQuery.address(SearchQuery.AddressType.To, "[email protected]")), session)).toStream())
             .containsOnly(messageId2.getUid());
     }
 
@@ -322,7 +324,7 @@ class ElasticSearchIntegrationTest extends AbstractMessageSearchIndexTest {
 
         elasticSearch.awaitForElasticSearch();
 
-        assertThat(messageManager.search(SearchQuery.of(SearchQuery.address(SearchQuery.AddressType.To, "alice-test")), session))
+        assertThat(Flux.from(messageManager.search(SearchQuery.of(SearchQuery.address(SearchQuery.AddressType.To, "alice-test")), session)).toStream())
             .containsOnly(messageId2.getUid());
     }
 
@@ -354,7 +356,7 @@ class ElasticSearchIntegrationTest extends AbstractMessageSearchIndexTest {
 
         elasticSearch.awaitForElasticSearch();
 
-        assertThat(messageManager.search(SearchQuery.of(SearchQuery.address(SearchQuery.AddressType.To, "[email protected]")), session))
+        assertThat(Flux.from(messageManager.search(SearchQuery.of(SearchQuery.address(SearchQuery.AddressType.To, "[email protected]")), session)).toStream())
             .containsOnly(messageId1.getUid());
     }
 
@@ -386,7 +388,7 @@ class ElasticSearchIntegrationTest extends AbstractMessageSearchIndexTest {
 
         elasticSearch.awaitForElasticSearch();
 
-        assertThat(messageManager.search(SearchQuery.of(SearchQuery.address(SearchQuery.AddressType.To, "domain-test.tld")), session))
+        assertThat(Flux.from(messageManager.search(SearchQuery.of(SearchQuery.address(SearchQuery.AddressType.To, "domain-test.tld")), session)).toStream())
             .containsOnly(messageId1.getUid());
     }
 }
\ No newline at end of file
diff --git a/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndexTest.java b/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndexTest.java
index 1dc6d7e..1c588af 100644
--- a/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndexTest.java
+++ b/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndexTest.java
@@ -202,7 +202,7 @@ class ElasticSearchListeningMessageSearchIndexTest {
         elasticSearch.awaitForElasticSearch();
 
         SearchQuery query = SearchQuery.of(SearchQuery.all());
-        assertThat(testee.search(session, mailbox, query))
+        assertThat(testee.search(session, mailbox, query).toStream())
             .containsExactly(MESSAGE_1.getUid());
     }
 
@@ -213,7 +213,7 @@ class ElasticSearchListeningMessageSearchIndexTest {
         elasticSearch.awaitForElasticSearch();
 
         SearchQuery query = SearchQuery.of(SearchQuery.all());
-        assertThat(testee.search(session, mailbox, query))
+        assertThat(testee.search(session, mailbox, query).toStream())
             .containsExactly(MESSAGE_WITH_ATTACHMENT.getUid());
     }
 
@@ -225,7 +225,7 @@ class ElasticSearchListeningMessageSearchIndexTest {
         elasticSearch.awaitForElasticSearch();
 
         SearchQuery query = SearchQuery.of(SearchQuery.all());
-        assertThat(testee.search(session, mailbox, query))
+        assertThat(testee.search(session, mailbox, query).toStream())
             .containsExactly(MESSAGE_1.getUid());
     }
 
@@ -237,7 +237,7 @@ class ElasticSearchListeningMessageSearchIndexTest {
         elasticSearch.awaitForElasticSearch();
 
         SearchQuery query = SearchQuery.of(SearchQuery.all());
-        assertThat(testee.search(session, mailbox, query))
+        assertThat(testee.search(session, mailbox, query).toStream())
             .containsExactly(MESSAGE_1.getUid(), MESSAGE_2.getUid());
     }
 
@@ -255,7 +255,7 @@ class ElasticSearchListeningMessageSearchIndexTest {
         elasticSearch.awaitForElasticSearch();
 
         SearchQuery query = SearchQuery.of(SearchQuery.all());
-        assertThat(testee.search(session, mailbox, query))
+        assertThat(testee.search(session, mailbox, query).toStream())
             .containsExactly(MESSAGE_WITH_ATTACHMENT.getUid());
     }
 
@@ -279,7 +279,7 @@ class ElasticSearchListeningMessageSearchIndexTest {
         elasticSearch.awaitForElasticSearch();
 
         SearchQuery query = SearchQuery.of(SearchQuery.all());
-        assertThat(testee.search(session, mailbox, query))
+        assertThat(testee.search(session, mailbox, query).toStream())
             .isEmpty();
     }
 
@@ -294,7 +294,7 @@ class ElasticSearchListeningMessageSearchIndexTest {
         elasticSearch.awaitForElasticSearch();
 
         SearchQuery query = SearchQuery.of(SearchQuery.all());
-        assertThat(testee.search(session, mailbox, query))
+        assertThat(testee.search(session, mailbox, query).toStream())
             .containsExactly(MESSAGE_2.getUid());
     }
 
@@ -309,7 +309,7 @@ class ElasticSearchListeningMessageSearchIndexTest {
         elasticSearch.awaitForElasticSearch();
 
         SearchQuery query = SearchQuery.of(SearchQuery.all());
-        assertThat(testee.search(session, mailbox, query))
+        assertThat(testee.search(session, mailbox, query).toStream())
             .isEmpty();
     }
 
@@ -323,7 +323,7 @@ class ElasticSearchListeningMessageSearchIndexTest {
         elasticSearch.awaitForElasticSearch();
 
         SearchQuery query = SearchQuery.of(SearchQuery.all());
-        assertThat(testee.search(session, mailbox, query))
+        assertThat(testee.search(session, mailbox, query).toStream())
             .isEmpty();
     }
 
@@ -361,7 +361,7 @@ class ElasticSearchListeningMessageSearchIndexTest {
         elasticSearch.awaitForElasticSearch();
 
         SearchQuery query = SearchQuery.of(SearchQuery.flagIsSet(Flags.Flag.ANSWERED));
-        assertThat(testee.search(session, mailbox, query))
+        assertThat(testee.search(session, mailbox, query).toStream())
             .containsExactly(MESSAGE_1.getUid());
     }
 
@@ -382,7 +382,7 @@ class ElasticSearchListeningMessageSearchIndexTest {
         elasticSearch.awaitForElasticSearch();
 
         SearchQuery query = SearchQuery.of(SearchQuery.flagIsSet(Flags.Flag.ANSWERED));
-        assertThat(testee.search(session, mailbox, query))
+        assertThat(testee.search(session, mailbox, query).toStream())
             .isEmpty();
     }
 
@@ -404,7 +404,7 @@ class ElasticSearchListeningMessageSearchIndexTest {
         elasticSearch.awaitForElasticSearch();
 
         SearchQuery query = SearchQuery.of(SearchQuery.flagIsSet(Flags.Flag.ANSWERED));
-        assertThat(testee.search(session, mailbox, query))
+        assertThat(testee.search(session, mailbox, query).toStream())
             .containsExactly(MESSAGE_1.getUid());
     }
 
@@ -438,7 +438,7 @@ class ElasticSearchListeningMessageSearchIndexTest {
         elasticSearch.awaitForElasticSearch();
 
         SearchQuery query = SearchQuery.of(SearchQuery.all());
-        assertThat(testee.search(session, mailbox, query))
+        assertThat(testee.search(session, mailbox, query).toStream())
             .isEmpty();
     }
 
diff --git a/mailbox/lucene/src/main/java/org/apache/james/mailbox/lucene/search/LuceneMessageSearchIndex.java b/mailbox/lucene/src/main/java/org/apache/james/mailbox/lucene/search/LuceneMessageSearchIndex.java
index c9ddec0..0304099 100644
--- a/mailbox/lucene/src/main/java/org/apache/james/mailbox/lucene/search/LuceneMessageSearchIndex.java
+++ b/mailbox/lucene/src/main/java/org/apache/james/mailbox/lucene/search/LuceneMessageSearchIndex.java
@@ -456,11 +456,10 @@ public class LuceneMessageSearchIndex extends ListeningMessageSearchIndex {
     
     
     @Override
-    public Stream<MessageUid> search(MailboxSession session, Mailbox mailbox, SearchQuery searchQuery) throws MailboxException {
+    public Flux<MessageUid> search(MailboxSession session, Mailbox mailbox, SearchQuery searchQuery) throws MailboxException {
         Preconditions.checkArgument(session != null, "'session' is mandatory");
 
-        return searchMultimap(ImmutableList.of(mailbox.getMailboxId()), searchQuery)
-            .stream()
+        return Flux.fromIterable(searchMultimap(ImmutableList.of(mailbox.getMailboxId()), searchQuery))
             .map(SearchResult::getMessageUid);
     }
 
diff --git a/mailbox/lucene/src/test/java/org/apache/james/mailbox/lucene/search/LuceneMailboxMessageSearchIndexTest.java b/mailbox/lucene/src/test/java/org/apache/james/mailbox/lucene/search/LuceneMailboxMessageSearchIndexTest.java
index 3f913f0..32813af 100644
--- a/mailbox/lucene/src/test/java/org/apache/james/mailbox/lucene/search/LuceneMailboxMessageSearchIndexTest.java
+++ b/mailbox/lucene/src/test/java/org/apache/james/mailbox/lucene/search/LuceneMailboxMessageSearchIndexTest.java
@@ -195,105 +195,105 @@ class LuceneMailboxMessageSearchIndexTest {
     @Test
     void bodySearchShouldMatchPhraseInBody() throws Exception {
         SearchQuery query = SearchQuery.of(SearchQuery.bodyContains(CUSTARD));
-        Stream<MessageUid> result = index.search(session, mailbox3, query);
+        Stream<MessageUid> result = index.search(session, mailbox3, 
query).toStream();
         assertThat(result).containsExactly(uid5);
     }
 
     @Test
     void bodySearchShouldNotMatchAbsentPhraseInBody() throws Exception {
         SearchQuery query = SearchQuery.of(SearchQuery.bodyContains(CUSTARD + 
CUSTARD));
-        Stream<MessageUid> result = index.search(session, mailbox3, query);
+        Stream<MessageUid> result = index.search(session, mailbox3, 
query).toStream();
         assertThat(result).isEmpty();
     }
     
     @Test
     void bodySearchShouldBeCaseInsensitive() throws Exception {
         SearchQuery query = SearchQuery.of(SearchQuery.bodyContains(RHUBARD));
-        Stream<MessageUid> result = index.search(session, mailbox3, query);
+        Stream<MessageUid> result = index.search(session, mailbox3, 
query).toStream();
         assertThat(result).containsExactly(uid5);
     }
 
     @Test
     void bodySearchNotMatchPhraseOnlyInFrom() throws Exception {
         SearchQuery query = 
SearchQuery.of(SearchQuery.bodyContains(FROM_ADDRESS));
-        Stream<MessageUid> result = index.search(session, mailbox3, query);
+        Stream<MessageUid> result = index.search(session, mailbox3, 
query).toStream();
         assertThat(result).isEmpty();
     }
 
     @Test
     void bodySearchShouldNotMatchPhraseOnlyInSubject() throws Exception {
         SearchQuery query = 
SearchQuery.of(SearchQuery.bodyContains(SUBJECT_PART));
-        Stream<MessageUid> result = index.search(session, mailbox3, query);
+        Stream<MessageUid> result = index.search(session, mailbox3, 
query).toStream();
         assertThat(result).isEmpty();
     }
 
     @Test
     void textSearchShouldMatchPhraseInBody() throws Exception {
         SearchQuery query = SearchQuery.of(SearchQuery.mailContains(CUSTARD));
-        Stream<MessageUid> result = index.search(session, mailbox3, query);
+        Stream<MessageUid> result = index.search(session, mailbox3, 
query).toStream();
         assertThat(result).containsExactly(uid5);
     }
 
     @Test
     void textSearchShouldNotAbsentMatchPhraseInBody() throws Exception {
         SearchQuery query = SearchQuery.of(SearchQuery.mailContains(CUSTARD + 
CUSTARD));
-        Stream<MessageUid> result = index.search(session, mailbox3, query);
+        Stream<MessageUid> result = index.search(session, mailbox3, 
query).toStream();
         assertThat(result).isEmpty();
     }
 
     @Test
     void textSearchMatchShouldBeCaseInsensitive() throws Exception {
         SearchQuery query = 
SearchQuery.of(SearchQuery.mailContains(RHUBARD.toLowerCase(Locale.US)));
-        Stream<MessageUid> result = index.search(session, mailbox3, query);
+        Stream<MessageUid> result = index.search(session, mailbox3, 
query).toStream();
         assertThat(result).containsExactly(uid5);
     }
 
     @Test
     void addressSearchShouldMatchToFullAddress() throws Exception {
         SearchQuery query = 
SearchQuery.of(SearchQuery.address(AddressType.To,FROM_ADDRESS));
-        Stream<MessageUid> result = index.search(session, mailbox3, query);
+        Stream<MessageUid> result = index.search(session, mailbox3, 
query).toStream();
         assertThat(result).containsExactly(uid5);
     }
 
     @Test
     void addressSearchShouldMatchToDisplayName() throws Exception {
         SearchQuery query = 
SearchQuery.of(SearchQuery.address(AddressType.To,"Harry"));
-        Stream<MessageUid> result = index.search(session, mailbox3, query);
+        Stream<MessageUid> result = index.search(session, mailbox3, 
query).toStream();
         assertThat(result).containsExactly(uid5);
     }
     
     @Test
     void addressSearchShouldMatchToEmail() throws Exception {
         SearchQuery query = 
SearchQuery.of(SearchQuery.address(AddressType.To,"[email protected]"));
-        Stream<MessageUid> result = index.search(session, mailbox3, query);
+        Stream<MessageUid> result = index.search(session, mailbox3, 
query).toStream();
         assertThat(result).containsExactly(uid5);
     }
     
     @Test
     void addressSearchShouldMatchFrom() throws Exception {
         SearchQuery query = 
SearchQuery.of(SearchQuery.address(AddressType.From,"[email protected]"));
-        Stream<MessageUid> result = index.search(session, mailbox3, query);
+        Stream<MessageUid> result = index.search(session, mailbox3, 
query).toStream();
         assertThat(result).containsExactly(uid5);
     }
 
     @Test
     void textSearchShouldMatchPhraseOnlyInToHeader() throws Exception {
         SearchQuery query = 
SearchQuery.of(SearchQuery.mailContains(FROM_ADDRESS));
-        Stream<MessageUid> result = index.search(session, mailbox3, query);
+        Stream<MessageUid> result = index.search(session, mailbox3, 
query).toStream();
         assertThat(result).containsExactly(uid5);
     }
     
     @Test
     void textSearchShouldMatchPhraseOnlyInSubjectHeader() throws Exception {
         SearchQuery query = 
SearchQuery.of(SearchQuery.mailContains(SUBJECT_PART));
-        Stream<MessageUid> result = index.search(session, mailbox3, query);
+        Stream<MessageUid> result = index.search(session, mailbox3, 
query).toStream();
         assertThat(result).containsExactly(uid5);
     }
     
     @Test
     void searchAllShouldMatchAllMailboxEmails() throws Exception {
         SearchQuery query = SearchQuery.of(SearchQuery.all());
-        Stream<MessageUid> result = index.search(session, mailbox2, query);
+        Stream<MessageUid> result = index.search(session, mailbox2, 
query).toStream();
         assertThat(result).containsExactly(uid2);
     }
 
@@ -345,42 +345,42 @@ class LuceneMailboxMessageSearchIndexTest {
     @Test
     void flagSearchShouldMatch() throws Exception {
         SearchQuery query = 
SearchQuery.of(SearchQuery.flagIsSet(Flag.DELETED));
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, 
query).toStream();
         assertThat(result).containsExactly(uid3, uid4);
     }
     
     @Test
     void bodySearchShouldMatchSeveralEmails() throws Exception {    
         SearchQuery query = SearchQuery.of(SearchQuery.bodyContains("body"));
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, 
query).toStream();
         assertThat(result).containsExactly(uid1, uid3, uid4);
     }
     
     @Test
     void textSearchShouldMatchSeveralEmails() throws Exception {    
         SearchQuery query = SearchQuery.of(SearchQuery.mailContains("body"));
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, 
query).toStream();
         assertThat(result).containsExactly(uid1, uid3, uid4);
     }
     
     @Test
     void headerSearchShouldMatch() throws Exception {
         SearchQuery query = 
SearchQuery.of(SearchQuery.headerContains("Subject", "test"));
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, 
query).toStream();
         assertThat(result).containsExactly(uid1, uid4);
     }
     
     @Test
     void headerExistsShouldMatch() throws Exception {
         SearchQuery query = 
SearchQuery.of(SearchQuery.headerExists("Subject"));
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, 
query).toStream();
         assertThat(result).containsExactly(uid1, uid4);
     }
     
     @Test
     void flagUnsetShouldMatch() throws Exception {
         SearchQuery query = 
SearchQuery.of(SearchQuery.flagIsUnSet(Flag.DRAFT));
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, 
query).toStream();
         assertThat(result).containsExactly(uid1, uid3, uid4);
     }
     
@@ -390,7 +390,7 @@ class LuceneMailboxMessageSearchIndexTest {
         cal.setTime(new Date());
         SearchQuery query = 
SearchQuery.of(SearchQuery.internalDateBefore(cal.getTime(), 
DateResolution.Day));
         
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, 
query).toStream();
         assertThat(result).containsExactly(uid3);
     }
     
@@ -400,7 +400,7 @@ class LuceneMailboxMessageSearchIndexTest {
         Calendar cal = Calendar.getInstance();
         cal.setTime(new Date());
         SearchQuery query = 
SearchQuery.of(SearchQuery.internalDateAfter(cal.getTime(), 
DateResolution.Day));
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, 
query).toStream();
         assertThat(result).containsExactly(uid4);
     }
     
@@ -411,7 +411,7 @@ class LuceneMailboxMessageSearchIndexTest {
         Calendar cal = Calendar.getInstance();
         cal.setTime(new Date());
         SearchQuery query = 
SearchQuery.of(SearchQuery.internalDateOn(cal.getTime(), DateResolution.Day));
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, 
query).toStream();
         assertThat(result).containsExactly(uid1);
     }
     
@@ -420,7 +420,7 @@ class LuceneMailboxMessageSearchIndexTest {
         Calendar cal = Calendar.getInstance();
         cal.setTime(new Date());
         SearchQuery query = SearchQuery.of(SearchQuery.uid(new 
SearchQuery.UidRange[] {new SearchQuery.UidRange(uid1)}));
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, 
query).toStream();
         assertThat(result).containsExactly(uid1);
     }
     
@@ -429,35 +429,35 @@ class LuceneMailboxMessageSearchIndexTest {
         Calendar cal = Calendar.getInstance();
         cal.setTime(new Date());
         SearchQuery query = SearchQuery.of(SearchQuery.uid(new 
SearchQuery.UidRange[] {new SearchQuery.UidRange(uid1), new 
SearchQuery.UidRange(uid3,uid4)}));
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, 
query).toStream();
         assertThat(result).containsExactly(uid1, uid3, uid4);
     }
     
     @Test
     void sizeEqualsShouldMatch() throws Exception {
         SearchQuery query = SearchQuery.of(SearchQuery.sizeEquals(200));
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, 
query).toStream();
         assertThat(result).containsExactly(uid1);
     }
     
     @Test
     void sizeLessThanShouldMatch() throws Exception {
         SearchQuery query = SearchQuery.of(SearchQuery.sizeLessThan(200));
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, 
query).toStream();
         assertThat(result).containsExactly(uid3, uid4);
     }
     
     @Test
     void sizeGreaterThanShouldMatch() throws Exception {
         SearchQuery query = SearchQuery.of(SearchQuery.sizeGreaterThan(6));
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, 
query).toStream();
         assertThat(result).containsExactly(uid1, uid3, uid4);
     }
     
     @Test
     void uidShouldBeSorted() throws Exception {
         SearchQuery query = SearchQuery.of(SearchQuery.all());
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, 
query).toStream();
         assertThat(result).containsExactly(uid1, uid3, uid4);
     }
     
@@ -465,7 +465,7 @@ class LuceneMailboxMessageSearchIndexTest {
     void uidReverseSortShouldReturnWellOrderedResults() throws Exception {
         SearchQuery query = SearchQuery.allSortedWith(new Sort(SortClause.Uid, 
Order.REVERSE));
 
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, 
query).toStream();
         assertThat(result).containsExactly(uid4, uid3, uid1);
     }
     
@@ -473,7 +473,7 @@ class LuceneMailboxMessageSearchIndexTest {
     void sortOnSentDateShouldReturnWellOrderedResults() throws Exception {
         SearchQuery query = SearchQuery.allSortedWith(new 
Sort(SortClause.SentDate, Order.NATURAL));
 
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, 
query).toStream();
         assertThat(result).containsExactly(uid3, uid4, uid1);
     }
     
@@ -481,7 +481,7 @@ class LuceneMailboxMessageSearchIndexTest {
     void reverseSortOnSentDateShouldReturnWellOrderedResults() throws 
Exception {
         SearchQuery query = SearchQuery.allSortedWith(new 
Sort(SortClause.SentDate, Order.REVERSE));
 
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, 
query).toStream();
         assertThat(result).containsExactly(uid1, uid4, uid3);
     }
 
@@ -489,7 +489,7 @@ class LuceneMailboxMessageSearchIndexTest {
     void sortOnSubjectShouldReturnWellOrderedResults() throws Exception {
         SearchQuery query = SearchQuery.allSortedWith(new 
Sort(SortClause.BaseSubject, Order.NATURAL));
 
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, 
query).toStream();
         assertThat(result).containsExactly(uid3, uid1, uid4);
     }
     
@@ -497,7 +497,7 @@ class LuceneMailboxMessageSearchIndexTest {
     void reverseSortOnSubjectShouldReturnWellOrderedResults() throws Exception 
{
         SearchQuery query = SearchQuery.allSortedWith(new 
Sort(SortClause.BaseSubject, Order.REVERSE));
 
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, 
query).toStream();
         assertThat(result).containsExactly(uid4, uid1, uid3);
     }
     
@@ -505,7 +505,7 @@ class LuceneMailboxMessageSearchIndexTest {
     void sortOnMailboxFromShouldReturnWellOrderedResults() throws Exception {
         SearchQuery query = SearchQuery.allSortedWith(new 
Sort(SortClause.MailboxFrom, Order.NATURAL));
 
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, 
query).toStream();
         assertThat(result).containsExactly(uid3, uid4, uid1);
     }
     
@@ -513,7 +513,7 @@ class LuceneMailboxMessageSearchIndexTest {
     void reverseSortOnMailboxFromShouldReturnWellOrderedResults() throws 
Exception {
         SearchQuery query = SearchQuery.allSortedWith(new 
Sort(SortClause.MailboxFrom, Order.REVERSE));
 
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, 
query).toStream();
         assertThat(result).containsExactly(uid1, uid4, uid3);
     }
     
@@ -521,7 +521,7 @@ class LuceneMailboxMessageSearchIndexTest {
     void sortOnMailboxCCShouldReturnWellOrderedResults() throws Exception {
         SearchQuery query = SearchQuery.allSortedWith(new 
Sort(SortClause.MailboxCc, Order.NATURAL));
 
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, 
query).toStream();
         assertThat(result).containsExactly(uid1, uid3, uid4);
     }
     
@@ -529,7 +529,7 @@ class LuceneMailboxMessageSearchIndexTest {
     void reverseSortOnMailboxCCShouldReturnWellOrderedResults() throws 
Exception {
         SearchQuery query = SearchQuery.allSortedWith(new 
Sort(SortClause.MailboxCc, Order.REVERSE));
 
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, 
query).toStream();
         assertThat(result).containsExactly(uid3, uid4, uid1);
     }
     
@@ -537,7 +537,7 @@ class LuceneMailboxMessageSearchIndexTest {
     void sortOnMailboxToShouldReturnWellOrderedResults() throws Exception {
         SearchQuery query = SearchQuery.allSortedWith(new 
Sort(SortClause.MailboxTo, Order.NATURAL));
 
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, 
query).toStream();
         assertThat(result).containsExactly(uid4, uid1, uid3);
     }
     
@@ -545,7 +545,7 @@ class LuceneMailboxMessageSearchIndexTest {
     void reverseSortOnMailboxToShouldReturnWellOrderedResults() throws 
Exception {
         SearchQuery query = SearchQuery.allSortedWith(new 
Sort(SortClause.MailboxTo, Order.REVERSE));
 
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, 
query).toStream();
         assertThat(result).containsExactly(uid3, uid1, uid4);
     }
     
@@ -553,7 +553,7 @@ class LuceneMailboxMessageSearchIndexTest {
     void sortOnDisplayToShouldReturnWellOrderedResults() throws Exception {
         SearchQuery query = SearchQuery.allSortedWith(new 
Sort(SortClause.DisplayTo, Order.NATURAL));
 
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, 
query).toStream();
         assertThat(result).containsExactly(uid4, uid1, uid3);
     }
     
@@ -561,7 +561,7 @@ class LuceneMailboxMessageSearchIndexTest {
     void reverseSortOnDisplayToShouldReturnWellOrderedResults() throws 
Exception {
         SearchQuery query = SearchQuery.allSortedWith(new 
Sort(SortClause.DisplayTo, Order.REVERSE));
 
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, 
query).toStream();
         assertThat(result).containsExactly(uid3, uid1, uid4);
     }
     
@@ -569,7 +569,7 @@ class LuceneMailboxMessageSearchIndexTest {
     void sortOnDisplayFromShouldReturnWellOrderedResults() throws Exception {
         SearchQuery query = SearchQuery.allSortedWith(new 
Sort(SortClause.DisplayFrom, Order.NATURAL));
 
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, 
query).toStream();
         assertThat(result).containsExactly(uid3, uid4, uid1);
     }
     
@@ -577,7 +577,7 @@ class LuceneMailboxMessageSearchIndexTest {
     void reverseSortOnDisplayFromShouldReturnWellOrderedResults() throws 
Exception {
         SearchQuery query = SearchQuery.allSortedWith(new 
Sort(SortClause.DisplayFrom, Order.REVERSE));
 
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, 
query).toStream();
         assertThat(result).containsExactly(uid1, uid4, uid3);
     }
     
@@ -585,7 +585,7 @@ class LuceneMailboxMessageSearchIndexTest {
     void sortOnArrivalDateShouldReturnWellOrderedResults() throws Exception {
         SearchQuery query = SearchQuery.allSortedWith(new 
Sort(SortClause.Arrival, Order.NATURAL));
 
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, 
query).toStream();
         assertThat(result).containsExactly(uid3, uid1, uid4);
     }
     
@@ -593,7 +593,7 @@ class LuceneMailboxMessageSearchIndexTest {
     void reverseSortOnArrivalDateShouldReturnWellOrderedResults() throws 
Exception {
         SearchQuery query = SearchQuery.allSortedWith(new 
Sort(SortClause.Arrival, Order.REVERSE));
 
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, 
query).toStream();
         assertThat(result).containsExactly(uid4, uid1, uid3);
     }
     
@@ -601,7 +601,7 @@ class LuceneMailboxMessageSearchIndexTest {
     void sortOnSizeShouldReturnWellOrderedResults() throws Exception {
         SearchQuery query = SearchQuery.allSortedWith(new 
Sort(SortClause.Size, Order.NATURAL));
 
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, 
query).toStream();
         assertThat(result).containsExactly(uid3, uid4, uid1);
     }
     
@@ -609,7 +609,7 @@ class LuceneMailboxMessageSearchIndexTest {
     void reverseSortOnSizeShouldReturnWellOrderedResults() throws Exception {
         SearchQuery query = SearchQuery.allSortedWith(new 
Sort(SortClause.Size, Order.REVERSE));
 
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, 
query).toStream();
         assertThat(result).containsExactly(uid1, uid3, uid4);
     }
     
@@ -617,7 +617,7 @@ class LuceneMailboxMessageSearchIndexTest {
     void notOperatorShouldReverseMatching() throws Exception {
         SearchQuery query = SearchQuery.of(SearchQuery.not(SearchQuery.uid(new 
SearchQuery.UidRange[] { new SearchQuery.UidRange(uid1)})));
 
-        Stream<MessageUid> result = index.search(session, mailbox, query);
+        Stream<MessageUid> result = index.search(session, mailbox, 
query).toStream();
         assertThat(result).containsExactly(uid3, uid4);
     }
 
@@ -634,7 +634,7 @@ class LuceneMailboxMessageSearchIndexTest {
         index.update(session, mailbox.getMailboxId(), 
Lists.newArrayList(updatedFlags)).block();
 
         SearchQuery query = 
SearchQuery.of(SearchQuery.flagIsSet(Flags.Flag.DRAFT));
-        assertThat(index.search(session, mailbox, query))
+        assertThat(index.search(session, mailbox, query).toStream())
             .containsExactly(uid2);
     }
 
@@ -651,7 +651,7 @@ class LuceneMailboxMessageSearchIndexTest {
         index.update(session, mailbox.getMailboxId(), 
Lists.newArrayList(updatedFlags)).block();
 
         SearchQuery query = 
SearchQuery.of(SearchQuery.flagIsSet(Flags.Flag.DRAFT));
-        assertThat(index.search(session, mailbox, query))
+        assertThat(index.search(session, mailbox, query).toStream())
             .isEmpty();
     }
 
@@ -669,7 +669,7 @@ class LuceneMailboxMessageSearchIndexTest {
         index.update(session, mailbox.getMailboxId(), 
Lists.newArrayList(updatedFlags)).block();
 
         SearchQuery query = 
SearchQuery.of(SearchQuery.flagIsSet(Flags.Flag.DRAFT));
-        assertThat(index.search(session, mailbox, query))
+        assertThat(index.search(session, mailbox, query).toStream())
             .containsExactly(uid2);
     }
 
diff --git 
a/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageVaultHookTest.java
 
b/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageVaultHookTest.java
index 0a94c32..7251a98 100644
--- 
a/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageVaultHookTest.java
+++ 
b/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageVaultHookTest.java
@@ -197,7 +197,7 @@ class DeletedMessageVaultHookTest {
         long messageSize = messageSize(bobMessageManager, composedMessageId);
 
         DeletedMessage deletedMessage = 
buildDeletedMessage(ImmutableList.of(aliceMailbox), messageId, ALICE, 
messageSize);
-        bobMessageManager.delete(bobMessageManager.search(searchQuery, 
bobSession).collect(Guavate.toImmutableList()), bobSession);
+        
bobMessageManager.delete(Flux.from(bobMessageManager.search(searchQuery, 
bobSession)).collect(Guavate.toImmutableList()).block(), bobSession);
 
         assertThat(Flux.from(messageVault.search(ALICE, 
Query.ALL)).blockFirst())
             .isEqualTo(deletedMessage);
@@ -218,7 +218,7 @@ class DeletedMessageVaultHookTest {
         MessageManager bobMessageManager = 
mailboxManager.getMailbox(aliceMailbox, bobSession);
         appendMessage(aliceMessageManager);
 
-        bobMessageManager.delete(bobMessageManager.search(searchQuery, 
bobSession).collect(Guavate.toImmutableList()), bobSession);
+        
bobMessageManager.delete(Flux.from(bobMessageManager.search(searchQuery, 
bobSession)).collect(Guavate.toImmutableList()).block(), bobSession);
 
         assertThat(Flux.from(messageVault.search(BOB, 
Query.ALL)).collectList().block())
             .isEmpty();
@@ -244,7 +244,7 @@ class DeletedMessageVaultHookTest {
 
         long messageSize = messageSize(bobMessageManager, composedMessageId);
         DeletedMessage deletedMessage = 
buildDeletedMessage(ImmutableList.of(bobMailbox), messageId, BOB, messageSize);
-        bobMessageManager.delete(bobMessageManager.search(searchQuery, 
bobSession).collect(Guavate.toImmutableList()), bobSession);
+        
bobMessageManager.delete(Flux.from(bobMessageManager.search(searchQuery, 
bobSession)).collect(Guavate.toImmutableList()).block(), bobSession);
 
         assertThat(Flux.from(messageVault.search(BOB, Query.ALL)).blockFirst())
             .isEqualTo(deletedMessage);
@@ -268,7 +268,7 @@ class DeletedMessageVaultHookTest {
 
         messageIdManager.setInMailboxes(messageId, 
ImmutableList.of(bobMailbox), bobSession);
 
-        bobMessageManager.delete(bobMessageManager.search(searchQuery, 
bobSession).collect(Guavate.toImmutableList()), bobSession);
+        
bobMessageManager.delete(Flux.from(bobMessageManager.search(searchQuery, 
bobSession)).collect(Guavate.toImmutableList()).block(), bobSession);
 
         assertThat(Flux.from(messageVault.search(ALICE, 
Query.ALL)).collectList().block())
             .isEmpty();
@@ -294,7 +294,7 @@ class DeletedMessageVaultHookTest {
 
         long messageSize = messageSize(bobMessageManager, composedMessageId);
         DeletedMessage deletedMessage = 
buildDeletedMessage(ImmutableList.of(bobMailbox), messageId, BOB, messageSize);
-        bobMessageManager.delete(bobMessageManager.search(searchQuery, 
bobSession).collect(Guavate.toImmutableList()), bobSession);
+        
bobMessageManager.delete(Flux.from(bobMessageManager.search(searchQuery, 
bobSession)).collect(Guavate.toImmutableList()).block(), bobSession);
 
         assertThat(Flux.from(messageVault.search(BOB, Query.ALL)).blockFirst())
             .isEqualTo(deletedMessage);
@@ -318,7 +318,7 @@ class DeletedMessageVaultHookTest {
 
         messageIdManager.setInMailboxes(messageId, 
ImmutableList.of(aliceMailbox, bobMailbox), bobSession);
 
-        bobMessageManager.delete(bobMessageManager.search(searchQuery, 
bobSession).collect(Guavate.toImmutableList()), bobSession);
+        
bobMessageManager.delete(Flux.from(bobMessageManager.search(searchQuery, 
bobSession)).collect(Guavate.toImmutableList()).block(), bobSession);
 
         assertThat(Flux.from(messageVault.search(ALICE, 
Query.ALL)).collectList().block())
             .isEmpty();
diff --git 
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
 
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
index fb4f79d..0103598 100644
--- 
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
+++ 
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
@@ -37,7 +37,6 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.SortedMap;
 import java.util.TreeMap;
-import java.util.stream.Stream;
 
 import javax.mail.Flags;
 import javax.mail.Flags.Flag;
@@ -707,7 +706,7 @@ public class StoreMessageManager implements MessageManager {
     }
 
     @Override
-    public Stream<MessageUid> search(SearchQuery query, MailboxSession 
mailboxSession) throws MailboxException {
+    public Flux<MessageUid> search(SearchQuery query, MailboxSession 
mailboxSession) throws MailboxException {
         if (query.equals(LIST_ALL_QUERY)) {
             return listAllMessageUids(mailboxSession);
         }
@@ -859,11 +858,11 @@ public class StoreMessageManager implements 
MessageManager {
             .getApplicableFlag(mailbox);
     }
 
-    private Stream<MessageUid> listAllMessageUids(MailboxSession session) 
throws MailboxException {
+    private Flux<MessageUid> listAllMessageUids(MailboxSession session) throws 
MailboxException {
         final MessageMapper messageMapper = 
mapperFactory.getMessageMapper(session);
 
         return messageMapper.execute(
-            () -> messageMapper.listAllMessageUids(mailbox).toStream());
+            () -> messageMapper.listAllMessageUids(mailbox));
     }
 
     @Override
diff --git 
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/LazyMessageSearchIndex.java
 
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/LazyMessageSearchIndex.java
index 7e44fea..65c588b 100644
--- 
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/LazyMessageSearchIndex.java
+++ 
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/LazyMessageSearchIndex.java
@@ -23,7 +23,6 @@ import java.util.EnumSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Stream;
 
 import javax.mail.Flags;
 
@@ -111,7 +110,7 @@ public class LazyMessageSearchIndex extends 
ListeningMessageSearchIndex {
      * 
      */
     @Override
-    public Stream<MessageUid> search(MailboxSession session, Mailbox mailbox, 
SearchQuery searchQuery) throws MailboxException {
+    public Flux<MessageUid> search(MailboxSession session, Mailbox mailbox, 
SearchQuery searchQuery) throws MailboxException {
         Preconditions.checkArgument(session != null, "'session' is mandatory");
         MailboxId id = mailbox.getMailboxId();
         
diff --git 
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/MessageSearchIndex.java
 
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/MessageSearchIndex.java
index 4873099..30269ad 100644
--- 
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/MessageSearchIndex.java
+++ 
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/MessageSearchIndex.java
@@ -22,7 +22,6 @@ package org.apache.james.mailbox.store.search;
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.Optional;
-import java.util.stream.Stream;
 
 import org.apache.james.mailbox.MailboxManager;
 import org.apache.james.mailbox.MailboxSession;
@@ -46,7 +45,7 @@ public interface MessageSearchIndex {
     /**
      * Return all uids of the previous indexed {@link Mailbox}'s which match 
the {@link SearchQuery}
      */
-    Stream<MessageUid> search(MailboxSession session, Mailbox mailbox, 
SearchQuery searchQuery) throws MailboxException;
+    Flux<MessageUid> search(MailboxSession session, Mailbox mailbox, 
SearchQuery searchQuery) throws MailboxException;
 
     /**
      * Return all uids of all {@link Mailbox}'s the current user has access to 
which match the {@link SearchQuery}
diff --git 
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/SimpleMessageSearchIndex.java
 
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/SimpleMessageSearchIndex.java
index 90c7cfe..63dc12c 100644
--- 
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/SimpleMessageSearchIndex.java
+++ 
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/SimpleMessageSearchIndex.java
@@ -113,12 +113,11 @@ public class SimpleMessageSearchIndex implements 
MessageSearchIndex {
     }
     
     @Override
-    public Stream<MessageUid> search(MailboxSession session, final Mailbox 
mailbox, SearchQuery query) throws MailboxException {
+    public Flux<MessageUid> search(MailboxSession session, final Mailbox 
mailbox, SearchQuery query) throws MailboxException {
         Preconditions.checkArgument(session != null, "'session' is mandatory");
         return searchResults(session, Flux.just(mailbox), query)
             .filter(searchResult -> 
searchResult.getMailboxId().equals(mailbox.getMailboxId()))
-            .map(SearchResult::getMessageUid)
-            .toStream();
+            .map(SearchResult::getMessageUid);
     }
 
     private Set<MailboxMessage> searchResults(MailboxSession session, Mailbox 
mailbox, SearchQuery query) throws MailboxException {
diff --git 
a/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractCombinationManagerTest.java
 
b/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractCombinationManagerTest.java
index f0a8f81..bdddff1 100644
--- 
a/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractCombinationManagerTest.java
+++ 
b/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractCombinationManagerTest.java
@@ -114,7 +114,7 @@ public abstract class AbstractCombinationManagerTest {
 
         messageIdManager.setInMailboxes(messageId, 
ImmutableList.of(mailbox1.getMailboxId(), mailbox2.getMailboxId()), session);
 
-        assertThat(messageManager2.search(query, session)).hasSize(1);
+        assertThat(Flux.from(messageManager2.search(query, 
session)).toStream()).hasSize(1);
     }
 
     @Test
@@ -131,7 +131,7 @@ public abstract class AbstractCombinationManagerTest {
             .get(0)
             .getUid();
 
-        assertThat(messageManager2.search(query, session)).hasSize(1)
+        assertThat(Flux.from(messageManager2.search(query, 
session)).toStream()).hasSize(1)
             .containsExactly(uidInMailbox2);
     }
 
@@ -144,7 +144,7 @@ public abstract class AbstractCombinationManagerTest {
         messageIdManager.setInMailboxes(composedMessageId.getMessageId(),
             ImmutableList.of(mailbox1.getMailboxId(), 
mailbox2.getMailboxId()), session);
 
-        assertThat(messageManager1.search(query, session)).hasSize(1)
+        assertThat(Flux.from(messageManager1.search(query, 
session)).toStream()).hasSize(1)
             .containsExactly(composedMessageId.getUid());
     }
 
@@ -508,7 +508,7 @@ public abstract class AbstractCombinationManagerTest {
             .getUid();
 
         SearchQuery searchQuery = SearchQuery.of(SearchQuery.all());
-        assertThat(messageManager2.search(searchQuery, session))
+        assertThat(Flux.from(messageManager2.search(searchQuery, 
session)).toStream())
             .hasSize(1)
             .containsOnly(uid2);
     }
@@ -522,7 +522,7 @@ public abstract class AbstractCombinationManagerTest {
         messageIdManager.delete(messageId, 
ImmutableList.of(mailbox1.getMailboxId()), session);
 
         SearchQuery searchQuery = SearchQuery.of(SearchQuery.all());
-        assertThat(messageManager1.search(searchQuery, session)).isEmpty();
+        assertThat(Flux.from(messageManager1.search(searchQuery, 
session)).toStream()).isEmpty();
     }
 
     @Test
@@ -537,7 +537,7 @@ public abstract class AbstractCombinationManagerTest {
         messageIdManager.delete(ImmutableList.of(messageId1, messageId2), 
session);
 
         SearchQuery searchQuery = SearchQuery.of(SearchQuery.all());
-        assertThat(messageManager1.search(searchQuery, session)).isEmpty();
+        assertThat(Flux.from(messageManager1.search(searchQuery, 
session)).toStream()).isEmpty();
     }
 
     private Predicate<MessageResult> messageInMailbox2() {
diff --git 
a/mailbox/store/src/test/java/org/apache/james/mailbox/store/search/AbstractMessageSearchIndexTest.java
 
b/mailbox/store/src/test/java/org/apache/james/mailbox/store/search/AbstractMessageSearchIndexTest.java
index ddbb186..5e3d2e9 100644
--- 
a/mailbox/store/src/test/java/org/apache/james/mailbox/store/search/AbstractMessageSearchIndexTest.java
+++ 
b/mailbox/store/src/test/java/org/apache/james/mailbox/store/search/AbstractMessageSearchIndexTest.java
@@ -67,6 +67,8 @@ import org.junit.jupiter.api.Test;
 
 import com.google.common.collect.ImmutableList;
 
+import reactor.core.publisher.Flux;
+
 public abstract class AbstractMessageSearchIndexTest {
 
     private static final long LIMIT = 100L;
@@ -377,7 +379,7 @@ public abstract class AbstractMessageSearchIndexTest {
     void emptySearchQueryShouldReturnAllUids() throws MailboxException {
         SearchQuery searchQuery = SearchQuery.matchAll();
         
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsOnly(m1.getUid(), m2.getUid(), m3.getUid(), m4.getUid(), 
m5.getUid(), m6.getUid(), m7.getUid(), m8.getUid(), m9.getUid());
     }
 
@@ -385,7 +387,7 @@ public abstract class AbstractMessageSearchIndexTest {
     void allShouldReturnAllUids() throws MailboxException {
         SearchQuery searchQuery = SearchQuery.of(SearchQuery.all());
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsOnly(m1.getUid(), m2.getUid(), m3.getUid(), m4.getUid(), 
m5.getUid(), m6.getUid(), m7.getUid(), m8.getUid(), m9.getUid());
     }
 
@@ -394,7 +396,7 @@ public abstract class AbstractMessageSearchIndexTest {
         /* Only mail4.eml contains word MAILET-94 */
         SearchQuery searchQuery = 
SearchQuery.of(SearchQuery.bodyContains("MAILET-94"));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsOnly(m5.getUid());
     }
 
@@ -404,7 +406,7 @@ public abstract class AbstractMessageSearchIndexTest {
            mail.eml contains created and thus matches the query with a low 
score */
         SearchQuery searchQuery = 
SearchQuery.of(SearchQuery.bodyContains("created summary"));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsOnly(m2.getUid(), m8.getUid());
     }
 
@@ -412,7 +414,7 @@ public abstract class AbstractMessageSearchIndexTest {
     void 
hasAttachmentShouldOnlyReturnMessageThatHasAttachmentWhichAreNotInline() throws 
MailboxException {
         SearchQuery searchQuery = SearchQuery.of(SearchQuery.hasAttachment());
 
-        assertThat(messageSearchIndex.search(session, mailbox2, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox2, 
searchQuery).toStream())
             .containsOnly(mailWithAttachment.getUid());
     }
 
@@ -427,7 +429,7 @@ public abstract class AbstractMessageSearchIndexTest {
         
         SearchQuery searchQuery = SearchQuery.of(SearchQuery.all());
 
-        assertThat(messageSearchIndex.search(session, mailbox2, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox2, 
searchQuery).toStream())
             .contains(mailWithDotsInHeader.getUid());
     }
 
@@ -442,7 +444,7 @@ public abstract class AbstractMessageSearchIndexTest {
 
         SearchQuery searchQuery = 
SearchQuery.of(SearchQuery.headerExists("X-header.with.dots"));
 
-        assertThat(messageSearchIndex.search(session, mailbox2, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox2, 
searchQuery).toStream())
             .contains(mailWithDotsInHeader.getUid());
     }
 
@@ -468,7 +470,7 @@ public abstract class AbstractMessageSearchIndexTest {
             SearchQuery.attachmentContains(emailToSearch),
             SearchQuery.bodyContains(emailToSearch))));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsOnly(m11.getUid());
     }
 
@@ -476,7 +478,7 @@ public abstract class AbstractMessageSearchIndexTest {
     void 
hasNoAttachmenShouldOnlyReturnMessageThatHasNoAttachmentWhichAreNotInline() 
throws MailboxException {
         SearchQuery searchQuery = 
SearchQuery.of(SearchQuery.hasNoAttachment());
 
-        assertThat(messageSearchIndex.search(session, mailbox2, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox2, 
searchQuery).toStream())
             .containsOnly(mOther.getUid(), mailWithInlinedAttachment.getUid());
     }
 
@@ -484,7 +486,7 @@ public abstract class AbstractMessageSearchIndexTest {
     void 
flagIsSetShouldReturnUidOfMessageMarkedAsDeletedWhenUsedWithFlagDeleted() 
throws MailboxException {
         SearchQuery searchQuery = 
SearchQuery.of(SearchQuery.flagIsSet(Flags.Flag.DELETED));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsOnly(m1.getUid());
     }
 
@@ -492,7 +494,7 @@ public abstract class AbstractMessageSearchIndexTest {
     void 
flagIsSetShouldReturnUidOfMessageMarkedAsAnsweredWhenUsedWithFlagAnswered() 
throws MailboxException {
         SearchQuery searchQuery = 
SearchQuery.of(SearchQuery.flagIsSet(Flags.Flag.ANSWERED));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsOnly(m2.getUid());
     }
 
@@ -500,7 +502,7 @@ public abstract class AbstractMessageSearchIndexTest {
     void flagIsSetShouldReturnUidOfMessageMarkedAsDraftWhenUsedWithFlagDraft() 
throws MailboxException {
         SearchQuery searchQuery = 
SearchQuery.of(SearchQuery.flagIsSet(Flags.Flag.DRAFT));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsOnly(m3.getUid());
     }
 
@@ -509,7 +511,7 @@ public abstract class AbstractMessageSearchIndexTest {
         // Only message 7 is not marked as RECENT
         SearchQuery searchQuery = 
SearchQuery.of(SearchQuery.flagIsSet(Flags.Flag.RECENT));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsOnly(m1.getUid(), m2.getUid(), m3.getUid(), m4.getUid(), 
m5.getUid(), m6.getUid(), m8.getUid(), m9.getUid());
     }
 
@@ -517,7 +519,7 @@ public abstract class AbstractMessageSearchIndexTest {
     void 
flagIsSetShouldReturnUidOfMessageMarkedAsFlaggedWhenUsedWithFlagFlagged() 
throws MailboxException {
         SearchQuery searchQuery = 
SearchQuery.of(SearchQuery.flagIsSet(Flags.Flag.FLAGGED));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsOnly(m5.getUid());
     }
 
@@ -526,7 +528,7 @@ public abstract class AbstractMessageSearchIndexTest {
         // Only message 6 is marked as read.
         SearchQuery searchQuery = 
SearchQuery.of(SearchQuery.flagIsSet(Flags.Flag.SEEN));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsOnly(m6.getUid());
     }
 
@@ -542,7 +544,7 @@ public abstract class AbstractMessageSearchIndexTest {
 
         SearchQuery searchQuery = 
SearchQuery.of(SearchQuery.flagIsSet(Flags.Flag.SEEN));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .contains(m5.getUid());
     }
     
@@ -635,7 +637,7 @@ public abstract class AbstractMessageSearchIndexTest {
     void flagIsSetShouldReturnUidsOfMessageContainingAGivenUserFlag() throws 
MailboxException {
         SearchQuery searchQuery = 
SearchQuery.of(SearchQuery.flagIsSet("Hello"));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsOnly(m8.getUid());
     }
 
@@ -643,7 +645,7 @@ public abstract class AbstractMessageSearchIndexTest {
     void userFlagsShouldBeMatchedExactly() throws MailboxException {
         SearchQuery searchQuery = SearchQuery.of(SearchQuery.flagIsSet("Hello 
bonjour"));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .isEmpty();
     }
 
@@ -651,7 +653,7 @@ public abstract class AbstractMessageSearchIndexTest {
     void 
flagIsUnSetShouldReturnUidOfMessageNotMarkedAsDeletedWhenUsedWithFlagDeleted() 
throws MailboxException {
         SearchQuery searchQuery = 
SearchQuery.of(SearchQuery.flagIsUnSet(Flags.Flag.DELETED));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsOnly(m2.getUid(), m3.getUid(), m4.getUid(), m5.getUid(), 
m6.getUid(), m7.getUid(), m8.getUid(), m9.getUid());
     }
 
@@ -659,7 +661,7 @@ public abstract class AbstractMessageSearchIndexTest {
     void 
flagIsUnSetShouldReturnUidOfMessageNotMarkedAsAnsweredWhenUsedWithFlagAnswered()
 throws MailboxException {
         SearchQuery searchQuery = 
SearchQuery.of(SearchQuery.flagIsUnSet(Flags.Flag.ANSWERED));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsOnly(m1.getUid(), m3.getUid(), m4.getUid(), m5.getUid(), 
m6.getUid(), m7.getUid(), m8.getUid(), m9.getUid());
     }
 
@@ -667,7 +669,7 @@ public abstract class AbstractMessageSearchIndexTest {
     void 
flagIsUnSetShouldReturnUidOfMessageNotMarkedAsDraftWhenUsedWithFlagDraft() 
throws MailboxException {
         SearchQuery searchQuery = 
SearchQuery.of(SearchQuery.flagIsUnSet(Flags.Flag.DRAFT));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsOnly(m1.getUid(), m2.getUid(), m4.getUid(), m5.getUid(), 
m6.getUid(), m7.getUid(), m8.getUid(), m9.getUid());
     }
 
@@ -676,7 +678,7 @@ public abstract class AbstractMessageSearchIndexTest {
         // Only message 7 is not marked as RECENT
         SearchQuery searchQuery = 
SearchQuery.of(SearchQuery.flagIsUnSet(Flags.Flag.RECENT));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsOnly(m7.getUid());
     }
 
@@ -684,7 +686,7 @@ public abstract class AbstractMessageSearchIndexTest {
     void 
flagIsUnSetShouldReturnUidOfMessageNotMarkedAsFlaggedWhenUsedWithFlagFlagged() 
throws MailboxException {
         SearchQuery searchQuery = 
SearchQuery.of(SearchQuery.flagIsUnSet(Flags.Flag.FLAGGED));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsOnly(m1.getUid(), m2.getUid(), m3.getUid(), m4.getUid(), 
m6.getUid(), m7.getUid(), m8.getUid(), m9.getUid());
     }
 
@@ -693,7 +695,7 @@ public abstract class AbstractMessageSearchIndexTest {
         // Only message 6 is marked as read.
         SearchQuery searchQuery = 
SearchQuery.of(SearchQuery.flagIsUnSet(Flags.Flag.SEEN));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsOnly(m1.getUid(), m2.getUid(), m3.getUid(), m4.getUid(), 
m5.getUid(), m7.getUid(), m8.getUid(), m9.getUid());
     }
 
@@ -701,7 +703,7 @@ public abstract class AbstractMessageSearchIndexTest {
     void flagIsUnSetShouldReturnUidsOfMessageNotContainingAGivenUserFlag() 
throws MailboxException {
         SearchQuery searchQuery = 
SearchQuery.of(SearchQuery.flagIsUnSet("Hello"));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsOnly(m1.getUid(), m2.getUid(), m3.getUid(), m4.getUid(), 
m5.getUid(), m6.getUid(), m7.getUid(),  m9.getUid());
     }
 
@@ -712,7 +714,7 @@ public abstract class AbstractMessageSearchIndexTest {
             DateResolution.Day));
         // Date : 2014/07/02 00:00:00.000 ( Paris time zone )
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsOnly(m7.getUid(), m8.getUid(), m9.getUid());
     }
 
@@ -723,7 +725,7 @@ public abstract class AbstractMessageSearchIndexTest {
             DateResolution.Day));
         // Date : 2014/02/02 00:00:00.000 ( Paris time zone )
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsOnly(m1.getUid(), m2.getUid());
     }
 
@@ -734,7 +736,7 @@ public abstract class AbstractMessageSearchIndexTest {
             DateResolution.Day));
         // Date : 2014/03/02 00:00:00.000 ( Paris time zone )
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsOnly(m3.getUid());
     }
 
@@ -746,7 +748,7 @@ public abstract class AbstractMessageSearchIndexTest {
             .sorts(new Sort(SortClause.Arrival, Order.REVERSE))
             .build();
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsOnly(m3.getUid(), m2.getUid());
     }
 
@@ -758,7 +760,7 @@ public abstract class AbstractMessageSearchIndexTest {
             .sorts(new Sort(SortClause.Arrival, Order.REVERSE))
             .build();
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsOnly(m5.getUid());
     }
 
@@ -770,7 +772,7 @@ public abstract class AbstractMessageSearchIndexTest {
             .sorts(new Sort(SortClause.Arrival, Order.REVERSE))
             .build();
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsOnly(m4.getUid(), m9.getUid());
     }
 
@@ -778,7 +780,7 @@ public abstract class AbstractMessageSearchIndexTest {
     protected void modSeqEqualsShouldReturnUidsOfMessageHavingAGivenModSeq() 
throws Exception {
         SearchQuery searchQuery = SearchQuery.of(SearchQuery.modSeqEquals(2L));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsOnly(m2.getUid());
     }
 
@@ -786,7 +788,7 @@ public abstract class AbstractMessageSearchIndexTest {
     protected void 
modSeqGreaterThanShouldReturnUidsOfMessageHavingAGreaterModSeq() throws 
Exception {
         SearchQuery searchQuery = 
SearchQuery.of(SearchQuery.modSeqGreaterThan(7L));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsOnly(m8.getUid(), m9.getUid());
     }
 
@@ -794,7 +796,7 @@ public abstract class AbstractMessageSearchIndexTest {
     protected void 
modSeqLessThanShouldReturnUidsOfMessageHavingAGreaterModSeq() throws Exception {
         SearchQuery searchQuery = 
SearchQuery.of(SearchQuery.modSeqLessThan(3L));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsOnly(m1.getUid(), m2.getUid());
     }
 
@@ -803,7 +805,7 @@ public abstract class AbstractMessageSearchIndexTest {
         // Only message 6 is over 6.8 KB
         SearchQuery searchQuery = 
SearchQuery.of(SearchQuery.sizeGreaterThan(6800L));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsOnly(m6.getUid());
     }
 
@@ -812,7 +814,7 @@ public abstract class AbstractMessageSearchIndexTest {
         // Only message 2 3 4 5 7 9 are under 5 KB
         SearchQuery searchQuery = 
SearchQuery.of(SearchQuery.sizeLessThan(5000L));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsOnly(m2.getUid(), m3.getUid(), m4.getUid(), m5.getUid(), 
m7.getUid(), m9.getUid());
     }
 
@@ -820,7 +822,7 @@ public abstract class AbstractMessageSearchIndexTest {
     void 
headerContainsShouldReturnUidsOfMessageHavingThisHeaderWithTheSpecifiedValue() 
throws Exception {
         SearchQuery searchQuery = 
SearchQuery.of(SearchQuery.headerContains("Precedence", "list"));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsOnly(m1.getUid(), m6.getUid(), m8.getUid(), m9.getUid());
     }
 
@@ -828,7 +830,7 @@ public abstract class AbstractMessageSearchIndexTest {
     void headerExistsShouldReturnUidsOfMessageHavingThisHeader() throws 
Exception {
         SearchQuery searchQuery = 
SearchQuery.of(SearchQuery.headerExists("Precedence"));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsOnly(m1.getUid(), m2.getUid(), m3.getUid(), m4.getUid(), 
m5.getUid(), m6.getUid(), m8.getUid(), m9.getUid());
     }
 
@@ -836,7 +838,7 @@ public abstract class AbstractMessageSearchIndexTest {
     protected void 
addressShouldReturnUidHavingRightExpeditorWhenFromIsSpecified() throws 
Exception {
         SearchQuery searchQuery = 
SearchQuery.of(SearchQuery.address(AddressType.From, "[email protected]"));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsOnly(m8.getUid());
     }
 
@@ -848,7 +850,7 @@ public abstract class AbstractMessageSearchIndexTest {
 
         SearchQuery searchQuery = 
SearchQuery.of(SearchQuery.address(AddressType.From, "murari"));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsOnly(m8.getUid());
     }
 
@@ -860,7 +862,7 @@ public abstract class AbstractMessageSearchIndexTest {
 
         SearchQuery searchQuery = 
SearchQuery.of(SearchQuery.address(AddressType.From, "gmail.com"));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsOnly(m8.getUid());
     }
 
@@ -872,7 +874,7 @@ public abstract class AbstractMessageSearchIndexTest {
 
         SearchQuery searchQuery = 
SearchQuery.of(SearchQuery.address(AddressType.To, "Üsteliğhan"));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsOnly(m8.getUid());
     }
 
@@ -880,7 +882,7 @@ public abstract class AbstractMessageSearchIndexTest {
     void addressShouldReturnUidHavingRightRecipientWhenToIsSpecified() throws 
Exception {
         SearchQuery searchQuery = 
SearchQuery.of(SearchQuery.address(AddressType.To, "[email protected]"));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsOnly(m1.getUid());
     }
 
@@ -892,7 +894,7 @@ public abstract class AbstractMessageSearchIndexTest {
 
         SearchQuery searchQuery = 
SearchQuery.of(SearchQuery.address(AddressType.To, "root"));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsOnly(m1.getUid());
     }
 
@@ -904,7 +906,7 @@ public abstract class AbstractMessageSearchIndexTest {
 
         SearchQuery searchQuery = 
SearchQuery.of(SearchQuery.address(AddressType.To, "listes.minet.net"));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsOnly(m1.getUid());
     }
 
@@ -915,7 +917,7 @@ public abstract class AbstractMessageSearchIndexTest {
             .contains(MailboxManager.SearchCapabilities.PartialEmailMatch));
 
         SearchQuery searchQuery = 
SearchQuery.of(SearchQuery.address(AddressType.Cc, "[email protected]"));
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsOnly(m5.getUid());
     }
 
@@ -926,7 +928,7 @@ public abstract class AbstractMessageSearchIndexTest {
             .contains(MailboxManager.SearchCapabilities.PartialEmailMatch));
 
         SearchQuery searchQuery = 
SearchQuery.of(SearchQuery.address(AddressType.Cc, "monkey"));
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsOnly(m5.getUid());
     }
 
@@ -937,7 +939,7 @@ public abstract class AbstractMessageSearchIndexTest {
             .contains(MailboxManager.SearchCapabilities.PartialEmailMatch));
 
         SearchQuery searchQuery = 
SearchQuery.of(SearchQuery.address(AddressType.Cc, "any.com"));
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsOnly(m5.getUid());
     }
 
@@ -949,7 +951,7 @@ public abstract class AbstractMessageSearchIndexTest {
 
         SearchQuery searchQuery = 
SearchQuery.of(SearchQuery.address(AddressType.Bcc, "monkey"));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsOnly(m5.getUid());
     }
 
@@ -960,7 +962,7 @@ public abstract class AbstractMessageSearchIndexTest {
             .contains(MailboxManager.SearchCapabilities.PartialEmailMatch));
 
         SearchQuery searchQuery = 
SearchQuery.of(SearchQuery.address(AddressType.Bcc, "any.com"));
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsOnly(m5.getUid());
     }
 
@@ -972,7 +974,7 @@ public abstract class AbstractMessageSearchIndexTest {
 
         SearchQuery searchQuery = 
SearchQuery.of(SearchQuery.address(AddressType.Bcc, "[email protected]"));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsOnly(m9.getUid());
     }
 
@@ -981,7 +983,7 @@ public abstract class AbstractMessageSearchIndexTest {
         SearchQuery.UidRange[] numericRanges = {new 
SearchQuery.UidRange(m2.getUid(), m4.getUid()), new 
SearchQuery.UidRange(m6.getUid(), m7.getUid())};
         SearchQuery searchQuery = 
SearchQuery.of(SearchQuery.uid(numericRanges));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsOnly(m2.getUid(), m3.getUid(), m4.getUid(), m6.getUid(), 
m7.getUid());
     }
 
@@ -990,7 +992,7 @@ public abstract class AbstractMessageSearchIndexTest {
         SearchQuery.UidRange[] numericRanges = {};
         SearchQuery searchQuery = 
SearchQuery.of(SearchQuery.uid(numericRanges));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsOnly(m1.getUid(), m2.getUid(), m3.getUid(), m4.getUid(), 
m5.getUid(), m6.getUid(), m7.getUid(), m8.getUid(), m9.getUid());
     }
 
@@ -998,7 +1000,7 @@ public abstract class AbstractMessageSearchIndexTest {
     protected void youShouldBeAbleToSpecifySeveralCriterionOnASingleQuery() 
throws Exception {
         SearchQuery searchQuery = 
SearchQuery.of(SearchQuery.headerExists("Precedence"), 
SearchQuery.modSeqGreaterThan(6L));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsOnly(m8.getUid(), m9.getUid());
     }
 
@@ -1008,7 +1010,7 @@ public abstract class AbstractMessageSearchIndexTest {
             SearchQuery.headerExists("Precedence"),
             SearchQuery.modSeqGreaterThan(6L)));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsOnly(m8.getUid(), m9.getUid());
     }
 
@@ -1019,7 +1021,7 @@ public abstract class AbstractMessageSearchIndexTest {
             SearchQuery.uid(numericRanges),
             SearchQuery.modSeqGreaterThan(6L)));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsOnly(m2.getUid(), m3.getUid(), m4.getUid(), m7.getUid(), 
m8.getUid(), m9.getUid());
     }
 
@@ -1028,7 +1030,7 @@ public abstract class AbstractMessageSearchIndexTest {
         SearchQuery searchQuery = SearchQuery.of(
             SearchQuery.not(SearchQuery.headerExists("Precedence")));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsOnly(m7.getUid());
     }
 
@@ -1036,7 +1038,7 @@ public abstract class AbstractMessageSearchIndexTest {
     void sortShouldOrderMessages() throws Exception {
         SearchQuery searchQuery = SearchQuery.allSortedWith(new 
Sort(SortClause.Arrival));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsExactly(m1.getUid(), m2.getUid(), m3.getUid(), 
m5.getUid(), m4.getUid(), m6.getUid(), m7.getUid(), m8.getUid(), m9.getUid());
     }
 
@@ -1044,7 +1046,7 @@ public abstract class AbstractMessageSearchIndexTest {
     void revertSortingShouldReturnElementsInAReversedOrder() throws Exception {
         SearchQuery searchQuery = SearchQuery.allSortedWith(new 
Sort(SortClause.Arrival, Order.REVERSE));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsExactly(m9.getUid(), m8.getUid(), m7.getUid(), 
m6.getUid(), m4.getUid(), m5.getUid(), m3.getUid(), m2.getUid(), m1.getUid());
     }
 
@@ -1056,7 +1058,7 @@ public abstract class AbstractMessageSearchIndexTest {
             .sorts(new Sort(SortClause.Arrival, Order.REVERSE))
             .build();
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsOnly(m3.getUid(), m2.getUid());
     }
 
@@ -1068,7 +1070,7 @@ public abstract class AbstractMessageSearchIndexTest {
             .sorts(new Sort(SortClause.Arrival, Order.REVERSE))
             .build();
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsOnly(m5.getUid());
     }
 
@@ -1080,7 +1082,7 @@ public abstract class AbstractMessageSearchIndexTest {
             .sorts(new Sort(SortClause.Arrival, Order.REVERSE))
             .build();
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsOnly(m4.getUid(), m9.getUid());
     }
 
@@ -1088,7 +1090,7 @@ public abstract class AbstractMessageSearchIndexTest {
     protected void 
mailsContainsShouldIncludeMailHavingAttachmentsMatchingTheRequest() throws 
Exception {
         SearchQuery searchQuery = 
SearchQuery.of(SearchQuery.mailContains("root mailing list"));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsOnly(m1.getUid(), m6.getUid());
     }
 
@@ -1100,7 +1102,7 @@ public abstract class AbstractMessageSearchIndexTest {
             .sorts(new Sort(SortClause.MailboxCc))
             .build();
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsExactly(m3.getUid(), m5.getUid(), m4.getUid(), 
m2.getUid());
         // 2 : No cc
         // 3 : Cc : [email protected]
@@ -1116,7 +1118,7 @@ public abstract class AbstractMessageSearchIndexTest {
             .sorts(new Sort(SortClause.MailboxFrom))
             .build();
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsExactly(m3.getUid(), m2.getUid(), m4.getUid(), 
m5.getUid());
         // m3 : [email protected]
         // m2 : [email protected]
@@ -1132,7 +1134,7 @@ public abstract class AbstractMessageSearchIndexTest {
             .sorts(new Sort(SortClause.MailboxTo))
             .build();
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsExactly(m5.getUid(), m3.getUid(), m2.getUid(), 
m4.getUid());
         // 5 : "zzz" <[email protected]>
         // 3 : "aaa" <[email protected]>
@@ -1148,7 +1150,7 @@ public abstract class AbstractMessageSearchIndexTest {
             .sorts(new Sort(SortClause.BaseSubject))
             .build();
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsExactly(m4.getUid(), m3.getUid(), m2.getUid(), 
m5.getUid());
         // 2 : [jira] [Created] (MAILBOX-234) Convert Message into JSON
         // 3 : [jira] [Closed] (MAILBOX-217) We should index attachment in 
elastic search
@@ -1164,7 +1166,7 @@ public abstract class AbstractMessageSearchIndexTest {
             .sorts(new Sort(SortClause.Size))
             .build();
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsExactly(m2.getUid(), m3.getUid(), m5.getUid(), 
m4.getUid());
         // 2 : 3210 o
         // 3 : 3647 o
@@ -1180,7 +1182,7 @@ public abstract class AbstractMessageSearchIndexTest {
             .sorts(new Sort(SortClause.DisplayFrom))
             .build();
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsExactly(m4.getUid(), m3.getUid(), m5.getUid(), 
m2.getUid());
         // 2 : Tellier Benoit (JIRA)
         // 3 : efij
@@ -1196,7 +1198,7 @@ public abstract class AbstractMessageSearchIndexTest {
             .sorts(new Sort(SortClause.DisplayTo))
             .build();
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsExactly(m3.getUid(), m2.getUid(), m4.getUid(), 
m5.getUid());
         // 2 : abc
         // 3 : aaa
@@ -1212,7 +1214,7 @@ public abstract class AbstractMessageSearchIndexTest {
             .sorts(new Sort(SortClause.SentDate))
             .build();
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsExactly(m5.getUid(), m4.getUid(), m2.getUid(), 
m3.getUid());
         // 2 : 4 Jun 2015 09:23:37
         // 3 : 4 Jun 2015 09:27:37
@@ -1228,7 +1230,7 @@ public abstract class AbstractMessageSearchIndexTest {
             .sorts(new Sort(SortClause.Uid))
             .build();
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsExactly(m2.getUid(), m3.getUid(), m4.getUid(), 
m5.getUid());
     }
 
@@ -1243,7 +1245,7 @@ public abstract class AbstractMessageSearchIndexTest {
 
         SearchQuery searchQuery = 
SearchQuery.of(SearchQuery.mailContains("User message banana"));
 
-        assertThat(messageSearchIndex.search(session, mailbox2, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox2, 
searchQuery).toStream())
             
.containsExactly(messageWithBeautifulBananaAsTextAttachment.getUid());
     }
 
@@ -1258,7 +1260,7 @@ public abstract class AbstractMessageSearchIndexTest {
 
         SearchQuery searchQuery = 
SearchQuery.of(SearchQuery.attachmentContains("beautiful banana"));
 
-        assertThat(messageSearchIndex.search(session, mailbox2, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox2, 
searchQuery).toStream())
             
.containsExactly(messageWithBeautifulBananaAsTextAttachment.getUid());
     }
 
@@ -1282,7 +1284,7 @@ public abstract class AbstractMessageSearchIndexTest {
 
         SearchQuery searchQuery = 
SearchQuery.of(SearchQuery.attachmentContains("beautiful banana"));
 
-        assertThat(messageSearchIndex.search(session, mailbox2, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox2, 
searchQuery).toStream())
             
.containsExactly(messageWithBeautifulBananaAsPDFAttachment.getUid());
     }
 
@@ -1334,7 +1336,7 @@ public abstract class AbstractMessageSearchIndexTest {
 
         SearchQuery searchQuery = SearchQuery.allSortedWith(new 
Sort(SortClause.SentDate));
 
-        assertThat(messageManager.search(searchQuery, session))
+        assertThat(Flux.from(messageManager.search(searchQuery, 
session)).toStream())
             .containsExactly(message2.getUid(),
                 message1.getUid(),
                 message3.getUid());
@@ -1373,7 +1375,7 @@ public abstract class AbstractMessageSearchIndexTest {
 
         SearchQuery searchQuery = SearchQuery.allSortedWith(new 
Sort(SortClause.SentDate));
 
-        assertThat(messageManager.search(searchQuery, session))
+        assertThat(Flux.from(messageManager.search(searchQuery, 
session)).toStream())
             .containsExactly(message2.getUid(),
                 message1.getUid(),
                 message3.getUid());
@@ -1384,7 +1386,7 @@ public abstract class AbstractMessageSearchIndexTest {
         SearchQuery searchQuery = 
SearchQuery.of(SearchQuery.mimeMessageID("<[email protected]>"));
         // Correspond to mail.eml
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsOnly(m3.getUid());
     }
 
@@ -1406,7 +1408,7 @@ public abstract class AbstractMessageSearchIndexTest {
             .await()
             .atMost(30, TimeUnit.SECONDS)
             .until(
-                () -> messageSearchIndex.search(session, 
newBox.getMailboxEntity(), searchQuery).count() == 9);
+                () -> messageSearchIndex.search(session, 
newBox.getMailboxEntity(), searchQuery).toStream().count() == 9);
     }
 
     @Test
@@ -1433,7 +1435,7 @@ public abstract class AbstractMessageSearchIndexTest {
 
         SearchQuery searchQuery = 
SearchQuery.of(SearchQuery.attachmentFileName(fileName));
 
-        assertThat(messageSearchIndex.search(session, mailbox, searchQuery))
+        assertThat(messageSearchIndex.search(session, mailbox, 
searchQuery).toStream())
             .containsOnly(mWithFileName.getUid());
     }
 }
diff --git 
a/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractMailboxProcessor.java
 
b/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractMailboxProcessor.java
index 4f23ebc..df5bfde 100644
--- 
a/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractMailboxProcessor.java
+++ 
b/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractMailboxProcessor.java
@@ -571,7 +571,7 @@ public abstract class AbstractMailboxProcessor<R extends 
ImapRequest> extends Ab
             }
             searchQuery.andCriteria(SearchQuery.uid(nRanges));
             
searchQuery.andCriteria(SearchQuery.modSeqGreaterThan(changedSince));
-            try (Stream<MessageUid> uids = mailbox.search(searchQuery.build(), 
session)) {
+            try (Stream<MessageUid> uids = 
Flux.from(mailbox.search(searchQuery.build(), session)).toStream()) {
                 uids.forEach(vanishedUids::remove);
             }
             UidRange[] vanishedIdRanges = 
uidRanges(MessageRange.toRanges(vanishedUids));
diff --git 
a/protocols/imap/src/main/java/org/apache/james/imap/processor/SearchProcessor.java
 
b/protocols/imap/src/main/java/org/apache/james/imap/processor/SearchProcessor.java
index e86e850..d93742f 100644
--- 
a/protocols/imap/src/main/java/org/apache/james/imap/processor/SearchProcessor.java
+++ 
b/protocols/imap/src/main/java/org/apache/james/imap/processor/SearchProcessor.java
@@ -225,9 +225,9 @@ public class SearchProcessor extends 
AbstractMailboxProcessor<SearchRequest> imp
     }
 
     private Collection<MessageUid> performUidSearch(MessageManager mailbox, 
SearchQuery query, MailboxSession msession) throws MailboxException {
-        try (Stream<MessageUid> stream = mailbox.search(query, msession)) {
-            return stream.collect(Guavate.toImmutableList());
-        }
+        return Flux.from(mailbox.search(query, msession))
+            .collect(Guavate.toImmutableList())
+            .block();
     }
 
     private long[] toArray(Collection<Long> results) {
diff --git 
a/protocols/imap/src/main/java/org/apache/james/imap/processor/base/SelectedMailboxImpl.java
 
b/protocols/imap/src/main/java/org/apache/james/imap/processor/base/SelectedMailboxImpl.java
index 1c8d8db..087e4ad 100644
--- 
a/protocols/imap/src/main/java/org/apache/james/imap/processor/base/SelectedMailboxImpl.java
+++ 
b/protocols/imap/src/main/java/org/apache/james/imap/processor/base/SelectedMailboxImpl.java
@@ -32,7 +32,6 @@ import java.util.List;
 import java.util.Optional;
 import java.util.Set;
 import java.util.TreeSet;
-import java.util.stream.Stream;
 
 import javax.mail.Flags;
 import javax.mail.Flags.Flag;
@@ -58,7 +57,9 @@ import org.apache.james.mailbox.model.UpdatedFlags;
 
 import com.github.steveash.guavate.Guavate;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
 
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 
@@ -151,9 +152,10 @@ public class SelectedMailboxImpl implements 
SelectedMailbox, MailboxListener {
         synchronized (applicableFlagsLock) {
             applicableFlags = 
applicableFlags.updateWithNewFlags(messageManager.getApplicableFlags(mailboxSession));
         }
-        try (Stream<MessageUid> stream = 
messageManager.search(SearchQuery.of(SearchQuery.all()), mailboxSession)) {
-            uidMsnConverter.addAll(stream.collect(Guavate.toImmutableList()));
-        }
+        ImmutableList<MessageUid> uids = 
Flux.from(messageManager.search(SearchQuery.of(SearchQuery.all()), 
mailboxSession))
+            .collect(Guavate.toImmutableList())
+            .block();
+        uidMsnConverter.addAll(uids);
     }
 
     @Override
diff --git 
a/protocols/imap/src/test/java/org/apache/james/imap/processor/SearchProcessorTest.java
 
b/protocols/imap/src/test/java/org/apache/james/imap/processor/SearchProcessorTest.java
index 1d48407..97bdcbc 100644
--- 
a/protocols/imap/src/test/java/org/apache/james/imap/processor/SearchProcessorTest.java
+++ 
b/protocols/imap/src/test/java/org/apache/james/imap/processor/SearchProcessorTest.java
@@ -34,7 +34,6 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Optional;
 import java.util.TimeZone;
-import java.util.stream.Stream;
 
 import javax.mail.Flags;
 import javax.mail.Flags.Flag;
@@ -71,6 +70,8 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import reactor.core.publisher.Flux;
+
 public class SearchProcessorTest {
     private static final int DAY = 6;
 
@@ -463,7 +464,7 @@ public class SearchProcessorTest {
 
     private void check(SearchKey key, final SearchQuery query) throws Exception {
         session.setMailboxSession(mailboxSession);
-        when(mailbox.search(query, mailboxSession)).thenReturn(Stream.empty());
+        when(mailbox.search(query, mailboxSession)).thenReturn(Flux.empty());
         when(selectedMailbox.getApplicableFlags()).thenReturn(new Flags());
         when(selectedMailbox.hasNewApplicableFlags()).thenReturn(false);
 
diff --git a/protocols/imap/src/test/java/org/apache/james/imap/processor/base/MailboxEventAnalyserTest.java b/protocols/imap/src/test/java/org/apache/james/imap/processor/base/MailboxEventAnalyserTest.java
index f15d929..57b95b3 100644
--- a/protocols/imap/src/test/java/org/apache/james/imap/processor/base/MailboxEventAnalyserTest.java
+++ b/protocols/imap/src/test/java/org/apache/james/imap/processor/base/MailboxEventAnalyserTest.java
@@ -25,7 +25,6 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.util.Date;
-import java.util.stream.Stream;
 
 import javax.mail.Flags;
 
@@ -60,6 +59,8 @@ import org.apache.james.metrics.tests.RecordingMetricFactory;
 import org.junit.Before;
 import org.junit.Test;
 
+import reactor.core.publisher.Flux;
+
 public class MailboxEventAnalyserTest {
     private static final MessageUid UID = MessageUid.of(900);
     private static final UpdatedFlags ADD_RECENT_UPDATED_FLAGS = UpdatedFlags.builder()
@@ -154,7 +155,7 @@ public class MailboxEventAnalyserTest {
         when(messageManager.getApplicableFlags(any())).thenReturn(new Flags());
         when(messageManager.getId()).thenReturn(MAILBOX_ID);
         when(messageManager.search(any(), any()))
-            .thenReturn(Stream.of(MESSAGE_UID));
+            .thenReturn(Flux.just(MESSAGE_UID));
         when(messageManager.getMessages(any(), any(), any()))
             .thenReturn(new SingleMessageResultIterator(messageResult));
 
diff --git a/protocols/imap/src/test/java/org/apache/james/imap/processor/base/SelectedMailboxImplTest.java b/protocols/imap/src/test/java/org/apache/james/imap/processor/base/SelectedMailboxImplTest.java
index 02fd802..02bd5a4 100644
--- a/protocols/imap/src/test/java/org/apache/james/imap/processor/base/SelectedMailboxImplTest.java
+++ b/protocols/imap/src/test/java/org/apache/james/imap/processor/base/SelectedMailboxImplTest.java
@@ -30,13 +30,13 @@ import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.time.Duration;
 import java.util.Date;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
-import java.util.stream.Stream;
 
 import javax.mail.Flags;
 
@@ -74,6 +74,7 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.ImmutableList;
 
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 
@@ -115,7 +116,8 @@ class SelectedMailboxImplTest {
         when(messageManager.getApplicableFlags(any(MailboxSession.class)))
             .thenReturn(new Flags());
         when(messageManager.search(any(SearchQuery.class), any(MailboxSession.class)))
-            .then(delayedSearchAnswer());
+            .thenReturn(Flux.just(MessageUid.of(1), MessageUid.of(3))
+                .delayElements(Duration.ofSeconds(1)));
         when(messageManager.getId()).thenReturn(mailboxId);
 
         imapSession.setMailboxSession(mock(MailboxSession.class));
@@ -186,13 +188,6 @@ class SelectedMailboxImplTest {
             .isEqualTo(1);
     }
 
-    Answer<Stream<MessageUid>> delayedSearchAnswer() {
-        return invocation -> {
-            Thread.sleep(1000);
-            return Stream.of(MessageUid.of(1), MessageUid.of(3));
-        };
-    }
-
     Answer<Mono<Registration>> generateEmitEventAnswer(AtomicInteger success) {
         return generateEmitEventAnswer(event(), success);
     }
diff --git a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/SearchModuleChooser.java b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/SearchModuleChooser.java
index a35b5d9..1377ac7 100644
--- a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/SearchModuleChooser.java
+++ b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/SearchModuleChooser.java
@@ -22,7 +22,6 @@ package org.apache.james;
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.List;
-import java.util.stream.Stream;
 
 import javax.mail.Flags;
 
@@ -102,7 +101,7 @@ public class SearchModuleChooser {
         }
 
         @Override
-        public Stream<MessageUid> search(MailboxSession session, Mailbox mailbox, SearchQuery searchQuery) throws MailboxException {
+        public Flux<MessageUid> search(MailboxSession session, Mailbox mailbox, SearchQuery searchQuery) throws MailboxException {
             throw new NotImplementedException("not implemented");
         }
 
diff --git a/server/container/guice/memory-guice/src/main/java/org/apache/james/FakeMessageSearchIndex.java b/server/container/guice/memory-guice/src/main/java/org/apache/james/FakeMessageSearchIndex.java
index 7dbe612..9b3fb21 100644
--- a/server/container/guice/memory-guice/src/main/java/org/apache/james/FakeMessageSearchIndex.java
+++ b/server/container/guice/memory-guice/src/main/java/org/apache/james/FakeMessageSearchIndex.java
@@ -22,7 +22,6 @@ package org.apache.james;
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.List;
-import java.util.stream.Stream;
 
 import javax.mail.Flags;
 
@@ -80,7 +79,7 @@ public class FakeMessageSearchIndex extends ListeningMessageSearchIndex {
     }
 
     @Override
-    public Stream<MessageUid> search(MailboxSession session, Mailbox mailbox, SearchQuery searchQuery) throws MailboxException {
+    public Flux<MessageUid> search(MailboxSession session, Mailbox mailbox, SearchQuery searchQuery) throws MailboxException {
         throw new NotImplementedException("not implemented");
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to