This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 03729af5c0fd33433a958b3cbbaf31737a96a6f9 Author: Benoit Tellier <[email protected]> AuthorDate: Mon Nov 2 08:21:34 2020 +0100 JAMES-2037 Use Flux for MessageManager::search --- .../org/apache/james/mailbox/MessageManager.java | 3 +- .../ElasticSearchListeningMessageSearchIndex.java | 6 +- .../ElasticSearchIntegrationTest.java | 24 +-- ...asticSearchListeningMessageSearchIndexTest.java | 26 ++-- .../lucene/search/LuceneMessageSearchIndex.java | 5 +- .../LuceneMailboxMessageSearchIndexTest.java | 106 +++++++------- .../james/vault/DeletedMessageVaultHookTest.java | 12 +- .../james/mailbox/store/StoreMessageManager.java | 7 +- .../store/search/LazyMessageSearchIndex.java | 3 +- .../mailbox/store/search/MessageSearchIndex.java | 3 +- .../store/search/SimpleMessageSearchIndex.java | 5 +- .../store/AbstractCombinationManagerTest.java | 12 +- .../search/AbstractMessageSearchIndexTest.java | 162 +++++++++++---------- .../imap/processor/AbstractMailboxProcessor.java | 2 +- .../james/imap/processor/SearchProcessor.java | 6 +- .../imap/processor/base/SelectedMailboxImpl.java | 10 +- .../james/imap/processor/SearchProcessorTest.java | 5 +- .../processor/base/MailboxEventAnalyserTest.java | 5 +- .../processor/base/SelectedMailboxImplTest.java | 13 +- .../java/org/apache/james/SearchModuleChooser.java | 3 +- .../org/apache/james/FakeMessageSearchIndex.java | 3 +- 21 files changed, 207 insertions(+), 214 deletions(-) diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageManager.java b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageManager.java index 6a8ab49..779d84e 100644 --- a/mailbox/api/src/main/java/org/apache/james/mailbox/MessageManager.java +++ b/mailbox/api/src/main/java/org/apache/james/mailbox/MessageManager.java @@ -30,7 +30,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.stream.Stream; import javax.mail.Flags; @@ -113,7 +112,7 @@ public interface MessageManager { * @throws MailboxException * when search fails for other reasons */ - Stream<MessageUid> search(SearchQuery searchQuery, MailboxSession mailboxSession) throws MailboxException; + Publisher<MessageUid> search(SearchQuery searchQuery, MailboxSession mailboxSession) throws MailboxException; /** * Expunges messages in the given range from this mailbox by first retrieving the messages to be deleted diff --git a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java index 2d0df24..b6245a8 100644 --- a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java +++ b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java @@ -34,7 +34,6 @@ import java.util.EnumSet; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.stream.Stream; import javax.inject.Inject; import javax.inject.Named; @@ -121,14 +120,13 @@ public class ElasticSearchListeningMessageSearchIndex extends ListeningMessageSe } @Override - public Stream<MessageUid> search(MailboxSession session, Mailbox mailbox, SearchQuery searchQuery) { + public Flux<MessageUid> search(MailboxSession session, Mailbox mailbox, SearchQuery searchQuery) { Preconditions.checkArgument(session != null, "'session' is mandatory"); Optional<Integer> noLimit = Optional.empty(); return searcher .search(ImmutableList.of(mailbox.getMailboxId()), searchQuery, noLimit) - .map(SearchResult::getMessageUid) - .toStream(); + .map(SearchResult::getMessageUid); } @Override diff --git a/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIntegrationTest.java b/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIntegrationTest.java index 46eb518..5ae6e4c 100644 --- a/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIntegrationTest.java +++ b/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/ElasticSearchIntegrationTest.java @@ -57,6 +57,8 @@ import org.junit.jupiter.api.extension.RegisterExtension; import com.google.common.base.Strings; +import reactor.core.publisher.Flux; + class ElasticSearchIntegrationTest extends AbstractMessageSearchIndexTest { static final int BATCH_SIZE = 1; @@ -137,7 +139,7 @@ class ElasticSearchIntegrationTest extends AbstractMessageSearchIndexTest { elasticSearch.awaitForElasticSearch(); - assertThat(messageManager.search(SearchQuery.of(SearchQuery.address(SearchQuery.AddressType.To, recipient)), session)) + assertThat(Flux.from(messageManager.search(SearchQuery.of(SearchQuery.address(SearchQuery.AddressType.To, recipient)), session)).toStream()) .containsExactly(composedMessageId.getUid()); } @@ -156,7 +158,7 @@ class ElasticSearchIntegrationTest extends AbstractMessageSearchIndexTest { elasticSearch.awaitForElasticSearch(); - assertThat(messageManager.search(SearchQuery.of(SearchQuery.address(SearchQuery.AddressType.To, recipient)), session)) + assertThat(Flux.from(messageManager.search(SearchQuery.of(SearchQuery.address(SearchQuery.AddressType.To, recipient)), session)).toStream()) .containsExactly(composedMessageId.getUid()); } @@ -175,7 +177,7 @@ class ElasticSearchIntegrationTest extends AbstractMessageSearchIndexTest { elasticSearch.awaitForElasticSearch(); - assertThat(messageManager.search(SearchQuery.of(SearchQuery.bodyContains("0123456789")), session)) + assertThat(Flux.from(messageManager.search(SearchQuery.of(SearchQuery.bodyContains("0123456789")), session)).toStream()) .containsExactly(composedMessageId.getUid()); } @@ -194,7 +196,7 @@ class ElasticSearchIntegrationTest extends AbstractMessageSearchIndexTest { elasticSearch.awaitForElasticSearch(); - assertThat(messageManager.search(SearchQuery.of(SearchQuery.bodyContains("matchMe")), session)) + assertThat(Flux.from(messageManager.search(SearchQuery.of(SearchQuery.bodyContains("matchMe")), session)).toStream()) .containsExactly(composedMessageId.getUid()); } @@ -214,7 +216,7 @@ class ElasticSearchIntegrationTest extends AbstractMessageSearchIndexTest { elasticSearch.awaitForElasticSearch(); - assertThat(messageManager.search(SearchQuery.of(SearchQuery.bodyContains(reasonableLongTerm)), session)) + assertThat(Flux.from(messageManager.search(SearchQuery.of(SearchQuery.bodyContains(reasonableLongTerm)), session)).toStream()) .containsExactly(composedMessageId.getUid()); } @@ -236,7 +238,7 @@ class ElasticSearchIntegrationTest extends AbstractMessageSearchIndexTest { elasticSearch.awaitForElasticSearch(); - assertThat(messageManager.search(SearchQuery.of(SearchQuery.headerExists("Custom-header")), session)) + assertThat(Flux.from(messageManager.search(SearchQuery.of(SearchQuery.headerExists("Custom-header")), session)).toStream()) .containsExactly(customDateHeaderMessageId.getUid(), customStringHeaderMessageId.getUid()); } @@ -258,7 +260,7 @@ class ElasticSearchIntegrationTest extends AbstractMessageSearchIndexTest { elasticSearch.awaitForElasticSearch(); - assertThat(messageManager.search(SearchQuery.of(SearchQuery.all()), session)) + assertThat(Flux.from(messageManager.search(SearchQuery.of(SearchQuery.all()), session)).toStream()) .contains(customStringHeaderMessageId.getUid()); } @@ -290,7 +292,7 @@ class ElasticSearchIntegrationTest extends AbstractMessageSearchIndexTest { elasticSearch.awaitForElasticSearch(); - assertThat(messageManager.search(SearchQuery.of(SearchQuery.address(SearchQuery.AddressType.To, "[email protected]")), session)) + assertThat(Flux.from(messageManager.search(SearchQuery.of(SearchQuery.address(SearchQuery.AddressType.To, "[email protected]")), session)).toStream()) .containsOnly(messageId2.getUid()); } @@ -322,7 +324,7 @@ class ElasticSearchIntegrationTest extends AbstractMessageSearchIndexTest { elasticSearch.awaitForElasticSearch(); - assertThat(messageManager.search(SearchQuery.of(SearchQuery.address(SearchQuery.AddressType.To, "alice-test")), session)) + assertThat(Flux.from(messageManager.search(SearchQuery.of(SearchQuery.address(SearchQuery.AddressType.To, "alice-test")), session)).toStream()) .containsOnly(messageId2.getUid()); } @@ -354,7 +356,7 @@ class ElasticSearchIntegrationTest extends AbstractMessageSearchIndexTest { elasticSearch.awaitForElasticSearch(); - assertThat(messageManager.search(SearchQuery.of(SearchQuery.address(SearchQuery.AddressType.To, "[email protected]")), session)) + assertThat(Flux.from(messageManager.search(SearchQuery.of(SearchQuery.address(SearchQuery.AddressType.To, "[email protected]")), session)).toStream()) .containsOnly(messageId1.getUid()); } @@ -386,7 +388,7 @@ class ElasticSearchIntegrationTest extends AbstractMessageSearchIndexTest { elasticSearch.awaitForElasticSearch(); - assertThat(messageManager.search(SearchQuery.of(SearchQuery.address(SearchQuery.AddressType.To, "domain-test.tld")), session)) + assertThat(Flux.from(messageManager.search(SearchQuery.of(SearchQuery.address(SearchQuery.AddressType.To, "domain-test.tld")), session)).toStream()) .containsOnly(messageId1.getUid()); } } \ No newline at end of file diff --git a/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndexTest.java b/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndexTest.java index 1dc6d7e..1c588af 100644 --- a/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndexTest.java +++ b/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndexTest.java @@ -202,7 +202,7 @@ class ElasticSearchListeningMessageSearchIndexTest { elasticSearch.awaitForElasticSearch(); SearchQuery query = SearchQuery.of(SearchQuery.all()); - assertThat(testee.search(session, mailbox, query)) + assertThat(testee.search(session, mailbox, query).toStream()) .containsExactly(MESSAGE_1.getUid()); } @@ -213,7 +213,7 @@ class ElasticSearchListeningMessageSearchIndexTest { elasticSearch.awaitForElasticSearch(); SearchQuery query = SearchQuery.of(SearchQuery.all()); - assertThat(testee.search(session, mailbox, query)) + assertThat(testee.search(session, mailbox, query).toStream()) .containsExactly(MESSAGE_WITH_ATTACHMENT.getUid()); } @@ -225,7 +225,7 @@ class ElasticSearchListeningMessageSearchIndexTest { elasticSearch.awaitForElasticSearch(); SearchQuery query = SearchQuery.of(SearchQuery.all()); - assertThat(testee.search(session, mailbox, query)) + assertThat(testee.search(session, mailbox, query).toStream()) .containsExactly(MESSAGE_1.getUid()); } @@ -237,7 +237,7 @@ class ElasticSearchListeningMessageSearchIndexTest { elasticSearch.awaitForElasticSearch(); SearchQuery query = SearchQuery.of(SearchQuery.all()); - assertThat(testee.search(session, mailbox, query)) + assertThat(testee.search(session, mailbox, query).toStream()) .containsExactly(MESSAGE_1.getUid(), MESSAGE_2.getUid()); } @@ -255,7 +255,7 @@ class ElasticSearchListeningMessageSearchIndexTest { elasticSearch.awaitForElasticSearch(); SearchQuery query = SearchQuery.of(SearchQuery.all()); - assertThat(testee.search(session, mailbox, query)) + assertThat(testee.search(session, mailbox, query).toStream()) .containsExactly(MESSAGE_WITH_ATTACHMENT.getUid()); } @@ -279,7 +279,7 @@ class ElasticSearchListeningMessageSearchIndexTest { elasticSearch.awaitForElasticSearch(); SearchQuery query = SearchQuery.of(SearchQuery.all()); - assertThat(testee.search(session, mailbox, query)) + assertThat(testee.search(session, mailbox, query).toStream()) .isEmpty(); } @@ -294,7 +294,7 @@ class ElasticSearchListeningMessageSearchIndexTest { elasticSearch.awaitForElasticSearch(); SearchQuery query = SearchQuery.of(SearchQuery.all()); - assertThat(testee.search(session, mailbox, query)) + assertThat(testee.search(session, mailbox, query).toStream()) .containsExactly(MESSAGE_2.getUid()); } @@ -309,7 +309,7 @@ class ElasticSearchListeningMessageSearchIndexTest { elasticSearch.awaitForElasticSearch(); SearchQuery query = SearchQuery.of(SearchQuery.all()); - assertThat(testee.search(session, mailbox, query)) + assertThat(testee.search(session, mailbox, query).toStream()) .isEmpty(); } @@ -323,7 +323,7 @@ class ElasticSearchListeningMessageSearchIndexTest { elasticSearch.awaitForElasticSearch(); SearchQuery query = SearchQuery.of(SearchQuery.all()); - assertThat(testee.search(session, mailbox, query)) + assertThat(testee.search(session, mailbox, query).toStream()) .isEmpty(); } @@ -361,7 +361,7 @@ class ElasticSearchListeningMessageSearchIndexTest { elasticSearch.awaitForElasticSearch(); SearchQuery query = SearchQuery.of(SearchQuery.flagIsSet(Flags.Flag.ANSWERED)); - assertThat(testee.search(session, mailbox, query)) + assertThat(testee.search(session, mailbox, query).toStream()) .containsExactly(MESSAGE_1.getUid()); } @@ -382,7 +382,7 @@ class ElasticSearchListeningMessageSearchIndexTest { elasticSearch.awaitForElasticSearch(); SearchQuery query = SearchQuery.of(SearchQuery.flagIsSet(Flags.Flag.ANSWERED)); - assertThat(testee.search(session, mailbox, query)) + assertThat(testee.search(session, mailbox, query).toStream()) .isEmpty(); } @@ -404,7 +404,7 @@ class ElasticSearchListeningMessageSearchIndexTest { elasticSearch.awaitForElasticSearch(); SearchQuery query = SearchQuery.of(SearchQuery.flagIsSet(Flags.Flag.ANSWERED)); - assertThat(testee.search(session, mailbox, query)) + assertThat(testee.search(session, mailbox, query).toStream()) .containsExactly(MESSAGE_1.getUid()); } @@ -438,7 +438,7 @@ class ElasticSearchListeningMessageSearchIndexTest { elasticSearch.awaitForElasticSearch(); SearchQuery query = SearchQuery.of(SearchQuery.all()); - assertThat(testee.search(session, mailbox, query)) + assertThat(testee.search(session, mailbox, query).toStream()) .isEmpty(); } diff --git a/mailbox/lucene/src/main/java/org/apache/james/mailbox/lucene/search/LuceneMessageSearchIndex.java b/mailbox/lucene/src/main/java/org/apache/james/mailbox/lucene/search/LuceneMessageSearchIndex.java index c9ddec0..0304099 100644 --- a/mailbox/lucene/src/main/java/org/apache/james/mailbox/lucene/search/LuceneMessageSearchIndex.java +++ b/mailbox/lucene/src/main/java/org/apache/james/mailbox/lucene/search/LuceneMessageSearchIndex.java @@ -456,11 +456,10 @@ public class LuceneMessageSearchIndex extends ListeningMessageSearchIndex { @Override - public Stream<MessageUid> search(MailboxSession session, Mailbox mailbox, SearchQuery searchQuery) throws MailboxException { + public Flux<MessageUid> search(MailboxSession session, Mailbox mailbox, SearchQuery searchQuery) throws MailboxException { Preconditions.checkArgument(session != null, "'session' is mandatory"); - return searchMultimap(ImmutableList.of(mailbox.getMailboxId()), searchQuery) - .stream() + return Flux.fromIterable(searchMultimap(ImmutableList.of(mailbox.getMailboxId()), searchQuery)) .map(SearchResult::getMessageUid); } diff --git a/mailbox/lucene/src/test/java/org/apache/james/mailbox/lucene/search/LuceneMailboxMessageSearchIndexTest.java b/mailbox/lucene/src/test/java/org/apache/james/mailbox/lucene/search/LuceneMailboxMessageSearchIndexTest.java index 3f913f0..32813af 100644 --- a/mailbox/lucene/src/test/java/org/apache/james/mailbox/lucene/search/LuceneMailboxMessageSearchIndexTest.java +++ b/mailbox/lucene/src/test/java/org/apache/james/mailbox/lucene/search/LuceneMailboxMessageSearchIndexTest.java @@ -195,105 +195,105 @@ class LuceneMailboxMessageSearchIndexTest { @Test void bodySearchShouldMatchPhraseInBody() throws Exception { SearchQuery query = SearchQuery.of(SearchQuery.bodyContains(CUSTARD)); - Stream<MessageUid> result = index.search(session, mailbox3, query); + Stream<MessageUid> result = index.search(session, mailbox3, query).toStream(); assertThat(result).containsExactly(uid5); } @Test void bodySearchShouldNotMatchAbsentPhraseInBody() throws Exception { SearchQuery query = SearchQuery.of(SearchQuery.bodyContains(CUSTARD + CUSTARD)); - Stream<MessageUid> result = index.search(session, mailbox3, query); + Stream<MessageUid> result = index.search(session, mailbox3, query).toStream(); assertThat(result).isEmpty(); } @Test void bodySearchShouldBeCaseInsensitive() throws Exception { SearchQuery query = SearchQuery.of(SearchQuery.bodyContains(RHUBARD)); - Stream<MessageUid> result = index.search(session, mailbox3, query); + Stream<MessageUid> result = index.search(session, mailbox3, query).toStream(); assertThat(result).containsExactly(uid5); } @Test void bodySearchNotMatchPhraseOnlyInFrom() throws Exception { SearchQuery query = SearchQuery.of(SearchQuery.bodyContains(FROM_ADDRESS)); - Stream<MessageUid> result = index.search(session, mailbox3, query); + Stream<MessageUid> result = index.search(session, mailbox3, query).toStream(); assertThat(result).isEmpty(); } @Test void bodySearchShouldNotMatchPhraseOnlyInSubject() throws Exception { SearchQuery query = SearchQuery.of(SearchQuery.bodyContains(SUBJECT_PART)); - Stream<MessageUid> result = index.search(session, mailbox3, query); + Stream<MessageUid> result = index.search(session, mailbox3, query).toStream(); assertThat(result).isEmpty(); } @Test void textSearchShouldMatchPhraseInBody() throws Exception { SearchQuery query = SearchQuery.of(SearchQuery.mailContains(CUSTARD)); - Stream<MessageUid> result = index.search(session, mailbox3, query); + Stream<MessageUid> result = index.search(session, mailbox3, query).toStream(); assertThat(result).containsExactly(uid5); } @Test void textSearchShouldNotAbsentMatchPhraseInBody() throws Exception { SearchQuery query = SearchQuery.of(SearchQuery.mailContains(CUSTARD + CUSTARD)); - Stream<MessageUid> result = index.search(session, mailbox3, query); + Stream<MessageUid> result = index.search(session, mailbox3, query).toStream(); assertThat(result).isEmpty(); } @Test void textSearchMatchShouldBeCaseInsensitive() throws Exception { SearchQuery query = SearchQuery.of(SearchQuery.mailContains(RHUBARD.toLowerCase(Locale.US))); - Stream<MessageUid> result = index.search(session, mailbox3, query); + Stream<MessageUid> result = index.search(session, mailbox3, query).toStream(); assertThat(result).containsExactly(uid5); } @Test void addressSearchShouldMatchToFullAddress() throws Exception { SearchQuery query = SearchQuery.of(SearchQuery.address(AddressType.To,FROM_ADDRESS)); - Stream<MessageUid> result = index.search(session, mailbox3, query); + Stream<MessageUid> result = index.search(session, mailbox3, query).toStream(); assertThat(result).containsExactly(uid5); } @Test void addressSearchShouldMatchToDisplayName() throws Exception { SearchQuery query = SearchQuery.of(SearchQuery.address(AddressType.To,"Harry")); - Stream<MessageUid> result = index.search(session, mailbox3, query); + Stream<MessageUid> result = index.search(session, mailbox3, query).toStream(); assertThat(result).containsExactly(uid5); } @Test void addressSearchShouldMatchToEmail() throws Exception { SearchQuery query = SearchQuery.of(SearchQuery.address(AddressType.To,"[email protected]")); - Stream<MessageUid> result = index.search(session, mailbox3, query); + Stream<MessageUid> result = index.search(session, mailbox3, query).toStream(); assertThat(result).containsExactly(uid5); } @Test void addressSearchShouldMatchFrom() throws Exception { SearchQuery query = SearchQuery.of(SearchQuery.address(AddressType.From,"[email protected]")); - Stream<MessageUid> result = index.search(session, mailbox3, query); + Stream<MessageUid> result = index.search(session, mailbox3, query).toStream(); assertThat(result).containsExactly(uid5); } @Test void textSearchShouldMatchPhraseOnlyInToHeader() throws Exception { SearchQuery query = SearchQuery.of(SearchQuery.mailContains(FROM_ADDRESS)); - Stream<MessageUid> result = index.search(session, mailbox3, query); + Stream<MessageUid> result = index.search(session, mailbox3, query).toStream(); assertThat(result).containsExactly(uid5); } @Test void textSearchShouldMatchPhraseOnlyInSubjectHeader() throws Exception { SearchQuery query = SearchQuery.of(SearchQuery.mailContains(SUBJECT_PART)); - Stream<MessageUid> result = index.search(session, mailbox3, query); + Stream<MessageUid> result = index.search(session, mailbox3, query).toStream(); assertThat(result).containsExactly(uid5); } @Test void searchAllShouldMatchAllMailboxEmails() throws Exception { SearchQuery query = SearchQuery.of(SearchQuery.all()); - Stream<MessageUid> result = index.search(session, mailbox2, query); + Stream<MessageUid> result = index.search(session, mailbox2, query).toStream(); assertThat(result).containsExactly(uid2); } @@ -345,42 +345,42 @@ class LuceneMailboxMessageSearchIndexTest { @Test void flagSearchShouldMatch() throws Exception { SearchQuery query = SearchQuery.of(SearchQuery.flagIsSet(Flag.DELETED)); - Stream<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query).toStream(); assertThat(result).containsExactly(uid3, uid4); } @Test void bodySearchShouldMatchSeveralEmails() throws Exception { SearchQuery query = SearchQuery.of(SearchQuery.bodyContains("body")); - Stream<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query).toStream(); assertThat(result).containsExactly(uid1, uid3, uid4); } @Test void textSearchShouldMatchSeveralEmails() throws Exception { SearchQuery query = SearchQuery.of(SearchQuery.mailContains("body")); - Stream<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query).toStream(); assertThat(result).containsExactly(uid1, uid3, uid4); } @Test void headerSearchShouldMatch() throws Exception { SearchQuery query = SearchQuery.of(SearchQuery.headerContains("Subject", "test")); - Stream<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query).toStream(); assertThat(result).containsExactly(uid1, uid4); } @Test void headerExistsShouldMatch() throws Exception { SearchQuery query = SearchQuery.of(SearchQuery.headerExists("Subject")); - Stream<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query).toStream(); assertThat(result).containsExactly(uid1, uid4); } @Test void flagUnsetShouldMatch() throws Exception { SearchQuery query = SearchQuery.of(SearchQuery.flagIsUnSet(Flag.DRAFT)); - Stream<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query).toStream(); assertThat(result).containsExactly(uid1, uid3, uid4); } @@ -390,7 +390,7 @@ class LuceneMailboxMessageSearchIndexTest { cal.setTime(new Date()); SearchQuery query = SearchQuery.of(SearchQuery.internalDateBefore(cal.getTime(), DateResolution.Day)); - Stream<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query).toStream(); assertThat(result).containsExactly(uid3); } @@ -400,7 +400,7 @@ class LuceneMailboxMessageSearchIndexTest { Calendar cal = Calendar.getInstance(); cal.setTime(new Date()); SearchQuery query = SearchQuery.of(SearchQuery.internalDateAfter(cal.getTime(), DateResolution.Day)); - Stream<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query).toStream(); assertThat(result).containsExactly(uid4); } @@ -411,7 +411,7 @@ class LuceneMailboxMessageSearchIndexTest { Calendar cal = Calendar.getInstance(); cal.setTime(new Date()); SearchQuery query = SearchQuery.of(SearchQuery.internalDateOn(cal.getTime(), DateResolution.Day)); - Stream<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query).toStream(); assertThat(result).containsExactly(uid1); } @@ -420,7 +420,7 @@ class LuceneMailboxMessageSearchIndexTest { Calendar cal = Calendar.getInstance(); cal.setTime(new Date()); SearchQuery query = SearchQuery.of(SearchQuery.uid(new SearchQuery.UidRange[] {new SearchQuery.UidRange(uid1)})); - Stream<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query).toStream(); assertThat(result).containsExactly(uid1); } @@ -429,35 +429,35 @@ class LuceneMailboxMessageSearchIndexTest { Calendar cal = Calendar.getInstance(); cal.setTime(new Date()); SearchQuery query = SearchQuery.of(SearchQuery.uid(new SearchQuery.UidRange[] {new SearchQuery.UidRange(uid1), new SearchQuery.UidRange(uid3,uid4)})); - Stream<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query).toStream(); assertThat(result).containsExactly(uid1, uid3, uid4); } @Test void sizeEqualsShouldMatch() throws Exception { SearchQuery query = SearchQuery.of(SearchQuery.sizeEquals(200)); - Stream<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query).toStream(); assertThat(result).containsExactly(uid1); } @Test void sizeLessThanShouldMatch() throws Exception { SearchQuery query = SearchQuery.of(SearchQuery.sizeLessThan(200)); - Stream<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query).toStream(); assertThat(result).containsExactly(uid3, uid4); } @Test void sizeGreaterThanShouldMatch() throws Exception { SearchQuery query = SearchQuery.of(SearchQuery.sizeGreaterThan(6)); - Stream<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query).toStream(); assertThat(result).containsExactly(uid1, uid3, uid4); } @Test void uidShouldBeSorted() throws Exception { SearchQuery query = SearchQuery.of(SearchQuery.all()); - Stream<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query).toStream(); assertThat(result).containsExactly(uid1, uid3, uid4); } @@ -465,7 +465,7 @@ class LuceneMailboxMessageSearchIndexTest { void uidReverseSortShouldReturnWellOrderedResults() throws Exception { SearchQuery query = SearchQuery.allSortedWith(new Sort(SortClause.Uid, Order.REVERSE)); - Stream<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query).toStream(); assertThat(result).containsExactly(uid4, uid3, uid1); } @@ -473,7 +473,7 @@ class LuceneMailboxMessageSearchIndexTest { void sortOnSentDateShouldReturnWellOrderedResults() throws Exception { SearchQuery query = SearchQuery.allSortedWith(new Sort(SortClause.SentDate, Order.NATURAL)); - Stream<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query).toStream(); assertThat(result).containsExactly(uid3, uid4, uid1); } @@ -481,7 +481,7 @@ class LuceneMailboxMessageSearchIndexTest { void reverseSortOnSentDateShouldReturnWellOrderedResults() throws Exception { SearchQuery query = SearchQuery.allSortedWith(new Sort(SortClause.SentDate, Order.REVERSE)); - Stream<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query).toStream(); assertThat(result).containsExactly(uid1, uid4, uid3); } @@ -489,7 +489,7 @@ class LuceneMailboxMessageSearchIndexTest { void sortOnSubjectShouldReturnWellOrderedResults() throws Exception { SearchQuery query = SearchQuery.allSortedWith(new Sort(SortClause.BaseSubject, Order.NATURAL)); - Stream<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query).toStream(); assertThat(result).containsExactly(uid3, uid1, uid4); } @@ -497,7 +497,7 @@ class LuceneMailboxMessageSearchIndexTest { void reverseSortOnSubjectShouldReturnWellOrderedResults() throws Exception { SearchQuery query = SearchQuery.allSortedWith(new Sort(SortClause.BaseSubject, Order.REVERSE)); - Stream<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query).toStream(); assertThat(result).containsExactly(uid4, uid1, uid3); } @@ -505,7 +505,7 @@ class LuceneMailboxMessageSearchIndexTest { void sortOnMailboxFromShouldReturnWellOrderedResults() throws Exception { SearchQuery query = SearchQuery.allSortedWith(new Sort(SortClause.MailboxFrom, Order.NATURAL)); - Stream<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query).toStream(); assertThat(result).containsExactly(uid3, uid4, uid1); } @@ -513,7 +513,7 @@ class LuceneMailboxMessageSearchIndexTest { void reverseSortOnMailboxFromShouldReturnWellOrderedResults() throws Exception { SearchQuery query = SearchQuery.allSortedWith(new Sort(SortClause.MailboxFrom, Order.REVERSE)); - Stream<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query).toStream(); assertThat(result).containsExactly(uid1, uid4, uid3); } @@ -521,7 +521,7 @@ class LuceneMailboxMessageSearchIndexTest { void sortOnMailboxCCShouldReturnWellOrderedResults() throws Exception { SearchQuery query = SearchQuery.allSortedWith(new Sort(SortClause.MailboxCc, Order.NATURAL)); - Stream<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query).toStream(); assertThat(result).containsExactly(uid1, uid3, uid4); } @@ -529,7 +529,7 @@ class LuceneMailboxMessageSearchIndexTest { void reverseSortOnMailboxCCShouldReturnWellOrderedResults() throws Exception { SearchQuery query = SearchQuery.allSortedWith(new Sort(SortClause.MailboxCc, Order.REVERSE)); - Stream<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query).toStream(); assertThat(result).containsExactly(uid3, uid4, uid1); } @@ -537,7 +537,7 @@ class LuceneMailboxMessageSearchIndexTest { void sortOnMailboxToShouldReturnWellOrderedResults() throws Exception { SearchQuery query = SearchQuery.allSortedWith(new Sort(SortClause.MailboxTo, Order.NATURAL)); - Stream<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query).toStream(); assertThat(result).containsExactly(uid4, uid1, uid3); } @@ -545,7 +545,7 @@ class LuceneMailboxMessageSearchIndexTest { void reverseSortOnMailboxToShouldReturnWellOrderedResults() throws Exception { SearchQuery query = SearchQuery.allSortedWith(new Sort(SortClause.MailboxTo, Order.REVERSE)); - Stream<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query).toStream(); assertThat(result).containsExactly(uid3, uid1, uid4); } @@ -553,7 +553,7 @@ class LuceneMailboxMessageSearchIndexTest { void sortOnDisplayToShouldReturnWellOrderedResults() throws Exception { SearchQuery query = SearchQuery.allSortedWith(new Sort(SortClause.DisplayTo, Order.NATURAL)); - Stream<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query).toStream(); assertThat(result).containsExactly(uid4, uid1, uid3); } @@ -561,7 +561,7 @@ class LuceneMailboxMessageSearchIndexTest { void reverseSortOnDisplayToShouldReturnWellOrderedResults() throws Exception { SearchQuery query = SearchQuery.allSortedWith(new Sort(SortClause.DisplayTo, Order.REVERSE)); - Stream<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query).toStream(); assertThat(result).containsExactly(uid3, uid1, uid4); } @@ -569,7 +569,7 @@ class LuceneMailboxMessageSearchIndexTest { void sortOnDisplayFromShouldReturnWellOrderedResults() throws Exception { SearchQuery query = SearchQuery.allSortedWith(new Sort(SortClause.DisplayFrom, Order.NATURAL)); - Stream<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query).toStream(); assertThat(result).containsExactly(uid3, uid4, uid1); } @@ -577,7 +577,7 @@ class LuceneMailboxMessageSearchIndexTest { void reverseSortOnDisplayFromShouldReturnWellOrderedResults() throws Exception { SearchQuery query = SearchQuery.allSortedWith(new Sort(SortClause.DisplayFrom, Order.REVERSE)); - Stream<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query).toStream(); assertThat(result).containsExactly(uid1, uid4, uid3); } @@ -585,7 +585,7 @@ class LuceneMailboxMessageSearchIndexTest { void sortOnArrivalDateShouldReturnWellOrderedResults() throws Exception { SearchQuery query = SearchQuery.allSortedWith(new Sort(SortClause.Arrival, Order.NATURAL)); - Stream<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query).toStream(); assertThat(result).containsExactly(uid3, uid1, uid4); } @@ -593,7 +593,7 @@ class LuceneMailboxMessageSearchIndexTest { void reverseSortOnArrivalDateShouldReturnWellOrderedResults() throws Exception { SearchQuery query = SearchQuery.allSortedWith(new Sort(SortClause.Arrival, Order.REVERSE)); - Stream<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query).toStream(); assertThat(result).containsExactly(uid4, uid1, uid3); } @@ -601,7 +601,7 @@ class LuceneMailboxMessageSearchIndexTest { void sortOnSizeShouldReturnWellOrderedResults() throws Exception { SearchQuery query = SearchQuery.allSortedWith(new Sort(SortClause.Size, Order.NATURAL)); - Stream<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query).toStream(); assertThat(result).containsExactly(uid3, uid4, uid1); } @@ -609,7 +609,7 @@ class LuceneMailboxMessageSearchIndexTest { void reverseSortOnSizeShouldReturnWellOrderedResults() throws Exception { SearchQuery query = SearchQuery.allSortedWith(new Sort(SortClause.Size, Order.REVERSE)); - Stream<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query).toStream(); assertThat(result).containsExactly(uid1, uid3, uid4); } @@ -617,7 +617,7 @@ class LuceneMailboxMessageSearchIndexTest { void notOperatorShouldReverseMatching() throws Exception { SearchQuery query = SearchQuery.of(SearchQuery.not(SearchQuery.uid(new SearchQuery.UidRange[] { new SearchQuery.UidRange(uid1)}))); - Stream<MessageUid> result = index.search(session, mailbox, query); + Stream<MessageUid> result = index.search(session, mailbox, query).toStream(); assertThat(result).containsExactly(uid3, uid4); } @@ -634,7 +634,7 @@ class LuceneMailboxMessageSearchIndexTest { index.update(session, mailbox.getMailboxId(), Lists.newArrayList(updatedFlags)).block(); SearchQuery query = SearchQuery.of(SearchQuery.flagIsSet(Flags.Flag.DRAFT)); - assertThat(index.search(session, mailbox, query)) + assertThat(index.search(session, mailbox, query).toStream()) .containsExactly(uid2); } @@ -651,7 +651,7 @@ class LuceneMailboxMessageSearchIndexTest { index.update(session, mailbox.getMailboxId(), Lists.newArrayList(updatedFlags)).block(); SearchQuery query = SearchQuery.of(SearchQuery.flagIsSet(Flags.Flag.DRAFT)); - assertThat(index.search(session, mailbox, query)) + assertThat(index.search(session, mailbox, query).toStream()) .isEmpty(); } @@ -669,7 +669,7 @@ class LuceneMailboxMessageSearchIndexTest { index.update(session, mailbox.getMailboxId(), Lists.newArrayList(updatedFlags)).block(); SearchQuery query = SearchQuery.of(SearchQuery.flagIsSet(Flags.Flag.DRAFT)); - assertThat(index.search(session, mailbox, query)) + assertThat(index.search(session, mailbox, query).toStream()) .containsExactly(uid2); } diff --git a/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageVaultHookTest.java b/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageVaultHookTest.java index 0a94c32..7251a98 100644 --- a/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageVaultHookTest.java +++ b/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/DeletedMessageVaultHookTest.java @@ -197,7 +197,7 @@ class DeletedMessageVaultHookTest { long messageSize = messageSize(bobMessageManager, composedMessageId); DeletedMessage deletedMessage = buildDeletedMessage(ImmutableList.of(aliceMailbox), messageId, ALICE, messageSize); - bobMessageManager.delete(bobMessageManager.search(searchQuery, bobSession).collect(Guavate.toImmutableList()), bobSession); + bobMessageManager.delete(Flux.from(bobMessageManager.search(searchQuery, bobSession)).collect(Guavate.toImmutableList()).block(), bobSession); assertThat(Flux.from(messageVault.search(ALICE, Query.ALL)).blockFirst()) .isEqualTo(deletedMessage); @@ -218,7 +218,7 @@ class DeletedMessageVaultHookTest { MessageManager bobMessageManager = mailboxManager.getMailbox(aliceMailbox, bobSession); appendMessage(aliceMessageManager); - bobMessageManager.delete(bobMessageManager.search(searchQuery, bobSession).collect(Guavate.toImmutableList()), bobSession); + bobMessageManager.delete(Flux.from(bobMessageManager.search(searchQuery, bobSession)).collect(Guavate.toImmutableList()).block(), bobSession); assertThat(Flux.from(messageVault.search(BOB, Query.ALL)).collectList().block()) .isEmpty(); @@ -244,7 +244,7 @@ class DeletedMessageVaultHookTest { long messageSize = messageSize(bobMessageManager, composedMessageId); DeletedMessage deletedMessage = buildDeletedMessage(ImmutableList.of(bobMailbox), messageId, BOB, messageSize); - bobMessageManager.delete(bobMessageManager.search(searchQuery, bobSession).collect(Guavate.toImmutableList()), bobSession); + bobMessageManager.delete(Flux.from(bobMessageManager.search(searchQuery, bobSession)).collect(Guavate.toImmutableList()).block(), bobSession); assertThat(Flux.from(messageVault.search(BOB, Query.ALL)).blockFirst()) .isEqualTo(deletedMessage); @@ -268,7 +268,7 @@ class DeletedMessageVaultHookTest { messageIdManager.setInMailboxes(messageId, ImmutableList.of(bobMailbox), bobSession); - bobMessageManager.delete(bobMessageManager.search(searchQuery, bobSession).collect(Guavate.toImmutableList()), bobSession); + bobMessageManager.delete(Flux.from(bobMessageManager.search(searchQuery, bobSession)).collect(Guavate.toImmutableList()).block(), bobSession); assertThat(Flux.from(messageVault.search(ALICE, Query.ALL)).collectList().block()) .isEmpty(); @@ -294,7 +294,7 @@ class DeletedMessageVaultHookTest { long messageSize = messageSize(bobMessageManager, composedMessageId); DeletedMessage deletedMessage = buildDeletedMessage(ImmutableList.of(bobMailbox), messageId, BOB, messageSize); - bobMessageManager.delete(bobMessageManager.search(searchQuery, bobSession).collect(Guavate.toImmutableList()), bobSession); + bobMessageManager.delete(Flux.from(bobMessageManager.search(searchQuery, bobSession)).collect(Guavate.toImmutableList()).block(), bobSession); assertThat(Flux.from(messageVault.search(BOB, Query.ALL)).blockFirst()) .isEqualTo(deletedMessage); @@ -318,7 +318,7 @@ class DeletedMessageVaultHookTest { messageIdManager.setInMailboxes(messageId, ImmutableList.of(aliceMailbox, bobMailbox), bobSession); - bobMessageManager.delete(bobMessageManager.search(searchQuery, bobSession).collect(Guavate.toImmutableList()), bobSession); + bobMessageManager.delete(Flux.from(bobMessageManager.search(searchQuery, bobSession)).collect(Guavate.toImmutableList()).block(), bobSession); assertThat(Flux.from(messageVault.search(ALICE, Query.ALL)).collectList().block()) .isEmpty(); diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java index fb4f79d..0103598 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java @@ -37,7 +37,6 @@ import java.util.Map; import java.util.Optional; import java.util.SortedMap; import java.util.TreeMap; -import java.util.stream.Stream; import javax.mail.Flags; import javax.mail.Flags.Flag; @@ -707,7 +706,7 @@ public class StoreMessageManager implements MessageManager { } @Override - public Stream<MessageUid> search(SearchQuery query, MailboxSession mailboxSession) throws MailboxException { + public Flux<MessageUid> search(SearchQuery query, MailboxSession mailboxSession) throws MailboxException { if (query.equals(LIST_ALL_QUERY)) { return listAllMessageUids(mailboxSession); } @@ -859,11 +858,11 @@ public class StoreMessageManager implements MessageManager { .getApplicableFlag(mailbox); } - private Stream<MessageUid> listAllMessageUids(MailboxSession session) throws MailboxException { + private Flux<MessageUid> listAllMessageUids(MailboxSession session) throws MailboxException { final MessageMapper messageMapper = mapperFactory.getMessageMapper(session); return messageMapper.execute( - () -> messageMapper.listAllMessageUids(mailbox).toStream()); + () -> messageMapper.listAllMessageUids(mailbox)); } @Override diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/LazyMessageSearchIndex.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/LazyMessageSearchIndex.java index 7e44fea..65c588b 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/LazyMessageSearchIndex.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/LazyMessageSearchIndex.java @@ -23,7 +23,6 @@ import java.util.EnumSet; import java.util.Iterator; import java.util.List; import java.util.concurrent.ConcurrentHashMap; -import java.util.stream.Stream; import javax.mail.Flags; @@ -111,7 +110,7 @@ public class LazyMessageSearchIndex extends ListeningMessageSearchIndex { * */ @Override - public Stream<MessageUid> search(MailboxSession session, Mailbox mailbox, SearchQuery searchQuery) throws MailboxException { + public Flux<MessageUid> search(MailboxSession session, Mailbox mailbox, SearchQuery searchQuery) throws MailboxException { Preconditions.checkArgument(session != null, "'session' is mandatory"); MailboxId id = mailbox.getMailboxId(); diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/MessageSearchIndex.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/MessageSearchIndex.java index 4873099..30269ad 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/MessageSearchIndex.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/MessageSearchIndex.java @@ -22,7 +22,6 @@ package org.apache.james.mailbox.store.search; import java.util.Collection; import java.util.EnumSet; import java.util.Optional; -import java.util.stream.Stream; import org.apache.james.mailbox.MailboxManager; import org.apache.james.mailbox.MailboxSession; @@ -46,7 +45,7 @@ public interface MessageSearchIndex { /** * Return all uids of the previous indexed {@link Mailbox}'s which match the {@link SearchQuery} */ - Stream<MessageUid> search(MailboxSession session, Mailbox mailbox, SearchQuery searchQuery) throws MailboxException; + Flux<MessageUid> search(MailboxSession session, Mailbox mailbox, SearchQuery searchQuery) throws MailboxException; /** * Return all uids of all {@link Mailbox}'s the current user has access to which match the {@link SearchQuery} diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/SimpleMessageSearchIndex.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/SimpleMessageSearchIndex.java index 90c7cfe..63dc12c 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/SimpleMessageSearchIndex.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/SimpleMessageSearchIndex.java @@ -113,12 +113,11 @@ public class SimpleMessageSearchIndex implements MessageSearchIndex { } @Override - public Stream<MessageUid> search(MailboxSession session, final Mailbox mailbox, SearchQuery query) throws MailboxException { + public Flux<MessageUid> search(MailboxSession session, final Mailbox mailbox, SearchQuery query) throws MailboxException { Preconditions.checkArgument(session != null, "'session' is mandatory"); return searchResults(session, Flux.just(mailbox), query) .filter(searchResult -> searchResult.getMailboxId().equals(mailbox.getMailboxId())) - .map(SearchResult::getMessageUid) - .toStream(); + .map(SearchResult::getMessageUid); } private Set<MailboxMessage> searchResults(MailboxSession session, Mailbox mailbox, SearchQuery query) throws MailboxException { diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractCombinationManagerTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractCombinationManagerTest.java index f0a8f81..bdddff1 100644 --- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractCombinationManagerTest.java +++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractCombinationManagerTest.java @@ -114,7 +114,7 @@ public abstract class AbstractCombinationManagerTest { messageIdManager.setInMailboxes(messageId, ImmutableList.of(mailbox1.getMailboxId(), mailbox2.getMailboxId()), session); - assertThat(messageManager2.search(query, session)).hasSize(1); + assertThat(Flux.from(messageManager2.search(query, session)).toStream()).hasSize(1); } @Test @@ -131,7 +131,7 @@ public abstract class AbstractCombinationManagerTest { .get(0) .getUid(); - assertThat(messageManager2.search(query, session)).hasSize(1) + assertThat(Flux.from(messageManager2.search(query, session)).toStream()).hasSize(1) .containsExactly(uidInMailbox2); } @@ -144,7 +144,7 @@ public abstract class AbstractCombinationManagerTest { messageIdManager.setInMailboxes(composedMessageId.getMessageId(), ImmutableList.of(mailbox1.getMailboxId(), mailbox2.getMailboxId()), session); - assertThat(messageManager1.search(query, session)).hasSize(1) + assertThat(Flux.from(messageManager1.search(query, session)).toStream()).hasSize(1) .containsExactly(composedMessageId.getUid()); } @@ -508,7 +508,7 @@ public abstract class AbstractCombinationManagerTest { .getUid(); SearchQuery searchQuery = SearchQuery.of(SearchQuery.all()); - assertThat(messageManager2.search(searchQuery, session)) + assertThat(Flux.from(messageManager2.search(searchQuery, session)).toStream()) .hasSize(1) .containsOnly(uid2); } @@ -522,7 +522,7 @@ public abstract class AbstractCombinationManagerTest { messageIdManager.delete(messageId, ImmutableList.of(mailbox1.getMailboxId()), session); SearchQuery searchQuery = SearchQuery.of(SearchQuery.all()); - assertThat(messageManager1.search(searchQuery, session)).isEmpty(); + assertThat(Flux.from(messageManager1.search(searchQuery, session)).toStream()).isEmpty(); } @Test @@ -537,7 +537,7 @@ public abstract class AbstractCombinationManagerTest { messageIdManager.delete(ImmutableList.of(messageId1, messageId2), session); SearchQuery searchQuery = SearchQuery.of(SearchQuery.all()); - assertThat(messageManager1.search(searchQuery, session)).isEmpty(); + assertThat(Flux.from(messageManager1.search(searchQuery, session)).toStream()).isEmpty(); } private Predicate<MessageResult> messageInMailbox2() { diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/search/AbstractMessageSearchIndexTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/search/AbstractMessageSearchIndexTest.java index ddbb186..5e3d2e9 100644 --- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/search/AbstractMessageSearchIndexTest.java +++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/search/AbstractMessageSearchIndexTest.java @@ -67,6 +67,8 @@ import org.junit.jupiter.api.Test; import com.google.common.collect.ImmutableList; +import reactor.core.publisher.Flux; + public abstract class AbstractMessageSearchIndexTest { private static final long LIMIT = 100L; @@ -377,7 +379,7 @@ public abstract class AbstractMessageSearchIndexTest { void emptySearchQueryShouldReturnAllUids() throws MailboxException { SearchQuery searchQuery = SearchQuery.matchAll(); - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsOnly(m1.getUid(), m2.getUid(), m3.getUid(), m4.getUid(), m5.getUid(), m6.getUid(), m7.getUid(), m8.getUid(), m9.getUid()); } @@ -385,7 +387,7 @@ public abstract class AbstractMessageSearchIndexTest { void allShouldReturnAllUids() throws MailboxException { SearchQuery searchQuery = SearchQuery.of(SearchQuery.all()); - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsOnly(m1.getUid(), m2.getUid(), m3.getUid(), m4.getUid(), m5.getUid(), m6.getUid(), m7.getUid(), m8.getUid(), m9.getUid()); } @@ -394,7 +396,7 @@ public abstract class AbstractMessageSearchIndexTest { /* Only mail4.eml contains word MAILET-94 */ SearchQuery searchQuery = SearchQuery.of(SearchQuery.bodyContains("MAILET-94")); - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsOnly(m5.getUid()); } @@ -404,7 +406,7 @@ public abstract class AbstractMessageSearchIndexTest { mail.eml contains created and thus matches the query with a low score */ SearchQuery searchQuery = SearchQuery.of(SearchQuery.bodyContains("created summary")); - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsOnly(m2.getUid(), m8.getUid()); } @@ -412,7 +414,7 @@ public abstract class AbstractMessageSearchIndexTest { void hasAttachmentShouldOnlyReturnMessageThatHasAttachmentWhichAreNotInline() throws MailboxException { SearchQuery searchQuery = SearchQuery.of(SearchQuery.hasAttachment()); - assertThat(messageSearchIndex.search(session, mailbox2, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox2, searchQuery).toStream()) .containsOnly(mailWithAttachment.getUid()); } @@ -427,7 +429,7 @@ public abstract class AbstractMessageSearchIndexTest { SearchQuery searchQuery = SearchQuery.of(SearchQuery.all()); - assertThat(messageSearchIndex.search(session, mailbox2, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox2, searchQuery).toStream()) .contains(mailWithDotsInHeader.getUid()); } @@ -442,7 +444,7 @@ public abstract class AbstractMessageSearchIndexTest { SearchQuery searchQuery = SearchQuery.of(SearchQuery.headerExists("X-header.with.dots")); - assertThat(messageSearchIndex.search(session, mailbox2, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox2, searchQuery).toStream()) .contains(mailWithDotsInHeader.getUid()); } @@ -468,7 +470,7 @@ public abstract class AbstractMessageSearchIndexTest { SearchQuery.attachmentContains(emailToSearch), SearchQuery.bodyContains(emailToSearch)))); - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsOnly(m11.getUid()); } @@ -476,7 +478,7 @@ public abstract class AbstractMessageSearchIndexTest { void hasNoAttachmenShouldOnlyReturnMessageThatHasNoAttachmentWhichAreNotInline() throws MailboxException { SearchQuery searchQuery = SearchQuery.of(SearchQuery.hasNoAttachment()); - assertThat(messageSearchIndex.search(session, mailbox2, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox2, searchQuery).toStream()) .containsOnly(mOther.getUid(), mailWithInlinedAttachment.getUid()); } @@ -484,7 +486,7 @@ public abstract class AbstractMessageSearchIndexTest { void flagIsSetShouldReturnUidOfMessageMarkedAsDeletedWhenUsedWithFlagDeleted() throws MailboxException { SearchQuery searchQuery = SearchQuery.of(SearchQuery.flagIsSet(Flags.Flag.DELETED)); - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsOnly(m1.getUid()); } @@ -492,7 +494,7 @@ public abstract class AbstractMessageSearchIndexTest { void flagIsSetShouldReturnUidOfMessageMarkedAsAnsweredWhenUsedWithFlagAnswered() throws MailboxException { SearchQuery searchQuery = SearchQuery.of(SearchQuery.flagIsSet(Flags.Flag.ANSWERED)); - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsOnly(m2.getUid()); } @@ -500,7 +502,7 @@ public abstract class AbstractMessageSearchIndexTest { void flagIsSetShouldReturnUidOfMessageMarkedAsDraftWhenUsedWithFlagDraft() throws MailboxException { SearchQuery searchQuery = SearchQuery.of(SearchQuery.flagIsSet(Flags.Flag.DRAFT)); - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsOnly(m3.getUid()); } @@ -509,7 +511,7 @@ public abstract class AbstractMessageSearchIndexTest { // Only message 7 is not marked as RECENT SearchQuery searchQuery = SearchQuery.of(SearchQuery.flagIsSet(Flags.Flag.RECENT)); - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsOnly(m1.getUid(), m2.getUid(), m3.getUid(), m4.getUid(), m5.getUid(), m6.getUid(), m8.getUid(), m9.getUid()); } @@ -517,7 +519,7 @@ public abstract class AbstractMessageSearchIndexTest { void flagIsSetShouldReturnUidOfMessageMarkedAsFlaggedWhenUsedWithFlagFlagged() throws MailboxException { SearchQuery searchQuery = SearchQuery.of(SearchQuery.flagIsSet(Flags.Flag.FLAGGED)); - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsOnly(m5.getUid()); } @@ -526,7 +528,7 @@ public abstract class AbstractMessageSearchIndexTest { // Only message 6 is marked as read. SearchQuery searchQuery = SearchQuery.of(SearchQuery.flagIsSet(Flags.Flag.SEEN)); - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsOnly(m6.getUid()); } @@ -542,7 +544,7 @@ public abstract class AbstractMessageSearchIndexTest { SearchQuery searchQuery = SearchQuery.of(SearchQuery.flagIsSet(Flags.Flag.SEEN)); - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .contains(m5.getUid()); } @@ -635,7 +637,7 @@ public abstract class AbstractMessageSearchIndexTest { void flagIsSetShouldReturnUidsOfMessageContainingAGivenUserFlag() throws MailboxException { SearchQuery searchQuery = SearchQuery.of(SearchQuery.flagIsSet("Hello")); - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsOnly(m8.getUid()); } @@ -643,7 +645,7 @@ public abstract class AbstractMessageSearchIndexTest { void userFlagsShouldBeMatchedExactly() throws MailboxException { SearchQuery searchQuery = SearchQuery.of(SearchQuery.flagIsSet("Hello bonjour")); - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .isEmpty(); } @@ -651,7 +653,7 @@ public abstract class AbstractMessageSearchIndexTest { void flagIsUnSetShouldReturnUidOfMessageNotMarkedAsDeletedWhenUsedWithFlagDeleted() throws MailboxException { SearchQuery searchQuery = SearchQuery.of(SearchQuery.flagIsUnSet(Flags.Flag.DELETED)); - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsOnly(m2.getUid(), m3.getUid(), m4.getUid(), m5.getUid(), m6.getUid(), m7.getUid(), m8.getUid(), m9.getUid()); } @@ -659,7 +661,7 @@ public abstract class AbstractMessageSearchIndexTest { void flagIsUnSetShouldReturnUidOfMessageNotMarkedAsAnsweredWhenUsedWithFlagAnswered() throws MailboxException { SearchQuery searchQuery = SearchQuery.of(SearchQuery.flagIsUnSet(Flags.Flag.ANSWERED)); - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsOnly(m1.getUid(), m3.getUid(), m4.getUid(), m5.getUid(), m6.getUid(), m7.getUid(), m8.getUid(), m9.getUid()); } @@ -667,7 +669,7 @@ public abstract class AbstractMessageSearchIndexTest { void flagIsUnSetShouldReturnUidOfMessageNotMarkedAsDraftWhenUsedWithFlagDraft() throws MailboxException { SearchQuery searchQuery = SearchQuery.of(SearchQuery.flagIsUnSet(Flags.Flag.DRAFT)); - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsOnly(m1.getUid(), m2.getUid(), m4.getUid(), m5.getUid(), m6.getUid(), m7.getUid(), m8.getUid(), m9.getUid()); } @@ -676,7 +678,7 @@ public abstract class AbstractMessageSearchIndexTest { // Only message 7 is not marked as RECENT SearchQuery searchQuery = SearchQuery.of(SearchQuery.flagIsUnSet(Flags.Flag.RECENT)); - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsOnly(m7.getUid()); } @@ -684,7 +686,7 @@ public abstract class AbstractMessageSearchIndexTest { void flagIsUnSetShouldReturnUidOfMessageNotMarkedAsFlaggedWhenUsedWithFlagFlagged() throws MailboxException { SearchQuery searchQuery = SearchQuery.of(SearchQuery.flagIsUnSet(Flags.Flag.FLAGGED)); - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsOnly(m1.getUid(), m2.getUid(), m3.getUid(), m4.getUid(), m6.getUid(), m7.getUid(), m8.getUid(), m9.getUid()); } @@ -693,7 +695,7 @@ public abstract class AbstractMessageSearchIndexTest { // Only message 6 is marked as read. SearchQuery searchQuery = SearchQuery.of(SearchQuery.flagIsUnSet(Flags.Flag.SEEN)); - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsOnly(m1.getUid(), m2.getUid(), m3.getUid(), m4.getUid(), m5.getUid(), m7.getUid(), m8.getUid(), m9.getUid()); } @@ -701,7 +703,7 @@ public abstract class AbstractMessageSearchIndexTest { void flagIsUnSetShouldReturnUidsOfMessageNotContainingAGivenUserFlag() throws MailboxException { SearchQuery searchQuery = SearchQuery.of(SearchQuery.flagIsUnSet("Hello")); - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsOnly(m1.getUid(), m2.getUid(), m3.getUid(), m4.getUid(), m5.getUid(), m6.getUid(), m7.getUid(), m9.getUid()); } @@ -712,7 +714,7 @@ public abstract class AbstractMessageSearchIndexTest { DateResolution.Day)); // Date : 2014/07/02 00:00:00.000 ( Paris time zone ) - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsOnly(m7.getUid(), m8.getUid(), m9.getUid()); } @@ -723,7 +725,7 @@ public abstract class AbstractMessageSearchIndexTest { DateResolution.Day)); // Date : 2014/02/02 00:00:00.000 ( Paris time zone ) - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsOnly(m1.getUid(), m2.getUid()); } @@ -734,7 +736,7 @@ public abstract class AbstractMessageSearchIndexTest { DateResolution.Day)); // Date : 2014/03/02 00:00:00.000 ( Paris time zone ) - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsOnly(m3.getUid()); } @@ -746,7 +748,7 @@ public abstract class AbstractMessageSearchIndexTest { .sorts(new Sort(SortClause.Arrival, Order.REVERSE)) .build(); - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsOnly(m3.getUid(), m2.getUid()); } @@ -758,7 +760,7 @@ public abstract class AbstractMessageSearchIndexTest { .sorts(new Sort(SortClause.Arrival, Order.REVERSE)) .build(); - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsOnly(m5.getUid()); } @@ -770,7 +772,7 @@ public abstract class AbstractMessageSearchIndexTest { .sorts(new Sort(SortClause.Arrival, Order.REVERSE)) .build(); - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsOnly(m4.getUid(), m9.getUid()); } @@ -778,7 +780,7 @@ public abstract class AbstractMessageSearchIndexTest { protected void modSeqEqualsShouldReturnUidsOfMessageHavingAGivenModSeq() throws Exception { SearchQuery searchQuery = SearchQuery.of(SearchQuery.modSeqEquals(2L)); - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsOnly(m2.getUid()); } @@ -786,7 +788,7 @@ public abstract class AbstractMessageSearchIndexTest { protected void modSeqGreaterThanShouldReturnUidsOfMessageHavingAGreaterModSeq() throws Exception { SearchQuery searchQuery = SearchQuery.of(SearchQuery.modSeqGreaterThan(7L)); - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsOnly(m8.getUid(), m9.getUid()); } @@ -794,7 +796,7 @@ public abstract class AbstractMessageSearchIndexTest { protected void modSeqLessThanShouldReturnUidsOfMessageHavingAGreaterModSeq() throws Exception { SearchQuery searchQuery = SearchQuery.of(SearchQuery.modSeqLessThan(3L)); - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsOnly(m1.getUid(), m2.getUid()); } @@ -803,7 +805,7 @@ public abstract class AbstractMessageSearchIndexTest { // Only message 6 is over 6.8 KB SearchQuery searchQuery = SearchQuery.of(SearchQuery.sizeGreaterThan(6800L)); - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsOnly(m6.getUid()); } @@ -812,7 +814,7 @@ public abstract class AbstractMessageSearchIndexTest { // Only message 2 3 4 5 7 9 are under 5 KB SearchQuery searchQuery = SearchQuery.of(SearchQuery.sizeLessThan(5000L)); - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsOnly(m2.getUid(), m3.getUid(), m4.getUid(), m5.getUid(), m7.getUid(), m9.getUid()); } @@ -820,7 +822,7 @@ public abstract class AbstractMessageSearchIndexTest { void headerContainsShouldReturnUidsOfMessageHavingThisHeaderWithTheSpecifiedValue() throws Exception { SearchQuery searchQuery = SearchQuery.of(SearchQuery.headerContains("Precedence", "list")); - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsOnly(m1.getUid(), m6.getUid(), m8.getUid(), m9.getUid()); } @@ -828,7 +830,7 @@ public abstract class AbstractMessageSearchIndexTest { void headerExistsShouldReturnUidsOfMessageHavingThisHeader() throws Exception { SearchQuery searchQuery = SearchQuery.of(SearchQuery.headerExists("Precedence")); - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsOnly(m1.getUid(), m2.getUid(), m3.getUid(), m4.getUid(), m5.getUid(), m6.getUid(), m8.getUid(), m9.getUid()); } @@ -836,7 +838,7 @@ public abstract class AbstractMessageSearchIndexTest { protected void addressShouldReturnUidHavingRightExpeditorWhenFromIsSpecified() throws Exception { SearchQuery searchQuery = SearchQuery.of(SearchQuery.address(AddressType.From, "[email protected]")); - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsOnly(m8.getUid()); } @@ -848,7 +850,7 @@ public abstract class AbstractMessageSearchIndexTest { SearchQuery searchQuery = SearchQuery.of(SearchQuery.address(AddressType.From, "murari")); - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsOnly(m8.getUid()); } @@ -860,7 +862,7 @@ public abstract class AbstractMessageSearchIndexTest { SearchQuery searchQuery = SearchQuery.of(SearchQuery.address(AddressType.From, "gmail.com")); - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsOnly(m8.getUid()); } @@ -872,7 +874,7 @@ public abstract class AbstractMessageSearchIndexTest { SearchQuery searchQuery = SearchQuery.of(SearchQuery.address(AddressType.To, "Üsteliğhan")); - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsOnly(m8.getUid()); } @@ -880,7 +882,7 @@ public abstract class AbstractMessageSearchIndexTest { void addressShouldReturnUidHavingRightRecipientWhenToIsSpecified() throws Exception { SearchQuery searchQuery = SearchQuery.of(SearchQuery.address(AddressType.To, "[email protected]")); - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsOnly(m1.getUid()); } @@ -892,7 +894,7 @@ public abstract class AbstractMessageSearchIndexTest { SearchQuery searchQuery = SearchQuery.of(SearchQuery.address(AddressType.To, "root")); - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsOnly(m1.getUid()); } @@ -904,7 +906,7 @@ public abstract class AbstractMessageSearchIndexTest { SearchQuery searchQuery = SearchQuery.of(SearchQuery.address(AddressType.To, "listes.minet.net")); - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsOnly(m1.getUid()); } @@ -915,7 +917,7 @@ public abstract class AbstractMessageSearchIndexTest { .contains(MailboxManager.SearchCapabilities.PartialEmailMatch)); SearchQuery searchQuery = SearchQuery.of(SearchQuery.address(AddressType.Cc, "[email protected]")); - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsOnly(m5.getUid()); } @@ -926,7 +928,7 @@ public abstract class AbstractMessageSearchIndexTest { .contains(MailboxManager.SearchCapabilities.PartialEmailMatch)); SearchQuery searchQuery = SearchQuery.of(SearchQuery.address(AddressType.Cc, "monkey")); - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsOnly(m5.getUid()); } @@ -937,7 +939,7 @@ public abstract class AbstractMessageSearchIndexTest { .contains(MailboxManager.SearchCapabilities.PartialEmailMatch)); SearchQuery searchQuery = SearchQuery.of(SearchQuery.address(AddressType.Cc, "any.com")); - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsOnly(m5.getUid()); } @@ -949,7 +951,7 @@ public abstract class AbstractMessageSearchIndexTest { SearchQuery searchQuery = SearchQuery.of(SearchQuery.address(AddressType.Bcc, "monkey")); - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsOnly(m5.getUid()); } @@ -960,7 +962,7 @@ public abstract class AbstractMessageSearchIndexTest { .contains(MailboxManager.SearchCapabilities.PartialEmailMatch)); SearchQuery searchQuery = SearchQuery.of(SearchQuery.address(AddressType.Bcc, "any.com")); - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsOnly(m5.getUid()); } @@ -972,7 +974,7 @@ public abstract class AbstractMessageSearchIndexTest { SearchQuery searchQuery = SearchQuery.of(SearchQuery.address(AddressType.Bcc, "[email protected]")); - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsOnly(m9.getUid()); } @@ -981,7 +983,7 @@ public abstract class AbstractMessageSearchIndexTest { SearchQuery.UidRange[] numericRanges = {new SearchQuery.UidRange(m2.getUid(), m4.getUid()), new SearchQuery.UidRange(m6.getUid(), m7.getUid())}; SearchQuery searchQuery = SearchQuery.of(SearchQuery.uid(numericRanges)); - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsOnly(m2.getUid(), m3.getUid(), m4.getUid(), m6.getUid(), m7.getUid()); } @@ -990,7 +992,7 @@ public abstract class AbstractMessageSearchIndexTest { SearchQuery.UidRange[] numericRanges = {}; SearchQuery searchQuery = SearchQuery.of(SearchQuery.uid(numericRanges)); - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsOnly(m1.getUid(), m2.getUid(), m3.getUid(), m4.getUid(), m5.getUid(), m6.getUid(), m7.getUid(), m8.getUid(), m9.getUid()); } @@ -998,7 +1000,7 @@ public abstract class AbstractMessageSearchIndexTest { protected void youShouldBeAbleToSpecifySeveralCriterionOnASingleQuery() throws Exception { SearchQuery searchQuery = SearchQuery.of(SearchQuery.headerExists("Precedence"), SearchQuery.modSeqGreaterThan(6L)); - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsOnly(m8.getUid(), m9.getUid()); } @@ -1008,7 +1010,7 @@ public abstract class AbstractMessageSearchIndexTest { SearchQuery.headerExists("Precedence"), SearchQuery.modSeqGreaterThan(6L))); - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsOnly(m8.getUid(), m9.getUid()); } @@ -1019,7 +1021,7 @@ public abstract class AbstractMessageSearchIndexTest { SearchQuery.uid(numericRanges), SearchQuery.modSeqGreaterThan(6L))); - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsOnly(m2.getUid(), m3.getUid(), m4.getUid(), m7.getUid(), m8.getUid(), m9.getUid()); } @@ -1028,7 +1030,7 @@ public abstract class AbstractMessageSearchIndexTest { SearchQuery searchQuery = SearchQuery.of( SearchQuery.not(SearchQuery.headerExists("Precedence"))); - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsOnly(m7.getUid()); } @@ -1036,7 +1038,7 @@ public abstract class AbstractMessageSearchIndexTest { void sortShouldOrderMessages() throws Exception { SearchQuery searchQuery = SearchQuery.allSortedWith(new Sort(SortClause.Arrival)); - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsExactly(m1.getUid(), m2.getUid(), m3.getUid(), m5.getUid(), m4.getUid(), m6.getUid(), m7.getUid(), m8.getUid(), m9.getUid()); } @@ -1044,7 +1046,7 @@ public abstract class AbstractMessageSearchIndexTest { void revertSortingShouldReturnElementsInAReversedOrder() throws Exception { SearchQuery searchQuery = SearchQuery.allSortedWith(new Sort(SortClause.Arrival, Order.REVERSE)); - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsExactly(m9.getUid(), m8.getUid(), m7.getUid(), m6.getUid(), m4.getUid(), m5.getUid(), m3.getUid(), m2.getUid(), m1.getUid()); } @@ -1056,7 +1058,7 @@ public abstract class AbstractMessageSearchIndexTest { .sorts(new Sort(SortClause.Arrival, Order.REVERSE)) .build(); - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsOnly(m3.getUid(), m2.getUid()); } @@ -1068,7 +1070,7 @@ public abstract class AbstractMessageSearchIndexTest { .sorts(new Sort(SortClause.Arrival, Order.REVERSE)) .build(); - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsOnly(m5.getUid()); } @@ -1080,7 +1082,7 @@ public abstract class AbstractMessageSearchIndexTest { .sorts(new Sort(SortClause.Arrival, Order.REVERSE)) .build(); - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsOnly(m4.getUid(), m9.getUid()); } @@ -1088,7 +1090,7 @@ public abstract class AbstractMessageSearchIndexTest { protected void mailsContainsShouldIncludeMailHavingAttachmentsMatchingTheRequest() throws Exception { SearchQuery searchQuery = SearchQuery.of(SearchQuery.mailContains("root mailing list")); - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsOnly(m1.getUid(), m6.getUid()); } @@ -1100,7 +1102,7 @@ public abstract class AbstractMessageSearchIndexTest { .sorts(new Sort(SortClause.MailboxCc)) .build(); - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsExactly(m3.getUid(), m5.getUid(), m4.getUid(), m2.getUid()); // 2 : No cc // 3 : Cc : [email protected] @@ -1116,7 +1118,7 @@ public abstract class AbstractMessageSearchIndexTest { .sorts(new Sort(SortClause.MailboxFrom)) .build(); - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsExactly(m3.getUid(), m2.getUid(), m4.getUid(), m5.getUid()); // m3 : [email protected] // m2 : [email protected] @@ -1132,7 +1134,7 @@ public abstract class AbstractMessageSearchIndexTest { .sorts(new Sort(SortClause.MailboxTo)) .build(); - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsExactly(m5.getUid(), m3.getUid(), m2.getUid(), m4.getUid()); // 5 : "zzz" <[email protected]> // 3 : "aaa" <[email protected]> @@ -1148,7 +1150,7 @@ public abstract class AbstractMessageSearchIndexTest { .sorts(new Sort(SortClause.BaseSubject)) .build(); - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsExactly(m4.getUid(), m3.getUid(), m2.getUid(), m5.getUid()); // 2 : [jira] [Created] (MAILBOX-234) Convert Message into JSON // 3 : [jira] [Closed] (MAILBOX-217) We should index attachment in elastic search @@ -1164,7 +1166,7 @@ public abstract class AbstractMessageSearchIndexTest { .sorts(new Sort(SortClause.Size)) .build(); - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsExactly(m2.getUid(), m3.getUid(), m5.getUid(), m4.getUid()); // 2 : 3210 o // 3 : 3647 o @@ -1180,7 +1182,7 @@ public abstract class AbstractMessageSearchIndexTest { .sorts(new Sort(SortClause.DisplayFrom)) .build(); - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsExactly(m4.getUid(), m3.getUid(), m5.getUid(), m2.getUid()); // 2 : Tellier Benoit (JIRA) // 3 : efij @@ -1196,7 +1198,7 @@ public abstract class AbstractMessageSearchIndexTest { .sorts(new Sort(SortClause.DisplayTo)) .build(); - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsExactly(m3.getUid(), m2.getUid(), m4.getUid(), m5.getUid()); // 2 : abc // 3 : aaa @@ -1212,7 +1214,7 @@ public abstract class AbstractMessageSearchIndexTest { .sorts(new Sort(SortClause.SentDate)) .build(); - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsExactly(m5.getUid(), m4.getUid(), m2.getUid(), m3.getUid()); // 2 : 4 Jun 2015 09:23:37 // 3 : 4 Jun 2015 09:27:37 @@ -1228,7 +1230,7 @@ public abstract class AbstractMessageSearchIndexTest { .sorts(new Sort(SortClause.Uid)) .build(); - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsExactly(m2.getUid(), m3.getUid(), m4.getUid(), m5.getUid()); } @@ -1243,7 +1245,7 @@ public abstract class AbstractMessageSearchIndexTest { SearchQuery searchQuery = SearchQuery.of(SearchQuery.mailContains("User message banana")); - assertThat(messageSearchIndex.search(session, mailbox2, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox2, searchQuery).toStream()) .containsExactly(messageWithBeautifulBananaAsTextAttachment.getUid()); } @@ -1258,7 +1260,7 @@ public abstract class AbstractMessageSearchIndexTest { SearchQuery searchQuery = SearchQuery.of(SearchQuery.attachmentContains("beautiful banana")); - assertThat(messageSearchIndex.search(session, mailbox2, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox2, searchQuery).toStream()) .containsExactly(messageWithBeautifulBananaAsTextAttachment.getUid()); } @@ -1282,7 +1284,7 @@ public abstract class AbstractMessageSearchIndexTest { SearchQuery searchQuery = SearchQuery.of(SearchQuery.attachmentContains("beautiful banana")); - assertThat(messageSearchIndex.search(session, mailbox2, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox2, searchQuery).toStream()) .containsExactly(messageWithBeautifulBananaAsPDFAttachment.getUid()); } @@ -1334,7 +1336,7 @@ public abstract class AbstractMessageSearchIndexTest { SearchQuery searchQuery = SearchQuery.allSortedWith(new Sort(SortClause.SentDate)); - assertThat(messageManager.search(searchQuery, session)) + assertThat(Flux.from(messageManager.search(searchQuery, session)).toStream()) .containsExactly(message2.getUid(), message1.getUid(), message3.getUid()); @@ -1373,7 +1375,7 @@ public abstract class AbstractMessageSearchIndexTest { SearchQuery searchQuery = SearchQuery.allSortedWith(new Sort(SortClause.SentDate)); - assertThat(messageManager.search(searchQuery, session)) + assertThat(Flux.from(messageManager.search(searchQuery, session)).toStream()) .containsExactly(message2.getUid(), message1.getUid(), message3.getUid()); @@ -1384,7 +1386,7 @@ public abstract class AbstractMessageSearchIndexTest { SearchQuery searchQuery = SearchQuery.of(SearchQuery.mimeMessageID("<[email protected]>")); // Correspond to mail.eml - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsOnly(m3.getUid()); } @@ -1406,7 +1408,7 @@ public abstract class AbstractMessageSearchIndexTest { .await() .atMost(30, TimeUnit.SECONDS) .until( - () -> messageSearchIndex.search(session, newBox.getMailboxEntity(), searchQuery).count() == 9); + () -> messageSearchIndex.search(session, newBox.getMailboxEntity(), searchQuery).toStream().count() == 9); } @Test @@ -1433,7 +1435,7 @@ public abstract class AbstractMessageSearchIndexTest { SearchQuery searchQuery = SearchQuery.of(SearchQuery.attachmentFileName(fileName)); - assertThat(messageSearchIndex.search(session, mailbox, searchQuery)) + assertThat(messageSearchIndex.search(session, mailbox, searchQuery).toStream()) .containsOnly(mWithFileName.getUid()); } } diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractMailboxProcessor.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractMailboxProcessor.java index 4f23ebc..df5bfde 100644 --- a/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractMailboxProcessor.java +++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/AbstractMailboxProcessor.java @@ -571,7 +571,7 @@ public abstract class AbstractMailboxProcessor<R extends ImapRequest> extends Ab } searchQuery.andCriteria(SearchQuery.uid(nRanges)); searchQuery.andCriteria(SearchQuery.modSeqGreaterThan(changedSince)); - try (Stream<MessageUid> uids = mailbox.search(searchQuery.build(), session)) { + try (Stream<MessageUid> uids = Flux.from(mailbox.search(searchQuery.build(), session)).toStream()) { uids.forEach(vanishedUids::remove); } UidRange[] vanishedIdRanges = uidRanges(MessageRange.toRanges(vanishedUids)); diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/SearchProcessor.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/SearchProcessor.java index e86e850..d93742f 100644 --- a/protocols/imap/src/main/java/org/apache/james/imap/processor/SearchProcessor.java +++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/SearchProcessor.java @@ -225,9 +225,9 @@ public class SearchProcessor extends AbstractMailboxProcessor<SearchRequest> imp } private Collection<MessageUid> performUidSearch(MessageManager mailbox, SearchQuery query, MailboxSession msession) throws MailboxException { - try (Stream<MessageUid> stream = mailbox.search(query, msession)) { - return stream.collect(Guavate.toImmutableList()); - } + return Flux.from(mailbox.search(query, msession)) + .collect(Guavate.toImmutableList()) + .block(); } private long[] toArray(Collection<Long> results) { diff --git a/protocols/imap/src/main/java/org/apache/james/imap/processor/base/SelectedMailboxImpl.java b/protocols/imap/src/main/java/org/apache/james/imap/processor/base/SelectedMailboxImpl.java index 1c8d8db..087e4ad 100644 --- a/protocols/imap/src/main/java/org/apache/james/imap/processor/base/SelectedMailboxImpl.java +++ b/protocols/imap/src/main/java/org/apache/james/imap/processor/base/SelectedMailboxImpl.java @@ -32,7 +32,6 @@ import java.util.List; import java.util.Optional; import java.util.Set; import java.util.TreeSet; -import java.util.stream.Stream; import javax.mail.Flags; import javax.mail.Flags.Flag; @@ -58,7 +57,9 @@ import org.apache.james.mailbox.model.UpdatedFlags; import com.github.steveash.guavate.Guavate; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; @@ -151,9 +152,10 @@ public class SelectedMailboxImpl implements SelectedMailbox, MailboxListener { synchronized (applicableFlagsLock) { applicableFlags = applicableFlags.updateWithNewFlags(messageManager.getApplicableFlags(mailboxSession)); } - try (Stream<MessageUid> stream = messageManager.search(SearchQuery.of(SearchQuery.all()), mailboxSession)) { - uidMsnConverter.addAll(stream.collect(Guavate.toImmutableList())); - } + ImmutableList<MessageUid> uids = Flux.from(messageManager.search(SearchQuery.of(SearchQuery.all()), mailboxSession)) + .collect(Guavate.toImmutableList()) + .block(); + uidMsnConverter.addAll(uids); } @Override diff --git a/protocols/imap/src/test/java/org/apache/james/imap/processor/SearchProcessorTest.java b/protocols/imap/src/test/java/org/apache/james/imap/processor/SearchProcessorTest.java index 1d48407..97bdcbc 100644 --- a/protocols/imap/src/test/java/org/apache/james/imap/processor/SearchProcessorTest.java +++ b/protocols/imap/src/test/java/org/apache/james/imap/processor/SearchProcessorTest.java @@ -34,7 +34,6 @@ import java.util.List; import java.util.Locale; import java.util.Optional; import java.util.TimeZone; -import java.util.stream.Stream; import javax.mail.Flags; import javax.mail.Flags.Flag; @@ -71,6 +70,8 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import reactor.core.publisher.Flux; + public class SearchProcessorTest { private static final int DAY = 6; @@ -463,7 +464,7 @@ public class SearchProcessorTest { private void check(SearchKey key, final SearchQuery query) throws Exception { session.setMailboxSession(mailboxSession); - when(mailbox.search(query, mailboxSession)).thenReturn(Stream.empty()); + when(mailbox.search(query, mailboxSession)).thenReturn(Flux.empty()); when(selectedMailbox.getApplicableFlags()).thenReturn(new Flags()); when(selectedMailbox.hasNewApplicableFlags()).thenReturn(false); diff --git a/protocols/imap/src/test/java/org/apache/james/imap/processor/base/MailboxEventAnalyserTest.java b/protocols/imap/src/test/java/org/apache/james/imap/processor/base/MailboxEventAnalyserTest.java index f15d929..57b95b3 100644 --- a/protocols/imap/src/test/java/org/apache/james/imap/processor/base/MailboxEventAnalyserTest.java +++ b/protocols/imap/src/test/java/org/apache/james/imap/processor/base/MailboxEventAnalyserTest.java @@ -25,7 +25,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.util.Date; -import java.util.stream.Stream; import javax.mail.Flags; @@ -60,6 +59,8 @@ import org.apache.james.metrics.tests.RecordingMetricFactory; import org.junit.Before; import org.junit.Test; +import reactor.core.publisher.Flux; + public class MailboxEventAnalyserTest { private static final MessageUid UID = MessageUid.of(900); private static final UpdatedFlags ADD_RECENT_UPDATED_FLAGS = UpdatedFlags.builder() @@ -154,7 +155,7 @@ public class MailboxEventAnalyserTest { when(messageManager.getApplicableFlags(any())).thenReturn(new Flags()); when(messageManager.getId()).thenReturn(MAILBOX_ID); when(messageManager.search(any(), any())) - .thenReturn(Stream.of(MESSAGE_UID)); + .thenReturn(Flux.just(MESSAGE_UID)); when(messageManager.getMessages(any(), any(), any())) .thenReturn(new SingleMessageResultIterator(messageResult)); diff --git a/protocols/imap/src/test/java/org/apache/james/imap/processor/base/SelectedMailboxImplTest.java b/protocols/imap/src/test/java/org/apache/james/imap/processor/base/SelectedMailboxImplTest.java index 02fd802..02bd5a4 100644 --- a/protocols/imap/src/test/java/org/apache/james/imap/processor/base/SelectedMailboxImplTest.java +++ b/protocols/imap/src/test/java/org/apache/james/imap/processor/base/SelectedMailboxImplTest.java @@ -30,13 +30,13 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.time.Duration; import java.util.Date; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; -import java.util.stream.Stream; import javax.mail.Flags; @@ -74,6 +74,7 @@ import org.slf4j.LoggerFactory; import com.google.common.collect.ImmutableList; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -115,7 +116,8 @@ class SelectedMailboxImplTest { when(messageManager.getApplicableFlags(any(MailboxSession.class))) .thenReturn(new Flags()); when(messageManager.search(any(SearchQuery.class), any(MailboxSession.class))) - .then(delayedSearchAnswer()); + .thenReturn(Flux.just(MessageUid.of(1), MessageUid.of(3)) + .delayElements(Duration.ofSeconds(1))); when(messageManager.getId()).thenReturn(mailboxId); imapSession.setMailboxSession(mock(MailboxSession.class)); @@ -186,13 +188,6 @@ class SelectedMailboxImplTest { .isEqualTo(1); } - Answer<Stream<MessageUid>> delayedSearchAnswer() { - return invocation -> { - Thread.sleep(1000); - return Stream.of(MessageUid.of(1), MessageUid.of(3)); - }; - } - Answer<Mono<Registration>> generateEmitEventAnswer(AtomicInteger success) { return generateEmitEventAnswer(event(), success); } diff --git a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/SearchModuleChooser.java b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/SearchModuleChooser.java index a35b5d9..1377ac7 100644 --- a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/SearchModuleChooser.java +++ b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/SearchModuleChooser.java @@ -22,7 +22,6 @@ package org.apache.james; import java.util.Collection; import java.util.EnumSet; import java.util.List; -import java.util.stream.Stream; import javax.mail.Flags; @@ -102,7 +101,7 @@ public class SearchModuleChooser { } @Override - public Stream<MessageUid> search(MailboxSession session, Mailbox mailbox, SearchQuery searchQuery) throws MailboxException { + public Flux<MessageUid> search(MailboxSession session, Mailbox mailbox, SearchQuery searchQuery) throws MailboxException { throw new NotImplementedException("not implemented"); } diff --git a/server/container/guice/memory-guice/src/main/java/org/apache/james/FakeMessageSearchIndex.java b/server/container/guice/memory-guice/src/main/java/org/apache/james/FakeMessageSearchIndex.java index 7dbe612..9b3fb21 100644 --- a/server/container/guice/memory-guice/src/main/java/org/apache/james/FakeMessageSearchIndex.java +++ b/server/container/guice/memory-guice/src/main/java/org/apache/james/FakeMessageSearchIndex.java @@ -22,7 +22,6 @@ package org.apache.james; import java.util.Collection; import java.util.EnumSet; import java.util.List; -import java.util.stream.Stream; import javax.mail.Flags; @@ -80,7 +79,7 @@ public class FakeMessageSearchIndex extends ListeningMessageSearchIndex { } @Override - public Stream<MessageUid> search(MailboxSession session, Mailbox mailbox, SearchQuery searchQuery) throws MailboxException { + public Flux<MessageUid> search(MailboxSession session, Mailbox mailbox, SearchQuery searchQuery) throws MailboxException { throw new NotImplementedException("not implemented"); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
