This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit c23267ac9b8021ee000768eb309c7f64c95d7a8f Author: duc91 <[email protected]> AuthorDate: Tue Jul 14 15:01:16 2020 +0700 JAMES-3308: add test in RabbitMQTerminationSubscriberTest for deserialization error handling --- .../distributed/RabbitMQTerminationSubscriber.java | 6 +-- .../RabbitMQTerminationSubscriberTest.java | 54 +++++++++++++++++++++- 2 files changed, 56 insertions(+), 4 deletions(-) diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java index 2c1812f..46144d7 100644 --- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java +++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java @@ -56,9 +56,9 @@ import reactor.rabbitmq.Sender; public class RabbitMQTerminationSubscriber implements TerminationSubscriber, Startable, Closeable { private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQTerminationSubscriber.class); - private static final String EXCHANGE_NAME = "terminationSubscriberExchange"; - private static final String QUEUE_NAME_PREFIX = "terminationSubscriber"; - private static final String ROUTING_KEY = "terminationSubscriberRoutingKey"; + static final String EXCHANGE_NAME = "terminationSubscriberExchange"; + static final String QUEUE_NAME_PREFIX = "terminationSubscriber"; + static final String ROUTING_KEY = "terminationSubscriberRoutingKey"; private final JsonEventSerializer serializer; private final Sender sender; diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java index 77cfeb7..3da35db 100644 --- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java +++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriberTest.java @@ -20,12 +20,19 @@ package org.apache.james.task.eventsourcing.distributed; +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.james.task.eventsourcing.distributed.RabbitMQTerminationSubscriber.EXCHANGE_NAME; +import static org.apache.james.task.eventsourcing.distributed.RabbitMQTerminationSubscriber.ROUTING_KEY; import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; import static org.awaitility.Duration.ONE_MINUTE; +import static org.awaitility.Duration.TEN_SECONDS; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; import java.util.Set; +import java.util.stream.IntStream; import org.apache.james.backends.rabbitmq.RabbitMQExtension; import org.apache.james.eventsourcing.Event; @@ -36,11 +43,14 @@ import org.apache.james.json.DTOConverter; import org.apache.james.server.task.json.JsonTaskSerializer; import org.apache.james.task.eventsourcing.TerminationSubscriber; import org.apache.james.task.eventsourcing.TerminationSubscriberContract; +import org.assertj.core.api.SoftAssertions; import org.awaitility.Awaitility; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.rabbitmq.OutboundMessage; class RabbitMQTerminationSubscriberTest implements TerminationSubscriberContract { private static final JsonTaskSerializer TASK_SERIALIZER = JsonTaskSerializer.of(); @@ -52,7 +62,8 @@ class RabbitMQTerminationSubscriberTest implements TerminationSubscriberContract @Override public TerminationSubscriber subscriber() { - RabbitMQTerminationSubscriber subscriber = new RabbitMQTerminationSubscriber(rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), SERIALIZER); + RabbitMQTerminationSubscriber subscriber = new RabbitMQTerminationSubscriber(rabbitMQExtension.getSender(), + rabbitMQExtension.getReceiverProvider(), SERIALIZER); subscriber.start(); return subscriber; } @@ -77,4 +88,45 @@ class RabbitMQTerminationSubscriberTest implements TerminationSubscriberContract assertThat(receivedEventsFirst).containsExactly(COMPLETED_EVENT); assertThat(receivedEventsSecond).containsExactly(COMPLETED_EVENT); } + + @Test + void eventProcessingShouldNotCrashOnInvalidMessage() { + TerminationSubscriber subscriber1 = subscriber(); + Flux<Event> firstListener = Flux.from(subscriber1.listenEvents()); + + rabbitMQExtension.getSender() + .send(Mono.just(new OutboundMessage(EXCHANGE_NAME, + ROUTING_KEY, + "BAD_PAYLOAD!".getBytes(UTF_8)))) + .block(); + + sendEvents(subscriber1, COMPLETED_EVENT); + + List<Event> receivedEventsFirst = new ArrayList<>(); + firstListener.subscribe(receivedEventsFirst::add); + + await().timeout(TEN_SECONDS).untilAsserted(() -> assertThat(receivedEventsFirst).hasSize(1)); + } + + @Test + void eventProcessingShouldNotCrashOnInvalidMessages() { + TerminationSubscriber subscriber1 = subscriber(); + Flux<Event> firstListener = Flux.from(subscriber1.listenEvents()); + + IntStream.range(0, 10).forEach(i -> rabbitMQExtension.getSender() + .send(Mono.just(new OutboundMessage(EXCHANGE_NAME, + ROUTING_KEY, + "BAD_PAYLOAD!".getBytes(StandardCharsets.UTF_8)))) + .block()); + + sendEvents(subscriber1, COMPLETED_EVENT); + + List<Event> receivedEventsFirst = new ArrayList<>(); + firstListener.subscribe(receivedEventsFirst::add); + + await().atMost(ONE_MINUTE).untilAsserted(() -> + SoftAssertions.assertSoftly(soft -> { + assertThat(receivedEventsFirst).containsExactly(COMPLETED_EVENT); + })); + } } \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
