This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 71fa3cc45e56756fb4e25856897fbce99a8accec Author: datph <[email protected]> AuthorDate: Thu Feb 21 11:18:31 2019 +0700 MAILBOX-380 Implement CassandraEventDeadLetters --- .../james/mailbox/events/EventDeadLetters.java | 5 ++ .../mailbox/events/EventDeadLettersContract.java | 7 +-- mailbox/event/event-cassandra/pom.xml | 42 +++++++++++++++++ .../mailbox/events/CassandraEventDeadLetters.java} | 39 +++++----------- .../events/CassandraEventDeadLettersDAO.java | 2 +- .../events/CassandraEventDeadLettersTest.java | 53 ++++++++++++++++++++++ .../mailbox/events/MemoryEventDeadLetters.java | 4 -- 7 files changed, 117 insertions(+), 35 deletions(-) diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/events/EventDeadLetters.java b/mailbox/api/src/main/java/org/apache/james/mailbox/events/EventDeadLetters.java index ed3dee5..c505918 100644 --- a/mailbox/api/src/main/java/org/apache/james/mailbox/events/EventDeadLetters.java +++ b/mailbox/api/src/main/java/org/apache/james/mailbox/events/EventDeadLetters.java @@ -23,6 +23,11 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public interface EventDeadLetters { + + String REGISTERED_GROUP_CANNOT_BE_NULL = "registeredGroup cannot be null"; + String FAIL_DELIVERED_EVENT_CANNOT_BE_NULL = "failDeliveredEvent cannot be null"; + String FAIL_DELIVERED_ID_EVENT_CANNOT_BE_NULL = "failDeliveredEventId cannot be null"; + Mono<Void> store(Group registeredGroup, Event failDeliveredEvent); Mono<Void> remove(Group registeredGroup, Event.EventId failDeliveredEventId); diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventDeadLettersContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventDeadLettersContract.java index b0d21d9..d1d6a37 100644 --- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventDeadLettersContract.java +++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventDeadLettersContract.java @@ -167,7 +167,7 @@ interface EventDeadLettersContract { .operation((threadNumber, step) -> { Event.EventId eventId = Event.EventId.random(); storedEventIds.put(threadNumber, eventId); - eventDeadLetters.store(groups.get(threadNumber), event(eventId)); + eventDeadLetters.store(groups.get(threadNumber), event(eventId)).subscribe(); }) .threadCount(THREAD_COUNT) .operationCount(OPERATION_COUNT) @@ -175,7 +175,7 @@ interface EventDeadLettersContract { groups.forEach((groupId, group) -> { Group storedGroup = groups.get(groupId); - assertThat(eventDeadLetters.failedEventIds(storedGroup).toStream()) + assertThat(eventDeadLetters.failedEventIds(storedGroup).collectList().block()) .hasSameElementsAs(storedEventIds.get(groupId)); }); } @@ -265,7 +265,8 @@ interface EventDeadLettersContract { ConcurrentTestRunner.builder() .operation((threadNumber, step) -> { int operationIndex = threadNumber * OPERATION_COUNT + step; - eventDeadLetters.remove(groups.get(threadNumber), storedEventIds.get(operationIndex)); + eventDeadLetters.remove(groups.get(threadNumber), storedEventIds.get(operationIndex)) + .subscribe(); }) .threadCount(THREAD_COUNT) .operationCount(OPERATION_COUNT) diff --git a/mailbox/event/event-cassandra/pom.xml b/mailbox/event/event-cassandra/pom.xml index 1bc6222..b3cc19e 100644 --- a/mailbox/event/event-cassandra/pom.xml +++ b/mailbox/event/event-cassandra/pom.xml @@ -39,12 +39,54 @@ </dependency> <dependency> <groupId>${project.groupId}</groupId> + <artifactId>apache-james-backends-cassandra</artifactId> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>${project.groupId}</groupId> <artifactId>apache-james-mailbox-api</artifactId> </dependency> <dependency> <groupId>${project.groupId}</groupId> + <artifactId>apache-james-mailbox-api</artifactId> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>apache-james-mailbox-cassandra</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>${project.groupId}</groupId> <artifactId>apache-james-mailbox-event-json</artifactId> </dependency> + <dependency> + <groupId>org.assertj</groupId> + <artifactId>assertj-core</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter-engine</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.junit.platform</groupId> + <artifactId>junit-platform-launcher</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.junit.vintage</groupId> + <artifactId>junit-vintage-engine</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>testcontainers</artifactId> + <scope>test</scope> + </dependency> </dependencies> diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/MemoryEventDeadLetters.java b/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLetters.java similarity index 62% copy from mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/MemoryEventDeadLetters.java copy to mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLetters.java index 45fff05..6530d1d 100644 --- a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/MemoryEventDeadLetters.java +++ b/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLetters.java @@ -19,25 +19,20 @@ package org.apache.james.mailbox.events; +import javax.inject.Inject; + import com.google.common.base.Preconditions; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Multimap; -import com.google.common.collect.Multimaps; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.publisher.MonoProcessor; - -public class MemoryEventDeadLetters implements EventDeadLetters { - private static final String REGISTERED_GROUP_CANNOT_BE_NULL = "registeredGroup cannot be null"; - private static final String FAIL_DELIVERED_EVENT_CANNOT_BE_NULL = "failDeliveredEvent cannot be null"; - private static final String FAIL_DELIVERED_ID_EVENT_CANNOT_BE_NULL = "failDeliveredEventId cannot be null"; +public class CassandraEventDeadLetters implements EventDeadLetters { - private final Multimap<Group, Event> deadLetters; + private final CassandraEventDeadLettersDAO cassandraEventDeadLettersDAO; - public MemoryEventDeadLetters() { - this.deadLetters = Multimaps.synchronizedSetMultimap(HashMultimap.create()); + @Inject + public CassandraEventDeadLetters(CassandraEventDeadLettersDAO cassandraEventDeadLettersDAO) { + this.cassandraEventDeadLettersDAO = cassandraEventDeadLettersDAO; } @Override @@ -45,9 +40,7 @@ public class MemoryEventDeadLetters implements EventDeadLetters { Preconditions.checkArgument(registeredGroup != null, REGISTERED_GROUP_CANNOT_BE_NULL); Preconditions.checkArgument(failDeliveredEvent != null, FAIL_DELIVERED_EVENT_CANNOT_BE_NULL); - return Mono.fromRunnable(() -> deadLetters.put(registeredGroup, failDeliveredEvent)) - .subscribeWith(MonoProcessor.create()) - .then(); + return cassandraEventDeadLettersDAO.store(registeredGroup, failDeliveredEvent); } @Override @@ -55,12 +48,7 @@ public class MemoryEventDeadLetters implements EventDeadLetters { Preconditions.checkArgument(registeredGroup != null, REGISTERED_GROUP_CANNOT_BE_NULL); Preconditions.checkArgument(failDeliveredEventId != null, FAIL_DELIVERED_ID_EVENT_CANNOT_BE_NULL); - return Flux.fromIterable(deadLetters.get(registeredGroup)) - .filter(event -> event.getEventId().equals(failDeliveredEventId)) - .next() - .doOnNext(event -> deadLetters.remove(registeredGroup, event)) - .subscribeWith(MonoProcessor.create()) - .then(); + return cassandraEventDeadLettersDAO.removeEvent(registeredGroup, failDeliveredEventId); } @Override @@ -68,21 +56,18 @@ public class MemoryEventDeadLetters implements EventDeadLetters { Preconditions.checkArgument(registeredGroup != null, REGISTERED_GROUP_CANNOT_BE_NULL); Preconditions.checkArgument(failDeliveredEventId != null, FAIL_DELIVERED_ID_EVENT_CANNOT_BE_NULL); - return Flux.fromIterable(deadLetters.get(registeredGroup)) - .filter(event -> event.getEventId().equals(failDeliveredEventId)) - .next(); + return cassandraEventDeadLettersDAO.retrieveFailedEvent(registeredGroup, failDeliveredEventId); } @Override public Flux<Event.EventId> failedEventIds(Group registeredGroup) { Preconditions.checkArgument(registeredGroup != null, REGISTERED_GROUP_CANNOT_BE_NULL); - return Flux.fromIterable(deadLetters.get(registeredGroup)) - .map(Event::getEventId); + return cassandraEventDeadLettersDAO.retrieveEventIdsWithGroup(registeredGroup); } @Override public Flux<Group> groupsWithFailedEvents() { - return Flux.fromIterable(deadLetters.keySet()); + return cassandraEventDeadLettersDAO.retrieveAllGroups(); } } diff --git a/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLettersDAO.java b/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLettersDAO.java index 0eab33b..2056f55 100644 --- a/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLettersDAO.java +++ b/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLettersDAO.java @@ -110,7 +110,7 @@ public class CassandraEventDeadLettersDAO { return executor.executeSingleRow(selectEventStatement.bind() .setString(GROUP, group.asString()) .setUUID(EVENT_ID, failedEventId.getId())) - .map(row -> deserializeEvent(row.getString(EVENT))); + .map(row -> deserializeEvent(row.getString(EVENT))); } Flux<Event.EventId> retrieveEventIdsWithGroup(Group group) { diff --git a/mailbox/event/event-cassandra/src/test/java/org/apache/james/mailbox/events/CassandraEventDeadLettersTest.java b/mailbox/event/event-cassandra/src/test/java/org/apache/james/mailbox/events/CassandraEventDeadLettersTest.java new file mode 100644 index 0000000..2c108d4 --- /dev/null +++ b/mailbox/event/event-cassandra/src/test/java/org/apache/james/mailbox/events/CassandraEventDeadLettersTest.java @@ -0,0 +1,53 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.mailbox.events; + +import org.apache.james.backends.cassandra.CassandraCluster; +import org.apache.james.backends.cassandra.CassandraClusterExtension; +import org.apache.james.event.json.EventSerializer; +import org.apache.james.mailbox.events.EventDeadLettersContract.FailedEventContract; +import org.apache.james.mailbox.events.EventDeadLettersContract.FailedEventsContract; +import org.apache.james.mailbox.events.EventDeadLettersContract.GroupsWithFailedEventsContract; +import org.apache.james.mailbox.events.EventDeadLettersContract.RemoveContract; +import org.apache.james.mailbox.events.EventDeadLettersContract.StoreContract; +import org.apache.james.mailbox.model.TestId; +import org.apache.james.mailbox.model.TestMessageId; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.RegisterExtension; + +class CassandraEventDeadLettersTest implements StoreContract, RemoveContract, FailedEventContract, FailedEventsContract, + GroupsWithFailedEventsContract { + + @RegisterExtension + static CassandraClusterExtension cassandraClusterExtension = new CassandraClusterExtension(CassandraEventDeadLettersModule.MODULE); + + private CassandraEventDeadLetters eventDeadLetters; + + @BeforeEach + void setUp(CassandraCluster cassandraCluster) { + EventSerializer eventSerializer = new EventSerializer(new TestId.Factory(), new TestMessageId.Factory()); + eventDeadLetters = new CassandraEventDeadLetters(new CassandraEventDeadLettersDAO(cassandraCluster.getConf(), eventSerializer)); + } + + @Override + public EventDeadLetters eventDeadLetters() { + return eventDeadLetters; + } +} diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/MemoryEventDeadLetters.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/MemoryEventDeadLetters.java index 45fff05..fcddc5e 100644 --- a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/MemoryEventDeadLetters.java +++ b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/MemoryEventDeadLetters.java @@ -30,10 +30,6 @@ import reactor.core.publisher.MonoProcessor; public class MemoryEventDeadLetters implements EventDeadLetters { - private static final String REGISTERED_GROUP_CANNOT_BE_NULL = "registeredGroup cannot be null"; - private static final String FAIL_DELIVERED_EVENT_CANNOT_BE_NULL = "failDeliveredEvent cannot be null"; - private static final String FAIL_DELIVERED_ID_EVENT_CANNOT_BE_NULL = "failDeliveredEventId cannot be null"; - private final Multimap<Group, Event> deadLetters; public MemoryEventDeadLetters() { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
