This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit b847270a3f319d9d60df0e242bda7de9c6f8fe47
Author: Gautier DI FOLCO <gdifo...@linagora.com>
AuthorDate: Fri Aug 30 10:01:01 2019 +0200

    JAMES-2813 Add timeout in TaskManager.await
---
 .../james/adapter/mailbox/ReIndexerManagement.java |  9 +--
 .../org/apache/james/util/DurationParserTest.java  | 16 ++---
 .../apache/james/webadmin/routes/TasksRoutes.java  | 58 ++++++++++++++--
 .../james/webadmin/routes/TasksRoutesTest.java     | 80 +++++++++++++++++++++-
 .../distributed/DistributedTaskManagerTest.java    | 25 ++++++-
 .../org/apache/james/task/MemoryTaskManager.java   | 17 ++---
 .../java/org/apache/james/task/TaskManager.java    |  6 +-
 .../eventsourcing/EventSourcingTaskManager.scala   | 40 +++++++----
 .../org/apache/james/task/TaskManagerContract.java | 36 +++++++---
 src/site/markdown/server/manage-webadmin.md        | 17 ++++-
 10 files changed, 247 insertions(+), 57 deletions(-)

diff --git 
a/server/container/mailbox-jmx/src/main/java/org/apache/james/adapter/mailbox/ReIndexerManagement.java
 
b/server/container/mailbox-jmx/src/main/java/org/apache/james/adapter/mailbox/ReIndexerManagement.java
index 075795a..6696d54 100644
--- 
a/server/container/mailbox-jmx/src/main/java/org/apache/james/adapter/mailbox/ReIndexerManagement.java
+++ 
b/server/container/mailbox-jmx/src/main/java/org/apache/james/adapter/mailbox/ReIndexerManagement.java
@@ -21,6 +21,7 @@ package org.apache.james.adapter.mailbox;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.time.Duration;
 
 import javax.inject.Inject;
 import javax.inject.Named;
@@ -51,8 +52,8 @@ public class ReIndexerManagement implements 
ReIndexerManagementMBean {
                      .addContext(MDCBuilder.ACTION, "reIndex")
                      .build()) {
             TaskId taskId = taskManager.submit(reIndexer.reIndex(new 
MailboxPath(namespace, user, name)));
-            taskManager.await(taskId);
-        } catch (IOException e) {
+            taskManager.await(taskId, Duration.ofMillis(Long.MAX_VALUE));
+        } catch (IOException | TaskManager.ReachedTimeoutException e) {
             throw new RuntimeException(e);
         }
     }
@@ -65,8 +66,8 @@ public class ReIndexerManagement implements 
ReIndexerManagementMBean {
                      .addContext(MDCBuilder.ACTION, "reIndex")
                      .build()) {
             TaskId taskId = taskManager.submit(reIndexer.reIndex());
-            taskManager.await(taskId);
-        } catch (IOException e) {
+            taskManager.await(taskId, Duration.ofMillis(Long.MAX_VALUE));
+        } catch (IOException | TaskManager.ReachedTimeoutException e) {
             throw new RuntimeException(e);
         }
     }
diff --git 
a/server/container/util/src/test/java/org/apache/james/util/DurationParserTest.java
 
b/server/container/util/src/test/java/org/apache/james/util/DurationParserTest.java
index fa997ae..e0b573c 100644
--- 
a/server/container/util/src/test/java/org/apache/james/util/DurationParserTest.java
+++ 
b/server/container/util/src/test/java/org/apache/james/util/DurationParserTest.java
@@ -60,56 +60,56 @@ class DurationParserTest {
     }
 
     @ParameterizedTest
-    @ValueSource(strings = {"2", "2 ms", "2 msec", "2 msecs", "2 Ms"})
+    @ValueSource(strings = {"2ms", "2", "2 ms", "2 msec", "2 msecs", "2 Ms"})
     void parseShouldHandleMilliseconds(String input) {
         assertThat(DurationParser.parse(input))
             .isEqualTo(Duration.ofMillis(2));
     }
 
     @ParameterizedTest
