This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit e3b61d371aa5dd98ef65edab12c4390a50b79636
Author: Benoit Tellier <[email protected]>
AuthorDate: Wed Jul 15 12:11:06 2020 +0700

    JAMES-3155 Limit the number of flags updated at the same time
    
    Large flags operations caused expensive updates to be performed "at once"
    by ElasticSearch.
    
    We should rather limit the number of flags being modified at once, and
    perform them "by batch".
---
 .../events/ElasticSearchListeningMessageSearchIndex.java           | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git 
a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java
 
b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java
index 64f4d89..2d0df24 100644
--- 
a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java
+++ 
b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java
@@ -77,6 +77,8 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 public class ElasticSearchListeningMessageSearchIndex extends 
ListeningMessageSearchIndex {
+    private static final int FLAGS_UPDATE_PROCESSING_WINDOW_SIZE = 32;
+
     public static class ElasticSearchListeningMessageSearchIndexGroup extends 
Group {
 
     }
@@ -203,8 +205,9 @@ public class ElasticSearchListeningMessageSearchIndex 
extends ListeningMessageSe
             .map(Throwing.<UpdatedFlags, UpdatedRepresentation>function(
                 updatedFlags -> 
createUpdatedDocumentPartFromUpdatedFlags(mailboxId, updatedFlags))
                 .sneakyThrow())
-            .collect(toImmutableList())
-            .flatMap(updates -> elasticSearchIndexer.update(updates, 
routingKey))
+            .window(FLAGS_UPDATE_PROCESSING_WINDOW_SIZE)
+            .concatMap(flux -> flux.collect(toImmutableList())
+                .flatMap(updates -> elasticSearchIndexer.update(updates, 
routingKey)))
             .then();
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to