This is an automated email from the ASF dual-hosted git repository. rouazana pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 1b1ee833b339a80ac949f6350e6c0a6d8e847e1b Author: Matthieu Baechler <[email protected]> AuthorDate: Wed Oct 9 15:21:47 2019 +0200 JAMES-2813 wire UpdateAdditionalInformation command in the Event Sourcing system --- .../distributed/TasksSerializationModule.java | 11 ++++++++- .../eventsourcing/distributed/TaskEventDTO.scala | 19 +++++++++++++++ .../apache/james/task/SerialTaskManagerWorker.java | 2 +- .../james/task/eventsourcing/CommandHandlers.scala | 8 +++++++ .../task/eventsourcing/DecisionProjection.scala | 27 +++++++++++----------- .../eventsourcing/EventSourcingTaskManager.scala | 3 ++- .../task/eventsourcing/WorkerStatusListener.scala | 6 ++--- .../james/task/SerialTaskManagerWorkerTest.java | 5 ++-- 8 files changed, 59 insertions(+), 22 deletions(-) diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/TasksSerializationModule.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/TasksSerializationModule.java index 07cadec..ed2a80a 100644 --- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/TasksSerializationModule.java +++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/TasksSerializationModule.java @@ -26,6 +26,7 @@ import java.util.stream.Stream; import org.apache.james.eventsourcing.eventstore.cassandra.dto.EventDTOModule; import org.apache.james.server.task.json.JsonTaskAdditionalInformationsSerializer; import org.apache.james.server.task.json.JsonTaskSerializer; +import org.apache.james.task.eventsourcing.AdditionalInformationUpdated; import org.apache.james.task.eventsourcing.CancelRequested; import org.apache.james.task.eventsourcing.Cancelled; import org.apache.james.task.eventsourcing.Completed; @@ -90,8 +91,16 @@ public interface TasksSerializationModule { .typeName("task-manager-cancelled") .withFactory(EventDTOModule::new); + TaskSerializationModuleFactory<AdditionalInformationUpdated, AdditionalInformationUpdatedDTO> UPDATED = (jsonTaskSerializer, jsonTaskAdditionalInformationsSerializer) -> EventDTOModule + .forEvent(AdditionalInformationUpdated.class) + .convertToDTO(AdditionalInformationUpdatedDTO.class) + .toDomainObjectConverter(dto -> dto.toDomainObject(jsonTaskAdditionalInformationsSerializer)) + .toDTOConverter((event, typeName) -> AdditionalInformationUpdatedDTO.fromDomainObject(jsonTaskAdditionalInformationsSerializer, event, typeName)) + .typeName("task-manager-updated") + .withFactory(EventDTOModule::new); + BiFunction<JsonTaskSerializer, JsonTaskAdditionalInformationsSerializer, List<EventDTOModule<?, ?>>> MODULES = (jsonTaskSerializer, jsonTaskAdditionalInformationsSerializer) -> Stream - .of(CREATED, STARTED, CANCEL_REQUESTED, CANCELLED, COMPLETED, FAILED) + .of(CREATED, STARTED, CANCEL_REQUESTED, CANCELLED, COMPLETED, FAILED, UPDATED) .map(moduleFactory -> moduleFactory.create(jsonTaskSerializer, jsonTaskAdditionalInformationsSerializer)) .collect(Guavate.toImmutableList()); } diff --git a/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/TaskEventDTO.scala b/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/TaskEventDTO.scala index 978d08e..4730a4c 100644 --- a/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/TaskEventDTO.scala +++ b/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/TaskEventDTO.scala @@ -27,6 +27,7 @@ import org.apache.james.eventsourcing.eventstore.cassandra.dto.EventDTO import org.apache.james.server.task.json.{JsonTaskAdditionalInformationsSerializer, JsonTaskSerializer} import org.apache.james.task.eventsourcing._ import org.apache.james.task.{Hostname, Task, TaskId} + import scala.compat.java8.OptionConverters._ sealed abstract class TaskEventDTO(val getType: String, val getAggregate: String, val getEvent: Int) extends EventDTO { @@ -140,3 +141,21 @@ object CancelledDTO { CancelledDTO(typeName, event.aggregateId.taskId.asString(), event.eventId.serialize(), serializedAdditionalInformations) } } + +case class AdditionalInformationUpdatedDTO(@JsonProperty("type") typeName: String, + @JsonProperty("aggregate") aggregateId: String, + @JsonProperty("event") eventId: Int, + @JsonProperty("additionalInformation") getAdditionalInformation: String) + extends TaskEventDTO(typeName, aggregateId, eventId) { + def toDomainObject(jsonTaskAdditionalInformationsSerializer: JsonTaskAdditionalInformationsSerializer): AdditionalInformationUpdated = { + val deserializedAdditionalInformation = jsonTaskAdditionalInformationsSerializer.deserialize(getAdditionalInformation) + AdditionalInformationUpdated(domainAggregateId, domainEventId, deserializedAdditionalInformation) + } +} + +object AdditionalInformationUpdatedDTO { + def fromDomainObject(jsonTaskAdditionalInformationsSerializer: JsonTaskAdditionalInformationsSerializer)(event: AdditionalInformationUpdated, typeName: String): AdditionalInformationUpdatedDTO = { + val serializedAdditionalInformations = jsonTaskAdditionalInformationsSerializer.serialize(event.additionalInformation) + AdditionalInformationUpdatedDTO(typeName, event.aggregateId.taskId.asString(), event.eventId.serialize(), serializedAdditionalInformations) + } +} diff --git a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java index 8fb0e15..60b5a16 100644 --- a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java +++ b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java @@ -114,7 +114,7 @@ public class SerialTaskManagerWorker implements TaskManagerWorker { .delayElement(Duration.ofSeconds(1)) .repeat() .flatMap(Mono::justOrEmpty) - .doOnNext(information -> listener.updated(taskWithId.getId(), taskWithId.getTask().type(), information)); + .doOnNext(information -> listener.updated(taskWithId.getId(), information)); } diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala index 1184988..9db161e 100644 --- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala +++ b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala @@ -81,4 +81,12 @@ class FailCommandHandler(private val loadHistory: TaskAggregateId => History) ex override def handle(command: Fail): util.List[_ <: Event] = { loadAggregate(loadHistory, command.id).fail(command.additionalInformation, command.errorMessage, command.exception) } +} + +class UpdateCommandHandler(private val loadHistory: TaskAggregateId => History) extends TaskCommandHandler[UpdateAdditionalInformation] { + override def handledClass: Class[UpdateAdditionalInformation] = classOf[UpdateAdditionalInformation] + + override def handle(command: UpdateAdditionalInformation): util.List[_ <: Event] = { + loadAggregate(loadHistory, command.id).update(command.additionalInformation) + } } \ No newline at end of file diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/DecisionProjection.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/DecisionProjection.scala index d5394bb..b5a460f 100644 --- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/DecisionProjection.scala +++ b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/DecisionProjection.scala @@ -22,22 +22,23 @@ import org.apache.james.eventsourcing.Event import org.apache.james.task.TaskManager.Status case class DecisionProjection(status: Option[Status]) { - val update: Event => DecisionProjection = - DecisionProjection.create + def update(event: Event): DecisionProjection = { + DecisionProjection( + event match { + case event: Created => Some(Status.WAITING) + case event: Started => Some(Status.IN_PROGRESS) + case event: CancelRequested => Some(Status.CANCEL_REQUESTED) + case event: Cancelled => Some(Status.CANCELLED) + case event: Completed => Some(Status.COMPLETED) + case event: Failed => Some(Status.FAILED) + case event: AdditionalInformationUpdated => status + } + ) + } + } object DecisionProjection { - def create(event: Event): DecisionProjection = DecisionProjection(Some(fromEvent(event))) - def empty: DecisionProjection = DecisionProjection(None) - - private def fromEvent(event: Event): Status = event match { - case event: Created => Status.WAITING - case event: Started => Status.IN_PROGRESS - case event: CancelRequested => Status.CANCEL_REQUESTED - case event: Cancelled => Status.CANCELLED - case event: Completed => Status.COMPLETED - case event: Failed => Status.FAILED - } } diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala index 20cf21d..35856b0 100644 --- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala +++ b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala @@ -59,7 +59,8 @@ class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing] new RequestCancelCommandHandler(loadHistory, hostname), new CompleteCommandHandler(loadHistory), new CancelCommandHandler(loadHistory), - new FailCommandHandler(loadHistory)), + new FailCommandHandler(loadHistory), + new UpdateCommandHandler(loadHistory)), subscribers = Set( executionDetailsProjection.asSubscriber(hostname), workDispatcher, diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/WorkerStatusListener.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/WorkerStatusListener.scala index b6a3f80..aa93d82 100644 --- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/WorkerStatusListener.scala +++ b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/WorkerStatusListener.scala @@ -21,13 +21,12 @@ package org.apache.james.task.eventsourcing import java.util.Optional +import com.google.common.base.Throwables import org.apache.james.eventsourcing.EventSourcingSystem import org.apache.james.task.Task.Result import org.apache.james.task.eventsourcing.TaskCommand._ import org.apache.james.task.{TaskExecutionDetails, TaskId, TaskManagerWorker} -import com.google.common.base.Throwables - import scala.compat.java8.OptionConverters._ case class WorkerStatusListener(eventSourcingSystem: EventSourcingSystem) extends TaskManagerWorker.Listener { @@ -49,5 +48,6 @@ case class WorkerStatusListener(eventSourcingSystem: EventSourcingSystem) extend override def cancelled(taskId: TaskId, additionalInformation: Optional[TaskExecutionDetails.AdditionalInformation]): Unit = eventSourcingSystem.dispatch(Cancel(taskId, additionalInformation.asScala )) - override def updated(taskId: TaskId, additionalInformation: TaskExecutionDetails.AdditionalInformation): Unit = ??? + override def updated(taskId: TaskId, additionalInformation: TaskExecutionDetails.AdditionalInformation): Unit = + eventSourcingSystem.dispatch(UpdateAdditionalInformation(taskId, additionalInformation)) } \ No newline at end of file diff --git a/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java b/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java index 7448a20..b693347 100644 --- a/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java +++ b/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java @@ -100,10 +100,9 @@ class SerialTaskManagerWorkerTest { worker.executeTask(taskWithId).block(); - verify(listener, atMost(2)).updated(eq(taskWithId.getId()), notNull()); + verify(listener, atMost(3)).updated(eq(taskWithId.getId()), notNull()); } - @Test void aRunningTaskShouldEmitAtMostOneInformationPerSecond() { TaskWithId taskWithId = new TaskWithId(TaskId.generateTaskId(), new MemoryReferenceWithCounterTask((counter) -> @@ -115,7 +114,7 @@ class SerialTaskManagerWorkerTest { worker.executeTask(taskWithId).block(); - verify(listener, times(2)).updated(eq(taskWithId.getId()), notNull()); + verify(listener, atMost(3)).updated(eq(taskWithId.getId()), notNull()); } @Test --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
