This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit fff0427a01dc79252f5ff6373c24101da683bf32 Author: Matthieu Baechler <matth...@apache.org> AuthorDate: Mon Nov 25 15:33:50 2019 +0100 [Refactoring] rewrite InMemoryEventStore in Scala --- event-sourcing/event-store-memory/pom.xml | 17 ++++ .../eventstore/memory/InMemoryEventStore.java | 99 ---------------------- .../eventstore/memory/InMemoryEventStore.scala | 58 +++++++++++++ 3 files changed, 75 insertions(+), 99 deletions(-) diff --git a/event-sourcing/event-store-memory/pom.xml b/event-sourcing/event-store-memory/pom.xml index aa57dd3..6f4b932 100644 --- a/event-sourcing/event-store-memory/pom.xml +++ b/event-sourcing/event-store-memory/pom.xml @@ -69,6 +69,23 @@ <artifactId>mockito-core</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-library</artifactId> + </dependency> + <dependency> + <groupId>org.scala-lang.modules</groupId> + <artifactId>scala-java8-compat_${scala.base}</artifactId> + </dependency> </dependencies> + <build> + <plugins> + <plugin> + <groupId>net.alchim31.maven</groupId> + <artifactId>scala-maven-plugin</artifactId> + </plugin> + </plugins> + </build> + </project> \ No newline at end of file diff --git a/event-sourcing/event-store-memory/src/main/java/org/apache/james/eventsourcing/eventstore/memory/InMemoryEventStore.java b/event-sourcing/event-store-memory/src/main/java/org/apache/james/eventsourcing/eventstore/memory/InMemoryEventStore.java deleted file mode 100644 index de65ca6..0000000 --- a/event-sourcing/event-store-memory/src/main/java/org/apache/james/eventsourcing/eventstore/memory/InMemoryEventStore.java +++ /dev/null @@ -1,99 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. * - ****************************************************************/ - -package org.apache.james.eventsourcing.eventstore.memory; - -import java.util.List; -import java.util.Optional; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.james.eventsourcing.AggregateId; -import org.apache.james.eventsourcing.Event; -import org.apache.james.eventsourcing.eventstore.EventStore; -import org.apache.james.eventsourcing.eventstore.EventStoreFailedException; -import org.apache.james.eventsourcing.eventstore.History; - -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; - -public class InMemoryEventStore implements EventStore { - - private final ConcurrentHashMap<AggregateId, History> store; - - public InMemoryEventStore() { - this.store = new ConcurrentHashMap<>(); - } - - @Override - public void appendAll(List<Event> events) { - if (events.isEmpty()) { - return; - } - AggregateId aggregateId = getAggregateId(events); - - if (!store.containsKey(aggregateId)) { - appendToEmptyHistory(aggregateId, events); - } else { - appendToExistingHistory(aggregateId, events); - } - } - - private AggregateId getAggregateId(List<? extends Event> events) { - Preconditions.checkArgument(!events.isEmpty()); - Preconditions.checkArgument(Event.belongsToSameAggregate(events)); - - return events.stream() - .map(Event::getAggregateId) - .findFirst() - .get(); - } - - private void appendToEmptyHistory(AggregateId aggregateId, List<Event> events) { - History newHistory = History.of(events); - - History previousHistory = store.putIfAbsent(aggregateId, newHistory); - if (previousHistory != null) { - throw new EventStoreFailedException("Concurrent update to the EventStore detected"); - } - } - - private void appendToExistingHistory(AggregateId aggregateId, List<? extends Event> events) { - History currentHistory = store.get(aggregateId); - List<Event> updatedEvents = updatedEvents(currentHistory, events); - History updatedHistory = History.of(updatedEvents); - - boolean isReplaced = store.replace(aggregateId, currentHistory, updatedHistory); - if (!isReplaced) { - throw new EventStoreFailedException("Concurrent update to the EventStore detected"); - } - } - - private List<Event> updatedEvents(History currentHistory, List<? extends Event> newEvents) { - return ImmutableList.<Event>builder() - .addAll(currentHistory.getEvents()) - .addAll(newEvents) - .build(); - } - - @Override - public History getEventsOfAggregate(AggregateId aggregateId) { - return Optional.ofNullable(store.get(aggregateId)) - .orElse(History.empty()); - } -} diff --git a/event-sourcing/event-store-memory/src/main/scala/org/apache/james/eventsourcing/eventstore/memory/InMemoryEventStore.scala b/event-sourcing/event-store-memory/src/main/scala/org/apache/james/eventsourcing/eventstore/memory/InMemoryEventStore.scala new file mode 100644 index 0000000..d7bf451 --- /dev/null +++ b/event-sourcing/event-store-memory/src/main/scala/org/apache/james/eventsourcing/eventstore/memory/InMemoryEventStore.scala @@ -0,0 +1,58 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + * ***************************************************************/ +package org.apache.james.eventsourcing.eventstore.memory + +import java.util +import java.util.concurrent.atomic.AtomicReference + +import com.google.common.base.Preconditions +import org.apache.james.eventsourcing.eventstore.{EventStore, History} +import org.apache.james.eventsourcing.{AggregateId, Event} + +import scala.collection.JavaConverters._ + +class InMemoryEventStore() extends EventStore { + private val storeRef: AtomicReference[Map[AggregateId, History]] = new AtomicReference(Map().withDefault(_ => History.empty())) + + override def appendAll(events: util.List[Event]): Unit = if (!events.isEmpty) appendAll(events.asScala) + + override def getEventsOfAggregate(aggregateId: AggregateId): History = { + Preconditions.checkNotNull(aggregateId) + storeRef.get()(aggregateId) + } + + def appendAll(events: Seq[Event]): Unit = { + val aggregateId: AggregateId = getAggregateId(events) + storeRef.updateAndGet(store => { + val updatedHistory = History.of((store(aggregateId).getEvents.asScala ++ events).asJava) + store.updated(aggregateId, updatedHistory) + }) + } + + private def getAggregateId(events: Seq[Event]): AggregateId = { + Preconditions.checkArgument(events.nonEmpty) + val aggregateId = events.head.getAggregateId + Preconditions.checkArgument(belongsToSameAggregate(aggregateId, events)) + aggregateId + } + + private def belongsToSameAggregate(aggregateId: AggregateId, events: Seq[Event]) = + events.forall(_.getAggregateId.equals(aggregateId)) + +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org