This is an automated email from the ASF dual-hosted git repository. rouazana pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit ad2c55cd985fd4f55fb21f67d78941a0ed169fbb Author: Matthieu Baechler <[email protected]> AuthorDate: Tue Oct 8 10:25:30 2019 +0200 JAMES-2813 refactor TaskAggregate to prevent creation of the aggregate if History doesn't start with Created event --- .../james/task/eventsourcing/CommandHandlers.scala | 2 +- .../james/task/eventsourcing/TaskAggregate.scala | 68 ++++++++++++---------- .../task/eventsourcing/TaskAggregateTest.java | 21 +++++-- 3 files changed, 55 insertions(+), 36 deletions(-) diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala index ddda76a..1184988 100644 --- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala +++ b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala @@ -37,7 +37,7 @@ class CreateCommandHandler(private val loadHistory: TaskAggregateId => History, override def handledClass: Class[Create] = classOf[Create] override def handle(command: Create): util.List[_ <: Event] = { - loadAggregate(loadHistory, command.id).create(command.task, hostname) + TaskAggregate.create(TaskAggregateId(command.id), command.task, hostname) } } diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala index 0c158ba..1a2094b 100644 --- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala +++ b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala @@ -31,63 +31,67 @@ import scala.collection.JavaConverters._ class TaskAggregate private(val aggregateId: TaskAggregateId, private val history: History) { - private val currentStatus: Option[Status] = history + history.getEvents.asScala.headOption match { + case Some(Created(_, _, _, _)) => + case _ => throw new IllegalArgumentException("History must start with Created event") + } + + private val currentStatus: Status = history .getEvents .asScala .foldLeft(DecisionProjection.empty)((decision, event) => decision.update(event)) .status - - - def create(task: Task, hostname: Hostname): util.List[Event] = { - if (currentStatus.isEmpty) { - createEventWithId(Created(aggregateId, _, task, hostname)) - } else Nil.asJava - } + .get private[eventsourcing] def start(hostname: Hostname): util.List[Event] = { - currentStatus match { - case Some(Status.WAITING) => createEventWithId(Started(aggregateId, _, hostname)) - case _ => Nil.asJava + if (!currentStatus.isFinished) { + createEventWithId(Started(aggregateId, _, hostname)) + } else { + Nil.asJava } } def requestCancel(hostname: Hostname): util.List[Event] = { - currentStatus match { - case Some(status) if !status.isFinished => createEventWithId(CancelRequested(aggregateId, _, hostname)) - case _ => Nil.asJava + if (!currentStatus.isFinished) { + createEventWithId(CancelRequested(aggregateId, _, hostname)) + } else { + Nil.asJava } } private[eventsourcing] def update(additionalInformation: AdditionalInformation): util.List[Event] = { currentStatus match { - case Some(Status.IN_PROGRESS) => createEventWithId(AdditionalInformationUpdated(aggregateId, _, additionalInformation)) - case Some(Status.CANCEL_REQUESTED) => createEventWithId(AdditionalInformationUpdated(aggregateId, _, additionalInformation)) - case Some(Status.COMPLETED) => Nil.asJava - case Some(Status.FAILED) => Nil.asJava - case Some(Status.WAITING) => Nil.asJava - case Some(Status.CANCELLED) => Nil.asJava + case Status.IN_PROGRESS => createEventWithId(AdditionalInformationUpdated(aggregateId, _, additionalInformation)) + case Status.CANCEL_REQUESTED => createEventWithId(AdditionalInformationUpdated(aggregateId, _, additionalInformation)) + case Status.COMPLETED => Nil.asJava + case Status.FAILED => Nil.asJava + case Status.WAITING => Nil.asJava + case Status.CANCELLED => Nil.asJava case _ => Nil.asJava } } private[eventsourcing] def complete(result: Result, additionalInformation: Option[AdditionalInformation]): util.List[Event] = { - currentStatus match { - case Some(status) if !status.isFinished => createEventWithId(Completed(aggregateId, _, result, additionalInformation)) - case _ => Nil.asJava + if (!currentStatus.isFinished) { + createEventWithId(Completed(aggregateId, _, result, additionalInformation)) + } else { + Nil.asJava } } private[eventsourcing] def fail(additionalInformation: Option[AdditionalInformation], errorMessage: Option[String], exception: Option[String]): util.List[Event] = { - currentStatus match { - case Some(status) if !status.isFinished => createEventWithId(Failed(aggregateId, _, additionalInformation, errorMessage, exception)) - case _ => Nil.asJava + if (!currentStatus.isFinished) { + createEventWithId(Failed(aggregateId, _, additionalInformation, errorMessage, exception)) + } else { + Nil.asJava } } private[eventsourcing] def cancel(additionalInformation: Option[AdditionalInformation]): util.List[Event] = { - currentStatus match { - case Some(status) if !status.isFinished => createEventWithId(Cancelled(aggregateId, _, additionalInformation)) - case _ => Nil.asJava + if (!currentStatus.isFinished) { + createEventWithId(Cancelled(aggregateId, _, additionalInformation)) + } else { + Nil.asJava } } @@ -98,5 +102,9 @@ class TaskAggregate private(val aggregateId: TaskAggregateId, private val histor } object TaskAggregate { - def fromHistory(aggregateId: TaskAggregateId, history: History) = new TaskAggregate(aggregateId, history) + def fromHistory(aggregateId: TaskAggregateId, history: History): TaskAggregate = new TaskAggregate(aggregateId, history) + + def create(aggregateId: TaskAggregateId, task: Task, hostname: Hostname): util.List[Event] = { + List[Event](Created(aggregateId, EventId.first(), task, hostname)).asJava + } } diff --git a/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TaskAggregateTest.java b/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TaskAggregateTest.java index 03797da..075deda 100644 --- a/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TaskAggregateTest.java +++ b/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TaskAggregateTest.java @@ -19,6 +19,7 @@ package org.apache.james.task.eventsourcing; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.util.Arrays; import java.util.function.Function; @@ -35,8 +36,7 @@ import org.apache.james.task.TaskId; import org.junit.jupiter.api.Test; import com.google.common.collect.Streams; -import scala.None; -import scala.None$; +import scala.Option; class TaskAggregateTest { @@ -53,6 +53,17 @@ class TaskAggregateTest { } @Test + void TaskAggregateShouldThrowWhenHistoryDoesntStartWithCreatedEvent() { + assertThatThrownBy(() -> TaskAggregate.fromHistory(ID, buildHistory(eventId -> Started.apply(ID, eventId, HOSTNAME)))) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + void TaskAggregateShouldThrowWhenEmptyHistory() { + assertThatThrownBy(() -> TaskAggregate.fromHistory(ID, History.empty())).isInstanceOf(IllegalArgumentException.class); + } + + @Test void givenNoStartedTaskEmitNoEventWhenUpdateAdditionalInformationCommand() { History history = buildHistory( eventId -> Created.apply(ID, eventId, new MemoryReferenceWithCounterTask((counter) -> Task.Result.COMPLETED), HOSTNAME) @@ -90,7 +101,7 @@ class TaskAggregateTest { History history = buildHistory( eventId -> Created.apply(ID, eventId, task, HOSTNAME), eventId -> Started.apply(ID, eventId, HOSTNAME), - eventId -> Completed.apply(ID, eventId, Task.Result.COMPLETED, task.type(), None$.empty()) + eventId -> Completed.apply(ID, eventId, Task.Result.COMPLETED, Option.empty()) ); TaskAggregate aggregate = TaskAggregate.fromHistory(ID, history); assertThat(aggregate.update(new MemoryReferenceWithCounterTask.AdditionalInformation(3))).isEmpty(); @@ -102,7 +113,7 @@ class TaskAggregateTest { History history = buildHistory( eventId -> Created.apply(ID, eventId, task, HOSTNAME), eventId -> Started.apply(ID, eventId, HOSTNAME), - eventId -> Failed.apply(ID, eventId, task.type(), None$.empty()) + eventId -> Failed.apply(ID, eventId, Option.empty(), Option.empty(), Option.empty()) ); TaskAggregate aggregate = TaskAggregate.fromHistory(ID, history); assertThat(aggregate.update(new MemoryReferenceWithCounterTask.AdditionalInformation(3))).isEmpty(); @@ -114,7 +125,7 @@ class TaskAggregateTest { History history = buildHistory( eventId -> Created.apply(ID, eventId, task, HOSTNAME), eventId -> Started.apply(ID, eventId, HOSTNAME), - eventId -> Cancelled.apply(ID, eventId, task.type(), None$.empty()) + eventId -> Cancelled.apply(ID, eventId, Option.empty()) ); TaskAggregate aggregate = TaskAggregate.fromHistory(ID, history); assertThat(aggregate.update(new MemoryReferenceWithCounterTask.AdditionalInformation(3))).isEmpty(); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
