This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit b44d1bfcac3ed6143eb6a2e7448a96fefb0eb483
Author: datph <[email protected]>
AuthorDate: Wed Mar 13 17:10:32 2019 +0700

    MAILBOX-373 Refactor EventDeadLetter API + MemoryEventDeadLetters
---
 .../james/mailbox/events/EventDeadLetters.java     |  10 +-
 .../mailbox/events/ErrorHandlingContract.java      |  12 +-
 .../mailbox/events/EventDeadLettersContract.java   | 188 ++++++++++++---------
 .../mailbox/events/MemoryEventDeadLetters.java     |  40 ++---
 .../mailbox/events/delivery/EventDelivery.java     |   2 +-
 .../james/mailbox/events/GroupConsumerRetry.java   |   4 +-
 6 files changed, 142 insertions(+), 114 deletions(-)

diff --git 
a/mailbox/api/src/main/java/org/apache/james/mailbox/events/EventDeadLetters.java
 
b/mailbox/api/src/main/java/org/apache/james/mailbox/events/EventDeadLetters.java
index 38ac4ae..89e373e 100644
--- 
a/mailbox/api/src/main/java/org/apache/james/mailbox/events/EventDeadLetters.java
+++ 
b/mailbox/api/src/main/java/org/apache/james/mailbox/events/EventDeadLetters.java
@@ -80,15 +80,15 @@ public interface EventDeadLetters {
 
     String REGISTERED_GROUP_CANNOT_BE_NULL = "registeredGroup cannot be null";
     String FAIL_DELIVERED_EVENT_CANNOT_BE_NULL = "failDeliveredEvent cannot be 
null";
-    String FAIL_DELIVERED_ID_EVENT_CANNOT_BE_NULL = "failDeliveredEventId 
cannot be null";
+    String FAIL_DELIVERED_ID_INSERTION_CANNOT_BE_NULL = 
"failDeliveredInsertionId cannot be null";
 
-    Mono<Void> store(Group registeredGroup, Event failDeliveredEvent);
+    Mono<Void> store(Group registeredGroup, Event failDeliveredEvent, 
InsertionId insertionId);
 
-    Mono<Void> remove(Group registeredGroup, Event.EventId 
failDeliveredEventId);
+    Mono<Void> remove(Group registeredGroup, InsertionId 
failDeliveredInsertionId);
 
-    Mono<Event> failedEvent(Group registeredGroup, Event.EventId 
failDeliveredEventId);
+    Mono<Event> failedEvent(Group registeredGroup, InsertionId 
failDeliveredInsertionId);
 
-    Flux<Event.EventId> failedEventIds(Group registeredGroup);
+    Flux<InsertionId> failedIds(Group registeredGroup);
 
     Flux<Group> groupsWithFailedEvents();
 }
diff --git 
a/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java
 
b/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java
index 54cee40..b246c82 100644
--- 
a/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java
+++ 
b/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java
@@ -252,8 +252,10 @@ interface ErrorHandlingContract extends EventBusContract {
         eventBus().register(eventCollector, GROUP_A);
         eventBus().dispatch(EVENT, NO_KEYS).block();
 
-        WAIT_CONDITION.until(() -> 
assertThat(deadLetter().failedEventIds(GROUP_A).toIterable())
-            .containsOnly(EVENT.getEventId()));
+        WAIT_CONDITION.until(() -> assertThat(deadLetter().failedIds(GROUP_A)
+                .flatMap(insertionId -> deadLetter().failedEvent(GROUP_A, 
insertionId))
+                .toIterable())
+            .containsOnly(EVENT));
         assertThat(eventCollector.getEvents())
             .isEmpty();
     }
@@ -272,8 +274,10 @@ interface ErrorHandlingContract extends EventBusContract {
         eventBus().register(eventCollector, GROUP_A);
         eventBus().reDeliver(GROUP_A, EVENT).block();
 
-        WAIT_CONDITION.until(() -> 
assertThat(deadLetter().failedEventIds(GROUP_A).toIterable())
-            .containsOnly(EVENT.getEventId()));
+        WAIT_CONDITION.until(() -> assertThat(deadLetter().failedIds(GROUP_A)
+                .flatMap(insertionId -> deadLetter().failedEvent(GROUP_A, 
insertionId))
+                .toIterable())
+            .containsOnly(EVENT));
         assertThat(eventCollector.getEvents())
             .isEmpty();
     }
diff --git 
a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventDeadLettersContract.java
 
