This is an automated email from the ASF dual-hosted git repository.

matthieu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 83dc11f9d112a2bf8ac44131aa6f7e51695fcbac
Author: Matthieu Baechler <[email protected]>
AuthorDate: Wed Jun 26 16:21:20 2019 +0200

    JAMES-2813 Implements a task manager using event sourcing
---
 server/task/pom.xml                                |  69 +++++++++++++-
 .../apache/james/task/SerialTaskManagerWorker.java |   2 +-
 .../james/task/eventsourcing/CommandHandlers.scala |  82 ++++++++++++++++
 .../task/eventsourcing/DecisionProjection.scala    |  43 +++++++++
 .../eventsourcing/EventSourcingTaskManager.scala   | 105 +++++++++++++++++++++
 .../apache/james/task/eventsourcing/Events.scala   |  39 ++++++++
 .../task/eventsourcing/RecentTasksProjection.scala |  40 ++++++++
 .../eventsourcing/ScalaEventSourcingSystem.scala   |  28 ++++++
 .../james/task/eventsourcing/TaskAggregate.scala   |  89 +++++++++++++++++
 .../james/task/eventsourcing/TaskAggregateId.scala |  34 +++++++
 .../james/task/eventsourcing/TaskCommand.scala     |  41 ++++++++
 .../TaskExecutionDetailsProjection.scala           |  53 +++++++++++
 .../task/eventsourcing/WorkerStatusListener.scala  |  38 ++++++++
 .../org/apache/james/task/TaskManagerContract.java |   6 ++
 .../EventSourcingTaskManagerTest.java              |  50 ++++++++++
 15 files changed, 717 insertions(+), 2 deletions(-)

diff --git a/server/task/pom.xml b/server/task/pom.xml
index bfafbb5..5d13d58 100644
--- a/server/task/pom.xml
+++ b/server/task/pom.xml
@@ -17,7 +17,8 @@
     specific language governing permissions and limitations
     under the License.
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 
     <modelVersion>4.0.0</modelVersion>
 
@@ -30,9 +31,23 @@
     <artifactId>james-server-task</artifactId>
     <name>Apache James :: Server :: Task</name>
 
+    <properties>
+        <scala.base>2.12</scala.base>
+        <scala.version>${scala.base}.7</scala.version>
+    </properties>
+
     <dependencies>
         <dependency>
             <groupId>${james.groupId}</groupId>
+            <artifactId>event-sourcing-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>event-sourcing-event-store-memory</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
             <artifactId>james-server-util</artifactId>
         </dependency>
         <dependency>
@@ -70,6 +85,58 @@
             <artifactId>assertj-core</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.scala-lang.modules</groupId>
+            <artifactId>scala-java8-compat_${scala.base}</artifactId>
+            <version>0.9.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <version>${scala.version}</version>
+        </dependency>
     </dependencies>
 
+    <build>
+        <pluginManagement>
+            <plugins>
+                <plugin>
+                    <groupId>net.alchim31.maven</groupId>
+                    <artifactId>scala-maven-plugin</artifactId>
+                    <version>3.4.4</version>
+                </plugin>
+                <plugin>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-compiler-plugin</artifactId>
+                    <version>2.0.2</version>
+                </plugin>
+            </plugins>
+        </pluginManagement>
+        <plugins>
+            <plugin>
+                <groupId>net.alchim31.maven</groupId>
+                <artifactId>scala-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>scala-compile-first</id>
+                        <phase>process-resources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                            <goal>compile</goal>
+                        </goals>
+                    </execution>
+                    <execution>
+                        <id>scala-test-compile</id>
+                        <phase>process-test-resources</phase>
+                        <goals>
+                            <goal>testCompile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <addArgs>-Xlog-implicits</addArgs>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
 </project>
