This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit fffd183d26b0e3700e9fee68a8074592f0dce6ce
Author: Gautier DI FOLCO <gdifo...@linagora.com>
AuthorDate: Fri Aug 30 10:10:41 2019 +0200

    JAMES-2813 Add non-polling await in DistributedTaskManager
---
 .../task/eventsourcing/EventSourcingTaskManager.scala | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)

diff --git a/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala b/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
index 93b8302..14b2f2a 100644
--- a/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
+++ b/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
@@ -23,12 +23,13 @@ import java.util
 
 import com.google.common.annotations.VisibleForTesting
 import javax.inject.Inject
+
 import org.apache.james.eventsourcing.eventstore.{EventStore, History}
 import org.apache.james.eventsourcing.{AggregateId, Subscriber}
 import org.apache.james.task._
 import org.apache.james.task.eventsourcing.TaskCommand._
 
-import scala.annotation.tailrec
+import reactor.core.publisher.Flux
 
 class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing](
     workQueueSupplier: WorkQueueSupplier,
@@ -37,8 +38,6 @@ class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing]
     val hostname: Hostname,
     val terminationSubscriber: TerminationSubscriber) extends TaskManager with Closeable {
 
-  private val delayBetweenPollingInMs = 500
-
   private def workDispatcher: Subscriber = {
     case Created(aggregateId, _, task, _) =>
       val taskWithId = new TaskWithId(aggregateId.taskId, task)
@@ -92,14 +91,19 @@ class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing]
     eventSourcingSystem.dispatch(command)
   }
 
-  @tailrec
   override final def await(id: TaskId): TaskExecutionDetails = {
     val details = getExecutionDetails(id)
     if (details.getStatus.isFinished) {
       details
     } else {
-      Thread.sleep(delayBetweenPollingInMs)
-      await(id)
+      Flux.from(terminationSubscriber.listenEvents)
+        .filter{
+          case event: TaskEvent => event.getAggregateId.taskId == id
+          case _ => false
+        }
+        .blockFirst()
+
+      getExecutionDetails(id)
     }
   }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org