MAILBOX-372 ErrorHandling is applied for only group registered listeners
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/b22103d0 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/b22103d0 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/b22103d0 Branch: refs/heads/master Commit: b22103d09b9b90bd4fb74828691df4e3687965f1 Parents: d4e5693 Author: tran tien duc <[email protected]> Authored: Tue Jan 15 16:57:39 2019 +0700 Committer: Benoit Tellier <[email protected]> Committed: Mon Jan 21 10:04:49 2019 +0700 ---------------------------------------------------------------------- .../mailbox/events/ErrorHandlingContract.java | 17 +++ .../james/mailbox/events/KeyContract.java | 12 +- .../james/mailbox/events/InVMEventBus.java | 19 ++-- .../mailbox/events/delivery/EventDelivery.java | 2 + .../events/delivery/InVmEventDelivery.java | 35 +++++- .../events/delivery/InVmEventDeliveryTest.java | 113 +++++++++++++++++-- .../resources/META-INF/spring/event-system.xml | 7 +- 7 files changed, 171 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/b22103d0/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java ---------------------------------------------------------------------- diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java index c10c542..069798d 100644 --- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java +++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java @@ -23,6 +23,7 @@ import static org.apache.james.mailbox.events.EventBusTestFixture.EVENT; import static org.apache.james.mailbox.events.EventBusTestFixture.EVENT_2; import static org.apache.james.mailbox.events.EventBusTestFixture.EVENT_ID; import static org.apache.james.mailbox.events.EventBusTestFixture.GROUP_A; +import static org.apache.james.mailbox.events.EventBusTestFixture.KEY_1; import static org.apache.james.mailbox.events.EventBusTestFixture.NO_KEYS; import static org.apache.james.mailbox.events.EventBusTestFixture.WAIT_CONDITION; import static org.assertj.core.api.Assertions.assertThat; @@ -41,6 +42,8 @@ import org.apache.james.mailbox.util.EventCollector; import org.assertj.core.api.SoftAssertions; import org.junit.jupiter.api.Test; +import com.google.common.collect.ImmutableSet; + interface ErrorHandlingContract extends EventBusContract { class ThrowingListener implements MailboxListener { @@ -66,6 +69,20 @@ interface ErrorHandlingContract extends EventBusContract { } @Test + default void retryingIsNotAppliedForKeyRegistrations() { + EventCollector eventCollector = eventCollector(); + + doThrow(new RuntimeException()) + .when(eventCollector).event(EVENT); + + eventBus().register(eventCollector, KEY_1); + eventBus().dispatch(EVENT, ImmutableSet.of(KEY_1)).block(); + + assertThat(eventCollector.getEvents()) + .isEmpty(); + } + + @Test default void listenerShouldReceiveWhenFailsLessThanMaxRetries() { EventCollector eventCollector = eventCollector(); http://git-wip-us.apache.org/repos/asf/james-project/blob/b22103d0/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java ---------------------------------------------------------------------- diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java index 317925a..6bf091b 100644 --- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java +++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/KeyContract.java @@ -20,6 +20,7 @@ package org.apache.james.mailbox.events; import static org.apache.james.mailbox.events.EventBusTestFixture.EVENT; +import static org.apache.james.mailbox.events.EventBusTestFixture.EVENT_2; import static org.apache.james.mailbox.events.EventBusTestFixture.FIVE_HUNDRED_MS; import static org.apache.james.mailbox.events.EventBusTestFixture.KEY_1; import static org.apache.james.mailbox.events.EventBusTestFixture.KEY_2; @@ -35,7 +36,6 @@ import static org.mockito.Mockito.after; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -261,17 +261,11 @@ public interface KeyContract extends EventBusContract { @Test default void failingRegisteredListenersShouldNotAbortRegisteredDelivery() { - EventBusTestFixture.MailboxListenerCountingSuccessfulExecution listener = spy(new EventBusTestFixture.MailboxListenerCountingSuccessfulExecution()); - doThrow(new RuntimeException()) - .doThrow(new RuntimeException()) - .doThrow(new RuntimeException()) - .doThrow(new RuntimeException()) - .doCallRealMethod() - .when(listener).event(any()); + EventBusTestFixture.MailboxListenerCountingSuccessfulExecution listener = new EventBusTestFixture.EventMatcherThrowingListener(ImmutableSet.of(EVENT)); eventBus().register(listener, KEY_1); eventBus().dispatch(EVENT, KEY_1).block(); - eventBus().dispatch(EVENT, KEY_1).block(); + eventBus().dispatch(EVENT_2, KEY_1).block(); WAIT_CONDITION .until(() -> assertThat(listener.numberOfEventCalls()).isEqualTo(1)); http://git-wip-us.apache.org/repos/asf/james-project/blob/b22103d0/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/InVMEventBus.java ---------------------------------------------------------------------- diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/InVMEventBus.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/InVMEventBus.java index 8069739..3bb11b0 100644 --- a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/InVMEventBus.java +++ b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/InVMEventBus.java @@ -30,10 +30,10 @@ import org.apache.james.mailbox.events.delivery.EventDelivery; import com.github.steveash.guavate.Guavate; import com.google.common.collect.HashMultimap; -import com.google.common.collect.ImmutableSet; import com.google.common.collect.Multimap; import com.google.common.collect.Multimaps; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public class InVMEventBus implements EventBus { @@ -66,7 +66,10 @@ public class InVMEventBus implements EventBus { @Override public Mono<Void> dispatch(Event event, Set<RegistrationKey> keys) { if (!event.isNoop()) { - return eventDelivery.deliver(registeredListeners(keys), event).synchronousListenerFuture() + return Flux.merge( + eventDelivery.deliverWithRetries(groups.values(), event).synchronousListenerFuture(), + eventDelivery.deliver(registeredListenersByKeys(keys), event).synchronousListenerFuture()) + .then() .onErrorResume(throwable -> Mono.empty()); } return Mono.empty(); @@ -76,13 +79,9 @@ public class InVMEventBus implements EventBus { return groups.keySet(); } - private Set<MailboxListener> registeredListeners(Set<RegistrationKey> keys) { - return ImmutableSet.<MailboxListener>builder() - .addAll(groups.values()) - .addAll(keys.stream() - .flatMap(registrationKey -> registrations.get(registrationKey).stream()) - .collect(Guavate.toImmutableList())) - .build(); + private Set<MailboxListener> registeredListenersByKeys(Set<RegistrationKey> keys) { + return keys.stream() + .flatMap(registrationKey -> registrations.get(registrationKey).stream()) + .collect(Guavate.toImmutableSet()); } - } http://git-wip-us.apache.org/repos/asf/james-project/blob/b22103d0/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java ---------------------------------------------------------------------- diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java index a0240b2..359e4d7 100644 --- a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java +++ b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java @@ -49,4 +49,6 @@ public interface EventDelivery { ExecutionStages deliver(Collection<MailboxListener> mailboxListeners, Event event); + + ExecutionStages deliverWithRetries(Collection<MailboxListener> mailboxListeners, Event event); } http://git-wip-us.apache.org/repos/asf/james-project/blob/b22103d0/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java ---------------------------------------------------------------------- diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java index e7eb233..90e64c8 100644 --- a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java +++ b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/InVmEventDelivery.java @@ -40,6 +40,11 @@ import reactor.core.publisher.MonoProcessor; import reactor.core.scheduler.Schedulers; public class InVmEventDelivery implements EventDelivery { + + private enum DeliveryOption { + NO_RETRY, WITH_RETRY + } + private static final Logger LOGGER = LoggerFactory.getLogger(InVmEventDelivery.class); private static final int MAX_RETRIES = 3; private static final Duration FIRST_BACKOFF = Duration.ofMillis(100); @@ -56,9 +61,20 @@ public class InVmEventDelivery implements EventDelivery { @Override public ExecutionStages deliver(Collection<MailboxListener> mailboxListeners, Event event) { - Mono<Void> synchronousListeners = doDeliver(filterByExecutionMode(mailboxListeners, MailboxListener.ExecutionMode.SYNCHRONOUS), event) + return deliverByOption(mailboxListeners, event, DeliveryOption.NO_RETRY); + } + + @Override + public ExecutionStages deliverWithRetries(Collection<MailboxListener> mailboxListeners, Event event) { + return deliverByOption(mailboxListeners, event, DeliveryOption.WITH_RETRY); + } + + private ExecutionStages deliverByOption(Collection<MailboxListener> mailboxListeners, Event event, DeliveryOption deliveryOption) { + Mono<Void> synchronousListeners = doDeliver( + filterByExecutionMode(mailboxListeners, MailboxListener.ExecutionMode.SYNCHRONOUS), event, deliveryOption) .subscribeWith(MonoProcessor.create()); - Mono<Void> asyncListener = doDeliver(filterByExecutionMode(mailboxListeners, MailboxListener.ExecutionMode.ASYNCHRONOUS), event) + Mono<Void> asyncListener = doDeliver( + filterByExecutionMode(mailboxListeners, MailboxListener.ExecutionMode.ASYNCHRONOUS), event, deliveryOption) .subscribeWith(MonoProcessor.create()); return new ExecutionStages(synchronousListeners, asyncListener); @@ -69,20 +85,27 @@ public class InVmEventDelivery implements EventDelivery { .filter(listener -> listener.getExecutionMode() == executionMode); } - private Mono<Void> doDeliver(Stream<MailboxListener> mailboxListeners, Event event) { + private Mono<Void> doDeliver(Stream<MailboxListener> mailboxListeners, Event event, DeliveryOption deliveryOption) { return Flux.fromStream(mailboxListeners) - .flatMap(mailboxListener -> deliveryWithRetries(event, mailboxListener)) + .flatMap(mailboxListener -> deliveryWithRetries(event, mailboxListener, deliveryOption)) .then() .subscribeOn(Schedulers.elastic()); } - private Mono<Void> deliveryWithRetries(Event event, MailboxListener mailboxListener) { - return Mono.fromRunnable(() -> doDeliverToListener(mailboxListener, event)) + private Mono<Void> deliveryWithRetries(Event event, MailboxListener mailboxListener, DeliveryOption deliveryOption) { + Mono<Void> firstDelivery = Mono.fromRunnable(() -> doDeliverToListener(mailboxListener, event)) .doOnError(throwable -> LOGGER.error("Error while processing listener {} for {}", listenerName(mailboxListener), eventName(event), throwable)) .subscribeOn(Schedulers.elastic()) + .then(); + + if (deliveryOption == DeliveryOption.NO_RETRY) { + return firstDelivery; + } + + return firstDelivery .retryBackoff(MAX_RETRIES, FIRST_BACKOFF, MAX_BACKOFF, DEFAULT_JITTER_FACTOR) .doOnError(throwable -> LOGGER.error("listener {} exceeded maximum retry({}) to handle event {}", listenerName(mailboxListener), http://git-wip-us.apache.org/repos/asf/james-project/blob/b22103d0/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java ---------------------------------------------------------------------- diff --git a/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java b/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java index 207d319..d09eb78 100644 --- a/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java +++ b/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java @@ -87,19 +87,46 @@ class InVmEventDeliveryTest { void deliverShouldNotDeliverEventToListenerWhenException() { doThrow(RuntimeException.class).when(syncEventCollector).event(event); - inVmEventDelivery.deliver(ImmutableList.of(syncEventCollector), event).allListenerFuture().subscribe(); + assertThatThrownBy(() -> inVmEventDelivery + .deliver(ImmutableList.of(syncEventCollector), event).allListenerFuture() + .block()) + .isInstanceOf(RuntimeException.class); assertThat(syncEventCollector.getEvents()) .isEmpty(); } @Test - void deliverShouldBeErrorWhenException() { + void deliverWithRetriesShouldNotDeliverEventToListenerWhenException() { doThrow(RuntimeException.class).when(syncEventCollector).event(event); assertThatThrownBy(() -> inVmEventDelivery + .deliverWithRetries(ImmutableList.of(syncEventCollector), event).allListenerFuture() + .block()) + .isInstanceOf(IllegalStateException.class); + + assertThat(syncEventCollector.getEvents()) + .isEmpty(); + } + + @Test + void deliverShouldBeErrorWhenException() { + doThrow(new RuntimeException("mock exception")).when(syncEventCollector).event(event); + + assertThatThrownBy(() -> inVmEventDelivery .deliver(ImmutableList.of(syncEventCollector), event).allListenerFuture() .block()) + .isInstanceOf(RuntimeException.class) + .hasMessageContaining("mock exception"); + } + + @Test + void deliverWithRetriesShouldBeErrorWhenException() { + doThrow(RuntimeException.class).when(syncEventCollector).event(event); + + assertThatThrownBy(() -> inVmEventDelivery + .deliverWithRetries(ImmutableList.of(syncEventCollector), event).allListenerFuture() + .block()) .isInstanceOf(IllegalStateException.class) .hasMessageContaining("Retries exhausted"); } @@ -109,10 +136,25 @@ class InVmEventDeliveryTest { class AsynchronousOnly { @Test void deliverShouldNotDeliverEventToListenerWhenException() { + doThrow(RuntimeException.class).when(asyncEventCollector).event(event); + + assertThatThrownBy(() -> inVmEventDelivery + .deliver(ImmutableList.of(asyncEventCollector), event).allListenerFuture() + .block()) + .isInstanceOf(RuntimeException.class); + assertThat(asyncEventCollector.getEvents()) + .isEmpty(); + } + + @Test + void deliverWithRetriesShouldNotDeliverEventToListenerWhenException() { doThrow(RuntimeException.class).when(asyncEventCollector).event(event); - inVmEventDelivery.deliver(ImmutableList.of(asyncEventCollector), event).allListenerFuture().subscribe(); + assertThatThrownBy(() -> inVmEventDelivery + .deliverWithRetries(ImmutableList.of(asyncEventCollector), event).allListenerFuture() + .block()) + .isInstanceOf(IllegalStateException.class); assertThat(asyncEventCollector.getEvents()) .isEmpty(); @@ -120,13 +162,24 @@ class InVmEventDeliveryTest { @Test void deliverShouldBeErrorWhenException() { - doThrow(RuntimeException.class).when(asyncEventCollector).event(event); + doThrow(new RuntimeException("mock exception")).when(asyncEventCollector).event(event); assertThatThrownBy(() -> inVmEventDelivery .deliver(ImmutableList.of(asyncEventCollector), event).allListenerFuture() .block()) - .isInstanceOf(IllegalStateException.class) - .hasMessageContaining("Retries exhausted"); + .isInstanceOf(RuntimeException.class) + .hasMessageContaining("mock exception"); + } + + @Test + void deliverWithRetriesShouldBeErrorWhenException() { + doThrow(RuntimeException.class).when(asyncEventCollector).event(event); + + assertThatThrownBy(() -> inVmEventDelivery + .deliverWithRetries(ImmutableList.of(asyncEventCollector), event).allListenerFuture() + .block()) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("Retries exhausted"); } } @@ -136,7 +189,10 @@ class InVmEventDeliveryTest { void deliverShouldDeliverEventToSyncListenerWhenAsyncGetException() { doThrow(RuntimeException.class).when(asyncEventCollector).event(event); - inVmEventDelivery.deliver(ImmutableList.of(asyncEventCollector, syncEventCollector), event).allListenerFuture().subscribe(); + assertThatThrownBy(() -> inVmEventDelivery + .deliver(ImmutableList.of(asyncEventCollector, syncEventCollector), event).allListenerFuture() + .block()) + .isInstanceOf(RuntimeException.class); SoftAssertions.assertSoftly(softly -> { softly.assertThat(asyncEventCollector.getEvents()).isEmpty(); @@ -146,6 +202,22 @@ class InVmEventDeliveryTest { } @Test + void deliverWithRetriesShouldDeliverEventToSyncListenerWhenAsyncGetException() { + doThrow(RuntimeException.class).when(asyncEventCollector).event(event); + + assertThatThrownBy(() -> inVmEventDelivery + .deliverWithRetries(ImmutableList.of(asyncEventCollector, syncEventCollector), event).allListenerFuture() + .block()) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("Retries exhausted"); + + SoftAssertions.assertSoftly(softly -> { + softly.assertThat(asyncEventCollector.getEvents()).isEmpty(); + softly.assertThat(syncEventCollector.getEvents()).hasSize(1); + }); + } + + @Test void deliverShouldDeliverEventToAsyncListenerWhenSyncGetException() { doThrow(RuntimeException.class).when(syncEventCollector).event(event); @@ -157,16 +229,41 @@ class InVmEventDeliveryTest { softly.assertThat(syncEventCollector.getEvents()).isEmpty(); softly.assertThat(asyncEventCollector.getEvents()).hasSize(1); }); + } + + @Test + void deliverWithRetriesShouldDeliverEventToAsyncListenerWhenSyncGetException() { + doThrow(RuntimeException.class).when(syncEventCollector).event(event); + + inVmEventDelivery.deliverWithRetries(ImmutableList.of(asyncEventCollector, syncEventCollector), event).allListenerFuture() + .onErrorResume(e -> Mono.empty()) + .block(); + SoftAssertions.assertSoftly(softly -> { + softly.assertThat(syncEventCollector.getEvents()).isEmpty(); + softly.assertThat(asyncEventCollector.getEvents()).hasSize(1); + }); } @Test void deliverShouldBeErrorWhenException() { + doThrow(new RuntimeException("mock exception")).when(syncEventCollector).event(event); + doThrow(new RuntimeException("mock exception")).when(asyncEventCollector).event(event); + + assertThatThrownBy(() -> inVmEventDelivery + .deliver(ImmutableList.of(asyncEventCollector), event).allListenerFuture() + .block()) + .isInstanceOf(RuntimeException.class) + .hasMessageContaining("mock exception"); + } + + @Test + void deliverWithRetriesShouldBeErrorWhenException() { doThrow(RuntimeException.class).when(syncEventCollector).event(event); doThrow(RuntimeException.class).when(asyncEventCollector).event(event); assertThatThrownBy(() -> inVmEventDelivery - .deliver(ImmutableList.of(asyncEventCollector), event).allListenerFuture() + .deliverWithRetries(ImmutableList.of(asyncEventCollector), event).allListenerFuture() .block()) .isInstanceOf(IllegalStateException.class) .hasMessageContaining("Retries exhausted"); http://git-wip-us.apache.org/repos/asf/james-project/blob/b22103d0/mailbox/spring/src/main/resources/META-INF/spring/event-system.xml ---------------------------------------------------------------------- diff --git a/mailbox/spring/src/main/resources/META-INF/spring/event-system.xml b/mailbox/spring/src/main/resources/META-INF/spring/event-system.xml index ff0447a..609d183 100644 --- a/mailbox/spring/src/main/resources/META-INF/spring/event-system.xml +++ b/mailbox/spring/src/main/resources/META-INF/spring/event-system.xml @@ -20,14 +20,19 @@ <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xmlns:util="http://www.springframework.org/schema/util" xsi:schemaLocation=" - http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> + http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd + http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd"> <bean id="event-bus" class="org.apache.james.mailbox.events.InVMEventBus" lazy-init="true"> <constructor-arg index="0" ref="event-delivery"/> </bean> <bean id="event-delivery" class="org.apache.james.mailbox.events.delivery.InVmEventDelivery" lazy-init="true"> <constructor-arg index="0" ref="metricFactory"/> + <constructor-arg> + <util:constant static-field="org.apache.james.mailbox.events.RetryBackoffConfiguration.DEFAULT"/> + </constructor-arg> </bean> </beans> \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
