MAILBOX-357 ListeningSearchIndex should not rely on "available messages"
This method is not part of the mailbox-api and messages will not be carried along in distributed events. AddedImpl no longer need to carry along cached messages. Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/07489896 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/07489896 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/07489896 Branch: refs/heads/master Commit: 074898961aa1847639f1a583ec693cc799773cbf Parents: 8d2db2c Author: Benoit Tellier <[email protected]> Authored: Mon Dec 10 14:00:13 2018 +0700 Committer: Benoit Tellier <[email protected]> Committed: Wed Dec 12 17:50:58 2018 +0700 ---------------------------------------------------------------------- .../mailbox/store/StoreMessageManager.java | 4 +- .../james/mailbox/store/event/EventFactory.java | 16 +- .../store/event/MailboxEventDispatcher.java | 8 +- .../search/ListeningMessageSearchIndex.java | 319 +++++++++---------- .../processor/base/SelectedMailboxImplTest.java | 3 +- 5 files changed, 169 insertions(+), 181 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/07489896/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java ---------------------------------------------------------------------- diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java index fc5492f..434c948 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java @@ -739,7 +739,7 @@ public class StoreMessageManager implements org.apache.james.mailbox.MessageMana for (MailboxMessage message : originalRows.getEntriesSeen()) { messagesMap.put(message.getUid(), immutableMailboxMessageFactory.from(to.getMailboxEntity().getMailboxId(), message)); } - dispatcher.added(session, copiedUids, to.getMailboxEntity(), messagesMap.build()); + dispatcher.added(session, copiedUids, to.getMailboxEntity()); dispatcher.moved(session, MessageMoves.builder() .previousMailboxIds(getMailboxEntity().getMailboxId()) @@ -759,7 +759,7 @@ public class StoreMessageManager implements org.apache.james.mailbox.MessageMana for (MailboxMessage message : originalRows.getEntriesSeen()) { messagesMap.put(message.getUid(), immutableMailboxMessageFactory.from(to.getMailboxEntity().getMailboxId(), message)); } - dispatcher.added(session, moveUids, to.getMailboxEntity(), messagesMap.build()); + dispatcher.added(session, moveUids, to.getMailboxEntity()); dispatcher.expunged(session, collectMetadata(moveResult.getOriginalMessages()), getMailboxEntity()); dispatcher.moved(session, MessageMoves.builder() http://git-wip-us.apache.org/repos/asf/james-project/blob/07489896/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/EventFactory.java ---------------------------------------------------------------------- diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/EventFactory.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/EventFactory.java index dd47954..1fd51e5 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/EventFactory.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/EventFactory.java @@ -51,14 +51,12 @@ public class EventFactory { public final class AddedImpl extends MailboxListener.Added implements MailboxAware { private final Map<MessageUid, MessageMetaData> added; - private final Map<MessageUid, MailboxMessage> availableMessages; private final Mailbox mailbox; - public AddedImpl(MailboxSession.SessionId sessionId, User user, Mailbox mailbox, SortedMap<MessageUid, MessageMetaData> uids, Map<MessageUid, MailboxMessage> availableMessages) { + public AddedImpl(MailboxSession.SessionId sessionId, User user, Mailbox mailbox, SortedMap<MessageUid, MessageMetaData> uids) { super(sessionId, user, new StoreMailboxPath(mailbox), mailbox.getMailboxId()); this.added = ImmutableMap.copyOf(uids); this.mailbox = mailbox; - this.availableMessages = ImmutableMap.copyOf(availableMessages); } @Override @@ -75,10 +73,6 @@ public class EventFactory { public Mailbox getMailbox() { return mailbox; } - - public Map<MessageUid, MailboxMessage> getAvailableMessages() { - return availableMessages; - } } public final class ExpungedImpl extends MailboxListener.Expunged implements MailboxAware { @@ -193,12 +187,12 @@ public class EventFactory { } } - public MailboxListener.Added added(MailboxSession session, SortedMap<MessageUid, MessageMetaData> uids, Mailbox mailbox, Map<MessageUid, MailboxMessage> cachedMessages) { - return added(session.getSessionId(), session.getUser().getCoreUser(), uids, mailbox, cachedMessages); + public MailboxListener.Added added(MailboxSession session, SortedMap<MessageUid, MessageMetaData> uids, Mailbox mailbox) { + return added(session.getSessionId(), session.getUser().getCoreUser(), uids, mailbox); } - public MailboxListener.Added added(MailboxSession.SessionId sessionId, User user, SortedMap<MessageUid, MessageMetaData> uids, Mailbox mailbox, Map<MessageUid, MailboxMessage> cachedMessages) { - return new AddedImpl(sessionId, user, mailbox, uids, cachedMessages); + public MailboxListener.Added added(MailboxSession.SessionId sessionId, User user, SortedMap<MessageUid, MessageMetaData> uids, Mailbox mailbox) { + return new AddedImpl(sessionId, user, mailbox, uids); } public MailboxListener.Expunged expunged(MailboxSession session, Map<MessageUid, MessageMetaData> uids, Mailbox mailbox) { http://git-wip-us.apache.org/repos/asf/james-project/blob/07489896/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/MailboxEventDispatcher.java ---------------------------------------------------------------------- diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/MailboxEventDispatcher.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/MailboxEventDispatcher.java index 206e8b7..2d6afd9 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/MailboxEventDispatcher.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/event/MailboxEventDispatcher.java @@ -81,8 +81,8 @@ public class MailboxEventDispatcher { * @param uids Sorted map with uids and message meta data * @param mailbox The mailbox */ - public void added(MailboxSession session, SortedMap<MessageUid, MessageMetaData> uids, Mailbox mailbox, Map<MessageUid, MailboxMessage> cachedMessages) { - listener.event(eventFactory.added(session, uids, mailbox, cachedMessages)); + public void added(MailboxSession session, SortedMap<MessageUid, MessageMetaData> uids, Mailbox mailbox) { + listener.event(eventFactory.added(session, uids, mailbox)); } public void added(MailboxSession session, Mailbox mailbox, MailboxMessage mailboxMessage) { @@ -90,14 +90,14 @@ public class MailboxEventDispatcher { SortedMap<MessageUid, MessageMetaData> metaDataMap = ImmutableSortedMap.<MessageUid, MessageMetaData>naturalOrder() .put(messageMetaData.getUid(), messageMetaData) .build(); - added(session, metaDataMap, mailbox, ImmutableMap.of(mailboxMessage.getUid(), mailboxMessage)); + added(session, metaDataMap, mailbox); } public void added(MailboxSession session, MessageMetaData messageMetaData, Mailbox mailbox) { SortedMap<MessageUid, MessageMetaData> metaDataMap = ImmutableSortedMap.<MessageUid, MessageMetaData>naturalOrder() .put(messageMetaData.getUid(), messageMetaData) .build(); - added(session, metaDataMap, mailbox, ImmutableMap.<MessageUid, MailboxMessage>of()); + added(session, metaDataMap, mailbox); } /** http://git-wip-us.apache.org/repos/asf/james-project/blob/07489896/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/ListeningMessageSearchIndex.java ---------------------------------------------------------------------- diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/ListeningMessageSearchIndex.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/ListeningMessageSearchIndex.java index 01a0d21..104291e 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/ListeningMessageSearchIndex.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/ListeningMessageSearchIndex.java @@ -1,162 +1,157 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. * - ****************************************************************/ -package org.apache.james.mailbox.store.search; - -import java.util.List; -import java.util.Optional; - -import org.apache.james.mailbox.Event; -import org.apache.james.mailbox.MailboxListener; -import org.apache.james.mailbox.MailboxManager; -import org.apache.james.mailbox.MailboxSession; -import org.apache.james.mailbox.MessageUid; -import org.apache.james.mailbox.exception.MailboxException; -import org.apache.james.mailbox.model.MessageRange; -import org.apache.james.mailbox.model.UpdatedFlags; -import org.apache.james.mailbox.store.MailboxSessionMapperFactory; -import org.apache.james.mailbox.store.event.EventFactory; -import org.apache.james.mailbox.store.mail.MessageMapper.FetchType; -import org.apache.james.mailbox.store.mail.model.Mailbox; -import org.apache.james.mailbox.store.mail.model.MailboxMessage; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * {@link MessageSearchIndex} which needs to get registered as global {@link MailboxListener} and so get - * notified about message changes. This will then allow to update the underlying index. - * - * - */ -public abstract class ListeningMessageSearchIndex implements MessageSearchIndex, MailboxListener { - private static final Logger LOGGER = LoggerFactory.getLogger(ListeningMessageSearchIndex.class); - - private static final int UNLIMITED = -1; - private final MailboxSessionMapperFactory factory; - private final MailboxManager mailboxManager; - - public ListeningMessageSearchIndex(MailboxSessionMapperFactory factory, MailboxManager mailboxManager) { - this.factory = factory; - this.mailboxManager = mailboxManager; - } - - /** - * Process the {@link org.apache.james.mailbox.Event} and update the index if - * something relevant is received - */ - @Override - public void event(Event event) { - try { - MailboxSession session = mailboxManager.createSystemSession(event.getUser().asString()); - if (event instanceof MessageEvent) { - if (event instanceof EventFactory.AddedImpl) { - EventFactory.AddedImpl added = (EventFactory.AddedImpl) event; - Mailbox mailbox = added.getMailbox(); - - for (MessageUid uid : added.getUids()) { - retrieveMailboxMessage(session, added, mailbox, uid) - .ifPresent(mailboxMessage -> addMessage(session, mailbox, mailboxMessage)); - } - } else if (event instanceof EventFactory.ExpungedImpl) { - EventFactory.ExpungedImpl expunged = (EventFactory.ExpungedImpl) event; - try { - delete(session, expunged.getMailbox(), expunged.getUids()); - } catch (MailboxException e) { - LOGGER.error("Unable to deleted messages {} from index for mailbox {}", expunged.getUids(), expunged.getMailbox(), e); - } - } else if (event instanceof EventFactory.FlagsUpdatedImpl) { - EventFactory.FlagsUpdatedImpl flagsUpdated = (EventFactory.FlagsUpdatedImpl) event; - Mailbox mailbox = flagsUpdated.getMailbox(); - - try { - update(session, mailbox, flagsUpdated.getUpdatedFlags()); - } catch (MailboxException e) { - LOGGER.error("Unable to update flags in index for mailbox {}", mailbox, e); - } - } - } else if (event instanceof EventFactory.MailboxDeletionImpl) { - deleteAll(session, ((EventFactory.MailboxDeletionImpl) event).getMailbox()); - } - } catch (MailboxException e) { - LOGGER.error("Unable to update index", e); - } - } - - private Optional<MailboxMessage> retrieveMailboxMessage(MailboxSession session, EventFactory.AddedImpl added, Mailbox mailbox, MessageUid next) { - Optional<MailboxMessage> firstChoice = Optional.ofNullable(added.getAvailableMessages().get(next)); - if (firstChoice.isPresent()) { - return firstChoice; - } else { - try { - return Optional.of(factory.getMessageMapper(session) - .findInMailbox(mailbox, MessageRange.one(next), FetchType.Full, UNLIMITED) - .next()); - } catch (Exception e) { - LOGGER.error("Could not retrieve message {} in mailbox {}", next, mailbox.getMailboxId().serialize(), e); - return Optional.empty(); - } - } - } - - private void addMessage(MailboxSession session, Mailbox mailbox, MailboxMessage message) { - try { - add(session, mailbox, message); - } catch (MailboxException e) { - LOGGER.error("Unable to index message {} for mailbox {}", message.getUid(), mailbox, e); - } - } - - /** - * Add the {@link MailboxMessage} for the given {@link Mailbox} to the index - * - * @param session The mailbox session performing the message addition - * @param mailbox mailbox on which the message addition was performed - * @param message The added message - * @throws MailboxException - */ - public abstract void add(MailboxSession session, Mailbox mailbox, MailboxMessage message) throws MailboxException; - - /** - * Delete the concerned UIDs for the given {@link Mailbox} from the index - * - * @param session The mailbox session performing the expunge - * @param mailbox mailbox on which the expunge was performed - * @param expungedUids UIDS to be deleted - * @throws MailboxException - */ - public abstract void delete(MailboxSession session, Mailbox mailbox, List<MessageUid> expungedUids) throws MailboxException; - - /** - * Delete the messages contained in the given {@link Mailbox} from the index - * - * @param session The mailbox session performing the expunge - * @param mailbox mailbox on which the expunge was performed - * @throws MailboxException - */ - public abstract void deleteAll(MailboxSession session, Mailbox mailbox) throws MailboxException; - - /** - * Update the messages concerned by the updated flags list for the given {@link Mailbox} - * - * @param session session that performed the update - * @param mailbox mailbox containing the updated messages - * @param updatedFlagsList list of flags that were updated - * @throws MailboxException - */ - public abstract void update(MailboxSession session, Mailbox mailbox, List<UpdatedFlags> updatedFlagsList) throws MailboxException; -} +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ +package org.apache.james.mailbox.store.search; + +import java.util.List; +import java.util.Optional; + +import org.apache.james.mailbox.Event; +import org.apache.james.mailbox.MailboxListener; +import org.apache.james.mailbox.MailboxManager; +import org.apache.james.mailbox.MailboxSession; +import org.apache.james.mailbox.MessageUid; +import org.apache.james.mailbox.exception.MailboxException; +import org.apache.james.mailbox.model.MessageRange; +import org.apache.james.mailbox.model.UpdatedFlags; +import org.apache.james.mailbox.store.MailboxSessionMapperFactory; +import org.apache.james.mailbox.store.event.EventFactory; +import org.apache.james.mailbox.store.mail.MessageMapper.FetchType; +import org.apache.james.mailbox.store.mail.model.Mailbox; +import org.apache.james.mailbox.store.mail.model.MailboxMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link MessageSearchIndex} which needs to get registered as global {@link MailboxListener} and so get + * notified about message changes. This will then allow to update the underlying index. + * + * + */ +public abstract class ListeningMessageSearchIndex implements MessageSearchIndex, MailboxListener { + private static final Logger LOGGER = LoggerFactory.getLogger(ListeningMessageSearchIndex.class); + + private static final int UNLIMITED = -1; + private final MailboxSessionMapperFactory factory; + private final MailboxManager mailboxManager; + + public ListeningMessageSearchIndex(MailboxSessionMapperFactory factory, MailboxManager mailboxManager) { + this.factory = factory; + this.mailboxManager = mailboxManager; + } + + /** + * Process the {@link org.apache.james.mailbox.Event} and update the index if + * something relevant is received + */ + @Override + public void event(Event event) { + try { + MailboxSession session = mailboxManager.createSystemSession(event.getUser().asString()); + if (event instanceof MessageEvent) { + if (event instanceof EventFactory.AddedImpl) { + EventFactory.AddedImpl added = (EventFactory.AddedImpl) event; + Mailbox mailbox = added.getMailbox(); + + for (MessageUid uid : added.getUids()) { + retrieveMailboxMessage(session, mailbox, uid) + .ifPresent(mailboxMessage -> addMessage(session, mailbox, mailboxMessage)); + } + } else if (event instanceof EventFactory.ExpungedImpl) { + EventFactory.ExpungedImpl expunged = (EventFactory.ExpungedImpl) event; + try { + delete(session, expunged.getMailbox(), expunged.getUids()); + } catch (MailboxException e) { + LOGGER.error("Unable to deleted messages {} from index for mailbox {}", expunged.getUids(), expunged.getMailbox(), e); + } + } else if (event instanceof EventFactory.FlagsUpdatedImpl) { + EventFactory.FlagsUpdatedImpl flagsUpdated = (EventFactory.FlagsUpdatedImpl) event; + Mailbox mailbox = flagsUpdated.getMailbox(); + + try { + update(session, mailbox, flagsUpdated.getUpdatedFlags()); + } catch (MailboxException e) { + LOGGER.error("Unable to update flags in index for mailbox {}", mailbox, e); + } + } + } else if (event instanceof EventFactory.MailboxDeletionImpl) { + deleteAll(session, ((EventFactory.MailboxDeletionImpl) event).getMailbox()); + } + } catch (MailboxException e) { + LOGGER.error("Unable to update index", e); + } + } + + private Optional<MailboxMessage> retrieveMailboxMessage(MailboxSession session, Mailbox mailbox, MessageUid uid) { + try { + return Optional.of(factory.getMessageMapper(session) + .findInMailbox(mailbox, MessageRange.one(uid), FetchType.Full, UNLIMITED) + .next()); + } catch (Exception e) { + LOGGER.error("Could not retrieve message {} in mailbox {}", uid.asLong(), mailbox.getMailboxId().serialize(), e); + return Optional.empty(); + } + } + + private void addMessage(MailboxSession session, Mailbox mailbox, MailboxMessage message) { + try { + add(session, mailbox, message); + } catch (MailboxException e) { + LOGGER.error("Unable to index message {} for mailbox {}", message.getUid(), mailbox, e); + } + } + + /** + * Add the {@link MailboxMessage} for the given {@link Mailbox} to the index + * + * @param session The mailbox session performing the message addition + * @param mailbox mailbox on which the message addition was performed + * @param message The added message + * @throws MailboxException + */ + public abstract void add(MailboxSession session, Mailbox mailbox, MailboxMessage message) throws MailboxException; + + /** + * Delete the concerned UIDs for the given {@link Mailbox} from the index + * + * @param session The mailbox session performing the expunge + * @param mailbox mailbox on which the expunge was performed + * @param expungedUids UIDS to be deleted + * @throws MailboxException + */ + public abstract void delete(MailboxSession session, Mailbox mailbox, List<MessageUid> expungedUids) throws MailboxException; + + /** + * Delete the messages contained in the given {@link Mailbox} from the index + * + * @param session The mailbox session performing the expunge + * @param mailbox mailbox on which the expunge was performed + * @throws MailboxException + */ + public abstract void deleteAll(MailboxSession session, Mailbox mailbox) throws MailboxException; + + /** + * Update the messages concerned by the updated flags list for the given {@link Mailbox} + * + * @param session session that performed the update + * @param mailbox mailbox containing the updated messages + * @param updatedFlagsList list of flags that were updated + * @throws MailboxException + */ + public abstract void update(MailboxSession session, Mailbox mailbox, List<UpdatedFlags> updatedFlagsList) throws MailboxException; +} http://git-wip-us.apache.org/repos/asf/james-project/blob/07489896/protocols/imap/src/test/java/org/apache/james/imap/processor/base/SelectedMailboxImplTest.java ---------------------------------------------------------------------- diff --git a/protocols/imap/src/test/java/org/apache/james/imap/processor/base/SelectedMailboxImplTest.java b/protocols/imap/src/test/java/org/apache/james/imap/processor/base/SelectedMailboxImplTest.java index dfb1797..7973ab2 100644 --- a/protocols/imap/src/test/java/org/apache/james/imap/processor/base/SelectedMailboxImplTest.java +++ b/protocols/imap/src/test/java/org/apache/james/imap/processor/base/SelectedMailboxImplTest.java @@ -63,7 +63,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; public class SelectedMailboxImplTest { @@ -171,6 +170,6 @@ public class SelectedMailboxImplTest { TreeMap<MessageUid, MessageMetaData> result = new TreeMap<>(); result.put(EMITTED_EVENT_UID, new SimpleMessageMetaData(EMITTED_EVENT_UID, MOD_SEQ, new Flags(), SIZE, new Date(), new DefaultMessageId())); mailboxListener.event(new EventFactory().added(MailboxSession.SessionId.of(random.nextLong()), - mock(User.class), result, mailbox, ImmutableMap.of())); + mock(User.class), result, mailbox)); } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
