This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit b07e80dc31170c53bb403e36b770269b37f1a074 Author: Gautier DI FOLCO <[email protected]> AuthorDate: Tue Aug 20 15:06:45 2019 +0200 JAMES-2853 Correctly limit concurrency with Reactor --- .../apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java | 3 +-- .../james/mailbox/cassandra/mail/CassandraMessageIdMapper.java | 6 ++---- .../apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java | 4 +--- .../mailbox/cassandra/mail/CassandraAttachmentOwnerDAOTest.java | 4 ++-- .../main/java/org/apache/james/mailbox/store/PreDeletionHooks.java | 4 ++-- 5 files changed, 8 insertions(+), 13 deletions(-) diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java index de53a32..f2e5319 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java @@ -234,9 +234,8 @@ public class CassandraMessageDAO { public Flux<MessageResult> retrieveMessages(List<ComposedMessageIdWithMetaData> messageIds, FetchType fetchType, Limit limit) { return Flux.fromStream(limit.applyOnStream(messageIds.stream().distinct())) .publishOn(Schedulers.elastic()) - .limitRate(configuration.getMessageReadChunkSize()) .flatMap(id -> retrieveRow(id, fetchType) - .flatMap(resultSet -> message(resultSet, id, fetchType))); + .flatMap(resultSet -> message(resultSet, id, fetchType)), configuration.getMessageReadChunkSize()); } private Mono<ResultSet> retrieveRow(ComposedMessageIdWithMetaData messageId, FetchType fetchType) { diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java index 830ae98..33db97c 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java @@ -92,8 +92,7 @@ public class CassandraMessageIdMapper implements MessageIdMapper { public List<MailboxMessage> find(Collection<MessageId> messageIds, FetchType fetchType) { return Flux.fromStream(messageIds.stream()) .publishOn(Schedulers.elastic()) - .limitRate(cassandraConfiguration.getMessageReadChunkSize()) - .flatMap(messageId -> imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.empty())) + .flatMap(messageId -> imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.empty()), cassandraConfiguration.getMessageReadChunkSize()) .collectList() .flatMapMany(composedMessageIds -> messageDAO.retrieveMessages(composedMessageIds, fetchType, Limit.unlimited())) .filter(CassandraMessageDAO.MessageResult::isFound) @@ -178,9 +177,8 @@ public class CassandraMessageIdMapper implements MessageIdMapper { public void delete(Multimap<MessageId, MailboxId> ids) { Flux.fromIterable(ids.asMap() .entrySet()) - .limitRate(cassandraConfiguration.getExpungeChunkSize()) .publishOn(Schedulers.elastic()) - .flatMap(entry -> deleteAsMono(entry.getKey(), entry.getValue())) + .flatMap(entry -> deleteAsMono(entry.getKey(), entry.getValue()), cassandraConfiguration.getExpungeChunkSize()) .then() .block(); } diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java index bdbf91b..18ac059 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java @@ -203,7 +203,6 @@ public class CassandraMessageMapper implements MessageMapper { CassandraId mailboxId = (CassandraId) mailbox.getMailboxId(); return deletedMessageDAO.retrieveDeletedMessage(mailboxId, messageRange) - .limitRate(cassandraConfiguration.getExpungeChunkSize()) .collect(Guavate.toImmutableList()) .block(); } @@ -213,8 +212,7 @@ public class CassandraMessageMapper implements MessageMapper { CassandraId mailboxId = (CassandraId) mailbox.getMailboxId(); return Flux.fromStream(uids.stream()) - .limitRate(cassandraConfiguration.getExpungeChunkSize()) - .flatMap(messageUid -> expungeOne(mailboxId, messageUid)) + .flatMap(messageUid -> expungeOne(mailboxId, messageUid), cassandraConfiguration.getExpungeChunkSize()) .collect(Guavate.<SimpleMailboxMessage, MessageUid, MessageMetaData>toImmutableMap(MailboxMessage::getUid, MailboxMessage::metaData)) .block(); } diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentOwnerDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentOwnerDAOTest.java index 1140f4f..1ae2c67 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentOwnerDAOTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentOwnerDAOTest.java @@ -76,11 +76,11 @@ class CassandraAttachmentOwnerDAOTest { @Test void retrieveOwnersShouldNotThrowWhenMoreReferencesThanPaging() { + int concurrency = 128; int referenceCountExceedingPaging = 5050; Flux.range(0, referenceCountExceedingPaging) - .limitRate(128) - .flatMap(i -> testee.addOwner(ATTACHMENT_ID, Username.fromRawValue("owner" + i))) + .flatMap(i -> testee.addOwner(ATTACHMENT_ID, Username.fromRawValue("owner" + i)), concurrency) .blockLast(); assertThat(testee.retrieveOwners(ATTACHMENT_ID).toIterable()) diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/PreDeletionHooks.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/PreDeletionHooks.java index aac7b5c..b868d62 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/PreDeletionHooks.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/PreDeletionHooks.java @@ -34,6 +34,7 @@ import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; public class PreDeletionHooks { + private static final int CONCURRENCY = 1; public static final PreDeletionHooks NO_PRE_DELETION_HOOK = new PreDeletionHooks(ImmutableSet.of(), new NoopMetricFactory()); static final String PRE_DELETION_HOOK_METRIC_NAME = "preDeletionHook"; @@ -50,9 +51,8 @@ public class PreDeletionHooks { public Mono<Void> runHooks(PreDeletionHook.DeleteOperation deleteOperation) { return Flux.fromIterable(hooks) .publishOn(Schedulers.elastic()) - .limitRate(1) .flatMap(hook -> metricFactory.runPublishingTimerMetric(PRE_DELETION_HOOK_METRIC_NAME, - Mono.from(hook.notifyDelete(deleteOperation)))) + Mono.from(hook.notifyDelete(deleteOperation))), CONCURRENCY) .then(); } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
