This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 9cdeb21b77bcb7d672548c7f2691d96ecf90df7d
Author: duc91 <[email protected]>
AuthorDate: Thu Jul 16 11:19:38 2020 +0700

    JAMES-3008: add test for DistributedTaskManagerTest
---
 .../distributed/DistributedTaskManagerTest.java    | 75 ++++++++++++++++++++++
 1 file changed, 75 insertions(+)

diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
index 37e4b7d..233dd9a 100644
--- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
+++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
@@ -20,6 +20,7 @@
 package org.apache.james.task.eventsourcing.distributed;
 
 import static com.rabbitmq.client.MessageProperties.PERSISTENT_TEXT_PLAIN;
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.james.backends.cassandra.Scenario.Builder.executeNormally;
 import static org.apache.james.backends.cassandra.Scenario.Builder.fail;
 import static org.apache.james.task.eventsourcing.distributed.RabbitMQWorkQueue.EXCHANGE_NAME;
@@ -102,6 +103,8 @@ import reactor.rabbitmq.Sender;
 
 class DistributedTaskManagerTest implements TaskManagerContract {
 
+    private static final byte[] BAD_PAYLOAD = "BAD_PAYLOAD!".getBytes(UTF_8);
+
     static class TrackedRabbitMQWorkQueueSupplier implements WorkQueueSupplier {
         private final List<RabbitMQWorkQueue> workQueues;
         private final RabbitMQWorkQueueSupplier supplier;
@@ -218,6 +221,78 @@ class DistributedTaskManagerTest implements TaskManagerContract {
     }
 
     @Test
+    void badPayloadShouldNotAffectTaskManagerOnCancelTask() throws TaskManager.ReachedTimeoutException {
+        TaskManager taskManager = taskManager(HOSTNAME);
+        TaskId id = taskManager.submit(new MemoryReferenceTask(() -> {
+            Thread.sleep(250);
+            return Task.Result.COMPLETED;
+        }));
+
+        rabbitMQExtension.getSender()
+            .send(Mono.just(new OutboundMessage(EXCHANGE_NAME, ROUTING_KEY, BAD_PAYLOAD)))
+            .block();
+
+        taskManager.cancel(id);
+        taskManager.await(id, TIMEOUT);
+        assertThat(taskManager.getExecutionDetails(id).getStatus())
+            .isEqualTo(TaskManager.Status.CANCELLED);
+    }
+
+    @Test
+    void badPayloadsShouldNotAffectTaskManagerOnCancelTask() throws TaskManager.ReachedTimeoutException {
+        TaskManager taskManager = taskManager(HOSTNAME);
+        TaskId id = taskManager.submit(new MemoryReferenceTask(() -> {
+            Thread.sleep(250);
+            return Task.Result.COMPLETED;
+        }));
+
+        IntStream.range(0, 100)
+            .forEach(i -> rabbitMQExtension.getSender()
+            .send(Mono.just(new OutboundMessage(EXCHANGE_NAME, ROUTING_KEY, BAD_PAYLOAD)))
+            .block());
+
+        taskManager.cancel(id);
+        taskManager.await(id, TIMEOUT);
+        assertThat(taskManager.getExecutionDetails(id).getStatus())
+            .isEqualTo(TaskManager.Status.CANCELLED);
+    }
+
+    @Test
+    void badPayloadShouldNotAffectTaskManagerOnCompleteTask() throws TaskManager.ReachedTimeoutException {
+        TaskManager taskManager = taskManager(HOSTNAME);
+        TaskId id = taskManager.submit(new MemoryReferenceTask(() -> {
+            Thread.sleep(250);
+            return Task.Result.COMPLETED;
+        }));
+
+        rabbitMQExtension.getSender()
+            .send(Mono.just(new OutboundMessage(EXCHANGE_NAME, ROUTING_KEY, BAD_PAYLOAD)))
+            .block();
+
+        taskManager.await(id, TIMEOUT);
+        assertThat(taskManager.getExecutionDetails(id).getStatus())
+            .isEqualTo(TaskManager.Status.COMPLETED);
+    }
+
+    @Test
+    void badPayloadsShouldNotAffectTaskManagerOnCompleteTask() throws TaskManager.ReachedTimeoutException {
+        TaskManager taskManager = taskManager(HOSTNAME);
+        TaskId id = taskManager.submit(new MemoryReferenceTask(() -> {
+            Thread.sleep(250);
+            return Task.Result.COMPLETED;
+        }));
+
+        IntStream.range(0, 100)
+            .forEach(i -> rabbitMQExtension.getSender()
+            .send(Mono.just(new OutboundMessage(EXCHANGE_NAME, ROUTING_KEY, BAD_PAYLOAD)))
+            .block());
+
+        taskManager.await(id, TIMEOUT);
+        assertThat(taskManager.getExecutionDetails(id).getStatus())
+            .isEqualTo(TaskManager.Status.COMPLETED);
+    }
+
+    @Test
     void givenOneEventStoreTwoEventTaskManagersShareTheSameEvents() {
         try (EventSourcingTaskManager taskManager1 = taskManager();
              EventSourcingTaskManager taskManager2 = taskManager(HOSTNAME_2)) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to