This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 7d8e681b901d21ce8b6e0abede3888bde343103e
Author: Benoit TELLIER <btell...@linagora.com>
AuthorDate: Tue Dec 10 22:07:27 2024 +0100

    [PERF] MailboxChangeListener should leverage events
---
 .../james/jmap/change/JmapEventSerializer.scala    |  3 +-
 .../james/jmap/change/MailboxChangeListener.scala  | 35 ++++++++++++++++++----
 2 files changed, 31 insertions(+), 7 deletions(-)

diff --git 
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/JmapEventSerializer.scala
 
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/JmapEventSerializer.scala
index 7212e1cea9..e614af67ac 100644
--- 
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/JmapEventSerializer.scala
+++ 
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/JmapEventSerializer.scala
@@ -19,11 +19,10 @@
 
 package org.apache.james.jmap.change
 
-import java.util.Optional
 import java.util
+import java.util.Optional
 
 import com.fasterxml.jackson.annotation.JsonProperty
-import com.google.api.client.util.Preconditions
 import com.google.common.collect.ImmutableList
 import jakarta.inject.Inject
 import org.apache.james.core.Username
diff --git 
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/MailboxChangeListener.scala
 
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/MailboxChangeListener.scala
index 1581eb7c09..b7bcfffe26 100644
--- 
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/MailboxChangeListener.scala
+++ 
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/MailboxChangeListener.scala
@@ -20,6 +20,7 @@
 package org.apache.james.jmap.change
 
 import java.time.{Clock, ZonedDateTime}
+import java.util
 
 import jakarta.inject.{Inject, Named}
 import org.apache.james.core.Username
@@ -39,6 +40,7 @@ import org.apache.james.mailbox.model.{MailboxACL, MailboxId}
 import org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY
 import org.reactivestreams.Publisher
 import org.slf4j.{Logger, LoggerFactory}
+import reactor.core.publisher.Mono
 import reactor.core.scala.publisher.{SFlux, SMono}
 
 import scala.jdk.CollectionConverters._
@@ -60,8 +62,28 @@ case class MailboxChangeListener @Inject() 
(@Named(InjectionKeys.JMAP) eventBus:
   override def reactiveEvent(event: Event): Publisher[Void] =
     jmapChanges(event.asInstanceOf[MailboxEvent])
       .flatMap(saveChangeEvent, DEFAULT_CONCURRENCY)
+      .map(toStateChangeEvent)
+      .flatMap(dispactChangeEvent, DEFAULT_CONCURRENCY)
       .`then`()
 
+  override def reactiveEvent(events: util.List[Event]): Publisher[Void] =
+    SFlux.fromIterable(events.asScala)
+      .filter(isHandling)
+      .concatMap(event => jmapChanges(event.asInstanceOf[MailboxEvent]))
+      .concatMap(saveChangeEvent, DEFAULT_CONCURRENCY)
+      .groupBy(_.getAccountId)
+      .flatMap(group => group.map(a => toStateChangeEvent(a))
+        .fold[StateChangeEvent](StateChangeEvent(EventId.random(), 
Username.of(group.key().getIdentifier), Map())) {
+          (a, b) => mergeStateChangeEvents(a, b)
+        })
+      .flatMap(dispactChangeEvent, DEFAULT_CONCURRENCY)
+      .`then`()
+
+  private def mergeStateChangeEvents(one: StateChangeEvent, other: 
StateChangeEvent): StateChangeEvent = StateChangeEvent(
+    one.eventId,
+    one.username,
+    one.map ++ other.map) // will keep the rightmost value
+
   override def getDefaultGroup: Group = MailboxChangeListenerGroup()
 
   override def isHandling(event: Event): Boolean = 
event.isInstanceOf[MailboxEvent]
@@ -99,13 +121,16 @@ case class MailboxChangeListener @Inject() 
(@Named(InjectionKeys.JMAP) eventBus:
     }
   }
 
-  private def saveChangeEvent(jmapChange: JmapChange): Publisher[Void] =
+  private def saveChangeEvent(jmapChange: JmapChange): Publisher[JmapChange] =
     SMono(jmapChange match {
-      case mailboxChange: MailboxChange => 
mailboxChangeRepository.save(mailboxChange)
-      case emailChange: EmailChange => emailChangeRepository.save(emailChange)
+      case mailboxChange: MailboxChange => 
mailboxChangeRepository.save(mailboxChange).`then`(Mono.just(jmapChange))
+      case emailChange: EmailChange => 
emailChangeRepository.save(emailChange).`then`(Mono.just(jmapChange))
       case mailboxAndEmailChange: MailboxAndEmailChange => 
mailboxChangeRepository.save(mailboxAndEmailChange.getMailboxChange)
-        
.`then`(emailChangeRepository.save(mailboxAndEmailChange.getEmailChange))
-    }).`then`(SMono(eventBus.dispatch(toStateChangeEvent(jmapChange), 
AccountIdRegistrationKey(jmapChange.getAccountId))))
+        
.`then`(emailChangeRepository.save(mailboxAndEmailChange.getEmailChange)).`then`(Mono.just(jmapChange))
+    })
+
+  private def dispactChangeEvent(jmapChange: StateChangeEvent): 
Publisher[Void] =
+    SMono(eventBus.dispatch(jmapChange, 
AccountIdRegistrationKey(AccountId.fromUsername(jmapChange.getUsername))))
 
   private def getSharees(mailboxId: MailboxId, username: Username): 
SMono[List[AccountId]] = {
     val session = mailboxManager.createSystemSession(username)


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org
For additional commands, e-mail: notifications-h...@james.apache.org

Reply via email to