MAILBOX-296 Allow EXPUNGE to go beyond 5.000. We were limited by the initial read, which produced a timeout after 5000 ms.
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/bdd8dd94 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/bdd8dd94 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/bdd8dd94 Branch: refs/heads/master Commit: bdd8dd9421cac2194145b52dbb2b7927373fdc35 Parents: 133fbd4 Author: benwa <[email protected]> Authored: Tue May 23 18:00:52 2017 +0700 Committer: benwa <[email protected]> Committed: Thu May 25 09:21:22 2017 +0700 ---------------------------------------------------------------------- .../mailbox/cassandra/mail/CassandraMessageMapper.java | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/bdd8dd94/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java index 195a98f..b256153 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java @@ -53,7 +53,6 @@ import org.apache.james.mailbox.store.mail.MessageMapper; import org.apache.james.mailbox.store.mail.model.Mailbox; import org.apache.james.mailbox.store.mail.model.MailboxMessage; import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage; -import org.apache.james.util.CompletableFutureUtil; import org.apache.james.util.FluentFutureStream; import org.apache.james.util.OptionalConverter; import org.apache.james.util.streams.JamesCollectors; @@ -219,14 +218,12 @@ 
public class CassandraMessageMapper implements MessageMapper { public Map<MessageUid, MessageMetaData> expungeMarkedForDeletionInMailbox(Mailbox mailbox, MessageRange messageRange) throws MailboxException { CassandraId mailboxId = (CassandraId) mailbox.getMailboxId(); - return FluentFutureStream.of(deletedMessageDAO.retrieveDeletedMessage(mailboxId, messageRange)) - .completableFuture() - .thenApply(JamesCollectors.chunk(EXPUNGE_BATCH_SIZE)) - .thenCompose(chunkedExpungedUids -> - CompletableFutureUtil.chainAll(chunkedExpungedUids, - uidChunk -> expungeUidChunk(mailboxId, uidChunk))) - .thenApply(s -> s.flatMap(i -> i)) + return deletedMessageDAO.retrieveDeletedMessage(mailboxId, messageRange) .join() + .collect(JamesCollectors.chunker(EXPUNGE_BATCH_SIZE)) + .values().stream() + .map(uidChunk -> expungeUidChunk(mailboxId, uidChunk)) + .flatMap(CompletableFuture::join) .collect(Guavate.toImmutableMap(MailboxMessage::getUid, SimpleMessageMetaData::new)); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
