MAILBOX-297 Avoid reading message table upon flags updates MailboxMessage metadata (UID, mailboxID, modseq, MessageId and Flags) are enough
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/0535d4b5 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/0535d4b5 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/0535d4b5 Branch: refs/heads/master Commit: 0535d4b5b129c89635b3bbe64f1d820cceb87d0e Parents: d496bb2 Author: benwa <[email protected]> Authored: Mon May 22 10:43:38 2017 +0700 Committer: benwa <[email protected]> Committed: Mon May 29 16:58:49 2017 +0700 ---------------------------------------------------------------------- .../cassandra/mail/CassandraMessageMapper.java | 39 +++++++++----------- 1 file changed, 17 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/0535d4b5/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java index b256153..349cd08 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java @@ -280,7 +280,7 @@ public class CassandraMessageMapper implements MessageMapper { @Override public Iterator<UpdatedFlags> updateFlags(Mailbox mailbox, FlagsUpdateCalculator flagUpdateCalculator, MessageRange set) throws MailboxException { CassandraId mailboxId = (CassandraId) mailbox.getMailboxId(); - return retrieveMessages(retrieveMessageIds(mailboxId, set), FetchType.Metadata, Optional.empty()) + return messageIdDAO.retrieveMessages(mailboxId, set) .join() .flatMap(message -> updateFlagsOnMessage(mailbox, flagUpdateCalculator, message)) .map((UpdatedFlags updatedFlags) -> indexTableHandler.updateIndexOnFlagsUpdate(mailboxId, updatedFlags) @@ -329,26 +329,24 @@ public class CassandraMessageMapper implements MessageMapper { imapUidDAO.insert(composedMessageIdWithMetaData)); } - private Stream<UpdatedFlags> updateFlagsOnMessage(Mailbox mailbox, FlagsUpdateCalculator flagUpdateCalculator, MailboxMessage message) { - return tryMessageFlagsUpdate(flagUpdateCalculator, mailbox, message) + private Stream<UpdatedFlags> updateFlagsOnMessage(Mailbox mailbox, FlagsUpdateCalculator flagUpdateCalculator, ComposedMessageIdWithMetaData composedMessageIdWithMetaData) { + return tryMessageFlagsUpdate(flagUpdateCalculator, mailbox, composedMessageIdWithMetaData) .map(Stream::of) - .orElse(handleRetries(mailbox, flagUpdateCalculator, message)); + .orElse(handleRetries(mailbox, flagUpdateCalculator, composedMessageIdWithMetaData)); } - private Optional<UpdatedFlags> tryMessageFlagsUpdate(FlagsUpdateCalculator flagUpdateCalculator, Mailbox mailbox, MailboxMessage message) { + private Optional<UpdatedFlags> tryMessageFlagsUpdate(FlagsUpdateCalculator flagUpdateCalculator, Mailbox mailbox, ComposedMessageIdWithMetaData oldMetaData) { try { - long oldModSeq = message.getModSeq(); - Flags oldFlags = message.createFlags(); + long oldModSeq = oldMetaData.getModSeq(); + Flags oldFlags = oldMetaData.getFlags(); Flags newFlags = flagUpdateCalculator.buildNewFlags(oldFlags); boolean involveFlagsChanges = !identicalFlags(oldFlags, newFlags); long newModSeq = generateNewModSeqIfNeeded(mailbox, oldModSeq, involveFlagsChanges); - message.setFlags(newFlags); - message.setModSeq(newModSeq); - if (updateFlags(message, oldModSeq)) { + if (updateFlags(oldMetaData, newFlags, newModSeq)) { return Optional.of(UpdatedFlags.builder() - .uid(message.getUid()) + .uid(oldMetaData.getComposedMessageId().getUid()) .modSeq(newModSeq) .oldFlags(oldFlags) .newFlags(newFlags) @@ -372,13 +370,13 @@ public class CassandraMessageMapper implements MessageMapper { return oldFlags.equals(newFlags); } - private boolean updateFlags(MailboxMessage message, long oldModSeq) { + private boolean updateFlags(ComposedMessageIdWithMetaData oldMetadata, Flags newFlags, long newModSeq) { ComposedMessageIdWithMetaData composedMessageIdWithMetaData = ComposedMessageIdWithMetaData.builder() - .composedMessageId(new ComposedMessageId(message.getMailboxId(), message.getMessageId(), message.getUid())) - .modSeq(message.getModSeq()) - .flags(message.createFlags()) + .composedMessageId(oldMetadata.getComposedMessageId()) + .modSeq(newModSeq) + .flags(newFlags) .build(); - return imapUidDAO.updateMetadata(composedMessageIdWithMetaData, oldModSeq) + return imapUidDAO.updateMetadata(composedMessageIdWithMetaData, oldMetadata.getModSeq()) .thenCompose(success -> Optional.of(success) .filter(b -> b) .map((Boolean any) -> messageIdDAO.updateMetadata(composedMessageIdWithMetaData) @@ -387,12 +385,12 @@ public class CassandraMessageMapper implements MessageMapper { .join(); } - private Stream<UpdatedFlags> handleRetries(Mailbox mailbox, FlagsUpdateCalculator flagUpdateCalculator, MailboxMessage message) { + private Stream<UpdatedFlags> handleRetries(Mailbox mailbox, FlagsUpdateCalculator flagUpdateCalculator, ComposedMessageIdWithMetaData oldMetaData) { try { return Stream.of( new FunctionRunnerWithRetry(maxRetries) .executeAndRetrieveObject(() -> retryMessageFlagsUpdate(mailbox, - message.getMessageId(), + oldMetaData.getComposedMessageId().getMessageId(), flagUpdateCalculator))); } catch (MessageDeletedDuringFlagsUpdateException e) { mailboxSession.getLog().warn(e.getMessage()); @@ -413,9 +411,6 @@ public class CassandraMessageMapper implements MessageMapper { .orElseThrow(MailboxDeleteDuringUpdateException::new); return tryMessageFlagsUpdate(flagUpdateCalculator, mailbox, - messageDAO.retrieveMessages(ImmutableList.of(composedMessageIdWithMetaData), FetchType.Metadata, Optional.empty()).join() - .findFirst() - .map(pair -> pair.getLeft().toMailboxMessage(ImmutableList.of())) - .orElseThrow(() -> new MessageDeletedDuringFlagsUpdateException(cassandraId, (CassandraMessageId) messageId))); + composedMessageIdWithMetaData); } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
