This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit c5932ce62f601dc9e940972ba091404b1acfa850 Author: Benoit Tellier <[email protected]> AuthorDate: Thu Feb 14 16:13:12 2019 +0700 JAMES-2630 Ensure Iterable flux get published on ElasticScheduler before rate limiting --- .../org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java | 2 ++ .../apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java | 3 +++ 2 files changed, 5 insertions(+) diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java index 29e9738..ae37a96 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java @@ -91,6 +91,7 @@ import com.google.common.primitives.Bytes; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; import reactor.util.function.Tuple2; public class CassandraMessageDAO { @@ -232,6 +233,7 @@ public class CassandraMessageDAO { public Flux<MessageResult> retrieveMessages(List<ComposedMessageIdWithMetaData> messageIds, FetchType fetchType, Limit limit) { return Flux.fromStream(limit.applyOnStream(messageIds.stream().distinct())) + .publishOn(Schedulers.elastic()) .limitRate(configuration.getMessageReadChunkSize()) .flatMap(id -> retrieveRow(id, fetchType) .flatMap(resultSet -> message(resultSet, id, fetchType))); diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java index fb0eb4f..f74b2ca 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java @@ -56,6 +56,7 @@ import com.google.common.collect.Multimap; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; public class CassandraMessageIdMapper implements MessageIdMapper { private static final Logger LOGGER = LoggerFactory.getLogger(CassandraMessageIdMapper.class); @@ -91,6 +92,7 @@ public class CassandraMessageIdMapper implements MessageIdMapper { @Override public List<MailboxMessage> find(Collection<MessageId> messageIds, FetchType fetchType) { return Flux.fromStream(messageIds.stream()) + .publishOn(Schedulers.elastic()) .limitRate(cassandraConfiguration.getMessageReadChunkSize()) .flatMap(messageId -> imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.empty())) .collectList() @@ -178,6 +180,7 @@ public class CassandraMessageIdMapper implements MessageIdMapper { Flux.fromIterable(ids.asMap() .entrySet()) .limitRate(cassandraConfiguration.getExpungeChunkSize()) + .publishOn(Schedulers.elastic()) .flatMap(entry -> deleteAsMono(entry.getKey(), entry.getValue())) .then() .block(); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
