This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 69744b46a1cbaab6d990243ed6c2d8e583a43aeb Author: Benoit Tellier <[email protected]> AuthorDate: Fri May 31 14:33:15 2019 +0700 JAMES-2334 Awaiting a cancelled task should be supported --- .../java/org/apache/james/task/MemoryTaskManager.java | 12 +++++++++--- .../org/apache/james/task/MemoryTaskManagerWorker.java | 15 +++++++-------- 2 files changed, 16 insertions(+), 11 deletions(-) diff --git a/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java index 566ce32..7143b1e 100644 --- a/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java +++ b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java @@ -33,9 +33,9 @@ import java.util.function.Predicate; import javax.annotation.PreDestroy; -import com.github.fge.lambdas.Throwing; import com.github.steveash.guavate.Guavate; import com.google.common.collect.ImmutableList; + import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.WorkQueueProcessor; @@ -130,7 +130,7 @@ public class MemoryTaskManager implements TaskManager { public void cancel(TaskId id) { Optional.ofNullable(idToExecutionDetails.get(id)).ifPresent(details -> { if (details.getStatus().equals(Status.WAITING)) { - updateDetails(id).accept(currentDetails -> currentDetails.cancelRequested()); + updateDetails(id).accept(TaskExecutionDetails::cancelRequested); } worker.cancelTask(id, updateDetails(id)); } @@ -144,7 +144,13 @@ public class MemoryTaskManager implements TaskManager { .filter(ignore -> tasksResult.get(id) != null) .map(ignore -> { Optional.ofNullable(tasksResult.get(id)) - .ifPresent(Throwing.<Mono<Task.Result>>consumer(Mono::block).orDoNothing()); + .ifPresent(mono -> { + try { + mono.block(); + } catch (CancellationException e) { + // ignore + } + }); return getExecutionDetails(id); }) .take(1) diff --git a/server/task/src/main/java/org/apache/james/task/MemoryTaskManagerWorker.java b/server/task/src/main/java/org/apache/james/task/MemoryTaskManagerWorker.java index 5d6226d..9890095 100644 --- a/server/task/src/main/java/org/apache/james/task/MemoryTaskManagerWorker.java +++ b/server/task/src/main/java/org/apache/james/task/MemoryTaskManagerWorker.java @@ -37,8 +37,9 @@ import reactor.core.publisher.Mono; public class MemoryTaskManagerWorker implements TaskManagerWorker { private static final boolean INTERRUPT_IF_RUNNING = true; private static final Logger LOGGER = LoggerFactory.getLogger(MemoryTaskManagerWorker.class); - public static final Duration CHECK_CANCELED_PERIOD = Duration.ofMillis(100); - public static final int FIRST = 1; + private static final Duration CHECK_CANCELED_PERIOD = Duration.ofMillis(100); + private static final int FIRST = 1; + private final ConcurrentHashMap<TaskId, CompletableFuture<Task.Result>> idToFuture = new ConcurrentHashMap<>(); @Override @@ -47,7 +48,7 @@ public class MemoryTaskManagerWorker implements TaskManagerWorker { idToFuture.put(taskWithId.getId(), futureResult); - Mono<Task.Result> result = Mono.<Task.Result>fromFuture(futureResult) + return Mono.fromFuture(futureResult) .doOnError(res -> { if (!(res instanceof CancellationException)) { failed(updateDetails, @@ -55,8 +56,6 @@ public class MemoryTaskManagerWorker implements TaskManagerWorker { } }) .doOnTerminate(() -> idToFuture.remove(taskWithId.getId())); - - return result; } private Task.Result runWithMdc(TaskWithId taskWithId, Consumer<TaskExecutionDetailsUpdater> updateDetails) { @@ -78,7 +77,7 @@ public class MemoryTaskManagerWorker implements TaskManagerWorker { .onFailure(() -> failed(updateDetails, (logger, details) -> logger.error("Task was partially performed. Check logs for more details" + details.getTaskId()))); } catch (Exception e) { failed(updateDetails, - (logger, executionDetails) -> logger.error("Error while running task", executionDetails, e)); + (logger, executionDetails) -> logger.error("Error while running task {}", executionDetails, e)); return Task.Result.PARTIAL; } } @@ -88,8 +87,8 @@ public class MemoryTaskManagerWorker implements TaskManagerWorker { Optional.ofNullable(idToFuture.remove(id)) .ifPresent(future -> { requestCancellation(updateDetails, future); - waitUntilFutureIsCancelled(future) - .subscribe(cancellationSuccessful -> effectivelyCancelled(updateDetails)); + waitUntilFutureIsCancelled(future).blockFirst(); + effectivelyCancelled(updateDetails); }); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
