This is an automated email from the ASF dual-hosted git repository. matthieu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 9db5a36687c8d23ab2accce1b9b10247265f2046 Author: Rémi KOWALSKI <rkowal...@linagora.com> AuthorDate: Mon Dec 9 17:00:36 2019 +0100 JAMES-3009 convert to scala event sourcing core --- event-sourcing/event-sourcing-core/pom.xml | 17 +++- .../james/eventsourcing/CommandDispatcher.java | 112 --------------------- .../apache/james/eventsourcing/CommandHandler.java | 14 +-- .../org/apache/james/eventsourcing/EventBus.java | 65 ------------ .../CommandHandlerJava.java} | 23 ++--- .../james/eventsourcing/CommandDispatcher.scala | 92 +++++++++++++++++ .../org/apache/james/eventsourcing/EventBus.scala} | 41 ++++---- .../james/eventsourcing/EventSourcingSystem.scala} | 35 +++---- .../apache/james/eventsourcing/Subscriber.scala} | 15 ++- .../eventsourcing/DataCollectorSubscriber.java | 1 - .../eventsourcing/EventSourcingSystemTest.java | 51 +++++----- .../eventsourcing/DataCollectorSubscriber.scala} | 20 ++-- .../eventstore/memory/InMemoryEventStore.scala | 14 ++- .../mailing/aggregates/UserQuotaThresholds.java | 4 +- .../commands/DetectThresholdCrossingHandler.java | 6 +- .../listeners/QuotaThresholdCrossingListener.java | 5 +- .../eventstore/CassandraEventStoreModule.java | 2 +- ...ventSourcingDLPConfigurationStoreExtension.java | 2 +- .../filtering/impl/DefineRulesCommandHandler.java | 6 +- .../impl/EventSourcingFilteringManagement.java | 2 +- .../api/filtering/impl/FilteringAggregate.java | 3 +- .../EventSourcingDLPConfigurationStore.java | 2 +- .../aggregates/DLPDomainConfiguration.java | 3 +- .../commands/ClearCommandHandler.java | 6 +- .../commands/StoreCommandHandler.java | 6 +- .../configuration/ConfigurationAggregate.java | 3 +- .../EventsourcingConfigurationManagement.java | 2 +- .../RegisterConfigurationCommandHandler.java | 6 +- .../RabbitMQMailQueueConfigurationChangeTest.java | 2 +- .../queue/rabbitmq/RabbitMQMailQueueTest.java | 2 +- .../EventsourcingConfigurationManagementTest.java | 6 +- .../CassandraTaskExecutionDetailsProjection.scala | 13 +-- .../distributed/RabbitMQWorkQueueSupplier.scala | 1 + .../eventsourcing/distributed/TaskEventDTO.scala | 17 ++-- .../distributed/DistributedTaskManagerTest.java | 2 +- .../james/task/eventsourcing/CommandHandlers.scala | 14 +-- .../eventsourcing/EventSourcingTaskManager.scala | 5 +- .../eventsourcing/ScalaEventSourcingSystem.scala | 25 ----- .../james/task/eventsourcing/TaskAggregate.scala | 48 ++++----- .../TaskExecutionDetailsProjection.scala | 3 +- .../task/eventsourcing/WorkQueueSupplier.scala | 1 + .../task/eventsourcing/WorkerStatusListener.scala | 1 + .../EventSourcingTaskManagerTest.java | 8 +- .../task/eventsourcing/TaskAggregateTest.java | 21 ++-- 44 files changed, 319 insertions(+), 408 deletions(-) diff --git a/event-sourcing/event-sourcing-core/pom.xml b/event-sourcing/event-sourcing-core/pom.xml index 830fe5a..1a47ffc 100644 --- a/event-sourcing/event-sourcing-core/pom.xml +++ b/event-sourcing/event-sourcing-core/pom.xml @@ -68,6 +68,21 @@ <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </dependency> + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-library</artifactId> + </dependency> + <dependency> + <groupId>org.scala-lang.modules</groupId> + <artifactId>scala-java8-compat_${scala.base}</artifactId> + </dependency> </dependencies> - + <build> + <plugins> + <plugin> + <groupId>net.alchim31.maven</groupId> + <artifactId>scala-maven-plugin</artifactId> + </plugin> + </plugins> + </build> </project> \ No newline at end of file diff --git a/event-sourcing/event-sourcing-core/src/main/java/org/apache/james/eventsourcing/CommandDispatcher.java b/event-sourcing/event-sourcing-core/src/main/java/org/apache/james/eventsourcing/CommandDispatcher.java deleted file mode 100644 index 9efc3c0..0000000 --- a/event-sourcing/event-sourcing-core/src/main/java/org/apache/james/eventsourcing/CommandDispatcher.java +++ /dev/null @@ -1,112 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. * - ****************************************************************/ - -package org.apache.james.eventsourcing; - -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.function.Supplier; -import java.util.stream.IntStream; - -import javax.inject.Inject; - -import org.apache.james.eventsourcing.eventstore.EventStoreFailedException; - -import com.github.steveash.guavate.Guavate; -import org.apache.james.eventsourcing.Command; -import org.apache.james.eventsourcing.Event; - -public class CommandDispatcher { - - private static final int MAX_RETRY = 10; - - public static class UnknownCommandException extends RuntimeException { - private final Command command; - - public UnknownCommandException(Command command) { - super(String.format("Unknown command %s", command)); - this.command = command; - } - - public Command getCommand() { - return command; - } - } - - public static class TooManyRetries extends RuntimeException { - private final Command command; - private final int retries; - - - public TooManyRetries(Command command, int retries) { - super(String.format("Too much retries for command %s. Store failure after %d retries", command, retries)); - this.command = command; - this.retries = retries; - } - - - public Command getCommand() { - return command; - } - - public int getRetries() { - return retries; - } - } - - private final EventBus eventBus; - @SuppressWarnings("rawtypes") - private final Map<Class, CommandHandler> handlers; - - @Inject - public CommandDispatcher(EventBus eventBus, Set<CommandHandler<?>> handlers) { - this.eventBus = eventBus; - this.handlers = handlers.stream() - .collect(Guavate.toImmutableMap(CommandHandler::handledClass, handler -> handler)); - } - - public void dispatch(Command c) { - trySeveralTimes(() -> tryDispatch(c)) - .orElseThrow(() -> new TooManyRetries(c, MAX_RETRY)); - } - - private Optional<Integer> trySeveralTimes(Supplier<Boolean> singleTry) { - return IntStream.range(0, MAX_RETRY) - .boxed() - .filter(any -> singleTry.get()) - .findFirst(); - } - - @SuppressWarnings("unchecked") - private boolean tryDispatch(Command c) { - try { - List<Event> events = - Optional.ofNullable(handlers.get(c.getClass())) - .map(f -> f.handle(c)) - .orElseThrow(() -> new UnknownCommandException(c)); - - eventBus.publish(events); - return true; - } catch (EventStoreFailedException e) { - return false; - } - } -} diff --git a/event-sourcing/event-sourcing-core/src/main/java/org/apache/james/eventsourcing/CommandHandler.java b/event-sourcing/event-sourcing-core/src/main/java/org/apache/james/eventsourcing/CommandHandler.java index cb51ae9..6e3645b 100644 --- a/event-sourcing/event-sourcing-core/src/main/java/org/apache/james/eventsourcing/CommandHandler.java +++ b/event-sourcing/event-sourcing-core/src/main/java/org/apache/james/eventsourcing/CommandHandler.java @@ -1,4 +1,4 @@ -/**************************************************************** +/*************************************************************** * Licensed to the Apache Software Foundation (ASF) under one * * or more contributor license agreements. See the NOTICE file * * distributed with this work for additional information * @@ -16,16 +16,12 @@ * specific language governing permissions and limitations * * under the License. * ****************************************************************/ - package org.apache.james.eventsourcing; -import java.util.List; - -import org.apache.james.eventsourcing.Event; - -public interface CommandHandler<C> { +import scala.collection.immutable.List; - Class<C> handledClass(); +public interface CommandHandler<C extends Command> { + Class<C> handledClass(); - List<? extends Event> handle(C c); + List<? extends Event> handle(C command); } diff --git a/event-sourcing/event-sourcing-core/src/main/java/org/apache/james/eventsourcing/EventBus.java b/event-sourcing/event-sourcing-core/src/main/java/org/apache/james/eventsourcing/EventBus.java deleted file mode 100644 index 5d17bd2..0000000 --- a/event-sourcing/event-sourcing-core/src/main/java/org/apache/james/eventsourcing/EventBus.java +++ /dev/null @@ -1,65 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. * - ****************************************************************/ - -package org.apache.james.eventsourcing; - -import java.util.List; -import java.util.Set; - -import javax.inject.Inject; - -import org.apache.commons.lang3.tuple.Pair; -import org.apache.james.eventsourcing.eventstore.EventStore; -import org.apache.james.eventsourcing.eventstore.EventStoreFailedException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.ImmutableSet; -import org.apache.james.eventsourcing.Event; - -public class EventBus { - - private static final Logger LOGGER = LoggerFactory.getLogger(EventBus.class); - - private final EventStore eventStore; - private final Set<Subscriber> subscribers; - - @Inject - public EventBus(EventStore eventStore, Set<Subscriber> subscribers) { - this.eventStore = eventStore; - this.subscribers = ImmutableSet.copyOf(subscribers); - } - - public void publish(List<Event> events) throws EventStoreFailedException { - eventStore.appendAll(events); - events.stream() - .flatMap(event -> subscribers.stream().map(subscriber -> Pair.of(event, subscriber))) - .forEach(this::handle); - } - - private void handle(Pair<Event, Subscriber> pair) { - Subscriber subscriber = pair.getRight(); - Event event = pair.getLeft(); - try { - subscriber.handle(event); - } catch (Exception e) { - LOGGER.error("Error while calling {} for {}", subscriber, event, e); - } - } -} diff --git a/event-sourcing/event-sourcing-core/src/main/java/org/apache/james/eventsourcing/EventSourcingSystem.java b/event-sourcing/event-sourcing-core/src/main/java/org/apache/james/eventsourcing/javaapi/CommandHandlerJava.java similarity index 69% copy from event-sourcing/event-sourcing-core/src/main/java/org/apache/james/eventsourcing/EventSourcingSystem.java copy to event-sourcing/event-sourcing-core/src/main/java/org/apache/james/eventsourcing/javaapi/CommandHandlerJava.java index 53c7f2a..ce9cfc9 100644 --- a/event-sourcing/event-sourcing-core/src/main/java/org/apache/james/eventsourcing/EventSourcingSystem.java +++ b/event-sourcing/event-sourcing-core/src/main/java/org/apache/james/eventsourcing/javaapi/CommandHandlerJava.java @@ -16,24 +16,19 @@ * specific language governing permissions and limitations * * under the License. * ****************************************************************/ - -package org.apache.james.eventsourcing; - -import java.util.Set; - -import org.apache.james.eventsourcing.eventstore.EventStore; +package org.apache.james.eventsourcing.javaapi; import org.apache.james.eventsourcing.Command; +import org.apache.james.eventsourcing.CommandHandler; +import org.apache.james.eventsourcing.Event; -public class EventSourcingSystem { - private final CommandDispatcher commandDispatcher; +import scala.jdk.javaapi.CollectionConverters; - public EventSourcingSystem(Set<CommandHandler<?>> handlers, Set<Subscriber> subscribers, EventStore eventStore) { - EventBus eventBus = new EventBus(eventStore, subscribers); - this.commandDispatcher = new CommandDispatcher(eventBus, handlers); - } +public interface CommandHandlerJava<C extends Command> extends CommandHandler<C> { + + java.util.List<? extends Event> handleJava(C command); - public void dispatch(Command c) { - commandDispatcher.dispatch(c); + default scala.collection.immutable.List<? extends Event> handle(C command) { + return CollectionConverters.asScala(handleJava(command)).toList(); } } diff --git a/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/CommandDispatcher.scala b/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/CommandDispatcher.scala new file mode 100644 index 0000000..8dde47f --- /dev/null +++ b/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/CommandDispatcher.scala @@ -0,0 +1,92 @@ + /*************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ +package org.apache.james.eventsourcing + +import com.google.common.base.Preconditions +import javax.inject.Inject +import org.apache.james.eventsourcing.eventstore.EventStoreFailedException + +import scala.util.{Failure, Success, Try} + +object CommandDispatcher { + private val MAX_RETRY = 10 + private val ONLY_ONE_HANDLER_PRECONDITION: Object = "There should exist only one handler by command" + + case class UnknownCommandException(command: Command) + extends RuntimeException(String.format("Unknown command %s", command)) { + def getCommand: Command = command + } + + case class TooManyRetries(command: Command, retries: Int) + extends RuntimeException(String.format("Too much retries for command %s. Store failure after %d retries", command, retries)) { + def getCommand: Command = command + + def getRetries: Int = retries + } + +} + +class CommandDispatcher @Inject()(eventBus: EventBus, handlers: Set[CommandHandler[_ <: Command]]) { + Preconditions.checkArgument(hasOnlyOneHandlerByCommand(handlers), CommandDispatcher.ONLY_ONE_HANDLER_PRECONDITION) + + def dispatch(c: Command): Unit = { + trySeveralTimes(() => tryDispatch(c)) + .getOrElse(() => throw CommandDispatcher.TooManyRetries(c, CommandDispatcher.MAX_RETRY)) + } + + private def hasOnlyOneHandlerByCommand(handlers: Set[CommandHandler[_ <: Command]]): Boolean = + handlers.groupBy(_.handledClass) + .values + .forall(_.size == 1) + + private val handlersByClass: Map[Class[_ <: Command], CommandHandler[_ <: Command]] = + handlers.map(handler => (handler.handledClass, handler)).toMap + + + private def trySeveralTimes(singleTry: () => Boolean): Option[Unit] = + 0.until(CommandDispatcher.MAX_RETRY) + .find(_ => singleTry()) + .map(_ => ()) + + + private def tryDispatch(c: Command): Boolean = { + val maybeEvents: Option[Try[List[_ <: Event]]] = handleCommand(c) + maybeEvents match { + case Some(eventsTry) => + eventsTry + .flatMap(events => Try(eventBus.publish(events))) match { + case Success(_) => true + case Failure(_: EventStoreFailedException) => false + case Failure(e) => throw e + } + case _ => + throw CommandDispatcher.UnknownCommandException(c) + } + } + + private def handleCommand(c: Command): Option[Try[List[_ <: Event]]] = { + handlersByClass + .get(c.getClass) + .map(commandHandler => + Try( + commandHandler + .asInstanceOf[CommandHandler[c.type]] + .handle(c))) + } +} \ No newline at end of file diff --git a/event-sourcing/event-sourcing-core/src/main/java/org/apache/james/eventsourcing/EventSourcingSystem.java b/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/EventBus.scala similarity index 54% copy from event-sourcing/event-sourcing-core/src/main/java/org/apache/james/eventsourcing/EventSourcingSystem.java copy to event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/EventBus.scala index 53c7f2a..9d527d3 100644 --- a/event-sourcing/event-sourcing-core/src/main/java/org/apache/james/eventsourcing/EventSourcingSystem.java +++ b/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/EventBus.scala @@ -1,4 +1,4 @@ -/**************************************************************** + /*************************************************************** * Licensed to the Apache Software Foundation (ASF) under one * * or more contributor license agreements. See the NOTICE file * * distributed with this work for additional information * @@ -16,24 +16,31 @@ * specific language governing permissions and limitations * * under the License. * ****************************************************************/ +package org.apache.james.eventsourcing -package org.apache.james.eventsourcing; +import javax.inject.Inject +import org.apache.james.eventsourcing.eventstore.{EventStore, EventStoreFailedException} +import org.slf4j.LoggerFactory -import java.util.Set; - -import org.apache.james.eventsourcing.eventstore.EventStore; - -import org.apache.james.eventsourcing.Command; - -public class EventSourcingSystem { - private final CommandDispatcher commandDispatcher; +object EventBus { + private val LOGGER = LoggerFactory.getLogger(classOf[EventBus]) +} - public EventSourcingSystem(Set<CommandHandler<?>> handlers, Set<Subscriber> subscribers, EventStore eventStore) { - EventBus eventBus = new EventBus(eventStore, subscribers); - this.commandDispatcher = new CommandDispatcher(eventBus, handlers); - } +class EventBus @Inject() (eventStore: EventStore, subscribers: Set[Subscriber]) { + @throws[EventStoreFailedException] + def publish(events: List[Event]): Unit = { + eventStore.appendAll(events) + events + .flatMap((event: Event) => subscribers.map(subscriber => (event, subscriber))) + .foreach {case (event, subscriber) => handle(event, subscriber)} + } - public void dispatch(Command c) { - commandDispatcher.dispatch(c); + private def handle(event : Event, subscriber: Subscriber) : Unit = { + try { + subscriber.handle(event) + } catch { + case e: Exception => + EventBus.LOGGER.error("Error while calling {} for {}", subscriber, event, e) } -} + } +} \ No newline at end of file diff --git a/event-sourcing/event-sourcing-core/src/main/java/org/apache/james/eventsourcing/EventSourcingSystem.java b/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/EventSourcingSystem.scala similarity index 55% rename from event-sourcing/event-sourcing-core/src/main/java/org/apache/james/eventsourcing/EventSourcingSystem.java rename to event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/EventSourcingSystem.scala index 53c7f2a..2f3b71b 100644 --- a/event-sourcing/event-sourcing-core/src/main/java/org/apache/james/eventsourcing/EventSourcingSystem.java +++ b/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/EventSourcingSystem.scala @@ -1,4 +1,4 @@ -/**************************************************************** + /*************************************************************** * Licensed to the Apache Software Foundation (ASF) under one * * or more contributor license agreements. See the NOTICE file * * distributed with this work for additional information * @@ -16,24 +16,25 @@ * specific language governing permissions and limitations * * under the License. * ****************************************************************/ +package org.apache.james.eventsourcing -package org.apache.james.eventsourcing; +import org.apache.james.eventsourcing.eventstore.EventStore -import java.util.Set; -import org.apache.james.eventsourcing.eventstore.EventStore; - -import org.apache.james.eventsourcing.Command; - -public class EventSourcingSystem { - private final CommandDispatcher commandDispatcher; +object EventSourcingSystem { + def fromJava(handlers: java.util.Set[CommandHandler[_ <: Command]], + subscribers: java.util.Set[Subscriber], + eventStore: EventStore) : EventSourcingSystem = { + import scala.jdk.CollectionConverters._ + new EventSourcingSystem(handlers.asScala.toSet, subscribers.asScala.toSet, eventStore) + } +} - public EventSourcingSystem(Set<CommandHandler<?>> handlers, Set<Subscriber> subscribers, EventStore eventStore) { - EventBus eventBus = new EventBus(eventStore, subscribers); - this.commandDispatcher = new CommandDispatcher(eventBus, handlers); - } +class EventSourcingSystem(handlers: Set[CommandHandler[_ <: Command]], + subscribers: Set[Subscriber], + eventStore: EventStore) { + private val eventBus = new EventBus(eventStore, subscribers) + private val commandDispatcher = new CommandDispatcher(eventBus, handlers) - public void dispatch(Command c) { - commandDispatcher.dispatch(c); - } -} + def dispatch(c: Command): Unit = commandDispatcher.dispatch(c) +} \ No newline at end of file diff --git a/event-sourcing/event-sourcing-core/src/main/java/org/apache/james/eventsourcing/Subscriber.java b/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/Subscriber.scala similarity index 79% rename from event-sourcing/event-sourcing-core/src/main/java/org/apache/james/eventsourcing/Subscriber.java rename to event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/Subscriber.scala index 57a9f35..ad6cf0a 100644 --- a/event-sourcing/event-sourcing-core/src/main/java/org/apache/james/eventsourcing/Subscriber.java +++ b/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/Subscriber.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * * with the License. You may obtain a copy of the License at * * * - * http://www.apache.org/licenses/LICENSE-2.0 * + * http://www.apache.org/licenses/LICENSE-2.0 * * * * Unless required by applicable law or agreed to in writing, * * software distributed under the License is distributed on an * @@ -15,12 +15,9 @@ * KIND, either express or implied. See the License for the * * specific language governing permissions and limitations * * under the License. * - ****************************************************************/ + * ***************************************************************/ +package org.apache.james.eventsourcing -package org.apache.james.eventsourcing; - -import org.apache.james.eventsourcing.Event; - -public interface Subscriber { - void handle(Event event); -} +trait Subscriber { + def handle(event: Event) : Unit +} \ No newline at end of file diff --git a/event-sourcing/event-sourcing-core/src/test/java/org/apache/james/eventsourcing/DataCollectorSubscriber.java b/event-sourcing/event-sourcing-core/src/test/java/org/apache/james/eventsourcing/DataCollectorSubscriber.java index d1f65e6..85fccd1 100644 --- a/event-sourcing/event-sourcing-core/src/test/java/org/apache/james/eventsourcing/DataCollectorSubscriber.java +++ b/event-sourcing/event-sourcing-core/src/test/java/org/apache/james/eventsourcing/DataCollectorSubscriber.java @@ -23,7 +23,6 @@ import java.util.ArrayList; import java.util.List; import com.google.common.collect.ImmutableList; -import org.apache.james.eventsourcing.Event; public class DataCollectorSubscriber implements Subscriber { diff --git a/event-sourcing/event-sourcing-core/src/test/java/org/apache/james/eventsourcing/EventSourcingSystemTest.java b/event-sourcing/event-sourcing-core/src/test/java/org/apache/james/eventsourcing/EventSourcingSystemTest.java index 6dbe68a..c247186 100644 --- a/event-sourcing/event-sourcing-core/src/test/java/org/apache/james/eventsourcing/EventSourcingSystemTest.java +++ b/event-sourcing/event-sourcing-core/src/test/java/org/apache/james/eventsourcing/EventSourcingSystemTest.java @@ -22,24 +22,21 @@ package org.apache.james.eventsourcing; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import java.util.List; - import org.apache.james.eventsourcing.eventstore.EventStore; import org.apache.james.eventsourcing.eventstore.History; import org.junit.jupiter.api.Test; +import org.mockito.internal.matchers.InstanceOf; +import org.mockito.internal.progress.ThreadSafeMockingProgress; import com.github.steveash.guavate.Guavate; import com.google.common.base.Splitter; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; -import org.apache.james.eventsourcing.Command; -import org.apache.james.eventsourcing.Event; -import org.apache.james.eventsourcing.EventId; +import scala.collection.immutable.List; import scala.jdk.javaapi.CollectionConverters; public interface EventSourcingSystemTest { @@ -63,7 +60,7 @@ public interface EventSourcingSystemTest { @Test default void dispatchShouldApplyCommandHandlerThenCallSubscribers(EventStore eventStore) { DataCollectorSubscriber subscriber = new DataCollectorSubscriber(); - EventSourcingSystem eventSourcingSystem = new EventSourcingSystem( + EventSourcingSystem eventSourcingSystem = EventSourcingSystem.fromJava( ImmutableSet.of(simpleDispatcher(eventStore)), ImmutableSet.of(subscriber), eventStore); @@ -76,7 +73,7 @@ public interface EventSourcingSystemTest { @Test default void throwingSubscribersShouldNotAbortSubscriberChain(EventStore eventStore) { DataCollectorSubscriber subscriber = new DataCollectorSubscriber(); - EventSourcingSystem eventSourcingSystem = new EventSourcingSystem( + EventSourcingSystem eventSourcingSystem = EventSourcingSystem.fromJava( ImmutableSet.of(simpleDispatcher(eventStore)), ImmutableSet.of( events -> { @@ -91,13 +88,13 @@ public interface EventSourcingSystemTest { } @Test - default void throwingStoreShouldNotLeadToPusblishing() { + default void throwingStoreShouldNotLeadToPublishing() { EventStore eventStore = mock(EventStore.class); - doThrow(new RuntimeException()).when(eventStore).appendAll(anyList()); + doThrow(new RuntimeException()).when(eventStore).appendAll(anyScalaList()); when(eventStore.getEventsOfAggregate(any())).thenReturn(History.empty()); DataCollectorSubscriber subscriber = new DataCollectorSubscriber(); - EventSourcingSystem eventSourcingSystem = new EventSourcingSystem( + EventSourcingSystem eventSourcingSystem = EventSourcingSystem.fromJava( ImmutableSet.of(simpleDispatcher(eventStore)), ImmutableSet.of( events -> { @@ -115,7 +112,7 @@ public interface EventSourcingSystemTest { @Test default void dispatchShouldApplyCommandHandlerThenStoreGeneratedEvents(EventStore eventStore) { DataCollectorSubscriber subscriber = new DataCollectorSubscriber(); - EventSourcingSystem eventSourcingSystem = new EventSourcingSystem( + EventSourcingSystem eventSourcingSystem = EventSourcingSystem.fromJava( ImmutableSet.of(simpleDispatcher(eventStore)), ImmutableSet.of(subscriber), eventStore); @@ -130,7 +127,7 @@ public interface EventSourcingSystemTest { @Test default void dispatchShouldCallSubscriberForSubsequentCommands(EventStore eventStore) { DataCollectorSubscriber subscriber = new DataCollectorSubscriber(); - EventSourcingSystem eventSourcingSystem = new EventSourcingSystem( + EventSourcingSystem eventSourcingSystem = EventSourcingSystem.fromJava( ImmutableSet.of(simpleDispatcher(eventStore)), ImmutableSet.of(subscriber), eventStore); @@ -144,7 +141,7 @@ public interface EventSourcingSystemTest { @Test default void dispatchShouldStoreEventsForSubsequentCommands(EventStore eventStore) { DataCollectorSubscriber subscriber = new DataCollectorSubscriber(); - EventSourcingSystem eventSourcingSystem = new EventSourcingSystem( + EventSourcingSystem eventSourcingSystem = EventSourcingSystem.fromJava( ImmutableSet.of(simpleDispatcher(eventStore)), ImmutableSet.of(subscriber), eventStore); @@ -161,7 +158,7 @@ public interface EventSourcingSystemTest { @Test default void dispatcherShouldBeAbleToReturnSeveralEvents(EventStore eventStore) { DataCollectorSubscriber subscriber = new DataCollectorSubscriber(); - EventSourcingSystem eventSourcingSystem = new EventSourcingSystem( + EventSourcingSystem eventSourcingSystem = EventSourcingSystem.fromJava( ImmutableSet.of(wordCuttingDispatcher(eventStore)), ImmutableSet.of(subscriber), eventStore); @@ -174,12 +171,13 @@ public interface EventSourcingSystemTest { @Test default void unknownCommandsShouldBeIgnored(EventStore eventStore) { DataCollectorSubscriber subscriber = new DataCollectorSubscriber(); - EventSourcingSystem eventSourcingSystem = new EventSourcingSystem( + EventSourcingSystem eventSourcingSystem = EventSourcingSystem.fromJava( ImmutableSet.of(wordCuttingDispatcher(eventStore)), ImmutableSet.of(subscriber), eventStore); - assertThatThrownBy(() -> eventSourcingSystem.dispatch(new Command() {})) + assertThatThrownBy(() -> eventSourcingSystem.dispatch(new Command() { + })) .isInstanceOf(CommandDispatcher.UnknownCommandException.class); } @@ -188,7 +186,7 @@ public interface EventSourcingSystemTest { DataCollectorSubscriber subscriber = new DataCollectorSubscriber(); assertThatThrownBy(() -> - new EventSourcingSystem( + EventSourcingSystem.fromJava( ImmutableSet.of(wordCuttingDispatcher(eventStore), simpleDispatcher(eventStore)), ImmutableSet.of(subscriber), @@ -204,13 +202,13 @@ public interface EventSourcingSystemTest { } @Override - public List<? extends Event> handle(MyCommand myCommand) { + public scala.collection.immutable.List<? extends Event> handle(MyCommand myCommand) { History history = eventStore.getEventsOfAggregate(AGGREGATE_ID); - return ImmutableList.of(new TestEvent( + return CollectionConverters.asScala(ImmutableList.of(new TestEvent( history.getNextEventId(), AGGREGATE_ID, - myCommand.getPayload())); + myCommand.getPayload()))).toList(); } }; } @@ -223,19 +221,19 @@ public interface EventSourcingSystemTest { } @Override - public List<? extends Event> handle(MyCommand myCommand) { + public scala.collection.immutable.List<? extends Event> handle(MyCommand myCommand) { History history = eventStore.getEventsOfAggregate(AGGREGATE_ID); EventIdIncrementer eventIdIncrementer = new EventIdIncrementer(history.getNextEventId()); - return Splitter.on(" ") + return CollectionConverters.asScala(Splitter.on(" ") .splitToList(myCommand.getPayload()) .stream() .map(word -> new TestEvent( eventIdIncrementer.next(), AGGREGATE_ID, word)) - .collect(Guavate.toImmutableList()); + .collect(Guavate.toImmutableList())).toList(); } }; } @@ -253,4 +251,9 @@ public interface EventSourcingSystemTest { } } + static <T> List<T> anyScalaList() { + ThreadSafeMockingProgress.mockingProgress().getArgumentMatcherStorage().reportMatcher(new InstanceOf(scala.collection.immutable.List.class, "<any scala List>")); + return scala.collection.immutable.List.<T>newBuilder().result(); + } + } \ No newline at end of file diff --git a/event-sourcing/event-sourcing-core/src/main/java/org/apache/james/eventsourcing/CommandHandler.java b/event-sourcing/event-sourcing-core/src/test/scala/org/apache/james/eventsourcing/DataCollectorSubscriber.scala similarity index 74% copy from event-sourcing/event-sourcing-core/src/main/java/org/apache/james/eventsourcing/CommandHandler.java copy to event-sourcing/event-sourcing-core/src/test/scala/org/apache/james/eventsourcing/DataCollectorSubscriber.scala index cb51ae9..58e3fc9 100644 --- a/event-sourcing/event-sourcing-core/src/main/java/org/apache/james/eventsourcing/CommandHandler.java +++ b/event-sourcing/event-sourcing-core/src/test/scala/org/apache/james/eventsourcing/DataCollectorSubscriber.scala @@ -1,4 +1,4 @@ -/**************************************************************** + /*************************************************************** * Licensed to the Apache Software Foundation (ASF) under one * * or more contributor license agreements. See the NOTICE file * * distributed with this work for additional information * @@ -16,16 +16,16 @@ * specific language governing permissions and limitations * * under the License. * ****************************************************************/ +package org.apache.james.eventsourcing -package org.apache.james.eventsourcing; +import scala.collection.mutable -import java.util.List; +class DataCollectorSubscriber() extends Subscriber { + private val data = new mutable.ListBuffer[String] -import org.apache.james.eventsourcing.Event; + override def handle(event: Event): Unit = event match { + case event: TestEvent => data += event.getData + } -public interface CommandHandler<C> { - - Class<C> handledClass(); - - List<? extends Event> handle(C c); -} + def getData: List[String] = data.toList +} \ No newline at end of file diff --git a/event-sourcing/event-store-memory/src/main/scala/org/apache/james/eventsourcing/eventstore/memory/InMemoryEventStore.scala b/event-sourcing/event-store-memory/src/main/scala/org/apache/james/eventsourcing/eventstore/memory/InMemoryEventStore.scala index b689e0d..c4f9621 100644 --- a/event-sourcing/event-store-memory/src/main/scala/org/apache/james/eventsourcing/eventstore/memory/InMemoryEventStore.scala +++ b/event-sourcing/event-store-memory/src/main/scala/org/apache/james/eventsourcing/eventstore/memory/InMemoryEventStore.scala @@ -6,29 +6,27 @@ * to you under the Apache License, Version 2.0 (the * * "License"); you may not use this file except in compliance * * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * * Unless required by applicable law or agreed to in writing, * * software distributed under the License is distributed on an * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * * KIND, either express or implied. See the License for the * * specific language governing permissions and limitations * * under the License. * - * ***************************************************************/ + ****************************************************************/ package org.apache.james.eventsourcing.eventstore.memory -import java.util import java.util.concurrent.atomic.AtomicReference import com.google.common.base.Preconditions import org.apache.james.eventsourcing.eventstore.{EventStore, History} import org.apache.james.eventsourcing.{AggregateId, Event} -import scala.jdk.CollectionConverters._ - class InMemoryEventStore() extends EventStore { - private val storeRef: AtomicReference[Map[AggregateId, History]] = new AtomicReference(Map().withDefault(_ => History.empty)) + private val storeRef: AtomicReference[Map[AggregateId, History]] = + new AtomicReference(Map().withDefault(_ => History.empty)) override def appendAll(events: List[Event]): Unit = if (events.nonEmpty) doAppendAll(events) diff --git a/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/aggregates/UserQuotaThresholds.java b/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/aggregates/UserQuotaThresholds.java index d3162cf..6838e9b 100644 --- a/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/aggregates/UserQuotaThresholds.java +++ b/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/aggregates/UserQuotaThresholds.java @@ -48,6 +48,7 @@ import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; import com.google.common.base.Splitter; import com.google.common.collect.ImmutableList; +import scala.jdk.javaapi.CollectionConverters; public class UserQuotaThresholds { @@ -126,7 +127,8 @@ public class UserQuotaThresholds { private UserQuotaThresholds(Id aggregateId, History history) { this.aggregateId = aggregateId; this.history = history; - this.events = history.getEvents().stream() + this.events = CollectionConverters.asJava(history.getEvents()) + .stream() .map(QuotaThresholdChangedEvent.class::cast) .collect(Collectors.toList()); } diff --git a/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/commands/DetectThresholdCrossingHandler.java b/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/commands/DetectThresholdCrossingHandler.java index be2a518..83f09a4 100644 --- a/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/commands/DetectThresholdCrossingHandler.java +++ b/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/commands/DetectThresholdCrossingHandler.java @@ -21,14 +21,14 @@ package org.apache.james.mailbox.quota.mailing.commands; import java.util.List; -import org.apache.james.eventsourcing.CommandHandler; import org.apache.james.eventsourcing.Event; import org.apache.james.eventsourcing.eventstore.EventStore; import org.apache.james.eventsourcing.eventstore.History; +import org.apache.james.eventsourcing.javaapi.CommandHandlerJava; import org.apache.james.mailbox.quota.mailing.QuotaMailingListenerConfiguration; import org.apache.james.mailbox.quota.mailing.aggregates.UserQuotaThresholds; -public class DetectThresholdCrossingHandler implements CommandHandler<DetectThresholdCrossing> { +public class DetectThresholdCrossingHandler implements CommandHandlerJava<DetectThresholdCrossing> { private final EventStore eventStore; private final QuotaMailingListenerConfiguration quotaMailingListenerConfiguration; @@ -41,7 +41,7 @@ public class DetectThresholdCrossingHandler implements CommandHandler<DetectThre } @Override - public List<? extends Event> handle(DetectThresholdCrossing command) { + public List<? extends Event> handleJava(DetectThresholdCrossing command) { return loadAggregate(command) .detectThresholdCrossing(quotaMailingListenerConfiguration, command); } diff --git a/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/listeners/QuotaThresholdCrossingListener.java b/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/listeners/QuotaThresholdCrossingListener.java index 886e49f..d4e7d5d 100644 --- a/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/listeners/QuotaThresholdCrossingListener.java +++ b/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/listeners/QuotaThresholdCrossingListener.java @@ -24,6 +24,7 @@ import javax.inject.Inject; import org.apache.commons.configuration2.HierarchicalConfiguration; import org.apache.commons.configuration2.tree.ImmutableNode; import org.apache.james.core.Username; +import org.apache.james.eventsourcing.Command; import org.apache.james.eventsourcing.CommandHandler; import org.apache.james.eventsourcing.EventSourcingSystem; import org.apache.james.eventsourcing.Subscriber; @@ -60,9 +61,9 @@ public class QuotaThresholdCrossingListener implements MailboxListener.GroupMail public QuotaThresholdCrossingListener(MailetContext mailetContext, UsersRepository usersRepository, FileSystem fileSystem, EventStore eventStore, QuotaMailingListenerConfiguration config) { - ImmutableSet<CommandHandler<?>> handlers = ImmutableSet.of(new DetectThresholdCrossingHandler(eventStore, config)); + ImmutableSet<CommandHandler<? extends Command>> handlers = ImmutableSet.of(new DetectThresholdCrossingHandler(eventStore, config)); ImmutableSet<Subscriber> subscribers = ImmutableSet.of(new QuotaThresholdMailer(mailetContext, usersRepository, fileSystem, config)); - eventSourcingSystem = new EventSourcingSystem(handlers, subscribers, eventStore); + eventSourcingSystem = EventSourcingSystem.fromJava(handlers, subscribers, eventStore); } @Override diff --git a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/eventstore/CassandraEventStoreModule.java b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/eventstore/CassandraEventStoreModule.java index 34c5f29..aa48baf 100644 --- a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/eventstore/CassandraEventStoreModule.java +++ b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/modules/eventstore/CassandraEventStoreModule.java @@ -38,7 +38,7 @@ public class CassandraEventStoreModule extends AbstractModule { Multibinder.newSetBinder(binder(), CassandraModule.class) .addBinding() - .toInstance(org.apache.james.eventsourcing.eventstore.cassandra.CassandraEventStoreModule.MODULE); + .toInstance(org.apache.james.eventsourcing.eventstore.cassandra.CassandraEventStoreModule.MODULE()); Multibinder.newSetBinder(binder(), new TypeLiteral<EventDTOModule<?, ?>>() {}); } diff --git a/server/data/data-cassandra/src/test/java/org/apache/james/dlp/eventsourcing/cassandra/CassandraEventSourcingDLPConfigurationStoreExtension.java b/server/data/data-cassandra/src/test/java/org/apache/james/dlp/eventsourcing/cassandra/CassandraEventSourcingDLPConfigurationStoreExtension.java index d438db7..1dedaf4 100644 --- a/server/data/data-cassandra/src/test/java/org/apache/james/dlp/eventsourcing/cassandra/CassandraEventSourcingDLPConfigurationStoreExtension.java +++ b/server/data/data-cassandra/src/test/java/org/apache/james/dlp/eventsourcing/cassandra/CassandraEventSourcingDLPConfigurationStoreExtension.java @@ -48,7 +48,7 @@ public class CassandraEventSourcingDLPConfigurationStoreExtension implements Bef public void beforeAll(ExtensionContext context) throws Exception { dockerCassandraExtension.beforeAll(context); cassandra = CassandraCluster.create( - CassandraEventStoreModule.MODULE, + CassandraEventStoreModule.MODULE(), dockerCassandraExtension.getDockerCassandra().getHost()); } diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/DefineRulesCommandHandler.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/DefineRulesCommandHandler.java index 9bc82be..4931e07 100644 --- a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/DefineRulesCommandHandler.java +++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/DefineRulesCommandHandler.java @@ -21,11 +21,11 @@ package org.apache.james.jmap.api.filtering.impl; import java.util.List; -import org.apache.james.eventsourcing.CommandHandler; import org.apache.james.eventsourcing.Event; import org.apache.james.eventsourcing.eventstore.EventStore; +import org.apache.james.eventsourcing.javaapi.CommandHandlerJava; -public class DefineRulesCommandHandler implements CommandHandler<DefineRulesCommand> { +public class DefineRulesCommandHandler implements CommandHandlerJava<DefineRulesCommand> { private final EventStore eventStore; @@ -39,7 +39,7 @@ public class DefineRulesCommandHandler implements CommandHandler<DefineRulesComm } @Override - public List<? extends Event> handle(DefineRulesCommand storeCommand) { + public List<? extends Event> handleJava(DefineRulesCommand storeCommand) { FilteringAggregateId aggregateId = new FilteringAggregateId(storeCommand.getUsername()); return FilteringAggregate diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/EventSourcingFilteringManagement.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/EventSourcingFilteringManagement.java index fadd7a0..d8dad9f 100644 --- a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/EventSourcingFilteringManagement.java +++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/EventSourcingFilteringManagement.java @@ -42,7 +42,7 @@ public class EventSourcingFilteringManagement implements FilteringManagement { @Inject public EventSourcingFilteringManagement(EventStore eventStore) { - this.eventSourcingSystem = new EventSourcingSystem( + this.eventSourcingSystem = EventSourcingSystem.fromJava( ImmutableSet.of(new DefineRulesCommandHandler(eventStore)), NO_SUBSCRIBER, eventStore); diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/FilteringAggregate.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/FilteringAggregate.java index 33970f2..4a52e1e 100644 --- a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/FilteringAggregate.java +++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/FilteringAggregate.java @@ -27,6 +27,7 @@ import org.apache.james.jmap.api.filtering.Rule; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import scala.jdk.javaapi.CollectionConverters; public class FilteringAggregate { @@ -58,7 +59,7 @@ public class FilteringAggregate { private FilteringAggregate(FilteringAggregateId aggregateId, History history) { this.aggregateId = aggregateId; this.state = State.initial(); - history.getEvents().forEach(this::apply); + CollectionConverters.asJava(history.getEvents()).forEach(this::apply); this.history = history; } diff --git a/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/EventSourcingDLPConfigurationStore.java b/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/EventSourcingDLPConfigurationStore.java index 1aeba39..957a86d 100644 --- a/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/EventSourcingDLPConfigurationStore.java +++ b/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/EventSourcingDLPConfigurationStore.java @@ -50,7 +50,7 @@ public class EventSourcingDLPConfigurationStore implements DLPConfigurationStore @Inject public EventSourcingDLPConfigurationStore(EventStore eventStore) { - this.eventSourcingSystem = new EventSourcingSystem( + this.eventSourcingSystem = EventSourcingSystem.fromJava( ImmutableSet.of( new ClearCommandHandler(eventStore), new StoreCommandHandler(eventStore)), diff --git a/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/aggregates/DLPDomainConfiguration.java b/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/aggregates/DLPDomainConfiguration.java index 385b8fd..70d7a70 100644 --- a/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/aggregates/DLPDomainConfiguration.java +++ b/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/aggregates/DLPDomainConfiguration.java @@ -37,6 +37,7 @@ import com.github.steveash.guavate.Guavate; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; +import scala.jdk.javaapi.CollectionConverters; public class DLPDomainConfiguration { @@ -74,7 +75,7 @@ public class DLPDomainConfiguration { private DLPDomainConfiguration(DLPAggregateId aggregateId, History history) { this.aggregateId = aggregateId; this.state = State.initial(); - history.getEvents().forEach(this::apply); + CollectionConverters.asJava(history.getEvents()).forEach(this::apply); this.history = history; } diff --git a/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/commands/ClearCommandHandler.java b/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/commands/ClearCommandHandler.java index a0b0c18..c59f3ce 100644 --- a/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/commands/ClearCommandHandler.java +++ b/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/commands/ClearCommandHandler.java @@ -23,11 +23,11 @@ import java.util.List; import org.apache.james.dlp.eventsourcing.aggregates.DLPAggregateId; import org.apache.james.dlp.eventsourcing.aggregates.DLPDomainConfiguration; -import org.apache.james.eventsourcing.CommandHandler; import org.apache.james.eventsourcing.Event; import org.apache.james.eventsourcing.eventstore.EventStore; +import org.apache.james.eventsourcing.javaapi.CommandHandlerJava; -public class ClearCommandHandler implements CommandHandler<ClearCommand> { +public class ClearCommandHandler implements CommandHandlerJava<ClearCommand> { private final EventStore eventStore; @@ -41,7 +41,7 @@ public class ClearCommandHandler implements CommandHandler<ClearCommand> { } @Override - public List<? extends Event> handle(ClearCommand clearCommand) { + public List<? extends Event> handleJava(ClearCommand clearCommand) { DLPAggregateId aggregateId = new DLPAggregateId(clearCommand.getDomain()); return DLPDomainConfiguration.load( diff --git a/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/commands/StoreCommandHandler.java b/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/commands/StoreCommandHandler.java index dc81548..e24bd83 100644 --- a/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/commands/StoreCommandHandler.java +++ b/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/commands/StoreCommandHandler.java @@ -23,11 +23,11 @@ import java.util.List; import org.apache.james.dlp.eventsourcing.aggregates.DLPAggregateId; import org.apache.james.dlp.eventsourcing.aggregates.DLPDomainConfiguration; -import org.apache.james.eventsourcing.CommandHandler; import org.apache.james.eventsourcing.Event; import org.apache.james.eventsourcing.eventstore.EventStore; +import org.apache.james.eventsourcing.javaapi.CommandHandlerJava; -public class StoreCommandHandler implements CommandHandler<StoreCommand> { +public class StoreCommandHandler implements CommandHandlerJava<StoreCommand> { private final EventStore eventStore; @@ -41,7 +41,7 @@ public class StoreCommandHandler implements CommandHandler<StoreCommand> { } @Override - public List<? extends Event> handle(StoreCommand storeCommand) { + public List<? extends Event> handleJava(StoreCommand storeCommand) { DLPAggregateId aggregateId = new DLPAggregateId(storeCommand.getDomain()); return DLPDomainConfiguration.load( diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/ConfigurationAggregate.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/ConfigurationAggregate.java index ffa3106..5f95de3 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/ConfigurationAggregate.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/ConfigurationAggregate.java @@ -28,6 +28,7 @@ import org.apache.james.eventsourcing.eventstore.History; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import scala.jdk.javaapi.CollectionConverters; class ConfigurationAggregate { @@ -65,7 +66,7 @@ class ConfigurationAggregate { this.history = history; this.state = State.initial(); - history.getEvents().forEach(this::apply); + CollectionConverters.asJava(history.getEvents()).forEach(this::apply); } List<? extends Event> registerConfiguration(CassandraMailQueueViewConfiguration configuration) { diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/EventsourcingConfigurationManagement.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/EventsourcingConfigurationManagement.java index d3a9c5b..a9ca76a 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/EventsourcingConfigurationManagement.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/EventsourcingConfigurationManagement.java @@ -43,7 +43,7 @@ public class EventsourcingConfigurationManagement { @Inject public EventsourcingConfigurationManagement(EventStore eventStore) { - this.eventSourcingSystem = new EventSourcingSystem( + this.eventSourcingSystem = EventSourcingSystem.fromJava( ImmutableSet.of(new RegisterConfigurationCommandHandler(eventStore)), NO_SUBSCRIBER, eventStore); diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/RegisterConfigurationCommandHandler.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/RegisterConfigurationCommandHandler.java index a679823..33c61e9 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/RegisterConfigurationCommandHandler.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/RegisterConfigurationCommandHandler.java @@ -21,11 +21,11 @@ package org.apache.james.queue.rabbitmq.view.cassandra.configuration; import java.util.List; -import org.apache.james.eventsourcing.CommandHandler; import org.apache.james.eventsourcing.Event; import org.apache.james.eventsourcing.eventstore.EventStore; +import org.apache.james.eventsourcing.javaapi.CommandHandlerJava; -class RegisterConfigurationCommandHandler implements CommandHandler<RegisterConfigurationCommand> { +class RegisterConfigurationCommandHandler implements CommandHandlerJava<RegisterConfigurationCommand> { private final EventStore eventStore; @@ -39,7 +39,7 @@ class RegisterConfigurationCommandHandler implements CommandHandler<RegisterConf } @Override - public List<? extends Event> handle(RegisterConfigurationCommand command) { + public List<? extends Event> handleJava(RegisterConfigurationCommand command) { return ConfigurationAggregate .load(command.getAggregateId(), eventStore.getEventsOfAggregate(command.getAggregateId())) .registerConfiguration(command.getConfiguration()); diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java index f363509..e0b9910 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java @@ -81,7 +81,7 @@ class RabbitMQMailQueueConfigurationChangeTest { CassandraSchemaVersionModule.MODULE, CassandraBlobModule.MODULE, CassandraMailQueueViewModule.MODULE, - CassandraEventStoreModule.MODULE)); + CassandraEventStoreModule.MODULE())); @RegisterExtension static RabbitMQExtension rabbitMQExtension = RabbitMQExtension.singletonRabbitMQ(); diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java index a745662..61d6e5b 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java @@ -88,7 +88,7 @@ class RabbitMQMailQueueTest { static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraModule.aggregateModules( CassandraBlobModule.MODULE, CassandraMailQueueViewModule.MODULE, - CassandraEventStoreModule.MODULE, + CassandraEventStoreModule.MODULE(), CassandraSchemaVersionModule.MODULE)); @RegisterExtension diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/EventsourcingConfigurationManagementTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/EventsourcingConfigurationManagementTest.java index 960c421..a4f10bf 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/EventsourcingConfigurationManagementTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/EventsourcingConfigurationManagementTest.java @@ -33,6 +33,8 @@ import org.apache.james.eventsourcing.eventstore.cassandra.JsonEventSerializer; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import scala.jdk.javaapi.CollectionConverters; + class EventsourcingConfigurationManagementTest { @RegisterExtension @@ -241,9 +243,9 @@ class EventsourcingConfigurationManagementTest { testee.registerConfiguration(FIRST_CONFIGURATION); testee.registerConfiguration(FIRST_CONFIGURATION); - List<Event> eventsStored = eventStore.getEventsOfAggregate(CONFIGURATION_AGGREGATE_ID) + scala.collection.immutable.List<Event> eventsStored = eventStore.getEventsOfAggregate(CONFIGURATION_AGGREGATE_ID) .getEvents(); - assertThat(eventsStored) + assertThat(CollectionConverters.asJava(eventsStored)) .hasSize(1); } } \ No newline at end of file diff --git a/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjection.scala b/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjection.scala index 51ae992..6a1e58b 100644 --- a/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjection.scala +++ b/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjection.scala @@ -1,4 +1,4 @@ -/** ************************************************************** + /*************************************************************** * Licensed to the Apache Software Foundation (ASF) under one * * or more contributor license agreements. See the NOTICE file * * distributed with this work for additional information * @@ -6,16 +6,16 @@ * to you under the Apache License, Version 2.0 (the * * "License"); you may not use this file except in compliance * * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * * Unless required by applicable law or agreed to in writing, * * software distributed under the License is distributed on an * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * * KIND, either express or implied. See the License for the * * specific language governing permissions and limitations * * under the License. * - * ***************************************************************/ + ****************************************************************/ package org.apache.james.task.eventsourcing.cassandra @@ -26,7 +26,8 @@ import org.apache.james.task.{TaskExecutionDetails, TaskId} import scala.compat.java8.OptionConverters._ import scala.jdk.CollectionConverters._ -class CassandraTaskExecutionDetailsProjection @Inject()(cassandraTaskExecutionDetailsProjectionDAO: CassandraTaskExecutionDetailsProjectionDAO) extends TaskExecutionDetailsProjection { +class CassandraTaskExecutionDetailsProjection @Inject()(cassandraTaskExecutionDetailsProjectionDAO: CassandraTaskExecutionDetailsProjectionDAO) + extends TaskExecutionDetailsProjection { override def load(taskId: TaskId): Option[TaskExecutionDetails] = cassandraTaskExecutionDetailsProjectionDAO.readDetails(taskId).blockOptional().asScala diff --git a/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala b/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala index ee82bcb..7feed07 100644 --- a/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala +++ b/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueueSupplier.scala @@ -23,6 +23,7 @@ import java.time.Duration import com.google.common.annotations.VisibleForTesting import javax.inject.Inject import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool +import org.apache.james.eventsourcing.EventSourcingSystem import org.apache.james.server.task.json.JsonTaskSerializer import org.apache.james.task.SerialTaskManagerWorker import org.apache.james.task.eventsourcing.{WorkQueueSupplier, WorkerStatusListener} diff --git a/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/TaskEventDTO.scala b/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/TaskEventDTO.scala index ed7b9e6..cbc9885 100644 --- a/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/TaskEventDTO.scala +++ b/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/TaskEventDTO.scala @@ -20,12 +20,11 @@ package org.apache.james.task.eventsourcing.distributed import java.util.Optional -import java.util.function.Function import com.fasterxml.jackson.annotation.JsonProperty import org.apache.james.eventsourcing.EventId +import org.apache.james.eventsourcing.eventstore.cassandra.dto.EventDTO import org.apache.james.json.DTOConverter -import org.apache.james.server.task.json.JsonTaskSerializer import org.apache.james.server.task.json.dto.{AdditionalInformationDTO, TaskDTO} import org.apache.james.task.TaskExecutionDetails.AdditionalInformation import org.apache.james.task.eventsourcing._ @@ -68,7 +67,7 @@ case class CreatedDTO(@JsonProperty("type") typeName: String, object CreatedDTO { def fromDomainObject(taskConverter: TaskConverter)(event: Created, typeName: String): CreatedDTO = { val taskDTO = taskConverter.toDTO(event.task).orElseThrow(() => new NestedTaskDTOSerializerNotFound(event.task)) - CreatedDTO(typeName, event.aggregateId.taskId.asString(), event.eventId.serialize(), taskDTO, event.hostname.asString) + CreatedDTO(typeName, event.aggregateId.taskId.asString(), event.eventId.serialize, taskDTO, event.hostname.asString) } } @@ -82,7 +81,7 @@ case class StartedDTO(@JsonProperty("type") typeName: String, object StartedDTO { def fromDomainObject(event: Started, typeName: String): StartedDTO = - StartedDTO(typeName, event.aggregateId.taskId.asString(), event.eventId.serialize(), event.hostname.asString) + StartedDTO(typeName, event.aggregateId.taskId.asString(), event.eventId.serialize, event.hostname.asString) } case class CancelRequestedDTO(@JsonProperty("type") typeName: String, @@ -95,7 +94,7 @@ case class CancelRequestedDTO(@JsonProperty("type") typeName: String, object CancelRequestedDTO { def fromDomainObject(event: CancelRequested, typeName: String): CancelRequestedDTO = - CancelRequestedDTO(typeName, event.aggregateId.taskId.asString(), event.eventId.serialize(), event.hostname.asString) + CancelRequestedDTO(typeName, event.aggregateId.taskId.asString(), event.eventId.serialize, event.hostname.asString) } case class CompletedDTO(@JsonProperty("type") typeName: String, @@ -121,7 +120,7 @@ object CompletedDTO { .additionalInformation .asJava .map(domainObject => dtoConverter.toDTO(domainObject).orElseThrow(() => new NestedAdditionalInformationDTOSerializerNotFound(domainObject))) - CompletedDTO(typeName, event.aggregateId.taskId.asString(), event.eventId.serialize(), resultToString(event.result), additionalInformationDTO) + CompletedDTO(typeName, event.aggregateId.taskId.asString(), event.eventId.serialize, resultToString(event.result), additionalInformationDTO) } private def resultToString(result: Task.Result): String = result match { @@ -149,7 +148,7 @@ object FailedDTO { val additionalInformationDTO: Optional[AdditionalInformationDTO] = event .additionalInformation .asJava.map(domainObject => dtoConverter.toDTO(domainObject).orElseThrow(() => new NestedAdditionalInformationDTOSerializerNotFound(domainObject))) - FailedDTO(typeName, event.aggregateId.taskId.asString(), event.eventId.serialize(), additionalInformationDTO, event.errorMessage.asJava, event.exception.asJava) + FailedDTO(typeName, event.aggregateId.taskId.asString(), event.eventId.serialize, additionalInformationDTO, event.errorMessage.asJava, event.exception.asJava) } } @@ -171,7 +170,7 @@ object CancelledDTO { .additionalInformation .asJava .map(domainObject => additionalInformationConverter.toDTO(domainObject).orElseThrow(() => new NestedAdditionalInformationDTOSerializerNotFound(domainObject))) - CancelledDTO(typeName, event.aggregateId.taskId.asString(), event.eventId.serialize(), additionalInformationDTO) + CancelledDTO(typeName, event.aggregateId.taskId.asString(), event.eventId.serialize, additionalInformationDTO) } } @@ -193,6 +192,6 @@ object AdditionalInformationUpdatedDTO { val additionalInformationDTO = additionalInformationConverter .toDTO(event.additionalInformation) .orElseThrow(() => new NestedAdditionalInformationDTOSerializerNotFound(event.additionalInformation)) - AdditionalInformationUpdatedDTO(typeName, event.aggregateId.taskId.asString(), event.eventId.serialize(), additionalInformationDTO) + AdditionalInformationUpdatedDTO(typeName, event.aggregateId.taskId.asString(), event.eventId.serialize, additionalInformationDTO) } } diff --git a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java index d479199..3bc1135 100644 --- a/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java +++ b/server/task/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/DistributedTaskManagerTest.java @@ -119,7 +119,7 @@ class DistributedTaskManagerTest implements TaskManagerContract { static final CassandraClusterExtension CASSANDRA_CLUSTER = new CassandraClusterExtension( CassandraModule.aggregateModules( CassandraSchemaVersionModule.MODULE, - CassandraEventStoreModule.MODULE, + CassandraEventStoreModule.MODULE(), CassandraZonedDateTimeModule.MODULE, CassandraTaskExecutionDetailsProjectionModule.MODULE())); diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala index 9db161e..39e0ea4 100644 --- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala +++ b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala @@ -36,7 +36,7 @@ sealed abstract class TaskCommandHandler[T <: TaskCommand] extends CommandHandle class CreateCommandHandler(private val loadHistory: TaskAggregateId => History, hostname: Hostname) extends TaskCommandHandler[Create] { override def handledClass: Class[Create] = classOf[Create] - override def handle(command: Create): util.List[_ <: Event] = { + override def handle(command: Create): List[_ <: Event] = { TaskAggregate.create(TaskAggregateId(command.id), command.task, hostname) } } @@ -45,7 +45,7 @@ class StartCommandHandler(private val loadHistory: TaskAggregateId => History, private val hostname: Hostname) extends TaskCommandHandler[Start] { override def handledClass: Class[Start] = classOf[Start] - override def handle(command: Start): util.List[_ <: Event] = { + override def handle(command: Start): List[_ <: Event] = { loadAggregate(loadHistory, command.id).start(hostname) } } @@ -54,7 +54,7 @@ class RequestCancelCommandHandler(private val loadHistory: TaskAggregateId => Hi private val hostname: Hostname) extends TaskCommandHandler[RequestCancel] { override def handledClass: Class[RequestCancel] = classOf[RequestCancel] - override def handle(command: RequestCancel): util.List[_ <: Event] = { + override def handle(command: RequestCancel): List[_ <: Event] = { loadAggregate(loadHistory, command.id).requestCancel(hostname) } } @@ -62,7 +62,7 @@ class RequestCancelCommandHandler(private val loadHistory: TaskAggregateId => Hi class CompleteCommandHandler(private val loadHistory: TaskAggregateId => History) extends TaskCommandHandler[Complete] { override def handledClass: Class[Complete] = classOf[Complete] - override def handle(command: Complete): util.List[_ <: Event] = { + override def handle(command: Complete): List[_ <: Event] = { loadAggregate(loadHistory, command.id).complete(command.result, command.additionalInformation) } } @@ -70,7 +70,7 @@ class CompleteCommandHandler(private val loadHistory: TaskAggregateId => History class CancelCommandHandler(private val loadHistory: TaskAggregateId => History) extends TaskCommandHandler[Cancel] { override def handledClass: Class[Cancel] = classOf[Cancel] - override def handle(command: Cancel): util.List[_ <: Event] = { + override def handle(command: Cancel): List[_ <: Event] = { loadAggregate(loadHistory, command.id).cancel(command.additionalInformation) } } @@ -78,7 +78,7 @@ class CancelCommandHandler(private val loadHistory: TaskAggregateId => History) class FailCommandHandler(private val loadHistory: TaskAggregateId => History) extends TaskCommandHandler[Fail] { override def handledClass: Class[Fail] = classOf[Fail] - override def handle(command: Fail): util.List[_ <: Event] = { + override def handle(command: Fail): List[_ <: Event] = { loadAggregate(loadHistory, command.id).fail(command.additionalInformation, command.errorMessage, command.exception) } } @@ -86,7 +86,7 @@ class FailCommandHandler(private val loadHistory: TaskAggregateId => History) ex class UpdateCommandHandler(private val loadHistory: TaskAggregateId => History) extends TaskCommandHandler[UpdateAdditionalInformation] { override def handledClass: Class[UpdateAdditionalInformation] = classOf[UpdateAdditionalInformation] - override def handle(command: UpdateAdditionalInformation): util.List[_ <: Event] = { + override def handle(command: UpdateAdditionalInformation): List[_ <: Event] = { loadAggregate(loadHistory, command.id).update(command.additionalInformation) } } \ No newline at end of file diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala index 9d8eaaa..87d8d55 100644 --- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala +++ b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala @@ -25,12 +25,11 @@ import java.util import com.google.common.annotations.VisibleForTesting import javax.annotation.PreDestroy import javax.inject.Inject +import org.apache.james.eventsourcing.{AggregateId, EventSourcingSystem, Subscriber} import org.apache.james.eventsourcing.eventstore.{EventStore, History} -import org.apache.james.eventsourcing.Subscriber import org.apache.james.lifecycle.api.Startable import org.apache.james.task.TaskManager.ReachedTimeoutException import org.apache.james.task._ -import eventsourcing.AggregateId import org.apache.james.task.eventsourcing.TaskCommand._ import reactor.core.publisher.{Flux, Mono} import reactor.core.scheduler.Schedulers @@ -54,7 +53,7 @@ class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing] import scala.jdk.CollectionConverters._ private val loadHistory: AggregateId => History = eventStore.getEventsOfAggregate _ - private val eventSourcingSystem = ScalaEventSourcingSystem( + private val eventSourcingSystem = new EventSourcingSystem( handlers = Set( new CreateCommandHandler(loadHistory, hostname), new StartCommandHandler(loadHistory, hostname), diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/ScalaEventSourcingSystem.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/ScalaEventSourcingSystem.scala deleted file mode 100644 index 14665c1..0000000 --- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/ScalaEventSourcingSystem.scala +++ /dev/null @@ -1,25 +0,0 @@ -/** ************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. * - * ***************************************************************/ -package org.apache.james.task.eventsourcing - -object ScalaEventSourcingSystem { - import scala.jdk.CollectionConverters._ - def apply(handlers: Set[CommandHandler[_]], subscribers: Set[Subscriber], eventStore: EventStore): EventSourcingSystem = - new EventSourcingSystem(handlers.asJava, subscribers.asJava, eventStore) -} diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala index 9ea48fe..04c36bf 100644 --- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala +++ b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * * "License"); you may not use this file except in compliance * * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * * Unless required by applicable law or agreed to in writing, * * software distributed under the License is distributed on an * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * @@ -18,8 +18,6 @@ * ***************************************************************/ package org.apache.james.task.eventsourcing -import java.util - import org.apache.james.eventsourcing.eventstore.History import org.apache.james.eventsourcing.{Event, EventId} import org.apache.james.task.Task.Result @@ -27,24 +25,18 @@ import org.apache.james.task.TaskExecutionDetails.AdditionalInformation import org.apache.james.task.TaskManager.Status import org.apache.james.task.{Hostname, Task} -import scala.jdk.CollectionConverters._ - class TaskAggregate private(val aggregateId: TaskAggregateId, private val history: History) { - val initialEvent = history.getEvents.asScala.headOption match { + val initialEvent = history.getEvents.headOption match { case Some(created @ Created(_, _, _, _)) => created case _ => throw new IllegalArgumentException("History must start with Created event") } private val currentDecisionProjection: DecisionProjection = history .getEvents - .asScala - .toSeq .tail .foldLeft(DecisionProjection.initial(initialEvent))((decision, event) => decision.update(event)) - private def optionToJavaList[T](element: Option[T]): util.List[T] = element.toList.asJava - private def createEventIfNotFinished(event: EventId => Event): Option[Event] = { if (!currentDecisionProjection.status.isFinished) { Some(event(history.getNextEventId)) @@ -54,37 +46,37 @@ class TaskAggregate private(val aggregateId: TaskAggregateId, private val histor private def createEvent(event: EventId => Event): Option[Event] = Some(event(history.getNextEventId)) - private def createEventIfNotFinishedAsJavaList(event: EventId => Event): util.List[Event] = optionToJavaList(createEventIfNotFinished(event)) + private def createEventIfNotFinishedAsList(event: EventId => Event): List[Event] = createEventIfNotFinished(event).toList - private[eventsourcing] def start(hostname: Hostname): util.List[Event] = - createEventIfNotFinishedAsJavaList(Started(aggregateId, _, hostname)) + private[eventsourcing] def start(hostname: Hostname): List[Event] = + createEventIfNotFinished(Started(aggregateId, _, hostname)).toList - private[eventsourcing] def requestCancel(hostname: Hostname): util.List[Event] = - createEventIfNotFinishedAsJavaList(CancelRequested(aggregateId, _, hostname)) + private[eventsourcing] def requestCancel(hostname: Hostname): List[Event] = + createEventIfNotFinishedAsList(CancelRequested(aggregateId, _, hostname)) - private[eventsourcing] def update(additionalInformation: AdditionalInformation): util.List[Event] = - optionToJavaList(currentDecisionProjection.status match { + private[eventsourcing] def update(additionalInformation: AdditionalInformation): List[Event] = + (currentDecisionProjection.status match { case Status.IN_PROGRESS if currentDecisionProjection.additionalInformationIsOlderThan(additionalInformation.timestamp) => createEvent(AdditionalInformationUpdated(aggregateId, _, additionalInformation)) case Status.CANCEL_REQUESTED if currentDecisionProjection.additionalInformationIsOlderThan(additionalInformation.timestamp) => createEvent(AdditionalInformationUpdated(aggregateId, _, additionalInformation)) case _ => None - }) + }).toList - private[eventsourcing] def complete(result: Result, additionalInformation: Option[AdditionalInformation]): util.List[Event] = - createEventIfNotFinishedAsJavaList(Completed(aggregateId, _, result, additionalInformation)) + private[eventsourcing] def complete(result: Result, additionalInformation: Option[AdditionalInformation]): List[Event] = + createEventIfNotFinishedAsList(Completed(aggregateId, _, result, additionalInformation)) - private[eventsourcing] def fail(additionalInformation: Option[AdditionalInformation], errorMessage: Option[String], exception: Option[String]): util.List[Event] = - createEventIfNotFinishedAsJavaList(Failed(aggregateId, _, additionalInformation, errorMessage, exception)) + private[eventsourcing] def fail(additionalInformation: Option[AdditionalInformation], errorMessage: Option[String], exception: Option[String]): List[Event] = + createEventIfNotFinishedAsList(Failed(aggregateId, _, additionalInformation, errorMessage, exception)) - private[eventsourcing] def cancel(additionalInformation: Option[AdditionalInformation]): util.List[Event] = - createEventIfNotFinishedAsJavaList(Cancelled(aggregateId, _, additionalInformation)) + private[eventsourcing] def cancel(additionalInformation: Option[AdditionalInformation]): List[Event] = + createEventIfNotFinishedAsList(Cancelled(aggregateId, _, additionalInformation)) } object TaskAggregate { def fromHistory(aggregateId: TaskAggregateId, history: History): TaskAggregate = new TaskAggregate(aggregateId, history) - def create(aggregateId: TaskAggregateId, task: Task, hostname: Hostname): util.List[Event] = { - List[Event](Created(aggregateId, EventId.first(), task, hostname)).asJava + def create(aggregateId: TaskAggregateId, task: Task, hostname: Hostname): List[Event] = { + List[Event](Created(aggregateId, EventId.first, task, hostname)) } } diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskExecutionDetailsProjection.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskExecutionDetailsProjection.scala index 0791a10..42fb4ab 100644 --- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskExecutionDetailsProjection.scala +++ b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskExecutionDetailsProjection.scala @@ -20,9 +20,10 @@ package org.apache.james.task.eventsourcing import java.util.concurrent.ConcurrentHashMap +import org.apache.james.eventsourcing.Subscriber import org.apache.james.task.{Hostname, TaskExecutionDetails, TaskId} -import scala.compat.java8.OptionConverters._ +import scala.compat.java8.OptionConverters._ import scala.jdk.CollectionConverters._ trait TaskExecutionDetailsProjection { diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/WorkQueueSupplier.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/WorkQueueSupplier.scala index c8ba092..0265359 100644 --- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/WorkQueueSupplier.scala +++ b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/WorkQueueSupplier.scala @@ -18,6 +18,7 @@ * ***************************************************************/ package org.apache.james.task.eventsourcing +import org.apache.james.eventsourcing.EventSourcingSystem import org.apache.james.task.WorkQueue @FunctionalInterface diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/WorkerStatusListener.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/WorkerStatusListener.scala index 5324514..aa93d82 100644 --- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/WorkerStatusListener.scala +++ b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/WorkerStatusListener.scala @@ -22,6 +22,7 @@ package org.apache.james.task.eventsourcing import java.util.Optional import com.google.common.base.Throwables +import org.apache.james.eventsourcing.EventSourcingSystem import org.apache.james.task.Task.Result import org.apache.james.task.eventsourcing.TaskCommand._ import org.apache.james.task.{TaskExecutionDetails, TaskId, TaskManagerWorker} diff --git a/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java b/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java index edf2d7c..1655171 100644 --- a/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java +++ b/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java @@ -41,6 +41,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import scala.jdk.javaapi.CollectionConverters; + @ExtendWith(CountDownLatchExtension.class) class EventSourcingTaskManagerTest implements TaskManagerContract { ConditionFactory CALMLY_AWAIT = Awaitility @@ -79,7 +81,7 @@ class EventSourcingTaskManagerTest implements TaskManagerContract { void createdTaskShouldKeepOriginHostname() { TaskId taskId = taskManager.submit(new MemoryReferenceTask(() -> Task.Result.COMPLETED)); TaskAggregateId aggregateId = new TaskAggregateId(taskId); - assertThat(eventStore.getEventsOfAggregate(aggregateId).getEvents()) + assertThat(CollectionConverters.asJava(eventStore.getEventsOfAggregate(aggregateId).getEvents())) .filteredOn(event -> event instanceof Created) .extracting("hostname") .containsOnly(HOSTNAME); @@ -91,7 +93,7 @@ class EventSourcingTaskManagerTest implements TaskManagerContract { TaskAggregateId aggregateId = new TaskAggregateId(taskId); CALMLY_AWAIT.untilAsserted(() -> - assertThat(eventStore.getEventsOfAggregate(aggregateId).getEvents()) + assertThat(CollectionConverters.asJava(eventStore.getEventsOfAggregate(aggregateId).getEvents())) .filteredOn(event -> event instanceof Started) .extracting("hostname") .containsOnly(HOSTNAME)); @@ -107,7 +109,7 @@ class EventSourcingTaskManagerTest implements TaskManagerContract { TaskAggregateId aggregateId = new TaskAggregateId(taskId); CALMLY_AWAIT.untilAsserted(() -> - assertThat(eventStore.getEventsOfAggregate(aggregateId).getEvents()) + assertThat(CollectionConverters.asJava(eventStore.getEventsOfAggregate(aggregateId).getEvents())) .filteredOn(event -> event instanceof CancelRequested) .extracting("hostname") .containsOnly(HOSTNAME)); diff --git a/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TaskAggregateTest.java b/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TaskAggregateTest.java index b9bdbb1..3892934 100644 --- a/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TaskAggregateTest.java +++ b/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TaskAggregateTest.java @@ -38,6 +38,7 @@ import org.junit.jupiter.api.Test; import com.github.steveash.guavate.Guavate; import com.google.common.collect.Streams; import scala.Option; +import scala.jdk.javaapi.CollectionConverters; class TaskAggregateTest { @@ -71,7 +72,8 @@ class TaskAggregateTest { eventId -> Created.apply(ID, eventId, new MemoryReferenceWithCounterTask((counter) -> Task.Result.COMPLETED), HOSTNAME) ); TaskAggregate aggregate = TaskAggregate.fromHistory(ID, history); - assertThat(aggregate.update(new MemoryReferenceWithCounterTask.AdditionalInformation(3, timestamp))).isEmpty(); + assertThat(CollectionConverters.asJava(aggregate.update(new MemoryReferenceWithCounterTask.AdditionalInformation(3, timestamp)))) + .isEmpty(); } @Test @@ -81,7 +83,7 @@ class TaskAggregateTest { eventId -> Started.apply(ID, eventId, HOSTNAME) ); TaskAggregate aggregate = TaskAggregate.fromHistory(ID, history); - assertThat(aggregate.update(new MemoryReferenceWithCounterTask.AdditionalInformation(3, timestamp))) + assertThat(CollectionConverters.asJava(aggregate.update(new MemoryReferenceWithCounterTask.AdditionalInformation(3, timestamp)))) .containsExactly(AdditionalInformationUpdated.apply(ID, history.getNextEventId(), new MemoryReferenceWithCounterTask.AdditionalInformation(3, timestamp))); } @@ -95,7 +97,7 @@ class TaskAggregateTest { TaskAggregate aggregate = TaskAggregate.fromHistory(ID, history); Instant newEventTime = TaskAggregateTest.timestamp.plusSeconds(3); MemoryReferenceWithCounterTask.AdditionalInformation youngerAdditionalInformation = new MemoryReferenceWithCounterTask.AdditionalInformation(3, newEventTime); - assertThat(aggregate.update(youngerAdditionalInformation)) + assertThat(CollectionConverters.asJava(aggregate.update(youngerAdditionalInformation))) .isNotEmpty() .anySatisfy(event -> assertThat(event) .isInstanceOfSatisfying(AdditionalInformationUpdated.class, @@ -111,7 +113,7 @@ class TaskAggregateTest { ); TaskAggregate aggregate = TaskAggregate.fromHistory(ID, history); MemoryReferenceWithCounterTask.AdditionalInformation olderAdditionalInformation = new MemoryReferenceWithCounterTask.AdditionalInformation(3, timestamp.minusSeconds(3)); - assertThat(aggregate.update(olderAdditionalInformation)).isEmpty(); + assertThat(CollectionConverters.asJava(aggregate.update(olderAdditionalInformation))).isEmpty(); } @Test @@ -122,7 +124,7 @@ class TaskAggregateTest { eventId -> CancelRequested.apply(ID, eventId, HOSTNAME) ); TaskAggregate aggregate = TaskAggregate.fromHistory(ID, history); - assertThat(aggregate.update(new MemoryReferenceWithCounterTask.AdditionalInformation(3, timestamp))) + assertThat(CollectionConverters.asJava(aggregate.update(new MemoryReferenceWithCounterTask.AdditionalInformation(3, timestamp)))) .containsExactly(AdditionalInformationUpdated.apply(ID, history.getNextEventId(), new MemoryReferenceWithCounterTask.AdditionalInformation(3, timestamp))); } @@ -135,7 +137,8 @@ class TaskAggregateTest { eventId -> Completed.apply(ID, eventId, Task.Result.COMPLETED, Option.empty()) ); TaskAggregate aggregate = TaskAggregate.fromHistory(ID, history); - assertThat(aggregate.update(new MemoryReferenceWithCounterTask.AdditionalInformation(3, timestamp))).isEmpty(); + assertThat(CollectionConverters.asJava(aggregate.update(new MemoryReferenceWithCounterTask.AdditionalInformation(3, timestamp)))) + .isEmpty(); } @Test @@ -147,7 +150,8 @@ class TaskAggregateTest { eventId -> Failed.apply(ID, eventId, Option.empty(), Option.empty(), Option.empty()) ); TaskAggregate aggregate = TaskAggregate.fromHistory(ID, history); - assertThat(aggregate.update(new MemoryReferenceWithCounterTask.AdditionalInformation(3, timestamp))).isEmpty(); + assertThat(CollectionConverters.asJava(aggregate.update(new MemoryReferenceWithCounterTask.AdditionalInformation(3, timestamp)))) + .isEmpty(); } @Test @@ -159,6 +163,7 @@ class TaskAggregateTest { eventId -> Cancelled.apply(ID, eventId, Option.empty()) ); TaskAggregate aggregate = TaskAggregate.fromHistory(ID, history); - assertThat(aggregate.update(new MemoryReferenceWithCounterTask.AdditionalInformation(3, timestamp))).isEmpty(); + assertThat(CollectionConverters.asJava(aggregate.update(new MemoryReferenceWithCounterTask.AdditionalInformation(3, timestamp)))) + .isEmpty(); } } --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org