This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit c000d442881e9f589c9bfa7f42d5290015eb2f24
Author: TungTV <vtt...@linagora.com>
AuthorDate: Tue Jan 14 15:58:24 2025 +0700

    Fix KeyRegistrationHandler could not deserialize list event
---
 .../apache/james/events/EventBusTestFixture.java   |  7 ++--
 .../java/org/apache/james/events/KeyContract.java  | 30 ++++++++++++++++-
 .../james/events/KeyRegistrationHandler.java       | 39 ++++++++++++++++------
 3 files changed, 62 insertions(+), 14 deletions(-)

diff --git a/event-bus/api/src/test/java/org/apache/james/events/EventBusTestFixture.java b/event-bus/api/src/test/java/org/apache/james/events/EventBusTestFixture.java
index 4b6b16d766..cf1fb30def 100644
--- a/event-bus/api/src/test/java/org/apache/james/events/EventBusTestFixture.java
+++ b/event-bus/api/src/test/java/org/apache/james/events/EventBusTestFixture.java
@@ -175,6 +175,8 @@ public interface EventBusTestFixture {
     }
 
     class TestEventSerializer implements EventSerializer {
+        static final String ARRAY_SEPARATOR = "^";
+
         @Override
         public String toJson(Event event) {
             Preconditions.checkArgument(event instanceof TestEvent || event instanceof UnsupportedEvent);
@@ -184,6 +186,7 @@ public interface EventBusTestFixture {
         @Override
         public Event asEvent(String serialized) {
             Preconditions.checkArgument(serialized.contains("&"));
+            Preconditions.checkArgument(!serialized.contains(ARRAY_SEPARATOR));
             List<String> parts = Splitter.on("&").splitToList(serialized);
             Preconditions.checkArgument(parts.get(0).equals(TestEvent.class.getCanonicalName()));
 
@@ -196,12 +199,12 @@ public interface EventBusTestFixture {
         public String toJson(Collection<Event> event) {
             return event.stream()
                 .map(this::toJson)
-                .collect(Collectors.joining("^"));
+                .collect(Collectors.joining(ARRAY_SEPARATOR));
         }
 
         @Override
         public List<Event> asEvents(String serialized) {
-            return Splitter.on('^')
+            return Splitter.on(ARRAY_SEPARATOR)
                 .splitToStream(serialized)
                 .map(this::asEvent)
                 .collect(ImmutableList.toImmutableList());
diff --git a/event-bus/api/src/test/java/org/apache/james/events/KeyContract.java b/event-bus/api/src/test/java/org/apache/james/events/KeyContract.java
index 5d0e5b6eb1..162bc37cf7 100644
--- a/event-bus/api/src/test/java/org/apache/james/events/KeyContract.java
+++ b/event-bus/api/src/test/java/org/apache/james/events/KeyContract.java
@@ -20,6 +20,7 @@
 package org.apache.james.events;
 
 import static org.apache.james.events.EventBusTestFixture.EVENT;
+import static org.apache.james.events.EventBusTestFixture.EVENT_2;
 import static org.apache.james.events.EventBusTestFixture.EVENT_ID;
 import static org.apache.james.events.EventBusTestFixture.EVENT_UNSUPPORTED_BY_LISTENER;
 import static org.apache.james.events.EventBusTestFixture.FIVE_HUNDRED_MS;
@@ -33,6 +34,7 @@ import static org.awaitility.Durations.TEN_MINUTES;
 import static org.awaitility.Durations.TEN_SECONDS;
 import static org.junit.jupiter.api.Assertions.assertTimeout;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.Mockito.after;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
@@ -43,6 +45,8 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -358,7 +362,7 @@ public interface KeyContract extends EventBusContract {
             Mono.from(eventBus().register(listener, KEY_1)).block();
 
             eventBus().dispatch(EVENT, KEY_1).block();
-            eventBus().dispatch(EventBusTestFixture.EVENT_2, KEY_1).block();
+            eventBus().dispatch(EVENT_2, KEY_1).block();
 
             getSpeedProfile().shortWaitCondition()
                 .untilAsserted(() -> assertThat(listener.numberOfEventCalls()).isEqualTo(1));
@@ -379,6 +383,30 @@ public interface KeyContract extends EventBusContract {
 
             verify(listener, timeout(ONE_SECOND.toMillis()).times(1)).event(any());
         }
+
+        @Test
+        default void dispatchShouldNotifyListenerWhenMultiEvents() {
+            EventListener.ReactiveEventListener listener = mock(EventListener.ReactiveEventListener.class);
+            when(listener.isHandling(any(EventBusTestFixture.TestEvent.class))).thenReturn(true);
+            when(listener.getExecutionMode()).thenReturn(ExecutionMode.ASYNCHRONOUS);
+
+            List<Event> storedEvent = new ArrayList<>();
+            when(listener.reactiveEvent(anyList()))
+                .thenAnswer(invocation -> {
+                    List<Event> events = invocation.getArgument(0);
+                    storedEvent.addAll(events);
+                    return Mono.empty();
+                });
+
+            Mono.from(eventBus().register(listener, KEY_1)).block();
+
+            List<EventBus.EventWithRegistrationKey> eventWithRegistrationKeys = List.of(new EventBus.EventWithRegistrationKey(EVENT, ImmutableSet.of(KEY_1)),
+                new EventBus.EventWithRegistrationKey(EVENT_2, ImmutableSet.of(KEY_1)));
+            eventBus().dispatch(eventWithRegistrationKeys).block();
+
+            verify(listener, after(FIVE_HUNDRED_MS.toMillis())).reactiveEvent(anyList());
+            assertThat(storedEvent).containsExactly(EVENT, EVENT_2);
+        }
     }
 
     interface MultipleEventBusKeyContract extends MultipleEventBusContract {
diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/KeyRegistrationHandler.java b/event-bus/distributed/src/main/java/org/apache/james/events/KeyRegistrationHandler.java
index d134238579..84ab3b88d3 100644
--- a/event-bus/distributed/src/main/java/org/apache/james/events/KeyRegistrationHandler.java
+++ b/event-bus/distributed/src/main/java/org/apache/james/events/KeyRegistrationHandler.java
@@ -30,6 +30,7 @@ import java.time.Duration;
 import java.util.List;
 import java.util.Optional;
 import java.util.function.Predicate;
+import java.util.stream.Collectors;
 
 import org.apache.james.backends.rabbitmq.QueueArguments;
 import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
@@ -195,19 +196,19 @@ class KeyRegistrationHandler {
             return Mono.empty();
         }
 
-        Event event = toEvent(delivery);
+        List<Event> events = toEvent(delivery);
 
         return Flux.fromIterable(listenersToCall)
-            .flatMap(listener -> executeListener(listener, event, registrationKey), EventBus.EXECUTION_RATE)
+            .flatMap(listener -> executeListener(listener, events, registrationKey), EventBus.EXECUTION_RATE)
             .then();
     }
 
-    private Mono<Void> executeListener(EventListener.ReactiveEventListener listener, Event event, RegistrationKey key) {
+    private Mono<Void> executeListener(EventListener.ReactiveEventListener listener, List<Event> events, RegistrationKey key) {
         MDCBuilder mdcBuilder = MDCBuilder.create()
             .addToContext(EventBus.StructuredLoggingFields.REGISTRATION_KEY, key.asString());
 
-        return listenerExecutor.execute(listener, mdcBuilder, event)
-            .doOnError(e -> structuredLogger(event, key)
+        return listenerExecutor.execute(listener, mdcBuilder, events)
+            .doOnError(e -> structuredLogger(events, key)
                 .log(logger -> logger.error("Exception happens when handling event", e)))
             .onErrorResume(e -> Mono.empty())
             .then();
@@ -218,15 +219,31 @@ class KeyRegistrationHandler {
             listener.getExecutionMode().equals(EventListener.ExecutionMode.SYNCHRONOUS);
     }
 
-    private Event toEvent(Delivery delivery) {
-        return eventSerializer.fromBytes(delivery.getBody());
+    private List<Event> toEvent(Delivery deliver) {
+        byte[] bodyAsBytes = deliver.getBody();
+        // if the json is an array, we have multiple events
+        if (bodyAsBytes != null && bodyAsBytes.length > 0 && bodyAsBytes[0] == '[') {
+            return eventSerializer.asEventsFromBytes(bodyAsBytes);
+        }
+
+        try {
+            return List.of(eventSerializer.fromBytes(bodyAsBytes));
+        } catch (RuntimeException exception) {
+            return eventSerializer.asEventsFromBytes(bodyAsBytes);
+        }
     }
 
-    private StructuredLogger structuredLogger(Event event, RegistrationKey key) {
+    private StructuredLogger structuredLogger(List<Event> events, RegistrationKey key) {
         return MDCStructuredLogger.forLogger(LOGGER)
-            .field(EventBus.StructuredLoggingFields.EVENT_ID, event.getEventId().getId().toString())
-            .field(EventBus.StructuredLoggingFields.EVENT_CLASS, event.getClass().getCanonicalName())
-            .field(EventBus.StructuredLoggingFields.USER, event.getUsername().asString())
+            .field(EventBus.StructuredLoggingFields.EVENT_ID, events.stream()
+                .map(e -> e.getEventId().getId().toString())
+                .collect(Collectors.joining(",")))
+            .field(EventBus.StructuredLoggingFields.EVENT_CLASS, events.stream()
+                .map(e -> e.getClass().getCanonicalName())
+                .collect(Collectors.joining(",")))
+            .field(EventBus.StructuredLoggingFields.USER, events.stream()
+                .map(e -> e.getUsername().asString())
+                .collect(Collectors.joining(",")))
             .field(EventBus.StructuredLoggingFields.REGISTRATION_KEY, key.asString());
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org
For additional commands, e-mail: notifications-h...@james.apache.org

Reply via email to