This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push:
new 982398183e JAMES-3825 Cancel tasks upon graceful shutdown - waiting to
the cancelled listener is completed (#1272)
982398183e is described below
commit 982398183e93ef561e71ef5a37615cd663047728
Author: vttran <[email protected]>
AuthorDate: Mon Oct 31 10:14:43 2022 +0700
JAMES-3825 Cancel tasks upon graceful shutdown - waiting to the cancelled
listener is completed (#1272)
---
.../distributed/DistributedTaskManagerTest.java | 32 +++++++++++++++++-----
server/task/task-memory/pom.xml | 1 -
.../apache/james/task/SerialTaskManagerWorker.java | 17 ++++++++++--
.../james/task/SerialTaskManagerWorkerTest.java | 8 +-----
4 files changed, 41 insertions(+), 17 deletions(-)
diff --git
a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
index 559d6505b3..3d99530eb6 100644
---
a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
+++
b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
@@ -23,6 +23,7 @@ import static
com.rabbitmq.client.MessageProperties.PERSISTENT_TEXT_PLAIN;
import static java.nio.charset.StandardCharsets.UTF_8;
import static
org.apache.james.backends.cassandra.Scenario.Builder.executeNormally;
import static org.apache.james.backends.cassandra.Scenario.Builder.fail;
+import static org.apache.james.task.TaskManager.Status.CANCELLED;
import static
org.apache.james.task.eventsourcing.distributed.RabbitMQWorkQueue.EXCHANGE_NAME;
import static
org.apache.james.task.eventsourcing.distributed.RabbitMQWorkQueue.ROUTING_KEY;
import static org.assertj.core.api.Assertions.assertThat;
@@ -91,6 +92,7 @@ import
org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetai
import
org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetailsProjectionDAO;
import
org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetailsProjectionModule;
import org.awaitility.Awaitility;
+import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -249,7 +251,7 @@ class DistributedTaskManagerTest implements
TaskManagerContract {
taskManager.cancel(id);
taskManager.await(id, TIMEOUT);
assertThat(taskManager.getExecutionDetails(id).getStatus())
- .isEqualTo(TaskManager.Status.CANCELLED);
+ .isEqualTo(CANCELLED);
}
@Test
@@ -268,7 +270,7 @@ class DistributedTaskManagerTest implements
TaskManagerContract {
taskManager.cancel(id);
taskManager.await(id, TIMEOUT);
assertThat(taskManager.getExecutionDetails(id).getStatus())
- .isEqualTo(TaskManager.Status.CANCELLED);
+ .isEqualTo(CANCELLED);
}
@Test
@@ -382,13 +384,13 @@ class DistributedTaskManagerTest implements
TaskManagerContract {
awaitAtMostTwoSeconds.untilAsserted(() ->
assertThat(taskManager1.getExecutionDetails(id).getStatus())
- .isIn(TaskManager.Status.CANCELLED,
TaskManager.Status.CANCEL_REQUESTED));
+ .isIn(CANCELLED, TaskManager.Status.CANCEL_REQUESTED));
countDownLatch.countDown();
- awaitUntilTaskHasStatus(id, TaskManager.Status.CANCELLED,
taskManager1);
+ awaitUntilTaskHasStatus(id, CANCELLED, taskManager1);
assertThat(taskManager1.getExecutionDetails(id).getStatus())
- .isEqualTo(TaskManager.Status.CANCELLED);
+ .isEqualTo(CANCELLED);
assertThat(taskManager1.getExecutionDetails(id).getCancelRequestedNode())
.contains(remoteTaskManager.getKey());
@@ -463,8 +465,8 @@ class DistributedTaskManagerTest implements
TaskManagerContract {
EventSourcingTaskManager taskManagerRunningFirstTask =
hostNameByTaskManager.inverse().get(nodeRunningFirstTask);
EventSourcingTaskManager otherTaskManager =
hostNameByTaskManager.inverse().get(otherNode);
- TaskId taskToExecuteAfterFirstNodeIsDown =
taskManagerRunningFirstTask.submit(new CompletedTask());
taskManagerRunningFirstTask.close();
+ TaskId taskToExecuteAfterFirstNodeIsDown =
taskManagerRunningFirstTask.submit(new CompletedTask());
awaitAtMostTwoSeconds.untilAsserted(() ->
assertThat(otherTaskManager.getExecutionDetails(taskToExecuteAfterFirstNodeIsDown).getStatus())
@@ -593,7 +595,23 @@ class DistributedTaskManagerTest implements
TaskManagerContract {
awaitAtMostTwoSeconds.untilAsserted(() ->
assertThat(taskManager.getExecutionDetails(taskId).getStatus())
- .isIn(TaskManager.Status.CANCELLED,
TaskManager.Status.CANCEL_REQUESTED));
+ .isIn(CANCELLED, TaskManager.Status.CANCEL_REQUESTED));
+ }
+
+ @Test
+ void inProgressTaskShouldBeCanceledWhenCloseTaskManager() {
+ try (EventSourcingTaskManager taskManager = taskManager()) {
+ TaskId taskId = taskManager.submit(new MemoryReferenceTask(() -> {
+ TimeUnit.SECONDS.sleep(5);
+ return Task.Result.COMPLETED;
+ }));
+
+ awaitAtMostTwoSeconds.until(() ->
taskManager.getExecutionDetails(taskId).getStatus(),
Matchers.equalTo(TaskManager.Status.IN_PROGRESS));
+
+ taskManager.close();
+
+
assertThat(taskManager(HOSTNAME_2).getExecutionDetails(taskId).getStatus()).isEqualTo(CANCELLED);
+ }
}
static class CassandraExecutingTask implements Task {
diff --git a/server/task/task-memory/pom.xml b/server/task/task-memory/pom.xml
index 4b49ef6bf1..e36aa63dd8 100644
--- a/server/task/task-memory/pom.xml
+++ b/server/task/task-memory/pom.xml
@@ -72,7 +72,6 @@
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
- <scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
diff --git
a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
index 511777fe84..a49b32f8f7 100644
---
a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
+++
b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
@@ -20,6 +20,7 @@ package org.apache.james.task;
import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
import static org.apache.james.util.ReactorUtils.publishIfPresent;
+import static org.awaitility.Durations.TWO_MINUTES;
import java.time.Duration;
import java.util.Optional;
@@ -34,6 +35,7 @@ import org.apache.james.util.MDCBuilder;
import org.apache.james.util.MDCStructuredLogger;
import org.apache.james.util.ReactorUtils;
import org.apache.james.util.concurrent.NamedThreadFactory;
+import org.awaitility.Awaitility;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -88,7 +90,9 @@ public class SerialTaskManagerWorker implements
TaskManagerWorker {
private Publisher<Void> handleExecutionError(TaskWithId taskWithId,
Listener listener, Throwable exception) {
if (exception instanceof CancellationException) {
- return listener.cancelled(taskWithId.getId(),
taskWithId.getTask().detailsReactive());
+ return Mono.from(listener.cancelled(taskWithId.getId(),
taskWithId.getTask().detailsReactive()))
+ .then(Mono.fromCallable(() ->
cancelledTasks.remove(taskWithId.getId())))
+ .then();
} else {
return listener.failed(taskWithId.getId(),
taskWithId.getTask().detailsReactive(), exception);
}
@@ -173,7 +177,16 @@ public class SerialTaskManagerWorker implements
TaskManagerWorker {
@Override
public void close() {
Optional.ofNullable(runningTask.get())
- .ifPresent(task -> cancelTask(task.getT1()));
+ .ifPresent(task -> {
+ if (!task.getT2().isDone() && !task.getT2().isCancelled()) {
+ cancelTask(task.getT1());
+ Awaitility
+ .waitAtMost(TWO_MINUTES)
+ .pollDelay(Duration.ofMillis(500))
+ .until(() -> !cancelledTasks.contains(task.getT1()));
+ }
+
+ });
taskExecutor.dispose();
}
}
diff --git
a/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
b/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
index 35aa27dc91..359fa5e848 100644
---
a/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
+++
b/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
@@ -37,7 +37,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.awaitility.Awaitility;
-import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
@@ -47,7 +46,7 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
class SerialTaskManagerWorkerTest {
- private static final Duration UPDATE_INFORMATION_POLLING_DURATION =
Duration.ofMillis(100);
+ private static final Duration UPDATE_INFORMATION_POLLING_DURATION =
Duration.ofMillis(100);
private TaskManagerWorker.Listener listener;
private SerialTaskManagerWorker worker;
@@ -69,11 +68,6 @@ class SerialTaskManagerWorkerTest {
worker = new SerialTaskManagerWorker(listener,
UPDATE_INFORMATION_POLLING_DURATION);
}
- @AfterEach
- void tearDown() {
- worker.close();
- }
-
@Test
void aSuccessfullTaskShouldCompleteSuccessfully() {
ArgumentCaptor<Publisher<Optional<TaskExecutionDetails.AdditionalInformation>>>
additionalInformationPublisherCapture =
ArgumentCaptor.forClass(Publisher.class);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]