MAILBOX-296 Batch EXPUNGE command execution with Cassandra
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/c4e085cf Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/c4e085cf Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/c4e085cf Branch: refs/heads/master Commit: c4e085cff55712f6bff609aa98d1014ba9d90896 Parents: 0a243a9 Author: benwa <[email protected]> Authored: Thu May 18 15:25:47 2017 +0700 Committer: benwa <[email protected]> Committed: Fri May 19 17:30:35 2017 +0700 ---------------------------------------------------------------------- .../cassandra/mail/CassandraMessageMapper.java | 32 +++++++++++++------- .../james/util/streams/JamesCollectors.java | 8 +++++ 2 files changed, 29 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/c4e085cf/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java index c00659a..195a98f 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java @@ -53,8 +53,10 @@ import org.apache.james.mailbox.store.mail.MessageMapper; import org.apache.james.mailbox.store.mail.model.Mailbox; import org.apache.james.mailbox.store.mail.model.MailboxMessage; import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage; +import org.apache.james.util.CompletableFutureUtil; import org.apache.james.util.FluentFutureStream; import org.apache.james.util.OptionalConverter; +import org.apache.james.util.streams.JamesCollectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,6 +70,7 @@ public class CassandraMessageMapper implements MessageMapper { .count(0L) .unseen(0L) .build(); + public static final int EXPUNGE_BATCH_SIZE = 100; private final CassandraModSeqProvider modSeqProvider; private final MailboxSession mailboxSession; @@ -212,25 +215,32 @@ public class CassandraMessageMapper implements MessageMapper { .orElse(null); } + @Override public Map<MessageUid, MessageMetaData> expungeMarkedForDeletionInMailbox(Mailbox mailbox, MessageRange messageRange) throws MailboxException { CassandraId mailboxId = (CassandraId) mailbox.getMailboxId(); return FluentFutureStream.of(deletedMessageDAO.retrieveDeletedMessage(mailboxId, messageRange)) - .thenComposeOnAll( - messageId -> - messageIdDAO.retrieve(mailboxId, messageId)) - .flatMap(OptionalConverter::toStream) - .thenCompose(ids -> - retrieveMessages( - ids.collect(Guavate.toImmutableList()), - FetchType.Metadata, - Optional.empty()) - ) - .performOnAll(message -> deleteAsFuture(message, mailboxId)) + .completableFuture() + .thenApply(JamesCollectors.chunk(EXPUNGE_BATCH_SIZE)) + .thenCompose(chunkedExpungedUids -> + CompletableFutureUtil.chainAll(chunkedExpungedUids, + uidChunk -> expungeUidChunk(mailboxId, uidChunk))) + .thenApply(s -> s.flatMap(i -> i)) .join() .collect(Guavate.toImmutableMap(MailboxMessage::getUid, SimpleMessageMetaData::new)); } + private CompletableFuture<Stream<SimpleMailboxMessage>> expungeUidChunk(CassandraId mailboxId, List<MessageUid> uidChunk) { + return FluentFutureStream.of(uidChunk.stream() + .map(uid -> messageIdDAO.retrieve(mailboxId, uid))) + .flatMap(OptionalConverter::toStream) + .performOnAll(this::deleteUsingMailboxId) + .thenComposeOnAll(idWithMetadata -> messageDAO.retrieveMessages(ImmutableList.of(idWithMetadata), FetchType.Metadata, Optional.empty())) + .flatMap(s -> s) + .map(pair -> pair.getKey().toMailboxMessage(ImmutableList.of())) + .completableFuture(); + } + @Override public MessageMetaData move(Mailbox destinationMailbox, MailboxMessage original) throws MailboxException { CassandraId originalMailboxId = (CassandraId) original.getMailboxId(); http://git-wip-us.apache.org/repos/asf/james-project/blob/c4e085cf/server/container/util-java8/src/main/java/org/apache/james/util/streams/JamesCollectors.java ---------------------------------------------------------------------- diff --git a/server/container/util-java8/src/main/java/org/apache/james/util/streams/JamesCollectors.java b/server/container/util-java8/src/main/java/org/apache/james/util/streams/JamesCollectors.java index 10c34bd..1fd653a 100644 --- a/server/container/util-java8/src/main/java/org/apache/james/util/streams/JamesCollectors.java +++ b/server/container/util-java8/src/main/java/org/apache/james/util/streams/JamesCollectors.java @@ -22,8 +22,10 @@ package org.apache.james.util.streams; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; import java.util.stream.Collector; import java.util.stream.Collectors; +import java.util.stream.Stream; import com.google.common.base.Preconditions; @@ -33,4 +35,10 @@ public class JamesCollectors { AtomicInteger counter = new AtomicInteger(-1); return Collectors.groupingBy(x -> counter.incrementAndGet() / chunkSize); } + + public static <D> Function<Stream<D>, Stream<List<D>>> chunk(int chunkSize) { + return stream -> stream.collect(chunker(chunkSize)) + .values() + .stream(); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
