This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 930a2e3bdd321b889b91e49a37e937f3a5827a21 Author: Benoit Tellier <[email protected]> AuthorDate: Thu May 7 13:42:44 2020 +0700 JAMES-3172 Rely on Reactor for task execution --- .../apache/james/task/SerialTaskManagerWorker.java | 60 ++++++++++++++-------- .../james/task/SerialTaskManagerWorkerTest.java | 7 +-- 2 files changed, 41 insertions(+), 26 deletions(-) diff --git a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java index 8341a16..5438eff 100644 --- a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java +++ b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java @@ -20,16 +20,14 @@ package org.apache.james.task; import static org.apache.james.util.ReactorUtils.publishIfPresent; -import java.io.IOException; import java.time.Duration; import java.util.Optional; import java.util.Set; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Stream; import org.apache.james.util.MDCBuilder; import org.apache.james.util.concurrent.NamedThreadFactory; @@ -42,22 +40,25 @@ import com.google.common.collect.Sets; import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; import reactor.util.function.Tuple2; import reactor.util.function.Tuples; public class SerialTaskManagerWorker implements TaskManagerWorker { - private static final Logger LOGGER = LoggerFactory.getLogger(SerialTaskManagerWorker.class); - private final ExecutorService taskExecutor; + public static final boolean MAY_INTERRUPT_IF_RUNNING = true; + + private final Scheduler taskExecutor; private final Listener listener; - private final AtomicReference<Tuple2<TaskId, Future<?>>> runningTask; + private final AtomicReference<Tuple2<TaskId, CompletableFuture>> runningTask; private final Set<TaskId> cancelledTasks; private final Duration pollingInterval; public SerialTaskManagerWorker(Listener listener, Duration pollingInterval) { this.pollingInterval = pollingInterval; - this.taskExecutor = Executors.newSingleThreadExecutor(NamedThreadFactory.withName("task executor")); + this.taskExecutor = Schedulers.fromExecutor( + Executors.newSingleThreadExecutor(NamedThreadFactory.withName("task executor"))); this.listener = listener; this.cancelledTasks = Sets.newConcurrentHashSet(); this.runningTask = new AtomicReference<>(); @@ -66,7 +67,8 @@ public class SerialTaskManagerWorker implements TaskManagerWorker { @Override public Mono<Task.Result> executeTask(TaskWithId taskWithId) { if (!cancelledTasks.remove(taskWithId.getId())) { - CompletableFuture<Task.Result> future = CompletableFuture.supplyAsync(() -> runWithMdc(taskWithId, listener), taskExecutor); + Mono<Task.Result> taskMono = Mono.fromCallable(() -> runWithMdc(taskWithId, listener)).subscribeOn(taskExecutor); + CompletableFuture<Task.Result> future = taskMono.toFuture(); runningTask.set(Tuples.of(taskWithId.getId(), future)); return Mono.using( @@ -109,22 +111,38 @@ public class SerialTaskManagerWorker implements TaskManagerWorker { private Mono<Task.Result> run(TaskWithId taskWithId, Listener listener) { return Mono.from(listener.started(taskWithId.getId())) - .then(Mono.fromCallable(() -> runTask(taskWithId, listener))) - .onErrorResume(InterruptedException.class, e -> Mono.from(listener.cancelled(taskWithId.getId(), taskWithId.getTask().details())).thenReturn(Task.Result.PARTIAL)) + .then(runTask(taskWithId, listener)) + .onErrorResume(this::isCausedByInterruptedException, e -> cancelled(taskWithId, listener)) .onErrorResume(Exception.class, e -> { LOGGER.error("Error while running task {}", taskWithId.getId(), e); return Mono.from(listener.failed(taskWithId.getId(), taskWithId.getTask().details(), e)).thenReturn(Task.Result.PARTIAL); }); } - private Task.Result runTask(TaskWithId taskWithId, Listener listener) throws InterruptedException { - return taskWithId.getTask() - .run() - .onComplete(result -> Mono.from(listener.completed(taskWithId.getId(), result, taskWithId.getTask().details())).block()) - .onFailure(() -> { - LOGGER.error("Task was partially performed. Check logs for more details. Taskid : " + taskWithId.getId()); - Mono.from(listener.failed(taskWithId.getId(), taskWithId.getTask().details())).block(); - }); + private boolean isCausedByInterruptedException(Throwable e) { + if (e instanceof InterruptedException) { + return true; + } + return Stream.iterate(e, t -> t.getCause() != null, Throwable::getCause) + .anyMatch(t -> t instanceof InterruptedException); + } + + private Mono<Task.Result> cancelled(TaskWithId taskWithId, Listener listener) { + TaskId id = taskWithId.getId(); + Optional<TaskExecutionDetails.AdditionalInformation> details = taskWithId.getTask().details(); + + return Mono.from(listener.cancelled(id, details)) + .thenReturn(Task.Result.PARTIAL); + } + + private Mono<Task.Result> runTask(TaskWithId taskWithId, Listener listener) { + return Mono.fromCallable(() -> taskWithId.getTask().run()) + .doOnNext(result -> result + .onComplete(any -> Mono.from(listener.completed(taskWithId.getId(), result, taskWithId.getTask().details())).block()) + .onFailure(() -> { + LOGGER.error("Task was partially performed. Check logs for more details. Taskid : " + taskWithId.getId()); + Mono.from(listener.failed(taskWithId.getId(), taskWithId.getTask().details())).block(); + })); } @Override @@ -132,7 +150,7 @@ public class SerialTaskManagerWorker implements TaskManagerWorker { cancelledTasks.add(taskId); Optional.ofNullable(runningTask.get()) .filter(task -> task.getT1().equals(taskId)) - .ifPresent(task -> task.getT2().cancel(true)); + .ifPresent(task -> task.getT2().cancel(MAY_INTERRUPT_IF_RUNNING)); } @Override @@ -141,7 +159,7 @@ public class SerialTaskManagerWorker implements TaskManagerWorker { } @Override - public void close() throws IOException { - taskExecutor.shutdownNow(); + public void close() { + taskExecutor.dispose(); } } diff --git a/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java b/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java index 0065546..ed17928 100644 --- a/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java +++ b/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java @@ -29,7 +29,6 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; -import java.io.IOException; import java.time.Duration; import java.util.Optional; import java.util.concurrent.CountDownLatch; @@ -39,7 +38,6 @@ import java.util.concurrent.atomic.AtomicInteger; import org.awaitility.Awaitility; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import reactor.core.publisher.Flux; @@ -69,7 +67,7 @@ class SerialTaskManagerWorkerTest { } @AfterEach - void tearDown() throws IOException { + void tearDown() { worker.close(); } @@ -101,7 +99,7 @@ class SerialTaskManagerWorkerTest { } @Test - void aRunningTaskShouldHaveAFiniteNumberOfInformation() throws InterruptedException { + void aRunningTaskShouldHaveAFiniteNumberOfInformation() { TaskWithId taskWithId = new TaskWithId(TaskId.generateTaskId(), new MemoryReferenceWithCounterTask((counter) -> Mono.fromCallable(counter::incrementAndGet) .delayElement(Duration.ofSeconds(1)) @@ -170,7 +168,6 @@ class SerialTaskManagerWorkerTest { latch.countDown(); } - @Disabled("JAMES-3172 We cannot cancel computation started by Reactor") @Test void taskExecutingReactivelyShouldStopExecutionUponCancel() throws InterruptedException { // Provide a task ticking every 100ms in a separate reactor thread --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
