MAILBOX-296 Batch EXPUNGE command execution with Cassandra

Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/c4e085cf
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/c4e085cf
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/c4e085cf

Branch: refs/heads/master
Commit: c4e085cff55712f6bff609aa98d1014ba9d90896
Parents: 0a243a9
Author: benwa <[email protected]>
Authored: Thu May 18 15:25:47 2017 +0700
Committer: benwa <[email protected]>
Committed: Fri May 19 17:30:35 2017 +0700

----------------------------------------------------------------------
 .../cassandra/mail/CassandraMessageMapper.java  | 32 +++++++++++++-------
 .../james/util/streams/JamesCollectors.java     |  8 +++++
 2 files changed, 29 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/c4e085cf/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index c00659a..195a98f 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -53,8 +53,10 @@ import org.apache.james.mailbox.store.mail.MessageMapper;
 import org.apache.james.mailbox.store.mail.model.Mailbox;
 import org.apache.james.mailbox.store.mail.model.MailboxMessage;
 import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
+import org.apache.james.util.CompletableFutureUtil;
 import org.apache.james.util.FluentFutureStream;
 import org.apache.james.util.OptionalConverter;
+import org.apache.james.util.streams.JamesCollectors;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -68,6 +70,7 @@ public class CassandraMessageMapper implements MessageMapper {
         .count(0L)
         .unseen(0L)
         .build();
+    public static final int EXPUNGE_BATCH_SIZE = 100;
 
     private final CassandraModSeqProvider modSeqProvider;
     private final MailboxSession mailboxSession;
@@ -212,25 +215,32 @@ public class CassandraMessageMapper implements 
MessageMapper {
             .orElse(null);
     }
 
+    @Override
     public Map<MessageUid, MessageMetaData> 
expungeMarkedForDeletionInMailbox(Mailbox mailbox, MessageRange messageRange) 
throws MailboxException {
         CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
 
         return 
FluentFutureStream.of(deletedMessageDAO.retrieveDeletedMessage(mailboxId, 
messageRange))
-            .thenComposeOnAll(
-                messageId ->
-                    messageIdDAO.retrieve(mailboxId, messageId))
-            .flatMap(OptionalConverter::toStream)
-            .thenCompose(ids ->
-                retrieveMessages(
-                    ids.collect(Guavate.toImmutableList()),
-                    FetchType.Metadata,
-                    Optional.empty())
-            )
-            .performOnAll(message -> deleteAsFuture(message, mailboxId))
+            .completableFuture()
+            .thenApply(JamesCollectors.chunk(EXPUNGE_BATCH_SIZE))
+            .thenCompose(chunkedExpungedUids ->
+                CompletableFutureUtil.chainAll(chunkedExpungedUids,
+                    uidChunk ->  expungeUidChunk(mailboxId, uidChunk)))
+            .thenApply(s -> s.flatMap(i -> i))
             .join()
             .collect(Guavate.toImmutableMap(MailboxMessage::getUid, 
SimpleMessageMetaData::new));
     }
 
+    private CompletableFuture<Stream<SimpleMailboxMessage>> 
expungeUidChunk(CassandraId mailboxId, List<MessageUid> uidChunk) {
+        return FluentFutureStream.of(uidChunk.stream()
+            .map(uid -> messageIdDAO.retrieve(mailboxId, uid)))
+            .flatMap(OptionalConverter::toStream)
+            .performOnAll(this::deleteUsingMailboxId)
+            .thenComposeOnAll(idWithMetadata -> 
messageDAO.retrieveMessages(ImmutableList.of(idWithMetadata), 
FetchType.Metadata, Optional.empty()))
+            .flatMap(s -> s)
+            .map(pair -> pair.getKey().toMailboxMessage(ImmutableList.of()))
+            .completableFuture();
+    }
+
     @Override
     public MessageMetaData move(Mailbox destinationMailbox, MailboxMessage 
original) throws MailboxException {
         CassandraId originalMailboxId = (CassandraId) original.getMailboxId();

http://git-wip-us.apache.org/repos/asf/james-project/blob/c4e085cf/server/container/util-java8/src/main/java/org/apache/james/util/streams/JamesCollectors.java
----------------------------------------------------------------------
diff --git 
a/server/container/util-java8/src/main/java/org/apache/james/util/streams/JamesCollectors.java
 
b/server/container/util-java8/src/main/java/org/apache/james/util/streams/JamesCollectors.java
index 10c34bd..1fd653a 100644
--- 
a/server/container/util-java8/src/main/java/org/apache/james/util/streams/JamesCollectors.java
+++ 
b/server/container/util-java8/src/main/java/org/apache/james/util/streams/JamesCollectors.java
@@ -22,8 +22,10 @@ package org.apache.james.util.streams;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
 import java.util.stream.Collector;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import com.google.common.base.Preconditions;
 
@@ -33,4 +35,10 @@ public class JamesCollectors {
         AtomicInteger counter = new AtomicInteger(-1);
         return Collectors.groupingBy(x -> counter.incrementAndGet() / 
chunkSize);
     }
+
+    public static <D> Function<Stream<D>, Stream<List<D>>> chunk(int 
chunkSize) {
+        return stream -> stream.collect(chunker(chunkSize))
+            .values()
+            .stream();
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to