This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit d5b4b3a4c53d34f231c9ae62d3c13151e0fcc56f
Author: Benoit Tellier <btell...@linagora.com>
AuthorDate: Tue Apr 7 16:03:31 2020 +0700

    JAMES-3136 Update message denormalisation tables concurrently
---
 .../cassandra/mail/CassandraMessageIdMapper.java | 24 +++++++++++++---------
 .../cassandra/mail/CassandraMessageMapper.java | 11 +++++-----
 2 files changed, 19 insertions(+), 16 deletions(-)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
index 3cfd722..47cd80b 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
@@ -130,12 +130,9 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
     public void save(MailboxMessage mailboxMessage) throws MailboxException {
         CassandraId mailboxId = (CassandraId) mailboxMessage.getMailboxId();
         mailboxMapper.findMailboxById(mailboxId);
-        ComposedMessageIdWithMetaData composedMessageIdWithMetaData = createMetadataFor(mailboxMessage);
+
         messageDAO.save(mailboxMessage)
-            .thenEmpty(imapUidDAO.insert(composedMessageIdWithMetaData))
-            .thenEmpty(messageIdDAO.insert(composedMessageIdWithMetaData)
-                .retryBackoff(MAX_RETRY, MIN_RETRY_BACKOFF, MAX_RETRY_BACKOFF))
-            .thenEmpty(indexTableHandler.updateIndexOnAdd(mailboxMessage, mailboxId))
+            .thenEmpty(saveMessageMetadata(mailboxMessage, mailboxId))
             .block();
     }
 
@@ -143,14 +140,21 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
     public void copyInMailbox(MailboxMessage mailboxMessage) throws MailboxException {
         CassandraId mailboxId = (CassandraId) mailboxMessage.getMailboxId();
         mailboxMapper.findMailboxById(mailboxId);
-        ComposedMessageIdWithMetaData composedMessageIdWithMetaData = createMetadataFor(mailboxMessage);
-        Flux.merge(
-                imapUidDAO.insert(composedMessageIdWithMetaData),
-                messageIdDAO.insert(composedMessageIdWithMetaData))
-            .thenEmpty(indexTableHandler.updateIndexOnAdd(mailboxMessage, mailboxId))
+
+        saveMessageMetadata(mailboxMessage, mailboxId)
             .block();
     }
 
+    private Mono<Void> saveMessageMetadata(MailboxMessage mailboxMessage, CassandraId mailboxId) {
+        ComposedMessageIdWithMetaData composedMessageIdWithMetaData = createMetadataFor(mailboxMessage);
+        return imapUidDAO.insert(composedMessageIdWithMetaData)
+            .thenEmpty(Flux.merge(
+                    messageIdDAO.insert(composedMessageIdWithMetaData)
+                        .retryBackoff(MAX_RETRY, MIN_RETRY_BACKOFF, MAX_RETRY_BACKOFF),
+                    indexTableHandler.updateIndexOnAdd(mailboxMessage, mailboxId))
+                .then());
+    }
+
     private ComposedMessageIdWithMetaData createMetadataFor(MailboxMessage mailboxMessage) {
         ComposedMessageId composedMessageId = new ComposedMessageId(
             mailboxMessage.getMailboxId(),
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index 64eaf35..7d189c2 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -271,8 +271,6 @@ public class CassandraMessageMapper implements MessageMapper {
         return block(addUidAndModseq(message, mailboxId)
             .flatMap(Throwing.function(messageWithUidAndModSeq -> save(mailbox, messageWithUidAndModSeq)
                 .thenReturn(messageWithUidAndModSeq)))
-            .flatMap(messageWithUidAndModSeq -> indexTableHandler.updateIndexOnAdd(message, mailboxId)
-                .thenReturn(messageWithUidAndModSeq))
             .map(MailboxMessage::metaData));
     }
 
@@ -393,8 +391,6 @@ public class CassandraMessageMapper implements MessageMapper {
         return block(addUidAndModseq(message, mailboxId)
             .flatMap(messageWithUidAndModseq -> insertIds(messageWithUidAndModseq, mailboxId)
                 .thenReturn(messageWithUidAndModseq))
-            .flatMap(messageWithUidAndModseq -> indexTableHandler.updateIndexOnAdd(message, mailboxId)
-                .thenReturn(messageWithUidAndModseq))
             .map(MailboxMessage::metaData));
     }
 
@@ -411,8 +407,11 @@ public class CassandraMessageMapper implements MessageMapper {
             .modSeq(message.getModSeq())
             .build();
         return imapUidDAO.insert(composedMessageIdWithMetaData)
-            .then(messageIdDAO.insert(composedMessageIdWithMetaData)
-                .retryBackoff(MAX_RETRY, MIN_RETRY_BACKOFF, MAX_RETRY_BACKOFF));
+            .then(Flux.merge(
+                    messageIdDAO.insert(composedMessageIdWithMetaData)
+                        .retryBackoff(MAX_RETRY, MIN_RETRY_BACKOFF, MAX_RETRY_BACKOFF),
+                    indexTableHandler.updateIndexOnAdd(message, mailboxId))
+                .then());
     }

---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org