This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 20b2a9505f01621ee3017c06059c6777849d396c
Author: TungTV <vtt...@linagora.com>
AuthorDate: Tue Jan 14 16:00:35 2025 +0700

    Avoid dispatching to remote keys multiple times for the same events and the same keys
---
 .../java/org/apache/james/events/EventDispatcher.java  |  9 ++++++---
 .../java/org/apache/james/events/InVMEventBus.java     | 18 ++++++++++++++++--
 2 files changed, 22 insertions(+), 5 deletions(-)

diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/EventDispatcher.java b/event-bus/distributed/src/main/java/org/apache/james/events/EventDispatcher.java
index 101b2c8813..32624cdb86 100644
--- a/event-bus/distributed/src/main/java/org/apache/james/events/EventDispatcher.java
+++ b/event-bus/distributed/src/main/java/org/apache/james/events/EventDispatcher.java
@@ -175,12 +175,15 @@ public class EventDispatcher {
         ImmutableList<Event> underlyingEvents = events.stream()
             .map(EventBus.EventWithRegistrationKey::event)
             .collect(ImmutableList.toImmutableList());
+
+        ImmutableSet<RegistrationKey> keys = events.stream()
+            .flatMap(event -> event.keys().stream())
+            .collect(ImmutableSet.toImmutableSet());
+
         return Mono.fromCallable(() -> 
eventSerializer.toJsonBytes(underlyingEvents))
             .flatMap(serializedEvent -> Mono.zipDelayError(
                 remoteGroupsDispatch(serializedEvent, underlyingEvents),
-                Flux.fromIterable(events)
-                    .concatMap(e -> remoteKeysDispatch(serializedEvent, 
e.keys()))
-                    .then()))
+                remoteKeysDispatch(serializedEvent, keys)))
             .then();
     }
 
diff --git a/event-bus/in-vm/src/main/java/org/apache/james/events/InVMEventBus.java b/event-bus/in-vm/src/main/java/org/apache/james/events/InVMEventBus.java
index 079ad648d5..bd7c15a9cd 100644
--- a/event-bus/in-vm/src/main/java/org/apache/james/events/InVMEventBus.java
+++ b/event-bus/in-vm/src/main/java/org/apache/james/events/InVMEventBus.java
@@ -88,13 +88,21 @@ public class InVMEventBus implements EventBus {
         ImmutableList<EventWithRegistrationKey> notNoopEvents = events.stream()
             .filter(e -> !e.event().isNoop())
             .collect(ImmutableList.toImmutableList());
+
+        ImmutableList<Event> underlyingEvents = events.stream()
+            .map(EventBus.EventWithRegistrationKey::event)
+            .collect(ImmutableList.toImmutableList());
+
+        ImmutableSet<RegistrationKey> keys = events.stream()
+            .flatMap(event -> event.keys().stream())
+            .collect(ImmutableSet.toImmutableSet());
+
         if (!notNoopEvents.isEmpty()) {
             return Flux.merge(
                     groupDeliveries(notNoopEvents.stream()
                         .map(EventWithRegistrationKey::event)
                         .collect(ImmutableList.toImmutableList())),
-                    Flux.fromIterable(events)
-                        .concatMap(e -> keyDeliveries(e.event(), e.keys())))
+                    keyDeliveries(underlyingEvents, keys))
                 .then()
                 .onErrorResume(throwable -> Mono.empty());
         }
@@ -130,6 +138,12 @@ public class InVMEventBus implements EventBus {
             .then();
     }
 
+    private Mono<Void> keyDeliveries(List<Event> events, Set<RegistrationKey> 
keys) {
+        return Flux.fromIterable(registeredListenersByKeys(keys))
+            .flatMap(listener -> eventDelivery.deliver(listener, events, 
EventDelivery.DeliveryOption.none()), EventBus.EXECUTION_RATE)
+            .then();
+    }
+
     private Mono<Void> groupDeliveries(List<Event> events) {
         return Flux.fromIterable(groups.entrySet())
             .flatMap(entry -> groupDelivery(events, entry.getValue(), 
entry.getKey()), EventBus.EXECUTION_RATE)


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org
For additional commands, e-mail: notifications-h...@james.apache.org

Reply via email to