This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new 982398183e JAMES-3825 Cancel tasks upon graceful shutdown - waiting until 
the cancelled listener is completed (#1272)
982398183e is described below

commit 982398183e93ef561e71ef5a37615cd663047728
Author: vttran <[email protected]>
AuthorDate: Mon Oct 31 10:14:43 2022 +0700

    JAMES-3825 Cancel tasks upon graceful shutdown - waiting until the cancelled 
listener is completed (#1272)
---
 .../distributed/DistributedTaskManagerTest.java    | 32 +++++++++++++++++-----
 server/task/task-memory/pom.xml                    |  1 -
 .../apache/james/task/SerialTaskManagerWorker.java | 17 ++++++++++--
 .../james/task/SerialTaskManagerWorkerTest.java    |  8 +-----
 4 files changed, 41 insertions(+), 17 deletions(-)

diff --git 
a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
 
b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
index 559d6505b3..3d99530eb6 100644
--- 
a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
+++ 
b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
@@ -23,6 +23,7 @@ import static 
com.rabbitmq.client.MessageProperties.PERSISTENT_TEXT_PLAIN;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static 
org.apache.james.backends.cassandra.Scenario.Builder.executeNormally;
 import static org.apache.james.backends.cassandra.Scenario.Builder.fail;
+import static org.apache.james.task.TaskManager.Status.CANCELLED;
 import static 
org.apache.james.task.eventsourcing.distributed.RabbitMQWorkQueue.EXCHANGE_NAME;
 import static 
org.apache.james.task.eventsourcing.distributed.RabbitMQWorkQueue.ROUTING_KEY;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -91,6 +92,7 @@ import 
org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetai
 import 
org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetailsProjectionDAO;
 import 
org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetailsProjectionModule;
 import org.awaitility.Awaitility;
+import org.hamcrest.Matchers;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -249,7 +251,7 @@ class DistributedTaskManagerTest implements 
TaskManagerContract {
         taskManager.cancel(id);
         taskManager.await(id, TIMEOUT);
         assertThat(taskManager.getExecutionDetails(id).getStatus())
-            .isEqualTo(TaskManager.Status.CANCELLED);
+            .isEqualTo(CANCELLED);
     }
 
     @Test
@@ -268,7 +270,7 @@ class DistributedTaskManagerTest implements 
TaskManagerContract {
         taskManager.cancel(id);
         taskManager.await(id, TIMEOUT);
         assertThat(taskManager.getExecutionDetails(id).getStatus())
-            .isEqualTo(TaskManager.Status.CANCELLED);
+            .isEqualTo(CANCELLED);
     }
 
     @Test
@@ -382,13 +384,13 @@ class DistributedTaskManagerTest implements 
TaskManagerContract {
 
         awaitAtMostTwoSeconds.untilAsserted(() ->
             assertThat(taskManager1.getExecutionDetails(id).getStatus())
-                .isIn(TaskManager.Status.CANCELLED, 
TaskManager.Status.CANCEL_REQUESTED));
+                .isIn(CANCELLED, TaskManager.Status.CANCEL_REQUESTED));
 
         countDownLatch.countDown();
 
-        awaitUntilTaskHasStatus(id, TaskManager.Status.CANCELLED, 
taskManager1);
+        awaitUntilTaskHasStatus(id, CANCELLED, taskManager1);
         assertThat(taskManager1.getExecutionDetails(id).getStatus())
-            .isEqualTo(TaskManager.Status.CANCELLED);
+            .isEqualTo(CANCELLED);
 
         
assertThat(taskManager1.getExecutionDetails(id).getCancelRequestedNode())
             .contains(remoteTaskManager.getKey());
@@ -463,8 +465,8 @@ class DistributedTaskManagerTest implements 
TaskManagerContract {
             EventSourcingTaskManager taskManagerRunningFirstTask = 
hostNameByTaskManager.inverse().get(nodeRunningFirstTask);
             EventSourcingTaskManager otherTaskManager = 
hostNameByTaskManager.inverse().get(otherNode);
 
-            TaskId taskToExecuteAfterFirstNodeIsDown = 
taskManagerRunningFirstTask.submit(new CompletedTask());
             taskManagerRunningFirstTask.close();
+            TaskId taskToExecuteAfterFirstNodeIsDown = 
taskManagerRunningFirstTask.submit(new CompletedTask());
 
             awaitAtMostTwoSeconds.untilAsserted(() ->
                 
assertThat(otherTaskManager.getExecutionDetails(taskToExecuteAfterFirstNodeIsDown).getStatus())
@@ -593,7 +595,23 @@ class DistributedTaskManagerTest implements 
TaskManagerContract {
 
         awaitAtMostTwoSeconds.untilAsserted(() ->
             assertThat(taskManager.getExecutionDetails(taskId).getStatus())
-                .isIn(TaskManager.Status.CANCELLED, 
TaskManager.Status.CANCEL_REQUESTED));
+                .isIn(CANCELLED, TaskManager.Status.CANCEL_REQUESTED));
+    }
+
+    @Test
+    void inProgressTaskShouldBeCanceledWhenCloseTaskManager() {
+        try (EventSourcingTaskManager taskManager = taskManager()) {
+            TaskId taskId = taskManager.submit(new MemoryReferenceTask(() -> {
+                TimeUnit.SECONDS.sleep(5);
+                return Task.Result.COMPLETED;
+            }));
+
+            awaitAtMostTwoSeconds.until(() -> 
taskManager.getExecutionDetails(taskId).getStatus(), 
Matchers.equalTo(TaskManager.Status.IN_PROGRESS));
+
+            taskManager.close();
+
+            
assertThat(taskManager(HOSTNAME_2).getExecutionDetails(taskId).getStatus()).isEqualTo(CANCELLED);
+        }
     }
 
     static class CassandraExecutingTask implements Task {
diff --git a/server/task/task-memory/pom.xml b/server/task/task-memory/pom.xml
index 4b49ef6bf1..e36aa63dd8 100644
--- a/server/task/task-memory/pom.xml
+++ b/server/task/task-memory/pom.xml
@@ -72,7 +72,6 @@
         <dependency>
             <groupId>org.awaitility</groupId>
             <artifactId>awaitility</artifactId>
-            <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.mockito</groupId>
diff --git 
a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
 
b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
index 511777fe84..a49b32f8f7 100644
--- 
a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
+++ 
b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
@@ -20,6 +20,7 @@ package org.apache.james.task;
 
 import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
 import static org.apache.james.util.ReactorUtils.publishIfPresent;
+import static org.awaitility.Durations.TWO_MINUTES;
 
 import java.time.Duration;
 import java.util.Optional;
@@ -34,6 +35,7 @@ import org.apache.james.util.MDCBuilder;
 import org.apache.james.util.MDCStructuredLogger;
 import org.apache.james.util.ReactorUtils;
 import org.apache.james.util.concurrent.NamedThreadFactory;
+import org.awaitility.Awaitility;
 import org.reactivestreams.Publisher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -88,7 +90,9 @@ public class SerialTaskManagerWorker implements 
TaskManagerWorker {
 
     private Publisher<Void> handleExecutionError(TaskWithId taskWithId, 
Listener listener, Throwable exception) {
         if (exception instanceof CancellationException) {
-            return listener.cancelled(taskWithId.getId(), 
taskWithId.getTask().detailsReactive());
+            return Mono.from(listener.cancelled(taskWithId.getId(), 
taskWithId.getTask().detailsReactive()))
+                .then(Mono.fromCallable(() -> 
cancelledTasks.remove(taskWithId.getId())))
+                .then();
         } else {
             return listener.failed(taskWithId.getId(), 
taskWithId.getTask().detailsReactive(), exception);
         }
@@ -173,7 +177,16 @@ public class SerialTaskManagerWorker implements 
TaskManagerWorker {
     @Override
     public void close() {
         Optional.ofNullable(runningTask.get())
-            .ifPresent(task -> cancelTask(task.getT1()));
+            .ifPresent(task -> {
+                if (!task.getT2().isDone() && !task.getT2().isCancelled()) {
+                    cancelTask(task.getT1());
+                    Awaitility
+                        .waitAtMost(TWO_MINUTES)
+                        .pollDelay(Duration.ofMillis(500))
+                        .until(() -> !cancelledTasks.contains(task.getT1()));
+                }
+
+            });
         taskExecutor.dispose();
     }
 }
diff --git 
a/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
 
b/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
index 35aa27dc91..359fa5e848 100644
--- 
a/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
+++ 
b/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
@@ -37,7 +37,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.awaitility.Awaitility;
-import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentCaptor;
@@ -47,7 +46,7 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 class SerialTaskManagerWorkerTest {
-    private  static final Duration UPDATE_INFORMATION_POLLING_DURATION = 
Duration.ofMillis(100);
+    private static final Duration UPDATE_INFORMATION_POLLING_DURATION = 
Duration.ofMillis(100);
 
     private TaskManagerWorker.Listener listener;
     private SerialTaskManagerWorker worker;
@@ -69,11 +68,6 @@ class SerialTaskManagerWorkerTest {
         worker = new SerialTaskManagerWorker(listener, 
UPDATE_INFORMATION_POLLING_DURATION);
     }
 
-    @AfterEach
-    void tearDown() {
-        worker.close();
-    }
-
     @Test
     void aSuccessfullTaskShouldCompleteSuccessfully() {
         
ArgumentCaptor<Publisher<Optional<TaskExecutionDetails.AdditionalInformation>>> 
additionalInformationPublisherCapture = 
ArgumentCaptor.forClass(Publisher.class);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to