This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 1ebd654240b4bcc2be1437e19e7f9aab7ed3ccc1 Author: Rémi KOWALSKI <[email protected]> AuthorDate: Mon Mar 2 17:41:30 2020 +0100 JAMES-3082 add retry to make event bus test when rabbitmq restart pass --- .../mailbox/events/RetryBackoffConfiguration.java | 3 ++- .../james/mailbox/events/GroupRegistration.java | 4 ++++ .../james/mailbox/events/KeyRegistrationHandler.java | 20 ++++++++++++++++---- .../james/mailbox/events/RabbitMQEventBus.java | 2 +- 4 files changed, 23 insertions(+), 6 deletions(-) diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/events/RetryBackoffConfiguration.java b/mailbox/api/src/main/java/org/apache/james/mailbox/events/RetryBackoffConfiguration.java index f802d5e..a674a28 100644 --- a/mailbox/api/src/main/java/org/apache/james/mailbox/events/RetryBackoffConfiguration.java +++ b/mailbox/api/src/main/java/org/apache/james/mailbox/events/RetryBackoffConfiguration.java @@ -63,8 +63,9 @@ public class RetryBackoffConfiguration { } static final double DEFAULT_JITTER_FACTOR = 0.5; - static final int DEFAULT_MAX_RETRIES = 3; + static final int DEFAULT_MAX_RETRIES = 8; static final Duration DEFAULT_FIRST_BACKOFF = Duration.ofMillis(100); + static final Duration FOREVER = Duration.ofMillis(Long.MAX_VALUE); public static final RetryBackoffConfiguration DEFAULT = new RetryBackoffConfiguration( DEFAULT_MAX_RETRIES, DEFAULT_FIRST_BACKOFF, diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java index da0fe77..8d1b7aa 100644 --- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java @@ -26,6 +26,7 @@ import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE; import static org.apache.james.backends.rabbitmq.Constants.NO_ARGUMENTS; import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT; import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT_EXCHANGE_NAME; +import static org.apache.james.mailbox.events.RetryBackoffConfiguration.FOREVER; import java.nio.charset.StandardCharsets; import java.util.Objects; @@ -81,6 +82,7 @@ class GroupRegistration implements Registration { private final GroupConsumerRetry retryHandler; private final WaitDelayGenerator delayGenerator; private final Group group; + private final RetryBackoffConfiguration retryBackoff; private final MailboxListenerExecutor mailboxListenerExecutor; private Optional<Disposable> receiverSubscriber; @@ -93,6 +95,7 @@ class GroupRegistration implements Registration { this.queueName = WorkQueueName.of(group); this.sender = sender; this.receiver = receiverProvider.createReceiver(); + this.retryBackoff = retryBackoff; this.mailboxListenerExecutor = mailboxListenerExecutor; this.receiverSubscriber = Optional.empty(); this.unregisterGroup = unregisterGroup; @@ -106,6 +109,7 @@ class GroupRegistration implements Registration { .of(createGroupWorkQueue() .then(retryHandler.createRetryExchange(queueName)) .then(Mono.fromCallable(() -> this.consumeWorkQueue())) + .retryBackoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff(), FOREVER, retryBackoff.getJitterFactor(), Schedulers.elastic()) .block()); return this; } diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java index ae49471..b8789ca 100644 --- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java @@ -24,6 +24,7 @@ import static org.apache.james.backends.rabbitmq.Constants.DURABLE; import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE; import static org.apache.james.backends.rabbitmq.Constants.NO_ARGUMENTS; import static org.apache.james.mailbox.events.RabbitMQEventBus.EVENT_BUS_ID; +import static org.apache.james.mailbox.events.RetryBackoffConfiguration.FOREVER; import java.nio.charset.StandardCharsets; import java.util.Optional; @@ -60,9 +61,13 @@ class KeyRegistrationHandler { private final RegistrationQueueName registrationQueue; private final RegistrationBinder registrationBinder; private final MailboxListenerExecutor mailboxListenerExecutor; + private final RetryBackoffConfiguration retryBackoff; private Optional<Disposable> receiverSubscriber; - KeyRegistrationHandler(EventBusId eventBusId, EventSerializer eventSerializer, Sender sender, ReceiverProvider receiverProvider, RoutingKeyConverter routingKeyConverter, LocalListenerRegistry localListenerRegistry, MailboxListenerExecutor mailboxListenerExecutor) { + KeyRegistrationHandler(EventBusId eventBusId, EventSerializer eventSerializer, + Sender sender, ReceiverProvider receiverProvider, + RoutingKeyConverter routingKeyConverter, LocalListenerRegistry localListenerRegistry, + MailboxListenerExecutor mailboxListenerExecutor, RetryBackoffConfiguration retryBackoff) { this.eventBusId = eventBusId; this.eventSerializer = eventSerializer; this.sender = sender; @@ -70,6 +75,7 @@ class KeyRegistrationHandler { this.localListenerRegistry = localListenerRegistry; this.receiver = receiverProvider.createReceiver(); this.mailboxListenerExecutor = mailboxListenerExecutor; + this.retryBackoff = retryBackoff; this.registrationQueue = new RegistrationQueueName(); this.registrationBinder = new RegistrationBinder(sender, registrationQueue); } @@ -94,17 +100,23 @@ class KeyRegistrationHandler { receiverSubscriber.filter(subscriber -> !subscriber.isDisposed()) .ifPresent(Disposable::dispose); receiver.close(); - sender.delete(QueueSpecification.queue(registrationQueue.asString())).block(); + sender.delete(QueueSpecification.queue(registrationQueue.asString())) + .retryBackoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff(), FOREVER, retryBackoff.getJitterFactor(), Schedulers.elastic()) + .block(); } Registration register(MailboxListener listener, RegistrationKey key) { LocalListenerRegistry.LocalRegistration registration = localListenerRegistry.addListener(key, listener); if (registration.isFirstListener()) { - registrationBinder.bind(key).block(); + registrationBinder.bind(key) + .retryBackoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff(), FOREVER, retryBackoff.getJitterFactor(), Schedulers.elastic()) + .block(); } return new KeyRegistration(() -> { if (registration.unregister().lastListenerRemoved()) { - registrationBinder.unbind(key).block(); + registrationBinder.unbind(key) + .retryBackoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff(), FOREVER, retryBackoff.getJitterFactor(), Schedulers.elastic()) + .block(); } }); } diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java index ecc1b4f..01c2b23 100644 --- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java @@ -76,7 +76,7 @@ public class RabbitMQEventBus implements EventBus, Startable { if (!isRunning && !isStopping) { LocalListenerRegistry localListenerRegistry = new LocalListenerRegistry(); - keyRegistrationHandler = new KeyRegistrationHandler(eventBusId, eventSerializer, sender, receiverProvider, routingKeyConverter, localListenerRegistry, mailboxListenerExecutor); + keyRegistrationHandler = new KeyRegistrationHandler(eventBusId, eventSerializer, sender, receiverProvider, routingKeyConverter, localListenerRegistry, mailboxListenerExecutor, retryBackoff); groupRegistrationHandler = new GroupRegistrationHandler(eventSerializer, sender, receiverProvider, retryBackoff, eventDeadLetters, mailboxListenerExecutor); eventDispatcher = new EventDispatcher(eventBusId, eventSerializer, sender, localListenerRegistry, mailboxListenerExecutor); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