-    @ValueSource(strings = {"2 s", "2 sec", "2 Sec", "2 second", "2 seconds"})
+    @ValueSource(strings = {"2s", "2 s", "2 sec", "2 Sec", "2 second", "2 
seconds"})
     void parseShouldHandleSeconds(String input) {
         assertThat(DurationParser.parse(input))
             .isEqualTo(Duration.ofSeconds(2));
     }
 
     @ParameterizedTest
-    @ValueSource(strings = {"2 m", "2 min", "2 mins", "2 minute", "2 Minute", 
"2 minutes"})
+    @ValueSource(strings = {"2m", "2 m", "2 min", "2 mins", "2 minute", "2 
Minute", "2 minutes"})
     void parseShouldHandleMinutes(String input) {
         assertThat(DurationParser.parse(input))
             .isEqualTo(Duration.ofMinutes(2));
     }
 
     @ParameterizedTest
-    @ValueSource(strings = {"2 h", "2 hour", "2 Hour", "2 hours"})
+    @ValueSource(strings = {"2h", "2 h", "2 hour", "2 Hour", "2 hours"})
     void parseShouldHandleHours(String input) {
         assertThat(DurationParser.parse(input))
             .isEqualTo(Duration.ofHours(2));
     }
 
     @ParameterizedTest
-    @ValueSource(strings = {"2 d", "2 day", "2 Day", "2 days"})
+    @ValueSource(strings = {"2d", "2 d", "2 day", "2 Day", "2 days"})
     void parseShouldHandleDays(String input) {
         assertThat(DurationParser.parse(input))
             .isEqualTo(Duration.ofDays(2));
     }
 
     @ParameterizedTest
-    @ValueSource(strings = {"2 w", "2 week", "2 Week", "2 weeks"})
+    @ValueSource(strings = {"2w", "2 w", "2 week", "2 Week", "2 weeks"})
     void parseShouldHandleWeeks(String input) {
         assertThat(DurationParser.parse(input))
             .isEqualTo(ChronoUnit.WEEKS.getDuration().multipliedBy(2));
     }
 
     @ParameterizedTest
-    @ValueSource(strings = {"2 months", "2 month", "2 Month"})
+    @ValueSource(strings = {"2months", "2 months", "2 month", "2 Month"})
     void parseShouldHandleMonths(String input) {
         assertThat(DurationParser.parse(input))
             .isEqualTo(ChronoUnit.MONTHS.getDuration().multipliedBy(2));
     }
 
     @ParameterizedTest
-    @ValueSource(strings = {"2 y", "2 year", "2 Year", "2 years"})
+    @ValueSource(strings = {"2y", "2 y", "2 year", "2 Year", "2 years"})
     void parseShouldHandleYears(String input) {
         assertThat(DurationParser.parse(input))
             .isEqualTo(ChronoUnit.YEARS.getDuration().multipliedBy(2));
diff --git 
a/server/protocols/webadmin/webadmin-core/src/main/java/org/apache/james/webadmin/routes/TasksRoutes.java
 
b/server/protocols/webadmin/webadmin-core/src/main/java/org/apache/james/webadmin/routes/TasksRoutes.java
index 84f6eff..35db8b8 100644
--- 
a/server/protocols/webadmin/webadmin-core/src/main/java/org/apache/james/webadmin/routes/TasksRoutes.java
+++ 
b/server/protocols/webadmin/webadmin-core/src/main/java/org/apache/james/webadmin/routes/TasksRoutes.java
@@ -19,6 +19,8 @@
 
 package org.apache.james.webadmin.routes;
 
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
 import java.util.List;
 import java.util.Optional;
 import java.util.function.Supplier;
@@ -33,13 +35,16 @@ import org.apache.james.task.TaskExecutionDetails;
 import org.apache.james.task.TaskId;
 import org.apache.james.task.TaskManager;
 import org.apache.james.task.TaskNotFoundException;
+import org.apache.james.util.DurationParser;
 import org.apache.james.webadmin.Routes;
 import org.apache.james.webadmin.dto.ExecutionDetailsDto;
 import org.apache.james.webadmin.utils.ErrorResponder;
 import org.apache.james.webadmin.utils.JsonTransformer;
 import org.apache.james.webadmin.utils.Responses;
+
 import org.eclipse.jetty.http.HttpStatus;
 
+import com.google.common.base.Preconditions;
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiImplicitParam;
 import io.swagger.annotations.ApiImplicitParams;
@@ -54,9 +59,10 @@ import spark.Service;
 @Path(":task")
 @Produces("application/json")
 public class TasksRoutes implements Routes {
-    public static final String BASE = "/tasks";
+    private static final Duration MAXIMUM_AWAIT_TIMEOUT = Duration.ofDays(365);
     private final TaskManager taskManager;
     private final JsonTransformer jsonTransformer;
+    public static final String BASE = "/tasks";
 
     @Inject
     public TasksRoutes(TaskManager taskManager, JsonTransformer 
jsonTransformer) {
@@ -133,13 +139,15 @@ public class TasksRoutes implements Routes {
     @ApiOperation(value = "Await, then get a task execution details")
     @ApiResponses(value = {
         @ApiResponse(code = HttpStatus.OK_200, message = "A specific class 
execution details", response = ExecutionDetailsDto.class),
-        @ApiResponse(code = HttpStatus.BAD_REQUEST_400, message = "The taskId 
is invalid"),
-        @ApiResponse(code = HttpStatus.NOT_FOUND_404, message = "The taskId is 
not found")
+        @ApiResponse(code = HttpStatus.BAD_REQUEST_400, message = "The taskId 
is invalid or invalid timeout"),
+        @ApiResponse(code = HttpStatus.NOT_FOUND_404, message = "The taskId is 
not found"),
+        @ApiResponse(code = HttpStatus.REQUEST_TIMEOUT_408, message = "Waiting 
the the task completion has reached the timeout")
     })
     public Object await(Request req, Response response) {
         TaskId taskId = getTaskId(req);
+        Duration timeout = getTimeout(req);
         return respondStatus(taskId,
-            () -> taskManager.await(getTaskId(req)));
+            () -> awaitTask(taskId, timeout));
     }
 
     private Object respondStatus(TaskId taskId, Supplier<TaskExecutionDetails> 
executionDetailsSupplier) {
@@ -181,4 +189,46 @@ public class TasksRoutes implements Routes {
                 .haltError();
         }
     }
+
+    private Duration getTimeout(Request req) {
+        try {
+            Optional<String> requestTimeout = 
Optional.ofNullable(req.queryParams("timeout"))
+                .filter(parameter -> !parameter.isEmpty());
+
+            requestTimeout.ifPresent(timeout ->
+                Preconditions.checkState(!timeout.replaceAll(" ", 
"").startsWith("-"), "Timeout should be positive"));
+
+            Duration timeout = requestTimeout
+                .map(rawString -> DurationParser.parse(rawString, 
ChronoUnit.SECONDS))
+                .orElse(MAXIMUM_AWAIT_TIMEOUT);
+
+            Preconditions.checkState(timeout.compareTo(MAXIMUM_AWAIT_TIMEOUT) 
<= 0, "Timeout should not exceed one year");
+            return timeout;
+        } catch (Exception e) {
+            throw ErrorResponder.builder()
+                .statusCode(HttpStatus.BAD_REQUEST_400)
+                .cause(e)
+                .type(ErrorResponder.ErrorType.INVALID_ARGUMENT)
+                .message("Invalid timeout " + e.getMessage())
+                .haltError();
+        }
+    }
+
+    private TaskExecutionDetails awaitTask(TaskId taskId, Duration timeout) {
+        try {
+            return taskManager.await(taskId, timeout);
+        } catch (TaskManager.ReachedTimeoutException e) {
+            throw ErrorResponder.builder()
+                .statusCode(HttpStatus.REQUEST_TIMEOUT_408)
+                .type(ErrorResponder.ErrorType.SERVER_ERROR)
+                .message("The timeout has been reached")
+                .haltError();
+        } catch (TaskNotFoundException e) {
+            throw ErrorResponder.builder()
+                .statusCode(HttpStatus.NOT_FOUND_404)
+                .type(ErrorResponder.ErrorType.SERVER_ERROR)
+                .message("The taskId is not found")
+                .haltError();
+        }
+    }
 }
diff --git 
a/server/protocols/webadmin/webadmin-core/src/test/java/org/apache/james/webadmin/routes/TasksRoutesTest.java
 
b/server/protocols/webadmin/webadmin-core/src/test/java/org/apache/james/webadmin/routes/TasksRoutesTest.java
index bdd8081..f31ec43 100644
--- 
a/server/protocols/webadmin/webadmin-core/src/test/java/org/apache/james/webadmin/routes/TasksRoutesTest.java
+++ 
b/server/protocols/webadmin/webadmin-core/src/test/java/org/apache/james/webadmin/routes/TasksRoutesTest.java
@@ -22,6 +22,8 @@ package org.apache.james.webadmin.routes;
 import static io.restassured.RestAssured.given;
 import static io.restassured.RestAssured.when;
 import static io.restassured.RestAssured.with;
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
@@ -199,6 +201,82 @@ class TasksRoutesTest {
     }
 
     @Test
+    void getAwaitWithTimeoutShouldAwaitTaskCompletion() {
+        TaskId taskId = taskManager.submit(() -> {
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+            return Task.Result.COMPLETED;
+        });
+
+        given()
+            .queryParam("timeout", "2s")
+        .when()
+            .get("/" + taskId.getValue() + "/await")
+        .then()
+            .statusCode(HttpStatus.OK_200)
+            .body("status", is("completed"));
+    }
+
+    @Test
+    void getAwaitWithInvalidTimeoutShouldReturnAnError() {
+        TaskId taskId = taskManager.submit(() -> Task.Result.COMPLETED);
+
+        given()
+            .queryParam("timeout", "-5")
+        .when()
+            .get("/" + taskId.getValue() + "/await")
+        .then()
+            .statusCode(HttpStatus.BAD_REQUEST_400)
+            .body("message", allOf(containsString("Invalid timeout"), 
containsString("Timeout should be positive")));
+    }
+
+    @Test
+    void getAwaitWithATooBigTimeoutShouldReturnAnError() {
+        TaskId taskId = taskManager.submit(() -> Task.Result.COMPLETED);
+
+        given()
+            .queryParam("timeout", "5y")
+        .when()
+            .get("/" + taskId.getValue() + "/await")
+        .then()
+            .statusCode(HttpStatus.BAD_REQUEST_400)
+            .body("message", allOf(containsString("Invalid timeout"), 
containsString("Timeout should not exceed one year")));
+    }
+
+    @Test
+    void getAwaitWithAShorterTimeoutShouldReturnTimeout() {
+        TaskId taskId = taskManager.submit(() -> {
+            try {
+                Thread.sleep(10000);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+            return Task.Result.COMPLETED;
+        });
+
+        given()
+            .queryParam("timeout", "1")
+        .when()
+            .get("/" + taskId.getValue() + "/await")
+        .then()
+            .statusCode(HttpStatus.REQUEST_TIMEOUT_408)
+            .body("message", is("The timeout has been reached"));
+    }
+
+    @Test
+    void getAwaitWithANonExistingTaskShouldReturnNotFound() {
+        String unknownTaskId = TaskId.generateTaskId().asString();
+        when()
+            .get("/" + unknownTaskId + "/await")
+        .then()
+            .statusCode(HttpStatus.NOT_FOUND_404)
+            .body("message", is("The taskId is not found"));
+    }
+
+    @Test
     void getAwaitShouldNotFailUponError() {
         TaskId taskId = taskManager.submit(() -> {
             throw new RuntimeException();
@@ -333,4 +411,4 @@ class TasksRoutesTest {
             .body("type", is("InvalidArgument"))
             .body("message", is("Invalid status query parameter"));
     }
-}
\ No newline at end of file
+}
diff --git 
a/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
 
b/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
index f61cbc1..a8e43c5 100644
--- 
a/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
+++ 
b/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java
@@ -123,7 +123,7 @@ class DistributedTaskManagerTest implements 
TaskManagerContract {
             Awaitility.await()
                 .atMost(Duration.FIVE_SECONDS)
                 .pollInterval(100L, TimeUnit.MILLISECONDS)
-                .until(() -> taskManager1.await(taskId).getStatus() == 
TaskManager.Status.COMPLETED);
+                .until(() -> taskManager1.await(taskId, TIMEOUT).getStatus() 
== TaskManager.Status.COMPLETED);
 
             TaskExecutionDetails detailsFromTaskManager1 = 
taskManager1.getExecutionDetails(taskId);
             TaskExecutionDetails detailsFromTaskManager2 = 
taskManager2.getExecutionDetails(taskId);
@@ -150,7 +150,7 @@ class DistributedTaskManagerTest implements 
TaskManagerContract {
             Awaitility.await()
                 .atMost(Duration.ONE_SECOND)
                 .pollInterval(100L, TimeUnit.MILLISECONDS)
-                .until(() -> taskManager1.await(waitingTaskId).getStatus() == 
TaskManager.Status.COMPLETED);
+                .until(() -> taskManager1.await(waitingTaskId, 
TIMEOUT).getStatus() == TaskManager.Status.COMPLETED);
         }
     }
 
@@ -165,7 +165,7 @@ class DistributedTaskManagerTest implements 
TaskManagerContract {
                 Awaitility.await()
                     .atMost(Duration.ONE_SECOND)
                     .pollInterval(100L, TimeUnit.MILLISECONDS)
-                    .until(() -> taskManager1.await(taskId).getStatus() == 
TaskManager.Status.COMPLETED);
+                    .until(() -> taskManager1.await(taskId, 
TIMEOUT).getStatus() == TaskManager.Status.COMPLETED);
 
                 TaskExecutionDetails executionDetails = 
taskManager2.getExecutionDetails(taskId);
                 
assertThat(executionDetails.getSubmittedNode()).isEqualTo(HOSTNAME_2);
@@ -203,6 +203,25 @@ class DistributedTaskManagerTest implements 
TaskManagerContract {
             .contains(remoteTaskManager.getKey());
     }
 
+    @Test
+    void 
givenTwoTaskManagersATaskRunningOnOneShouldBeWaitableFromTheOtherOne() throws 
TaskManager.ReachedTimeoutException {
+        TaskManager taskManager1 = taskManager(HOSTNAME);
+        TaskManager taskManager2 = taskManager(HOSTNAME_2);
+        TaskId id = taskManager1.submit(new MemoryReferenceTask(() -> {
+            Thread.sleep(250);
+            return Task.Result.COMPLETED;
+        }));
+
+        awaitUntilTaskHasStatus(id, TaskManager.Status.IN_PROGRESS, 
taskManager1);
+        Hostname runningNode = 
taskManager1.getExecutionDetails(id).getRanNode().get();
+
+        TaskManager remoteTaskManager = getOtherTaskManager(runningNode, 
Pair.of(HOSTNAME, taskManager1), Pair.of(HOSTNAME_2, taskManager2)).getValue();
+
+        remoteTaskManager.await(id, TIMEOUT);
+        assertThat(taskManager1.getExecutionDetails(id).getStatus())
+            .isEqualTo(TaskManager.Status.COMPLETED);
+    }
+
     private Pair<Hostname, TaskManager> getOtherTaskManager(Hostname node, 
Pair<Hostname, TaskManager> taskManager1, Pair<Hostname, TaskManager> 
taskManager2) {
         if (node.equals(taskManager1.getKey())) {
             return taskManager2;
diff --git 
a/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java 
b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
index 90e88c0..611cbde 100644
--- a/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
+++ b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
@@ -123,7 +123,7 @@ public class MemoryTaskManager implements TaskManager {
         return ImmutableList.copyOf(tasksFiltered(status).values());
     }
 
-    public Map<TaskId, TaskExecutionDetails> tasksFiltered(Status status) {
+    private Map<TaskId, TaskExecutionDetails> tasksFiltered(Status status) {
         return idToExecutionDetails.entrySet()
             .stream()
             .filter(details -> details.getValue().getStatus().equals(status))
@@ -140,17 +140,14 @@ public class MemoryTaskManager implements TaskManager {
     }
 
     @Override
-    public TaskExecutionDetails await(TaskId id) {
-        if (Optional.ofNullable(idToExecutionDetails.get(id)).isPresent()) {
+    public TaskExecutionDetails await(TaskId id, Duration timeout) throws 
TaskNotFoundException, ReachedTimeoutException {
+        try {
             return Flux.interval(NOW, AWAIT_POLLING_DURATION, 
Schedulers.elastic())
                 .map(ignored -> getExecutionDetails(id))
-                .filter(details -> details.getStatus() == Status.COMPLETED
-                    || details.getStatus() == Status.FAILED
-                    || details.getStatus() == Status.CANCELLED)
-                .take(1)
-                .blockFirst();
-        } else {
-            return null;
+                .filter(details -> details.getStatus().isFinished())
+                .blockFirst(timeout);
+        } catch (IllegalStateException e) {
+            throw new ReachedTimeoutException();
         }
     }
 
diff --git a/server/task/src/main/java/org/apache/james/task/TaskManager.java 
b/server/task/src/main/java/org/apache/james/task/TaskManager.java
index b04f2c2..5aca267 100644
--- a/server/task/src/main/java/org/apache/james/task/TaskManager.java
+++ b/server/task/src/main/java/org/apache/james/task/TaskManager.java
@@ -19,6 +19,7 @@
 
 package org.apache.james.task;
 
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.List;
 
@@ -58,6 +59,9 @@ public interface TaskManager {
         }
     }
 
+    class ReachedTimeoutException extends Exception {
+    }
+
     TaskId submit(Task task);
 
     TaskExecutionDetails getExecutionDetails(TaskId id);
@@ -68,5 +72,5 @@ public interface TaskManager {
 
     void cancel(TaskId id);
 
-    TaskExecutionDetails await(TaskId id);
+    TaskExecutionDetails await(TaskId id, Duration timeout) throws 
TaskNotFoundException, ReachedTimeoutException;
 }
diff --git 
a/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
 
b/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
index 14b2f2a..4e281de 100644
--- 
a/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
+++ 
b/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
@@ -19,17 +19,20 @@
 package org.apache.james.task.eventsourcing
 
 import java.io.Closeable
+import java.time.Duration
 import java.util
 
-import com.google.common.annotations.VisibleForTesting
 import javax.inject.Inject
 
 import org.apache.james.eventsourcing.eventstore.{EventStore, History}
 import org.apache.james.eventsourcing.{AggregateId, Subscriber}
+import org.apache.james.task.TaskManager.ReachedTimeoutException
 import org.apache.james.task._
 import org.apache.james.task.eventsourcing.TaskCommand._
 
-import reactor.core.publisher.Flux
+import com.google.common.annotations.VisibleForTesting
+import reactor.core.publisher.{Flux, Mono}
+import reactor.core.scheduler.Schedulers
 
 class EventSourcingTaskManager @Inject @VisibleForTesting 
private[eventsourcing](
                                                                                
   workQueueSupplier: WorkQueueSupplier,
@@ -91,19 +94,26 @@ class EventSourcingTaskManager @Inject @VisibleForTesting 
private[eventsourcing]
     eventSourcingSystem.dispatch(command)
   }
 
-  override final def await(id: TaskId): TaskExecutionDetails = {
-    val details = getExecutionDetails(id)
-    if (details.getStatus.isFinished) {
-      details
-    } else {
-      Flux.from(terminationSubscriber.listenEvents)
-          .filter{
-            case event: TaskEvent => event.getAggregateId.taskId == id
-            case _ => false
-          }
-          .blockFirst()
-
-      getExecutionDetails(id)
+  @throws(classOf[TaskNotFoundException])
+  @throws(classOf[ReachedTimeoutException])
+  override def await(id: TaskId, timeout: Duration): TaskExecutionDetails = {
+    try {
+      val details = Mono.fromSupplier[TaskExecutionDetails](() => 
getExecutionDetails(id))
+        .filter(_.getStatus.isFinished)
+
+      val findEvent = Flux.from(terminationSubscriber.listenEvents)
+        .filter {
+          case event: TaskEvent => event.getAggregateId.taskId == id
+          case _ => false
+        }
+        .next()
+        .then(details)
+
+      Flux.merge(findEvent, details)
+        .subscribeOn(Schedulers.elastic)
+        .blockFirst(timeout)
+    } catch {
+      case _: IllegalStateException => throw new ReachedTimeoutException
     }
   }
 
diff --git 
a/server/task/src/test/java/org/apache/james/task/TaskManagerContract.java 
b/server/task/src/test/java/org/apache/james/task/TaskManagerContract.java
index 2fa605d..6078752 100644
--- a/server/task/src/test/java/org/apache/james/task/TaskManagerContract.java
+++ b/server/task/src/test/java/org/apache/james/task/TaskManagerContract.java
@@ -47,6 +47,7 @@ public interface TaskManagerContract {
         .pollDelay(slowPacedPollInterval)
         .await();
     ConditionFactory awaitAtMostFiveSeconds = calmlyAwait.atMost(FIVE_SECONDS);
+    java.time.Duration TIMEOUT = java.time.Duration.ofMillis(Long.MAX_VALUE);
 
     TaskManager taskManager();
 
@@ -410,16 +411,15 @@ public interface TaskManagerContract {
     }
 
     @Test
-    default void awaitShouldNotThrowWhenCompletedTask() {
+    default void awaitShouldNotThrowWhenCompletedTask() throws 
TaskManager.ReachedTimeoutException {
         TaskManager taskManager = taskManager();
-        TaskId taskId = taskManager.submit(
-            new CompletedTask());
-        taskManager.await(taskId);
-        taskManager.await(taskId);
+        TaskId taskId = taskManager.submit(new CompletedTask());
+        taskManager.await(taskId, TIMEOUT);
+        taskManager.await(taskId, TIMEOUT);
     }
 
     @Test
-    default void awaitShouldAwaitWaitingTask() {
+    default void awaitShouldAwaitWaitingTask() throws 
TaskManager.ReachedTimeoutException, InterruptedException {
         TaskManager taskManager = taskManager();
         CountDownLatch latch = new CountDownLatch(1);
         taskManager.submit(new MemoryReferenceTask(
@@ -428,10 +428,28 @@ public interface TaskManagerContract {
                 return Task.Result.COMPLETED;
             }));
         latch.countDown();
-        TaskId task2 = taskManager.submit(
-            new CompletedTask());
+        TaskId task2 = taskManager.submit(new CompletedTask());
+
+        assertThat(taskManager.await(task2, 
TIMEOUT).getStatus()).isEqualTo(TaskManager.Status.COMPLETED);
+    }
+
+    @Test
+    default void 
awaitANonExistingTaskShouldReturnAnUnknownAwaitedTaskExecutionDetailsAndThrow() 
{
+        assertThatCode(() -> taskManager().await(TaskId.generateTaskId(), 
TIMEOUT))
+            .isInstanceOf(TaskNotFoundException.class);
+    }
+
+    @Test
+    default void 
awaitWithATooShortTimeoutShouldReturnATimeoutAwaitedTaskExecutionDetailsAndThrow()
 {
+        TaskManager taskManager = taskManager();
+        TaskId taskId = taskManager.submit(new MemoryReferenceTask(
+            () -> {
+                Thread.sleep(1000);
+                return Task.Result.COMPLETED;
+            }));
 
-        
assertThat(taskManager.await(task2).getStatus()).isEqualTo(TaskManager.Status.COMPLETED);
+        assertThatCode(() -> taskManager.await(taskId, 
java.time.Duration.ofMillis(10)))
+            .isInstanceOf(TaskManager.ReachedTimeoutException.class);
     }
 
     @Test
diff --git a/src/site/markdown/server/manage-webadmin.md 
b/src/site/markdown/server/manage-webadmin.md
index 91f498b..628ac7f 100644
--- a/src/site/markdown/server/manage-webadmin.md
+++ b/src/site/markdown/server/manage-webadmin.md
@@ -3249,16 +3249,29 @@ One can await the end of a task, then receive it's 
final execution report.
 That feature is especially usefull for testing purpose but still can serve 
real-life scenari.
 
 ```
-curl -XGET http://ip:port/tasks/3294a976-ce63-491e-bd52-1b6f465ed7a2/await
+curl -XGET 
http://ip:port/tasks/3294a976-ce63-491e-bd52-1b6f465ed7a2/await?timeout=duration
 ```
 
 An Execution Report will be returned.
 
+`timeout` is optional.
+By default it is set to 365 days (the maximum value).
+The expected value is expressed in the following format: `Nunit`.
+`N` should be strictly positive.
+`unit` can be either in the short form (`s`, `m`, `h`, etc.) or in the long 
form (`day`, `week`, `month`, etc.). When `unit` is omitted, `N` is read as seconds.
+
+Examples:
+ - `30s`
+ - `5m`
+ - `7d`
+ - `1y`
+
 Response codes:
 
  - 200: The specific task was found and the execution report exposed above is 
returned
- - 400: Invalid task ID
+ - 400: Invalid task ID or invalid timeout
  - 404: Task ID was not found
+ - 408: The timeout has been reached
 
 ### Cancelling a task
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to