This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit b44d1bfcac3ed6143eb6a2e7448a96fefb0eb483 Author: datph <[email protected]> AuthorDate: Wed Mar 13 17:10:32 2019 +0700 MAILBOX-373 Refactor EventDeadLetter API + MemoryEventDeadLetters --- .../james/mailbox/events/EventDeadLetters.java | 10 +- .../mailbox/events/ErrorHandlingContract.java | 12 +- .../mailbox/events/EventDeadLettersContract.java | 188 ++++++++++++--------- .../mailbox/events/MemoryEventDeadLetters.java | 40 ++--- .../mailbox/events/delivery/EventDelivery.java | 2 +- .../james/mailbox/events/GroupConsumerRetry.java | 4 +- 6 files changed, 142 insertions(+), 114 deletions(-) diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/events/EventDeadLetters.java b/mailbox/api/src/main/java/org/apache/james/mailbox/events/EventDeadLetters.java index 38ac4ae..89e373e 100644 --- a/mailbox/api/src/main/java/org/apache/james/mailbox/events/EventDeadLetters.java +++ b/mailbox/api/src/main/java/org/apache/james/mailbox/events/EventDeadLetters.java @@ -80,15 +80,15 @@ public interface EventDeadLetters { String REGISTERED_GROUP_CANNOT_BE_NULL = "registeredGroup cannot be null"; String FAIL_DELIVERED_EVENT_CANNOT_BE_NULL = "failDeliveredEvent cannot be null"; - String FAIL_DELIVERED_ID_EVENT_CANNOT_BE_NULL = "failDeliveredEventId cannot be null"; + String FAIL_DELIVERED_ID_INSERTION_CANNOT_BE_NULL = "failDeliveredInsertionId cannot be null"; - Mono<Void> store(Group registeredGroup, Event failDeliveredEvent); + Mono<Void> store(Group registeredGroup, Event failDeliveredEvent, InsertionId insertionId); - Mono<Void> remove(Group registeredGroup, Event.EventId failDeliveredEventId); + Mono<Void> remove(Group registeredGroup, InsertionId failDeliveredInsertionId); - Mono<Event> failedEvent(Group registeredGroup, Event.EventId failDeliveredEventId); + Mono<Event> failedEvent(Group registeredGroup, InsertionId failDeliveredInsertionId); - Flux<Event.EventId> failedEventIds(Group registeredGroup); + Flux<InsertionId> failedIds(Group registeredGroup); 
Flux<Group> groupsWithFailedEvents(); } diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java index 54cee40..b246c82 100644 --- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java +++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/ErrorHandlingContract.java @@ -252,8 +252,10 @@ interface ErrorHandlingContract extends EventBusContract { eventBus().register(eventCollector, GROUP_A); eventBus().dispatch(EVENT, NO_KEYS).block(); - WAIT_CONDITION.until(() -> assertThat(deadLetter().failedEventIds(GROUP_A).toIterable()) - .containsOnly(EVENT.getEventId())); + WAIT_CONDITION.until(() -> assertThat(deadLetter().failedIds(GROUP_A) + .flatMap(insertionId -> deadLetter().failedEvent(GROUP_A, insertionId)) + .toIterable()) + .containsOnly(EVENT)); assertThat(eventCollector.getEvents()) .isEmpty(); } @@ -272,8 +274,10 @@ interface ErrorHandlingContract extends EventBusContract { eventBus().register(eventCollector, GROUP_A); eventBus().reDeliver(GROUP_A, EVENT).block(); - WAIT_CONDITION.until(() -> assertThat(deadLetter().failedEventIds(GROUP_A).toIterable()) - .containsOnly(EVENT.getEventId())); + WAIT_CONDITION.until(() -> assertThat(deadLetter().failedIds(GROUP_A) + .flatMap(insertionId -> deadLetter().failedEvent(GROUP_A, insertionId)) + .toIterable()) + .containsOnly(EVENT)); assertThat(eventCollector.getEvents()) .isEmpty(); } diff --git a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventDeadLettersContract.java b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventDeadLettersContract.java index 6bb00e6..4b3bca8 100644 --- a/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventDeadLettersContract.java +++ b/mailbox/api/src/test/java/org/apache/james/mailbox/events/EventDeadLettersContract.java @@ -92,20 +92,23 @@ interface EventDeadLettersContract { 
MailboxListener.MailboxAdded EVENT_1 = new MailboxListener.MailboxAdded(SESSION_ID, USER, MAILBOX_PATH, MAILBOX_ID, EVENT_ID_1); MailboxListener.MailboxAdded EVENT_2 = new MailboxListener.MailboxAdded(SESSION_ID, USER, MAILBOX_PATH, MAILBOX_ID, EVENT_ID_2); MailboxListener.MailboxAdded EVENT_3 = new MailboxListener.MailboxAdded(SESSION_ID, USER, MAILBOX_PATH, MAILBOX_ID, EVENT_ID_3); + EventDeadLetters.InsertionId INSERTION_ID_1 = EventDeadLetters.InsertionId.of("6e0dd59d-660e-4d9b-b22f-0354479f47b7"); + EventDeadLetters.InsertionId INSERTION_ID_2 = EventDeadLetters.InsertionId.of("6e0dd59d-660e-4d9b-b22f-0354479f47b8"); + EventDeadLetters.InsertionId INSERTION_ID_3 = EventDeadLetters.InsertionId.of("6e0dd59d-660e-4d9b-b22f-0354479f47b9"); Group GROUP_A = new EventBusTestFixture.GroupA(); Group GROUP_B = new EventBusTestFixture.GroupB(); Group NULL_GROUP = null; Event NULL_EVENT = null; - Event.EventId NULL_EVENT_ID = null; + EventDeadLetters.InsertionId NULL_INSERTION_ID = null; EventDeadLetters eventDeadLetters(); - default Stream<Event.EventId> allEventIds() { + default Stream<EventDeadLetters.InsertionId> allInsertionIds() { EventDeadLetters eventDeadLetters = eventDeadLetters(); return eventDeadLetters.groupsWithFailedEvents() - .flatMap(eventDeadLetters::failedEventIds) + .flatMap(eventDeadLetters::failedIds) .toStream(); } @@ -115,7 +118,7 @@ interface EventDeadLettersContract { default void storeShouldThrowWhenNullGroup() { EventDeadLetters eventDeadLetters = eventDeadLetters(); - assertThatThrownBy(() -> eventDeadLetters.store(NULL_GROUP, EVENT_1)) + assertThatThrownBy(() -> eventDeadLetters.store(NULL_GROUP, EVENT_1, INSERTION_ID_1)) .isInstanceOf(IllegalArgumentException.class); } @@ -123,15 +126,23 @@ interface EventDeadLettersContract { default void storeShouldThrowWhenNullEvent() { EventDeadLetters eventDeadLetters = eventDeadLetters(); - assertThatThrownBy(() -> eventDeadLetters.store(GROUP_A, NULL_EVENT)) + assertThatThrownBy(() -> 
eventDeadLetters.store(GROUP_A, NULL_EVENT, INSERTION_ID_1)) .isInstanceOf(IllegalArgumentException.class); } @Test - default void storeShouldThrowWhenBothGroupAndEventAreNull() { + default void storeShouldThrowWhenNullInsertionId() { EventDeadLetters eventDeadLetters = eventDeadLetters(); - assertThatThrownBy(() -> eventDeadLetters.store(NULL_GROUP, NULL_EVENT)) + assertThatThrownBy(() -> eventDeadLetters.store(GROUP_A, EVENT_1, NULL_INSERTION_ID)) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + default void storeShouldThrowWhenBothGroupAndEventAndInsertionIdAreNull() { + EventDeadLetters eventDeadLetters = eventDeadLetters(); + + assertThatThrownBy(() -> eventDeadLetters.store(NULL_GROUP, NULL_EVENT, NULL_INSERTION_ID)) .isInstanceOf(IllegalArgumentException.class); } @@ -139,21 +150,20 @@ interface EventDeadLettersContract { default void storeShouldStoreGroupWithCorrespondingEvent() { EventDeadLetters eventDeadLetters = eventDeadLetters(); - eventDeadLetters.store(GROUP_A, EVENT_1).block(); - assertThat(eventDeadLetters.failedEvent(GROUP_A, EVENT_1.getEventId()).block()) + eventDeadLetters.store(GROUP_A, EVENT_1, INSERTION_ID_1).block(); + assertThat(eventDeadLetters.failedEvent(GROUP_A, INSERTION_ID_1).block()) .isEqualTo(EVENT_1); } @Test - default void storeShouldIgnoreStoreDuplicatedEventsPerGroup() { + default void storeShouldStoreDuplicatedEventsPerGroupWhenStoreWithDifferentInsertionId() { EventDeadLetters eventDeadLetters = eventDeadLetters(); - eventDeadLetters.store(GROUP_A, EVENT_1).block(); - eventDeadLetters.store(GROUP_A, EVENT_1).block(); - eventDeadLetters.store(GROUP_A, EVENT_1).block(); + eventDeadLetters.store(GROUP_A, EVENT_1, INSERTION_ID_1).block(); + eventDeadLetters.store(GROUP_A, EVENT_1, INSERTION_ID_2).block(); - assertThat(eventDeadLetters.failedEventIds(GROUP_A).toStream()) - .containsExactly(EVENT_ID_1); + assertThat(eventDeadLetters.failedIds(GROUP_A).toStream()) + .containsOnly(INSERTION_ID_1, INSERTION_ID_2); } 
@Test @@ -161,13 +171,14 @@ interface EventDeadLettersContract { EventDeadLetters eventDeadLetters = eventDeadLetters(); ImmutableMap<Integer, Group> groups = concurrentGroups(); - Multimap<Integer, Event.EventId> storedEventIds = Multimaps.synchronizedSetMultimap(HashMultimap.create()); + Multimap<Integer, EventDeadLetters.InsertionId> storedInsertionIds = Multimaps.synchronizedSetMultimap(HashMultimap.create()); ConcurrentTestRunner.builder() .operation((threadNumber, step) -> { Event.EventId eventId = Event.EventId.random(); - storedEventIds.put(threadNumber, eventId); - eventDeadLetters.store(groups.get(threadNumber), event(eventId)).subscribe(); + EventDeadLetters.InsertionId insertionId = EventDeadLetters.InsertionId.random(); + storedInsertionIds.put(threadNumber, insertionId); + eventDeadLetters.store(groups.get(threadNumber), event(eventId), insertionId).subscribe(); }) .threadCount(THREAD_COUNT) .operationCount(OPERATION_COUNT) @@ -175,8 +186,8 @@ interface EventDeadLettersContract { groups.forEach((groupId, group) -> { Group storedGroup = groups.get(groupId); - assertThat(eventDeadLetters.failedEventIds(storedGroup).collectList().block()) - .hasSameElementsAs(storedEventIds.get(groupId)); + assertThat(eventDeadLetters.failedIds(storedGroup).collectList().block()) + .hasSameElementsAs(storedInsertionIds.get(groupId)); }); } } @@ -187,23 +198,23 @@ interface EventDeadLettersContract { default void removeShouldThrowWhenGroupIsNull() { EventDeadLetters eventDeadLetters = eventDeadLetters(); - assertThatThrownBy(() -> eventDeadLetters.remove(NULL_GROUP, EVENT_ID_1)) + assertThatThrownBy(() -> eventDeadLetters.remove(NULL_GROUP, INSERTION_ID_1)) .isInstanceOf(IllegalArgumentException.class); } @Test - default void removeShouldThrowWhenEventIdIsNull() { + default void removeShouldThrowWhenInsertionIdIsNull() { EventDeadLetters eventDeadLetters = eventDeadLetters(); - assertThatThrownBy(() -> eventDeadLetters.remove(GROUP_A, NULL_EVENT_ID)) + 
assertThatThrownBy(() -> eventDeadLetters.remove(GROUP_A, NULL_INSERTION_ID)) .isInstanceOf(IllegalArgumentException.class); } @Test - default void removeShouldThrowWhenBothGroupAndEventIdAreNull() { + default void removeShouldThrowWhenBothGroupAndInsertionIdAreNull() { EventDeadLetters eventDeadLetters = eventDeadLetters(); - assertThatThrownBy(() -> eventDeadLetters.remove(NULL_GROUP, NULL_EVENT_ID)) + assertThatThrownBy(() -> eventDeadLetters.remove(NULL_GROUP, NULL_INSERTION_ID)) .isInstanceOf(IllegalArgumentException.class); } @@ -211,12 +222,12 @@ interface EventDeadLettersContract { default void removeShouldRemoveMatched() { EventDeadLetters eventDeadLetters = eventDeadLetters(); - eventDeadLetters.store(GROUP_A, EVENT_1).block(); - eventDeadLetters.store(GROUP_A, EVENT_2).block(); + eventDeadLetters.store(GROUP_A, EVENT_1, INSERTION_ID_1).block(); + eventDeadLetters.store(GROUP_A, EVENT_2, INSERTION_ID_2).block(); - eventDeadLetters.remove(GROUP_A, EVENT_1.getEventId()).block(); + eventDeadLetters.remove(GROUP_A, INSERTION_ID_1).block(); - assertThat(eventDeadLetters.failedEvent(GROUP_A, EVENT_1.getEventId()).block()) + assertThat(eventDeadLetters.failedEvent(GROUP_A, INSERTION_ID_1).block()) .isNull(); } @@ -224,23 +235,33 @@ interface EventDeadLettersContract { default void removeShouldKeepNonMatched() { EventDeadLetters eventDeadLetters = eventDeadLetters(); - eventDeadLetters.store(GROUP_A, EVENT_1).block(); - eventDeadLetters.store(GROUP_A, EVENT_2).block(); - eventDeadLetters.store(GROUP_A, EVENT_3).block(); + eventDeadLetters.store(GROUP_A, EVENT_1, INSERTION_ID_1).block(); + eventDeadLetters.store(GROUP_A, EVENT_2, INSERTION_ID_2).block(); + eventDeadLetters.store(GROUP_A, EVENT_3, INSERTION_ID_3).block(); + + eventDeadLetters.remove(GROUP_A, INSERTION_ID_1).block(); + + assertThat(eventDeadLetters.failedIds(GROUP_A).toStream()) + .containsOnly(INSERTION_ID_2, INSERTION_ID_3); + } + + @Test + default void 
removeShouldNotThrowWhenNoInsertionIdMatched() { + EventDeadLetters eventDeadLetters = eventDeadLetters(); - eventDeadLetters.remove(GROUP_A, EVENT_1.getEventId()).block(); + eventDeadLetters.store(GROUP_A, EVENT_1, INSERTION_ID_1).block(); - assertThat(eventDeadLetters.failedEventIds(GROUP_A).toStream()) - .containsOnly(EVENT_ID_2, EVENT_ID_3); + assertThatCode(() -> eventDeadLetters.remove(GROUP_A, INSERTION_ID_2).block()) + .doesNotThrowAnyException(); } @Test - default void removeShouldNotThrowWhenNoMatched() { + default void removeShouldNotThrowWhenNoGroupMatched() { EventDeadLetters eventDeadLetters = eventDeadLetters(); - eventDeadLetters.store(GROUP_A, EVENT_1).block(); + eventDeadLetters.store(GROUP_A, EVENT_1, INSERTION_ID_1).block(); - assertThatCode(() -> eventDeadLetters.remove(GROUP_A, EVENT_2.getEventId()).block()) + assertThatCode(() -> eventDeadLetters.remove(GROUP_B, INSERTION_ID_1).block()) .doesNotThrowAnyException(); } @@ -249,14 +270,15 @@ interface EventDeadLettersContract { EventDeadLetters eventDeadLetters = eventDeadLetters(); ImmutableMap<Integer, Group> groups = concurrentGroups(); - ConcurrentHashMap<Integer, Event.EventId> storedEventIds = new ConcurrentHashMap<>(); + ConcurrentHashMap<Integer, EventDeadLetters.InsertionId> storedInsertionIds = new ConcurrentHashMap<>(); ConcurrentTestRunner.builder() .operation((threadNumber, step) -> { int operationIndex = threadNumber * OPERATION_COUNT + step; Event.EventId eventId = Event.EventId.random(); - storedEventIds.put(operationIndex, eventId); - eventDeadLetters.store(groups.get(threadNumber), event(eventId)).subscribe(); + EventDeadLetters.InsertionId insertionId = EventDeadLetters.InsertionId.random(); + storedInsertionIds.put(operationIndex, insertionId); + eventDeadLetters.store(groups.get(threadNumber), event(eventId), insertionId).subscribe(); }) .threadCount(THREAD_COUNT) .operationCount(OPERATION_COUNT) @@ -265,14 +287,14 @@ interface EventDeadLettersContract { 
ConcurrentTestRunner.builder() .operation((threadNumber, step) -> { int operationIndex = threadNumber * OPERATION_COUNT + step; - eventDeadLetters.remove(groups.get(threadNumber), storedEventIds.get(operationIndex)) + eventDeadLetters.remove(groups.get(threadNumber), storedInsertionIds.get(operationIndex)) .subscribe(); }) .threadCount(THREAD_COUNT) .operationCount(OPERATION_COUNT) .runSuccessfullyWithin(RUN_SUCCESSFULLY_IN); - assertThat(allEventIds()) + assertThat(allInsertionIds()) .isEmpty(); } } @@ -283,23 +305,23 @@ interface EventDeadLettersContract { default void failedEventShouldThrowWhenGroupIsNull() { EventDeadLetters eventDeadLetters = eventDeadLetters(); - assertThatThrownBy(() -> eventDeadLetters.failedEvent(NULL_GROUP, EVENT_ID_1)) + assertThatThrownBy(() -> eventDeadLetters.failedEvent(NULL_GROUP, INSERTION_ID_1)) .isInstanceOf(IllegalArgumentException.class); } @Test - default void failedEventShouldThrowWhenEventIdIsNull() { + default void failedEventShouldThrowWhenInsertionIdIsNull() { EventDeadLetters eventDeadLetters = eventDeadLetters(); - assertThatThrownBy(() -> eventDeadLetters.failedEvent(GROUP_A, NULL_EVENT_ID)) + assertThatThrownBy(() -> eventDeadLetters.failedEvent(GROUP_A, NULL_INSERTION_ID)) .isInstanceOf(IllegalArgumentException.class); } @Test - default void failedEventShouldThrowWhenBothGroupAndEventIdAreNull() { + default void failedEventShouldThrowWhenBothGroupAndInsertionIdAreNull() { EventDeadLetters eventDeadLetters = eventDeadLetters(); - assertThatThrownBy(() -> eventDeadLetters.failedEvent(NULL_GROUP, NULL_EVENT_ID)) + assertThatThrownBy(() -> eventDeadLetters.failedEvent(NULL_GROUP, NULL_INSERTION_ID)) .isInstanceOf(IllegalArgumentException.class); } @@ -307,10 +329,10 @@ interface EventDeadLettersContract { default void failedEventShouldReturnEmptyWhenNotFound() { EventDeadLetters eventDeadLetters = eventDeadLetters(); - eventDeadLetters.store(GROUP_A, EVENT_1).block(); - eventDeadLetters.store(GROUP_A, EVENT_2).block(); + 
eventDeadLetters.store(GROUP_A, EVENT_1, INSERTION_ID_1).block(); + eventDeadLetters.store(GROUP_A, EVENT_2, INSERTION_ID_2).block(); - assertThat(eventDeadLetters.failedEvent(GROUP_A, EVENT_ID_3).block()) + assertThat(eventDeadLetters.failedEvent(GROUP_A, INSERTION_ID_3).block()) .isNull(); } @@ -318,10 +340,10 @@ interface EventDeadLettersContract { default void failedEventShouldReturnEventWhenContains() { EventDeadLetters eventDeadLetters = eventDeadLetters(); - eventDeadLetters.store(GROUP_A, EVENT_1).block(); - eventDeadLetters.store(GROUP_A, EVENT_2).block(); + eventDeadLetters.store(GROUP_A, EVENT_1, INSERTION_ID_1).block(); + eventDeadLetters.store(GROUP_A, EVENT_2, INSERTION_ID_2).block(); - assertThat(eventDeadLetters.failedEvent(GROUP_A, EVENT_1.getEventId()).block()) + assertThat(eventDeadLetters.failedEvent(GROUP_A, INSERTION_ID_1).block()) .isEqualTo(EVENT_1); } @@ -329,14 +351,24 @@ interface EventDeadLettersContract { default void failedEventShouldNotRemoveEvent() { EventDeadLetters eventDeadLetters = eventDeadLetters(); - eventDeadLetters.store(GROUP_A, EVENT_1).block(); - eventDeadLetters.store(GROUP_A, EVENT_2).block(); - eventDeadLetters.store(GROUP_A, EVENT_3).block(); + eventDeadLetters.store(GROUP_A, EVENT_1, INSERTION_ID_1).block(); + eventDeadLetters.store(GROUP_A, EVENT_2, INSERTION_ID_2).block(); + eventDeadLetters.store(GROUP_A, EVENT_3, INSERTION_ID_3).block(); + + eventDeadLetters.failedEvent(GROUP_A, INSERTION_ID_1).block(); + + assertThat(allInsertionIds()) + .containsOnly(INSERTION_ID_1, INSERTION_ID_2, INSERTION_ID_3); + } - eventDeadLetters.failedEvent(GROUP_A, EVENT_1.getEventId()).block(); + @Test + default void failedEventShouldNotThrowWhenNoGroupMatched() { + EventDeadLetters eventDeadLetters = eventDeadLetters(); - assertThat(allEventIds()) - .containsOnly(EVENT_ID_1, EVENT_ID_2, EVENT_ID_3); + eventDeadLetters.store(GROUP_A, EVENT_1, INSERTION_ID_1).block(); + + assertThatCode(() -> eventDeadLetters.failedEvent(GROUP_B, 
INSERTION_ID_1).block()) + .doesNotThrowAnyException(); } } @@ -346,7 +378,7 @@ interface EventDeadLettersContract { default void failedEventsShouldThrowWhenGroupIsNull() { EventDeadLetters eventDeadLetters = eventDeadLetters(); - assertThatThrownBy(() -> eventDeadLetters.failedEventIds(NULL_GROUP)) + assertThatThrownBy(() -> eventDeadLetters.failedIds(NULL_GROUP)) .isInstanceOf(IllegalArgumentException.class); } @@ -354,11 +386,11 @@ interface EventDeadLettersContract { default void failedEventsByGroupShouldReturnEmptyWhenNonMatch() { EventDeadLetters eventDeadLetters = eventDeadLetters(); - eventDeadLetters.store(GROUP_A, EVENT_1).block(); - eventDeadLetters.store(GROUP_A, EVENT_2).block(); - eventDeadLetters.store(GROUP_A, EVENT_3).block(); + eventDeadLetters.store(GROUP_A, EVENT_1, INSERTION_ID_1).block(); + eventDeadLetters.store(GROUP_A, EVENT_2, INSERTION_ID_2).block(); + eventDeadLetters.store(GROUP_A, EVENT_3, INSERTION_ID_3).block(); - assertThat(eventDeadLetters.failedEventIds(GROUP_B).toStream()) + assertThat(eventDeadLetters.failedIds(GROUP_B).toStream()) .isEmpty(); } @@ -366,26 +398,26 @@ interface EventDeadLettersContract { default void failedEventsByGroupShouldReturnAllEventsCorrespondingToGivenGroup() { EventDeadLetters eventDeadLetters = eventDeadLetters(); - eventDeadLetters.store(GROUP_A, EVENT_1).block(); - eventDeadLetters.store(GROUP_A, EVENT_2).block(); - eventDeadLetters.store(GROUP_B, EVENT_3).block(); + eventDeadLetters.store(GROUP_A, EVENT_1, INSERTION_ID_1).block(); + eventDeadLetters.store(GROUP_A, EVENT_2, INSERTION_ID_2).block(); + eventDeadLetters.store(GROUP_B, EVENT_3, INSERTION_ID_3).block(); - assertThat(eventDeadLetters.failedEventIds(GROUP_A).toStream()) - .containsOnly(EVENT_ID_1, EVENT_ID_2); + assertThat(eventDeadLetters.failedIds(GROUP_A).toStream()) + .containsOnly(INSERTION_ID_1, INSERTION_ID_2); } @Test default void failedEventsByGroupShouldNotRemoveEvents() { EventDeadLetters eventDeadLetters = eventDeadLetters(); - 
eventDeadLetters.store(GROUP_A, EVENT_1).block(); - eventDeadLetters.store(GROUP_A, EVENT_2).block(); - eventDeadLetters.store(GROUP_B, EVENT_3).block(); + eventDeadLetters.store(GROUP_A, EVENT_1, INSERTION_ID_1).block(); + eventDeadLetters.store(GROUP_A, EVENT_2, INSERTION_ID_2).block(); + eventDeadLetters.store(GROUP_B, EVENT_3, INSERTION_ID_3).block(); - eventDeadLetters.failedEventIds(GROUP_A).toStream(); + eventDeadLetters.failedIds(GROUP_A).toStream(); - assertThat(allEventIds()) - .containsOnly(EVENT_ID_1, EVENT_ID_2, EVENT_ID_3); + assertThat(allInsertionIds()) + .containsOnly(INSERTION_ID_1, INSERTION_ID_2, INSERTION_ID_3); } } @@ -393,8 +425,8 @@ interface EventDeadLettersContract { @Test default void groupsWithFailedEventsShouldReturnAllStoredGroups() { EventDeadLetters eventDeadLetters = eventDeadLetters(); - eventDeadLetters.store(GROUP_A, EVENT_1).block(); - eventDeadLetters.store(GROUP_B, EVENT_1).block(); + eventDeadLetters.store(GROUP_A, EVENT_1, INSERTION_ID_1).block(); + eventDeadLetters.store(GROUP_B, EVENT_1, INSERTION_ID_2).block(); assertThat(eventDeadLetters.groupsWithFailedEvents().collectList().block()) .containsOnly(GROUP_A, GROUP_B); diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/MemoryEventDeadLetters.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/MemoryEventDeadLetters.java index fdeec7c..ed324f2 100644 --- a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/MemoryEventDeadLetters.java +++ b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/MemoryEventDeadLetters.java @@ -20,10 +20,8 @@ package org.apache.james.mailbox.events; import com.google.common.base.Preconditions; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Multimap; +import com.google.common.collect.HashBasedTable; +import 
com.google.common.collect.Table; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -31,65 +29,59 @@ import reactor.core.publisher.MonoProcessor; public class MemoryEventDeadLetters implements EventDeadLetters { - private final Multimap<Group, Event> deadLetters; + private final Table<Group, InsertionId, Event> deadLetters; public MemoryEventDeadLetters() { - this.deadLetters = HashMultimap.create(); + this.deadLetters = HashBasedTable.create(); } @Override - public Mono<Void> store(Group registeredGroup, Event failDeliveredEvent) { + public Mono<Void> store(Group registeredGroup, Event failDeliveredEvent, InsertionId insertionId) { Preconditions.checkArgument(registeredGroup != null, REGISTERED_GROUP_CANNOT_BE_NULL); Preconditions.checkArgument(failDeliveredEvent != null, FAIL_DELIVERED_EVENT_CANNOT_BE_NULL); + Preconditions.checkArgument(insertionId != null, FAIL_DELIVERED_ID_INSERTION_CANNOT_BE_NULL); synchronized (deadLetters) { - return Mono.fromRunnable(() -> deadLetters.put(registeredGroup, failDeliveredEvent)) + return Mono.fromRunnable(() -> deadLetters.put(registeredGroup, insertionId, failDeliveredEvent)) .subscribeWith(MonoProcessor.create()) .then(); } } @Override - public Mono<Void> remove(Group registeredGroup, Event.EventId failDeliveredEventId) { + public Mono<Void> remove(Group registeredGroup, InsertionId failDeliveredInsertionId) { Preconditions.checkArgument(registeredGroup != null, REGISTERED_GROUP_CANNOT_BE_NULL); - Preconditions.checkArgument(failDeliveredEventId != null, FAIL_DELIVERED_ID_EVENT_CANNOT_BE_NULL); + Preconditions.checkArgument(failDeliveredInsertionId != null, FAIL_DELIVERED_ID_INSERTION_CANNOT_BE_NULL); synchronized (deadLetters) { - return Flux.fromIterable(ImmutableList.copyOf(deadLetters.get(registeredGroup))) - .filter(event -> event.getEventId().equals(failDeliveredEventId)) - .next() - .doOnNext(event -> deadLetters.remove(registeredGroup, event)) - .subscribeWith(MonoProcessor.create()) + return 
Mono.justOrEmpty(deadLetters.remove(registeredGroup, failDeliveredInsertionId)) .then(); } } @Override - public Mono<Event> failedEvent(Group registeredGroup, Event.EventId failDeliveredEventId) { + public Mono<Event> failedEvent(Group registeredGroup, InsertionId failDeliveredInsertionId) { Preconditions.checkArgument(registeredGroup != null, REGISTERED_GROUP_CANNOT_BE_NULL); - Preconditions.checkArgument(failDeliveredEventId != null, FAIL_DELIVERED_ID_EVENT_CANNOT_BE_NULL); + Preconditions.checkArgument(failDeliveredInsertionId != null, FAIL_DELIVERED_ID_INSERTION_CANNOT_BE_NULL); synchronized (deadLetters) { - return Flux.fromIterable(ImmutableList.copyOf(deadLetters.get(registeredGroup))) - .filter(event -> event.getEventId().equals(failDeliveredEventId)) - .next(); + return Mono.justOrEmpty(deadLetters.get(registeredGroup, failDeliveredInsertionId)); } } @Override - public Flux<Event.EventId> failedEventIds(Group registeredGroup) { + public Flux<InsertionId> failedIds(Group registeredGroup) { Preconditions.checkArgument(registeredGroup != null, REGISTERED_GROUP_CANNOT_BE_NULL); synchronized (deadLetters) { - return Flux.fromIterable(ImmutableList.copyOf(deadLetters.get(registeredGroup))) - .map(Event::getEventId); + return Flux.fromIterable(deadLetters.row(registeredGroup).keySet()); } } @Override public Flux<Group> groupsWithFailedEvents() { synchronized (deadLetters) { - return Flux.fromIterable(ImmutableSet.copyOf(deadLetters.keySet())); + return Flux.fromIterable(deadLetters.rowKeySet()); } } } diff --git a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java index c30a709..e7fbcc1 100644 --- a/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java +++ b/mailbox/event/event-memory/src/main/java/org/apache/james/mailbox/events/delivery/EventDelivery.java @@ -121,7 
+121,7 @@ public interface EventDelivery { @Override public Mono<Void> handle(Event event) { - return eventDeadLetters.store(group, event); + return eventDeadLetters.store(group, event, EventDeadLetters.InsertionId.random()); } } diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupConsumerRetry.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupConsumerRetry.java index 1994e45..9fdd440 100644 --- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupConsumerRetry.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupConsumerRetry.java @@ -109,7 +109,7 @@ class GroupConsumerRetry { private Mono<Void> retryOrStoreToDeadLetter(Event event, int currentRetryCount) { if (currentRetryCount >= retryBackoff.getMaxRetries()) { - return eventDeadLetters.store(group, event); + return eventDeadLetters.store(group, event, EventDeadLetters.InsertionId.random()); } return sendRetryMessage(event, currentRetryCount); } @@ -128,7 +128,7 @@ class GroupConsumerRetry { return sender.send(retryMessage) .doOnError(throwable -> createStructuredLogger(event) .log(logger -> logger.error("Exception happens when publishing event to retry exchange, this event will be stored in deadLetter", throwable))) - .onErrorResume(e -> eventDeadLetters.store(group, event)); + .onErrorResume(e -> eventDeadLetters.store(group, event, EventDeadLetters.InsertionId.random())); } private StructuredLogger createStructuredLogger(Event event) { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
