This is an automated email from the ASF dual-hosted git repository. rouazana pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit db80c6c29de11e37f79493844e665b190bb375db Author: Benoit Tellier <[email protected]> AuthorDate: Mon Jun 8 14:52:24 2020 +0700 JAMES-3204 Push limit to Cassandra backend when reading messages --- .../mailbox/cassandra/DeleteMessageListener.java | 3 +- .../cassandra/mail/CassandraMessageIdDAO.java | 95 +++++++++++++----- .../cassandra/mail/CassandraMessageMapper.java | 9 +- .../mail/task/MailboxMergingTaskRunner.java | 3 +- .../mail/task/RecomputeMailboxCountersService.java | 3 +- .../cassandra/CassandraMailboxManagerTest.java | 15 +-- .../cassandra/mail/CassandraMessageIdDAOTest.java | 111 ++++++++++++++++++++- .../mail/CassandraMessageIdMapperTest.java | 9 +- .../cassandra/mail/CassandraMessageMapperTest.java | 11 +- .../java/org/apache/james/util/streams/Limit.java | 4 + 10 files changed, 211 insertions(+), 52 deletions(-) diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java index 53b1d2e..e3d2eab 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java @@ -51,6 +51,7 @@ import org.apache.james.mailbox.model.MailboxACL; import org.apache.james.mailbox.model.MessageMetaData; import org.apache.james.mailbox.model.MessageRange; import org.apache.james.mailbox.store.mail.MessageMapper; +import org.apache.james.util.streams.Limit; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -137,7 +138,7 @@ public class DeleteMessageListener implements MailboxListener.GroupMailboxListen } private Mono<Void> handleMailboxDeletion(CassandraId mailboxId) { - return messageIdDAO.retrieveMessages(mailboxId, MessageRange.all()) + return messageIdDAO.retrieveMessages(mailboxId, MessageRange.all(), Limit.unlimited()) .map(ComposedMessageIdWithMetaData::getComposedMessageId) .concatMap(metadata -> handleMessageDeletionAsPartOfMailboxDeletion((CassandraMessageId) metadata.getMessageId(), mailboxId) .then(imapUidDAO.delete((CassandraMessageId) metadata.getMessageId(), mailboxId)) diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java index d15e121..82beedf 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java @@ -57,6 +57,7 @@ import org.apache.james.mailbox.cassandra.ids.CassandraMessageId.Factory; import org.apache.james.mailbox.model.ComposedMessageId; import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData; import org.apache.james.mailbox.model.MessageRange; +import org.apache.james.util.streams.Limit; import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.Row; @@ -71,6 +72,7 @@ public class CassandraMessageIdDAO { private static final String IMAP_UID_GTE = IMAP_UID + "_GTE"; private static final String IMAP_UID_LTE = IMAP_UID + "_LTE"; + public static final String LIMIT = "LIMIT_BIND_MARKER"; private final CassandraAsyncExecutor cassandraAsyncExecutor; private final Factory messageIdFactory; @@ -78,8 +80,11 @@ public class CassandraMessageIdDAO { private final PreparedStatement insert; private final PreparedStatement select; private final PreparedStatement selectAllUids; + private final PreparedStatement selectAllUidsLimited; private final PreparedStatement selectUidGte; + private final PreparedStatement selectUidGteLimited; private final PreparedStatement selectUidRange; + private final PreparedStatement selectUidRangeLimited; private final PreparedStatement update; private final PreparedStatement listStatement; @@ -92,8 +97,11 @@ public class CassandraMessageIdDAO { this.update = prepareUpdate(session); this.select = prepareSelect(session); this.selectAllUids = prepareSelectAllUids(session); + this.selectAllUidsLimited = prepareSelectAllUidsLimited(session); this.selectUidGte = prepareSelectUidGte(session); + this.selectUidGteLimited = prepareSelectUidGteLimited(session); this.selectUidRange = prepareSelectUidRange(session); + this.selectUidRangeLimited = prepareSelectUidRangeLimited(session); this.listStatement = prepareList(session); } @@ -144,8 +152,15 @@ public class CassandraMessageIdDAO { private PreparedStatement prepareSelectAllUids(Session session) { return session.prepare(select(FIELDS) - .from(TABLE_NAME) - .where(eq(MAILBOX_ID, bindMarker(MAILBOX_ID)))); + .from(TABLE_NAME) + .where(eq(MAILBOX_ID, bindMarker(MAILBOX_ID)))); + } + + private PreparedStatement prepareSelectAllUidsLimited(Session session) { + return session.prepare(select(FIELDS) + .from(TABLE_NAME) + .where(eq(MAILBOX_ID, bindMarker(MAILBOX_ID))) + .limit(bindMarker(LIMIT))); } private PreparedStatement prepareList(Session session) { @@ -155,17 +170,34 @@ public class CassandraMessageIdDAO { private PreparedStatement prepareSelectUidGte(Session session) { return session.prepare(select(FIELDS) - .from(TABLE_NAME) - .where(eq(MAILBOX_ID, bindMarker(MAILBOX_ID))) - .and(gte(IMAP_UID, bindMarker(IMAP_UID)))); + .from(TABLE_NAME) + .where(eq(MAILBOX_ID, bindMarker(MAILBOX_ID))) + .and(gte(IMAP_UID, bindMarker(IMAP_UID)))); + } + + private PreparedStatement prepareSelectUidGteLimited(Session session) { + return session.prepare(select(FIELDS) + .from(TABLE_NAME) + .where(eq(MAILBOX_ID, bindMarker(MAILBOX_ID))) + .and(gte(IMAP_UID, bindMarker(IMAP_UID))) + .limit(bindMarker(LIMIT))); } private PreparedStatement prepareSelectUidRange(Session session) { return session.prepare(select(FIELDS) - .from(TABLE_NAME) - .where(eq(MAILBOX_ID, bindMarker(MAILBOX_ID))) - .and(gte(IMAP_UID, bindMarker(IMAP_UID_GTE))) - .and(lte(IMAP_UID, bindMarker(IMAP_UID_LTE)))); + .from(TABLE_NAME) + .where(eq(MAILBOX_ID, bindMarker(MAILBOX_ID))) + .and(gte(IMAP_UID, bindMarker(IMAP_UID_GTE))) + .and(lte(IMAP_UID, bindMarker(IMAP_UID_LTE)))); + } + + private PreparedStatement prepareSelectUidRangeLimited(Session session) { + return session.prepare(select(FIELDS) + .from(TABLE_NAME) + .where(eq(MAILBOX_ID, bindMarker(MAILBOX_ID))) + .and(gte(IMAP_UID, bindMarker(IMAP_UID_GTE))) + .and(lte(IMAP_UID, bindMarker(IMAP_UID_LTE))) + .limit(bindMarker(LIMIT))); } public Mono<Void> delete(CassandraId mailboxId, MessageUid uid) { @@ -226,8 +258,8 @@ public class CassandraMessageIdDAO { .setLong(IMAP_UID, uid.asLong())); } - public Flux<ComposedMessageIdWithMetaData> retrieveMessages(CassandraId mailboxId, MessageRange set) { - return retrieveRows(mailboxId, set) + public Flux<ComposedMessageIdWithMetaData> retrieveMessages(CassandraId mailboxId, MessageRange set, Limit limit) { + return retrieveRows(mailboxId, set, limit) .map(this::fromRowToComposedMessageIdWithFlags); } @@ -236,36 +268,51 @@ public class CassandraMessageIdDAO { .map(this::fromRowToComposedMessageIdWithFlags); } - private Flux<Row> retrieveRows(CassandraId mailboxId, MessageRange set) { + private Flux<Row> retrieveRows(CassandraId mailboxId, MessageRange set, Limit limit) { switch (set.getType()) { case ALL: - return selectAll(mailboxId); + return selectAll(mailboxId, limit); case FROM: - return selectFrom(mailboxId, set.getUidFrom()); + return selectFrom(mailboxId, set.getUidFrom(), limit); case RANGE: - return selectRange(mailboxId, set.getUidFrom(), set.getUidTo()); + return selectRange(mailboxId, set.getUidFrom(), set.getUidTo(), limit); case ONE: return Flux.concat(selectOneRow(mailboxId, set.getUidFrom())); } throw new UnsupportedOperationException(); } - private Flux<Row> selectAll(CassandraId mailboxId) { - return cassandraAsyncExecutor.executeRows(selectAllUids.bind() - .setUUID(MAILBOX_ID, mailboxId.asUuid())); + private Flux<Row> selectAll(CassandraId mailboxId, Limit limit) { + return cassandraAsyncExecutor.executeRows(limit.getLimit() + .map(limitAsInt -> selectAllUidsLimited.bind() + .setUUID(MAILBOX_ID, mailboxId.asUuid()) + .setInt(LIMIT, limitAsInt)) + .orElse(selectAllUids.bind() + .setUUID(MAILBOX_ID, mailboxId.asUuid()))); } - private Flux<Row> selectFrom(CassandraId mailboxId, MessageUid uid) { - return cassandraAsyncExecutor.executeRows(selectUidGte.bind() + private Flux<Row> selectFrom(CassandraId mailboxId, MessageUid uid, Limit limit) { + return cassandraAsyncExecutor.executeRows(limit.getLimit() + .map(limitAsInt -> selectUidGteLimited.bind() .setUUID(MAILBOX_ID, mailboxId.asUuid()) - .setLong(IMAP_UID, uid.asLong())); + .setLong(IMAP_UID, uid.asLong()) + .setInt(LIMIT, limitAsInt)) + .orElse(selectUidGte.bind() + .setUUID(MAILBOX_ID, mailboxId.asUuid()) + .setLong(IMAP_UID, uid.asLong()))); } - private Flux<Row> selectRange(CassandraId mailboxId, MessageUid from, MessageUid to) { - return cassandraAsyncExecutor.executeRows(selectUidRange.bind() + private Flux<Row> selectRange(CassandraId mailboxId, MessageUid from, MessageUid to, Limit limit) { + return cassandraAsyncExecutor.executeRows(limit.getLimit() + .map(limitAsInt -> selectUidRangeLimited.bind() + .setUUID(MAILBOX_ID, mailboxId.asUuid()) + .setLong(IMAP_UID_GTE, from.asLong()) + .setLong(IMAP_UID_LTE, to.asLong()) + .setInt(LIMIT, limitAsInt)) + .orElse(selectUidRange.bind() .setUUID(MAILBOX_ID, mailboxId.asUuid()) .setLong(IMAP_UID_GTE, from.asLong()) - .setLong(IMAP_UID_LTE, to.asLong())); + .setLong(IMAP_UID_LTE, to.asLong()))); } private ComposedMessageIdWithMetaData fromRowToComposedMessageIdWithFlags(Row row) { diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java index 9b8c18e..39a7757 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java @@ -111,7 +111,7 @@ public class CassandraMessageMapper implements MessageMapper { @Override public Flux<MessageUid> listAllMessageUids(Mailbox mailbox) { CassandraId cassandraId = (CassandraId) mailbox.getMailboxId(); - return messageIdDAO.retrieveMessages(cassandraId, MessageRange.all()) + return messageIdDAO.retrieveMessages(cassandraId, MessageRange.all(), Limit.unlimited()) .map(metaData -> metaData.getComposedMessageId().getUid()); } @@ -167,10 +167,11 @@ public class CassandraMessageMapper implements MessageMapper { } @Override - public Flux<MailboxMessage> findInMailboxReactive(Mailbox mailbox, MessageRange messageRange, FetchType ftype, int limit) { + public Flux<MailboxMessage> findInMailboxReactive(Mailbox mailbox, MessageRange messageRange, FetchType ftype, int limitAsInt) { CassandraId mailboxId = (CassandraId) mailbox.getMailboxId(); - return Limit.from(limit).applyOnFlux(messageIdDAO.retrieveMessages(mailboxId, messageRange)) + Limit limit = Limit.from(limitAsInt); + return limit.applyOnFlux(messageIdDAO.retrieveMessages(mailboxId, messageRange, limit)) .flatMap(id -> retrieveMessage(id, ftype), cassandraConfiguration.getMessageReadChunkSize()) .sort(Comparator.comparing(MailboxMessage::getUid)); } @@ -291,7 +292,7 @@ public class CassandraMessageMapper implements MessageMapper { public Iterator<UpdatedFlags> updateFlags(Mailbox mailbox, FlagsUpdateCalculator flagUpdateCalculator, MessageRange range) { CassandraId mailboxId = (CassandraId) mailbox.getMailboxId(); - Flux<ComposedMessageIdWithMetaData> toBeUpdated = messageIdDAO.retrieveMessages(mailboxId, range); + Flux<ComposedMessageIdWithMetaData> toBeUpdated = messageIdDAO.retrieveMessages(mailboxId, range, Limit.unlimited()); FlagsUpdateStageResult firstResult = runUpdateStage(mailboxId, toBeUpdated, flagUpdateCalculator).block(); FlagsUpdateStageResult finalResult = handleUpdatesStagedRetry(mailboxId, flagUpdateCalculator, firstResult); diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/MailboxMergingTaskRunner.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/MailboxMergingTaskRunner.java index 8e7c6e0..2de3caf 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/MailboxMergingTaskRunner.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/MailboxMergingTaskRunner.java @@ -37,6 +37,7 @@ import org.apache.james.mailbox.model.MailboxACL; import org.apache.james.mailbox.model.MessageRange; import org.apache.james.mailbox.store.StoreMessageIdManager; import org.apache.james.task.Task; +import org.apache.james.util.streams.Limit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,7 +69,7 @@ public class MailboxMergingTaskRunner { } private Task.Result moveMessages(CassandraId oldMailboxId, CassandraId newMailboxId, MailboxSession session, MailboxMergingTask.Context context) { - return cassandraMessageIdDAO.retrieveMessages(oldMailboxId, MessageRange.all()) + return cassandraMessageIdDAO.retrieveMessages(oldMailboxId, MessageRange.all(), Limit.unlimited()) .map(ComposedMessageIdWithMetaData::getComposedMessageId) .map(messageId -> moveMessage(newMailboxId, messageId, session, context)) .reduce(Task.Result.COMPLETED, Task::combine) diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/RecomputeMailboxCountersService.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/RecomputeMailboxCountersService.java index 7bb4093..be1fbec 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/RecomputeMailboxCountersService.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/RecomputeMailboxCountersService.java @@ -39,6 +39,7 @@ import org.apache.james.mailbox.model.MailboxCounters; import org.apache.james.mailbox.model.MessageRange; import org.apache.james.task.Task; import org.apache.james.task.Task.Result; +import org.apache.james.util.streams.Limit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -216,7 +217,7 @@ public class RecomputeMailboxCountersService { CassandraId mailboxId = (CassandraId) mailbox.getMailboxId(); Counter counter = new Counter(mailboxId); - return imapUidToMessageIdDAO.retrieveMessages(mailboxId, MessageRange.all()) + return imapUidToMessageIdDAO.retrieveMessages(mailboxId, MessageRange.all(), Limit.unlimited()) .flatMap(message -> latestMetadata(mailboxId, message, options), MESSAGE_CONCURRENCY) .doOnNext(counter::process) .then(Mono.defer(() -> counterDAO.resetCounters(counter.snapshot()))) diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerTest.java index 6e35beb..cc1a026 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerTest.java @@ -69,6 +69,7 @@ import org.apache.james.mailbox.store.mail.MessageMapper; import org.apache.james.metrics.tests.RecordingMetricFactory; import org.apache.james.util.ClassLoaderUtils; import org.apache.james.util.streams.Iterators; +import org.apache.james.util.streams.Limit; import org.assertj.core.api.SoftAssertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Nested; @@ -140,7 +141,7 @@ public class CassandraMailboxManagerTest extends MailboxManagerTest<CassandraMai softly.assertThat(imapUidDAO(cassandraCluster).retrieve(cassandraMessageId, Optional.of(mailboxId)).collectList().block()) .isEmpty(); - softly.assertThat(messageIdDAO(cassandraCluster).retrieveMessages(mailboxId, MessageRange.all()).collectList().block()) + softly.assertThat(messageIdDAO(cassandraCluster).retrieveMessages(mailboxId, MessageRange.all(), Limit.unlimited()).collectList().block()) .isEmpty(); softly.assertThat(attachmentDAO(cassandraCluster).getAttachment(attachmentId).blockOptional()) @@ -179,7 +180,7 @@ public class CassandraMailboxManagerTest extends MailboxManagerTest<CassandraMai softly.assertThat(imapUidDAO(cassandraCluster).retrieve(cassandraMessageId, Optional.of(mailboxId)).collectList().block()) .isEmpty(); - softly.assertThat(messageIdDAO(cassandraCluster).retrieveMessages(mailboxId, MessageRange.all()).collectList().block()) + softly.assertThat(messageIdDAO(cassandraCluster).retrieveMessages(mailboxId, MessageRange.all(), Limit.unlimited()).collectList().block()) .isEmpty(); softly.assertThat(attachmentDAO(cassandraCluster).getAttachment(attachmentId).blockOptional()) @@ -218,7 +219,7 @@ public class CassandraMailboxManagerTest extends MailboxManagerTest<CassandraMai softly.assertThat(imapUidDAO(cassandraCluster).retrieve(cassandraMessageId, Optional.of(mailboxId)).collectList().block()) .isEmpty(); - softly.assertThat(messageIdDAO(cassandraCluster).retrieveMessages(mailboxId, MessageRange.all()).collectList().block()) + softly.assertThat(messageIdDAO(cassandraCluster).retrieveMessages(mailboxId, MessageRange.all(), Limit.unlimited()).collectList().block()) .isEmpty(); softly.assertThat(attachmentDAO(cassandraCluster).getAttachment(attachmentId).blockOptional()) @@ -257,7 +258,7 @@ public class CassandraMailboxManagerTest extends MailboxManagerTest<CassandraMai softly.assertThat(imapUidDAO(cassandraCluster).retrieve(cassandraMessageId, Optional.of(mailboxId)).collectList().block()) .isEmpty(); - softly.assertThat(messageIdDAO(cassandraCluster).retrieveMessages(mailboxId, MessageRange.all()).collectList().block()) + softly.assertThat(messageIdDAO(cassandraCluster).retrieveMessages(mailboxId, MessageRange.all(), Limit.unlimited()).collectList().block()) .isEmpty(); softly.assertThat(attachmentDAO(cassandraCluster).getAttachment(attachmentId).blockOptional()) @@ -296,7 +297,7 @@ public class CassandraMailboxManagerTest extends MailboxManagerTest<CassandraMai softly.assertThat(imapUidDAO(cassandraCluster).retrieve(cassandraMessageId, Optional.of(mailboxId)).collectList().block()) .isEmpty(); - softly.assertThat(messageIdDAO(cassandraCluster).retrieveMessages(mailboxId, MessageRange.all()).collectList().block()) + softly.assertThat(messageIdDAO(cassandraCluster).retrieveMessages(mailboxId, MessageRange.all(), Limit.unlimited()).collectList().block()) .isEmpty(); softly.assertThat(attachmentDAO(cassandraCluster).getAttachment(attachmentId).blockOptional()) @@ -425,7 +426,7 @@ public class CassandraMailboxManagerTest extends MailboxManagerTest<CassandraMai softly.assertThat(imapUidDAO(cassandraCluster).retrieve(cassandraMessageId, Optional.of(mailboxId)).collectList().block()) .isEmpty(); - softly.assertThat(messageIdDAO(cassandraCluster).retrieveMessages(mailboxId, MessageRange.all()).collectList().block()) + softly.assertThat(messageIdDAO(cassandraCluster).retrieveMessages(mailboxId, MessageRange.all(), Limit.unlimited()).collectList().block()) .isEmpty(); softly.assertThat(attachmentDAO(cassandraCluster).getAttachment(attachmentId).blockOptional()) @@ -492,7 +493,7 @@ public class CassandraMailboxManagerTest extends MailboxManagerTest<CassandraMai softly.assertThat(imapUidDAO(cassandraCluster).retrieve(cassandraMessageId, Optional.of(mailboxId)).collectList().block()) .isEmpty(); - softly.assertThat(messageIdDAO(cassandraCluster).retrieveMessages(mailboxId, MessageRange.all()).collectList().block()) + softly.assertThat(messageIdDAO(cassandraCluster).retrieveMessages(mailboxId, MessageRange.all(), Limit.unlimited()).collectList().block()) .isEmpty(); softly.assertThat(attachmentDAO(cassandraCluster).getAttachment(attachmentId).blockOptional()) diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAOTest.java index 8188a29..8f53020 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAOTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAOTest.java @@ -37,6 +37,7 @@ import org.apache.james.mailbox.cassandra.modules.CassandraMessageModule; import org.apache.james.mailbox.model.ComposedMessageId; import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData; import org.apache.james.mailbox.model.MessageRange; +import org.apache.james.util.streams.Limit; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -395,11 +396,37 @@ class CassandraMessageIdDAOTest { testee.insert(composedMessageIdWithMetaData2)) .blockLast(); - assertThat(testee.retrieveMessages(mailboxId, MessageRange.all()).toIterable()) + assertThat(testee.retrieveMessages(mailboxId, MessageRange.all(), Limit.unlimited()).toIterable()) .containsOnly(composedMessageIdWithMetaData, composedMessageIdWithMetaData2); } @Test + void retrieveMessagesShouldApplyLimitWhenRangeAll() { + CassandraMessageId messageId = messageIdFactory.generate(); + CassandraMessageId messageId2 = messageIdFactory.generate(); + CassandraId mailboxId = CassandraId.timeBased(); + MessageUid messageUid = MessageUid.of(1); + MessageUid messageUid2 = MessageUid.of(2); + + ComposedMessageIdWithMetaData composedMessageIdWithMetaData = ComposedMessageIdWithMetaData.builder() + .composedMessageId(new ComposedMessageId(mailboxId, messageId, messageUid)) + .flags(new Flags()) + .modSeq(ModSeq.of(1)) + .build(); + ComposedMessageIdWithMetaData composedMessageIdWithMetaData2 = ComposedMessageIdWithMetaData.builder() + .composedMessageId(new ComposedMessageId(mailboxId, messageId2, messageUid2)) + .flags(new Flags()) + .modSeq(ModSeq.of(1)) + .build(); + Flux.merge(testee.insert(composedMessageIdWithMetaData), + testee.insert(composedMessageIdWithMetaData2)) + .blockLast(); + + assertThat(testee.retrieveMessages(mailboxId, MessageRange.all(), Limit.limit(1)).toIterable()) + .containsOnly(composedMessageIdWithMetaData); + } + + @Test void retrieveMessagesShouldRetrieveSomeWhenRangeFrom() { CassandraMessageId messageId = messageIdFactory.generate(); CassandraMessageId messageId2 = messageIdFactory.generate(); @@ -429,9 +456,42 @@ class CassandraMessageIdDAOTest { testee.insert(composedMessageIdWithMetaData2)) .blockLast(); - assertThat(testee.retrieveMessages(mailboxId, MessageRange.from(messageUid2)).toIterable()) + assertThat(testee.retrieveMessages(mailboxId, MessageRange.from(messageUid2), Limit.unlimited()).toIterable()) .containsOnly(composedMessageIdWithMetaData, composedMessageIdWithMetaData2); } + @Test + void retrieveMessagesShouldAppluLimitWhenRangeFrom() { + CassandraMessageId messageId = messageIdFactory.generate(); + CassandraMessageId messageId2 = messageIdFactory.generate(); + CassandraMessageId messageId3 = messageIdFactory.generate(); + CassandraId mailboxId = CassandraId.timeBased(); + MessageUid messageUid = MessageUid.of(1); + MessageUid messageUid2 = MessageUid.of(2); + MessageUid messageUid3 = MessageUid.of(3); + + ComposedMessageIdWithMetaData composedMessageIdWithMetaData = ComposedMessageIdWithMetaData.builder() + .composedMessageId(new ComposedMessageId(mailboxId, messageId2, messageUid2)) + .flags(new Flags()) + .modSeq(ModSeq.of(1)) + .build(); + ComposedMessageIdWithMetaData composedMessageIdWithMetaData2 = ComposedMessageIdWithMetaData.builder() + .composedMessageId(new ComposedMessageId(mailboxId, messageId3, messageUid3)) + .flags(new Flags()) + .modSeq(ModSeq.of(1)) + .build(); + Flux.merge(testee.insert( + ComposedMessageIdWithMetaData.builder() + .composedMessageId(new ComposedMessageId(mailboxId, messageId, messageUid)) + .flags(new Flags()) + .modSeq(ModSeq.of(1)) + .build()), + testee.insert(composedMessageIdWithMetaData), + testee.insert(composedMessageIdWithMetaData2)) + .blockLast(); + + assertThat(testee.retrieveMessages(mailboxId, MessageRange.from(messageUid2), Limit.limit(1)).toIterable()) + .containsOnly(composedMessageIdWithMetaData); + } @Test void retrieveMessagesShouldRetrieveSomeWhenRange() { @@ -472,11 +532,54 @@ class CassandraMessageIdDAOTest { .build())) .blockLast(); - assertThat(testee.retrieveMessages(mailboxId, MessageRange.range(messageUid2, messageUid3)).toIterable()) + assertThat(testee.retrieveMessages(mailboxId, MessageRange.range(messageUid2, messageUid3), Limit.unlimited()).toIterable()) .containsOnly(composedMessageIdWithMetaData, composedMessageIdWithMetaData2); } @Test + void retrieveMessagesShouldApplyLimitWhenRange() { + CassandraMessageId messageId = messageIdFactory.generate(); + CassandraMessageId messageId2 = messageIdFactory.generate(); + CassandraMessageId messageId3 = messageIdFactory.generate(); + CassandraMessageId messageId4 = messageIdFactory.generate(); + CassandraId mailboxId = CassandraId.timeBased(); + MessageUid messageUid = MessageUid.of(1); + MessageUid messageUid2 = MessageUid.of(2); + MessageUid messageUid3 = MessageUid.of(3); + MessageUid messageUid4 = MessageUid.of(4); + + testee.insert(ComposedMessageIdWithMetaData.builder() + .composedMessageId(new ComposedMessageId(mailboxId, messageId, messageUid)) + .flags(new Flags()) + .modSeq(ModSeq.of(1)) + .build()) + .block(); + + ComposedMessageIdWithMetaData composedMessageIdWithMetaData = ComposedMessageIdWithMetaData.builder() + .composedMessageId(new ComposedMessageId(mailboxId, messageId2, messageUid2)) + .flags(new Flags()) + .modSeq(ModSeq.of(1)) + .build(); + + ComposedMessageIdWithMetaData composedMessageIdWithMetaData2 = ComposedMessageIdWithMetaData.builder() + .composedMessageId(new ComposedMessageId(mailboxId, messageId3, messageUid3)) + .flags(new Flags()) + .modSeq(ModSeq.of(1)) + .build(); + Flux.merge(testee.insert(composedMessageIdWithMetaData), + testee.insert(composedMessageIdWithMetaData2), + testee.insert(ComposedMessageIdWithMetaData.builder() + .composedMessageId(new ComposedMessageId(mailboxId, messageId4, messageUid4)) + .flags(new Flags()) + .modSeq(ModSeq.of(1)) + .build())) + .blockLast(); + + assertThat(testee.retrieveMessages(mailboxId, MessageRange.range(messageUid2, messageUid3), Limit.limit(1)).toIterable()) + .containsOnly(composedMessageIdWithMetaData); + } + + @Test void retrieveMessagesShouldRetrieveOneWhenRangeOne() { CassandraMessageId messageId = messageIdFactory.generate(); CassandraMessageId messageId2 = messageIdFactory.generate(); @@ -505,7 +608,7 @@ class CassandraMessageIdDAOTest { .build())) .blockLast(); - assertThat(testee.retrieveMessages(mailboxId, MessageRange.one(messageUid2)).toIterable()) + assertThat(testee.retrieveMessages(mailboxId, MessageRange.one(messageUid2), Limit.unlimited()).toIterable()) .containsOnly(composedMessageIdWithMetaData); } } diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapperTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapperTest.java index 891529a..104a72a 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapperTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapperTest.java @@ -39,6 +39,7 @@ import org.apache.james.mailbox.model.MessageRange; import org.apache.james.mailbox.store.mail.MessageMapper; import org.apache.james.mailbox.store.mail.model.MailboxMessage; import org.apache.james.mailbox.store.mail.model.MessageIdMapperTest; +import org.apache.james.util.streams.Limit; import org.assertj.core.api.SoftAssertions; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; @@ -103,7 +104,7 @@ class CassandraMessageIdMapperTest extends MessageIdMapperTest { SoftAssertions.assertSoftly(Throwing.consumer(softly -> { softly.assertThat(sut.find(ImmutableList.of(message1.getMessageId()), MessageMapper.FetchType.Metadata)) .isEmpty(); - softly.assertThat(messageIdDAO.retrieveMessages((CassandraId) benwaInboxMailbox.getMailboxId(), MessageRange.all()).collectList().block()) + softly.assertThat(messageIdDAO.retrieveMessages((CassandraId) benwaInboxMailbox.getMailboxId(), MessageRange.all(), Limit.unlimited()).collectList().block()) .isEmpty(); })); } @@ -127,7 +128,7 @@ class CassandraMessageIdMapperTest extends MessageIdMapperTest { SoftAssertions.assertSoftly(Throwing.consumer(softly -> { softly.assertThat(sut.find(ImmutableList.of(message1.getMessageId()), MessageMapper.FetchType.Metadata)) .isEmpty(); - softly.assertThat(messageIdDAO.retrieveMessages((CassandraId) benwaInboxMailbox.getMailboxId(), MessageRange.all()).collectList().block()) + softly.assertThat(messageIdDAO.retrieveMessages((CassandraId) benwaInboxMailbox.getMailboxId(), MessageRange.all(), Limit.unlimited()).collectList().block()) .isEmpty(); })); } @@ -151,7 +152,7 @@ class CassandraMessageIdMapperTest extends MessageIdMapperTest { SoftAssertions.assertSoftly(Throwing.consumer(softly -> { softly.assertThat(sut.find(ImmutableList.of(message1.getMessageId()), MessageMapper.FetchType.Metadata)) .isEmpty(); - softly.assertThat(messageIdDAO.retrieveMessages((CassandraId) benwaInboxMailbox.getMailboxId(), MessageRange.all()).collectList().block()) + softly.assertThat(messageIdDAO.retrieveMessages((CassandraId) benwaInboxMailbox.getMailboxId(), MessageRange.all(), Limit.unlimited()).collectList().block()) .isEmpty(); })); } @@ -175,7 +176,7 @@ class CassandraMessageIdMapperTest extends MessageIdMapperTest { SoftAssertions.assertSoftly(Throwing.consumer(softly -> { softly.assertThat(sut.find(ImmutableList.of(message1.getMessageId()), MessageMapper.FetchType.Metadata)) .isEmpty(); - softly.assertThat(messageIdDAO.retrieveMessages((CassandraId) benwaInboxMailbox.getMailboxId(), MessageRange.all()).collectList().block()) + softly.assertThat(messageIdDAO.retrieveMessages((CassandraId) benwaInboxMailbox.getMailboxId(), MessageRange.all(), Limit.unlimited()).collectList().block()) .isEmpty(); })); } diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java index 97f86ef..b97cf66 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java @@ -36,8 +36,8 @@ import org.apache.james.mailbox.store.mail.MessageMapper.FetchType; import org.apache.james.mailbox.store.mail.model.MailboxMessage; import org.apache.james.mailbox.store.mail.model.MapperProvider; import org.apache.james.mailbox.store.mail.model.MessageMapperTest; +import org.apache.james.util.streams.Limit; import org.assertj.core.api.SoftAssertions; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -55,7 +55,6 @@ class CassandraMessageMapperTest extends MessageMapperTest { return new CassandraMapperProvider(cassandraCluster.getCassandraCluster()); } - @Disabled("Currently generates a read to messageV2 per stored message despite the limit") @Test void findInMailboxLimitShouldLimitProjectionReadCassandraQueries(CassandraCluster cassandra) throws MailboxException { saveMessages(); @@ -98,7 +97,7 @@ class CassandraMessageMapperTest extends MessageMapperTest { softly.assertThat(messageMapper.findInMailbox(benwaInboxMailbox, MessageRange.all(), FetchType.Metadata, 1)) .toIterable() .isEmpty(); - softly.assertThat(messageIdDAO.retrieveMessages((CassandraId) benwaInboxMailbox.getMailboxId(), MessageRange.all()).collectList().block()) + softly.assertThat(messageIdDAO.retrieveMessages((CassandraId) benwaInboxMailbox.getMailboxId(), MessageRange.all(), Limit.unlimited()).collectList().block()) .isEmpty(); })); } @@ -121,7 +120,7 @@ class CassandraMessageMapperTest extends MessageMapperTest { softly.assertThat(messageMapper.findInMailbox(benwaInboxMailbox, MessageRange.all(), FetchType.Metadata, 1)) .toIterable() .isEmpty(); - softly.assertThat(messageIdDAO.retrieveMessages((CassandraId) benwaInboxMailbox.getMailboxId(), MessageRange.all()).collectList().block()) + softly.assertThat(messageIdDAO.retrieveMessages((CassandraId) benwaInboxMailbox.getMailboxId(), MessageRange.all(), Limit.unlimited()).collectList().block()) .isEmpty(); })); } @@ -144,7 +143,7 @@ class CassandraMessageMapperTest extends MessageMapperTest { softly.assertThat(messageMapper.findInMailbox(benwaInboxMailbox, MessageRange.all(), FetchType.Metadata, 1)) .toIterable() .isEmpty(); - softly.assertThat(messageIdDAO.retrieveMessages((CassandraId) benwaInboxMailbox.getMailboxId(), MessageRange.all()).collectList().block()) + softly.assertThat(messageIdDAO.retrieveMessages((CassandraId) benwaInboxMailbox.getMailboxId(), MessageRange.all(), Limit.unlimited()).collectList().block()) .isEmpty(); })); } @@ -167,7 +166,7 @@ class CassandraMessageMapperTest extends MessageMapperTest { softly.assertThat(messageMapper.findInMailbox(benwaInboxMailbox, MessageRange.all(), FetchType.Metadata, 1)) .toIterable() .isEmpty(); - softly.assertThat(messageIdDAO.retrieveMessages((CassandraId) benwaInboxMailbox.getMailboxId(), MessageRange.all()).collectList().block()) + softly.assertThat(messageIdDAO.retrieveMessages((CassandraId) benwaInboxMailbox.getMailboxId(), MessageRange.all(), Limit.unlimited()).collectList().block()) .isEmpty(); })); } diff --git a/server/container/util/src/main/java/org/apache/james/util/streams/Limit.java b/server/container/util/src/main/java/org/apache/james/util/streams/Limit.java index 466f1fb..70ffbbd 100644 --- a/server/container/util/src/main/java/org/apache/james/util/streams/Limit.java +++ b/server/container/util/src/main/java/org/apache/james/util/streams/Limit.java @@ -61,6 +61,10 @@ public class Limit { return limit; } + public boolean isUnlimited() { + return !limit.isPresent(); + } + public <T> Stream<T> applyOnStream(Stream<T> stream) { return limit .map(stream::limit) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
