This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 9cdeb21b77bcb7d672548c7f2691d96ecf90df7d Author: duc91 <[email protected]> AuthorDate: Thu Jul 16 11:19:38 2020 +0700 JAMES-3008: add test for DistributedTaskManagerTest --- .../distributed/DistributedTaskManagerTest.java | 75 ++++++++++++++++++++++ 1 file changed, 75 insertions(+) diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java index 37e4b7d..233dd9a 100644 --- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java +++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java @@ -20,6 +20,7 @@ package org.apache.james.task.eventsourcing.distributed; import static com.rabbitmq.client.MessageProperties.PERSISTENT_TEXT_PLAIN; +import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.james.backends.cassandra.Scenario.Builder.executeNormally; import static org.apache.james.backends.cassandra.Scenario.Builder.fail; import static org.apache.james.task.eventsourcing.distributed.RabbitMQWorkQueue.EXCHANGE_NAME; @@ -102,6 +103,8 @@ import reactor.rabbitmq.Sender; class DistributedTaskManagerTest implements TaskManagerContract { + private static final byte[] BAD_PAYLOAD = "BAD_PAYLOAD!".getBytes(UTF_8); + static class TrackedRabbitMQWorkQueueSupplier implements WorkQueueSupplier { private final List<RabbitMQWorkQueue> workQueues; private final RabbitMQWorkQueueSupplier supplier; @@ -218,6 +221,78 @@ class DistributedTaskManagerTest implements TaskManagerContract { } @Test + void badPayloadShouldNotAffectTaskManagerOnCancelTask() throws TaskManager.ReachedTimeoutException { + TaskManager taskManager = taskManager(HOSTNAME); + TaskId id = taskManager.submit(new MemoryReferenceTask(() -> { + Thread.sleep(250); + return Task.Result.COMPLETED; + })); + + rabbitMQExtension.getSender() + .send(Mono.just(new OutboundMessage(EXCHANGE_NAME, ROUTING_KEY, BAD_PAYLOAD))) + .block(); + + taskManager.cancel(id); + taskManager.await(id, TIMEOUT); + assertThat(taskManager.getExecutionDetails(id).getStatus()) + .isEqualTo(TaskManager.Status.CANCELLED); + } + + @Test + void badPayloadsShouldNotAffectTaskManagerOnCancelTask() throws TaskManager.ReachedTimeoutException { + TaskManager taskManager = taskManager(HOSTNAME); + TaskId id = taskManager.submit(new MemoryReferenceTask(() -> { + Thread.sleep(250); + return Task.Result.COMPLETED; + })); + + IntStream.range(0, 100) + .forEach(i -> rabbitMQExtension.getSender() + .send(Mono.just(new OutboundMessage(EXCHANGE_NAME, ROUTING_KEY, BAD_PAYLOAD))) + .block()); + + taskManager.cancel(id); + taskManager.await(id, TIMEOUT); + assertThat(taskManager.getExecutionDetails(id).getStatus()) + .isEqualTo(TaskManager.Status.CANCELLED); + } + + @Test + void badPayloadShouldNotAffectTaskManagerOnCompleteTask() throws TaskManager.ReachedTimeoutException { + TaskManager taskManager = taskManager(HOSTNAME); + TaskId id = taskManager.submit(new MemoryReferenceTask(() -> { + Thread.sleep(250); + return Task.Result.COMPLETED; + })); + + rabbitMQExtension.getSender() + .send(Mono.just(new OutboundMessage(EXCHANGE_NAME, ROUTING_KEY, BAD_PAYLOAD))) + .block(); + + taskManager.await(id, TIMEOUT); + assertThat(taskManager.getExecutionDetails(id).getStatus()) + .isEqualTo(TaskManager.Status.COMPLETED); + } + + @Test + void badPayloadsShouldNotAffectTaskManagerOnCompleteTask() throws TaskManager.ReachedTimeoutException { + TaskManager taskManager = taskManager(HOSTNAME); + TaskId id = taskManager.submit(new MemoryReferenceTask(() -> { + Thread.sleep(250); + return Task.Result.COMPLETED; + })); + + IntStream.range(0, 100) + .forEach(i -> rabbitMQExtension.getSender() + .send(Mono.just(new OutboundMessage(EXCHANGE_NAME, ROUTING_KEY, BAD_PAYLOAD))) + .block()); + + taskManager.await(id, TIMEOUT); + assertThat(taskManager.getExecutionDetails(id).getStatus()) + .isEqualTo(TaskManager.Status.COMPLETED); + } + + @Test void givenOneEventStoreTwoEventTaskManagersShareTheSameEvents() { try (EventSourcingTaskManager taskManager1 = taskManager(); EventSourcingTaskManager taskManager2 = taskManager(HOSTNAME_2)) { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
