This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push:
new a07585e19a JAMES-4033 Option to propagate event bus dispatch error
a07585e19a is described below
commit a07585e19a64b82c2b0930873a622304621733b1
Author: Quan Tran <[email protected]>
AuthorDate: Wed May 15 17:01:42 2024 +0700
JAMES-4033 Option to propagate event bus dispatch error
Allows, for example, the IMAP APPEND command not to fail when RabbitMQ has
issues.
---
.../backends/rabbitmq/RabbitMQConfiguration.java | 25 ++++++++++--
.../james/backends/rabbitmq/DockerRabbitMQ.java | 14 +++++++
.../rabbitmq/RabbitMQConfigurationTest.java | 26 +++++++++++++
.../org/apache/james/events/EventDispatcher.java | 9 ++++-
.../org/apache/james/events/NetworkErrorTest.java | 45 ++++++++++++++++++++--
.../modules/ROOT/pages/configure/rabbitmq.adoc | 4 ++
src/site/xdoc/server/config-rabbitmq.xml | 4 ++
7 files changed, 118 insertions(+), 9 deletions(-)
diff --git
a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConfiguration.java
b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConfiguration.java
index 7c5071a44d..e99b476ab3 100644
---
a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConfiguration.java
+++
b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConfiguration.java
@@ -311,6 +311,7 @@ public class RabbitMQConfiguration {
private static final String QUEUE_DELIVERY_LIMIT =
"quorum.queues.delivery.limit";
private static final String EVENT_BUS_NOTIFICATION_DURABILITY_ENABLED =
"event.bus.notification.durability.enabled";
private static final String EVENT_BUS_PUBLISH_CONFIRM_ENABLED =
"event.bus.publish.confirm.enabled";
+ private static final String EVENT_BUS_PROPAGATE_DISPATCH_ERROR =
"event.bus.propagate.dispatch.error";
private static final String TASK_QUEUE_CONSUMER_TIMEOUT =
"task.queue.consumer.timeout";
private static final String VHOST = "vhost";
@@ -411,6 +412,7 @@ public class RabbitMQConfiguration {
private Optional<Long> queueTTL;
private Optional<Boolean> eventBusPublishConfirmEnabled;
private Optional<Boolean> eventBusNotificationDurabilityEnabled;
+ private Optional<Boolean> eventBusPropagateDispatchError;
private Optional<String> vhost;
private Optional<Duration> taskQueueConsumerTimeout;
@@ -437,6 +439,7 @@ public class RabbitMQConfiguration {
this.eventBusNotificationDurabilityEnabled = Optional.empty();
this.vhost = Optional.empty();
this.taskQueueConsumerTimeout = Optional.empty();
+ this.eventBusPropagateDispatchError = Optional.empty();
}
public Builder maxRetries(int maxRetries) {
@@ -524,6 +527,11 @@ public class RabbitMQConfiguration {
return this;
}
+ public Builder eventBusPropagateDispatchError(Boolean
eventBusPropagateDispatchError) {
+ this.eventBusPropagateDispatchError =
Optional.ofNullable(eventBusPropagateDispatchError);
+ return this;
+ }
+
public Builder useSslManagement(Boolean useSslForManagement) {
this.useSslManagement = Optional.of(useSslForManagement);
return this;
@@ -580,7 +588,8 @@ public class RabbitMQConfiguration {
eventBusPublishConfirmEnabled.orElse(true),
eventBusNotificationDurabilityEnabled.orElse(true),
vhost,
-
taskQueueConsumerTimeout.orElse(DEFAULT_TASK_QUEUE_CONSUMER_TIMEOUT));
+
taskQueueConsumerTimeout.orElse(DEFAULT_TASK_QUEUE_CONSUMER_TIMEOUT),
+ eventBusPropagateDispatchError.orElse(true));
}
private List<Host> hostsDefaultingToUri() {
@@ -655,6 +664,7 @@ public class RabbitMQConfiguration {
.queueTTL(queueTTL)
.eventBusNotificationDurabilityEnabled(configuration.getBoolean(EVENT_BUS_NOTIFICATION_DURABILITY_ENABLED,
null))
.eventBusPublishConfirmEnabled(configuration.getBoolean(EVENT_BUS_PUBLISH_CONFIRM_ENABLED,
null))
+
.eventBusPropagateDispatchError(configuration.getBoolean(EVENT_BUS_PROPAGATE_DISPATCH_ERROR,
null))
.vhost(vhost)
.taskQueueConsumerTimeout(taskQueueConsumerTimeout)
.build();
@@ -730,13 +740,14 @@ public class RabbitMQConfiguration {
private final boolean eventBusNotificationDurabilityEnabled;
private final Optional<String> vhost;
private final Duration taskQueueConsumerTimeout;
+ private final boolean eventBusPropagateDispatchError;
private RabbitMQConfiguration(URI uri, URI managementUri,
ManagementCredentials managementCredentials, int maxRetries, int minDelayInMs,
int connectionTimeoutInMs, int
channelRpcTimeoutInMs, int handshakeTimeoutInMs, int shutdownTimeoutInMs,
int networkRecoveryIntervalInMs, Boolean
useSsl, Boolean useSslManagement, SSLConfiguration sslConfiguration,
boolean useQuorumQueues, Optional<Integer>
quorumQueueDeliveryLimit, int quorumQueueReplicationFactor, List<Host> hosts,
Optional<Long> queueTTL,
boolean eventBusPublishConfirmEnabled,
boolean eventBusNotificationDurabilityEnabled,
- Optional<String> vhost, Duration
taskQueueConsumerTimeout) {
+ Optional<String> vhost, Duration
taskQueueConsumerTimeout, boolean eventBusPropagateDispatchError) {
this.uri = uri;
this.managementUri = managementUri;
this.managementCredentials = managementCredentials;
@@ -759,6 +770,7 @@ public class RabbitMQConfiguration {
this.eventBusNotificationDurabilityEnabled =
eventBusNotificationDurabilityEnabled;
this.vhost = vhost;
this.taskQueueConsumerTimeout = taskQueueConsumerTimeout;
+ this.eventBusPropagateDispatchError = eventBusPropagateDispatchError;
}
public URI getUri() {
@@ -871,6 +883,10 @@ public class RabbitMQConfiguration {
return quorumQueueDeliveryLimit;
}
+ public boolean eventBusPropagateDispatchError() {
+ return eventBusPropagateDispatchError;
+ }
+
@Override
public final boolean equals(Object o) {
if (o instanceof RabbitMQConfiguration) {
@@ -897,7 +913,8 @@ public class RabbitMQConfiguration {
&& Objects.equals(this.eventBusPublishConfirmEnabled,
that.eventBusPublishConfirmEnabled)
&& Objects.equals(this.eventBusNotificationDurabilityEnabled,
that.eventBusNotificationDurabilityEnabled)
&& Objects.equals(this.vhost, that.vhost)
- && Objects.equals(this.taskQueueConsumerTimeout,
that.taskQueueConsumerTimeout);
+ && Objects.equals(this.taskQueueConsumerTimeout,
that.taskQueueConsumerTimeout)
+ && Objects.equals(this.eventBusPropagateDispatchError,
that.eventBusPropagateDispatchError);
}
return false;
}
@@ -906,6 +923,6 @@ public class RabbitMQConfiguration {
public final int hashCode() {
return Objects.hash(uri, managementUri, maxRetries, minDelayInMs,
connectionTimeoutInMs, quorumQueueReplicationFactor, quorumQueueDeliveryLimit,
useQuorumQueues, hosts,
channelRpcTimeoutInMs, handshakeTimeoutInMs, shutdownTimeoutInMs,
networkRecoveryIntervalInMs, managementCredentials, useSsl, useSslManagement,
- sslConfiguration, queueTTL, eventBusPublishConfirmEnabled,
eventBusNotificationDurabilityEnabled, vhost, taskQueueConsumerTimeout);
+ sslConfiguration, queueTTL, eventBusPublishConfirmEnabled,
eventBusNotificationDurabilityEnabled, vhost, taskQueueConsumerTimeout,
eventBusPropagateDispatchError);
}
}
diff --git
a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/DockerRabbitMQ.java
b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/DockerRabbitMQ.java
index da67056664..3c4557fa3c 100644
---
a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/DockerRabbitMQ.java
+++
b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/DockerRabbitMQ.java
@@ -272,6 +272,20 @@ public class DockerRabbitMQ {
.build();
}
+ public RabbitMQConfiguration.Builder getConfigurationBuilder() throws
URISyntaxException {
+ return RabbitMQConfiguration.builder()
+ .amqpUri(amqpUri())
+ .managementUri(managementUri())
+ .managementCredentials(DEFAULT_MANAGEMENT_CREDENTIAL)
+ .maxRetries(MAX_THREE_RETRIES)
+ .minDelayInMs(MIN_DELAY_OF_TEN_MILLISECONDS)
+
.connectionTimeoutInMs(CONNECTION_TIMEOUT_OF_ONE_HUNDRED_MILLISECOND)
+
.channelRpcTimeoutInMs(CHANNEL_RPC_TIMEOUT_OF_ONE_HUNDRED_MILLISECOND)
+ .handshakeTimeoutInMs(HANDSHAKE_TIMEOUT_OF_ONE_HUNDRED_MILLISECOND)
+ .shutdownTimeoutInMs(SHUTDOWN_TIMEOUT_OF_ONE_HUNDRED_MILLISECOND)
+
.networkRecoveryIntervalInMs(NETWORK_RECOVERY_INTERVAL_OF_ONE_HUNDRED_MILLISECOND);
+ }
+
public RabbitMQConfiguration withQuorumQueueConfiguration() throws
URISyntaxException {
return RabbitMQConfiguration.builder()
.amqpUri(amqpUri())
diff --git
a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQConfigurationTest.java
b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQConfigurationTest.java
index 0acc56564e..d18f34c84e 100644
---
a/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQConfigurationTest.java
+++
b/backends-common/rabbitmq/src/test/java/org/apache/james/backends/rabbitmq/RabbitMQConfigurationTest.java
@@ -572,6 +572,32 @@ class RabbitMQConfigurationTest {
.isEqualTo(Optional.of("vhosttest"));
}
+ @Test
+ void eventBusPropagateDispatchErrorShouldBeTrueByDefault() {
+ PropertiesConfiguration configuration = new PropertiesConfiguration();
+ configuration.addProperty("uri",
"amqp://james:james@rabbitmqhost:5672");
+ configuration.addProperty("management.uri",
"http://james:james@rabbitmqhost:15672/api/");
+ configuration.addProperty("management.user", DEFAULT_USER);
+ configuration.addProperty("management.password",
DEFAULT_PASSWORD_STRING);
+
+
assertThat(RabbitMQConfiguration.from(configuration).eventBusPropagateDispatchError())
+ .isTrue();
+ }
+
+ @Test
+ void eventBusPropagateDispatchErrorShouldBeDisabledWhenConfiguredFalse() {
+ PropertiesConfiguration configuration = new PropertiesConfiguration();
+ configuration.addProperty("uri",
"amqp://james:james@rabbitmqhost:5672");
+ configuration.addProperty("management.uri",
"http://james:james@rabbitmqhost:15672/api/");
+ configuration.addProperty("management.user", DEFAULT_USER);
+ configuration.addProperty("management.password",
DEFAULT_PASSWORD_STRING);
+
+ configuration.addProperty("event.bus.propagate.dispatch.error",
"false");
+
+
assertThat(RabbitMQConfiguration.from(configuration).eventBusPropagateDispatchError())
+ .isFalse();
+ }
+
@Nested
class ManagementCredentialsTest {
@Test
diff --git
a/event-bus/distributed/src/main/java/org/apache/james/events/EventDispatcher.java
b/event-bus/distributed/src/main/java/org/apache/james/events/EventDispatcher.java
index 3282389445..365e6001f3 100644
---
a/event-bus/distributed/src/main/java/org/apache/james/events/EventDispatcher.java
+++
b/event-bus/distributed/src/main/java/org/apache/james/events/EventDispatcher.java
@@ -168,7 +168,14 @@ public class EventDispatcher {
event.getEventId().getId(),
ex))
.onErrorResume(ex -> deadLetters.store(dispatchingFailureGroup,
event)
- .then(Mono.error(ex)));
+ .then(propagateErrorIfNeeded(ex)));
+ }
+
+ private Mono<Void> propagateErrorIfNeeded(Throwable throwable) {
+ if (configuration.eventBusPropagateDispatchError()) {
+ return Mono.error(throwable);
+ }
+ return Mono.empty();
}
private Mono<Void> remoteKeysDispatch(byte[] serializedEvent,
Set<RegistrationKey> keys) {
diff --git
a/event-bus/distributed/src/test/java/org/apache/james/events/NetworkErrorTest.java
b/event-bus/distributed/src/test/java/org/apache/james/events/NetworkErrorTest.java
index f45f018fa1..5b9e83cbb1 100644
---
a/event-bus/distributed/src/test/java/org/apache/james/events/NetworkErrorTest.java
+++
b/event-bus/distributed/src/test/java/org/apache/james/events/NetworkErrorTest.java
@@ -24,11 +24,14 @@ import static
org.apache.james.events.EventBusTestFixture.GROUP_A;
import static org.apache.james.events.EventBusTestFixture.NO_KEYS;
import static
org.apache.james.events.EventBusTestFixture.RETRY_BACKOFF_CONFIGURATION;
import static org.apache.james.events.EventBusTestFixture.newListener;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.Mockito.verify;
import java.util.NoSuchElementException;
+import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
import org.apache.james.backends.rabbitmq.RabbitMQExtension;
import org.apache.james.backends.rabbitmq.RabbitMQFixture;
import org.apache.james.events.EventBusTestFixture.TestEventSerializer;
@@ -44,18 +47,18 @@ class NetworkErrorTest {
.isolationPolicy(RabbitMQExtension.IsolationPolicy.WEAK);
private RabbitMQEventBus eventBus;
+ private EventDeadLetters eventDeadLetters;
@BeforeEach
void setUp() throws Exception {
- MemoryEventDeadLetters memoryEventDeadLetters = new
MemoryEventDeadLetters();
-
+ eventDeadLetters = new MemoryEventDeadLetters();
EventSerializer eventSerializer = new TestEventSerializer();
RoutingKeyConverter routingKeyConverter =
RoutingKeyConverter.forFactories(new
EventBusTestFixture.TestRegistrationKeyFactory());
eventBus = new RabbitMQEventBus(new NamingStrategy(new
EventBusName("test")), rabbitMQExtension.getSender(),
rabbitMQExtension.getReceiverProvider(),
eventSerializer, RETRY_BACKOFF_CONFIGURATION, routingKeyConverter,
- memoryEventDeadLetters, new RecordingMetricFactory(),
rabbitMQExtension.getRabbitChannelPool(),
+ eventDeadLetters, new RecordingMetricFactory(),
rabbitMQExtension.getRabbitChannelPool(),
EventBusId.random(),
rabbitMQExtension.getRabbitMQ().getConfiguration());
eventBus.start();
@@ -63,6 +66,7 @@ class NetworkErrorTest {
@AfterEach
void tearDown() {
+ rabbitMQExtension.getRabbitMQ().unpause();
eventBus.stop();
}
@@ -74,7 +78,7 @@ class NetworkErrorTest {
rabbitMQExtension.getRabbitMQ().pause();
assertThatThrownBy(() -> eventBus.dispatch(EVENT, NO_KEYS).block())
- .getCause()
+ .cause()
.isInstanceOf(NoSuchElementException.class)
.hasMessageContaining("Timeout waiting for idle object");
@@ -85,4 +89,37 @@ class NetworkErrorTest {
.untilAsserted(() -> verify(listener).event(EVENT));
}
+ @Test
+ void dispatchGroupEventsDuringRabbitMQOutageShouldThrowErrorByDefault() {
+ eventBus.register(newListener(), GROUP_A);
+
+ rabbitMQExtension.getRabbitMQ().pause();
+
+ assertThatThrownBy(() -> eventBus.dispatch(EVENT, NO_KEYS).block())
+ .cause()
+ .isInstanceOf(NoSuchElementException.class)
+ .hasMessageContaining("Timeout waiting for idle object");
+ assertThat(eventDeadLetters.containEvents().block()).isTrue();
+ }
+
+ @Test
+ void
dispatchGroupEventsDuringRabbitMQOutageShouldNotThrowErrorWhenDisablePropagateDispatchError()
throws Exception {
+ RabbitMQConfiguration disablePropagateDispatchError =
rabbitMQExtension.getRabbitMQ().getConfigurationBuilder()
+ .eventBusPropagateDispatchError(false)
+ .build();
+ eventBus = new RabbitMQEventBus(new NamingStrategy(new
EventBusName("test")), rabbitMQExtension.getSender(),
rabbitMQExtension.getReceiverProvider(),
+ new TestEventSerializer(), RETRY_BACKOFF_CONFIGURATION,
RoutingKeyConverter.forFactories(new
EventBusTestFixture.TestRegistrationKeyFactory()),
+ eventDeadLetters, new RecordingMetricFactory(),
rabbitMQExtension.getRabbitChannelPool(),
+ EventBusId.random(), disablePropagateDispatchError);
+ eventBus.start();
+
+ assertThat(eventDeadLetters.containEvents().block()).isFalse();
+ eventBus.register(newListener(), GROUP_A);
+
+ rabbitMQExtension.getRabbitMQ().pause();
+
+ assertThatCode(() -> eventBus.dispatch(EVENT, NO_KEYS).block())
+ .doesNotThrowAnyException();
+ assertThat(eventDeadLetters.containEvents().block()).isTrue();
+ }
}
diff --git
a/server/apps/distributed-app/docs/modules/ROOT/pages/configure/rabbitmq.adoc
b/server/apps/distributed-app/docs/modules/ROOT/pages/configure/rabbitmq.adoc
index b4b719c893..f0871e0d5d 100644
---
a/server/apps/distributed-app/docs/modules/ROOT/pages/configure/rabbitmq.adoc
+++
b/server/apps/distributed-app/docs/modules/ROOT/pages/configure/rabbitmq.adoc
@@ -108,6 +108,10 @@ A coma separated list of hosts, example:
hosts=ip1:5672,ip2:5672
| event.bus.notification.durability.enabled
| Whether or not the queue backing notifications should be durable. Optional
boolean, defaults to true.
+| event.bus.propagate.dispatch.error
+| Whether to propagate errors back to the callers when the event bus fails to
dispatch group events to RabbitMQ. The failed events are stored in the event
dead letters in either case.
+Optional boolean, defaults to true.
+
| vhost
| Optional string. This parameter is only a workaround to support invalid URIs
containing character like '_'.
You still need to specify the vhost in the uri parameter.
diff --git a/src/site/xdoc/server/config-rabbitmq.xml
b/src/site/xdoc/server/config-rabbitmq.xml
index e6ba167d5f..b51689e8db 100644
--- a/src/site/xdoc/server/config-rabbitmq.xml
+++ b/src/site/xdoc/server/config-rabbitmq.xml
@@ -148,6 +148,10 @@
<dt><strong>event.bus.notification.durability.enabled</strong></dt>
<dd>Whether or not the queue backing notifications should be
durable. Optional boolean, defaults to true.</dd>
+ <dt><strong>event.bus.propagate.dispatch.error</strong></dt>
+ <dd>Whether to propagate errors back to the callers when the event bus
fails to dispatch group events to RabbitMQ. The failed events are stored in the
event dead letters in either case.
+ Optional boolean, defaults to true.</dd>
+
<dt><strong>vhost</strong></dt>
<dd>Optional string.
This parameter is only a workaround to support invalid URIs
containing character like '_'.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]