This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit c5932ce62f601dc9e940972ba091404b1acfa850
Author: Benoit Tellier <[email protected]>
AuthorDate: Thu Feb 14 16:13:12 2019 +0700

    JAMES-2630 Ensure Iterable flux get published on ElasticScheduler before rate limiting
---
 .../org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java   | 2 ++
 .../apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java  | 3 +++
 2 files changed, 5 insertions(+)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
index 29e9738..ae37a96 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
@@ -91,6 +91,7 @@ import com.google.common.primitives.Bytes;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 import reactor.util.function.Tuple2;
 
 public class CassandraMessageDAO {
@@ -232,6 +233,7 @@ public class CassandraMessageDAO {
 
     public Flux<MessageResult> retrieveMessages(List<ComposedMessageIdWithMetaData> messageIds, FetchType fetchType, Limit limit) {
         return Flux.fromStream(limit.applyOnStream(messageIds.stream().distinct()))
+            .publishOn(Schedulers.elastic())
             .limitRate(configuration.getMessageReadChunkSize())
             .flatMap(id -> retrieveRow(id, fetchType)
                 .flatMap(resultSet -> message(resultSet, id, fetchType)));
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
index fb0eb4f..f74b2ca 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
@@ -56,6 +56,7 @@ import com.google.common.collect.Multimap;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
 public class CassandraMessageIdMapper implements MessageIdMapper {
     private static final Logger LOGGER = LoggerFactory.getLogger(CassandraMessageIdMapper.class);
@@ -91,6 +92,7 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
     @Override
     public List<MailboxMessage> find(Collection<MessageId> messageIds, FetchType fetchType) {
         return Flux.fromStream(messageIds.stream())
+            .publishOn(Schedulers.elastic())
             .limitRate(cassandraConfiguration.getMessageReadChunkSize())
             .flatMap(messageId -> imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.empty()))
             .collectList()
@@ -178,6 +180,7 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
         Flux.fromIterable(ids.asMap()
             .entrySet())
             .limitRate(cassandraConfiguration.getExpungeChunkSize())
+            .publishOn(Schedulers.elastic())
             .flatMap(entry -> deleteAsMono(entry.getKey(), entry.getValue()))
             .then()
             .block();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to