b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventDeadLettersContract.java
index 6bb00e6..4b3bca8 100644
--- 
a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventDeadLettersContract.java
+++ 
b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventDeadLettersContract.java
@@ -92,20 +92,23 @@ interface EventDeadLettersContract {
     MailboxListener.MailboxAdded EVENT_1 = new 
MailboxListener.MailboxAdded(SESSION_ID, USER, MAILBOX_PATH, MAILBOX_ID, 
EVENT_ID_1);
     MailboxListener.MailboxAdded EVENT_2 = new 
MailboxListener.MailboxAdded(SESSION_ID, USER, MAILBOX_PATH, MAILBOX_ID, 
EVENT_ID_2);
     MailboxListener.MailboxAdded EVENT_3 = new 
MailboxListener.MailboxAdded(SESSION_ID, USER, MAILBOX_PATH, MAILBOX_ID, 
EVENT_ID_3);
+    EventDeadLetters.InsertionId INSERTION_ID_1 = 
EventDeadLetters.InsertionId.of("6e0dd59d-660e-4d9b-b22f-0354479f47b7");
+    EventDeadLetters.InsertionId INSERTION_ID_2 = 
EventDeadLetters.InsertionId.of("6e0dd59d-660e-4d9b-b22f-0354479f47b8");
+    EventDeadLetters.InsertionId INSERTION_ID_3 = 
EventDeadLetters.InsertionId.of("6e0dd59d-660e-4d9b-b22f-0354479f47b9");
 
     Group GROUP_A = new EventBusTestFixture.GroupA();
     Group GROUP_B = new EventBusTestFixture.GroupB();
     Group NULL_GROUP = null;
     Event NULL_EVENT = null;
-    Event.EventId NULL_EVENT_ID = null;
+    EventDeadLetters.InsertionId NULL_INSERTION_ID = null;
 
     EventDeadLetters eventDeadLetters();
 
-    default Stream<Event.EventId> allEventIds() {
+    default Stream<EventDeadLetters.InsertionId> allInsertionIds() {
         EventDeadLetters eventDeadLetters = eventDeadLetters();
 
         return eventDeadLetters.groupsWithFailedEvents()
-            .flatMap(eventDeadLetters::failedEventIds)
+            .flatMap(eventDeadLetters::failedIds)
             .toStream();
     }
 
@@ -115,7 +118,7 @@ interface EventDeadLettersContract {
         default void storeShouldThrowWhenNullGroup() {
             EventDeadLetters eventDeadLetters = eventDeadLetters();
 
-            assertThatThrownBy(() -> eventDeadLetters.store(NULL_GROUP, 
EVENT_1))
+            assertThatThrownBy(() -> eventDeadLetters.store(NULL_GROUP, 
EVENT_1, INSERTION_ID_1))
                 .isInstanceOf(IllegalArgumentException.class);
         }
 
@@ -123,15 +126,23 @@ interface EventDeadLettersContract {
         default void storeShouldThrowWhenNullEvent() {
             EventDeadLetters eventDeadLetters = eventDeadLetters();
 
-            assertThatThrownBy(() -> eventDeadLetters.store(GROUP_A, 
NULL_EVENT))
+            assertThatThrownBy(() -> eventDeadLetters.store(GROUP_A, 
NULL_EVENT, INSERTION_ID_1))
                 .isInstanceOf(IllegalArgumentException.class);
         }
 
         @Test
-        default void storeShouldThrowWhenBothGroupAndEventAreNull() {
+        default void storeShouldThrowWhenNullInsertionId() {
             EventDeadLetters eventDeadLetters = eventDeadLetters();
 
-            assertThatThrownBy(() -> eventDeadLetters.store(NULL_GROUP, 
NULL_EVENT))
+            assertThatThrownBy(() -> eventDeadLetters.store(GROUP_A, EVENT_1, 
NULL_INSERTION_ID))
+                .isInstanceOf(IllegalArgumentException.class);
+        }
+
+        @Test
+        default void 
storeShouldThrowWhenBothGroupAndEventAndInsertionIdAreNull() {
+            EventDeadLetters eventDeadLetters = eventDeadLetters();
+
+            assertThatThrownBy(() -> eventDeadLetters.store(NULL_GROUP, 
NULL_EVENT, NULL_INSERTION_ID))
                 .isInstanceOf(IllegalArgumentException.class);
         }
 
@@ -139,21 +150,20 @@ interface EventDeadLettersContract {
         default void storeShouldStoreGroupWithCorrespondingEvent() {
             EventDeadLetters eventDeadLetters = eventDeadLetters();
 
-            eventDeadLetters.store(GROUP_A, EVENT_1).block();
-            assertThat(eventDeadLetters.failedEvent(GROUP_A, 
EVENT_1.getEventId()).block())
+            eventDeadLetters.store(GROUP_A, EVENT_1, INSERTION_ID_1).block();
+            assertThat(eventDeadLetters.failedEvent(GROUP_A, 
INSERTION_ID_1).block())
                 .isEqualTo(EVENT_1);
         }
 
         @Test
-        default void storeShouldIgnoreStoreDuplicatedEventsPerGroup() {
+        default void 
storeShouldStoreDuplicatedEventsPerGroupWhenStoreWithDifferentInsertionId() {
             EventDeadLetters eventDeadLetters = eventDeadLetters();
 
-            eventDeadLetters.store(GROUP_A, EVENT_1).block();
-            eventDeadLetters.store(GROUP_A, EVENT_1).block();
-            eventDeadLetters.store(GROUP_A, EVENT_1).block();
+            eventDeadLetters.store(GROUP_A, EVENT_1, INSERTION_ID_1).block();
+            eventDeadLetters.store(GROUP_A, EVENT_1, INSERTION_ID_2).block();
 
-            assertThat(eventDeadLetters.failedEventIds(GROUP_A).toStream())
-                .containsExactly(EVENT_ID_1);
+            assertThat(eventDeadLetters.failedIds(GROUP_A).toStream())
+                .containsOnly(INSERTION_ID_1, INSERTION_ID_2);
         }
 
         @Test
@@ -161,13 +171,14 @@ interface EventDeadLettersContract {
             EventDeadLetters eventDeadLetters = eventDeadLetters();
 
             ImmutableMap<Integer, Group> groups = concurrentGroups();
-            Multimap<Integer, Event.EventId> storedEventIds = 
Multimaps.synchronizedSetMultimap(HashMultimap.create());
+            Multimap<Integer, EventDeadLetters.InsertionId> storedInsertionIds 
= Multimaps.synchronizedSetMultimap(HashMultimap.create());
 
             ConcurrentTestRunner.builder()
                 .operation((threadNumber, step) -> {
                     Event.EventId eventId = Event.EventId.random();
-                    storedEventIds.put(threadNumber, eventId);
-                    eventDeadLetters.store(groups.get(threadNumber), 
event(eventId)).subscribe();
+                    EventDeadLetters.InsertionId insertionId = 
EventDeadLetters.InsertionId.random();
+                    storedInsertionIds.put(threadNumber, insertionId);
+                    eventDeadLetters.store(groups.get(threadNumber), 
event(eventId), insertionId).subscribe();
                 })
                 .threadCount(THREAD_COUNT)
                 .operationCount(OPERATION_COUNT)
@@ -175,8 +186,8 @@ interface EventDeadLettersContract {
 
             groups.forEach((groupId, group) -> {
                 Group storedGroup = groups.get(groupId);
-                
assertThat(eventDeadLetters.failedEventIds(storedGroup).collectList().block())
-                    .hasSameElementsAs(storedEventIds.get(groupId));
+                
assertThat(eventDeadLetters.failedIds(storedGroup).collectList().block())
+                    .hasSameElementsAs(storedInsertionIds.get(groupId));
             });
         }
     }
@@ -187,23 +198,23 @@ interface EventDeadLettersContract {
         default void removeShouldThrowWhenGroupIsNull() {
             EventDeadLetters eventDeadLetters = eventDeadLetters();
 
-            assertThatThrownBy(() -> eventDeadLetters.remove(NULL_GROUP, 
EVENT_ID_1))
+            assertThatThrownBy(() -> eventDeadLetters.remove(NULL_GROUP, 
INSERTION_ID_1))
                 .isInstanceOf(IllegalArgumentException.class);
         }
 
         @Test
-        default void removeShouldThrowWhenEventIdIsNull() {
+        default void removeShouldThrowWhenInsertionIdIsNull() {
             EventDeadLetters eventDeadLetters = eventDeadLetters();
 
-            assertThatThrownBy(() -> eventDeadLetters.remove(GROUP_A, 
NULL_EVENT_ID))
+            assertThatThrownBy(() -> eventDeadLetters.remove(GROUP_A, 
NULL_INSERTION_ID))
                 .isInstanceOf(IllegalArgumentException.class);
         }
 
         @Test
-        default void removeShouldThrowWhenBothGroupAndEventIdAreNull() {
+        default void removeShouldThrowWhenBothGroupAndInsertionIdAreNull() {
             EventDeadLetters eventDeadLetters = eventDeadLetters();
 
-            assertThatThrownBy(() -> eventDeadLetters.remove(NULL_GROUP, 
NULL_EVENT_ID))
+            assertThatThrownBy(() -> eventDeadLetters.remove(NULL_GROUP, 
NULL_INSERTION_ID))
                 .isInstanceOf(IllegalArgumentException.class);
         }
 
@@ -211,12 +222,12 @@ interface EventDeadLettersContract {
         default void removeShouldRemoveMatched() {
             EventDeadLetters eventDeadLetters = eventDeadLetters();
 
-            eventDeadLetters.store(GROUP_A, EVENT_1).block();
-            eventDeadLetters.store(GROUP_A, EVENT_2).block();
+            eventDeadLetters.store(GROUP_A, EVENT_1, INSERTION_ID_1).block();
+            eventDeadLetters.store(GROUP_A, EVENT_2, INSERTION_ID_2).block();
 
-            eventDeadLetters.remove(GROUP_A, EVENT_1.getEventId()).block();
+            eventDeadLetters.remove(GROUP_A, INSERTION_ID_1).block();
 
-            assertThat(eventDeadLetters.failedEvent(GROUP_A, 
EVENT_1.getEventId()).block())
+            assertThat(eventDeadLetters.failedEvent(GROUP_A, 
INSERTION_ID_1).block())
                 .isNull();
         }
 
@@ -224,23 +235,33 @@ interface EventDeadLettersContract {
         default void removeShouldKeepNonMatched() {
             EventDeadLetters eventDeadLetters = eventDeadLetters();
 
-            eventDeadLetters.store(GROUP_A, EVENT_1).block();
-            eventDeadLetters.store(GROUP_A, EVENT_2).block();
-            eventDeadLetters.store(GROUP_A, EVENT_3).block();
+            eventDeadLetters.store(GROUP_A, EVENT_1, INSERTION_ID_1).block();
+            eventDeadLetters.store(GROUP_A, EVENT_2, INSERTION_ID_2).block();
+            eventDeadLetters.store(GROUP_A, EVENT_3, INSERTION_ID_3).block();
+
+            eventDeadLetters.remove(GROUP_A, INSERTION_ID_1).block();
+
+            assertThat(eventDeadLetters.failedIds(GROUP_A).toStream())
+                .containsOnly(INSERTION_ID_2, INSERTION_ID_3);
+        }
+
+        @Test
+        default void removeShouldNotThrowWhenNoInsertionIdMatched() {
+            EventDeadLetters eventDeadLetters = eventDeadLetters();
 
-            eventDeadLetters.remove(GROUP_A, EVENT_1.getEventId()).block();
+            eventDeadLetters.store(GROUP_A, EVENT_1, INSERTION_ID_1).block();
 
-            assertThat(eventDeadLetters.failedEventIds(GROUP_A).toStream())
-                .containsOnly(EVENT_ID_2, EVENT_ID_3);
+            assertThatCode(() -> eventDeadLetters.remove(GROUP_A, 
INSERTION_ID_2).block())
+                .doesNotThrowAnyException();
         }
 
         @Test
-        default void removeShouldNotThrowWhenNoMatched() {
+        default void removeShouldNotThrowWhenNoGroupMatched() {
             EventDeadLetters eventDeadLetters = eventDeadLetters();
 
-            eventDeadLetters.store(GROUP_A, EVENT_1).block();
+            eventDeadLetters.store(GROUP_A, EVENT_1, INSERTION_ID_1).block();
 
-            assertThatCode(() -> eventDeadLetters.remove(GROUP_A, 
EVENT_2.getEventId()).block())
+            assertThatCode(() -> eventDeadLetters.remove(GROUP_B, 
INSERTION_ID_1).block())
                 .doesNotThrowAnyException();
         }
 
@@ -249,14 +270,15 @@ interface EventDeadLettersContract {
             EventDeadLetters eventDeadLetters = eventDeadLetters();
 
             ImmutableMap<Integer, Group> groups = concurrentGroups();
-            ConcurrentHashMap<Integer, Event.EventId> storedEventIds = new 
ConcurrentHashMap<>();
+            ConcurrentHashMap<Integer, EventDeadLetters.InsertionId> 
storedInsertionIds = new ConcurrentHashMap<>();
 
             ConcurrentTestRunner.builder()
                 .operation((threadNumber, step) -> {
                     int operationIndex = threadNumber * OPERATION_COUNT + step;
                     Event.EventId eventId = Event.EventId.random();
-                    storedEventIds.put(operationIndex, eventId);
-                    eventDeadLetters.store(groups.get(threadNumber), 
event(eventId)).subscribe();
+                    EventDeadLetters.InsertionId insertionId = 
EventDeadLetters.InsertionId.random();
+                    storedInsertionIds.put(operationIndex, insertionId);
+                    eventDeadLetters.store(groups.get(threadNumber), 
event(eventId), insertionId).subscribe();
                 })
                 .threadCount(THREAD_COUNT)
                 .operationCount(OPERATION_COUNT)
@@ -265,14 +287,14 @@ interface EventDeadLettersContract {
             ConcurrentTestRunner.builder()
                 .operation((threadNumber, step) -> {
                     int operationIndex = threadNumber * OPERATION_COUNT + step;
-                    eventDeadLetters.remove(groups.get(threadNumber), 
storedEventIds.get(operationIndex))
+                    eventDeadLetters.remove(groups.get(threadNumber), 
storedInsertionIds.get(operationIndex))
                         .subscribe();
                 })
                 .threadCount(THREAD_COUNT)
                 .operationCount(OPERATION_COUNT)
                 .runSuccessfullyWithin(RUN_SUCCESSFULLY_IN);
 
-            assertThat(allEventIds())
+            assertThat(allInsertionIds())
                 .isEmpty();
         }
     }
@@ -283,23 +305,23 @@ interface EventDeadLettersContract {
         default void failedEventShouldThrowWhenGroupIsNull() {
             EventDeadLetters eventDeadLetters = eventDeadLetters();
 
-            assertThatThrownBy(() -> eventDeadLetters.failedEvent(NULL_GROUP, 
EVENT_ID_1))
+            assertThatThrownBy(() -> eventDeadLetters.failedEvent(NULL_GROUP, 
INSERTION_ID_1))
                 .isInstanceOf(IllegalArgumentException.class);
         }
 
         @Test
-        default void failedEventShouldThrowWhenEventIdIsNull() {
+        default void failedEventShouldThrowWhenInsertionIdIsNull() {
             EventDeadLetters eventDeadLetters = eventDeadLetters();
 
-            assertThatThrownBy(() -> eventDeadLetters.failedEvent(GROUP_A, 
NULL_EVENT_ID))
+            assertThatThrownBy(() -> eventDeadLetters.failedEvent(GROUP_A, 
NULL_INSERTION_ID))
                 .isInstanceOf(IllegalArgumentException.class);
         }
 
         @Test
-        default void failedEventShouldThrowWhenBothGroupAndEventIdAreNull() {
+        default void 
failedEventShouldThrowWhenBothGroupAndInsertionIdAreNull() {
             EventDeadLetters eventDeadLetters = eventDeadLetters();
 
-            assertThatThrownBy(() -> eventDeadLetters.failedEvent(NULL_GROUP, 
NULL_EVENT_ID))
+            assertThatThrownBy(() -> eventDeadLetters.failedEvent(NULL_GROUP, 
NULL_INSERTION_ID))
                 .isInstanceOf(IllegalArgumentException.class);
         }
 
@@ -307,10 +329,10 @@ interface EventDeadLettersContract {
         default void failedEventShouldReturnEmptyWhenNotFound() {
             EventDeadLetters eventDeadLetters = eventDeadLetters();
 
-            eventDeadLetters.store(GROUP_A, EVENT_1).block();
-            eventDeadLetters.store(GROUP_A, EVENT_2).block();
+            eventDeadLetters.store(GROUP_A, EVENT_1, INSERTION_ID_1).block();
+            eventDeadLetters.store(GROUP_A, EVENT_2, INSERTION_ID_2).block();
 
-            assertThat(eventDeadLetters.failedEvent(GROUP_A, 
EVENT_ID_3).block())
+            assertThat(eventDeadLetters.failedEvent(GROUP_A, 
INSERTION_ID_3).block())
                 .isNull();
         }
 
@@ -318,10 +340,10 @@ interface EventDeadLettersContract {
         default void failedEventShouldReturnEventWhenContains() {
             EventDeadLetters eventDeadLetters = eventDeadLetters();
 
-            eventDeadLetters.store(GROUP_A, EVENT_1).block();
-            eventDeadLetters.store(GROUP_A, EVENT_2).block();
+            eventDeadLetters.store(GROUP_A, EVENT_1, INSERTION_ID_1).block();
+            eventDeadLetters.store(GROUP_A, EVENT_2, INSERTION_ID_2).block();
 
-            assertThat(eventDeadLetters.failedEvent(GROUP_A, 
EVENT_1.getEventId()).block())
+            assertThat(eventDeadLetters.failedEvent(GROUP_A, 
INSERTION_ID_1).block())
                 .isEqualTo(EVENT_1);
         }
 
@@ -329,14 +351,24 @@ interface EventDeadLettersContract {
         default void failedEventShouldNotRemoveEvent() {
             EventDeadLetters eventDeadLetters = eventDeadLetters();
 
-            eventDeadLetters.store(GROUP_A, EVENT_1).block();
-            eventDeadLetters.store(GROUP_A, EVENT_2).block();
-            eventDeadLetters.store(GROUP_A, EVENT_3).block();
+            eventDeadLetters.store(GROUP_A, EVENT_1, INSERTION_ID_1).block();
+            eventDeadLetters.store(GROUP_A, EVENT_2, INSERTION_ID_2).block();
+            eventDeadLetters.store(GROUP_A, EVENT_3, INSERTION_ID_3).block();
+
+            eventDeadLetters.failedEvent(GROUP_A, INSERTION_ID_1).block();
+
+            assertThat(allInsertionIds())
+                .containsOnly(INSERTION_ID_1, INSERTION_ID_2, INSERTION_ID_3);
+        }
 
-            eventDeadLetters.failedEvent(GROUP_A, 
EVENT_1.getEventId()).block();
+        @Test
+        default void failedEventShouldNotThrowWhenNoGroupMatched() {
+            EventDeadLetters eventDeadLetters = eventDeadLetters();
 
-            assertThat(allEventIds())
-                .containsOnly(EVENT_ID_1, EVENT_ID_2, EVENT_ID_3);
+            eventDeadLetters.store(GROUP_A, EVENT_1, INSERTION_ID_1).block();
+
+            assertThatCode(() -> eventDeadLetters.failedEvent(GROUP_B, 
INSERTION_ID_1).block())
+                .doesNotThrowAnyException();
         }
     }
 
@@ -346,7 +378,7 @@ interface EventDeadLettersContract {
         default void failedEventsShouldThrowWhenGroupIsNull() {
             EventDeadLetters eventDeadLetters = eventDeadLetters();
 
-            assertThatThrownBy(() -> 
eventDeadLetters.failedEventIds(NULL_GROUP))
+            assertThatThrownBy(() -> eventDeadLetters.failedIds(NULL_GROUP))
                 .isInstanceOf(IllegalArgumentException.class);
         }
 
@@ -354,11 +386,11 @@ interface EventDeadLettersContract {
         default void failedEventsByGroupShouldReturnEmptyWhenNonMatch() {
             EventDeadLetters eventDeadLetters = eventDeadLetters();
 
-            eventDeadLetters.store(GROUP_A, EVENT_1).block();
-            eventDeadLetters.store(GROUP_A, EVENT_2).block();
-            eventDeadLetters.store(GROUP_A, EVENT_3).block();
+            eventDeadLetters.store(GROUP_A, EVENT_1, INSERTION_ID_1).block();
+            eventDeadLetters.store(GROUP_A, EVENT_2, INSERTION_ID_2).block();
+            eventDeadLetters.store(GROUP_A, EVENT_3, INSERTION_ID_3).block();
 
-            assertThat(eventDeadLetters.failedEventIds(GROUP_B).toStream())
+            assertThat(eventDeadLetters.failedIds(GROUP_B).toStream())
                 .isEmpty();
         }
 
@@ -366,26 +398,26 @@ interface EventDeadLettersContract {
         default void 
failedEventsByGroupShouldReturnAllEventsCorrespondingToGivenGroup() {
             EventDeadLetters eventDeadLetters = eventDeadLetters();
 
-            eventDeadLetters.store(GROUP_A, EVENT_1).block();
-            eventDeadLetters.store(GROUP_A, EVENT_2).block();
-            eventDeadLetters.store(GROUP_B, EVENT_3).block();
+            eventDeadLetters.store(GROUP_A, EVENT_1, INSERTION_ID_1).block();
+            eventDeadLetters.store(GROUP_A, EVENT_2, INSERTION_ID_2).block();
+            eventDeadLetters.store(GROUP_B, EVENT_3, INSERTION_ID_3).block();
 
-            assertThat(eventDeadLetters.failedEventIds(GROUP_A).toStream())
-                .containsOnly(EVENT_ID_1, EVENT_ID_2);
+            assertThat(eventDeadLetters.failedIds(GROUP_A).toStream())
+                .containsOnly(INSERTION_ID_1, INSERTION_ID_2);
         }
 
         @Test
         default void failedEventsByGroupShouldNotRemoveEvents() {
             EventDeadLetters eventDeadLetters = eventDeadLetters();
 
-            eventDeadLetters.store(GROUP_A, EVENT_1).block();
-            eventDeadLetters.store(GROUP_A, EVENT_2).block();
-            eventDeadLetters.store(GROUP_B, EVENT_3).block();
+            eventDeadLetters.store(GROUP_A, EVENT_1, INSERTION_ID_1).block();
+            eventDeadLetters.store(GROUP_A, EVENT_2, INSERTION_ID_2).block();
+            eventDeadLetters.store(GROUP_B, EVENT_3, INSERTION_ID_3).block();
 
-            eventDeadLetters.failedEventIds(GROUP_A).toStream();
+            eventDeadLetters.failedIds(GROUP_A).toStream();
 
-            assertThat(allEventIds())
-                .containsOnly(EVENT_ID_1, EVENT_ID_2, EVENT_ID_3);
+            assertThat(allInsertionIds())
+                .containsOnly(INSERTION_ID_1, INSERTION_ID_2, INSERTION_ID_3);
         }
     }
 
@@ -393,8 +425,8 @@ interface EventDeadLettersContract {
         @Test
         default void groupsWithFailedEventsShouldReturnAllStoredGroups() {
             EventDeadLetters eventDeadLetters = eventDeadLetters();
-            eventDeadLetters.store(GROUP_A, EVENT_1).block();
-            eventDeadLetters.store(GROUP_B, EVENT_1).block();
+            eventDeadLetters.store(GROUP_A, EVENT_1, INSERTION_ID_1).block();
+            eventDeadLetters.store(GROUP_B, EVENT_1, INSERTION_ID_2).block();
 
             
assertThat(eventDeadLetters.groupsWithFailedEvents().collectList().block())
                 .containsOnly(GROUP_A, GROUP_B);
diff --git 
a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/MemoryEventDeadLetters.java
 
b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/MemoryEventDeadLetters.java
index fdeec7c..ed324f2 100644
--- 
a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/MemoryEventDeadLetters.java
+++ 
b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/MemoryEventDeadLetters.java
@@ -20,10 +20,8 @@
 package org.apache.james.mailbox.events;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Multimap;
+import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.Table;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -31,65 +29,59 @@ import reactor.core.publisher.MonoProcessor;
 
 public class MemoryEventDeadLetters implements EventDeadLetters {
 
-    private final Multimap<Group, Event> deadLetters;
+    private final Table<Group, InsertionId, Event> deadLetters;
 
     public MemoryEventDeadLetters() {
-        this.deadLetters = HashMultimap.create();
+        this.deadLetters = HashBasedTable.create();
     }
 
     @Override
-    public Mono<Void> store(Group registeredGroup, Event failDeliveredEvent) {
+    public Mono<Void> store(Group registeredGroup, Event failDeliveredEvent, 
InsertionId insertionId) {
         Preconditions.checkArgument(registeredGroup != null, 
REGISTERED_GROUP_CANNOT_BE_NULL);
         Preconditions.checkArgument(failDeliveredEvent != null, 
FAIL_DELIVERED_EVENT_CANNOT_BE_NULL);
+        Preconditions.checkArgument(insertionId != null, 
FAIL_DELIVERED_ID_INSERTION_CANNOT_BE_NULL);
 
         synchronized (deadLetters) {
-            return Mono.fromRunnable(() -> deadLetters.put(registeredGroup, 
failDeliveredEvent))
+            return Mono.fromRunnable(() -> deadLetters.put(registeredGroup, 
insertionId, failDeliveredEvent))
                 .subscribeWith(MonoProcessor.create())
                 .then();
         }
     }
 
     @Override
-    public Mono<Void> remove(Group registeredGroup, Event.EventId 
failDeliveredEventId) {
+    public Mono<Void> remove(Group registeredGroup, InsertionId 
failDeliveredInsertionId) {
         Preconditions.checkArgument(registeredGroup != null, 
REGISTERED_GROUP_CANNOT_BE_NULL);
-        Preconditions.checkArgument(failDeliveredEventId != null, 
FAIL_DELIVERED_ID_EVENT_CANNOT_BE_NULL);
+        Preconditions.checkArgument(failDeliveredInsertionId != null, 
FAIL_DELIVERED_ID_INSERTION_CANNOT_BE_NULL);
 
         synchronized (deadLetters) {
-            return 
Flux.fromIterable(ImmutableList.copyOf(deadLetters.get(registeredGroup)))
-                .filter(event -> 
event.getEventId().equals(failDeliveredEventId))
-                .next()
-                .doOnNext(event -> deadLetters.remove(registeredGroup, event))
-                .subscribeWith(MonoProcessor.create())
+            return Mono.justOrEmpty(deadLetters.remove(registeredGroup, 
failDeliveredInsertionId))
                 .then();
         }
     }
 
     @Override
-    public Mono<Event> failedEvent(Group registeredGroup, Event.EventId 
failDeliveredEventId) {
+    public Mono<Event> failedEvent(Group registeredGroup, InsertionId 
failDeliveredInsertionId) {
         Preconditions.checkArgument(registeredGroup != null, 
REGISTERED_GROUP_CANNOT_BE_NULL);
-        Preconditions.checkArgument(failDeliveredEventId != null, 
FAIL_DELIVERED_ID_EVENT_CANNOT_BE_NULL);
+        Preconditions.checkArgument(failDeliveredInsertionId != null, 
FAIL_DELIVERED_ID_INSERTION_CANNOT_BE_NULL);
 
         synchronized (deadLetters) {
-            return 
Flux.fromIterable(ImmutableList.copyOf(deadLetters.get(registeredGroup)))
-                .filter(event -> 
event.getEventId().equals(failDeliveredEventId))
-                .next();
+            return Mono.justOrEmpty(deadLetters.get(registeredGroup, 
failDeliveredInsertionId));
         }
     }
 
     @Override
-    public Flux<Event.EventId> failedEventIds(Group registeredGroup) {
+    public Flux<InsertionId> failedIds(Group registeredGroup) {
         Preconditions.checkArgument(registeredGroup != null, 
REGISTERED_GROUP_CANNOT_BE_NULL);
 
         synchronized (deadLetters) {
-            return 
Flux.fromIterable(ImmutableList.copyOf(deadLetters.get(registeredGroup)))
-                .map(Event::getEventId);
+            return 
Flux.fromIterable(deadLetters.row(registeredGroup).keySet());
         }
     }
 
     @Override
     public Flux<Group> groupsWithFailedEvents() {
         synchronized (deadLetters) {
-            return 
Flux.fromIterable(ImmutableSet.copyOf(deadLetters.keySet()));
+            return Flux.fromIterable(deadLetters.rowKeySet());
         }
     }
 }
diff --git 
a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java
 
b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java
index c30a709..e7fbcc1 100644
--- 
a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java
+++ 
b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java
@@ -121,7 +121,7 @@ public interface EventDelivery {
 
             @Override
             public Mono<Void> handle(Event event) {
-                return eventDeadLetters.store(group, event);
+                return eventDeadLetters.store(group, event, 
EventDeadLetters.InsertionId.random());
             }
         }
 
diff --git 
a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupConsumerRetry.java
 
b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupConsumerRetry.java
index 1994e45..9fdd440 100644
--- 
a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupConsumerRetry.java
+++ 
b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupConsumerRetry.java
@@ -109,7 +109,7 @@ class GroupConsumerRetry {
 
     private Mono<Void> retryOrStoreToDeadLetter(Event event, int 
currentRetryCount) {
         if (currentRetryCount >= retryBackoff.getMaxRetries()) {
-            return eventDeadLetters.store(group, event);
+            return eventDeadLetters.store(group, event, 
EventDeadLetters.InsertionId.random());
         }
         return sendRetryMessage(event, currentRetryCount);
     }
@@ -128,7 +128,7 @@ class GroupConsumerRetry {
         return sender.send(retryMessage)
             .doOnError(throwable -> createStructuredLogger(event)
                 .log(logger -> logger.error("Exception happens when publishing 
event to retry exchange, this event will be stored in deadLetter", throwable)))
-            .onErrorResume(e -> eventDeadLetters.store(group, event));
+            .onErrorResume(e -> eventDeadLetters.store(group, event, 
EventDeadLetters.InsertionId.random()));
     }
 
     private StructuredLogger createStructuredLogger(Event event) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to