This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit a74ca3548cb4675052aa7ce1cd04a742cbf72510 Author: Benoit Tellier <btell...@linagora.com> AuthorDate: Wed Jun 24 13:18:40 2020 +0700 JAMES-3265 Reduce statement count upon CassandraMessageMapper::delete & Flags updates This effectively optimizes IMAP EXPUNGE & STORE operations, dramatically limit counter writes, and should prevent uneeded large tumbstones ranges creations for deletedMessages and recentMessages projections. --- .../cassandra/mail/CassandraIndexTableHandler.java | 107 +++++++++++++-- .../cassandra/mail/CassandraMailboxCounterDAO.java | 4 +- .../cassandra/mail/CassandraMessageMapper.java | 61 +++++---- .../mail/CassandraIndexTableHandlerTest.java | 2 +- .../cassandra/mail/CassandraMessageMapperTest.java | 145 +++++++++++++++++---- 5 files changed, 253 insertions(+), 66 deletions(-) diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java index 8fc3d91..47fd1eb 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandler.java @@ -19,15 +19,23 @@ package org.apache.james.mailbox.cassandra.mail; +import java.util.Collection; +import java.util.List; + import javax.inject.Inject; import javax.mail.Flags; import org.apache.james.mailbox.MessageUid; import org.apache.james.mailbox.cassandra.ids.CassandraId; import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData; +import org.apache.james.mailbox.model.MailboxCounters; +import org.apache.james.mailbox.model.MessageMetaData; import org.apache.james.mailbox.model.UpdatedFlags; import org.apache.james.mailbox.store.mail.model.MailboxMessage; +import org.apache.james.util.streams.Iterators; +import com.github.steveash.guavate.Guavate; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import reactor.core.publisher.Flux; @@ -61,11 +69,39 @@ public class CassandraIndexTableHandler { return Flux.mergeDelayError(Queues.XS_BUFFER_SIZE, updateFirstUnseenOnDelete(mailboxId, composedMessageIdWithMetaData.getFlags(), composedMessageIdWithMetaData.getComposedMessageId().getUid()), mailboxRecentDAO.removeFromRecent(mailboxId, composedMessageIdWithMetaData.getComposedMessageId().getUid()), - deletedMessageDAO.removeDeleted(mailboxId, uid), + updateDeletedMessageProjectionOnDelete(mailboxId, uid, composedMessageIdWithMetaData.getFlags()), decrementCountersOnDelete(mailboxId, composedMessageIdWithMetaData.getFlags())) .then(); } + public Mono<Void> updateIndexOnDelete(CassandraId mailboxId, Collection<MessageMetaData> metaData) { + return Flux.mergeDelayError(Queues.XS_BUFFER_SIZE, + Flux.fromIterable(metaData) + .flatMap(message -> updateFirstUnseenOnDelete(mailboxId, message.getFlags(), message.getUid())), + Flux.fromIterable(metaData) + .flatMap(message -> updateRecentOnDelete(mailboxId, message.getUid(), message.getFlags())), + Flux.fromIterable(metaData) + .flatMap(message -> updateDeletedMessageProjectionOnDelete(mailboxId, message.getUid(), message.getFlags())), + decrementCountersOnDelete(mailboxId, metaData)) + .then(); + } + + private Mono<Void> updateRecentOnDelete(CassandraId mailboxId, MessageUid uid, Flags flags) { + if (flags.contains(Flags.Flag.RECENT)) { + return mailboxRecentDAO.removeFromRecent(mailboxId, uid); + } + + return Mono.empty(); + } + + private Mono<Void> updateDeletedMessageProjectionOnDelete(CassandraId mailboxId, MessageUid uid, Flags flags) { + if (flags.contains(Flags.Flag.DELETED)) { + return deletedMessageDAO.removeDeleted(mailboxId, uid); + } + + return Mono.empty(); + } + public Mono<Void> updateIndexOnAdd(MailboxMessage message, CassandraId mailboxId) { Flags flags = message.createFlags(); @@ -79,15 +115,32 @@ public class CassandraIndexTableHandler { } public Mono<Void> updateIndexOnFlagsUpdate(CassandraId mailboxId, UpdatedFlags updatedFlags) { + return updateIndexOnFlagsUpdate(mailboxId, ImmutableList.of(updatedFlags)); + } + + public Mono<Void> updateIndexOnFlagsUpdate(CassandraId mailboxId, List<UpdatedFlags> updatedFlags) { return Flux.mergeDelayError(Queues.XS_BUFFER_SIZE, manageUnseenMessageCountsOnFlagsUpdate(mailboxId, updatedFlags), manageRecentOnFlagsUpdate(mailboxId, updatedFlags), updateFirstUnseenOnFlagsUpdate(mailboxId, updatedFlags), - applicableFlagDAO.updateApplicableFlags(mailboxId, ImmutableSet.copyOf(updatedFlags.userFlagIterator())), + manageApplicableFlagsOnFlagsUpdate(mailboxId, updatedFlags), updateDeletedOnFlagsUpdate(mailboxId, updatedFlags)) .then(); } + private Mono<Void> manageApplicableFlagsOnFlagsUpdate(CassandraId mailboxId, List<UpdatedFlags> updatedFlags) { + return applicableFlagDAO.updateApplicableFlags(mailboxId, + updatedFlags.stream() + .flatMap(flags -> Iterators.toStream(flags.userFlagIterator())) + .collect(Guavate.toImmutableSet())); + } + + private Mono<Void> updateDeletedOnFlagsUpdate(CassandraId mailboxId, List<UpdatedFlags> updatedFlags) { + return Flux.fromIterable(updatedFlags) + .concatMap(flags -> updateDeletedOnFlagsUpdate(mailboxId, flags)) + .then(); + } + private Mono<Void> updateDeletedOnFlagsUpdate(CassandraId mailboxId, UpdatedFlags updatedFlags) { if (updatedFlags.isModifiedToSet(Flags.Flag.DELETED)) { return deletedMessageDAO.addDeleted(mailboxId, updatedFlags.getUid()); @@ -105,6 +158,19 @@ public class CassandraIndexTableHandler { return mailboxCounterDAO.decrementUnseenAndCount(mailboxId); } + private Mono<Void> decrementCountersOnDelete(CassandraId mailboxId, Collection<MessageMetaData> metaData) { + long seenCount = metaData.stream() + .map(MessageMetaData::getFlags) + .filter(flags -> flags.contains(Flags.Flag.SEEN)) + .count(); + + return mailboxCounterDAO.remove(MailboxCounters.builder() + .mailboxId(mailboxId) + .count(metaData.size()) + .unseen(seenCount) + .build()); + } + private Mono<Void> incrementCountersOnSave(CassandraId mailboxId, Flags flags) { if (flags.contains(Flags.Flag.SEEN)) { return mailboxCounterDAO.incrementCount(mailboxId); @@ -119,16 +185,35 @@ public class CassandraIndexTableHandler { return Mono.empty(); } - private Mono<Void> manageUnseenMessageCountsOnFlagsUpdate(CassandraId mailboxId, UpdatedFlags updatedFlags) { - if (updatedFlags.isModifiedToUnset(Flags.Flag.SEEN)) { - return mailboxCounterDAO.incrementUnseen(mailboxId); - } - if (updatedFlags.isModifiedToSet(Flags.Flag.SEEN)) { - return mailboxCounterDAO.decrementUnseen(mailboxId); + private Mono<Void> manageUnseenMessageCountsOnFlagsUpdate(CassandraId mailboxId, List<UpdatedFlags> updatedFlags) { + int sum = updatedFlags.stream() + .mapToInt(flags -> { + if (flags.isModifiedToUnset(Flags.Flag.SEEN)) { + return 1; + } + if (flags.isModifiedToSet(Flags.Flag.SEEN)) { + return -1; + } + return 0; + }) + .sum(); + + if (sum != 0) { + return mailboxCounterDAO.add(MailboxCounters.builder() + .mailboxId(mailboxId) + .count(0) + .unseen(sum) + .build()); } return Mono.empty(); } + private Mono<Void> manageRecentOnFlagsUpdate(CassandraId mailboxId, List<UpdatedFlags> updatedFlags) { + return Flux.fromIterable(updatedFlags) + .concatMap(flags -> manageRecentOnFlagsUpdate(mailboxId, flags)) + .then(); + } + private Mono<Void> manageRecentOnFlagsUpdate(CassandraId mailboxId, UpdatedFlags updatedFlags) { if (updatedFlags.isModifiedToUnset(Flags.Flag.RECENT)) { return mailboxRecentDAO.removeFromRecent(mailboxId, updatedFlags.getUid()); @@ -161,6 +246,12 @@ public class CassandraIndexTableHandler { return firstUnseenDAO.removeUnread(mailboxId, uid); } + private Mono<Void> updateFirstUnseenOnFlagsUpdate(CassandraId mailboxId, List<UpdatedFlags> updatedFlags) { + return Flux.fromIterable(updatedFlags) + .concatMap(flags -> updateFirstUnseenOnFlagsUpdate(mailboxId, flags)) + .then(); + } + private Mono<Void> updateFirstUnseenOnFlagsUpdate(CassandraId mailboxId, UpdatedFlags updatedFlags) { if (updatedFlags.isModifiedToUnset(Flags.Flag.SEEN)) { return firstUnseenDAO.addUnread(mailboxId, updatedFlags.getUid()); diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxCounterDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxCounterDAO.java index 80ae4ba..460decf 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxCounterDAO.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxCounterDAO.java @@ -136,7 +136,7 @@ public class CassandraMailboxCounterDAO { .build(); } - private Mono<Void> add(MailboxCounters counters) { + public Mono<Void> add(MailboxCounters counters) { CassandraId mailboxId = (CassandraId) counters.getMailboxId(); return cassandraAsyncExecutor.executeVoid( bindWithMailbox(mailboxId, addToCounters) @@ -144,7 +144,7 @@ public class CassandraMailboxCounterDAO { .setLong(UNSEEN, counters.getUnseen())); } - private Mono<Void> remove(MailboxCounters counters) { + public Mono<Void> remove(MailboxCounters counters) { CassandraId mailboxId = (CassandraId) counters.getMailboxId(); return cassandraAsyncExecutor.executeVoid( bindWithMailbox(mailboxId, removeToCounters) diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java index 2934984..9fbf32c 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java @@ -64,6 +64,7 @@ import com.google.common.collect.ImmutableList; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; import reactor.util.retry.Retry; public class CassandraMessageMapper implements MessageMapper { @@ -139,25 +140,30 @@ public class CassandraMessageMapper implements MessageMapper { @Override public void delete(Mailbox mailbox, MailboxMessage message) { - deleteAsFuture(message) + ComposedMessageIdWithMetaData metaData = message.getComposedMessageIdWithMetaData(); + + deleteAndHandleIndexUpdates(metaData) .block(); } - private Mono<Void> deleteAsFuture(MailboxMessage message) { - ComposedMessageIdWithMetaData composedMessageIdWithMetaData = message.getComposedMessageIdWithMetaData(); + private Mono<Void> deleteAndHandleIndexUpdates(ComposedMessageIdWithMetaData composedMessageIdWithMetaData) { + ComposedMessageId composedMessageId = composedMessageIdWithMetaData.getComposedMessageId(); + CassandraId mailboxId = (CassandraId) composedMessageId.getMailboxId(); - return deleteUsingMailboxId(composedMessageIdWithMetaData); + return delete(composedMessageIdWithMetaData) + .then(indexTableHandler.updateIndexOnDelete(composedMessageIdWithMetaData, mailboxId)); } - private Mono<Void> deleteUsingMailboxId(ComposedMessageIdWithMetaData composedMessageIdWithMetaData) { + private Mono<Void> delete(ComposedMessageIdWithMetaData composedMessageIdWithMetaData) { ComposedMessageId composedMessageId = composedMessageIdWithMetaData.getComposedMessageId(); CassandraMessageId messageId = (CassandraMessageId) composedMessageId.getMessageId(); CassandraId mailboxId = (CassandraId) composedMessageId.getMailboxId(); MessageUid uid = composedMessageId.getUid(); + return Flux.merge( imapUidDAO.delete(messageId, mailboxId), messageIdDAO.delete(mailboxId, uid)) - .then(indexTableHandler.updateIndexOnDelete(composedMessageIdWithMetaData, mailboxId)); + .then(); } @Override @@ -211,25 +217,20 @@ public class CassandraMessageMapper implements MessageMapper { public Map<MessageUid, MessageMetaData> deleteMessages(Mailbox mailbox, List<MessageUid> uids) { CassandraId mailboxId = (CassandraId) mailbox.getMailboxId(); - return Flux.fromStream(uids.stream()) - .flatMap(messageUid -> expungeOne(mailboxId, messageUid), cassandraConfiguration.getExpungeChunkSize()) - .collect(Guavate.<SimpleMailboxMessage, MessageUid, MessageMetaData>toImmutableMap(MailboxMessage::getUid, MailboxMessage::metaData)) + return Flux.fromIterable(MessageRange.toRanges(uids)) + .concatMap(range -> messageIdDAO.retrieveMessages(mailboxId, range, Limit.unlimited())) + .flatMap(this::expungeOne, cassandraConfiguration.getExpungeChunkSize()) + .collect(Guavate.toImmutableMap(MailboxMessage::getUid, MailboxMessage::metaData)) + .flatMap(messageMap -> indexTableHandler.updateIndexOnDelete(mailboxId, messageMap.values()) + .thenReturn(messageMap)) + .subscribeOn(Schedulers.elastic()) .block(); } - private Mono<SimpleMailboxMessage> expungeOne(CassandraId mailboxId, MessageUid messageUid) { - return retrieveComposedId(mailboxId, messageUid) - .flatMap(idWithMetadata -> deleteUsingMailboxId(idWithMetadata).thenReturn(idWithMetadata)) - .flatMap(idWithMetadata -> messageDAO.retrieveMessage(idWithMetadata, FetchType.Metadata) - .map(pair -> pair.toMailboxMessage(idWithMetadata, ImmutableList.of()))); - } - - private Mono<ComposedMessageIdWithMetaData> retrieveComposedId(CassandraId mailboxId, MessageUid uid) { - return messageIdDAO.retrieve(mailboxId, uid) - .handle((t, sink) -> - t.ifPresentOrElse( - sink::next, - () -> LOGGER.warn("Could not retrieve message {} {}", mailboxId, uid))); + private Mono<SimpleMailboxMessage> expungeOne(ComposedMessageIdWithMetaData metaData) { + return delete(metaData) + .then(messageDAO.retrieveMessage(metaData, FetchType.Metadata) + .map(pair -> pair.toMailboxMessage(metaData, ImmutableList.of()))); } @Override @@ -237,7 +238,7 @@ public class CassandraMessageMapper implements MessageMapper { ComposedMessageIdWithMetaData composedMessageIdWithMetaData = original.getComposedMessageIdWithMetaData(); MessageMetaData messageMetaData = copy(destinationMailbox, original); - deleteUsingMailboxId(composedMessageIdWithMetaData).block(); + deleteAndHandleIndexUpdates(composedMessageIdWithMetaData).block(); return messageMetaData; } @@ -357,14 +358,12 @@ public class CassandraMessageMapper implements MessageMapper { } private Mono<FlagsUpdateStageResult> updateIndexesForUpdatesResult(CassandraId mailboxId, FlagsUpdateStageResult result) { - return Flux.fromIterable(result.getSucceeded()) - .flatMap(Throwing - .function((UpdatedFlags updatedFlags) -> indexTableHandler.updateIndexOnFlagsUpdate(mailboxId, updatedFlags)) - .fallbackTo(failedIndex -> { - LOGGER.error("Could not update flag indexes for mailboxId {} UID {}. This will lead to inconsistencies across Cassandra tables", mailboxId, failedIndex.getUid()); - return Mono.empty(); - })) - .then(Mono.just(result)); + return indexTableHandler.updateIndexOnFlagsUpdate(mailboxId, result.getSucceeded()) + .onErrorResume(e -> { + LOGGER.error("Could not update flag indexes for mailboxId {}. This will lead to inconsistencies across Cassandra tables", mailboxId, e); + return Mono.empty(); + }) + .thenReturn(result); } @Override diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandlerTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandlerTest.java index ae090d1..beb08d5 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandlerTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandlerTest.java @@ -286,7 +286,7 @@ class CassandraIndexTableHandlerTest { testee.updateIndexOnDelete(new ComposedMessageIdWithMetaData( new ComposedMessageId(MAILBOX_ID, CASSANDRA_MESSAGE_ID, MESSAGE_UID), - new Flags(), + new Flags(Flags.Flag.DELETED), MODSEQ), MAILBOX_ID).block(); diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java index c3c8bc3..8581377 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java @@ -59,39 +59,136 @@ class CassandraMessageMapperTest extends MessageMapperTest { return new CassandraMapperProvider(cassandraCluster.getCassandraCluster()); } - @Test - void findInMailboxLimitShouldLimitProjectionReadCassandraQueries(CassandraCluster cassandra) throws MailboxException { - saveMessages(); + @Nested + class StatementLimitationTests { + @Test + void deleteMessagesShouldGroupMessageReads(CassandraCluster cassandra) throws MailboxException { + saveMessages(); - StatementRecorder statementRecorder = new StatementRecorder(); - cassandra.getConf().recordStatements(statementRecorder); + StatementRecorder statementRecorder = new StatementRecorder(); + cassandra.getConf().recordStatements(statementRecorder); + cassandra.getConf().printStatements(); - int limit = 2; - consume(messageMapper.findInMailbox(benwaInboxMailbox, MessageRange.all(), FetchType.Full, limit)); + messageMapper.deleteMessages(benwaInboxMailbox, ImmutableList.of(message1.getUid(), message2.getUid(), message3.getUid())); + assertThat(statementRecorder.listExecutedStatements(Selector.preparedStatementStartingWith( + "SELECT messageId,mailboxId,uid,modSeq,flagAnswered,flagDeleted,flagDraft,flagFlagged,flagRecent,flagSeen," + + "flagUser,userFlags FROM messageIdTable WHERE mailboxId=:mailboxId AND "))) + .hasSize(1); + } - assertThat(statementRecorder.listExecutedStatements(Selector.preparedStatement( - "SELECT messageId,internalDate,bodyStartOctet,fullContentOctets,bodyOctets,bodyContent,headerContent,textualLineCount,properties,attachments " + - "FROM messageV2 WHERE messageId=:messageId;"))) - .hasSize(limit); - } + @Test + void deleteMessagesShouldGroupCounterUpdates(CassandraCluster cassandra) throws MailboxException { + saveMessages(); - @Test - void updateFlagsShouldLimitModSeqAllocation(CassandraCluster cassandra) throws MailboxException { - saveMessages(); + StatementRecorder statementRecorder = new StatementRecorder(); + cassandra.getConf().recordStatements(statementRecorder); + cassandra.getConf().printStatements(); - StatementRecorder statementRecorder = new StatementRecorder(); - cassandra.getConf().recordStatements(statementRecorder); + messageMapper.deleteMessages(benwaInboxMailbox, ImmutableList.of(message1.getUid(), message2.getUid(), message3.getUid())); - messageMapper.updateFlags(benwaInboxMailbox, new FlagsUpdateCalculator(new Flags(Flags.Flag.ANSWERED), MessageManager.FlagsUpdateMode.REPLACE), MessageRange.all()); + assertThat(statementRecorder.listExecutedStatements( + Selector.preparedStatementStartingWith("UPDATE mailboxCounters SET "))) + .hasSize(1); + } - assertThat(statementRecorder.listExecutedStatements(Selector.preparedStatement( - "UPDATE modseq SET nextModseq=:nextModseq WHERE mailboxId=:mailboxId IF nextModseq=:modSeqCondition;"))) - .hasSize(1); - } + @Test + void deleteMessagesShouldNotDeleteMessageNotMarkedAsDeletedInDeletedProjection(CassandraCluster cassandra) throws MailboxException { + saveMessages(); - private void consume(Iterator<MailboxMessage> inMailbox) { - ImmutableList.copyOf(inMailbox); + StatementRecorder statementRecorder = new StatementRecorder(); + cassandra.getConf().recordStatements(statementRecorder); + cassandra.getConf().printStatements(); + + messageMapper.deleteMessages(benwaInboxMailbox, ImmutableList.of(message1.getUid(), message2.getUid(), message3.getUid())); + + assertThat(statementRecorder.listExecutedStatements( + Selector.preparedStatement("DELETE FROM messageDeleted WHERE mailboxId=:mailboxId AND uid=:uid;"))) + .isEmpty(); + } + + @Test + void deleteMessagesShouldNotDeleteMessageNotMarkedAsRecentInRecentProjection(CassandraCluster cassandra) throws MailboxException { + saveMessages(); + + StatementRecorder statementRecorder = new StatementRecorder(); + cassandra.getConf().recordStatements(statementRecorder); + cassandra.getConf().printStatements(); + + messageMapper.deleteMessages(benwaInboxMailbox, ImmutableList.of(message1.getUid(), message2.getUid(), message3.getUid())); + + assertThat(statementRecorder.listExecutedStatements( + Selector.preparedStatement("DELETE FROM messageDeleted WHERE mailboxId=:mailboxId AND uid=:uid;"))) + .isEmpty(); + } + + @Test + void deleteMessagesShouldNotDeleteMessageNotMarkedAsUnSeenInFirstUnseenProjection(CassandraCluster cassandra) throws MailboxException { + saveMessages(); + FlagsUpdateCalculator markAsRead = new FlagsUpdateCalculator(new Flags(Flags.Flag.SEEN), MessageManager.FlagsUpdateMode.ADD); + messageMapper.updateFlags(benwaInboxMailbox, markAsRead, MessageRange.all()); + + StatementRecorder statementRecorder = new StatementRecorder(); + cassandra.getConf().recordStatements(statementRecorder); + cassandra.getConf().printStatements(); + + messageMapper.deleteMessages(benwaInboxMailbox, ImmutableList.of(message1.getUid(), message2.getUid(), message3.getUid())); + + assertThat(statementRecorder.listExecutedStatements( + Selector.preparedStatement("DELETE FROM firstUnseen WHERE mailboxId=:mailboxId AND uid=:uid;"))) + .isEmpty(); + } + + @Test + void updateFlagsShouldUpdateMailboxCountersOnce(CassandraCluster cassandra) throws MailboxException { + saveMessages(); + + StatementRecorder statementRecorder = new StatementRecorder(); + cassandra.getConf().recordStatements(statementRecorder); + cassandra.getConf().printStatements(); + + messageMapper.updateFlags(benwaInboxMailbox, new FlagsUpdateCalculator(new Flags(Flags.Flag.SEEN), MessageManager.FlagsUpdateMode.REPLACE), MessageRange.all()); + + + assertThat(statementRecorder.listExecutedStatements(Selector.preparedStatementStartingWith( + "UPDATE mailboxCounters SET "))) + .hasSize(1); + } + + @Test + void findInMailboxLimitShouldLimitProjectionReadCassandraQueries(CassandraCluster cassandra) throws MailboxException { + saveMessages(); + + StatementRecorder statementRecorder = new StatementRecorder(); + cassandra.getConf().recordStatements(statementRecorder); + + int limit = 2; + consume(messageMapper.findInMailbox(benwaInboxMailbox, MessageRange.all(), FetchType.Full, limit)); + + + assertThat(statementRecorder.listExecutedStatements(Selector.preparedStatement( + "SELECT messageId,internalDate,bodyStartOctet,fullContentOctets,bodyOctets,bodyContent,headerContent,textualLineCount,properties,attachments " + + "FROM messageV2 WHERE messageId=:messageId;"))) + .hasSize(limit); + } + + @Test + void updateFlagsShouldLimitModSeqAllocation(CassandraCluster cassandra) throws MailboxException { + saveMessages(); + + StatementRecorder statementRecorder = new StatementRecorder(); + cassandra.getConf().recordStatements(statementRecorder); + + messageMapper.updateFlags(benwaInboxMailbox, new FlagsUpdateCalculator(new Flags(Flags.Flag.ANSWERED), MessageManager.FlagsUpdateMode.REPLACE), MessageRange.all()); + + assertThat(statementRecorder.listExecutedStatements(Selector.preparedStatement( + "UPDATE modseq SET nextModseq=:nextModseq WHERE mailboxId=:mailboxId IF nextModseq=:modSeqCondition;"))) + .hasSize(1); + } + + private void consume(Iterator<MailboxMessage> inMailbox) { + ImmutableList.copyOf(inMailbox); + } } @Nested --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org