This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 26bf6d6449c606b0d7ce2cfecea68d0adc9657c5 Author: Benoit Tellier <[email protected]> AuthorDate: Fri Mar 6 22:36:25 2020 +0700 JAMES-3105 Allow resetting counters Resetting counters is intrinsically flawed in the face of concurrent operations. --- .../cassandra/mail/CassandraMailboxCounterDAO.java | 86 ++++++++++--- .../mail/CassandraMailboxCounterDAOTest.java | 143 ++++++++++++++++++--- 2 files changed, 195 insertions(+), 34 deletions(-) diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxCounterDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxCounterDAO.java index 0c12e28..01f2564 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxCounterDAO.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxCounterDAO.java @@ -25,13 +25,15 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.eq; import static com.datastax.driver.core.querybuilder.QueryBuilder.incr; import static com.datastax.driver.core.querybuilder.QueryBuilder.select; import static com.datastax.driver.core.querybuilder.QueryBuilder.update; +import static org.apache.james.mailbox.cassandra.table.CassandraMailboxCountersTable.COUNT; +import static org.apache.james.mailbox.cassandra.table.CassandraMailboxCountersTable.MAILBOX_ID; +import static org.apache.james.mailbox.cassandra.table.CassandraMailboxCountersTable.TABLE_NAME; +import static org.apache.james.mailbox.cassandra.table.CassandraMailboxCountersTable.UNSEEN; import javax.inject.Inject; import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor; import org.apache.james.mailbox.cassandra.ids.CassandraId; -import org.apache.james.mailbox.cassandra.table.CassandraMailboxCountersTable; -import org.apache.james.mailbox.exception.MailboxException; import org.apache.james.mailbox.model.Mailbox; import 
org.apache.james.mailbox.model.MailboxCounters; @@ -48,6 +50,8 @@ public class CassandraMailboxCounterDAO { private final PreparedStatement readStatement; private final PreparedStatement incrementUnseenCountStatement; private final PreparedStatement incrementMessageCountStatement; + private final PreparedStatement addToCounters; + private final PreparedStatement removeToCounters; private final PreparedStatement decrementUnseenCountStatement; private final PreparedStatement decrementMessageCountStatement; @@ -55,36 +59,82 @@ public class CassandraMailboxCounterDAO { public CassandraMailboxCounterDAO(Session session) { cassandraAsyncExecutor = new CassandraAsyncExecutor(session); readStatement = createReadStatement(session); - incrementMessageCountStatement = updateMailboxStatement(session, incr(CassandraMailboxCountersTable.COUNT)); - incrementUnseenCountStatement = updateMailboxStatement(session, incr(CassandraMailboxCountersTable.UNSEEN)); - decrementMessageCountStatement = updateMailboxStatement(session, decr(CassandraMailboxCountersTable.COUNT)); - decrementUnseenCountStatement = updateMailboxStatement(session, decr(CassandraMailboxCountersTable.UNSEEN)); + incrementMessageCountStatement = updateMailboxStatement(session, incr(COUNT)); + incrementUnseenCountStatement = updateMailboxStatement(session, incr(UNSEEN)); + addToCounters = session.prepare(update(TABLE_NAME) + .with(incr(COUNT, bindMarker(COUNT))) + .and(incr(UNSEEN, bindMarker(UNSEEN))) + .where(eq(MAILBOX_ID, bindMarker(MAILBOX_ID)))); + removeToCounters = session.prepare(update(TABLE_NAME) + .with(decr(COUNT, bindMarker(COUNT))) + .and(decr(UNSEEN, bindMarker(UNSEEN))) + .where(eq(MAILBOX_ID, bindMarker(MAILBOX_ID)))); + decrementMessageCountStatement = updateMailboxStatement(session, decr(COUNT)); + decrementUnseenCountStatement = updateMailboxStatement(session, decr(UNSEEN)); } private PreparedStatement createReadStatement(Session session) { return session.prepare( - 
select(CassandraMailboxCountersTable.UNSEEN, CassandraMailboxCountersTable.COUNT) - .from(CassandraMailboxCountersTable.TABLE_NAME) - .where(eq(CassandraMailboxCountersTable.MAILBOX_ID, bindMarker(CassandraMailboxCountersTable.MAILBOX_ID)))); + select(UNSEEN, COUNT) + .from(TABLE_NAME) + .where(eq(MAILBOX_ID, bindMarker(MAILBOX_ID)))); } private PreparedStatement updateMailboxStatement(Session session, Assignment operation) { return session.prepare( - update(CassandraMailboxCountersTable.TABLE_NAME) + update(TABLE_NAME) .with(operation) - .where(eq(CassandraMailboxCountersTable.MAILBOX_ID, bindMarker(CassandraMailboxCountersTable.MAILBOX_ID)))); + .where(eq(MAILBOX_ID, bindMarker(MAILBOX_ID)))); } public Mono<MailboxCounters> retrieveMailboxCounters(CassandraId mailboxId) { return cassandraAsyncExecutor.executeSingleRow(bindWithMailbox(mailboxId, readStatement)) .map(row -> MailboxCounters.builder() .mailboxId(mailboxId) - .count(row.getLong(CassandraMailboxCountersTable.COUNT)) - .unseen(row.getLong(CassandraMailboxCountersTable.UNSEEN)) + .count(row.getLong(COUNT)) + .unseen(row.getLong(UNSEEN)) .build()); } - public Mono<Long> countMessagesInMailbox(Mailbox mailbox) throws MailboxException { + public Mono<Void> resetCounters(MailboxCounters counters) { + CassandraId mailboxId = (CassandraId) counters.getMailboxId(); + + return retrieveMailboxCounters(mailboxId) + .switchIfEmpty(Mono.just(emptyCounters(mailboxId))) + .flatMap(storedCounters -> { + if (storedCounters.equals(counters)) { + return Mono.empty(); + } + return remove(storedCounters) + .then(add(counters)); + }); + } + + private MailboxCounters emptyCounters(CassandraId mailboxId) { + return MailboxCounters.builder() + .count(0) + .unseen(0) + .mailboxId(mailboxId) + .build(); + } + + private Mono<Void> add(MailboxCounters counters) { + CassandraId mailboxId = (CassandraId) counters.getMailboxId(); + return cassandraAsyncExecutor.executeVoid( + bindWithMailbox(mailboxId, addToCounters) + .setLong(COUNT, 
counters.getCount()) + .setLong(UNSEEN, counters.getUnseen())); + } + + private Mono<Void> remove(MailboxCounters counters) { + CassandraId mailboxId = (CassandraId) counters.getMailboxId(); + return cassandraAsyncExecutor.executeVoid( + bindWithMailbox(mailboxId, removeToCounters) + .setLong(COUNT, counters.getCount()) + .setLong(UNSEEN, counters.getUnseen())); + } + + public Mono<Long> countMessagesInMailbox(Mailbox mailbox) { CassandraId mailboxId = (CassandraId) mailbox.getMailboxId(); return countMessagesInMailbox(mailboxId); @@ -92,14 +142,14 @@ public class CassandraMailboxCounterDAO { public Mono<Long> countMessagesInMailbox(CassandraId cassandraId) { return cassandraAsyncExecutor.executeSingleRow(bindWithMailbox(cassandraId, readStatement)) - .map(row -> row.getLong(CassandraMailboxCountersTable.COUNT)); + .map(row -> row.getLong(COUNT)); } - public Mono<Long> countUnseenMessagesInMailbox(Mailbox mailbox) throws MailboxException { + public Mono<Long> countUnseenMessagesInMailbox(Mailbox mailbox) { CassandraId mailboxId = (CassandraId) mailbox.getMailboxId(); return cassandraAsyncExecutor.executeSingleRow(bindWithMailbox(mailboxId, readStatement)) - .map(row -> row.getLong(CassandraMailboxCountersTable.UNSEEN)); + .map(row -> row.getLong(UNSEEN)); } public Mono<Void> decrementCount(CassandraId mailboxId) { @@ -120,6 +170,6 @@ public class CassandraMailboxCounterDAO { private BoundStatement bindWithMailbox(CassandraId mailboxId, PreparedStatement statement) { return statement.bind() - .setUUID(CassandraMailboxCountersTable.MAILBOX_ID, mailboxId.asUuid()); + .setUUID(MAILBOX_ID, mailboxId.asUuid()); } } diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxCounterDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxCounterDAOTest.java index 15f5939..62aeb01 100644 --- 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxCounterDAOTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxCounterDAOTest.java @@ -52,36 +52,36 @@ class CassandraMailboxCounterDAOTest { } @Test - void countMessagesInMailboxShouldReturnEmptyByDefault() throws Exception { + void countMessagesInMailboxShouldReturnEmptyByDefault() { assertThat(testee.countMessagesInMailbox(mailbox).hasElement().block()).isFalse(); } @Test - void countUnseenMessagesInMailboxShouldReturnEmptyByDefault() throws Exception { + void countUnseenMessagesInMailboxShouldReturnEmptyByDefault() { assertThat(testee.countUnseenMessagesInMailbox(mailbox).hasElement().block()).isFalse(); } @Test - void retrieveMailboxCounterShouldReturnEmptyByDefault() throws Exception { + void retrieveMailboxCounterShouldReturnEmptyByDefault() { assertThat(testee.retrieveMailboxCounters(MAILBOX_ID).hasElement().block()).isFalse(); } @Test - void incrementCountShouldAddOneWhenAbsent() throws Exception { + void incrementCountShouldAddOneWhenAbsent() { testee.incrementCount(MAILBOX_ID).block(); assertThat(testee.countMessagesInMailbox(mailbox).block()).isEqualTo(1L); } @Test - void incrementUnseenShouldAddOneWhenAbsent() throws Exception { + void incrementUnseenShouldAddOneWhenAbsent() { testee.incrementUnseen(MAILBOX_ID).block(); assertThat(testee.countUnseenMessagesInMailbox(mailbox).block()).isEqualTo(1L); } @Test - void incrementUnseenShouldAddOneWhenAbsentOnMailboxCounters() throws Exception { + void incrementUnseenShouldAddOneWhenAbsentOnMailboxCounters() { testee.incrementUnseen(MAILBOX_ID).block(); assertThat(testee.retrieveMailboxCounters(MAILBOX_ID).block()) @@ -93,7 +93,7 @@ class CassandraMailboxCounterDAOTest { } @Test - void incrementCountShouldAddOneWhenAbsentOnMailboxCounters() throws Exception { + void incrementCountShouldAddOneWhenAbsentOnMailboxCounters() { testee.incrementCount(MAILBOX_ID).block(); 
assertThat(testee.retrieveMailboxCounters(MAILBOX_ID).block()) @@ -105,7 +105,7 @@ class CassandraMailboxCounterDAOTest { } @Test - void retrieveMailboxCounterShouldWorkWhenFullRow() throws Exception { + void retrieveMailboxCounterShouldWorkWhenFullRow() { testee.incrementCount(MAILBOX_ID).block(); testee.incrementUnseen(MAILBOX_ID).block(); @@ -118,7 +118,7 @@ class CassandraMailboxCounterDAOTest { } @Test - void decrementCountShouldRemoveOne() throws Exception { + void decrementCountShouldRemoveOne() { testee.incrementCount(MAILBOX_ID).block(); testee.decrementCount(MAILBOX_ID).block(); @@ -128,7 +128,7 @@ class CassandraMailboxCounterDAOTest { } @Test - void decrementUnseenShouldRemoveOne() throws Exception { + void decrementUnseenShouldRemoveOne() { testee.incrementUnseen(MAILBOX_ID).block(); testee.decrementUnseen(MAILBOX_ID).block(); @@ -138,7 +138,7 @@ class CassandraMailboxCounterDAOTest { } @Test - void incrementUnseenShouldHaveNoImpactOnMessageCount() throws Exception { + void incrementUnseenShouldHaveNoImpactOnMessageCount() { testee.incrementUnseen(MAILBOX_ID).block(); assertThat(testee.countMessagesInMailbox(mailbox).block()) @@ -146,7 +146,7 @@ class CassandraMailboxCounterDAOTest { } @Test - void incrementCountShouldHaveNoEffectOnUnseenCount() throws Exception { + void incrementCountShouldHaveNoEffectOnUnseenCount() { testee.incrementCount(MAILBOX_ID).block(); assertThat(testee.countUnseenMessagesInMailbox(mailbox).block()) @@ -154,7 +154,7 @@ class CassandraMailboxCounterDAOTest { } @Test - void decrementUnseenShouldHaveNoEffectOnMessageCount() throws Exception { + void decrementUnseenShouldHaveNoEffectOnMessageCount() { testee.incrementCount(MAILBOX_ID).block(); testee.decrementUnseen(MAILBOX_ID).block(); @@ -164,7 +164,7 @@ class CassandraMailboxCounterDAOTest { } @Test - void decrementCountShouldHaveNoEffectOnUnseenCount() throws Exception { + void decrementCountShouldHaveNoEffectOnUnseenCount() { testee.incrementUnseen(MAILBOX_ID).block(); 
testee.decrementCount(MAILBOX_ID).block(); @@ -174,7 +174,7 @@ class CassandraMailboxCounterDAOTest { } @Test - void decrementCountCanLeadToNegativeValue() throws Exception { + void decrementCountCanLeadToNegativeValue() { testee.decrementCount(MAILBOX_ID).block(); assertThat(testee.countMessagesInMailbox(mailbox).block()) @@ -182,10 +182,121 @@ class CassandraMailboxCounterDAOTest { } @Test - void decrementUnseenCanLeadToNegativeValue() throws Exception { + void decrementUnseenCanLeadToNegativeValue() { testee.decrementUnseen(MAILBOX_ID).block(); assertThat(testee.countUnseenMessagesInMailbox(mailbox).block()) .isEqualTo(-1L); } + + @Test + void resetCountersShouldNoopWhenZeroAndNoData() { + MailboxCounters counters = MailboxCounters.builder() + .unseen(0) + .count(0) + .mailboxId(MAILBOX_ID) + .build(); + + testee.resetCounters(counters).block(); + + assertThat(testee.retrieveMailboxCounters(MAILBOX_ID).blockOptional()) + .isEmpty(); + } + + @Test + void resetCountersShouldNoopWhenZeroAndZeroData() { + MailboxCounters counters = MailboxCounters.builder() + .unseen(0) + .count(0) + .mailboxId(MAILBOX_ID) + .build(); + + testee.incrementUnseen(MAILBOX_ID).block(); + testee.decrementUnseen(MAILBOX_ID).block(); + + testee.resetCounters(counters).block(); + + assertThat(testee.retrieveMailboxCounters(MAILBOX_ID).blockOptional()) + .contains(counters); + } + + @Test + void resetCountersShouldReInitCountWhenNothing() { + MailboxCounters counters = MailboxCounters.builder() + .unseen(45) + .count(78) + .mailboxId(MAILBOX_ID) + .build(); + + testee.resetCounters(counters).block(); + + assertThat(testee.retrieveMailboxCounters(MAILBOX_ID).blockOptional()) + .contains(counters); + } + + @Test + void resetCountersShouldReInitCountWhenData() { + MailboxCounters counters = MailboxCounters.builder() + .unseen(45) + .count(78) + .mailboxId(MAILBOX_ID) + .build(); + + testee.incrementCount(MAILBOX_ID).block(); + testee.incrementUnseen(MAILBOX_ID).block(); + + 
testee.resetCounters(counters).block(); + + assertThat(testee.retrieveMailboxCounters(MAILBOX_ID).blockOptional()) + .contains(counters); + } + + @Test + void resetCountersShouldBeIdempotent() { + MailboxCounters counters = MailboxCounters.builder() + .unseen(45) + .count(78) + .mailboxId(MAILBOX_ID) + .build(); + + testee.resetCounters(counters).block(); + testee.resetCounters(counters).block(); + + assertThat(testee.retrieveMailboxCounters(MAILBOX_ID).blockOptional()) + .contains(counters); + } + + @Test + void resetCountersShouldReInitCountWhenZeroUnseen() { + MailboxCounters counters = MailboxCounters.builder() + .unseen(0) + .count(78) + .mailboxId(MAILBOX_ID) + .build(); + + testee.incrementCount(MAILBOX_ID).block(); + testee.incrementUnseen(MAILBOX_ID).block(); + + testee.resetCounters(counters).block(); + + assertThat(testee.retrieveMailboxCounters(MAILBOX_ID).blockOptional()) + .contains(counters); + } + + @Test + void resetCountersShouldReInitCountWhenZeroCount() { + MailboxCounters counters = MailboxCounters.builder() + .unseen(46) + .count(0) + .mailboxId(MAILBOX_ID) + .build(); + + testee.incrementCount(MAILBOX_ID).block(); + testee.incrementUnseen(MAILBOX_ID).block(); + + testee.resetCounters(counters).block(); + + assertThat(testee.retrieveMailboxCounters(MAILBOX_ID).blockOptional()) + .contains(counters); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
