This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 21c0033b3bfc7fad82a6f92ceb0ece3d0b9ee9a3
Author: Gautier DI FOLCO <[email protected]>
AuthorDate: Wed Mar 25 15:10:22 2020 +0100

    JAMES-3133 Reactify the event system
---
 event-sourcing/event-sourcing-core/pom.xml         |  4 ++
 .../apache/james/eventsourcing/CommandHandler.java |  6 ++-
 .../eventsourcing/javaapi/CommandHandlerJava.java  | 34 ------------
 .../james/eventsourcing/CommandDispatcher.scala    | 56 +++++++++----------
 .../org/apache/james/eventsourcing/EventBus.scala  | 22 ++++++--
 .../james/eventsourcing/EventSourcingSystem.scala  |  3 +-
 .../eventsourcing/EventSourcingSystemTest.scala    | 55 ++++++++++---------
 .../org/apache/james/eventsourcing/Event.scala     |  3 +-
 event-sourcing/event-store-api/pom.xml             |  9 ++++
 .../eventsourcing/eventstore/EventStore.scala      | 11 ++--
 .../eventstore/EventStoreContract.scala            | 23 ++++----
 event-sourcing/event-store-cassandra/pom.xml       |  3 +-
 .../eventstore/cassandra/CassandraEventStore.scala | 28 ++++++----
 .../eventstore/cassandra/EventStoreDao.scala       |  4 +-
 event-sourcing/event-store-memory/pom.xml          |  4 ++
 .../eventstore/memory/InMemoryEventStore.scala     | 22 +++++---
 .../commands/DetectThresholdCrossingHandler.java   | 18 ++++---
 .../listeners/QuotaThresholdCrossingListener.java  | 10 ++--
 pom.xml                                            |  5 ++
 .../james/dlp/api/DLPConfigurationLoader.java      |  3 +-
 .../dlp/api/DLPConfigurationStoreContract.java     | 23 ++++----
 .../jmap/api/filtering/FilteringManagement.java    |  3 +-
 .../filtering/impl/DefineRulesCommandHandler.java  | 16 +++---
 .../impl/EventSourcingFilteringManagement.java     | 15 +++---
 .../api/filtering/FilteringManagementContract.java | 14 ++---
 .../EventSourcingDLPConfigurationStore.java        | 21 ++++----
 .../commands/ClearCommandHandler.java              | 15 +++---
 .../commands/StoreCommandHandler.java              | 15 +++---
 .../transport/matchers/dlp/DlpRulesLoader.java     |  4 +-
 .../james/jmap/draft/methods/GetFilterMethod.java  | 29 +++++-----
 .../james/jmap/mailet/filter/JMAPFiltering.java    |  8 ++-
 .../webadmin/routes/DLPConfigurationRoutes.java    |  3 +-
 .../EventsourcingConfigurationManagement.java      | 15 +++---
 .../RegisterConfigurationCommandHandler.java       | 16 +++---
 .../EventsourcingConfigurationManagementTest.java  | 19 ++++---
 .../distributed/RabbitMQWorkQueue.java             | 16 +++---
 .../eventsourcing/distributed/ImmediateWorker.java | 10 ++--
 .../org/apache/james/task/MemoryTaskManager.java   | 36 +++++++------
 .../apache/james/task/SerialTaskManagerWorker.java | 59 ++++++++++----------
 .../org/apache/james/task/TaskManagerWorker.java   | 18 ++++---
 .../james/task/eventsourcing/CommandHandlers.scala | 63 ++++++++++++----------
 .../eventsourcing/EventSourcingTaskManager.scala   |  9 ++--
 .../task/eventsourcing/WorkerStatusListener.scala  | 17 +++---
 .../james/task/SerialTaskManagerWorkerTest.java    |  8 +++
 .../EventSourcingTaskManagerTest.java              |  8 +--
 45 files changed, 439 insertions(+), 344 deletions(-)

diff --git a/event-sourcing/event-sourcing-core/pom.xml 
b/event-sourcing/event-sourcing-core/pom.xml
index 078e693..9497ae1 100644
--- a/event-sourcing/event-sourcing-core/pom.xml
+++ b/event-sourcing/event-sourcing-core/pom.xml
@@ -56,6 +56,10 @@
             <artifactId>guavate</artifactId>
         </dependency>
         <dependency>
+            <groupId>io.projectreactor</groupId>
+            <artifactId>reactor-scala-extensions_${scala.base}</artifactId>
+        </dependency>
+        <dependency>
             <groupId>javax.inject</groupId>
             <artifactId>javax.inject</artifactId>
         </dependency>
diff --git 
a/event-sourcing/event-sourcing-core/src/main/java/org/apache/james/eventsourcing/CommandHandler.java
 
b/event-sourcing/event-sourcing-core/src/main/java/org/apache/james/eventsourcing/CommandHandler.java
index 6e3645b..dd69083 100644
--- 
a/event-sourcing/event-sourcing-core/src/main/java/org/apache/james/eventsourcing/CommandHandler.java
+++ 
b/event-sourcing/event-sourcing-core/src/main/java/org/apache/james/eventsourcing/CommandHandler.java
@@ -18,10 +18,12 @@
  ****************************************************************/
 package org.apache.james.eventsourcing;
 
-import scala.collection.immutable.List;
+import java.util.List;
+
+import org.reactivestreams.Publisher;
 
 public interface CommandHandler<C extends Command> {
   Class<C> handledClass();
 
-  List<? extends Event> handle(C command);
+  Publisher<List<? extends Event>> handle(C command);
 }
diff --git 
a/event-sourcing/event-sourcing-core/src/main/java/org/apache/james/eventsourcing/javaapi/CommandHandlerJava.java
 
b/event-sourcing/event-sourcing-core/src/main/java/org/apache/james/eventsourcing/javaapi/CommandHandlerJava.java
deleted file mode 100644
index ce9cfc9..0000000
--- 
a/event-sourcing/event-sourcing-core/src/main/java/org/apache/james/eventsourcing/javaapi/CommandHandlerJava.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-package org.apache.james.eventsourcing.javaapi;
-
-import org.apache.james.eventsourcing.Command;
-import org.apache.james.eventsourcing.CommandHandler;
-import org.apache.james.eventsourcing.Event;
-
-import scala.jdk.javaapi.CollectionConverters;
-
-public interface CommandHandlerJava<C extends Command> extends 
CommandHandler<C> {
-
-    java.util.List<? extends Event> handleJava(C command);
-
-    default scala.collection.immutable.List<? extends Event> handle(C command) 
{
-        return CollectionConverters.asScala(handleJava(command)).toList();
-    }
-}
diff --git 
a/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/CommandDispatcher.scala
 
b/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/CommandDispatcher.scala
index 8dde47f..05abc6f 100644
--- 
a/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/CommandDispatcher.scala
+++ 
b/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/CommandDispatcher.scala
@@ -18,11 +18,17 @@
  ****************************************************************/
 package org.apache.james.eventsourcing
 
-import com.google.common.base.Preconditions
+import java.util
+
 import javax.inject.Inject
+
 import org.apache.james.eventsourcing.eventstore.EventStoreFailedException
+import org.reactivestreams.Publisher
+
+import com.google.common.base.Preconditions
+import reactor.core.scala.publisher.SMono
 
