http://git-wip-us.apache.org/repos/asf/james-project/blob/189490a4/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStore.java ---------------------------------------------------------------------- diff --git a/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStore.java b/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStore.java new file mode 100644 index 0000000..0e8fd5f --- /dev/null +++ b/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStore.java @@ -0,0 +1,64 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. 
*
+ ****************************************************************/
+
+package org.apache.james.eventsourcing.eventstore.cassandra;
+
+import java.util.List;
+
+import javax.inject.Inject;
+
+import org.apache.james.eventsourcing.AggregateId;
+import org.apache.james.eventsourcing.Event;
+import org.apache.james.eventsourcing.eventstore.EventStore;
+import org.apache.james.eventsourcing.eventstore.EventStoreFailedException;
+import org.apache.james.eventsourcing.eventstore.History;
+
+import com.google.common.base.Preconditions;
+/**
+ * {@link EventStore} implementation that forwards every operation to an {@link EventStoreDao}.
+ */
+public class CassandraEventStore implements EventStore {
+    /** DAO that executes the actual reads and writes. */
+    private final EventStoreDao eventStoreDao;
+    /**
+     * Builds the store around the DAO it delegates to.
+     *
+     * @param eventStoreDao DAO used by every operation of this store
+     */
+    @Inject
+    public CassandraEventStore(EventStoreDao eventStoreDao) {
+        this.eventStoreDao = eventStoreDao;
+    }
+    /**
+     * Appends the given events. An empty list returns immediately without touching the DAO;
+     * anything else is handed to {@link #doAppendAll(List)}.
+     *
+     * @param events events to persist
+     */
+    @Override
+    public void appendAll(List<Event> events) {
+        if (events.isEmpty()) {
+            return;
+        }
+        doAppendAll(events);
+    }
+    /**
+     * Appends a list of events through the DAO.
+     *
+     * The list must satisfy {@code Event.belongsToSameAggregate(events)}; this is enforced with
+     * {@code Preconditions.checkArgument} before anything is written.
+     *
+     * NOTE(review): declared public although, within this class, only {@link #appendAll(List)}
+     * calls it - presumably an internal helper; confirm before narrowing its visibility.
+     *
+     * @param events events to persist
+     * @throws EventStoreFailedException when the DAO reports {@code false} for the append
+     */
+    public void doAppendAll(List<Event> events) {
+        Preconditions.checkArgument(Event.belongsToSameAggregate(events));
+        // join() waits for the DAO's future before the outcome is inspected.
+        boolean success = eventStoreDao.appendAll(events).join();
+        if (!success) {
+            throw new EventStoreFailedException();
+        }
+    }
+    /**
+     * Loads the history of one aggregate by delegating to the DAO.
+     *
+     * @param aggregateId identifier of the aggregate whose events are requested
+     * @return the history returned by {@code EventStoreDao.getEventsOfAggregate}
+     */
+    @Override
+    public History getEventsOfAggregate(AggregateId aggregateId) {
+        return eventStoreDao.getEventsOfAggregate(aggregateId);
+    }
+}
http://git-wip-us.apache.org/repos/asf/james-project/blob/189490a4/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreModule.java ---------------------------------------------------------------------- diff --git a/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreModule.java b/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreModule.java new file mode 100644 index 0000000..85dab3c --- /dev/null +++ b/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreModule.java @@ -0,0 +1,62 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. 
*
+ ****************************************************************/
+
+package org.apache.james.eventsourcing.eventstore.cassandra;
+
+import java.util.List;
+
+import org.apache.james.backends.cassandra.components.CassandraModule;
+import org.apache.james.backends.cassandra.components.CassandraTable;
+import org.apache.james.backends.cassandra.components.CassandraType;
+import org.apache.james.backends.cassandra.utils.CassandraConstants;
+
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.schemabuilder.SchemaBuilder;
+import com.google.common.collect.ImmutableList;
+/**
+ * {@link CassandraModule} describing the schema of the event store: one table and no
+ * user-defined types.
+ */
+public class CassandraEventStoreModule implements CassandraModule {
+    private final List<CassandraTable> tables; // table definitions, built once in the constructor
+    private final List<CassandraType> types; // assigned an empty list in the constructor
+    /**
+     * Declares the events table (created only if it does not exist yet):
+     * {@code AGGREGATE_ID} is the varchar partition key, {@code EVENT_ID} the int clustering
+     * column and {@code EVENT} a text column. The table options set a comment and the
+     * caching policy (all keys, {@code DEFAULT_CACHED_ROW_PER_PARTITION} rows).
+     */
+    public CassandraEventStoreModule() {
+        tables = ImmutableList.of(
+            new CassandraTable(CassandraEventStoreTable.EVENTS_TABLE,
+                SchemaBuilder.createTable(CassandraEventStoreTable.EVENTS_TABLE)
+                    .ifNotExists()
+                    .addPartitionKey(CassandraEventStoreTable.AGGREGATE_ID, DataType.varchar())
+                    .addClusteringColumn(CassandraEventStoreTable.EVENT_ID, DataType.cint())
+                    .addColumn(CassandraEventStoreTable.EVENT, DataType.text())
+                    .withOptions()
+                    .comment("Store events of a EventSourcing aggregate")
+                    .caching(SchemaBuilder.KeyCaching.ALL,
+                        SchemaBuilder.rows(CassandraConstants.DEFAULT_CACHED_ROW_PER_PARTITION))));
+        types = ImmutableList.of();
+    }
+    /** @return the table definitions created by the constructor */
+    @Override
+    public List<CassandraTable> moduleTables() {
+        return tables;
+    }
+    /** @return the type list created by the constructor (empty for this module) */
+    @Override
+    public List<CassandraType> moduleTypes() {
+        return types;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/james-project/blob/189490a4/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreTable.java ---------------------------------------------------------------------- diff --git a/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreTable.java
b/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreTable.java new file mode 100644 index 0000000..fb4575b --- /dev/null +++ b/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreTable.java @@ -0,0 +1,27 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. 
*
+ ****************************************************************/
+
+package org.apache.james.eventsourcing.eventstore.cassandra;
+/**
+ * Names of the event-store table and of its columns, shared by the schema module and the DAO.
+ */
+public interface CassandraEventStoreTable {
+    String EVENTS_TABLE = "eventStore"; // table name
+    String AGGREGATE_ID = "aggregateId"; // column declared as partition key by CassandraEventStoreModule
+    String EVENT = "event"; // text column the DAO fills with the serialized event
+    String EVENT_ID = "eventId"; // column declared as clustering column by CassandraEventStoreModule
+}
http://git-wip-us.apache.org/repos/asf/james-project/blob/189490a4/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/EventStoreDao.java ---------------------------------------------------------------------- diff --git a/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/EventStoreDao.java b/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/EventStoreDao.java new file mode 100644 index 0000000..feb6ef7 --- /dev/null +++ b/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/EventStoreDao.java @@ -0,0 +1,122 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License.
*
+ ****************************************************************/
+
+package org.apache.james.eventsourcing.eventstore.cassandra;
+
+import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
+import static org.apache.james.eventsourcing.eventstore.cassandra.CassandraEventStoreTable.AGGREGATE_ID;
+import static org.apache.james.eventsourcing.eventstore.cassandra.CassandraEventStoreTable.EVENT;
+import static org.apache.james.eventsourcing.eventstore.cassandra.CassandraEventStoreTable.EVENTS_TABLE;
+import static org.apache.james.eventsourcing.eventstore.cassandra.CassandraEventStoreTable.EVENT_ID;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import javax.inject.Inject;
+
+import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
+import org.apache.james.backends.cassandra.utils.CassandraUtils;
+import org.apache.james.eventsourcing.AggregateId;
+import org.apache.james.eventsourcing.Event;
+import org.apache.james.eventsourcing.eventstore.History;
+
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.github.steveash.guavate.Guavate;
+/**
+ * Data-access object for the events table. It prepares one insert and one select statement
+ * at construction time and stores each event as the JSON string produced by a
+ * {@link JsonEventSerializer}.
+ */
+public class EventStoreDao {
+    private final CassandraUtils cassandraUtils;
+    private final CassandraAsyncExecutor cassandraAsyncExecutor;
+    private final PreparedStatement insert;
+    private final PreparedStatement select;
+    private final JsonEventSerializer jsonEventSerializer;
+    /**
+     * Wraps the session in a {@link CassandraAsyncExecutor} and prepares both statements.
+     *
+     * @param session session used to prepare and, through the executor, run the statements
+     * @param cassandraUtils helper used to turn a result set into a stream of rows
+     * @param jsonEventSerializer converter between events and their JSON form
+     */
+    @Inject
+    public EventStoreDao(Session session, CassandraUtils cassandraUtils, JsonEventSerializer jsonEventSerializer) {
+        this.cassandraUtils = cassandraUtils;
+        this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session);
+        this.jsonEventSerializer = jsonEventSerializer;
+        this.insert = prepareInsert(session);
+        this.select = prepareSelect(session);
+    }
+    /** Prepares the insert of one event row; the statement is built with {@code ifNotExists()}. */
+    private PreparedStatement prepareInsert(Session session) {
+        return session.prepare(insertInto(EVENTS_TABLE)
+            .value(AGGREGATE_ID, bindMarker(AGGREGATE_ID))
+            .value(EVENT_ID, bindMarker(EVENT_ID))
+            .value(EVENT, bindMarker(EVENT))
+            .ifNotExists());
+    }
+    /** Prepares the select that filters the events table on the aggregate id column. */
+    private PreparedStatement prepareSelect(Session session) {
+        return session.prepare(select()
+            .from(EVENTS_TABLE)
+            .where(eq(AGGREGATE_ID, bindMarker(AGGREGATE_ID))));
+    }
+    /**
+     * Puts one bound insert per event into a single {@link BatchStatement} and submits it
+     * through {@code executeReturnApplied}.
+     *
+     * NOTE(review): because the insert is conditional, the Boolean presumably tells whether
+     * the batch was applied - confirm against CassandraAsyncExecutor.
+     *
+     * @param events events to write in one batch
+     * @return the future handed back by the executor
+     */
+    public CompletableFuture<Boolean> appendAll(List<Event> events) {
+        BatchStatement batch = new BatchStatement();
+        events.forEach(event -> batch.add(insertEvent(event)));
+        return cassandraAsyncExecutor.executeReturnApplied(batch);
+    }
+    /**
+     * Binds aggregate key, serialized event id and JSON payload of one event to the insert.
+     * A {@link JsonProcessingException} from the serializer is rethrown wrapped in a
+     * {@link RuntimeException}.
+     */
+    private BoundStatement insertEvent(Event event) {
+        try {
+            return insert
+                .bind()
+                .setString(AGGREGATE_ID, event.getAggregateId().asAggregateKey())
+                .setInt(EVENT_ID, event.eventId().serialize())
+                .setString(EVENT, jsonEventSerializer.serialize(event));
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException(e);
+        }
+    }
+    /**
+     * Runs the select for the given aggregate, waits for the result with {@code join()} and
+     * converts the rows into a {@link History}.
+     *
+     * NOTE(review): the query has no explicit ordering; rows presumably come back in the
+     * clustering order of the event id column - confirm.
+     *
+     * @param aggregateId aggregate whose events are read
+     * @return history built from every row found for that aggregate
+     */
+    public History getEventsOfAggregate(AggregateId aggregateId) {
+        return toHistory(
+            cassandraAsyncExecutor.execute(
+                select.bind()
+                    .setString(AGGREGATE_ID, aggregateId.asAggregateKey()))
+                .join());
+    }
+    /** Deserializes every row of the result set and wraps the resulting immutable list in a History. */
+    private History toHistory(ResultSet resultSet) {
+        List<Event> events = cassandraUtils.convertToStream(resultSet)
+            .map(this::toEvent)
+            .collect(Guavate.toImmutableList());
+        return History.of(events);
+    }
+    /**
+     * Rebuilds an event from the JSON stored in the event column of the row. An
+     * {@link IOException} from the serializer is rethrown wrapped in a {@link RuntimeException}.
+     */
+    private Event toEvent(Row row) {
+        try {
+            return jsonEventSerializer.deserialize(row.getString(EVENT));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/james-project/blob/189490a4/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/JsonEventSerializer.java ---------------------------------------------------------------------- diff --git a/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/JsonEventSerializer.java b/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/JsonEventSerializer.java new file mode 100644 index 0000000..dea4a5e --- /dev/null +++ b/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/JsonEventSerializer.java @@ -0,0 +1,98 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. 
*
+ ****************************************************************/
+
+package org.apache.james.eventsourcing.eventstore.cassandra;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+
+import javax.inject.Inject;
+
+import org.apache.james.eventsourcing.Event;
+import org.apache.james.eventsourcing.eventstore.cassandra.dto.EventDTO;
+import org.apache.james.eventsourcing.eventstore.cassandra.dto.EventDTOModule;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
+import com.github.steveash.guavate.Guavate;
+import com.google.common.collect.ImmutableSet;
+/**
+ * Converts events to and from JSON. Each supported event kind is contributed by an
+ * {@link EventDTOModule}, which supplies the DTO class Jackson reads and writes.
+ */
+public class JsonEventSerializer {
+    public static class UnknownEventException extends RuntimeException { // thrown when no registered module matches
+        public UnknownEventException(String message) {
+            super(message);
+        }
+    }
+    /** Modules indexed by {@link EventDTOModule#getEventClass()}; consulted by {@link #serialize(Event)}. */
+    private final Map<Class<? extends Event>, EventDTOModule> eventClassToModule;
+    private final Map<String, EventDTOModule> typeToModule; // modules indexed by getType(); consulted when deserializing
+    private final ObjectMapper objectMapper;
+    /**
+     * Configures the mapper (Jdk8Module registered, NON_ABSENT serialization inclusion) and
+     * indexes the modules by type string and by event class.
+     *
+     * NOTE(review): the immutable-map collector presumably rejects duplicate keys, so two
+     * modules sharing a type or an event class would fail here - confirm.
+     *
+     * @param modules modules describing every event kind this serializer should handle
+     */
+    @Inject
+    public JsonEventSerializer(Set<EventDTOModule> modules) {
+        objectMapper = new ObjectMapper();
+        objectMapper.registerModule(new Jdk8Module());
+        objectMapper.setSerializationInclusion(JsonInclude.Include.NON_ABSENT);
+        // Lookup used when reading JSON: type string -> module.
+        typeToModule = modules.stream()
+            .collect(Guavate.toImmutableMap(
+                EventDTOModule::getType,
+                Function.identity()));
+        // Lookup used when writing JSON: event class -> module.
+        eventClassToModule = modules.stream()
+            .collect(Guavate.toImmutableMap(
+                EventDTOModule::getEventClass,
+                Function.identity()));
+    }
+    /**
+     * Convenience constructor: copies the given modules into an immutable set and delegates
+     * to the set-based constructor.
+     *
+     * @param modules modules describing every event kind this serializer should handle
+     */
+    public JsonEventSerializer(EventDTOModule... modules) {
+        this(ImmutableSet.copyOf(modules));
+    }
+    /**
+     * Turns an event into JSON: the module registered for {@code event.getClass()} converts it
+     * to a DTO, which the mapper writes as a string.
+     *
+     * @param event event to serialize
+     * @return JSON text of the event's DTO
+     * @throws UnknownEventException when no module is registered for the event's class
+     * @throws JsonProcessingException when the mapper cannot write the DTO
+     */
+    public String serialize(Event event) throws JsonProcessingException {
+        Object dto = Optional.ofNullable(eventClassToModule.get(event.getClass()))
+            .orElseThrow(() -> new UnknownEventException("unknown event class " + event.getClass()))
+            .toDTO(event);
+        return objectMapper.writeValueAsString(dto);
+    }
+    /**
+     * Rebuilds an event from JSON: the document is parsed as a tree, bound to the DTO class
+     * selected by its "type" field, and the DTO converts itself to an event.
+     *
+     * @param value JSON text previously produced for an event
+     * @return the event returned by the DTO's {@code toEvent()}
+     * @throws UnknownEventException when no module is registered for the type read from the document
+     * @throws IOException when the mapper cannot parse or bind the document
+     */
+    public Event deserialize(String value) throws IOException {
+        JsonNode jsonNode = objectMapper.readTree(value);
+        // The text of the "type" field decides which DTO class the tree is bound to.
+        String type = jsonNode.path("type").asText();
+
+        EventDTO dto = objectMapper.readValue(
+            objectMapper.treeAsTokens(jsonNode),
+            retrieveDTOClass(type));
+        return dto.toEvent();
+    }
+    /**
+     * Looks up the DTO class registered for a type string.
+     *
+     * @param type type string as returned by {@link EventDTOModule#getType()}
+     * @return DTO class of the matching module
+     * @throws UnknownEventException when no module is registered for that type
+     */
+    public Class<? extends EventDTO> retrieveDTOClass(String type) {
+        return Optional.ofNullable(typeToModule.get(type))
+            .map(EventDTOModule::getDTOClass)
+            .orElseThrow(() -> new UnknownEventException("unknown event type " + type));
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/james-project/blob/189490a4/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/dto/EventDTO.java ---------------------------------------------------------------------- diff --git a/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/dto/EventDTO.java b/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/dto/EventDTO.java new file mode 100644 index 0000000..461212a --- /dev/null +++ b/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/dto/EventDTO.java @@ -0,0 +1,26 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership.
The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.eventsourcing.eventstore.cassandra.dto; + +import org.apache.james.eventsourcing.Event; + +/** DTO form of an {@link Event}: the shape JsonEventSerializer binds JSON to before asking for the event. */ public interface EventDTO { + /** @return the event this DTO stands for */ Event toEvent(); +} http://git-wip-us.apache.org/repos/asf/james-project/blob/189490a4/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/dto/EventDTOModule.java ---------------------------------------------------------------------- diff --git a/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/dto/EventDTOModule.java b/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/dto/EventDTOModule.java new file mode 100644 index 0000000..feaa5ef --- /dev/null +++ b/event-sourcing/event-store-cassandra/src/main/java/org/apache/james/eventsourcing/eventstore/cassandra/dto/EventDTOModule.java @@ -0,0 +1,32 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership.
The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.eventsourcing.eventstore.cassandra.dto; + +import org.apache.james.eventsourcing.Event; + +/** Describes one event kind to JsonEventSerializer: its type string, DTO class, event class and event-to-DTO conversion. */ public interface EventDTOModule { + /** @return type string under which JsonEventSerializer indexes this module for deserialization */ String getType(); + + /** @return DTO class JsonEventSerializer binds the JSON document to */ Class<? extends EventDTO> getDTOClass(); + + /** @return event class under which JsonEventSerializer indexes this module for serialization */ Class<? extends Event> getEventClass(); + + /** @return the DTO representing the given event */ EventDTO toDTO(Event event); +} http://git-wip-us.apache.org/repos/asf/james-project/blob/189490a4/event-sourcing/event-store-cassandra/src/test/java/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventSourcingSystemTest.java ---------------------------------------------------------------------- diff --git a/event-sourcing/event-store-cassandra/src/test/java/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventSourcingSystemTest.java b/event-sourcing/event-store-cassandra/src/test/java/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventSourcingSystemTest.java new file mode 100644 index 0000000..e538b93 --- /dev/null +++ b/event-sourcing/event-store-cassandra/src/test/java/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventSourcingSystemTest.java @@ -0,0 +1,28 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements.
See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.eventsourcing.eventstore.cassandra; + +import org.apache.james.eventsourcing.EventSourcingSystemTest; +import org.junit.jupiter.api.extension.ExtendWith; + +/** Implements EventSourcingSystemTest with CassandraEventStoreExtension registered; declares no tests of its own (presumably they are inherited from the interface - confirm). */ @ExtendWith(CassandraEventStoreExtension.class) +public class CassandraEventSourcingSystemTest implements EventSourcingSystemTest { + +} http://git-wip-us.apache.org/repos/asf/james-project/blob/189490a4/event-sourcing/event-store-cassandra/src/test/java/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreExtension.java ---------------------------------------------------------------------- diff --git a/event-sourcing/event-store-cassandra/src/test/java/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreExtension.java b/event-sourcing/event-store-cassandra/src/test/java/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreExtension.java new file mode 100644 index 0000000..73754a9 --- /dev/null +++ b/event-sourcing/event-store-cassandra/src/test/java/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreExtension.java @@ -0,0 +1,30 @@ +/**************************************************************** + * Licensed to the Apache
Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.eventsourcing.eventstore.cassandra; + +import org.apache.james.eventsourcing.eventstore.cassandra.dto.TestEventDTOModule; + +import com.google.common.collect.ImmutableSet; + +/** CassandraGenericEventStoreExtension preconfigured with a single TestEventDTOModule. */ public class CassandraEventStoreExtension extends CassandraGenericEventStoreExtension { + /** Passes a one-element module set containing a new TestEventDTOModule to the generic extension. */ public CassandraEventStoreExtension() { + super(ImmutableSet.of(new TestEventDTOModule())); + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/189490a4/event-sourcing/event-store-cassandra/src/test/java/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreTest.java ---------------------------------------------------------------------- diff --git a/event-sourcing/event-store-cassandra/src/test/java/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreTest.java b/event-sourcing/event-store-cassandra/src/test/java/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreTest.java new file mode 100644 index 0000000..6d240c3 --- /dev/null +++
b/event-sourcing/event-store-cassandra/src/test/java/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreTest.java @@ -0,0 +1,28 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. 
*
+ ****************************************************************/
+
+package org.apache.james.eventsourcing.eventstore.cassandra;
+
+import org.apache.james.eventsourcing.eventstore.EventStoreTest;
+import org.junit.jupiter.api.extension.ExtendWith;
+/**
+ * Implements {@link EventStoreTest} with {@link CassandraEventStoreExtension} registered.
+ * The class declares no tests of its own (presumably they are inherited from the
+ * interface - confirm).
+ */
+@ExtendWith(CassandraEventStoreExtension.class)
+class CassandraEventStoreTest implements EventStoreTest {
+
+}
http://git-wip-us.apache.org/repos/asf/james-project/blob/189490a4/event-sourcing/event-store-cassandra/src/test/java/org/apache/james/eventsourcing/eventstore/cassandra/CassandraGenericEventStoreExtension.java ---------------------------------------------------------------------- diff --git a/event-sourcing/event-store-cassandra/src/test/java/org/apache/james/eventsourcing/eventstore/cassandra/CassandraGenericEventStoreExtension.java b/event-sourcing/event-store-cassandra/src/test/java/org/apache/james/eventsourcing/eventstore/cassandra/CassandraGenericEventStoreExtension.java new file mode 100644 index 0000000..699153d --- /dev/null +++ b/event-sourcing/event-store-cassandra/src/test/java/org/apache/james/eventsourcing/eventstore/cassandra/CassandraGenericEventStoreExtension.java @@ -0,0 +1,87 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied.
See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.eventsourcing.eventstore.cassandra;
+
+import java.util.Set;
+
+import org.apache.james.backends.cassandra.CassandraCluster;
+import org.apache.james.backends.cassandra.DockerCassandraExtension;
+import org.apache.james.backends.cassandra.DockerCassandraExtension.DockerCassandra;
+import org.apache.james.backends.cassandra.utils.CassandraUtils;
+import org.apache.james.eventsourcing.eventstore.EventStore;
+import org.apache.james.eventsourcing.eventstore.cassandra.dto.EventDTOModule;
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.ParameterContext;
+import org.junit.jupiter.api.extension.ParameterResolutionException;
+import org.junit.jupiter.api.extension.ParameterResolver;
+/**
+ * JUnit 5 extension that backs {@link EventStore} test parameters with a Cassandra store.
+ * It drives a {@link DockerCassandraExtension} once per test class, creates a fresh
+ * {@link CassandraCluster} and {@link EventStoreDao} before every test and closes the
+ * cluster afterwards.
+ */
+public class CassandraGenericEventStoreExtension implements BeforeAllCallback, AfterAllCallback, BeforeEachCallback, AfterEachCallback, ParameterResolver {
+    private final DockerCassandraExtension dockerCassandraExtension;
+    private final Set<EventDTOModule> modules;
+    private CassandraCluster cassandra;
+    private DockerCassandra dockerCassandra;
+    private EventStoreDao eventStoreDao;
+    /**
+     * Stores the modules and creates the Docker extension this one delegates to.
+     *
+     * @param modules DTO modules handed to the {@link JsonEventSerializer} built for each test
+     */
+    public CassandraGenericEventStoreExtension(Set<EventDTOModule> modules) {
+        this.modules = modules;
+        dockerCassandraExtension = new DockerCassandraExtension();
+    }
+    /** Forwards to the Docker extension, then keeps the {@link DockerCassandra} handle it exposes. */
+    @Override
+    public void beforeAll(ExtensionContext context) throws Exception {
+        dockerCassandraExtension.beforeAll(context);
+        dockerCassandra = dockerCassandraExtension.getDockerCassandra();
+    }
+    /** Forwards to the Docker extension. */
+    @Override
+    public void afterAll(ExtensionContext context) throws Exception {
+        dockerCassandraExtension.afterAll(context);
+    }
+    /**
+     * Creates the cluster with the event-store module against the Docker instance's IP and
+     * binding port, then builds the serializer and the DAO used by the next test.
+     */
+    @Override
+    public void beforeEach(ExtensionContext context) {
+        cassandra = CassandraCluster.create(
+            new CassandraEventStoreModule(), dockerCassandra.getIp(), dockerCassandra.getBindingPort());
+
+        JsonEventSerializer jsonEventSerializer = new JsonEventSerializer(modules);
+
+        eventStoreDao = new EventStoreDao(cassandra.getConf(), CassandraUtils.WITH_DEFAULT_CONFIGURATION,
+            jsonEventSerializer);
+    }
+    /** Closes the cluster created by {@link #beforeEach(ExtensionContext)}. */
+    @Override
+    public void afterEach(ExtensionContext context) {
+        cassandra.close();
+    }
+    /** @return true only when the parameter's declared type is exactly {@code EventStore.class} */
+    @Override
+    public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException {
+        return (parameterContext.getParameter().getType() == EventStore.class);
+    }
+    /** @return a new {@link CassandraEventStore} wrapping the DAO built by the latest {@code beforeEach} */
+    @Override
+    public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException {
+        return new CassandraEventStore(eventStoreDao);
+    }
+}
http://git-wip-us.apache.org/repos/asf/james-project/blob/189490a4/event-sourcing/event-store-cassandra/src/test/java/org/apache/james/eventsourcing/eventstore/cassandra/JsonEventSerializerTest.java ---------------------------------------------------------------------- diff --git a/event-sourcing/event-store-cassandra/src/test/java/org/apache/james/eventsourcing/eventstore/cassandra/JsonEventSerializerTest.java b/event-sourcing/event-store-cassandra/src/test/java/org/apache/james/eventsourcing/eventstore/cassandra/JsonEventSerializerTest.java new file mode 100644 index 0000000..1664ab1 --- /dev/null +++ b/event-sourcing/event-store-cassandra/src/test/java/org/apache/james/eventsourcing/eventstore/cassandra/JsonEventSerializerTest.java @@ -0,0 +1,102 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements.
See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.eventsourcing.eventstore.cassandra; + +import static net.javacrumbs.jsonunit.fluent.JsonFluentAssert.assertThatJson; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import org.apache.james.eventsourcing.EventId; +import org.apache.james.eventsourcing.TestAggregateId; +import org.apache.james.eventsourcing.TestEvent; +import org.apache.james.eventsourcing.eventstore.cassandra.dto.OtherEvent; +import org.apache.james.eventsourcing.eventstore.cassandra.dto.OtherTestEventDTOModule; +import org.apache.james.eventsourcing.eventstore.cassandra.dto.TestEventDTOModule; +import org.junit.jupiter.api.Test; + +class JsonEventSerializerTest { + public static final EventId EVENT_ID = EventId.fromSerialized(0); + public static final TestAggregateId AGGREGATE_ID = TestAggregateId.testId(1); + + public static final OtherEvent OTHER_EVENT = new OtherEvent(EVENT_ID, AGGREGATE_ID, 1); + public static final TestEvent TEST_EVENT = new TestEvent(EVENT_ID, AGGREGATE_ID, "first"); + + public static final String TEST_EVENT_JSON = 
"{\"type\":\"TestType\",\"data\":\"first\",\"eventId\":0,\"aggregate\":1}"; + public static final String OTHER_EVENT_JSON = "{\"type\":\"other-type\",\"data\":1,\"eventId\":0,\"aggregate\":1}"; + + @Test + void shouldDeserializeKnownEvent() throws Exception { + assertThat(new JsonEventSerializer(new TestEventDTOModule()) + .deserialize(TEST_EVENT_JSON)) + .isEqualTo(TEST_EVENT); + } + + @Test + void shouldThrowWhenDeserializeUnknownEvent() { + assertThatThrownBy(() -> new JsonEventSerializer() + .deserialize(TEST_EVENT_JSON)) + .isInstanceOf(JsonEventSerializer.UnknownEventException.class); + } + + @Test + void serializeShouldHandleAllKnownEvents() throws Exception { + JsonEventSerializer jsonEventSerializer = new JsonEventSerializer( + new TestEventDTOModule(), + new OtherTestEventDTOModule()); + + assertThatJson( + jsonEventSerializer.serialize(OTHER_EVENT)) + .isEqualTo(OTHER_EVENT_JSON); + + assertThatJson( + jsonEventSerializer.serialize(TEST_EVENT)) + .isEqualTo(TEST_EVENT_JSON); + } + + @Test + void deserializeShouldHandleAllKnownEvents() throws Exception { + JsonEventSerializer jsonEventSerializer = new JsonEventSerializer( + new TestEventDTOModule(), + new OtherTestEventDTOModule()); + + assertThatJson( + jsonEventSerializer.deserialize(OTHER_EVENT_JSON)) + .isEqualTo(OTHER_EVENT); + + assertThatJson( + jsonEventSerializer.deserialize(TEST_EVENT_JSON)) + .isEqualTo(TEST_EVENT); + } + + @Test + void shouldSerializeKnownEvent() throws Exception { + assertThatJson(new JsonEventSerializer(new TestEventDTOModule()) + .serialize(TEST_EVENT)) + .isEqualTo(TEST_EVENT_JSON); + } + + @Test + void shouldThrowWhenSerializeUnknownEvent() { + assertThatThrownBy(() -> new JsonEventSerializer() + .serialize(TEST_EVENT)) + .isInstanceOf(JsonEventSerializer.UnknownEventException.class); + } + +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/james-project/blob/189490a4/event-sourcing/event-store-cassandra/src/test/java/org/apache/james/eventsourcing/eventstore/cassandra/dto/OtherEvent.java ---------------------------------------------------------------------- diff --git a/event-sourcing/event-store-cassandra/src/test/java/org/apache/james/eventsourcing/eventstore/cassandra/dto/OtherEvent.java b/event-sourcing/event-store-cassandra/src/test/java/org/apache/james/eventsourcing/eventstore/cassandra/dto/OtherEvent.java new file mode 100644 index 0000000..1b21a36 --- /dev/null +++ b/event-sourcing/event-store-cassandra/src/test/java/org/apache/james/eventsourcing/eventstore/cassandra/dto/OtherEvent.java @@ -0,0 +1,50 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. 
* + ****************************************************************/ + +package org.apache.james.eventsourcing.eventstore.cassandra.dto; + +import org.apache.james.eventsourcing.Event; +import org.apache.james.eventsourcing.EventId; +import org.apache.james.eventsourcing.TestAggregateId; + +public class OtherEvent implements Event { + private final EventId eventId; + private final TestAggregateId aggregateId; + private final long payload; + + public OtherEvent(EventId eventId, TestAggregateId aggregateId, long payload) { + this.eventId = eventId; + this.aggregateId = aggregateId; + this.payload = payload; + } + + @Override + public EventId eventId() { + return eventId; + } + + @Override + public TestAggregateId getAggregateId() { + return aggregateId; + } + + public long getPayload() { + return payload; + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/189490a4/event-sourcing/event-store-cassandra/src/test/java/org/apache/james/eventsourcing/eventstore/cassandra/dto/OtherTestEventDTO.java ---------------------------------------------------------------------- diff --git a/event-sourcing/event-store-cassandra/src/test/java/org/apache/james/eventsourcing/eventstore/cassandra/dto/OtherTestEventDTO.java b/event-sourcing/event-store-cassandra/src/test/java/org/apache/james/eventsourcing/eventstore/cassandra/dto/OtherTestEventDTO.java new file mode 100644 index 0000000..d4739f8 --- /dev/null +++ b/event-sourcing/event-store-cassandra/src/test/java/org/apache/james/eventsourcing/eventstore/cassandra/dto/OtherTestEventDTO.java @@ -0,0 +1,72 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. 
The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.eventsourcing.eventstore.cassandra.dto; + +import org.apache.james.eventsourcing.Event; +import org.apache.james.eventsourcing.EventId; +import org.apache.james.eventsourcing.TestAggregateId; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; + +public class OtherTestEventDTO implements EventDTO { + private final String type; + private final long data; + private final int eventId; + private final int aggregate; + + @JsonCreator + public OtherTestEventDTO( + @JsonProperty("type") String type, + @JsonProperty("data") long data, + @JsonProperty("eventId") int eventId, + @JsonProperty("aggregate") int aggregate) { + this.type = type; + this.data = data; + this.eventId = eventId; + this.aggregate = aggregate; + } + + public String getType() { + return type; + } + + public long getData() { + return data; + } + + public long getEventId() { + return eventId; + } + + public int getAggregate() { + return aggregate; + } + + @JsonIgnore + @Override + public Event toEvent() { + return new OtherEvent( + EventId.fromSerialized(eventId), + TestAggregateId.testId(aggregate), + data); + } +} 
http://git-wip-us.apache.org/repos/asf/james-project/blob/189490a4/event-sourcing/event-store-cassandra/src/test/java/org/apache/james/eventsourcing/eventstore/cassandra/dto/OtherTestEventDTOModule.java ---------------------------------------------------------------------- diff --git a/event-sourcing/event-store-cassandra/src/test/java/org/apache/james/eventsourcing/eventstore/cassandra/dto/OtherTestEventDTOModule.java b/event-sourcing/event-store-cassandra/src/test/java/org/apache/james/eventsourcing/eventstore/cassandra/dto/OtherTestEventDTOModule.java new file mode 100644 index 0000000..d025add --- /dev/null +++ b/event-sourcing/event-store-cassandra/src/test/java/org/apache/james/eventsourcing/eventstore/cassandra/dto/OtherTestEventDTOModule.java @@ -0,0 +1,55 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. 
* + ****************************************************************/ + +package org.apache.james.eventsourcing.eventstore.cassandra.dto; + +import org.apache.james.eventsourcing.Event; +import org.testcontainers.shaded.com.google.common.base.Preconditions; + +public class OtherTestEventDTOModule implements EventDTOModule { + + public static final String OTHER_TYPE = "other-type"; + + @Override + public String getType() { + return OTHER_TYPE; + } + + @Override + public Class<? extends EventDTO> getDTOClass() { + return OtherTestEventDTO.class; + } + + @Override + public Class<? extends Event> getEventClass() { + return OtherEvent.class; + } + + @Override + public EventDTO toDTO(Event event) { + Preconditions.checkArgument(event instanceof OtherEvent); + OtherEvent otherEvent = (OtherEvent) event; + + return new OtherTestEventDTO( + OTHER_TYPE, + otherEvent.getPayload(), + otherEvent.eventId().serialize(), + otherEvent.getAggregateId().getId()); + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/189490a4/event-sourcing/event-store-cassandra/src/test/java/org/apache/james/eventsourcing/eventstore/cassandra/dto/TestEventDTO.java ---------------------------------------------------------------------- diff --git a/event-sourcing/event-store-cassandra/src/test/java/org/apache/james/eventsourcing/eventstore/cassandra/dto/TestEventDTO.java b/event-sourcing/event-store-cassandra/src/test/java/org/apache/james/eventsourcing/eventstore/cassandra/dto/TestEventDTO.java new file mode 100644 index 0000000..709c709 --- /dev/null +++ b/event-sourcing/event-store-cassandra/src/test/java/org/apache/james/eventsourcing/eventstore/cassandra/dto/TestEventDTO.java @@ -0,0 +1,73 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. 
The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.eventsourcing.eventstore.cassandra.dto; + +import org.apache.james.eventsourcing.Event; +import org.apache.james.eventsourcing.EventId; +import org.apache.james.eventsourcing.TestAggregateId; +import org.apache.james.eventsourcing.TestEvent; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; + +public class TestEventDTO implements EventDTO { + private final String type; + private final String data; + private final int eventId; + private final int aggregate; + + @JsonCreator + public TestEventDTO( + @JsonProperty("type") String type, + @JsonProperty("data") String data, + @JsonProperty("eventId") int eventId, + @JsonProperty("aggregate") int aggregate) { + this.type = type; + this.data = data; + this.eventId = eventId; + this.aggregate = aggregate; + } + + public String getType() { + return type; + } + + public String getData() { + return data; + } + + public long getEventId() { + return eventId; + } + + public int getAggregate() { + return aggregate; + } + + @JsonIgnore + @Override + public Event toEvent() { + return new TestEvent( + EventId.fromSerialized(eventId), + TestAggregateId.testId(aggregate), + data); + } +} 
http://git-wip-us.apache.org/repos/asf/james-project/blob/189490a4/event-sourcing/event-store-cassandra/src/test/java/org/apache/james/eventsourcing/eventstore/cassandra/dto/TestEventDTOModule.java ---------------------------------------------------------------------- diff --git a/event-sourcing/event-store-cassandra/src/test/java/org/apache/james/eventsourcing/eventstore/cassandra/dto/TestEventDTOModule.java b/event-sourcing/event-store-cassandra/src/test/java/org/apache/james/eventsourcing/eventstore/cassandra/dto/TestEventDTOModule.java new file mode 100644 index 0000000..f0613ee --- /dev/null +++ b/event-sourcing/event-store-cassandra/src/test/java/org/apache/james/eventsourcing/eventstore/cassandra/dto/TestEventDTOModule.java @@ -0,0 +1,56 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. 
* + ****************************************************************/ + +package org.apache.james.eventsourcing.eventstore.cassandra.dto; + +import org.apache.james.eventsourcing.Event; +import org.apache.james.eventsourcing.TestEvent; +import org.testcontainers.shaded.com.google.common.base.Preconditions; + +public class TestEventDTOModule implements EventDTOModule { + + public static final String TEST_TYPE = "TestType"; + + @Override + public String getType() { + return TEST_TYPE; + } + + @Override + public Class<? extends EventDTO> getDTOClass() { + return TestEventDTO.class; + } + + @Override + public Class<? extends Event> getEventClass() { + return TestEvent.class; + } + + @Override + public EventDTO toDTO(Event event) { + Preconditions.checkArgument(event instanceof TestEvent); + + TestEvent testEvent = (TestEvent) event; + return new TestEventDTO( + TEST_TYPE, + testEvent.getData(), + testEvent.eventId().serialize(), + testEvent.getAggregateId().getId()); + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/189490a4/event-sourcing/event-store-memory/pom.xml ---------------------------------------------------------------------- diff --git a/event-sourcing/event-store-memory/pom.xml b/event-sourcing/event-store-memory/pom.xml new file mode 100644 index 0000000..c23c93a --- /dev/null +++ b/event-sourcing/event-store-memory/pom.xml @@ -0,0 +1,86 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.james</groupId> + <artifactId>event-sourcing</artifactId> + <version>3.1.0-SNAPSHOT</version> + </parent> + + <artifactId>event-sourcing-event-store-memory</artifactId> + + <name>Apache James :: Event sourcing :: Event Store :: Memory</name> + <description>Memory implementation for James Event Store</description> + + <dependencies> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>event-sourcing-core</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>event-sourcing-core</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>event-sourcing-event-store-api</artifactId> + </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>event-sourcing-event-store-api</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>event-sourcing-pojo</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.assertj</groupId> + <artifactId>assertj-core</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.junit.jupiter</groupId> + 
<artifactId>junit-jupiter-engine</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.junit.platform</groupId> + <artifactId>junit-platform-launcher</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/james-project/blob/189490a4/event-sourcing/event-store-memory/src/main/java/org/apache/james/eventsourcing/eventstore/memory/InMemoryEventStore.java ---------------------------------------------------------------------- diff --git a/event-sourcing/event-store-memory/src/main/java/org/apache/james/eventsourcing/eventstore/memory/InMemoryEventStore.java b/event-sourcing/event-store-memory/src/main/java/org/apache/james/eventsourcing/eventstore/memory/InMemoryEventStore.java new file mode 100644 index 0000000..69ccf5f --- /dev/null +++ b/event-sourcing/event-store-memory/src/main/java/org/apache/james/eventsourcing/eventstore/memory/InMemoryEventStore.java @@ -0,0 +1,99 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. 
See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.eventsourcing.eventstore.memory; + +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.james.eventsourcing.AggregateId; +import org.apache.james.eventsourcing.Event; +import org.apache.james.eventsourcing.eventstore.EventStore; +import org.apache.james.eventsourcing.eventstore.EventStoreFailedException; +import org.apache.james.eventsourcing.eventstore.History; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; + +public class InMemoryEventStore implements EventStore { + + private final ConcurrentHashMap<AggregateId, History> store; + + public InMemoryEventStore() { + this.store = new ConcurrentHashMap<>(); + } + + @Override + public void appendAll(List<Event> events) { + if (events.isEmpty()) { + return; + } + AggregateId aggregateId = getAggregateId(events); + + if (!store.containsKey(aggregateId)) { + appendToEmptyHistory(aggregateId, events); + } else { + appendToExistingHistory(aggregateId, events); + } + } + + private AggregateId getAggregateId(List<? extends Event> events) { + Preconditions.checkArgument(!events.isEmpty()); + Preconditions.checkArgument(Event.belongsToSameAggregate(events)); + + return events.stream() + .map(Event::getAggregateId) + .findFirst() + .get(); + } + + private void appendToEmptyHistory(AggregateId aggregateId, List<Event> events) { + History newHistory = History.of(events); + + History previousHistory = store.putIfAbsent(aggregateId, newHistory); + if (previousHistory != null) { + throw new EventStoreFailedException(); + } + } + + private void appendToExistingHistory(AggregateId aggregateId, List<? 
extends Event> events) { + History currentHistory = store.get(aggregateId); + List<Event> updatedEvents = updatedEvents(currentHistory, events); + History updatedHistory = History.of(updatedEvents); + + boolean isReplaced = store.replace(aggregateId, currentHistory, updatedHistory); + if (!isReplaced) { + throw new EventStoreFailedException(); + } + } + + private List<Event> updatedEvents(History currentHistory, List<? extends Event> newEvents) { + return ImmutableList.<Event>builder() + .addAll(currentHistory.getEvents()) + .addAll(newEvents) + .build(); + } + + @Override + public History getEventsOfAggregate(AggregateId aggregateId) { + return Optional.ofNullable(store.get(aggregateId)) + .orElse(History.empty()); + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/189490a4/event-sourcing/event-store-memory/src/test/java/org/apache/james/eventsourcing/eventstore/memory/InMemoryEventSourcingSystemTest.java ---------------------------------------------------------------------- diff --git a/event-sourcing/event-store-memory/src/test/java/org/apache/james/eventsourcing/eventstore/memory/InMemoryEventSourcingSystemTest.java b/event-sourcing/event-store-memory/src/test/java/org/apache/james/eventsourcing/eventstore/memory/InMemoryEventSourcingSystemTest.java new file mode 100644 index 0000000..4a07c40 --- /dev/null +++ b/event-sourcing/event-store-memory/src/test/java/org/apache/james/eventsourcing/eventstore/memory/InMemoryEventSourcingSystemTest.java @@ -0,0 +1,28 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. 
/**
 * Binds the {@link EventSourcingSystemTest} contract to the in-memory event store.
 *
 * <p>The class body is intentionally empty: it only implements the contract interface
 * (whose tests are presumably declared there as default methods) and registers
 * {@link InMemoryEventStoreExtension} for parameter resolution.
 */
@ExtendWith(InMemoryEventStoreExtension.class)
public class InMemoryEventSourcingSystemTest implements EventSourcingSystemTest {

}
The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.eventsourcing.eventstore.memory; + +import org.apache.james.eventsourcing.eventstore.EventStore; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.api.extension.ParameterContext; +import org.junit.jupiter.api.extension.ParameterResolutionException; +import org.junit.jupiter.api.extension.ParameterResolver; + +public class InMemoryEventStoreExtension implements ParameterResolver { + + @Override + public boolean supportsParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException { + return (parameterContext.getParameter().getType() == EventStore.class); + } + + @Override + public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext) throws ParameterResolutionException { + return new InMemoryEventStore(); + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/189490a4/event-sourcing/event-store-memory/src/test/java/org/apache/james/eventsourcing/eventstore/memory/InMemoryEventStoreTest.java ---------------------------------------------------------------------- diff --git a/event-sourcing/event-store-memory/src/test/java/org/apache/james/eventsourcing/eventstore/memory/InMemoryEventStoreTest.java 
b/event-sourcing/event-store-memory/src/test/java/org/apache/james/eventsourcing/eventstore/memory/InMemoryEventStoreTest.java new file mode 100644 index 0000000..11cd197 --- /dev/null +++ b/event-sourcing/event-store-memory/src/test/java/org/apache/james/eventsourcing/eventstore/memory/InMemoryEventStoreTest.java @@ -0,0 +1,28 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. 
/**
 * Binds the {@link EventStoreTest} contract to the in-memory event store.
 *
 * <p>The class body is intentionally empty: it only implements the contract interface
 * (whose tests are presumably declared there as default methods) and registers
 * {@link InMemoryEventStoreExtension} for parameter resolution.
 */
@ExtendWith(InMemoryEventStoreExtension.class)
public class InMemoryEventStoreTest implements EventStoreTest {

}
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.james</groupId> + <artifactId>james-project</artifactId> + <version>3.1.0-SNAPSHOT</version> + </parent> + + <packaging>pom</packaging> + <artifactId>event-sourcing</artifactId> + + <name>Apache James :: Event Sourcing</name> + + <modules> + <module>event-sourcing-core</module> + <module>event-sourcing-pojo</module> + <module>event-store-api</module> + <module>event-store-cassandra</module> + <module>event-store-memory</module> + </modules> + +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/james-project/blob/189490a4/mailbox/plugin/quota-mailing-cassandra/pom.xml ---------------------------------------------------------------------- diff --git a/mailbox/plugin/quota-mailing-cassandra/pom.xml b/mailbox/plugin/quota-mailing-cassandra/pom.xml index b6b3b2c..bc3b954 100644 --- a/mailbox/plugin/quota-mailing-cassandra/pom.xml +++ b/mailbox/plugin/quota-mailing-cassandra/pom.xml @@ -36,6 +36,27 @@ <dependencies> <dependency> <groupId>${project.groupId}</groupId> + <artifactId>event-sourcing-core</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>event-sourcing-event-store-api</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>event-sourcing-event-store-cassandra</artifactId> + </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>event-sourcing-event-store-cassandra</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>${project.groupId}</groupId> <artifactId>apache-james-backends-cassandra</artifactId> </dependency> 
<dependency> http://git-wip-us.apache.org/repos/asf/james-project/blob/189490a4/mailbox/plugin/quota-mailing-cassandra/src/main/java/org/apache/james/eventsourcing/cassandra/CassandraEventStore.java ---------------------------------------------------------------------- diff --git a/mailbox/plugin/quota-mailing-cassandra/src/main/java/org/apache/james/eventsourcing/cassandra/CassandraEventStore.java b/mailbox/plugin/quota-mailing-cassandra/src/main/java/org/apache/james/eventsourcing/cassandra/CassandraEventStore.java deleted file mode 100644 index 61c5a06..0000000 --- a/mailbox/plugin/quota-mailing-cassandra/src/main/java/org/apache/james/eventsourcing/cassandra/CassandraEventStore.java +++ /dev/null @@ -1,62 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. 
* - ****************************************************************/ - -package org.apache.james.eventsourcing.cassandra; - -import java.util.List; - -import javax.inject.Inject; - -import org.apache.james.eventsourcing.AggregateId; -import org.apache.james.eventsourcing.Event; -import org.apache.james.eventsourcing.EventStore; - -import com.google.common.base.Preconditions; - -public class CassandraEventStore implements EventStore { - - private final EventStoreDao eventStoreDao; - - @Inject - public CassandraEventStore(EventStoreDao eventStoreDao) { - this.eventStoreDao = eventStoreDao; - } - - @Override - public void appendAll(List<Event> events) { - if (events.isEmpty()) { - return; - } - doAppendAll(events); - } - - public void doAppendAll(List<Event> events) { - Preconditions.checkArgument(Event.belongsToSameAggregate(events)); - - boolean success = eventStoreDao.appendAll(events).join(); - if (!success) { - throw new EventStoreFailedException(); - } - } - - @Override - public History getEventsOfAggregate(AggregateId aggregateId) { - return eventStoreDao.getEventsOfAggregate(aggregateId); - } -} http://git-wip-us.apache.org/repos/asf/james-project/blob/189490a4/mailbox/plugin/quota-mailing-cassandra/src/main/java/org/apache/james/eventsourcing/cassandra/CassandraEventStoreModule.java ---------------------------------------------------------------------- diff --git a/mailbox/plugin/quota-mailing-cassandra/src/main/java/org/apache/james/eventsourcing/cassandra/CassandraEventStoreModule.java b/mailbox/plugin/quota-mailing-cassandra/src/main/java/org/apache/james/eventsourcing/cassandra/CassandraEventStoreModule.java deleted file mode 100644 index ed15cb2..0000000 --- a/mailbox/plugin/quota-mailing-cassandra/src/main/java/org/apache/james/eventsourcing/cassandra/CassandraEventStoreModule.java +++ /dev/null @@ -1,62 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or 
more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. * - ****************************************************************/ - -package org.apache.james.eventsourcing.cassandra; - -import java.util.List; - -import org.apache.james.backends.cassandra.components.CassandraModule; -import org.apache.james.backends.cassandra.components.CassandraTable; -import org.apache.james.backends.cassandra.components.CassandraType; -import org.apache.james.backends.cassandra.utils.CassandraConstants; - -import com.datastax.driver.core.DataType; -import com.datastax.driver.core.schemabuilder.SchemaBuilder; -import com.google.common.collect.ImmutableList; - -public class CassandraEventStoreModule implements CassandraModule { - private final List<CassandraTable> tables; - private final List<CassandraType> types; - - public CassandraEventStoreModule() { - tables = ImmutableList.of( - new CassandraTable(CassandraEventStoreTable.EVENTS_TABLE, - SchemaBuilder.createTable(CassandraEventStoreTable.EVENTS_TABLE) - .ifNotExists() - .addPartitionKey(CassandraEventStoreTable.AGGREGATE_ID, DataType.varchar()) - .addClusteringColumn(CassandraEventStoreTable.EVENT_ID, DataType.cint()) - .addColumn(CassandraEventStoreTable.EVENT, DataType.text()) - .withOptions() - .comment("Store events of a 
EventSourcing aggregate") - .caching(SchemaBuilder.KeyCaching.ALL, - SchemaBuilder.rows(CassandraConstants.DEFAULT_CACHED_ROW_PER_PARTITION)))); - types = ImmutableList.of(); - } - - @Override - public List<CassandraTable> moduleTables() { - return tables; - } - - @Override - public List<CassandraType> moduleTypes() { - return types; - } - -} http://git-wip-us.apache.org/repos/asf/james-project/blob/189490a4/mailbox/plugin/quota-mailing-cassandra/src/main/java/org/apache/james/eventsourcing/cassandra/CassandraEventStoreTable.java ---------------------------------------------------------------------- diff --git a/mailbox/plugin/quota-mailing-cassandra/src/main/java/org/apache/james/eventsourcing/cassandra/CassandraEventStoreTable.java b/mailbox/plugin/quota-mailing-cassandra/src/main/java/org/apache/james/eventsourcing/cassandra/CassandraEventStoreTable.java deleted file mode 100644 index c90b81e..0000000 --- a/mailbox/plugin/quota-mailing-cassandra/src/main/java/org/apache/james/eventsourcing/cassandra/CassandraEventStoreTable.java +++ /dev/null @@ -1,27 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. 
* - ****************************************************************/ - -package org.apache.james.eventsourcing.cassandra; - -public interface CassandraEventStoreTable { - String EVENTS_TABLE = "eventStore"; - String AGGREGATE_ID = "aggregateId"; - String EVENT = "event"; - String EVENT_ID = "eventId"; -} --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org