This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 71fa3cc45e56756fb4e25856897fbce99a8accec
Author: datph <[email protected]>
AuthorDate: Thu Feb 21 11:18:31 2019 +0700

    MAILBOX-380 Implement CassandraEventDeadLetters
---
 .../james/mailbox/events/EventDeadLetters.java     |  5 ++
 .../mailbox/events/EventDeadLettersContract.java   |  7 +--
 mailbox/event/event-cassandra/pom.xml              | 42 +++++++++++++++++
 .../mailbox/events/CassandraEventDeadLetters.java} | 39 +++++-----------
 .../events/CassandraEventDeadLettersDAO.java       |  2 +-
 .../events/CassandraEventDeadLettersTest.java      | 53 ++++++++++++++++++++++
 .../mailbox/events/MemoryEventDeadLetters.java     |  4 --
 7 files changed, 117 insertions(+), 35 deletions(-)

diff --git 
a/mailbox/api/src/main/java/org/apache/james/mailbox/events/EventDeadLetters.java
 
b/mailbox/api/src/main/java/org/apache/james/mailbox/events/EventDeadLetters.java
index ed3dee5..c505918 100644
--- 
a/mailbox/api/src/main/java/org/apache/james/mailbox/events/EventDeadLetters.java
+++ 
b/mailbox/api/src/main/java/org/apache/james/mailbox/events/EventDeadLetters.java
@@ -23,6 +23,11 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 public interface EventDeadLetters {
+
+    String REGISTERED_GROUP_CANNOT_BE_NULL = "registeredGroup cannot be null";
+    String FAIL_DELIVERED_EVENT_CANNOT_BE_NULL = "failDeliveredEvent cannot be 
null";
+    String FAIL_DELIVERED_ID_EVENT_CANNOT_BE_NULL = "failDeliveredEventId 
cannot be null";
+
     Mono<Void> store(Group registeredGroup, Event failDeliveredEvent);
 
     Mono<Void> remove(Group registeredGroup, Event.EventId 
failDeliveredEventId);
diff --git 
a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventDeadLettersContract.java
 
b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventDeadLettersContract.java
index b0d21d9..d1d6a37 100644
--- 
a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventDeadLettersContract.java
+++ 
b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventDeadLettersContract.java
@@ -167,7 +167,7 @@ interface EventDeadLettersContract {
                 .operation((threadNumber, step) -> {
                     Event.EventId eventId = Event.EventId.random();
                     storedEventIds.put(threadNumber, eventId);
-                    eventDeadLetters.store(groups.get(threadNumber), 
event(eventId));
+                    eventDeadLetters.store(groups.get(threadNumber), 
event(eventId)).subscribe();
                 })
                 .threadCount(THREAD_COUNT)
                 .operationCount(OPERATION_COUNT)
@@ -175,7 +175,7 @@ interface EventDeadLettersContract {
 
             groups.forEach((groupId, group) -> {
                 Group storedGroup = groups.get(groupId);
-                
assertThat(eventDeadLetters.failedEventIds(storedGroup).toStream())
+                
assertThat(eventDeadLetters.failedEventIds(storedGroup).collectList().block())
                     .hasSameElementsAs(storedEventIds.get(groupId));
             });
         }
@@ -265,7 +265,8 @@ interface EventDeadLettersContract {
             ConcurrentTestRunner.builder()
                 .operation((threadNumber, step) -> {
                     int operationIndex = threadNumber * OPERATION_COUNT + step;
-                    eventDeadLetters.remove(groups.get(threadNumber), 
storedEventIds.get(operationIndex));
+                    eventDeadLetters.remove(groups.get(threadNumber), 
storedEventIds.get(operationIndex))
+                        .subscribe();
                 })
                 .threadCount(THREAD_COUNT)
                 .operationCount(OPERATION_COUNT)
diff --git a/mailbox/event/event-cassandra/pom.xml 
b/mailbox/event/event-cassandra/pom.xml
index 1bc6222..b3cc19e 100644
--- a/mailbox/event/event-cassandra/pom.xml
+++ b/mailbox/event/event-cassandra/pom.xml
@@ -39,12 +39,54 @@
         </dependency>
         <dependency>
             <groupId>${project.groupId}</groupId>
+            <artifactId>apache-james-backends-cassandra</artifactId>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
             <artifactId>apache-james-mailbox-api</artifactId>
         </dependency>
         <dependency>
             <groupId>${project.groupId}</groupId>
+            <artifactId>apache-james-mailbox-api</artifactId>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>apache-james-mailbox-cassandra</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
             <artifactId>apache-james-mailbox-event-json</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-engine</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.platform</groupId>
+            <artifactId>junit-platform-launcher</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.vintage</groupId>
+            <artifactId>junit-vintage-engine</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>testcontainers</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
 
diff --git 
a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/MemoryEventDeadLetters.java
 
b/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLetters.java
similarity index 62%
copy from 
mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/MemoryEventDeadLetters.java
copy to 
mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLetters.java
index 45fff05..6530d1d 100644
--- 
a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/MemoryEventDeadLetters.java
+++ 
b/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLetters.java
@@ -19,25 +19,20 @@
 
 package org.apache.james.mailbox.events;
 
+import javax.inject.Inject;
+
 import com.google.common.base.Preconditions;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Multimaps;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
-import reactor.core.publisher.MonoProcessor;
-
-public class MemoryEventDeadLetters implements EventDeadLetters {
 
-    private static final String REGISTERED_GROUP_CANNOT_BE_NULL = 
"registeredGroup cannot be null";
-    private static final String FAIL_DELIVERED_EVENT_CANNOT_BE_NULL = 
"failDeliveredEvent cannot be null";
-    private static final String FAIL_DELIVERED_ID_EVENT_CANNOT_BE_NULL = 
"failDeliveredEventId cannot be null";
+public class CassandraEventDeadLetters implements EventDeadLetters {
 
-    private final Multimap<Group, Event> deadLetters;
+    private final CassandraEventDeadLettersDAO cassandraEventDeadLettersDAO;
 
-    public MemoryEventDeadLetters() {
-        this.deadLetters = 
Multimaps.synchronizedSetMultimap(HashMultimap.create());
+    @Inject
+    public CassandraEventDeadLetters(CassandraEventDeadLettersDAO 
cassandraEventDeadLettersDAO) {
+        this.cassandraEventDeadLettersDAO = cassandraEventDeadLettersDAO;
     }
 
     @Override
@@ -45,9 +40,7 @@ public class MemoryEventDeadLetters implements 
EventDeadLetters {
         Preconditions.checkArgument(registeredGroup != null, 
REGISTERED_GROUP_CANNOT_BE_NULL);
         Preconditions.checkArgument(failDeliveredEvent != null, 
FAIL_DELIVERED_EVENT_CANNOT_BE_NULL);
 
-        return Mono.fromRunnable(() -> deadLetters.put(registeredGroup, 
failDeliveredEvent))
-            .subscribeWith(MonoProcessor.create())
-            .then();
+        return cassandraEventDeadLettersDAO.store(registeredGroup, 
failDeliveredEvent);
     }
 
     @Override
@@ -55,12 +48,7 @@ public class MemoryEventDeadLetters implements 
EventDeadLetters {
         Preconditions.checkArgument(registeredGroup != null, 
REGISTERED_GROUP_CANNOT_BE_NULL);
         Preconditions.checkArgument(failDeliveredEventId != null, 
FAIL_DELIVERED_ID_EVENT_CANNOT_BE_NULL);
 
-        return Flux.fromIterable(deadLetters.get(registeredGroup))
-            .filter(event -> event.getEventId().equals(failDeliveredEventId))
-            .next()
-            .doOnNext(event -> deadLetters.remove(registeredGroup, event))
-            .subscribeWith(MonoProcessor.create())
-            .then();
+        return cassandraEventDeadLettersDAO.removeEvent(registeredGroup, 
failDeliveredEventId);
     }
 
     @Override
@@ -68,21 +56,18 @@ public class MemoryEventDeadLetters implements 
EventDeadLetters {
         Preconditions.checkArgument(registeredGroup != null, 
REGISTERED_GROUP_CANNOT_BE_NULL);
         Preconditions.checkArgument(failDeliveredEventId != null, 
FAIL_DELIVERED_ID_EVENT_CANNOT_BE_NULL);
 
-        return Flux.fromIterable(deadLetters.get(registeredGroup))
-            .filter(event -> event.getEventId().equals(failDeliveredEventId))
-            .next();
+        return 
cassandraEventDeadLettersDAO.retrieveFailedEvent(registeredGroup, 
failDeliveredEventId);
     }
 
     @Override
     public Flux<Event.EventId> failedEventIds(Group registeredGroup) {
         Preconditions.checkArgument(registeredGroup != null, 
REGISTERED_GROUP_CANNOT_BE_NULL);
 
-        return Flux.fromIterable(deadLetters.get(registeredGroup))
-            .map(Event::getEventId);
+        return 
cassandraEventDeadLettersDAO.retrieveEventIdsWithGroup(registeredGroup);
     }
 
     @Override
     public Flux<Group> groupsWithFailedEvents() {
-        return Flux.fromIterable(deadLetters.keySet());
+        return cassandraEventDeadLettersDAO.retrieveAllGroups();
     }
 }
diff --git 
a/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLettersDAO.java
 
b/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLettersDAO.java
index 0eab33b..2056f55 100644
--- 
a/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLettersDAO.java
+++ 
b/mailbox/event/event-cassandra/src/main/java/org/apache/james/mailbox/events/CassandraEventDeadLettersDAO.java
@@ -110,7 +110,7 @@ public class CassandraEventDeadLettersDAO {
         return executor.executeSingleRow(selectEventStatement.bind()
                 .setString(GROUP, group.asString())
                 .setUUID(EVENT_ID, failedEventId.getId()))
-                .map(row -> deserializeEvent(row.getString(EVENT)));
+            .map(row -> deserializeEvent(row.getString(EVENT)));
     }
 
     Flux<Event.EventId> retrieveEventIdsWithGroup(Group group) {
diff --git 
a/mailbox/event/event-cassandra/src/test/java/org/apache/james/mailbox/events/CassandraEventDeadLettersTest.java
 
b/mailbox/event/event-cassandra/src/test/java/org/apache/james/mailbox/events/CassandraEventDeadLettersTest.java
new file mode 100644
index 0000000..2c108d4
--- /dev/null
+++ 
b/mailbox/event/event-cassandra/src/test/java/org/apache/james/mailbox/events/CassandraEventDeadLettersTest.java
@@ -0,0 +1,53 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.events;
+
+import org.apache.james.backends.cassandra.CassandraCluster;
+import org.apache.james.backends.cassandra.CassandraClusterExtension;
+import org.apache.james.event.json.EventSerializer;
+import 
org.apache.james.mailbox.events.EventDeadLettersContract.FailedEventContract;
+import 
org.apache.james.mailbox.events.EventDeadLettersContract.FailedEventsContract;
+import 
org.apache.james.mailbox.events.EventDeadLettersContract.GroupsWithFailedEventsContract;
+import org.apache.james.mailbox.events.EventDeadLettersContract.RemoveContract;
+import org.apache.james.mailbox.events.EventDeadLettersContract.StoreContract;
+import org.apache.james.mailbox.model.TestId;
+import org.apache.james.mailbox.model.TestMessageId;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+class CassandraEventDeadLettersTest implements StoreContract, RemoveContract, 
FailedEventContract, FailedEventsContract,
+    GroupsWithFailedEventsContract {
+
+    @RegisterExtension
+    static CassandraClusterExtension cassandraClusterExtension = new 
CassandraClusterExtension(CassandraEventDeadLettersModule.MODULE);
+
+    private CassandraEventDeadLetters eventDeadLetters;
+
+    @BeforeEach
+    void setUp(CassandraCluster cassandraCluster) {
+        EventSerializer eventSerializer = new EventSerializer(new 
TestId.Factory(), new TestMessageId.Factory());
+        eventDeadLetters = new CassandraEventDeadLetters(new 
CassandraEventDeadLettersDAO(cassandraCluster.getConf(), eventSerializer));
+    }
+
+    @Override
+    public EventDeadLetters eventDeadLetters() {
+        return eventDeadLetters;
+    }
+}
diff --git 
a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/MemoryEventDeadLetters.java
 
b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/MemoryEventDeadLetters.java
index 45fff05..fcddc5e 100644
--- 
a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/MemoryEventDeadLetters.java
+++ 
b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/MemoryEventDeadLetters.java
@@ -30,10 +30,6 @@ import reactor.core.publisher.MonoProcessor;
 
 public class MemoryEventDeadLetters implements EventDeadLetters {
 
-    private static final String REGISTERED_GROUP_CANNOT_BE_NULL = 
"registeredGroup cannot be null";
-    private static final String FAIL_DELIVERED_EVENT_CANNOT_BE_NULL = 
"failDeliveredEvent cannot be null";
-    private static final String FAIL_DELIVERED_ID_EVENT_CANNOT_BE_NULL = 
"failDeliveredEventId cannot be null";
-
     private final Multimap<Group, Event> deadLetters;
 
     public MemoryEventDeadLetters() {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to