This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 898742a2ceba69a21f602ac0f8499d842fc3d5cf Author: Benoit TELLIER <btell...@linagora.com> AuthorDate: Thu Dec 12 17:07:05 2024 +0100 [PERF] RabbitMQEventBus support for grouped events --- .../org/apache/james/events/EventSerializer.java | 8 +++++ .../org/apache/james/events/EventDispatcher.java | 40 ++++++++++++++++++++++ .../org/apache/james/events/GroupRegistration.java | 16 +++++++++ .../james/events/GroupRegistrationHandler.java | 11 +++--- .../org/apache/james/events/ListenerExecutor.java | 37 ++++++++++++++++++++ .../org/apache/james/events/RabbitMQEventBus.java | 14 ++++++++ .../james/events/delivery/InVmEventDelivery.java | 3 ++ .../james/event/json/MailboxEventSerializer.scala | 4 +++ 8 files changed, 128 insertions(+), 5 deletions(-) diff --git a/event-bus/api/src/main/java/org/apache/james/events/EventSerializer.java b/event-bus/api/src/main/java/org/apache/james/events/EventSerializer.java index 5f04bf14e9..72360b861a 100644 --- a/event-bus/api/src/main/java/org/apache/james/events/EventSerializer.java +++ b/event-bus/api/src/main/java/org/apache/james/events/EventSerializer.java @@ -32,6 +32,10 @@ public interface EventSerializer { return toJson(event).getBytes(StandardCharsets.UTF_8); } + default byte[] toJsonBytes(Collection<Event> event) { + return toJson(event).getBytes(StandardCharsets.UTF_8); + } + Event asEvent(String serialized); List<Event> asEvents(String serialized); @@ -39,4 +43,8 @@ public interface EventSerializer { default Event fromBytes(byte[] serialized) { return asEvent(new String(serialized, StandardCharsets.UTF_8)); } + + default List<Event> asEventsFromBytes(byte[] serialized) { + return asEvents(new String(serialized, StandardCharsets.UTF_8)); + } } diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/EventDispatcher.java b/event-bus/distributed/src/main/java/org/apache/james/events/EventDispatcher.java index 365e6001f3..101b2c8813 100644 --- 
a/event-bus/distributed/src/main/java/org/apache/james/events/EventDispatcher.java +++ b/event-bus/distributed/src/main/java/org/apache/james/events/EventDispatcher.java @@ -32,6 +32,7 @@ import static org.apache.james.events.RabbitMQEventBus.EVENT_BUS_ID; import java.time.Duration; import java.util.Collection; +import java.util.List; import java.util.Set; import org.apache.james.backends.rabbitmq.RabbitMQConfiguration; @@ -122,6 +123,17 @@ public class EventDispatcher { .then(); } + Mono<Void> dispatch(Collection<EventBus.EventWithRegistrationKey> events) { + return Flux + .concat( + Flux.fromIterable(events) + .concatMap(e -> dispatchToLocalListeners(e.event(), e.keys())) + .then(), + dispatchToRemoteListeners(events)) + .doOnError(throwable -> LOGGER.error("error while dispatching event", throwable)) + .then(); + } + private Mono<Void> dispatchToLocalListeners(Event event, Set<RegistrationKey> keys) { return Flux.fromIterable(keys) .flatMap(key -> Flux.fromIterable(localListenerRegistry.getLocalListeners(key)) @@ -159,6 +171,19 @@ public class EventDispatcher { .then(); } + private Mono<Void> dispatchToRemoteListeners(Collection<EventBus.EventWithRegistrationKey> events) { + ImmutableList<Event> underlyingEvents = events.stream() + .map(EventBus.EventWithRegistrationKey::event) + .collect(ImmutableList.toImmutableList()); + return Mono.fromCallable(() -> eventSerializer.toJsonBytes(underlyingEvents)) + .flatMap(serializedEvent -> Mono.zipDelayError( + remoteGroupsDispatch(serializedEvent, underlyingEvents), + Flux.fromIterable(events) + .concatMap(e -> remoteKeysDispatch(serializedEvent, e.keys())) + .then())) + .then(); + } + private Mono<Void> remoteGroupsDispatch(byte[] serializedEvent, Event event) { return remoteDispatchWithAcks(serializedEvent) .doOnError(ex -> LOGGER.error( @@ -171,6 +196,21 @@ public class EventDispatcher { .then(propagateErrorIfNeeded(ex))); } + private Mono<Void> remoteGroupsDispatch(byte[] serializedEvent, List<Event> events) { + 
return remoteDispatchWithAcks(serializedEvent) + .onErrorResume(ex -> Flux.fromIterable(events) + .map(event -> { + LOGGER.error( + "cannot dispatch event of type '{}' belonging '{}' with id '{}' to remote groups, store it into dead letter", + event.getClass().getSimpleName(), + event.getUsername().asString(), + event.getEventId().getId(), + ex); + return deadLetters.store(dispatchingFailureGroup, event); + }) + .then(propagateErrorIfNeeded(ex))); + } + private Mono<Void> propagateErrorIfNeeded(Throwable throwable) { if (configuration.eventBusPropagateDispatchError()) { return Mono.error(throwable); diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java index eb0e55a194..76cd3d9b3c 100644 --- a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java +++ b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java @@ -27,6 +27,7 @@ import static org.apache.james.backends.rabbitmq.Constants.evaluateAutoDelete; import static org.apache.james.backends.rabbitmq.Constants.evaluateDurable; import static org.apache.james.backends.rabbitmq.Constants.evaluateExclusive; +import java.util.List; import java.util.Objects; import java.util.Optional; import java.util.function.Predicate; @@ -173,6 +174,13 @@ class GroupRegistration implements Registration { .onErrorResume(throwable -> retryHandler.handleRetry(event, currentRetryCount, throwable)); } + public Mono<Void> runListenerReliably(int currentRetryCount, List<Event> events) { + return runListener(events) + .onErrorResume(throwable -> Flux.fromIterable(events) + .concatMap(event -> retryHandler.handleRetry(event, currentRetryCount, throwable)) + .then()); + } + private Mono<Event> deserializeEvent(byte[] eventAsBytes) { return Mono.fromCallable(() -> eventSerializer.fromBytes(eventAsBytes)) .subscribeOn(Schedulers.parallel()); @@ -190,6 +198,14 @@ 
class GroupRegistration implements Registration { event); } + private Mono<Void> runListener(List<Event> events) { + return listenerExecutor.execute( + listener, + MDCBuilder.create() + .addToContext(EventBus.StructuredLoggingFields.GROUP, group.asString()), + events); + } + private int getRetryCount(AcknowledgableDelivery acknowledgableDelivery) { return Optional.ofNullable(acknowledgableDelivery.getProperties().getHeaders()) .flatMap(headers -> Optional.ofNullable(headers.get(RETRY_COUNT))) diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java index e5dae70668..5480af0a8f 100644 --- a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java +++ b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java @@ -30,6 +30,7 @@ import static org.apache.james.backends.rabbitmq.Constants.evaluateExclusive; import static org.apache.james.events.GroupRegistration.DEFAULT_RETRY_COUNT; import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -138,10 +139,10 @@ class GroupRegistrationHandler { private Mono<Void> deliver(AcknowledgableDelivery acknowledgableDelivery) { byte[] eventAsBytes = acknowledgableDelivery.getBody(); - return deserializeEvent(eventAsBytes) - .flatMapIterable(aa -> groupRegistrations.values() + return deserializeEvents(eventAsBytes) + .flatMapIterable(events -> groupRegistrations.values() .stream() - .map(group -> Pair.of(group, aa)) + .map(group -> Pair.of(group, events)) .collect(ImmutableList.toImmutableList())) .flatMap(event -> event.getLeft().runListenerReliably(DEFAULT_RETRY_COUNT, event.getRight())) .then(Mono.<Void>fromRunnable(acknowledgableDelivery::ack).subscribeOn(Schedulers.boundedElastic())) @@ -154,8 +155,8 @@ class GroupRegistrationHandler { }); } - private 
Mono<Event> deserializeEvent(byte[] eventAsBytes) { - return Mono.fromCallable(() -> eventSerializer.fromBytes(eventAsBytes)); + private Mono<List<Event>> deserializeEvents(byte[] eventAsBytes) { + return Mono.fromCallable(() -> eventSerializer.asEventsFromBytes(eventAsBytes)); } void stop() { diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/ListenerExecutor.java b/event-bus/distributed/src/main/java/org/apache/james/events/ListenerExecutor.java index 2eb14cc97a..a05a317108 100644 --- a/event-bus/distributed/src/main/java/org/apache/james/events/ListenerExecutor.java +++ b/event-bus/distributed/src/main/java/org/apache/james/events/ListenerExecutor.java @@ -22,6 +22,8 @@ package org.apache.james.events; import static org.apache.james.events.EventBus.Metrics.timerName; import java.time.Duration; +import java.util.List; +import java.util.stream.Collectors; import org.apache.james.metrics.api.MetricFactory; import org.apache.james.util.MDCBuilder; @@ -50,6 +52,19 @@ class ListenerExecutor { return Mono.empty(); } + Mono<Void> execute(EventListener.ReactiveEventListener listener, MDCBuilder mdcBuilder, List<Event> events) { + if (events.size() == 1) { + return execute(listener, mdcBuilder, events.getFirst()); + } + if (events.stream().noneMatch(listener::isHandling)) { + return Mono.empty(); + } + return Mono.from(metricFactory.decoratePublisherWithTimerMetric(timerName(listener), + Mono.from(listener.reactiveEvent(events)) + .contextWrite(ReactorUtils.context("ListenerExecutor", mdc(listener, mdcBuilder, events))) + .timeout(TIMEOUT))); + } + private MDCBuilder mdc(EventListener listener, MDCBuilder mdcBuilder, Event event) { return mdcBuilder .addToContext(EventBus.StructuredLoggingFields.EVENT_ID, event.getEventId().getId().toString()) @@ -57,4 +72,26 @@ class ListenerExecutor { .addToContext(EventBus.StructuredLoggingFields.USER, event.getUsername().asString()) .addToContext(EventBus.StructuredLoggingFields.LISTENER_CLASS, 
listener.getClass().getCanonicalName()); } + + private MDCBuilder mdc(EventListener listener, MDCBuilder mdcBuilder, List<Event> events) { + if (events.size() == 1) { + return mdcBuilder + .addToContext(EventBus.StructuredLoggingFields.EVENT_ID, events.getFirst().getEventId().toString()) + .addToContext(EventBus.StructuredLoggingFields.EVENT_CLASS, events.getFirst().getClass().getCanonicalName()) + .addToContext(EventBus.StructuredLoggingFields.USER, events.getFirst().getUsername().asString()) + .addToContext(EventBus.StructuredLoggingFields.LISTENER_CLASS, listener.getClass().getCanonicalName()); + } + + return mdcBuilder + .addToContext(EventBus.StructuredLoggingFields.EVENT_ID, events.stream() + .map(e -> e.getEventId().getId().toString()) + .collect(Collectors.joining(","))) + .addToContext(EventBus.StructuredLoggingFields.EVENT_CLASS, events.stream() + .map(e -> e.getClass().getCanonicalName()) + .collect(Collectors.joining(","))) + .addToContext(EventBus.StructuredLoggingFields.USER, events.stream() + .map(e -> e.getUsername().asString()) + .collect(Collectors.joining(","))) + .addToContext(EventBus.StructuredLoggingFields.LISTENER_CLASS, listener.getClass().getCanonicalName()); + } } diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java index a81ec7a8a3..ccd8df7bac 100644 --- a/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java +++ b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java @@ -32,6 +32,7 @@ import org.apache.james.metrics.api.MetricFactory; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import reactor.core.publisher.Mono; @@ -153,6 +154,19 @@ public class RabbitMQEventBus implements EventBus, Startable { return 
Mono.empty(); } + @Override + public Mono<Void> dispatch(Collection<EventWithRegistrationKey> events) { + Preconditions.checkState(isRunning, NOT_RUNNING_ERROR_MESSAGE); + + ImmutableList<EventWithRegistrationKey> notNoopEvents = events.stream() + .filter(e -> !e.event().isNoop()) + .collect(ImmutableList.toImmutableList()); + if (!notNoopEvents.isEmpty()) { + return Mono.from(metricFactory.decoratePublisherWithTimerMetric("rabbit-dispatch", eventDispatcher.dispatch(events))); + } + return Mono.empty(); + } + @Override public Mono<Void> reDeliver(Group group, Event event) { Preconditions.checkState(isRunning, NOT_RUNNING_ERROR_MESSAGE); diff --git a/event-bus/in-vm/src/main/java/org/apache/james/events/delivery/InVmEventDelivery.java b/event-bus/in-vm/src/main/java/org/apache/james/events/delivery/InVmEventDelivery.java index 66fda0526f..7140f5e7fa 100644 --- a/event-bus/in-vm/src/main/java/org/apache/james/events/delivery/InVmEventDelivery.java +++ b/event-bus/in-vm/src/main/java/org/apache/james/events/delivery/InVmEventDelivery.java @@ -92,6 +92,9 @@ public class InVmEventDelivery implements EventDelivery { } private Mono<Void> doDeliverToListener(EventListener.ReactiveEventListener listener, List<Event> events) { + if (events.stream().noneMatch(listener::isHandling)) { + return Mono.empty(); + } return Mono.defer(() -> Mono.from(metricFactory.decoratePublisherWithTimerMetric(timerName(listener), listener.reactiveEvent(events)))) .contextWrite(context("deliver", buildMDC(listener, events))); diff --git a/mailbox/event/json/src/main/scala/org/apache/james/event/json/MailboxEventSerializer.scala b/mailbox/event/json/src/main/scala/org/apache/james/event/json/MailboxEventSerializer.scala index 3eeee7fc55..88ae462624 100644 --- a/mailbox/event/json/src/main/scala/org/apache/james/event/json/MailboxEventSerializer.scala +++ b/mailbox/event/json/src/main/scala/org/apache/james/event/json/MailboxEventSerializer.scala @@ -401,6 +401,7 @@ class 
JsonSerialize(mailboxIdFactory: MailboxId.Factory, messageIdFactory: Messa def toJson(event: Event): String = Json.toJson(event).toString() def toJson(event: Iterable[Event]): String = Json.toJson(event).toString() def toJsonBytes(event: Event): Array[Byte] = Json.toBytes(Json.toJson(event)) + def toJsonBytes(event: Iterable[Event]): Array[Byte] = Json.toBytes(Json.toJson(event)) def fromJson(json: String): JsResult[Event] = Json.fromJson[Event](Json.parse(json)) def fromJsonAsEvents(json: String): JsResult[List[Event]] = if (json.startsWith("{")) { Json.fromJson[Event](Json.parse(json)).map(event => List(event)) @@ -413,6 +414,7 @@ class JsonSerialize(mailboxIdFactory: MailboxId.Factory, messageIdFactory: Messa def toJson(event: JavaEvent): String = eventSerializerPrivateWrapper.toJson(ScalaConverter.toScala(event)) def toJson(event: util.Collection[JavaEvent]): String = eventSerializerPrivateWrapper.toJson(event.asScala.map(ScalaConverter.toScala)) def toJsonBytes(event: JavaEvent): Array[Byte] = eventSerializerPrivateWrapper.toJsonBytes(ScalaConverter.toScala(event)) + def toJsonBytes(event: util.Collection[JavaEvent]): Array[Byte] = eventSerializerPrivateWrapper.toJsonBytes(event.asScala.map(ScalaConverter.toScala)) def fromJson(json: String): JsResult[JavaEvent] = eventSerializerPrivateWrapper.fromJson(json) .map(event => event.toJava) def fromJsonAsEvents(json: String): JsResult[List[JavaEvent]] = eventSerializerPrivateWrapper.fromJsonAsEvents(json) @@ -426,6 +428,8 @@ class MailboxEventSerializer @Inject()(mailboxIdFactory: MailboxId.Factory, mess override def toJsonBytes(event: JavaEvent): Array[Byte] = jsonSerialize.toJsonBytes(event) + override def toJsonBytes(event: util.Collection[JavaEvent]): Array[Byte] = jsonSerialize.toJsonBytes(event) + def fromJson(json: String): JsResult[JavaEvent] = jsonSerialize.fromJson(json) override def toJson(event: util.Collection[JavaEvent]): String = jsonSerialize.toJson(event) 
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org
For additional commands, e-mail: notifications-h...@james.apache.org