JAMES-2555 Read a list of UIDs, then proceed with per-message re-indexing

This avoids loading all of a mailbox's message blobs into memory at once


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/3f637e7b
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/3f637e7b
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/3f637e7b

Branch: refs/heads/master
Commit: 3f637e7bb896203d8f060e0922135c9ee6a4af40
Parents: fbe7900
Author: Benoit Tellier <[email protected]>
Authored: Tue Oct 9 09:05:00 2018 +0700
Committer: Benoit Tellier <[email protected]>
Committed: Fri Oct 12 15:27:34 2018 +0700

----------------------------------------------------------------------
 .../mailbox/tools/indexer/ReIndexerImpl.java    | 84 ++++++++++++--------
 1 file changed, 49 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/3f637e7b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerImpl.java
----------------------------------------------------------------------
diff --git 
a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerImpl.java
 
b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerImpl.java
index 72a6746..f239e32 100644
--- 
a/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerImpl.java
+++ 
b/mailbox/tools/indexer/src/main/java/org/apache/mailbox/tools/indexer/ReIndexerImpl.java
@@ -19,7 +19,6 @@
 
 package org.apache.mailbox.tools.indexer;
 
-import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
 
@@ -27,6 +26,7 @@ import javax.inject.Inject;
 
 import org.apache.james.mailbox.MailboxManager;
 import org.apache.james.mailbox.MailboxSession;
+import org.apache.james.mailbox.MessageUid;
 import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.indexer.ReIndexer;
 import org.apache.james.mailbox.model.MailboxPath;
@@ -36,6 +36,7 @@ import org.apache.james.mailbox.store.mail.MessageMapper;
 import org.apache.james.mailbox.store.mail.model.Mailbox;
 import org.apache.james.mailbox.store.mail.model.MailboxMessage;
 import org.apache.james.mailbox.store.search.ListeningMessageSearchIndex;
+import org.apache.james.util.streams.Iterators;
 import org.apache.mailbox.tools.indexer.events.ImpactingEventType;
 import org.apache.mailbox.tools.indexer.events.ImpactingMessageEvent;
 import org.apache.mailbox.tools.indexer.registrations.GlobalRegistration;
