JAMES-2555 Read a list of UID then proceed per-message re-indexing This avoids loading a full mailbox blobs in memory
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/3f637e7b Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/3f637e7b Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/3f637e7b Branch: refs/heads/master Commit: 3f637e7bb896203d8f060e0922135c9ee6a4af40 Parents: fbe7900 Author: Benoit Tellier <[email protected]> Authored: Tue Oct 9 09:05:00 2018 +0700 Committer: Benoit Tellier <[email protected]> Committed: Fri Oct 12 15:27:34 2018 +0700 ---------------------------------------------------------------------- .../mailbox/tools/indexer/ReIndexerImpl.java | 84 ++++++++++++-------- 1 file changed, 49 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/3f637e7b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerImpl.java ---------------------------------------------------------------------- diff --git a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerImpl.java b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerImpl.java index 72a6746..f239e32 100644 --- a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerImpl.java +++ b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerImpl.java @@ -19,7 +19,6 @@ package org.apache.mailbox.tools.indexer; -import java.util.Iterator; import java.util.List; import java.util.Optional; @@ -27,6 +26,7 @@ import javax.inject.Inject; import org.apache.james.mailbox.MailboxManager; import org.apache.james.mailbox.MailboxSession; +import org.apache.james.mailbox.MessageUid; import org.apache.james.mailbox.exception.MailboxException; import org.apache.james.mailbox.indexer.ReIndexer; import org.apache.james.mailbox.model.MailboxPath; @@ -36,6 +36,7 @@ import org.apache.james.mailbox.store.mail.MessageMapper; import org.apache.james.mailbox.store.mail.model.Mailbox; import org.apache.james.mailbox.store.mail.model.MailboxMessage; import org.apache.james.mailbox.store.search.ListeningMessageSearchIndex; +import org.apache.james.util.streams.Iterators; import org.apache.mailbox.tools.indexer.events.ImpactingEventType; import org.apache.mailbox.tools.indexer.events.ImpactingMessageEvent; import org.apache.mailbox.tools.indexer.registrations.GlobalRegistration; @@ -43,6 +44,7 @@ import org.apache.mailbox.tools.indexer.registrations.MailboxRegistration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.github.fge.lambdas.Throwing; import com.google.common.collect.Lists; /** @@ -60,7 +62,9 @@ import com.google.common.collect.Lists; public class ReIndexerImpl implements ReIndexer { private static final Logger LOGGER = LoggerFactory.getLogger(ReIndexerImpl.class); - public static final int NO_LIMIT = 0; + + private static final int NO_LIMIT = 0; + private static final int SINGLE_MESSAGE = 1; private final MailboxManager mailboxManager; private final ListeningMessageSearchIndex messageSearchIndex; @@ -97,28 +101,7 @@ public class ReIndexerImpl implements ReIndexer { LOGGER.info("Full reindex finished"); } - private void reIndex(MailboxPath path, MailboxSession mailboxSession) throws MailboxException { - MailboxRegistration mailboxRegistration = new MailboxRegistration(path); - LOGGER.info("Intend to reindex {}",path); - Mailbox mailbox = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession).findMailboxByPath(path); - messageSearchIndex.deleteAll(mailboxSession, mailbox); - mailboxManager.addListener(path, mailboxRegistration, mailboxSession); - try { - handleMailboxIndexingIterations(mailboxSession, - mailboxRegistration, - mailbox, - mailboxSessionMapperFactory.getMessageMapper(mailboxSession) - .findInMailbox(mailbox, - MessageRange.all(), - MessageMapper.FetchType.Full, - NO_LIMIT)); - LOGGER.info("Finish to reindex {}", path); - } finally { - mailboxManager.removeListener(path, mailboxRegistration, mailboxSession); - } - } - - private void handleFullReindexingIterations(List<MailboxPath> mailboxPaths, GlobalRegistration globalRegistration) throws MailboxException { + private void handleFullReindexingIterations(List<MailboxPath> mailboxPaths, GlobalRegistration globalRegistration) { for (MailboxPath mailboxPath : mailboxPaths) { Optional<MailboxPath> pathToIndex = globalRegistration.getPathToIndex(mailboxPath); if (pathToIndex.isPresent()) { @@ -131,21 +114,37 @@ public class ReIndexerImpl implements ReIndexer { } } - private void handleMailboxIndexingIterations(MailboxSession mailboxSession, MailboxRegistration mailboxRegistration, Mailbox mailbox, Iterator<MailboxMessage> iterator) throws MailboxException { - while (iterator.hasNext()) { - MailboxMessage message = iterator.next(); - Optional<ImpactingMessageEvent> impactingMessageEvent = findMostRelevant(mailboxRegistration.getImpactingEvents(message.getUid())); - - impactingMessageEvent.flatMap(ImpactingMessageEvent::newFlags).ifPresent(message::setFlags); - if (wasNotDeleted(impactingMessageEvent)) { - messageSearchIndex.add(mailboxSession, mailbox, message); - } + private void reIndex(MailboxPath path, MailboxSession mailboxSession) throws MailboxException { + MailboxRegistration mailboxRegistration = new MailboxRegistration(path); + LOGGER.info("Intend to reindex {}",path); + Mailbox mailbox = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession).findMailboxByPath(path); + messageSearchIndex.deleteAll(mailboxSession, mailbox); + mailboxManager.addListener(path, mailboxRegistration, mailboxSession); + try { + Iterators.toStream( + mailboxSessionMapperFactory.getMessageMapper(mailboxSession) + .findInMailbox(mailbox, MessageRange.all(), MessageMapper.FetchType.Metadata, NO_LIMIT)) + .map(MailboxMessage::getUid) + .forEach(uid -> handleMessageReIndexing(mailboxSession, mailboxRegistration, mailbox, uid)); + LOGGER.info("Finish to reindex {}", path); + } finally { + mailboxManager.removeListener(path, mailboxRegistration, mailboxSession); } } - private boolean wasNotDeleted(Optional<ImpactingMessageEvent> impactingMessageEvent) { - return impactingMessageEvent.map(event -> !event.wasDeleted()).orElse(true); + private void handleMessageReIndexing(MailboxSession mailboxSession, MailboxRegistration mailboxRegistration, Mailbox mailbox, MessageUid uid) { + try { + Optional<ImpactingMessageEvent> impactingMessageEvent = findMostRelevant(mailboxRegistration.getImpactingEvents(uid)); + + Optional.of(uid) + .filter(x -> !wasDeleted(impactingMessageEvent)) + .flatMap(Throwing.function(mUid -> fullyReadMessage(mailboxSession, mailbox, mUid))) + .map(message -> messageUpdateRegardingEvents(message, impactingMessageEvent)) + .ifPresent(Throwing.consumer(message -> messageSearchIndex.add(mailboxSession, mailbox, message))); + } catch (Exception e) { + LOGGER.warn("ReIndexing failed for {} {}", mailbox.generateAssociatedPath(), uid, e); + } } private Optional<ImpactingMessageEvent> findMostRelevant(List<ImpactingMessageEvent> messageEvents) { @@ -157,4 +156,19 @@ public class ReIndexerImpl implements ReIndexer { return Lists.reverse(messageEvents).stream().findFirst(); } + private boolean wasDeleted(Optional<ImpactingMessageEvent> impactingMessageEvent) { + return impactingMessageEvent.map(ImpactingMessageEvent::wasDeleted).orElse(false); + } + + private MailboxMessage messageUpdateRegardingEvents(MailboxMessage message, Optional<ImpactingMessageEvent> impactingMessageEvent) { + impactingMessageEvent.flatMap(ImpactingMessageEvent::newFlags).ifPresent(message::setFlags); + return message; + } + + private Optional<MailboxMessage> fullyReadMessage(MailboxSession mailboxSession, Mailbox mailbox, MessageUid mUid) throws MailboxException { + return Iterators.toStream(mailboxSessionMapperFactory.getMessageMapper(mailboxSession) + .findInMailbox(mailbox, MessageRange.one(mUid), MessageMapper.FetchType.Full, SINGLE_MESSAGE)) + .findFirst(); + } + } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
