This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 91595e7ecc19d9fe5a121c5c8c3963bd4414ecd9
Author: Benoit Tellier <[email protected]>
AuthorDate: Thu May 13 17:37:05 2021 +0700

    [REFACTORING] MailboxChangeListener was performing some blocking calls...
---
 .../james/mailbox/store/StoreRightManager.java     |  1 +
 .../james/jmap/api/change/MailboxChange.java       |  6 +-
 .../james/jmap/change/MailboxChangeListener.scala  | 85 ++++++++++------------
 3 files changed, 44 insertions(+), 48 deletions(-)

diff --git 
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreRightManager.java
 
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreRightManager.java
index 43578ea..7274f0c 100644
--- 
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreRightManager.java
+++ 
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreRightManager.java
@@ -151,6 +151,7 @@ public class StoreRightManager implements RightManager {
         return mailbox.getACL();
     }
 
+    @Override
     public MailboxACL listRights(MailboxId mailboxId, MailboxSession session) 
throws MailboxException {
         MailboxMapper mapper = 
mailboxSessionMapperFactory.getMailboxMapper(session);
         Mailbox mailbox = blockOptional(mapper.findMailboxById(mailboxId))
diff --git 
a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/MailboxChange.java
 
b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/MailboxChange.java
index c1deef5..3da4de7 100644
--- 
a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/MailboxChange.java
+++ 
b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/MailboxChange.java
@@ -130,14 +130,14 @@ public class MailboxChange implements JmapChange {
             this.stateFactory = stateFactory;
         }
 
-        public List<JmapChange> fromMailboxAdded(MailboxAdded mailboxAdded, 
ZonedDateTime now) {
-            return ImmutableList.of(MailboxChange.builder()
+        public JmapChange fromMailboxAdded(MailboxAdded mailboxAdded, 
ZonedDateTime now) {
+            return MailboxChange.builder()
                 .accountId(AccountId.fromUsername(mailboxAdded.getUsername()))
                 .state(stateFactory.generate())
                 .date(now)
                 .isCountChange(false)
                 .created(ImmutableList.of(mailboxAdded.getMailboxId()))
-                .build());
+                .build();
         }
 
         public List<JmapChange> fromMailboxRenamed(MailboxRenamed 
mailboxRenamed, ZonedDateTime now, List<AccountId> sharees) {
diff --git 
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/MailboxChangeListener.scala
 
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/MailboxChangeListener.scala
index 84337db..b4c6aa9 100644
--- 
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/MailboxChangeListener.scala
+++ 
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/MailboxChangeListener.scala
@@ -31,10 +31,9 @@ import org.apache.james.jmap.api.change.{EmailChange, 
EmailChangeRepository, Jma
 import org.apache.james.jmap.api.model.AccountId
 import org.apache.james.jmap.change.MailboxChangeListener.LOGGER
 import org.apache.james.jmap.core.UuidState
+import org.apache.james.mailbox.MailboxManager
 import org.apache.james.mailbox.events.MailboxEvents.{Added, Expunged, 
FlagsUpdated, MailboxACLUpdated, MailboxAdded, MailboxDeletion, MailboxEvent, 
MailboxRenamed}
-import org.apache.james.mailbox.exception.MailboxException
 import org.apache.james.mailbox.model.{MailboxACL, MailboxId}
-import org.apache.james.mailbox.{MailboxManager, MailboxSession}
 import org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY
 import org.reactivestreams.Publisher
 import org.slf4j.{Logger, LoggerFactory}
@@ -57,44 +56,43 @@ case class MailboxChangeListener @Inject() 
(@Named(InjectionKeys.JMAP) eventBus:
                                             clock: Clock) extends 
ReactiveGroupEventListener {
 
   override def reactiveEvent(event: Event): Publisher[Void] =
-    handleEvent(event.asInstanceOf[MailboxEvent])
-      .`then`(SMono.empty[Void])
-      .asJava
+    jmapChanges(event.asInstanceOf[MailboxEvent])
+      .flatMap(saveChangeEvent, DEFAULT_CONCURRENCY)
+      .`then`()
 
   override def getDefaultGroup: Group = MailboxChangeListenerGroup()
 
   override def isHandling(event: Event): Boolean = 
event.isInstanceOf[MailboxEvent]
 
-  private def handleEvent(mailboxEvent: MailboxEvent): SMono[Unit] = {
+  private def jmapChanges(mailboxEvent: MailboxEvent): SFlux[JmapChange] = {
     val now: ZonedDateTime = ZonedDateTime.now(clock)
     val mailboxId: MailboxId = mailboxEvent.getMailboxId
     val username: Username = mailboxEvent.getUsername
 
-    SFlux.fromIterable(
-      mailboxEvent match {
-        case mailboxAdded: MailboxAdded =>
-          mailboxChangeFactory.fromMailboxAdded(mailboxAdded, now).asScala
-        case mailboxRenamed: MailboxRenamed =>
-          mailboxChangeFactory.fromMailboxRenamed(mailboxRenamed, now, 
getSharees(mailboxId, username).asJava).asScala
-        case mailboxACLUpdated: MailboxACLUpdated =>
-          mailboxChangeFactory.fromMailboxACLUpdated(mailboxACLUpdated, now, 
getSharees(mailboxId, username).asJava).asScala
-        case mailboxDeletion: MailboxDeletion =>
-          mailboxChangeFactory.fromMailboxDeletion(mailboxDeletion, 
now).asScala
-        case added: Added =>
-          val sharees = getSharees(mailboxId, username).asJava
-          mailboxChangeFactory.fromAdded(added, now, sharees).asScala
-            .concat(emailChangeFactory.fromAdded(added, now, sharees).asScala)
-        case flagsUpdated: FlagsUpdated =>
-          val sharees = getSharees(mailboxId, username).asJava
-          mailboxChangeFactory.fromFlagsUpdated(flagsUpdated, now, 
sharees).asScala
-            .concat(emailChangeFactory.fromFlagsUpdated(flagsUpdated, now, 
sharees).asScala)
-        case expunged: Expunged =>
-          val sharees = getSharees(mailboxId, username)
-          mailboxChangeFactory.fromExpunged(expunged, now, 
sharees.asJava).asScala
-            .concat(emailChangeFactory.fromExpunged(expunged, now, 
sharees.map(_.getIdentifier).map(Username.of).asJava).asScala)
-      })
-      .flatMap(saveChangeEvent, DEFAULT_CONCURRENCY)
-      .`then`()
+    mailboxEvent match {
+      case mailboxAdded: MailboxAdded =>
+        SFlux.just(mailboxChangeFactory.fromMailboxAdded(mailboxAdded, now))
+      case mailboxRenamed: MailboxRenamed =>
+        getSharees(mailboxId, username)
+          .flatMapIterable(sharees => 
mailboxChangeFactory.fromMailboxRenamed(mailboxRenamed, now, 
sharees.asJava).asScala)
+      case mailboxACLUpdated: MailboxACLUpdated =>
+        getSharees(mailboxId, username)
+          .flatMapIterable(sharees => 
mailboxChangeFactory.fromMailboxACLUpdated(mailboxACLUpdated, now, 
sharees.asJava).asScala)
+      case mailboxDeletion: MailboxDeletion =>
+        
SFlux.fromIterable(mailboxChangeFactory.fromMailboxDeletion(mailboxDeletion, 
now).asScala)
+      case added: Added =>
+        getSharees(mailboxId, username)
+          .flatMapIterable(sharees => mailboxChangeFactory.fromAdded(added, 
now, sharees.asJava).asScala
+            .concat(emailChangeFactory.fromAdded(added, now, 
sharees.asJava).asScala))
+      case flagsUpdated: FlagsUpdated =>
+        getSharees(mailboxId, username)
+          .flatMapIterable(sharees => 
mailboxChangeFactory.fromFlagsUpdated(flagsUpdated, now, sharees.asJava).asScala
+          .concat(emailChangeFactory.fromFlagsUpdated(flagsUpdated, now, 
sharees.asJava).asScala))
+      case expunged: Expunged =>
+        getSharees(mailboxId, username)
+          .flatMapIterable(sharees => 
mailboxChangeFactory.fromExpunged(expunged, now, sharees.asJava).asScala
+          .concat(emailChangeFactory.fromExpunged(expunged, now, 
sharees.map(_.getIdentifier).map(Username.of).asJava).asScala))
+    }
   }
 
   private def saveChangeEvent(jmapChange: JmapChange): Publisher[Void] =
@@ -103,23 +101,21 @@ case class MailboxChangeListener @Inject() 
(@Named(InjectionKeys.JMAP) eventBus:
       case emailChange: EmailChange => emailChangeRepository.save(emailChange)
     }).`then`(SMono(eventBus.dispatch(toStateChangeEvent(jmapChange), 
AccountIdRegistrationKey(jmapChange.getAccountId))))
 
-
-  private def getSharees(mailboxId: MailboxId, username: Username): 
List[AccountId] = {
-    val mailboxSession: MailboxSession = 
mailboxManager.createSystemSession(username)
-    try {
-      val mailboxACL = mailboxManager.listRights(mailboxId, mailboxSession)
-      mailboxACL.getEntries.keySet
+  private def getSharees(mailboxId: MailboxId, username: Username): 
SMono[List[AccountId]] = {
+    val session = mailboxManager.createSystemSession(username)
+    SMono(mailboxManager.getMailboxReactive(mailboxId, session))
+      .map(mailbox => mailbox.getResolvedAcl(session))
+      .map(mailboxACL => mailboxACL.getEntries.keySet
         .asScala
         .filter(!_.isNegative)
         .filter(_.getNameType == MailboxACL.NameType.user)
         .map(_.getName)
         .map(AccountId.fromString)
-        .toList
-    } catch {
-      case e: MailboxException =>
-        LOGGER.warn("Could not get sharees for mailbox [%s] when listening to 
change events", mailboxId)
-        List.empty
-    }
+        .toList)
+      .onErrorResume(e => {
+        LOGGER.warn("Could not get sharees for mailbox [%s] when listening to 
change events", mailboxId, e)
+        SMono.just(List.empty)
+      })
   }
 
   private def toStateChangeEvent(jmapChange: JmapChange): StateChangeEvent = 
jmapChange match {
@@ -130,8 +126,7 @@ case class MailboxChangeListener @Inject() 
(@Named(InjectionKeys.JMAP) eventBus:
         Some(UuidState.fromJava(emailChange.getState))
           .filter(_ => !emailChange.getCreated.isEmpty)
           .map(emailDeliveryState => Map(EmailDeliveryTypeName -> 
emailDeliveryState))
-          .getOrElse(Map())).toMap
-    )
+          .getOrElse(Map())).toMap)
     case mailboxChange: MailboxChange => StateChangeEvent(
       eventId = EventId.random(),
       username = Username.of(mailboxChange.getAccountId.getIdentifier),

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to