@@ -43,6 +44,7 @@ import 
org.apache.mailbox.tools.indexer.registrations.MailboxRegistration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.github.fge.lambdas.Throwing;
 import com.google.common.collect.Lists;
 
 /**
@@ -60,7 +62,9 @@ import com.google.common.collect.Lists;
 public class ReIndexerImpl implements ReIndexer {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(ReIndexerImpl.class);
-    public static final int NO_LIMIT = 0;
+
+    private static final int NO_LIMIT = 0;
+    private static final int SINGLE_MESSAGE = 1;
 
     private final MailboxManager mailboxManager;
     private final ListeningMessageSearchIndex messageSearchIndex;
@@ -97,28 +101,7 @@ public class ReIndexerImpl implements ReIndexer {
         LOGGER.info("Full reindex finished");
     }
 
-    private void reIndex(MailboxPath path, MailboxSession mailboxSession) 
throws MailboxException {
-        MailboxRegistration mailboxRegistration = new 
MailboxRegistration(path);
-        LOGGER.info("Intend to reindex {}",path);
-        Mailbox mailbox = 
mailboxSessionMapperFactory.getMailboxMapper(mailboxSession).findMailboxByPath(path);
-        messageSearchIndex.deleteAll(mailboxSession, mailbox);
-        mailboxManager.addListener(path, mailboxRegistration, mailboxSession);
-        try {
-            handleMailboxIndexingIterations(mailboxSession,
-                mailboxRegistration,
-                mailbox,
-                mailboxSessionMapperFactory.getMessageMapper(mailboxSession)
-                    .findInMailbox(mailbox,
-                        MessageRange.all(),
-                        MessageMapper.FetchType.Full,
-                        NO_LIMIT));
-            LOGGER.info("Finish to reindex {}", path);
-        } finally {
-            mailboxManager.removeListener(path, mailboxRegistration, 
mailboxSession);
-        }
-    }
-
-    private void handleFullReindexingIterations(List<MailboxPath> 
mailboxPaths, GlobalRegistration globalRegistration) throws MailboxException {
+    private void handleFullReindexingIterations(List<MailboxPath> 
mailboxPaths, GlobalRegistration globalRegistration) {
         for (MailboxPath mailboxPath : mailboxPaths) {
             Optional<MailboxPath> pathToIndex = 
globalRegistration.getPathToIndex(mailboxPath);
             if (pathToIndex.isPresent()) {
@@ -131,21 +114,37 @@ public class ReIndexerImpl implements ReIndexer {
         }
     }
 
-    private void handleMailboxIndexingIterations(MailboxSession 
mailboxSession, MailboxRegistration mailboxRegistration, Mailbox mailbox, 
Iterator<MailboxMessage> iterator) throws MailboxException {
-        while (iterator.hasNext()) {
-            MailboxMessage message = iterator.next();
-            Optional<ImpactingMessageEvent> impactingMessageEvent = 
findMostRelevant(mailboxRegistration.getImpactingEvents(message.getUid()));
-
-            
impactingMessageEvent.flatMap(ImpactingMessageEvent::newFlags).ifPresent(message::setFlags);
 
-            if (wasNotDeleted(impactingMessageEvent)) {
-                messageSearchIndex.add(mailboxSession, mailbox, message);
-            }
+    private void reIndex(MailboxPath path, MailboxSession mailboxSession) 
throws MailboxException {
+        MailboxRegistration mailboxRegistration = new 
MailboxRegistration(path);
+        LOGGER.info("Intend to reindex {}",path);
+        Mailbox mailbox = 
mailboxSessionMapperFactory.getMailboxMapper(mailboxSession).findMailboxByPath(path);
+        messageSearchIndex.deleteAll(mailboxSession, mailbox);
+        mailboxManager.addListener(path, mailboxRegistration, mailboxSession);
+        try {
+            Iterators.toStream(
+                mailboxSessionMapperFactory.getMessageMapper(mailboxSession)
+                    .findInMailbox(mailbox, MessageRange.all(), 
MessageMapper.FetchType.Metadata, NO_LIMIT))
+                .map(MailboxMessage::getUid)
+                .forEach(uid -> handleMessageReIndexing(mailboxSession, 
mailboxRegistration, mailbox, uid));
+            LOGGER.info("Finish to reindex {}", path);
+        } finally {
+            mailboxManager.removeListener(path, mailboxRegistration, 
mailboxSession);
         }
     }
 
-    private boolean wasNotDeleted(Optional<ImpactingMessageEvent> 
impactingMessageEvent) {
-        return impactingMessageEvent.map(event -> 
!event.wasDeleted()).orElse(true);
+    private void handleMessageReIndexing(MailboxSession mailboxSession, 
MailboxRegistration mailboxRegistration, Mailbox mailbox, MessageUid uid) {
+        try {
+            Optional<ImpactingMessageEvent> impactingMessageEvent = 
findMostRelevant(mailboxRegistration.getImpactingEvents(uid));
+
+            Optional.of(uid)
+                .filter(x -> !wasDeleted(impactingMessageEvent))
+                .flatMap(Throwing.function(mUid -> 
fullyReadMessage(mailboxSession, mailbox, mUid)))
+                .map(message -> messageUpdateRegardingEvents(message, 
impactingMessageEvent))
+                .ifPresent(Throwing.consumer(message -> 
messageSearchIndex.add(mailboxSession, mailbox, message)));
+        } catch (Exception e) {
+            LOGGER.warn("ReIndexing failed for {} {}", 
mailbox.generateAssociatedPath(), uid, e);
+        }
     }
 
     private Optional<ImpactingMessageEvent> 
findMostRelevant(List<ImpactingMessageEvent> messageEvents) {
@@ -157,4 +156,19 @@ public class ReIndexerImpl implements ReIndexer {
         return Lists.reverse(messageEvents).stream().findFirst();
     }
 
+    private boolean wasDeleted(Optional<ImpactingMessageEvent> 
impactingMessageEvent) {
+        return 
impactingMessageEvent.map(ImpactingMessageEvent::wasDeleted).orElse(false);
+    }
+
+    private MailboxMessage messageUpdateRegardingEvents(MailboxMessage 
message, Optional<ImpactingMessageEvent> impactingMessageEvent) {
+        
impactingMessageEvent.flatMap(ImpactingMessageEvent::newFlags).ifPresent(message::setFlags);
+        return message;
+    }
+
+    private Optional<MailboxMessage> fullyReadMessage(MailboxSession 
mailboxSession, Mailbox mailbox, MessageUid mUid) throws MailboxException {
+        return 
Iterators.toStream(mailboxSessionMapperFactory.getMessageMapper(mailboxSession)
+            .findInMailbox(mailbox, MessageRange.one(mUid), 
MessageMapper.FetchType.Full, SINGLE_MESSAGE))
+            .findFirst();
+    }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to