This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit c74f8833dae8a746939ee94ec46f5d5fca7abeac Author: Rene Cordier <[email protected]> AuthorDate: Fri Jul 10 11:01:02 2020 +0700 JAMES-3305 Dont crash EventBus processing upon invalid messages --- .../james/mailbox/events/GroupRegistration.java | 23 ++++++++++--- .../james/mailbox/events/RabbitMQEventBusTest.java | 40 ++++++++++++++++++++++ 2 files changed, 58 insertions(+), 5 deletions(-) diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java index cade1c8..d95e6bd 100644 --- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java @@ -24,6 +24,7 @@ import static org.apache.james.backends.rabbitmq.Constants.DURABLE; import static org.apache.james.backends.rabbitmq.Constants.EMPTY_ROUTING_KEY; import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE; import static org.apache.james.backends.rabbitmq.Constants.NO_ARGUMENTS; +import static org.apache.james.backends.rabbitmq.Constants.REQUEUE; import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT; import static org.apache.james.mailbox.events.RabbitMQEventBus.MAILBOX_EVENT_EXCHANGE_NAME; @@ -35,6 +36,8 @@ import java.util.function.Predicate; import org.apache.james.backends.rabbitmq.ReceiverProvider; import org.apache.james.event.json.EventSerializer; import org.apache.james.util.MDCBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; @@ -70,6 +73,7 @@ class GroupRegistration implements Registration { } } + private static final Logger LOGGER = LoggerFactory.getLogger(GroupRegistration.class); static final String RETRY_COUNT = "retry-count"; static final int DEFAULT_RETRY_COUNT = 0; @@ -138,13 +142,22 @@ class GroupRegistration implements Registration { private Mono<Void> deliver(AcknowledgableDelivery acknowledgableDelivery) { byte[] eventAsBytes = acknowledgableDelivery.getBody(); - Event event = eventSerializer.fromJson(new String(eventAsBytes, StandardCharsets.UTF_8)).get(); int currentRetryCount = getRetryCount(acknowledgableDelivery); - return delayGenerator.delayIfHaveTo(currentRetryCount) - .flatMap(any -> runListener(event)) - .onErrorResume(throwable -> retryHandler.handleRetry(event, currentRetryCount, throwable)) - .then(Mono.fromRunnable(acknowledgableDelivery::ack)); + return deserializeEvent(eventAsBytes) + .flatMap(event -> delayGenerator.delayIfHaveTo(currentRetryCount) + .flatMap(any -> runListener(event)) + .onErrorResume(throwable -> retryHandler.handleRetry(event, currentRetryCount, throwable)) + .then(Mono.<Void>fromRunnable(acknowledgableDelivery::ack))) + .onErrorResume(e -> { + LOGGER.error("Unable to process delivery for group {}", group, e); + return Mono.fromRunnable(() -> acknowledgableDelivery.nack(!REQUEUE)); + }); + } + + private Mono<Event> deserializeEvent(byte[] eventAsBytes) { + return Mono.fromCallable(() -> eventSerializer.fromJson(new String(eventAsBytes, StandardCharsets.UTF_8)).get()) + .subscribeOn(Schedulers.parallel()); } Mono<Void> reDeliver(Event event) { diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java index 9641df3..810187c 100644 --- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java +++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java @@ -53,6 +53,7 @@ import java.io.Closeable; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; import java.util.stream.Stream; import org.apache.james.backends.rabbitmq.RabbitMQExtension; @@ -84,6 +85,7 @@ import com.google.common.collect.ImmutableSet; import reactor.core.publisher.Mono; import reactor.rabbitmq.BindingSpecification; import reactor.rabbitmq.ExchangeSpecification; +import reactor.rabbitmq.OutboundMessage; import reactor.rabbitmq.QueueSpecification; import reactor.rabbitmq.Receiver; import reactor.rabbitmq.Sender; @@ -172,6 +174,44 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract, } @Test + void eventProcessingShouldNotCrashOnInvalidMessage() { + EventCollector listener = new EventCollector(); + EventBusTestFixture.GroupA registeredGroup = new EventBusTestFixture.GroupA(); + eventBus.register(listener, registeredGroup); + + String emptyRoutingKey = ""; + rabbitMQExtension.getSender() + .send(Mono.just(new OutboundMessage(MAILBOX_EVENT_EXCHANGE_NAME, + emptyRoutingKey, + "BAD_PAYLOAD!".getBytes(StandardCharsets.UTF_8)))) + .block(); + + eventBus.dispatch(EVENT, NO_KEYS).block(); + await() + .timeout(org.awaitility.Duration.TEN_SECONDS).untilAsserted(() -> + assertThat(listener.getEvents()).containsOnly(EVENT)); + } + + @Test + void eventProcessingShouldNotCrashOnInvalidMessages() { + EventCollector listener = new EventCollector(); + EventBusTestFixture.GroupA registeredGroup = new EventBusTestFixture.GroupA(); + eventBus.register(listener, registeredGroup); + + String emptyRoutingKey = ""; + IntStream.range(0, 10).forEach(i -> rabbitMQExtension.getSender() + .send(Mono.just(new OutboundMessage(MAILBOX_EVENT_EXCHANGE_NAME, + emptyRoutingKey, + "BAD_PAYLOAD!".getBytes(StandardCharsets.UTF_8)))) + .block()); + + eventBus.dispatch(EVENT, NO_KEYS).block(); + await() + .timeout(org.awaitility.Duration.TEN_SECONDS).untilAsserted(() -> + assertThat(listener.getEvents()).containsOnly(EVENT)); + } + + @Test void deserializeEventCollectorGroup() throws Exception { assertThat(Group.deserialize("org.apache.james.mailbox.util.EventCollector$EventCollectorGroup")) .isEqualTo(new EventCollector.EventCollectorGroup()); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
