This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new 7e57a96fa1 AmqpForwardAttribute: support HA by allowing specifying 
several hosts (#2430)
7e57a96fa1 is described below

commit 7e57a96fa181af21a372a0a2d94a0ba7cf930db6
Author: Benoit TELLIER <btell...@linagora.com>
AuthorDate: Thu Oct 3 06:13:24 2024 +0400

    AmqpForwardAttribute: support HA by allowing specifying several hosts 
(#2430)
---
 .../backends/rabbitmq/RabbitMQConfiguration.java   |   2 +-
 .../servers/partials/AmqpForwardAttribute.adoc     |   3 +-
 .../transport/mailets/AmqpForwardAttribute.java    | 100 ++++++++++++---------
 3 files changed, 62 insertions(+), 43 deletions(-)

diff --git 
a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConfiguration.java
 
b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConfiguration.java
index e99b476ab3..203ef0714e 100644
--- 
a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConfiguration.java
+++ 
b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConfiguration.java
@@ -389,7 +389,7 @@ public class RabbitMQConfiguration {
         static final int DEFAULT_HANDSHAKE_TIMEOUT = 10_000;
         static final int DEFAULT_SHUTDOWN_TIMEOUT = 10_000;
         static final int DEFAULT_NETWORK_RECOVERY_INTERVAL = 100;
-        static final int DEFAULT_PORT = 5672;
+        public static final int DEFAULT_PORT = 5672;
         static final Duration DEFAULT_TASK_QUEUE_CONSUMER_TIMEOUT = 
Duration.ofDays(1);
 
         private final URI amqpUri;
diff --git a/docs/modules/servers/partials/AmqpForwardAttribute.adoc 
b/docs/modules/servers/partials/AmqpForwardAttribute.adoc
index 46b4e79460..fdd3610f78 100644
--- a/docs/modules/servers/partials/AmqpForwardAttribute.adoc
+++ b/docs/modules/servers/partials/AmqpForwardAttribute.adoc
@@ -7,7 +7,8 @@ It takes 4 parameters:
 * attribute (mandatory): content to be forwarded, expected to be a Map<String, 
byte[]>
 where the byte[] content is issued from a MimeBodyPart.
 It is typically generated from the StripAttachment mailet.
-* uri (mandatory): AMQP URI defining the server where to send the attachment.
+* uri (mandatory): AMQP URI defining the server where to send the attachment. 
High availability is supported with a
+  comma-separated list of URIs; vhost and credentials are taken from the first 
URI specified.
 * exchange (mandatory): name of the AMQP exchange.
 * exchange_type (optional, default to "direct"): type of the exchange. Can be 
"direct", "fanout", "topic", "headers".
 * routing_key (optional, default to empty string): name of the routing key on 
this exchange.
diff --git 
a/mailet/amqp/src/main/java/org/apache/james/transport/mailets/AmqpForwardAttribute.java
 
b/mailet/amqp/src/main/java/org/apache/james/transport/mailets/AmqpForwardAttribute.java
index 114a6b3f14..f71e62ee72 100644
--- 
a/mailet/amqp/src/main/java/org/apache/james/transport/mailets/AmqpForwardAttribute.java
+++ 
b/mailet/amqp/src/main/java/org/apache/james/transport/mailets/AmqpForwardAttribute.java
@@ -20,7 +20,6 @@
 package org.apache.james.transport.mailets;
 
 import java.net.URI;
-import java.net.URISyntaxException;
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.Arrays;
@@ -39,6 +38,7 @@ import 
org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
 import org.apache.james.backends.rabbitmq.SimpleConnectionPool;
 import org.apache.james.metrics.api.MetricFactory;
 import org.apache.james.metrics.api.NoopGaugeRegistry;
+import org.apache.james.util.Host;
 import org.apache.mailet.Attribute;
 import org.apache.mailet.AttributeName;
 import org.apache.mailet.AttributeValue;
@@ -75,7 +75,8 @@ import reactor.rabbitmq.Sender;
  * <li>attribute (mandatory): content to be forwarded, expected to be a 
Map&lt;String, byte[]&gt;
  * where the byte[] content is issued from a MimeBodyPart.
  * It is typically generated from the StripAttachment mailet.</li>
- * <li>uri (mandatory): AMQP URI defining the server where to send the 
attachment.</li>
+ * <li>uri (mandatory): AMQP URI defining the server where to send the 
attachment. High availability is supported with a
+ * comma-separated list of URIs; vhost and credentials are taken from the first 
URI specified.</li>
  * <li>exchange (mandatory): name of the AMQP exchange.</li>
  * <li>exchange_type (optional, default to "direct"): type of the exchange. 
Valid values are: direct, fanout, topic, headers.</li>
  * <li>routing_key (optional, default to empty string): name of the routing 
key on this exchange.</li>
@@ -99,7 +100,6 @@ public class AmqpForwardAttribute extends GenericMailet {
     private static final List<String> VALIDATE_EXCHANGE_TYPES = 
Arrays.stream(BuiltinExchangeType.values()).map(BuiltinExchangeType::getType).toList();
     static final RabbitMQConfiguration.ManagementCredentials 
DEFAULT_MANAGEMENT_CREDENTIAL = new 
RabbitMQConfiguration.ManagementCredentials(DEFAULT_USER, DEFAULT_PASSWORD);
 
-
     public static final String URI_PARAMETER_NAME = "uri";
     public static final String EXCHANGE_PARAMETER_NAME = "exchange";
     public static final String EXCHANGE_TYPE_PARAMETER_NAME = "exchange_type";
@@ -126,48 +126,66 @@ public class AmqpForwardAttribute extends GenericMailet {
 
     @Override
     public void init() throws MailetException {
-        MailetConfig mailetConfig = getMailetConfig();
-        String uri = preInit(mailetConfig);
+        connectionPool = new SimpleConnectionPool(new 
RabbitMQConnectionFactory(rabbitMQConfiguration()), 
SimpleConnectionPool.Configuration.builder()
+            .retries(2)
+            .initialDelay(Duration.ofMillis(5)));
+        reactorRabbitMQChannelPool = new 
ReactorRabbitMQChannelPool(connectionPool.getResilientConnection(),
+            ReactorRabbitMQChannelPool.Configuration.DEFAULT,
+            metricFactory, new NoopGaugeRegistry());
+        reactorRabbitMQChannelPool.start();
+        sender = reactorRabbitMQChannelPool.getSender();
 
-        try {
-            URI amqpUri = new URI(uri);
-            RabbitMQConfiguration rabbitMQConfiguration = 
RabbitMQConfiguration.builder()
-                .amqpUri(amqpUri)
-                .managementUri(amqpUri)
-                .managementCredentials(retrieveCredentials(amqpUri))
-                .maxRetries(MAX_THREE_RETRIES)
-                .minDelayInMs(MIN_DELAY_OF_TEN_MILLISECONDS)
-                
.connectionTimeoutInMs(CONNECTION_TIMEOUT_OF_ONE_HUNDRED_MILLISECOND)
-                
.channelRpcTimeoutInMs(CHANNEL_RPC_TIMEOUT_OF_ONE_HUNDRED_MILLISECOND)
-                
.handshakeTimeoutInMs(HANDSHAKE_TIMEOUT_OF_ONE_HUNDRED_MILLISECOND)
-                
.shutdownTimeoutInMs(SHUTDOWN_TIMEOUT_OF_ONE_HUNDRED_MILLISECOND)
-                
.networkRecoveryIntervalInMs(NETWORK_RECOVERY_INTERVAL_OF_ONE_HUNDRED_MILLISECOND)
-                .build();
-            connectionPool = new SimpleConnectionPool(new 
RabbitMQConnectionFactory(rabbitMQConfiguration), 
SimpleConnectionPool.Configuration.builder()
-                .retries(2)
-                .initialDelay(Duration.ofMillis(5)));
-            reactorRabbitMQChannelPool = new 
ReactorRabbitMQChannelPool(connectionPool.getResilientConnection(),
-                ReactorRabbitMQChannelPool.Configuration.DEFAULT,
-                metricFactory, new NoopGaugeRegistry());
-            reactorRabbitMQChannelPool.start();
-            sender = reactorRabbitMQChannelPool.getSender();
-
-            ExchangeSpecification exchangeSpecification = 
Optional.ofNullable(exchangeType)
-                .map(type -> 
ExchangeSpecification.exchange(exchange).type(type))
-                .orElse(ExchangeSpecification.exchange(exchange));
-
-            sender.declareExchange(exchangeSpecification)
-                .onErrorResume(error -> error instanceof 
ShutdownSignalException && error.getMessage().contains("reply-code=406, 
reply-text=PRECONDITION_FAILED"),
-                    error -> {
-                        LOGGER.warn("Exchange `{}` already exists but with 
different configuration. Ignoring this error. \nError message: {}", exchange, 
error.getMessage());
-                        return Mono.empty();
-                    })
-                .block();
-        } catch (URISyntaxException e) {
-            throw new RuntimeException(e);
+        ExchangeSpecification exchangeSpecification = 
Optional.ofNullable(exchangeType)
+            .map(type -> ExchangeSpecification.exchange(exchange).type(type))
+            .orElse(ExchangeSpecification.exchange(exchange));
+
+        sender.declareExchange(exchangeSpecification)
+            .onErrorResume(error -> error instanceof ShutdownSignalException 
&& error.getMessage().contains("reply-code=406, 
reply-text=PRECONDITION_FAILED"),
+                error -> {
+                    LOGGER.warn("Exchange `{}` already exists but with 
different configuration. Ignoring this error. \nError message: {}", exchange, 
error.getMessage());
+                    return Mono.empty();
+                })
+            .block();
+    }
+
+    private RabbitMQConfiguration rabbitMQConfiguration() throws 
MailetException {
+        List<URI> uris = getUris();
+        URI amqpUri = uris.get(0);
+        RabbitMQConfiguration rabbitMQConfiguration = 
RabbitMQConfiguration.builder()
+            .amqpUri(amqpUri)
+            .managementUri(amqpUri)
+            .managementCredentials(retrieveCredentials(amqpUri))
+            .maxRetries(MAX_THREE_RETRIES)
+            .minDelayInMs(MIN_DELAY_OF_TEN_MILLISECONDS)
+            
.connectionTimeoutInMs(CONNECTION_TIMEOUT_OF_ONE_HUNDRED_MILLISECOND)
+            
.channelRpcTimeoutInMs(CHANNEL_RPC_TIMEOUT_OF_ONE_HUNDRED_MILLISECOND)
+            .handshakeTimeoutInMs(HANDSHAKE_TIMEOUT_OF_ONE_HUNDRED_MILLISECOND)
+            .shutdownTimeoutInMs(SHUTDOWN_TIMEOUT_OF_ONE_HUNDRED_MILLISECOND)
+            
.networkRecoveryIntervalInMs(NETWORK_RECOVERY_INTERVAL_OF_ONE_HUNDRED_MILLISECOND)
+            .hosts(uris.stream()
+                .map(AmqpForwardAttribute::asHost)
+                .toList())
+            .build();
+        return rabbitMQConfiguration;
+    }
+
+    private static Host asHost(URI aUri) {
+        if (aUri.getPort() > 0) {
+            return Host.from(aUri.getHost(), aUri.getPort());
+        } else {
+            return Host.from(aUri.getHost(), 
RabbitMQConfiguration.Builder.DEFAULT_PORT);
         }
     }
 
+    private List<URI> getUris() throws MailetException {
+        return Splitter.on(',')
+            .trimResults()
+            .omitEmptyStrings()
+            .splitToStream(preInit(getMailetConfig()))
+            .map(Throwing.function(URI::new))
+            .toList();
+    }
+
     @VisibleForTesting
     RabbitMQConfiguration.ManagementCredentials retrieveCredentials(URI 
amqpUri) {
         return Optional.ofNullable(amqpUri.getUserInfo())


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org
For additional commands, e-mail: notifications-h...@james.apache.org

Reply via email to