This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 898742a2ceba69a21f602ac0f8499d842fc3d5cf
Author: Benoit TELLIER <btell...@linagora.com>
AuthorDate: Thu Dec 12 17:07:05 2024 +0100

    [PERF] RabbitMQEventBus support for grouped events
---
 .../org/apache/james/events/EventSerializer.java   |  8 +++++
 .../org/apache/james/events/EventDispatcher.java   | 40 ++++++++++++++++++++++
 .../org/apache/james/events/GroupRegistration.java | 16 +++++++++
 .../james/events/GroupRegistrationHandler.java     | 11 +++---
 .../org/apache/james/events/ListenerExecutor.java  | 37 ++++++++++++++++++++
 .../org/apache/james/events/RabbitMQEventBus.java  | 14 ++++++++
 .../james/events/delivery/InVmEventDelivery.java   |  3 ++
 .../james/event/json/MailboxEventSerializer.scala  |  4 +++
 8 files changed, 128 insertions(+), 5 deletions(-)

diff --git 
a/event-bus/api/src/main/java/org/apache/james/events/EventSerializer.java 
b/event-bus/api/src/main/java/org/apache/james/events/EventSerializer.java
index 5f04bf14e9..72360b861a 100644
--- a/event-bus/api/src/main/java/org/apache/james/events/EventSerializer.java
+++ b/event-bus/api/src/main/java/org/apache/james/events/EventSerializer.java
@@ -32,6 +32,10 @@ public interface EventSerializer {
         return toJson(event).getBytes(StandardCharsets.UTF_8);
     }
 
+    default byte[] toJsonBytes(Collection<Event> event) {
+        return toJson(event).getBytes(StandardCharsets.UTF_8);
+    }
+
     Event asEvent(String serialized);
 
     List<Event> asEvents(String serialized);
@@ -39,4 +43,8 @@ public interface EventSerializer {
     default Event fromBytes(byte[] serialized) {
         return asEvent(new String(serialized, StandardCharsets.UTF_8));
     }
+
+    default List<Event> asEventsFromBytes(byte[] serialized) {
+        return asEvents(new String(serialized, StandardCharsets.UTF_8));
+    }
 }
diff --git 
a/event-bus/distributed/src/main/java/org/apache/james/events/EventDispatcher.java
 
b/event-bus/distributed/src/main/java/org/apache/james/events/EventDispatcher.java
index 365e6001f3..101b2c8813 100644
--- 
a/event-bus/distributed/src/main/java/org/apache/james/events/EventDispatcher.java
+++ 
b/event-bus/distributed/src/main/java/org/apache/james/events/EventDispatcher.java
@@ -32,6 +32,7 @@ import static 
org.apache.james.events.RabbitMQEventBus.EVENT_BUS_ID;
 
 import java.time.Duration;
 import java.util.Collection;
+import java.util.List;
 import java.util.Set;
 
 import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
@@ -122,6 +123,17 @@ public class EventDispatcher {
             .then();
     }
 
