This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit c490eeaff0af77d65a35e90e1f6b4684b9e322ad
Author: Benoit Tellier <[email protected]>
AuthorDate: Sun Mar 28 14:44:20 2021 +0700

    JAMES-3435 Use writeConsistency to determine read consistency needs upon 
writes.
---
 .../CassandraMailboxSessionMapperFactory.java        |  2 +-
 .../mailbox/cassandra/DeleteMessageListener.java     | 20 +++++++++++++++++---
 .../cassandra/mail/CassandraMessageIdMapper.java     | 11 +++++++++--
 .../cassandra/mail/CassandraMessageMapper.java       | 10 +++++++++-
 4 files changed, 36 insertions(+), 7 deletions(-)

diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
index 26a8a04..671b894 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
@@ -217,6 +217,6 @@ public class CassandraMailboxSessionMapperFactory extends 
MailboxSessionMapperFa
     public DeleteMessageListener deleteMessageListener() {
         return new DeleteMessageListener(imapUidDAO, messageIdDAO, messageDAO, 
messageDAOV3, attachmentDAOV2, ownerDAO,
             attachmentMessageIdDAO, aclMapper, userMailboxRightsDAO, 
applicableFlagDAO, firstUnseenDAO, deletedMessageDAO,
-            mailboxCounterDAO, mailboxRecentsDAO, blobStore);
+            mailboxCounterDAO, mailboxRecentsDAO, blobStore, 
cassandraConfiguration);
     }
 }
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java
index ddfe04b..d48623b 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java
@@ -19,6 +19,8 @@
 
 package org.apache.james.mailbox.cassandra;
 
+import static 
org.apache.james.mailbox.cassandra.mail.CassandraMessageIdToImapUidDAO.ConsistencyChoice.STRONG;
+import static 
org.apache.james.mailbox.cassandra.mail.CassandraMessageIdToImapUidDAO.ConsistencyChoice.WEAK;
 import static org.apache.james.util.FunctionalUtils.negate;
 import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
 
@@ -27,6 +29,7 @@ import java.util.function.Predicate;
 
 import javax.inject.Inject;
 
+import 
org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
 import org.apache.james.blob.api.BlobStore;
 import org.apache.james.events.Event;
 import org.apache.james.events.EventListener;
@@ -95,13 +98,16 @@ public class DeleteMessageListener implements 
EventListener.GroupEventListener {
     private final CassandraMailboxCounterDAO counterDAO;
     private final CassandraMailboxRecentsDAO recentsDAO;
     private final BlobStore blobStore;
+    private final CassandraConfiguration cassandraConfiguration;
 
     @Inject
     public DeleteMessageListener(CassandraMessageIdToImapUidDAO imapUidDAO, 
CassandraMessageIdDAO messageIdDAO, CassandraMessageDAO messageDAO,
                                  CassandraMessageDAOV3 messageDAOV3, 
CassandraAttachmentDAOV2 attachmentDAO, CassandraAttachmentOwnerDAO ownerDAO,
                                  CassandraAttachmentMessageIdDAO 
attachmentMessageIdDAO, CassandraACLMapper aclMapper,
                                  CassandraUserMailboxRightsDAO rightsDAO, 
CassandraApplicableFlagDAO applicableFlagDAO,
-                                 CassandraFirstUnseenDAO firstUnseenDAO, 
CassandraDeletedMessageDAO deletedMessageDAO, CassandraMailboxCounterDAO 
counterDAO, CassandraMailboxRecentsDAO recentsDAO, BlobStore blobStore) {
+                                 CassandraFirstUnseenDAO firstUnseenDAO, 
CassandraDeletedMessageDAO deletedMessageDAO,
+                                 CassandraMailboxCounterDAO counterDAO, 
CassandraMailboxRecentsDAO recentsDAO, BlobStore blobStore,
+                                 CassandraConfiguration 
cassandraConfiguration) {
         this.imapUidDAO = imapUidDAO;
         this.messageIdDAO = messageIdDAO;
         this.messageDAO = messageDAO;
@@ -117,6 +123,7 @@ public class DeleteMessageListener implements 
EventListener.GroupEventListener {
         this.counterDAO = counterDAO;
         this.recentsDAO = recentsDAO;
         this.blobStore = blobStore;
+        this.cassandraConfiguration = cassandraConfiguration;
     }
 
     @Override
@@ -239,15 +246,22 @@ public class DeleteMessageListener implements 
EventListener.GroupEventListener {
     }
 
     private Mono<Boolean> isReferenced(CassandraMessageId id) {
-        return imapUidDAO.retrieve(id, ALL_MAILBOXES, 
CassandraMessageIdToImapUidDAO.ConsistencyChoice.STRONG)
+        return imapUidDAO.retrieve(id, ALL_MAILBOXES, 
chooseReadConsistencyUponWrites())
             .hasElements()
             .map(negate());
     }
 
     private Mono<Boolean> isReferenced(CassandraMessageId id, CassandraId 
excludedId) {
-        return imapUidDAO.retrieve(id, ALL_MAILBOXES, 
CassandraMessageIdToImapUidDAO.ConsistencyChoice.STRONG)
+        return imapUidDAO.retrieve(id, ALL_MAILBOXES, 
chooseReadConsistencyUponWrites())
             .filter(metadata -> 
!metadata.getComposedMessageId().getMailboxId().equals(excludedId))
             .hasElements()
             .map(negate());
     }
+
+    private CassandraMessageIdToImapUidDAO.ConsistencyChoice 
chooseReadConsistencyUponWrites() {
+        if (cassandraConfiguration.isMessageWriteStrongConsistency()) {
+            return STRONG;
+        }
+        return WEAK;
+    }
 }
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
index 9cb0d5e..1b5f58e 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
@@ -155,6 +155,13 @@ public class CassandraMessageIdMapper implements 
MessageIdMapper {
         }
     }
 
