This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 4333ce0cea55067145b4df6845a0fe9df8f5970e Author: Rene Cordier <[email protected]> AuthorDate: Fri Aug 29 16:37:54 2025 +0700 JAMES-3340 Memory implementation for collapseThreads handling with EmailQueryView --- .../memory/projections/MemoryEmailQueryView.java | 41 +++++-- .../projections/MemoryEmailQueryViewTest.java | 130 +++++++++++++++++++++ 2 files changed, 161 insertions(+), 10 deletions(-) diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/memory/projections/MemoryEmailQueryView.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/memory/projections/MemoryEmailQueryView.java index 2504f027fe..9097f3c8a2 100644 --- a/server/data/data-jmap/src/main/java/org/apache/james/jmap/memory/projections/MemoryEmailQueryView.java +++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/memory/projections/MemoryEmailQueryView.java @@ -50,7 +50,9 @@ public class MemoryEmailQueryView implements EmailQueryView { public Flux<MessageId> listMailboxContentSortedBySentAt(MailboxId mailboxId, Limit limit, boolean collapseThreads) { Preconditions.checkArgument(!limit.isUnlimited(), "Limit should be defined"); - return Flux.fromIterable(entries.row(mailboxId).values()) + Flux<Entry> baseEntries = Flux.fromIterable(entries.row(mailboxId).values()); + + return maybeCollapseThreads(baseEntries, collapseThreads) .sort(Comparator.comparing(Entry::getSentAt).reversed()) .map(Entry::getMessageId) .take(limit.getLimit().get()); @@ -60,8 +62,10 @@ public class MemoryEmailQueryView implements EmailQueryView { public Flux<MessageId> listMailboxContentSinceSentAt(MailboxId mailboxId, ZonedDateTime since, Limit limit, boolean collapseThreads) { Preconditions.checkArgument(!limit.isUnlimited(), "Limit should be defined"); - return Flux.fromIterable(entries.row(mailboxId).values()) - .filter(e -> e.getSentAt().isAfter(since) || e.getSentAt().isEqual(since)) + Flux<Entry> baseEntries = Flux.fromIterable(entries.row(mailboxId).values()) + .filter(e -> e.getSentAt().isAfter(since) || e.getSentAt().isEqual(since)); + + return maybeCollapseThreads(baseEntries, collapseThreads) .sort(Comparator.comparing(Entry::getSentAt).reversed()) .map(Entry::getMessageId) .take(limit.getLimit().get()); @@ -71,8 +75,10 @@ public class MemoryEmailQueryView implements EmailQueryView { public Flux<MessageId> listMailboxContentSinceAfterSortedBySentAt(MailboxId mailboxId, ZonedDateTime since, Limit limit, boolean collapseThreads) { Preconditions.checkArgument(!limit.isUnlimited(), "Limit should be defined"); - return Flux.fromIterable(entries.row(mailboxId).values()) - .filter(e -> e.getReceivedAt().isAfter(since) || e.getReceivedAt().isEqual(since)) + Flux<Entry> baseEntries = Flux.fromIterable(entries.row(mailboxId).values()) + .filter(e -> e.getReceivedAt().isAfter(since) || e.getReceivedAt().isEqual(since)); + + return maybeCollapseThreads(baseEntries, collapseThreads) .sort(Comparator.comparing(Entry::getSentAt).reversed()) .map(Entry::getMessageId) .take(limit.getLimit().get()); @@ -80,7 +86,9 @@ public class MemoryEmailQueryView implements EmailQueryView { @Override public Flux<MessageId> listMailboxContentSortedByReceivedAt(MailboxId mailboxId, Limit limit, boolean collapseThreads) { - return Flux.fromIterable(entries.row(mailboxId).values()) + Flux<Entry> baseEntries = Flux.fromIterable(entries.row(mailboxId).values()); + + return maybeCollapseThreads(baseEntries, collapseThreads) .sort(Comparator.comparing(Entry::getReceivedAt).reversed()) .map(Entry::getMessageId) .take(limit.getLimit().get()); @@ -88,8 +96,10 @@ public class MemoryEmailQueryView implements EmailQueryView { @Override public Flux<MessageId> listMailboxContentSinceAfterSortedByReceivedAt(MailboxId mailboxId, ZonedDateTime since, Limit limit, boolean collapseThreads) { - return Flux.fromIterable(entries.row(mailboxId).values()) - .filter(e -> e.getReceivedAt().isAfter(since) || e.getReceivedAt().isEqual(since)) + Flux<Entry> baseEntries = Flux.fromIterable(entries.row(mailboxId).values()) + .filter(e -> e.getReceivedAt().isAfter(since) || e.getReceivedAt().isEqual(since)); + + return maybeCollapseThreads(baseEntries, collapseThreads) .sort(Comparator.comparing(Entry::getReceivedAt).reversed()) .map(Entry::getMessageId) .take(limit.getLimit().get()); @@ -97,13 +107,24 @@ public class MemoryEmailQueryView implements EmailQueryView { @Override public Flux<MessageId> listMailboxContentBeforeSortedByReceivedAt(MailboxId mailboxId, ZonedDateTime before, Limit limit, boolean collapseThreads) { - return Flux.fromIterable(entries.row(mailboxId).values()) - .filter(e -> e.getReceivedAt().isBefore(before) || e.getReceivedAt().isEqual(before)) + Flux<Entry> baseEntries = Flux.fromIterable(entries.row(mailboxId).values()) + .filter(e -> e.getReceivedAt().isBefore(before) || e.getReceivedAt().isEqual(before)); + + return maybeCollapseThreads(baseEntries, collapseThreads) .sort(Comparator.comparing(Entry::getReceivedAt).reversed()) .map(Entry::getMessageId) .take(limit.getLimit().get()); } + private Flux<Entry> maybeCollapseThreads(Flux<Entry> entries, boolean collapseThreads) { + if (collapseThreads) { + return entries.groupBy(Entry::getThreadId) + .flatMap(group -> group.reduce((e1, e2) -> + e1.getReceivedAt().isAfter(e2.getReceivedAt()) ? e1 : e2)); + } + return entries; + } + @Override public Mono<Void> delete(MailboxId mailboxId, MessageId messageId) { return Mono.fromRunnable(() -> entries.remove(mailboxId, messageId)); diff --git a/server/data/data-jmap/src/test/java/org/apache/james/jmap/memory/projections/MemoryEmailQueryViewTest.java b/server/data/data-jmap/src/test/java/org/apache/james/jmap/memory/projections/MemoryEmailQueryViewTest.java index 8c53060187..dfd48ce7ab 100644 --- a/server/data/data-jmap/src/test/java/org/apache/james/jmap/memory/projections/MemoryEmailQueryViewTest.java +++ b/server/data/data-jmap/src/test/java/org/apache/james/jmap/memory/projections/MemoryEmailQueryViewTest.java @@ -19,6 +19,8 @@ package org.apache.james.jmap.memory.projections; +import static org.assertj.core.api.Assertions.assertThat; + import org.apache.james.jmap.api.projections.EmailQueryView; import org.apache.james.jmap.api.projections.EmailQueryViewContract; import org.apache.james.mailbox.model.MailboxId; @@ -26,9 +28,13 @@ import org.apache.james.mailbox.model.MessageId; import org.apache.james.mailbox.model.TestId; import org.apache.james.mailbox.model.TestMessageId; import org.apache.james.mailbox.model.ThreadId; +import org.apache.james.util.streams.Limit; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; public class MemoryEmailQueryViewTest implements EmailQueryViewContract { + private static final boolean COLLAPSE_THREAD = true; + private MemoryEmailQueryView testee; @BeforeEach @@ -71,5 +77,129 @@ public class MemoryEmailQueryViewTest implements EmailQueryViewContract { return ThreadId.fromBaseMessageId(TestMessageId.of(1)); } + @Test + public void listMailboxContentSortedBySentAtShouldReturnLatestMessageOfThreadWhenCollapseThreads() { + testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), threadId()).block(); + testee().save(mailboxId1(), DATE_3, DATE_4, messageId2(), threadId()).block(); + testee().save(mailboxId1(), DATE_2, DATE_6, messageId3(), threadId()).block(); + + assertThat(testee().listMailboxContentSortedBySentAt(mailboxId1(), Limit.limit(12), COLLAPSE_THREAD).collectList().block()) + .containsExactly(messageId3()); + } + + @Test + public void listMailboxContentSortedBySentAtShouldApplyLimitWithCollapseThreads() { + testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), ThreadId.fromBaseMessageId(TestMessageId.of(2))).block(); + testee().save(mailboxId1(), DATE_3, DATE_4, messageId2(), threadId()).block(); + testee().save(mailboxId1(), DATE_2, DATE_6, messageId3(), ThreadId.fromBaseMessageId(TestMessageId.of(3))).block(); + testee().save(mailboxId1(), DATE_4, DATE_5, messageId4(), threadId()).block(); + + assertThat(testee().listMailboxContentSortedBySentAt(mailboxId1(), Limit.limit(2), COLLAPSE_THREAD).collectList().block()) + .containsExactly(messageId4(), messageId3()); + } + + @Test + public void listMailboxContentSinceReceivedAtShouldReturnLatestMessageOfThreadWhenCollapseThreads() { + testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), threadId()).block(); + testee().save(mailboxId1(), DATE_3, DATE_4, messageId2(), threadId()).block(); + testee().save(mailboxId1(), DATE_5, DATE_6, messageId3(), threadId()).block(); + + assertThat(testee().listMailboxContentSinceAfterSortedBySentAt(mailboxId1(), DATE_3, Limit.limit(12), COLLAPSE_THREAD).collectList().block()) + .containsExactly(messageId3()); + } + + @Test + public void listMailboxContentSinceReceivedAtShouldApplyLimitWithCollapseThreads() { + testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), ThreadId.fromBaseMessageId(TestMessageId.of(2))).block(); + testee().save(mailboxId1(), DATE_3, DATE_4, messageId2(), ThreadId.fromBaseMessageId(TestMessageId.of(3))).block(); + testee().save(mailboxId1(), DATE_5, DATE_6, messageId3(), threadId()).block(); + testee().save(mailboxId1(), DATE_4, DATE_5, messageId4(), threadId()).block(); + + assertThat(testee().listMailboxContentSinceAfterSortedBySentAt(mailboxId1(), DATE_1, Limit.limit(2), COLLAPSE_THREAD).collectList().block()) + .containsExactly(messageId3(), messageId2()); + } + + @Test + public void listMailboxContentSinceSentdAtShouldReturnLatestMessageOfThreadWhenCollapseThreads() { + testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), threadId()).block(); + testee().save(mailboxId1(), DATE_3, DATE_4, messageId2(), threadId()).block(); + testee().save(mailboxId1(), DATE_5, DATE_6, messageId3(), threadId()).block(); + + assertThat(testee().listMailboxContentSinceSentAt(mailboxId1(), DATE_2, Limit.limit(12), COLLAPSE_THREAD).collectList().block()) + .containsExactly(messageId3()); + } + + @Test + public void listMailboxContentSinceSentAtShouldApplyLimitWithCollapseThreads() { + testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), ThreadId.fromBaseMessageId(TestMessageId.of(2))).block(); + testee().save(mailboxId1(), DATE_3, DATE_4, messageId2(), ThreadId.fromBaseMessageId(TestMessageId.of(3))).block(); + testee().save(mailboxId1(), DATE_5, DATE_6, messageId3(), threadId()).block(); + testee().save(mailboxId1(), DATE_4, DATE_5, messageId4(), threadId()).block(); + + assertThat(testee().listMailboxContentSinceSentAt(mailboxId1(), DATE_1, Limit.limit(2), COLLAPSE_THREAD).collectList().block()) + .containsExactly(messageId3(), messageId2()); + } + + @Test + public void listMailboxContentSortedByReceivedAtShouldReturnLatestMessageOfThreadWhenCollapseThreads() { + testee().save(mailboxId1(), DATE_1, DATE_4, messageId1(), threadId()).block(); + testee().save(mailboxId1(), DATE_2, DATE_3, messageId2(), threadId()).block(); + testee().save(mailboxId1(), DATE_5, DATE_6, messageId3(), threadId()).block(); + + assertThat(testee().listMailboxContentSortedByReceivedAt(mailboxId1(), Limit.limit(12), COLLAPSE_THREAD).collectList().block()) + .containsExactly(messageId3()); + } + + @Test + public void listMailboxContentSortedByReceivedAtShouldApplyLimitWithCollapseThreads() { + testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), ThreadId.fromBaseMessageId(TestMessageId.of(2))).block(); + testee().save(mailboxId1(), DATE_3, DATE_4, messageId2(), ThreadId.fromBaseMessageId(TestMessageId.of(3))).block(); + testee().save(mailboxId1(), DATE_5, DATE_6, messageId3(), threadId()).block(); + testee().save(mailboxId1(), DATE_4, DATE_5, messageId4(), threadId()).block(); + + assertThat(testee().listMailboxContentSortedByReceivedAt(mailboxId1(), Limit.limit(2), COLLAPSE_THREAD).collectList().block()) + .containsExactly(messageId3(), messageId2()); + } + + @Test + public void listMailboxContentSinceSortedByReceivedAtShouldReturnLatestMessageOfThreadWhenCollapseThreads() { + testee().save(mailboxId1(), DATE_1, DATE_4, messageId1(), threadId()).block(); + testee().save(mailboxId1(), DATE_2, DATE_3, messageId2(), threadId()).block(); + testee().save(mailboxId1(), DATE_5, DATE_6, messageId3(), threadId()).block(); + + assertThat(testee().listMailboxContentSinceAfterSortedByReceivedAt(mailboxId1(), DATE_4, Limit.limit(12), COLLAPSE_THREAD).collectList().block()) + .containsExactly(messageId3()); + } + + @Test + public void listMailboxContentSinceSortedByReceivedAtShouldApplyLimitWithCollapseThreads() { + testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), ThreadId.fromBaseMessageId(TestMessageId.of(2))).block(); + testee().save(mailboxId1(), DATE_3, DATE_4, messageId2(), ThreadId.fromBaseMessageId(TestMessageId.of(3))).block(); + testee().save(mailboxId1(), DATE_5, DATE_6, messageId3(), threadId()).block(); + testee().save(mailboxId1(), DATE_4, DATE_5, messageId4(), threadId()).block(); + + assertThat(testee().listMailboxContentSinceAfterSortedByReceivedAt(mailboxId1(), DATE_4, Limit.limit(2), COLLAPSE_THREAD).collectList().block()) + .containsExactly(messageId3(), messageId2()); + } + + @Test + public void listMailboxContentBeforeSortedByReceivedAtShouldReturnLatestMessageOfThreadWhenCollapseThreads() { + testee().save(mailboxId1(), DATE_1, DATE_4, messageId1(), threadId()).block(); + testee().save(mailboxId1(), DATE_2, DATE_3, messageId2(), threadId()).block(); + testee().save(mailboxId1(), DATE_5, DATE_6, messageId3(), threadId()).block(); + assertThat(testee().listMailboxContentBeforeSortedByReceivedAt(mailboxId1(), DATE_4, Limit.limit(12), COLLAPSE_THREAD).collectList().block()) + .containsExactly(messageId1()); + } + + @Test + public void listMailboxContentBeforeSortedByReceivedAtShouldApplyLimitWithCollapseThreads() { + testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), ThreadId.fromBaseMessageId(TestMessageId.of(2))).block(); + testee().save(mailboxId1(), DATE_3, DATE_4, messageId2(), threadId()).block(); + testee().save(mailboxId1(), DATE_5, DATE_6, messageId3(), ThreadId.fromBaseMessageId(TestMessageId.of(3))).block(); + testee().save(mailboxId1(), DATE_4, DATE_5, messageId4(), threadId()).block(); + + assertThat(testee().listMailboxContentBeforeSortedByReceivedAt(mailboxId1(), DATE_5, Limit.limit(2), COLLAPSE_THREAD).collectList().block()) + .containsExactly(messageId4(), messageId1()); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
