This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 58af5c33e911053e9971ff142b26ff640a95a518 Author: Benoit Tellier <[email protected]> AuthorDate: Sun Apr 5 23:50:42 2020 +0700 JAMES-3149 Reactive GetMessageList --- .../versions/CassandraSchemaVersionManager.java | 22 ++--- .../SessionWithInitializedTablesFactoryTest.java | 14 ++-- .../CassandraSchemaVersionManagerTest.java | 6 +- .../org/apache/james/mailbox/MailboxManager.java | 2 +- .../apache/james/mailbox/MailboxManagerTest.java | 49 ++++++++---- .../cassandra/mail/CassandraMailboxMapper.java | 52 ++++++------ .../cassandra/mail/CassandraMessageIdMapper.java | 27 +++++-- .../task/SolveMailboxInconsistenciesService.java | 2 +- .../cassandra/mail/CassandraMailboxMapperTest.java | 93 ++++++++++++++-------- .../ElasticSearchListeningMessageSearchIndex.java | 9 +-- .../search/ElasticSearchSearcherTest.java | 3 +- .../james/mailbox/jpa/mail/JPAMailboxMapper.java | 16 ++-- .../jpa/mail/TransactionalMailboxMapper.java | 5 +- .../lucene/search/LuceneMessageSearchIndex.java | 10 ++- .../LuceneMailboxMessageSearchIndexTest.java | 12 ++- .../mailbox/maildir/mail/MaildirMailboxMapper.java | 18 ++--- .../inmemory/mail/InMemoryMailboxMapper.java | 18 ++--- .../james/vault/DeletedMessageVaultHook.java | 14 ++-- .../james/mailbox/store/StoreMailboxManager.java | 53 ++++++------ .../james/mailbox/store/mail/MailboxMapper.java | 26 +++--- .../store/quota/DefaultUserQuotaRootResolver.java | 3 +- .../store/search/LazyMessageSearchIndex.java | 4 +- .../mailbox/store/search/MessageSearchIndex.java | 5 +- .../store/search/SimpleMessageSearchIndex.java | 31 ++++---- .../store/AbstractCombinationManagerTest.java | 5 +- .../store/mail/model/MailboxMapperACLTest.java | 24 +++--- .../store/mail/model/MailboxMapperTest.java | 44 ++-------- .../quota/DefaultUserQuotaRootResolverTest.java | 4 +- .../search/AbstractMessageSearchIndexTest.java | 36 ++++++--- .../CassandraSchemaVersionStartUpCheck.java | 6 +- .../org/apache/james/FakeMessageSearchIndex.java | 4 +- .../cassandra/CassandraRecipientRewriteTable.java | 2 +- .../jmap/draft/methods/GetMessageListMethod.java | 71 +++++++++-------- .../james/jmap/draft/methods/ReferenceUpdater.java | 5 +- .../mailet/ExtractMDNOriginalJMAPMessageId.java | 4 +- .../routes/DeletedMessagesVaultRoutesTest.java | 3 +- 36 files changed, 378 insertions(+), 324 deletions(-) diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManager.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManager.java index 9b47f39..516fbd3 100644 --- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManager.java +++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManager.java @@ -32,6 +32,8 @@ import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import reactor.core.publisher.Mono; + public class CassandraSchemaVersionManager { public static final SchemaVersion MIN_VERSION = new SchemaVersion(5); public static final SchemaVersion MAX_VERSION = new SchemaVersion(7); @@ -65,24 +67,26 @@ public class CassandraSchemaVersionManager { this.minVersion = minVersion; this.maxVersion = maxVersion; - this.initialSchemaVersion = computeVersion(); + this.initialSchemaVersion = computeVersion().block(); } - public boolean isBefore(SchemaVersion minimum) { - return initialSchemaVersion.isBefore(minimum) + public Mono<Boolean> isBefore(SchemaVersion minimum) { + if (initialSchemaVersion.isBefore(minimum)) { // If we started with a legacy james then maybe schema version had been updated since then - && computeVersion().isBefore(minimum); + return computeVersion() + .map(computedVersion -> computedVersion.isBefore(minimum)); + } + return Mono.just(false); } - public SchemaVersion computeVersion() { + public Mono<SchemaVersion> computeVersion() { return schemaVersionDAO .getCurrentSchemaVersion() - .block() - .orElseGet(() -> { + .map(maybeVersion -> maybeVersion.orElseGet(() -> { LOGGER.warn("No schema version information found on Cassandra, we assume schema is at version {}", CassandraSchemaVersionManager.DEFAULT_VERSION); return DEFAULT_VERSION; - }); + })); } public SchemaVersion getMinimumSupportedVersion() { @@ -94,7 +98,7 @@ public class CassandraSchemaVersionManager { } public SchemaState computeSchemaState() { - SchemaVersion version = computeVersion(); + SchemaVersion version = computeVersion().block(); if (version.isBefore(minVersion)) { return TOO_OLD; } else if (version.isBefore(maxVersion)) { diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/init/SessionWithInitializedTablesFactoryTest.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/init/SessionWithInitializedTablesFactoryTest.java index 3c3345f..817fe85 100644 --- a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/init/SessionWithInitializedTablesFactoryTest.java +++ b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/init/SessionWithInitializedTablesFactoryTest.java @@ -83,39 +83,39 @@ class SessionWithInitializedTablesFactoryTest { @Test void createSessionShouldSetTheLatestSchemaVersionWhenCreatingTypesAndTables() { - assertThat(versionManager(testee.get()).computeVersion()) + assertThat(versionManager(testee.get()).computeVersion().block()) .isEqualTo(MAX_VERSION); } @Test void createSessionShouldKeepTheSetSchemaVersionWhenTypesAndTablesHaveNotChanged() { Session session = testee.get(); - assertThat(versionManager(session).computeVersion()) + assertThat(versionManager(session).computeVersion().block()) .isEqualTo(MAX_VERSION); new CassandraTableManager(MODULE, session).clearAllTables(); versionManagerDAO(session).updateVersion(MIN_VERSION); - assertThat(versionManager(session).computeVersion()) + assertThat(versionManager(session).computeVersion().block()) .isEqualTo(MIN_VERSION); - assertThat(versionManager(testee.get()).computeVersion()) + assertThat(versionManager(testee.get()).computeVersion().block()) .isEqualTo(MIN_VERSION); } @Test void createSessionShouldKeepTheSetSchemaVersionWhenTypesAndTablesHavePartiallyChanged() { Session session = testee.get(); - assertThat(versionManager(session).computeVersion()) + assertThat(versionManager(session).computeVersion().block()) .isEqualTo(MAX_VERSION); new CassandraTableManager(MODULE, session).clearAllTables(); versionManagerDAO(session).updateVersion(MIN_VERSION); - assertThat(versionManager(session).computeVersion()) + assertThat(versionManager(session).computeVersion().block()) .isEqualTo(MIN_VERSION); session.execute(SchemaBuilder.dropTable(TABLE_NAME)); session.execute(SchemaBuilder.dropType(TYPE_NAME)); - assertThat(versionManager(testee.get()).computeVersion()) + assertThat(versionManager(testee.get()).computeVersion().block()) .isEqualTo(MIN_VERSION); } diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManagerTest.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManagerTest.java index 64c498e..4fb33fb 100644 --- a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManagerTest.java +++ b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManagerTest.java @@ -74,7 +74,7 @@ class CassandraSchemaVersionManagerTest { minVersion, maxVersion); - assertThat(testee.isBefore(maxVersion)).isTrue(); + assertThat(testee.isBefore(maxVersion).block()).isTrue(); } @Test @@ -89,7 +89,7 @@ class CassandraSchemaVersionManagerTest { minVersion, maxVersion); - assertThat(testee.isBefore(maxVersion)).isFalse(); + assertThat(testee.isBefore(maxVersion).block()).isFalse(); } @Test @@ -107,7 +107,7 @@ class CassandraSchemaVersionManagerTest { when(schemaVersionDAO.getCurrentSchemaVersion()) .thenReturn(Mono.just(Optional.of(maxVersion))); - assertThat(testee.isBefore(maxVersion)).isFalse(); + assertThat(testee.isBefore(maxVersion).block()).isFalse(); } @Test diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/MailboxManager.java b/mailbox/api/src/main/java/org/apache/james/mailbox/MailboxManager.java index 6b83d9f..9f31163 100644 --- a/mailbox/api/src/main/java/org/apache/james/mailbox/MailboxManager.java +++ b/mailbox/api/src/main/java/org/apache/james/mailbox/MailboxManager.java @@ -251,7 +251,7 @@ public interface MailboxManager extends RequestAware, RightManager, MailboxAnnot * @param session * the context for this call, not null */ - List<MessageId> search(MultimailboxesSearchQuery expression, MailboxSession session, long limit) throws MailboxException; + Publisher<MessageId> search(MultimailboxesSearchQuery expression, MailboxSession session, long limit) throws MailboxException; /** * Does the given mailbox exist? diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerTest.java b/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerTest.java index 5397ed9..f51c124 100644 --- a/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerTest.java +++ b/mailbox/api/src/test/java/org/apache/james/mailbox/MailboxManagerTest.java @@ -94,6 +94,7 @@ import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** @@ -1208,7 +1209,8 @@ public abstract class MailboxManagerTest<T extends MailboxManager> { .build(); - assertThat(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT)) + assertThat(Flux.from(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT)) + .collectList().block()) .containsOnly(cacahueteMessageId, pirouetteMessageId); } @@ -1237,7 +1239,8 @@ public abstract class MailboxManagerTest<T extends MailboxManager> { .from(new SearchQuery()) .build(); - assertThat(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT)) + assertThat(Flux.from(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT)) + .collectList().block()) .containsOnly(messageId); } @@ -1264,7 +1267,8 @@ public abstract class MailboxManagerTest<T extends MailboxManager> { .from(new SearchQuery()) .build(); - assertThat(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT)) + assertThat(Flux.from(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT)) + .collectList().block()) .isEmpty(); } @@ -1284,7 +1288,8 @@ public abstract class MailboxManagerTest<T extends MailboxManager> { .from(new SearchQuery()) .build(); - assertThat(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT)) + assertThat(Flux.from(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT)) + .collectList().block()) .isEmpty(); } @@ -1305,7 +1310,8 @@ public abstract class MailboxManagerTest<T extends MailboxManager> { .inMailboxes(otherMailboxId) .build(); - assertThat(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT)) + assertThat(Flux.from(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT)) + .collectList().block()) .isEmpty(); } @@ -1325,7 +1331,8 @@ public abstract class MailboxManagerTest<T extends MailboxManager> { .notInMailboxes(otherMailboxId) .build(); - assertThat(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT)) + assertThat(Flux.from(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT)) + .collectList().block()) .isEmpty(); } @@ -1346,7 +1353,8 @@ public abstract class MailboxManagerTest<T extends MailboxManager> { .notInMailboxes(otherMailboxId) .build(); - assertThat(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT)) + assertThat(Flux.from(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT)) + .collectList().block()) .isEmpty(); } @@ -1375,7 +1383,8 @@ public abstract class MailboxManagerTest<T extends MailboxManager> { .inMailboxes(searchedMailboxId) .build(); - assertThat(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT)) + assertThat(Flux.from(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT)) + .collectList().block()) .containsExactly(messageId); } @@ -1850,9 +1859,11 @@ public abstract class MailboxManagerTest<T extends MailboxManager> { .inMailboxes(otherMailboxId) .build(); - assertThat(mailboxManager.search(inboxQuery, session, DEFAULT_MAXIMUM_LIMIT)) + assertThat(Flux.from(mailboxManager.search(inboxQuery, session, DEFAULT_MAXIMUM_LIMIT)) + .collectList().block()) .isEmpty(); - assertThat(mailboxManager.search(otherMailboxQuery, session, DEFAULT_MAXIMUM_LIMIT)) + assertThat(Flux.from(mailboxManager.search(otherMailboxQuery, session, DEFAULT_MAXIMUM_LIMIT)) + .collectList().block()) .containsExactly(messageId1, messageId2); } @@ -1889,9 +1900,11 @@ public abstract class MailboxManagerTest<T extends MailboxManager> { .inMailboxes(otherMailboxId) .build(); - assertThat(mailboxManager.search(inboxQuery, session, DEFAULT_MAXIMUM_LIMIT)) + assertThat(Flux.from(mailboxManager.search(inboxQuery, session, DEFAULT_MAXIMUM_LIMIT)) + .collectList().block()) .containsExactly(messageId2); - assertThat(mailboxManager.search(otherMailboxQuery, session, DEFAULT_MAXIMUM_LIMIT)) + assertThat(Flux.from(mailboxManager.search(otherMailboxQuery, session, DEFAULT_MAXIMUM_LIMIT)) + .collectList().block()) .containsExactly(composedMessageId1.getMessageId()); } @@ -1979,9 +1992,11 @@ public abstract class MailboxManagerTest<T extends MailboxManager> { .inMailboxes(otherMailboxId) .build(); - assertThat(mailboxManager.search(inboxQuery, session, DEFAULT_MAXIMUM_LIMIT)) + assertThat(Flux.from(mailboxManager.search(inboxQuery, session, DEFAULT_MAXIMUM_LIMIT)) + .collectList().block()) .containsExactly(messageId1, messageId2); - assertThat(mailboxManager.search(otherMailboxQuery, session, DEFAULT_MAXIMUM_LIMIT)) + assertThat(Flux.from(mailboxManager.search(otherMailboxQuery, session, DEFAULT_MAXIMUM_LIMIT)) + .collectList().block()) .containsExactly(messageId1, messageId2); } @@ -2019,9 +2034,11 @@ public abstract class MailboxManagerTest<T extends MailboxManager> { .inMailboxes(otherMailboxId) .build(); - assertThat(mailboxManager.search(inboxQuery, session, DEFAULT_MAXIMUM_LIMIT)) + assertThat(Flux.from(mailboxManager.search(inboxQuery, session, DEFAULT_MAXIMUM_LIMIT)) + .collectList().block()) .containsExactly(messageId1, messageId2); - assertThat(mailboxManager.search(otherMailboxQuery, session, DEFAULT_MAXIMUM_LIMIT)) + assertThat(Flux.from(mailboxManager.search(otherMailboxQuery, session, DEFAULT_MAXIMUM_LIMIT)) + .collectList().block()) .containsExactly(messageId1); } diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java index d74ce1c..5a15d9e 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java @@ -20,9 +20,7 @@ package org.apache.james.mailbox.cassandra.mail; import java.time.Duration; -import java.util.Collection; import java.util.List; -import java.util.stream.Stream; import javax.inject.Inject; @@ -82,7 +80,7 @@ public class CassandraMailboxMapper implements MailboxMapper { this.versionManager = versionManager; } - private boolean needMailboxPathV1Support() { + private Mono<Boolean> needMailboxPathV1Support() { return versionManager.isBefore(MAILBOX_PATH_V_2_MIGRATION_PERFORMED_VERSION); } @@ -96,12 +94,15 @@ public class CassandraMailboxMapper implements MailboxMapper { } private Flux<Void> deletePath(Mailbox mailbox) { - if (needMailboxPathV1Support()) { - return Flux.merge( - mailboxPathDAO.delete(mailbox.generateAssociatedPath()), - mailboxPathV2DAO.delete(mailbox.generateAssociatedPath())); - } - return Flux.from(mailboxPathV2DAO.delete(mailbox.generateAssociatedPath())); + return needMailboxPathV1Support() + .flatMapMany(needSupport -> { + if (needSupport) { + return Flux.merge( + mailboxPathDAO.delete(mailbox.generateAssociatedPath()), + mailboxPathV2DAO.delete(mailbox.generateAssociatedPath())); + } + return Flux.from(mailboxPathV2DAO.delete(mailbox.generateAssociatedPath())); + }); } @Override @@ -144,11 +145,9 @@ public class CassandraMailboxMapper implements MailboxMapper { } @Override - public Stream<Mailbox> findMailboxesById(Collection<MailboxId> mailboxIds) { - return Flux.fromIterable(mailboxIds) - .map(CassandraId.class::cast) - .concatMap(this::retrieveMailbox) - .toStream(); + public Mono<Mailbox> findMailboxByIdReactive(MailboxId id) { + CassandraId mailboxId = (CassandraId) id; + return retrieveMailbox(mailboxId); } private Mono<Mailbox> retrieveMailbox(CassandraId mailboxId) { @@ -164,24 +163,25 @@ public class CassandraMailboxMapper implements MailboxMapper { } @Override - public List<Mailbox> findMailboxWithPathLike(MailboxQuery.UserBound query) { + public Flux<Mailbox> findMailboxWithPathLike(MailboxQuery.UserBound query) { String fixedNamespace = query.getFixedNamespace(); Username fixedUser = query.getFixedUser(); return listPaths(fixedNamespace, fixedUser) .filter(idAndPath -> query.isPathMatch(idAndPath.getMailboxPath())) .distinct(CassandraIdAndPath::getMailboxPath) - .concatMap(this::retrieveMailbox) - .collectList() - .block(); + .concatMap(this::retrieveMailbox); } private Flux<CassandraIdAndPath> listPaths(String fixedNamespace, Username fixedUser) { - if (needMailboxPathV1Support()) { - return Flux.concat(mailboxPathV2DAO.listUserMailboxes(fixedNamespace, fixedUser), - mailboxPathDAO.listUserMailboxes(fixedNamespace, fixedUser)); - } - return mailboxPathV2DAO.listUserMailboxes(fixedNamespace, fixedUser); + return needMailboxPathV1Support() + .flatMapMany(needSupport -> { + if (needSupport) { + return Flux.concat(mailboxPathV2DAO.listUserMailboxes(fixedNamespace, fixedUser), + mailboxPathDAO.listUserMailboxes(fixedNamespace, fixedUser)); + } + return mailboxPathV2DAO.listUserMailboxes(fixedNamespace, fixedUser); + }); } private Mono<Mailbox> retrieveMailbox(CassandraIdAndPath idAndPath) { @@ -302,12 +302,10 @@ public class CassandraMailboxMapper implements MailboxMapper { } @Override - public List<Mailbox> findNonPersonalMailboxes(Username userName, Right right) { + public Flux<Mailbox> findNonPersonalMailboxes(Username userName, Right right) { return userMailboxRightsDAO.listRightsForUser(userName) .filter(mailboxId -> mailboxId.getRight().contains(right)) .map(Pair::getLeft) - .flatMap(this::retrieveMailbox) - .collectList() - .block(); + .flatMap(this::retrieveMailbox); } } diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java index 47cd80b..107e37b 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java @@ -32,6 +32,7 @@ import org.apache.james.mailbox.MessageManager; import org.apache.james.mailbox.cassandra.ids.CassandraId; import org.apache.james.mailbox.cassandra.ids.CassandraMessageId; import org.apache.james.mailbox.exception.MailboxException; +import org.apache.james.mailbox.exception.MailboxNotFoundException; import org.apache.james.mailbox.model.ComposedMessageId; import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData; import org.apache.james.mailbox.model.MailboxId; @@ -48,6 +49,7 @@ import org.apache.james.util.streams.Limit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.github.fge.lambdas.runnable.ThrowingRunnable; import com.github.steveash.guavate.Guavate; import com.google.common.collect.Multimap; @@ -129,20 +131,31 @@ public class CassandraMessageIdMapper implements MessageIdMapper { @Override public void save(MailboxMessage mailboxMessage) throws MailboxException { CassandraId mailboxId = (CassandraId) mailboxMessage.getMailboxId(); - mailboxMapper.findMailboxById(mailboxId); - - messageDAO.save(mailboxMessage) + unbox(() -> mailboxMapper.findMailboxByIdReactive(mailboxId) + .switchIfEmpty(Mono.error(new MailboxNotFoundException(mailboxId))) + .then(messageDAO.save(mailboxMessage)) .thenEmpty(saveMessageMetadata(mailboxMessage, mailboxId)) - .block(); + .block()); } @Override public void copyInMailbox(MailboxMessage mailboxMessage) throws MailboxException { CassandraId mailboxId = (CassandraId) mailboxMessage.getMailboxId(); - mailboxMapper.findMailboxById(mailboxId); + unbox(() -> mailboxMapper.findMailboxByIdReactive(mailboxId) + .switchIfEmpty(Mono.error(new MailboxNotFoundException(mailboxId))) + .then(saveMessageMetadata(mailboxMessage, mailboxId)) + .block()); + } - saveMessageMetadata(mailboxMessage, mailboxId) - .block(); + private void unbox(ThrowingRunnable runnable) throws MailboxNotFoundException { + try { + runnable.run(); + } catch (RuntimeException e) { + if (e.getCause() instanceof MailboxNotFoundException) { + throw (MailboxNotFoundException) e.getCause(); + } + throw e; + } } private Mono<Void> saveMessageMetadata(MailboxMessage mailboxMessage, CassandraId mailboxId) { diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesService.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesService.java index 5aeb181..a10fc6d 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesService.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesService.java @@ -375,7 +375,7 @@ public class SolveMailboxInconsistenciesService { } private void assertValidVersion() { - SchemaVersion version = versionManager.computeVersion(); + SchemaVersion version = versionManager.computeVersion().block(); boolean isVersionValid = version.isAfterOrEquals(MAILBOX_PATH_V_2_MIGRATION_PERFORMED_VERSION); diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperTest.java index f6d1dc2..f8e6839 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperTest.java @@ -23,8 +23,6 @@ import static org.apache.james.backends.cassandra.Scenario.Builder.fail; import static org.apache.james.mailbox.model.MailboxAssertingTool.softly; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doReturn; import java.util.List; @@ -61,7 +59,6 @@ import org.junit.jupiter.api.extension.RegisterExtension; import com.github.fge.lambdas.Throwing; import com.github.fge.lambdas.runnable.ThrowingRunnable; -import reactor.core.publisher.Mono; class CassandraMailboxMapperTest { private static final UidValidity UID_VALIDITY = UidValidity.of(52); @@ -162,7 +159,8 @@ class CassandraMailboxMapperTest { softly(softly) .assertThat(testee.findMailboxByPath(inboxPathRenamed).block()) .isEqualTo(inboxRenamed); - softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery)) + softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery) + .collectList().block()) .hasOnlyOneElementSatisfying(searchMailbox -> softly(softly) .assertThat(searchMailbox) .isEqualTo(inboxRenamed)); @@ -189,7 +187,8 @@ class CassandraMailboxMapperTest { softly(softly) .assertThat(testee.findMailboxByPath(inboxPathRenamed).block()) .isEqualTo(inboxRenamed); - softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery)) + softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery) + .collectList().block()) .hasOnlyOneElementSatisfying(searchMailbox -> softly(softly) .assertThat(searchMailbox) .isEqualTo(inboxRenamed)); @@ -212,7 +211,8 @@ class CassandraMailboxMapperTest { softly(softly) .assertThat(testee.findMailboxByPath(inboxPath).block()) .isEqualTo(inbox); - softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery)) + softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery) + .collectList().block()) .hasOnlyOneElementSatisfying(searchMailbox -> softly(softly) .assertThat(searchMailbox) .isEqualTo(inbox)); @@ -235,7 +235,8 @@ class CassandraMailboxMapperTest { .isInstanceOf(MailboxNotFoundException.class); softly.assertThat(testee.findMailboxByPath(inboxPath).blockOptional()) .isEmpty(); - softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery)) + softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery) + .collectList().block()) .isEmpty(); })); } @@ -253,9 +254,11 @@ class CassandraMailboxMapperTest { SoftAssertions.assertSoftly(softly -> { softly.assertThat(testee.findMailboxByPath(inboxPath).blockOptional()) .isEmpty(); - softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery)) + softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery) + .collectList().block()) .isEmpty(); - softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery)) + softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery) + .collectList().block()) .isEmpty(); }); } @@ -282,7 +285,8 @@ class CassandraMailboxMapperTest { softly(softly) .assertThat(testee.findMailboxByPath(inboxPath).block()) .isEqualTo(inbox); - softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery)) + softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery) + .collectList().block()) .hasOnlyOneElementSatisfying(searchMailbox -> softly(softly) .assertThat(searchMailbox) .isEqualTo(inbox)); @@ -304,7 +308,8 @@ class CassandraMailboxMapperTest { doQuietly(() -> testee.rename(inboxRenamed)); SoftAssertions.assertSoftly(Throwing.consumer(softly -> { - softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery)) + softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery) + .collectList().block()) .hasOnlyOneElementSatisfying(searchMailbox -> softly(softly) .assertThat(searchMailbox) .isEqualTo(inbox)); @@ -328,7 +333,8 @@ class CassandraMailboxMapperTest { SoftAssertions.assertSoftly(Throwing.consumer(softly -> { softly.assertThatThrownBy(() -> testee.findMailboxByPath(inboxPathRenamed)) .isInstanceOf(MailboxNotFoundException.class); - softly.assertThat(testee.findMailboxWithPathLike(inboxRenamedSearchQuery)) + softly.assertThat(testee.findMailboxWithPathLike(inboxRenamedSearchQuery) + .collectList().block()) .isEmpty(); })); } @@ -353,7 +359,8 @@ class CassandraMailboxMapperTest { softly(softly) .assertThat(testee.findMailboxByPath(inboxPath).block()) .isEqualTo(inbox); - softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery)) + softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery) + .collectList().block()) .hasOnlyOneElementSatisfying(searchMailbox -> softly(softly) .assertThat(searchMailbox) .isEqualTo(inbox)); @@ -375,7 +382,8 @@ class CassandraMailboxMapperTest { doQuietly(() -> testee.rename(inboxRenamed)); SoftAssertions.assertSoftly(Throwing.consumer(softly -> { - softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery)) + softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery) + .collectList().block()) .hasOnlyOneElementSatisfying(searchMailbox -> softly(softly) .assertThat(searchMailbox) .isEqualTo(inbox)); @@ -399,7 +407,8 @@ class CassandraMailboxMapperTest { SoftAssertions.assertSoftly(Throwing.consumer(softly -> { softly.assertThatThrownBy(() -> testee.findMailboxByPath(inboxPathRenamed)) .isInstanceOf(MailboxNotFoundException.class); - softly.assertThat(testee.findMailboxWithPathLike(inboxRenamedSearchQuery)) + softly.assertThat(testee.findMailboxWithPathLike(inboxRenamedSearchQuery) + .collectList().block()) .isEmpty(); })); } @@ -422,11 +431,13 @@ class CassandraMailboxMapperTest { .doesNotThrowAnyException(); softly.assertThatCode(() -> testee.findMailboxByPath(inboxPath)) .doesNotThrowAnyException(); - softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery)) + softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery) + .collectList().block()) .hasOnlyOneElementSatisfying(searchMailbox -> softly(softly) .assertThat(searchMailbox) .isEqualTo(inbox)); - softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery)) + softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery) + .collectList().block()) .hasOnlyOneElementSatisfying(searchMailbox -> softly(softly) .assertThat(searchMailbox) .isEqualTo(inbox)); @@ -468,11 +479,13 @@ class CassandraMailboxMapperTest { softly(softly) .assertThat(testee.findMailboxByPath(inboxPath).block()) .isEqualTo(inbox); - softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery)) + softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery) + .collectList().block()) .hasOnlyOneElementSatisfying(searchMailbox -> softly(softly) .assertThat(searchMailbox) .isEqualTo(inbox)); - softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery)) + softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery) + .collectList().block()) .hasOnlyOneElementSatisfying(searchMailbox -> softly(softly) .assertThat(searchMailbox) .isEqualTo(inbox)); @@ -495,11 +508,13 @@ class CassandraMailboxMapperTest { softly(softly) .assertThat(testee.findMailboxByPath(inboxPath).block()) .isEqualTo(inbox); - softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery)) + softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery) + .collectList().block()) .hasOnlyOneElementSatisfying(searchMailbox -> softly(softly) .assertThat(searchMailbox) .isEqualTo(inbox)); - softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery)) + softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery) + .collectList().block()) .hasOnlyOneElementSatisfying(searchMailbox -> softly(softly) .assertThat(searchMailbox) .isEqualTo(inbox)); @@ -525,9 +540,11 @@ class CassandraMailboxMapperTest { .isInstanceOf(MailboxNotFoundException.class); softly.assertThat(testee.findMailboxByPath(inboxPath).blockOptional()) .isEmpty(); - softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery)) + softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery) + .collectList().block()) .isEmpty(); - softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery)) + softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery) + .collectList().block()) .isEmpty(); })); } @@ -556,13 +573,16 @@ class CassandraMailboxMapperTest { softly(softly) .assertThat(testee.findMailboxByPath(inboxPathRenamed).block()) .isEqualTo(inboxRenamed); - softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery)) + softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery) + .collectList().block()) .isEmpty(); - softly.assertThat(testee.findMailboxWithPathLike(inboxRenamedSearchQuery)) + softly.assertThat(testee.findMailboxWithPathLike(inboxRenamedSearchQuery) + .collectList().block()) .hasOnlyOneElementSatisfying(searchMailbox -> softly(softly) .assertThat(searchMailbox) .isEqualTo(inboxRenamed)); - softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery)) + softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery) + .collectList().block()) .hasOnlyOneElementSatisfying(searchMailbox -> softly(softly) .assertThat(searchMailbox) .isEqualTo(inboxRenamed)); @@ -592,14 +612,17 @@ class CassandraMailboxMapperTest { softly(softly) .assertThat(testee.findMailboxByPath(inboxPathRenamed).block()) .isEqualTo(inboxRenamed); - softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery)) + softly.assertThat(testee.findMailboxWithPathLike(inboxSearchQuery) + .collectList().block()) .isEmpty(); - softly.assertThat(testee.findMailboxWithPathLike(inboxRenamedSearchQuery)) + softly.assertThat(testee.findMailboxWithPathLike(inboxRenamedSearchQuery) + .collectList().block()) .hasOnlyOneElementSatisfying(searchMailbox -> softly(softly) .assertThat(searchMailbox) .isEqualTo(inboxRenamed)); - softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery)) + softly.assertThat(testee.findMailboxWithPathLike(allMailboxesSearchQuery) + .collectList().block()) .hasOnlyOneElementSatisfying(searchMailbox -> softly(softly) .assertThat(searchMailbox) .isEqualTo(inboxRenamed)); @@ -763,7 +786,8 @@ class CassandraMailboxMapperTest { .username(USER) .expression(Wildcard.INSTANCE) .build() - .asUserBound()); + .asUserBound()) + .collectList().block(); assertThat(mailboxes).containsOnly(MAILBOX); } @@ -782,7 +806,8 @@ class CassandraMailboxMapperTest { .username(USER) .expression(Wildcard.INSTANCE) .build() - .asUserBound()); + .asUserBound()) + .collectList().block(); assertThat(mailboxes).containsOnly(MAILBOX); } @@ -799,7 +824,8 @@ class CassandraMailboxMapperTest { .username(USER) .expression(Wildcard.INSTANCE) .build() - .asUserBound()); + .asUserBound()) + .collectList().block(); assertThat(mailboxes).containsOnly(MAILBOX); } @@ -877,7 +903,8 @@ class CassandraMailboxMapperTest { .username(USER) .expression(Wildcard.INSTANCE) .build() - .asUserBound())) + .asUserBound()) + .collectList().block()) .containsOnly(MAILBOX); } } diff --git a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java index b64ecb3..a85467b 100644 --- a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java +++ b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java @@ -62,6 +62,7 @@ import com.github.steveash.guavate.Guavate; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public class ElasticSearchListeningMessageSearchIndex extends ListeningMessageSearchIndex { @@ -118,11 +119,11 @@ public class ElasticSearchListeningMessageSearchIndex extends ListeningMessageSe } @Override - public List<MessageId> search(MailboxSession session, Collection<MailboxId> mailboxIds, SearchQuery searchQuery, long limit) { + public Flux<MessageId> search(MailboxSession session, Collection<MailboxId> mailboxIds, SearchQuery searchQuery, long limit) { Preconditions.checkArgument(session != null, "'session' is mandatory"); if (mailboxIds.isEmpty()) { - return ImmutableList.of(); + return Flux.empty(); } return searcher.search(mailboxIds, searchQuery, Optional.empty()) @@ -130,9 +131,7 @@ public class ElasticSearchListeningMessageSearchIndex extends ListeningMessageSe .map(SearchResult::getMessageId) .flatMap(Mono::justOrEmpty) .distinct() - .take(limit) - .collect(Guavate.toImmutableList()) - .block(); + .take(limit); } @Override diff --git a/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcherTest.java b/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcherTest.java index f7d6564..fcaa97b 100644 --- a/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcherTest.java +++ b/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/search/ElasticSearchSearcherTest.java @@ -154,7 +154,8 @@ class ElasticSearchSearcherTest { .stream() .map(ComposedMessageId::getMessageId) .collect(Guavate.toImmutableList()); - assertThat(storeMailboxManager.search(multimailboxesSearchQuery, session, numberOfMailboxes + 1)) + assertThat(storeMailboxManager.search(multimailboxesSearchQuery, session, numberOfMailboxes + 1) + .collectList().block()) .containsExactlyInAnyOrderElementsOf(expectedMessageIds); } diff --git a/mailbox/jpa/src/main/java/org/apache/james/mailbox/jpa/mail/JPAMailboxMapper.java b/mailbox/jpa/src/main/java/org/apache/james/mailbox/jpa/mail/JPAMailboxMapper.java index 777edf3..c2d4514 100644 --- a/mailbox/jpa/src/main/java/org/apache/james/mailbox/jpa/mail/JPAMailboxMapper.java +++ b/mailbox/jpa/src/main/java/org/apache/james/mailbox/jpa/mail/JPAMailboxMapper.java @@ -48,8 +48,8 @@ import org.apache.james.mailbox.store.mail.MailboxMapper; import com.github.steveash.guavate.Guavate; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** @@ -186,15 +186,13 @@ public class JPAMailboxMapper extends JPATransactionalMapper implements MailboxM } @Override - public List<Mailbox> findMailboxWithPathLike(MailboxQuery.UserBound query) throws MailboxException { + public Flux<Mailbox> findMailboxWithPathLike(MailboxQuery.UserBound query) throws MailboxException { try { String pathLike = MailboxExpressionBackwardCompatibility.getPathLike(query); - return findMailboxWithPathLikeTypedQuery(query.getFixedNamespace(), query.getFixedUser(), pathLike) - .getResultList() - .stream() + return Flux.fromIterable(findMailboxWithPathLikeTypedQuery(query.getFixedNamespace(), query.getFixedUser(), pathLike) + .getResultList()) .map(JPAMailbox::toMailbox) - .filter(query::matches) - .collect(Guavate.toImmutableList()); + .filter(query::matches); } catch (PersistenceException e) { throw new MailboxException("Search of mailbox " + query + " failed", e); } @@ -250,7 +248,7 @@ public class JPAMailboxMapper extends JPATransactionalMapper implements MailboxM } @Override - public List<Mailbox> findNonPersonalMailboxes(Username userName, Right right) throws MailboxException { - return ImmutableList.of(); + public Flux<Mailbox> findNonPersonalMailboxes(Username userName, Right right) { + return Flux.empty(); } } diff --git a/mailbox/jpa/src/test/java/org/apache/james/mailbox/jpa/mail/TransactionalMailboxMapper.java b/mailbox/jpa/src/test/java/org/apache/james/mailbox/jpa/mail/TransactionalMailboxMapper.java index 30b4da7..787db5b 100644 --- a/mailbox/jpa/src/test/java/org/apache/james/mailbox/jpa/mail/TransactionalMailboxMapper.java +++ b/mailbox/jpa/src/test/java/org/apache/james/mailbox/jpa/mail/TransactionalMailboxMapper.java @@ -36,6 +36,7 @@ import org.apache.james.mailbox.model.search.MailboxQuery; import org.apache.james.mailbox.store.mail.MailboxMapper; import org.apache.james.mailbox.store.transaction.Mapper; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public class TransactionalMailboxMapper implements MailboxMapper { @@ -81,7 +82,7 @@ public class TransactionalMailboxMapper implements MailboxMapper { } @Override - public List<Mailbox> findMailboxWithPathLike(MailboxQuery.UserBound query) throws MailboxException { + public Flux<Mailbox> findMailboxWithPathLike(MailboxQuery.UserBound query) throws MailboxException { return wrapped.findMailboxWithPathLike(query); } @@ -106,7 +107,7 @@ public class TransactionalMailboxMapper implements MailboxMapper { } @Override - public List<Mailbox> findNonPersonalMailboxes(Username userName, Right right) throws MailboxException { + public Flux<Mailbox> findNonPersonalMailboxes(Username userName, Right right) { return wrapped.findNonPersonalMailboxes(userName, right); } diff --git a/mailbox/lucene/src/main/java/org/apache/james/mailbox/lucene/search/LuceneMessageSearchIndex.java b/mailbox/lucene/src/main/java/org/apache/james/mailbox/lucene/search/LuceneMessageSearchIndex.java index b93306b..793f4a0 100644 --- a/mailbox/lucene/src/main/java/org/apache/james/mailbox/lucene/search/LuceneMessageSearchIndex.java +++ b/mailbox/lucene/src/main/java/org/apache/james/mailbox/lucene/search/LuceneMessageSearchIndex.java @@ -123,6 +123,8 @@ import com.github.steveash.guavate.Guavate; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import reactor.core.publisher.Flux; + /** * Lucene based {@link ListeningMessageSearchIndex} which offers message searching via a Lucene index */ @@ -461,18 +463,18 @@ public class LuceneMessageSearchIndex extends ListeningMessageSearchIndex { } @Override - public List<MessageId> search(MailboxSession session, Collection<MailboxId> mailboxIds, SearchQuery searchQuery, long limit) throws MailboxException { + public Flux<MessageId> search(MailboxSession session, Collection<MailboxId> mailboxIds, SearchQuery searchQuery, long limit) throws MailboxException { Preconditions.checkArgument(session != null, "'session' is mandatory"); if (mailboxIds.isEmpty()) { - return ImmutableList.of(); + return Flux.empty(); } - return searchMultimap(mailboxIds, searchQuery) + return Flux.fromIterable(searchMultimap(mailboxIds, searchQuery) .stream() .map(searchResult -> searchResult.getMessageId().get()) .filter(SearchUtil.distinct()) .limit(Long.valueOf(limit).intValue()) - .collect(Guavate.toImmutableList()); + .collect(Guavate.toImmutableList())); } private List<SearchResult> searchMultimap(Collection<MailboxId> mailboxIds, SearchQuery searchQuery) throws MailboxException { diff --git a/mailbox/lucene/src/test/java/org/apache/james/mailbox/lucene/search/LuceneMailboxMessageSearchIndexTest.java b/mailbox/lucene/src/test/java/org/apache/james/mailbox/lucene/search/LuceneMailboxMessageSearchIndexTest.java index 6ff52fb..4fa87f4 100644 --- a/mailbox/lucene/src/test/java/org/apache/james/mailbox/lucene/search/LuceneMailboxMessageSearchIndexTest.java +++ b/mailbox/lucene/src/test/java/org/apache/james/mailbox/lucene/search/LuceneMailboxMessageSearchIndexTest.java @@ -310,7 +310,8 @@ class LuceneMailboxMessageSearchIndexTest { SearchQuery query = new SearchQuery(); query.andCriteria(SearchQuery.bodyContains("My Body")); - List<MessageId> result = index.search(session, ImmutableList.of(mailbox.getMailboxId(), mailbox2.getMailboxId(), mailbox3.getMailboxId()), query, LIMIT); + List<MessageId> result = index.search(session, ImmutableList.of(mailbox.getMailboxId(), mailbox2.getMailboxId(), mailbox3.getMailboxId()), query, LIMIT) + .collectList().block(); assertThat(result).containsOnly(id1, id2); } @@ -323,7 +324,8 @@ class LuceneMailboxMessageSearchIndexTest { List<MessageId> result = index.search(session, ImmutableList.of(mailbox.getMailboxId(), mailbox3.getMailboxId()), query, - LIMIT); + LIMIT) + .collectList().block(); assertThat(result).containsOnly(id1); } @@ -333,7 +335,8 @@ class LuceneMailboxMessageSearchIndexTest { SearchQuery query = new SearchQuery(); query.andCriteria(SearchQuery.all()); - List<MessageId> result = index.search(session, ImmutableList.of(mailbox.getMailboxId(), mailbox2.getMailboxId(), mailbox3.getMailboxId()), query, LIMIT); + List<MessageId> result = index.search(session, ImmutableList.of(mailbox.getMailboxId(), mailbox2.getMailboxId(), mailbox3.getMailboxId()), query, LIMIT) + .collectList().block(); // The query is not limited to one mailbox and we have 5 indexed messages assertThat(result).hasSize(5); @@ -345,7 +348,8 @@ class LuceneMailboxMessageSearchIndexTest { query.andCriteria(SearchQuery.all()); int limit = 1; - List<MessageId> result = index.search(session, ImmutableList.of(mailbox.getMailboxId(), mailbox2.getMailboxId(), mailbox3.getMailboxId()), query, limit); + List<MessageId> result = index.search(session, ImmutableList.of(mailbox.getMailboxId(), mailbox2.getMailboxId(), mailbox3.getMailboxId()), query, limit) + .collectList().block(); assertThat(result).hasSize(limit); } diff --git a/mailbox/maildir/src/main/java/org/apache/james/mailbox/maildir/mail/MaildirMailboxMapper.java b/mailbox/maildir/src/main/java/org/apache/james/mailbox/maildir/mail/MaildirMailboxMapper.java index d9b5c0d..05d9799 100644 --- a/mailbox/maildir/src/main/java/org/apache/james/mailbox/maildir/mail/MaildirMailboxMapper.java +++ b/mailbox/maildir/src/main/java/org/apache/james/mailbox/maildir/mail/MaildirMailboxMapper.java @@ -51,9 +51,7 @@ import org.apache.james.mailbox.store.transaction.NonTransactionalMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.github.steveash.guavate.Guavate; -import com.google.common.collect.ImmutableList; - +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public class MaildirMailboxMapper extends NonTransactionalMapper implements MailboxMapper { @@ -128,7 +126,7 @@ public class MaildirMailboxMapper extends NonTransactionalMapper implements Mail } @Override - public List<Mailbox> findMailboxWithPathLike(MailboxQuery.UserBound query) throws MailboxException { + public Flux<Mailbox> findMailboxWithPathLike(MailboxQuery.UserBound query) throws MailboxException { String pathLike = MailboxExpressionBackwardCompatibility.getPathLike(query); final Pattern searchPattern = Pattern.compile("[" + MaildirStore.maildirDelimiter + "]" + pathLike.replace(".", "\\.").replace(MaildirStore.WILDCARD, ".*")); @@ -147,9 +145,8 @@ public class MaildirMailboxMapper extends NonTransactionalMapper implements Mail Mailbox mailbox = maildirStore.loadMailbox(session, root, query.getFixedNamespace(), query.getFixedUser(), ""); mailboxList.add(0, mailbox); } - return mailboxList.stream() - .filter(query::matches) - .collect(Guavate.toImmutableList()); + return Flux.fromIterable(mailboxList) + .filter(query::matches); } @Override @@ -159,7 +156,8 @@ public class MaildirMailboxMapper extends NonTransactionalMapper implements Mail .userAndNamespaceFrom(mailbox.generateAssociatedPath()) .expression(new PrefixedWildcard(mailbox.getName() + delimiter)) .build() - .asUserBound()); + .asUserBound()) + .collectList().block(); return mailboxes.size() > 0; } @@ -333,7 +331,7 @@ public class MaildirMailboxMapper extends NonTransactionalMapper implements Mail } @Override - public List<Mailbox> findNonPersonalMailboxes(Username userName, Right right) throws MailboxException { - return ImmutableList.of(); + public Flux<Mailbox> findNonPersonalMailboxes(Username userName, Right right) { + return Flux.empty(); } } diff --git a/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMailboxMapper.java b/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMailboxMapper.java index fc55302..77db71d 100644 --- a/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMailboxMapper.java +++ b/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMailboxMapper.java @@ -40,10 +40,10 @@ import org.apache.james.mailbox.model.UidValidity; import org.apache.james.mailbox.model.search.MailboxQuery; import org.apache.james.mailbox.store.mail.MailboxMapper; -import com.github.steveash.guavate.Guavate; import com.google.common.base.Objects; import com.google.common.base.Preconditions; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public class InMemoryMailboxMapper implements MailboxMapper { @@ -84,12 +84,10 @@ public class InMemoryMailboxMapper implements MailboxMapper { } @Override - public List<Mailbox> findMailboxWithPathLike(MailboxQuery.UserBound query) { - return mailboxesByPath.values() - .stream() + public Flux<Mailbox> findMailboxWithPathLike(MailboxQuery.UserBound query) { + return Flux.fromIterable(mailboxesByPath.values()) .filter(query::matches) - .map(Mailbox::new) - .collect(Guavate.toImmutableList()); + .map(Mailbox::new); } @Override @@ -166,11 +164,9 @@ public class InMemoryMailboxMapper implements MailboxMapper { } @Override - public List<Mailbox> findNonPersonalMailboxes(Username userName, Right right) throws MailboxException { - return mailboxesByPath.values() - .stream() - .filter(mailbox -> hasRightOn(mailbox, userName, right)) - .collect(Guavate.toImmutableList()); + public Flux<Mailbox> findNonPersonalMailboxes(Username userName, Right right) { + return Flux.fromIterable(mailboxesByPath.values()) + .filter(mailbox -> hasRightOn(mailbox, userName, right)); } private Boolean hasRightOn(Mailbox mailbox, Username userName, Right right) { diff --git a/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/DeletedMessageVaultHook.java b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/DeletedMessageVaultHook.java index a69b4bd..cdb5071 100644 --- a/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/DeletedMessageVaultHook.java +++ b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/DeletedMessageVaultHook.java @@ -35,6 +35,7 @@ import org.apache.james.mailbox.MetadataWithMailboxId; import org.apache.james.mailbox.SessionProvider; import org.apache.james.mailbox.exception.MailboxException; import org.apache.james.mailbox.extension.PreDeletionHook; +import org.apache.james.mailbox.model.Mailbox; import org.apache.james.mailbox.model.MailboxId; import org.apache.james.mailbox.model.MessageId; import org.apache.james.mailbox.store.MailboxSessionMapperFactory; @@ -154,18 +155,19 @@ public class DeletedMessageVaultHook implements PreDeletionHook { .flatMap(groupFlux -> groupFlux.reduce(DeletedMessageMailboxContext::combine)); } - private Publisher<DeletedMessageMailboxContext> addOwnerToMetadata(GroupedFlux<MailboxId, MetadataWithMailboxId> groupedFlux) throws MailboxException { - Username owner = retrieveMailboxUser(groupedFlux.key()); - return groupedFlux.map(metadata -> new DeletedMessageMailboxContext(metadata.getMessageMetaData().getMessageId(), owner, ImmutableList.of(metadata.getMailboxId()))); + private Flux<DeletedMessageMailboxContext> addOwnerToMetadata(GroupedFlux<MailboxId, MetadataWithMailboxId> groupedFlux) throws MailboxException { + return retrieveMailboxUser(groupedFlux.key()) + .flatMapMany(owner -> groupedFlux.map(metadata -> + new DeletedMessageMailboxContext(metadata.getMessageMetaData().getMessageId(), owner, ImmutableList.of(metadata.getMailboxId())))); } private Pair<MessageId, Username> toMessageIdUserPair(DeletedMessageMailboxContext deletedMessageMetadata) { return Pair.of(deletedMessageMetadata.getMessageId(), deletedMessageMetadata.getOwner()); } - private Username retrieveMailboxUser(MailboxId mailboxId) throws MailboxException { + private Mono<Username> retrieveMailboxUser(MailboxId mailboxId) throws MailboxException { return mapperFactory.getMailboxMapper(session) - .findMailboxById(mailboxId) - .getUser(); + .findMailboxByIdReactive(mailboxId) + .map(Mailbox::getUser); } } \ No newline at end of file diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java index c3d7c02..76f8bfb 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMailboxManager.java @@ -93,6 +93,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** @@ -536,7 +537,7 @@ public class StoreMailboxManager implements MailboxManager { .build() .asUserBound(); locker.executeWithLock(from, (LockAwareExecution<Void>) () -> { - List<Mailbox> subMailboxes = mapper.findMailboxWithPathLike(query); + List<Mailbox> subMailboxes = mapper.findMailboxWithPathLike(query).collectList().block(); for (Mailbox sub : subMailboxes) { String subOriginalName = sub.getName(); String subNewName = newMailboxPath.getName() + subOriginalName.substring(from.getName().length()); @@ -596,7 +597,7 @@ public class StoreMailboxManager implements MailboxManager { } private List<MailboxMetaData> searchMailboxesMetadata(MailboxQuery mailboxQuery, MailboxSession session, Right right) throws MailboxException { - List<Mailbox> mailboxes = searchMailboxes(mailboxQuery, session, right); + List<Mailbox> mailboxes = searchMailboxes(mailboxQuery, session, right).collectList().block(); ImmutableMap<MailboxId, MailboxCounters> counters = getMailboxCounters(mailboxes, session) .stream() @@ -614,16 +615,13 @@ public class StoreMailboxManager implements MailboxManager { .collect(Guavate.toImmutableList()); } - private List<Mailbox> searchMailboxes(MailboxQuery mailboxQuery, MailboxSession session, Right right) throws MailboxException { + private Flux<Mailbox> searchMailboxes(MailboxQuery mailboxQuery, MailboxSession session, Right right) throws MailboxException { MailboxMapper mailboxMapper = mailboxSessionMapperFactory.getMailboxMapper(session); - Stream<Mailbox> baseMailboxes = mailboxMapper - .findMailboxWithPathLike(toSingleUserQuery(mailboxQuery, session)) - .stream(); - Stream<Mailbox> delegatedMailboxes = getDelegatedMailboxes(mailboxMapper, mailboxQuery, right, session); - return Stream.concat(baseMailboxes, delegatedMailboxes) + Flux<Mailbox> baseMailboxes = mailboxMapper.findMailboxWithPathLike(toSingleUserQuery(mailboxQuery, session)); + Flux<Mailbox> delegatedMailboxes = getDelegatedMailboxes(mailboxMapper, mailboxQuery, right, session); + return Flux.merge(baseMailboxes, delegatedMailboxes) .distinct() - .filter(Throwing.predicate(mailbox -> storeRightManager.hasRight(mailbox, right, session))) - .collect(Guavate.toImmutableList()); + .filter(Throwing.predicate(mailbox -> storeRightManager.hasRight(mailbox, right, session))); } static MailboxQuery.UserBound toSingleUserQuery(MailboxQuery mailboxQuery, MailboxSession mailboxSession) { @@ -644,12 +642,12 @@ public class StoreMailboxManager implements MailboxManager { .build()); } - private Stream<Mailbox> getDelegatedMailboxes(MailboxMapper mailboxMapper, MailboxQuery mailboxQuery, - Right right, MailboxSession session) throws MailboxException { + private Flux<Mailbox> getDelegatedMailboxes(MailboxMapper mailboxMapper, MailboxQuery mailboxQuery, + Right right, MailboxSession session) { if (mailboxQuery.isPrivateMailboxes(session)) { - return Stream.of(); + return Flux.empty(); } - return mailboxMapper.findNonPersonalMailboxes(session.getUser(), right).stream(); + return mailboxMapper.findNonPersonalMailboxes(session.getUser(), right); } private MailboxMetaData toMailboxMetadata(MailboxSession session, List<Mailbox> mailboxes, Mailbox mailbox, MailboxCounters counters) throws UnsupportedRightException { @@ -677,32 +675,31 @@ public class StoreMailboxManager implements MailboxManager { } @Override - public List<MessageId> search(MultimailboxesSearchQuery expression, MailboxSession session, long limit) throws MailboxException { - ImmutableSet<MailboxId> wantedMailboxesId = - getInMailboxes(expression.getInMailboxes(), session) - .filter(id -> !expression.getNotInMailboxes().contains(id)) - .collect(Guavate.toImmutableSet()); - - return index.search(session, wantedMailboxesId, expression.getSearchQuery(), limit); + public Flux<MessageId> search(MultimailboxesSearchQuery expression, MailboxSession session, long limit) throws MailboxException { + return getInMailboxes(expression.getInMailboxes(), session) + .filter(id -> !expression.getNotInMailboxes().contains(id)) + .collect(Guavate.toImmutableSet()) + .flatMapMany(Throwing.function(ids -> index.search(session, ids, expression.getSearchQuery(), limit))); } - private Stream<MailboxId> getInMailboxes(ImmutableSet<MailboxId> inMailboxes, MailboxSession session) throws MailboxException { - if (inMailboxes.isEmpty()) { + + private Flux<MailboxId> getInMailboxes(ImmutableSet<MailboxId> inMailboxes, MailboxSession session) throws MailboxException { + if (inMailboxes.isEmpty()) { return getAllReadableMailbox(session); } else { return filterReadable(inMailboxes, session); } } - private Stream<MailboxId> getAllReadableMailbox(MailboxSession session) throws MailboxException { + private Flux<MailboxId> getAllReadableMailbox(MailboxSession session) throws MailboxException { return searchMailboxes(MailboxQuery.builder().matchesAllMailboxNames().build(), session, Right.Read) - .stream() .map(Mailbox::getMailboxId); } - private Stream<MailboxId> filterReadable(ImmutableSet<MailboxId> inMailboxes, MailboxSession session) throws MailboxException { - return mailboxSessionMapperFactory.getMailboxMapper(session) - .findMailboxesById(inMailboxes) + private Flux<MailboxId> filterReadable(ImmutableSet<MailboxId> inMailboxes, MailboxSession session) throws MailboxException { + MailboxMapper mailboxMapper = mailboxSessionMapperFactory.getMailboxMapper(session); + return Flux.fromIterable(inMailboxes) + .concatMap(mailboxMapper::findMailboxByIdReactive) .filter(Throwing.<Mailbox>predicate(mailbox -> storeRightManager.hasRight(mailbox, Right.Read, session)).sneakyThrow()) .map(Mailbox::getMailboxId); } diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MailboxMapper.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MailboxMapper.java index 7ace127..6c5e3b8 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MailboxMapper.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MailboxMapper.java @@ -18,9 +18,7 @@ ****************************************************************/ package org.apache.james.mailbox.store.mail; -import java.util.Collection; import java.util.List; -import java.util.stream.Stream; import org.apache.james.core.Username; import org.apache.james.mailbox.acl.ACLDiff; @@ -35,8 +33,7 @@ import org.apache.james.mailbox.model.UidValidity; import org.apache.james.mailbox.model.search.MailboxQuery; import org.apache.james.mailbox.store.transaction.Mapper; -import com.github.fge.lambdas.Throwing; - +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** @@ -87,26 +84,25 @@ public interface MailboxMapper extends Mapper { Mailbox findMailboxById(MailboxId mailboxId) throws MailboxException, MailboxNotFoundException; - default Stream<Mailbox> findMailboxesById(Collection<MailboxId> mailboxIds) throws MailboxException { - return mailboxIds.stream() - .flatMap(Throwing.<MailboxId, Stream<Mailbox>>function(id -> { - try { - return Stream.of(findMailboxById(id)); - } catch (MailboxNotFoundException e) { - return Stream.empty(); - } - }).sneakyThrow()); + default Mono<Mailbox> findMailboxByIdReactive(MailboxId id) { + try { + return Mono.justOrEmpty(findMailboxById(id)); + } catch (MailboxNotFoundException e) { + return Mono.empty(); + } catch (MailboxException e) { + return Mono.error(e); + } } /** * Return a List of {@link Mailbox} for the given userName and matching the right */ - List<Mailbox> findNonPersonalMailboxes(Username userName, Right right) throws MailboxException; + Flux<Mailbox> findNonPersonalMailboxes(Username userName, Right right); /** * Return a List of {@link Mailbox} which name is like the given name */ - List<Mailbox> findMailboxWithPathLike(MailboxQuery.UserBound query) + Flux<Mailbox> findMailboxWithPathLike(MailboxQuery.UserBound query) throws MailboxException; /** diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/DefaultUserQuotaRootResolver.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/DefaultUserQuotaRootResolver.java index 9c29ac4..6ea1f88 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/DefaultUserQuotaRootResolver.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/quota/DefaultUserQuotaRootResolver.java @@ -118,6 +118,7 @@ public class DefaultUserQuotaRootResolver implements UserQuotaRootResolver { .user(Username.of(user)) .matchesAllMailboxNames() .build() - .asUserBound()); + .asUserBound()) + .collectList().block(); } } diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/LazyMessageSearchIndex.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/LazyMessageSearchIndex.java index 4a06c64..6e75f26 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/LazyMessageSearchIndex.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/LazyMessageSearchIndex.java @@ -47,6 +47,8 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; +import reactor.core.publisher.Flux; + /** * {@link ListeningMessageSearchIndex} implementation which wraps another {@link ListeningMessageSearchIndex} and will forward all calls to it. * @@ -141,7 +143,7 @@ public class LazyMessageSearchIndex extends ListeningMessageSearchIndex { @Override - public List<MessageId> search(MailboxSession session, Collection<MailboxId> mailboxIds, SearchQuery searchQuery, long limit) throws MailboxException { + public Flux<MessageId> search(MailboxSession session, Collection<MailboxId> mailboxIds, SearchQuery searchQuery, long limit) throws MailboxException { throw new UnsupportedSearchException(); } } diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/MessageSearchIndex.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/MessageSearchIndex.java index eb1dcee..4873099 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/MessageSearchIndex.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/MessageSearchIndex.java @@ -21,7 +21,6 @@ package org.apache.james.mailbox.store.search; import java.util.Collection; import java.util.EnumSet; -import java.util.List; import java.util.Optional; import java.util.stream.Stream; @@ -34,6 +33,8 @@ import org.apache.james.mailbox.model.MailboxId; import org.apache.james.mailbox.model.MessageId; import org.apache.james.mailbox.model.SearchQuery; +import reactor.core.publisher.Flux; + /** * An index which can be used to search for MailboxMessage UID's that match a {@link SearchQuery}. * @@ -50,7 +51,7 @@ public interface MessageSearchIndex { /** * Return all uids of all {@link Mailbox}'s the current user has access to which match the {@link SearchQuery} */ - List<MessageId> search(MailboxSession session, Collection<MailboxId> mailboxIds, SearchQuery searchQuery, long limit) throws MailboxException; + Flux<MessageId> search(MailboxSession session, Collection<MailboxId> mailboxIds, SearchQuery searchQuery, long limit) throws MailboxException; EnumSet<MailboxManager.SearchCapabilities> getSupportedCapabilities(EnumSet<MailboxManager.MessageCapabilities> messageCapabilities); diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/SimpleMessageSearchIndex.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/SimpleMessageSearchIndex.java index 26b602d..ed6fd17 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/SimpleMessageSearchIndex.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/SimpleMessageSearchIndex.java @@ -53,10 +53,11 @@ import org.apache.james.mailbox.store.mail.MessageMapperFactory; import org.apache.james.mailbox.store.mail.model.MailboxMessage; import com.github.fge.lambdas.Throwing; -import com.github.steveash.guavate.Guavate; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import reactor.core.publisher.Flux; + /** * {@link MessageSearchIndex} which just fetch {@link MailboxMessage}'s from the {@link MessageMapper} and use {@link MessageSearcher} * to match them against the {@link SearchQuery}. @@ -108,10 +109,10 @@ public class SimpleMessageSearchIndex implements MessageSearchIndex { @Override public Stream<MessageUid> search(MailboxSession session, final Mailbox mailbox, SearchQuery query) throws MailboxException { Preconditions.checkArgument(session != null, "'session' is mandatory"); - return searchResults(session, ImmutableList.of(mailbox).stream(), query) - .stream() + return searchResults(session, Flux.just(mailbox), query) .filter(searchResult -> searchResult.getMailboxId().equals(mailbox.getMailboxId())) - .map(SearchResult::getMessageUid); + .map(SearchResult::getMessageUid) + .toStream(); } private List<SearchResult> searchResults(MailboxSession session, Mailbox mailbox, SearchQuery query) throws MailboxException { @@ -142,19 +143,17 @@ public class SimpleMessageSearchIndex implements MessageSearchIndex { } @Override - public List<MessageId> search(MailboxSession session, final Collection<MailboxId> mailboxIds, SearchQuery searchQuery, long limit) throws MailboxException { - MailboxMapper mailboxManager = mailboxMapperFactory.getMailboxMapper(session); + public Flux<MessageId> search(MailboxSession session, final Collection<MailboxId> mailboxIds, SearchQuery searchQuery, long limit) throws MailboxException { + MailboxMapper mailboxMapper = mailboxMapperFactory.getMailboxMapper(session); - Stream<Mailbox> filteredMailboxes = mailboxIds - .stream() - .map(Throwing.function(mailboxManager::findMailboxById).sneakyThrow()); + Flux<Mailbox> filteredMailboxes = Flux.fromIterable(mailboxIds) + .concatMap(Throwing.function(mailboxMapper::findMailboxByIdReactive).sneakyThrow()); return getAsMessageIds(searchResults(session, filteredMailboxes, searchQuery), limit); } - private List<SearchResult> searchResults(MailboxSession session, Stream<Mailbox> mailboxes, SearchQuery query) throws MailboxException { - return mailboxes.flatMap(mailbox -> getSearchResultStream(session, query, mailbox)) - .collect(Guavate.toImmutableList()); + private Flux<SearchResult> searchResults(MailboxSession session, Flux<Mailbox> mailboxes, SearchQuery query) throws MailboxException { + return mailboxes.concatMap(mailbox -> Flux.fromStream(getSearchResultStream(session, query, mailbox))); } private Stream<? extends SearchResult> getSearchResultStream(MailboxSession session, SearchQuery query, Mailbox mailbox) { @@ -165,12 +164,10 @@ public class SimpleMessageSearchIndex implements MessageSearchIndex { } } - private List<MessageId> getAsMessageIds(List<SearchResult> temp, long limit) { - return temp.stream() - .map(searchResult -> searchResult.getMessageId().get()) + private Flux<MessageId> getAsMessageIds(Flux<SearchResult> temp, long limit) { + return temp.map(searchResult -> searchResult.getMessageId().get()) .filter(SearchUtil.distinct()) - .limit(Long.valueOf(limit).intValue()) - .collect(Guavate.toImmutableList()); + .take(Long.valueOf(limit).intValue()); } } diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractCombinationManagerTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractCombinationManagerTest.java index 30dedcf..03f5900 100644 --- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractCombinationManagerTest.java +++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/AbstractCombinationManagerTest.java @@ -52,6 +52,8 @@ import org.junit.jupiter.api.Test; import com.google.common.collect.ImmutableList; +import reactor.core.publisher.Flux; + public abstract class AbstractCombinationManagerTest { private static final int DEFAULT_MAXIMUM_LIMIT = 256; @@ -163,7 +165,8 @@ public abstract class AbstractCombinationManagerTest { messageIdManager.setInMailboxes(messageId, ImmutableList.of(mailbox1.getMailboxId(), mailbox2.getMailboxId()), session); - assertThat(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT)).containsOnly(messageId); + assertThat(Flux.from(mailboxManager.search(multiMailboxesQuery, session, DEFAULT_MAXIMUM_LIMIT)) + .collectList().block()).containsOnly(messageId); } @Test diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MailboxMapperACLTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MailboxMapperACLTest.java index d4dfaec..c8aef54 100644 --- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MailboxMapperACLTest.java +++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MailboxMapperACLTest.java @@ -233,7 +233,7 @@ public abstract class MailboxMapperACLTest { @Test void findMailboxesShouldReturnEmptyWhenNone() throws MailboxException { - assertThat(mailboxMapper.findNonPersonalMailboxes(USER, Right.Administer)).isEmpty(); + assertThat(mailboxMapper.findNonPersonalMailboxes(USER, Right.Administer).collectList().block()).isEmpty(); } @Test @@ -246,7 +246,7 @@ public abstract class MailboxMapperACLTest { .rights(rights) .asReplacement()); - assertThat(mailboxMapper.findNonPersonalMailboxes(USER, Right.Read)).isEmpty(); + assertThat(mailboxMapper.findNonPersonalMailboxes(USER, Right.Read).collectList().block()).isEmpty(); } @Test @@ -258,7 +258,7 @@ public abstract class MailboxMapperACLTest { .rights(rights) .asAddition()); - assertThat(mailboxMapper.findNonPersonalMailboxes(USER, Right.Administer)) + assertThat(mailboxMapper.findNonPersonalMailboxes(USER, Right.Administer).collectList().block()) .containsOnly(benwaInboxMailbox); } @@ -278,10 +278,10 @@ public abstract class MailboxMapperACLTest { .rights(newRights) .asReplacement()); - assertThat(mailboxMapper.findNonPersonalMailboxes(USER, Right.Read)) + assertThat(mailboxMapper.findNonPersonalMailboxes(USER, Right.Read).collectList().block()) .containsOnly(benwaInboxMailbox); - assertThat(mailboxMapper.findNonPersonalMailboxes(USER, Right.Administer)) + assertThat(mailboxMapper.findNonPersonalMailboxes(USER, Right.Administer).collectList().block()) .isEmpty(); } @@ -302,7 +302,7 @@ public abstract class MailboxMapperACLTest { .rights(new Rfc4314Rights()) .build()); - assertThat(mailboxMapper.findNonPersonalMailboxes(USER, Right.Administer)) + assertThat(mailboxMapper.findNonPersonalMailboxes(USER, Right.Administer).collectList().block()) .isEmpty(); } @@ -321,7 +321,7 @@ public abstract class MailboxMapperACLTest { .rights(initialRights) .asRemoval()); - assertThat(mailboxMapper.findNonPersonalMailboxes(USER, Right.Administer)) + assertThat(mailboxMapper.findNonPersonalMailboxes(USER, Right.Administer).collectList().block()) .isEmpty(); } @@ -336,7 +336,7 @@ public abstract class MailboxMapperACLTest { .asReplacement()); mailboxMapper.delete(benwaInboxMailbox); - assertThat(mailboxMapper.findNonPersonalMailboxes(USER, Right.Administer)) + assertThat(mailboxMapper.findNonPersonalMailboxes(USER, Right.Administer).collectList().block()) .isEmpty(); } @@ -349,9 +349,9 @@ public abstract class MailboxMapperACLTest { new MailboxACL.Entry(user1, new Rfc4314Rights(Right.Administer)), new MailboxACL.Entry(user2, new Rfc4314Rights(Right.Read)))); - assertThat(mailboxMapper.findNonPersonalMailboxes(USER_1, Right.Administer)) + assertThat(mailboxMapper.findNonPersonalMailboxes(USER_1, Right.Administer).collectList().block()) .containsOnly(benwaInboxMailbox); - assertThat(mailboxMapper.findNonPersonalMailboxes(USER_2, Right.Read)) + assertThat(mailboxMapper.findNonPersonalMailboxes(USER_2, Right.Read).collectList().block()) .containsOnly(benwaInboxMailbox); } @@ -367,7 +367,7 @@ public abstract class MailboxMapperACLTest { mailboxMapper.setACL(benwaInboxMailbox, new MailboxACL( new MailboxACL.Entry(user2, new Rfc4314Rights(Right.Read)))); - assertThat(mailboxMapper.findNonPersonalMailboxes(USER_1, Right.Administer)) + assertThat(mailboxMapper.findNonPersonalMailboxes(USER_1, Right.Administer).collectList().block()) .isEmpty(); } @@ -383,7 +383,7 @@ public abstract class MailboxMapperACLTest { mailboxMapper.setACL(benwaInboxMailbox, new MailboxACL( new MailboxACL.Entry(user2, new Rfc4314Rights(Right.Write)))); - assertThat(mailboxMapper.findNonPersonalMailboxes(USER_2, Right.Write)) + assertThat(mailboxMapper.findNonPersonalMailboxes(USER_2, Right.Write).collectList().block()) .containsOnly(benwaInboxMailbox); } diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MailboxMapperTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MailboxMapperTest.java index 3fdf808..998a862 100644 --- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MailboxMapperTest.java +++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MailboxMapperTest.java @@ -193,7 +193,8 @@ public abstract class MailboxMapperTest { .build() .asUserBound(); - List<Mailbox> mailboxes = mailboxMapper.findMailboxWithPathLike(mailboxQuery); + List<Mailbox> mailboxes = mailboxMapper.findMailboxWithPathLike(mailboxQuery) + .collectList().block(); assertMailboxes(mailboxes).containOnly(bobInboxMailbox); } @@ -216,7 +217,8 @@ public abstract class MailboxMapperTest { .build() .asUserBound(); - List<Mailbox> mailboxes = mailboxMapper.findMailboxWithPathLike(mailboxQuery); + List<Mailbox> mailboxes = mailboxMapper.findMailboxWithPathLike(mailboxQuery) + .collectList().block(); assertMailboxes(mailboxes).containOnly(benwaWorkMailbox, benwaWorkDoneMailbox, benwaWorkTodoMailbox); } @@ -230,7 +232,8 @@ public abstract class MailboxMapperTest { .build() .asUserBound(); - List<Mailbox> mailboxes = mailboxMapper.findMailboxWithPathLike(mailboxQuery); + List<Mailbox> mailboxes = mailboxMapper.findMailboxWithPathLike(mailboxQuery) + .collectList().block(); assertMailboxes(mailboxes).containOnly(benwaInboxMailbox); } @@ -244,7 +247,8 @@ public abstract class MailboxMapperTest { .build() .asUserBound(); - assertThat(mailboxMapper.findMailboxWithPathLike(mailboxQuery)).isEmpty(); + assertThat(mailboxMapper.findMailboxWithPathLike(mailboxQuery) + .collectList().block()).isEmpty(); } @Test @@ -253,38 +257,6 @@ public abstract class MailboxMapperTest { Mailbox actual = mailboxMapper.findMailboxById(benwaInboxMailbox.getMailboxId()); assertThat(actual).isEqualTo(benwaInboxMailbox); } - - @Test - void findMailboxesByIdShouldReturnEmptyWhenNoIdSupplied() throws MailboxException { - createAll(); - - Stream<Mailbox> mailboxes = mailboxMapper.findMailboxesById(ImmutableList.of()); - - assertThat(mailboxes).isEmpty(); - } - - @Test - void findMailboxesByIdShouldReturnMailboxOfSuppliedId() throws MailboxException { - createAll(); - - Stream<Mailbox> mailboxes = mailboxMapper.findMailboxesById(ImmutableList.of( - benwaInboxMailbox.getMailboxId(), - benwaWorkMailbox.getMailboxId())); - - assertThat(mailboxes).containsOnly(benwaWorkMailbox, benwaInboxMailbox); - } - - @Test - void findMailboxesByIdShouldFilterOutNonExistingMailbox() throws MailboxException { - createAll(); - mailboxMapper.delete(benwaWorkMailbox); - - Stream<Mailbox> mailboxes = mailboxMapper.findMailboxesById(ImmutableList.of( - benwaInboxMailbox.getMailboxId(), - benwaWorkMailbox.getMailboxId())); - - assertThat(mailboxes).containsOnly(benwaInboxMailbox); - } @Test void findMailboxByIdShouldFailWhenAbsent() throws MailboxException { diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/quota/DefaultUserQuotaRootResolverTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/quota/DefaultUserQuotaRootResolverTest.java index d744e15..36cb715 100644 --- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/quota/DefaultUserQuotaRootResolverTest.java +++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/quota/DefaultUserQuotaRootResolverTest.java @@ -42,7 +42,7 @@ import org.apache.james.mailbox.store.mail.MailboxMapper; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import com.google.common.collect.Lists; +import reactor.core.publisher.Flux; class DefaultUserQuotaRootResolverTest { @@ -92,7 +92,7 @@ class DefaultUserQuotaRootResolverTest { void retrieveAssociatedMailboxesShouldWork() throws Exception { MailboxMapper mockedMapper = mock(MailboxMapper.class); when(mockedFactory.getMailboxMapper(MAILBOX_SESSION)).thenReturn(mockedMapper); - when(mockedMapper.findMailboxWithPathLike(any())).thenReturn(Lists.newArrayList(MAILBOX, MAILBOX_2)); + when(mockedMapper.findMailboxWithPathLike(any())).thenReturn(Flux.just(MAILBOX, MAILBOX_2)); assertThat(testee.retrieveAssociatedMailboxes(QUOTA_ROOT, MAILBOX_SESSION)).containsOnly(MAILBOX, MAILBOX_2); } diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/search/AbstractMessageSearchIndexTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/search/AbstractMessageSearchIndexTest.java index 8271602..6da8aa0 100644 --- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/search/AbstractMessageSearchIndexTest.java +++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/search/AbstractMessageSearchIndexTest.java @@ -248,7 +248,8 @@ public abstract class AbstractMessageSearchIndexTest { List<MessageId> result = messageSearchIndex.search(session, ImmutableList.of(mailbox.getMailboxId(), mailbox2.getMailboxId()), searchQuery, - LIMIT); + LIMIT) + .collectList().block(); assertThat(result) .hasSize(12) @@ -280,7 +281,8 @@ public abstract class AbstractMessageSearchIndexTest { List<MessageId> result = messageSearchIndex.search(session, ImmutableList.of(mailbox.getMailboxId(), mailbox2.getMailboxId()), searchQuery, - LIMIT); + LIMIT) + .collectList().block(); assertThat(result) .containsOnly(m1.getMessageId(), @@ -316,7 +318,8 @@ public abstract class AbstractMessageSearchIndexTest { List<MessageId> result = messageSearchIndex.search(session, ImmutableList.of(mailbox2.getMailboxId(), mailbox.getMailboxId()), searchQuery, - limit); + limit) + .collectList().block(); assertThat(result) .hasSize(limit); @@ -329,7 +332,8 @@ public abstract class AbstractMessageSearchIndexTest { List<MessageId> result = messageSearchIndex.search(session, ImmutableList.of(), searchQuery, - LIMIT); + LIMIT) + .collectList().block(); assertThat(result) .isEmpty(); @@ -353,7 +357,8 @@ public abstract class AbstractMessageSearchIndexTest { List<MessageId> result = messageSearchIndex.search(session, ImmutableList.of(mailbox2.getMailboxId(), mailbox.getMailboxId()), searchQuery, - limit); + limit) + .collectList().block(); assertThat(result) .hasSize(limit); @@ -549,7 +554,8 @@ public abstract class AbstractMessageSearchIndexTest { session, ImmutableList.of(mailbox.getMailboxId(), mailbox2.getMailboxId()), searchQuery, - LIMIT); + LIMIT) + .collectList().block(); assertThat(actual).containsOnly(mOther.getMessageId(), m6.getMessageId()); } @@ -558,7 +564,8 @@ public abstract class AbstractMessageSearchIndexTest { void multimailboxSearchShouldReturnUidOfMessageMarkedAsSeenInOneMailbox() throws MailboxException { SearchQuery searchQuery = new SearchQuery(SearchQuery.flagIsSet(Flags.Flag.SEEN)); - List<MessageId> actual = messageSearchIndex.search(session, ImmutableList.of(mailbox.getMailboxId()), searchQuery, LIMIT); + List<MessageId> actual = messageSearchIndex.search(session, ImmutableList.of(mailbox.getMailboxId()), searchQuery, LIMIT) + .collectList().block(); assertThat(actual).containsOnly(m6.getMessageId()); } @@ -571,7 +578,8 @@ public abstract class AbstractMessageSearchIndexTest { session, ImmutableList.of(mailbox.getMailboxId(), mailbox2.getMailboxId()), searchQuery, - LIMIT); + LIMIT) + .collectList().block(); assertThat(actual).containsOnly(mOther.getMessageId(), m8.getMessageId()); } @@ -584,7 +592,8 @@ public abstract class AbstractMessageSearchIndexTest { session, ImmutableList.of(mailbox.getMailboxId(), mailbox2.getMailboxId()), searchQuery, - LIMIT); + LIMIT) + .collectList().block(); assertThat(actual).containsOnly(mOther.getMessageId(), m6.getMessageId()); } @@ -598,7 +607,8 @@ public abstract class AbstractMessageSearchIndexTest { session, ImmutableList.of(mailbox.getMailboxId(), mailbox2.getMailboxId()), searchQuery, - limit); + limit) + .collectList().block(); // Two messages matches this query : mOther and m6 assertThat(actual).hasSize(1); @@ -614,7 +624,8 @@ public abstract class AbstractMessageSearchIndexTest { session, ImmutableList.of(otherMailbox.getMailboxId()), searchQuery, - limit); + limit) + .collectList().block(); assertThat(actual).contains(m10.getMessageId()); } @@ -1408,7 +1419,8 @@ public abstract class AbstractMessageSearchIndexTest { session, ImmutableList.of(mailbox.getMailboxId(), mailbox2.getMailboxId()), searchQuery, - LIMIT); + LIMIT) + .collectList().block(); assertThat(actual).containsOnly(m1.getMessageId(), m2.getMessageId(), m3.getMessageId(), m4.getMessageId(), m5.getMessageId(), m6.getMessageId(), m7.getMessageId(), m8.getMessageId(), m9.getMessageId(), mOther.getMessageId(), mailWithAttachment.getMessageId(), mailWithInlinedAttachment.getMessageId()); diff --git a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSchemaVersionStartUpCheck.java b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSchemaVersionStartUpCheck.java index 2c992cb..1cf5ebb 100644 --- a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSchemaVersionStartUpCheck.java +++ b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/mailbox/CassandraSchemaVersionStartUpCheck.java @@ -69,7 +69,7 @@ public class CassandraSchemaVersionStartUpCheck implements StartUpCheck { private CheckResult checkUpgradeAbleState() { String upgradeVersionMessage = String.format("Current schema version is %d. Recommended version is %d", - versionManager.computeVersion().getValue(), + versionManager.computeVersion().block().getValue(), versionManager.getMaximumSupportedVersion().getValue()); LOGGER.warn(upgradeVersionMessage); return CheckResult.builder() @@ -93,7 +93,7 @@ public class CassandraSchemaVersionStartUpCheck implements StartUpCheck { String versionExceedMaximumSupportedMessage = String.format("Current schema version is %d whereas the maximum supported version is %d. " + "Recommended version is %d.", - versionManager.computeVersion().getValue(), + versionManager.computeVersion().block().getValue(), versionManager.getMaximumSupportedVersion().getValue(), versionManager.getMaximumSupportedVersion().getValue()); LOGGER.error(versionExceedMaximumSupportedMessage); @@ -108,7 +108,7 @@ public class CassandraSchemaVersionStartUpCheck implements StartUpCheck { String versionToOldMessage = String.format("Current schema version is %d whereas minimum required version is %d. " + "Recommended version is %d", - versionManager.computeVersion().getValue(), + versionManager.computeVersion().block().getValue(), versionManager.getMinimumSupportedVersion().getValue(), versionManager.getMaximumSupportedVersion().getValue()); LOGGER.error(versionToOldMessage); diff --git a/server/container/guice/memory-guice/src/main/java/org/apache/james/FakeMessageSearchIndex.java b/server/container/guice/memory-guice/src/main/java/org/apache/james/FakeMessageSearchIndex.java index e272402..f9b866c 100644 --- a/server/container/guice/memory-guice/src/main/java/org/apache/james/FakeMessageSearchIndex.java +++ b/server/container/guice/memory-guice/src/main/java/org/apache/james/FakeMessageSearchIndex.java @@ -38,6 +38,8 @@ import org.apache.james.mailbox.model.UpdatedFlags; import org.apache.james.mailbox.store.mail.model.MailboxMessage; import org.apache.james.mailbox.store.search.ListeningMessageSearchIndex; +import reactor.core.publisher.Flux; + public class FakeMessageSearchIndex extends ListeningMessageSearchIndex { private static class FakeMessageSearchIndexGroup extends Group { @@ -80,7 +82,7 @@ public class FakeMessageSearchIndex extends ListeningMessageSearchIndex { } @Override - public List<MessageId> search(MailboxSession session, Collection<MailboxId> mailboxIds, SearchQuery searchQuery, long limit) throws MailboxException { + public Flux<MessageId> search(MailboxSession session, Collection<MailboxId> mailboxIds, SearchQuery searchQuery, long limit) { throw new NotImplementedException("not implemented"); } diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTable.java b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTable.java index 6dc233b..8167ace 100644 --- a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTable.java +++ b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTable.java @@ -97,7 +97,7 @@ public class CassandraRecipientRewriteTable extends AbstractRecipientRewriteTabl Preconditions.checkArgument(listSourcesSupportedType.contains(mapping.getType()), "Not supported mapping of type %s", mapping.getType()); - if (versionManager.isBefore(MAPPINGS_SOURCES_SUPPORTED_VERSION)) { + if (versionManager.isBefore(MAPPINGS_SOURCES_SUPPORTED_VERSION).block()) { return super.listSources(mapping); } diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetMessageListMethod.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetMessageListMethod.java index 19c7503..cd31826 100644 --- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetMessageListMethod.java +++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetMessageListMethod.java @@ -19,6 +19,8 @@ package org.apache.james.jmap.draft.methods; +import static org.apache.james.util.ReactorUtils.context; + import java.util.List; import java.util.Optional; import java.util.Set; @@ -39,7 +41,6 @@ import org.apache.james.jmap.draft.utils.FilterToSearchQuery; import org.apache.james.jmap.draft.utils.SortConverter; import org.apache.james.mailbox.MailboxManager; import org.apache.james.mailbox.MailboxSession; -import org.apache.james.mailbox.exception.MailboxException; import org.apache.james.mailbox.model.MailboxId; import org.apache.james.mailbox.model.MailboxId.Factory; import org.apache.james.mailbox.model.MultimailboxesSearchQuery; @@ -47,10 +48,15 @@ import org.apache.james.mailbox.model.SearchQuery; import org.apache.james.metrics.api.MetricFactory; import org.apache.james.util.MDCBuilder; +import com.github.fge.lambdas.Throwing; import com.github.steveash.guavate.Guavate; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + public class GetMessageListMethod implements Method { private static final long DEFAULT_POSITION = 0; @@ -89,11 +95,18 @@ public class GetMessageListMethod implements Method { } @Override - public Stream<JmapResponse> processToStream(JmapRequest request, MethodCallId methodCallId, MailboxSession mailboxSession) { + public Flux<JmapResponse> process(JmapRequest request, MethodCallId methodCallId, MailboxSession mailboxSession) { Preconditions.checkArgument(request instanceof GetMessageListRequest); GetMessageListRequest messageListRequest = (GetMessageListRequest) request; + return metricFactory.runPublishingTimerMetricLogP99(JMAP_PREFIX + METHOD_NAME.getName(), + () -> process(methodCallId, mailboxSession, messageListRequest) + .subscriberContext(context("GET_MESSAGE_LIST", mdc(messageListRequest)))) + .subscribeOn(Schedulers.elastic()); + } + + private MDCBuilder mdc(GetMessageListRequest messageListRequest) { return MDCBuilder.create() .addContext(MDCBuilder.ACTION, "GET_MESSAGE_LIST") .addContext("accountId", messageListRequest.getAccountId()) @@ -105,38 +118,30 @@ public class GetMessageListMethod implements Method { .addContext("filters", messageListRequest.getFilter()) .addContext("sorts", messageListRequest.getSort()) .addContext("isFetchMessage", messageListRequest.isFetchMessages()) - .addContext("isCollapseThread", messageListRequest.isCollapseThreads()) - .wrapArround( - () -> metricFactory.runPublishingTimerMetricLogP99(JMAP_PREFIX + METHOD_NAME.getName(), - () -> process(methodCallId, mailboxSession, messageListRequest))) - .get(); + .addContext("isCollapseThread", messageListRequest.isCollapseThreads()); } - private Stream<JmapResponse> process(MethodCallId methodCallId, MailboxSession mailboxSession, GetMessageListRequest messageListRequest) { - GetMessageListResponse messageListResponse = getMessageListResponse(messageListRequest, mailboxSession); - Stream<JmapResponse> jmapResponse = Stream.of(JmapResponse.builder().methodCallId(methodCallId) - .response(messageListResponse) - .responseName(RESPONSE_NAME) - .build()); - return Stream.concat(jmapResponse, - processGetMessages(messageListRequest, messageListResponse, methodCallId, mailboxSession)); + private Flux<JmapResponse> process(MethodCallId methodCallId, MailboxSession mailboxSession, GetMessageListRequest messageListRequest) { + return getMessageListResponse(messageListRequest, mailboxSession) + .flatMapMany(messageListResponse -> Flux.concat( + Mono.just(JmapResponse.builder().methodCallId(methodCallId) + .response(messageListResponse) + .responseName(RESPONSE_NAME) + .build()), + processGetMessages(messageListRequest, messageListResponse, methodCallId, mailboxSession))); } - private GetMessageListResponse getMessageListResponse(GetMessageListRequest messageListRequest, MailboxSession mailboxSession) { - GetMessageListResponse.Builder builder = GetMessageListResponse.builder(); - try { - MultimailboxesSearchQuery searchQuery = convertToSearchQuery(messageListRequest); - Long postionValue = messageListRequest.getPosition().map(Number::asLong).orElse(DEFAULT_POSITION); - mailboxManager.search(searchQuery, - mailboxSession, - postionValue + messageListRequest.getLimit().map(Number::asLong).orElse(maximumLimit)) - .stream() - .skip(postionValue) - .forEach(builder::messageId); - return builder.build(); - } catch (MailboxException e) { - throw new RuntimeException(e); - } + private Mono<GetMessageListResponse> getMessageListResponse(GetMessageListRequest messageListRequest, MailboxSession mailboxSession) { + Mono<MultimailboxesSearchQuery> searchQuery = Mono.fromCallable(() -> convertToSearchQuery(messageListRequest)) + .subscribeOn(Schedulers.parallel()); + Long positionValue = messageListRequest.getPosition().map(Number::asLong).orElse(DEFAULT_POSITION); + long limit = positionValue + messageListRequest.getLimit().map(Number::asLong).orElse(maximumLimit); + + return searchQuery + .flatMapMany(Throwing.function(query -> mailboxManager.search(query, mailboxSession, limit))) + .skip(positionValue) + .reduce(GetMessageListResponse.builder(), GetMessageListResponse.Builder::messageId) + .map(GetMessageListResponse.Builder::build); } private MultimailboxesSearchQuery convertToSearchQuery(GetMessageListRequest messageListRequest) { @@ -174,15 +179,15 @@ public class GetMessageListMethod implements Method { }); } - private Stream<JmapResponse> processGetMessages(GetMessageListRequest messageListRequest, GetMessageListResponse messageListResponse, MethodCallId methodCallId, MailboxSession mailboxSession) { + private Flux<JmapResponse> processGetMessages(GetMessageListRequest messageListRequest, GetMessageListResponse messageListResponse, MethodCallId methodCallId, MailboxSession mailboxSession) { if (shouldChainToGetMessages(messageListRequest)) { GetMessagesRequest getMessagesRequest = GetMessagesRequest.builder() .ids(messageListResponse.getMessageIds()) .properties(messageListRequest.getFetchMessageProperties()) .build(); - return getMessagesMethod.processToStream(getMessagesRequest, methodCallId, mailboxSession); + return getMessagesMethod.process(getMessagesRequest, methodCallId, mailboxSession); } - return Stream.empty(); + return Flux.empty(); } private boolean shouldChainToGetMessages(GetMessageListRequest messageListRequest) { diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/ReferenceUpdater.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/ReferenceUpdater.java index dca90fc..4abc540 100644 --- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/ReferenceUpdater.java +++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/ReferenceUpdater.java @@ -49,6 +49,8 @@ import com.github.fge.lambdas.Throwing; import com.github.steveash.guavate.Guavate; import com.google.common.collect.Iterables; +import reactor.core.publisher.Flux; + public class ReferenceUpdater { public static final String X_FORWARDED_ID_HEADER = "X-Forwarded-Message-Id"; public static final Flags FORWARDED_FLAG = new Flags("$Forwarded"); @@ -90,7 +92,8 @@ public class ReferenceUpdater { MultimailboxesSearchQuery searchByRFC822MessageId = MultimailboxesSearchQuery .from(new SearchQuery(SearchQuery.mimeMessageID(messageId))) .build(); - List<MessageId> references = mailboxManager.search(searchByRFC822MessageId, session, limit); + List<MessageId> references = Flux.from(mailboxManager.search(searchByRFC822MessageId, session, limit)) + .collectList().block(); try { MessageId reference = Iterables.getOnlyElement(references); List<MailboxId> mailboxIds = messageIdManager.getMessage(reference, FetchGroup.MINIMAL, session).stream() diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/mailet/ExtractMDNOriginalJMAPMessageId.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/mailet/ExtractMDNOriginalJMAPMessageId.java index 1679fd9..823867d 100644 --- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/mailet/ExtractMDNOriginalJMAPMessageId.java +++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/mailet/ExtractMDNOriginalJMAPMessageId.java @@ -53,6 +53,8 @@ import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Iterables; +import reactor.core.publisher.Flux; + /** * This mailet handles MDN messages and define a header X-JAMES-MDN-JMAP-MESSAGE-ID referencing * the original message (by its Jmap Id) asking for the recipient to send an MDN. @@ -107,7 +109,7 @@ public class ExtractMDNOriginalJMAPMessageId extends GenericMailet { MultimailboxesSearchQuery searchByRFC822MessageId = MultimailboxesSearchQuery .from(new SearchQuery(SearchQuery.mimeMessageID(messageId))) .build(); - return mailboxManager.search(searchByRFC822MessageId, session, limit).stream().findFirst(); + return Flux.from(mailboxManager.search(searchByRFC822MessageId, session, limit)).toStream().findFirst(); } catch (MailboxException | UsersRepositoryException e) { LOGGER.error("unable to find message with Message-Id: " + messageId, e); } diff --git a/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/test/java/org/apache/james/webadmin/vault/routes/DeletedMessagesVaultRoutesTest.java b/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/test/java/org/apache/james/webadmin/vault/routes/DeletedMessagesVaultRoutesTest.java index 5990161..3e7dcb7 100644 --- a/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/test/java/org/apache/james/webadmin/vault/routes/DeletedMessagesVaultRoutesTest.java +++ b/server/protocols/webadmin/webadmin-mailbox-deleted-message-vault/src/test/java/org/apache/james/webadmin/vault/routes/DeletedMessagesVaultRoutesTest.java @@ -2262,7 +2262,8 @@ class DeletedMessagesVaultRoutesTest { MailboxSession session = mailboxManager.createSystemSession(username); int limitToOneMessage = 1; - return !mailboxManager.search(MultimailboxesSearchQuery.from(new SearchQuery()).build(), session, limitToOneMessage) + return !Flux.from(mailboxManager.search(MultimailboxesSearchQuery.from(new SearchQuery()).build(), session, limitToOneMessage)) + .collectList().block() .isEmpty(); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
