This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 20b2a9505f01621ee3017c06059c6777849d396c Author: TungTV <vtt...@linagora.com> AuthorDate: Tue Jan 14 16:00:35 2025 +0700 Avoid key remote dispatch multiple time with same Events, sames Keys --- .../java/org/apache/james/events/EventDispatcher.java | 9 ++++++--- .../java/org/apache/james/events/InVMEventBus.java | 18 ++++++++++++++++-- 2 files changed, 22 insertions(+), 5 deletions(-) diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/EventDispatcher.java b/event-bus/distributed/src/main/java/org/apache/james/events/EventDispatcher.java index 101b2c8813..32624cdb86 100644 --- a/event-bus/distributed/src/main/java/org/apache/james/events/EventDispatcher.java +++ b/event-bus/distributed/src/main/java/org/apache/james/events/EventDispatcher.java @@ -175,12 +175,15 @@ public class EventDispatcher { ImmutableList<Event> underlyingEvents = events.stream() .map(EventBus.EventWithRegistrationKey::event) .collect(ImmutableList.toImmutableList()); + + ImmutableSet<RegistrationKey> keys = events.stream() + .flatMap(event -> event.keys().stream()) + .collect(ImmutableSet.toImmutableSet()); + return Mono.fromCallable(() -> eventSerializer.toJsonBytes(underlyingEvents)) .flatMap(serializedEvent -> Mono.zipDelayError( remoteGroupsDispatch(serializedEvent, underlyingEvents), - Flux.fromIterable(events) - .concatMap(e -> remoteKeysDispatch(serializedEvent, e.keys())) - .then())) + remoteKeysDispatch(serializedEvent, keys))) .then(); } diff --git a/event-bus/in-vm/src/main/java/org/apache/james/events/InVMEventBus.java b/event-bus/in-vm/src/main/java/org/apache/james/events/InVMEventBus.java index 079ad648d5..bd7c15a9cd 100644 --- a/event-bus/in-vm/src/main/java/org/apache/james/events/InVMEventBus.java +++ b/event-bus/in-vm/src/main/java/org/apache/james/events/InVMEventBus.java @@ -88,13 +88,21 @@ public class InVMEventBus implements EventBus { ImmutableList<EventWithRegistrationKey> notNoopEvents = events.stream() .filter(e -> !e.event().isNoop()) .collect(ImmutableList.toImmutableList()); + + ImmutableList<Event> underlyingEvents = events.stream() + .map(EventBus.EventWithRegistrationKey::event) + .collect(ImmutableList.toImmutableList()); + + ImmutableSet<RegistrationKey> keys = events.stream() + .flatMap(event -> event.keys().stream()) + .collect(ImmutableSet.toImmutableSet()); + if (!notNoopEvents.isEmpty()) { return Flux.merge( groupDeliveries(notNoopEvents.stream() .map(EventWithRegistrationKey::event) .collect(ImmutableList.toImmutableList())), - Flux.fromIterable(events) - .concatMap(e -> keyDeliveries(e.event(), e.keys()))) + keyDeliveries(underlyingEvents, keys)) .then() .onErrorResume(throwable -> Mono.empty()); } @@ -130,6 +138,12 @@ public class InVMEventBus implements EventBus { .then(); } + private Mono<Void> keyDeliveries(List<Event> events, Set<RegistrationKey> keys) { + return Flux.fromIterable(registeredListenersByKeys(keys)) + .flatMap(listener -> eventDelivery.deliver(listener, events, EventDelivery.DeliveryOption.none()), EventBus.EXECUTION_RATE) + .then(); + } + private Mono<Void> groupDeliveries(List<Event> events) { return Flux.fromIterable(groups.entrySet()) .flatMap(entry -> groupDelivery(events, entry.getValue(), entry.getKey()), EventBus.EXECUTION_RATE) --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For additional commands, e-mail: notifications-h...@james.apache.org