This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit f7a91cfb727d29f16e01570e0a86caa7adc0dfab Author: Benoit Tellier <[email protected]> AuthorDate: Mon Jun 14 15:15:12 2021 +0700 JAMES-3599 Group execution: execute listeners together This minimizes: - The count of events to deserialize (one for all groups) - The count of ACKs to perform This enables potentially ordering of execution upon the happy case. Note that retries are still performed on a per-group basis. --- .../rabbitmq/ReactorRabbitMQChannelPool.java | 18 ++++ .../org/apache/james/events/GroupRegistration.java | 10 +- .../james/events/GroupRegistrationHandler.java | 101 ++++++++++++++++++++- .../org/apache/james/events/RabbitMQEventBus.java | 4 +- .../apache/james/events/RabbitMQEventBusTest.java | 4 +- 5 files changed, 129 insertions(+), 8 deletions(-) diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java index 4e74850..ab6f673 100644 --- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java +++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/ReactorRabbitMQChannelPool.java @@ -276,6 +276,24 @@ public class ReactorRabbitMQChannelPool implements ChannelPool, Startable { .then(); } + public Mono<Void> createWorkQueue(QueueSpecification queueSpecification) { + Preconditions.checkArgument(queueSpecification.getName() != null, "WorkQueue pattern do not make sense for unnamed queues"); + + return Mono.using(this::createSender, + managementSender -> managementSender.declareQueue(queueSpecification), + Sender::close) + .onErrorResume( + e -> e instanceof ShutdownSignalException + && e.getMessage().contains("reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue"), + e -> { + LOGGER.warn("{} already exists without dead-letter setup. Dead lettered messages to it will be lost. " + + "To solve this, re-create the queue with the x-dead-letter-exchange argument set up.", + queueSpecification.getName()); + return Mono.empty(); + }) + .then(); + } + private void invalidateObject(Channel channel) { try { pool.invalidateObject(channel); diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java index 5b5ce39..c3ce6e5 100644 --- a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java +++ b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java @@ -21,7 +21,6 @@ package org.apache.james.events; import static org.apache.james.backends.rabbitmq.Constants.AUTO_DELETE; import static org.apache.james.backends.rabbitmq.Constants.DURABLE; -import static org.apache.james.backends.rabbitmq.Constants.EMPTY_ROUTING_KEY; import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE; import static org.apache.james.backends.rabbitmq.Constants.REQUEUE; import static org.apache.james.backends.rabbitmq.Constants.deadLetterQueue; @@ -43,7 +42,6 @@ import reactor.core.Disposable; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; import reactor.rabbitmq.AcknowledgableDelivery; -import reactor.rabbitmq.BindingSpecification; import reactor.rabbitmq.ConsumeOptions; import reactor.rabbitmq.QueueSpecification; import reactor.rabbitmq.Receiver; @@ -139,8 +137,7 @@ class GroupRegistration implements Registration { return deserializeEvent(eventAsBytes) .flatMap(event -> delayGenerator.delayIfHaveTo(currentRetryCount) - .flatMap(any -> runListener(event)) - .onErrorResume(throwable -> retryHandler.handleRetry(event, currentRetryCount, throwable)) + .flatMap(any -> runListenerReliably(currentRetryCount, event)) .then(Mono.<Void>fromRunnable(acknowledgableDelivery::ack).subscribeOn(Schedulers.elastic()))) .onErrorResume(e -> { LOGGER.error("Unable to process delivery for group {}", group, e); @@ -150,6 +147,11 @@ class GroupRegistration implements Registration { }); } + public Mono<Void> runListenerReliably(int currentRetryCount, Event event) { + return runListener(event) + .onErrorResume(throwable -> retryHandler.handleRetry(event, currentRetryCount, throwable)); + } + private Mono<Event> deserializeEvent(byte[] eventAsBytes) { return Mono.fromCallable(() -> eventSerializer.asEvent(new String(eventAsBytes, StandardCharsets.UTF_8))) .subscribeOn(Schedulers.parallel()); diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java index cb224d4..88a2e09 100644 --- a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java +++ b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java @@ -19,16 +19,51 @@ package org.apache.james.events; +import static org.apache.james.backends.rabbitmq.Constants.AUTO_DELETE; +import static org.apache.james.backends.rabbitmq.Constants.DURABLE; +import static org.apache.james.backends.rabbitmq.Constants.EMPTY_ROUTING_KEY; +import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE; +import static org.apache.james.backends.rabbitmq.Constants.REQUEUE; +import static org.apache.james.backends.rabbitmq.Constants.deadLetterQueue; +import static org.apache.james.events.GroupRegistration.DEFAULT_RETRY_COUNT; + +import java.nio.charset.StandardCharsets; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; +import org.apache.commons.lang3.tuple.Pair; import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool; import org.apache.james.backends.rabbitmq.ReceiverProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.github.steveash.guavate.Guavate; +import com.google.common.base.Preconditions; +import reactor.core.Disposable; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; +import reactor.rabbitmq.AcknowledgableDelivery; +import reactor.rabbitmq.BindingSpecification; +import reactor.rabbitmq.ConsumeOptions; +import reactor.rabbitmq.QueueSpecification; +import reactor.rabbitmq.Receiver; import reactor.rabbitmq.Sender; +import reactor.util.retry.Retry; class GroupRegistrationHandler { + private static final Logger LOGGER = LoggerFactory.getLogger(GroupRegistrationHandler.class); + + private final GroupRegistration.WorkQueueName queueName; + + public static class GroupRegistrationHandlerGroup extends Group { + + } + + static final Group GROUP = new GroupRegistrationHandlerGroup(); + private final NamingStrategy namingStrategy; private final Map<Group, GroupRegistration> groupRegistrations; private final EventSerializer eventSerializer; @@ -38,10 +73,13 @@ class GroupRegistrationHandler { private final RetryBackoffConfiguration retryBackoff; private final EventDeadLetters eventDeadLetters; private final ListenerExecutor listenerExecutor; + private final EventBusId eventBusId; + private Optional<Receiver> receiver; + private Optional<Disposable> consumer; GroupRegistrationHandler(NamingStrategy namingStrategy, EventSerializer eventSerializer, ReactorRabbitMQChannelPool channelPool, Sender sender, ReceiverProvider receiverProvider, RetryBackoffConfiguration retryBackoff, - EventDeadLetters eventDeadLetters, ListenerExecutor listenerExecutor) { + EventDeadLetters eventDeadLetters, ListenerExecutor listenerExecutor, EventBusId eventBusId) { this.namingStrategy = namingStrategy; this.eventSerializer = eventSerializer; this.channelPool = channelPool; @@ -50,7 +88,11 @@ class GroupRegistrationHandler { this.retryBackoff = retryBackoff; this.eventDeadLetters = eventDeadLetters; this.listenerExecutor = listenerExecutor; + this.eventBusId = eventBusId; this.groupRegistrations = new ConcurrentHashMap<>(); + this.queueName = namingStrategy.workQueue(GROUP); + this.consumer = Optional.empty(); + this.receiver = Optional.empty(); } GroupRegistration retrieveGroupRegistration(Group group) { @@ -58,11 +100,68 @@ class GroupRegistrationHandler { .orElseThrow(() -> new GroupRegistrationNotFound(group)); } + public void start() { + channelPool.createWorkQueue( + QueueSpecification.queue(queueName.asString()) + .durable(DURABLE) + .exclusive(!EXCLUSIVE) + .autoDelete(!AUTO_DELETE) + .arguments(deadLetterQueue(namingStrategy.deadLetterExchange())), + BindingSpecification.binding() + .exchange(namingStrategy.exchange()) + .queue(queueName.asString()) + .routingKey(EMPTY_ROUTING_KEY)) + .retryWhen(Retry.backoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff()).jitter(retryBackoff.getJitterFactor()).scheduler(Schedulers.elastic())) + .block(); + + this.receiver = Optional.of(receiverProvider.createReceiver()); + this.consumer = Optional.of(consumeWorkQueue()); + } + + private Disposable consumeWorkQueue() { + Preconditions.checkState(receiver.isPresent()); + return receiver.get().consumeManualAck(queueName.asString(), new ConsumeOptions().qos(EventBus.EXECUTION_RATE)) + .publishOn(Schedulers.parallel()) + .filter(delivery -> Objects.nonNull(delivery.getBody())) + .flatMap(this::deliver, EventBus.EXECUTION_RATE) + .subscribeOn(Schedulers.elastic()) + .subscribe(); + } + + private Mono<Void> deliver(AcknowledgableDelivery acknowledgableDelivery) { + byte[] eventAsBytes = acknowledgableDelivery.getBody(); + + return deserializeEvent(eventAsBytes) + .flatMapIterable(aa -> groupRegistrations.values() + .stream() + .map(group -> Pair.of(group, aa)) + .collect(Guavate.toImmutableList())) + .flatMap(event -> event.getLeft().runListenerReliably(DEFAULT_RETRY_COUNT, event.getRight())) + .then(Mono.<Void>fromRunnable(acknowledgableDelivery::ack).subscribeOn(Schedulers.elastic())) + .then() + .onErrorResume(e -> { + LOGGER.error("Unable to process delivery for group {}", GROUP, e); + return Mono.fromRunnable(() -> acknowledgableDelivery.nack(!REQUEUE)) + .subscribeOn(Schedulers.elastic()) + .then(); + }); + } + + private Mono<Event> deserializeEvent(byte[] eventAsBytes) { + return Mono.fromCallable(() -> eventSerializer.asEvent(new String(eventAsBytes, StandardCharsets.UTF_8))) + .subscribeOn(Schedulers.parallel()); + } + void stop() { groupRegistrations.values().forEach(GroupRegistration::unregister); + consumer.ifPresent(Disposable::dispose); + receiver.ifPresent(Receiver::close); } Registration register(EventListener.ReactiveEventListener listener, Group group) { + if (groupRegistrations.isEmpty()) { + start(); + } return groupRegistrations .compute(group, (groupToRegister, oldGroupRegistration) -> { if (oldGroupRegistration != null) { diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java index ad50412..74b092d 100644 --- a/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java +++ b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java @@ -83,7 +83,7 @@ public class RabbitMQEventBus implements EventBus, Startable { LocalListenerRegistry localListenerRegistry = new LocalListenerRegistry(); keyRegistrationHandler = new KeyRegistrationHandler(namingStrategy, eventBusId, eventSerializer, sender, receiverProvider, routingKeyConverter, localListenerRegistry, listenerExecutor, retryBackoff); - groupRegistrationHandler = new GroupRegistrationHandler(namingStrategy, eventSerializer, channelPool, sender, receiverProvider, retryBackoff, eventDeadLetters, listenerExecutor); + groupRegistrationHandler = new GroupRegistrationHandler(namingStrategy, eventSerializer, channelPool, sender, receiverProvider, retryBackoff, eventDeadLetters, listenerExecutor, eventBusId); eventDispatcher = new EventDispatcher(namingStrategy, eventBusId, eventSerializer, sender, localListenerRegistry, listenerExecutor, eventDeadLetters); eventDispatcher.start(); @@ -98,7 +98,7 @@ public class RabbitMQEventBus implements EventBus, Startable { LocalListenerRegistry localListenerRegistry = new LocalListenerRegistry(); keyRegistrationHandler = new KeyRegistrationHandler(namingStrategy, eventBusId, eventSerializer, sender, receiverProvider, routingKeyConverter, localListenerRegistry, listenerExecutor, retryBackoff); - groupRegistrationHandler = new GroupRegistrationHandler(namingStrategy, eventSerializer, channelPool, sender, receiverProvider, retryBackoff, eventDeadLetters, listenerExecutor); + groupRegistrationHandler = new GroupRegistrationHandler(namingStrategy, eventSerializer, channelPool, sender, receiverProvider, retryBackoff, eventDeadLetters, listenerExecutor, eventBusId); eventDispatcher = new EventDispatcher(namingStrategy, eventBusId, eventSerializer, sender, localListenerRegistry, listenerExecutor, eventDeadLetters); keyRegistrationHandler.declareQueue(); diff --git a/event-bus/distributed/src/test/java/org/apache/james/events/RabbitMQEventBusTest.java b/event-bus/distributed/src/test/java/org/apache/james/events/RabbitMQEventBusTest.java index ed68d5a..525aa76 100644 --- a/event-bus/distributed/src/test/java/org/apache/james/events/RabbitMQEventBusTest.java +++ b/event-bus/distributed/src/test/java/org/apache/james/events/RabbitMQEventBusTest.java @@ -138,7 +138,9 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract, eventBus2.stop(); eventBus3.stop(); eventBusWithKeyHandlerNotStarted.stop(); - ALL_GROUPS.stream() + Stream.concat( + ALL_GROUPS.stream(), + Stream.of(GroupRegistrationHandler.GROUP)) .map(TEST_NAMING_STRATEGY::workQueue) .forEach(queueName -> rabbitMQExtension.getSender().delete(QueueSpecification.queue(queueName.asString())).block()); rabbitMQExtension.getSender() --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
