This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 7d8e681b901d21ce8b6e0abede3888bde343103e Author: Benoit TELLIER <btell...@linagora.com> AuthorDate: Tue Dec 10 22:07:27 2024 +0100 [PERF] MailboxChangeListener should leverage events --- .../james/jmap/change/JmapEventSerializer.scala | 3 +- .../james/jmap/change/MailboxChangeListener.scala | 35 ++++++++++++++++++---- 2 files changed, 31 insertions(+), 7 deletions(-) diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/JmapEventSerializer.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/JmapEventSerializer.scala index 7212e1cea9..e614af67ac 100644 --- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/JmapEventSerializer.scala +++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/JmapEventSerializer.scala @@ -19,11 +19,10 @@ package org.apache.james.jmap.change -import java.util.Optional import java.util +import java.util.Optional import com.fasterxml.jackson.annotation.JsonProperty -import com.google.api.client.util.Preconditions import com.google.common.collect.ImmutableList import jakarta.inject.Inject import org.apache.james.core.Username diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/MailboxChangeListener.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/MailboxChangeListener.scala index 1581eb7c09..b7bcfffe26 100644 --- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/MailboxChangeListener.scala +++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/MailboxChangeListener.scala @@ -20,6 +20,7 @@ package org.apache.james.jmap.change import java.time.{Clock, ZonedDateTime} +import java.util import jakarta.inject.{Inject, Named} import org.apache.james.core.Username @@ -39,6 +40,7 @@ import org.apache.james.mailbox.model.{MailboxACL, MailboxId} import org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY import org.reactivestreams.Publisher import org.slf4j.{Logger, LoggerFactory} +import reactor.core.publisher.Mono import reactor.core.scala.publisher.{SFlux, SMono} import scala.jdk.CollectionConverters._ @@ -60,8 +62,28 @@ case class MailboxChangeListener @Inject() (@Named(InjectionKeys.JMAP) eventBus: override def reactiveEvent(event: Event): Publisher[Void] = jmapChanges(event.asInstanceOf[MailboxEvent]) .flatMap(saveChangeEvent, DEFAULT_CONCURRENCY) + .map(toStateChangeEvent) + .flatMap(dispactChangeEvent, DEFAULT_CONCURRENCY) .`then`() + override def reactiveEvent(events: util.List[Event]): Publisher[Void] = + SFlux.fromIterable(events.asScala) + .filter(isHandling) + .concatMap(event => jmapChanges(event.asInstanceOf[MailboxEvent])) + .concatMap(saveChangeEvent, DEFAULT_CONCURRENCY) + .groupBy(_.getAccountId) + .flatMap(group => group.map(a => toStateChangeEvent(a)) + .fold[StateChangeEvent](StateChangeEvent(EventId.random(), Username.of(group.key().getIdentifier), Map())) { + (a, b) => mergeStateChangeEvents(a, b) + }) + .flatMap(dispactChangeEvent, DEFAULT_CONCURRENCY) + .`then`() + + private def mergeStateChangeEvents(one: StateChangeEvent, other: StateChangeEvent): StateChangeEvent = StateChangeEvent( + one.eventId, + one.username, + one.map ++ other.map) // will keep the rightmost value + override def getDefaultGroup: Group = MailboxChangeListenerGroup() override def isHandling(event: Event): Boolean = event.isInstanceOf[MailboxEvent] @@ -99,13 +121,16 @@ case class MailboxChangeListener @Inject() (@Named(InjectionKeys.JMAP) eventBus: } } - private def saveChangeEvent(jmapChange: JmapChange): Publisher[Void] = + private def saveChangeEvent(jmapChange: JmapChange): Publisher[JmapChange] = SMono(jmapChange match { - case mailboxChange: MailboxChange => mailboxChangeRepository.save(mailboxChange) - case emailChange: EmailChange => emailChangeRepository.save(emailChange) + case mailboxChange: MailboxChange => mailboxChangeRepository.save(mailboxChange).`then`(Mono.just(jmapChange)) + case emailChange: EmailChange => emailChangeRepository.save(emailChange).`then`(Mono.just(jmapChange)) case mailboxAndEmailChange: MailboxAndEmailChange => mailboxChangeRepository.save(mailboxAndEmailChange.getMailboxChange) - .`then`(emailChangeRepository.save(mailboxAndEmailChange.getEmailChange)) - }).`then`(SMono(eventBus.dispatch(toStateChangeEvent(jmapChange), AccountIdRegistrationKey(jmapChange.getAccountId)))) + .`then`(emailChangeRepository.save(mailboxAndEmailChange.getEmailChange)).`then`(Mono.just(jmapChange)) + }) + + private def dispactChangeEvent(jmapChange: StateChangeEvent): Publisher[Void] = + SMono(eventBus.dispatch(jmapChange, AccountIdRegistrationKey(AccountId.fromUsername(jmapChange.getUsername)))) private def getSharees(mailboxId: MailboxId, username: Username): SMono[List[AccountId]] = { val session = mailboxManager.createSystemSession(username) --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For additional commands, e-mail: notifications-h...@james.apache.org