This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 23131f2dd6fdf4d45f4940bbcc5524424437d64e Author: Benoit Tellier <[email protected]> AuthorDate: Sun Mar 28 22:57:33 2021 +0700 JAMES-3435 Allow to avoid mailbox SERIAL reads when not linked to writes Let's consider now read-repairs as writes thus requires strong consistency. --- .../cassandra/mail/CassandraMailboxMapper.java | 19 ++++++++---- .../cassandra/mail/CassandraMailboxPathV3DAO.java | 34 ++++++++++++++++++++-- .../task/SolveMailboxInconsistenciesService.java | 3 +- .../mail/CassandraMailboxPathV3DAOTest.java | 3 +- 4 files changed, 49 insertions(+), 10 deletions(-) diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java index f4dbd1d..4ee3a75 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java @@ -19,6 +19,9 @@ package org.apache.james.mailbox.cassandra.mail; +import static org.apache.james.mailbox.cassandra.mail.CassandraMailboxPathV3DAO.ConsistencyChoice.STRONG; +import static org.apache.james.mailbox.cassandra.mail.CassandraMailboxPathV3DAO.ConsistencyChoice.WEAK; + import java.security.SecureRandom; import java.time.Duration; @@ -101,7 +104,7 @@ public class CassandraMailboxMapper implements MailboxMapper { return mailboxDAO.retrieveMailbox(id) .flatMap(mailboxEntry -> SolveMailboxInconsistenciesService.Inconsistency .detectMailboxDaoInconsistency(mailboxEntry, - mailboxPathV3DAO.retrieve(mailboxEntry.generateAssociatedPath())) + mailboxPathV3DAO.retrieve(mailboxEntry.generateAssociatedPath(), STRONG)) .flatMap(inconsistency -> inconsistency.fix(new SolveMailboxInconsistenciesService.Context(), mailboxDAO, mailboxPathV3DAO) .then(Mono.just(mailboxEntry)))); @@ -111,11 +114,17 @@ public class CassandraMailboxMapper implements MailboxMapper { private Mono<Mailbox> performReadRepair(MailboxPath path) { if (shouldReadRepair()) { - return mailboxPathV3DAO.retrieve(path) + return mailboxPathV3DAO.retrieve(path, STRONG) .flatMap(this::performPathReadRepair); } - return mailboxPathV3DAO.retrieve(path); + return mailboxPathV3DAO.retrieve(path, consistencyChoice()); + } + private CassandraMailboxPathV3DAO.ConsistencyChoice consistencyChoice() { + if (cassandraConfiguration.isMailboxReadStrongConsistency()) { + return STRONG; + } + return WEAK; } private Flux<Mailbox> performReadRepair(Flux<Mailbox> pathEntries) { @@ -250,13 +259,13 @@ public class CassandraMailboxMapper implements MailboxMapper { .flatMapMany(needSupport -> { if (needSupport) { return Flux.concat( - mailboxPathV3DAO.listUserMailboxes(fixedNamespace, fixedUser), + mailboxPathV3DAO.listUserMailboxes(fixedNamespace, fixedUser, consistencyChoice()), Flux.concat( mailboxPathV2DAO.listUserMailboxes(fixedNamespace, fixedUser), mailboxPathDAO.listUserMailboxes(fixedNamespace, fixedUser)) .flatMap(this::retrieveMailbox, CONCURRENCY)); } - return mailboxPathV3DAO.listUserMailboxes(fixedNamespace, fixedUser); + return mailboxPathV3DAO.listUserMailboxes(fixedNamespace, fixedUser, consistencyChoice()); }); } diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathV3DAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathV3DAO.java index 3a6d46b..af34896 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathV3DAO.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathV3DAO.java @@ -32,6 +32,8 @@ import static org.apache.james.mailbox.cassandra.table.CassandraMailboxPathV3Tab import static org.apache.james.mailbox.cassandra.table.CassandraMailboxPathV3Table.UIDVALIDITY; import static org.apache.james.mailbox.cassandra.table.CassandraMailboxPathV3Table.USER; +import java.util.function.Function; + import javax.inject.Inject; import org.apache.james.backends.cassandra.init.configuration.CassandraConsistenciesConfiguration; @@ -46,6 +48,7 @@ import org.apache.james.mailbox.model.UidValidity; import org.apache.james.util.FunctionalUtils; import org.apache.james.util.ReactorUtils; +import com.datastax.driver.core.ConsistencyLevel; import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; @@ -55,6 +58,22 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public class CassandraMailboxPathV3DAO { + // todo factorize me in CassandraConsistenciesConfiguration + public enum ConsistencyChoice { + WEAK(CassandraConsistenciesConfiguration::getRegular), + STRONG(CassandraConsistenciesConfiguration::getLightweightTransaction); + + private final Function<CassandraConsistenciesConfiguration, ConsistencyLevel> choice; + + ConsistencyChoice(Function<CassandraConsistenciesConfiguration, ConsistencyLevel> choice) { + this.choice = choice; + } + + public ConsistencyLevel choose(CassandraConsistenciesConfiguration configuration) { + return choice.apply(configuration); + } + } + private final CassandraAsyncExecutor cassandraAsyncExecutor; private final CassandraUtils cassandraUtils; private final PreparedStatement delete; @@ -117,22 +136,31 @@ public class CassandraMailboxPathV3DAO { } public Mono<Mailbox> retrieve(MailboxPath mailboxPath) { + return retrieve(mailboxPath, consistenciesConfiguration.getLightweightTransaction()); + } + + public Mono<Mailbox> retrieve(MailboxPath mailboxPath, ConsistencyChoice consistencyChoice) { + return retrieve(mailboxPath, consistencyChoice.choose(consistenciesConfiguration)); + } + + private Mono<Mailbox> retrieve(MailboxPath mailboxPath, ConsistencyLevel consistencyLevel) { return cassandraAsyncExecutor.executeSingleRow( select.bind() .setString(NAMESPACE, mailboxPath.getNamespace()) .setString(USER, sanitizeUser(mailboxPath.getUser())) .setString(MAILBOX_NAME, mailboxPath.getName()) - .setConsistencyLevel(consistenciesConfiguration.getLightweightTransaction())) + .setConsistencyLevel(consistencyLevel)) .map(this::fromRowToCassandraIdAndPath) .map(FunctionalUtils.toFunction(this::logGhostMailboxSuccess)) .switchIfEmpty(ReactorUtils.executeAndEmpty(() -> logGhostMailboxFailure(mailboxPath))); } - public Flux<Mailbox> listUserMailboxes(String namespace, Username user) { + public Flux<Mailbox> listUserMailboxes(String namespace, Username user, ConsistencyChoice consistencyChoice) { return cassandraAsyncExecutor.execute( selectUser.bind() .setString(NAMESPACE, namespace) - .setString(USER, sanitizeUser(user))) + .setString(USER, sanitizeUser(user)) + .setConsistencyLevel(consistencyChoice.choose(consistenciesConfiguration))) .flatMapMany(cassandraUtils::convertToFlux) .map(this::fromRowToCassandraIdAndPath) .map(FunctionalUtils.toFunction(this::logReadSuccess)); diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesService.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesService.java index cef2c29..5f5f695 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesService.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesService.java @@ -19,6 +19,7 @@ package org.apache.james.mailbox.cassandra.mail.task; +import static org.apache.james.mailbox.cassandra.mail.CassandraMailboxPathV3DAO.ConsistencyChoice.STRONG; import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY; import java.util.Collection; @@ -423,7 +424,7 @@ public class SolveMailboxInconsistenciesService { } private Mono<Inconsistency> detectMailboxDaoInconsistency(Mailbox mailboxEntry) { - Mono<Mailbox> pathEntry = mailboxPathV3DAO.retrieve(mailboxEntry.generateAssociatedPath()); + Mono<Mailbox> pathEntry = mailboxPathV3DAO.retrieve(mailboxEntry.generateAssociatedPath(), STRONG); return Inconsistency.detectMailboxDaoInconsistency(mailboxEntry, pathEntry); } diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathV3DAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathV3DAOTest.java index 92d1aa9..a59c759 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathV3DAOTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathV3DAOTest.java @@ -19,6 +19,7 @@ package org.apache.james.mailbox.cassandra.mail; +import static org.apache.james.backends.cassandra.init.configuration.CassandraConsistenciesConfiguration.ConsistencyChoice.STRONG; import static org.apache.james.mailbox.cassandra.mail.MailboxFixture.MAILBOX_1; import static org.apache.james.mailbox.cassandra.mail.MailboxFixture.MAILBOX_2; import static org.apache.james.mailbox.cassandra.mail.MailboxFixture.MAILBOX_3; @@ -95,7 +96,7 @@ class CassandraMailboxPathV3DAOTest { testee.save(MAILBOX_3).block(); List<Mailbox> cassandraIds = testee - .listUserMailboxes(USER_INBOX_MAILBOXPATH.getNamespace(), USER_INBOX_MAILBOXPATH.getUser()) + .listUserMailboxes(USER_INBOX_MAILBOXPATH.getNamespace(), USER_INBOX_MAILBOXPATH.getUser(), STRONG) .collectList() .block(); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
