This is an automated email from the ASF dual-hosted git repository. rouazana pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 5cafcf00056a659e371586990761bd6ba61494ba Author: Gautier DI FOLCO <[email protected]> AuthorDate: Tue Aug 27 11:01:11 2019 +0200 JAMES-2813 Inject TerminationSubscriber into EventSourcingTaskManager --- .../apache/james/DistributedTaskManagerModule.java | 4 + .../distributed/DistributedTaskManagerTest.java | 3 +- .../eventsourcing/EventSourcingTaskManager.scala | 6 +- .../apache/james/task/eventsourcing/Events.scala | 8 +- .../task/eventsourcing/TerminationSubscriber.scala | 46 +++++++ .../EventSourcingTaskManagerTest.java | 2 +- .../MemoryTerminationSubscriberTest.java | 27 ++++ .../TerminationSubscriberContract.java | 144 +++++++++++++++++++++ 8 files changed, 233 insertions(+), 7 deletions(-) diff --git a/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/DistributedTaskManagerModule.java b/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/DistributedTaskManagerModule.java index 4d2b2ab..7dd21ef 100644 --- a/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/DistributedTaskManagerModule.java +++ b/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/DistributedTaskManagerModule.java @@ -23,7 +23,9 @@ package org.apache.james; import org.apache.james.modules.server.HostnameModule; import org.apache.james.task.TaskManager; import org.apache.james.task.eventsourcing.EventSourcingTaskManager; +import org.apache.james.task.eventsourcing.MemoryTerminationSubscriber; import org.apache.james.task.eventsourcing.TaskExecutionDetailsProjection; +import org.apache.james.task.eventsourcing.TerminationSubscriber; import org.apache.james.task.eventsourcing.WorkQueueSupplier; import org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetailsProjection; import org.apache.james.task.eventsourcing.distributed.RabbitMQWorkQueueSupplier; @@ -40,6 +42,8 @@ public class DistributedTaskManagerModule extends AbstractModule { bind(TaskManager.class).in(Scopes.SINGLETON); bind(WorkQueueSupplier.class).in(Scopes.SINGLETON); bind(TaskExecutionDetailsProjection.class).to(CassandraTaskExecutionDetailsProjection.class); + bind(TerminationSubscriber.class).in(Scopes.SINGLETON); + bind(TerminationSubscriber.class).toInstance(new MemoryTerminationSubscriber()); bind(TaskManager.class).to(EventSourcingTaskManager.class); bind(WorkQueueSupplier.class).to(RabbitMQWorkQueueSupplier.class); } diff --git a/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java b/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java index ca70292..ced6652 100644 --- a/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java +++ b/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java @@ -48,6 +48,7 @@ import org.apache.james.task.TaskManager; import org.apache.james.task.TaskManagerContract; import org.apache.james.task.eventsourcing.EventSourcingTaskManager; import org.apache.james.task.eventsourcing.Hostname; +import org.apache.james.task.eventsourcing.MemoryTerminationSubscriber; import org.apache.james.task.eventsourcing.TaskExecutionDetailsProjection; import org.apache.james.task.eventsourcing.WorkQueueSupplier; import org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetailsProjection; @@ -107,7 +108,7 @@ class DistributedTaskManagerTest implements TaskManagerContract { } private EventSourcingTaskManager taskManager(Hostname hostname) { - return new EventSourcingTaskManager(workQueueSupplier, eventStore, executionDetailsProjection, hostname); + return new EventSourcingTaskManager(workQueueSupplier, eventStore, executionDetailsProjection, hostname, new MemoryTerminationSubscriber()); } @Test diff --git a/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala b/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala index f57c947..93b8302 100644 --- a/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala +++ b/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala @@ -34,7 +34,8 @@ class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing] workQueueSupplier: WorkQueueSupplier, val eventStore: EventStore, val executionDetailsProjection: TaskExecutionDetailsProjection, - val hostname: Hostname) extends TaskManager with Closeable { + val hostname: Hostname, + val terminationSubscriber: TerminationSubscriber) extends TaskManager with Closeable { private val delayBetweenPollingInMs = 500 @@ -60,7 +61,8 @@ class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing] new FailCommandHandler(loadHistory)), subscribers = Set( executionDetailsProjection.asSubscriber(hostname), - workDispatcher), + workDispatcher, + terminationSubscriber), eventStore = eventStore) private val workQueue: WorkQueue = workQueueSupplier(eventSourcingSystem) diff --git a/server/task/src/main/scala/org/apache/james/task/eventsourcing/Events.scala b/server/task/src/main/scala/org/apache/james/task/eventsourcing/Events.scala index 6d9c7ba..68730bd 100644 --- a/server/task/src/main/scala/org/apache/james/task/eventsourcing/Events.scala +++ b/server/task/src/main/scala/org/apache/james/task/eventsourcing/Events.scala @@ -28,6 +28,8 @@ sealed abstract class TaskEvent(aggregateId: TaskAggregateId, val eventId: Event override def getAggregateId: TaskAggregateId = aggregateId } +sealed abstract class TerminalTaskEvent(aggregateId: TaskAggregateId, override val eventId: EventId) extends TaskEvent(aggregateId, eventId) + case class Hostname(private val value: String) { def asString: String = value } @@ -50,8 +52,8 @@ case class Started(aggregateId: TaskAggregateId, override val eventId: EventId, case class CancelRequested(aggregateId: TaskAggregateId, override val eventId: EventId, hostname: Hostname) extends TaskEvent(aggregateId, eventId) -case class Completed(aggregateId: TaskAggregateId, override val eventId: EventId, result: Result) extends TaskEvent(aggregateId, eventId) +case class Completed(aggregateId: TaskAggregateId, override val eventId: EventId, result: Result) extends TerminalTaskEvent(aggregateId, eventId) -case class Failed(aggregateId: TaskAggregateId, override val eventId: EventId) extends TaskEvent(aggregateId, eventId) +case class Failed(aggregateId: TaskAggregateId, override val eventId: EventId) extends TerminalTaskEvent(aggregateId, eventId) -case class Cancelled(aggregateId: TaskAggregateId, override val eventId: EventId) extends TaskEvent(aggregateId, eventId) +case class Cancelled(aggregateId: TaskAggregateId, override val eventId: EventId) extends TerminalTaskEvent(aggregateId, eventId) diff --git a/server/task/src/main/scala/org/apache/james/task/eventsourcing/TerminationSubscriber.scala b/server/task/src/main/scala/org/apache/james/task/eventsourcing/TerminationSubscriber.scala new file mode 100644 index 0000000..af23af4 --- /dev/null +++ b/server/task/src/main/scala/org/apache/james/task/eventsourcing/TerminationSubscriber.scala @@ -0,0 +1,46 @@ +/** ************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + * ***************************************************************/ + +package org.apache.james.task.eventsourcing + +import org.apache.james.eventsourcing.{Event, Subscriber} +import org.reactivestreams.Publisher + +import reactor.core.publisher.DirectProcessor + +trait TerminationSubscriber extends Subscriber { + override def handle(event: Event): Unit = event match { + case event: TerminalTaskEvent => addEvent(event) + case _ => + } + + def addEvent(event: Event): Unit + + def listenEvents: Publisher[Event] +} + +class MemoryTerminationSubscriber extends TerminationSubscriber { + private val events = DirectProcessor.create[Event]() + + override def addEvent(event: Event): Unit = + events.onNext(event) + + override def listenEvents: Publisher[Event] = + events.share() +} \ No newline at end of file diff --git a/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java b/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java index 281e217..cb5f19a 100644 --- a/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java +++ b/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java @@ -61,7 +61,7 @@ class EventSourcingTaskManagerTest implements TaskManagerContract { TaskManagerWorker worker = new SerialTaskManagerWorker(listener); return new MemoryWorkQueue(worker); }; - taskManager = new EventSourcingTaskManager(workQueueSupplier, eventStore, executionDetailsProjection, HOSTNAME); + taskManager = new EventSourcingTaskManager(workQueueSupplier, eventStore, executionDetailsProjection, HOSTNAME, new MemoryTerminationSubscriber()); } @AfterEach diff --git a/server/task/src/test/java/org/apache/james/task/eventsourcing/MemoryTerminationSubscriberTest.java b/server/task/src/test/java/org/apache/james/task/eventsourcing/MemoryTerminationSubscriberTest.java new file mode 100644 index 0000000..b8d2707 --- /dev/null +++ b/server/task/src/test/java/org/apache/james/task/eventsourcing/MemoryTerminationSubscriberTest.java @@ -0,0 +1,27 @@ +/** + * ************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ***************************************************************/ + +package org.apache.james.task.eventsourcing; + +class MemoryTerminationSubscriberTest implements TerminationSubscriberContract { + public TerminationSubscriber subscriber() { + return new MemoryTerminationSubscriber(); + } +} diff --git a/server/task/src/test/java/org/apache/james/task/eventsourcing/TerminationSubscriberContract.java b/server/task/src/test/java/org/apache/james/task/eventsourcing/TerminationSubscriberContract.java new file mode 100644 index 0000000..5b0f5ff --- /dev/null +++ b/server/task/src/test/java/org/apache/james/task/eventsourcing/TerminationSubscriberContract.java @@ -0,0 +1,144 @@ +/** + * ************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ***************************************************************/ + +package org.apache.james.task.eventsourcing; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.time.Duration; +import java.util.List; + +import org.apache.james.eventsourcing.Event; +import org.apache.james.eventsourcing.EventId; +import org.apache.james.task.Task; +import org.apache.james.task.TaskId; + +import org.assertj.core.api.ListAssert; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + +public interface TerminationSubscriberContract { + + Completed COMPLETED_EVENT = new Completed(new TaskAggregateId(TaskId.generateTaskId()), EventId.fromSerialized(42), Task.Result.COMPLETED); + Failed FAILED_EVENT = new Failed(new TaskAggregateId(TaskId.generateTaskId()), EventId.fromSerialized(42)); + Cancelled CANCELLED_EVENT = new Cancelled(new TaskAggregateId(TaskId.generateTaskId()), EventId.fromSerialized(42)); + Duration DELAY_BETWEEN_EVENTS = Duration.ofMillis(50); + Duration DELAY_BEFORE_PUBLISHING = Duration.ofMillis(50); + + TerminationSubscriber subscriber(); + + @Test + default void handlingCompletedShouldBeListed() { + TerminationSubscriber subscriber = subscriber(); + + sendEvents(subscriber, COMPLETED_EVENT); + + assertEvents(subscriber).containsOnly(COMPLETED_EVENT); + } + + @Test + default void handlingFailedShouldBeListed() { + TerminationSubscriber subscriber = subscriber(); + + sendEvents(subscriber, FAILED_EVENT); + + assertEvents(subscriber).containsOnly(FAILED_EVENT); + } + + @Test + default void handlingCancelledShouldBeListed() { + TerminationSubscriber subscriber = subscriber(); + + sendEvents(subscriber, CANCELLED_EVENT); + + assertEvents(subscriber).containsOnly(CANCELLED_EVENT); + } + + @Test + default void handlingNonTerminalEventShouldNotBeListed() { + TerminationSubscriber subscriber = subscriber(); + TaskEvent event = new Started(new TaskAggregateId(TaskId.generateTaskId()), EventId.fromSerialized(42), new Hostname("foo")); + + sendEvents(subscriber, event); + + assertEvents(subscriber).isEmpty(); + } + + @Test + default void handlingMultipleEventsShouldBeListed() { + TerminationSubscriber subscriber = subscriber(); + + sendEvents(subscriber, COMPLETED_EVENT, FAILED_EVENT, CANCELLED_EVENT); + + assertEvents(subscriber).containsExactly(COMPLETED_EVENT, FAILED_EVENT, CANCELLED_EVENT); + } + + @Test + default void multipleListeningEventsShouldShareEvents() { + TerminationSubscriber subscriber = subscriber(); + + sendEvents(subscriber, COMPLETED_EVENT, FAILED_EVENT, CANCELLED_EVENT); + + List<List<Event>> listenedEvents = Flux.range(0, 2) + .subscribeOn(Schedulers.elastic()) + .flatMap(ignored -> collectEvents(subscriber)) + .collectList() + .block(); + assertThat(listenedEvents).hasSize(2); + assertThat(listenedEvents.get(0)).containsExactly(COMPLETED_EVENT, FAILED_EVENT, CANCELLED_EVENT); + assertThat(listenedEvents.get(1)).isEqualTo(listenedEvents.get(0)); + } + + @Test + default void dynamicListeningEventsShouldGetOnlyNewEvents() { + TerminationSubscriber subscriber = subscriber(); + + sendEvents(subscriber, COMPLETED_EVENT, FAILED_EVENT, CANCELLED_EVENT); + + List<Event> listenedEvents = Mono.delay(DELAY_BEFORE_PUBLISHING.plus(DELAY_BETWEEN_EVENTS.multipliedBy(3).dividedBy(2))) + .then(Mono.defer(() -> collectEvents(subscriber))) + .subscribeOn(Schedulers.elastic()) + .block(); + assertThat(listenedEvents).containsExactly(FAILED_EVENT, CANCELLED_EVENT); + } + + default ListAssert<Event> assertEvents(TerminationSubscriber subscriber) { + return assertThat(collectEvents(subscriber) + .block()); + } + + default Mono<List<Event>> collectEvents(TerminationSubscriber subscriber) { + return Flux.from(subscriber.listenEvents()) + .subscribeOn(Schedulers.elastic()) + .take(DELAY_BEFORE_PUBLISHING.plus(DELAY_BETWEEN_EVENTS.multipliedBy(7))) + .collectList(); + } + + default void sendEvents(TerminationSubscriber subscriber, Event... events) { + Mono.delay(DELAY_BEFORE_PUBLISHING) + .flatMapMany(ignored -> Flux.fromArray(events) + .subscribeOn(Schedulers.elastic()) + .delayElements(DELAY_BETWEEN_EVENTS) + .doOnNext(subscriber::handle)) + .subscribe(); + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
