This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit c490eeaff0af77d65a35e90e1f6b4684b9e322ad Author: Benoit Tellier <[email protected]> AuthorDate: Sun Mar 28 14:44:20 2021 +0700 JAMES-3435 Use writeConsistency to determine read consistency needs upon writes. --- .../CassandraMailboxSessionMapperFactory.java | 2 +- .../mailbox/cassandra/DeleteMessageListener.java | 20 +++++++++++++++++--- .../cassandra/mail/CassandraMessageIdMapper.java | 11 +++++++++-- .../cassandra/mail/CassandraMessageMapper.java | 10 +++++++++- 4 files changed, 36 insertions(+), 7 deletions(-) diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java index 26a8a04..671b894 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java @@ -217,6 +217,6 @@ public class CassandraMailboxSessionMapperFactory extends MailboxSessionMapperFa public DeleteMessageListener deleteMessageListener() { return new DeleteMessageListener(imapUidDAO, messageIdDAO, messageDAO, messageDAOV3, attachmentDAOV2, ownerDAO, attachmentMessageIdDAO, aclMapper, userMailboxRightsDAO, applicableFlagDAO, firstUnseenDAO, deletedMessageDAO, - mailboxCounterDAO, mailboxRecentsDAO, blobStore); + mailboxCounterDAO, mailboxRecentsDAO, blobStore, cassandraConfiguration); } } diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java index ddfe04b..d48623b 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java @@ -19,6 +19,8 @@ package org.apache.james.mailbox.cassandra; +import static org.apache.james.mailbox.cassandra.mail.CassandraMessageIdToImapUidDAO.ConsistencyChoice.STRONG; +import static org.apache.james.mailbox.cassandra.mail.CassandraMessageIdToImapUidDAO.ConsistencyChoice.WEAK; import static org.apache.james.util.FunctionalUtils.negate; import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY; @@ -27,6 +29,7 @@ import java.util.function.Predicate; import javax.inject.Inject; +import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration; import org.apache.james.blob.api.BlobStore; import org.apache.james.events.Event; import org.apache.james.events.EventListener; @@ -95,13 +98,16 @@ public class DeleteMessageListener implements EventListener.GroupEventListener { private final CassandraMailboxCounterDAO counterDAO; private final CassandraMailboxRecentsDAO recentsDAO; private final BlobStore blobStore; + private final CassandraConfiguration cassandraConfiguration; @Inject public DeleteMessageListener(CassandraMessageIdToImapUidDAO imapUidDAO, CassandraMessageIdDAO messageIdDAO, CassandraMessageDAO messageDAO, CassandraMessageDAOV3 messageDAOV3, CassandraAttachmentDAOV2 attachmentDAO, CassandraAttachmentOwnerDAO ownerDAO, CassandraAttachmentMessageIdDAO attachmentMessageIdDAO, CassandraACLMapper aclMapper, CassandraUserMailboxRightsDAO rightsDAO, CassandraApplicableFlagDAO applicableFlagDAO, - CassandraFirstUnseenDAO firstUnseenDAO, CassandraDeletedMessageDAO deletedMessageDAO, CassandraMailboxCounterDAO counterDAO, CassandraMailboxRecentsDAO recentsDAO, BlobStore blobStore) { + CassandraFirstUnseenDAO firstUnseenDAO, CassandraDeletedMessageDAO deletedMessageDAO, + CassandraMailboxCounterDAO counterDAO, CassandraMailboxRecentsDAO recentsDAO, BlobStore blobStore, + CassandraConfiguration cassandraConfiguration) { this.imapUidDAO = imapUidDAO; this.messageIdDAO = messageIdDAO; this.messageDAO = messageDAO; @@ -117,6 +123,7 @@ public class DeleteMessageListener implements EventListener.GroupEventListener { this.counterDAO = counterDAO; this.recentsDAO = recentsDAO; this.blobStore = blobStore; + this.cassandraConfiguration = cassandraConfiguration; } @Override @@ -239,15 +246,22 @@ public class DeleteMessageListener implements EventListener.GroupEventListener { } private Mono<Boolean> isReferenced(CassandraMessageId id) { - return imapUidDAO.retrieve(id, ALL_MAILBOXES, CassandraMessageIdToImapUidDAO.ConsistencyChoice.STRONG) + return imapUidDAO.retrieve(id, ALL_MAILBOXES, chooseReadConsistencyUponWrites()) .hasElements() .map(negate()); } private Mono<Boolean> isReferenced(CassandraMessageId id, CassandraId excludedId) { - return imapUidDAO.retrieve(id, ALL_MAILBOXES, CassandraMessageIdToImapUidDAO.ConsistencyChoice.STRONG) + return imapUidDAO.retrieve(id, ALL_MAILBOXES, chooseReadConsistencyUponWrites()) .filter(metadata -> !metadata.getComposedMessageId().getMailboxId().equals(excludedId)) .hasElements() .map(negate()); } + + private CassandraMessageIdToImapUidDAO.ConsistencyChoice chooseReadConsistencyUponWrites() { + if (cassandraConfiguration.isMessageWriteStrongConsistency()) { + return STRONG; + } + return WEAK; + } } diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java index 9cb0d5e..1b5f58e 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java @@ -155,6 +155,13 @@ public class CassandraMessageIdMapper implements MessageIdMapper { } } + private CassandraMessageIdToImapUidDAO.ConsistencyChoice chooseReadConsistencyUponWrites() { + if (cassandraConfiguration.isMessageWriteStrongConsistency()) { + return STRONG; + } + return WEAK; + } + @Override public void save(MailboxMessage mailboxMessage) throws MailboxException { CassandraId mailboxId = (CassandraId) mailboxMessage.getMailboxId(); @@ -220,7 +227,7 @@ public class CassandraMessageIdMapper implements MessageIdMapper { } private Mono<Void> retrieveAndDeleteIndices(CassandraMessageId messageId, Optional<CassandraId> mailboxId) { - return imapUidDAO.retrieve(messageId, mailboxId, STRONG) + return imapUidDAO.retrieve(messageId, mailboxId, chooseReadConsistencyUponWrites()) .flatMap(this::deleteIds, ReactorUtils.DEFAULT_CONCURRENCY) .then(); } @@ -283,7 +290,7 @@ public class CassandraMessageIdMapper implements MessageIdMapper { private Mono<List<Pair<Flags, ComposedMessageIdWithMetaData>>> updateFlags(MailboxId mailboxId, MessageId messageId, Flags newState, MessageManager.FlagsUpdateMode updateMode) { CassandraId cassandraId = (CassandraId) mailboxId; - return imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.of(cassandraId), STRONG) + return imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.of(cassandraId), chooseReadConsistencyUponWrites()) .flatMap(oldComposedId -> updateFlags(newState, updateMode, cassandraId, oldComposedId), ReactorUtils.DEFAULT_CONCURRENCY) .switchIfEmpty(Mono.error(MailboxDeleteDuringUpdateException::new)) .collectList(); diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java index 68a7e72..faa3464 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java @@ -20,6 +20,7 @@ package org.apache.james.mailbox.cassandra.mail; import static org.apache.james.mailbox.cassandra.mail.CassandraMessageIdToImapUidDAO.ConsistencyChoice.STRONG; +import static org.apache.james.mailbox.cassandra.mail.CassandraMessageIdToImapUidDAO.ConsistencyChoice.WEAK; import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY; import java.security.SecureRandom; @@ -409,7 +410,7 @@ public class CassandraMessageMapper implements MessageMapper { private Mono<FlagsUpdateStageResult> retryUpdatesStage(CassandraId mailboxId, FlagsUpdateCalculator flagsUpdateCalculator, List<ComposedMessageId> failed) { if (!failed.isEmpty()) { Flux<ComposedMessageIdWithMetaData> toUpdate = Flux.fromIterable(failed) - .flatMap(ids -> imapUidDAO.retrieve((CassandraMessageId) ids.getMessageId(), Optional.of((CassandraId) ids.getMailboxId()), STRONG), + .flatMap(ids -> imapUidDAO.retrieve((CassandraMessageId) ids.getMessageId(), Optional.of((CassandraId) ids.getMailboxId()), chooseReadConsistencyUponWrites()), DEFAULT_CONCURRENCY); return runUpdateStage(mailboxId, toUpdate, flagsUpdateCalculator); } else { @@ -417,6 +418,13 @@ public class CassandraMessageMapper implements MessageMapper { } } + private CassandraMessageIdToImapUidDAO.ConsistencyChoice chooseReadConsistencyUponWrites() { + if (cassandraConfiguration.isMessageWriteStrongConsistency()) { + return STRONG; + } + return WEAK; + } + private Mono<FlagsUpdateStageResult> runUpdateStage(CassandraId mailboxId, Flux<ComposedMessageIdWithMetaData> toBeUpdated, FlagsUpdateCalculator flagsUpdateCalculator) { return computeNewModSeq(mailboxId) .flatMapMany(newModSeq -> toBeUpdated --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
