This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit fffd183d26b0e3700e9fee68a8074592f0dce6ce
Author: Gautier DI FOLCO <gdifo...@linagora.com>
AuthorDate: Fri Aug 30 10:10:41 2019 +0200

    JAMES-2813 Add non-polling await in DistributedTaskManager
---
 .../task/eventsourcing/EventSourcingTaskManager.scala    | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)

diff --git a/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala b/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
index 93b8302..14b2f2a 100644
--- a/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
+++ b/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
@@ -23,12 +23,13 @@ import java.util
 
 import com.google.common.annotations.VisibleForTesting
 import javax.inject.Inject
+
 import org.apache.james.eventsourcing.eventstore.{EventStore, History}
 import org.apache.james.eventsourcing.{AggregateId, Subscriber}
 import org.apache.james.task._
 import org.apache.james.task.eventsourcing.TaskCommand._
 
-import scala.annotation.tailrec
+import reactor.core.publisher.Flux
 
 class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing](
                                                                                
   workQueueSupplier: WorkQueueSupplier,
@@ -37,8 +38,6 @@ class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing]
                                                                                
   val hostname: Hostname,
                                                                                
   val terminationSubscriber: TerminationSubscriber) extends TaskManager with Closeable {
 
-  private val delayBetweenPollingInMs = 500
-
   private def workDispatcher: Subscriber = {
     case Created(aggregateId, _, task, _) =>
       val taskWithId = new TaskWithId(aggregateId.taskId, task)
@@ -92,14 +91,19 @@ class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing]
     eventSourcingSystem.dispatch(command)
   }
 
-  @tailrec
   override final def await(id: TaskId): TaskExecutionDetails = {
     val details = getExecutionDetails(id)
     if (details.getStatus.isFinished) {
       details
     } else {
-      Thread.sleep(delayBetweenPollingInMs)
-      await(id)
+      Flux.from(terminationSubscriber.listenEvents)
+          .filter{
+            case event: TaskEvent => event.getAggregateId.taskId == id
+            case _ => false
+          }
+          .blockFirst()
+
+      getExecutionDetails(id)
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to