MAILBOX-297 Avoid reading message table upon flags updates

MailboxMessage metadata (UID, mailboxID, modseq, MessageId and Flags) is enough to perform a flags update, so the message table does not need to be read.


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/0535d4b5
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/0535d4b5
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/0535d4b5

Branch: refs/heads/master
Commit: 0535d4b5b129c89635b3bbe64f1d820cceb87d0e
Parents: d496bb2
Author: benwa <[email protected]>
Authored: Mon May 22 10:43:38 2017 +0700
Committer: benwa <[email protected]>
Committed: Mon May 29 16:58:49 2017 +0700

----------------------------------------------------------------------
 .../cassandra/mail/CassandraMessageMapper.java  | 39 +++++++++-----------
 1 file changed, 17 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/0535d4b5/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index b256153..349cd08 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -280,7 +280,7 @@ public class CassandraMessageMapper implements 
MessageMapper {
     @Override
     public Iterator<UpdatedFlags> updateFlags(Mailbox mailbox, 
FlagsUpdateCalculator flagUpdateCalculator, MessageRange set) throws 
MailboxException {
         CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
-        return retrieveMessages(retrieveMessageIds(mailboxId, set), 
FetchType.Metadata, Optional.empty())
+        return messageIdDAO.retrieveMessages(mailboxId, set)
                 .join()
                 .flatMap(message -> updateFlagsOnMessage(mailbox, 
flagUpdateCalculator, message))
                 .map((UpdatedFlags updatedFlags) -> 
indexTableHandler.updateIndexOnFlagsUpdate(mailboxId, updatedFlags)
@@ -329,26 +329,24 @@ public class CassandraMessageMapper implements 
MessageMapper {
                 imapUidDAO.insert(composedMessageIdWithMetaData));
     }
 
-    private Stream<UpdatedFlags> updateFlagsOnMessage(Mailbox mailbox, 
FlagsUpdateCalculator flagUpdateCalculator, MailboxMessage message) {
-        return tryMessageFlagsUpdate(flagUpdateCalculator, mailbox, message)
+    private Stream<UpdatedFlags> updateFlagsOnMessage(Mailbox mailbox, 
FlagsUpdateCalculator flagUpdateCalculator, ComposedMessageIdWithMetaData 
composedMessageIdWithMetaData) {
+        return tryMessageFlagsUpdate(flagUpdateCalculator, mailbox, 
composedMessageIdWithMetaData)
             .map(Stream::of)
-            .orElse(handleRetries(mailbox, flagUpdateCalculator, message));
+            .orElse(handleRetries(mailbox, flagUpdateCalculator, 
composedMessageIdWithMetaData));
     }
 
-    private Optional<UpdatedFlags> tryMessageFlagsUpdate(FlagsUpdateCalculator 
flagUpdateCalculator, Mailbox mailbox, MailboxMessage message) {
+    private Optional<UpdatedFlags> tryMessageFlagsUpdate(FlagsUpdateCalculator 
flagUpdateCalculator, Mailbox mailbox, ComposedMessageIdWithMetaData 
oldMetaData) {
         try {
-            long oldModSeq = message.getModSeq();
-            Flags oldFlags = message.createFlags();
+            long oldModSeq = oldMetaData.getModSeq();
+            Flags oldFlags = oldMetaData.getFlags();
             Flags newFlags = flagUpdateCalculator.buildNewFlags(oldFlags);
 
             boolean involveFlagsChanges = !identicalFlags(oldFlags, newFlags);
             long newModSeq = generateNewModSeqIfNeeded(mailbox, oldModSeq, 
involveFlagsChanges);
 
-            message.setFlags(newFlags);
-            message.setModSeq(newModSeq);
-            if (updateFlags(message, oldModSeq)) {
+            if (updateFlags(oldMetaData, newFlags, newModSeq)) {
                 return Optional.of(UpdatedFlags.builder()
-                    .uid(message.getUid())
+                    .uid(oldMetaData.getComposedMessageId().getUid())
                     .modSeq(newModSeq)
                     .oldFlags(oldFlags)
                     .newFlags(newFlags)
@@ -372,13 +370,13 @@ public class CassandraMessageMapper implements 
MessageMapper {
         return oldFlags.equals(newFlags);
     }
 
-    private boolean updateFlags(MailboxMessage message, long oldModSeq) {
+    private boolean updateFlags(ComposedMessageIdWithMetaData oldMetadata, 
Flags newFlags, long newModSeq) {
         ComposedMessageIdWithMetaData composedMessageIdWithMetaData = 
ComposedMessageIdWithMetaData.builder()
-                .composedMessageId(new 
ComposedMessageId(message.getMailboxId(), message.getMessageId(), 
message.getUid()))
-                .modSeq(message.getModSeq())
-                .flags(message.createFlags())
+                .composedMessageId(oldMetadata.getComposedMessageId())
+                .modSeq(newModSeq)
+                .flags(newFlags)
                 .build();
-        return imapUidDAO.updateMetadata(composedMessageIdWithMetaData, 
oldModSeq)
+        return imapUidDAO.updateMetadata(composedMessageIdWithMetaData, 
oldMetadata.getModSeq())
             .thenCompose(success -> Optional.of(success)
                 .filter(b -> b)
                 .map((Boolean any) -> 
messageIdDAO.updateMetadata(composedMessageIdWithMetaData)
@@ -387,12 +385,12 @@ public class CassandraMessageMapper implements 
MessageMapper {
             .join();
     }
 
-    private Stream<UpdatedFlags> handleRetries(Mailbox mailbox, 
FlagsUpdateCalculator flagUpdateCalculator, MailboxMessage message) {
+    private Stream<UpdatedFlags> handleRetries(Mailbox mailbox, 
FlagsUpdateCalculator flagUpdateCalculator, ComposedMessageIdWithMetaData 
oldMetaData) {
         try {
             return Stream.of(
                 new FunctionRunnerWithRetry(maxRetries)
                     .executeAndRetrieveObject(() -> 
retryMessageFlagsUpdate(mailbox,
-                            message.getMessageId(),
+                            oldMetaData.getComposedMessageId().getMessageId(),
                             flagUpdateCalculator)));
         } catch (MessageDeletedDuringFlagsUpdateException e) {
             mailboxSession.getLog().warn(e.getMessage());
@@ -413,9 +411,6 @@ public class CassandraMessageMapper implements 
MessageMapper {
             .orElseThrow(MailboxDeleteDuringUpdateException::new);
         return tryMessageFlagsUpdate(flagUpdateCalculator,
                 mailbox,
-                
messageDAO.retrieveMessages(ImmutableList.of(composedMessageIdWithMetaData), 
FetchType.Metadata, Optional.empty()).join()
-                    .findFirst()
-                    .map(pair -> 
pair.getLeft().toMailboxMessage(ImmutableList.of()))
-                    .orElseThrow(() -> new 
MessageDeletedDuringFlagsUpdateException(cassandraId, (CassandraMessageId) 
messageId)));
+                composedMessageIdWithMetaData);
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to