This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 819f25da21c96383e930827e585ecedf0c7078a3 Author: Benoit Tellier <[email protected]> AuthorDate: Wed Feb 26 14:37:19 2020 +0700 JAMES-3074 Cassandra: On the fly UidValidity sanitizing Impact: minor. Statistically, one entry out of 2 billion is affected. --- .../cassandra/mail/CassandraMailboxDAO.java | 52 +++++++++++++---- .../cassandra/mail/CassandraMailboxDAOTest.java | 68 ++++++++++++++++++++++ 2 files changed, 108 insertions(+), 12 deletions(-) diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java index 02a7b13..87c9505 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java @@ -65,6 +65,7 @@ public class CassandraMailboxDAO { private final PreparedStatement deleteStatement; private final PreparedStatement insertStatement; private final PreparedStatement updateStatement; + private final PreparedStatement updateUidValidityStatement; @Inject public CassandraMailboxDAO(Session session, CassandraTypesProvider typesProvider, CassandraUtils cassandraUtils) { @@ -72,6 +73,7 @@ public class CassandraMailboxDAO { this.mailboxBaseTupleUtil = new MailboxBaseTupleUtil(typesProvider); this.insertStatement = prepareInsert(session); this.updateStatement = prepareUpdate(session); + this.updateUidValidityStatement = prepareUpdateUidValidity(session); this.deleteStatement = prepareDelete(session); this.listStatement = prepareList(session); this.readStatement = prepareRead(session); @@ -98,6 +100,12 @@ public class CassandraMailboxDAO { .where(eq(ID, bindMarker(ID)))); } + private PreparedStatement prepareUpdateUidValidity(Session session) { + return session.prepare(update(TABLE_NAME) + .with(set(UIDVALIDITY, bindMarker(UIDVALIDITY))) + .where(eq(ID, bindMarker(ID)))); + } + private PreparedStatement prepareDelete(Session session) { return session.prepare(QueryBuilder.delete() .from(TABLE_NAME) @@ -138,27 +146,47 @@ public class CassandraMailboxDAO { return executor.executeSingleRow(readStatement.bind() .setUUID(ID, mailboxId.asUuid()) .setConsistencyLevel(QUORUM)) - .map(row -> mailboxFromRow(row, mailboxId)); + .flatMap(row -> mailboxFromRow(row, mailboxId)); } - private Mailbox mailboxFromRow(Row row, CassandraId cassandraId) { - return new Mailbox( - new MailboxPath( - row.getUDTValue(MAILBOX_BASE).getString(CassandraMailboxTable.MailboxBase.NAMESPACE), - Username.of(row.getUDTValue(MAILBOX_BASE).getString(CassandraMailboxTable.MailboxBase.USER)), - row.getString(NAME)), - UidValidity.of(row.getLong(UIDVALIDITY)), - cassandraId); + private Mono<Mailbox> mailboxFromRow(Row row, CassandraId cassandraId) { + return sanitizeUidValidity(cassandraId, row.getLong(UIDVALIDITY)) + .map(uidValidity -> new Mailbox( + new MailboxPath( + row.getUDTValue(MAILBOX_BASE).getString(CassandraMailboxTable.MailboxBase.NAMESPACE), + Username.of(row.getUDTValue(MAILBOX_BASE).getString(CassandraMailboxTable.MailboxBase.USER)), + row.getString(NAME)), + uidValidity, + cassandraId)); + } + + private Mono<UidValidity> sanitizeUidValidity(CassandraId cassandraId, long uidValidityAsLong) { + if (!UidValidity.isValid(uidValidityAsLong)) { + UidValidity newUidValidity = UidValidity.generate(); + return updateUidValidity(cassandraId, newUidValidity) + .then(Mono.just(newUidValidity)); + } + return Mono.just(UidValidity.ofValid(uidValidityAsLong)); + } + + /** + * Expected concurrency issue in the absence of performance expensive LightWeight transaction + * As the Uid validity is updated only when equal to 0 (1 chance out of 4 billion) the benefits of LWT don't + * outweigh the performance costs + */ + private Mono<Void> updateUidValidity(CassandraId cassandraId, UidValidity uidValidity) { + return executor.executeVoid(updateUidValidityStatement.bind() + .setUUID(ID, cassandraId.asUuid()) + .setLong(UIDVALIDITY, uidValidity.asLong())); } public Flux<Mailbox> retrieveAllMailboxes() { return executor.execute(listStatement.bind()) .flatMapMany(cassandraUtils::convertToFlux) - .map(this::toMailboxWithId); + .flatMap(this::toMailboxWithId); } - private Mailbox toMailboxWithId(Row row) { + private Mono<Mailbox> toMailboxWithId(Row row) { return mailboxFromRow(row, CassandraId.of(row.getUUID(ID))); } - } diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAOTest.java index 9aa1486..54d0134 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAOTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAOTest.java @@ -19,6 +19,10 @@ package org.apache.james.mailbox.cassandra.mail; +import static com.datastax.driver.core.querybuilder.QueryBuilder.eq; +import static com.datastax.driver.core.querybuilder.QueryBuilder.set; +import static com.datastax.driver.core.querybuilder.QueryBuilder.update; +import static org.apache.james.backends.cassandra.Scenario.Builder.awaitOn; import static org.assertj.core.api.Assertions.assertThat; import java.util.List; @@ -26,6 +30,8 @@ import java.util.Optional; import org.apache.james.backends.cassandra.CassandraCluster; import org.apache.james.backends.cassandra.CassandraClusterExtension; +import org.apache.james.backends.cassandra.Scenario; +import org.apache.james.backends.cassandra.Scenario.Barrier; import org.apache.james.backends.cassandra.components.CassandraModule; import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule; import org.apache.james.core.Username; @@ -37,9 +43,13 @@ import org.apache.james.mailbox.model.MailboxId; import org.apache.james.mailbox.model.MailboxPath; import org.apache.james.mailbox.model.UidValidity; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + class CassandraMailboxDAOTest { private static final UidValidity UID_VALIDITY_1 = UidValidity.ofValid(145); private static final UidValidity UID_VALIDITY_2 = UidValidity.ofValid(147); @@ -84,6 +94,64 @@ class CassandraMailboxDAOTest { } @Test + void retrieveMailboxShouldSanitizeInvalidUidValidityUponRead(CassandraCluster cassandra) { + testee.save(mailbox1).block(); + + // Hack to insert a faulty value + cassandra.getConf().execute(update("mailbox") + .with(set("uidvalidity", -12)) + .where(eq("id", CASSANDRA_ID_1.asUuid()))); + + Optional<Mailbox> readMailbox = testee.retrieveMailbox(CASSANDRA_ID_1) + .blockOptional(); + assertThat(readMailbox).isPresent() + .hasValueSatisfying(mailbox -> assertThat(mailbox.getUidValidity().isValid()).isTrue()); + } + + @Test + void retrieveAllShouldSanitizeInvalidUidValidityUponRead(CassandraCluster cassandra) { + testee.save(mailbox1).block(); + + // Hack to insert a faulty value + cassandra.getConf().execute(update("mailbox") + .with(set("uidvalidity", -12)) + .where(eq("id", CASSANDRA_ID_1.asUuid()))); + + List<Mailbox> readMailbox = testee.retrieveAllMailboxes().collectList().block(); + assertThat(readMailbox).hasSize(1) + .allSatisfy(mailbox -> assertThat(mailbox.getUidValidity().isValid()).isTrue()); + } + + @Disabled("Expected concurrency issue in the absence of performance expensive LightWeight transaction" + + "As the Uid validity is updated only when equal to 0 (1 chance out of 4 billion) the benefits of LWT don't" + + "outweigh the costs") + @Test + void retrieveMailboxShouldNotBeSubjectToDataRaceUponUidValiditySanitizing(CassandraCluster cassandra) throws Exception { + testee.save(mailbox1).block(); + + // Hack to insert a faulty value + cassandra.getConf().execute(update("mailbox") + .with(set("uidvalidity", -12)) + .where(eq("id", CASSANDRA_ID_1.asUuid()))); + + Barrier barrier = new Barrier(2); + cassandra.getConf().registerScenario(awaitOn(barrier) + .times(2) + .whenQueryStartsWith("UPDATE mailbox SET")); + + Mono<Mailbox> readMailbox1 = testee.retrieveMailbox(CASSANDRA_ID_1).cache(); + Mono<Mailbox> readMailbox2 = testee.retrieveMailbox(CASSANDRA_ID_1).cache(); + readMailbox1.subscribeOn(Schedulers.elastic()).subscribe(); + readMailbox2.subscribeOn(Schedulers.elastic()).subscribe(); + + barrier.awaitCaller(); + barrier.releaseCaller(); + + assertThat(readMailbox1.block().getUidValidity()) + .isEqualTo(readMailbox2.block().getUidValidity()); + } + + @Test void saveShouldOverride() { testee.save(mailbox1).block(); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
