This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit b847270a3f319d9d60df0e242bda7de9c6f8fe47 Author: Gautier DI FOLCO <gdifo...@linagora.com> AuthorDate: Fri Aug 30 10:01:01 2019 +0200 JAMES-2813 Add timeout in TaskManager.await --- .../james/adapter/mailbox/ReIndexerManagement.java | 9 +-- .../org/apache/james/util/DurationParserTest.java | 16 ++--- .../apache/james/webadmin/routes/TasksRoutes.java | 58 ++++++++++++++-- .../james/webadmin/routes/TasksRoutesTest.java | 80 +++++++++++++++++++++- .../distributed/DistributedTaskManagerTest.java | 25 ++++++- .../org/apache/james/task/MemoryTaskManager.java | 17 ++--- .../java/org/apache/james/task/TaskManager.java | 6 +- .../eventsourcing/EventSourcingTaskManager.scala | 40 +++++++---- .../org/apache/james/task/TaskManagerContract.java | 36 +++++++--- src/site/markdown/server/manage-webadmin.md | 17 ++++- 10 files changed, 247 insertions(+), 57 deletions(-) diff --git a/server/container/mailbox-jmx/src/main/java/org/apache/james/adapter/mailbox/ReIndexerManagement.java b/server/container/mailbox-jmx/src/main/java/org/apache/james/adapter/mailbox/ReIndexerManagement.java index 075795a..6696d54 100644 --- a/server/container/mailbox-jmx/src/main/java/org/apache/james/adapter/mailbox/ReIndexerManagement.java +++ b/server/container/mailbox-jmx/src/main/java/org/apache/james/adapter/mailbox/ReIndexerManagement.java @@ -21,6 +21,7 @@ package org.apache.james.adapter.mailbox; import java.io.Closeable; import java.io.IOException; +import java.time.Duration; import javax.inject.Inject; import javax.inject.Named; @@ -51,8 +52,8 @@ public class ReIndexerManagement implements ReIndexerManagementMBean { .addContext(MDCBuilder.ACTION, "reIndex") .build()) { TaskId taskId = taskManager.submit(reIndexer.reIndex(new MailboxPath(namespace, user, name))); - taskManager.await(taskId); - } catch (IOException e) { + taskManager.await(taskId, Duration.ofMillis(Long.MAX_VALUE)); + } catch (IOException | TaskManager.ReachedTimeoutException e) { throw new RuntimeException(e); } } @@ -65,8 +66,8 @@ public class ReIndexerManagement implements ReIndexerManagementMBean { .addContext(MDCBuilder.ACTION, "reIndex") .build()) { TaskId taskId = taskManager.submit(reIndexer.reIndex()); - taskManager.await(taskId); - } catch (IOException e) { + taskManager.await(taskId, Duration.ofMillis(Long.MAX_VALUE)); + } catch (IOException | TaskManager.ReachedTimeoutException e) { throw new RuntimeException(e); } } diff --git a/server/container/util/src/test/java/org/apache/james/util/DurationParserTest.java b/server/container/util/src/test/java/org/apache/james/util/DurationParserTest.java index fa997ae..e0b573c 100644 --- a/server/container/util/src/test/java/org/apache/james/util/DurationParserTest.java +++ b/server/container/util/src/test/java/org/apache/james/util/DurationParserTest.java @@ -60,56 +60,56 @@ class DurationParserTest { } @ParameterizedTest - @ValueSource(strings = {"2", "2 ms", "2 msec", "2 msecs", "2 Ms"}) + @ValueSource(strings = {"2ms", "2", "2 ms", "2 msec", "2 msecs", "2 Ms"}) void parseShouldHandleMilliseconds(String input) { assertThat(DurationParser.parse(input)) .isEqualTo(Duration.ofMillis(2)); } @ParameterizedTest - @ValueSource(strings = {"2 s", "2 sec", "2 Sec", "2 second", "2 seconds"}) + @ValueSource(strings = {"2s", "2 s", "2 sec", "2 Sec", "2 second", "2 seconds"}) void parseShouldHandleSeconds(String input) { assertThat(DurationParser.parse(input)) .isEqualTo(Duration.ofSeconds(2)); } @ParameterizedTest - @ValueSource(strings = {"2 m", "2 min", "2 mins", "2 minute", "2 Minute", "2 minutes"}) + @ValueSource(strings = {"2m", "2 m", "2 min", "2 mins", "2 minute", "2 Minute", "2 minutes"}) void parseShouldHandleMinutes(String input) { assertThat(DurationParser.parse(input)) .isEqualTo(Duration.ofMinutes(2)); } @ParameterizedTest - @ValueSource(strings = {"2 h", "2 hour", "2 Hour", "2 hours"}) + @ValueSource(strings = {"2h", "2 h", "2 hour", "2 Hour", "2 hours"}) void parseShouldHandleHours(String input) { assertThat(DurationParser.parse(input)) .isEqualTo(Duration.ofHours(2)); } @ParameterizedTest - @ValueSource(strings = {"2 d", "2 day", "2 Day", "2 days"}) + @ValueSource(strings = {"2d", "2 d", "2 day", "2 Day", "2 days"}) void parseShouldHandleDays(String input) { assertThat(DurationParser.parse(input)) .isEqualTo(Duration.ofDays(2)); } @ParameterizedTest - @ValueSource(strings = {"2 w", "2 week", "2 Week", "2 weeks"}) + @ValueSource(strings = {"2w", "2 w", "2 week", "2 Week", "2 weeks"}) void parseShouldHandleWeeks(String input) { assertThat(DurationParser.parse(input)) .isEqualTo(ChronoUnit.WEEKS.getDuration().multipliedBy(2)); } @ParameterizedTest - @ValueSource(strings = {"2 months", "2 month", "2 Month"}) + @ValueSource(strings = {"2months", "2 months", "2 month", "2 Month"}) void parseShouldHandleMonths(String input) { assertThat(DurationParser.parse(input)) .isEqualTo(ChronoUnit.MONTHS.getDuration().multipliedBy(2)); } @ParameterizedTest - @ValueSource(strings = {"2 y", "2 year", "2 Year", "2 years"}) + @ValueSource(strings = {"2y", "2 y", "2 year", "2 Year", "2 years"}) void parseShouldHandleYears(String input) { assertThat(DurationParser.parse(input)) .isEqualTo(ChronoUnit.YEARS.getDuration().multipliedBy(2)); diff --git a/server/protocols/webadmin/webadmin-core/src/main/java/org/apache/james/webadmin/routes/TasksRoutes.java b/server/protocols/webadmin/webadmin-core/src/main/java/org/apache/james/webadmin/routes/TasksRoutes.java index 84f6eff..35db8b8 100644 --- a/server/protocols/webadmin/webadmin-core/src/main/java/org/apache/james/webadmin/routes/TasksRoutes.java +++ b/server/protocols/webadmin/webadmin-core/src/main/java/org/apache/james/webadmin/routes/TasksRoutes.java @@ -19,6 +19,8 @@ package org.apache.james.webadmin.routes; +import java.time.Duration; +import java.time.temporal.ChronoUnit; import java.util.List; import java.util.Optional; import java.util.function.Supplier; @@ -33,13 +35,16 @@ import org.apache.james.task.TaskExecutionDetails; import org.apache.james.task.TaskId; import org.apache.james.task.TaskManager; import org.apache.james.task.TaskNotFoundException; +import org.apache.james.util.DurationParser; import org.apache.james.webadmin.Routes; import org.apache.james.webadmin.dto.ExecutionDetailsDto; import org.apache.james.webadmin.utils.ErrorResponder; import org.apache.james.webadmin.utils.JsonTransformer; import org.apache.james.webadmin.utils.Responses; + import org.eclipse.jetty.http.HttpStatus; +import com.google.common.base.Preconditions; import io.swagger.annotations.Api; import io.swagger.annotations.ApiImplicitParam; import io.swagger.annotations.ApiImplicitParams; @@ -54,9 +59,10 @@ import spark.Service; @Path(":task") @Produces("application/json") public class TasksRoutes implements Routes { - public static final String BASE = "/tasks"; + private static final Duration MAXIMUM_AWAIT_TIMEOUT = Duration.ofDays(365); private final TaskManager taskManager; private final JsonTransformer jsonTransformer; + public static final String BASE = "/tasks"; @Inject public TasksRoutes(TaskManager taskManager, JsonTransformer jsonTransformer) { @@ -133,13 +139,15 @@ public class TasksRoutes implements Routes { @ApiOperation(value = "Await, then get a task execution details") @ApiResponses(value = { @ApiResponse(code = HttpStatus.OK_200, message = "A specific class execution details", response = ExecutionDetailsDto.class), - @ApiResponse(code = HttpStatus.BAD_REQUEST_400, message = "The taskId is invalid"), - @ApiResponse(code = HttpStatus.NOT_FOUND_404, message = "The taskId is not found") + @ApiResponse(code = HttpStatus.BAD_REQUEST_400, message = "The taskId is invalid or invalid timeout"), + @ApiResponse(code = HttpStatus.NOT_FOUND_404, message = "The taskId is not found"), + @ApiResponse(code = HttpStatus.REQUEST_TIMEOUT_408, message = "Waiting the the task completion has reached the timeout") }) public Object await(Request req, Response response) { TaskId taskId = getTaskId(req); + Duration timeout = getTimeout(req); return respondStatus(taskId, - () -> taskManager.await(getTaskId(req))); + () -> awaitTask(taskId, timeout)); } private Object respondStatus(TaskId taskId, Supplier<TaskExecutionDetails> executionDetailsSupplier) { @@ -181,4 +189,46 @@ public class TasksRoutes implements Routes { .haltError(); } } + + private Duration getTimeout(Request req) { + try { + Optional<String> requestTimeout = Optional.ofNullable(req.queryParams("timeout")) + .filter(parameter -> !parameter.isEmpty()); + + requestTimeout.ifPresent(timeout -> + Preconditions.checkState(!timeout.replaceAll(" ", "").startsWith("-"), "Timeout should be positive")); + + Duration timeout = requestTimeout + .map(rawString -> DurationParser.parse(rawString, ChronoUnit.SECONDS)) + .orElse(MAXIMUM_AWAIT_TIMEOUT); + + Preconditions.checkState(timeout.compareTo(MAXIMUM_AWAIT_TIMEOUT) <= 0, "Timeout should not exceed one year"); + return timeout; + } catch (Exception e) { + throw ErrorResponder.builder() + .statusCode(HttpStatus.BAD_REQUEST_400) + .cause(e) + .type(ErrorResponder.ErrorType.INVALID_ARGUMENT) + .message("Invalid timeout " + e.getMessage()) + .haltError(); + } + } + + private TaskExecutionDetails awaitTask(TaskId taskId, Duration timeout) { + try { + return taskManager.await(taskId, timeout); + } catch (TaskManager.ReachedTimeoutException e) { + throw ErrorResponder.builder() + .statusCode(HttpStatus.REQUEST_TIMEOUT_408) + .type(ErrorResponder.ErrorType.SERVER_ERROR) + .message("The timeout has been reached") + .haltError(); + } catch (TaskNotFoundException e) { + throw ErrorResponder.builder() + .statusCode(HttpStatus.NOT_FOUND_404) + .type(ErrorResponder.ErrorType.SERVER_ERROR) + .message("The taskId is not found") + .haltError(); + } + } } diff --git a/server/protocols/webadmin/webadmin-core/src/test/java/org/apache/james/webadmin/routes/TasksRoutesTest.java b/server/protocols/webadmin/webadmin-core/src/test/java/org/apache/james/webadmin/routes/TasksRoutesTest.java index bdd8081..f31ec43 100644 --- a/server/protocols/webadmin/webadmin-core/src/test/java/org/apache/james/webadmin/routes/TasksRoutesTest.java +++ b/server/protocols/webadmin/webadmin-core/src/test/java/org/apache/james/webadmin/routes/TasksRoutesTest.java @@ -22,6 +22,8 @@ package org.apache.james.webadmin.routes; import static io.restassured.RestAssured.given; import static io.restassured.RestAssured.when; import static io.restassured.RestAssured.with; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; @@ -199,6 +201,82 @@ class TasksRoutesTest { } @Test + void getAwaitWithTimeoutShouldAwaitTaskCompletion() { + TaskId taskId = taskManager.submit(() -> { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return Task.Result.COMPLETED; + }); + + given() + .queryParam("timeout", "2s") + .when() + .get("/" + taskId.getValue() + "/await") + .then() + .statusCode(HttpStatus.OK_200) + .body("status", is("completed")); + } + + @Test + void getAwaitWithInvalidTimeoutShouldReturnAnError() { + TaskId taskId = taskManager.submit(() -> Task.Result.COMPLETED); + + given() + .queryParam("timeout", "-5") + .when() + .get("/" + taskId.getValue() + "/await") + .then() + .statusCode(HttpStatus.BAD_REQUEST_400) + .body("message", allOf(containsString("Invalid timeout"), containsString("Timeout should be positive"))); + } + + @Test + void getAwaitWithATooBigTimeoutShouldReturnAnError() { + TaskId taskId = taskManager.submit(() -> Task.Result.COMPLETED); + + given() + .queryParam("timeout", "5y") + .when() + .get("/" + taskId.getValue() + "/await") + .then() + .statusCode(HttpStatus.BAD_REQUEST_400) + .body("message", allOf(containsString("Invalid timeout"), containsString("Timeout should not exceed one year"))); + } + + @Test + void getAwaitWithAShorterTimeoutShouldReturnTimeout() { + TaskId taskId = taskManager.submit(() -> { + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return Task.Result.COMPLETED; + }); + + given() + .queryParam("timeout", "1") + .when() + .get("/" + taskId.getValue() + "/await") + .then() + .statusCode(HttpStatus.REQUEST_TIMEOUT_408) + .body("message", is("The timeout has been reached")); + } + + @Test + void getAwaitWithANonExistingTaskShouldReturnNotFound() { + String unknownTaskId = TaskId.generateTaskId().asString(); + when() + .get("/" + unknownTaskId + "/await") + .then() + .statusCode(HttpStatus.NOT_FOUND_404) + .body("message", is("The taskId is not found")); + } + + @Test void getAwaitShouldNotFailUponError() { TaskId taskId = taskManager.submit(() -> { throw new RuntimeException(); @@ -333,4 +411,4 @@ class TasksRoutesTest { .body("type", is("InvalidArgument")) .body("message", is("Invalid status query parameter")); } -} \ No newline at end of file +} diff --git a/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java b/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java index f61cbc1..a8e43c5 100644 --- a/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java +++ b/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java @@ -123,7 +123,7 @@ class DistributedTaskManagerTest implements TaskManagerContract { Awaitility.await() .atMost(Duration.FIVE_SECONDS) .pollInterval(100L, TimeUnit.MILLISECONDS) - .until(() -> taskManager1.await(taskId).getStatus() == TaskManager.Status.COMPLETED); + .until(() -> taskManager1.await(taskId, TIMEOUT).getStatus() == TaskManager.Status.COMPLETED); TaskExecutionDetails detailsFromTaskManager1 = taskManager1.getExecutionDetails(taskId); TaskExecutionDetails detailsFromTaskManager2 = taskManager2.getExecutionDetails(taskId); @@ -150,7 +150,7 @@ class DistributedTaskManagerTest implements TaskManagerContract { Awaitility.await() .atMost(Duration.ONE_SECOND) .pollInterval(100L, TimeUnit.MILLISECONDS) - .until(() -> taskManager1.await(waitingTaskId).getStatus() == TaskManager.Status.COMPLETED); + .until(() -> taskManager1.await(waitingTaskId, TIMEOUT).getStatus() == TaskManager.Status.COMPLETED); } } @@ -165,7 +165,7 @@ class DistributedTaskManagerTest implements TaskManagerContract { Awaitility.await() .atMost(Duration.ONE_SECOND) .pollInterval(100L, TimeUnit.MILLISECONDS) - .until(() -> taskManager1.await(taskId).getStatus() == TaskManager.Status.COMPLETED); + .until(() -> taskManager1.await(taskId, TIMEOUT).getStatus() == TaskManager.Status.COMPLETED); TaskExecutionDetails executionDetails = taskManager2.getExecutionDetails(taskId); assertThat(executionDetails.getSubmittedNode()).isEqualTo(HOSTNAME_2); @@ -203,6 +203,25 @@ class DistributedTaskManagerTest implements TaskManagerContract { .contains(remoteTaskManager.getKey()); } + @Test + void givenTwoTaskManagersATaskRunningOnOneShouldBeWaitableFromTheOtherOne() throws TaskManager.ReachedTimeoutException { + TaskManager taskManager1 = taskManager(HOSTNAME); + TaskManager taskManager2 = taskManager(HOSTNAME_2); + TaskId id = taskManager1.submit(new MemoryReferenceTask(() -> { + Thread.sleep(250); + return Task.Result.COMPLETED; + })); + + awaitUntilTaskHasStatus(id, TaskManager.Status.IN_PROGRESS, taskManager1); + Hostname runningNode = taskManager1.getExecutionDetails(id).getRanNode().get(); + + TaskManager remoteTaskManager = getOtherTaskManager(runningNode, Pair.of(HOSTNAME, taskManager1), Pair.of(HOSTNAME_2, taskManager2)).getValue(); + + remoteTaskManager.await(id, TIMEOUT); + assertThat(taskManager1.getExecutionDetails(id).getStatus()) + .isEqualTo(TaskManager.Status.COMPLETED); + } + private Pair<Hostname, TaskManager> getOtherTaskManager(Hostname node, Pair<Hostname, TaskManager> taskManager1, Pair<Hostname, TaskManager> taskManager2) { if (node.equals(taskManager1.getKey())) { return taskManager2; diff --git a/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java index 90e88c0..611cbde 100644 --- a/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java +++ b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java @@ -123,7 +123,7 @@ public class MemoryTaskManager implements TaskManager { return ImmutableList.copyOf(tasksFiltered(status).values()); } - public Map<TaskId, TaskExecutionDetails> tasksFiltered(Status status) { + private Map<TaskId, TaskExecutionDetails> tasksFiltered(Status status) { return idToExecutionDetails.entrySet() .stream() .filter(details -> details.getValue().getStatus().equals(status)) @@ -140,17 +140,14 @@ public class MemoryTaskManager implements TaskManager { } @Override - public TaskExecutionDetails await(TaskId id) { - if (Optional.ofNullable(idToExecutionDetails.get(id)).isPresent()) { + public TaskExecutionDetails await(TaskId id, Duration timeout) throws TaskNotFoundException, ReachedTimeoutException { + try { return Flux.interval(NOW, AWAIT_POLLING_DURATION, Schedulers.elastic()) .map(ignored -> getExecutionDetails(id)) - .filter(details -> details.getStatus() == Status.COMPLETED - || details.getStatus() == Status.FAILED - || details.getStatus() == Status.CANCELLED) - .take(1) - .blockFirst(); - } else { - return null; + .filter(details -> details.getStatus().isFinished()) + .blockFirst(timeout); + } catch (IllegalStateException e) { + throw new ReachedTimeoutException(); } } diff --git a/server/task/src/main/java/org/apache/james/task/TaskManager.java b/server/task/src/main/java/org/apache/james/task/TaskManager.java index b04f2c2..5aca267 100644 --- a/server/task/src/main/java/org/apache/james/task/TaskManager.java +++ b/server/task/src/main/java/org/apache/james/task/TaskManager.java @@ -19,6 +19,7 @@ package org.apache.james.task; +import java.time.Duration; import java.util.Arrays; import java.util.List; @@ -58,6 +59,9 @@ public interface TaskManager { } } + class ReachedTimeoutException extends Exception { + } + TaskId submit(Task task); TaskExecutionDetails getExecutionDetails(TaskId id); @@ -68,5 +72,5 @@ public interface TaskManager { void cancel(TaskId id); - TaskExecutionDetails await(TaskId id); + TaskExecutionDetails await(TaskId id, Duration timeout) throws TaskNotFoundException, ReachedTimeoutException; } diff --git a/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala b/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala index 14b2f2a..4e281de 100644 --- a/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala +++ b/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala @@ -19,17 +19,20 @@ package org.apache.james.task.eventsourcing import java.io.Closeable +import java.time.Duration import java.util -import com.google.common.annotations.VisibleForTesting import javax.inject.Inject import org.apache.james.eventsourcing.eventstore.{EventStore, History} import org.apache.james.eventsourcing.{AggregateId, Subscriber} +import org.apache.james.task.TaskManager.ReachedTimeoutException import org.apache.james.task._ import org.apache.james.task.eventsourcing.TaskCommand._ -import reactor.core.publisher.Flux +import com.google.common.annotations.VisibleForTesting +import reactor.core.publisher.{Flux, Mono} +import reactor.core.scheduler.Schedulers class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing]( workQueueSupplier: WorkQueueSupplier, @@ -91,19 +94,26 @@ class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing] eventSourcingSystem.dispatch(command) } - override final def await(id: TaskId): TaskExecutionDetails = { - val details = getExecutionDetails(id) - if (details.getStatus.isFinished) { - details - } else { - Flux.from(terminationSubscriber.listenEvents) - .filter{ - case event: TaskEvent => event.getAggregateId.taskId == id - case _ => false - } - .blockFirst() - - getExecutionDetails(id) + @throws(classOf[TaskNotFoundException]) + @throws(classOf[ReachedTimeoutException]) + override def await(id: TaskId, timeout: Duration): TaskExecutionDetails = { + try { + val details = Mono.fromSupplier[TaskExecutionDetails](() => getExecutionDetails(id)) + .filter(_.getStatus.isFinished) + + val findEvent = Flux.from(terminationSubscriber.listenEvents) + .filter { + case event: TaskEvent => event.getAggregateId.taskId == id + case _ => false + } + .next() + .then(details) + + Flux.merge(findEvent, details) + .subscribeOn(Schedulers.elastic) + .blockFirst(timeout) + } catch { + case _: IllegalStateException => throw new ReachedTimeoutException } } diff --git a/server/task/src/test/java/org/apache/james/task/TaskManagerContract.java b/server/task/src/test/java/org/apache/james/task/TaskManagerContract.java index 2fa605d..6078752 100644 --- a/server/task/src/test/java/org/apache/james/task/TaskManagerContract.java +++ b/server/task/src/test/java/org/apache/james/task/TaskManagerContract.java @@ -47,6 +47,7 @@ public interface TaskManagerContract { .pollDelay(slowPacedPollInterval) .await(); ConditionFactory awaitAtMostFiveSeconds = calmlyAwait.atMost(FIVE_SECONDS); + java.time.Duration TIMEOUT = java.time.Duration.ofMillis(Long.MAX_VALUE); TaskManager taskManager(); @@ -410,16 +411,15 @@ public interface TaskManagerContract { } @Test - default void awaitShouldNotThrowWhenCompletedTask() { + default void awaitShouldNotThrowWhenCompletedTask() throws TaskManager.ReachedTimeoutException { TaskManager taskManager = taskManager(); - TaskId taskId = taskManager.submit( - new CompletedTask()); - taskManager.await(taskId); - taskManager.await(taskId); + TaskId taskId = taskManager.submit(new CompletedTask()); + taskManager.await(taskId, TIMEOUT); + taskManager.await(taskId, TIMEOUT); } @Test - default void awaitShouldAwaitWaitingTask() { + default void awaitShouldAwaitWaitingTask() throws TaskManager.ReachedTimeoutException, InterruptedException { TaskManager taskManager = taskManager(); CountDownLatch latch = new CountDownLatch(1); taskManager.submit(new MemoryReferenceTask( @@ -428,10 +428,28 @@ public interface TaskManagerContract { return Task.Result.COMPLETED; })); latch.countDown(); - TaskId task2 = taskManager.submit( - new CompletedTask()); + TaskId task2 = taskManager.submit(new CompletedTask()); + + assertThat(taskManager.await(task2, TIMEOUT).getStatus()).isEqualTo(TaskManager.Status.COMPLETED); + } + + @Test + default void awaitANonExistingTaskShouldReturnAnUnknownAwaitedTaskExecutionDetailsAndThrow() { + assertThatCode(() -> taskManager().await(TaskId.generateTaskId(), TIMEOUT)) + .isInstanceOf(TaskNotFoundException.class); + } + + @Test + default void awaitWithATooShortTimeoutShouldReturnATimeoutAwaitedTaskExecutionDetailsAndThrow() { + TaskManager taskManager = taskManager(); + TaskId taskId = taskManager.submit(new MemoryReferenceTask( + () -> { + Thread.sleep(1000); + return Task.Result.COMPLETED; + })); - assertThat(taskManager.await(task2).getStatus()).isEqualTo(TaskManager.Status.COMPLETED); + assertThatCode(() -> taskManager.await(taskId, java.time.Duration.ofMillis(10))) + .isInstanceOf(TaskManager.ReachedTimeoutException.class); } @Test diff --git a/src/site/markdown/server/manage-webadmin.md b/src/site/markdown/server/manage-webadmin.md index 91f498b..628ac7f 100644 --- a/src/site/markdown/server/manage-webadmin.md +++ b/src/site/markdown/server/manage-webadmin.md @@ -3249,16 +3249,29 @@ One can await the end of a task, then receive it's final execution report. That feature is especially usefull for testing purpose but still can serve real-life scenari. ``` -curl -XGET http://ip:port/tasks/3294a976-ce63-491e-bd52-1b6f465ed7a2/await +curl -XGET http://ip:port/tasks/3294a976-ce63-491e-bd52-1b6f465ed7a2/await?timeout=duration ``` An Execution Report will be returned. +`timeout` is optional. +By default it is set to 365 days (the maximum value). +The expected value is expressed in the following format: `Nunit`. +`N` should be strictly positive. +`unit` could be either in the short form (`s`, `m`, `h`, etc.), or in the long form (`day`, `week`, `month`, etc.). + +Examples: + - `30s` + - `5m` + - `7d` + - `1y` + Response codes: - 200: The specific task was found and the execution report exposed above is returned - - 400: Invalid task ID + - 400: Invalid task ID or invalid timeout - 404: Task ID was not found + - 408: The timeout has been reached ### Cancelling a task --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org