This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit b07e80dc31170c53bb403e36b770269b37f1a074
Author: Gautier DI FOLCO <[email protected]>
AuthorDate: Tue Aug 20 15:06:45 2019 +0200

    JAMES-2853 Correctly limit concurrency with Reactor
---
 .../apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java    | 3 +--
 .../james/mailbox/cassandra/mail/CassandraMessageIdMapper.java      | 6 ++----
 .../apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java | 4 +---
 .../mailbox/cassandra/mail/CassandraAttachmentOwnerDAOTest.java     | 4 ++--
 .../main/java/org/apache/james/mailbox/store/PreDeletionHooks.java  | 4 ++--
 5 files changed, 8 insertions(+), 13 deletions(-)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
index de53a32..f2e5319 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java
@@ -234,9 +234,8 @@ public class CassandraMessageDAO {
     public Flux<MessageResult> retrieveMessages(List<ComposedMessageIdWithMetaData> messageIds, FetchType fetchType, Limit limit) {
         return Flux.fromStream(limit.applyOnStream(messageIds.stream().distinct()))
             .publishOn(Schedulers.elastic())
-            .limitRate(configuration.getMessageReadChunkSize())
             .flatMap(id -> retrieveRow(id, fetchType)
-                .flatMap(resultSet -> message(resultSet, id, fetchType)));
+                .flatMap(resultSet -> message(resultSet, id, fetchType)), configuration.getMessageReadChunkSize());
     }
 
     private Mono<ResultSet> retrieveRow(ComposedMessageIdWithMetaData messageId, FetchType fetchType) {
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
index 830ae98..33db97c 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
@@ -92,8 +92,7 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
     public List<MailboxMessage> find(Collection<MessageId> messageIds, FetchType fetchType) {
         return Flux.fromStream(messageIds.stream())
             .publishOn(Schedulers.elastic())
-            .limitRate(cassandraConfiguration.getMessageReadChunkSize())
-            .flatMap(messageId -> imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.empty()))
+            .flatMap(messageId -> imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.empty()), cassandraConfiguration.getMessageReadChunkSize())
             .collectList()
             .flatMapMany(composedMessageIds -> messageDAO.retrieveMessages(composedMessageIds, fetchType, Limit.unlimited()))
             .filter(CassandraMessageDAO.MessageResult::isFound)
@@ -178,9 +177,8 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
     public void delete(Multimap<MessageId, MailboxId> ids) {
         Flux.fromIterable(ids.asMap()
             .entrySet())
-            .limitRate(cassandraConfiguration.getExpungeChunkSize())
             .publishOn(Schedulers.elastic())
-            .flatMap(entry -> deleteAsMono(entry.getKey(), entry.getValue()))
+            .flatMap(entry -> deleteAsMono(entry.getKey(), entry.getValue()), cassandraConfiguration.getExpungeChunkSize())
             .then()
             .block();
     }
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index bdbf91b..18ac059 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -203,7 +203,6 @@ public class CassandraMessageMapper implements MessageMapper {
         CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
 
         return deletedMessageDAO.retrieveDeletedMessage(mailboxId, messageRange)
-            .limitRate(cassandraConfiguration.getExpungeChunkSize())
             .collect(Guavate.toImmutableList())
             .block();
     }
@@ -213,8 +212,7 @@ public class CassandraMessageMapper implements MessageMapper {
         CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
 
         return Flux.fromStream(uids.stream())
-            .limitRate(cassandraConfiguration.getExpungeChunkSize())
-            .flatMap(messageUid -> expungeOne(mailboxId, messageUid))
+            .flatMap(messageUid -> expungeOne(mailboxId, messageUid), cassandraConfiguration.getExpungeChunkSize())
             .collect(Guavate.<SimpleMailboxMessage, MessageUid, MessageMetaData>toImmutableMap(MailboxMessage::getUid, MailboxMessage::metaData))
             .block();
     }
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentOwnerDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentOwnerDAOTest.java
index 1140f4f..1ae2c67 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentOwnerDAOTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentOwnerDAOTest.java
@@ -76,11 +76,11 @@ class CassandraAttachmentOwnerDAOTest {
 
     @Test
     void retrieveOwnersShouldNotThrowWhenMoreReferencesThanPaging() {
+        int concurrency = 128;
         int referenceCountExceedingPaging = 5050;
 
         Flux.range(0, referenceCountExceedingPaging)
-            .limitRate(128)
-            .flatMap(i -> testee.addOwner(ATTACHMENT_ID, Username.fromRawValue("owner" + i)))
+            .flatMap(i -> testee.addOwner(ATTACHMENT_ID, Username.fromRawValue("owner" + i)), concurrency)
             .blockLast();
 
         assertThat(testee.retrieveOwners(ATTACHMENT_ID).toIterable())
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/PreDeletionHooks.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/PreDeletionHooks.java
index aac7b5c..b868d62 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/PreDeletionHooks.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/PreDeletionHooks.java
@@ -34,6 +34,7 @@ import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 
 public class PreDeletionHooks {
+    private static final int CONCURRENCY = 1;
     public static final PreDeletionHooks NO_PRE_DELETION_HOOK = new PreDeletionHooks(ImmutableSet.of(), new NoopMetricFactory());
 
     static final String PRE_DELETION_HOOK_METRIC_NAME = "preDeletionHook";
@@ -50,9 +51,8 @@ public class PreDeletionHooks {
     public Mono<Void> runHooks(PreDeletionHook.DeleteOperation deleteOperation) {
         return Flux.fromIterable(hooks)
             .publishOn(Schedulers.elastic())
-            .limitRate(1)
             .flatMap(hook -> metricFactory.runPublishingTimerMetric(PRE_DELETION_HOOK_METRIC_NAME,
-                Mono.from(hook.notifyDelete(deleteOperation))))
+                Mono.from(hook.notifyDelete(deleteOperation))), CONCURRENCY)
             .then();
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to