This is an automated email from the ASF dual-hosted git repository. rouazana pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 5da39aee831a23a855ba201fc1ed42095b841174 Author: Matthieu Baechler <[email protected]> AuthorDate: Tue Oct 8 15:37:20 2019 +0200 JAMES-2813 replace mocks in RabbitMQWorkQueueTest by a stub --- .../distributed/RabbitMQWorkQueueTest.java | 109 ++++++++++++--------- 1 file changed, 63 insertions(+), 46 deletions(-) diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java index 50950e3..2a109cd 100644 --- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java +++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueTest.java @@ -20,16 +20,17 @@ package org.apache.james.task.eventsourcing.distributed; import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.mock; +import static org.awaitility.Awaitility.await; +import static org.awaitility.Duration.FIVE_HUNDRED_MILLISECONDS; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.timeout; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; +import java.io.IOException; import java.util.Optional; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; @@ -39,6 +40,7 @@ import org.apache.james.server.task.json.TestTask; import org.apache.james.server.task.json.dto.TestTaskDTOModules; import org.apache.james.task.CompletedTask; import org.apache.james.task.Task; +import org.apache.james.task.TaskExecutionDetails; import org.apache.james.task.TaskId; import org.apache.james.task.TaskManagerWorker; import org.apache.james.task.TaskWithId; @@ -46,10 +48,9 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; -import org.mockito.ArgumentCaptor; -import org.mockito.verification.Timeout; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; class RabbitMQWorkQueueTest { private static final TaskId TASK_ID = TaskId.fromString("2c7f4081-aa30-11e9-bf6c-2d3b9e84aafd"); @@ -65,14 +66,40 @@ class RabbitMQWorkQueueTest { private RabbitMQWorkQueue testee; - private TaskManagerWorker taskManagerWorker; + private ImmediateWorker taskManagerWorker; private JsonTaskSerializer taskSerializer; + private static class ImmediateWorker implements TaskManagerWorker { + + ConcurrentLinkedQueue<TaskWithId> tasks = new ConcurrentLinkedQueue<>(); + ConcurrentLinkedQueue<Task.Result> results = new ConcurrentLinkedQueue<>(); + ConcurrentLinkedQueue<TaskId> failedTasks = new ConcurrentLinkedQueue<>(); + + @Override + public Mono<Task.Result> executeTask(TaskWithId taskWithId) { + tasks.add(taskWithId); + return Mono.fromCallable(() -> taskWithId.getTask().run()) + .doOnNext(result -> results.add(result)) + .subscribeOn(Schedulers.boundedElastic()); + } + + @Override + public void cancelTask(TaskId taskId) { + } + + @Override + public void fail(TaskId taskId, Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation,String errorMessage, Throwable reason) { + failedTasks.add(taskId); + } + + @Override + public void close() throws IOException { + } + } + @BeforeEach void setUp() { - taskManagerWorker = mock(TaskManagerWorker.class); - when(taskManagerWorker.executeTask(TASK_WITH_ID)).thenReturn(Mono.just(Task.Result.COMPLETED)); - when(taskManagerWorker.executeTask(TASK_WITH_ID_2)).thenReturn(Mono.just(Task.Result.COMPLETED)); + taskManagerWorker = spy(new ImmediateWorker()); taskSerializer = new JsonTaskSerializer(TestTaskDTOModules.COMPLETED_TASK_MODULE); testee = new RabbitMQWorkQueue(taskManagerWorker, rabbitMQExtension.getRabbitConnectionPool(), taskSerializer); testee.start(); @@ -86,46 +113,34 @@ class RabbitMQWorkQueueTest { @Test void workerShouldConsumeSubmittedTask() { testee.submit(TASK_WITH_ID); - - ArgumentCaptor<TaskWithId> taskWithIdCaptor = ArgumentCaptor.forClass(TaskWithId.class); - verify(taskManagerWorker, timeout(1000)).executeTask(taskWithIdCaptor.capture()); - - TaskWithId actualTaskWithId = taskWithIdCaptor.getValue(); - assertThat(actualTaskWithId.getId()).isEqualTo(TASK_ID); - assertThat(actualTaskWithId.getTask().type()).isEqualTo(TASK.type()); + await().atMost(FIVE_HUNDRED_MILLISECONDS).until(() -> !taskManagerWorker.results.isEmpty()); + assertThat(taskManagerWorker.tasks).containsExactly(TASK_WITH_ID); + assertThat(taskManagerWorker.results).containsExactly(Task.Result.COMPLETED); } @Test void workerShouldConsumeTwoSubmittedTask() { testee.submit(TASK_WITH_ID); testee.submit(TASK_WITH_ID_2); - - ArgumentCaptor<TaskWithId> taskWithIdCaptor = ArgumentCaptor.forClass(TaskWithId.class); - verify(taskManagerWorker, new Timeout(1000, times(2))).executeTask(taskWithIdCaptor.capture()); - - TaskWithId actualTaskWithId = taskWithIdCaptor.getAllValues().get(0); - assertThat(actualTaskWithId.getId()).isEqualTo(TASK_ID); - assertThat(actualTaskWithId.getTask().type()).isEqualTo(TASK.type()); - - TaskWithId actualSecondTaskWithId = taskWithIdCaptor.getAllValues().get(1); - assertThat(actualSecondTaskWithId.getId()).isEqualTo(TASK_ID_2); - assertThat(actualSecondTaskWithId.getTask().type()).isEqualTo(TASK2.type()); + await().atMost(FIVE_HUNDRED_MILLISECONDS).until(() -> taskManagerWorker.results.size() == 2); + assertThat(taskManagerWorker.tasks).containsExactly(TASK_WITH_ID, TASK_WITH_ID_2); + assertThat(taskManagerWorker.results).allSatisfy(result -> assertThat(result).isEqualTo(Task.Result.COMPLETED)); } @Test void givenTwoWorkQueuesOnlyTheFirstOneIsConsumingTasks() { testee.submit(TASK_WITH_ID); - TaskManagerWorker otherTaskManagerWorker = mock(TaskManagerWorker.class); - RabbitMQWorkQueue otherWorkQueue = new RabbitMQWorkQueue(otherTaskManagerWorker, rabbitMQExtension.getRabbitConnectionPool(), taskSerializer); - otherWorkQueue.start(); - - IntStream.range(0, 9) - .forEach(ignoredIndex -> testee.submit(TASK_WITH_ID_2)); + ImmediateWorker otherTaskManagerWorker = new ImmediateWorker(); + try (RabbitMQWorkQueue otherWorkQueue = new RabbitMQWorkQueue(otherTaskManagerWorker, rabbitMQExtension.getRabbitConnectionPool(), taskSerializer)) { + otherWorkQueue.start(); - verify(taskManagerWorker, new Timeout(1000, times(10))).executeTask(any()); + IntStream.range(0, 9) + .forEach(ignoredIndex -> testee.submit(TASK_WITH_ID_2)); - verify(otherTaskManagerWorker, new Timeout(1000, times(0))).executeTask(any()); + await().atMost(FIVE_HUNDRED_MILLISECONDS).until(() -> taskManagerWorker.results.size() == 10); + assertThat(otherTaskManagerWorker.tasks).isEmpty(); + } } @Test @@ -134,20 +149,22 @@ class RabbitMQWorkQueueTest { TaskId taskId = TaskId.fromString("4bf6d081-aa30-11e9-bf6c-2d3b9e84aafd"); TaskWithId taskWithId = new TaskWithId(taskId, task); - TaskManagerWorker otherTaskManagerWorker = mock(TaskManagerWorker.class); + ImmediateWorker otherTaskManagerWorker = new ImmediateWorker(); JsonTaskSerializer otherTaskSerializer = new JsonTaskSerializer(TestTaskDTOModules.TEST_TYPE); - RabbitMQWorkQueue otherWorkQueue = new RabbitMQWorkQueue(otherTaskManagerWorker, rabbitMQExtension.getRabbitConnectionPool(), otherTaskSerializer); - //wait to be sur that the first workqueue has subscribed as an exclusive consumer of the RabbitMQ queue. - Thread.sleep(200); - otherWorkQueue.start(); + try (RabbitMQWorkQueue otherWorkQueue = new RabbitMQWorkQueue(otherTaskManagerWorker, rabbitMQExtension.getRabbitConnectionPool(), otherTaskSerializer)) { + //wait to be sur that the first workqueue has subscribed as an exclusive consumer of the RabbitMQ queue. + Thread.sleep(200); + otherWorkQueue.start(); - otherWorkQueue.submit(taskWithId); + otherWorkQueue.submit(taskWithId); - verify(taskManagerWorker, new Timeout(100, times(0))).executeTask(any()); - verify(taskManagerWorker, timeout(100)).fail(eq(taskId), eq(Optional.empty()), any(), any()); + await().atMost(FIVE_HUNDRED_MILLISECONDS).until(() -> taskManagerWorker.failedTasks.size() == 1); + assertThat(taskManagerWorker.failedTasks).containsExactly(taskWithId.getId()); - testee.submit(TASK_WITH_ID); - verify(taskManagerWorker, timeout(100)).executeTask(any()); + testee.submit(TASK_WITH_ID); + await().atMost(FIVE_HUNDRED_MILLISECONDS).until(() -> taskManagerWorker.results.size() == 1); + assertThat(taskManagerWorker.tasks).containsExactly(TASK_WITH_ID); + } } @Test --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
