This is an automated email from the ASF dual-hosted git repository. matthieu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 83dc11f9d112a2bf8ac44131aa6f7e51695fcbac Author: Matthieu Baechler <[email protected]> AuthorDate: Wed Jun 26 16:21:20 2019 +0200 JAMES-2813 Implements a task manager using event sourcing --- server/task/pom.xml | 69 +++++++++++++- .../apache/james/task/SerialTaskManagerWorker.java | 2 +- .../james/task/eventsourcing/CommandHandlers.scala | 82 ++++++++++++++++ .../task/eventsourcing/DecisionProjection.scala | 43 +++++++++ .../eventsourcing/EventSourcingTaskManager.scala | 105 +++++++++++++++++++++ .../apache/james/task/eventsourcing/Events.scala | 39 ++++++++ .../task/eventsourcing/RecentTasksProjection.scala | 40 ++++++++ .../eventsourcing/ScalaEventSourcingSystem.scala | 28 ++++++ .../james/task/eventsourcing/TaskAggregate.scala | 89 +++++++++++++++++ .../james/task/eventsourcing/TaskAggregateId.scala | 34 +++++++ .../james/task/eventsourcing/TaskCommand.scala | 41 ++++++++ .../TaskExecutionDetailsProjection.scala | 53 +++++++++++ .../task/eventsourcing/WorkerStatusListener.scala | 38 ++++++++ .../org/apache/james/task/TaskManagerContract.java | 6 ++ .../EventSourcingTaskManagerTest.java | 50 ++++++++++ 15 files changed, 717 insertions(+), 2 deletions(-) diff --git a/server/task/pom.xml b/server/task/pom.xml index bfafbb5..5d13d58 100644 --- a/server/task/pom.xml +++ b/server/task/pom.xml @@ -17,7 +17,8 @@ specific language governing permissions and limitations under the License. --> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> @@ -30,9 +31,23 @@ <artifactId>james-server-task</artifactId> <name>Apache James :: Server :: Task</name> + <properties> + <scala.base>2.12</scala.base> + <scala.version>${scala.base}.7</scala.version> + </properties> + <dependencies> <dependency> <groupId>${james.groupId}</groupId> + <artifactId>event-sourcing-core</artifactId> + </dependency> + <dependency> + <groupId>${james.groupId}</groupId> + <artifactId>event-sourcing-event-store-memory</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>${james.groupId}</groupId> <artifactId>james-server-util</artifactId> </dependency> <dependency> @@ -70,6 +85,58 @@ <artifactId>assertj-core</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.scala-lang.modules</groupId> + <artifactId>scala-java8-compat_${scala.base}</artifactId> + <version>0.9.0</version> + </dependency> + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-library</artifactId> + <version>${scala.version}</version> + </dependency> </dependencies> + <build> + <pluginManagement> + <plugins> + <plugin> + <groupId>net.alchim31.maven</groupId> + <artifactId>scala-maven-plugin</artifactId> + <version>3.4.4</version> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <version>2.0.2</version> + </plugin> + </plugins> + </pluginManagement> + <plugins> + <plugin> + <groupId>net.alchim31.maven</groupId> + <artifactId>scala-maven-plugin</artifactId> + <executions> + <execution> + <id>scala-compile-first</id> + <phase>process-resources</phase> + <goals> + <goal>add-source</goal> + <goal>compile</goal> + </goals> + </execution> + <execution> + <id>scala-test-compile</id> + <phase>process-test-resources</phase> + <goals> + <goal>testCompile</goal> + </goals> + </execution> + </executions> + <configuration> + <addArgs>-Xlog-implicits</addArgs> + </configuration> + </plugin> + </plugins> + </build> </project> \ No newline at end of file diff --git a/server/task/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java b/server/task/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java index d690be2..4736372 100644 --- a/server/task/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java +++ b/server/task/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java @@ -49,7 +49,7 @@ public class SerialTaskManagerWorker implements TaskManagerWorker { private final Semaphore semaphore; private final Set<TaskId> cancelledTasks; - SerialTaskManagerWorker() { + public SerialTaskManagerWorker() { this.taskExecutor = Executors.newSingleThreadExecutor(NamedThreadFactory.withName("task executor")); this.cancelledTasks = Sets.newConcurrentHashSet(); this.runningTask = new AtomicReference<>(); diff --git a/server/task/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala b/server/task/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala new file mode 100644 index 0000000..1bb8abf --- /dev/null +++ b/server/task/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala @@ -0,0 +1,82 @@ +/** ************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + * ***************************************************************/ +package org.apache.james.task.eventsourcing + +import java.util + +import org.apache.james.eventsourcing.eventstore.History +import org.apache.james.eventsourcing.{CommandHandler, Event} +import org.apache.james.task.TaskId +import org.apache.james.task.eventsourcing.TaskCommand._ + +sealed abstract class TaskCommandHandler[T <: TaskCommand] extends CommandHandler[T] { + + def loadAggregate(loadHistory: TaskAggregateId => History, taskId: TaskId): TaskAggregate = { + val aggregateId = TaskAggregateId(taskId) + TaskAggregate.fromHistory(aggregateId, loadHistory(aggregateId)) + } +} + +class CreateCommandHandler(private val loadHistory: TaskAggregateId => History) extends TaskCommandHandler[Create] { + override def handledClass: Class[Create] = classOf[Create] + + override def handle(command: Create): util.List[_ <: Event] = { + loadAggregate(loadHistory, command.id).create(command.task) + } +} + +class StartCommandHandler(private val loadHistory: TaskAggregateId => History) extends TaskCommandHandler[Start] { + override def handledClass: Class[Start] = classOf[Start] + + override def handle(command: Start): util.List[_ <: Event] = { + loadAggregate(loadHistory, command.id).start() + } +} + +class RequestCancelCommandHandler(private val loadHistory: TaskAggregateId => History) extends TaskCommandHandler[RequestCancel] { + override def handledClass: Class[RequestCancel] = classOf[RequestCancel] + + override def handle(command: RequestCancel): util.List[_ <: Event] = { + loadAggregate(loadHistory, command.id).requestCancel() + } +} + +class CompleteCommandHandler(private val loadHistory: TaskAggregateId => History) extends TaskCommandHandler[Complete] { + override def handledClass: Class[Complete] = classOf[Complete] + + override def handle(command: Complete): util.List[_ <: Event] = { + loadAggregate(loadHistory, command.id).complete(command.result) + } +} + +class CancelCommandHandler(private val loadHistory: TaskAggregateId => History) extends TaskCommandHandler[Cancel] { + override def handledClass: Class[Cancel] = classOf[Cancel] + + override def handle(command: Cancel): util.List[_ <: Event] = { + loadAggregate(loadHistory, command.id).cancel() + } +} + +class FailCommandHandler(private val loadHistory: TaskAggregateId => History) extends TaskCommandHandler[Fail] { + override def handledClass: Class[Fail] = classOf[Fail] + + override def handle(command: Fail): util.List[_ <: Event] = { + loadAggregate(loadHistory, command.id).fail() + } +} \ No newline at end of file diff --git a/server/task/src/main/scala/org/apache/james/task/eventsourcing/DecisionProjection.scala b/server/task/src/main/scala/org/apache/james/task/eventsourcing/DecisionProjection.scala new file mode 100644 index 0000000..d5394bb --- /dev/null +++ b/server/task/src/main/scala/org/apache/james/task/eventsourcing/DecisionProjection.scala @@ -0,0 +1,43 @@ +/** ************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + * ***************************************************************/ +package org.apache.james.task.eventsourcing + +import org.apache.james.eventsourcing.Event +import org.apache.james.task.TaskManager.Status + +case class DecisionProjection(status: Option[Status]) { + val update: Event => DecisionProjection = + DecisionProjection.create +} + +object DecisionProjection { + def create(event: Event): DecisionProjection = DecisionProjection(Some(fromEvent(event))) + + def empty: DecisionProjection = DecisionProjection(None) + + private def fromEvent(event: Event): Status = event match { + case event: Created => Status.WAITING + case event: Started => Status.IN_PROGRESS + case event: CancelRequested => Status.CANCEL_REQUESTED + case event: Cancelled => Status.CANCELLED + case event: Completed => Status.COMPLETED + case event: Failed => Status.FAILED + } +} + diff --git a/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala b/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala new file mode 100644 index 0000000..9d6a3fa --- /dev/null +++ b/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala @@ -0,0 +1,105 @@ +/** ************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + * ***************************************************************/ +package org.apache.james.task.eventsourcing + +import java.io.Closeable +import java.util + +import com.google.common.annotations.VisibleForTesting +import javax.inject.Inject +import org.apache.james.eventsourcing.{AggregateId, Subscriber} +import org.apache.james.eventsourcing.eventstore.{EventStore, History} +import org.apache.james.task._ +import org.apache.james.task.eventsourcing.TaskCommand._ + +import scala.annotation.tailrec + +class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing](val eventStore: EventStore) extends TaskManager with Closeable { + + private val executionDetailsProjection = new TaskExecutionDetailsProjection + private val recentTasksProjection = new RecentTasksProjection() + private val workQueue: WorkQueue = WorkQueue.builder().worker(new SerialTaskManagerWorker) + private val delayBetweenPollingInMs = 500 + + private def workDispatcher: Subscriber = { + case Created(aggregateId, _, task) => + val taskWithId = new TaskWithId(aggregateId.taskId, task) + workQueue.submit(taskWithId, new WorkerStatusListener(taskWithId.getId, eventSourcingSystem)) + case CancelRequested(aggregateId, _) => + workQueue.cancel(aggregateId.taskId) + case _ => + } + + import scala.collection.JavaConverters._ + + private val loadHistory: AggregateId => History = eventStore.getEventsOfAggregate _ + private val eventSourcingSystem = ScalaEventSourcingSystem( + handlers = Set( + new CreateCommandHandler(loadHistory), + new StartCommandHandler(loadHistory), + new RequestCancelCommandHandler(loadHistory), + new CompleteCommandHandler(loadHistory), + new CancelCommandHandler(loadHistory), + new FailCommandHandler(loadHistory)), + subscribers = Set( + executionDetailsProjection.asSubscriber, + workDispatcher, + recentTasksProjection.asSubscriber), + eventStore = eventStore) + + override def submit(task: Task): TaskId = { + val taskId = TaskId.generateTaskId + val command = Create(taskId, task) + eventSourcingSystem.dispatch(command) + taskId + } + + override def getExecutionDetails(id: TaskId): TaskExecutionDetails = executionDetailsProjection.load(id) + .getOrElse(throw new TaskNotFoundException()) + + override def list: util.List[TaskExecutionDetails] = listScala.asJava + + override def list(status: TaskManager.Status): util.List[TaskExecutionDetails] = listScala + .filter(details => details.getStatus == status) + .asJava + + private def listScala: List[TaskExecutionDetails] = recentTasksProjection + .list() + .flatMap(executionDetailsProjection.load) + + override def cancel(id: TaskId): Unit = { + val command = RequestCancel(id) + eventSourcingSystem.dispatch(command) + } + + @tailrec + override final def await(id: TaskId): TaskExecutionDetails = { + val details = getExecutionDetails(id) + if (details.getStatus.isFinished) { + details + } else { + Thread.sleep(delayBetweenPollingInMs) + await(id) + } + } + + override def close(): Unit = { + workQueue.close() + } +} \ No newline at end of file diff --git a/server/task/src/main/scala/org/apache/james/task/eventsourcing/Events.scala b/server/task/src/main/scala/org/apache/james/task/eventsourcing/Events.scala new file mode 100644 index 0000000..71f1c8b --- /dev/null +++ b/server/task/src/main/scala/org/apache/james/task/eventsourcing/Events.scala @@ -0,0 +1,39 @@ +/** ************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + * ***************************************************************/ +package org.apache.james.task.eventsourcing + +import org.apache.james.eventsourcing.{AggregateId, Event, EventId} +import org.apache.james.task.Task +import org.apache.james.task.Task.Result + +sealed abstract class TaskEvent(aggregateId: TaskAggregateId, val eventId: EventId) extends Event { + override def getAggregateId: TaskAggregateId = aggregateId +} + +case class Created(aggregateId: TaskAggregateId, override val eventId: EventId, task: Task) extends TaskEvent(aggregateId, eventId) + +case class Started(aggregateId: TaskAggregateId, override val eventId: EventId) extends TaskEvent(aggregateId, eventId) + +case class CancelRequested(aggregateId: TaskAggregateId, override val eventId: EventId) extends TaskEvent(aggregateId, eventId) + +case class Completed(aggregateId: TaskAggregateId, override val eventId: EventId, result: Result) extends TaskEvent(aggregateId, eventId) + +case class Failed(aggregateId: TaskAggregateId, override val eventId: EventId) extends TaskEvent(aggregateId, eventId) + +case class Cancelled(aggregateId: TaskAggregateId, override val eventId: EventId) extends TaskEvent(aggregateId, eventId) diff --git a/server/task/src/main/scala/org/apache/james/task/eventsourcing/RecentTasksProjection.scala b/server/task/src/main/scala/org/apache/james/task/eventsourcing/RecentTasksProjection.scala new file mode 100644 index 0000000..fced087 --- /dev/null +++ b/server/task/src/main/scala/org/apache/james/task/eventsourcing/RecentTasksProjection.scala @@ -0,0 +1,40 @@ +/** ************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + * ***************************************************************/ +package org.apache.james.task.eventsourcing + +import java.util.concurrent.ConcurrentLinkedDeque + +import org.apache.james.eventsourcing.Subscriber +import org.apache.james.task.TaskId + +class RecentTasksProjection() { + + import scala.collection.JavaConverters._ + + private val tasks = new ConcurrentLinkedDeque[TaskId] + + def list(): List[TaskId] = tasks.asScala.toList + + private def add(taskId: TaskId): Unit = tasks.add(taskId) + + def asSubscriber: Subscriber = { + case Created(aggregateId, _, _) => add(aggregateId.taskId) + case _ => + } +} diff --git a/server/task/src/main/scala/org/apache/james/task/eventsourcing/ScalaEventSourcingSystem.scala b/server/task/src/main/scala/org/apache/james/task/eventsourcing/ScalaEventSourcingSystem.scala new file mode 100644 index 0000000..16178b3 --- /dev/null +++ b/server/task/src/main/scala/org/apache/james/task/eventsourcing/ScalaEventSourcingSystem.scala @@ -0,0 +1,28 @@ +/** ************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + * ***************************************************************/ +package org.apache.james.task.eventsourcing + +import org.apache.james.eventsourcing.eventstore.EventStore +import org.apache.james.eventsourcing.{CommandHandler, EventSourcingSystem, Subscriber} + +object ScalaEventSourcingSystem { + import scala.collection.JavaConverters._ + def apply(handlers: Set[CommandHandler[_]], subscribers: Set[Subscriber], eventStore: EventStore): EventSourcingSystem = + new EventSourcingSystem(handlers.asJava, subscribers.asJava, eventStore) +} diff --git a/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala b/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala new file mode 100644 index 0000000..5fd5875 --- /dev/null +++ b/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala @@ -0,0 +1,89 @@ +/** ************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + * ***************************************************************/ +package org.apache.james.task.eventsourcing + +import java.util + +import org.apache.james.eventsourcing.eventstore.History +import org.apache.james.eventsourcing.{Event, EventId} +import org.apache.james.task.Task +import org.apache.james.task.Task.Result +import org.apache.james.task.TaskManager.Status + +import scala.collection.JavaConverters._ + +class TaskAggregate private(val aggregateId: TaskAggregateId, private val history: History) { + + private val currentStatus: Option[Status] = history + .getEvents + .asScala + .foldLeft(DecisionProjection.empty)((decision, event) => decision.update(event)) + .status + + + def create(task: Task): util.List[Event] = { + if (currentStatus.isEmpty) { + createEventWithId(Created(aggregateId, _, task)) + } else Nil.asJava + } + + private[eventsourcing] def start(): util.List[Event] = { + currentStatus match { + case Some(Status.WAITING) => createEventWithId(Started(aggregateId, _)) + case _ => Nil.asJava + } + } + + def requestCancel(): util.List[Event] = { + currentStatus match { + case Some(status) if !status.isFinished => createEventWithId(CancelRequested(aggregateId, _)) + case _ => Nil.asJava + } + } + + private[eventsourcing] def complete(result: Result): util.List[Event] = { + currentStatus match { + case Some(status) if !status.isFinished => createEventWithId(Completed(aggregateId, _, result)) + case _ => Nil.asJava + } + } + + private[eventsourcing] def fail(): util.List[Event] = { + currentStatus match { + case Some(status) if !status.isFinished => createEventWithId(Failed(aggregateId, _)) + case _ => Nil.asJava + } + } + + private[eventsourcing] def cancel(): util.List[Event] = { + currentStatus match { + case Some(status) if !status.isFinished => createEventWithId(Cancelled(aggregateId, _)) + case _ => Nil.asJava + } + } + + private def createEventWithId(event: EventId => Event): util.List[Event] = + List(history.getNextEventId) + .map({ eventId => event(eventId) }) + .asJava +} + +object TaskAggregate { + def fromHistory(aggregateId: TaskAggregateId, history: History) = new TaskAggregate(aggregateId, history) +} diff --git a/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregateId.scala b/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregateId.scala new file mode 100644 index 0000000..e6e8812 --- /dev/null +++ b/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregateId.scala @@ -0,0 +1,34 @@ +/** ************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + * ***************************************************************/ +package org.apache.james.task.eventsourcing + +import org.apache.james.eventsourcing.AggregateId +import org.apache.james.task.TaskId + +case class TaskAggregateId(taskId: TaskId) extends AggregateId { + + import TaskAggregateId._ + + override def asAggregateKey: String = s"$PREFIX$SEPARATOR${taskId.asString}" +} + +object TaskAggregateId { + private val PREFIX = "Task" + private val SEPARATOR = "/" +} \ No newline at end of file diff --git a/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskCommand.scala b/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskCommand.scala new file mode 100644 index 0000000..973c8a9 --- /dev/null +++ b/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskCommand.scala @@ -0,0 +1,41 @@ +/** ************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + * ***************************************************************/ +package org.apache.james.task.eventsourcing + +import org.apache.james.eventsourcing.Command +import org.apache.james.task.Task.Result +import org.apache.james.task.{Task, TaskId} + +sealed trait TaskCommand extends Command + +object TaskCommand { + + case class Create(id: TaskId, task: Task) extends TaskCommand + + case class Start(id: TaskId) extends TaskCommand + + case class Complete(id: TaskId, result: Result) extends TaskCommand + + case class RequestCancel(id: TaskId) extends TaskCommand + + case class Fail(id: TaskId) extends TaskCommand + + case class Cancel(id: TaskId) extends TaskCommand + +} diff --git a/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskExecutionDetailsProjection.scala b/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskExecutionDetailsProjection.scala new file mode 100644 index 0000000..2cc8419 --- /dev/null +++ b/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskExecutionDetailsProjection.scala @@ -0,0 +1,53 @@ +/** ************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + * ***************************************************************/ +package org.apache.james.task.eventsourcing + +import java.util.concurrent.ConcurrentHashMap + +import org.apache.james.eventsourcing.Subscriber +import org.apache.james.task.{TaskExecutionDetails, TaskId} + +class TaskExecutionDetailsProjection() { + private[this] val projections = new ConcurrentHashMap[TaskId, TaskExecutionDetails] + + val asSubscriber: Subscriber = { + case created: Created => + update(created.getAggregateId.taskId, TaskExecutionDetails.from(created.task, created.aggregateId.taskId)) + case cancelRequested: CancelRequested => + update(cancelRequested.aggregateId.taskId)(_.cancelRequested()) + case started: Started => + update(started.aggregateId.taskId)(_.start()) + case completed: Completed => + update(completed.aggregateId.taskId)(_.completed()) + case failed: Failed => + update(failed.aggregateId.taskId)(_.failed()) + case canceled: Cancelled => + update(canceled.aggregateId.taskId)(_.cancelEffectively()) + case _ => + } + + def load(taskId: TaskId): Option[TaskExecutionDetails] = Option(projections.get(taskId)) + + private def update(taskId: TaskId, details: TaskExecutionDetails): Unit = projections.put(taskId, details) + + private def update(taskId: TaskId)(updater: TaskExecutionDetails => TaskExecutionDetails): Unit = + load(taskId) + .map(updater) + .foreach(update(taskId, _)) +} \ No newline at end of file diff --git a/server/task/src/main/scala/org/apache/james/task/eventsourcing/WorkerStatusListener.scala b/server/task/src/main/scala/org/apache/james/task/eventsourcing/WorkerStatusListener.scala new file mode 100644 index 0000000..7e1ca87 --- /dev/null +++ b/server/task/src/main/scala/org/apache/james/task/eventsourcing/WorkerStatusListener.scala @@ -0,0 +1,38 @@ +/** ************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + * ***************************************************************/ + +package org.apache.james.task.eventsourcing + +import org.apache.james.eventsourcing.EventSourcingSystem +import org.apache.james.task.Task.Result +import org.apache.james.task.eventsourcing.TaskCommand._ +import org.apache.james.task.{TaskId, TaskManagerWorker} + +class WorkerStatusListener(taskId: TaskId, eventSourcingSystem: EventSourcingSystem) extends TaskManagerWorker.Listener { + + override def started(): Unit = eventSourcingSystem.dispatch(Start(taskId)) + + override def completed(result: Result): Unit = eventSourcingSystem.dispatch(Complete(taskId, result)) + + override def failed(t: Throwable): Unit = eventSourcingSystem.dispatch(Fail(taskId)) + + override def failed(): Unit = eventSourcingSystem.dispatch(Fail(taskId)) + + override def cancelled(): Unit = eventSourcingSystem.dispatch(Cancel(taskId)) +} \ No newline at end of file diff --git a/server/task/src/test/java/org/apache/james/task/TaskManagerContract.java b/server/task/src/test/java/org/apache/james/task/TaskManagerContract.java index 9c93da4..3b5c5f8 100644 --- a/server/task/src/test/java/org/apache/james/task/TaskManagerContract.java +++ b/server/task/src/test/java/org/apache/james/task/TaskManagerContract.java @@ -32,6 +32,12 @@ public interface TaskManagerContract { TaskManager taskManager(); @Test + default void submitShouldReturnATaskId() { + TaskId taskId = taskManager().submit(() -> Task.Result.COMPLETED); + assertThat(taskId).isNotNull(); + } + + @Test default void getStatusShouldReturnUnknownWhenUnknownId() { TaskId unknownId = TaskId.generateTaskId(); assertThatThrownBy(() -> taskManager().getExecutionDetails(unknownId)) diff --git a/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java b/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java new file mode 100644 index 0000000..16b2b0b --- /dev/null +++ b/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java @@ -0,0 +1,50 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.task.eventsourcing; + +import org.apache.james.eventsourcing.eventstore.EventStore; +import org.apache.james.eventsourcing.eventstore.memory.InMemoryEventStore; +import org.apache.james.task.CountDownLatchExtension; +import org.apache.james.task.TaskManager; +import org.apache.james.task.TaskManagerContract; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(CountDownLatchExtension.class) +class EventSourcingTaskManagerTest implements TaskManagerContract { + + private EventSourcingTaskManager taskManager; + @BeforeEach + void setUp() { + EventStore eventStore = new InMemoryEventStore(); + taskManager = new EventSourcingTaskManager(eventStore); + } + + @AfterEach + void tearDown() { + taskManager.close(); + } + + @Override + public TaskManager taskManager() { + return taskManager; + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
