This is an automated email from the ASF dual-hosted git repository.

rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit db80c6c29de11e37f79493844e665b190bb375db
Author: Benoit Tellier <[email protected]>
AuthorDate: Mon Jun 8 14:52:24 2020 +0700

    JAMES-3204 Push limit to Cassandra backend when reading messages
---
 .../mailbox/cassandra/DeleteMessageListener.java   |   3 +-
 .../cassandra/mail/CassandraMessageIdDAO.java      |  95 +++++++++++++-----
 .../cassandra/mail/CassandraMessageMapper.java     |   9 +-
 .../mail/task/MailboxMergingTaskRunner.java        |   3 +-
 .../mail/task/RecomputeMailboxCountersService.java |   3 +-
 .../cassandra/CassandraMailboxManagerTest.java     |  15 +--
 .../cassandra/mail/CassandraMessageIdDAOTest.java  | 111 ++++++++++++++++++++-
 .../mail/CassandraMessageIdMapperTest.java         |   9 +-
 .../cassandra/mail/CassandraMessageMapperTest.java |  11 +-
 .../java/org/apache/james/util/streams/Limit.java  |   4 +
 10 files changed, 211 insertions(+), 52 deletions(-)

diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java
index 53b1d2e..e3d2eab 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java
@@ -51,6 +51,7 @@ import org.apache.james.mailbox.model.MailboxACL;
 import org.apache.james.mailbox.model.MessageMetaData;
 import org.apache.james.mailbox.model.MessageRange;
 import org.apache.james.mailbox.store.mail.MessageMapper;
