This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit c000d442881e9f589c9bfa7f42d5290015eb2f24 Author: TungTV <vtt...@linagora.com> AuthorDate: Tue Jan 14 15:58:24 2025 +0700 Fix KeyRegistrationHandler could not deserialize list event --- .../apache/james/events/EventBusTestFixture.java | 7 ++-- .../java/org/apache/james/events/KeyContract.java | 30 ++++++++++++++++- .../james/events/KeyRegistrationHandler.java | 39 ++++++++++++++++------ 3 files changed, 62 insertions(+), 14 deletions(-) diff --git a/event-bus/api/src/test/java/org/apache/james/events/EventBusTestFixture.java b/event-bus/api/src/test/java/org/apache/james/events/EventBusTestFixture.java index 4b6b16d766..cf1fb30def 100644 --- a/event-bus/api/src/test/java/org/apache/james/events/EventBusTestFixture.java +++ b/event-bus/api/src/test/java/org/apache/james/events/EventBusTestFixture.java @@ -175,6 +175,8 @@ public interface EventBusTestFixture { } class TestEventSerializer implements EventSerializer { + static final String ARRAY_SEPARATOR = "^"; + @Override public String toJson(Event event) { Preconditions.checkArgument(event instanceof TestEvent || event instanceof UnsupportedEvent); @@ -184,6 +186,7 @@ public interface EventBusTestFixture { @Override public Event asEvent(String serialized) { Preconditions.checkArgument(serialized.contains("&")); + Preconditions.checkArgument(!serialized.contains(ARRAY_SEPARATOR)); List<String> parts = Splitter.on("&").splitToList(serialized); Preconditions.checkArgument(parts.get(0).equals(TestEvent.class.getCanonicalName())); @@ -196,12 +199,12 @@ public interface EventBusTestFixture { public String toJson(Collection<Event> event) { return event.stream() .map(this::toJson) - .collect(Collectors.joining("^")); + .collect(Collectors.joining(ARRAY_SEPARATOR)); } @Override public List<Event> asEvents(String serialized) { - return Splitter.on('^') + return Splitter.on(ARRAY_SEPARATOR) .splitToStream(serialized) .map(this::asEvent) .collect(ImmutableList.toImmutableList()); diff --git 
a/event-bus/api/src/test/java/org/apache/james/events/KeyContract.java b/event-bus/api/src/test/java/org/apache/james/events/KeyContract.java index 5d0e5b6eb1..162bc37cf7 100644 --- a/event-bus/api/src/test/java/org/apache/james/events/KeyContract.java +++ b/event-bus/api/src/test/java/org/apache/james/events/KeyContract.java @@ -20,6 +20,7 @@ package org.apache.james.events; import static org.apache.james.events.EventBusTestFixture.EVENT; +import static org.apache.james.events.EventBusTestFixture.EVENT_2; import static org.apache.james.events.EventBusTestFixture.EVENT_ID; import static org.apache.james.events.EventBusTestFixture.EVENT_UNSUPPORTED_BY_LISTENER; import static org.apache.james.events.EventBusTestFixture.FIVE_HUNDRED_MS; @@ -33,6 +34,7 @@ import static org.awaitility.Durations.TEN_MINUTES; import static org.awaitility.Durations.TEN_SECONDS; import static org.junit.jupiter.api.Assertions.assertTimeout; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.Mockito.after; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; @@ -43,6 +45,8 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.time.Duration; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; @@ -358,7 +362,7 @@ public interface KeyContract extends EventBusContract { Mono.from(eventBus().register(listener, KEY_1)).block(); eventBus().dispatch(EVENT, KEY_1).block(); - eventBus().dispatch(EventBusTestFixture.EVENT_2, KEY_1).block(); + eventBus().dispatch(EVENT_2, KEY_1).block(); getSpeedProfile().shortWaitCondition() .untilAsserted(() -> assertThat(listener.numberOfEventCalls()).isEqualTo(1)); @@ -379,6 +383,30 @@ public interface KeyContract extends EventBusContract { verify(listener, 
timeout(ONE_SECOND.toMillis()).times(1)).event(any()); } + + @Test + default void dispatchShouldNotifyListenerWhenMultiEvents() { + EventListener.ReactiveEventListener listener = mock(EventListener.ReactiveEventListener.class); + when(listener.isHandling(any(EventBusTestFixture.TestEvent.class))).thenReturn(true); + when(listener.getExecutionMode()).thenReturn(ExecutionMode.ASYNCHRONOUS); + + List<Event> storedEvent = new ArrayList<>(); + when(listener.reactiveEvent(anyList())) + .thenAnswer(invocation -> { + List<Event> events = invocation.getArgument(0); + storedEvent.addAll(events); + return Mono.empty(); + }); + + Mono.from(eventBus().register(listener, KEY_1)).block(); + + List<EventBus.EventWithRegistrationKey> eventWithRegistrationKeys = List.of(new EventBus.EventWithRegistrationKey(EVENT, ImmutableSet.of(KEY_1)), + new EventBus.EventWithRegistrationKey(EVENT_2, ImmutableSet.of(KEY_1))); + eventBus().dispatch(eventWithRegistrationKeys).block(); + + verify(listener, after(FIVE_HUNDRED_MS.toMillis())).reactiveEvent(anyList()); + assertThat(storedEvent).containsExactly(EVENT, EVENT_2); + } } interface MultipleEventBusKeyContract extends MultipleEventBusContract { diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/KeyRegistrationHandler.java b/event-bus/distributed/src/main/java/org/apache/james/events/KeyRegistrationHandler.java index d134238579..84ab3b88d3 100644 --- a/event-bus/distributed/src/main/java/org/apache/james/events/KeyRegistrationHandler.java +++ b/event-bus/distributed/src/main/java/org/apache/james/events/KeyRegistrationHandler.java @@ -30,6 +30,7 @@ import java.time.Duration; import java.util.List; import java.util.Optional; import java.util.function.Predicate; +import java.util.stream.Collectors; import org.apache.james.backends.rabbitmq.QueueArguments; import org.apache.james.backends.rabbitmq.RabbitMQConfiguration; @@ -195,19 +196,19 @@ class KeyRegistrationHandler { return Mono.empty(); } - Event event = 
toEvent(delivery); + List<Event> events = toEvent(delivery); return Flux.fromIterable(listenersToCall) - .flatMap(listener -> executeListener(listener, event, registrationKey), EventBus.EXECUTION_RATE) + .flatMap(listener -> executeListener(listener, events, registrationKey), EventBus.EXECUTION_RATE) .then(); } - private Mono<Void> executeListener(EventListener.ReactiveEventListener listener, Event event, RegistrationKey key) { + private Mono<Void> executeListener(EventListener.ReactiveEventListener listener, List<Event> events, RegistrationKey key) { MDCBuilder mdcBuilder = MDCBuilder.create() .addToContext(EventBus.StructuredLoggingFields.REGISTRATION_KEY, key.asString()); - return listenerExecutor.execute(listener, mdcBuilder, event) - .doOnError(e -> structuredLogger(event, key) + return listenerExecutor.execute(listener, mdcBuilder, events) + .doOnError(e -> structuredLogger(events, key) .log(logger -> logger.error("Exception happens when handling event", e))) .onErrorResume(e -> Mono.empty()) .then(); @@ -218,15 +219,31 @@ class KeyRegistrationHandler { listener.getExecutionMode().equals(EventListener.ExecutionMode.SYNCHRONOUS); } - private Event toEvent(Delivery delivery) { - return eventSerializer.fromBytes(delivery.getBody()); + private List<Event> toEvent(Delivery deliver) { + byte[] bodyAsBytes = deliver.getBody(); + // if the json is an array, we have multiple events + if (bodyAsBytes != null && bodyAsBytes.length > 0 && bodyAsBytes[0] == '[') { + return eventSerializer.asEventsFromBytes(bodyAsBytes); + } + + try { + return List.of(eventSerializer.fromBytes(bodyAsBytes)); + } catch (RuntimeException exception) { + return eventSerializer.asEventsFromBytes(bodyAsBytes); + } } - private StructuredLogger structuredLogger(Event event, RegistrationKey key) { + private StructuredLogger structuredLogger(List<Event> events, RegistrationKey key) { return MDCStructuredLogger.forLogger(LOGGER) - .field(EventBus.StructuredLoggingFields.EVENT_ID, 
event.getEventId().getId().toString()) - .field(EventBus.StructuredLoggingFields.EVENT_CLASS, event.getClass().getCanonicalName()) - .field(EventBus.StructuredLoggingFields.USER, event.getUsername().asString()) + .field(EventBus.StructuredLoggingFields.EVENT_ID, events.stream() + .map(e -> e.getEventId().getId().toString()) + .collect(Collectors.joining(","))) + .field(EventBus.StructuredLoggingFields.EVENT_CLASS, events.stream() + .map(e -> e.getClass().getCanonicalName()) + .collect(Collectors.joining(","))) + .field(EventBus.StructuredLoggingFields.USER, events.stream() + .map(e -> e.getUsername().asString()) + .collect(Collectors.joining(","))) .field(EventBus.StructuredLoggingFields.REGISTRATION_KEY, key.asString()); } } --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For additional commands, e-mail: notifications-h...@james.apache.org