This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit b0cd610c5593c85301add2dd58f114fd94291553 Author: Benoit Tellier <[email protected]> AuthorDate: Fri Jan 27 14:06:30 2023 +0700 JAMES-3694 Apply queue expiry only for per-node queues Those are temporary RPC-like response queues tied to a specific James instance, needing cleanup. Other, permanent queues should not be deleted. --- .../org/apache/james/backends/rabbitmq/RabbitMQConfiguration.java | 1 - .../main/java/org/apache/james/events/KeyReconnectionHandler.java | 6 ++++-- .../main/java/org/apache/james/events/KeyRegistrationHandler.java | 5 ++++- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConfiguration.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConfiguration.java index 9a242ebe51..a47e17bece 100644 --- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConfiguration.java +++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConfiguration.java @@ -767,7 +767,6 @@ public class RabbitMQConfiguration { if (allowQuorum && useQuorumQueues) { builder.quorumQueue().replicationFactor(quorumQueueReplicationFactor); } - queueTTL.ifPresent(builder::queueTTL); return builder; } diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/KeyReconnectionHandler.java b/event-bus/distributed/src/main/java/org/apache/james/events/KeyReconnectionHandler.java index 1c2ba21d4e..e9c5a728fb 100644 --- a/event-bus/distributed/src/main/java/org/apache/james/events/KeyReconnectionHandler.java +++ b/event-bus/distributed/src/main/java/org/apache/james/events/KeyReconnectionHandler.java @@ -26,6 +26,7 @@ import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE; import javax.inject.Inject; +import org.apache.james.backends.rabbitmq.QueueArguments; import org.apache.james.backends.rabbitmq.RabbitMQConfiguration; import 
org.apache.james.backends.rabbitmq.SimpleConnectionPool; import org.reactivestreams.Publisher; @@ -55,8 +56,9 @@ public class KeyReconnectionHandler implements SimpleConnectionPool.Reconnection public Publisher<Void> handleReconnection(Connection connection) { return Mono.fromRunnable(() -> { try (Channel channel = connection.createChannel()) { - channel.queueDeclare(namingStrategy.queueName(eventBusId).asString(), DURABLE, !EXCLUSIVE, AUTO_DELETE, - configuration.workQueueArgumentsBuilder(!ALLOW_QUORUM).build()); + QueueArguments.Builder builder = configuration.workQueueArgumentsBuilder(!ALLOW_QUORUM); + configuration.getQueueTTL().ifPresent(builder::queueTTL); + channel.queueDeclare(namingStrategy.queueName(eventBusId).asString(), DURABLE, !EXCLUSIVE, AUTO_DELETE, builder.build()); } catch (Exception e) { LOGGER.error("Error recovering connection", e); } diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/KeyRegistrationHandler.java b/event-bus/distributed/src/main/java/org/apache/james/events/KeyRegistrationHandler.java index 5fc2a5ec84..8b32e11db1 100644 --- a/event-bus/distributed/src/main/java/org/apache/james/events/KeyRegistrationHandler.java +++ b/event-bus/distributed/src/main/java/org/apache/james/events/KeyRegistrationHandler.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.Optional; import java.util.function.Predicate; +import org.apache.james.backends.rabbitmq.QueueArguments; import org.apache.james.backends.rabbitmq.RabbitMQConfiguration; import org.apache.james.backends.rabbitmq.ReceiverProvider; import org.apache.james.metrics.api.MetricFactory; @@ -122,12 +123,14 @@ class KeyRegistrationHandler { } private void declareQueue(Sender sender) { + QueueArguments.Builder builder = configuration.workQueueArgumentsBuilder(!ALLOW_QUORUM); + configuration.getQueueTTL().ifPresent(builder::queueTTL); sender.declareQueue( QueueSpecification.queue(registrationQueue.asString()) 
.durable(configuration.isEventBusNotificationDurabilityEnabled()) .exclusive(!EXCLUSIVE) .autoDelete(AUTO_DELETE) - .arguments(configuration.workQueueArgumentsBuilder(!ALLOW_QUORUM).build())) + .arguments(builder.build())) .timeout(TOPOLOGY_CHANGES_TIMEOUT) .map(AMQP.Queue.DeclareOk::getQueue) .retryWhen(Retry.backoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff()).jitter(retryBackoff.getJitterFactor())) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
