This is an automated email from the ASF dual-hosted git repository.
kao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push:
new 692c9972f6 JAMES-3890 Allow parallel execution of safe tasks
692c9972f6 is described below
commit 692c9972f64a0586121f2af604d7e9949374702a
Author: Karsten Otto <[email protected]>
AuthorDate: Tue Feb 21 17:54:01 2023 +0100
JAMES-3890 Allow parallel execution of safe tasks
---
.../james/webadmin/service/ExpireMailboxTask.java | 10 +--
server/task/task-api/pom.xml | 4 +
.../java/org/apache/james/task/AsyncSafeTask.java | 29 +++++++
.../src/main/java/org/apache/james/task/Task.java | 21 ++++-
.../apache/james/task/SerialTaskManagerWorker.java | 68 +++++++++++------
.../james/task/SerialTaskManagerWorkerTest.java | 89 ++++++++++++++++++++++
6 files changed, 189 insertions(+), 32 deletions(-)
diff --git
a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/ExpireMailboxTask.java
b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/ExpireMailboxTask.java
index cc7814f070..8855bf2f7c 100644
---
a/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/ExpireMailboxTask.java
+++
b/server/protocols/webadmin/webadmin-mailbox/src/main/java/org/apache/james/webadmin/service/ExpireMailboxTask.java
@@ -26,13 +26,14 @@ import java.util.Optional;
import javax.inject.Inject;
-import org.apache.james.task.Task;
+import org.apache.james.task.AsyncSafeTask;
import org.apache.james.task.TaskExecutionDetails;
import org.apache.james.task.TaskType;
import org.apache.james.webadmin.service.ExpireMailboxService.Context;
import org.apache.james.webadmin.service.ExpireMailboxService.RunningOptions;
+import org.reactivestreams.Publisher;
-public class ExpireMailboxTask implements Task {
+public class ExpireMailboxTask implements AsyncSafeTask {
public static final TaskType TASK_TYPE = TaskType.of("ExpireMailboxTask");
public static class AdditionalInformation implements
TaskExecutionDetails.AdditionalInformation {
@@ -108,9 +109,8 @@ public class ExpireMailboxTask implements Task {
}
@Override
- public Result run() {
- return expireMailboxService.expireMailboxes(context, runningOptions,
new Date())
- .block();
+ public Publisher<Result> runAsync() {
+ return expireMailboxService.expireMailboxes(context, runningOptions,
new Date());
}
@Override
diff --git a/server/task/task-api/pom.xml b/server/task/task-api/pom.xml
index 77dfac344b..91a9e990e0 100644
--- a/server/task/task-api/pom.xml
+++ b/server/task/task-api/pom.xml
@@ -47,6 +47,10 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.projectreactor</groupId>
+ <artifactId>reactor-core</artifactId>
+ </dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-scala-extensions_${scala.base}</artifactId>
diff --git
a/server/task/task-api/src/main/java/org/apache/james/task/AsyncSafeTask.java
b/server/task/task-api/src/main/java/org/apache/james/task/AsyncSafeTask.java
new file mode 100644
index 0000000000..504fc35a99
--- /dev/null
+++
b/server/task/task-api/src/main/java/org/apache/james/task/AsyncSafeTask.java
@@ -0,0 +1,29 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.task;
+
+/**
+ * Marker interface for a task that can safely run in parallel with other tasks.
+ * This means it is unlikely to interfere with the operations of other tasks, and is able to handle issues arising
+ * from parallel execution; e.g. if the task lists some messages and then tries to access them, it must gracefully
+ * handle the situation when they have been deleted in the meantime.
+ */
+public interface AsyncSafeTask extends Task {
+}
diff --git a/server/task/task-api/src/main/java/org/apache/james/task/Task.java
b/server/task/task-api/src/main/java/org/apache/james/task/Task.java
index 69b1db4b4c..bcc0e2cd53 100644
--- a/server/task/task-api/src/main/java/org/apache/james/task/Task.java
+++ b/server/task/task-api/src/main/java/org/apache/james/task/Task.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
import java.util.Optional;
import java.util.stream.Stream;
+import org.apache.james.util.ReactorUtils;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -89,12 +90,26 @@ public interface Task {
}
/**
- * Runs the migration
+ * Runs the actual task as a blocking operation.
+ * Implementations must override either this method or #runAsync(), since the two default implementations delegate to each other.
*
- * @return Return true if fully migrated. Returns false otherwise.
+ * @return complete or partial operation result.
*/
- Result run() throws InterruptedException;
+ default Result run() throws InterruptedException {
+ return Mono.from(runAsync()).block();
+ }
+ /**
+ * Runs the actual task as an asynchronous operation.
+ * Implementations must override either this method or #run(), since the two default implementations delegate to each other.
+ *
+ * @return Publisher supplying complete or partial operation result.
+ */
+ default Publisher<Result> runAsync() {
+ return Mono.fromCallable(this::run)
+ .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER);
+ }
+
TaskType type();
default Optional<TaskExecutionDetails.AdditionalInformation> details() {
diff --git
a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
index a49b32f8f7..66b114c870 100644
---
a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
+++
b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
@@ -23,12 +23,13 @@ import static
org.apache.james.util.ReactorUtils.publishIfPresent;
import static org.awaitility.Durations.TWO_MINUTES;
import java.time.Duration;
+import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.james.util.MDCBuilder;
@@ -40,6 +41,7 @@ import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import reactor.core.Disposable;
@@ -47,16 +49,15 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
-import reactor.util.function.Tuple2;
-import reactor.util.function.Tuples;
public class SerialTaskManagerWorker implements TaskManagerWorker {
private static final Logger LOGGER =
LoggerFactory.getLogger(SerialTaskManagerWorker.class);
public static final boolean MAY_INTERRUPT_IF_RUNNING = true;
private final Scheduler taskExecutor;
+ private final Scheduler asyncTaskExecutor;
private final Listener listener;
- private final AtomicReference<Tuple2<TaskId, CompletableFuture>>
runningTask;
+ private final Map<TaskId, CompletableFuture<Task.Result>> runningTasks;
private final Set<TaskId> cancelledTasks;
private final Duration pollingInterval;
@@ -64,30 +65,49 @@ public class SerialTaskManagerWorker implements
TaskManagerWorker {
this.pollingInterval = pollingInterval;
this.taskExecutor = Schedulers.fromExecutor(
Executors.newSingleThreadExecutor(NamedThreadFactory.withName("task
executor")));
+ this.asyncTaskExecutor = Schedulers.fromExecutor(
+ Executors.newCachedThreadPool(NamedThreadFactory.withName("async
task executor")));
this.listener = listener;
this.cancelledTasks = Sets.newConcurrentHashSet();
- this.runningTask = new AtomicReference<>();
+ this.runningTasks = Maps.newConcurrentMap();
}
@Override
public Mono<Task.Result> executeTask(TaskWithId taskWithId) {
if (!cancelledTasks.remove(taskWithId.getId())) {
- Mono<Task.Result> taskMono = runWithMdc(taskWithId,
listener).subscribeOn(taskExecutor);
+ Mono<Task.Result> taskMono = runWithMdc(taskWithId,
listener).subscribeOn(schedulerForTask(taskWithId));
CompletableFuture<Task.Result> future = taskMono.toFuture();
- runningTask.set(Tuples.of(taskWithId.getId(), future));
+ runningTasks.put(taskWithId.getId(), future);
- return Mono.using(
+ Mono<Task.Result> pollingMono = Mono.using(
() -> pollAdditionalInformation(taskWithId).subscribe(),
ignored -> Mono.fromFuture(future)
.onErrorResume(exception ->
Mono.from(handleExecutionError(taskWithId, listener, exception))
.thenReturn(Task.Result.PARTIAL)),
- Disposable::dispose);
+ Disposable::dispose)
+ .doOnTerminate(() -> runningTasks.remove(taskWithId.getId()));
+
+ if (taskWithId.getTask() instanceof AsyncSafeTask) {
+ pollingMono.subscribe();
+ return Mono.empty();
+ } else {
+ return pollingMono;
+ }
} else {
return Mono.from(listener.cancelled(taskWithId.getId(),
taskWithId.getTask().detailsReactive()))
+ .doOnTerminate(() -> runningTasks.remove(taskWithId.getId()))
.then(Mono.empty());
}
}
+ private Scheduler schedulerForTask(TaskWithId taskWithId) {
+ if (taskWithId.getTask() instanceof AsyncSafeTask) {
+ return asyncTaskExecutor;
+ } else {
+ return taskExecutor;
+ }
+ }
+
private Publisher<Void> handleExecutionError(TaskWithId taskWithId,
Listener listener, Throwable exception) {
if (exception instanceof CancellationException) {
return Mono.from(listener.cancelled(taskWithId.getId(),
taskWithId.getTask().detailsReactive()))
@@ -145,7 +165,7 @@ public class SerialTaskManagerWorker implements
TaskManagerWorker {
}
private Mono<Task.Result> runTask(TaskWithId taskWithId, Listener
listener) {
- return Mono.fromCallable(() -> taskWithId.getTask().run())
+ return Mono.from(taskWithId.getTask().runAsync())
.subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER)
.doOnNext(result -> result
.onComplete(any ->
Mono.from(listener.completed(taskWithId.getId(), result,
taskWithId.getTask().detailsReactive()))
@@ -164,9 +184,8 @@ public class SerialTaskManagerWorker implements
TaskManagerWorker {
@Override
public void cancelTask(TaskId taskId) {
cancelledTasks.add(taskId);
- Optional.ofNullable(runningTask.get())
- .filter(task -> task.getT1().equals(taskId))
- .ifPresent(task -> task.getT2().cancel(MAY_INTERRUPT_IF_RUNNING));
+ Optional.ofNullable(runningTasks.get(taskId))
+ .ifPresent(task -> task.cancel(MAY_INTERRUPT_IF_RUNNING));
}
@Override
@@ -176,17 +195,18 @@ public class SerialTaskManagerWorker implements
TaskManagerWorker {
@Override
public void close() {
- Optional.ofNullable(runningTask.get())
- .ifPresent(task -> {
- if (!task.getT2().isDone() && !task.getT2().isCancelled()) {
- cancelTask(task.getT1());
- Awaitility
- .waitAtMost(TWO_MINUTES)
- .pollDelay(Duration.ofMillis(500))
- .until(() -> !cancelledTasks.contains(task.getT1()));
- }
-
- });
+ Set<TaskId> taskIds = runningTasks.entrySet().stream()
+ .filter(entry -> !entry.getValue().isCancelled() &&
!entry.getValue().isDone())
+ .map(Map.Entry::getKey)
+ .collect(Collectors.toUnmodifiableSet());
+ if (!taskIds.isEmpty()) {
+ taskIds.forEach(this::cancelTask);
+ Awaitility
+ .waitAtMost(TWO_MINUTES)
+ .pollDelay(Duration.ofMillis(500))
+ .until(() -> !cancelledTasks.containsAll(taskIds));
+ }
taskExecutor.dispose();
+ asyncTaskExecutor.dispose();
}
}
diff --git
a/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
b/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
index 359fa5e848..eaad5c0764 100644
---
a/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
+++
b/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
@@ -247,6 +247,95 @@ class SerialTaskManagerWorkerTest {
.block()).isEmpty();
}
+ @Test
+ void theWorkerShouldCancelAnInProgressAsyncTask() throws
InterruptedException {
+ TaskId id = TaskId.generateTaskId();
+ CountDownLatch latch = new CountDownLatch(1);
+
+ Task inProgressTask = new AsyncSafeTask() {
+ @Override
+ public Mono<Result> runAsync() {
+ return Mono.fromCallable(() -> {
+ await(latch);
+ return Task.Result.COMPLETED;
+ });
+ }
+
+ @Override
+ public TaskType type() {
+ return TaskType.of("async memory task");
+ }
+ };
+
+ TaskWithId taskWithId = new TaskWithId(id, inProgressTask);
+
+ Mono<Task.Result> resultMono = worker.executeTask(taskWithId).cache();
+ resultMono.subscribe();
+
+ Awaitility.waitAtMost(TEN_SECONDS)
+ .untilAsserted(() -> verify(listener, atLeastOnce()).started(id));
+
+ worker.cancelTask(id);
+
+ resultMono.block(Duration.ofSeconds(10));
+
+ // Due to the use of signals, cancellation cannot be instantaneous.
+ // Allow a grace period for the cancellation to complete, to increase test stability.
+ Thread.sleep(50);
+
+ verify(listener, atLeastOnce()).cancelled(eq(id), any());
+ verifyNoMoreInteractions(listener);
+ }
+
+ @Test
+ void theWorkerShouldRunAsyncTasksInParallel() throws InterruptedException {
+ TaskId id1 = TaskId.generateTaskId();
+ TaskId id2 = TaskId.generateTaskId();
+ CountDownLatch latch = new CountDownLatch(1);
+ CountDownLatch task1Started = new CountDownLatch(1);
+ CountDownLatch task2Started = new CountDownLatch(1);
+
+ Task inProgressTask1 = new AsyncSafeTask() {
+ @Override
+ public Mono<Result> runAsync() {
+ return Mono.fromCallable(() -> {
+ task1Started.countDown();
+ await(latch);
+ return Task.Result.COMPLETED;
+ });
+ }
+
+ @Override
+ public TaskType type() {
+ return TaskType.of("async memory task");
+ }
+ };
+
+ Task inProgressTask2 = new AsyncSafeTask() {
+ @Override
+ public Mono<Result> runAsync() {
+ return Mono.fromCallable(() -> {
+ task2Started.countDown();
+ return Task.Result.COMPLETED;
+ });
+ }
+
+ @Override
+ public TaskType type() {
+ return TaskType.of("async memory task");
+ }
+ };
+
+ worker.executeTask(new TaskWithId(id1, inProgressTask1)).subscribe();
+ await(task1Started);
+
+ worker.executeTask(new TaskWithId(id2, inProgressTask2)).subscribe();
+ await(task2Started);
+
+ verify(listener, atLeastOnce()).started(id1);
+ verify(listener, atLeastOnce()).started(id2);
+ latch.countDown();
+ }
private void await(CountDownLatch countDownLatch) throws
InterruptedException {
countDownLatch.await();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]