+    Mono<Void> dispatch(Collection<EventBus.EventWithRegistrationKey> events) {
+        return Flux
+            .concat(
+                Flux.fromIterable(events)
+                    .concatMap(e -> dispatchToLocalListeners(e.event(), 
e.keys()))
+                    .then(),
+                dispatchToRemoteListeners(events))
+            .doOnError(throwable -> LOGGER.error("error while dispatching 
event", throwable))
+            .then();
+    }
+
     private Mono<Void> dispatchToLocalListeners(Event event, 
Set<RegistrationKey> keys) {
         return Flux.fromIterable(keys)
             .flatMap(key -> 
Flux.fromIterable(localListenerRegistry.getLocalListeners(key))
@@ -159,6 +171,19 @@ public class EventDispatcher {
             .then();
     }
 
+    private Mono<Void> 
dispatchToRemoteListeners(Collection<EventBus.EventWithRegistrationKey> events) 
{
+        ImmutableList<Event> underlyingEvents = events.stream()
+            .map(EventBus.EventWithRegistrationKey::event)
+            .collect(ImmutableList.toImmutableList());
+        return Mono.fromCallable(() -> 
eventSerializer.toJsonBytes(underlyingEvents))
+            .flatMap(serializedEvent -> Mono.zipDelayError(
+                remoteGroupsDispatch(serializedEvent, underlyingEvents),
+                Flux.fromIterable(events)
+                    .concatMap(e -> remoteKeysDispatch(serializedEvent, 
e.keys()))
+                    .then()))
+            .then();
+    }
+
     private Mono<Void> remoteGroupsDispatch(byte[] serializedEvent, Event 
event) {
         return remoteDispatchWithAcks(serializedEvent)
             .doOnError(ex -> LOGGER.error(
@@ -171,6 +196,21 @@ public class EventDispatcher {
                 .then(propagateErrorIfNeeded(ex)));
     }
 
+    private Mono<Void> remoteGroupsDispatch(byte[] serializedEvent, 
List<Event> events) {
+        return remoteDispatchWithAcks(serializedEvent)
+            .onErrorResume(ex -> Flux.fromIterable(events)
+                .concatMap(event -> { // concatMap subscribes to each store(); with map() the returned Mono was never run
+                    LOGGER.error(
+                        "cannot dispatch event of type '{}' belonging '{}' 
with id '{}' to remote groups, store it into dead letter",
+                        event.getClass().getSimpleName(),
+                        event.getUsername().asString(),
+                        event.getEventId().getId(),
+                        ex);
+                    return deadLetters.store(dispatchingFailureGroup, event);
+                })
+                .then(propagateErrorIfNeeded(ex)));
+    }
+
     private Mono<Void> propagateErrorIfNeeded(Throwable throwable) {
         if (configuration.eventBusPropagateDispatchError()) {
             return Mono.error(throwable);
diff --git 
a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java
 
b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java
index eb0e55a194..76cd3d9b3c 100644
--- 
a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java
+++ 
b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java
@@ -27,6 +27,7 @@ import static 
org.apache.james.backends.rabbitmq.Constants.evaluateAutoDelete;
 import static org.apache.james.backends.rabbitmq.Constants.evaluateDurable;
 import static org.apache.james.backends.rabbitmq.Constants.evaluateExclusive;
 
+import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.function.Predicate;
@@ -173,6 +174,13 @@ class GroupRegistration implements Registration {
             .onErrorResume(throwable -> retryHandler.handleRetry(event, 
currentRetryCount, throwable));
     }
 
+    public Mono<Void> runListenerReliably(int currentRetryCount, List<Event> 
events) {
+        return runListener(events)
+            .onErrorResume(throwable -> Flux.fromIterable(events)
+                .concatMap(event -> retryHandler.handleRetry(event, 
currentRetryCount, throwable))
+                .then());
+    }
+
     private Mono<Event> deserializeEvent(byte[] eventAsBytes) {
         return Mono.fromCallable(() -> eventSerializer.fromBytes(eventAsBytes))
             .subscribeOn(Schedulers.parallel());
@@ -190,6 +198,14 @@ class GroupRegistration implements Registration {
             event);
     }
 
+    private Mono<Void> runListener(List<Event> events) {
+        return listenerExecutor.execute(
+            listener,
+            MDCBuilder.create()
+                .addToContext(EventBus.StructuredLoggingFields.GROUP, 
group.asString()),
+            events);
+    }
+
     private int getRetryCount(AcknowledgableDelivery acknowledgableDelivery) {
         return 
Optional.ofNullable(acknowledgableDelivery.getProperties().getHeaders())
             .flatMap(headers -> Optional.ofNullable(headers.get(RETRY_COUNT)))
diff --git 
a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java
 
b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java
index e5dae70668..5480af0a8f 100644
--- 
a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java
+++ 
b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java
@@ -30,6 +30,7 @@ import static 
org.apache.james.backends.rabbitmq.Constants.evaluateExclusive;
 import static org.apache.james.events.GroupRegistration.DEFAULT_RETRY_COUNT;
 
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
@@ -138,10 +139,10 @@ class GroupRegistrationHandler {
     private Mono<Void> deliver(AcknowledgableDelivery acknowledgableDelivery) {
         byte[] eventAsBytes = acknowledgableDelivery.getBody();
 
-        return deserializeEvent(eventAsBytes)
-            .flatMapIterable(aa -> groupRegistrations.values()
+        return deserializeEvents(eventAsBytes)
+            .flatMapIterable(events -> groupRegistrations.values()
                 .stream()
-                .map(group -> Pair.of(group, aa))
+                .map(group -> Pair.of(group, events))
                 .collect(ImmutableList.toImmutableList()))
             .flatMap(event -> 
event.getLeft().runListenerReliably(DEFAULT_RETRY_COUNT, event.getRight()))
             
.then(Mono.<Void>fromRunnable(acknowledgableDelivery::ack).subscribeOn(Schedulers.boundedElastic()))
@@ -154,8 +155,8 @@ class GroupRegistrationHandler {
             });
     }
 
-    private Mono<Event> deserializeEvent(byte[] eventAsBytes) {
-        return Mono.fromCallable(() -> 
eventSerializer.fromBytes(eventAsBytes));
+    private Mono<List<Event>> deserializeEvents(byte[] eventAsBytes) {
+        return Mono.fromCallable(() -> 
eventSerializer.asEventsFromBytes(eventAsBytes));
     }
 
     void stop() {
diff --git 
a/event-bus/distributed/src/main/java/org/apache/james/events/ListenerExecutor.java
 
b/event-bus/distributed/src/main/java/org/apache/james/events/ListenerExecutor.java
index 2eb14cc97a..a05a317108 100644
--- 
a/event-bus/distributed/src/main/java/org/apache/james/events/ListenerExecutor.java
+++ 
b/event-bus/distributed/src/main/java/org/apache/james/events/ListenerExecutor.java
@@ -22,6 +22,8 @@ package org.apache.james.events;
 import static org.apache.james.events.EventBus.Metrics.timerName;
 
 import java.time.Duration;
+import java.util.List;
+import java.util.stream.Collectors;
 
 import org.apache.james.metrics.api.MetricFactory;
 import org.apache.james.util.MDCBuilder;
@@ -50,6 +52,19 @@ class ListenerExecutor {
         return Mono.empty();
     }
 
+    Mono<Void> execute(EventListener.ReactiveEventListener listener, 
MDCBuilder mdcBuilder, List<Event> events) {
+        if (events.size() == 1) {
+            return execute(listener, mdcBuilder, events.getFirst());
+        }
+        if (events.stream().noneMatch(listener::isHandling)) {
+            return Mono.empty();
+        }
+        return 
Mono.from(metricFactory.decoratePublisherWithTimerMetric(timerName(listener),
+            Mono.from(listener.reactiveEvent(events))
+                .contextWrite(ReactorUtils.context("ListenerExecutor", 
mdc(listener, mdcBuilder, events)))
+                .timeout(TIMEOUT)));
+    }
+
     private MDCBuilder mdc(EventListener listener, MDCBuilder mdcBuilder, 
Event event) {
         return mdcBuilder
             .addToContext(EventBus.StructuredLoggingFields.EVENT_ID, 
event.getEventId().getId().toString())
@@ -57,4 +72,26 @@ class ListenerExecutor {
             .addToContext(EventBus.StructuredLoggingFields.USER, 
event.getUsername().asString())
             .addToContext(EventBus.StructuredLoggingFields.LISTENER_CLASS, 
listener.getClass().getCanonicalName());
     }
+
+    private MDCBuilder mdc(EventListener listener, MDCBuilder mdcBuilder, 
List<Event> events) {
+        if (events.size() == 1) {
+            return mdcBuilder
+                .addToContext(EventBus.StructuredLoggingFields.EVENT_ID, 
events.getFirst().getEventId().getId().toString()) // raw id, same form as the joined ids in the multi-event branch
+                .addToContext(EventBus.StructuredLoggingFields.EVENT_CLASS, 
events.getFirst().getClass().getCanonicalName())
+                .addToContext(EventBus.StructuredLoggingFields.USER, 
events.getFirst().getUsername().asString())
+                .addToContext(EventBus.StructuredLoggingFields.LISTENER_CLASS, 
listener.getClass().getCanonicalName());
+        }
+
+        return mdcBuilder
+            .addToContext(EventBus.StructuredLoggingFields.EVENT_ID, 
events.stream()
+                .map(e -> e.getEventId().getId().toString())
+                .collect(Collectors.joining(",")))
+            .addToContext(EventBus.StructuredLoggingFields.EVENT_CLASS, 
events.stream()
+                .map(e -> e.getClass().getCanonicalName())
+                .collect(Collectors.joining(",")))
+            .addToContext(EventBus.StructuredLoggingFields.USER, 
events.stream()
+                .map(e -> e.getUsername().asString())
+                .collect(Collectors.joining(",")))
+            .addToContext(EventBus.StructuredLoggingFields.LISTENER_CLASS, 
listener.getClass().getCanonicalName());
+    }
 }
diff --git 
a/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java
 
b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java
index a81ec7a8a3..ccd8df7bac 100644
--- 
a/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java
+++ 
b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java
@@ -32,6 +32,7 @@ import org.apache.james.metrics.api.MetricFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 
 import reactor.core.publisher.Mono;
@@ -153,6 +154,19 @@ public class RabbitMQEventBus implements EventBus, 
Startable {
         return Mono.empty();
     }
 
+    @Override
+    public Mono<Void> dispatch(Collection<EventWithRegistrationKey> events) {
+        Preconditions.checkState(isRunning, NOT_RUNNING_ERROR_MESSAGE);
+
+        ImmutableList<EventWithRegistrationKey> notNoopEvents = events.stream()
+            .filter(e -> !e.event().isNoop())
+            .collect(ImmutableList.toImmutableList());
+        if (!notNoopEvents.isEmpty()) {
+            return 
Mono.from(metricFactory.decoratePublisherWithTimerMetric("rabbit-dispatch", 
eventDispatcher.dispatch(notNoopEvents))); // forward only the filtered, non-noop events
+        }
+        return Mono.empty();
+    }
+
     @Override
     public Mono<Void> reDeliver(Group group, Event event) {
         Preconditions.checkState(isRunning, NOT_RUNNING_ERROR_MESSAGE);
diff --git 
a/event-bus/in-vm/src/main/java/org/apache/james/events/delivery/InVmEventDelivery.java
 
b/event-bus/in-vm/src/main/java/org/apache/james/events/delivery/InVmEventDelivery.java
index 66fda0526f..7140f5e7fa 100644
--- 
a/event-bus/in-vm/src/main/java/org/apache/james/events/delivery/InVmEventDelivery.java
+++ 
b/event-bus/in-vm/src/main/java/org/apache/james/events/delivery/InVmEventDelivery.java
@@ -92,6 +92,9 @@ public class InVmEventDelivery implements EventDelivery {
     }
 
     private Mono<Void> doDeliverToListener(EventListener.ReactiveEventListener 
listener, List<Event> events) {
+        if (events.stream().noneMatch(listener::isHandling)) {
+            return Mono.empty();
+        }
         return Mono.defer(() -> 
Mono.from(metricFactory.decoratePublisherWithTimerMetric(timerName(listener),
                 listener.reactiveEvent(events))))
             .contextWrite(context("deliver", buildMDC(listener, events)));
diff --git 
a/mailbox/event/json/src/main/scala/org/apache/james/event/json/MailboxEventSerializer.scala
 
b/mailbox/event/json/src/main/scala/org/apache/james/event/json/MailboxEventSerializer.scala
index 3eeee7fc55..88ae462624 100644
--- 
a/mailbox/event/json/src/main/scala/org/apache/james/event/json/MailboxEventSerializer.scala
+++ 
b/mailbox/event/json/src/main/scala/org/apache/james/event/json/MailboxEventSerializer.scala
@@ -401,6 +401,7 @@ class JsonSerialize(mailboxIdFactory: MailboxId.Factory, 
messageIdFactory: Messa
     def toJson(event: Event): String = Json.toJson(event).toString()
     def toJson(event: Iterable[Event]): String = Json.toJson(event).toString()
     def toJsonBytes(event: Event): Array[Byte] = 
Json.toBytes(Json.toJson(event))
+    def toJsonBytes(event: Iterable[Event]): Array[Byte] = 
Json.toBytes(Json.toJson(event))
     def fromJson(json: String): JsResult[Event] = 
Json.fromJson[Event](Json.parse(json))
     def fromJsonAsEvents(json: String): JsResult[List[Event]] = if 
(json.startsWith("{")) {
       Json.fromJson[Event](Json.parse(json)).map(event => List(event))
@@ -413,6 +414,7 @@ class JsonSerialize(mailboxIdFactory: MailboxId.Factory, 
messageIdFactory: Messa
   def toJson(event: JavaEvent): String = 
eventSerializerPrivateWrapper.toJson(ScalaConverter.toScala(event))
   def toJson(event: util.Collection[JavaEvent]): String = 
eventSerializerPrivateWrapper.toJson(event.asScala.map(ScalaConverter.toScala))
   def toJsonBytes(event: JavaEvent): Array[Byte] = 
eventSerializerPrivateWrapper.toJsonBytes(ScalaConverter.toScala(event))
+  def toJsonBytes(event: util.Collection[JavaEvent]): Array[Byte] = 
eventSerializerPrivateWrapper.toJsonBytes(event.asScala.map(ScalaConverter.toScala))
   def fromJson(json: String): JsResult[JavaEvent] = 
eventSerializerPrivateWrapper.fromJson(json)
     .map(event => event.toJava)
   def fromJsonAsEvents(json: String): JsResult[List[JavaEvent]] = 
eventSerializerPrivateWrapper.fromJsonAsEvents(json)
@@ -426,6 +428,8 @@ class MailboxEventSerializer @Inject()(mailboxIdFactory: 
MailboxId.Factory, mess
 
   override def toJsonBytes(event: JavaEvent): Array[Byte] = 
jsonSerialize.toJsonBytes(event)
 
+  override def toJsonBytes(event: util.Collection[JavaEvent]): Array[Byte] = 
jsonSerialize.toJsonBytes(event)
+
   def fromJson(json: String): JsResult[JavaEvent] = 
jsonSerialize.fromJson(json)
 
   override def toJson(event: util.Collection[JavaEvent]): String = 
jsonSerialize.toJson(event)


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org
For additional commands, e-mail: notifications-h...@james.apache.org

Reply via email to