This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit e3b61d371aa5dd98ef65edab12c4390a50b79636 Author: Benoit Tellier <[email protected]> AuthorDate: Wed Jul 15 12:11:06 2020 +0700 JAMES-3155 Limit the number of flags updated at the same time Large flags operations caused expensive updates to be performed "at once" by ElasticSearch. We should rather limit the number of flags being modified at once, and perform them "by batch". --- .../events/ElasticSearchListeningMessageSearchIndex.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java index 64f4d89..2d0df24 100644 --- a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java +++ b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java @@ -77,6 +77,8 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public class ElasticSearchListeningMessageSearchIndex extends ListeningMessageSearchIndex { + private static final int FLAGS_UPDATE_PROCESSING_WINDOW_SIZE = 32; + public static class ElasticSearchListeningMessageSearchIndexGroup extends Group { } @@ -203,8 +205,9 @@ public class ElasticSearchListeningMessageSearchIndex extends ListeningMessageSe .map(Throwing.<UpdatedFlags, UpdatedRepresentation>function( updatedFlags -> createUpdatedDocumentPartFromUpdatedFlags(mailboxId, updatedFlags)) .sneakyThrow()) - .collect(toImmutableList()) - .flatMap(updates -> elasticSearchIndexer.update(updates, routingKey)) + .window(FLAGS_UPDATE_PROCESSING_WINDOW_SIZE) + .concatMap(flux -> flux.collect(toImmutableList()) + .flatMap(updates -> elasticSearchIndexer.update(updates, routingKey))) .then(); } 
--------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