+import org.apache.james.util.streams.Limit;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -137,7 +138,7 @@ public class DeleteMessageListener implements 
MailboxListener.GroupMailboxListen
     }
 
     private Mono<Void> handleMailboxDeletion(CassandraId mailboxId) {
-        return messageIdDAO.retrieveMessages(mailboxId, MessageRange.all())
+        return messageIdDAO.retrieveMessages(mailboxId, MessageRange.all(), 
Limit.unlimited())
             .map(ComposedMessageIdWithMetaData::getComposedMessageId)
             .concatMap(metadata -> 
handleMessageDeletionAsPartOfMailboxDeletion((CassandraMessageId) 
metadata.getMessageId(), mailboxId)
                 .then(imapUidDAO.delete((CassandraMessageId) 
metadata.getMessageId(), mailboxId))
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java
index d15e121..82beedf 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAO.java
@@ -57,6 +57,7 @@ import 
org.apache.james.mailbox.cassandra.ids.CassandraMessageId.Factory;
 import org.apache.james.mailbox.model.ComposedMessageId;
 import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
 import org.apache.james.mailbox.model.MessageRange;
+import org.apache.james.util.streams.Limit;
 
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Row;
@@ -71,6 +72,7 @@ public class CassandraMessageIdDAO {
 
     private static final String IMAP_UID_GTE = IMAP_UID + "_GTE";
     private static final String IMAP_UID_LTE = IMAP_UID + "_LTE";
+    public static final String LIMIT = "LIMIT_BIND_MARKER";
 
     private final CassandraAsyncExecutor cassandraAsyncExecutor;
     private final Factory messageIdFactory;
@@ -78,8 +80,11 @@ public class CassandraMessageIdDAO {
     private final PreparedStatement insert;
     private final PreparedStatement select;
     private final PreparedStatement selectAllUids;
+    private final PreparedStatement selectAllUidsLimited;
     private final PreparedStatement selectUidGte;
+    private final PreparedStatement selectUidGteLimited;
     private final PreparedStatement selectUidRange;
+    private final PreparedStatement selectUidRangeLimited;
     private final PreparedStatement update;
     private final PreparedStatement listStatement;
 
@@ -92,8 +97,11 @@ public class CassandraMessageIdDAO {
         this.update = prepareUpdate(session);
         this.select = prepareSelect(session);
         this.selectAllUids = prepareSelectAllUids(session);
+        this.selectAllUidsLimited = prepareSelectAllUidsLimited(session);
         this.selectUidGte = prepareSelectUidGte(session);
+        this.selectUidGteLimited = prepareSelectUidGteLimited(session);
         this.selectUidRange = prepareSelectUidRange(session);
+        this.selectUidRangeLimited = prepareSelectUidRangeLimited(session);
         this.listStatement = prepareList(session);
     }
 
@@ -144,8 +152,15 @@ public class CassandraMessageIdDAO {
 
     private PreparedStatement prepareSelectAllUids(Session session) {
         return session.prepare(select(FIELDS)
-                .from(TABLE_NAME)
-                .where(eq(MAILBOX_ID, bindMarker(MAILBOX_ID))));
+            .from(TABLE_NAME)
+            .where(eq(MAILBOX_ID, bindMarker(MAILBOX_ID))));
+    }
+
+    private PreparedStatement prepareSelectAllUidsLimited(Session session) {
+        return session.prepare(select(FIELDS)
+            .from(TABLE_NAME)
+            .where(eq(MAILBOX_ID, bindMarker(MAILBOX_ID)))
+            .limit(bindMarker(LIMIT)));
     }
 
     private PreparedStatement prepareList(Session session) {
@@ -155,17 +170,34 @@ public class CassandraMessageIdDAO {
 
     private PreparedStatement prepareSelectUidGte(Session session) {
         return session.prepare(select(FIELDS)
-                .from(TABLE_NAME)
-                .where(eq(MAILBOX_ID, bindMarker(MAILBOX_ID)))
-                .and(gte(IMAP_UID, bindMarker(IMAP_UID))));
+            .from(TABLE_NAME)
+            .where(eq(MAILBOX_ID, bindMarker(MAILBOX_ID)))
+            .and(gte(IMAP_UID, bindMarker(IMAP_UID))));
+    }
+
+    private PreparedStatement prepareSelectUidGteLimited(Session session) {
+        return session.prepare(select(FIELDS)
+            .from(TABLE_NAME)
+            .where(eq(MAILBOX_ID, bindMarker(MAILBOX_ID)))
+            .and(gte(IMAP_UID, bindMarker(IMAP_UID)))
+            .limit(bindMarker(LIMIT)));
     }
 
     private PreparedStatement prepareSelectUidRange(Session session) {
         return session.prepare(select(FIELDS)
-                .from(TABLE_NAME)
-                .where(eq(MAILBOX_ID, bindMarker(MAILBOX_ID)))
-                .and(gte(IMAP_UID, bindMarker(IMAP_UID_GTE)))
-                .and(lte(IMAP_UID, bindMarker(IMAP_UID_LTE))));
+            .from(TABLE_NAME)
+            .where(eq(MAILBOX_ID, bindMarker(MAILBOX_ID)))
+            .and(gte(IMAP_UID, bindMarker(IMAP_UID_GTE)))
+            .and(lte(IMAP_UID, bindMarker(IMAP_UID_LTE))));
+    }
+
+    private PreparedStatement prepareSelectUidRangeLimited(Session session) {
+        return session.prepare(select(FIELDS)
+            .from(TABLE_NAME)
+            .where(eq(MAILBOX_ID, bindMarker(MAILBOX_ID)))
+            .and(gte(IMAP_UID, bindMarker(IMAP_UID_GTE)))
+            .and(lte(IMAP_UID, bindMarker(IMAP_UID_LTE)))
+            .limit(bindMarker(LIMIT)));
     }
 
     public Mono<Void> delete(CassandraId mailboxId, MessageUid uid) {
@@ -226,8 +258,8 @@ public class CassandraMessageIdDAO {
                 .setLong(IMAP_UID, uid.asLong()));
     }
 
-    public Flux<ComposedMessageIdWithMetaData> retrieveMessages(CassandraId 
mailboxId, MessageRange set) {
-        return retrieveRows(mailboxId, set)
+    public Flux<ComposedMessageIdWithMetaData> retrieveMessages(CassandraId 
mailboxId, MessageRange set, Limit limit) {
+        return retrieveRows(mailboxId, set, limit)
             .map(this::fromRowToComposedMessageIdWithFlags);
     }
 
@@ -236,36 +268,51 @@ public class CassandraMessageIdDAO {
             .map(this::fromRowToComposedMessageIdWithFlags);
     }
 
-    private Flux<Row> retrieveRows(CassandraId mailboxId, MessageRange set) {
+    private Flux<Row> retrieveRows(CassandraId mailboxId, MessageRange set, 
Limit limit) {
         switch (set.getType()) {
         case ALL:
-            return selectAll(mailboxId);
+            return selectAll(mailboxId, limit);
         case FROM:
-            return selectFrom(mailboxId, set.getUidFrom());
+            return selectFrom(mailboxId, set.getUidFrom(), limit);
         case RANGE:
-            return selectRange(mailboxId, set.getUidFrom(), set.getUidTo());
+            return selectRange(mailboxId, set.getUidFrom(), set.getUidTo(), 
limit);
         case ONE:
             return Flux.concat(selectOneRow(mailboxId, set.getUidFrom()));
         }
         throw new UnsupportedOperationException();
     }
 
-    private Flux<Row> selectAll(CassandraId mailboxId) {
-        return cassandraAsyncExecutor.executeRows(selectAllUids.bind()
-                .setUUID(MAILBOX_ID, mailboxId.asUuid()));
+    private Flux<Row> selectAll(CassandraId mailboxId, Limit limit) {
+        return cassandraAsyncExecutor.executeRows(limit.getLimit()
+            .map(limitAsInt -> selectAllUidsLimited.bind()
+                .setUUID(MAILBOX_ID, mailboxId.asUuid())
+                .setInt(LIMIT, limitAsInt))
+            .orElse(selectAllUids.bind()
+                .setUUID(MAILBOX_ID, mailboxId.asUuid())));
     }
 
-    private Flux<Row> selectFrom(CassandraId mailboxId, MessageUid uid) {
-        return cassandraAsyncExecutor.executeRows(selectUidGte.bind()
+    private Flux<Row> selectFrom(CassandraId mailboxId, MessageUid uid, Limit 
limit) {
+        return cassandraAsyncExecutor.executeRows(limit.getLimit()
+            .map(limitAsInt -> selectUidGteLimited.bind()
                 .setUUID(MAILBOX_ID, mailboxId.asUuid())
-                .setLong(IMAP_UID, uid.asLong()));
+                .setLong(IMAP_UID, uid.asLong())
+                .setInt(LIMIT, limitAsInt))
+            .orElse(selectUidGte.bind()
+                .setUUID(MAILBOX_ID, mailboxId.asUuid())
+                .setLong(IMAP_UID, uid.asLong())));
     }
 
-    private Flux<Row> selectRange(CassandraId mailboxId, MessageUid from, 
MessageUid to) {
-        return cassandraAsyncExecutor.executeRows(selectUidRange.bind()
+    private Flux<Row> selectRange(CassandraId mailboxId, MessageUid from, 
MessageUid to, Limit limit) {
+        return cassandraAsyncExecutor.executeRows(limit.getLimit()
+            .map(limitAsInt -> selectUidRangeLimited.bind()
+                .setUUID(MAILBOX_ID, mailboxId.asUuid())
+                .setLong(IMAP_UID_GTE, from.asLong())
+                .setLong(IMAP_UID_LTE, to.asLong())
+                .setInt(LIMIT, limitAsInt))
+            .orElse(selectUidRange.bind()
                 .setUUID(MAILBOX_ID, mailboxId.asUuid())
                 .setLong(IMAP_UID_GTE, from.asLong())
-                .setLong(IMAP_UID_LTE, to.asLong()));
+                .setLong(IMAP_UID_LTE, to.asLong())));
     }
 
     private ComposedMessageIdWithMetaData 
fromRowToComposedMessageIdWithFlags(Row row) {
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index 9b8c18e..39a7757 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -111,7 +111,7 @@ public class CassandraMessageMapper implements 
MessageMapper {
     @Override
     public Flux<MessageUid> listAllMessageUids(Mailbox mailbox) {
         CassandraId cassandraId = (CassandraId) mailbox.getMailboxId();
-        return messageIdDAO.retrieveMessages(cassandraId, MessageRange.all())
+        return messageIdDAO.retrieveMessages(cassandraId, MessageRange.all(), 
Limit.unlimited())
             .map(metaData -> metaData.getComposedMessageId().getUid());
     }
 
@@ -167,10 +167,11 @@ public class CassandraMessageMapper implements 
MessageMapper {
     }
 
     @Override
-    public Flux<MailboxMessage> findInMailboxReactive(Mailbox mailbox, 
MessageRange messageRange, FetchType ftype, int limit) {
+    public Flux<MailboxMessage> findInMailboxReactive(Mailbox mailbox, 
MessageRange messageRange, FetchType ftype, int limitAsInt) {
         CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
 
-        return 
Limit.from(limit).applyOnFlux(messageIdDAO.retrieveMessages(mailboxId, 
messageRange))
+        Limit limit = Limit.from(limitAsInt);
+        return limit.applyOnFlux(messageIdDAO.retrieveMessages(mailboxId, 
messageRange, limit))
             .flatMap(id -> retrieveMessage(id, ftype), 
cassandraConfiguration.getMessageReadChunkSize())
             .sort(Comparator.comparing(MailboxMessage::getUid));
     }
@@ -291,7 +292,7 @@ public class CassandraMessageMapper implements 
MessageMapper {
     public Iterator<UpdatedFlags> updateFlags(Mailbox mailbox, 
FlagsUpdateCalculator flagUpdateCalculator, MessageRange range) {
         CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
 
-        Flux<ComposedMessageIdWithMetaData> toBeUpdated = 
messageIdDAO.retrieveMessages(mailboxId, range);
+        Flux<ComposedMessageIdWithMetaData> toBeUpdated = 
messageIdDAO.retrieveMessages(mailboxId, range, Limit.unlimited());
 
         FlagsUpdateStageResult firstResult = runUpdateStage(mailboxId, 
toBeUpdated, flagUpdateCalculator).block();
         FlagsUpdateStageResult finalResult = 
handleUpdatesStagedRetry(mailboxId, flagUpdateCalculator, firstResult);
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/MailboxMergingTaskRunner.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/MailboxMergingTaskRunner.java
index 8e7c6e0..2de3caf 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/MailboxMergingTaskRunner.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/MailboxMergingTaskRunner.java
@@ -37,6 +37,7 @@ import org.apache.james.mailbox.model.MailboxACL;
 import org.apache.james.mailbox.model.MessageRange;
 import org.apache.james.mailbox.store.StoreMessageIdManager;
 import org.apache.james.task.Task;
+import org.apache.james.util.streams.Limit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -68,7 +69,7 @@ public class MailboxMergingTaskRunner {
     }
 
     private Task.Result moveMessages(CassandraId oldMailboxId, CassandraId 
newMailboxId, MailboxSession session, MailboxMergingTask.Context context) {
-        return cassandraMessageIdDAO.retrieveMessages(oldMailboxId, 
MessageRange.all())
+        return cassandraMessageIdDAO.retrieveMessages(oldMailboxId, 
MessageRange.all(), Limit.unlimited())
             .map(ComposedMessageIdWithMetaData::getComposedMessageId)
             .map(messageId -> moveMessage(newMailboxId, messageId, session, 
context))
             .reduce(Task.Result.COMPLETED, Task::combine)
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/RecomputeMailboxCountersService.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/RecomputeMailboxCountersService.java
index 7bb4093..be1fbec 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/RecomputeMailboxCountersService.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/RecomputeMailboxCountersService.java
@@ -39,6 +39,7 @@ import org.apache.james.mailbox.model.MailboxCounters;
 import org.apache.james.mailbox.model.MessageRange;
 import org.apache.james.task.Task;
 import org.apache.james.task.Task.Result;
+import org.apache.james.util.streams.Limit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -216,7 +217,7 @@ public class RecomputeMailboxCountersService {
         CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
         Counter counter = new Counter(mailboxId);
 
-        return imapUidToMessageIdDAO.retrieveMessages(mailboxId, 
MessageRange.all())
+        return imapUidToMessageIdDAO.retrieveMessages(mailboxId, 
MessageRange.all(), Limit.unlimited())
             .flatMap(message -> latestMetadata(mailboxId, message, options), 
MESSAGE_CONCURRENCY)
             .doOnNext(counter::process)
             .then(Mono.defer(() -> 
counterDAO.resetCounters(counter.snapshot())))
diff --git 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerTest.java
 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerTest.java
index 6e35beb..cc1a026 100644
--- 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerTest.java
+++ 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraMailboxManagerTest.java
@@ -69,6 +69,7 @@ import org.apache.james.mailbox.store.mail.MessageMapper;
 import org.apache.james.metrics.tests.RecordingMetricFactory;
 import org.apache.james.util.ClassLoaderUtils;
 import org.apache.james.util.streams.Iterators;
+import org.apache.james.util.streams.Limit;
 import org.assertj.core.api.SoftAssertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Nested;
@@ -140,7 +141,7 @@ public class CassandraMailboxManagerTest extends 
MailboxManagerTest<CassandraMai
                 
softly.assertThat(imapUidDAO(cassandraCluster).retrieve(cassandraMessageId, 
Optional.of(mailboxId)).collectList().block())
                     .isEmpty();
 
-                
softly.assertThat(messageIdDAO(cassandraCluster).retrieveMessages(mailboxId, 
MessageRange.all()).collectList().block())
+                
softly.assertThat(messageIdDAO(cassandraCluster).retrieveMessages(mailboxId, 
MessageRange.all(), Limit.unlimited()).collectList().block())
                     .isEmpty();
 
                 
softly.assertThat(attachmentDAO(cassandraCluster).getAttachment(attachmentId).blockOptional())
@@ -179,7 +180,7 @@ public class CassandraMailboxManagerTest extends 
MailboxManagerTest<CassandraMai
                 
softly.assertThat(imapUidDAO(cassandraCluster).retrieve(cassandraMessageId, 
Optional.of(mailboxId)).collectList().block())
                     .isEmpty();
 
-                
softly.assertThat(messageIdDAO(cassandraCluster).retrieveMessages(mailboxId, 
MessageRange.all()).collectList().block())
+                
softly.assertThat(messageIdDAO(cassandraCluster).retrieveMessages(mailboxId, 
MessageRange.all(), Limit.unlimited()).collectList().block())
                     .isEmpty();
 
                 
softly.assertThat(attachmentDAO(cassandraCluster).getAttachment(attachmentId).blockOptional())
@@ -218,7 +219,7 @@ public class CassandraMailboxManagerTest extends 
MailboxManagerTest<CassandraMai
                 
softly.assertThat(imapUidDAO(cassandraCluster).retrieve(cassandraMessageId, 
Optional.of(mailboxId)).collectList().block())
                     .isEmpty();
 
-                
softly.assertThat(messageIdDAO(cassandraCluster).retrieveMessages(mailboxId, 
MessageRange.all()).collectList().block())
+                
softly.assertThat(messageIdDAO(cassandraCluster).retrieveMessages(mailboxId, 
MessageRange.all(), Limit.unlimited()).collectList().block())
                     .isEmpty();
 
                 
softly.assertThat(attachmentDAO(cassandraCluster).getAttachment(attachmentId).blockOptional())
@@ -257,7 +258,7 @@ public class CassandraMailboxManagerTest extends 
MailboxManagerTest<CassandraMai
                 
softly.assertThat(imapUidDAO(cassandraCluster).retrieve(cassandraMessageId, 
Optional.of(mailboxId)).collectList().block())
                     .isEmpty();
 
-                
softly.assertThat(messageIdDAO(cassandraCluster).retrieveMessages(mailboxId, 
MessageRange.all()).collectList().block())
+                
softly.assertThat(messageIdDAO(cassandraCluster).retrieveMessages(mailboxId, 
MessageRange.all(), Limit.unlimited()).collectList().block())
                     .isEmpty();
 
                 
softly.assertThat(attachmentDAO(cassandraCluster).getAttachment(attachmentId).blockOptional())
@@ -296,7 +297,7 @@ public class CassandraMailboxManagerTest extends 
MailboxManagerTest<CassandraMai
                 
softly.assertThat(imapUidDAO(cassandraCluster).retrieve(cassandraMessageId, 
Optional.of(mailboxId)).collectList().block())
                     .isEmpty();
 
-                
softly.assertThat(messageIdDAO(cassandraCluster).retrieveMessages(mailboxId, 
MessageRange.all()).collectList().block())
+                
softly.assertThat(messageIdDAO(cassandraCluster).retrieveMessages(mailboxId, 
MessageRange.all(), Limit.unlimited()).collectList().block())
                     .isEmpty();
 
                 
softly.assertThat(attachmentDAO(cassandraCluster).getAttachment(attachmentId).blockOptional())
@@ -425,7 +426,7 @@ public class CassandraMailboxManagerTest extends 
MailboxManagerTest<CassandraMai
                 
softly.assertThat(imapUidDAO(cassandraCluster).retrieve(cassandraMessageId, 
Optional.of(mailboxId)).collectList().block())
                     .isEmpty();
 
-                
softly.assertThat(messageIdDAO(cassandraCluster).retrieveMessages(mailboxId, 
MessageRange.all()).collectList().block())
+                
softly.assertThat(messageIdDAO(cassandraCluster).retrieveMessages(mailboxId, 
MessageRange.all(), Limit.unlimited()).collectList().block())
                     .isEmpty();
 
                 
softly.assertThat(attachmentDAO(cassandraCluster).getAttachment(attachmentId).blockOptional())
@@ -492,7 +493,7 @@ public class CassandraMailboxManagerTest extends 
MailboxManagerTest<CassandraMai
                 
softly.assertThat(imapUidDAO(cassandraCluster).retrieve(cassandraMessageId, 
Optional.of(mailboxId)).collectList().block())
                     .isEmpty();
 
-                
softly.assertThat(messageIdDAO(cassandraCluster).retrieveMessages(mailboxId, 
MessageRange.all()).collectList().block())
+                
softly.assertThat(messageIdDAO(cassandraCluster).retrieveMessages(mailboxId, 
MessageRange.all(), Limit.unlimited()).collectList().block())
                     .isEmpty();
 
                 
softly.assertThat(attachmentDAO(cassandraCluster).getAttachment(attachmentId).blockOptional())
diff --git 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAOTest.java
 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAOTest.java
index 8188a29..8f53020 100644
--- 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAOTest.java
+++ 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdDAOTest.java
@@ -37,6 +37,7 @@ import 
org.apache.james.mailbox.cassandra.modules.CassandraMessageModule;
 import org.apache.james.mailbox.model.ComposedMessageId;
 import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
 import org.apache.james.mailbox.model.MessageRange;
+import org.apache.james.util.streams.Limit;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
@@ -395,11 +396,37 @@ class CassandraMessageIdDAOTest {
                 testee.insert(composedMessageIdWithMetaData2))
         .blockLast();
 
-        assertThat(testee.retrieveMessages(mailboxId, 
MessageRange.all()).toIterable())
+        assertThat(testee.retrieveMessages(mailboxId, MessageRange.all(), 
Limit.unlimited()).toIterable())
             .containsOnly(composedMessageIdWithMetaData, 
composedMessageIdWithMetaData2);
     }
 
     @Test
+    void retrieveMessagesShouldApplyLimitWhenRangeAll() {
+        CassandraMessageId messageId = messageIdFactory.generate();
+        CassandraMessageId messageId2 = messageIdFactory.generate();
+        CassandraId mailboxId = CassandraId.timeBased();
+        MessageUid messageUid = MessageUid.of(1);
+        MessageUid messageUid2 = MessageUid.of(2);
+
+        ComposedMessageIdWithMetaData composedMessageIdWithMetaData = 
ComposedMessageIdWithMetaData.builder()
+                .composedMessageId(new ComposedMessageId(mailboxId, messageId, 
messageUid))
+                .flags(new Flags())
+                .modSeq(ModSeq.of(1))
+                .build();
+        ComposedMessageIdWithMetaData composedMessageIdWithMetaData2 = 
ComposedMessageIdWithMetaData.builder()
+                .composedMessageId(new ComposedMessageId(mailboxId, 
messageId2, messageUid2))
+                .flags(new Flags())
+                .modSeq(ModSeq.of(1))
+                .build();
+        Flux.merge(testee.insert(composedMessageIdWithMetaData),
+                testee.insert(composedMessageIdWithMetaData2))
+            .blockLast();
+
+        assertThat(testee.retrieveMessages(mailboxId, MessageRange.all(), 
Limit.limit(1)).toIterable())
+            .containsOnly(composedMessageIdWithMetaData);
+    }
+
+    @Test
     void retrieveMessagesShouldRetrieveSomeWhenRangeFrom() {
         CassandraMessageId messageId = messageIdFactory.generate();
         CassandraMessageId messageId2 = messageIdFactory.generate();
@@ -429,9 +456,42 @@ class CassandraMessageIdDAOTest {
                 testee.insert(composedMessageIdWithMetaData2))
         .blockLast();
 
-        assertThat(testee.retrieveMessages(mailboxId, 
MessageRange.from(messageUid2)).toIterable())
+        assertThat(testee.retrieveMessages(mailboxId, 
MessageRange.from(messageUid2), Limit.unlimited()).toIterable())
             .containsOnly(composedMessageIdWithMetaData, 
composedMessageIdWithMetaData2);
     }
+    @Test
+    void retrieveMessagesShouldApplyLimitWhenRangeFrom() {
+        CassandraMessageId messageId = messageIdFactory.generate();
+        CassandraMessageId messageId2 = messageIdFactory.generate();
+        CassandraMessageId messageId3 = messageIdFactory.generate();
+        CassandraId mailboxId = CassandraId.timeBased();
+        MessageUid messageUid = MessageUid.of(1);
+        MessageUid messageUid2 = MessageUid.of(2);
+        MessageUid messageUid3 = MessageUid.of(3);
+
+        ComposedMessageIdWithMetaData composedMessageIdWithMetaData = 
ComposedMessageIdWithMetaData.builder()
+            .composedMessageId(new ComposedMessageId(mailboxId, messageId2, 
messageUid2))
+            .flags(new Flags())
+            .modSeq(ModSeq.of(1))
+            .build();
+        ComposedMessageIdWithMetaData composedMessageIdWithMetaData2 = 
ComposedMessageIdWithMetaData.builder()
+            .composedMessageId(new ComposedMessageId(mailboxId, messageId3, 
messageUid3))
+            .flags(new Flags())
+            .modSeq(ModSeq.of(1))
+            .build();
+        Flux.merge(testee.insert(
+            ComposedMessageIdWithMetaData.builder()
+                .composedMessageId(new ComposedMessageId(mailboxId, messageId, 
messageUid))
+                .flags(new Flags())
+                .modSeq(ModSeq.of(1))
+                .build()),
+            testee.insert(composedMessageIdWithMetaData),
+            testee.insert(composedMessageIdWithMetaData2))
+            .blockLast();
+
+        assertThat(testee.retrieveMessages(mailboxId, 
MessageRange.from(messageUid2), Limit.limit(1)).toIterable())
+            .containsOnly(composedMessageIdWithMetaData);
+    }
 
     @Test
     void retrieveMessagesShouldRetrieveSomeWhenRange() {
@@ -472,11 +532,54 @@ class CassandraMessageIdDAOTest {
                     .build()))
         .blockLast();
 
-        assertThat(testee.retrieveMessages(mailboxId, 
MessageRange.range(messageUid2, messageUid3)).toIterable())
+        assertThat(testee.retrieveMessages(mailboxId, 
MessageRange.range(messageUid2, messageUid3), Limit.unlimited()).toIterable())
             .containsOnly(composedMessageIdWithMetaData, 
composedMessageIdWithMetaData2);
     }
 
     @Test
+    void retrieveMessagesShouldApplyLimitWhenRange() {
+        CassandraMessageId messageId = messageIdFactory.generate();
+        CassandraMessageId messageId2 = messageIdFactory.generate();
+        CassandraMessageId messageId3 = messageIdFactory.generate();
+        CassandraMessageId messageId4 = messageIdFactory.generate();
+        CassandraId mailboxId = CassandraId.timeBased();
+        MessageUid messageUid = MessageUid.of(1);
+        MessageUid messageUid2 = MessageUid.of(2);
+        MessageUid messageUid3 = MessageUid.of(3);
+        MessageUid messageUid4 = MessageUid.of(4);
+
+        testee.insert(ComposedMessageIdWithMetaData.builder()
+                .composedMessageId(new ComposedMessageId(mailboxId, messageId, 
messageUid))
+                .flags(new Flags())
+                .modSeq(ModSeq.of(1))
+            .build())
+            .block();
+
+        ComposedMessageIdWithMetaData composedMessageIdWithMetaData = 
ComposedMessageIdWithMetaData.builder()
+            .composedMessageId(new ComposedMessageId(mailboxId, messageId2, 
messageUid2))
+            .flags(new Flags())
+            .modSeq(ModSeq.of(1))
+            .build();
+
+        ComposedMessageIdWithMetaData composedMessageIdWithMetaData2 = 
ComposedMessageIdWithMetaData.builder()
+            .composedMessageId(new ComposedMessageId(mailboxId, messageId3, 
messageUid3))
+            .flags(new Flags())
+            .modSeq(ModSeq.of(1))
+            .build();
+        Flux.merge(testee.insert(composedMessageIdWithMetaData),
+            testee.insert(composedMessageIdWithMetaData2),
+            testee.insert(ComposedMessageIdWithMetaData.builder()
+                .composedMessageId(new ComposedMessageId(mailboxId, 
messageId4, messageUid4))
+                .flags(new Flags())
+                .modSeq(ModSeq.of(1))
+                .build()))
+            .blockLast();
+
+        assertThat(testee.retrieveMessages(mailboxId, 
MessageRange.range(messageUid2, messageUid3), Limit.limit(1)).toIterable())
+            .containsOnly(composedMessageIdWithMetaData);
+    }
+
+    @Test
     void retrieveMessagesShouldRetrieveOneWhenRangeOne() {
         CassandraMessageId messageId = messageIdFactory.generate();
         CassandraMessageId messageId2 = messageIdFactory.generate();
@@ -505,7 +608,7 @@ class CassandraMessageIdDAOTest {
                     .build()))
         .blockLast();
 
-        assertThat(testee.retrieveMessages(mailboxId, 
MessageRange.one(messageUid2)).toIterable())
+        assertThat(testee.retrieveMessages(mailboxId, 
MessageRange.one(messageUid2), Limit.unlimited()).toIterable())
             .containsOnly(composedMessageIdWithMetaData);
     }
 }
diff --git 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapperTest.java
 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapperTest.java
index 891529a..104a72a 100644
--- 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapperTest.java
+++ 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapperTest.java
@@ -39,6 +39,7 @@ import org.apache.james.mailbox.model.MessageRange;
 import org.apache.james.mailbox.store.mail.MessageMapper;
 import org.apache.james.mailbox.store.mail.model.MailboxMessage;
 import org.apache.james.mailbox.store.mail.model.MessageIdMapperTest;
+import org.apache.james.util.streams.Limit;
 import org.assertj.core.api.SoftAssertions;
 import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
@@ -103,7 +104,7 @@ class CassandraMessageIdMapperTest extends MessageIdMapperTest {
             SoftAssertions.assertSoftly(Throwing.consumer(softly -> {
                 softly.assertThat(sut.find(ImmutableList.of(message1.getMessageId()), MessageMapper.FetchType.Metadata))
                     .isEmpty();
-                softly.assertThat(messageIdDAO.retrieveMessages((CassandraId) benwaInboxMailbox.getMailboxId(), MessageRange.all()).collectList().block())
+                softly.assertThat(messageIdDAO.retrieveMessages((CassandraId) benwaInboxMailbox.getMailboxId(), MessageRange.all(), Limit.unlimited()).collectList().block())
                     .isEmpty();
             }));
         }
@@ -127,7 +128,7 @@ class CassandraMessageIdMapperTest extends MessageIdMapperTest {
             SoftAssertions.assertSoftly(Throwing.consumer(softly -> {
                 softly.assertThat(sut.find(ImmutableList.of(message1.getMessageId()), MessageMapper.FetchType.Metadata))
                     .isEmpty();
-                softly.assertThat(messageIdDAO.retrieveMessages((CassandraId) benwaInboxMailbox.getMailboxId(), MessageRange.all()).collectList().block())
+                softly.assertThat(messageIdDAO.retrieveMessages((CassandraId) benwaInboxMailbox.getMailboxId(), MessageRange.all(), Limit.unlimited()).collectList().block())
                     .isEmpty();
             }));
         }
@@ -151,7 +152,7 @@ class CassandraMessageIdMapperTest extends MessageIdMapperTest {
             SoftAssertions.assertSoftly(Throwing.consumer(softly -> {
                 softly.assertThat(sut.find(ImmutableList.of(message1.getMessageId()), MessageMapper.FetchType.Metadata))
                     .isEmpty();
-                softly.assertThat(messageIdDAO.retrieveMessages((CassandraId) benwaInboxMailbox.getMailboxId(), MessageRange.all()).collectList().block())
+                softly.assertThat(messageIdDAO.retrieveMessages((CassandraId) benwaInboxMailbox.getMailboxId(), MessageRange.all(), Limit.unlimited()).collectList().block())
                     .isEmpty();
             }));
         }
@@ -175,7 +176,7 @@ class CassandraMessageIdMapperTest extends MessageIdMapperTest {
             SoftAssertions.assertSoftly(Throwing.consumer(softly -> {
                 softly.assertThat(sut.find(ImmutableList.of(message1.getMessageId()), MessageMapper.FetchType.Metadata))
                     .isEmpty();
-                softly.assertThat(messageIdDAO.retrieveMessages((CassandraId) benwaInboxMailbox.getMailboxId(), MessageRange.all()).collectList().block())
+                softly.assertThat(messageIdDAO.retrieveMessages((CassandraId) benwaInboxMailbox.getMailboxId(), MessageRange.all(), Limit.unlimited()).collectList().block())
                     .isEmpty();
             }));
         }
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java
index 97f86ef..b97cf66 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java
@@ -36,8 +36,8 @@ import org.apache.james.mailbox.store.mail.MessageMapper.FetchType;
 import org.apache.james.mailbox.store.mail.model.MailboxMessage;
 import org.apache.james.mailbox.store.mail.model.MapperProvider;
 import org.apache.james.mailbox.store.mail.model.MessageMapperTest;
+import org.apache.james.util.streams.Limit;
 import org.assertj.core.api.SoftAssertions;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
@@ -55,7 +55,6 @@ class CassandraMessageMapperTest extends MessageMapperTest {
         return new CassandraMapperProvider(cassandraCluster.getCassandraCluster());
     }
 
-    @Disabled("Currently generates a read to messageV2 per stored message despite the limit")
     @Test
     void findInMailboxLimitShouldLimitProjectionReadCassandraQueries(CassandraCluster cassandra) throws MailboxException {
         saveMessages();
@@ -98,7 +97,7 @@ class CassandraMessageMapperTest extends MessageMapperTest {
                 softly.assertThat(messageMapper.findInMailbox(benwaInboxMailbox, MessageRange.all(), FetchType.Metadata, 1))
                     .toIterable()
                     .isEmpty();
-                softly.assertThat(messageIdDAO.retrieveMessages((CassandraId) benwaInboxMailbox.getMailboxId(), MessageRange.all()).collectList().block())
+                softly.assertThat(messageIdDAO.retrieveMessages((CassandraId) benwaInboxMailbox.getMailboxId(), MessageRange.all(), Limit.unlimited()).collectList().block())
                     .isEmpty();
             }));
         }
@@ -121,7 +120,7 @@ class CassandraMessageMapperTest extends MessageMapperTest {
                 softly.assertThat(messageMapper.findInMailbox(benwaInboxMailbox, MessageRange.all(), FetchType.Metadata, 1))
                     .toIterable()
                     .isEmpty();
-                softly.assertThat(messageIdDAO.retrieveMessages((CassandraId) benwaInboxMailbox.getMailboxId(), MessageRange.all()).collectList().block())
+                softly.assertThat(messageIdDAO.retrieveMessages((CassandraId) benwaInboxMailbox.getMailboxId(), MessageRange.all(), Limit.unlimited()).collectList().block())
                     .isEmpty();
             }));
         }
@@ -144,7 +143,7 @@ class CassandraMessageMapperTest extends MessageMapperTest {
                 softly.assertThat(messageMapper.findInMailbox(benwaInboxMailbox, MessageRange.all(), FetchType.Metadata, 1))
                     .toIterable()
                     .isEmpty();
-                softly.assertThat(messageIdDAO.retrieveMessages((CassandraId) benwaInboxMailbox.getMailboxId(), MessageRange.all()).collectList().block())
+                softly.assertThat(messageIdDAO.retrieveMessages((CassandraId) benwaInboxMailbox.getMailboxId(), MessageRange.all(), Limit.unlimited()).collectList().block())
                     .isEmpty();
             }));
         }
@@ -167,7 +166,7 @@ class CassandraMessageMapperTest extends MessageMapperTest {
                 softly.assertThat(messageMapper.findInMailbox(benwaInboxMailbox, MessageRange.all(), FetchType.Metadata, 1))
                     .toIterable()
                     .isEmpty();
-                softly.assertThat(messageIdDAO.retrieveMessages((CassandraId) benwaInboxMailbox.getMailboxId(), MessageRange.all()).collectList().block())
+                softly.assertThat(messageIdDAO.retrieveMessages((CassandraId) benwaInboxMailbox.getMailboxId(), MessageRange.all(), Limit.unlimited()).collectList().block())
                     .isEmpty();
             }));
         }
diff --git a/server/container/util/src/main/java/org/apache/james/util/streams/Limit.java b/server/container/util/src/main/java/org/apache/james/util/streams/Limit.java
index 466f1fb..70ffbbd 100644
--- a/server/container/util/src/main/java/org/apache/james/util/streams/Limit.java
+++ b/server/container/util/src/main/java/org/apache/james/util/streams/Limit.java
@@ -61,6 +61,10 @@ public class Limit {
         return limit;
     }
 
+    public boolean isUnlimited() {
+        return !limit.isPresent();
+    }
+
     public <T> Stream<T> applyOnStream(Stream<T> stream) {
         return limit
             .map(stream::limit)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to