This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit dc46a4497d0b9b7c1be16b476dafa347c921ff40 Author: Benoit Tellier <[email protected]> AuthorDate: Thu May 7 13:31:06 2020 +0700 JAMES-3172 We cannot cancel computation started by Reactor --- .../james/task/SerialTaskManagerWorkerTest.java | 37 +++++++++++++++++++++- 1 file changed, 36 insertions(+), 1 deletion(-) diff --git a/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java b/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java index 5c4f23d..0065546 100644 --- a/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java +++ b/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java @@ -25,7 +25,6 @@ import static org.mockito.ArgumentMatchers.notNull; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.atMost; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; @@ -40,8 +39,10 @@ import java.util.concurrent.atomic.AtomicInteger; import org.awaitility.Awaitility; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; class SerialTaskManagerWorkerTest { @@ -169,6 +170,40 @@ class SerialTaskManagerWorkerTest { latch.countDown(); } + @Disabled("JAMES-3172 We cannot cancel computation started by Reactor") + @Test + void taskExecutingReactivelyShouldStopExecutionUponCancel() throws InterruptedException { + // Provide a task ticking every 100ms in a separate reactor thread + AtomicInteger tickCount = new AtomicInteger(); + int tikIntervalInMs = 100; + MemoryReferenceTask tickTask = new MemoryReferenceTask(() -> Flux.interval(Duration.ofMillis(tikIntervalInMs)) + .flatMap(any -> Mono.fromCallable(() -> { + tickCount.incrementAndGet(); + return Task.Result.COMPLETED; + })) + .reduce(Task::combine) + .thenReturn(Task.Result.COMPLETED) + .block()); + + // Execute the task + TaskId id = TaskId.generateTaskId(); + TaskWithId taskWithId = new TaskWithId(id, tickTask); + Mono<Task.Result> resultMono = worker.executeTask(taskWithId).cache(); + resultMono.subscribe(); + Awaitility.waitAtMost(org.awaitility.Duration.TEN_SECONDS) + .untilAsserted(() -> verify(listener, atLeastOnce()).started(id)); + + worker.cancelTask(id); + + Thread.sleep(tikIntervalInMs); + + int tikCountSnapshot1 = tickCount.get(); + Thread.sleep(2 * tikIntervalInMs); + int tikCountSnapshot2 = tickCount.get(); + // If the task had effectively been canceled tikCount should no longer be incremented + assertThat(tikCountSnapshot1).isEqualTo(tikCountSnapshot2); + } + @Test void theWorkerShouldCancelAnInProgressTask() throws InterruptedException { TaskId id = TaskId.generateTaskId(); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
