This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 7795d0b779c0989858bdb1c576c218b466aa47d9 Author: Benoit Tellier <[email protected]> AuthorDate: Mon May 4 18:27:18 2020 +0700 JAMES-3149 CassandraMailQueueBrowser caller should choose on which scheduler he dispatch This avoid enforcing the thread context switch upon browse, optimizing thread usage. Such an allocation is redundant when updating browse start. --- .../queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java | 4 +--- .../queue/rabbitmq/view/cassandra/CassandraMailQueueView.java | 8 +++++++- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java index 31e448c..91223ad 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java @@ -47,7 +47,6 @@ import com.google.common.base.Preconditions; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; public class CassandraMailQueueBrowser { @@ -110,8 +109,7 @@ public class CassandraMailQueueBrowser { Flux<EnqueuedItemWithSlicingContext> browseReferences(MailQueueName queueName) { return browseStartDao.findBrowseStart(queueName) .flatMapMany(this::allSlicesStartingAt) - .flatMapSequential(slice -> browseSlice(queueName, slice)) - .subscribeOn(Schedulers.elastic()); + .flatMapSequential(slice -> browseSlice(queueName, slice)); } private Mono<Mail> toMailFuture(EnqueuedItemWithSlicingContext enqueuedItemWithSlicingContext) { diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java index aede929..b82de97 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java @@ -34,6 +34,7 @@ import org.apache.james.queue.rabbitmq.view.cassandra.configuration.Eventsourcin import org.apache.james.queue.rabbitmq.view.cassandra.model.EnqueuedItemWithSlicingContext; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; public class CassandraMailQueueView implements MailQueueView { @@ -91,13 +92,17 @@ public class CassandraMailQueueView implements MailQueueView { public ManageableMailQueue.MailQueueIterator browse() { return new CassandraMailQueueBrowser.CassandraMailQueueIterator( cassandraMailQueueBrowser.browse(mailQueueName) + .subscribeOn(Schedulers.elastic()) .toIterable() .iterator()); } @Override public long getSize() { - return cassandraMailQueueBrowser.browseReferences(mailQueueName).count().block(); + return cassandraMailQueueBrowser.browseReferences(mailQueueName) + .count() + .subscribeOn(Schedulers.elastic()) + .block(); } @Override @@ -117,6 +122,7 @@ public class CassandraMailQueueView implements MailQueueView { .flatMap(mailReference -> cassandraMailQueueMailDelete.considerDeleted(mailReference.getEnqueueId(), mailQueueName)) .count() .doOnNext(ignored -> cassandraMailQueueMailDelete.updateBrowseStart(mailQueueName)) + .subscribeOn(Schedulers.elastic()) .block(); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
