This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 21c0033b3bfc7fad82a6f92ceb0ece3d0b9ee9a3 Author: Gautier DI FOLCO <[email protected]> AuthorDate: Wed Mar 25 15:10:22 2020 +0100 JAMES-3133 Reactify the event system --- event-sourcing/event-sourcing-core/pom.xml | 4 ++ .../apache/james/eventsourcing/CommandHandler.java | 6 ++- .../eventsourcing/javaapi/CommandHandlerJava.java | 34 ------------ .../james/eventsourcing/CommandDispatcher.scala | 56 +++++++++---------- .../org/apache/james/eventsourcing/EventBus.scala | 22 ++++++-- .../james/eventsourcing/EventSourcingSystem.scala | 3 +- .../eventsourcing/EventSourcingSystemTest.scala | 55 ++++++++++--------- .../org/apache/james/eventsourcing/Event.scala | 3 +- event-sourcing/event-store-api/pom.xml | 9 ++++ .../eventsourcing/eventstore/EventStore.scala | 11 ++-- .../eventstore/EventStoreContract.scala | 23 ++++---- event-sourcing/event-store-cassandra/pom.xml | 3 +- .../eventstore/cassandra/CassandraEventStore.scala | 28 ++++++---- .../eventstore/cassandra/EventStoreDao.scala | 4 +- event-sourcing/event-store-memory/pom.xml | 4 ++ .../eventstore/memory/InMemoryEventStore.scala | 22 +++++--- .../commands/DetectThresholdCrossingHandler.java | 18 ++++--- .../listeners/QuotaThresholdCrossingListener.java | 10 ++-- pom.xml | 5 ++ .../james/dlp/api/DLPConfigurationLoader.java | 3 +- .../dlp/api/DLPConfigurationStoreContract.java | 23 ++++---- .../jmap/api/filtering/FilteringManagement.java | 3 +- .../filtering/impl/DefineRulesCommandHandler.java | 16 +++--- .../impl/EventSourcingFilteringManagement.java | 15 +++--- .../api/filtering/FilteringManagementContract.java | 14 ++--- .../EventSourcingDLPConfigurationStore.java | 21 ++++---- .../commands/ClearCommandHandler.java | 15 +++--- .../commands/StoreCommandHandler.java | 15 +++--- .../transport/matchers/dlp/DlpRulesLoader.java | 4 +- .../james/jmap/draft/methods/GetFilterMethod.java | 29 +++++----- .../james/jmap/mailet/filter/JMAPFiltering.java | 8 ++- .../webadmin/routes/DLPConfigurationRoutes.java | 3 +- .../EventsourcingConfigurationManagement.java | 15 +++--- .../RegisterConfigurationCommandHandler.java | 16 +++--- .../EventsourcingConfigurationManagementTest.java | 19 ++++--- .../distributed/RabbitMQWorkQueue.java | 16 +++--- .../eventsourcing/distributed/ImmediateWorker.java | 10 ++-- .../org/apache/james/task/MemoryTaskManager.java | 36 +++++++------ .../apache/james/task/SerialTaskManagerWorker.java | 59 ++++++++++---------- .../org/apache/james/task/TaskManagerWorker.java | 18 ++++--- .../james/task/eventsourcing/CommandHandlers.scala | 63 ++++++++++++---------- .../eventsourcing/EventSourcingTaskManager.scala | 9 ++-- .../task/eventsourcing/WorkerStatusListener.scala | 17 +++--- .../james/task/SerialTaskManagerWorkerTest.java | 8 +++ .../EventSourcingTaskManagerTest.java | 8 +-- 45 files changed, 439 insertions(+), 344 deletions(-) diff --git a/event-sourcing/event-sourcing-core/pom.xml b/event-sourcing/event-sourcing-core/pom.xml index 078e693..9497ae1 100644 --- a/event-sourcing/event-sourcing-core/pom.xml +++ b/event-sourcing/event-sourcing-core/pom.xml @@ -56,6 +56,10 @@ <artifactId>guavate</artifactId> </dependency> <dependency> + <groupId>io.projectreactor</groupId> + <artifactId>reactor-scala-extensions_${scala.base}</artifactId> + </dependency> + <dependency> <groupId>javax.inject</groupId> <artifactId>javax.inject</artifactId> </dependency> diff --git a/event-sourcing/event-sourcing-core/src/main/java/org/apache/james/eventsourcing/CommandHandler.java b/event-sourcing/event-sourcing-core/src/main/java/org/apache/james/eventsourcing/CommandHandler.java index 6e3645b..dd69083 100644 --- a/event-sourcing/event-sourcing-core/src/main/java/org/apache/james/eventsourcing/CommandHandler.java +++ b/event-sourcing/event-sourcing-core/src/main/java/org/apache/james/eventsourcing/CommandHandler.java @@ -18,10 +18,12 @@ ****************************************************************/ package org.apache.james.eventsourcing; -import scala.collection.immutable.List; +import java.util.List; + +import org.reactivestreams.Publisher; public interface CommandHandler<C extends Command> { Class<C> handledClass(); - List<? extends Event> handle(C command); + Publisher<List<? extends Event>> handle(C command); } diff --git a/event-sourcing/event-sourcing-core/src/main/java/org/apache/james/eventsourcing/javaapi/CommandHandlerJava.java b/event-sourcing/event-sourcing-core/src/main/java/org/apache/james/eventsourcing/javaapi/CommandHandlerJava.java deleted file mode 100644 index ce9cfc9..0000000 --- a/event-sourcing/event-sourcing-core/src/main/java/org/apache/james/eventsourcing/javaapi/CommandHandlerJava.java +++ /dev/null @@ -1,34 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. * - ****************************************************************/ -package org.apache.james.eventsourcing.javaapi; - -import org.apache.james.eventsourcing.Command; -import org.apache.james.eventsourcing.CommandHandler; -import org.apache.james.eventsourcing.Event; - -import scala.jdk.javaapi.CollectionConverters; - -public interface CommandHandlerJava<C extends Command> extends CommandHandler<C> { - - java.util.List<? extends Event> handleJava(C command); - - default scala.collection.immutable.List<? extends Event> handle(C command) { - return CollectionConverters.asScala(handleJava(command)).toList(); - } -} diff --git a/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/CommandDispatcher.scala b/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/CommandDispatcher.scala index 8dde47f..05abc6f 100644 --- a/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/CommandDispatcher.scala +++ b/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/CommandDispatcher.scala @@ -18,11 +18,17 @@ ****************************************************************/ package org.apache.james.eventsourcing -import com.google.common.base.Preconditions +import java.util + import javax.inject.Inject + import org.apache.james.eventsourcing.eventstore.EventStoreFailedException +import org.reactivestreams.Publisher + +import com.google.common.base.Preconditions +import reactor.core.scala.publisher.SMono -import scala.util.{Failure, Success, Try} +import scala.jdk.CollectionConverters._ object CommandDispatcher { private val MAX_RETRY = 10 @@ -45,9 +51,16 @@ object CommandDispatcher { class CommandDispatcher @Inject()(eventBus: EventBus, handlers: Set[CommandHandler[_ <: Command]]) { Preconditions.checkArgument(hasOnlyOneHandlerByCommand(handlers), CommandDispatcher.ONLY_ONE_HANDLER_PRECONDITION) - def dispatch(c: Command): Unit = { - trySeveralTimes(() => tryDispatch(c)) - .getOrElse(() => throw CommandDispatcher.TooManyRetries(c, CommandDispatcher.MAX_RETRY)) + def dispatch(c: Command): Publisher[Void] = { + tryDispatch(c) + .retry(CommandDispatcher.MAX_RETRY, { + case _: EventStoreFailedException => true + case _ => false + }) + .onErrorMap({ + case _: EventStoreFailedException => CommandDispatcher.TooManyRetries(c, CommandDispatcher.MAX_RETRY) + case error => error + }) } private def hasOnlyOneHandlerByCommand(handlers: Set[CommandHandler[_ <: Command]]): Boolean = @@ -58,35 +71,22 @@ class CommandDispatcher @Inject()(eventBus: EventBus, handlers: Set[CommandHandl private val handlersByClass: Map[Class[_ <: Command], CommandHandler[_ <: Command]] = handlers.map(handler => (handler.handledClass, handler)).toMap - - private def trySeveralTimes(singleTry: () => Boolean): Option[Unit] = - 0.until(CommandDispatcher.MAX_RETRY) - .find(_ => singleTry()) - .map(_ => ()) - - - private def tryDispatch(c: Command): Boolean = { - val maybeEvents: Option[Try[List[_ <: Event]]] = handleCommand(c) - maybeEvents match { - case Some(eventsTry) => - eventsTry - .flatMap(events => Try(eventBus.publish(events))) match { - case Success(_) => true - case Failure(_: EventStoreFailedException) => false - case Failure(e) => throw e - } + private def tryDispatch(c: Command): SMono[Void] = { + handleCommand(c) match { + case Some(eventsPublisher) => + SMono(eventsPublisher) + .flatMap(events => eventBus.publish(events.asScala)) case _ => - throw CommandDispatcher.UnknownCommandException(c) + SMono.raiseError(CommandDispatcher.UnknownCommandException(c)) } } - private def handleCommand(c: Command): Option[Try[List[_ <: Event]]] = { + private def handleCommand(c: Command): Option[Publisher[util.List[_ <: Event]]] = { handlersByClass .get(c.getClass) .map(commandHandler => - Try( - commandHandler - .asInstanceOf[CommandHandler[c.type]] - .handle(c))) + commandHandler + .asInstanceOf[CommandHandler[c.type]] + .handle(c)) } } \ No newline at end of file diff --git a/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/EventBus.scala b/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/EventBus.scala index 9d527d3..ca8d753 100644 --- a/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/EventBus.scala +++ b/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/EventBus.scala @@ -19,22 +19,34 @@ package org.apache.james.eventsourcing import javax.inject.Inject + import org.apache.james.eventsourcing.eventstore.{EventStore, EventStoreFailedException} +import org.reactivestreams.Publisher import org.slf4j.LoggerFactory +import reactor.core.scala.publisher.{SFlux, SMono} + object EventBus { private val LOGGER = LoggerFactory.getLogger(classOf[EventBus]) } class EventBus @Inject() (eventStore: EventStore, subscribers: Set[Subscriber]) { @throws[EventStoreFailedException] - def publish(events: List[Event]): Unit = { - eventStore.appendAll(events) - events - .flatMap((event: Event) => subscribers.map(subscriber => (event, subscriber))) - .foreach {case (event, subscriber) => handle(event, subscriber)} + def publish(events: Iterable[Event]): SMono[Void] = { + SMono(eventStore.appendAll(events)) + .`then`(runHandlers(events, subscribers)) + } + def runHandlers(events: Iterable[Event], subscribers: Set[Subscriber]): SMono[Void] = { + SFlux.fromIterable(events.flatMap((event: Event) => subscribers.map(subscriber => (event, subscriber)))) + .flatMap(infos => runHandler(infos._1, infos._2)) + .`then`() + .`then`(SMono.empty) + } + + def runHandler(event: Event, subscriber: Subscriber): Publisher[Void] = SMono.fromCallable(() => handle(event, subscriber)).`then`(SMono.empty) + private def handle(event : Event, subscriber: Subscriber) : Unit = { try { subscriber.handle(event) diff --git a/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/EventSourcingSystem.scala b/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/EventSourcingSystem.scala index 2f3b71b..5cc52f2 100644 --- a/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/EventSourcingSystem.scala +++ b/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/EventSourcingSystem.scala @@ -19,6 +19,7 @@ package org.apache.james.eventsourcing import org.apache.james.eventsourcing.eventstore.EventStore +import org.reactivestreams.Publisher object EventSourcingSystem { @@ -36,5 +37,5 @@ class EventSourcingSystem(handlers: Set[CommandHandler[_ <: Command]], private val eventBus = new EventBus(eventStore, subscribers) private val commandDispatcher = new CommandDispatcher(eventBus, handlers) - def dispatch(c: Command): Unit = commandDispatcher.dispatch(c) + def dispatch(c: Command): Publisher[Void] = commandDispatcher.dispatch(c) } \ No newline at end of file diff --git a/event-sourcing/event-sourcing-core/src/test/scala/org/apache/james/eventsourcing/EventSourcingSystemTest.scala b/event-sourcing/event-sourcing-core/src/test/scala/org/apache/james/eventsourcing/EventSourcingSystemTest.scala index 948867a..5555907 100644 --- a/event-sourcing/event-sourcing-core/src/test/scala/org/apache/james/eventsourcing/EventSourcingSystemTest.scala +++ b/event-sourcing/event-sourcing-core/src/test/scala/org/apache/james/eventsourcing/EventSourcingSystemTest.scala @@ -18,7 +18,8 @@ ****************************************************************/ package org.apache.james.eventsourcing -import com.google.common.base.Splitter +import java.util.{List => JavaList} + import org.apache.james.eventsourcing.eventstore.{EventStore, History} import org.assertj.core.api.Assertions.{assertThat, assertThatThrownBy} import org.junit.jupiter.api.Test @@ -26,8 +27,12 @@ import org.mockito.ArgumentMatchers.any import org.mockito.Mockito.{doThrow, mock, when} import org.mockito.internal.matchers.InstanceOf import org.mockito.internal.progress.ThreadSafeMockingProgress +import org.reactivestreams.Publisher + +import com.google.common.base.Splitter +import reactor.core.publisher.Mono +import reactor.core.scala.publisher.SMono -import scala.collection.immutable.List import scala.jdk.CollectionConverters._ object EventSourcingSystemTest { @@ -59,7 +64,7 @@ trait EventSourcingSystemTest { def dispatchShouldApplyCommandHandlerThenCallSubscribers(eventStore: EventStore) : Unit = { val subscriber = new DataCollectorSubscriber val eventSourcingSystem = new EventSourcingSystem(Set(simpleDispatcher(eventStore)), Set(subscriber), eventStore) - eventSourcingSystem.dispatch(new EventSourcingSystemTest.MyCommand(EventSourcingSystemTest.PAYLOAD_1)) + Mono.from(eventSourcingSystem.dispatch(new EventSourcingSystemTest.MyCommand(EventSourcingSystemTest.PAYLOAD_1))).block() assertThat(subscriber.getData.asJava).containsExactly(EventSourcingSystemTest.PAYLOAD_1) } @@ -70,7 +75,7 @@ trait EventSourcingSystemTest { Set(simpleDispatcher(eventStore)), Set((_: Event) => throw new RuntimeException, subscriber), eventStore) - eventSourcingSystem.dispatch(new EventSourcingSystemTest.MyCommand(EventSourcingSystemTest.PAYLOAD_1)) + Mono.from(eventSourcingSystem.dispatch(new EventSourcingSystemTest.MyCommand(EventSourcingSystemTest.PAYLOAD_1))).block() assertThat(subscriber.getData.asJava).containsExactly(EventSourcingSystemTest.PAYLOAD_1) } @@ -78,13 +83,13 @@ trait EventSourcingSystemTest { def throwingStoreShouldNotLeadToPublishing() : Unit = { val eventStore = mock(classOf[EventStore]) doThrow(new RuntimeException).when(eventStore).appendAll(EventSourcingSystemTest.anyScalaList) - when(eventStore.getEventsOfAggregate(any)).thenReturn(History.empty) + when(eventStore.getEventsOfAggregate(any)).thenReturn(SMono.just(History.empty)) val subscriber = new DataCollectorSubscriber val eventSourcingSystem = new EventSourcingSystem( Set(simpleDispatcher(eventStore)), Set((_: Event) => throw new RuntimeException, subscriber), eventStore) - assertThatThrownBy(() => eventSourcingSystem.dispatch(new EventSourcingSystemTest.MyCommand(EventSourcingSystemTest.PAYLOAD_1))) + assertThatThrownBy(() => Mono.from(eventSourcingSystem.dispatch(new EventSourcingSystemTest.MyCommand(EventSourcingSystemTest.PAYLOAD_1))).block()) .isInstanceOf(classOf[RuntimeException]) assertThat(subscriber.getData.asJava).isEmpty() } @@ -93,17 +98,17 @@ trait EventSourcingSystemTest { def dispatchShouldApplyCommandHandlerThenStoreGeneratedEvents(eventStore: EventStore) : Unit = { val subscriber = new DataCollectorSubscriber val eventSourcingSystem = new EventSourcingSystem(Set(simpleDispatcher(eventStore)), Set(subscriber), eventStore) - eventSourcingSystem.dispatch(new EventSourcingSystemTest.MyCommand(EventSourcingSystemTest.PAYLOAD_1)) + Mono.from(eventSourcingSystem.dispatch(new EventSourcingSystemTest.MyCommand(EventSourcingSystemTest.PAYLOAD_1))).block() val expectedEvent = TestEvent(EventId.first, EventSourcingSystemTest.AGGREGATE_ID, EventSourcingSystemTest.PAYLOAD_1) - assertThat(eventStore.getEventsOfAggregate(EventSourcingSystemTest.AGGREGATE_ID).getEventsJava).containsOnly(expectedEvent) + assertThat(SMono(eventStore.getEventsOfAggregate(EventSourcingSystemTest.AGGREGATE_ID)).block().getEventsJava).containsOnly(expectedEvent) } @Test def dispatchShouldCallSubscriberForSubsequentCommands(eventStore: EventStore) : Unit = { val subscriber = new DataCollectorSubscriber val eventSourcingSystem = new EventSourcingSystem(Set(simpleDispatcher(eventStore)), Set(subscriber), eventStore) - eventSourcingSystem.dispatch(new EventSourcingSystemTest.MyCommand(EventSourcingSystemTest.PAYLOAD_1)) - eventSourcingSystem.dispatch(new EventSourcingSystemTest.MyCommand(EventSourcingSystemTest.PAYLOAD_2)) + Mono.from(eventSourcingSystem.dispatch(new EventSourcingSystemTest.MyCommand(EventSourcingSystemTest.PAYLOAD_1))).block() + Mono.from(eventSourcingSystem.dispatch(new EventSourcingSystemTest.MyCommand(EventSourcingSystemTest.PAYLOAD_2))).block() assertThat(subscriber.getData.asJava).containsExactly(EventSourcingSystemTest.PAYLOAD_1, EventSourcingSystemTest.PAYLOAD_2) } @@ -111,18 +116,18 @@ trait EventSourcingSystemTest { def dispatchShouldStoreEventsForSubsequentCommands(eventStore: EventStore) : Unit = { val subscriber = new DataCollectorSubscriber val eventSourcingSystem = new EventSourcingSystem(Set(simpleDispatcher(eventStore)), Set(subscriber), eventStore) - eventSourcingSystem.dispatch(new EventSourcingSystemTest.MyCommand(EventSourcingSystemTest.PAYLOAD_1)) - eventSourcingSystem.dispatch(new EventSourcingSystemTest.MyCommand(EventSourcingSystemTest.PAYLOAD_2)) + Mono.from(eventSourcingSystem.dispatch(new EventSourcingSystemTest.MyCommand(EventSourcingSystemTest.PAYLOAD_1))).block() + Mono.from(eventSourcingSystem.dispatch(new EventSourcingSystemTest.MyCommand(EventSourcingSystemTest.PAYLOAD_2))).block() val expectedEvent1 = TestEvent(EventId.first, EventSourcingSystemTest.AGGREGATE_ID, EventSourcingSystemTest.PAYLOAD_1) val expectedEvent2 = TestEvent(expectedEvent1.eventId.next, EventSourcingSystemTest.AGGREGATE_ID, EventSourcingSystemTest.PAYLOAD_2) - assertThat(eventStore.getEventsOfAggregate(EventSourcingSystemTest.AGGREGATE_ID).getEventsJava).containsOnly(expectedEvent1, expectedEvent2) + assertThat(SMono(eventStore.getEventsOfAggregate(EventSourcingSystemTest.AGGREGATE_ID)).block().getEventsJava).containsOnly(expectedEvent1, expectedEvent2) } @Test def dispatcherShouldBeAbleToReturnSeveralEvents(eventStore: EventStore) : Unit = { val subscriber = new DataCollectorSubscriber val eventSourcingSystem = new EventSourcingSystem(Set(wordCuttingDispatcher(eventStore)), Set(subscriber), eventStore) - eventSourcingSystem.dispatch(new EventSourcingSystemTest.MyCommand("This is a test")) + Mono.from(eventSourcingSystem.dispatch(new EventSourcingSystemTest.MyCommand("This is a test"))).block() assertThat(subscriber.getData.asJava).containsExactly("This", "is", "a", "test") } @@ -130,7 +135,7 @@ trait EventSourcingSystemTest { def unknownCommandsShouldBeIgnored(eventStore: EventStore) : Unit = { val subscriber = new DataCollectorSubscriber val eventSourcingSystem = new EventSourcingSystem(Set(wordCuttingDispatcher(eventStore)), Set(subscriber), eventStore) - assertThatThrownBy(() => eventSourcingSystem.dispatch(new Command() {})) + assertThatThrownBy(() => Mono.from(eventSourcingSystem.dispatch(new Command() {})).block()) .isInstanceOf(classOf[CommandDispatcher.UnknownCommandException]) } @@ -146,22 +151,22 @@ trait EventSourcingSystemTest { def simpleDispatcher(eventStore: EventStore) = new CommandHandler[EventSourcingSystemTest.MyCommand]() { override def handledClass: Class[EventSourcingSystemTest.MyCommand] = classOf[EventSourcingSystemTest.MyCommand] - override def handle(myCommand: EventSourcingSystemTest.MyCommand): List[TestEvent] = { - val history = eventStore.getEventsOfAggregate(EventSourcingSystemTest.AGGREGATE_ID) - List(TestEvent(history.getNextEventId, EventSourcingSystemTest.AGGREGATE_ID, myCommand.getPayload)) + override def handle(myCommand: EventSourcingSystemTest.MyCommand): Publisher[JavaList[_ <: Event]] = { + SMono.apply(eventStore.getEventsOfAggregate(EventSourcingSystemTest.AGGREGATE_ID)) + .map(history => Seq(TestEvent(history.getNextEventId, EventSourcingSystemTest.AGGREGATE_ID, myCommand.getPayload)).asJava) } } def wordCuttingDispatcher(eventStore: EventStore) = new CommandHandler[EventSourcingSystemTest.MyCommand]() { override def handledClass: Class[EventSourcingSystemTest.MyCommand] = classOf[EventSourcingSystemTest.MyCommand] - override def handle(myCommand: EventSourcingSystemTest.MyCommand): List[TestEvent] = { - val history = eventStore.getEventsOfAggregate(EventSourcingSystemTest.AGGREGATE_ID) - val eventIdIncrementer = new EventSourcingSystemTest.EventIdIncrementer(history.getNextEventId) - Splitter.on(" ").splitToList(myCommand.getPayload) - .asScala - .toList - .map((word: String) => TestEvent(eventIdIncrementer.next, EventSourcingSystemTest.AGGREGATE_ID, word)) + override def handle(myCommand: EventSourcingSystemTest.MyCommand): Publisher[JavaList[_ <: Event]] = { + SMono.apply(eventStore.getEventsOfAggregate(EventSourcingSystemTest.AGGREGATE_ID)) + .map(history => new EventSourcingSystemTest.EventIdIncrementer(history.getNextEventId)) + .map(eventIdIncrementer => Splitter.on(" ").splitToList(myCommand.getPayload) + .asScala + .toList + .map((word: String) => TestEvent(eventIdIncrementer.next, EventSourcingSystemTest.AGGREGATE_ID, word)).asJava) } } } \ No newline at end of file diff --git a/event-sourcing/event-sourcing-pojo/src/main/scala/org/apache/james/eventsourcing/Event.scala b/event-sourcing/event-sourcing-pojo/src/main/scala/org/apache/james/eventsourcing/Event.scala index 79b80bc..5440ab0 100644 --- a/event-sourcing/event-sourcing-pojo/src/main/scala/org/apache/james/eventsourcing/Event.scala +++ b/event-sourcing/event-sourcing-pojo/src/main/scala/org/apache/james/eventsourcing/Event.scala @@ -19,7 +19,8 @@ package org.apache.james.eventsourcing object Event { - def belongsToSameAggregate(events: List[_ <: Event]): Boolean = events + def belongsToSameAggregate(events: Iterable[_ <: Event]): Boolean = events + .toSeq .view .map(event => event.getAggregateId) .distinct diff --git a/event-sourcing/event-store-api/pom.xml b/event-sourcing/event-store-api/pom.xml index 44064ce..2bffd38 100644 --- a/event-sourcing/event-store-api/pom.xml +++ b/event-sourcing/event-store-api/pom.xml @@ -52,6 +52,15 @@ <artifactId>guavate</artifactId> </dependency> <dependency> + <groupId>io.projectreactor</groupId> + <artifactId>reactor-scala-extensions_${scala.base}</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.reactivestreams</groupId> + <artifactId>reactive-streams</artifactId> + </dependency> + <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> </dependency> diff --git a/event-sourcing/event-store-api/src/main/scala/org/apache/james/eventsourcing/eventstore/EventStore.scala b/event-sourcing/event-store-api/src/main/scala/org/apache/james/eventsourcing/eventstore/EventStore.scala index b90c314..fcfac47 100644 --- a/event-sourcing/event-store-api/src/main/scala/org/apache/james/eventsourcing/eventstore/EventStore.scala +++ b/event-sourcing/event-store-api/src/main/scala/org/apache/james/eventsourcing/eventstore/EventStore.scala @@ -22,20 +22,19 @@ import org.apache.james.eventsourcing.{AggregateId, Event} import scala.annotation.varargs import scala.jdk.CollectionConverters._ +import org.reactivestreams.Publisher trait EventStore { - def append(event: Event): Unit = appendAll(List(event)) + def append(event: Event): Publisher[Void] = appendAll(List(event)) @varargs - def appendAll(events: Event*): Unit = appendAll(events.toList) - - def appendAll(events: java.util.List[Event]): Unit = appendAll(events.asScala.toList) + def appendAll(events: Event*): Publisher[Void] = appendAll(events.toList) /** * This method should check that no input event has an id already stored and throw otherwise * It should also check that all events belong to the same aggregate */ - def appendAll(events: List[Event]): Unit + def appendAll(events: Iterable[Event]): Publisher[Void] - def getEventsOfAggregate(aggregateId: AggregateId): History + def getEventsOfAggregate(aggregateId: AggregateId): Publisher[History] } \ No newline at end of file diff --git a/event-sourcing/event-store-api/src/test/scala/org/apache/james/eventsourcing/eventstore/EventStoreContract.scala b/event-sourcing/event-store-api/src/test/scala/org/apache/james/eventsourcing/eventstore/EventStoreContract.scala index f519a9c..23f454b 100644 --- a/event-sourcing/event-store-api/src/test/scala/org/apache/james/eventsourcing/eventstore/EventStoreContract.scala +++ b/event-sourcing/event-store-api/src/test/scala/org/apache/james/eventsourcing/eventstore/EventStoreContract.scala @@ -22,6 +22,9 @@ import org.apache.james.eventsourcing.{EventId, TestAggregateId, TestEvent} import org.assertj.core.api.Assertions.{assertThat, assertThatCode, assertThatThrownBy} import org.junit.jupiter.api.Test +import reactor.core.scala.publisher +import reactor.core.scala.publisher.SMono + object EventStoreContract { val AGGREGATE_1 = TestAggregateId(1) val AGGREGATE_2 = TestAggregateId(2) @@ -38,35 +41,35 @@ trait EventStoreContract { def appendShouldThrowWhenEventFromSeveralAggregates(testee: EventStore) : Unit = { val event1 = TestEvent(EventId.first, EventStoreContract.AGGREGATE_1, "first") val event2 = TestEvent(event1.eventId.next, EventStoreContract.AGGREGATE_2, "second") - assertThatThrownBy(() => testee.appendAll(event1, event2)) + assertThatThrownBy(() => SMono(testee.appendAll(event1, event2)).block()) .isInstanceOf(classOf[IllegalArgumentException]) } @Test def appendShouldDoNothingOnEmptyEventList(testee: EventStore) : Unit = - assertThatCode(() => testee.appendAll()) + assertThatCode(() => SMono(testee.appendAll()).block()) .doesNotThrowAnyException() @Test def appendShouldThrowWhenTryingToRewriteHistory(testee: EventStore) : Unit = { val event1 = TestEvent(EventId.first, EventStoreContract.AGGREGATE_1, "first") - testee.append(event1) + SMono(testee.append(event1)).block() val event2 = TestEvent(EventId.first, EventStoreContract.AGGREGATE_1, "second") assertThatThrownBy( - () => testee.append(event2)) + () => SMono(testee.append(event2)).block()) .isInstanceOf(classOf[EventStoreFailedException]) } @Test def getEventsOfAggregateShouldReturnEmptyHistoryWhenUnknown(testee: EventStore) : Unit = - assertThat(testee.getEventsOfAggregate(EventStoreContract.AGGREGATE_1)) + assertThat(SMono(testee.getEventsOfAggregate(EventStoreContract.AGGREGATE_1)).block()) .isEqualTo(History.empty) @Test def getEventsOfAggregateShouldReturnAppendedEvent(testee: EventStore) : Unit = { val event = TestEvent(EventId.first, EventStoreContract.AGGREGATE_1, "first") - testee.append(event) - assertThat(testee.getEventsOfAggregate(EventStoreContract.AGGREGATE_1)) + SMono(testee.append(event)).block() + assertThat(SMono(testee.getEventsOfAggregate(EventStoreContract.AGGREGATE_1)).block()) .isEqualTo(History.of(event)) } @@ -74,9 +77,9 @@ trait EventStoreContract { def getEventsOfAggregateShouldReturnAppendedEvents(testee: EventStore) : Unit = { val event1 = TestEvent(EventId.first, EventStoreContract.AGGREGATE_1, "first") val event2 = TestEvent(event1.eventId.next, EventStoreContract.AGGREGATE_1, "second") - testee.append(event1) - testee.append(event2) - assertThat(testee.getEventsOfAggregate(EventStoreContract.AGGREGATE_1)) + SMono(testee.append(event1)).block() + SMono(testee.append(event2)).block() + assertThat(SMono(testee.getEventsOfAggregate(EventStoreContract.AGGREGATE_1)).block()) .isEqualTo(History.of(event1, event2)) } } \ No newline at end of file diff --git a/event-sourcing/event-store-cassandra/pom.xml b/event-sourcing/event-store-cassandra/pom.xml index 8bfd47a..e083ea4 100644 --- a/event-sourcing/event-store-cassandra/pom.xml +++ b/event-sourcing/event-store-cassandra/pom.xml @@ -80,8 +80,7 @@ </dependency> <dependency> <groupId>io.projectreactor</groupId> - <artifactId>reactor-scala-extensions_2.13</artifactId> - <version>0.5.0</version> + <artifactId>reactor-scala-extensions_${scala.base}</artifactId> </dependency> <dependency> <groupId>net.javacrumbs.json-unit</groupId> diff --git a/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStore.scala b/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStore.scala index 6c41973..d9ab8ed 100644 --- a/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStore.scala +++ b/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStore.scala @@ -20,25 +20,35 @@ package org.apache.james.eventsourcing.eventstore.cassandra import com.google.common.base.Preconditions import javax.inject.Inject + import org.apache.james.eventsourcing.eventstore.{EventStore, EventStoreFailedException, History} import org.apache.james.eventsourcing.{AggregateId, Event} +import org.reactivestreams.Publisher + +import reactor.core.scala.publisher.SMono class CassandraEventStore @Inject() (eventStoreDao: EventStoreDao) extends EventStore { - override def appendAll(events: List[Event]): Unit = { + override def appendAll(events: Iterable[Event]): Publisher[Void] = { if (events.nonEmpty) { doAppendAll(events) + } else { + SMono.empty } } - private def doAppendAll(events: List[Event]): Unit = { + private def doAppendAll(events: Iterable[Event]): SMono[Void] = { Preconditions.checkArgument(Event.belongsToSameAggregate(events)) - val success: Boolean = eventStoreDao.appendAll(events).block() - if (!success) { - throw EventStoreFailedException("Concurrent update to the EventStore detected") - } + eventStoreDao.appendAll(events) + .filter(success => success) + .single() + .onErrorMap({ + case _: NoSuchElementException => EventStoreFailedException("Concurrent update to the EventStore detected") + case e => e + }) + .`then`(SMono.empty) } - override def getEventsOfAggregate(aggregateId: AggregateId): History = { - eventStoreDao.getEventsOfAggregate(aggregateId).block() + override def getEventsOfAggregate(aggregateId: AggregateId): SMono[History] = { + eventStoreDao.getEventsOfAggregate(aggregateId) } -} \ No newline at end of file +} diff --git a/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/EventStoreDao.scala b/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/EventStoreDao.scala index 95a9239..9fda3a4 100644 --- a/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/EventStoreDao.scala +++ b/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/EventStoreDao.scala @@ -50,7 +50,7 @@ class EventStoreDao @Inject() (val session: Session, val jsonEventSerializer: Js .where(QueryBuilder.eq(AGGREGATE_ID, bindMarker(AGGREGATE_ID)))) } - private[cassandra] def appendAll(events: List[Event]): SMono[Boolean] = { + private[cassandra] def appendAll(events: Iterable[Event]): SMono[Boolean] = { val batch: BatchStatement = new BatchStatement events.foreach((event: Event) => batch.add(insertEvent(event))) SMono(cassandraAsyncExecutor.executeReturnApplied(batch)) @@ -74,7 +74,7 @@ class EventStoreDao @Inject() (val session: Session, val jsonEventSerializer: Js val listEvents: SMono[List[Event]] = events.collectSeq() .map(_.toList) - listEvents.map(History.of) + listEvents.map(History.of(_)) } private def toEvent(row: Row): Event = { diff --git a/event-sourcing/event-store-memory/pom.xml b/event-sourcing/event-store-memory/pom.xml index d4720c8..8e58626 100644 --- a/event-sourcing/event-store-memory/pom.xml +++ b/event-sourcing/event-store-memory/pom.xml @@ -65,6 +65,10 @@ <scope>test</scope> </dependency> <dependency> + <groupId>io.projectreactor</groupId> + <artifactId>reactor-scala-extensions_${scala.base}</artifactId> + </dependency> + <dependency> <groupId>org.mockito</groupId> <artifactId>mockito-core</artifactId> <scope>test</scope> diff --git a/event-sourcing/event-store-memory/src/main/scala/org/apache/james/eventsourcing/eventstore/memory/InMemoryEventStore.scala b/event-sourcing/event-store-memory/src/main/scala/org/apache/james/eventsourcing/eventstore/memory/InMemoryEventStore.scala index c4f9621..caf254a 100644 --- a/event-sourcing/event-store-memory/src/main/scala/org/apache/james/eventsourcing/eventstore/memory/InMemoryEventStore.scala +++ b/event-sourcing/event-store-memory/src/main/scala/org/apache/james/eventsourcing/eventstore/memory/InMemoryEventStore.scala @@ -23,34 +23,44 @@ import java.util.concurrent.atomic.AtomicReference import com.google.common.base.Preconditions import org.apache.james.eventsourcing.eventstore.{EventStore, History} import org.apache.james.eventsourcing.{AggregateId, Event} +import org.reactivestreams.Publisher + +import reactor.core.scala.publisher.SMono class InMemoryEventStore() extends EventStore { private val storeRef: AtomicReference[Map[AggregateId, History]] = new AtomicReference(Map().withDefault(_ => History.empty)) - override def appendAll(events: List[Event]): Unit = if (events.nonEmpty) doAppendAll(events) + override def appendAll(events: Iterable[Event]): Publisher[Void] = { + if (events.nonEmpty) { + SMono.fromCallable(() => doAppendAll(events)).`then`() + } else { + SMono.empty + } + } - override def getEventsOfAggregate(aggregateId: AggregateId): History = { + override def getEventsOfAggregate(aggregateId: AggregateId): Publisher[History] = { Preconditions.checkNotNull(aggregateId) - storeRef.get()(aggregateId) + SMono.fromCallable(() => storeRef.get()(aggregateId)) } - private def doAppendAll(events: Seq[Event]): Unit = { + private def doAppendAll(events: Iterable[Event]): Boolean = { val aggregateId: AggregateId = getAggregateId(events) storeRef.updateAndGet(store => { val updatedHistory = History.of(store(aggregateId).getEvents ++ events) store.updated(aggregateId, updatedHistory) }) + true } - private def getAggregateId(events: Seq[Event]): AggregateId = { + private def getAggregateId(events: Iterable[Event]): AggregateId = { Preconditions.checkArgument(events.nonEmpty) val aggregateId = events.head.getAggregateId Preconditions.checkArgument(belongsToSameAggregate(aggregateId, events)) aggregateId } - private def belongsToSameAggregate(aggregateId: AggregateId, events: Seq[Event]) = + private def belongsToSameAggregate(aggregateId: AggregateId, events: Iterable[Event]) = events.forall(_.getAggregateId.equals(aggregateId)) } \ No newline at end of file diff --git a/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/commands/DetectThresholdCrossingHandler.java b/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/commands/DetectThresholdCrossingHandler.java index 83f09a4..551644c 100644 --- a/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/commands/DetectThresholdCrossingHandler.java +++ b/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/commands/DetectThresholdCrossingHandler.java @@ -21,14 +21,16 @@ package org.apache.james.mailbox.quota.mailing.commands; import java.util.List; +import org.apache.james.eventsourcing.CommandHandler; import org.apache.james.eventsourcing.Event; import org.apache.james.eventsourcing.eventstore.EventStore; -import org.apache.james.eventsourcing.eventstore.History; -import org.apache.james.eventsourcing.javaapi.CommandHandlerJava; import org.apache.james.mailbox.quota.mailing.QuotaMailingListenerConfiguration; import org.apache.james.mailbox.quota.mailing.aggregates.UserQuotaThresholds; +import org.reactivestreams.Publisher; -public class DetectThresholdCrossingHandler implements CommandHandlerJava<DetectThresholdCrossing> { +import reactor.core.publisher.Mono; + +public class DetectThresholdCrossingHandler implements CommandHandler<DetectThresholdCrossing> { private final EventStore eventStore; private final QuotaMailingListenerConfiguration quotaMailingListenerConfiguration; @@ -41,15 +43,15 @@ public class DetectThresholdCrossingHandler implements CommandHandlerJava<Detect } @Override - public List<? extends Event> handleJava(DetectThresholdCrossing command) { + public Publisher<List<? extends Event>> handle(DetectThresholdCrossing command) { return loadAggregate(command) - .detectThresholdCrossing(quotaMailingListenerConfiguration, command); + .map(aggregate -> aggregate.detectThresholdCrossing(quotaMailingListenerConfiguration, command)); } - private UserQuotaThresholds loadAggregate(DetectThresholdCrossing command) { + private Mono<UserQuotaThresholds> loadAggregate(DetectThresholdCrossing command) { UserQuotaThresholds.Id aggregateId = UserQuotaThresholds.Id.from(command.getUsername(), listenerName); - History history = eventStore.getEventsOfAggregate(aggregateId); - return UserQuotaThresholds.fromEvents(aggregateId, history); + return Mono.from(eventStore.getEventsOfAggregate(aggregateId)) + .map(history -> UserQuotaThresholds.fromEvents(aggregateId, history)); } @Override diff --git a/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/listeners/QuotaThresholdCrossingListener.java b/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/listeners/QuotaThresholdCrossingListener.java index d4e7d5d..577e2bc 100644 --- a/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/listeners/QuotaThresholdCrossingListener.java +++ b/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/listeners/QuotaThresholdCrossingListener.java @@ -42,6 +42,8 @@ import org.apache.mailet.MailetContext; import com.google.common.collect.ImmutableSet; +import reactor.core.publisher.Mono; + public class QuotaThresholdCrossingListener implements MailboxListener.GroupMailboxListener { public static class QuotaThresholdCrossingListenerGroup extends Group { @@ -74,12 +76,12 @@ public class QuotaThresholdCrossingListener implements MailboxListener.GroupMail @Override public void event(Event event) { if (event instanceof QuotaUsageUpdatedEvent) { - handleEvent(event.getUsername(), (QuotaUsageUpdatedEvent) event); + handleEvent(event.getUsername(), (QuotaUsageUpdatedEvent) event).block(); } } - private void handleEvent(Username username, QuotaUsageUpdatedEvent event) { - eventSourcingSystem.dispatch( - new DetectThresholdCrossing(username, event.getCountQuota(), event.getSizeQuota(), event.getInstant())); + private Mono<Void> handleEvent(Username username, QuotaUsageUpdatedEvent event) { + return Mono.from(eventSourcingSystem.dispatch( + new DetectThresholdCrossing(username, event.getCountQuota(), event.getSizeQuota(), event.getInstant()))); } } diff --git a/pom.xml b/pom.xml index 03ad478..ac8afa0 100644 --- a/pom.xml +++ b/pom.xml @@ -2184,6 +2184,11 @@ <version>${feign-form.version}</version> </dependency> <dependency> + <groupId>io.projectreactor</groupId> + <artifactId>reactor-scala-extensions_${scala.base}</artifactId> + <version>0.5.0</version> + </dependency> + <dependency> <groupId>io.netty</groupId> <artifactId>netty</artifactId> <version>${netty.version}</version> diff --git a/server/data/data-api/src/main/java/org/apache/james/dlp/api/DLPConfigurationLoader.java b/server/data/data-api/src/main/java/org/apache/james/dlp/api/DLPConfigurationLoader.java index ccd7223..e18a5c8 100644 --- a/server/data/data-api/src/main/java/org/apache/james/dlp/api/DLPConfigurationLoader.java +++ b/server/data/data-api/src/main/java/org/apache/james/dlp/api/DLPConfigurationLoader.java @@ -20,7 +20,8 @@ package org.apache.james.dlp.api; import org.apache.james.core.Domain; +import org.reactivestreams.Publisher; public interface DLPConfigurationLoader { - DLPRules list(Domain domain); + Publisher<DLPRules> list(Domain domain); } diff --git a/server/data/data-api/src/test/java/org/apache/james/dlp/api/DLPConfigurationStoreContract.java b/server/data/data-api/src/test/java/org/apache/james/dlp/api/DLPConfigurationStoreContract.java index 6ed3b0d..9f3b9d6 100644 --- a/server/data/data-api/src/test/java/org/apache/james/dlp/api/DLPConfigurationStoreContract.java +++ b/server/data/data-api/src/test/java/org/apache/james/dlp/api/DLPConfigurationStoreContract.java @@ -30,6 +30,7 @@ import org.apache.james.core.Domain; import org.junit.jupiter.api.Test; import com.google.common.collect.ImmutableList; +import reactor.core.publisher.Mono; public interface DLPConfigurationStoreContract { @@ -37,7 +38,7 @@ public interface DLPConfigurationStoreContract { @Test default void listShouldReturnEmptyWhenNone(DLPConfigurationStore dlpConfigurationStore) { - assertThat(dlpConfigurationStore.list(Domain.LOCALHOST)) + assertThat(Mono.from(dlpConfigurationStore.list(Domain.LOCALHOST)).block()) .isEmpty(); } @@ -45,7 +46,7 @@ public interface DLPConfigurationStoreContract { default void listShouldReturnExistingEntries(DLPConfigurationStore dlpConfigurationStore) { dlpConfigurationStore.store(Domain.LOCALHOST, RULE, RULE_2); - assertThat(dlpConfigurationStore.list(Domain.LOCALHOST)).containsOnly(RULE, RULE_2); + assertThat(Mono.from(dlpConfigurationStore.list(Domain.LOCALHOST)).block()).containsOnly(RULE, RULE_2); } @Test @@ -53,7 +54,7 @@ public interface DLPConfigurationStoreContract { dlpConfigurationStore.store(Domain.LOCALHOST, RULE); dlpConfigurationStore.store(OTHER_DOMAIN, RULE_2); - assertThat(dlpConfigurationStore.list(Domain.LOCALHOST)).containsOnly(RULE); + assertThat(Mono.from(dlpConfigurationStore.list(Domain.LOCALHOST)).block()).containsOnly(RULE); } @Test @@ -62,7 +63,7 @@ public interface DLPConfigurationStoreContract { dlpConfigurationStore.clear(Domain.LOCALHOST); - assertThat(dlpConfigurationStore.list(Domain.LOCALHOST)).isEmpty(); + assertThat(Mono.from(dlpConfigurationStore.list(Domain.LOCALHOST)).block()).isEmpty(); } @Test @@ -78,7 +79,7 @@ public interface DLPConfigurationStoreContract { dlpConfigurationStore.clear(Domain.LOCALHOST); - assertThat(dlpConfigurationStore.list(OTHER_DOMAIN)).containsOnly(RULE_2); + assertThat(Mono.from(dlpConfigurationStore.list(OTHER_DOMAIN)).block()).containsOnly(RULE_2); } @Test @@ -89,7 +90,7 @@ public interface DLPConfigurationStoreContract { dlpConfigurationStore.store(Domain.LOCALHOST, RULE); - assertThat(dlpConfigurationStore.list(Domain.LOCALHOST)).containsOnly(RULE); + assertThat(Mono.from(dlpConfigurationStore.list(Domain.LOCALHOST)).block()).containsOnly(RULE); } @Test @@ -97,7 +98,7 @@ public interface DLPConfigurationStoreContract { dlpConfigurationStore.store(Domain.LOCALHOST, RULE, RULE_2); dlpConfigurationStore.store(Domain.LOCALHOST, RULE); - assertThat(dlpConfigurationStore.list(Domain.LOCALHOST)).containsOnly(RULE); + assertThat(Mono.from(dlpConfigurationStore.list(Domain.LOCALHOST)).block()).containsOnly(RULE); } @Test @@ -105,7 +106,7 @@ public interface DLPConfigurationStoreContract { dlpConfigurationStore.store(Domain.LOCALHOST, RULE); dlpConfigurationStore.store(Domain.LOCALHOST, RULE, RULE_2); - assertThat(dlpConfigurationStore.list(Domain.LOCALHOST)).containsOnly(RULE, RULE_2); + assertThat(Mono.from(dlpConfigurationStore.list(Domain.LOCALHOST)).block()).containsOnly(RULE, RULE_2); } @Test @@ -119,7 +120,7 @@ public interface DLPConfigurationStoreContract { dlpConfigurationStore.store(Domain.LOCALHOST, RULE); dlpConfigurationStore.store(Domain.LOCALHOST, RULE_UPDATED); - assertThat(dlpConfigurationStore.list(Domain.LOCALHOST)).containsOnly(RULE_UPDATED); + assertThat(Mono.from(dlpConfigurationStore.list(Domain.LOCALHOST)).block()).containsOnly(RULE_UPDATED); } @Test @@ -127,7 +128,7 @@ public interface DLPConfigurationStoreContract { dlpConfigurationStore.store(Domain.LOCALHOST, RULE); dlpConfigurationStore.store(Domain.LOCALHOST, RULE); - assertThat(dlpConfigurationStore.list(Domain.LOCALHOST)).containsOnly(RULE); + assertThat(Mono.from(dlpConfigurationStore.list(Domain.LOCALHOST)).block()).containsOnly(RULE); } @Test @@ -135,6 +136,6 @@ public interface DLPConfigurationStoreContract { dlpConfigurationStore.store(Domain.LOCALHOST, RULE); dlpConfigurationStore.store(Domain.LOCALHOST, new DLPRules(ImmutableList.of())); - assertThat(dlpConfigurationStore.list(Domain.LOCALHOST)).isEmpty(); + assertThat(Mono.from(dlpConfigurationStore.list(Domain.LOCALHOST)).block()).isEmpty(); } } diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/FilteringManagement.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/FilteringManagement.java index c8aec6a..6e780f9 100644 --- a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/FilteringManagement.java +++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/FilteringManagement.java @@ -23,6 +23,7 @@ import java.util.Arrays; import java.util.List; import org.apache.james.core.Username; +import org.reactivestreams.Publisher; import com.google.common.collect.ImmutableList; @@ -38,6 +39,6 @@ public interface FilteringManagement { defineRulesForUser(username, ImmutableList.of()); } - List<Rule> listRulesForUser(Username username); + Publisher<Rule> listRulesForUser(Username username); } diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/DefineRulesCommandHandler.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/DefineRulesCommandHandler.java index 4931e07..9378a9f 100644 --- a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/DefineRulesCommandHandler.java +++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/DefineRulesCommandHandler.java @@ -21,11 +21,14 @@ package org.apache.james.jmap.api.filtering.impl; import java.util.List; +import org.apache.james.eventsourcing.CommandHandler; import org.apache.james.eventsourcing.Event; import org.apache.james.eventsourcing.eventstore.EventStore; -import org.apache.james.eventsourcing.javaapi.CommandHandlerJava; +import org.reactivestreams.Publisher; -public class DefineRulesCommandHandler implements CommandHandlerJava<DefineRulesCommand> { +import reactor.core.publisher.Mono; + +public class DefineRulesCommandHandler implements CommandHandler<DefineRulesCommand> { private final EventStore eventStore; @@ -39,14 +42,11 @@ public class DefineRulesCommandHandler implements CommandHandlerJava<DefineRules } @Override - public List<? extends Event> handleJava(DefineRulesCommand storeCommand) { + public Publisher<List<? extends Event>> handle(DefineRulesCommand storeCommand) { FilteringAggregateId aggregateId = new FilteringAggregateId(storeCommand.getUsername()); - return FilteringAggregate - .load( - aggregateId, - eventStore.getEventsOfAggregate(aggregateId)) - .defineRules(storeCommand.getRules()); + return Mono.from(eventStore.getEventsOfAggregate(aggregateId)) + .map(history -> FilteringAggregate.load(aggregateId, history).defineRules(storeCommand.getRules())); } } diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/EventSourcingFilteringManagement.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/EventSourcingFilteringManagement.java index d8dad9f..429cc79 100644 --- a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/EventSourcingFilteringManagement.java +++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/EventSourcingFilteringManagement.java @@ -29,10 +29,14 @@ import org.apache.james.eventsourcing.Subscriber; import org.apache.james.eventsourcing.eventstore.EventStore; import org.apache.james.jmap.api.filtering.FilteringManagement; import org.apache.james.jmap.api.filtering.Rule; +import org.reactivestreams.Publisher; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + public class EventSourcingFilteringManagement implements FilteringManagement { private static final ImmutableSet<Subscriber> NO_SUBSCRIBER = ImmutableSet.of(); @@ -51,19 +55,16 @@ public class EventSourcingFilteringManagement implements FilteringManagement { @Override public void defineRulesForUser(Username username, List<Rule> rules) { - eventSourcingSystem.dispatch(new DefineRulesCommand(username, rules)); + Mono.from(eventSourcingSystem.dispatch(new DefineRulesCommand(username, rules))).block(); } @Override - public List<Rule> listRulesForUser(Username username) { + public Publisher<Rule> listRulesForUser(Username username) { Preconditions.checkNotNull(username); FilteringAggregateId aggregateId = new FilteringAggregateId(username); - return FilteringAggregate - .load( - aggregateId, - eventStore.getEventsOfAggregate(aggregateId)) - .listRules(); + return Mono.from(eventStore.getEventsOfAggregate(aggregateId)) + .flatMapMany(history -> Flux.fromIterable(FilteringAggregate.load(aggregateId, history).listRules())); } } diff --git a/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/filtering/FilteringManagementContract.java b/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/filtering/FilteringManagementContract.java index d959f23..7a355d4 100644 --- a/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/filtering/FilteringManagementContract.java +++ b/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/filtering/FilteringManagementContract.java @@ -35,6 +35,8 @@ import org.apache.james.eventsourcing.eventstore.EventStore; import org.apache.james.jmap.api.filtering.impl.EventSourcingFilteringManagement; import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; + public interface FilteringManagementContract { String BART_SIMPSON_CARTOON = "[email protected]"; @@ -46,7 +48,7 @@ public interface FilteringManagementContract { @Test default void listingRulesForUnknownUserShouldReturnEmptyList(EventStore eventStore) { - assertThat(instantiateFilteringManagement(eventStore).listRulesForUser(USERNAME)) + assertThat(Flux.from(instantiateFilteringManagement(eventStore).listRulesForUser(USERNAME)).toStream()) .isEmpty(); } @@ -63,7 +65,7 @@ public interface FilteringManagementContract { testee.defineRulesForUser(USERNAME, RULE_1, RULE_2); - assertThat(testee.listRulesForUser(USERNAME)) + assertThat(Flux.from(testee.listRulesForUser(USERNAME)).toStream()) .containsExactly(RULE_1, RULE_2); } @@ -74,7 +76,7 @@ public interface FilteringManagementContract { testee.defineRulesForUser(USERNAME, RULE_1, RULE_2); testee.defineRulesForUser(USERNAME, RULE_2, RULE_1); - assertThat(testee.listRulesForUser(USERNAME)) + assertThat(Flux.from(testee.listRulesForUser(USERNAME)).toStream()) .containsExactly(RULE_2, RULE_1); } @@ -108,7 +110,7 @@ public interface FilteringManagementContract { FilteringManagement testee = instantiateFilteringManagement(eventStore); testee.defineRulesForUser(USERNAME, RULE_3, RULE_2, RULE_1); - assertThat(testee.listRulesForUser(USERNAME)) + assertThat(Flux.from(testee.listRulesForUser(USERNAME)).toStream()) .containsExactly(RULE_3, RULE_2, RULE_1); } @@ -119,7 +121,7 @@ public interface FilteringManagementContract { testee.defineRulesForUser(USERNAME, RULE_3, RULE_2, RULE_1); testee.clearRulesForUser(USERNAME); - assertThat(testee.listRulesForUser(USERNAME)).isEmpty(); + assertThat(Flux.from(testee.listRulesForUser(USERNAME)).toStream()).isEmpty(); } @Test @@ -128,7 +130,7 @@ public interface FilteringManagementContract { testee.defineRulesForUser(USERNAME, RULE_FROM, RULE_RECIPIENT, RULE_SUBJECT, RULE_TO, RULE_1); - assertThat(testee.listRulesForUser(USERNAME)) + assertThat(Flux.from(testee.listRulesForUser(USERNAME)).toStream()) .containsExactly(RULE_FROM, RULE_RECIPIENT, RULE_SUBJECT, RULE_TO, RULE_1); } diff --git a/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/EventSourcingDLPConfigurationStore.java b/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/EventSourcingDLPConfigurationStore.java index 957a86d..48d88ba 100644 --- a/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/EventSourcingDLPConfigurationStore.java +++ b/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/EventSourcingDLPConfigurationStore.java @@ -37,10 +37,13 @@ import org.apache.james.dlp.eventsourcing.commands.StoreCommandHandler; import org.apache.james.eventsourcing.EventSourcingSystem; import org.apache.james.eventsourcing.Subscriber; import org.apache.james.eventsourcing.eventstore.EventStore; -import org.apache.james.util.streams.Iterables; +import org.reactivestreams.Publisher; import com.google.common.collect.ImmutableSet; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + public class EventSourcingDLPConfigurationStore implements DLPConfigurationStore { private static final ImmutableSet<Subscriber> NO_SUBSCRIBER = ImmutableSet.of(); @@ -60,29 +63,29 @@ public class EventSourcingDLPConfigurationStore implements DLPConfigurationStore } @Override - public DLPRules list(Domain domain) { + public Publisher<DLPRules> list(Domain domain) { DLPAggregateId aggregateId = new DLPAggregateId(domain); - return DLPDomainConfiguration.load( - aggregateId, - eventStore.getEventsOfAggregate(aggregateId)) - .retrieveRules(); + return Mono.from(eventStore.getEventsOfAggregate(aggregateId)) + .map(history -> DLPDomainConfiguration.load(aggregateId, history).retrieveRules()); } @Override public void store(Domain domain, DLPRules rules) { - eventSourcingSystem.dispatch(new StoreCommand(domain, rules)); + Mono.from(eventSourcingSystem.dispatch(new StoreCommand(domain, rules))).block(); } @Override public void clear(Domain domain) { - eventSourcingSystem.dispatch(new ClearCommand(domain)); + Mono.from(eventSourcingSystem.dispatch(new ClearCommand(domain))).block(); } @Override public Optional<DLPConfigurationItem> fetch(Domain domain, Id ruleId) { - return Iterables.toStream(list(domain)) + return Mono.from(list(domain)) + .flatMapMany(rules -> Flux.fromIterable(rules.getItems())) + .toStream() .filter((DLPConfigurationItem item) -> item.getId().equals(ruleId)) .findFirst(); } diff --git a/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/commands/ClearCommandHandler.java b/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/commands/ClearCommandHandler.java index c59f3ce..e9ed63e 100644 --- a/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/commands/ClearCommandHandler.java +++ b/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/commands/ClearCommandHandler.java @@ -23,11 +23,14 @@ import java.util.List; import org.apache.james.dlp.eventsourcing.aggregates.DLPAggregateId; import org.apache.james.dlp.eventsourcing.aggregates.DLPDomainConfiguration; +import org.apache.james.eventsourcing.CommandHandler; import org.apache.james.eventsourcing.Event; import org.apache.james.eventsourcing.eventstore.EventStore; -import org.apache.james.eventsourcing.javaapi.CommandHandlerJava; +import org.reactivestreams.Publisher; -public class ClearCommandHandler implements CommandHandlerJava<ClearCommand> { +import reactor.core.publisher.Mono; + +public class ClearCommandHandler implements CommandHandler<ClearCommand> { private final EventStore eventStore; @@ -41,12 +44,10 @@ public class ClearCommandHandler implements CommandHandlerJava<ClearCommand> { } @Override - public List<? extends Event> handleJava(ClearCommand clearCommand) { + public Publisher<List<? extends Event>> handle(ClearCommand clearCommand) { DLPAggregateId aggregateId = new DLPAggregateId(clearCommand.getDomain()); - return DLPDomainConfiguration.load( - aggregateId, - eventStore.getEventsOfAggregate(aggregateId)) - .clear(); + return Mono.from(eventStore.getEventsOfAggregate(aggregateId)) + .map(history -> DLPDomainConfiguration.load(aggregateId, history).clear()); } } diff --git a/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/commands/StoreCommandHandler.java b/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/commands/StoreCommandHandler.java index e24bd83..0d628d5 100644 --- a/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/commands/StoreCommandHandler.java +++ b/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/commands/StoreCommandHandler.java @@ -23,11 +23,14 @@ import java.util.List; import org.apache.james.dlp.eventsourcing.aggregates.DLPAggregateId; import org.apache.james.dlp.eventsourcing.aggregates.DLPDomainConfiguration; +import org.apache.james.eventsourcing.CommandHandler; import org.apache.james.eventsourcing.Event; import org.apache.james.eventsourcing.eventstore.EventStore; -import org.apache.james.eventsourcing.javaapi.CommandHandlerJava; +import org.reactivestreams.Publisher; -public class StoreCommandHandler implements CommandHandlerJava<StoreCommand> { +import reactor.core.publisher.Mono; + +public class StoreCommandHandler implements CommandHandler<StoreCommand> { private final EventStore eventStore; @@ -41,12 +44,10 @@ public class StoreCommandHandler implements CommandHandlerJava<StoreCommand> { } @Override - public List<? extends Event> handleJava(StoreCommand storeCommand) { + public Publisher<List<? extends Event>> handle(StoreCommand storeCommand) { DLPAggregateId aggregateId = new DLPAggregateId(storeCommand.getDomain()); - return DLPDomainConfiguration.load( - aggregateId, - eventStore.getEventsOfAggregate(aggregateId)) - .store(storeCommand.getRules()); + return Mono.from(eventStore.getEventsOfAggregate(aggregateId)) + .map(history -> DLPDomainConfiguration.load(aggregateId, history).store(storeCommand.getRules())); } } diff --git a/server/mailet/mailets/src/main/java/org/apache/james/transport/matchers/dlp/DlpRulesLoader.java b/server/mailet/mailets/src/main/java/org/apache/james/transport/matchers/dlp/DlpRulesLoader.java index 08e3c4d..3d1f95f 100644 --- a/server/mailet/mailets/src/main/java/org/apache/james/transport/matchers/dlp/DlpRulesLoader.java +++ b/server/mailet/mailets/src/main/java/org/apache/james/transport/matchers/dlp/DlpRulesLoader.java @@ -25,6 +25,8 @@ import org.apache.james.core.Domain; import org.apache.james.dlp.api.DLPConfigurationStore; import org.apache.james.dlp.api.DLPRules; +import reactor.core.publisher.Mono; + public interface DlpRulesLoader { DlpDomainRules load(Domain domain); @@ -40,7 +42,7 @@ public interface DlpRulesLoader { @Override public DlpDomainRules load(Domain domain) { - return toRules(configurationStore.list(domain)); + return toRules(Mono.from(configurationStore.list(domain)).block()); } private DlpDomainRules toRules(DLPRules items) { diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetFilterMethod.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetFilterMethod.java index 5bb238c..618d862 100644 --- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetFilterMethod.java +++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetFilterMethod.java @@ -19,14 +19,12 @@ package org.apache.james.jmap.draft.methods; -import java.util.List; import java.util.stream.Stream; import javax.inject.Inject; import org.apache.james.core.Username; import org.apache.james.jmap.api.filtering.FilteringManagement; -import org.apache.james.jmap.api.filtering.Rule; import org.apache.james.jmap.draft.model.GetFilterRequest; import org.apache.james.jmap.draft.model.GetFilterResponse; import org.apache.james.jmap.draft.model.MethodCallId; @@ -37,8 +35,12 @@ import org.apache.james.util.MDCBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.github.steveash.guavate.Guavate; import com.google.common.base.Preconditions; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + public class GetFilterMethod implements Method { private static final Logger LOGGER = LoggerFactory.getLogger(GetFilterMethod.class); @@ -93,17 +95,18 @@ public class GetFilterMethod implements Method { } private Stream<JmapResponse> retrieveFilter(MethodCallId methodCallId, Username username) { - List<Rule> rules = filteringManagement.listRulesForUser(username); - - GetFilterResponse getFilterResponse = GetFilterResponse.builder() - .rules(rules) - .build(); - - return Stream.of(JmapResponse.builder() - .methodCallId(methodCallId) - .response(getFilterResponse) - .responseName(RESPONSE_NAME) - .build()); + return Flux.from(filteringManagement.listRulesForUser(username)) + .collect(Guavate.toImmutableList()) + .map(rules -> GetFilterResponse.builder() + .rules(rules) + .build()) + .map(getFilterResponse -> JmapResponse.builder() + .methodCallId(methodCallId) + .response(getFilterResponse) + .responseName(RESPONSE_NAME) + .build()) + .flatMapMany(Mono::just) + .toStream(); } private JmapResponse unKnownError(MethodCallId methodCallId) { diff --git a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/mailet/filter/JMAPFiltering.java b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/mailet/filter/JMAPFiltering.java index 0a0d58f..efd5fc7 100644 --- a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/mailet/filter/JMAPFiltering.java +++ b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/mailet/filter/JMAPFiltering.java @@ -36,6 +36,10 @@ import org.apache.mailet.base.GenericMailet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.github.steveash.guavate.Guavate; + +import reactor.core.publisher.Flux; + /** * Mailet for applying JMAP filtering to incoming email. * @@ -77,7 +81,9 @@ public class JMAPFiltering extends GenericMailet { } private void findFirstApplicableRule(Username username, Mail mail) { - List<Rule> filteringRules = filteringManagement.listRulesForUser(username); + List<Rule> filteringRules = Flux.from(filteringManagement.listRulesForUser(username)) + .collect(Guavate.toImmutableList()) + .block(); RuleMatcher ruleMatcher = new RuleMatcher(filteringRules); Stream<Rule> matchingRules = ruleMatcher.findApplicableRules(mail); diff --git a/server/protocols/webadmin/webadmin-data/src/main/java/org/apache/james/webadmin/routes/DLPConfigurationRoutes.java b/server/protocols/webadmin/webadmin-data/src/main/java/org/apache/james/webadmin/routes/DLPConfigurationRoutes.java index 9dc3e32..e197a5a 100644 --- a/server/protocols/webadmin/webadmin-data/src/main/java/org/apache/james/webadmin/routes/DLPConfigurationRoutes.java +++ b/server/protocols/webadmin/webadmin-data/src/main/java/org/apache/james/webadmin/routes/DLPConfigurationRoutes.java @@ -55,6 +55,7 @@ import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiResponse; import io.swagger.annotations.ApiResponses; +import reactor.core.publisher.Mono; import spark.HaltException; import spark.Request; import spark.Service; @@ -166,7 +167,7 @@ public class DLPConfigurationRoutes implements Routes { public void defineList(Service service) { service.get(SPECIFIC_DLP_RULE_DOMAIN, (request, response) -> { Domain senderDomain = parseDomain(request); - DLPRules dlpConfigurations = dlpConfigurationStore.list(senderDomain); + DLPRules dlpConfigurations = Mono.from(dlpConfigurationStore.list(senderDomain)).block(); DLPConfigurationDTO dto = DLPConfigurationDTO.toDTO(dlpConfigurations); response.status(HttpStatus.OK_200); diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/EventsourcingConfigurationManagement.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/EventsourcingConfigurationManagement.java index a9ca76a..ba2c621 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/EventsourcingConfigurationManagement.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/EventsourcingConfigurationManagement.java @@ -19,8 +19,6 @@ package org.apache.james.queue.rabbitmq.view.cassandra.configuration; -import java.util.Optional; - import javax.inject.Inject; import org.apache.james.eventsourcing.AggregateId; @@ -32,6 +30,8 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; +import reactor.core.publisher.Mono; + public class EventsourcingConfigurationManagement { static final String CONFIGURATION_AGGREGATE_KEY = "CassandraMailQueueViewConfiguration"; static final AggregateId CONFIGURATION_AGGREGATE_ID = () -> CONFIGURATION_AGGREGATE_KEY; @@ -51,15 +51,16 @@ public class EventsourcingConfigurationManagement { } @VisibleForTesting - Optional<CassandraMailQueueViewConfiguration> load() { - return ConfigurationAggregate - .load(CONFIGURATION_AGGREGATE_ID, eventStore.getEventsOfAggregate(CONFIGURATION_AGGREGATE_ID)) - .getCurrentConfiguration(); + Mono<CassandraMailQueueViewConfiguration> load() { + return Mono.from(eventStore.getEventsOfAggregate(CONFIGURATION_AGGREGATE_ID)) + .flatMap(history -> Mono.justOrEmpty(ConfigurationAggregate + .load(CONFIGURATION_AGGREGATE_ID, history) + .getCurrentConfiguration())); } public void registerConfiguration(CassandraMailQueueViewConfiguration newConfiguration) { Preconditions.checkNotNull(newConfiguration); - eventSourcingSystem.dispatch(new RegisterConfigurationCommand(newConfiguration, CONFIGURATION_AGGREGATE_ID)); + Mono.from(eventSourcingSystem.dispatch(new RegisterConfigurationCommand(newConfiguration, CONFIGURATION_AGGREGATE_ID))).block(); } } diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/RegisterConfigurationCommandHandler.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/RegisterConfigurationCommandHandler.java index 33c61e9..bf54102 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/RegisterConfigurationCommandHandler.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/RegisterConfigurationCommandHandler.java @@ -21,11 +21,14 @@ package org.apache.james.queue.rabbitmq.view.cassandra.configuration; import java.util.List; +import org.apache.james.eventsourcing.CommandHandler; import org.apache.james.eventsourcing.Event; import org.apache.james.eventsourcing.eventstore.EventStore; -import org.apache.james.eventsourcing.javaapi.CommandHandlerJava; +import org.reactivestreams.Publisher; -class RegisterConfigurationCommandHandler implements CommandHandlerJava<RegisterConfigurationCommand> { +import reactor.core.publisher.Mono; + +class RegisterConfigurationCommandHandler implements CommandHandler<RegisterConfigurationCommand> { private final EventStore eventStore; @@ -39,9 +42,10 @@ class RegisterConfigurationCommandHandler implements CommandHandlerJava<Register } @Override - public List<? extends Event> handleJava(RegisterConfigurationCommand command) { - return ConfigurationAggregate - .load(command.getAggregateId(), eventStore.getEventsOfAggregate(command.getAggregateId())) - .registerConfiguration(command.getConfiguration()); + public Publisher<List<? extends Event>> handle(RegisterConfigurationCommand command) { + return Mono.from(eventStore.getEventsOfAggregate(command.getAggregateId())) + .map(history -> ConfigurationAggregate + .load(command.getAggregateId(), history) + .registerConfiguration(command.getConfiguration())); } } diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/EventsourcingConfigurationManagementTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/EventsourcingConfigurationManagementTest.java index 1518a1d..5d191bf 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/EventsourcingConfigurationManagementTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/EventsourcingConfigurationManagementTest.java @@ -31,6 +31,8 @@ import org.apache.james.eventsourcing.eventstore.cassandra.JsonEventSerializer; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import reactor.core.publisher.Mono; + class EventsourcingConfigurationManagementTest { @RegisterExtension @@ -69,7 +71,7 @@ class EventsourcingConfigurationManagementTest { void loadShouldReturnEmptyIfNoConfigurationStored(EventStore eventStore) { EventsourcingConfigurationManagement testee = createConfigurationManagement(eventStore); - assertThat(testee.load()) + assertThat(Mono.from(testee.load()).blockOptional()) .isEmpty(); } @@ -80,7 +82,7 @@ class EventsourcingConfigurationManagementTest { testee.registerConfiguration(SECOND_CONFIGURATION); testee.registerConfiguration(THIRD_CONFIGURATION); - assertThat(testee.load()) + assertThat(Mono.from(testee.load()).blockOptional()) .contains(THIRD_CONFIGURATION); } @@ -125,7 +127,7 @@ class EventsourcingConfigurationManagementTest { .build(); testee.registerConfiguration(increaseOneBucketConfiguration); - assertThat(testee.load()) + assertThat(Mono.from(testee.load()).blockOptional()) .contains(increaseOneBucketConfiguration); } @@ -135,7 +137,7 @@ class EventsourcingConfigurationManagementTest { testee.registerConfiguration(FIRST_CONFIGURATION); - assertThat(testee.load()) + assertThat(Mono.from(testee.load()).blockOptional()) .contains(FIRST_CONFIGURATION); } @@ -189,7 +191,7 @@ class EventsourcingConfigurationManagementTest { .build(); testee.registerConfiguration(decreaseTwiceSliceWindowConfiguration); - assertThat(testee.load()) + assertThat(Mono.from(testee.load()).blockOptional()) .contains(decreaseTwiceSliceWindowConfiguration); } @@ -209,7 +211,7 @@ class EventsourcingConfigurationManagementTest { .build(); testee.registerConfiguration(decreaseTwiceSliceWindowConfiguration); - assertThat(testee.load()) + assertThat(Mono.from(testee.load()).blockOptional()) .contains(decreaseTwiceSliceWindowConfiguration); } @@ -229,7 +231,7 @@ class EventsourcingConfigurationManagementTest { .build(); testee.registerConfiguration(decreaseTwiceSliceWindowConfiguration); - assertThat(testee.load()) + assertThat(Mono.from(testee.load()).blockOptional()) .contains(decreaseTwiceSliceWindowConfiguration); } @@ -239,7 +241,8 @@ class EventsourcingConfigurationManagementTest { testee.registerConfiguration(FIRST_CONFIGURATION); testee.registerConfiguration(FIRST_CONFIGURATION); - assertThat(eventStore.getEventsOfAggregate(CONFIGURATION_AGGREGATE_ID) + assertThat(Mono.from(eventStore.getEventsOfAggregate(CONFIGURATION_AGGREGATE_ID)) + .block() .getEventsJava()) .hasSize(1); } diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java index de68b24..dbdce52 100644 --- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java +++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java @@ -140,22 +140,22 @@ public class RabbitMQWorkQueue implements WorkQueue { private Mono<Task> deserialize(String json, TaskId taskId) { return Mono.fromCallable(() -> taskSerializer.deserialize(json)) - .doOnError(error -> { + .onErrorResume(error -> { String errorMessage = String.format("Unable to deserialize submitted Task %s", taskId.asString()); LOGGER.error(errorMessage, error); - worker.fail(taskId, Optional.empty(), errorMessage, error); - }) - .onErrorResume(error -> Mono.empty()); + return Mono.from(worker.fail(taskId, Optional.empty(), errorMessage, error)) + .then(Mono.empty()); + }); } private Mono<Task.Result> executeOnWorker(TaskId taskId, Task task) { return worker.executeTask(new TaskWithId(taskId, task)) - .doOnError(error -> { + .onErrorResume(error -> { String errorMessage = String.format("Unable to run submitted Task %s", taskId.asString()); LOGGER.warn(errorMessage, error); - worker.fail(taskId, task.details(), errorMessage, error); - }) - .onErrorResume(error -> Mono.empty()); + return Mono.from(worker.fail(taskId, task.details(), errorMessage, error)) + .then(Mono.empty()); + }); } private void listenToCancelRequests() { diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/ImmediateWorker.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/ImmediateWorker.java index 93d8f75..71e6214 100644 --- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/ImmediateWorker.java +++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/ImmediateWorker.java @@ -27,6 +27,7 @@ import org.apache.james.task.TaskExecutionDetails; import org.apache.james.task.TaskId; import org.apache.james.task.TaskManagerWorker; import org.apache.james.task.TaskWithId; +import org.reactivestreams.Publisher; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; @@ -39,8 +40,8 @@ class ImmediateWorker implements TaskManagerWorker { @Override public Mono<Task.Result> executeTask(TaskWithId taskWithId) { - tasks.add(taskWithId); - return Mono.fromCallable(() -> taskWithId.getTask().run()) + return Mono.fromRunnable(() -> tasks.add(taskWithId)) + .then(Mono.fromCallable(() -> taskWithId.getTask().run())) .doOnNext(result -> results.add(result)) .subscribeOn(Schedulers.elastic()); } @@ -50,8 +51,9 @@ class ImmediateWorker implements TaskManagerWorker { } @Override - public void fail(TaskId taskId, Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation, String errorMessage, Throwable reason) { - failedTasks.add(taskId); + public Publisher<Void> fail(TaskId taskId, Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation, String errorMessage, Throwable reason) { + return Mono.fromRunnable(() -> failedTasks.add(taskId)) + .then(); } @Override diff --git a/server/task/task-memory/src/main/java/org/apache/james/task/MemoryTaskManager.java b/server/task/task-memory/src/main/java/org/apache/james/task/MemoryTaskManager.java index 3d2da18..8d983b0 100644 --- a/server/task/task-memory/src/main/java/org/apache/james/task/MemoryTaskManager.java +++ b/server/task/task-memory/src/main/java/org/apache/james/task/MemoryTaskManager.java @@ -30,10 +30,13 @@ import java.util.function.Consumer; import javax.annotation.PreDestroy; import javax.inject.Inject; +import org.reactivestreams.Publisher; + import com.github.steveash.guavate.Guavate; import com.google.common.collect.ImmutableList; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; public class MemoryTaskManager implements TaskManager { @@ -54,40 +57,41 @@ public class MemoryTaskManager implements TaskManager { } @Override - public void started(TaskId taskId) { - updaterFactory.apply(taskId).accept(details -> details.started(hostname)); + public Publisher<Void> started(TaskId taskId) { + return Mono.fromRunnable(() -> updaterFactory.apply(taskId) + .accept(details -> details.started(hostname))); } @Override - public void completed(TaskId taskId, Task.Result result, Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation) { - updaterFactory.apply(taskId) - .accept(details -> details.completed(additionalInformation)); + public Publisher<Void> completed(TaskId taskId, Task.Result result, Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation) { + return Mono.fromRunnable(() -> updaterFactory.apply(taskId) + .accept(details -> details.completed(additionalInformation))); } @Override - public void failed(TaskId taskId, Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation, String errorMessage, Throwable t) { - failed(taskId, additionalInformation); + public Publisher<Void> failed(TaskId taskId, Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation, String errorMessage, Throwable t) { + return failed(taskId, additionalInformation); } @Override - public void failed(TaskId taskId, Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation, Throwable t) { - failed(taskId, additionalInformation); + public Publisher<Void> failed(TaskId taskId, Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation, Throwable t) { + return failed(taskId, additionalInformation); } @Override - public void failed(TaskId taskId, Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation) { - updaterFactory.apply(taskId) - .accept(details -> details.failed(additionalInformation)); + public Publisher<Void> failed(TaskId taskId, Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation) { + return Mono.fromRunnable(() -> updaterFactory.apply(taskId) + .accept(details -> details.failed(additionalInformation))); } @Override - public void cancelled(TaskId taskId, Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation) { - updaterFactory.apply(taskId) - .accept(details -> details.cancelEffectively(additionalInformation)); + public Publisher<Void> cancelled(TaskId taskId, Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation) { + return Mono.fromRunnable(() -> updaterFactory.apply(taskId) + .accept(details -> details.cancelEffectively(additionalInformation))); } @Override - public void updated(TaskId taskId, TaskExecutionDetails.AdditionalInformation additionalInformation) { + public Publisher<Void> updated(TaskId taskId, TaskExecutionDetails.AdditionalInformation additionalInformation) { //The memory task manager doesn't use polling to update its additionalInformation. throw new IllegalStateException(); } diff --git a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java index 9bdad89..b024c4c 100644 --- a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java +++ b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java @@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.james.util.MDCBuilder; import org.apache.james.util.concurrent.NamedThreadFactory; +import org.reactivestreams.Publisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,20 +70,20 @@ public class SerialTaskManagerWorker implements TaskManagerWorker { return Mono.using( () -> pollAdditionalInformation(taskWithId).subscribe(), ignored -> Mono.fromFuture(future) - .doOnError(exception -> handleExecutionError(taskWithId, listener, exception)) - .onErrorReturn(Task.Result.PARTIAL), + .onErrorResume(exception -> Mono.from(handleExecutionError(taskWithId, listener, exception)) + .thenReturn(Task.Result.PARTIAL)), Disposable::dispose); } else { - listener.cancelled(taskWithId.getId(), taskWithId.getTask().details()); - return Mono.empty(); + return Mono.from(listener.cancelled(taskWithId.getId(), taskWithId.getTask().details())) + .then(Mono.empty()); } } - private void handleExecutionError(TaskWithId taskWithId, Listener listener, Throwable exception) { + private Publisher<Void> handleExecutionError(TaskWithId taskWithId, Listener listener, Throwable exception) { if (exception instanceof CancellationException) { - listener.cancelled(taskWithId.getId(), taskWithId.getTask().details()); + return listener.cancelled(taskWithId.getId(), taskWithId.getTask().details()); } else { - listener.failed(taskWithId.getId(), taskWithId.getTask().details(), exception); + return listener.failed(taskWithId.getId(), taskWithId.getTask().details(), exception); } } @@ -91,7 +92,7 @@ public class SerialTaskManagerWorker implements TaskManagerWorker { .delayElement(pollingInterval, Schedulers.elastic()) .repeat() .<TaskExecutionDetails.AdditionalInformation>handle((maybeDetails, sink) -> maybeDetails.ifPresent(sink::next)) - .doOnNext(information -> listener.updated(taskWithId.getId(), information)); + .flatMap(information -> Mono.from(listener.updated(taskWithId.getId(), information)).thenReturn(information)); } @@ -101,27 +102,27 @@ public class SerialTaskManagerWorker implements TaskManagerWorker { .addContext(Task.TASK_ID, taskWithId.getId()) .addContext(Task.TASK_TYPE, taskWithId.getTask().type()) .addContext(Task.TASK_DETAILS, taskWithId.getTask().details()), - () -> run(taskWithId, listener)); + () -> run(taskWithId, listener).block()); } - private Task.Result run(TaskWithId taskWithId, Listener listener) { - listener.started(taskWithId.getId()); - try { - return taskWithId.getTask() - .run() - .onComplete(result -> listener.completed(taskWithId.getId(), result, taskWithId.getTask().details())) - .onFailure(() -> { - LOGGER.error("Task was partially performed. Check logs for more details. Taskid : " + taskWithId.getId()); - listener.failed(taskWithId.getId(), taskWithId.getTask().details()); - }); - } catch (InterruptedException e) { - listener.cancelled(taskWithId.getId(), taskWithId.getTask().details()); - return Task.Result.PARTIAL; - } catch (Exception e) { - LOGGER.error("Error while running task {}", taskWithId.getId(), e); - listener.failed(taskWithId.getId(), taskWithId.getTask().details(), e); - return Task.Result.PARTIAL; - } + private Mono<Task.Result> run(TaskWithId taskWithId, Listener listener) { + return Mono.from(listener.started(taskWithId.getId())) + .then(Mono.fromCallable(() -> runTask(taskWithId, listener))) + .onErrorResume(InterruptedException.class, e -> Mono.from(listener.cancelled(taskWithId.getId(), taskWithId.getTask().details())).thenReturn(Task.Result.PARTIAL)) + .onErrorResume(Exception.class, e -> { + LOGGER.error("Error while running task {}", taskWithId.getId(), e); + return Mono.from(listener.failed(taskWithId.getId(), taskWithId.getTask().details(), e)).thenReturn(Task.Result.PARTIAL); + }); + } + + private Task.Result runTask(TaskWithId taskWithId, Listener listener) throws InterruptedException { + return taskWithId.getTask() + .run() + .onComplete(result -> Mono.from(listener.completed(taskWithId.getId(), result, taskWithId.getTask().details())).block()) + .onFailure(() -> { + LOGGER.error("Task was partially performed. Check logs for more details. Taskid : " + taskWithId.getId()); + Mono.from(listener.failed(taskWithId.getId(), taskWithId.getTask().details())).block(); + }); } @Override @@ -133,8 +134,8 @@ public class SerialTaskManagerWorker implements TaskManagerWorker { } @Override - public void fail(TaskId taskId, Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation, String errorMessage, Throwable reason) { - listener.failed(taskId, additionalInformation, errorMessage, reason); + public Publisher<Void> fail(TaskId taskId, Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation, String errorMessage, Throwable reason) { + return listener.failed(taskId, additionalInformation, errorMessage, reason); } @Override diff --git a/server/task/task-memory/src/main/java/org/apache/james/task/TaskManagerWorker.java b/server/task/task-memory/src/main/java/org/apache/james/task/TaskManagerWorker.java index c6475e0..c29069e 100644 --- a/server/task/task-memory/src/main/java/org/apache/james/task/TaskManagerWorker.java +++ b/server/task/task-memory/src/main/java/org/apache/james/task/TaskManagerWorker.java @@ -21,29 +21,31 @@ package org.apache.james.task; import java.io.Closeable; import java.util.Optional; +import org.reactivestreams.Publisher; + import reactor.core.publisher.Mono; public interface TaskManagerWorker extends Closeable { interface Listener { - void started(TaskId taskId); + Publisher<Void> started(TaskId taskId); - void completed(TaskId taskId, Task.Result result, Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation); + Publisher<Void> completed(TaskId taskId, Task.Result result, Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation); - void failed(TaskId taskId, Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation, String errorMessage, Throwable t); + Publisher<Void> failed(TaskId taskId, Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation, String errorMessage, Throwable t); - void failed(TaskId taskId, Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation, Throwable t); + Publisher<Void> failed(TaskId taskId, Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation, Throwable t); - void failed(TaskId taskId, Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation); + Publisher<Void> failed(TaskId taskId, Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation); - void cancelled(TaskId taskId, Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation); + Publisher<Void> cancelled(TaskId taskId, Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation); - void updated(TaskId taskId, TaskExecutionDetails.AdditionalInformation additionalInformation); + Publisher<Void> updated(TaskId taskId, TaskExecutionDetails.AdditionalInformation additionalInformation); } Mono<Task.Result> executeTask(TaskWithId taskWithId); void cancelTask(TaskId taskId); - void fail(TaskId taskId, Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation, String errorMessage, Throwable reason); + Publisher<Void> fail(TaskId taskId, Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation, String errorMessage, Throwable reason); } diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala index 39e0ea4..de682c1 100644 --- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala +++ b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala @@ -1,4 +1,4 @@ -/** ************************************************************** +/***************************************************************** * Licensed to the Apache Software Foundation (ASF) under one * * or more contributor license agreements. See the NOTICE file * * distributed with this work for additional information * @@ -6,87 +6,92 @@ * to you under the Apache License, Version 2.0 (the * * "License"); you may not use this file except in compliance * * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * * Unless required by applicable law or agreed to in writing, * * software distributed under the License is distributed on an * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * * KIND, either express or implied. See the License for the * * specific language governing permissions and limitations * * under the License. * - * ***************************************************************/ + ****************************************************************/ package org.apache.james.task.eventsourcing -import java.util +import java.util.List import org.apache.james.eventsourcing.eventstore.History import org.apache.james.eventsourcing.{CommandHandler, Event} import org.apache.james.task.eventsourcing.TaskCommand._ import org.apache.james.task.{Hostname, TaskId} +import org.reactivestreams.Publisher + +import reactor.core.scala.publisher.SMono + +import scala.jdk.CollectionConverters._ sealed abstract class TaskCommandHandler[T <: TaskCommand] extends CommandHandler[T] { - def loadAggregate(loadHistory: TaskAggregateId => History, taskId: TaskId): TaskAggregate = { + def loadAggregate(loadHistory: TaskAggregateId => SMono[History], taskId: TaskId): SMono[TaskAggregate] = { val aggregateId = TaskAggregateId(taskId) - TaskAggregate.fromHistory(aggregateId, loadHistory(aggregateId)) + loadHistory(aggregateId).map(TaskAggregate.fromHistory(aggregateId, _)) } } -class CreateCommandHandler(private val loadHistory: TaskAggregateId => History, hostname: Hostname) extends TaskCommandHandler[Create] { +class CreateCommandHandler(private val loadHistory: TaskAggregateId => SMono[History], hostname: Hostname) extends TaskCommandHandler[Create] { override def handledClass: Class[Create] = classOf[Create] - override def handle(command: Create): List[_ <: Event] = { - TaskAggregate.create(TaskAggregateId(command.id), command.task, hostname) + override def handle(command: Create): Publisher[List[_ <: Event]] = { + SMono.fromCallable(() => TaskAggregate.create(TaskAggregateId(command.id), command.task, hostname).asJava) } } -class StartCommandHandler(private val loadHistory: TaskAggregateId => History, +class StartCommandHandler(private val loadHistory: TaskAggregateId => SMono[History], private val hostname: Hostname) extends TaskCommandHandler[Start] { override def handledClass: Class[Start] = classOf[Start] - override def handle(command: Start): List[_ <: Event] = { - loadAggregate(loadHistory, command.id).start(hostname) + override def handle(command: Start): Publisher[List[_ <: Event]] = { + loadAggregate(loadHistory, command.id).map(_.start(hostname).asJava) } } -class RequestCancelCommandHandler(private val loadHistory: TaskAggregateId => History, +class RequestCancelCommandHandler(private val loadHistory: TaskAggregateId => SMono[History], private val hostname: Hostname) extends TaskCommandHandler[RequestCancel] { override def handledClass: Class[RequestCancel] = classOf[RequestCancel] - override def handle(command: RequestCancel): List[_ <: Event] = { - loadAggregate(loadHistory, command.id).requestCancel(hostname) + override def handle(command: RequestCancel): Publisher[List[_ <: Event]] = { + loadAggregate(loadHistory, command.id).map(_.requestCancel(hostname).asJava) } } -class CompleteCommandHandler(private val loadHistory: TaskAggregateId => History) extends TaskCommandHandler[Complete] { +class CompleteCommandHandler(private val loadHistory: TaskAggregateId => SMono[History]) extends TaskCommandHandler[Complete] { override def handledClass: Class[Complete] = classOf[Complete] - override def handle(command: Complete): List[_ <: Event] = { - loadAggregate(loadHistory, command.id).complete(command.result, command.additionalInformation) + override def handle(command: Complete): Publisher[List[_ <: Event]] = { + loadAggregate(loadHistory, command.id).map(_.complete(command.result, command.additionalInformation).asJava) } } -class CancelCommandHandler(private val loadHistory: TaskAggregateId => History) extends TaskCommandHandler[Cancel] { +class CancelCommandHandler(private val loadHistory: TaskAggregateId => SMono[History]) extends TaskCommandHandler[Cancel] { override def handledClass: Class[Cancel] = classOf[Cancel] - override def handle(command: Cancel): List[_ <: Event] = { - loadAggregate(loadHistory, command.id).cancel(command.additionalInformation) + override def handle(command: Cancel): Publisher[List[_ <: Event]] = { + loadAggregate(loadHistory, command.id).map(_.cancel(command.additionalInformation).asJava) } } -class FailCommandHandler(private val loadHistory: TaskAggregateId => History) extends TaskCommandHandler[Fail] { +class FailCommandHandler(private val loadHistory: TaskAggregateId => SMono[History]) extends TaskCommandHandler[Fail] { override def handledClass: Class[Fail] = classOf[Fail] - override def handle(command: Fail): List[_ <: Event] = { - loadAggregate(loadHistory, command.id).fail(command.additionalInformation, command.errorMessage, command.exception) + override def handle(command: Fail): Publisher[List[_ <: Event]] = { + loadAggregate(loadHistory, command.id).map(_.fail(command.additionalInformation, command.errorMessage, command.exception).asJava) } } -class UpdateCommandHandler(private val loadHistory: TaskAggregateId => History) extends TaskCommandHandler[UpdateAdditionalInformation] { +class UpdateCommandHandler(private val loadHistory: TaskAggregateId => SMono[History]) extends TaskCommandHandler[UpdateAdditionalInformation] { override def handledClass: Class[UpdateAdditionalInformation] = classOf[UpdateAdditionalInformation] - override def handle(command: UpdateAdditionalInformation): List[_ <: Event] = { - loadAggregate(loadHistory, command.id).update(command.additionalInformation) + override def handle(command: UpdateAdditionalInformation): Publisher[List[_ <: Event]] = { + loadAggregate(loadHistory, command.id).map(_.update(command.additionalInformation).asJava) } } \ No newline at end of file diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala index ad5d848..d9e6fa6 100644 --- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala +++ b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala @@ -25,13 +25,16 @@ import java.util import com.google.common.annotations.VisibleForTesting import javax.annotation.PreDestroy import javax.inject.Inject + import org.apache.james.eventsourcing.eventstore.{EventStore, History} import org.apache.james.eventsourcing.{AggregateId, EventSourcingSystem, Subscriber} import org.apache.james.lifecycle.api.Startable import org.apache.james.task.TaskManager.ReachedTimeoutException import org.apache.james.task._ import org.apache.james.task.eventsourcing.TaskCommand._ + import reactor.core.publisher.{Flux, Mono} +import reactor.core.scala.publisher.SMono import reactor.core.scheduler.Schedulers class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing]( @@ -52,7 +55,7 @@ class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing] import scala.jdk.CollectionConverters._ - private val loadHistory: AggregateId => History = eventStore.getEventsOfAggregate _ + private val loadHistory: AggregateId => SMono[History] = aggregateId => SMono(eventStore.getEventsOfAggregate(aggregateId)) private val eventSourcingSystem = new EventSourcingSystem( handlers = Set( new CreateCommandHandler(loadHistory, hostname), @@ -75,7 +78,7 @@ class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing] override def submit(task: Task): TaskId = { val taskId = TaskId.generateTaskId val command = Create(taskId, task) - eventSourcingSystem.dispatch(command) + SMono(eventSourcingSystem.dispatch(command)).block() taskId } @@ -93,7 +96,7 @@ class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing] override def cancel(id: TaskId): Unit = { val command = RequestCancel(id) - eventSourcingSystem.dispatch(command) + SMono(eventSourcingSystem.dispatch(command)).block() } @throws(classOf[TaskNotFoundException]) diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/WorkerStatusListener.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/WorkerStatusListener.scala index aa93d82..1edbf03 100644 --- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/WorkerStatusListener.scala +++ b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/WorkerStatusListener.scala @@ -26,28 +26,31 @@ import org.apache.james.eventsourcing.EventSourcingSystem import org.apache.james.task.Task.Result import org.apache.james.task.eventsourcing.TaskCommand._ import org.apache.james.task.{TaskExecutionDetails, TaskId, TaskManagerWorker} +import org.reactivestreams.Publisher + +import reactor.core.scala.publisher.SMono import scala.compat.java8.OptionConverters._ case class WorkerStatusListener(eventSourcingSystem: EventSourcingSystem) extends TaskManagerWorker.Listener { - override def started(taskId: TaskId): Unit = eventSourcingSystem.dispatch(Start(taskId)) + override def started(taskId: TaskId): Publisher[Void] = eventSourcingSystem.dispatch(Start(taskId)) - override def completed(taskId: TaskId, result: Result, additionalInformation: Optional[TaskExecutionDetails.AdditionalInformation]): Unit = + override def completed(taskId: TaskId, result: Result, additionalInformation: Optional[TaskExecutionDetails.AdditionalInformation]): Publisher[Void] = eventSourcingSystem.dispatch(Complete(taskId, result, additionalInformation.asScala)) - override def failed(taskId: TaskId, additionalInformation: Optional[TaskExecutionDetails.AdditionalInformation], errorMessage: String, t: Throwable): Unit = + override def failed(taskId: TaskId, additionalInformation: Optional[TaskExecutionDetails.AdditionalInformation], errorMessage: String, t: Throwable): Publisher[Void] = eventSourcingSystem.dispatch(Fail(taskId, additionalInformation.asScala, Some(errorMessage), Some(Throwables.getStackTraceAsString(t)))) - override def failed(taskId: TaskId, additionalInformation: Optional[TaskExecutionDetails.AdditionalInformation], t: Throwable): Unit = + override def failed(taskId: TaskId, additionalInformation: Optional[TaskExecutionDetails.AdditionalInformation], t: Throwable): Publisher[Void] = eventSourcingSystem.dispatch(Fail(taskId, additionalInformation.asScala, None, Some(Throwables.getStackTraceAsString(t)))) - override def failed(taskId: TaskId, additionalInformation: Optional[TaskExecutionDetails.AdditionalInformation]): Unit = + override def failed(taskId: TaskId, additionalInformation: Optional[TaskExecutionDetails.AdditionalInformation]): Publisher[Void] = eventSourcingSystem.dispatch(Fail(taskId, additionalInformation.asScala, None, None)) - override def cancelled(taskId: TaskId, additionalInformation: Optional[TaskExecutionDetails.AdditionalInformation]): Unit = + override def cancelled(taskId: TaskId, additionalInformation: Optional[TaskExecutionDetails.AdditionalInformation]): Publisher[Void] = eventSourcingSystem.dispatch(Cancel(taskId, additionalInformation.asScala )) - override def updated(taskId: TaskId, additionalInformation: TaskExecutionDetails.AdditionalInformation): Unit = + override def updated(taskId: TaskId, additionalInformation: TaskExecutionDetails.AdditionalInformation): Publisher[Void] = eventSourcingSystem.dispatch(UpdateAdditionalInformation(taskId, additionalInformation)) } \ No newline at end of file diff --git a/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java b/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java index a83392f..5c4f23d 100644 --- a/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java +++ b/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java @@ -28,6 +28,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; import java.io.IOException; import java.time.Duration; @@ -56,6 +57,13 @@ class SerialTaskManagerWorkerTest { @BeforeEach void beforeEach() { listener = mock(TaskManagerWorker.Listener.class); + when(listener.started(any())).thenReturn(Mono.empty()); + when(listener.cancelled(any(), any())).thenReturn(Mono.empty()); + when(listener.completed(any(), any(), any())).thenReturn(Mono.empty()); + when(listener.updated(any(), any())).thenReturn(Mono.empty()); + when(listener.failed(any(), any())).thenReturn(Mono.empty()); + when(listener.failed(any(), any(), any())).thenReturn(Mono.empty()); + when(listener.failed(any(), any(), any(), any())).thenReturn(Mono.empty()); worker = new SerialTaskManagerWorker(listener, UPDATE_INFORMATION_POLLING_DURATION); } diff --git a/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java b/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java index dcc727c..852551c 100644 --- a/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java +++ b/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java @@ -41,6 +41,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import reactor.core.publisher.Mono; + @ExtendWith(CountDownLatchExtension.class) class EventSourcingTaskManagerTest implements TaskManagerContract { ConditionFactory CALMLY_AWAIT = Awaitility @@ -79,7 +81,7 @@ class EventSourcingTaskManagerTest implements TaskManagerContract { void createdTaskShouldKeepOriginHostname() { TaskId taskId = taskManager.submit(new MemoryReferenceTask(() -> Task.Result.COMPLETED)); TaskAggregateId aggregateId = new TaskAggregateId(taskId); - assertThat(eventStore.getEventsOfAggregate(aggregateId).getEventsJava()) + assertThat(Mono.from(eventStore.getEventsOfAggregate(aggregateId)).block().getEventsJava()) .filteredOn(event -> event instanceof Created) .extracting("hostname") .containsOnly(HOSTNAME); @@ -91,7 +93,7 @@ class EventSourcingTaskManagerTest implements TaskManagerContract { TaskAggregateId aggregateId = new TaskAggregateId(taskId); CALMLY_AWAIT.untilAsserted(() -> - assertThat(eventStore.getEventsOfAggregate(aggregateId).getEventsJava()) + assertThat(Mono.from(eventStore.getEventsOfAggregate(aggregateId)).block().getEventsJava()) .filteredOn(event -> event instanceof Started) .extracting("hostname") .containsOnly(HOSTNAME)); @@ -107,7 +109,7 @@ class EventSourcingTaskManagerTest implements TaskManagerContract { TaskAggregateId aggregateId = new TaskAggregateId(taskId); CALMLY_AWAIT.untilAsserted(() -> - assertThat(eventStore.getEventsOfAggregate(aggregateId).getEventsJava()) + assertThat(Mono.from(eventStore.getEventsOfAggregate(aggregateId)).block().getEventsJava()) .filteredOn(event -> event instanceof CancelRequested) .extracting("hostname") .containsOnly(HOSTNAME)); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
