This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push: new 7e57a96fa1 AmqpForwardAttribute: support HA by allowing specifying several hosts (#2430) 7e57a96fa1 is described below commit 7e57a96fa181af21a372a0a2d94a0ba7cf930db6 Author: Benoit TELLIER <btell...@linagora.com> AuthorDate: Thu Oct 3 06:13:24 2024 +0400 AmqpForwardAttribute: support HA by allowing specifying several hosts (#2430) --- .../backends/rabbitmq/RabbitMQConfiguration.java | 2 +- .../servers/partials/AmqpForwardAttribute.adoc | 3 +- .../transport/mailets/AmqpForwardAttribute.java | 100 ++++++++++++--------- 3 files changed, 62 insertions(+), 43 deletions(-) diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConfiguration.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConfiguration.java index e99b476ab3..203ef0714e 100644 --- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConfiguration.java +++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConfiguration.java @@ -389,7 +389,7 @@ public class RabbitMQConfiguration { static final int DEFAULT_HANDSHAKE_TIMEOUT = 10_000; static final int DEFAULT_SHUTDOWN_TIMEOUT = 10_000; static final int DEFAULT_NETWORK_RECOVERY_INTERVAL = 100; - static final int DEFAULT_PORT = 5672; + public static final int DEFAULT_PORT = 5672; static final Duration DEFAULT_TASK_QUEUE_CONSUMER_TIMEOUT = Duration.ofDays(1); private final URI amqpUri; diff --git a/docs/modules/servers/partials/AmqpForwardAttribute.adoc b/docs/modules/servers/partials/AmqpForwardAttribute.adoc index 46b4e79460..fdd3610f78 100644 --- a/docs/modules/servers/partials/AmqpForwardAttribute.adoc +++ b/docs/modules/servers/partials/AmqpForwardAttribute.adoc @@ -7,7 +7,8 @@ It takes 4 parameters: * attribute (mandatory): content to be forwarded, expected to be a Map<String, byte[]> where the byte[] content is issued from a MimeBodyPart. It is typically generated from the StripAttachment mailet. -* uri (mandatory): AMQP URI defining the server where to send the attachment. +* uri (mandatory): AMQP URI defining the server where to send the attachment. High availability is supported with a + coma separated list of uris, whost and credentials are taken from the first URI specified. * exchange (mandatory): name of the AMQP exchange. * exchange_type (optional, default to "direct"): type of the exchange. Can be "direct", "fanout", "topic", "headers". * routing_key (optional, default to empty string): name of the routing key on this exchange. diff --git a/mailet/amqp/src/main/java/org/apache/james/transport/mailets/AmqpForwardAttribute.java b/mailet/amqp/src/main/java/org/apache/james/transport/mailets/AmqpForwardAttribute.java index 114a6b3f14..f71e62ee72 100644 --- a/mailet/amqp/src/main/java/org/apache/james/transport/mailets/AmqpForwardAttribute.java +++ b/mailet/amqp/src/main/java/org/apache/james/transport/mailets/AmqpForwardAttribute.java @@ -20,7 +20,6 @@ package org.apache.james.transport.mailets; import java.net.URI; -import java.net.URISyntaxException; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.Arrays; @@ -39,6 +38,7 @@ import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool; import org.apache.james.backends.rabbitmq.SimpleConnectionPool; import org.apache.james.metrics.api.MetricFactory; import org.apache.james.metrics.api.NoopGaugeRegistry; +import org.apache.james.util.Host; import org.apache.mailet.Attribute; import org.apache.mailet.AttributeName; import org.apache.mailet.AttributeValue; @@ -75,7 +75,8 @@ import reactor.rabbitmq.Sender; * <li>attribute (mandatory): content to be forwarded, expected to be a Map<String, byte[]> * where the byte[] content is issued from a MimeBodyPart. * It is typically generated from the StripAttachment mailet.</li> - * <li>uri (mandatory): AMQP URI defining the server where to send the attachment.</li> + * <li>uri (mandatory): AMQP URI defining the server where to send the attachment. High availability is supported with a + * coma separated list of uris, whost and credentials are taken from the first URI specified.</li> * <li>exchange (mandatory): name of the AMQP exchange.</li> * <li>exchange_type (optional, default to "direct"): type of the exchange. Valid values are: direct, fanout, topic, headers.</li> * <li>routing_key (optional, default to empty string): name of the routing key on this exchange.</li> @@ -99,7 +100,6 @@ public class AmqpForwardAttribute extends GenericMailet { private static final List<String> VALIDATE_EXCHANGE_TYPES = Arrays.stream(BuiltinExchangeType.values()).map(BuiltinExchangeType::getType).toList(); static final RabbitMQConfiguration.ManagementCredentials DEFAULT_MANAGEMENT_CREDENTIAL = new RabbitMQConfiguration.ManagementCredentials(DEFAULT_USER, DEFAULT_PASSWORD); - public static final String URI_PARAMETER_NAME = "uri"; public static final String EXCHANGE_PARAMETER_NAME = "exchange"; public static final String EXCHANGE_TYPE_PARAMETER_NAME = "exchange_type"; @@ -126,48 +126,66 @@ public class AmqpForwardAttribute extends GenericMailet { @Override public void init() throws MailetException { - MailetConfig mailetConfig = getMailetConfig(); - String uri = preInit(mailetConfig); + connectionPool = new SimpleConnectionPool(new RabbitMQConnectionFactory(rabbitMQConfiguration()), SimpleConnectionPool.Configuration.builder() + .retries(2) + .initialDelay(Duration.ofMillis(5))); + reactorRabbitMQChannelPool = new ReactorRabbitMQChannelPool(connectionPool.getResilientConnection(), + ReactorRabbitMQChannelPool.Configuration.DEFAULT, + metricFactory, new NoopGaugeRegistry()); + reactorRabbitMQChannelPool.start(); + sender = reactorRabbitMQChannelPool.getSender(); - try { - URI amqpUri = new URI(uri); - RabbitMQConfiguration rabbitMQConfiguration = RabbitMQConfiguration.builder() - .amqpUri(amqpUri) - .managementUri(amqpUri) - .managementCredentials(retrieveCredentials(amqpUri)) - .maxRetries(MAX_THREE_RETRIES) - .minDelayInMs(MIN_DELAY_OF_TEN_MILLISECONDS) - .connectionTimeoutInMs(CONNECTION_TIMEOUT_OF_ONE_HUNDRED_MILLISECOND) - .channelRpcTimeoutInMs(CHANNEL_RPC_TIMEOUT_OF_ONE_HUNDRED_MILLISECOND) - .handshakeTimeoutInMs(HANDSHAKE_TIMEOUT_OF_ONE_HUNDRED_MILLISECOND) - .shutdownTimeoutInMs(SHUTDOWN_TIMEOUT_OF_ONE_HUNDRED_MILLISECOND) - .networkRecoveryIntervalInMs(NETWORK_RECOVERY_INTERVAL_OF_ONE_HUNDRED_MILLISECOND) - .build(); - connectionPool = new SimpleConnectionPool(new RabbitMQConnectionFactory(rabbitMQConfiguration), SimpleConnectionPool.Configuration.builder() - .retries(2) - .initialDelay(Duration.ofMillis(5))); - reactorRabbitMQChannelPool = new ReactorRabbitMQChannelPool(connectionPool.getResilientConnection(), - ReactorRabbitMQChannelPool.Configuration.DEFAULT, - metricFactory, new NoopGaugeRegistry()); - reactorRabbitMQChannelPool.start(); - sender = reactorRabbitMQChannelPool.getSender(); - - ExchangeSpecification exchangeSpecification = Optional.ofNullable(exchangeType) - .map(type -> ExchangeSpecification.exchange(exchange).type(type)) - .orElse(ExchangeSpecification.exchange(exchange)); - - sender.declareExchange(exchangeSpecification) - .onErrorResume(error -> error instanceof ShutdownSignalException && error.getMessage().contains("reply-code=406, reply-text=PRECONDITION_FAILED"), - error -> { - LOGGER.warn("Exchange `{}` already exists but with different configuration. Ignoring this error. \nError message: {}", exchange, error.getMessage()); - return Mono.empty(); - }) - .block(); - } catch (URISyntaxException e) { - throw new RuntimeException(e); + ExchangeSpecification exchangeSpecification = Optional.ofNullable(exchangeType) + .map(type -> ExchangeSpecification.exchange(exchange).type(type)) + .orElse(ExchangeSpecification.exchange(exchange)); + + sender.declareExchange(exchangeSpecification) + .onErrorResume(error -> error instanceof ShutdownSignalException && error.getMessage().contains("reply-code=406, reply-text=PRECONDITION_FAILED"), + error -> { + LOGGER.warn("Exchange `{}` already exists but with different configuration. Ignoring this error. \nError message: {}", exchange, error.getMessage()); + return Mono.empty(); + }) + .block(); + } + + private RabbitMQConfiguration rabbitMQConfiguration() throws MailetException { + List<URI> uris = getUris(); + URI amqpUri = uris.get(0); + RabbitMQConfiguration rabbitMQConfiguration = RabbitMQConfiguration.builder() + .amqpUri(amqpUri) + .managementUri(amqpUri) + .managementCredentials(retrieveCredentials(amqpUri)) + .maxRetries(MAX_THREE_RETRIES) + .minDelayInMs(MIN_DELAY_OF_TEN_MILLISECONDS) + .connectionTimeoutInMs(CONNECTION_TIMEOUT_OF_ONE_HUNDRED_MILLISECOND) + .channelRpcTimeoutInMs(CHANNEL_RPC_TIMEOUT_OF_ONE_HUNDRED_MILLISECOND) + .handshakeTimeoutInMs(HANDSHAKE_TIMEOUT_OF_ONE_HUNDRED_MILLISECOND) + .shutdownTimeoutInMs(SHUTDOWN_TIMEOUT_OF_ONE_HUNDRED_MILLISECOND) + .networkRecoveryIntervalInMs(NETWORK_RECOVERY_INTERVAL_OF_ONE_HUNDRED_MILLISECOND) + .hosts(uris.stream() + .map(AmqpForwardAttribute::asHost) + .toList()) + .build(); + return rabbitMQConfiguration; + } + + private static Host asHost(URI aUri) { + if (aUri.getPort() > 0) { + return Host.from(aUri.getHost(), aUri.getPort()); + } else { + return Host.from(aUri.getHost(), RabbitMQConfiguration.Builder.DEFAULT_PORT); } } + private List<URI> getUris() throws MailetException { + return Splitter.on(',') + .trimResults() + .omitEmptyStrings() + .splitToStream(preInit(getMailetConfig())) + .map(Throwing.function(URI::new)) + .toList(); + } + @VisibleForTesting RabbitMQConfiguration.ManagementCredentials retrieveCredentials(URI amqpUri) { return Optional.ofNullable(amqpUri.getUserInfo()) --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For additional commands, e-mail: notifications-h...@james.apache.org