This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 94c5faf60a7ede4c7d26e77039a3c6b45ff8687d Author: Benoit Tellier <[email protected]> AuthorDate: Tue May 5 09:06:27 2020 +0700 JAMES-3149 Reactify ReIndexing Concurrency model: - Sequential processing of mailboxes by using `concatMap` - Use `flatMap` to control messages concurrency (constant: 50) --- .../cassandra/mail/CassandraMessageMapper.java | 20 ++- .../james/mailbox/jpa/mail/JPAMessageMapper.java | 8 +- .../jpa/mail/TransactionalMessageMapper.java | 4 +- .../inmemory/mail/InMemoryMessageIdMapper.java | 16 +- .../mailbox/store/mail/AbstractMessageMapper.java | 9 +- .../james/mailbox/store/mail/MailboxMapper.java | 2 +- .../james/mailbox/store/mail/MessageMapper.java | 12 +- .../store/search/SimpleMessageSearchIndex.java | 8 +- .../StoreMailboxMessageResultIteratorTest.java | 6 +- .../store/mail/model/MessageMapperTest.java | 9 +- .../tools/indexer/ErrorRecoveryIndexationTask.java | 2 +- .../mailbox/tools/indexer/FullReindexingTask.java | 11 +- .../tools/indexer/MessageIdReIndexingTask.java | 2 +- .../mailbox/tools/indexer/ReIndexerPerformer.java | 194 +++++++++------------ .../tools/indexer/SingleMailboxReindexingTask.java | 3 +- .../tools/indexer/SingleMessageReindexingTask.java | 15 +- .../mailbox/tools/indexer/UserReindexingTask.java | 11 +- 17 files changed, 167 insertions(+), 165 deletions(-) diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java index cb31434..2e41530 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java @@ -109,12 +109,10 @@ public class CassandraMessageMapper implements MessageMapper { } @Override - public Iterator<MessageUid> listAllMessageUids(Mailbox mailbox) { + public Flux<MessageUid> listAllMessageUids(Mailbox mailbox) { CassandraId cassandraId = (CassandraId) mailbox.getMailboxId(); return messageIdDAO.retrieveMessages(cassandraId, MessageRange.all()) - .map(metaData -> metaData.getComposedMessageId().getUid()) - .toIterable() - .iterator(); + .map(metaData -> metaData.getComposedMessageId().getUid()); } @Override @@ -163,14 +161,20 @@ public class CassandraMessageMapper implements MessageMapper { @Override public Iterator<MailboxMessage> findInMailbox(Mailbox mailbox, MessageRange messageRange, FetchType ftype, int max) { + return findInMailboxReactive(mailbox, messageRange, ftype, max) + .toIterable() + .iterator(); + } + + @Override + public Flux<MailboxMessage> findInMailboxReactive(Mailbox mailbox, MessageRange messageRange, FetchType ftype, int limit) { CassandraId mailboxId = (CassandraId) mailbox.getMailboxId(); - return Limit.from(max).applyOnFlux( + + return Limit.from(limit).applyOnFlux( messageIdDAO.retrieveMessages(mailboxId, messageRange) .flatMap(id -> retrieveMessage(id, ftype), cassandraConfiguration.getMessageReadChunkSize())) .map(MailboxMessage.class::cast) - .sort(Comparator.comparing(MailboxMessage::getUid)) - .toIterable() - .iterator(); + .sort(Comparator.comparing(MailboxMessage::getUid)); } private Mono<MailboxMessage> retrieveMessage(ComposedMessageIdWithMetaData messageId, FetchType fetchType) { diff --git a/mailbox/jpa/src/main/java/org/apache/james/mailbox/jpa/mail/JPAMessageMapper.java b/mailbox/jpa/src/main/java/org/apache/james/mailbox/jpa/mail/JPAMessageMapper.java index 9ef0bbc..c853b03 100644 --- a/mailbox/jpa/src/main/java/org/apache/james/mailbox/jpa/mail/JPAMessageMapper.java +++ b/mailbox/jpa/src/main/java/org/apache/james/mailbox/jpa/mail/JPAMessageMapper.java @@ -58,7 +58,8 @@ import org.apache.openjpa.persistence.ArgumentException; import com.github.fge.lambdas.Throwing; import com.github.steveash.guavate.Guavate; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterators; + +import reactor.core.publisher.Flux; /** * JPA implementation of a {@link MessageMapper}. This class is not thread-safe! @@ -88,8 +89,9 @@ public class JPAMessageMapper extends JPATransactionalMapper implements MessageM } @Override - public Iterator<MessageUid> listAllMessageUids(final Mailbox mailbox) throws MailboxException { - return Iterators.transform(findInMailbox(mailbox, MessageRange.all(), FetchType.Full, UNLIMITED), MailboxMessage::getUid); + public Flux<MessageUid> listAllMessageUids(Mailbox mailbox) { + return findInMailboxReactive(mailbox, MessageRange.all(), FetchType.Metadata, UNLIMITED) + .map(MailboxMessage::getUid); } @Override diff --git a/mailbox/jpa/src/test/java/org/apache/james/mailbox/jpa/mail/TransactionalMessageMapper.java b/mailbox/jpa/src/test/java/org/apache/james/mailbox/jpa/mail/TransactionalMessageMapper.java index c6f8f6d..1e7d09b 100644 --- a/mailbox/jpa/src/test/java/org/apache/james/mailbox/jpa/mail/TransactionalMessageMapper.java +++ b/mailbox/jpa/src/test/java/org/apache/james/mailbox/jpa/mail/TransactionalMessageMapper.java @@ -40,6 +40,8 @@ import org.apache.james.mailbox.store.mail.MessageMapper; import org.apache.james.mailbox.store.mail.model.MailboxMessage; import org.apache.james.mailbox.store.transaction.Mapper; +import reactor.core.publisher.Flux; + public class TransactionalMessageMapper implements MessageMapper { private final JPAMessageMapper messageMapper; @@ -62,7 +64,7 @@ public class TransactionalMessageMapper implements MessageMapper { } @Override - public Iterator<MessageUid> listAllMessageUids(Mailbox mailbox) throws MailboxException { + public Flux<MessageUid> listAllMessageUids(Mailbox mailbox) { return messageMapper.listAllMessageUids(mailbox); } diff --git a/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java b/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java index 5e18dad..4c8e0af 100644 --- a/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java +++ b/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java @@ -40,13 +40,14 @@ import org.apache.james.mailbox.store.mail.MailboxMapper; import org.apache.james.mailbox.store.mail.MessageIdMapper; import org.apache.james.mailbox.store.mail.MessageMapper; import org.apache.james.mailbox.store.mail.model.MailboxMessage; -import org.apache.james.util.streams.Iterators; import com.github.fge.lambdas.Throwing; import com.github.steveash.guavate.Guavate; import com.google.common.collect.ImmutableList; import com.google.common.collect.Multimap; +import reactor.core.publisher.Flux; + public class InMemoryMessageIdMapper implements MessageIdMapper { private final MailboxMapper mailboxMapper; private final InMemoryMessageMapper messageMapper; @@ -58,16 +59,19 @@ public class InMemoryMessageIdMapper implements MessageIdMapper { @Override public List<MailboxMessage> find(Collection<MessageId> messageIds, MessageMapper.FetchType fetchType) { - return mailboxMapper.list() - .flatMap(Throwing.function(mailbox -> - Iterators.toFlux( - messageMapper.findInMailbox(mailbox, MessageRange.all(), fetchType, UNLIMITED)))) - .filter(message -> messageIds.contains(message.getMessageId())) + return findReactive(messageIds, fetchType) .collect(Guavate.toImmutableList()) .block(); } @Override + public Flux<MailboxMessage> findReactive(Collection<MessageId> messageIds, MessageMapper.FetchType fetchType) { + return mailboxMapper.list() + .flatMap(mailbox -> messageMapper.findInMailboxReactive(mailbox, MessageRange.all(), fetchType, UNLIMITED)) + .filter(message -> messageIds.contains(message.getMessageId())); + } + + @Override public List<MailboxId> findMailboxes(MessageId messageId) { return find(ImmutableList.of(messageId), MessageMapper.FetchType.Metadata) .stream() diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/AbstractMessageMapper.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/AbstractMessageMapper.java index 5a993d4..9fd5cf3 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/AbstractMessageMapper.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/AbstractMessageMapper.java @@ -39,7 +39,8 @@ import org.apache.james.mailbox.store.mail.model.MailboxMessage; import org.apache.james.mailbox.store.transaction.TransactionalMapper; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterators; + +import reactor.core.publisher.Flux; /** * Abstract base class for {@link MessageMapper} implementation @@ -149,8 +150,8 @@ public abstract class AbstractMessageMapper extends TransactionalMapper implemen protected abstract MessageMetaData copy(Mailbox mailbox, MessageUid uid, ModSeq modSeq, MailboxMessage original) throws MailboxException; @Override - public Iterator<MessageUid> listAllMessageUids(Mailbox mailbox) throws MailboxException { - return Iterators.transform(findInMailbox(mailbox, MessageRange.all(), FetchType.Metadata, UNLIMITED), - MailboxMessage::getUid); + public Flux<MessageUid> listAllMessageUids(Mailbox mailbox) { + return findInMailboxReactive(mailbox, MessageRange.all(), FetchType.Metadata, UNLIMITED) + .map(MailboxMessage::getUid); } } diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MailboxMapper.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MailboxMapper.java index c5aaf7e..059356e 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MailboxMapper.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MailboxMapper.java @@ -138,5 +138,5 @@ public interface MailboxMapper extends Mapper { /** * Return a unmodifable {@link List} of all {@link Mailbox} */ - Flux<Mailbox> list() throws MailboxException; + Flux<Mailbox> list(); } \ No newline at end of file diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageMapper.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageMapper.java index d7c8428..2c79390 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageMapper.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageMapper.java @@ -37,7 +37,9 @@ import org.apache.james.mailbox.store.FlagsUpdateCalculator; import org.apache.james.mailbox.store.mail.model.MailboxMessage; import org.apache.james.mailbox.store.mail.model.Property; import org.apache.james.mailbox.store.transaction.Mapper; +import org.apache.james.util.streams.Iterators; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** @@ -58,6 +60,14 @@ public interface MessageMapper extends Mapper { Iterator<MailboxMessage> findInMailbox(Mailbox mailbox, MessageRange set, FetchType type, int limit) throws MailboxException; + default Flux<MailboxMessage> findInMailboxReactive(Mailbox mailbox, MessageRange set, FetchType type, int limit) { + try { + return Iterators.toFlux(findInMailbox(mailbox, set, type, limit)); + } catch (MailboxException e) { + return Flux.error(e); + } + } + /** * Returns a list of {@link MessageUid} which are marked as deleted */ @@ -147,7 +157,7 @@ public interface MessageMapper extends Mapper { /** * Return a list containing all MessageUid of Messages that belongs to given {@link Mailbox} */ - Iterator<MessageUid> listAllMessageUids(Mailbox mailbox) throws MailboxException; + Flux<MessageUid> listAllMessageUids(Mailbox mailbox); /** * Specify what data needs to get filled in a {@link MailboxMessage} before returning it diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/SimpleMessageSearchIndex.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/SimpleMessageSearchIndex.java index b33f980..3560b96 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/SimpleMessageSearchIndex.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/SimpleMessageSearchIndex.java @@ -58,6 +58,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import reactor.core.publisher.Flux; +import reactor.core.scheduler.Schedulers; /** * {@link MessageSearchIndex} which just fetch {@link MailboxMessage}'s from the {@link MessageMapper} and use {@link MessageSearcher} @@ -155,8 +156,9 @@ public class SimpleMessageSearchIndex implements MessageSearchIndex { return getAsMessageIds(searchResults(session, filteredMailboxes, searchQuery), limit); } - private Flux<SearchResult> searchResults(MailboxSession session, Flux<Mailbox> mailboxes, SearchQuery query) throws MailboxException { - return mailboxes.concatMap(mailbox -> Flux.fromStream(getSearchResultStream(session, query, mailbox))); + private Flux<? extends SearchResult> searchResults(MailboxSession session, Flux<Mailbox> mailboxes, SearchQuery query) throws MailboxException { + return mailboxes.concatMap(mailbox -> Flux.fromStream(getSearchResultStream(session, query, mailbox))) + .subscribeOn(Schedulers.elastic()); } private Stream<? extends SearchResult> getSearchResultStream(MailboxSession session, SearchQuery query, Mailbox mailbox) { @@ -167,7 +169,7 @@ public class SimpleMessageSearchIndex implements MessageSearchIndex { } } - private Flux<MessageId> getAsMessageIds(Flux<SearchResult> temp, long limit) { + private Flux<MessageId> getAsMessageIds(Flux<? extends SearchResult> temp, long limit) { return temp.map(searchResult -> searchResult.getMessageId().get()) .filter(SearchUtil.distinct()) .take(Long.valueOf(limit).intValue()); diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/StoreMailboxMessageResultIteratorTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/StoreMailboxMessageResultIteratorTest.java index 9f56121..c20c807 100644 --- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/StoreMailboxMessageResultIteratorTest.java +++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/StoreMailboxMessageResultIteratorTest.java @@ -51,6 +51,8 @@ import org.junit.jupiter.api.Test; import com.google.common.collect.Iterables; +import reactor.core.publisher.Flux; + class StoreMailboxMessageResultIteratorTest { private final class TestMessageMapper implements MessageMapper { @@ -61,8 +63,8 @@ class StoreMailboxMessageResultIteratorTest { } @Override - public Iterator<MessageUid> listAllMessageUids(Mailbox mailbox) throws MailboxException { - return messageRange.iterator(); + public Flux<MessageUid> listAllMessageUids(Mailbox mailbox) { + return Flux.fromIterable(messageRange); } @Override diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageMapperTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageMapperTest.java index 66ee7bf..99c7186 100644 --- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageMapperTest.java +++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageMapperTest.java @@ -1134,8 +1134,7 @@ public abstract class MessageMapperTest { void getUidsShouldReturnUidsOfMessagesInTheMailbox() throws Exception { saveMessages(); - assertThat(messageMapper.listAllMessageUids(benwaInboxMailbox)) - .toIterable() + assertThat(messageMapper.listAllMessageUids(benwaInboxMailbox).collectList().block()) .containsOnly(message1.getUid(), message2.getUid(), message3.getUid(), @@ -1149,8 +1148,7 @@ public abstract class MessageMapperTest { messageMapper.deleteMessages(benwaInboxMailbox, ImmutableList.of(message2.getUid(), message3.getUid())); - assertThat(messageMapper.listAllMessageUids(benwaInboxMailbox)) - .toIterable() + assertThat(messageMapper.listAllMessageUids(benwaInboxMailbox).collectList().block()) .containsOnly(message1.getUid(), message4.getUid(), message5.getUid()); @@ -1166,8 +1164,7 @@ public abstract class MessageMapperTest { List<MessageUid> uids = messageMapper.retrieveMessagesMarkedForDeletion(benwaInboxMailbox, MessageRange.all()); messageMapper.deleteMessages(benwaInboxMailbox, uids); - assertThat(messageMapper.listAllMessageUids(benwaInboxMailbox)) - .toIterable() + assertThat(messageMapper.listAllMessageUids(benwaInboxMailbox).collectList().block()) .containsOnly(message1.getUid(), message5.getUid()); } diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ErrorRecoveryIndexationTask.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ErrorRecoveryIndexationTask.java index 73259cb..716f578 100644 --- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ErrorRecoveryIndexationTask.java +++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ErrorRecoveryIndexationTask.java @@ -73,7 +73,7 @@ public class ErrorRecoveryIndexationTask implements Task { @Override public Result run() { - return reIndexerPerformer.reIndex(reprocessingContext, previousFailures); + return reIndexerPerformer.reIndex(reprocessingContext, previousFailures).block(); } @Override diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/FullReindexingTask.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/FullReindexingTask.java index 39aa268..0e99cdb 100644 --- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/FullReindexingTask.java +++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/FullReindexingTask.java @@ -24,7 +24,6 @@ import java.util.Optional; import javax.inject.Inject; import org.apache.james.json.DTOModule; -import org.apache.james.mailbox.exception.MailboxException; import org.apache.james.server.task.json.dto.TaskDTO; import org.apache.james.server.task.json.dto.TaskDTOModule; import org.apache.james.task.Task; @@ -33,6 +32,8 @@ import org.apache.james.task.TaskType; import com.fasterxml.jackson.annotation.JsonProperty; +import reactor.core.publisher.Mono; + public class FullReindexingTask implements Task { public static final TaskType FULL_RE_INDEXING = TaskType.of("full-reindexing"); @@ -73,11 +74,9 @@ public class FullReindexingTask implements Task { @Override public Result run() { - try { - return reIndexerPerformer.reIndex(reprocessingContext); - } catch (MailboxException e) { - return Result.PARTIAL; - } + return reIndexerPerformer.reIndex(reprocessingContext) + .onErrorResume(e -> Mono.just(Result.PARTIAL)) + .block(); } @Override diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/MessageIdReIndexingTask.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/MessageIdReIndexingTask.java index 369b7c4..191d0ab 100644 --- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/MessageIdReIndexingTask.java +++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/MessageIdReIndexingTask.java @@ -78,7 +78,7 @@ public class MessageIdReIndexingTask implements Task { @Override public Result run() { - return reIndexerPerformer.handleMessageIdReindexing(messageId); + return reIndexerPerformer.handleMessageIdReindexing(messageId).block(); } @Override diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java index e5640b5..8efcb38 100644 --- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java +++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerPerformer.java @@ -19,16 +19,12 @@ package org.apache.mailbox.tools.indexer; -import java.util.Optional; -import java.util.stream.Stream; - import javax.inject.Inject; import org.apache.james.core.Username; import org.apache.james.mailbox.MailboxManager; import org.apache.james.mailbox.MailboxSession; import org.apache.james.mailbox.MessageUid; -import org.apache.james.mailbox.exception.MailboxException; import org.apache.james.mailbox.indexer.ReIndexingExecutionFailures; import org.apache.james.mailbox.model.Mailbox; import org.apache.james.mailbox.model.MailboxId; @@ -37,24 +33,28 @@ import org.apache.james.mailbox.model.MessageId; import org.apache.james.mailbox.model.MessageRange; import org.apache.james.mailbox.model.search.MailboxQuery; import org.apache.james.mailbox.store.MailboxSessionMapperFactory; -import org.apache.james.mailbox.store.mail.MailboxMapper; import org.apache.james.mailbox.store.mail.MessageMapper; import org.apache.james.mailbox.store.mail.model.MailboxMessage; import org.apache.james.mailbox.store.search.ListeningMessageSearchIndex; import org.apache.james.task.Task; -import org.apache.james.util.streams.Iterators; +import org.apache.james.task.Task.Result; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.github.fge.lambdas.Throwing; import com.google.common.collect.ImmutableList; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + public class ReIndexerPerformer { private static final Logger LOGGER = LoggerFactory.getLogger(ReIndexerPerformer.class); private static final int SINGLE_MESSAGE = 1; + private static final int MESSAGE_CONCURRENCY = 50; private static final String RE_INDEXING = "re-indexing"; private static final Username RE_INDEXER_PERFORMER_USER = Username.of(RE_INDEXING); + public static final int NO_CONCURRENCY = 1; + public static final int NO_PREFETCH = 1; private final MailboxManager mailboxManager; private final ListeningMessageSearchIndex messageSearchIndex; @@ -69,138 +69,116 @@ public class ReIndexerPerformer { this.mailboxSessionMapperFactory = mailboxSessionMapperFactory; } - Task.Result reIndex(MailboxId mailboxId, ReprocessingContext reprocessingContext) throws Exception { - LOGGER.info("Intend to reindex mailbox with mailboxId {}", mailboxId.serialize()); + Mono<Result> reIndex(MailboxId mailboxId, ReprocessingContext reprocessingContext) { MailboxSession mailboxSession = mailboxManager.createSystemSession(RE_INDEXER_PERFORMER_USER); - Mailbox mailbox = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession).findMailboxById(mailboxId); - messageSearchIndex.deleteAll(mailboxSession, mailboxId).block(); - try { - return Iterators.toStream( - mailboxSessionMapperFactory.getMessageMapper(mailboxSession) - .listAllMessageUids(mailbox)) - .map(uid -> handleMessageReIndexing(mailboxSession, mailbox, uid, reprocessingContext)) + + return mailboxSessionMapperFactory.getMailboxMapper(mailboxSession) + .findMailboxByIdReactive(mailboxId) + .flatMap(mailbox -> reIndex(reprocessingContext, mailboxSession, mailbox)); + } + + private Mono<Result> reIndex(ReprocessingContext reprocessingContext, MailboxSession mailboxSession, Mailbox mailbox) { + LOGGER.info("Attempt to reindex mailbox with mailboxId {}", mailbox.getMailboxId().serialize()); + return messageSearchIndex.deleteAll(mailboxSession, mailbox.getMailboxId()) + .then(mailboxSessionMapperFactory.getMessageMapper(mailboxSession) + .listAllMessageUids(mailbox) + .flatMap(uid -> handleMessageReIndexing(mailboxSession, mailbox, uid, reprocessingContext), MESSAGE_CONCURRENCY) .reduce(Task::combine) - .orElse(Task.Result.COMPLETED); - } finally { - LOGGER.info("Finish to reindex mailbox with mailboxId {}", mailboxId.serialize()); - } + .switchIfEmpty(Mono.just(Result.COMPLETED)) + .doFinally(any -> LOGGER.info("Finish to reindex mailbox with mailboxId {}", mailbox.getMailboxId().serialize()))); } - Task.Result reIndex(ReprocessingContext reprocessingContext, ReIndexingExecutionFailures previousReIndexingFailures) { - return previousReIndexingFailures.failures() - .stream() - .map(previousFailure -> reIndex(reprocessingContext, previousFailure)) + Mono<Result> reIndex(ReprocessingContext reprocessingContext, ReIndexingExecutionFailures previousReIndexingFailures) { + return Flux.fromIterable(previousReIndexingFailures.failures()) + .flatMap(previousFailure -> reIndex(reprocessingContext, previousFailure), MESSAGE_CONCURRENCY) .reduce(Task::combine) - .orElse(Task.Result.COMPLETED); + .switchIfEmpty(Mono.just(Result.COMPLETED)); } - private Task.Result reIndex(ReprocessingContext reprocessingContext, ReIndexingExecutionFailures.ReIndexingFailure previousReIndexingFailure) { + private Mono<Result> reIndex(ReprocessingContext reprocessingContext, ReIndexingExecutionFailures.ReIndexingFailure previousReIndexingFailure) { MailboxId mailboxId = previousReIndexingFailure.getMailboxId(); MessageUid uid = previousReIndexingFailure.getUid(); - try { - return handleMessageReIndexing(mailboxId, uid, reprocessingContext); - } catch (MailboxException e) { - LOGGER.warn("ReIndexing failed for {} {}", mailboxId, uid, e); - reprocessingContext.recordFailureDetailsForMessage(mailboxId, uid); - return Task.Result.PARTIAL; - } + + return handleMessageReIndexing(mailboxId, uid, reprocessingContext) + .onErrorResume(e -> { + LOGGER.warn("ReIndexing failed for {} {}", mailboxId, uid, e); + reprocessingContext.recordFailureDetailsForMessage(mailboxId, uid); + return Mono.just(Result.PARTIAL); + }); } - Task.Result reIndex(ReprocessingContext reprocessingContext) throws MailboxException { + Mono<Result> reIndex(ReprocessingContext reprocessingContext) { MailboxSession mailboxSession = mailboxManager.createSystemSession(RE_INDEXER_PERFORMER_USER); LOGGER.info("Starting a full reindex"); - Stream<MailboxId> mailboxIds = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession).list() - .map(Mailbox::getMailboxId) - .toStream(); - - try { - return reIndex(mailboxIds, reprocessingContext); - } finally { - LOGGER.info("Full reindex finished"); - } + return mailboxSessionMapperFactory.getMailboxMapper(mailboxSession).list() + .flatMap(mailbox -> reIndex(reprocessingContext, mailboxSession, mailbox), NO_CONCURRENCY, NO_PREFETCH) + .reduce(Task::combine) + .switchIfEmpty(Mono.just(Result.COMPLETED)) + .doFinally(any -> LOGGER.info("Full reindex finished")); } - Task.Result reIndex(Username username, ReprocessingContext reprocessingContext) throws MailboxException { + Mono<Result> reIndex(Username username, ReprocessingContext reprocessingContext) { MailboxSession mailboxSession = mailboxManager.createSystemSession(username); LOGGER.info("Starting a reindex for user {}", username.asString()); - Stream<MailboxId> mailboxIds = mailboxManager.search(MailboxQuery.privateMailboxesBuilder(mailboxSession).build(), mailboxSession) - .stream() - .map(MailboxMetaData::getId); + MailboxQuery mailboxQuery = MailboxQuery.privateMailboxesBuilder(mailboxSession).build(); - try { - return reIndex(mailboxIds, reprocessingContext); - } finally { - LOGGER.info("User {} reindex finished", username.asString()); - } + return mailboxManager.searchReactive(mailboxQuery, mailboxSession) + .map(MailboxMetaData::getId) + .flatMap(id -> reIndex(id, reprocessingContext), NO_CONCURRENCY, NO_PREFETCH) + .reduce(Task::combine) + .switchIfEmpty(Mono.just(Result.COMPLETED)) + .doFinally(any -> LOGGER.info("User {} reindex finished", username.asString())); } - Task.Result handleMessageReIndexing(MailboxId mailboxId, MessageUid uid, ReprocessingContext reprocessingContext) throws MailboxException { + Mono<Result> handleMessageReIndexing(MailboxId mailboxId, MessageUid uid, ReprocessingContext reprocessingContext) { MailboxSession mailboxSession = mailboxManager.createSystemSession(RE_INDEXER_PERFORMER_USER); - Mailbox mailbox = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession).findMailboxById(mailboxId); - return handleMessageReIndexing(mailboxSession, mailbox, uid, reprocessingContext); + return mailboxSessionMapperFactory.getMailboxMapper(mailboxSession) + .findMailboxByIdReactive(mailboxId) + .flatMap(mailbox -> handleMessageReIndexing(mailboxSession, mailbox, uid, reprocessingContext)); } - Task.Result handleMessageIdReindexing(MessageId messageId) { - try { - MailboxSession session = mailboxManager.createSystemSession(RE_INDEXER_PERFORMER_USER); - - return mailboxSessionMapperFactory.getMessageIdMapper(session) - .find(ImmutableList.of(messageId), MessageMapper.FetchType.Full) - .stream() - .map(mailboxMessage -> reIndex(mailboxMessage, session)) - .reduce(Task::combine) - .orElse(Task.Result.COMPLETED); - } catch (Exception e) { - LOGGER.warn("Failed to re-index {}", messageId, e); - return Task.Result.PARTIAL; - } - } + Mono<Result> handleMessageIdReindexing(MessageId messageId) { + MailboxSession session = mailboxManager.createSystemSession(RE_INDEXER_PERFORMER_USER); - private Task.Result reIndex(MailboxMessage mailboxMessage, MailboxSession session) { - try { - MailboxMapper mailboxMapper = mailboxSessionMapperFactory.getMailboxMapper(session); - Mailbox mailbox = mailboxMapper.findMailboxById(mailboxMessage.getMailboxId()); - messageSearchIndex.add(session, mailbox, mailboxMessage).block(); - return Task.Result.COMPLETED; - } catch (Exception e) { - LOGGER.warn("Failed to re-index {} in {}", mailboxMessage.getUid(), mailboxMessage.getMailboxId(), e); - return Task.Result.PARTIAL; - } + return mailboxSessionMapperFactory.getMessageIdMapper(session) + .findReactive(ImmutableList.of(messageId), MessageMapper.FetchType.Full) + .flatMap(mailboxMessage -> reIndex(mailboxMessage, session)) + .reduce(Task::combine) + .switchIfEmpty(Mono.just(Result.COMPLETED)) + .onErrorResume(e -> { + LOGGER.warn("Failed to re-index {}", messageId, e); + return Mono.just(Result.PARTIAL); + }); } - private Task.Result reIndex(Stream<MailboxId> mailboxIds, ReprocessingContext reprocessingContext) { - return mailboxIds - .map(mailboxId -> { - try { - return reIndex(mailboxId, reprocessingContext); - } catch (Throwable e) { - LOGGER.error("Error while proceeding to full reindexing on mailbox with mailboxId {}", mailboxId.serialize(), e); - return Task.Result.PARTIAL; - } - }) - .reduce(Task::combine) - .orElse(Task.Result.COMPLETED); + private Mono<Result> reIndex(MailboxMessage mailboxMessage, MailboxSession session) { + return mailboxSessionMapperFactory.getMailboxMapper(session) + .findMailboxByIdReactive(mailboxMessage.getMailboxId()) + .flatMap(mailbox -> messageSearchIndex.add(session, mailbox, mailboxMessage)) + .thenReturn(Result.COMPLETED) + .onErrorResume(e -> { + LOGGER.warn("Failed to re-index {} in {}", mailboxMessage.getUid(), mailboxMessage.getMailboxId(), e); + return Mono.just(Result.PARTIAL); + }); } - private Task.Result handleMessageReIndexing(MailboxSession mailboxSession, Mailbox mailbox, MessageUid uid, ReprocessingContext reprocessingContext) { - try { - Optional.of(uid) - .flatMap(Throwing.function(mUid -> fullyReadMessage(mailboxSession, mailbox, mUid))) - .ifPresent(message -> messageSearchIndex.add(mailboxSession, mailbox, message).block()); - reprocessingContext.recordSuccess(); - return Task.Result.COMPLETED; - } catch (Exception e) { - LOGGER.warn("ReIndexing failed for {} {}", mailbox.generateAssociatedPath(), uid, e); - reprocessingContext.recordFailureDetailsForMessage(mailbox.getMailboxId(), uid); - return Task.Result.PARTIAL; - } + private Mono<Result> handleMessageReIndexing(MailboxSession mailboxSession, Mailbox mailbox, MessageUid uid, ReprocessingContext reprocessingContext) { + return fullyReadMessage(mailboxSession, mailbox, uid) + .flatMap(message -> messageSearchIndex.add(mailboxSession, mailbox, message)) + .thenReturn(Result.COMPLETED) + .doOnNext(any -> reprocessingContext.recordSuccess()) + .onErrorResume(e -> { + LOGGER.warn("ReIndexing failed for {} {}", mailbox.generateAssociatedPath(), uid, e); + reprocessingContext.recordFailureDetailsForMessage(mailbox.getMailboxId(), uid); + return Mono.just(Result.PARTIAL); + }); } - private Optional<MailboxMessage> fullyReadMessage(MailboxSession mailboxSession, Mailbox mailbox, MessageUid mUid) throws MailboxException { - return Iterators.toStream(mailboxSessionMapperFactory.getMessageMapper(mailboxSession) - .findInMailbox(mailbox, MessageRange.one(mUid), MessageMapper.FetchType.Full, SINGLE_MESSAGE)) - .findFirst(); + private Mono<MailboxMessage> fullyReadMessage(MailboxSession mailboxSession, Mailbox mailbox, MessageUid mUid) { + return mailboxSessionMapperFactory.getMessageMapper(mailboxSession) + .findInMailboxReactive(mailbox, MessageRange.one(mUid), MessageMapper.FetchType.Full, SINGLE_MESSAGE) + .next(); } } diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/SingleMailboxReindexingTask.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/SingleMailboxReindexingTask.java index 737be90..0e9580d 100644 --- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/SingleMailboxReindexingTask.java +++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/SingleMailboxReindexingTask.java @@ -79,7 +79,8 @@ public class SingleMailboxReindexingTask implements Task { @Override public Result run() { try { - return reIndexerPerformer.reIndex(mailboxId, reprocessingContext); + return reIndexerPerformer.reIndex(mailboxId, reprocessingContext) + .block(); } catch (Exception e) { return Result.PARTIAL; } diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/SingleMessageReindexingTask.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/SingleMessageReindexingTask.java index 77ea683..08c014b 100644 --- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/SingleMessageReindexingTask.java +++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/SingleMessageReindexingTask.java @@ -26,7 +26,6 @@ import java.util.Optional; import javax.inject.Inject; import org.apache.james.mailbox.MessageUid; -import org.apache.james.mailbox.exception.MailboxException; import org.apache.james.mailbox.model.MailboxId; import org.apache.james.task.Task; import org.apache.james.task.TaskExecutionDetails; @@ -34,6 +33,8 @@ import org.apache.james.task.TaskType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import reactor.core.publisher.Mono; + public class SingleMessageReindexingTask implements Task { private static final Logger LOGGER = LoggerFactory.getLogger(SingleMessageReindexingTask.class); @@ -95,12 +96,12 @@ public class SingleMessageReindexingTask implements Task { @Override public Result run() { - try { - return reIndexerPerformer.handleMessageReIndexing(mailboxId, uid, new ReprocessingContext()); - } catch (MailboxException e) { - LOGGER.warn("Error encounteres while reindexing {} : {}", mailboxId, uid, e); - return Result.PARTIAL; - } + return reIndexerPerformer.handleMessageReIndexing(mailboxId, uid, new ReprocessingContext()) + .onErrorResume(e -> { + LOGGER.warn("Error encountered while reindexing {} : {}", mailboxId, uid, e); + return Mono.just(Result.PARTIAL); + }) + .block(); } MailboxId getMailboxId() { diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/UserReindexingTask.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/UserReindexingTask.java index 9a7a863..d1ff42b 100644 --- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/UserReindexingTask.java +++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/UserReindexingTask.java @@ -26,12 +26,13 @@ import java.util.Optional; import javax.inject.Inject; import org.apache.james.core.Username; -import org.apache.james.mailbox.exception.MailboxException; import org.apache.james.mailbox.indexer.ReIndexingExecutionFailures; import org.apache.james.task.Task; import org.apache.james.task.TaskExecutionDetails; import org.apache.james.task.TaskType; +import reactor.core.publisher.Mono; + public class UserReindexingTask implements Task { public static final TaskType USER_RE_INDEXING = TaskType.of("user-reindexing"); @@ -78,11 +79,9 @@ public class UserReindexingTask implements Task { @Override public Result run() { - try { - return reIndexerPerformer.reIndex(username, reprocessingContext); - } catch (MailboxException e) { - return Result.PARTIAL; - } + return reIndexerPerformer.reIndex(username, reprocessingContext) + .onErrorResume(e -> Mono.just(Result.PARTIAL)) + .block(); } public Username getUsername() { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