+    private CassandraMessageIdToImapUidDAO.ConsistencyChoice 
chooseReadConsistencyUponWrites() {
+        if (cassandraConfiguration.isMessageWriteStrongConsistency()) {
+            return STRONG;
+        }
+        return WEAK;
+    }
+
     @Override
     public void save(MailboxMessage mailboxMessage) throws MailboxException {
         CassandraId mailboxId = (CassandraId) mailboxMessage.getMailboxId();
@@ -220,7 +227,7 @@ public class CassandraMessageIdMapper implements 
MessageIdMapper {
     }
 
     private Mono<Void> retrieveAndDeleteIndices(CassandraMessageId messageId, 
Optional<CassandraId> mailboxId) {
-        return imapUidDAO.retrieve(messageId, mailboxId, STRONG)
+        return imapUidDAO.retrieve(messageId, mailboxId, 
chooseReadConsistencyUponWrites())
             .flatMap(this::deleteIds, ReactorUtils.DEFAULT_CONCURRENCY)
             .then();
     }
@@ -283,7 +290,7 @@ public class CassandraMessageIdMapper implements 
MessageIdMapper {
 
     private Mono<List<Pair<Flags, ComposedMessageIdWithMetaData>>> 
updateFlags(MailboxId mailboxId, MessageId messageId, Flags newState, 
MessageManager.FlagsUpdateMode updateMode) {
         CassandraId cassandraId = (CassandraId) mailboxId;
-        return imapUidDAO.retrieve((CassandraMessageId) messageId, 
Optional.of(cassandraId), STRONG)
+        return imapUidDAO.retrieve((CassandraMessageId) messageId, 
Optional.of(cassandraId), chooseReadConsistencyUponWrites())
             .flatMap(oldComposedId -> updateFlags(newState, updateMode, 
cassandraId, oldComposedId), ReactorUtils.DEFAULT_CONCURRENCY)
             .switchIfEmpty(Mono.error(MailboxDeleteDuringUpdateException::new))
             .collectList();
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index 68a7e72..faa3464 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -20,6 +20,7 @@
 package org.apache.james.mailbox.cassandra.mail;
 
 import static 
org.apache.james.mailbox.cassandra.mail.CassandraMessageIdToImapUidDAO.ConsistencyChoice.STRONG;
+import static 
org.apache.james.mailbox.cassandra.mail.CassandraMessageIdToImapUidDAO.ConsistencyChoice.WEAK;
 import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
 
 import java.security.SecureRandom;
@@ -409,7 +410,7 @@ public class CassandraMessageMapper implements 
MessageMapper {
     private Mono<FlagsUpdateStageResult> retryUpdatesStage(CassandraId 
mailboxId, FlagsUpdateCalculator flagsUpdateCalculator, List<ComposedMessageId> 
failed) {
         if (!failed.isEmpty()) {
             Flux<ComposedMessageIdWithMetaData> toUpdate = 
Flux.fromIterable(failed)
-                .flatMap(ids -> imapUidDAO.retrieve((CassandraMessageId) 
ids.getMessageId(), Optional.of((CassandraId) ids.getMailboxId()), STRONG),
+                .flatMap(ids -> imapUidDAO.retrieve((CassandraMessageId) 
ids.getMessageId(), Optional.of((CassandraId) ids.getMailboxId()), 
chooseReadConsistencyUponWrites()),
                     DEFAULT_CONCURRENCY);
             return runUpdateStage(mailboxId, toUpdate, flagsUpdateCalculator);
         } else {
@@ -417,6 +418,13 @@ public class CassandraMessageMapper implements 
MessageMapper {
         }
     }
 
+    private CassandraMessageIdToImapUidDAO.ConsistencyChoice 
chooseReadConsistencyUponWrites() {
+        if (cassandraConfiguration.isMessageWriteStrongConsistency()) {
+            return STRONG;
+        }
+        return WEAK;
+    }
+
     private Mono<FlagsUpdateStageResult> runUpdateStage(CassandraId mailboxId, 
Flux<ComposedMessageIdWithMetaData> toBeUpdated, FlagsUpdateCalculator 
flagsUpdateCalculator) {
         return computeNewModSeq(mailboxId)
             .flatMapMany(newModSeq -> toBeUpdated

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to