-import scala.util.{Failure, Success, Try}
+import scala.jdk.CollectionConverters._
 
 object CommandDispatcher {
   private val MAX_RETRY = 10
@@ -45,9 +51,16 @@ object CommandDispatcher {
 class CommandDispatcher @Inject()(eventBus: EventBus, handlers: 
Set[CommandHandler[_ <: Command]]) {
   Preconditions.checkArgument(hasOnlyOneHandlerByCommand(handlers), 
CommandDispatcher.ONLY_ONE_HANDLER_PRECONDITION)
 
-  def dispatch(c: Command): Unit = {
-    trySeveralTimes(() => tryDispatch(c))
-      .getOrElse(() => throw CommandDispatcher.TooManyRetries(c, 
CommandDispatcher.MAX_RETRY))
+  def dispatch(c: Command): Publisher[Void] = {
+    tryDispatch(c)
+      .retry(CommandDispatcher.MAX_RETRY, {
+        case _: EventStoreFailedException => true
+        case _ => false
+      })
+      .onErrorMap({
+        case _: EventStoreFailedException => 
CommandDispatcher.TooManyRetries(c, CommandDispatcher.MAX_RETRY)
+        case error => error
+      })
   }
 
   private def hasOnlyOneHandlerByCommand(handlers: Set[CommandHandler[_ <: 
Command]]): Boolean =
@@ -58,35 +71,22 @@ class CommandDispatcher @Inject()(eventBus: EventBus, 
handlers: Set[CommandHandl
   private val handlersByClass: Map[Class[_ <: Command], CommandHandler[_ <: 
Command]] =
     handlers.map(handler => (handler.handledClass, handler)).toMap
 
-
-  private def trySeveralTimes(singleTry: () => Boolean): Option[Unit] =
-    0.until(CommandDispatcher.MAX_RETRY)
-      .find(_ => singleTry())
-      .map(_ => ())
-
-
-  private def tryDispatch(c: Command): Boolean = {
-    val maybeEvents: Option[Try[List[_ <: Event]]] = handleCommand(c)
-    maybeEvents match {
-      case Some(eventsTry) =>
-        eventsTry
-          .flatMap(events => Try(eventBus.publish(events))) match {
-          case Success(_) => true
-          case Failure(_: EventStoreFailedException) => false
-          case Failure(e) => throw e
-        }
+  private def tryDispatch(c: Command): SMono[Void] = {
+    handleCommand(c) match {
+      case Some(eventsPublisher) =>
+        SMono(eventsPublisher)
+          .flatMap(events => eventBus.publish(events.asScala))
       case _ =>
-        throw CommandDispatcher.UnknownCommandException(c)
+        SMono.raiseError(CommandDispatcher.UnknownCommandException(c))
     }
   }
 
-  private def handleCommand(c: Command): Option[Try[List[_ <: Event]]] = {
+  private def handleCommand(c: Command): Option[Publisher[util.List[_ <: 
Event]]] = {
     handlersByClass
       .get(c.getClass)
       .map(commandHandler =>
-        Try(
-          commandHandler
-            .asInstanceOf[CommandHandler[c.type]]
-            .handle(c)))
+        commandHandler
+          .asInstanceOf[CommandHandler[c.type]]
+          .handle(c))
   }
 }
\ No newline at end of file
diff --git 
a/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/EventBus.scala
 
b/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/EventBus.scala
index 9d527d3..ca8d753 100644
--- 
a/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/EventBus.scala
+++ 
b/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/EventBus.scala
@@ -19,22 +19,34 @@
 package org.apache.james.eventsourcing
 
 import javax.inject.Inject
+
 import org.apache.james.eventsourcing.eventstore.{EventStore, 
EventStoreFailedException}
+import org.reactivestreams.Publisher
 import org.slf4j.LoggerFactory
 
+import reactor.core.scala.publisher.{SFlux, SMono}
+
 object EventBus {
   private val LOGGER = LoggerFactory.getLogger(classOf[EventBus])
 }
 
 class EventBus @Inject() (eventStore: EventStore, subscribers: 
Set[Subscriber]) {
   @throws[EventStoreFailedException]
-  def publish(events: List[Event]): Unit = {
-    eventStore.appendAll(events)
-    events
-      .flatMap((event: Event) => subscribers.map(subscriber => (event, 
subscriber)))
-      .foreach {case (event, subscriber) => handle(event, subscriber)}
+  def publish(events: Iterable[Event]): SMono[Void] = {
+    SMono(eventStore.appendAll(events))
+        .`then`(runHandlers(events, subscribers))
+
   }
 
+  def runHandlers(events: Iterable[Event], subscribers: Set[Subscriber]): 
SMono[Void] = {
+    SFlux.fromIterable(events.flatMap((event: Event) => 
subscribers.map(subscriber => (event, subscriber))))
+      .flatMap(infos => runHandler(infos._1, infos._2))
+      .`then`()
+      .`then`(SMono.empty)
+  }
+
+  def runHandler(event: Event, subscriber: Subscriber): Publisher[Void] = 
SMono.fromCallable(() => handle(event, subscriber)).`then`(SMono.empty)
+
   private def handle(event : Event, subscriber: Subscriber) : Unit = {
     try {
       subscriber.handle(event)
diff --git 
a/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/EventSourcingSystem.scala
 
b/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/EventSourcingSystem.scala
index 2f3b71b..5cc52f2 100644
--- 
a/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/EventSourcingSystem.scala
+++ 
b/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/EventSourcingSystem.scala
@@ -19,6 +19,7 @@
 package org.apache.james.eventsourcing
 
 import org.apache.james.eventsourcing.eventstore.EventStore
+import org.reactivestreams.Publisher
 
 
 object EventSourcingSystem {
@@ -36,5 +37,5 @@ class EventSourcingSystem(handlers: Set[CommandHandler[_ <: 
Command]],
   private val eventBus = new EventBus(eventStore, subscribers)
   private val commandDispatcher = new CommandDispatcher(eventBus, handlers)
 
-  def dispatch(c: Command): Unit = commandDispatcher.dispatch(c)
+  def dispatch(c: Command): Publisher[Void] = commandDispatcher.dispatch(c)
 }
\ No newline at end of file
diff --git 
a/event-sourcing/event-sourcing-core/src/test/scala/org/apache/james/eventsourcing/EventSourcingSystemTest.scala
 
b/event-sourcing/event-sourcing-core/src/test/scala/org/apache/james/eventsourcing/EventSourcingSystemTest.scala
index 948867a..5555907 100644
--- 
a/event-sourcing/event-sourcing-core/src/test/scala/org/apache/james/eventsourcing/EventSourcingSystemTest.scala
+++ 
b/event-sourcing/event-sourcing-core/src/test/scala/org/apache/james/eventsourcing/EventSourcingSystemTest.scala
@@ -18,7 +18,8 @@
  ****************************************************************/
 package org.apache.james.eventsourcing
 
-import com.google.common.base.Splitter
+import java.util.{List => JavaList}
+
 import org.apache.james.eventsourcing.eventstore.{EventStore, History}
 import org.assertj.core.api.Assertions.{assertThat, assertThatThrownBy}
 import org.junit.jupiter.api.Test
@@ -26,8 +27,12 @@ import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito.{doThrow, mock, when}
 import org.mockito.internal.matchers.InstanceOf
 import org.mockito.internal.progress.ThreadSafeMockingProgress
+import org.reactivestreams.Publisher
+
+import com.google.common.base.Splitter
+import reactor.core.publisher.Mono
+import reactor.core.scala.publisher.SMono
 
-import scala.collection.immutable.List
 import scala.jdk.CollectionConverters._
 
 object EventSourcingSystemTest {
@@ -59,7 +64,7 @@ trait EventSourcingSystemTest {
   def dispatchShouldApplyCommandHandlerThenCallSubscribers(eventStore: 
EventStore) : Unit = {
     val subscriber = new DataCollectorSubscriber
     val eventSourcingSystem = new 
EventSourcingSystem(Set(simpleDispatcher(eventStore)), Set(subscriber), 
eventStore)
-    eventSourcingSystem.dispatch(new 
EventSourcingSystemTest.MyCommand(EventSourcingSystemTest.PAYLOAD_1))
+    Mono.from(eventSourcingSystem.dispatch(new 
EventSourcingSystemTest.MyCommand(EventSourcingSystemTest.PAYLOAD_1))).block()
     
assertThat(subscriber.getData.asJava).containsExactly(EventSourcingSystemTest.PAYLOAD_1)
   }
 
@@ -70,7 +75,7 @@ trait EventSourcingSystemTest {
       Set(simpleDispatcher(eventStore)),
       Set((_: Event) => throw new RuntimeException, subscriber),
       eventStore)
-    eventSourcingSystem.dispatch(new 
EventSourcingSystemTest.MyCommand(EventSourcingSystemTest.PAYLOAD_1))
+    Mono.from(eventSourcingSystem.dispatch(new 
EventSourcingSystemTest.MyCommand(EventSourcingSystemTest.PAYLOAD_1))).block()
     
assertThat(subscriber.getData.asJava).containsExactly(EventSourcingSystemTest.PAYLOAD_1)
   }
 
@@ -78,13 +83,13 @@ trait EventSourcingSystemTest {
   def throwingStoreShouldNotLeadToPublishing() : Unit = {
     val eventStore = mock(classOf[EventStore])
     doThrow(new 
RuntimeException).when(eventStore).appendAll(EventSourcingSystemTest.anyScalaList)
-    when(eventStore.getEventsOfAggregate(any)).thenReturn(History.empty)
+    
when(eventStore.getEventsOfAggregate(any)).thenReturn(SMono.just(History.empty))
     val subscriber = new DataCollectorSubscriber
     val eventSourcingSystem = new EventSourcingSystem(
       Set(simpleDispatcher(eventStore)),
       Set((_: Event) => throw new RuntimeException, subscriber),
       eventStore)
-    assertThatThrownBy(() => eventSourcingSystem.dispatch(new 
EventSourcingSystemTest.MyCommand(EventSourcingSystemTest.PAYLOAD_1)))
+    assertThatThrownBy(() => Mono.from(eventSourcingSystem.dispatch(new 
EventSourcingSystemTest.MyCommand(EventSourcingSystemTest.PAYLOAD_1))).block())
       .isInstanceOf(classOf[RuntimeException])
     assertThat(subscriber.getData.asJava).isEmpty()
   }
@@ -93,17 +98,17 @@ trait EventSourcingSystemTest {
   def dispatchShouldApplyCommandHandlerThenStoreGeneratedEvents(eventStore: 
EventStore) : Unit = {
     val subscriber = new DataCollectorSubscriber
     val eventSourcingSystem = new 
EventSourcingSystem(Set(simpleDispatcher(eventStore)), Set(subscriber), 
eventStore)
-    eventSourcingSystem.dispatch(new 
EventSourcingSystemTest.MyCommand(EventSourcingSystemTest.PAYLOAD_1))
+    Mono.from(eventSourcingSystem.dispatch(new 
EventSourcingSystemTest.MyCommand(EventSourcingSystemTest.PAYLOAD_1))).block()
     val expectedEvent = TestEvent(EventId.first, 
EventSourcingSystemTest.AGGREGATE_ID, EventSourcingSystemTest.PAYLOAD_1)
-    
assertThat(eventStore.getEventsOfAggregate(EventSourcingSystemTest.AGGREGATE_ID).getEventsJava).containsOnly(expectedEvent)
+    
assertThat(SMono(eventStore.getEventsOfAggregate(EventSourcingSystemTest.AGGREGATE_ID)).block().getEventsJava).containsOnly(expectedEvent)
   }
 
   @Test
   def dispatchShouldCallSubscriberForSubsequentCommands(eventStore: 
EventStore) : Unit = {
     val subscriber = new DataCollectorSubscriber
     val eventSourcingSystem = new 
EventSourcingSystem(Set(simpleDispatcher(eventStore)), Set(subscriber), 
eventStore)
-    eventSourcingSystem.dispatch(new 
EventSourcingSystemTest.MyCommand(EventSourcingSystemTest.PAYLOAD_1))
-    eventSourcingSystem.dispatch(new 
EventSourcingSystemTest.MyCommand(EventSourcingSystemTest.PAYLOAD_2))
+    Mono.from(eventSourcingSystem.dispatch(new 
EventSourcingSystemTest.MyCommand(EventSourcingSystemTest.PAYLOAD_1))).block()
+    Mono.from(eventSourcingSystem.dispatch(new 
EventSourcingSystemTest.MyCommand(EventSourcingSystemTest.PAYLOAD_2))).block()
     
assertThat(subscriber.getData.asJava).containsExactly(EventSourcingSystemTest.PAYLOAD_1,
 EventSourcingSystemTest.PAYLOAD_2)
   }
 
@@ -111,18 +116,18 @@ trait EventSourcingSystemTest {
   def dispatchShouldStoreEventsForSubsequentCommands(eventStore: EventStore) : 
Unit = {
     val subscriber = new DataCollectorSubscriber
     val eventSourcingSystem = new 
EventSourcingSystem(Set(simpleDispatcher(eventStore)), Set(subscriber), 
eventStore)
-    eventSourcingSystem.dispatch(new 
EventSourcingSystemTest.MyCommand(EventSourcingSystemTest.PAYLOAD_1))
-    eventSourcingSystem.dispatch(new 
EventSourcingSystemTest.MyCommand(EventSourcingSystemTest.PAYLOAD_2))
+    Mono.from(eventSourcingSystem.dispatch(new 
EventSourcingSystemTest.MyCommand(EventSourcingSystemTest.PAYLOAD_1))).block()
+    Mono.from(eventSourcingSystem.dispatch(new 
EventSourcingSystemTest.MyCommand(EventSourcingSystemTest.PAYLOAD_2))).block()
     val expectedEvent1 = TestEvent(EventId.first, 
EventSourcingSystemTest.AGGREGATE_ID, EventSourcingSystemTest.PAYLOAD_1)
     val expectedEvent2 = TestEvent(expectedEvent1.eventId.next, 
EventSourcingSystemTest.AGGREGATE_ID, EventSourcingSystemTest.PAYLOAD_2)
-    
assertThat(eventStore.getEventsOfAggregate(EventSourcingSystemTest.AGGREGATE_ID).getEventsJava).containsOnly(expectedEvent1,
 expectedEvent2)
+    
assertThat(SMono(eventStore.getEventsOfAggregate(EventSourcingSystemTest.AGGREGATE_ID)).block().getEventsJava).containsOnly(expectedEvent1,
 expectedEvent2)
   }
 
   @Test
   def dispatcherShouldBeAbleToReturnSeveralEvents(eventStore: EventStore) : 
Unit = {
     val subscriber = new DataCollectorSubscriber
     val eventSourcingSystem = new 
EventSourcingSystem(Set(wordCuttingDispatcher(eventStore)), Set(subscriber), 
eventStore)
-    eventSourcingSystem.dispatch(new EventSourcingSystemTest.MyCommand("This 
is a test"))
+    Mono.from(eventSourcingSystem.dispatch(new 
EventSourcingSystemTest.MyCommand("This is a test"))).block()
     assertThat(subscriber.getData.asJava).containsExactly("This", "is", "a", 
"test")
   }
 
@@ -130,7 +135,7 @@ trait EventSourcingSystemTest {
   def unknownCommandsShouldBeIgnored(eventStore: EventStore) : Unit = {
     val subscriber = new DataCollectorSubscriber
     val eventSourcingSystem = new 
EventSourcingSystem(Set(wordCuttingDispatcher(eventStore)), Set(subscriber), 
eventStore)
-    assertThatThrownBy(() => eventSourcingSystem.dispatch(new Command() {}))
+    assertThatThrownBy(() => Mono.from(eventSourcingSystem.dispatch(new 
Command() {})).block())
       .isInstanceOf(classOf[CommandDispatcher.UnknownCommandException])
   }
 
@@ -146,22 +151,22 @@ trait EventSourcingSystemTest {
   def simpleDispatcher(eventStore: EventStore) = new 
CommandHandler[EventSourcingSystemTest.MyCommand]() {
     override def handledClass: Class[EventSourcingSystemTest.MyCommand] = 
classOf[EventSourcingSystemTest.MyCommand]
 
-    override def handle(myCommand: EventSourcingSystemTest.MyCommand): 
List[TestEvent] = {
-      val history = 
eventStore.getEventsOfAggregate(EventSourcingSystemTest.AGGREGATE_ID)
-      List(TestEvent(history.getNextEventId, 
EventSourcingSystemTest.AGGREGATE_ID, myCommand.getPayload))
+    override def handle(myCommand: EventSourcingSystemTest.MyCommand): 
Publisher[JavaList[_ <: Event]] = {
+      
SMono.apply(eventStore.getEventsOfAggregate(EventSourcingSystemTest.AGGREGATE_ID))
+          .map(history => Seq(TestEvent(history.getNextEventId, 
EventSourcingSystemTest.AGGREGATE_ID, myCommand.getPayload)).asJava)
     }
   }
 
   def wordCuttingDispatcher(eventStore: EventStore) = new 
CommandHandler[EventSourcingSystemTest.MyCommand]() {
     override def handledClass: Class[EventSourcingSystemTest.MyCommand] = 
classOf[EventSourcingSystemTest.MyCommand]
 
-    override def handle(myCommand: EventSourcingSystemTest.MyCommand): 
List[TestEvent] = {
-      val history = 
eventStore.getEventsOfAggregate(EventSourcingSystemTest.AGGREGATE_ID)
-      val eventIdIncrementer = new 
EventSourcingSystemTest.EventIdIncrementer(history.getNextEventId)
-      Splitter.on(" ").splitToList(myCommand.getPayload)
-        .asScala
-        .toList
-        .map((word: String) => TestEvent(eventIdIncrementer.next, 
EventSourcingSystemTest.AGGREGATE_ID, word))
+    override def handle(myCommand: EventSourcingSystemTest.MyCommand): 
Publisher[JavaList[_ <: Event]] = {
+      
SMono.apply(eventStore.getEventsOfAggregate(EventSourcingSystemTest.AGGREGATE_ID))
+        .map(history => new 
EventSourcingSystemTest.EventIdIncrementer(history.getNextEventId))
+        .map(eventIdIncrementer => Splitter.on(" 
").splitToList(myCommand.getPayload)
+          .asScala
+          .toList
+          .map((word: String) => TestEvent(eventIdIncrementer.next, 
EventSourcingSystemTest.AGGREGATE_ID, word)).asJava)
     }
   }
 }
\ No newline at end of file
diff --git 
a/event-sourcing/event-sourcing-pojo/src/main/scala/org/apache/james/eventsourcing/Event.scala
 
b/event-sourcing/event-sourcing-pojo/src/main/scala/org/apache/james/eventsourcing/Event.scala
index 79b80bc..5440ab0 100644
--- 
a/event-sourcing/event-sourcing-pojo/src/main/scala/org/apache/james/eventsourcing/Event.scala
+++ 
b/event-sourcing/event-sourcing-pojo/src/main/scala/org/apache/james/eventsourcing/Event.scala
@@ -19,7 +19,8 @@
 package org.apache.james.eventsourcing
 
 object Event {
-  def belongsToSameAggregate(events: List[_ <: Event]): Boolean = events
+  def belongsToSameAggregate(events: Iterable[_ <: Event]): Boolean = events
+    .toSeq
     .view
     .map(event => event.getAggregateId)
     .distinct
diff --git a/event-sourcing/event-store-api/pom.xml 
b/event-sourcing/event-store-api/pom.xml
index 44064ce..2bffd38 100644
--- a/event-sourcing/event-store-api/pom.xml
+++ b/event-sourcing/event-store-api/pom.xml
@@ -52,6 +52,15 @@
             <artifactId>guavate</artifactId>
         </dependency>
         <dependency>
+            <groupId>io.projectreactor</groupId>
+            <artifactId>reactor-scala-extensions_${scala.base}</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.reactivestreams</groupId>
+            <artifactId>reactive-streams</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.scala-lang</groupId>
             <artifactId>scala-library</artifactId>
         </dependency>
diff --git 
a/event-sourcing/event-store-api/src/main/scala/org/apache/james/eventsourcing/eventstore/EventStore.scala
 
b/event-sourcing/event-store-api/src/main/scala/org/apache/james/eventsourcing/eventstore/EventStore.scala
index b90c314..fcfac47 100644
--- 
a/event-sourcing/event-store-api/src/main/scala/org/apache/james/eventsourcing/eventstore/EventStore.scala
+++ 
b/event-sourcing/event-store-api/src/main/scala/org/apache/james/eventsourcing/eventstore/EventStore.scala
@@ -22,20 +22,19 @@ import org.apache.james.eventsourcing.{AggregateId, Event}
 
 import scala.annotation.varargs
 import scala.jdk.CollectionConverters._
+import org.reactivestreams.Publisher
 
 trait EventStore {
-  def append(event: Event): Unit = appendAll(List(event))
+  def append(event: Event): Publisher[Void] = appendAll(List(event))
 
   @varargs
-  def appendAll(events: Event*): Unit = appendAll(events.toList)
-
-  def appendAll(events: java.util.List[Event]): Unit = 
appendAll(events.asScala.toList)
+  def appendAll(events: Event*): Publisher[Void] = appendAll(events.toList)
 
   /**
    * This method should check that no input event has an id already stored and 
throw otherwise
    * It should also check that all events belong to the same aggregate
    */
-  def appendAll(events: List[Event]): Unit
+  def appendAll(events: Iterable[Event]): Publisher[Void]
 
-  def getEventsOfAggregate(aggregateId: AggregateId): History
+  def getEventsOfAggregate(aggregateId: AggregateId): Publisher[History]
 }
\ No newline at end of file
diff --git 
a/event-sourcing/event-store-api/src/test/scala/org/apache/james/eventsourcing/eventstore/EventStoreContract.scala
 
b/event-sourcing/event-store-api/src/test/scala/org/apache/james/eventsourcing/eventstore/EventStoreContract.scala
index f519a9c..23f454b 100644
--- 
a/event-sourcing/event-store-api/src/test/scala/org/apache/james/eventsourcing/eventstore/EventStoreContract.scala
+++ 
b/event-sourcing/event-store-api/src/test/scala/org/apache/james/eventsourcing/eventstore/EventStoreContract.scala
@@ -22,6 +22,9 @@ import org.apache.james.eventsourcing.{EventId, 
TestAggregateId, TestEvent}
 import org.assertj.core.api.Assertions.{assertThat, assertThatCode, 
assertThatThrownBy}
 import org.junit.jupiter.api.Test
 
+import reactor.core.scala.publisher
+import reactor.core.scala.publisher.SMono
+
 object EventStoreContract {
   val AGGREGATE_1 = TestAggregateId(1)
   val AGGREGATE_2 = TestAggregateId(2)
@@ -38,35 +41,35 @@ trait EventStoreContract {
   def appendShouldThrowWhenEventFromSeveralAggregates(testee: EventStore) : 
Unit = {
     val event1 = TestEvent(EventId.first, EventStoreContract.AGGREGATE_1, 
"first")
     val event2 = TestEvent(event1.eventId.next, 
EventStoreContract.AGGREGATE_2, "second")
-    assertThatThrownBy(() => testee.appendAll(event1, event2))
+    assertThatThrownBy(() => SMono(testee.appendAll(event1, event2)).block())
       .isInstanceOf(classOf[IllegalArgumentException])
   }
 
   @Test
   def appendShouldDoNothingOnEmptyEventList(testee: EventStore) : Unit =
-    assertThatCode(() => testee.appendAll())
+    assertThatCode(() => SMono(testee.appendAll()).block())
       .doesNotThrowAnyException()
 
   @Test
   def appendShouldThrowWhenTryingToRewriteHistory(testee: EventStore) : Unit = 
{
     val event1 = TestEvent(EventId.first, EventStoreContract.AGGREGATE_1, 
"first")
-    testee.append(event1)
+    SMono(testee.append(event1)).block()
     val event2 = TestEvent(EventId.first, EventStoreContract.AGGREGATE_1, 
"second")
     assertThatThrownBy(
-      () => testee.append(event2))
+      () => SMono(testee.append(event2)).block())
       .isInstanceOf(classOf[EventStoreFailedException])
   }
 
   @Test
   def getEventsOfAggregateShouldReturnEmptyHistoryWhenUnknown(testee: 
EventStore) : Unit =
-    assertThat(testee.getEventsOfAggregate(EventStoreContract.AGGREGATE_1))
+    
assertThat(SMono(testee.getEventsOfAggregate(EventStoreContract.AGGREGATE_1)).block())
       .isEqualTo(History.empty)
 
   @Test
   def getEventsOfAggregateShouldReturnAppendedEvent(testee: EventStore) : Unit 
= {
     val event = TestEvent(EventId.first, EventStoreContract.AGGREGATE_1, 
"first")
-    testee.append(event)
-    assertThat(testee.getEventsOfAggregate(EventStoreContract.AGGREGATE_1))
+    SMono(testee.append(event)).block()
+    
assertThat(SMono(testee.getEventsOfAggregate(EventStoreContract.AGGREGATE_1)).block())
       .isEqualTo(History.of(event))
   }
 
@@ -74,9 +77,9 @@ trait EventStoreContract {
   def getEventsOfAggregateShouldReturnAppendedEvents(testee: EventStore) : 
Unit = {
     val event1 = TestEvent(EventId.first, EventStoreContract.AGGREGATE_1, 
"first")
     val event2 = TestEvent(event1.eventId.next, 
EventStoreContract.AGGREGATE_1, "second")
-    testee.append(event1)
-    testee.append(event2)
-    assertThat(testee.getEventsOfAggregate(EventStoreContract.AGGREGATE_1))
+    SMono(testee.append(event1)).block()
+    SMono(testee.append(event2)).block()
+    
assertThat(SMono(testee.getEventsOfAggregate(EventStoreContract.AGGREGATE_1)).block())
       .isEqualTo(History.of(event1, event2))
   }
 }
\ No newline at end of file
diff --git a/event-sourcing/event-store-cassandra/pom.xml 
b/event-sourcing/event-store-cassandra/pom.xml
index 8bfd47a..e083ea4 100644
--- a/event-sourcing/event-store-cassandra/pom.xml
+++ b/event-sourcing/event-store-cassandra/pom.xml
@@ -80,8 +80,7 @@
         </dependency>
         <dependency>
             <groupId>io.projectreactor</groupId>
-            <artifactId>reactor-scala-extensions_2.13</artifactId>
-            <version>0.5.0</version>
+            <artifactId>reactor-scala-extensions_${scala.base}</artifactId>
         </dependency>
         <dependency>
             <groupId>net.javacrumbs.json-unit</groupId>
diff --git 
a/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStore.scala
 
b/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStore.scala
index 6c41973..d9ab8ed 100644
--- 
a/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStore.scala
+++ 
b/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStore.scala
@@ -20,25 +20,35 @@ package org.apache.james.eventsourcing.eventstore.cassandra
 
 import com.google.common.base.Preconditions
 import javax.inject.Inject
+
 import org.apache.james.eventsourcing.eventstore.{EventStore, 
EventStoreFailedException, History}
 import org.apache.james.eventsourcing.{AggregateId, Event}
+import org.reactivestreams.Publisher
+
+import reactor.core.scala.publisher.SMono
 
 class CassandraEventStore @Inject() (eventStoreDao: EventStoreDao) extends 
EventStore {
-  override def appendAll(events: List[Event]): Unit = {
+  override def appendAll(events: Iterable[Event]): Publisher[Void] = {
     if (events.nonEmpty) {
       doAppendAll(events)
+    } else {
+      SMono.empty
     }
   }
 
-  private def doAppendAll(events: List[Event]): Unit = {
+  private def doAppendAll(events: Iterable[Event]): SMono[Void] = {
     Preconditions.checkArgument(Event.belongsToSameAggregate(events))
-    val success: Boolean = eventStoreDao.appendAll(events).block()
-    if (!success) {
-      throw EventStoreFailedException("Concurrent update to the EventStore 
detected")
-    }
+    eventStoreDao.appendAll(events)
+      .filter(success => success)
+      .single()
+      .onErrorMap({
+        case _: NoSuchElementException => 
EventStoreFailedException("Concurrent update to the EventStore detected")
+        case e => e
+      })
+      .`then`(SMono.empty)
   }
 
-  override def getEventsOfAggregate(aggregateId: AggregateId): History = {
-    eventStoreDao.getEventsOfAggregate(aggregateId).block()
+  override def getEventsOfAggregate(aggregateId: AggregateId): SMono[History] 
= {
+    eventStoreDao.getEventsOfAggregate(aggregateId)
   }
-}
\ No newline at end of file
+}
diff --git 
a/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/EventStoreDao.scala
 
b/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/EventStoreDao.scala
index 95a9239..9fda3a4 100644
--- 
a/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/EventStoreDao.scala
+++ 
b/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/EventStoreDao.scala
@@ -50,7 +50,7 @@ class EventStoreDao @Inject() (val session: Session, val 
jsonEventSerializer: Js
       .where(QueryBuilder.eq(AGGREGATE_ID, bindMarker(AGGREGATE_ID))))
   }
 
-  private[cassandra] def appendAll(events: List[Event]): SMono[Boolean] = {
+  private[cassandra] def appendAll(events: Iterable[Event]): SMono[Boolean] = {
     val batch: BatchStatement = new BatchStatement
     events.foreach((event: Event) => batch.add(insertEvent(event)))
     SMono(cassandraAsyncExecutor.executeReturnApplied(batch))
@@ -74,7 +74,7 @@ class EventStoreDao @Inject() (val session: Session, val 
jsonEventSerializer: Js
     val listEvents: SMono[List[Event]] = events.collectSeq()
       .map(_.toList)
 
-    listEvents.map(History.of)
+    listEvents.map(History.of(_))
   }
 
   private def toEvent(row: Row): Event = {
diff --git a/event-sourcing/event-store-memory/pom.xml 
b/event-sourcing/event-store-memory/pom.xml
index d4720c8..8e58626 100644
--- a/event-sourcing/event-store-memory/pom.xml
+++ b/event-sourcing/event-store-memory/pom.xml
@@ -65,6 +65,10 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>io.projectreactor</groupId>
+            <artifactId>reactor-scala-extensions_${scala.base}</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.mockito</groupId>
             <artifactId>mockito-core</artifactId>
             <scope>test</scope>
diff --git 
a/event-sourcing/event-store-memory/src/main/scala/org/apache/james/eventsourcing/eventstore/memory/InMemoryEventStore.scala
 
b/event-sourcing/event-store-memory/src/main/scala/org/apache/james/eventsourcing/eventstore/memory/InMemoryEventStore.scala
index c4f9621..caf254a 100644
--- 
a/event-sourcing/event-store-memory/src/main/scala/org/apache/james/eventsourcing/eventstore/memory/InMemoryEventStore.scala
+++ 
b/event-sourcing/event-store-memory/src/main/scala/org/apache/james/eventsourcing/eventstore/memory/InMemoryEventStore.scala
@@ -23,34 +23,44 @@ import java.util.concurrent.atomic.AtomicReference
 import com.google.common.base.Preconditions
 import org.apache.james.eventsourcing.eventstore.{EventStore, History}
 import org.apache.james.eventsourcing.{AggregateId, Event}
+import org.reactivestreams.Publisher
+
+import reactor.core.scala.publisher.SMono
 
 class InMemoryEventStore() extends EventStore {
   private val storeRef: AtomicReference[Map[AggregateId, History]] =
     new AtomicReference(Map().withDefault(_ => History.empty))
 
-  override def appendAll(events: List[Event]): Unit = if (events.nonEmpty) 
doAppendAll(events)
+  override def appendAll(events: Iterable[Event]): Publisher[Void] = {
+    if (events.nonEmpty) {
+      SMono.fromCallable(() => doAppendAll(events)).`then`()
+    } else {
+      SMono.empty
+    }
+  }
 
-  override def getEventsOfAggregate(aggregateId: AggregateId): History = {
+  override def getEventsOfAggregate(aggregateId: AggregateId): 
Publisher[History] = {
     Preconditions.checkNotNull(aggregateId)
-    storeRef.get()(aggregateId)
+    SMono.fromCallable(() => storeRef.get()(aggregateId))
   }
 
-  private def doAppendAll(events: Seq[Event]): Unit = {
+  private def doAppendAll(events: Iterable[Event]): Boolean = {
     val aggregateId: AggregateId = getAggregateId(events)
     storeRef.updateAndGet(store => {
       val updatedHistory = History.of(store(aggregateId).getEvents ++ events)
       store.updated(aggregateId, updatedHistory)
     })
+    true
   }
 
-  private def getAggregateId(events: Seq[Event]): AggregateId = {
+  private def getAggregateId(events: Iterable[Event]): AggregateId = {
     Preconditions.checkArgument(events.nonEmpty)
     val aggregateId = events.head.getAggregateId
     Preconditions.checkArgument(belongsToSameAggregate(aggregateId, events))
     aggregateId
   }
 
-  private def belongsToSameAggregate(aggregateId: AggregateId, events: 
Seq[Event]) =
+  private def belongsToSameAggregate(aggregateId: AggregateId, events: 
Iterable[Event]) =
     events.forall(_.getAggregateId.equals(aggregateId))
 
 }
\ No newline at end of file
diff --git 
a/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/commands/DetectThresholdCrossingHandler.java
 
b/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/commands/DetectThresholdCrossingHandler.java
index 83f09a4..551644c 100644
--- 
a/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/commands/DetectThresholdCrossingHandler.java
+++ 
b/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/commands/DetectThresholdCrossingHandler.java
@@ -21,14 +21,16 @@ package org.apache.james.mailbox.quota.mailing.commands;
 
 import java.util.List;
 
+import org.apache.james.eventsourcing.CommandHandler;
 import org.apache.james.eventsourcing.Event;
 import org.apache.james.eventsourcing.eventstore.EventStore;
-import org.apache.james.eventsourcing.eventstore.History;
-import org.apache.james.eventsourcing.javaapi.CommandHandlerJava;
 import 
org.apache.james.mailbox.quota.mailing.QuotaMailingListenerConfiguration;
 import org.apache.james.mailbox.quota.mailing.aggregates.UserQuotaThresholds;
+import org.reactivestreams.Publisher;
 
-public class DetectThresholdCrossingHandler implements 
CommandHandlerJava<DetectThresholdCrossing> {
+import reactor.core.publisher.Mono;
+
+public class DetectThresholdCrossingHandler implements 
CommandHandler<DetectThresholdCrossing> {
 
     private final EventStore eventStore;
     private final QuotaMailingListenerConfiguration 
quotaMailingListenerConfiguration;
@@ -41,15 +43,15 @@ public class DetectThresholdCrossingHandler implements 
CommandHandlerJava<Detect
     }
 
     @Override
-    public List<? extends Event> handleJava(DetectThresholdCrossing command) {
+    public Publisher<List<? extends Event>> handle(DetectThresholdCrossing 
command) {
         return loadAggregate(command)
-            .detectThresholdCrossing(quotaMailingListenerConfiguration, 
command);
+            .map(aggregate -> 
aggregate.detectThresholdCrossing(quotaMailingListenerConfiguration, command));
     }
 
-    private UserQuotaThresholds loadAggregate(DetectThresholdCrossing command) 
{
+    private Mono<UserQuotaThresholds> loadAggregate(DetectThresholdCrossing 
command) {
         UserQuotaThresholds.Id aggregateId = 
UserQuotaThresholds.Id.from(command.getUsername(), listenerName);
-        History history = eventStore.getEventsOfAggregate(aggregateId);
-        return UserQuotaThresholds.fromEvents(aggregateId, history);
+        return Mono.from(eventStore.getEventsOfAggregate(aggregateId))
+            .map(history -> UserQuotaThresholds.fromEvents(aggregateId, 
history));
     }
 
     @Override
diff --git 
a/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/listeners/QuotaThresholdCrossingListener.java
 
b/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/listeners/QuotaThresholdCrossingListener.java
index d4e7d5d..577e2bc 100644
--- 
a/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/listeners/QuotaThresholdCrossingListener.java
+++ 
b/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/listeners/QuotaThresholdCrossingListener.java
@@ -42,6 +42,8 @@ import org.apache.mailet.MailetContext;
 
 import com.google.common.collect.ImmutableSet;
 
+import reactor.core.publisher.Mono;
+
 public class QuotaThresholdCrossingListener implements 
MailboxListener.GroupMailboxListener {
     public static class QuotaThresholdCrossingListenerGroup extends Group {
 
@@ -74,12 +76,12 @@ public class QuotaThresholdCrossingListener implements 
MailboxListener.GroupMail
     @Override
     public void event(Event event) {
         if (event instanceof QuotaUsageUpdatedEvent) {
-            handleEvent(event.getUsername(), (QuotaUsageUpdatedEvent) event);
+            handleEvent(event.getUsername(), (QuotaUsageUpdatedEvent) 
event).block();
         }
     }
 
-    private void handleEvent(Username username, QuotaUsageUpdatedEvent event) {
-        eventSourcingSystem.dispatch(
-            new DetectThresholdCrossing(username, event.getCountQuota(), 
event.getSizeQuota(), event.getInstant()));
+    private Mono<Void> handleEvent(Username username, QuotaUsageUpdatedEvent 
event) {
+        return Mono.from(eventSourcingSystem.dispatch(
+            new DetectThresholdCrossing(username, event.getCountQuota(), 
event.getSizeQuota(), event.getInstant())));
     }
 }
diff --git a/pom.xml b/pom.xml
index 03ad478..ac8afa0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2184,6 +2184,11 @@
                 <version>${feign-form.version}</version>
             </dependency>
             <dependency>
+                <groupId>io.projectreactor</groupId>
+                <artifactId>reactor-scala-extensions_${scala.base}</artifactId>
+                <version>0.5.0</version>
+            </dependency>
+            <dependency>
                 <groupId>io.netty</groupId>
                 <artifactId>netty</artifactId>
                 <version>${netty.version}</version>
diff --git 
a/server/data/data-api/src/main/java/org/apache/james/dlp/api/DLPConfigurationLoader.java
 
b/server/data/data-api/src/main/java/org/apache/james/dlp/api/DLPConfigurationLoader.java
index ccd7223..e18a5c8 100644
--- 
a/server/data/data-api/src/main/java/org/apache/james/dlp/api/DLPConfigurationLoader.java
+++ 
b/server/data/data-api/src/main/java/org/apache/james/dlp/api/DLPConfigurationLoader.java
@@ -20,7 +20,8 @@
 package org.apache.james.dlp.api;
 
 import org.apache.james.core.Domain;
+import org.reactivestreams.Publisher;
 
 public interface DLPConfigurationLoader {
-    DLPRules list(Domain domain);
+    Publisher<DLPRules> list(Domain domain);
 }
diff --git 
a/server/data/data-api/src/test/java/org/apache/james/dlp/api/DLPConfigurationStoreContract.java
 
b/server/data/data-api/src/test/java/org/apache/james/dlp/api/DLPConfigurationStoreContract.java
index 6ed3b0d..9f3b9d6 100644
--- 
a/server/data/data-api/src/test/java/org/apache/james/dlp/api/DLPConfigurationStoreContract.java
+++ 
b/server/data/data-api/src/test/java/org/apache/james/dlp/api/DLPConfigurationStoreContract.java
@@ -30,6 +30,7 @@ import org.apache.james.core.Domain;
 import org.junit.jupiter.api.Test;
 
 import com.google.common.collect.ImmutableList;
+import reactor.core.publisher.Mono;
 
 public interface DLPConfigurationStoreContract {
 
@@ -37,7 +38,7 @@ public interface DLPConfigurationStoreContract {
 
     @Test
     default void listShouldReturnEmptyWhenNone(DLPConfigurationStore 
dlpConfigurationStore) {
-        assertThat(dlpConfigurationStore.list(Domain.LOCALHOST))
+        
assertThat(Mono.from(dlpConfigurationStore.list(Domain.LOCALHOST)).block())
             .isEmpty();
     }
 
@@ -45,7 +46,7 @@ public interface DLPConfigurationStoreContract {
     default void listShouldReturnExistingEntries(DLPConfigurationStore 
dlpConfigurationStore) {
         dlpConfigurationStore.store(Domain.LOCALHOST, RULE, RULE_2);
 
-        
assertThat(dlpConfigurationStore.list(Domain.LOCALHOST)).containsOnly(RULE, 
RULE_2);
+        
assertThat(Mono.from(dlpConfigurationStore.list(Domain.LOCALHOST)).block()).containsOnly(RULE,
 RULE_2);
     }
 
     @Test
@@ -53,7 +54,7 @@ public interface DLPConfigurationStoreContract {
         dlpConfigurationStore.store(Domain.LOCALHOST, RULE);
         dlpConfigurationStore.store(OTHER_DOMAIN, RULE_2);
 
-        
assertThat(dlpConfigurationStore.list(Domain.LOCALHOST)).containsOnly(RULE);
+        
assertThat(Mono.from(dlpConfigurationStore.list(Domain.LOCALHOST)).block()).containsOnly(RULE);
     }
 
     @Test
@@ -62,7 +63,7 @@ public interface DLPConfigurationStoreContract {
 
         dlpConfigurationStore.clear(Domain.LOCALHOST);
 
-        assertThat(dlpConfigurationStore.list(Domain.LOCALHOST)).isEmpty();
+        
assertThat(Mono.from(dlpConfigurationStore.list(Domain.LOCALHOST)).block()).isEmpty();
     }
 
     @Test
@@ -78,7 +79,7 @@ public interface DLPConfigurationStoreContract {
 
         dlpConfigurationStore.clear(Domain.LOCALHOST);
 
-        
assertThat(dlpConfigurationStore.list(OTHER_DOMAIN)).containsOnly(RULE_2);
+        
assertThat(Mono.from(dlpConfigurationStore.list(OTHER_DOMAIN)).block()).containsOnly(RULE_2);
     }
 
     @Test
@@ -89,7 +90,7 @@ public interface DLPConfigurationStoreContract {
 
         dlpConfigurationStore.store(Domain.LOCALHOST, RULE);
 
-        
assertThat(dlpConfigurationStore.list(Domain.LOCALHOST)).containsOnly(RULE);
+        
assertThat(Mono.from(dlpConfigurationStore.list(Domain.LOCALHOST)).block()).containsOnly(RULE);
     }
 
     @Test
@@ -97,7 +98,7 @@ public interface DLPConfigurationStoreContract {
         dlpConfigurationStore.store(Domain.LOCALHOST, RULE, RULE_2);
         dlpConfigurationStore.store(Domain.LOCALHOST, RULE);
 
-        
assertThat(dlpConfigurationStore.list(Domain.LOCALHOST)).containsOnly(RULE);
+        
assertThat(Mono.from(dlpConfigurationStore.list(Domain.LOCALHOST)).block()).containsOnly(RULE);
     }
 
     @Test
@@ -105,7 +106,7 @@ public interface DLPConfigurationStoreContract {
         dlpConfigurationStore.store(Domain.LOCALHOST, RULE);
         dlpConfigurationStore.store(Domain.LOCALHOST, RULE, RULE_2);
 
-        
assertThat(dlpConfigurationStore.list(Domain.LOCALHOST)).containsOnly(RULE, 
RULE_2);
+        
assertThat(Mono.from(dlpConfigurationStore.list(Domain.LOCALHOST)).block()).containsOnly(RULE,
 RULE_2);
     }
 
     @Test
@@ -119,7 +120,7 @@ public interface DLPConfigurationStoreContract {
         dlpConfigurationStore.store(Domain.LOCALHOST, RULE);
         dlpConfigurationStore.store(Domain.LOCALHOST, RULE_UPDATED);
 
-        
assertThat(dlpConfigurationStore.list(Domain.LOCALHOST)).containsOnly(RULE_UPDATED);
+        
assertThat(Mono.from(dlpConfigurationStore.list(Domain.LOCALHOST)).block()).containsOnly(RULE_UPDATED);
     }
 
     @Test
@@ -127,7 +128,7 @@ public interface DLPConfigurationStoreContract {
         dlpConfigurationStore.store(Domain.LOCALHOST, RULE);
         dlpConfigurationStore.store(Domain.LOCALHOST, RULE);
 
-        
assertThat(dlpConfigurationStore.list(Domain.LOCALHOST)).containsOnly(RULE);
+        
assertThat(Mono.from(dlpConfigurationStore.list(Domain.LOCALHOST)).block()).containsOnly(RULE);
     }
 
     @Test
@@ -135,6 +136,6 @@ public interface DLPConfigurationStoreContract {
         dlpConfigurationStore.store(Domain.LOCALHOST, RULE);
         dlpConfigurationStore.store(Domain.LOCALHOST, new 
DLPRules(ImmutableList.of()));
 
-        assertThat(dlpConfigurationStore.list(Domain.LOCALHOST)).isEmpty();
+        
assertThat(Mono.from(dlpConfigurationStore.list(Domain.LOCALHOST)).block()).isEmpty();
     }
 }
diff --git 
a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/FilteringManagement.java
 
b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/FilteringManagement.java
index c8aec6a..6e780f9 100644
--- 
a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/FilteringManagement.java
+++ 
b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/FilteringManagement.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
 import java.util.List;
 
 import org.apache.james.core.Username;
+import org.reactivestreams.Publisher;
 
 import com.google.common.collect.ImmutableList;
 
@@ -38,6 +39,6 @@ public interface FilteringManagement {
         defineRulesForUser(username, ImmutableList.of());
     }
 
-    List<Rule> listRulesForUser(Username username);
+    Publisher<Rule> listRulesForUser(Username username);
 
 }
diff --git 
a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/DefineRulesCommandHandler.java
 
b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/DefineRulesCommandHandler.java
index 4931e07..9378a9f 100644
--- 
a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/DefineRulesCommandHandler.java
+++ 
b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/DefineRulesCommandHandler.java
@@ -21,11 +21,14 @@ package org.apache.james.jmap.api.filtering.impl;
 
 import java.util.List;
 
+import org.apache.james.eventsourcing.CommandHandler;
 import org.apache.james.eventsourcing.Event;
 import org.apache.james.eventsourcing.eventstore.EventStore;
-import org.apache.james.eventsourcing.javaapi.CommandHandlerJava;
+import org.reactivestreams.Publisher;
 
-public class DefineRulesCommandHandler implements 
CommandHandlerJava<DefineRulesCommand> {
+import reactor.core.publisher.Mono;
+
+public class DefineRulesCommandHandler implements 
CommandHandler<DefineRulesCommand> {
 
     private final EventStore eventStore;
 
@@ -39,14 +42,11 @@ public class DefineRulesCommandHandler implements 
CommandHandlerJava<DefineRules
     }
 
     @Override
-    public List<? extends Event> handleJava(DefineRulesCommand storeCommand) {
+    public Publisher<List<? extends Event>> handle(DefineRulesCommand 
storeCommand) {
         FilteringAggregateId aggregateId = new 
FilteringAggregateId(storeCommand.getUsername());
 
-        return FilteringAggregate
-            .load(
-                aggregateId,
-                eventStore.getEventsOfAggregate(aggregateId))
-            .defineRules(storeCommand.getRules());
+        return Mono.from(eventStore.getEventsOfAggregate(aggregateId))
+        .map(history -> FilteringAggregate.load(aggregateId, 
history).defineRules(storeCommand.getRules()));
     }
 
 }
diff --git 
a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/EventSourcingFilteringManagement.java
 
b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/EventSourcingFilteringManagement.java
index d8dad9f..429cc79 100644
--- 
a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/EventSourcingFilteringManagement.java
+++ 
b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/EventSourcingFilteringManagement.java
@@ -29,10 +29,14 @@ import org.apache.james.eventsourcing.Subscriber;
 import org.apache.james.eventsourcing.eventstore.EventStore;
 import org.apache.james.jmap.api.filtering.FilteringManagement;
 import org.apache.james.jmap.api.filtering.Rule;
+import org.reactivestreams.Publisher;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
 
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
 public class EventSourcingFilteringManagement implements FilteringManagement {
 
     private static final ImmutableSet<Subscriber> NO_SUBSCRIBER = 
ImmutableSet.of();
@@ -51,19 +55,16 @@ public class EventSourcingFilteringManagement implements 
FilteringManagement {
 
     @Override
     public void defineRulesForUser(Username username, List<Rule> rules) {
-        eventSourcingSystem.dispatch(new DefineRulesCommand(username, rules));
+        Mono.from(eventSourcingSystem.dispatch(new 
DefineRulesCommand(username, rules))).block();
     }
 
     @Override
-    public List<Rule> listRulesForUser(Username username) {
+    public Publisher<Rule> listRulesForUser(Username username) {
         Preconditions.checkNotNull(username);
 
         FilteringAggregateId aggregateId = new FilteringAggregateId(username);
 
-        return FilteringAggregate
-            .load(
-                aggregateId,
-                eventStore.getEventsOfAggregate(aggregateId))
-            .listRules();
+        return Mono.from(eventStore.getEventsOfAggregate(aggregateId))
+            .flatMapMany(history -> 
Flux.fromIterable(FilteringAggregate.load(aggregateId, history).listRules()));
     }
 }
diff --git 
a/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/filtering/FilteringManagementContract.java
 
b/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/filtering/FilteringManagementContract.java
index d959f23..7a355d4 100644
--- 
a/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/filtering/FilteringManagementContract.java
+++ 
b/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/filtering/FilteringManagementContract.java
@@ -35,6 +35,8 @@ import org.apache.james.eventsourcing.eventstore.EventStore;
 import 
org.apache.james.jmap.api.filtering.impl.EventSourcingFilteringManagement;
 import org.junit.jupiter.api.Test;
 
+import reactor.core.publisher.Flux;
+
 public interface FilteringManagementContract {
 
     String BART_SIMPSON_CARTOON = "[email protected]";
@@ -46,7 +48,7 @@ public interface FilteringManagementContract {
 
     @Test
     default void listingRulesForUnknownUserShouldReturnEmptyList(EventStore 
eventStore) {
-        
assertThat(instantiateFilteringManagement(eventStore).listRulesForUser(USERNAME))
+        
assertThat(Flux.from(instantiateFilteringManagement(eventStore).listRulesForUser(USERNAME)).toStream())
             .isEmpty();
     }
 
@@ -63,7 +65,7 @@ public interface FilteringManagementContract {
 
         testee.defineRulesForUser(USERNAME, RULE_1, RULE_2);
 
-        assertThat(testee.listRulesForUser(USERNAME))
+        assertThat(Flux.from(testee.listRulesForUser(USERNAME)).toStream())
             .containsExactly(RULE_1, RULE_2);
     }
 
@@ -74,7 +76,7 @@ public interface FilteringManagementContract {
         testee.defineRulesForUser(USERNAME, RULE_1, RULE_2);
         testee.defineRulesForUser(USERNAME, RULE_2, RULE_1);
 
-        assertThat(testee.listRulesForUser(USERNAME))
+        assertThat(Flux.from(testee.listRulesForUser(USERNAME)).toStream())
             .containsExactly(RULE_2, RULE_1);
     }
 
@@ -108,7 +110,7 @@ public interface FilteringManagementContract {
         FilteringManagement testee = 
instantiateFilteringManagement(eventStore);
         testee.defineRulesForUser(USERNAME, RULE_3, RULE_2, RULE_1);
 
-        assertThat(testee.listRulesForUser(USERNAME))
+        assertThat(Flux.from(testee.listRulesForUser(USERNAME)).toStream())
             .containsExactly(RULE_3, RULE_2, RULE_1);
     }
 
@@ -119,7 +121,7 @@ public interface FilteringManagementContract {
         testee.defineRulesForUser(USERNAME, RULE_3, RULE_2, RULE_1);
         testee.clearRulesForUser(USERNAME);
 
-        assertThat(testee.listRulesForUser(USERNAME)).isEmpty();
+        
assertThat(Flux.from(testee.listRulesForUser(USERNAME)).toStream()).isEmpty();
     }
 
     @Test
@@ -128,7 +130,7 @@ public interface FilteringManagementContract {
 
         testee.defineRulesForUser(USERNAME, RULE_FROM, RULE_RECIPIENT, 
RULE_SUBJECT, RULE_TO, RULE_1);
 
-        assertThat(testee.listRulesForUser(USERNAME))
+        assertThat(Flux.from(testee.listRulesForUser(USERNAME)).toStream())
             .containsExactly(RULE_FROM, RULE_RECIPIENT, RULE_SUBJECT, RULE_TO, 
RULE_1);
     }
 
diff --git 
a/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/EventSourcingDLPConfigurationStore.java
 
b/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/EventSourcingDLPConfigurationStore.java
index 957a86d..48d88ba 100644
--- 
a/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/EventSourcingDLPConfigurationStore.java
+++ 
b/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/EventSourcingDLPConfigurationStore.java
@@ -37,10 +37,13 @@ import 
org.apache.james.dlp.eventsourcing.commands.StoreCommandHandler;
 import org.apache.james.eventsourcing.EventSourcingSystem;
 import org.apache.james.eventsourcing.Subscriber;
 import org.apache.james.eventsourcing.eventstore.EventStore;
-import org.apache.james.util.streams.Iterables;
+import org.reactivestreams.Publisher;
 
 import com.google.common.collect.ImmutableSet;
 
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
 public class EventSourcingDLPConfigurationStore implements 
DLPConfigurationStore {
 
     private static final ImmutableSet<Subscriber> NO_SUBSCRIBER = 
ImmutableSet.of();
@@ -60,29 +63,29 @@ public class EventSourcingDLPConfigurationStore implements 
DLPConfigurationStore
     }
 
     @Override
-    public DLPRules list(Domain domain) {
+    public Publisher<DLPRules> list(Domain domain) {
 
         DLPAggregateId aggregateId = new DLPAggregateId(domain);
 
-        return DLPDomainConfiguration.load(
-                aggregateId,
-                eventStore.getEventsOfAggregate(aggregateId))
-            .retrieveRules();
+        return Mono.from(eventStore.getEventsOfAggregate(aggregateId))
+            .map(history -> DLPDomainConfiguration.load(aggregateId, 
history).retrieveRules());
     }
 
     @Override
     public void store(Domain domain, DLPRules rules) {
-        eventSourcingSystem.dispatch(new StoreCommand(domain, rules));
+        Mono.from(eventSourcingSystem.dispatch(new StoreCommand(domain, 
rules))).block();
     }
 
     @Override
     public void clear(Domain domain) {
-        eventSourcingSystem.dispatch(new ClearCommand(domain));
+        Mono.from(eventSourcingSystem.dispatch(new 
ClearCommand(domain))).block();
     }
 
     @Override
     public Optional<DLPConfigurationItem> fetch(Domain domain, Id ruleId) {
-        return Iterables.toStream(list(domain))
+        return Mono.from(list(domain))
+                .flatMapMany(rules -> Flux.fromIterable(rules.getItems()))
+                .toStream()
                 .filter((DLPConfigurationItem item) -> 
item.getId().equals(ruleId))
                 .findFirst();
     }
diff --git 
a/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/commands/ClearCommandHandler.java
 
b/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/commands/ClearCommandHandler.java
index c59f3ce..e9ed63e 100644
--- 
a/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/commands/ClearCommandHandler.java
+++ 
b/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/commands/ClearCommandHandler.java
@@ -23,11 +23,14 @@ import java.util.List;
 
 import org.apache.james.dlp.eventsourcing.aggregates.DLPAggregateId;
 import org.apache.james.dlp.eventsourcing.aggregates.DLPDomainConfiguration;
+import org.apache.james.eventsourcing.CommandHandler;
 import org.apache.james.eventsourcing.Event;
 import org.apache.james.eventsourcing.eventstore.EventStore;
-import org.apache.james.eventsourcing.javaapi.CommandHandlerJava;
+import org.reactivestreams.Publisher;
 
-public class ClearCommandHandler implements CommandHandlerJava<ClearCommand> {
+import reactor.core.publisher.Mono;
+
+public class ClearCommandHandler implements CommandHandler<ClearCommand> {
 
     private final EventStore eventStore;
 
@@ -41,12 +44,10 @@ public class ClearCommandHandler implements 
CommandHandlerJava<ClearCommand> {
     }
 
     @Override
-    public List<? extends Event> handleJava(ClearCommand clearCommand) {
+    public Publisher<List<? extends Event>> handle(ClearCommand clearCommand) {
         DLPAggregateId aggregateId = new 
DLPAggregateId(clearCommand.getDomain());
 
-        return DLPDomainConfiguration.load(
-                aggregateId,
-                eventStore.getEventsOfAggregate(aggregateId))
-            .clear();
+        return Mono.from(eventStore.getEventsOfAggregate(aggregateId))
+            .map(history -> DLPDomainConfiguration.load(aggregateId, 
history).clear());
     }
 }
diff --git 
a/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/commands/StoreCommandHandler.java
 
b/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/commands/StoreCommandHandler.java
index e24bd83..0d628d5 100644
--- 
a/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/commands/StoreCommandHandler.java
+++ 
b/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/commands/StoreCommandHandler.java
@@ -23,11 +23,14 @@ import java.util.List;
 
 import org.apache.james.dlp.eventsourcing.aggregates.DLPAggregateId;
 import org.apache.james.dlp.eventsourcing.aggregates.DLPDomainConfiguration;
+import org.apache.james.eventsourcing.CommandHandler;
 import org.apache.james.eventsourcing.Event;
 import org.apache.james.eventsourcing.eventstore.EventStore;
-import org.apache.james.eventsourcing.javaapi.CommandHandlerJava;
+import org.reactivestreams.Publisher;
 
-public class StoreCommandHandler implements CommandHandlerJava<StoreCommand> {
+import reactor.core.publisher.Mono;
+
+public class StoreCommandHandler implements CommandHandler<StoreCommand> {
 
     private final EventStore eventStore;
 
@@ -41,12 +44,10 @@ public class StoreCommandHandler implements 
CommandHandlerJava<StoreCommand> {
     }
 
     @Override
-    public List<? extends Event> handleJava(StoreCommand storeCommand) {
+    public Publisher<List<? extends Event>> handle(StoreCommand storeCommand) {
         DLPAggregateId aggregateId = new 
DLPAggregateId(storeCommand.getDomain());
 
-        return DLPDomainConfiguration.load(
-                aggregateId,
-                eventStore.getEventsOfAggregate(aggregateId))
-            .store(storeCommand.getRules());
+        return Mono.from(eventStore.getEventsOfAggregate(aggregateId))
+        .map(history -> DLPDomainConfiguration.load(aggregateId, 
history).store(storeCommand.getRules()));
     }
 }
diff --git 
a/server/mailet/mailets/src/main/java/org/apache/james/transport/matchers/dlp/DlpRulesLoader.java
 
b/server/mailet/mailets/src/main/java/org/apache/james/transport/matchers/dlp/DlpRulesLoader.java
index 08e3c4d..3d1f95f 100644
--- 
a/server/mailet/mailets/src/main/java/org/apache/james/transport/matchers/dlp/DlpRulesLoader.java
+++ 
b/server/mailet/mailets/src/main/java/org/apache/james/transport/matchers/dlp/DlpRulesLoader.java
@@ -25,6 +25,8 @@ import org.apache.james.core.Domain;
 import org.apache.james.dlp.api.DLPConfigurationStore;
 import org.apache.james.dlp.api.DLPRules;
 
+import reactor.core.publisher.Mono;
+
 public interface DlpRulesLoader {
 
     DlpDomainRules load(Domain domain);
@@ -40,7 +42,7 @@ public interface DlpRulesLoader {
 
         @Override
         public DlpDomainRules load(Domain domain) {
-          return toRules(configurationStore.list(domain));
+          return toRules(Mono.from(configurationStore.list(domain)).block());
         }
 
         private DlpDomainRules toRules(DLPRules items) {
diff --git 
a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetFilterMethod.java
 
b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetFilterMethod.java
index 5bb238c..618d862 100644
--- 
a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetFilterMethod.java
+++ 
b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/draft/methods/GetFilterMethod.java
@@ -19,14 +19,12 @@
 
 package org.apache.james.jmap.draft.methods;
 
-import java.util.List;
 import java.util.stream.Stream;
 
 import javax.inject.Inject;
 
 import org.apache.james.core.Username;
 import org.apache.james.jmap.api.filtering.FilteringManagement;
-import org.apache.james.jmap.api.filtering.Rule;
 import org.apache.james.jmap.draft.model.GetFilterRequest;
 import org.apache.james.jmap.draft.model.GetFilterResponse;
 import org.apache.james.jmap.draft.model.MethodCallId;
@@ -37,8 +35,12 @@ import org.apache.james.util.MDCBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.github.steveash.guavate.Guavate;
 import com.google.common.base.Preconditions;
 
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
 public class GetFilterMethod implements Method {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(GetFilterMethod.class);
 
@@ -93,17 +95,18 @@ public class GetFilterMethod implements Method {
     }
 
     private Stream<JmapResponse> retrieveFilter(MethodCallId methodCallId, 
Username username) {
-        List<Rule> rules = filteringManagement.listRulesForUser(username);
-
-        GetFilterResponse getFilterResponse = GetFilterResponse.builder()
-            .rules(rules)
-            .build();
-
-        return Stream.of(JmapResponse.builder()
-            .methodCallId(methodCallId)
-            .response(getFilterResponse)
-            .responseName(RESPONSE_NAME)
-            .build());
+        return Flux.from(filteringManagement.listRulesForUser(username))
+            .collect(Guavate.toImmutableList())
+            .map(rules -> GetFilterResponse.builder()
+                .rules(rules)
+                .build())
+            .map(getFilterResponse -> JmapResponse.builder()
+                .methodCallId(methodCallId)
+                .response(getFilterResponse)
+                .responseName(RESPONSE_NAME)
+                .build())
+            .flatMapMany(Mono::just)
+            .toStream();
     }
 
     private JmapResponse unKnownError(MethodCallId methodCallId) {
diff --git 
a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/mailet/filter/JMAPFiltering.java
 
b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/mailet/filter/JMAPFiltering.java
index 0a0d58f..efd5fc7 100644
--- 
a/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/mailet/filter/JMAPFiltering.java
+++ 
b/server/protocols/jmap-draft/src/main/java/org/apache/james/jmap/mailet/filter/JMAPFiltering.java
@@ -36,6 +36,10 @@ import org.apache.mailet.base.GenericMailet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.github.steveash.guavate.Guavate;
+
+import reactor.core.publisher.Flux;
+
 /**
  * Mailet for applying JMAP filtering to incoming email.
  *
@@ -77,7 +81,9 @@ public class JMAPFiltering extends GenericMailet {
     }
 
     private void findFirstApplicableRule(Username username, Mail mail) {
-        List<Rule> filteringRules = 
filteringManagement.listRulesForUser(username);
+        List<Rule> filteringRules = 
Flux.from(filteringManagement.listRulesForUser(username))
+            .collect(Guavate.toImmutableList())
+            .block();
         RuleMatcher ruleMatcher = new RuleMatcher(filteringRules);
         Stream<Rule> matchingRules = ruleMatcher.findApplicableRules(mail);
 
diff --git 
a/server/protocols/webadmin/webadmin-data/src/main/java/org/apache/james/webadmin/routes/DLPConfigurationRoutes.java
 
b/server/protocols/webadmin/webadmin-data/src/main/java/org/apache/james/webadmin/routes/DLPConfigurationRoutes.java
index 9dc3e32..e197a5a 100644
--- 
a/server/protocols/webadmin/webadmin-data/src/main/java/org/apache/james/webadmin/routes/DLPConfigurationRoutes.java
+++ 
b/server/protocols/webadmin/webadmin-data/src/main/java/org/apache/james/webadmin/routes/DLPConfigurationRoutes.java
@@ -55,6 +55,7 @@ import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
+import reactor.core.publisher.Mono;
 import spark.HaltException;
 import spark.Request;
 import spark.Service;
@@ -166,7 +167,7 @@ public class DLPConfigurationRoutes implements Routes {
     public void defineList(Service service) {
         service.get(SPECIFIC_DLP_RULE_DOMAIN, (request, response) -> {
             Domain senderDomain = parseDomain(request);
-            DLPRules dlpConfigurations = 
dlpConfigurationStore.list(senderDomain);
+            DLPRules dlpConfigurations = 
Mono.from(dlpConfigurationStore.list(senderDomain)).block();
 
             DLPConfigurationDTO dto = 
DLPConfigurationDTO.toDTO(dlpConfigurations);
             response.status(HttpStatus.OK_200);
diff --git 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/EventsourcingConfigurationManagement.java
 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/EventsourcingConfigurationManagement.java
index a9ca76a..ba2c621 100644
--- 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/EventsourcingConfigurationManagement.java
+++ 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/EventsourcingConfigurationManagement.java
@@ -19,8 +19,6 @@
 
 package org.apache.james.queue.rabbitmq.view.cassandra.configuration;
 
-import java.util.Optional;
-
 import javax.inject.Inject;
 
 import org.apache.james.eventsourcing.AggregateId;
@@ -32,6 +30,8 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
 
+import reactor.core.publisher.Mono;
+
 public class EventsourcingConfigurationManagement {
     static final String CONFIGURATION_AGGREGATE_KEY = 
"CassandraMailQueueViewConfiguration";
     static final AggregateId CONFIGURATION_AGGREGATE_ID = () -> 
CONFIGURATION_AGGREGATE_KEY;
@@ -51,15 +51,16 @@ public class EventsourcingConfigurationManagement {
     }
 
     @VisibleForTesting
-    Optional<CassandraMailQueueViewConfiguration> load() {
-        return ConfigurationAggregate
-            .load(CONFIGURATION_AGGREGATE_ID, 
eventStore.getEventsOfAggregate(CONFIGURATION_AGGREGATE_ID))
-            .getCurrentConfiguration();
+    Mono<CassandraMailQueueViewConfiguration> load() {
+        return 
Mono.from(eventStore.getEventsOfAggregate(CONFIGURATION_AGGREGATE_ID))
+            .flatMap(history -> Mono.justOrEmpty(ConfigurationAggregate
+                .load(CONFIGURATION_AGGREGATE_ID, history)
+                .getCurrentConfiguration()));
     }
 
     public void registerConfiguration(CassandraMailQueueViewConfiguration 
newConfiguration) {
         Preconditions.checkNotNull(newConfiguration);
 
-        eventSourcingSystem.dispatch(new 
RegisterConfigurationCommand(newConfiguration, CONFIGURATION_AGGREGATE_ID));
+        Mono.from(eventSourcingSystem.dispatch(new 
RegisterConfigurationCommand(newConfiguration, 
CONFIGURATION_AGGREGATE_ID))).block();
     }
 }
diff --git 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/RegisterConfigurationCommandHandler.java
 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/RegisterConfigurationCommandHandler.java
index 33c61e9..bf54102 100644
--- 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/RegisterConfigurationCommandHandler.java
+++ 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/RegisterConfigurationCommandHandler.java
@@ -21,11 +21,14 @@ package 
org.apache.james.queue.rabbitmq.view.cassandra.configuration;
 
 import java.util.List;
 
+import org.apache.james.eventsourcing.CommandHandler;
 import org.apache.james.eventsourcing.Event;
 import org.apache.james.eventsourcing.eventstore.EventStore;
-import org.apache.james.eventsourcing.javaapi.CommandHandlerJava;
+import org.reactivestreams.Publisher;
 
-class RegisterConfigurationCommandHandler implements 
CommandHandlerJava<RegisterConfigurationCommand> {
+import reactor.core.publisher.Mono;
+
+class RegisterConfigurationCommandHandler implements 
CommandHandler<RegisterConfigurationCommand> {
 
     private final EventStore eventStore;
 
@@ -39,9 +42,10 @@ class RegisterConfigurationCommandHandler implements 
CommandHandlerJava<Register
     }
 
     @Override
-    public List<? extends Event> handleJava(RegisterConfigurationCommand 
command) {
-        return ConfigurationAggregate
-            .load(command.getAggregateId(), 
eventStore.getEventsOfAggregate(command.getAggregateId()))
-            .registerConfiguration(command.getConfiguration());
+    public Publisher<List<? extends Event>> 
handle(RegisterConfigurationCommand command) {
+        return 
Mono.from(eventStore.getEventsOfAggregate(command.getAggregateId()))
+            .map(history -> ConfigurationAggregate
+                .load(command.getAggregateId(), history)
+                .registerConfiguration(command.getConfiguration()));
     }
 }
diff --git 
a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/EventsourcingConfigurationManagementTest.java
 
b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/EventsourcingConfigurationManagementTest.java
index 1518a1d..5d191bf 100644
--- 
a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/EventsourcingConfigurationManagementTest.java
+++ 
b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/EventsourcingConfigurationManagementTest.java
@@ -31,6 +31,8 @@ import 
org.apache.james.eventsourcing.eventstore.cassandra.JsonEventSerializer;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
+import reactor.core.publisher.Mono;
+
 class EventsourcingConfigurationManagementTest {
 
     @RegisterExtension
@@ -69,7 +71,7 @@ class EventsourcingConfigurationManagementTest {
     void loadShouldReturnEmptyIfNoConfigurationStored(EventStore eventStore) {
         EventsourcingConfigurationManagement testee = 
createConfigurationManagement(eventStore);
 
-        assertThat(testee.load())
+        assertThat(Mono.from(testee.load()).blockOptional())
             .isEmpty();
     }
 
@@ -80,7 +82,7 @@ class EventsourcingConfigurationManagementTest {
         testee.registerConfiguration(SECOND_CONFIGURATION);
         testee.registerConfiguration(THIRD_CONFIGURATION);
 
-        assertThat(testee.load())
+        assertThat(Mono.from(testee.load()).blockOptional())
             .contains(THIRD_CONFIGURATION);
     }
 
@@ -125,7 +127,7 @@ class EventsourcingConfigurationManagementTest {
             .build();
         testee.registerConfiguration(increaseOneBucketConfiguration);
 
-        assertThat(testee.load())
+        assertThat(Mono.from(testee.load()).blockOptional())
             .contains(increaseOneBucketConfiguration);
     }
 
@@ -135,7 +137,7 @@ class EventsourcingConfigurationManagementTest {
 
         testee.registerConfiguration(FIRST_CONFIGURATION);
 
-        assertThat(testee.load())
+        assertThat(Mono.from(testee.load()).blockOptional())
             .contains(FIRST_CONFIGURATION);
     }
 
@@ -189,7 +191,7 @@ class EventsourcingConfigurationManagementTest {
             .build();
         testee.registerConfiguration(decreaseTwiceSliceWindowConfiguration);
 
-        assertThat(testee.load())
+        assertThat(Mono.from(testee.load()).blockOptional())
             .contains(decreaseTwiceSliceWindowConfiguration);
     }
 
@@ -209,7 +211,7 @@ class EventsourcingConfigurationManagementTest {
             .build();
         testee.registerConfiguration(decreaseTwiceSliceWindowConfiguration);
 
-        assertThat(testee.load())
+        assertThat(Mono.from(testee.load()).blockOptional())
             .contains(decreaseTwiceSliceWindowConfiguration);
     }
 
@@ -229,7 +231,7 @@ class EventsourcingConfigurationManagementTest {
             .build();
         testee.registerConfiguration(decreaseTwiceSliceWindowConfiguration);
 
-        assertThat(testee.load())
+        assertThat(Mono.from(testee.load()).blockOptional())
             .contains(decreaseTwiceSliceWindowConfiguration);
     }
 
@@ -239,7 +241,8 @@ class EventsourcingConfigurationManagementTest {
         testee.registerConfiguration(FIRST_CONFIGURATION);
         testee.registerConfiguration(FIRST_CONFIGURATION);
 
-        assertThat(eventStore.getEventsOfAggregate(CONFIGURATION_AGGREGATE_ID)
+        
assertThat(Mono.from(eventStore.getEventsOfAggregate(CONFIGURATION_AGGREGATE_ID))
+            .block()
             .getEventsJava())
             .hasSize(1);
     }
diff --git 
a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
 
b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
index de68b24..dbdce52 100644
--- 
a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
+++ 
b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
@@ -140,22 +140,22 @@ public class RabbitMQWorkQueue implements WorkQueue {
 
     private Mono<Task> deserialize(String json, TaskId taskId) {
         return Mono.fromCallable(() -> taskSerializer.deserialize(json))
-            .doOnError(error -> {
+            .onErrorResume(error -> {
                 String errorMessage = String.format("Unable to deserialize 
submitted Task %s", taskId.asString());
                 LOGGER.error(errorMessage, error);
-                worker.fail(taskId, Optional.empty(), errorMessage, error);
-            })
-            .onErrorResume(error -> Mono.empty());
+                return Mono.from(worker.fail(taskId, Optional.empty(), 
errorMessage, error))
+                    .then(Mono.empty());
+            });
     }
 
     private Mono<Task.Result> executeOnWorker(TaskId taskId, Task task) {
         return worker.executeTask(new TaskWithId(taskId, task))
-            .doOnError(error -> {
+            .onErrorResume(error -> {
                 String errorMessage = String.format("Unable to run submitted 
Task %s", taskId.asString());
                 LOGGER.warn(errorMessage, error);
-                worker.fail(taskId, task.details(), errorMessage, error);
-            })
-            .onErrorResume(error -> Mono.empty());
+                return Mono.from(worker.fail(taskId, task.details(), 
errorMessage, error))
+                    .then(Mono.empty());
+            });
     }
 
     private void listenToCancelRequests() {
diff --git 
a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/ImmediateWorker.java
 
b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/ImmediateWorker.java
index 93d8f75..71e6214 100644
--- 
a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/ImmediateWorker.java
+++ 
b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/ImmediateWorker.java
@@ -27,6 +27,7 @@ import org.apache.james.task.TaskExecutionDetails;
 import org.apache.james.task.TaskId;
 import org.apache.james.task.TaskManagerWorker;
 import org.apache.james.task.TaskWithId;
+import org.reactivestreams.Publisher;
 
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
@@ -39,8 +40,8 @@ class ImmediateWorker implements TaskManagerWorker {
 
     @Override
     public Mono<Task.Result> executeTask(TaskWithId taskWithId) {
-        tasks.add(taskWithId);
-        return Mono.fromCallable(() -> taskWithId.getTask().run())
+        return Mono.fromRunnable(() -> tasks.add(taskWithId))
+            .then(Mono.fromCallable(() -> taskWithId.getTask().run()))
             .doOnNext(result -> results.add(result))
             .subscribeOn(Schedulers.elastic());
     }
@@ -50,8 +51,9 @@ class ImmediateWorker implements TaskManagerWorker {
     }
 
     @Override
-    public void fail(TaskId taskId, 
Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation, 
String errorMessage, Throwable reason) {
-        failedTasks.add(taskId);
+    public Publisher<Void> fail(TaskId taskId, 
Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation, 
String errorMessage, Throwable reason) {
+        return Mono.fromRunnable(() -> failedTasks.add(taskId))
+            .then();
     }
 
     @Override
diff --git 
a/server/task/task-memory/src/main/java/org/apache/james/task/MemoryTaskManager.java
 
b/server/task/task-memory/src/main/java/org/apache/james/task/MemoryTaskManager.java
index 3d2da18..8d983b0 100644
--- 
a/server/task/task-memory/src/main/java/org/apache/james/task/MemoryTaskManager.java
+++ 
b/server/task/task-memory/src/main/java/org/apache/james/task/MemoryTaskManager.java
@@ -30,10 +30,13 @@ import java.util.function.Consumer;
 import javax.annotation.PreDestroy;
 import javax.inject.Inject;
 
+import org.reactivestreams.Publisher;
+
 import com.github.steveash.guavate.Guavate;
 import com.google.common.collect.ImmutableList;
 
 import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 
 public class MemoryTaskManager implements TaskManager {
@@ -54,40 +57,41 @@ public class MemoryTaskManager implements TaskManager {
         }
 
         @Override
-        public void started(TaskId taskId) {
-            updaterFactory.apply(taskId).accept(details -> 
details.started(hostname));
+        public Publisher<Void> started(TaskId taskId) {
+            return Mono.fromRunnable(() -> updaterFactory.apply(taskId)
+                .accept(details -> details.started(hostname)));
         }
 
         @Override
-        public void completed(TaskId taskId, Task.Result result, 
Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation) {
-            updaterFactory.apply(taskId)
-                .accept(details -> details.completed(additionalInformation));
+        public Publisher<Void> completed(TaskId taskId, Task.Result result, 
Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation) {
+            return Mono.fromRunnable(() -> updaterFactory.apply(taskId)
+                .accept(details -> details.completed(additionalInformation)));
         }
 
         @Override
-        public void failed(TaskId taskId, 
Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation, 
String errorMessage, Throwable t) {
-            failed(taskId, additionalInformation);
+        public Publisher<Void> failed(TaskId taskId, 
Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation, 
String errorMessage, Throwable t) {
+            return failed(taskId, additionalInformation);
         }
 
         @Override
-        public void failed(TaskId taskId, 
Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation, 
Throwable t) {
-            failed(taskId, additionalInformation);
+        public Publisher<Void> failed(TaskId taskId, 
Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation, 
Throwable t) {
+            return failed(taskId, additionalInformation);
          }
 
         @Override
-        public void failed(TaskId taskId, 
Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation) {
-            updaterFactory.apply(taskId)
-                .accept(details -> details.failed(additionalInformation));
+        public Publisher<Void> failed(TaskId taskId, 
Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation) {
+            return Mono.fromRunnable(() -> updaterFactory.apply(taskId)
+                .accept(details -> details.failed(additionalInformation)));
         }
 
         @Override
-        public void cancelled(TaskId taskId, 
Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation) {
-            updaterFactory.apply(taskId)
-                .accept(details -> 
details.cancelEffectively(additionalInformation));
+        public Publisher<Void> cancelled(TaskId taskId, 
Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation) {
+            return Mono.fromRunnable(() -> updaterFactory.apply(taskId)
+                .accept(details -> 
details.cancelEffectively(additionalInformation)));
         }
 
         @Override
-        public void updated(TaskId taskId, 
TaskExecutionDetails.AdditionalInformation additionalInformation) {
+        public Publisher<Void> updated(TaskId taskId, 
TaskExecutionDetails.AdditionalInformation additionalInformation) {
             //The memory task manager doesn't use polling to update its 
additionalInformation.
             throw new IllegalStateException();
         }
diff --git 
a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
 
b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
index 9bdad89..b024c4c 100644
--- 
a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
+++ 
b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
@@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.james.util.MDCBuilder;
 import org.apache.james.util.concurrent.NamedThreadFactory;
+import org.reactivestreams.Publisher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -69,20 +70,20 @@ public class SerialTaskManagerWorker implements 
TaskManagerWorker {
             return Mono.using(
                 () -> pollAdditionalInformation(taskWithId).subscribe(),
                 ignored -> Mono.fromFuture(future)
-                    .doOnError(exception -> handleExecutionError(taskWithId, 
listener, exception))
-                    .onErrorReturn(Task.Result.PARTIAL),
+                    .onErrorResume(exception -> 
Mono.from(handleExecutionError(taskWithId, listener, exception))
+                            .thenReturn(Task.Result.PARTIAL)),
                 Disposable::dispose);
         } else {
-            listener.cancelled(taskWithId.getId(), 
taskWithId.getTask().details());
-            return Mono.empty();
+            return Mono.from(listener.cancelled(taskWithId.getId(), 
taskWithId.getTask().details()))
+                .then(Mono.empty());
         }
     }
 
-    private void handleExecutionError(TaskWithId taskWithId, Listener 
listener, Throwable exception) {
+    private Publisher<Void> handleExecutionError(TaskWithId taskWithId, 
Listener listener, Throwable exception) {
         if (exception instanceof CancellationException) {
-            listener.cancelled(taskWithId.getId(), 
taskWithId.getTask().details());
+            return listener.cancelled(taskWithId.getId(), 
taskWithId.getTask().details());
         } else {
-            listener.failed(taskWithId.getId(), 
taskWithId.getTask().details(), exception);
+            return listener.failed(taskWithId.getId(), 
taskWithId.getTask().details(), exception);
         }
     }
 
@@ -91,7 +92,7 @@ public class SerialTaskManagerWorker implements 
TaskManagerWorker {
             .delayElement(pollingInterval, Schedulers.elastic())
             .repeat()
             .<TaskExecutionDetails.AdditionalInformation>handle((maybeDetails, 
sink) -> maybeDetails.ifPresent(sink::next))
-            .doOnNext(information -> listener.updated(taskWithId.getId(), 
information));
+            .flatMap(information -> 
Mono.from(listener.updated(taskWithId.getId(), 
information)).thenReturn(information));
     }
 
 
@@ -101,27 +102,27 @@ public class SerialTaskManagerWorker implements 
TaskManagerWorker {
                 .addContext(Task.TASK_ID, taskWithId.getId())
                 .addContext(Task.TASK_TYPE, taskWithId.getTask().type())
                 .addContext(Task.TASK_DETAILS, taskWithId.getTask().details()),
-            () -> run(taskWithId, listener));
+            () -> run(taskWithId, listener).block());
     }
 
-    private Task.Result run(TaskWithId taskWithId, Listener listener) {
-        listener.started(taskWithId.getId());
-        try {
-            return taskWithId.getTask()
-                .run()
-                .onComplete(result -> listener.completed(taskWithId.getId(), 
result, taskWithId.getTask().details()))
-                .onFailure(() -> {
-                    LOGGER.error("Task was partially performed. Check logs for 
more details. Taskid : " + taskWithId.getId());
-                    listener.failed(taskWithId.getId(), 
taskWithId.getTask().details());
-                });
-        } catch (InterruptedException e) {
-            listener.cancelled(taskWithId.getId(), 
taskWithId.getTask().details());
-            return Task.Result.PARTIAL;
-        } catch (Exception e) {
-            LOGGER.error("Error while running task {}", taskWithId.getId(), e);
-            listener.failed(taskWithId.getId(), 
taskWithId.getTask().details(), e);
-            return Task.Result.PARTIAL;
-        }
+    private Mono<Task.Result> run(TaskWithId taskWithId, Listener listener) {
+        return Mono.from(listener.started(taskWithId.getId()))
+            .then(Mono.fromCallable(() -> runTask(taskWithId, listener)))
+            .onErrorResume(InterruptedException.class, e -> 
Mono.from(listener.cancelled(taskWithId.getId(), 
taskWithId.getTask().details())).thenReturn(Task.Result.PARTIAL))
+            .onErrorResume(Exception.class, e -> {
+                LOGGER.error("Error while running task {}", 
taskWithId.getId(), e);
+                return Mono.from(listener.failed(taskWithId.getId(), 
taskWithId.getTask().details(), e)).thenReturn(Task.Result.PARTIAL);
+            });
+    }
+
+    private Task.Result runTask(TaskWithId taskWithId, Listener listener) 
throws InterruptedException {
+        return taskWithId.getTask()
+            .run()
+            .onComplete(result -> 
Mono.from(listener.completed(taskWithId.getId(), result, 
taskWithId.getTask().details())).block())
+            .onFailure(() -> {
+                LOGGER.error("Task was partially performed. Check logs for 
more details. Taskid : " + taskWithId.getId());
+                Mono.from(listener.failed(taskWithId.getId(), 
taskWithId.getTask().details())).block();
+            });
     }
 
     @Override
@@ -133,8 +134,8 @@ public class SerialTaskManagerWorker implements 
TaskManagerWorker {
     }
 
     @Override
-    public void fail(TaskId taskId, 
Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation, 
String errorMessage, Throwable reason) {
-        listener.failed(taskId, additionalInformation, errorMessage, reason);
+    public Publisher<Void> fail(TaskId taskId, 
Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation, 
String errorMessage, Throwable reason) {
+        return listener.failed(taskId, additionalInformation, errorMessage, 
reason);
     }
 
     @Override
diff --git 
a/server/task/task-memory/src/main/java/org/apache/james/task/TaskManagerWorker.java
 
b/server/task/task-memory/src/main/java/org/apache/james/task/TaskManagerWorker.java
index c6475e0..c29069e 100644
--- 
a/server/task/task-memory/src/main/java/org/apache/james/task/TaskManagerWorker.java
+++ 
b/server/task/task-memory/src/main/java/org/apache/james/task/TaskManagerWorker.java
@@ -21,29 +21,31 @@ package org.apache.james.task;
 import java.io.Closeable;
 import java.util.Optional;
 
+import org.reactivestreams.Publisher;
+
 import reactor.core.publisher.Mono;
 
 public interface TaskManagerWorker extends Closeable {
 
     interface Listener {
-        void started(TaskId taskId);
+        Publisher<Void> started(TaskId taskId);
 
-        void completed(TaskId taskId, Task.Result result, 
Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation);
+        Publisher<Void> completed(TaskId taskId, Task.Result result, 
Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation);
 
-        void failed(TaskId taskId, 
Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation, 
String errorMessage, Throwable t);
+        Publisher<Void> failed(TaskId taskId, 
Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation, 
String errorMessage, Throwable t);
 
-        void failed(TaskId taskId, 
Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation, 
Throwable t);
+        Publisher<Void> failed(TaskId taskId, 
Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation, 
Throwable t);
 
-        void failed(TaskId taskId, 
Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation);
+        Publisher<Void> failed(TaskId taskId, 
Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation);
 
-        void cancelled(TaskId taskId, 
Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation);
+        Publisher<Void> cancelled(TaskId taskId, 
Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation);
 
-        void updated(TaskId taskId, TaskExecutionDetails.AdditionalInformation 
additionalInformation);
+        Publisher<Void> updated(TaskId taskId, 
TaskExecutionDetails.AdditionalInformation additionalInformation);
     }
 
     Mono<Task.Result> executeTask(TaskWithId taskWithId);
 
     void cancelTask(TaskId taskId);
 
-    void fail(TaskId taskId, 
Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation, 
String errorMessage, Throwable reason);
+    Publisher<Void> fail(TaskId taskId, 
Optional<TaskExecutionDetails.AdditionalInformation> additionalInformation, 
String errorMessage, Throwable reason);
 }
diff --git 
a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala
 
b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala
index 39e0ea4..de682c1 100644
--- 
a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala
+++ 
b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala
@@ -1,4 +1,4 @@
-/** **************************************************************
+/*****************************************************************
   * Licensed to the Apache Software Foundation (ASF) under one   *
   * or more contributor license agreements.  See the NOTICE file *
   * distributed with this work for additional information        *
@@ -6,87 +6,92 @@
   * to you under the Apache License, Version 2.0 (the            *
   * "License"); you may not use this file except in compliance   *
   * with the License.  You may obtain a copy of the License at   *
-  * *
-  * http://www.apache.org/licenses/LICENSE-2.0                 *
-  * *
+  *                                                              *
+  * http://www.apache.org/licenses/LICENSE-2.0                   *
+  *                                                              *
   * Unless required by applicable law or agreed to in writing,   *
   * software distributed under the License is distributed on an  *
   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
   * KIND, either express or implied.  See the License for the    *
   * specific language governing permissions and limitations      *
   * under the License.                                           *
-  * ***************************************************************/
+  ****************************************************************/
 package org.apache.james.task.eventsourcing
 
-import java.util
+import java.util.List
 
 import org.apache.james.eventsourcing.eventstore.History
 import org.apache.james.eventsourcing.{CommandHandler, Event}
 import org.apache.james.task.eventsourcing.TaskCommand._
 import org.apache.james.task.{Hostname, TaskId}
+import org.reactivestreams.Publisher
+
+import reactor.core.scala.publisher.SMono
+
+import scala.jdk.CollectionConverters._
 
 sealed abstract class TaskCommandHandler[T <: TaskCommand] extends 
CommandHandler[T] {
 
-  def loadAggregate(loadHistory: TaskAggregateId => History, taskId: TaskId): 
TaskAggregate = {
+  def loadAggregate(loadHistory: TaskAggregateId => SMono[History], taskId: 
TaskId): SMono[TaskAggregate] = {
     val aggregateId = TaskAggregateId(taskId)
-    TaskAggregate.fromHistory(aggregateId, loadHistory(aggregateId))
+    loadHistory(aggregateId).map(TaskAggregate.fromHistory(aggregateId, _))
   }
 }
 
-class CreateCommandHandler(private val loadHistory: TaskAggregateId => 
History, hostname: Hostname) extends TaskCommandHandler[Create] {
+class CreateCommandHandler(private val loadHistory: TaskAggregateId => 
SMono[History], hostname: Hostname) extends TaskCommandHandler[Create] {
   override def handledClass: Class[Create] = classOf[Create]
 
-  override def handle(command: Create): List[_ <: Event] = {
-    TaskAggregate.create(TaskAggregateId(command.id), command.task, hostname)
+  override def handle(command: Create): Publisher[List[_ <: Event]] = {
+    SMono.fromCallable(() => TaskAggregate.create(TaskAggregateId(command.id), 
command.task, hostname).asJava)
   }
 }
 
-class StartCommandHandler(private val loadHistory: TaskAggregateId => History,
+class StartCommandHandler(private val loadHistory: TaskAggregateId => 
SMono[History],
                           private val hostname: Hostname) extends 
TaskCommandHandler[Start] {
   override def handledClass: Class[Start] = classOf[Start]
 
-  override def handle(command: Start): List[_ <: Event] = {
-    loadAggregate(loadHistory, command.id).start(hostname)
+  override def handle(command: Start): Publisher[List[_ <: Event]] = {
+    loadAggregate(loadHistory, command.id).map(_.start(hostname).asJava)
   }
 }
 
-class RequestCancelCommandHandler(private val loadHistory: TaskAggregateId => 
History,
+class RequestCancelCommandHandler(private val loadHistory: TaskAggregateId => 
SMono[History],
                                   private val hostname: Hostname) extends 
TaskCommandHandler[RequestCancel] {
   override def handledClass: Class[RequestCancel] = classOf[RequestCancel]
 
-  override def handle(command: RequestCancel): List[_ <: Event] = {
-    loadAggregate(loadHistory, command.id).requestCancel(hostname)
+  override def handle(command: RequestCancel): Publisher[List[_ <: Event]] = {
+    loadAggregate(loadHistory, 
command.id).map(_.requestCancel(hostname).asJava)
   }
 }
 
-class CompleteCommandHandler(private val loadHistory: TaskAggregateId => 
History) extends TaskCommandHandler[Complete] {
+class CompleteCommandHandler(private val loadHistory: TaskAggregateId => 
SMono[History]) extends TaskCommandHandler[Complete] {
   override def handledClass: Class[Complete] = classOf[Complete]
 
-  override def handle(command: Complete): List[_ <: Event] = {
-    loadAggregate(loadHistory, command.id).complete(command.result, 
command.additionalInformation)
+  override def handle(command: Complete): Publisher[List[_ <: Event]] = {
+    loadAggregate(loadHistory, command.id).map(_.complete(command.result, 
command.additionalInformation).asJava)
   }
 }
 
-class CancelCommandHandler(private val loadHistory: TaskAggregateId => 
History) extends TaskCommandHandler[Cancel] {
+class CancelCommandHandler(private val loadHistory: TaskAggregateId => 
SMono[History]) extends TaskCommandHandler[Cancel] {
   override def handledClass: Class[Cancel] = classOf[Cancel]
 
-  override def handle(command: Cancel): List[_ <: Event] = {
-    loadAggregate(loadHistory, 
command.id).cancel(command.additionalInformation)
+  override def handle(command: Cancel): Publisher[List[_ <: Event]] = {
+    loadAggregate(loadHistory, 
command.id).map(_.cancel(command.additionalInformation).asJava)
   }
 }
 
-class FailCommandHandler(private val loadHistory: TaskAggregateId => History) 
extends TaskCommandHandler[Fail] {
+class FailCommandHandler(private val loadHistory: TaskAggregateId => 
SMono[History]) extends TaskCommandHandler[Fail] {
   override def handledClass: Class[Fail] = classOf[Fail]
 
-  override def handle(command: Fail): List[_ <: Event] = {
-    loadAggregate(loadHistory, command.id).fail(command.additionalInformation, 
command.errorMessage, command.exception)
+  override def handle(command: Fail): Publisher[List[_ <: Event]] = {
+    loadAggregate(loadHistory, 
command.id).map(_.fail(command.additionalInformation, command.errorMessage, 
command.exception).asJava)
   }
 }
 
-class UpdateCommandHandler(private val loadHistory: TaskAggregateId => 
History) extends TaskCommandHandler[UpdateAdditionalInformation] {
+class UpdateCommandHandler(private val loadHistory: TaskAggregateId => 
SMono[History]) extends TaskCommandHandler[UpdateAdditionalInformation] {
   override def handledClass: Class[UpdateAdditionalInformation] = 
classOf[UpdateAdditionalInformation]
 
-  override def handle(command: UpdateAdditionalInformation): List[_ <: Event] 
= {
-    loadAggregate(loadHistory, 
command.id).update(command.additionalInformation)
+  override def handle(command: UpdateAdditionalInformation): Publisher[List[_ 
<: Event]] = {
+    loadAggregate(loadHistory, 
command.id).map(_.update(command.additionalInformation).asJava)
   }
 }
\ No newline at end of file
diff --git 
a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
 
b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
index ad5d848..d9e6fa6 100644
--- 
a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
+++ 
b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
@@ -25,13 +25,16 @@ import java.util
 import com.google.common.annotations.VisibleForTesting
 import javax.annotation.PreDestroy
 import javax.inject.Inject
+
 import org.apache.james.eventsourcing.eventstore.{EventStore, History}
 import org.apache.james.eventsourcing.{AggregateId, EventSourcingSystem, 
Subscriber}
 import org.apache.james.lifecycle.api.Startable
 import org.apache.james.task.TaskManager.ReachedTimeoutException
 import org.apache.james.task._
 import org.apache.james.task.eventsourcing.TaskCommand._
+
 import reactor.core.publisher.{Flux, Mono}
+import reactor.core.scala.publisher.SMono
 import reactor.core.scheduler.Schedulers
 
 class EventSourcingTaskManager @Inject @VisibleForTesting 
private[eventsourcing](
@@ -52,7 +55,7 @@ class EventSourcingTaskManager @Inject @VisibleForTesting 
private[eventsourcing]
 
   import scala.jdk.CollectionConverters._
 
-  private val loadHistory: AggregateId => History = 
eventStore.getEventsOfAggregate _
+  private val loadHistory: AggregateId => SMono[History] = aggregateId => 
SMono(eventStore.getEventsOfAggregate(aggregateId))
   private val eventSourcingSystem = new EventSourcingSystem(
     handlers = Set(
       new CreateCommandHandler(loadHistory, hostname),
@@ -75,7 +78,7 @@ class EventSourcingTaskManager @Inject @VisibleForTesting 
private[eventsourcing]
   override def submit(task: Task): TaskId = {
     val taskId = TaskId.generateTaskId
     val command = Create(taskId, task)
-    eventSourcingSystem.dispatch(command)
+    SMono(eventSourcingSystem.dispatch(command)).block()
     taskId
   }
 
@@ -93,7 +96,7 @@ class EventSourcingTaskManager @Inject @VisibleForTesting 
private[eventsourcing]
 
   override def cancel(id: TaskId): Unit = {
     val command = RequestCancel(id)
-    eventSourcingSystem.dispatch(command)
+    SMono(eventSourcingSystem.dispatch(command)).block()
   }
 
   @throws(classOf[TaskNotFoundException])
diff --git 
a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/WorkerStatusListener.scala
 
b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/WorkerStatusListener.scala
index aa93d82..1edbf03 100644
--- 
a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/WorkerStatusListener.scala
+++ 
b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/WorkerStatusListener.scala
@@ -26,28 +26,31 @@ import org.apache.james.eventsourcing.EventSourcingSystem
 import org.apache.james.task.Task.Result
 import org.apache.james.task.eventsourcing.TaskCommand._
 import org.apache.james.task.{TaskExecutionDetails, TaskId, TaskManagerWorker}
+import org.reactivestreams.Publisher
+
+import reactor.core.scala.publisher.SMono
 
 import scala.compat.java8.OptionConverters._
 
 case class WorkerStatusListener(eventSourcingSystem: EventSourcingSystem) 
extends TaskManagerWorker.Listener {
 
-  override def started(taskId: TaskId): Unit = 
eventSourcingSystem.dispatch(Start(taskId))
+  override def started(taskId: TaskId): Publisher[Void] = 
eventSourcingSystem.dispatch(Start(taskId))
 
-  override def completed(taskId: TaskId, result: Result, 
additionalInformation: Optional[TaskExecutionDetails.AdditionalInformation]): 
Unit =
+  override def completed(taskId: TaskId, result: Result, 
additionalInformation: Optional[TaskExecutionDetails.AdditionalInformation]): 
Publisher[Void] =
     eventSourcingSystem.dispatch(Complete(taskId, result, 
additionalInformation.asScala))
 
-  override def failed(taskId: TaskId, additionalInformation: 
Optional[TaskExecutionDetails.AdditionalInformation], errorMessage: String, t: 
Throwable): Unit =
+  override def failed(taskId: TaskId, additionalInformation: 
Optional[TaskExecutionDetails.AdditionalInformation], errorMessage: String, t: 
Throwable): Publisher[Void] =
     eventSourcingSystem.dispatch(Fail(taskId, additionalInformation.asScala, 
Some(errorMessage), Some(Throwables.getStackTraceAsString(t))))
 
-  override def failed(taskId: TaskId, additionalInformation: 
Optional[TaskExecutionDetails.AdditionalInformation], t: Throwable): Unit =
+  override def failed(taskId: TaskId, additionalInformation: 
Optional[TaskExecutionDetails.AdditionalInformation], t: Throwable): 
Publisher[Void] =
     eventSourcingSystem.dispatch(Fail(taskId, additionalInformation.asScala, 
None, Some(Throwables.getStackTraceAsString(t))))
 
-  override def failed(taskId: TaskId, additionalInformation: 
Optional[TaskExecutionDetails.AdditionalInformation]): Unit =
+  override def failed(taskId: TaskId, additionalInformation: 
Optional[TaskExecutionDetails.AdditionalInformation]): Publisher[Void] =
     eventSourcingSystem.dispatch(Fail(taskId, additionalInformation.asScala, 
None, None))
 
-  override def cancelled(taskId: TaskId, additionalInformation: 
Optional[TaskExecutionDetails.AdditionalInformation]): Unit =
+  override def cancelled(taskId: TaskId, additionalInformation: 
Optional[TaskExecutionDetails.AdditionalInformation]): Publisher[Void] =
     eventSourcingSystem.dispatch(Cancel(taskId, additionalInformation.asScala 
))
 
-  override def updated(taskId: TaskId, additionalInformation: 
TaskExecutionDetails.AdditionalInformation): Unit =
+  override def updated(taskId: TaskId, additionalInformation: 
TaskExecutionDetails.AdditionalInformation): Publisher[Void] =
     eventSourcingSystem.dispatch(UpdateAdditionalInformation(taskId, 
additionalInformation))
 }
\ No newline at end of file
diff --git 
a/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
 
b/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
index a83392f..5c4f23d 100644
--- 
a/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
+++ 
b/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
@@ -28,6 +28,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.time.Duration;
@@ -56,6 +57,13 @@ class SerialTaskManagerWorkerTest {
     @BeforeEach
     void beforeEach() {
         listener = mock(TaskManagerWorker.Listener.class);
+        when(listener.started(any())).thenReturn(Mono.empty());
+        when(listener.cancelled(any(), any())).thenReturn(Mono.empty());
+        when(listener.completed(any(), any(), any())).thenReturn(Mono.empty());
+        when(listener.updated(any(), any())).thenReturn(Mono.empty());
+        when(listener.failed(any(), any())).thenReturn(Mono.empty());
+        when(listener.failed(any(), any(), any())).thenReturn(Mono.empty());
+        when(listener.failed(any(), any(), any(), 
any())).thenReturn(Mono.empty());
         worker = new SerialTaskManagerWorker(listener, 
UPDATE_INFORMATION_POLLING_DURATION);
     }
 
diff --git 
a/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java
 
b/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java
index dcc727c..852551c 100644
--- 
a/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java
+++ 
b/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java
@@ -41,6 +41,8 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 
+import reactor.core.publisher.Mono;
+
 @ExtendWith(CountDownLatchExtension.class)
 class EventSourcingTaskManagerTest implements TaskManagerContract {
     ConditionFactory CALMLY_AWAIT = Awaitility
@@ -79,7 +81,7 @@ class EventSourcingTaskManagerTest implements 
TaskManagerContract {
     void createdTaskShouldKeepOriginHostname() {
         TaskId taskId = taskManager.submit(new MemoryReferenceTask(() -> 
Task.Result.COMPLETED));
         TaskAggregateId aggregateId = new TaskAggregateId(taskId);
-        
assertThat(eventStore.getEventsOfAggregate(aggregateId).getEventsJava())
+        
assertThat(Mono.from(eventStore.getEventsOfAggregate(aggregateId)).block().getEventsJava())
                 .filteredOn(event -> event instanceof Created)
                 .extracting("hostname")
                 .containsOnly(HOSTNAME);
@@ -91,7 +93,7 @@ class EventSourcingTaskManagerTest implements 
TaskManagerContract {
         TaskAggregateId aggregateId = new TaskAggregateId(taskId);
 
         CALMLY_AWAIT.untilAsserted(() ->
-            
assertThat(eventStore.getEventsOfAggregate(aggregateId).getEventsJava())
+            
assertThat(Mono.from(eventStore.getEventsOfAggregate(aggregateId)).block().getEventsJava())
                 .filteredOn(event -> event instanceof Started)
                 .extracting("hostname")
                 .containsOnly(HOSTNAME));
@@ -107,7 +109,7 @@ class EventSourcingTaskManagerTest implements 
TaskManagerContract {
 
         TaskAggregateId aggregateId = new TaskAggregateId(taskId);
         CALMLY_AWAIT.untilAsserted(() ->
-            
assertThat(eventStore.getEventsOfAggregate(aggregateId).getEventsJava())
+            
assertThat(Mono.from(eventStore.getEventsOfAggregate(aggregateId)).block().getEventsJava())
                 .filteredOn(event -> event instanceof CancelRequested)
                 .extracting("hostname")
                 .containsOnly(HOSTNAME));


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to