This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 257e917892433442a64ce094388041010e93a091 Author: Rémi KOWALSKI <[email protected]> AuthorDate: Tue Mar 3 17:09:29 2020 +0100 JAMES-3082 create test to demonstrate that event bus messages are persisted on rabbitMQ --- .../mailbox/events/ErrorHandlingContract.java | 57 ++++++++++++++++++---- .../james/mailbox/events/EventBusTestFixture.java | 9 +++- .../events/delivery/InVmEventDeliveryTest.java | 6 +++ .../mailbox/events/KeyRegistrationHandler.java | 28 ++++++++--- .../james/mailbox/events/RabbitMQEventBus.java | 22 +++++++++ .../james/mailbox/events/RabbitMQEventBusTest.java | 24 +++++++++ .../RabbitMQEventDeadLettersIntegrationTest.java | 4 +- 7 files changed, 131 insertions(+), 19 deletions(-) diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java index 219f5e4..35aaea2 100644 --- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java +++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java @@ -26,6 +26,7 @@ import static org.apache.james.mailbox.events.EventBusTestFixture.GROUP_A; import static org.apache.james.mailbox.events.EventBusTestFixture.KEY_1; import static org.apache.james.mailbox.events.EventBusTestFixture.NO_KEYS; import static org.apache.james.mailbox.events.EventBusTestFixture.WAIT_CONDITION; +import static org.apache.james.mailbox.events.EventBusTestFixture.WAIT_CONDITION_LONG; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.spy; @@ -107,17 +108,22 @@ interface ErrorHandlingContract extends EventBusContract { @Test default void listenerShouldReceiveWhenFailsEqualsMaxRetries() { EventCollector eventCollector = eventCollector(); - + //do throw RetryBackoffConfiguration.DEFAULT.DEFAULT_MAX_RETRIES doThrow(new RuntimeException()) .doThrow(new RuntimeException()) .doThrow(new RuntimeException()) + .doThrow(new RuntimeException()) + .doThrow(new RuntimeException()) + .doThrow(new RuntimeException()) + .doThrow(new RuntimeException()) + .doThrow(new RuntimeException()) .doCallRealMethod() .when(eventCollector).event(EVENT); eventBus().register(eventCollector, GROUP_A); eventBus().dispatch(EVENT, NO_KEYS).block(); - WAIT_CONDITION + WAIT_CONDITION_LONG .untilAsserted(() -> assertThat(eventCollector.getEvents()).hasSize(1)); } @@ -125,10 +131,16 @@ interface ErrorHandlingContract extends EventBusContract { default void listenerShouldNotReceiveWhenFailsGreaterThanMaxRetries() throws Exception { EventCollector eventCollector = eventCollector(); + //do throw RetryBackoffConfiguration.DEFAULT.DEFAULT_MAX_RETRIES + 1 times doThrow(new RuntimeException()) .doThrow(new RuntimeException()) .doThrow(new RuntimeException()) .doThrow(new RuntimeException()) + .doThrow(new RuntimeException()) + .doThrow(new RuntimeException()) + .doThrow(new RuntimeException()) + .doThrow(new RuntimeException()) + .doThrow(new RuntimeException()) .doCallRealMethod() .when(eventCollector).event(EVENT); @@ -147,9 +159,10 @@ interface ErrorHandlingContract extends EventBusContract { eventBus().register(throwingListener, GROUP_A); eventBus().dispatch(EVENT, NO_KEYS).block(); - TimeUnit.SECONDS.sleep(5); + TimeUnit.MINUTES.sleep(1); int numberOfCallsAfterExceedMaxRetries = throwingListener.timeElapsed.size(); - TimeUnit.SECONDS.sleep(5); + TimeUnit.MINUTES.sleep(1); + assertThat(throwingListener.timeElapsed.size()) .isEqualTo(numberOfCallsAfterExceedMaxRetries); @@ -162,10 +175,10 @@ interface ErrorHandlingContract extends EventBusContract { eventBus().register(throwingListener, GROUP_A); eventBus().dispatch(EVENT, NO_KEYS).block(); - TimeUnit.SECONDS.sleep(5); + TimeUnit.MINUTES.sleep(1); SoftAssertions.assertSoftly(softly -> { List<Instant> timeElapsed = throwingListener.timeElapsed; - softly.assertThat(timeElapsed).hasSize(4); + softly.assertThat(timeElapsed).hasSize(RetryBackoffConfiguration.DEFAULT.DEFAULT_MAX_RETRIES + 1); long minFirstDelayAfter = 100; // first backOff long minSecondDelayAfter = 50; // 50 * jitter factor (50 * 0.5) @@ -207,10 +220,16 @@ interface ErrorHandlingContract extends EventBusContract { default void deadLettersIsNotAppliedForKeyRegistrations() throws Exception { EventCollector eventCollector = eventCollector(); + //do throw RetryBackoffConfiguration.DEFAULT.DEFAULT_MAX_RETRIES + 1 times doThrow(new RuntimeException()) .doThrow(new RuntimeException()) .doThrow(new RuntimeException()) .doThrow(new RuntimeException()) + .doThrow(new RuntimeException()) + .doThrow(new RuntimeException()) + .doThrow(new RuntimeException()) + .doThrow(new RuntimeException()) + .doThrow(new RuntimeException()) .doCallRealMethod() .when(eventCollector).event(EVENT); @@ -247,18 +266,23 @@ interface ErrorHandlingContract extends EventBusContract { @Test default void deadLetterShouldStoreWhenDispatchFailsGreaterThanMaxRetries() { EventCollector eventCollector = eventCollector(); - + //do throw RetryBackoffConfiguration.DEFAULT.DEFAULT_MAX_RETRIES + 1 times doThrow(new RuntimeException()) .doThrow(new RuntimeException()) .doThrow(new RuntimeException()) .doThrow(new RuntimeException()) + .doThrow(new RuntimeException()) + .doThrow(new RuntimeException()) + .doThrow(new RuntimeException()) + .doThrow(new RuntimeException()) + .doThrow(new RuntimeException()) .doCallRealMethod() .when(eventCollector).event(EVENT); eventBus().register(eventCollector, GROUP_A); eventBus().dispatch(EVENT, NO_KEYS).block(); - WAIT_CONDITION.untilAsserted(() -> assertThat(deadLetter().failedIds(GROUP_A) + WAIT_CONDITION_LONG.untilAsserted(() -> assertThat(deadLetter().failedIds(GROUP_A) .flatMap(insertionId -> deadLetter().failedEvent(GROUP_A, insertionId)) .toIterable()) .containsOnly(EVENT)); @@ -270,17 +294,23 @@ interface ErrorHandlingContract extends EventBusContract { default void deadLetterShouldStoreWhenRedeliverFailsGreaterThanMaxRetries() { EventCollector eventCollector = eventCollector(); + //do throw RetryBackoffConfiguration.DEFAULT.DEFAULT_MAX_RETRIES + 1 times doThrow(new RuntimeException()) .doThrow(new RuntimeException()) .doThrow(new RuntimeException()) .doThrow(new RuntimeException()) + .doThrow(new RuntimeException()) + .doThrow(new RuntimeException()) + .doThrow(new RuntimeException()) + .doThrow(new RuntimeException()) + .doThrow(new RuntimeException()) .doCallRealMethod() .when(eventCollector).event(EVENT); eventBus().register(eventCollector, GROUP_A); eventBus().reDeliver(GROUP_A, EVENT).block(); - WAIT_CONDITION.untilAsserted(() -> + WAIT_CONDITION_LONG.untilAsserted(() -> assertThat( deadLetter() .failedIds(GROUP_A) @@ -295,16 +325,23 @@ interface ErrorHandlingContract extends EventBusContract { default void retryShouldDeliverAsManyTimesAsInitialDeliveryAttempt() { EventCollector eventCollector = eventCollector(); + //do throw RetryBackoffConfiguration.DEFAULT.DEFAULT_MAX_RETRIES + 1 times doThrow(new RuntimeException()) .doThrow(new RuntimeException()) .doThrow(new RuntimeException()) + .doThrow(new RuntimeException()) + .doThrow(new RuntimeException()) + .doThrow(new RuntimeException()) + .doThrow(new RuntimeException()) + .doThrow(new RuntimeException()) + .doThrow(new RuntimeException()) .doCallRealMethod() .when(eventCollector).event(EVENT); eventBus().register(eventCollector, GROUP_A); eventBus().reDeliver(GROUP_A, EVENT).block(); - WAIT_CONDITION.untilAsserted(() -> assertThat(eventCollector.getEvents()).isNotEmpty()); + WAIT_CONDITION_LONG.untilAsserted(() -> assertThat(eventCollector.getEvents()).isNotEmpty()); } @Test diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java index a071583..06515e0 100644 --- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java +++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventBusTestFixture.java @@ -103,7 +103,6 @@ public interface EventBusTestFixture { MailboxListener.MailboxRenamed EVENT_UNSUPPORTED_BY_LISTENER = new MailboxListener.MailboxRenamed(SESSION_ID, USERNAME, MAILBOX_PATH, TEST_ID, MAILBOX_PATH, EVENT_ID_2); java.time.Duration ONE_SECOND = java.time.Duration.ofSeconds(1); - java.time.Duration THIRTY_SECONDS = java.time.Duration.ofSeconds(30); java.time.Duration FIVE_HUNDRED_MS = java.time.Duration.ofMillis(500); MailboxId ID_1 = TEST_ID; MailboxId ID_2 = TestId.of(24); @@ -118,6 +117,7 @@ public interface EventBusTestFixture { List<Group> ALL_GROUPS = ImmutableList.of(GROUP_A, GROUP_B, GROUP_C); ConditionFactory WAIT_CONDITION = await().timeout(Duration.FIVE_SECONDS); + ConditionFactory WAIT_CONDITION_LONG = await().timeout(Duration.ONE_MINUTE); static MailboxListener newListener() { MailboxListener listener = mock(MailboxListener.class); @@ -125,4 +125,11 @@ public interface EventBusTestFixture { when(listener.isHandling(any(MailboxListener.MailboxAdded.class))).thenReturn(true); return listener; } + + static MailboxListener newAsyncListener() { + MailboxListener listener = mock(MailboxListener.class); + when(listener.getExecutionMode()).thenReturn(MailboxListener.ExecutionMode.ASYNCHRONOUS); + when(listener.isHandling(any(MailboxListener.MailboxAdded.class))).thenReturn(true); + return listener; + } } diff --git a/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java b/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java index 6212178..45335e9 100644 --- a/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java +++ b/mailbox/event/event-memory/src/test/java/org/apache/james/mailbox/events/delivery/InVmEventDeliveryTest.java @@ -215,10 +215,16 @@ class InVmEventDeliveryTest { @Test void failureHandlerShouldWorkWhenRetryFails() { MailboxListenerCountingSuccessfulExecution listener = newListener(); + //do throw RetryBackoffConfiguration.DEFAULT.DEFAULT_MAX_RETRIES + 1 times doThrow(new RuntimeException()) .doThrow(new RuntimeException()) .doThrow(new RuntimeException()) .doThrow(new RuntimeException()) + .doThrow(new RuntimeException()) + .doThrow(new RuntimeException()) + .doThrow(new RuntimeException()) + .doThrow(new RuntimeException()) + .doThrow(new RuntimeException()) .doCallRealMethod() .when(listener).event(EVENT); diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java index b8789ca..49ea9b9 100644 --- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java @@ -28,6 +28,7 @@ import static org.apache.james.mailbox.events.RetryBackoffConfiguration.FOREVER; import java.nio.charset.StandardCharsets; import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.james.backends.rabbitmq.ReceiverProvider; import org.apache.james.event.json.EventSerializer; @@ -38,6 +39,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.github.fge.lambdas.Throwing; +import com.google.common.annotations.VisibleForTesting; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Delivery; @@ -63,6 +65,7 @@ class KeyRegistrationHandler { private final MailboxListenerExecutor mailboxListenerExecutor; private final RetryBackoffConfiguration retryBackoff; private Optional<Disposable> receiverSubscriber; + private AtomicBoolean registrationQueueInitialized = new AtomicBoolean(false); KeyRegistrationHandler(EventBusId eventBusId, EventSerializer eventSerializer, Sender sender, ReceiverProvider receiverProvider, @@ -78,22 +81,35 @@ class KeyRegistrationHandler { this.retryBackoff = retryBackoff; this.registrationQueue = new RegistrationQueueName(); this.registrationBinder = new RegistrationBinder(sender, registrationQueue); + this.receiverSubscriber = Optional.empty(); + } void start() { + declareQueue(); + + receiverSubscriber = Optional.of(receiver.consumeAutoAck(registrationQueue.asString(), new ConsumeOptions().qos(EventBus.EXECUTION_RATE)) + .subscribeOn(Schedulers.parallel()) + .flatMap(this::handleDelivery) + .subscribe()); + } + + @VisibleForTesting + void declareQueue() { sender.declareQueue(QueueSpecification.queue(eventBusId.asString()) .durable(DURABLE) .exclusive(!EXCLUSIVE) .autoDelete(!AUTO_DELETE) .arguments(NO_ARGUMENTS)) .map(AMQP.Queue.DeclareOk::getQueue) - .doOnSuccess(registrationQueue::initialize) + .retryBackoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff(), FOREVER, retryBackoff.getJitterFactor()) + .doOnSuccess(queueName -> { + if (!registrationQueueInitialized.get()) { + registrationQueue.initialize(queueName); + registrationQueueInitialized.set(true); + } + }) .block(); - - receiverSubscriber = Optional.of(receiver.consumeAutoAck(registrationQueue.asString(), new ConsumeOptions().qos(EventBus.EXECUTION_RATE)) - .subscribeOn(Schedulers.parallel()) - .flatMap(this::handleDelivery) - .subscribe()); } void stop() { diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java index 01c2b23..7e95dbf 100644 --- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java @@ -29,6 +29,7 @@ import org.apache.james.event.json.EventSerializer; import org.apache.james.lifecycle.api.Startable; import org.apache.james.metrics.api.MetricFactory; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import reactor.core.publisher.Mono; @@ -86,6 +87,27 @@ public class RabbitMQEventBus implements EventBus, Startable { } } + @VisibleForTesting + void startWithoutStartingKeyRegistrationHandler() { + if (!isRunning && !isStopping) { + + LocalListenerRegistry localListenerRegistry = new LocalListenerRegistry(); + keyRegistrationHandler = new KeyRegistrationHandler(eventBusId, eventSerializer, channelPool, routingKeyConverter, localListenerRegistry, mailboxListenerExecutor, retryBackoff); + groupRegistrationHandler = new GroupRegistrationHandler(eventSerializer, channelPool, retryBackoff, eventDeadLetters, mailboxListenerExecutor); + eventDispatcher = new EventDispatcher(eventBusId, eventSerializer, channelPool.getSender(), localListenerRegistry, mailboxListenerExecutor); + + keyRegistrationHandler.declareQueue(); + + eventDispatcher.start(); + isRunning = true; + } + } + + @VisibleForTesting + void startKeyRegistrationHandler() { + keyRegistrationHandler.start(); + } + @PreDestroy public void stop() { if (isRunning && !isStopping) { diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java index a5a7d48..f47a63b 100644 --- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java +++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java @@ -32,6 +32,7 @@ import static org.apache.james.mailbox.events.EventBusTestFixture.GROUP_A; import static org.apache.james.mailbox.events.EventBusTestFixture.KEY_1; import static org.apache.james.mailbox.events.EventBusTestFixture.NO_KEYS; import static org.apache.james.mailbox.events.EventBusTestFixture.WAIT_CONDITION; +import static org.apache.james.mailbox.events.EventBusTestFixture.newAsyncListener; import static org.apache.james.mailbox.events.EventBusTestFixture.newListener; import static org.apache.james.mailbox.events.GroupRegistration.WorkQueueName.MAILBOX_EVENT_WORK_QUEUE_PREFIX; import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT; @@ -75,6 +76,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.mockito.stubbing.Answer; +import reactor.core.publisher.Mono; import reactor.rabbitmq.BindingSpecification; import reactor.rabbitmq.ExchangeSpecification; import reactor.rabbitmq.QueueSpecification; @@ -91,6 +93,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract, private RabbitMQEventBus eventBus; private RabbitMQEventBus eventBus2; private RabbitMQEventBus eventBus3; + private RabbitMQEventBus eventBusWithKeyHandlerNotStarted; private EventSerializer eventSerializer; private RoutingKeyConverter routingKeyConverter; private MemoryEventDeadLetters memoryEventDeadLetters; @@ -106,10 +109,12 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract, eventBus = newEventBus(); eventBus2 = newEventBus(); eventBus3 = newEventBus(); + eventBusWithKeyHandlerNotStarted = newEventBus(); eventBus.start(); eventBus2.start(); eventBus3.start(); + eventBusWithKeyHandlerNotStarted.startWithoutStartingKeyRegistrationHandler(); } @AfterEach @@ -117,6 +122,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract, eventBus.stop(); eventBus2.stop(); eventBus3.stop(); + eventBusWithKeyHandlerNotStarted.stop(); ALL_GROUPS.stream() .map(GroupRegistration.WorkQueueName::of) .forEach(queueName -> rabbitMQExtension.getSender().delete(QueueSpecification.queue(queueName.asString())).block()); @@ -425,6 +431,21 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract, } @Test + void dispatchedMessagesShouldSurviveARabbitMQRestart() throws Exception { + eventBusWithKeyHandlerNotStarted.startWithoutStartingKeyRegistrationHandler(); + MailboxListener listener = newAsyncListener(); + eventBusWithKeyHandlerNotStarted.register(listener, KEY_1); + Mono<Void> dispatch = eventBusWithKeyHandlerNotStarted.dispatch(EVENT, KEY_1); + dispatch.block(); + + rabbitMQExtension.getRabbitMQ().restart(); + + eventBusWithKeyHandlerNotStarted.startKeyRegistrationHandler(); + + assertThatListenerReceiveOneEvent(listener); + } + + @Test void dispatchShouldWorkAfterRestartForNewKeyRegistration() throws Exception { eventBus.start(); MailboxListener listener = newListener(); @@ -606,6 +627,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract, eventBus.stop(); eventBus2.stop(); eventBus3.stop(); + eventBusWithKeyHandlerNotStarted.stop(); assertThat(rabbitManagementAPI.listExchanges()) .anySatisfy(exchange -> assertThat(exchange.getName()).isEqualTo(MAILBOX_EVENT_EXCHANGE_NAME)); @@ -618,6 +640,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract, eventBus.stop(); eventBus2.stop(); eventBus3.stop(); + eventBusWithKeyHandlerNotStarted.stop(); assertThat(rabbitManagementAPI.listQueues()) .anySatisfy(queue -> assertThat(queue.getName()).contains(GroupA.class.getName())); @@ -628,6 +651,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract, eventBus.stop(); eventBus2.stop(); eventBus3.stop(); + eventBusWithKeyHandlerNotStarted.stop(); assertThat(rabbitManagementAPI.listQueues()) .filteredOn(queue -> !queue.getName().startsWith(MAILBOX_EVENT_WORK_QUEUE_PREFIX)) diff --git a/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQEventDeadLettersIntegrationTest.java b/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQEventDeadLettersIntegrationTest.java index 18060c8..36dd401 100644 --- a/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQEventDeadLettersIntegrationTest.java +++ b/server/protocols/webadmin-integration-test/distributed-webadmin-integration-test/src/test/java/org/apache/james/webadmin/integration/rabbitmq/RabbitMQEventDeadLettersIntegrationTest.java @@ -179,7 +179,7 @@ class RabbitMQEventDeadLettersIntegrationTest { //This value is duplicated from default configuration to ensure we keep the same behavior over time //unless we really want to change that default value - private static final int MAX_RETRIES = 3; + private static final int MAX_RETRIES = 8; private static final String DOMAIN = "domain.tld"; private static final String BOB = "bob@" + DOMAIN; @@ -220,7 +220,7 @@ class RabbitMQEventDeadLettersIntegrationTest { } private String retrieveFirstFailedInsertionId() { - calmlyAwait.atMost(TEN_SECONDS) + calmlyAwait.atMost(ONE_MINUTE) .untilAsserted(() -> when() .get(EventDeadLettersRoutes.BASE_PATH + "/groups/" + GROUP_ID) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
