This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit b0cd610c5593c85301add2dd58f114fd94291553
Author: Benoit Tellier <[email protected]>
AuthorDate: Fri Jan 27 14:06:30 2023 +0700

    JAMES-3694 Apply queue expiry only for per-node queues
    
    Per-node queues are temporary, RPC-like response queues tied to a
    specific James instance, so they need to be cleaned up. Other,
    permanent queues should not be deleted.
---
 .../org/apache/james/backends/rabbitmq/RabbitMQConfiguration.java   | 1 -
 .../main/java/org/apache/james/events/KeyReconnectionHandler.java   | 6 ++++--
 .../main/java/org/apache/james/events/KeyRegistrationHandler.java   | 5 ++++-
 3 files changed, 8 insertions(+), 4 deletions(-)

diff --git 
a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConfiguration.java
 
b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConfiguration.java
index 9a242ebe51..a47e17bece 100644
--- 
a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConfiguration.java
+++ 
b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConfiguration.java
@@ -767,7 +767,6 @@ public class RabbitMQConfiguration {
         if (allowQuorum && useQuorumQueues) {
             
builder.quorumQueue().replicationFactor(quorumQueueReplicationFactor);
         }
-        queueTTL.ifPresent(builder::queueTTL);
         return builder;
     }
 
diff --git 
a/event-bus/distributed/src/main/java/org/apache/james/events/KeyReconnectionHandler.java
 
b/event-bus/distributed/src/main/java/org/apache/james/events/KeyReconnectionHandler.java
index 1c2ba21d4e..e9c5a728fb 100644
--- 
a/event-bus/distributed/src/main/java/org/apache/james/events/KeyReconnectionHandler.java
+++ 
b/event-bus/distributed/src/main/java/org/apache/james/events/KeyReconnectionHandler.java
@@ -26,6 +26,7 @@ import static 
org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE;
 
 import javax.inject.Inject;
 
+import org.apache.james.backends.rabbitmq.QueueArguments;
 import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
 import org.apache.james.backends.rabbitmq.SimpleConnectionPool;
 import org.reactivestreams.Publisher;
@@ -55,8 +56,9 @@ public class KeyReconnectionHandler implements 
SimpleConnectionPool.Reconnection
     public Publisher<Void> handleReconnection(Connection connection) {
         return Mono.fromRunnable(() -> {
             try (Channel channel = connection.createChannel()) {
-                
channel.queueDeclare(namingStrategy.queueName(eventBusId).asString(), DURABLE, 
!EXCLUSIVE, AUTO_DELETE,
-                    
configuration.workQueueArgumentsBuilder(!ALLOW_QUORUM).build());
+                QueueArguments.Builder builder = 
configuration.workQueueArgumentsBuilder(!ALLOW_QUORUM);
+                configuration.getQueueTTL().ifPresent(builder::queueTTL);
+                
channel.queueDeclare(namingStrategy.queueName(eventBusId).asString(), DURABLE, 
!EXCLUSIVE, AUTO_DELETE, builder.build());
             } catch (Exception e) {
                 LOGGER.error("Error recovering connection", e);
             }
diff --git 
a/event-bus/distributed/src/main/java/org/apache/james/events/KeyRegistrationHandler.java
 
b/event-bus/distributed/src/main/java/org/apache/james/events/KeyRegistrationHandler.java
index 5fc2a5ec84..8b32e11db1 100644
--- 
a/event-bus/distributed/src/main/java/org/apache/james/events/KeyRegistrationHandler.java
+++ 
b/event-bus/distributed/src/main/java/org/apache/james/events/KeyRegistrationHandler.java
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Optional;
 import java.util.function.Predicate;
 
+import org.apache.james.backends.rabbitmq.QueueArguments;
 import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
 import org.apache.james.backends.rabbitmq.ReceiverProvider;
 import org.apache.james.metrics.api.MetricFactory;
@@ -122,12 +123,14 @@ class KeyRegistrationHandler {
     }
 
     private void declareQueue(Sender sender) {
+        QueueArguments.Builder builder = 
configuration.workQueueArgumentsBuilder(!ALLOW_QUORUM);
+        configuration.getQueueTTL().ifPresent(builder::queueTTL);
         sender.declareQueue(
             QueueSpecification.queue(registrationQueue.asString())
                 
.durable(configuration.isEventBusNotificationDurabilityEnabled())
                 .exclusive(!EXCLUSIVE)
                 .autoDelete(AUTO_DELETE)
-                
.arguments(configuration.workQueueArgumentsBuilder(!ALLOW_QUORUM).build()))
+                .arguments(builder.build()))
             .timeout(TOPOLOGY_CHANGES_TIMEOUT)
             .map(AMQP.Queue.DeclareOk::getQueue)
             .retryWhen(Retry.backoff(retryBackoff.getMaxRetries(), 
retryBackoff.getFirstBackoff()).jitter(retryBackoff.getJitterFactor()))


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to