MAILBOX-296 Allow EXPUNGE to go beyond 5.000

We were limited by the initial read, which produced a timeout after 5000 ms.


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/bdd8dd94
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/bdd8dd94
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/bdd8dd94

Branch: refs/heads/master
Commit: bdd8dd9421cac2194145b52dbb2b7927373fdc35
Parents: 133fbd4
Author: benwa <[email protected]>
Authored: Tue May 23 18:00:52 2017 +0700
Committer: benwa <[email protected]>
Committed: Thu May 25 09:21:22 2017 +0700

----------------------------------------------------------------------
 .../mailbox/cassandra/mail/CassandraMessageMapper.java | 13 +++++--------
 1 file changed, 5 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/bdd8dd94/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index 195a98f..b256153 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -53,7 +53,6 @@ import org.apache.james.mailbox.store.mail.MessageMapper;
 import org.apache.james.mailbox.store.mail.model.Mailbox;
 import org.apache.james.mailbox.store.mail.model.MailboxMessage;
 import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
-import org.apache.james.util.CompletableFutureUtil;
 import org.apache.james.util.FluentFutureStream;
 import org.apache.james.util.OptionalConverter;
 import org.apache.james.util.streams.JamesCollectors;
@@ -219,14 +218,12 @@ public class CassandraMessageMapper implements 
MessageMapper {
     public Map<MessageUid, MessageMetaData> 
expungeMarkedForDeletionInMailbox(Mailbox mailbox, MessageRange messageRange) 
throws MailboxException {
         CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
 
-        return 
FluentFutureStream.of(deletedMessageDAO.retrieveDeletedMessage(mailboxId, 
messageRange))
-            .completableFuture()
-            .thenApply(JamesCollectors.chunk(EXPUNGE_BATCH_SIZE))
-            .thenCompose(chunkedExpungedUids ->
-                CompletableFutureUtil.chainAll(chunkedExpungedUids,
-                    uidChunk ->  expungeUidChunk(mailboxId, uidChunk)))
-            .thenApply(s -> s.flatMap(i -> i))
+        return deletedMessageDAO.retrieveDeletedMessage(mailboxId, 
messageRange)
             .join()
+            .collect(JamesCollectors.chunker(EXPUNGE_BATCH_SIZE))
+            .values().stream()
+            .map(uidChunk -> expungeUidChunk(mailboxId, uidChunk))
+            .flatMap(CompletableFuture::join)
             .collect(Guavate.toImmutableMap(MailboxMessage::getUid, 
SimpleMessageMetaData::new));
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to