This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new a07585e19a JAMES-4033 Option to propagate event bus dispatch error
a07585e19a is described below

commit a07585e19a64b82c2b0930873a622304621733b1
Author: Quan Tran <[email protected]>
AuthorDate: Wed May 15 17:01:42 2024 +0700

    JAMES-4033 Option to propagate event bus dispatch error
    
    Allowing for example IMAP APPEND command not to fail when RabbitMQ has 
issues.
---
 .../backends/rabbitmq/RabbitMQConfiguration.java   | 25 ++++++++++--
 .../james/backends/rabbitmq/DockerRabbitMQ.java    | 14 +++++++
 .../rabbitmq/RabbitMQConfigurationTest.java        | 26 +++++++++++++
 .../org/apache/james/events/EventDispatcher.java   |  9 ++++-
 .../org/apache/james/events/NetworkErrorTest.java  | 45 ++++++++++++++++++++--
 .../modules/ROOT/pages/configure/rabbitmq.adoc     |  4 ++
 src/site/xdoc/server/config-rabbitmq.xml           |  4 ++
 7 files changed, 118 insertions(+), 9 deletions(-)

diff --git 
a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConfiguration.java
 
b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConfiguration.java
index 7c5071a44d..e99b476ab3 100644
--- 
a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConfiguration.java
+++ 
b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConfiguration.java
@@ -311,6 +311,7 @@ public class RabbitMQConfiguration {
     private static final String QUEUE_DELIVERY_LIMIT = 
"quorum.queues.delivery.limit";
     private static final String EVENT_BUS_NOTIFICATION_DURABILITY_ENABLED = 
"event.bus.notification.durability.enabled";
     private static final String EVENT_BUS_PUBLISH_CONFIRM_ENABLED = 
"event.bus.publish.confirm.enabled";
+    private static final String EVENT_BUS_PROPAGATE_DISPATCH_ERROR = 
"event.bus.propagate.dispatch.error";
     private static final String TASK_QUEUE_CONSUMER_TIMEOUT = 
"task.queue.consumer.timeout";
     private static final String VHOST = "vhost";
 
@@ -411,6 +412,7 @@ public class RabbitMQConfiguration {
         private Optional<Long> queueTTL;
         private Optional<Boolean> eventBusPublishConfirmEnabled;
         private Optional<Boolean> eventBusNotificationDurabilityEnabled;
+        private Optional<Boolean> eventBusPropagateDispatchError;
         private Optional<String> vhost;
         private Optional<Duration> taskQueueConsumerTimeout;
 
@@ -437,6 +439,7 @@ public class RabbitMQConfiguration {
             this.eventBusNotificationDurabilityEnabled = Optional.empty();
             this.vhost = Optional.empty();
             this.taskQueueConsumerTimeout = Optional.empty();
+            this.eventBusPropagateDispatchError = Optional.empty();
         }
 
         public Builder maxRetries(int maxRetries) {
@@ -524,6 +527,11 @@ public class RabbitMQConfiguration {
             return this;
         }
 
+        public Builder eventBusPropagateDispatchError(Boolean 
eventBusPropagateDispatchError) {
+            this.eventBusPropagateDispatchError = 
Optional.ofNullable(eventBusPropagateDispatchError);
+            return this;
+        }
+
         public Builder useSslManagement(Boolean useSslForManagement) {
             this.useSslManagement = Optional.of(useSslForManagement);
             return this;
@@ -580,7 +588,8 @@ public class RabbitMQConfiguration {
                     eventBusPublishConfirmEnabled.orElse(true),
                     eventBusNotificationDurabilityEnabled.orElse(true),
                     vhost,
-                    
taskQueueConsumerTimeout.orElse(DEFAULT_TASK_QUEUE_CONSUMER_TIMEOUT));
+                    
taskQueueConsumerTimeout.orElse(DEFAULT_TASK_QUEUE_CONSUMER_TIMEOUT),
+                    eventBusPropagateDispatchError.orElse(true));
         }
 
         private List<Host> hostsDefaultingToUri() {
@@ -655,6 +664,7 @@ public class RabbitMQConfiguration {
             .queueTTL(queueTTL)
             
.eventBusNotificationDurabilityEnabled(configuration.getBoolean(EVENT_BUS_NOTIFICATION_DURABILITY_ENABLED,
 null))
             
.eventBusPublishConfirmEnabled(configuration.getBoolean(EVENT_BUS_PUBLISH_CONFIRM_ENABLED,
 null))
+            
.eventBusPropagateDispatchError(configuration.getBoolean(EVENT_BUS_PROPAGATE_DISPATCH_ERROR,
 null))
             .vhost(vhost)
             .taskQueueConsumerTimeout(taskQueueConsumerTimeout)
             .build();
@@ -730,13 +740,14 @@ public class RabbitMQConfiguration {
     private final boolean eventBusNotificationDurabilityEnabled;
     private final Optional<String> vhost;
     private final Duration taskQueueConsumerTimeout;
+    private final boolean eventBusPropagateDispatchError;
 
     private RabbitMQConfiguration(URI uri, URI managementUri, 
ManagementCredentials managementCredentials, int maxRetries, int minDelayInMs,
                                   int connectionTimeoutInMs, int 
channelRpcTimeoutInMs, int handshakeTimeoutInMs, int shutdownTimeoutInMs,
                                   int networkRecoveryIntervalInMs, Boolean 
useSsl, Boolean useSslManagement, SSLConfiguration sslConfiguration,
                                   boolean useQuorumQueues, Optional<Integer> 
quorumQueueDeliveryLimit, int quorumQueueReplicationFactor, List<Host> hosts, 
Optional<Long> queueTTL,
                                   boolean eventBusPublishConfirmEnabled, 
boolean eventBusNotificationDurabilityEnabled,
-                                  Optional<String> vhost, Duration 
taskQueueConsumerTimeout) {
+                                  Optional<String> vhost, Duration 
taskQueueConsumerTimeout, boolean eventBusPropagateDispatchError) {
         this.uri = uri;
         this.managementUri = managementUri;
         this.managementCredentials = managementCredentials;
@@ -759,6 +770,7 @@ public class RabbitMQConfiguration {
         this.eventBusNotificationDurabilityEnabled = 
eventBusNotificationDurabilityEnabled;
         this.vhost = vhost;
         this.taskQueueConsumerTimeout = taskQueueConsumerTimeout;
+        this.eventBusPropagateDispatchError = eventBusPropagateDispatchError;
     }
 
     public URI getUri() {
@@ -871,6 +883,10 @@ public class RabbitMQConfiguration {
         return quorumQueueDeliveryLimit;
     }
 
+    public boolean eventBusPropagateDispatchError() {
+        return eventBusPropagateDispatchError;
+    }
+
     @Override
     public final boolean equals(Object o) {
         if (o instanceof RabbitMQConfiguration) {
@@ -897,7 +913,8 @@ public class RabbitMQConfiguration {
                 && Objects.equals(this.eventBusPublishConfirmEnabled, 
that.eventBusPublishConfirmEnabled)
                 && Objects.equals(this.eventBusNotificationDurabilityEnabled, 
that.eventBusNotificationDurabilityEnabled)
                 && Objects.equals(this.vhost, that.vhost)
-                && Objects.equals(this.taskQueueConsumerTimeout, 
that.taskQueueConsumerTimeout);
+                && Objects.equals(this.taskQueueConsumerTimeout, 
that.taskQueueConsumerTimeout)
+                && Objects.equals(this.eventBusPropagateDispatchError, 
that.eventBusPropagateDispatchError);
         }
         return false;
     }
@@ -906,6 +923,6 @@ public class RabbitMQConfiguration {
     public final int hashCode() {
         return Objects.hash(uri, managementUri, maxRetries, minDelayInMs, 
connectionTimeoutInMs, quorumQueueReplicationFactor, quorumQueueDeliveryLimit, 
useQuorumQueues, hosts,
             channelRpcTimeoutInMs, handshakeTimeoutInMs, shutdownTimeoutInMs, 
networkRecoveryIntervalInMs, managementCredentials, useSsl, useSslManagement,
-            sslConfiguration, queueTTL, eventBusPublishConfirmEnabled, 
eventBusNotificationDurabilityEnabled, vhost, taskQueueConsumerTimeout);
+            sslConfiguration, queueTTL, eventBusPublishConfirmEnabled, 
eventBusNotificationDurabilityEnabled, vhost, taskQueueConsumerTimeout, 
eventBusPropagateDispatchError);
     }
 }
diff --git 
a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/DockerRabbitMQ.java
 
b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/DockerRabbitMQ.java
index da67056664..3c4557fa3c 100644
--- 
a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/DockerRabbitMQ.java
+++ 
b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/DockerRabbitMQ.java
@@ -272,6 +272,20 @@ public class DockerRabbitMQ {
             .build();
     }
 
+    public RabbitMQConfiguration.Builder getConfigurationBuilder() throws 
URISyntaxException {
+        return RabbitMQConfiguration.builder()
+            .amqpUri(amqpUri())
+            .managementUri(managementUri())
+            .managementCredentials(DEFAULT_MANAGEMENT_CREDENTIAL)
+            .maxRetries(MAX_THREE_RETRIES)
+            .minDelayInMs(MIN_DELAY_OF_TEN_MILLISECONDS)
+            
.connectionTimeoutInMs(CONNECTION_TIMEOUT_OF_ONE_HUNDRED_MILLISECOND)
+            
.channelRpcTimeoutInMs(CHANNEL_RPC_TIMEOUT_OF_ONE_HUNDRED_MILLISECOND)
+            .handshakeTimeoutInMs(HANDSHAKE_TIMEOUT_OF_ONE_HUNDRED_MILLISECOND)
+            .shutdownTimeoutInMs(SHUTDOWN_TIMEOUT_OF_ONE_HUNDRED_MILLISECOND)
+            
.networkRecoveryIntervalInMs(NETWORK_RECOVERY_INTERVAL_OF_ONE_HUNDRED_MILLISECOND);
+    }
+
     public RabbitMQConfiguration withQuorumQueueConfiguration() throws 
URISyntaxException {
         return RabbitMQConfiguration.builder()
             .amqpUri(amqpUri())
diff --git 
a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQConfigurationTest.java
 
b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQConfigurationTest.java
index 0acc56564e..d18f34c84e 100644
--- 
a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQConfigurationTest.java
+++ 
b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQConfigurationTest.java
@@ -572,6 +572,32 @@ class RabbitMQConfigurationTest {
             .isEqualTo(Optional.of("vhosttest"));
     }
 
+    @Test
+    void eventBusPropagateDispatchErrorShouldBeTrueByDefault() {
+        PropertiesConfiguration configuration = new PropertiesConfiguration();
+        configuration.addProperty("uri", 
"amqp://james:james@rabbitmqhost:5672");
+        configuration.addProperty("management.uri", 
"http://james:james@rabbitmqhost:15672/api/");
+        configuration.addProperty("management.user", DEFAULT_USER);
+        configuration.addProperty("management.password", 
DEFAULT_PASSWORD_STRING);
+
+        
assertThat(RabbitMQConfiguration.from(configuration).eventBusPropagateDispatchError())
+            .isTrue();
+    }
+
+    @Test
+    void eventBusPropagateDispatchErrorShouldBeDisabledWhenConfiguredFalse() {
+        PropertiesConfiguration configuration = new PropertiesConfiguration();
+        configuration.addProperty("uri", 
"amqp://james:james@rabbitmqhost:5672");
+        configuration.addProperty("management.uri", 
"http://james:james@rabbitmqhost:15672/api/");
+        configuration.addProperty("management.user", DEFAULT_USER);
+        configuration.addProperty("management.password", 
DEFAULT_PASSWORD_STRING);
+
+        configuration.addProperty("event.bus.propagate.dispatch.error", 
"false");
+
+        
assertThat(RabbitMQConfiguration.from(configuration).eventBusPropagateDispatchError())
+            .isFalse();
+    }
+
     @Nested
     class ManagementCredentialsTest {
         @Test
diff --git 
a/event-bus/distributed/src/main/java/org/apache/james/events/EventDispatcher.java
 
b/event-bus/distributed/src/main/java/org/apache/james/events/EventDispatcher.java
index 3282389445..365e6001f3 100644
--- 
a/event-bus/distributed/src/main/java/org/apache/james/events/EventDispatcher.java
+++ 
b/event-bus/distributed/src/main/java/org/apache/james/events/EventDispatcher.java
@@ -168,7 +168,14 @@ public class EventDispatcher {
                 event.getEventId().getId(),
                 ex))
             .onErrorResume(ex -> deadLetters.store(dispatchingFailureGroup, 
event)
-                .then(Mono.error(ex)));
+                .then(propagateErrorIfNeeded(ex)));
+    }
+
+    private Mono<Void> propagateErrorIfNeeded(Throwable throwable) {
+        if (configuration.eventBusPropagateDispatchError()) {
+            return Mono.error(throwable);
+        }
+        return Mono.empty();
     }
 
     private Mono<Void> remoteKeysDispatch(byte[] serializedEvent, 
Set<RegistrationKey> keys) {
diff --git 
a/event-bus/distributed/src/test/java/org/apache/james/events/NetworkErrorTest.java
 
b/event-bus/distributed/src/test/java/org/apache/james/events/NetworkErrorTest.java
index f45f018fa1..5b9e83cbb1 100644
--- 
a/event-bus/distributed/src/test/java/org/apache/james/events/NetworkErrorTest.java
+++ 
b/event-bus/distributed/src/test/java/org/apache/james/events/NetworkErrorTest.java
@@ -24,11 +24,14 @@ import static 
org.apache.james.events.EventBusTestFixture.GROUP_A;
 import static org.apache.james.events.EventBusTestFixture.NO_KEYS;
 import static 
org.apache.james.events.EventBusTestFixture.RETRY_BACKOFF_CONFIGURATION;
 import static org.apache.james.events.EventBusTestFixture.newListener;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.Mockito.verify;
 
 import java.util.NoSuchElementException;
 
+import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
 import org.apache.james.backends.rabbitmq.RabbitMQExtension;
 import org.apache.james.backends.rabbitmq.RabbitMQFixture;
 import org.apache.james.events.EventBusTestFixture.TestEventSerializer;
@@ -44,18 +47,18 @@ class NetworkErrorTest {
         .isolationPolicy(RabbitMQExtension.IsolationPolicy.WEAK);
 
     private RabbitMQEventBus eventBus;
+    private EventDeadLetters eventDeadLetters;
 
     @BeforeEach
     void setUp() throws Exception {
-        MemoryEventDeadLetters memoryEventDeadLetters = new 
MemoryEventDeadLetters();
-
+        eventDeadLetters = new MemoryEventDeadLetters();
 
         EventSerializer eventSerializer = new TestEventSerializer();
         RoutingKeyConverter routingKeyConverter = 
RoutingKeyConverter.forFactories(new 
EventBusTestFixture.TestRegistrationKeyFactory());
 
         eventBus = new RabbitMQEventBus(new NamingStrategy(new 
EventBusName("test")), rabbitMQExtension.getSender(), 
rabbitMQExtension.getReceiverProvider(),
             eventSerializer, RETRY_BACKOFF_CONFIGURATION, routingKeyConverter,
-            memoryEventDeadLetters, new RecordingMetricFactory(), 
rabbitMQExtension.getRabbitChannelPool(),
+            eventDeadLetters, new RecordingMetricFactory(), 
rabbitMQExtension.getRabbitChannelPool(),
             EventBusId.random(), 
rabbitMQExtension.getRabbitMQ().getConfiguration());
 
         eventBus.start();
@@ -63,6 +66,7 @@ class NetworkErrorTest {
 
     @AfterEach
     void tearDown() {
+        rabbitMQExtension.getRabbitMQ().unpause();
         eventBus.stop();
     }
 
@@ -74,7 +78,7 @@ class NetworkErrorTest {
         rabbitMQExtension.getRabbitMQ().pause();
 
         assertThatThrownBy(() -> eventBus.dispatch(EVENT, NO_KEYS).block())
-            .getCause()
+            .cause()
             .isInstanceOf(NoSuchElementException.class)
             .hasMessageContaining("Timeout waiting for idle object");
 
@@ -85,4 +89,37 @@ class NetworkErrorTest {
             .untilAsserted(() -> verify(listener).event(EVENT));
     }
 
+    @Test
+    void dispatchGroupEventsDuringRabbitMQOutageShouldThrowErrorByDefault() {
+        eventBus.register(newListener(), GROUP_A);
+
+        rabbitMQExtension.getRabbitMQ().pause();
+
+        assertThatThrownBy(() -> eventBus.dispatch(EVENT, NO_KEYS).block())
+            .cause()
+            .isInstanceOf(NoSuchElementException.class)
+            .hasMessageContaining("Timeout waiting for idle object");
+        assertThat(eventDeadLetters.containEvents().block()).isTrue();
+    }
+
+    @Test
+    void 
dispatchGroupEventsDuringRabbitMQOutageShouldNotThrowErrorWhenDisablePropagateDispatchError()
 throws Exception {
+        RabbitMQConfiguration disablePropagateDispatchError = 
rabbitMQExtension.getRabbitMQ().getConfigurationBuilder()
+            .eventBusPropagateDispatchError(false)
+            .build();
+        eventBus = new RabbitMQEventBus(new NamingStrategy(new 
EventBusName("test")), rabbitMQExtension.getSender(), 
rabbitMQExtension.getReceiverProvider(),
+            new TestEventSerializer(), RETRY_BACKOFF_CONFIGURATION, 
RoutingKeyConverter.forFactories(new 
EventBusTestFixture.TestRegistrationKeyFactory()),
+            eventDeadLetters, new RecordingMetricFactory(), 
rabbitMQExtension.getRabbitChannelPool(),
+            EventBusId.random(), disablePropagateDispatchError);
+        eventBus.start();
+
+        assertThat(eventDeadLetters.containEvents().block()).isFalse();
+        eventBus.register(newListener(), GROUP_A);
+
+        rabbitMQExtension.getRabbitMQ().pause();
+
+        assertThatCode(() -> eventBus.dispatch(EVENT, NO_KEYS).block())
+            .doesNotThrowAnyException();
+        assertThat(eventDeadLetters.containEvents().block()).isTrue();
+    }
 }
diff --git 
a/server/apps/distributed-app/docs/modules/ROOT/pages/configure/rabbitmq.adoc 
b/server/apps/distributed-app/docs/modules/ROOT/pages/configure/rabbitmq.adoc
index b4b719c893..f0871e0d5d 100644
--- 
a/server/apps/distributed-app/docs/modules/ROOT/pages/configure/rabbitmq.adoc
+++ 
b/server/apps/distributed-app/docs/modules/ROOT/pages/configure/rabbitmq.adoc
@@ -108,6 +108,10 @@ A coma separated list of hosts, example: 
hosts=ip1:5672,ip2:5672
 | event.bus.notification.durability.enabled
 | Whether or not the queue backing notifications should be durable. Optional 
boolean, defaults to true.
 
+| event.bus.propagate.dispatch.error
+| Whether to propagate errors back to the callers when the event bus fails to dispatch group events to RabbitMQ. Failed events are stored in the event dead letters whether or not the error is propagated.
+Optional boolean, defaults to true.
+
 | vhost
 | Optional string. This parameter is only a workaround to support invalid URIs 
containing character like '_'.
 You still need to specify the vhost in the uri parameter.
diff --git a/src/site/xdoc/server/config-rabbitmq.xml 
b/src/site/xdoc/server/config-rabbitmq.xml
index e6ba167d5f..b51689e8db 100644
--- a/src/site/xdoc/server/config-rabbitmq.xml
+++ b/src/site/xdoc/server/config-rabbitmq.xml
@@ -148,6 +148,10 @@
           <dt><strong>event.bus.notification.durability.enabled</strong></dt>
           <dd>Whether or not the queue backing notifications should be 
durable. Optional boolean, defaults to true.</dd>
 
+          <dt><strong>event.bus.propagate.dispatch.error</strong></dt>
+          <dd>Whether to propagate errors back to the callers when the event bus fails to dispatch group events to RabbitMQ. Failed events are stored in the event dead letters whether or not the error is propagated.
+              Optional boolean, defaults to true.</dd>
+
           <dt><strong>vhost</strong></dt>
           <dd>Optional string.
               This parameter is only a workaround to support invalid URIs 
containing character like '_'.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to