This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push:
new c553363 JAMES-3601 Do not update browse start when not needed (#579)
c553363 is described below
commit c5533634c891d7ecf534394538af073757e76145
Author: Tellier Benoit <[email protected]>
AuthorDate: Mon Aug 9 08:51:28 2021 +0700
JAMES-3601 Do not update browse start when not needed (#579)
A common mis-configuration of the Distributed mailqueue is to attempt to
update the browse start too often, which results in attempts to update the
browse start while we are already in the last slice.
Repeatedly and needlessly trying to update the browse start can seriously
undermine application performance, as browsing the mail queue is not a cheap
task.
Fortunately, prior to attempting to update the browse start, we can check that
the browse start does not already fall within the current slice.
Thus mis-configurations are mitigated and will no longer result in hammering
browse start updates of the current slice.
(A mis-configuration can, however, still result in suboptimal, concurrently
triggered browse start updates when switching slices. Operators should still
ensure their "updateBrowseStartPace" is adequate, but failure to do so becomes
less problematic.)
---
.../rabbitmq/view/cassandra/CassandraMailQueueBrowser.java | 6 +++++-
.../rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java | 10 ++++++++--
.../view/cassandra/CassandraMailQueueViewTestFactory.java | 2 +-
3 files changed, 14 insertions(+), 4 deletions(-)
diff --git
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java
index eef1056..2b9b812 100644
---
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java
+++
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java
@@ -127,7 +127,11 @@ public class CassandraMailQueueBrowser {
Flux<EnqueuedItemWithSlicingContext> browseReferences(MailQueueName
queueName) {
return browseStartDao.findBrowseStart(queueName)
- .flatMapMany(this::allSlicesStartingAt)
+ .flatMapMany(browseStart -> browseReferences(queueName,
browseStart));
+ }
+
+ Flux<EnqueuedItemWithSlicingContext> browseReferences(MailQueueName
queueName, Instant browseStart) {
+ return allSlicesStartingAt(browseStart)
.flatMapSequential(slice -> browseSlice(queueName, slice));
}
diff --git
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java
index a3b406d..a7ae50f 100644
---
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java
+++
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailDelete.java
@@ -21,6 +21,7 @@ package org.apache.james.queue.rabbitmq.view.cassandra;
import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
+import java.time.Clock;
import java.time.Instant;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.IntStream;
@@ -47,18 +48,20 @@ public class CassandraMailQueueMailDelete {
private final EnqueuedMailsDAO enqueuedMailsDAO;
private final CassandraMailQueueBrowser cassandraMailQueueBrowser;
private final CassandraMailQueueViewConfiguration configuration;
+ private final Clock clock;
@Inject
CassandraMailQueueMailDelete(DeletedMailsDAO deletedMailsDao,
BrowseStartDAO browseStartDao,
ContentStartDAO contentStartDAO,
EnqueuedMailsDAO enqueuedMailsDAO, CassandraMailQueueBrowser
cassandraMailQueueBrowser,
- CassandraMailQueueViewConfiguration
configuration) {
+ CassandraMailQueueViewConfiguration
configuration, Clock clock) {
this.deletedMailsDao = deletedMailsDao;
this.browseStartDao = browseStartDao;
this.contentStartDAO = contentStartDAO;
this.enqueuedMailsDAO = enqueuedMailsDAO;
this.cassandraMailQueueBrowser = cassandraMailQueueBrowser;
this.configuration = configuration;
+ this.clock = clock;
}
Mono<Void> considerDeleted(EnqueueId enqueueId, MailQueueName
mailQueueName) {
@@ -86,7 +89,10 @@ public class CassandraMailQueueMailDelete {
}
private Mono<Instant> findNewBrowseStart(MailQueueName mailQueueName) {
- return cassandraMailQueueBrowser.browseReferences(mailQueueName)
+ Slice currentSlice = Slice.of(clock.instant());
+ return browseStartDao.findBrowseStart(mailQueueName)
+ .filter(browseStart ->
browseStart.isBefore(currentSlice.getStartSliceInstant()))
+ .flatMapMany(browseStart ->
cassandraMailQueueBrowser.browseReferences(mailQueueName, browseStart))
.map(enqueuedItem ->
enqueuedItem.getSlicingContext().getTimeRangeStart())
.next();
}
diff --git
a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java
b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java
index e035c87..e1b5641 100644
---
a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java
+++
b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java
@@ -43,7 +43,7 @@ public class CassandraMailQueueViewTestFactory {
CassandraMailQueueBrowser cassandraMailQueueBrowser = new
CassandraMailQueueBrowser(browseStartDao, deletedMailsDao, enqueuedMailsDao,
mimeMessageStoreFactory, configuration, clock);
CassandraMailQueueMailStore cassandraMailQueueMailStore = new
CassandraMailQueueMailStore(enqueuedMailsDao, browseStartDao, contentStartDAO,
configuration, clock);
- CassandraMailQueueMailDelete cassandraMailQueueMailDelete = new
CassandraMailQueueMailDelete(deletedMailsDao, browseStartDao, contentStartDAO,
enqueuedMailsDao, cassandraMailQueueBrowser, configuration);
+ CassandraMailQueueMailDelete cassandraMailQueueMailDelete = new
CassandraMailQueueMailDelete(deletedMailsDao, browseStartDao, contentStartDAO,
enqueuedMailsDao, cassandraMailQueueBrowser, configuration, clock);
return new CassandraMailQueueView.Factory(
cassandraMailQueueMailStore,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]