\ No newline at end of file
diff --git 
a/server/task/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java 
b/server/task/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
index d690be2..4736372 100644
--- 
a/server/task/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
+++ 
b/server/task/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
@@ -49,7 +49,7 @@ public class SerialTaskManagerWorker implements 
TaskManagerWorker {
     private final Semaphore semaphore;
     private final Set<TaskId> cancelledTasks;
 
-    SerialTaskManagerWorker() {
+    public SerialTaskManagerWorker() {
         this.taskExecutor = 
Executors.newSingleThreadExecutor(NamedThreadFactory.withName("task executor"));
         this.cancelledTasks = Sets.newConcurrentHashSet();
         this.runningTask = new AtomicReference<>();
diff --git 
a/server/task/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala
 
b/server/task/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala
new file mode 100644
index 0000000..1bb8abf
--- /dev/null
+++ 
b/server/task/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala
@@ -0,0 +1,82 @@
+/** **************************************************************
+  * Licensed to the Apache Software Foundation (ASF) under one   *
+  * or more contributor license agreements.  See the NOTICE file *
+  * distributed with this work for additional information        *
+  * regarding copyright ownership.  The ASF licenses this file   *
+  * to you under the Apache License, Version 2.0 (the            *
+  * "License"); you may not use this file except in compliance   *
+  * with the License.  You may obtain a copy of the License at   *
+  * *
+  * http://www.apache.org/licenses/LICENSE-2.0                 *
+  * *
+  * Unless required by applicable law or agreed to in writing,   *
+  * software distributed under the License is distributed on an  *
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+  * KIND, either express or implied.  See the License for the    *
+  * specific language governing permissions and limitations      *
+  * under the License.                                           *
+  * ***************************************************************/
+package org.apache.james.task.eventsourcing
+
+import java.util
+
+import org.apache.james.eventsourcing.eventstore.History
+import org.apache.james.eventsourcing.{CommandHandler, Event}
+import org.apache.james.task.TaskId
+import org.apache.james.task.eventsourcing.TaskCommand._
+
+/**
+  * Base class of the command handlers working on a task aggregate.
+  *
+  * @tparam T the kind of [[TaskCommand]] handled
+  */
+sealed abstract class TaskCommandHandler[T <: TaskCommand]
+  extends CommandHandler[T] {
+
+  /**
+    * Rebuilds the aggregate of `taskId` from the history that
+    * `loadHistory` returns for the matching [[TaskAggregateId]].
+    */
+  def loadAggregate(loadHistory: TaskAggregateId => History,
+                    taskId: TaskId): TaskAggregate = {
+    val id = TaskAggregateId(taskId)
+    val history = loadHistory(id)
+    TaskAggregate.fromHistory(id, history)
+  }
+}
+
+/** Handles [[Create]] by calling `create` on the task's aggregate. */
+class CreateCommandHandler(
+    private val loadHistory: TaskAggregateId => History)
+  extends TaskCommandHandler[Create] {
+
+  override def handledClass: Class[Create] = classOf[Create]
+
+  override def handle(command: Create): util.List[_ <: Event] = {
+    val aggregate = loadAggregate(loadHistory, command.id)
+    aggregate.create(command.task)
+  }
+}
+
+/** Handles [[Start]] by calling `start` on the task's aggregate. */
+class StartCommandHandler(
+    private val loadHistory: TaskAggregateId => History)
+  extends TaskCommandHandler[Start] {
+
+  override def handledClass: Class[Start] = classOf[Start]
+
+  override def handle(command: Start): util.List[_ <: Event] = {
+    val aggregate = loadAggregate(loadHistory, command.id)
+    aggregate.start()
+  }
+}
+
+/**
+  * Handles [[RequestCancel]] by calling `requestCancel` on the task's
+  * aggregate.
+  */
+class RequestCancelCommandHandler(
+    private val loadHistory: TaskAggregateId => History)
+  extends TaskCommandHandler[RequestCancel] {
+
+  override def handledClass: Class[RequestCancel] =
+    classOf[RequestCancel]
+
+  override def handle(command: RequestCancel): util.List[_ <: Event] = {
+    val aggregate = loadAggregate(loadHistory, command.id)
+    aggregate.requestCancel()
+  }
+}
+
+/**
+  * Handles [[Complete]] by calling `complete` on the task's aggregate
+  * with the result carried by the command.
+  */
+class CompleteCommandHandler(
+    private val loadHistory: TaskAggregateId => History)
+  extends TaskCommandHandler[Complete] {
+
+  override def handledClass: Class[Complete] = classOf[Complete]
+
+  override def handle(command: Complete): util.List[_ <: Event] = {
+    val aggregate = loadAggregate(loadHistory, command.id)
+    aggregate.complete(command.result)
+  }
+}
+
+/** Handles [[Cancel]] by calling `cancel` on the task's aggregate. */
+class CancelCommandHandler(
+    private val loadHistory: TaskAggregateId => History)
+  extends TaskCommandHandler[Cancel] {
+
+  override def handledClass: Class[Cancel] = classOf[Cancel]
+
+  override def handle(command: Cancel): util.List[_ <: Event] = {
+    val aggregate = loadAggregate(loadHistory, command.id)
+    aggregate.cancel()
+  }
+}
+
+/** Handles [[Fail]] by calling `fail` on the task's aggregate. */
+class FailCommandHandler(
+    private val loadHistory: TaskAggregateId => History)
+  extends TaskCommandHandler[Fail] {
+
+  override def handledClass: Class[Fail] = classOf[Fail]
+
+  override def handle(command: Fail): util.List[_ <: Event] = {
+    val aggregate = loadAggregate(loadHistory, command.id)
+    aggregate.fail()
+  }
+}
\ No newline at end of file
diff --git 
a/server/task/src/main/scala/org/apache/james/task/eventsourcing/DecisionProjection.scala
 
b/server/task/src/main/scala/org/apache/james/task/eventsourcing/DecisionProjection.scala
new file mode 100644
index 0000000..d5394bb
--- /dev/null
+++ 
b/server/task/src/main/scala/org/apache/james/task/eventsourcing/DecisionProjection.scala
@@ -0,0 +1,43 @@
+/** **************************************************************
+  * Licensed to the Apache Software Foundation (ASF) under one   *
+  * or more contributor license agreements.  See the NOTICE file *
+  * distributed with this work for additional information        *
+  * regarding copyright ownership.  The ASF licenses this file   *
+  * to you under the Apache License, Version 2.0 (the            *
+  * "License"); you may not use this file except in compliance   *
+  * with the License.  You may obtain a copy of the License at   *
+  * *
+  * http://www.apache.org/licenses/LICENSE-2.0                 *
+  * *
+  * Unless required by applicable law or agreed to in writing,   *
+  * software distributed under the License is distributed on an  *
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+  * KIND, either express or implied.  See the License for the    *
+  * specific language governing permissions and limitations      *
+  * under the License.                                           *
+  * ***************************************************************/
+package org.apache.james.task.eventsourcing
+
+import org.apache.james.eventsourcing.Event
+import org.apache.james.task.TaskManager.Status
+
+/**
+  * Status of a task as derived from its events; `None` as long as no
+  * event has been applied.
+  */
+case class DecisionProjection(status: Option[Status]) {
+  // The next projection depends on the event alone: the status held by
+  // this instance is not consulted.
+  val update: Event => DecisionProjection =
+    event => DecisionProjection.create(event)
+}
+
+object DecisionProjection {
+  /** Projection that holds no status. */
+  def empty: DecisionProjection = DecisionProjection(None)
+
+  /** Projection holding the status that `event` leads to. */
+  def create(event: Event): DecisionProjection = {
+    val status = fromEvent(event)
+    DecisionProjection(Some(status))
+  }
+
+  // Only the six task events are mapped; any other event falls through
+  // the match and raises a scala.MatchError.
+  private def fromEvent(event: Event): Status = event match {
+    case _: Created => Status.WAITING
+    case _: Started => Status.IN_PROGRESS
+    case _: CancelRequested => Status.CANCEL_REQUESTED
+    case _: Cancelled => Status.CANCELLED
+    case _: Completed => Status.COMPLETED
+    case _: Failed => Status.FAILED
+  }
+}
+
diff --git 
a/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
 
b/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
new file mode 100644
index 0000000..9d6a3fa
--- /dev/null
+++ 
b/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
@@ -0,0 +1,105 @@
+/** **************************************************************
+  * Licensed to the Apache Software Foundation (ASF) under one   *
+  * or more contributor license agreements.  See the NOTICE file *
+  * distributed with this work for additional information        *
+  * regarding copyright ownership.  The ASF licenses this file   *
+  * to you under the Apache License, Version 2.0 (the            *
+  * "License"); you may not use this file except in compliance   *
+  * with the License.  You may obtain a copy of the License at   *
+  * *
+  * http://www.apache.org/licenses/LICENSE-2.0                 *
+  * *
+  * Unless required by applicable law or agreed to in writing,   *
+  * software distributed under the License is distributed on an  *
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+  * KIND, either express or implied.  See the License for the    *
+  * specific language governing permissions and limitations      *
+  * under the License.                                           *
+  * ***************************************************************/
+package org.apache.james.task.eventsourcing
+
+import java.io.Closeable
+import java.util
+
+import com.google.common.annotations.VisibleForTesting
+import javax.inject.Inject
+import org.apache.james.eventsourcing.{AggregateId, Subscriber}
+import org.apache.james.eventsourcing.eventstore.{EventStore, History}
+import org.apache.james.task._
+import org.apache.james.task.eventsourcing.TaskCommand._
+
+import scala.annotation.tailrec
+
+/**
+  * [[TaskManager]] that turns every request into a command dispatched to
+  * an event sourcing system backed by `eventStore`.
+  *
+  * The subscribers of that system keep two in-memory projections up to
+  * date (execution details and the list of created tasks) and forward
+  * created / cancel-requested tasks to the work queue.
+  */
+class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing](
+    val eventStore: EventStore) extends TaskManager with Closeable {
+
+  import scala.collection.JavaConverters._
+
+  // Pause between two status checks in `await`, in milliseconds.
+  private val pollingDelayInMs = 500
+
+  // These three must exist before `eventSourcingSystem` is built: its
+  // subscriber set reads them eagerly.
+  private val executionDetailsProjection =
+    new TaskExecutionDetailsProjection
+  private val recentTasksProjection = new RecentTasksProjection()
+  private val workQueue: WorkQueue =
+    WorkQueue.builder().worker(new SerialTaskManagerWorker)
+
+  // Declared as a `def` so that the forward reference to
+  // `eventSourcingSystem` is only resolved when an event is handled.
+  private def workDispatcher: Subscriber = {
+    case Created(id, _, task) =>
+      val submitted = new TaskWithId(id.taskId, task)
+      val listener =
+        new WorkerStatusListener(submitted.getId, eventSourcingSystem)
+      workQueue.submit(submitted, listener)
+    case CancelRequested(id, _) =>
+      workQueue.cancel(id.taskId)
+    case _ =>
+  }
+
+  private val loadHistory: AggregateId => History =
+    id => eventStore.getEventsOfAggregate(id)
+
+  private val eventSourcingSystem = ScalaEventSourcingSystem(
+    handlers = Set(
+      new CreateCommandHandler(loadHistory),
+      new StartCommandHandler(loadHistory),
+      new RequestCancelCommandHandler(loadHistory),
+      new CompleteCommandHandler(loadHistory),
+      new CancelCommandHandler(loadHistory),
+      new FailCommandHandler(loadHistory)),
+    subscribers = Set(
+      executionDetailsProjection.asSubscriber,
+      workDispatcher,
+      recentTasksProjection.asSubscriber),
+    eventStore = eventStore)
+
+  /** Dispatches a `Create` command under a freshly generated id. */
+  override def submit(task: Task): TaskId = {
+    val id = TaskId.generateTaskId
+    eventSourcingSystem.dispatch(Create(id, task))
+    id
+  }
+
+  /**
+    * Returns the projected details of task `id`.
+    *
+    * Throws `TaskNotFoundException` when the projection holds nothing
+    * for that id.
+    */
+  override def getExecutionDetails(id: TaskId): TaskExecutionDetails = {
+    val found = executionDetailsProjection.load(id)
+    found.getOrElse(throw new TaskNotFoundException())
+  }
+
+  /** Details of every task recorded by the recent-tasks projection. */
+  override def list: util.List[TaskExecutionDetails] = listScala.asJava
+
+  /** Same as `list`, restricted to the tasks currently in `status`. */
+  override def list(
+      status: TaskManager.Status): util.List[TaskExecutionDetails] =
+    listScala.filter(_.getStatus == status).asJava
+
+  // Ids without projected details are silently dropped.
+  private def listScala: List[TaskExecutionDetails] =
+    recentTasksProjection
+      .list()
+      .flatMap(id => executionDetailsProjection.load(id))
+
+  /** Dispatches a `RequestCancel` command for task `id`. */
+  override def cancel(id: TaskId): Unit =
+    eventSourcingSystem.dispatch(RequestCancel(id))
+
+  /**
+    * Blocks the calling thread, polling the details of task `id` until
+    * its status is finished, then returns those details.
+    */
+  @tailrec
+  override final def await(id: TaskId): TaskExecutionDetails = {
+    val details = getExecutionDetails(id)
+    if (!details.getStatus.isFinished) {
+      Thread.sleep(pollingDelayInMs)
+      await(id)
+    } else {
+      details
+    }
+  }
+
+  /** Closes the underlying work queue. */
+  override def close(): Unit = workQueue.close()
+}
\ No newline at end of file
diff --git 
a/server/task/src/main/scala/org/apache/james/task/eventsourcing/Events.scala 
b/server/task/src/main/scala/org/apache/james/task/eventsourcing/Events.scala
new file mode 100644
index 0000000..71f1c8b
--- /dev/null
+++ 
b/server/task/src/main/scala/org/apache/james/task/eventsourcing/Events.scala
@@ -0,0 +1,39 @@
+/** **************************************************************
+  * Licensed to the Apache Software Foundation (ASF) under one   *
+  * or more contributor license agreements.  See the NOTICE file *
+  * distributed with this work for additional information        *
+  * regarding copyright ownership.  The ASF licenses this file   *
+  * to you under the Apache License, Version 2.0 (the            *
+  * "License"); you may not use this file except in compliance   *
+  * with the License.  You may obtain a copy of the License at   *
+  * *
+  * http://www.apache.org/licenses/LICENSE-2.0                 *
+  * *
+  * Unless required by applicable law or agreed to in writing,   *
+  * software distributed under the License is distributed on an  *
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+  * KIND, either express or implied.  See the License for the    *
+  * specific language governing permissions and limitations      *
+  * under the License.                                           *
+  * ***************************************************************/
+package org.apache.james.task.eventsourcing
+
+import org.apache.james.eventsourcing.{AggregateId, Event, EventId}
+import org.apache.james.task.Task
+import org.apache.james.task.Task.Result
+
+/**
+  * Base class of the events recorded for a task aggregate.
+  */
+sealed abstract class TaskEvent(aggregateId: TaskAggregateId,
+                                val eventId: EventId)
+  extends Event {
+
+  override def getAggregateId: TaskAggregateId = aggregateId
+}
+
+/** The task was created; carries the task itself. */
+case class Created(aggregateId: TaskAggregateId,
+                   override val eventId: EventId,
+                   task: Task)
+  extends TaskEvent(aggregateId, eventId)
+
+/** The task was started. */
+case class Started(aggregateId: TaskAggregateId,
+                   override val eventId: EventId)
+  extends TaskEvent(aggregateId, eventId)
+
+/** Cancellation of the task was requested. */
+case class CancelRequested(aggregateId: TaskAggregateId,
+                           override val eventId: EventId)
+  extends TaskEvent(aggregateId, eventId)
+
+/** The task completed; carries its result. */
+case class Completed(aggregateId: TaskAggregateId,
+                     override val eventId: EventId,
+                     result: Result)
+  extends TaskEvent(aggregateId, eventId)
+
+/** The task failed. */
+case class Failed(aggregateId: TaskAggregateId,
+                  override val eventId: EventId)
+  extends TaskEvent(aggregateId, eventId)
+
+/** The task was cancelled. */
+case class Cancelled(aggregateId: TaskAggregateId,
+                     override val eventId: EventId)
+  extends TaskEvent(aggregateId, eventId)
diff --git 
a/server/task/src/main/scala/org/apache/james/task/eventsourcing/RecentTasksProjection.scala
 
b/server/task/src/main/scala/org/apache/james/task/eventsourcing/RecentTasksProjection.scala
new file mode 100644
index 0000000..fced087
--- /dev/null
+++ 
b/server/task/src/main/scala/org/apache/james/task/eventsourcing/RecentTasksProjection.scala
@@ -0,0 +1,40 @@
+/** **************************************************************
+  * Licensed to the Apache Software Foundation (ASF) under one   *
+  * or more contributor license agreements.  See the NOTICE file *
+  * distributed with this work for additional information        *
+  * regarding copyright ownership.  The ASF licenses this file   *
+  * to you under the Apache License, Version 2.0 (the            *
+  * "License"); you may not use this file except in compliance   *
+  * with the License.  You may obtain a copy of the License at   *
+  * *
+  * http://www.apache.org/licenses/LICENSE-2.0                 *
+  * *
+  * Unless required by applicable law or agreed to in writing,   *
+  * software distributed under the License is distributed on an  *
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+  * KIND, either express or implied.  See the License for the    *
+  * specific language governing permissions and limitations      *
+  * under the License.                                           *
+  * ***************************************************************/
+package org.apache.james.task.eventsourcing
+
+import java.util.concurrent.ConcurrentLinkedDeque
+
+import org.apache.james.eventsourcing.Subscriber
+import org.apache.james.task.TaskId
+
+/**
+  * In-memory record of the ids of the tasks for which a `Created` event
+  * was received.
+  */
+class RecentTasksProjection() {
+
+  import scala.collection.JavaConverters._
+
+  private val taskIds = new ConcurrentLinkedDeque[TaskId]
+
+  /** Snapshot of the recorded ids, in the order they were added. */
+  def list(): List[TaskId] = {
+    val snapshot = taskIds.asScala
+    snapshot.toList
+  }
+
+  // Appends at the tail of the deque.
+  private def record(taskId: TaskId): Unit = taskIds.add(taskId)
+
+  /** Subscriber recording created tasks and ignoring other events. */
+  def asSubscriber: Subscriber = {
+    case created: Created => record(created.aggregateId.taskId)
+    case _ =>
+  }
+}
diff --git 
a/server/task/src/main/scala/org/apache/james/task/eventsourcing/ScalaEventSourcingSystem.scala
 
b/server/task/src/main/scala/org/apache/james/task/eventsourcing/ScalaEventSourcingSystem.scala
new file mode 100644
index 0000000..16178b3
--- /dev/null
+++ 
b/server/task/src/main/scala/org/apache/james/task/eventsourcing/ScalaEventSourcingSystem.scala
@@ -0,0 +1,28 @@
+/** **************************************************************
+  * Licensed to the Apache Software Foundation (ASF) under one   *
+  * or more contributor license agreements.  See the NOTICE file *
+  * distributed with this work for additional information        *
+  * regarding copyright ownership.  The ASF licenses this file   *
+  * to you under the Apache License, Version 2.0 (the            *
+  * "License"); you may not use this file except in compliance   *
+  * with the License.  You may obtain a copy of the License at   *
+  * *
+  * http://www.apache.org/licenses/LICENSE-2.0                 *
+  * *
+  * Unless required by applicable law or agreed to in writing,   *
+  * software distributed under the License is distributed on an  *
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+  * KIND, either express or implied.  See the License for the    *
+  * specific language governing permissions and limitations      *
+  * under the License.                                           *
+  * ***************************************************************/
+package org.apache.james.task.eventsourcing
+
+import org.apache.james.eventsourcing.eventstore.EventStore
+import org.apache.james.eventsourcing.{CommandHandler, EventSourcingSystem, 
Subscriber}
+
+/** Factory building an [[EventSourcingSystem]] from Scala collections. */
+object ScalaEventSourcingSystem {
+
+  import scala.collection.JavaConverters._
+
+  /**
+    * Creates an [[EventSourcingSystem]], handing the handlers and the
+    * subscribers over as Java sets.
+    */
+  def apply(handlers: Set[CommandHandler[_]],
+            subscribers: Set[Subscriber],
+            eventStore: EventStore): EventSourcingSystem =
+    new EventSourcingSystem(
+      handlers.asJava,
+      subscribers.asJava,
+      eventStore)
+}
diff --git 
a/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala
 
b/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala
new file mode 100644
index 0000000..5fd5875
--- /dev/null
+++ 
b/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala
@@ -0,0 +1,89 @@
+/** **************************************************************
+  * Licensed to the Apache Software Foundation (ASF) under one   *
+  * or more contributor license agreements.  See the NOTICE file *
+  * distributed with this work for additional information        *
+  * regarding copyright ownership.  The ASF licenses this file   *
+  * to you under the Apache License, Version 2.0 (the            *
+  * "License"); you may not use this file except in compliance   *
+  * with the License.  You may obtain a copy of the License at   *
+  * *
+  * http://www.apache.org/licenses/LICENSE-2.0                 *
+  * *
+  * Unless required by applicable law or agreed to in writing,   *
+  * software distributed under the License is distributed on an  *
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+  * KIND, either express or implied.  See the License for the    *
+  * specific language governing permissions and limitations      *
+  * under the License.                                           *
+  * ***************************************************************/
+package org.apache.james.task.eventsourcing
+
+import java.util
+
+import org.apache.james.eventsourcing.eventstore.History
+import org.apache.james.eventsourcing.{Event, EventId}
+import org.apache.james.task.Task
+import org.apache.james.task.Task.Result
+import org.apache.james.task.TaskManager.Status
+
+import scala.collection.JavaConverters._
+
+/**
+  * Aggregate of a single task: decides, from the status derived from
+  * `history`, which event (if any) a command gives rise to.
+  *
+  * Every decision method returns either a single new event or an empty
+  * list when the current status does not allow the transition.
+  */
+class TaskAggregate private(val aggregateId: TaskAggregateId,
+                            private val history: History) {
+
+  // Status after replaying the whole history; None for an empty one.
+  private val currentStatus: Option[Status] = {
+    val decision = history.getEvents.asScala
+      .foldLeft(DecisionProjection.empty)((acc, event) =>
+        acc.update(event))
+    decision.status
+  }
+
+  /** Emits `Created` only when the aggregate has no status yet. */
+  def create(task: Task): util.List[Event] = currentStatus match {
+    case None => createEventWithId(Created(aggregateId, _, task))
+    case Some(_) => noEvents
+  }
+
+  /** Emits `Started` only when the task is waiting. */
+  private[eventsourcing] def start(): util.List[Event] =
+    if (currentStatus.contains(Status.WAITING)) {
+      createEventWithId(Started(aggregateId, _))
+    } else {
+      noEvents
+    }
+
+  /** Emits `CancelRequested` unless the task is unknown or finished. */
+  def requestCancel(): util.List[Event] =
+    unlessFinished(CancelRequested(aggregateId, _))
+
+  /** Emits `Completed` unless the task is unknown or finished. */
+  private[eventsourcing] def complete(result: Result): util.List[Event] =
+    unlessFinished(Completed(aggregateId, _, result))
+
+  /** Emits `Failed` unless the task is unknown or finished. */
+  private[eventsourcing] def fail(): util.List[Event] =
+    unlessFinished(Failed(aggregateId, _))
+
+  /** Emits `Cancelled` unless the task is unknown or finished. */
+  private[eventsourcing] def cancel(): util.List[Event] =
+    unlessFinished(Cancelled(aggregateId, _))
+
+  // Shared guard: an event is produced only when a status exists and is
+  // not a finished one.
+  private def unlessFinished(
+      event: EventId => Event): util.List[Event] =
+    currentStatus match {
+      case Some(status) if !status.isFinished => createEventWithId(event)
+      case _ => noEvents
+    }
+
+  private def noEvents: util.List[Event] = List.empty[Event].asJava
+
+  // Builds the event with the next event id of the history.
+  private def createEventWithId(
+      event: EventId => Event): util.List[Event] =
+    List(event(history.getNextEventId)).asJava
+}
+
+object TaskAggregate {
+  /** Builds the aggregate identified by `aggregateId` from `history`. */
+  def fromHistory(aggregateId: TaskAggregateId,
+                  history: History): TaskAggregate =
+    new TaskAggregate(aggregateId, history)
+}
diff --git 
a/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregateId.scala
 
b/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregateId.scala
new file mode 100644
index 0000000..e6e8812
--- /dev/null
+++ 
b/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregateId.scala
@@ -0,0 +1,34 @@
+/** **************************************************************
+  * Licensed to the Apache Software Foundation (ASF) under one   *
+  * or more contributor license agreements.  See the NOTICE file *
+  * distributed with this work for additional information        *
+  * regarding copyright ownership.  The ASF licenses this file   *
+  * to you under the Apache License, Version 2.0 (the            *
+  * "License"); you may not use this file except in compliance   *
+  * with the License.  You may obtain a copy of the License at   *
+  * *
+  * http://www.apache.org/licenses/LICENSE-2.0                 *
+  * *
+  * Unless required by applicable law or agreed to in writing,   *
+  * software distributed under the License is distributed on an  *
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+  * KIND, either express or implied.  See the License for the    *
+  * specific language governing permissions and limitations      *
+  * under the License.                                           *
+  * ***************************************************************/
+package org.apache.james.task.eventsourcing
+
+import org.apache.james.eventsourcing.AggregateId
+import org.apache.james.task.TaskId
+
+/** Identifier of the aggregate that belongs to the task `taskId`. */
+case class TaskAggregateId(taskId: TaskId) extends AggregateId {
+
+  // Key layout: prefix, separator, then the string form of the task id.
+  override def asAggregateKey: String =
+    TaskAggregateId.PREFIX + TaskAggregateId.SEPARATOR + taskId.asString
+}
+
+object TaskAggregateId {
+  // Building blocks of the aggregate key.
+  private val PREFIX = "Task"
+  private val SEPARATOR = "/"
+}
\ No newline at end of file
diff --git 
a/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskCommand.scala
 
b/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskCommand.scala
new file mode 100644
index 0000000..973c8a9
--- /dev/null
+++ 
b/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskCommand.scala
@@ -0,0 +1,41 @@
+/** **************************************************************
+  * Licensed to the Apache Software Foundation (ASF) under one   *
+  * or more contributor license agreements.  See the NOTICE file *
+  * distributed with this work for additional information        *
+  * regarding copyright ownership.  The ASF licenses this file   *
+  * to you under the Apache License, Version 2.0 (the            *
+  * "License"); you may not use this file except in compliance   *
+  * with the License.  You may obtain a copy of the License at   *
+  * *
+  * http://www.apache.org/licenses/LICENSE-2.0                 *
+  * *
+  * Unless required by applicable law or agreed to in writing,   *
+  * software distributed under the License is distributed on an  *
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+  * KIND, either express or implied.  See the License for the    *
+  * specific language governing permissions and limitations      *
+  * under the License.                                           *
+  * ***************************************************************/
+package org.apache.james.task.eventsourcing
+
+import org.apache.james.eventsourcing.Command
+import org.apache.james.task.Task.Result
+import org.apache.james.task.{Task, TaskId}
+
+/** Marker of the commands that target a task aggregate. */
+sealed trait TaskCommand extends Command
+
+/** The task commands; each one names its target task through `id`. */
+object TaskCommand {
+
+  /** Asks for the creation of `task` under `id`. */
+  case class Create(id: TaskId, task: Task) extends TaskCommand
+
+  /** Asks for the task to be marked as started. */
+  case class Start(id: TaskId) extends TaskCommand
+
+  /** Asks for the task to be marked as completed with `result`. */
+  case class Complete(id: TaskId, result: Result) extends TaskCommand
+
+  /** Asks for a cancellation of the task to be requested. */
+  case class RequestCancel(id: TaskId) extends TaskCommand
+
+  /** Asks for the task to be marked as failed. */
+  case class Fail(id: TaskId) extends TaskCommand
+
+  /** Asks for the task to be marked as cancelled. */
+  case class Cancel(id: TaskId) extends TaskCommand
+
+}
diff --git 
a/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskExecutionDetailsProjection.scala
 
b/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskExecutionDetailsProjection.scala
new file mode 100644
index 0000000..2cc8419
--- /dev/null
+++ 
b/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskExecutionDetailsProjection.scala
@@ -0,0 +1,53 @@
+/** **************************************************************
+  * Licensed to the Apache Software Foundation (ASF) under one   *
+  * or more contributor license agreements.  See the NOTICE file *
+  * distributed with this work for additional information        *
+  * regarding copyright ownership.  The ASF licenses this file   *
+  * to you under the Apache License, Version 2.0 (the            *
+  * "License"); you may not use this file except in compliance   *
+  * with the License.  You may obtain a copy of the License at   *
+  * *
+  * http://www.apache.org/licenses/LICENSE-2.0                 *
+  * *
+  * Unless required by applicable law or agreed to in writing,   *
+  * software distributed under the License is distributed on an  *
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+  * KIND, either express or implied.  See the License for the    *
+  * specific language governing permissions and limitations      *
+  * under the License.                                           *
+  * ***************************************************************/
+package org.apache.james.task.eventsourcing
+
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.james.eventsourcing.Subscriber
+import org.apache.james.task.{TaskExecutionDetails, TaskId}
+
+/**
+  * In-memory view associating each [[TaskId]] with its latest known [[TaskExecutionDetails]].
+  *
+  * The view is kept up to date through [[asSubscriber]] and queried through [[load]].
+  */
+class TaskExecutionDetailsProjection() {
+  private[this] val projections = new ConcurrentHashMap[TaskId, TaskExecutionDetails]
+
+  /**
+    * Subscriber feeding the view: a `Created` event stores fresh details for the task,
+    * the other task events transform the details already stored, and every other event is ignored.
+    */
+  val asSubscriber: Subscriber = {
+    case created: Created =>
+      update(created.aggregateId.taskId, TaskExecutionDetails.from(created.task, created.aggregateId.taskId))
+    case cancelRequested: CancelRequested =>
+      update(cancelRequested.aggregateId.taskId)(_.cancelRequested())
+    case started: Started =>
+      update(started.aggregateId.taskId)(_.start())
+    case completed: Completed =>
+      update(completed.aggregateId.taskId)(_.completed())
+    case failed: Failed =>
+      update(failed.aggregateId.taskId)(_.failed())
+    case canceled: Cancelled =>
+      update(canceled.aggregateId.taskId)(_.cancelEffectively())
+    case _ =>
+  }
+
+  /** Returns the details currently stored for `taskId`, or `None` when nothing is stored for it. */
+  def load(taskId: TaskId): Option[TaskExecutionDetails] = Option(projections.get(taskId))
+
+  /** Stores `details` for `taskId`, replacing any previously stored value. */
+  private def update(taskId: TaskId, details: TaskExecutionDetails): Unit = projections.put(taskId, details)
+
+  /**
+    * Replaces the details stored for `taskId` with the result of `updater`;
+    * does nothing when no details are stored for that task.
+    *
+    * `computeIfPresent` makes the read and the write a single atomic step, so an update
+    * running concurrently for the same task cannot be overwritten by a stale value
+    * (which a separate `get` followed by `put` would allow).
+    */
+  private def update(taskId: TaskId)(updater: TaskExecutionDetails => TaskExecutionDetails): Unit =
+    projections.computeIfPresent(taskId, (_: TaskId, details: TaskExecutionDetails) => updater(details))
+}
\ No newline at end of file
diff --git 
a/server/task/src/main/scala/org/apache/james/task/eventsourcing/WorkerStatusListener.scala
 
b/server/task/src/main/scala/org/apache/james/task/eventsourcing/WorkerStatusListener.scala
new file mode 100644
index 0000000..7e1ca87
--- /dev/null
+++ 
b/server/task/src/main/scala/org/apache/james/task/eventsourcing/WorkerStatusListener.scala
@@ -0,0 +1,38 @@
+/** **************************************************************
+  * Licensed to the Apache Software Foundation (ASF) under one   *
+  * or more contributor license agreements.  See the NOTICE file *
+  * distributed with this work for additional information        *
+  * regarding copyright ownership.  The ASF licenses this file   *
+  * to you under the Apache License, Version 2.0 (the            *
+  * "License"); you may not use this file except in compliance   *
+  * with the License.  You may obtain a copy of the License at   *
+  * *
+  * http://www.apache.org/licenses/LICENSE-2.0                 *
+  * *
+  * Unless required by applicable law or agreed to in writing,   *
+  * software distributed under the License is distributed on an  *
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+  * KIND, either express or implied.  See the License for the    *
+  * specific language governing permissions and limitations      *
+  * under the License.                                           *
+  * ***************************************************************/
+
+package org.apache.james.task.eventsourcing
+
+import org.apache.james.eventsourcing.EventSourcingSystem
+import org.apache.james.task.Task.Result
+import org.apache.james.task.eventsourcing.TaskCommand._
+import org.apache.james.task.{TaskId, TaskManagerWorker}
+
+/**
+  * [[TaskManagerWorker.Listener]] bound to a single task: every callback is turned into the
+  * matching [[TaskCommand]] for `taskId` and dispatched to `eventSourcingSystem`.
+  */
+class WorkerStatusListener(taskId: TaskId, eventSourcingSystem: EventSourcingSystem) extends TaskManagerWorker.Listener {
+
+  override def started(): Unit = send(Start(taskId))
+
+  override def completed(result: Result): Unit = send(Complete(taskId, result))
+
+  // NOTE(review): the throwable is not forwarded, `Fail` only carries the task identifier.
+  override def failed(t: Throwable): Unit = send(Fail(taskId))
+
+  override def failed(): Unit = send(Fail(taskId))
+
+  override def cancelled(): Unit = send(Cancel(taskId))
+
+  /** Single exit point towards the event sourcing system. */
+  private def send(command: TaskCommand): Unit = eventSourcingSystem.dispatch(command)
+}
\ No newline at end of file
diff --git 
a/server/task/src/test/java/org/apache/james/task/TaskManagerContract.java 
b/server/task/src/test/java/org/apache/james/task/TaskManagerContract.java
index 9c93da4..3b5c5f8 100644
--- a/server/task/src/test/java/org/apache/james/task/TaskManagerContract.java
+++ b/server/task/src/test/java/org/apache/james/task/TaskManagerContract.java
@@ -32,6 +32,12 @@ public interface TaskManagerContract {
     TaskManager taskManager();
 
     @Test
+    default void submitShouldReturnATaskId() {
+        // The task body is irrelevant here: only the identifier returned by submit is checked.
+        Task alwaysCompleted = () -> Task.Result.COMPLETED;
+
+        assertThat(taskManager().submit(alwaysCompleted)).isNotNull();
+    }
+
+    @Test
     default void getStatusShouldReturnUnknownWhenUnknownId() {
         TaskId unknownId = TaskId.generateTaskId();
         assertThatThrownBy(() -> taskManager().getExecutionDetails(unknownId))
diff --git 
a/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java
 
b/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java
new file mode 100644
index 0000000..16b2b0b
--- /dev/null
+++ 
b/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java
@@ -0,0 +1,50 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.task.eventsourcing;
+
+import org.apache.james.eventsourcing.eventstore.EventStore;
+import org.apache.james.eventsourcing.eventstore.memory.InMemoryEventStore;
+import org.apache.james.task.CountDownLatchExtension;
+import org.apache.james.task.TaskManager;
+import org.apache.james.task.TaskManagerContract;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Runs the {@link TaskManagerContract} tests against an {@link EventSourcingTaskManager}
+ * backed by an {@link InMemoryEventStore}.
+ */
+@ExtendWith(CountDownLatchExtension.class)
+class EventSourcingTaskManagerTest implements TaskManagerContract {
+
+    private EventSourcingTaskManager testee;
+
+    /** Builds a new event store and a new task manager before each test. */
+    @BeforeEach
+    void setUp() {
+        EventStore inMemoryStore = new InMemoryEventStore();
+        testee = new EventSourcingTaskManager(inMemoryStore);
+    }
+
+    /** Closes the task manager created for the test that just ran. */
+    @AfterEach
+    void tearDown() {
+        testee.close();
+    }
+
+    @Override
+    public TaskManager taskManager() {
+        return testee;
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to