This is an automated email from the ASF dual-hosted git repository. rouazana pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit b483682befaed94e483e9137546e93ae9bc1bdbe Author: Matthieu Baechler <[email protected]> AuthorDate: Mon Oct 7 17:23:54 2019 +0200 JAMES-2813 handle UpdateAdditionalInformation command into the aggregate --- .../apache/james/task/eventsourcing/Events.scala | 2 + .../james/task/eventsourcing/TaskAggregate.scala | 12 ++ .../james/task/eventsourcing/TaskCommand.scala | 2 + .../task/eventsourcing/TaskAggregateTest.java | 122 +++++++++++++++++++++ 4 files changed, 138 insertions(+) diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/Events.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/Events.scala index f31c33a..42b4cc0 100644 --- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/Events.scala +++ b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/Events.scala @@ -33,6 +33,8 @@ case class Created(aggregateId: TaskAggregateId, override val eventId: EventId, case class Started(aggregateId: TaskAggregateId, override val eventId: EventId, hostname: Hostname) extends TaskEvent(aggregateId, eventId) +case class AdditionalInformationUpdated(aggregateId: TaskAggregateId, override val eventId: EventId, additionalInformation: AdditionalInformation) extends TaskEvent(aggregateId, eventId) + case class CancelRequested(aggregateId: TaskAggregateId, override val eventId: EventId, hostname: Hostname) extends TaskEvent(aggregateId, eventId) case class Completed(aggregateId: TaskAggregateId, override val eventId: EventId, result: Result, additionalInformation: Option[AdditionalInformation]) extends TerminalTaskEvent(aggregateId, eventId, additionalInformation) diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala index 9b902d2..0c158ba 100644 --- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala +++ b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala @@ -58,6 +58,18 @@ class TaskAggregate private(val aggregateId: TaskAggregateId, private val histor } } + private[eventsourcing] def update(additionalInformation: AdditionalInformation): util.List[Event] = { + currentStatus match { + case Some(Status.IN_PROGRESS) => createEventWithId(AdditionalInformationUpdated(aggregateId, _, additionalInformation)) + case Some(Status.CANCEL_REQUESTED) => createEventWithId(AdditionalInformationUpdated(aggregateId, _, additionalInformation)) + case Some(Status.COMPLETED) => Nil.asJava + case Some(Status.FAILED) => Nil.asJava + case Some(Status.WAITING) => Nil.asJava + case Some(Status.CANCELLED) => Nil.asJava + case _ => Nil.asJava + } + } + private[eventsourcing] def complete(result: Result, additionalInformation: Option[AdditionalInformation]): util.List[Event] = { currentStatus match { case Some(status) if !status.isFinished => createEventWithId(Completed(aggregateId, _, result, additionalInformation)) diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskCommand.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskCommand.scala index 6c8b20e..15abba5 100644 --- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskCommand.scala +++ b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskCommand.scala @@ -31,6 +31,8 @@ object TaskCommand { case class Start(id: TaskId) extends TaskCommand + case class UpdateAdditionalInformation(id: TaskId, additionalInformation: AdditionalInformation) extends TaskCommand + case class Complete(id: TaskId, result: Result, additionalInformation: Option[AdditionalInformation]) extends TaskCommand case class RequestCancel(id: TaskId) extends TaskCommand diff --git a/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TaskAggregateTest.java b/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TaskAggregateTest.java new file mode 100644 index 0000000..03797da --- /dev/null +++ b/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TaskAggregateTest.java @@ -0,0 +1,122 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ +package org.apache.james.task.eventsourcing; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.Arrays; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.apache.james.eventsourcing.Event; +import org.apache.james.eventsourcing.EventId; +import org.apache.james.eventsourcing.eventstore.History; +import org.apache.james.task.Hostname; +import org.apache.james.task.MemoryReferenceWithCounterTask; +import org.apache.james.task.Task; +import org.apache.james.task.TaskId; +import org.junit.jupiter.api.Test; + +import com.google.common.collect.Streams; +import scala.None; +import scala.None$; + +class TaskAggregateTest { + + static final Hostname HOSTNAME = Hostname.apply("foo"); + static final TaskAggregateId ID = TaskAggregateId.apply(TaskId.generateTaskId()); + + History buildHistory(Function<EventId, Event>... events) { + return History.of( + Streams.zip( + Stream.iterate(EventId.first(), EventId::next), + Arrays.stream(events), + (id, event) -> event.apply(id)) + .collect(Collectors.toList())); + } + + @Test + void givenNoStartedTaskEmitNoEventWhenUpdateAdditionalInformationCommand() { + History history = buildHistory( + eventId -> Created.apply(ID, eventId, new MemoryReferenceWithCounterTask((counter) -> Task.Result.COMPLETED), HOSTNAME) + ); + TaskAggregate aggregate = TaskAggregate.fromHistory(ID, history); + assertThat(aggregate.update(new MemoryReferenceWithCounterTask.AdditionalInformation(3))).isEmpty(); + } + + @Test + void givenInProgressTaskEmitEventWhenUpdateAdditionalInformationCommand() { + History history = buildHistory( + eventId -> Created.apply(ID, eventId, new MemoryReferenceWithCounterTask((counter) -> Task.Result.COMPLETED), HOSTNAME), + eventId -> Started.apply(ID, eventId, HOSTNAME) + ); + TaskAggregate aggregate = TaskAggregate.fromHistory(ID, history); + assertThat(aggregate.update(new MemoryReferenceWithCounterTask.AdditionalInformation(3))) + .containsExactly(AdditionalInformationUpdated.apply(ID, history.getNextEventId(), new MemoryReferenceWithCounterTask.AdditionalInformation(3))); + } + + @Test + void givenCancelRequestedTaskEmitEventWhenUpdateAdditionalInformationCommand() { + History history = buildHistory( + eventId -> Created.apply(ID, eventId, new MemoryReferenceWithCounterTask((counter) -> Task.Result.COMPLETED), HOSTNAME), + eventId -> Started.apply(ID, eventId, HOSTNAME), + eventId -> CancelRequested.apply(ID, eventId, HOSTNAME) + ); + TaskAggregate aggregate = TaskAggregate.fromHistory(ID, history); + assertThat(aggregate.update(new MemoryReferenceWithCounterTask.AdditionalInformation(3))) + .containsExactly(AdditionalInformationUpdated.apply(ID, history.getNextEventId(), new MemoryReferenceWithCounterTask.AdditionalInformation(3))); + } + + @Test + void givenCompletedTaskEmitNoEventWhenUpdateAdditionalInformationCommand() { + MemoryReferenceWithCounterTask task = new MemoryReferenceWithCounterTask((counter) -> Task.Result.COMPLETED); + History history = buildHistory( + eventId -> Created.apply(ID, eventId, task, HOSTNAME), + eventId -> Started.apply(ID, eventId, HOSTNAME), + eventId -> Completed.apply(ID, eventId, Task.Result.COMPLETED, task.type(), None$.empty()) + ); + TaskAggregate aggregate = TaskAggregate.fromHistory(ID, history); + assertThat(aggregate.update(new MemoryReferenceWithCounterTask.AdditionalInformation(3))).isEmpty(); + } + + @Test + void givenFailedTaskEmitNoEventWhenUpdateAdditionalInformationCommand() { + MemoryReferenceWithCounterTask task = new MemoryReferenceWithCounterTask((counter) -> Task.Result.COMPLETED); + History history = buildHistory( + eventId -> Created.apply(ID, eventId, task, HOSTNAME), + eventId -> Started.apply(ID, eventId, HOSTNAME), + eventId -> Failed.apply(ID, eventId, task.type(), None$.empty()) + ); + TaskAggregate aggregate = TaskAggregate.fromHistory(ID, history); + assertThat(aggregate.update(new MemoryReferenceWithCounterTask.AdditionalInformation(3))).isEmpty(); + } + + @Test + void givenCancelTaskEmitNoEventWhenUpdateAdditionalInformationCommand() { + MemoryReferenceWithCounterTask task = new MemoryReferenceWithCounterTask((counter) -> Task.Result.COMPLETED); + History history = buildHistory( + eventId -> Created.apply(ID, eventId, task, HOSTNAME), + eventId -> Started.apply(ID, eventId, HOSTNAME), + eventId -> Cancelled.apply(ID, eventId, task.type(), None$.empty()) + ); + TaskAggregate aggregate = TaskAggregate.fromHistory(ID, history); + assertThat(aggregate.update(new MemoryReferenceWithCounterTask.AdditionalInformation(3))).isEmpty(); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
