This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit d5b4b3a4c53d34f231c9ae62d3c13151e0fcc56f
Author: Benoit Tellier <btell...@linagora.com>
AuthorDate: Tue Apr 7 16:03:31 2020 +0700

    JAMES-3136 Update message denormalisation tables concurrently
---
 .../cassandra/mail/CassandraMessageIdMapper.java   | 24 +++++++++++++---------
 .../cassandra/mail/CassandraMessageMapper.java     | 11 +++++-----
 2 files changed, 19 insertions(+), 16 deletions(-)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
index 3cfd722..47cd80b 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
@@ -130,12 +130,9 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
     public void save(MailboxMessage mailboxMessage) throws MailboxException {
         CassandraId mailboxId = (CassandraId) mailboxMessage.getMailboxId();
         mailboxMapper.findMailboxById(mailboxId);
-        ComposedMessageIdWithMetaData composedMessageIdWithMetaData = createMetadataFor(mailboxMessage);
+
         messageDAO.save(mailboxMessage)
-            .thenEmpty(imapUidDAO.insert(composedMessageIdWithMetaData))
-            .thenEmpty(messageIdDAO.insert(composedMessageIdWithMetaData)
-                .retryBackoff(MAX_RETRY, MIN_RETRY_BACKOFF, MAX_RETRY_BACKOFF))
-            .thenEmpty(indexTableHandler.updateIndexOnAdd(mailboxMessage, mailboxId))
+            .thenEmpty(saveMessageMetadata(mailboxMessage, mailboxId))
             .block();
     }
 
@@ -143,14 +140,21 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
     public void copyInMailbox(MailboxMessage mailboxMessage) throws MailboxException {
         CassandraId mailboxId = (CassandraId) mailboxMessage.getMailboxId();
         mailboxMapper.findMailboxById(mailboxId);
-        ComposedMessageIdWithMetaData composedMessageIdWithMetaData = createMetadataFor(mailboxMessage);
-        Flux.merge(
-                imapUidDAO.insert(composedMessageIdWithMetaData),
-                messageIdDAO.insert(composedMessageIdWithMetaData))
-            .thenEmpty(indexTableHandler.updateIndexOnAdd(mailboxMessage, mailboxId))
+
+        saveMessageMetadata(mailboxMessage, mailboxId)
             .block();
     }
 
+    private Mono<Void> saveMessageMetadata(MailboxMessage mailboxMessage, CassandraId mailboxId) {
+        ComposedMessageIdWithMetaData composedMessageIdWithMetaData = createMetadataFor(mailboxMessage);
+        return imapUidDAO.insert(composedMessageIdWithMetaData)
+            .thenEmpty(Flux.merge(
+                messageIdDAO.insert(composedMessageIdWithMetaData)
+                    .retryBackoff(MAX_RETRY, MIN_RETRY_BACKOFF, MAX_RETRY_BACKOFF),
+                indexTableHandler.updateIndexOnAdd(mailboxMessage, mailboxId))
+            .then());
+    }
+
     private ComposedMessageIdWithMetaData createMetadataFor(MailboxMessage mailboxMessage) {
         ComposedMessageId composedMessageId = new ComposedMessageId(
             mailboxMessage.getMailboxId(),
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index 64eaf35..7d189c2 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -271,8 +271,6 @@ public class CassandraMessageMapper implements MessageMapper {
         return block(addUidAndModseq(message, mailboxId)
             .flatMap(Throwing.function(messageWithUidAndModSeq -> save(mailbox, messageWithUidAndModSeq)
                 .thenReturn(messageWithUidAndModSeq)))
-            .flatMap(messageWithUidAndModSeq -> indexTableHandler.updateIndexOnAdd(message, mailboxId)
-                .thenReturn(messageWithUidAndModSeq))
             .map(MailboxMessage::metaData));
     }
 
@@ -393,8 +391,6 @@ public class CassandraMessageMapper implements MessageMapper {
         return block(addUidAndModseq(message, mailboxId)
             .flatMap(messageWithUidAndModseq -> insertIds(messageWithUidAndModseq, mailboxId)
                 .thenReturn(messageWithUidAndModseq))
-            .flatMap(messageWithUidAndModseq -> indexTableHandler.updateIndexOnAdd(message, mailboxId)
-                .thenReturn(messageWithUidAndModseq))
             .map(MailboxMessage::metaData));
     }
 
@@ -411,8 +407,11 @@ public class CassandraMessageMapper implements MessageMapper {
                 .modSeq(message.getModSeq())
                 .build();
         return imapUidDAO.insert(composedMessageIdWithMetaData)
-            .then(messageIdDAO.insert(composedMessageIdWithMetaData)
-                .retryBackoff(MAX_RETRY, MIN_RETRY_BACKOFF, MAX_RETRY_BACKOFF));
+            .then(Flux.merge(
+                messageIdDAO.insert(composedMessageIdWithMetaData)
+                    .retryBackoff(MAX_RETRY, MIN_RETRY_BACKOFF, MAX_RETRY_BACKOFF),
+                indexTableHandler.updateIndexOnAdd(message, mailboxId))
+            .then());
     }
 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to