This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit ea131cbf16f07654c9782e75d645d0541e779925 Author: Rene Cordier <[email protected]> AuthorDate: Wed Sep 3 17:05:24 2025 +0700 JAMES-3340 Cassandra implementation for collapseThreads handling with EmailQueryView --- .../projections/CassandraEmailQueryView.java | 244 ++++++++++++++--- .../projections/CassandraEmailQueryViewTest.java | 12 +- .../projections/PostgresEmailQueryViewTest.java | 12 +- .../memory/projections/MemoryEmailQueryView.java | 29 +- .../api/projections/EmailQueryViewContract.java | 291 +++++++++++++++------ .../projections/MemoryEmailQueryViewTest.java | 135 +--------- 6 files changed, 457 insertions(+), 266 deletions(-) diff --git a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/projections/CassandraEmailQueryView.java b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/projections/CassandraEmailQueryView.java index ab838eb895..de4942a37f 100644 --- a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/projections/CassandraEmailQueryView.java +++ b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/projections/CassandraEmailQueryView.java @@ -36,12 +36,17 @@ import static org.apache.james.jmap.cassandra.projections.table.CassandraEmailQu import java.time.Instant; import java.time.ZonedDateTime; import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.UUID; +import java.util.function.Function; import jakarta.inject.Inject; -import org.apache.commons.lang3.tuple.Pair; import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor; import org.apache.james.jmap.api.projections.EmailQueryView; +import org.apache.james.jmap.cassandra.projections.table.CassandraEmailQueryViewTable; import org.apache.james.mailbox.cassandra.ids.CassandraId; import org.apache.james.mailbox.cassandra.ids.CassandraMessageId; import org.apache.james.mailbox.model.MailboxId; @@ -49,18 +54,62 @@ import org.apache.james.mailbox.model.MessageId; import org.apache.james.mailbox.model.ThreadId; import org.apache.james.util.streams.Limit; +import com.datastax.oss.driver.api.core.CqlIdentifier; import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.cql.PreparedStatement; import com.datastax.oss.driver.api.core.cql.Row; import com.datastax.oss.driver.api.core.type.codec.TypeCodecs; import com.datastax.oss.driver.api.querybuilder.QueryBuilder; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public class CassandraEmailQueryView implements EmailQueryView { private static final String LIMIT_MARKER = "LIMIT_BIND_MARKER"; + private static final int COLLAPSE_THREADS_LIMIT_MULTIPLIER = 4; + + private class EmailEntry { + private final MessageId messageId; + private final ThreadId threadId; + private final Instant messageDate; + + EmailEntry(MessageId messageId, ThreadId threadId, Instant messageDate) { + this.messageId = messageId; + this.threadId = threadId; + this.messageDate = messageDate; + } + + public MessageId getMessageId() { + return messageId; + } + + public ThreadId getThreadId() { + return threadId; + } + + public Instant getMessageDate() { + return messageDate; + } + + @Override + public final boolean equals(Object o) { + if (o instanceof EmailEntry) { + EmailEntry entry = (EmailEntry) o; + + return Objects.equals(this.messageId, entry.messageId) + && Objects.equals(this.threadId, entry.threadId) + && Objects.equals(this.messageDate, entry.messageDate); + } + return false; + } + + @Override + public final int hashCode() { + return Objects.hash(messageId, threadId, messageDate); + } + } private final CassandraAsyncExecutor executor; private final PreparedStatement listMailboxContentBySentAt; @@ -84,21 +133,21 @@ public class CassandraEmailQueryView implements EmailQueryView { this.executor = new CassandraAsyncExecutor(session); listMailboxContentBySentAt = session.prepare(selectFrom(TABLE_NAME_SENT_AT) - .column(MESSAGE_ID) + .columns(MESSAGE_ID, SENT_AT, THREAD_ID) .whereColumn(MAILBOX_ID).isEqualTo(bindMarker(MAILBOX_ID)) .orderBy(SENT_AT, DESC) .limit(bindMarker(LIMIT_MARKER)) .build()); listMailboxContentByReceivedAt = session.prepare(selectFrom(TABLE_NAME_RECEIVED_AT) - .column(MESSAGE_ID) + .columns(MESSAGE_ID, RECEIVED_AT, THREAD_ID) .whereColumn(MAILBOX_ID).isEqualTo(bindMarker(MAILBOX_ID)) .orderBy(RECEIVED_AT, DESC) .limit(bindMarker(LIMIT_MARKER)) .build()); listMailboxContentSinceSentAt = session.prepare(selectFrom(TABLE_NAME_SENT_AT) - .column(MESSAGE_ID) + .columns(MESSAGE_ID, SENT_AT, THREAD_ID) .whereColumn(MAILBOX_ID).isEqualTo(bindMarker(MAILBOX_ID)) .whereColumn(SENT_AT).isGreaterThanOrEqualTo(bindMarker(SENT_AT)) .orderBy(SENT_AT, DESC) @@ -106,17 +155,19 @@ public class CassandraEmailQueryView implements EmailQueryView { .build()); listMailboxContentSinceReceivedAt = session.prepare(selectFrom(TABLE_NAME_RECEIVED_AT) - .columns(MESSAGE_ID, SENT_AT) + .columns(MESSAGE_ID, SENT_AT, THREAD_ID) .whereColumn(MAILBOX_ID).isEqualTo(bindMarker(MAILBOX_ID)) .whereColumn(RECEIVED_AT).isGreaterThanOrEqualTo(bindMarker(RECEIVED_AT)) .orderBy(RECEIVED_AT, DESC) + .limit(bindMarker(LIMIT_MARKER)) .build()); listMailboxContentBeforeReceivedAt = session.prepare(selectFrom(TABLE_NAME_RECEIVED_AT) - .columns(MESSAGE_ID, SENT_AT) + .columns(MESSAGE_ID, SENT_AT, THREAD_ID) .whereColumn(MAILBOX_ID).isEqualTo(bindMarker(MAILBOX_ID)) .whereColumn(RECEIVED_AT).isLessThanOrEqualTo(bindMarker(RECEIVED_AT)) .orderBy(RECEIVED_AT, DESC) + .limit(bindMarker(LIMIT_MARKER)) .build()); insertInLookupTable = session.prepare(insertInto(DATE_LOOKUP_TABLE) @@ -181,22 +232,71 @@ public class CassandraEmailQueryView implements EmailQueryView { public Flux<MessageId> listMailboxContentSortedBySentAt(MailboxId mailboxId, Limit limit, boolean collapseThreads) { Preconditions.checkArgument(!limit.isUnlimited(), "Limit should be defined"); - CassandraId cassandraId = (CassandraId) mailboxId; - return executor.executeRows(listMailboxContentBySentAt.bind() - .set(MAILBOX_ID, cassandraId.asUuid(), TypeCodecs.UUID) - .setInt(LIMIT_MARKER, limit.getLimit().get())) - .map(row -> CassandraMessageId.Factory.of(row.get(0, TypeCodecs.UUID))); + Limit backendFetchLimit = backendLimitFetch(limit, collapseThreads); + return listMailboxContentSortedBySentAtWithBackendLimit((CassandraId) mailboxId, limit, collapseThreads, backendFetchLimit); + } + + private Flux<MessageId> listMailboxContentSortedBySentAtWithBackendLimit(CassandraId mailboxId, Limit limit, boolean collapseThreads, Limit backendFetchLimit) { + Flux<EmailEntry> baseEntries = executor.executeRows(listMailboxContentBySentAt.bind() + .set(MAILBOX_ID, mailboxId.asUuid(), TypeCodecs.UUID) + .setInt(LIMIT_MARKER, backendFetchLimit.getLimit().get())) + .map(asEmailEntry(SENT_AT)); + + if (collapseThreads) { + return messagesWithCollapseThreads(limit, backendFetchLimit, baseEntries, + newLimit -> listMailboxContentSortedBySentAtWithBackendLimit(mailboxId, limit, collapseThreads, newLimit)); + } + + return baseEntries.map(EmailEntry::getMessageId); + } + + private Flux<MessageId> messagesWithCollapseThreads(Limit limit, Limit backendFetchLimit, Flux<EmailEntry> baseEntries, Function<Limit, Flux<MessageId>> listMessagesCallbackFunction) { + return baseEntries.collectList() + .flatMapMany(results -> { + List<EmailEntry> distinctByThreadId = distinctByThreadId(results); + boolean hasEnoughResults = distinctByThreadId.size() >= limit.getLimit().get(); + boolean isExhaustive = results.size() < backendFetchLimit.getLimit().get(); + if (hasEnoughResults || isExhaustive) { + return Flux.fromIterable(distinctByThreadId) + .take(limit.getLimit().get()) + .map(EmailEntry::getMessageId); + } + Limit newBackendFetchLimit = Limit.from(backendFetchLimit.getLimit().get() * COLLAPSE_THREADS_LIMIT_MULTIPLIER); + return listMessagesCallbackFunction.apply(newBackendFetchLimit); + }); + } + + private List<EmailEntry> distinctByThreadId(List<EmailEntry> emailEntries) { + ImmutableList.Builder<EmailEntry> list = ImmutableList.builder(); + HashSet<ThreadId> threadIdHashSet = new HashSet<>(); + emailEntries.forEach(emailEntry -> { + if (threadIdHashSet.add(emailEntry.getThreadId())) { + list.add(emailEntry); + } + }); + return list.build(); } @Override public Flux<MessageId> listMailboxContentSortedByReceivedAt(MailboxId mailboxId, Limit limit, boolean collapseThreads) { Preconditions.checkArgument(!limit.isUnlimited(), "Limit should be defined"); - CassandraId cassandraId = (CassandraId) mailboxId; - return executor.executeRows(listMailboxContentByReceivedAt.bind() - .set(MAILBOX_ID, cassandraId.asUuid(), TypeCodecs.UUID) - .setInt(LIMIT_MARKER, limit.getLimit().get())) - .map(row -> CassandraMessageId.Factory.of(row.get(0, TypeCodecs.UUID))); + Limit backendFetchLimit = backendLimitFetch(limit, collapseThreads); + return listMailboxContentSortedByReceivedAtWithBackendLimit((CassandraId) mailboxId, limit, collapseThreads, backendFetchLimit); + } + + private Flux<MessageId> listMailboxContentSortedByReceivedAtWithBackendLimit(CassandraId mailboxId, Limit limit, boolean collapseThreads, Limit backendFetchLimit) { + Flux<EmailEntry> baseEntries = executor.executeRows(listMailboxContentByReceivedAt.bind() + .set(MAILBOX_ID, mailboxId.asUuid(), TypeCodecs.UUID) + .setInt(LIMIT_MARKER, backendFetchLimit.getLimit().get())) + .map(asEmailEntry(RECEIVED_AT)); + + if (collapseThreads) { + return messagesWithCollapseThreads(limit, backendFetchLimit, baseEntries, + newLimit -> listMailboxContentSortedByReceivedAtWithBackendLimit(mailboxId, limit, collapseThreads, newLimit)); + } + + return baseEntries.map(EmailEntry::getMessageId); } @Override @@ -205,17 +305,21 @@ public class CassandraEmailQueryView implements EmailQueryView { CassandraId cassandraId = (CassandraId) mailboxId; - return executor.executeRows(listMailboxContentSinceReceivedAt.bind() + Flux<EmailEntry> baseEntries = executor.executeRows(listMailboxContentSinceReceivedAt.bind() .set(MAILBOX_ID, cassandraId.asUuid(), TypeCodecs.UUID) .setInstant(RECEIVED_AT, since.toInstant())) - .map(row -> { - CassandraMessageId messageId = CassandraMessageId.Factory.of(row.getUuid(MESSAGE_ID)); - Instant sentAt = row.getInstant(SENT_AT); - - return Pair.of(messageId, sentAt); - }) - .sort(Comparator.<Pair<CassandraMessageId, Instant>, Instant>comparing(Pair::getValue).reversed()) - .map(pair -> (MessageId) pair.getKey()) + .map(asEmailEntry(SENT_AT)); + + if (collapseThreads) { + return baseEntries.groupBy(EmailEntry::getThreadId) + .flatMap(group -> group.reduce((e1, e2) -> + e1.getMessageDate().isAfter(e2.getMessageDate()) ? e1 : e2)) + .sort(Comparator.comparing(EmailEntry::getMessageDate).reversed()) + .map(EmailEntry::getMessageId) + .take(limit.getLimit().get()); + } + return baseEntries.sort(Comparator.comparing(EmailEntry::getMessageDate).reversed()) + .map(EmailEntry::getMessageId) .take(limit.getLimit().get()); } @@ -223,39 +327,93 @@ public class CassandraEmailQueryView implements EmailQueryView { public Flux<MessageId> listMailboxContentSinceAfterSortedByReceivedAt(MailboxId mailboxId, ZonedDateTime since, Limit limit, boolean collapseThreads) { Preconditions.checkArgument(!limit.isUnlimited(), "Limit should be defined"); - CassandraId cassandraId = (CassandraId) mailboxId; + Limit backendFetchLimit = backendLimitFetch(limit, collapseThreads); + return listMailboxContentSinceAfterSortedByReceivedAtWithBackendLimit((CassandraId) mailboxId, since, limit, collapseThreads, backendFetchLimit); + } - return executor.executeRows(listMailboxContentSinceReceivedAt.bind() - .set(MAILBOX_ID, cassandraId.asUuid(), TypeCodecs.UUID) - .setInstant(RECEIVED_AT, since.toInstant())) - .<MessageId>map(row -> CassandraMessageId.Factory.of(row.get(0, TypeCodecs.UUID))) - .take(limit.getLimit().get()); + private Flux<MessageId> listMailboxContentSinceAfterSortedByReceivedAtWithBackendLimit(CassandraId mailboxId, ZonedDateTime since, Limit limit, boolean collapseThreads, Limit backendFetchLimit) { + Flux<EmailEntry> baseEntries = executor.executeRows(listMailboxContentSinceReceivedAt.bind() + .set(MAILBOX_ID, mailboxId.asUuid(), TypeCodecs.UUID) + .setInstant(RECEIVED_AT, since.toInstant()) + .setInt(LIMIT_MARKER, backendFetchLimit.getLimit().get())) + .map(asEmailEntry(SENT_AT)); + + if (collapseThreads) { + return messagesWithCollapseThreads(limit, backendFetchLimit, baseEntries, + newLimit -> listMailboxContentSinceAfterSortedByReceivedAtWithBackendLimit(mailboxId, since, limit, collapseThreads, newLimit)); + } + + return baseEntries.map(EmailEntry::getMessageId); } @Override public Flux<MessageId> listMailboxContentBeforeSortedByReceivedAt(MailboxId mailboxId, ZonedDateTime since, Limit limit, boolean collapseThreads) { Preconditions.checkArgument(!limit.isUnlimited(), "Limit should be defined"); - CassandraId cassandraId = (CassandraId) mailboxId; + Limit backendFetchLimit = backendLimitFetch(limit, collapseThreads); + return listMailboxContentBeforeSortedByReceivedAtWithBackendLimit((CassandraId) mailboxId, since, limit, collapseThreads, backendFetchLimit); + } - return executor.executeRows(listMailboxContentBeforeReceivedAt.bind() - .set(MAILBOX_ID, cassandraId.asUuid(), TypeCodecs.UUID) - .setInstant(RECEIVED_AT, since.toInstant())) - .<MessageId>map(row -> CassandraMessageId.Factory.of(row.get(0, TypeCodecs.UUID))) - .take(limit.getLimit().get()); + private Flux<MessageId> listMailboxContentBeforeSortedByReceivedAtWithBackendLimit(CassandraId mailboxId, ZonedDateTime since, Limit limit, boolean collapseThreads, Limit backendFetchLimit) { + Flux<EmailEntry> baseEntries = executor.executeRows(listMailboxContentBeforeReceivedAt.bind() + .set(MAILBOX_ID, mailboxId.asUuid(), TypeCodecs.UUID) + .setInstant(RECEIVED_AT, since.toInstant()) + .setInt(LIMIT_MARKER, backendFetchLimit.getLimit().get())) + .map(asEmailEntry(SENT_AT)); + + if (collapseThreads) { + return messagesWithCollapseThreads(limit, backendFetchLimit, baseEntries, + newLimit -> listMailboxContentBeforeSortedByReceivedAtWithBackendLimit(mailboxId, since, limit, collapseThreads, newLimit)); + } + + return baseEntries.map(EmailEntry::getMessageId); } @Override public Flux<MessageId> listMailboxContentSinceSentAt(MailboxId mailboxId, ZonedDateTime since, Limit limit, boolean collapseThreads) { Preconditions.checkArgument(!limit.isUnlimited(), "Limit should be defined"); - CassandraId cassandraId = (CassandraId) mailboxId; + Limit backendFetchLimit = backendLimitFetch(limit, collapseThreads); + return listMailboxContentSinceSentAtWithBackendLimit((CassandraId) mailboxId, since, limit, collapseThreads, backendFetchLimit); + } - return executor.executeRows(listMailboxContentSinceSentAt.bind() - .set(MAILBOX_ID, cassandraId.asUuid(), TypeCodecs.UUID) - .setInt(LIMIT_MARKER, limit.getLimit().get()) - .setInstant(SENT_AT, since.toInstant())) - .map(row -> CassandraMessageId.Factory.of(row.get(0, TypeCodecs.UUID))); + private Flux<MessageId> listMailboxContentSinceSentAtWithBackendLimit(CassandraId mailboxId, ZonedDateTime since, Limit limit, boolean collapseThreads, Limit backendFetchLimit) { + Flux<EmailEntry> baseEntries = executor.executeRows(listMailboxContentSinceSentAt.bind() + .set(MAILBOX_ID, mailboxId.asUuid(), TypeCodecs.UUID) + .setInstant(SENT_AT, since.toInstant()) + .setInt(LIMIT_MARKER, backendFetchLimit.getLimit().get())) + .map(asEmailEntry(SENT_AT)); + + if (collapseThreads) { + return messagesWithCollapseThreads(limit, backendFetchLimit, baseEntries, + newLimit -> listMailboxContentSinceSentAtWithBackendLimit(mailboxId, since, limit, collapseThreads, newLimit)); + } + + return baseEntries.map(EmailEntry::getMessageId); + } + + private Limit backendLimitFetch(Limit limit, boolean collapseThreads) { + if (collapseThreads) { + return Limit.limit(limit.getLimit().get() * COLLAPSE_THREADS_LIMIT_MULTIPLIER); + } + return limit; + } + + private Function<Row, EmailEntry> asEmailEntry(CqlIdentifier dateField) { + return (Row row) -> { + CassandraMessageId messageId = CassandraMessageId.Factory.of(row.getUuid(MESSAGE_ID)); + ThreadId threadId = getThreadIdFromRow(row, messageId); + Instant messageDate = row.getInstant(dateField); + return new EmailEntry(messageId, threadId, messageDate); + }; + } + + private ThreadId getThreadIdFromRow(Row row, MessageId messageId) { + UUID threadIdUUID = row.get(CassandraEmailQueryViewTable.THREAD_ID, TypeCodecs.TIMEUUID); + if (threadIdUUID == null) { + return ThreadId.fromBaseMessageId(messageId); + } + return ThreadId.fromBaseMessageId(CassandraMessageId.Factory.of(threadIdUUID)); } @Override diff --git a/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/projections/CassandraEmailQueryViewTest.java b/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/projections/CassandraEmailQueryViewTest.java index 33f0eabada..7d263fe27a 100644 --- a/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/projections/CassandraEmailQueryViewTest.java +++ b/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/projections/CassandraEmailQueryViewTest.java @@ -81,7 +81,17 @@ public class CassandraEmailQueryViewTest implements EmailQueryViewContract { } @Override - public ThreadId threadId() { + public ThreadId threadId1() { return THREAD_ID; } + + @Override + public ThreadId threadId2() { + return ThreadId.fromBaseMessageId(MESSAGE_ID_FACTORY.generate()); + } + + @Override + public ThreadId threadId3() { + return ThreadId.fromBaseMessageId(MESSAGE_ID_FACTORY.generate()); + } } diff --git a/server/data/data-jmap-postgres/src/test/java/org/apache/james/jmap/postgres/projections/PostgresEmailQueryViewTest.java b/server/data/data-jmap-postgres/src/test/java/org/apache/james/jmap/postgres/projections/PostgresEmailQueryViewTest.java index 29c366d2ca..147974e052 100644 --- a/server/data/data-jmap-postgres/src/test/java/org/apache/james/jmap/postgres/projections/PostgresEmailQueryViewTest.java +++ b/server/data/data-jmap-postgres/src/test/java/org/apache/james/jmap/postgres/projections/PostgresEmailQueryViewTest.java @@ -72,7 +72,17 @@ public class PostgresEmailQueryViewTest implements EmailQueryViewContract { } @Override - public ThreadId threadId() { + public ThreadId threadId1() { return THREAD_ID; } + + @Override + public ThreadId threadId2() { + return ThreadId.fromBaseMessageId(MESSAGE_ID_FACTORY.generate()); + } + + @Override + public ThreadId threadId3() { + return ThreadId.fromBaseMessageId(MESSAGE_ID_FACTORY.generate()); + } } diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/memory/projections/MemoryEmailQueryView.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/memory/projections/MemoryEmailQueryView.java index 9097f3c8a2..71d3cb6e1c 100644 --- a/server/data/data-jmap/src/main/java/org/apache/james/jmap/memory/projections/MemoryEmailQueryView.java +++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/memory/projections/MemoryEmailQueryView.java @@ -21,6 +21,7 @@ package org.apache.james.jmap.memory.projections; import java.time.ZonedDateTime; import java.util.Comparator; +import java.util.function.Function; import jakarta.inject.Inject; @@ -52,7 +53,7 @@ public class MemoryEmailQueryView implements EmailQueryView { Flux<Entry> baseEntries = Flux.fromIterable(entries.row(mailboxId).values()); - return maybeCollapseThreads(baseEntries, collapseThreads) + return maybeCollapseThreads(Entry::getSentAt, collapseThreads).apply(baseEntries) .sort(Comparator.comparing(Entry::getSentAt).reversed()) .map(Entry::getMessageId) .take(limit.getLimit().get()); @@ -65,7 +66,7 @@ public class MemoryEmailQueryView implements EmailQueryView { Flux<Entry> baseEntries = Flux.fromIterable(entries.row(mailboxId).values()) .filter(e -> e.getSentAt().isAfter(since) || e.getSentAt().isEqual(since)); - return maybeCollapseThreads(baseEntries, collapseThreads) + return maybeCollapseThreads(Entry::getSentAt, collapseThreads).apply(baseEntries) .sort(Comparator.comparing(Entry::getSentAt).reversed()) .map(Entry::getMessageId) .take(limit.getLimit().get()); @@ -78,7 +79,7 @@ public class MemoryEmailQueryView implements EmailQueryView { Flux<Entry> baseEntries = Flux.fromIterable(entries.row(mailboxId).values()) .filter(e -> e.getReceivedAt().isAfter(since) || e.getReceivedAt().isEqual(since)); - return maybeCollapseThreads(baseEntries, collapseThreads) + return maybeCollapseThreads(Entry::getSentAt, collapseThreads).apply(baseEntries) .sort(Comparator.comparing(Entry::getSentAt).reversed()) .map(Entry::getMessageId) .take(limit.getLimit().get()); @@ -88,7 +89,7 @@ public class MemoryEmailQueryView implements EmailQueryView { public Flux<MessageId> listMailboxContentSortedByReceivedAt(MailboxId mailboxId, Limit limit, boolean collapseThreads) { Flux<Entry> baseEntries = Flux.fromIterable(entries.row(mailboxId).values()); - return maybeCollapseThreads(baseEntries, collapseThreads) + return maybeCollapseThreads(Entry::getReceivedAt, collapseThreads).apply(baseEntries) .sort(Comparator.comparing(Entry::getReceivedAt).reversed()) .map(Entry::getMessageId) .take(limit.getLimit().get()); @@ -99,7 +100,7 @@ public class MemoryEmailQueryView implements EmailQueryView { Flux<Entry> baseEntries = Flux.fromIterable(entries.row(mailboxId).values()) .filter(e -> e.getReceivedAt().isAfter(since) || e.getReceivedAt().isEqual(since)); - return maybeCollapseThreads(baseEntries, collapseThreads) + return maybeCollapseThreads(Entry::getReceivedAt, collapseThreads).apply(baseEntries) .sort(Comparator.comparing(Entry::getReceivedAt).reversed()) .map(Entry::getMessageId) .take(limit.getLimit().get()); @@ -110,19 +111,21 @@ public class MemoryEmailQueryView implements EmailQueryView { Flux<Entry> baseEntries = Flux.fromIterable(entries.row(mailboxId).values()) .filter(e -> e.getReceivedAt().isBefore(before) || e.getReceivedAt().isEqual(before)); - return maybeCollapseThreads(baseEntries, collapseThreads) + return maybeCollapseThreads(Entry::getReceivedAt, collapseThreads).apply(baseEntries) .sort(Comparator.comparing(Entry::getReceivedAt).reversed()) .map(Entry::getMessageId) .take(limit.getLimit().get()); } - private Flux<Entry> maybeCollapseThreads(Flux<Entry> entries, boolean collapseThreads) { - if (collapseThreads) { - return entries.groupBy(Entry::getThreadId) - .flatMap(group -> group.reduce((e1, e2) -> - e1.getReceivedAt().isAfter(e2.getReceivedAt()) ? e1 : e2)); - } - return entries; + private Function<Flux<Entry>, Flux<Entry>> maybeCollapseThreads(Function<Entry, ZonedDateTime> dateExtractor, boolean collapseThreads) { + return entries -> { + if (collapseThreads) { + return entries.groupBy(Entry::getThreadId) + .flatMap(group -> group.reduce((e1, e2) -> + dateExtractor.apply(e1).isAfter(dateExtractor.apply(e2)) ? e1 : e2)); + } + return entries; + }; } @Override diff --git a/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/projections/EmailQueryViewContract.java b/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/projections/EmailQueryViewContract.java index ea2b3e7e88..e24e534d95 100644 --- a/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/projections/EmailQueryViewContract.java +++ b/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/projections/EmailQueryViewContract.java @@ -32,7 +32,7 @@ import org.apache.james.util.streams.Limit; import org.junit.jupiter.api.Test; public interface EmailQueryViewContract { - boolean NO_COLLAPSE_THREAD = false; + boolean COLLAPSE_THREAD = true; ZonedDateTime DATE_1 = ZonedDateTime.parse("2010-10-30T15:12:00Z"); ZonedDateTime DATE_2 = ZonedDateTime.parse("2010-10-30T16:12:00Z"); ZonedDateTime DATE_3 = ZonedDateTime.parse("2010-10-30T17:12:00Z"); @@ -53,211 +53,215 @@ public interface EmailQueryViewContract { MessageId messageId4(); - ThreadId threadId(); + ThreadId threadId1(); + + ThreadId threadId2(); + + ThreadId threadId3(); @Test default void listMailboxContentShouldReturnEmptyByDefault() { - assertThat(testee().listMailboxContentSortedBySentAt(mailboxId1(), Limit.limit(12), NO_COLLAPSE_THREAD).collectList().block()) + assertThat(testee().listMailboxContentSortedBySentAt(mailboxId1(), Limit.limit(12), !COLLAPSE_THREAD).collectList().block()) .isEmpty(); } @Test default void listMailboxContentShouldBeOrderedBySentAt() { - testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), threadId()).block(); - testee().save(mailboxId1(), DATE_3, DATE_4, messageId2(), threadId()).block(); - testee().save(mailboxId1(), DATE_2, DATE_6, messageId3(), threadId()).block(); + testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), threadId1()).block(); + testee().save(mailboxId1(), DATE_3, DATE_4, messageId2(), threadId1()).block(); + testee().save(mailboxId1(), DATE_2, DATE_6, messageId3(), threadId1()).block(); - assertThat(testee().listMailboxContentSortedBySentAt(mailboxId1(), Limit.limit(12), NO_COLLAPSE_THREAD).collectList().block()) + assertThat(testee().listMailboxContentSortedBySentAt(mailboxId1(), Limit.limit(12), !COLLAPSE_THREAD).collectList().block()) .containsExactly(messageId2(), messageId3(), messageId1()); } @Test default void listMailboxContentShouldApplyLimit() { - testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), threadId()).block(); - testee().save(mailboxId1(), DATE_3, DATE_4, messageId2(), threadId()).block(); - testee().save(mailboxId1(), DATE_2, DATE_6, messageId3(), threadId()).block(); + testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), threadId1()).block(); + testee().save(mailboxId1(), DATE_3, DATE_4, messageId2(), threadId1()).block(); + testee().save(mailboxId1(), DATE_2, DATE_6, messageId3(), threadId1()).block(); - assertThat(testee().listMailboxContentSortedBySentAt(mailboxId1(), Limit.limit(2), NO_COLLAPSE_THREAD).collectList().block()) + assertThat(testee().listMailboxContentSortedBySentAt(mailboxId1(), Limit.limit(2), !COLLAPSE_THREAD).collectList().block()) .containsExactly(messageId2(), messageId3()); } @Test default void listMailboxContentSinceReceivedAtShouldExcludeTooOldItems() { - testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), threadId()).block(); - testee().save(mailboxId1(), DATE_3, DATE_4, messageId2(), threadId()).block(); - testee().save(mailboxId1(), DATE_5, DATE_6, messageId3(), threadId()).block(); + testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), threadId1()).block(); + testee().save(mailboxId1(), DATE_3, DATE_4, messageId2(), threadId1()).block(); + testee().save(mailboxId1(), DATE_5, DATE_6, messageId3(), threadId1()).block(); - assertThat(testee().listMailboxContentSinceAfterSortedBySentAt(mailboxId1(), DATE_3, Limit.limit(12), NO_COLLAPSE_THREAD).collectList().block()) + assertThat(testee().listMailboxContentSinceAfterSortedBySentAt(mailboxId1(), DATE_3, Limit.limit(12), !COLLAPSE_THREAD).collectList().block()) .containsExactly(messageId3(), messageId2()); } @Test default void listMailboxContentSinceReceivedAtShouldReturnEmptyWhenNoneMatch() { - testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), threadId()).block(); - testee().save(mailboxId1(), DATE_3, DATE_4, messageId2(), threadId()).block(); - testee().save(mailboxId1(), DATE_5, DATE_6, messageId3(), threadId()).block(); + testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), threadId1()).block(); + testee().save(mailboxId1(), DATE_3, DATE_4, messageId2(), threadId1()).block(); + testee().save(mailboxId1(), DATE_5, DATE_6, messageId3(), threadId1()).block(); - assertThat(testee().listMailboxContentSinceAfterSortedBySentAt(mailboxId1(), DATE_7, Limit.limit(12), NO_COLLAPSE_THREAD).collectList().block()) + assertThat(testee().listMailboxContentSinceAfterSortedBySentAt(mailboxId1(), DATE_7, Limit.limit(12), !COLLAPSE_THREAD).collectList().block()) .isEmpty(); } @Test default void listMailboxContentSinceReceivedAtAtShouldApplyLimit() { - testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), threadId()).block(); - testee().save(mailboxId1(), DATE_3, DATE_4, messageId2(), threadId()).block(); - testee().save(mailboxId1(), DATE_5, DATE_6, messageId3(), threadId()).block(); + testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), threadId1()).block(); + testee().save(mailboxId1(), DATE_3, DATE_4, messageId2(), threadId1()).block(); + testee().save(mailboxId1(), DATE_5, DATE_6, messageId3(), threadId1()).block(); - assertThat(testee().listMailboxContentSinceAfterSortedBySentAt(mailboxId1(), DATE_1, Limit.limit(2), NO_COLLAPSE_THREAD).collectList().block()) + assertThat(testee().listMailboxContentSinceAfterSortedBySentAt(mailboxId1(), DATE_1, Limit.limit(2), !COLLAPSE_THREAD).collectList().block()) .containsExactly(messageId3(), messageId2()); } @Test default void listMailboxContentSinceSentdAtShouldExcludeTooOldItems() { - testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), threadId()).block(); - testee().save(mailboxId1(), DATE_3, DATE_4, messageId2(), threadId()).block(); - testee().save(mailboxId1(), DATE_5, DATE_6, messageId3(), threadId()).block(); + testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), threadId1()).block(); + testee().save(mailboxId1(), DATE_3, DATE_4, messageId2(), threadId1()).block(); + testee().save(mailboxId1(), DATE_5, DATE_6, messageId3(), threadId1()).block(); - assertThat(testee().listMailboxContentSinceSentAt(mailboxId1(), DATE_2, Limit.limit(12), NO_COLLAPSE_THREAD).collectList().block()) + assertThat(testee().listMailboxContentSinceSentAt(mailboxId1(), DATE_2, Limit.limit(12), !COLLAPSE_THREAD).collectList().block()) .containsExactly(messageId3(), messageId2()); } @Test default void listMailboxContentSinceSentAtAtShouldApplyLimit() { - testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), threadId()).block(); - testee().save(mailboxId1(), DATE_3, DATE_4, messageId2(), threadId()).block(); - testee().save(mailboxId1(), DATE_5, DATE_6, messageId3(), threadId()).block(); + testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), threadId1()).block(); + testee().save(mailboxId1(), DATE_3, DATE_4, messageId2(), threadId1()).block(); + testee().save(mailboxId1(), DATE_5, DATE_6, messageId3(), threadId1()).block(); - assertThat(testee().listMailboxContentSinceSentAt(mailboxId1(), DATE_1, Limit.limit(2), NO_COLLAPSE_THREAD).collectList().block()) + assertThat(testee().listMailboxContentSinceSentAt(mailboxId1(), DATE_1, Limit.limit(2), !COLLAPSE_THREAD).collectList().block()) .containsExactly(messageId3(), messageId2()); } @Test default void listMailboxContentSinceSentAtShouldReturnEmptyWhenNoneMatch() { - testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), threadId()).block(); - testee().save(mailboxId1(), DATE_3, DATE_4, messageId2(), threadId()).block(); - testee().save(mailboxId1(), DATE_5, DATE_6, messageId3(), threadId()).block(); + testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), threadId1()).block(); + testee().save(mailboxId1(), DATE_3, DATE_4, messageId2(), threadId1()).block(); + testee().save(mailboxId1(), DATE_5, DATE_6, messageId3(), threadId1()).block(); - assertThat(testee().listMailboxContentSinceSentAt(mailboxId1(), DATE_7, Limit.limit(12), NO_COLLAPSE_THREAD).collectList().block()) + assertThat(testee().listMailboxContentSinceSentAt(mailboxId1(), DATE_7, Limit.limit(12), !COLLAPSE_THREAD).collectList().block()) .isEmpty(); } @Test default void listMailboxContentShouldNotReturnClearedContent() { - testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), threadId()).block(); - testee().save(mailboxId1(), DATE_3, DATE_4, messageId2(), threadId()).block(); - testee().save(mailboxId1(), DATE_2, DATE_6, messageId3(), threadId()).block(); + testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), threadId1()).block(); + testee().save(mailboxId1(), DATE_3, DATE_4, messageId2(), threadId1()).block(); + testee().save(mailboxId1(), DATE_2, DATE_6, messageId3(), threadId1()).block(); testee().delete(mailboxId1()).block(); - assertThat(testee().listMailboxContentSortedBySentAt(mailboxId1(), Limit.limit(12), NO_COLLAPSE_THREAD).collectList().block()) + assertThat(testee().listMailboxContentSortedBySentAt(mailboxId1(), Limit.limit(12), !COLLAPSE_THREAD).collectList().block()) .isEmpty(); } @Test default void listMailboxContentShouldNotReturnDeletedContent() { - testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), threadId()).block(); - testee().save(mailboxId1(), DATE_3, DATE_4, messageId2(), threadId()).block(); - testee().save(mailboxId1(), DATE_2, DATE_6, messageId3(), threadId()).block(); + testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), threadId1()).block(); + testee().save(mailboxId1(), DATE_3, DATE_4, messageId2(), threadId1()).block(); + testee().save(mailboxId1(), DATE_2, DATE_6, messageId3(), threadId1()).block(); testee().delete(mailboxId1(), messageId2()).block(); - assertThat(testee().listMailboxContentSortedBySentAt(mailboxId1(), Limit.limit(12), NO_COLLAPSE_THREAD).collectList().block()) + assertThat(testee().listMailboxContentSortedBySentAt(mailboxId1(), Limit.limit(12), !COLLAPSE_THREAD).collectList().block()) .containsExactly(messageId3(), messageId1()); } @Test default void listMailboxContentSinceReceivedAtShouldNotReturnClearedContent() { - testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), threadId()).block(); - testee().save(mailboxId1(), DATE_3, DATE_4, messageId2(), threadId()).block(); - testee().save(mailboxId1(), DATE_5, DATE_6, messageId3(), threadId()).block(); + testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), threadId1()).block(); + testee().save(mailboxId1(), DATE_3, DATE_4, messageId2(), threadId1()).block(); + testee().save(mailboxId1(), DATE_5, DATE_6, messageId3(), threadId1()).block(); testee().delete(mailboxId1()).block(); - assertThat(testee().listMailboxContentSinceAfterSortedBySentAt(mailboxId1(), DATE_4, Limit.limit(12), NO_COLLAPSE_THREAD).collectList().block()) + assertThat(testee().listMailboxContentSinceAfterSortedBySentAt(mailboxId1(), DATE_4, Limit.limit(12), !COLLAPSE_THREAD).collectList().block()) .isEmpty(); } @Test default void listMailboxContentSinceSentAtShouldNotReturnClearedContent() { - testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), threadId()).block(); - testee().save(mailboxId1(), DATE_3, DATE_4, messageId2(), threadId()).block(); - testee().save(mailboxId1(), DATE_5, DATE_6, messageId3(), threadId()).block(); + testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), threadId1()).block(); + testee().save(mailboxId1(), DATE_3, DATE_4, messageId2(), threadId1()).block(); + testee().save(mailboxId1(), DATE_5, DATE_6, messageId3(), threadId1()).block(); testee().delete(mailboxId1()).block(); - assertThat(testee().listMailboxContentSinceSentAt(mailboxId1(), DATE_4, Limit.limit(12), NO_COLLAPSE_THREAD).collectList().block()) + assertThat(testee().listMailboxContentSinceSentAt(mailboxId1(), DATE_4, Limit.limit(12), !COLLAPSE_THREAD).collectList().block()) .isEmpty(); } @Test default void saveShouldBeIdempotent() { - testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), threadId()).block(); - testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), threadId()).block(); + testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), threadId1()).block(); + testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), threadId1()).block(); - assertThat(testee().listMailboxContentSortedBySentAt(mailboxId1(), Limit.limit(12), NO_COLLAPSE_THREAD).collectList().block()) + assertThat(testee().listMailboxContentSortedBySentAt(mailboxId1(), Limit.limit(12), !COLLAPSE_THREAD).collectList().block()) .containsExactly(messageId1()); } @Test default void datesCanBeDuplicated() { - testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), threadId()).block(); - testee().save(mailboxId1(), DATE_1, DATE_2, messageId2(), threadId()).block(); + testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), threadId1()).block(); + testee().save(mailboxId1(), DATE_1, DATE_2, messageId2(), threadId1()).block(); - assertThat(testee().listMailboxContentSortedBySentAt(mailboxId1(), Limit.limit(12), NO_COLLAPSE_THREAD).collectList().block()) + assertThat(testee().listMailboxContentSortedBySentAt(mailboxId1(), Limit.limit(12), !COLLAPSE_THREAD).collectList().block()) .containsExactlyInAnyOrder(messageId1(), messageId2()); } @Test default void listMailboxContentSinceReceivedAtShouldNotReturnDeletedContent() { - testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), threadId()).block(); - testee().save(mailboxId1(), DATE_3, DATE_4, messageId2(), threadId()).block(); - testee().save(mailboxId1(), DATE_5, DATE_6, messageId3(), threadId()).block(); + testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), threadId1()).block(); + testee().save(mailboxId1(), DATE_3, DATE_4, messageId2(), threadId1()).block(); + testee().save(mailboxId1(), DATE_5, DATE_6, messageId3(), threadId1()).block(); testee().delete(mailboxId1(), messageId2()).block(); - assertThat(testee().listMailboxContentSinceAfterSortedBySentAt(mailboxId1(), DATE_3, Limit.limit(12), NO_COLLAPSE_THREAD).collectList().block()) + assertThat(testee().listMailboxContentSinceAfterSortedBySentAt(mailboxId1(), DATE_3, Limit.limit(12), !COLLAPSE_THREAD).collectList().block()) .containsExactly(messageId3()); } @Test default void listMailboxContentSinceSentAtShouldNotReturnDeletedContent() { - testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), threadId()).block(); - testee().save(mailboxId1(), DATE_3, DATE_4, messageId2(), threadId()).block(); - testee().save(mailboxId1(), DATE_5, DATE_6, messageId3(), threadId()).block(); + testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), threadId1()).block(); + testee().save(mailboxId1(), DATE_3, DATE_4, messageId2(), threadId1()).block(); + testee().save(mailboxId1(), DATE_5, DATE_6, messageId3(), threadId1()).block(); testee().delete(mailboxId1(), messageId2()).block(); - assertThat(testee().listMailboxContentSinceSentAt(mailboxId1(), DATE_3, Limit.limit(12), NO_COLLAPSE_THREAD).collectList().block()) + assertThat(testee().listMailboxContentSinceSentAt(mailboxId1(), DATE_3, Limit.limit(12), !COLLAPSE_THREAD).collectList().block()) .containsExactly(messageId3()); } @Test default void listMailboxContentSortedByReceivedAtShouldBeSortedByReceivedAt() { - testee().save(mailboxId1(), DATE_1, DATE_4, messageId1(), threadId()).block(); - testee().save(mailboxId1(), DATE_2, DATE_3, messageId2(), threadId()).block(); - testee().save(mailboxId1(), DATE_5, DATE_6, messageId3(), threadId()).block(); + testee().save(mailboxId1(), DATE_1, DATE_4, messageId1(), threadId1()).block(); + testee().save(mailboxId1(), DATE_2, DATE_3, messageId2(), threadId1()).block(); + testee().save(mailboxId1(), DATE_5, DATE_6, messageId3(), threadId1()).block(); - assertThat(testee().listMailboxContentSortedByReceivedAt(mailboxId1(), Limit.limit(12), NO_COLLAPSE_THREAD).collectList().block()) + assertThat(testee().listMailboxContentSortedByReceivedAt(mailboxId1(), Limit.limit(12), !COLLAPSE_THREAD).collectList().block()) .containsExactly(messageId3(), messageId1(), messageId2()); } @Test default void listMailboxContentSinceSortedByReceivedAtShouldBeSortedByReceivedAt() { - testee().save(mailboxId1(), DATE_1, DATE_4, messageId1(), threadId()).block(); - testee().save(mailboxId1(), DATE_2, DATE_3, messageId2(), threadId()).block(); - testee().save(mailboxId1(), DATE_5, DATE_6, messageId3(), threadId()).block(); + testee().save(mailboxId1(), DATE_1, DATE_4, messageId1(), threadId1()).block(); + testee().save(mailboxId1(), DATE_2, DATE_3, messageId2(), threadId1()).block(); + testee().save(mailboxId1(), DATE_5, DATE_6, messageId3(), threadId1()).block(); - assertThat(testee().listMailboxContentSinceAfterSortedByReceivedAt(mailboxId1(), DATE_4, Limit.limit(12), NO_COLLAPSE_THREAD).collectList().block()) + assertThat(testee().listMailboxContentSinceAfterSortedByReceivedAt(mailboxId1(), DATE_4, Limit.limit(12), !COLLAPSE_THREAD).collectList().block()) .containsExactly(messageId3(), messageId1()); } @Test default void listMailboxContentBeforeSortedByReceivedAtShouldBeSortedByReceivedAt() { - testee().save(mailboxId1(), DATE_1, DATE_4, messageId1(), threadId()).block(); - testee().save(mailboxId1(), DATE_2, DATE_3, messageId2(), threadId()).block(); - testee().save(mailboxId1(), DATE_5, DATE_6, messageId3(), threadId()).block(); + testee().save(mailboxId1(), DATE_1, DATE_4, messageId1(), threadId1()).block(); + testee().save(mailboxId1(), DATE_2, DATE_3, messageId2(), threadId1()).block(); + testee().save(mailboxId1(), DATE_5, DATE_6, messageId3(), threadId1()).block(); - assertThat(testee().listMailboxContentBeforeSortedByReceivedAt(mailboxId1(), DATE_4, Limit.limit(12), NO_COLLAPSE_THREAD).collectList().block()) + assertThat(testee().listMailboxContentBeforeSortedByReceivedAt(mailboxId1(), DATE_4, Limit.limit(12), !COLLAPSE_THREAD).collectList().block()) .containsExactly(messageId1(), messageId2()); } @@ -273,19 +277,146 @@ public interface EmailQueryViewContract { @Test default void listMailboxContentShouldThrowOnUndefinedLimit() { - assertThatThrownBy(() -> testee().listMailboxContentSortedBySentAt(mailboxId1(), Limit.unlimited(), NO_COLLAPSE_THREAD).blockLast()) + assertThatThrownBy(() -> testee().listMailboxContentSortedBySentAt(mailboxId1(), Limit.unlimited(), !COLLAPSE_THREAD).blockLast()) .isInstanceOf(IllegalArgumentException.class); } @Test default void listMailboxContentSinceSentAtShouldThrowOnUndefinedLimit() { - assertThatThrownBy(() -> testee().listMailboxContentSinceSentAt(mailboxId1(), DATE_3, Limit.unlimited(), NO_COLLAPSE_THREAD).blockLast()) + assertThatThrownBy(() -> testee().listMailboxContentSinceSentAt(mailboxId1(), DATE_3, Limit.unlimited(), !COLLAPSE_THREAD).blockLast()) .isInstanceOf(IllegalArgumentException.class); } @Test default void listMailboxContentSinceReceivedAtShouldThrowOnUndefinedLimit() { - assertThatThrownBy(() -> testee().listMailboxContentSinceAfterSortedBySentAt(mailboxId1(), DATE_3, Limit.unlimited(), NO_COLLAPSE_THREAD).blockLast()) + assertThatThrownBy(() -> testee().listMailboxContentSinceAfterSortedBySentAt(mailboxId1(), DATE_3, Limit.unlimited(), !COLLAPSE_THREAD).blockLast()) .isInstanceOf(IllegalArgumentException.class); } + + + @Test + default void listMailboxContentSortedBySentAtShouldReturnLatestMessageOfThreadWhenCollapseThreads() { + testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), threadId1()).block(); + testee().save(mailboxId1(), DATE_3, DATE_4, messageId2(), threadId1()).block(); + testee().save(mailboxId1(), DATE_2, DATE_6, messageId3(), threadId1()).block(); + + assertThat(testee().listMailboxContentSortedBySentAt(mailboxId1(), Limit.limit(12), COLLAPSE_THREAD).collectList().block()) + .containsExactly(messageId2()); + } + + @Test + default void listMailboxContentSortedBySentAtShouldApplyLimitWithCollapseThreads() { + testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), threadId2()).block(); + testee().save(mailboxId1(), DATE_3, DATE_4, messageId2(), threadId1()).block(); + testee().save(mailboxId1(), DATE_2, DATE_6, messageId3(), threadId3()).block(); + testee().save(mailboxId1(), DATE_4, DATE_5, messageId4(), threadId1()).block(); + + assertThat(testee().listMailboxContentSortedBySentAt(mailboxId1(), Limit.limit(2), COLLAPSE_THREAD).collectList().block()) + .containsExactly(messageId4(), messageId3()); + } + + @Test + default void listMailboxContentSinceReceivedAtShouldReturnLatestMessageOfThreadWhenCollapseThreads() { + testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), threadId1()).block(); + testee().save(mailboxId1(), DATE_3, DATE_4, messageId2(), threadId1()).block(); + testee().save(mailboxId1(), DATE_5, DATE_6, messageId3(), threadId1()).block(); + + assertThat(testee().listMailboxContentSinceAfterSortedBySentAt(mailboxId1(), DATE_3, Limit.limit(12), COLLAPSE_THREAD).collectList().block()) + .containsExactly(messageId3()); + } + + @Test + default void listMailboxContentSinceReceivedAtShouldApplyLimitWithCollapseThreads() { + testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), threadId2()).block(); + testee().save(mailboxId1(), DATE_3, DATE_4, messageId2(), threadId3()).block(); + testee().save(mailboxId1(), DATE_5, DATE_6, messageId3(), threadId1()).block(); + testee().save(mailboxId1(), DATE_4, DATE_5, messageId4(), threadId1()).block(); + + assertThat(testee().listMailboxContentSinceAfterSortedBySentAt(mailboxId1(), DATE_1, Limit.limit(2), COLLAPSE_THREAD).collectList().block()) + .containsExactly(messageId3(), messageId2()); + } + + @Test + default void listMailboxContentSinceSentdAtShouldReturnLatestMessageOfThreadWhenCollapseThreads() { + testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), threadId1()).block(); + testee().save(mailboxId1(), DATE_3, DATE_4, messageId2(), threadId1()).block(); + testee().save(mailboxId1(), DATE_5, DATE_6, messageId3(), threadId1()).block(); + + assertThat(testee().listMailboxContentSinceSentAt(mailboxId1(), DATE_2, Limit.limit(12), COLLAPSE_THREAD).collectList().block()) + .containsExactly(messageId3()); + } + + @Test + default void listMailboxContentSinceSentAtShouldApplyLimitWithCollapseThreads() { + testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), threadId2()).block(); + testee().save(mailboxId1(), DATE_3, DATE_4, messageId2(), threadId3()).block(); + testee().save(mailboxId1(), DATE_5, DATE_6, messageId3(), threadId1()).block(); + testee().save(mailboxId1(), DATE_4, DATE_5, messageId4(), threadId1()).block(); + + assertThat(testee().listMailboxContentSinceSentAt(mailboxId1(), DATE_1, Limit.limit(2), COLLAPSE_THREAD).collectList().block()) + .containsExactly(messageId3(), messageId2()); + } + + @Test + default void listMailboxContentSortedByReceivedAtShouldReturnLatestMessageOfThreadWhenCollapseThreads() { + testee().save(mailboxId1(), DATE_1, DATE_4, messageId1(), threadId1()).block(); + testee().save(mailboxId1(), DATE_2, DATE_3, messageId2(), threadId1()).block(); + testee().save(mailboxId1(), DATE_5, DATE_6, messageId3(), threadId1()).block(); + + assertThat(testee().listMailboxContentSortedByReceivedAt(mailboxId1(), Limit.limit(12), COLLAPSE_THREAD).collectList().block()) + .containsExactly(messageId3()); + } + + @Test + default void listMailboxContentSortedByReceivedAtShouldApplyLimitWithCollapseThreads() { + testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), threadId2()).block(); + testee().save(mailboxId1(), DATE_3, DATE_4, messageId2(), threadId3()).block(); + testee().save(mailboxId1(), DATE_5, DATE_6, messageId3(), threadId1()).block(); + testee().save(mailboxId1(), DATE_4, DATE_5, messageId4(), threadId1()).block(); + + assertThat(testee().listMailboxContentSortedByReceivedAt(mailboxId1(), Limit.limit(2), COLLAPSE_THREAD).collectList().block()) + .containsExactly(messageId3(), messageId2()); + } + + @Test + default void listMailboxContentSinceSortedByReceivedAtShouldReturnLatestMessageOfThreadWhenCollapseThreads() { + testee().save(mailboxId1(), DATE_1, DATE_4, messageId1(), threadId1()).block(); + testee().save(mailboxId1(), DATE_2, DATE_3, messageId2(), threadId1()).block(); + testee().save(mailboxId1(), DATE_5, DATE_6, messageId3(), threadId1()).block(); + + assertThat(testee().listMailboxContentSinceAfterSortedByReceivedAt(mailboxId1(), DATE_4, Limit.limit(12), COLLAPSE_THREAD).collectList().block()) + .containsExactly(messageId3()); + } + + @Test + default void listMailboxContentSinceSortedByReceivedAtShouldApplyLimitWithCollapseThreads() { + testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), threadId2()).block(); + testee().save(mailboxId1(), DATE_3, DATE_4, messageId2(), threadId3()).block(); + testee().save(mailboxId1(), DATE_5, DATE_6, messageId3(), threadId1()).block(); + testee().save(mailboxId1(), DATE_4, DATE_5, messageId4(), threadId1()).block(); + + assertThat(testee().listMailboxContentSinceAfterSortedByReceivedAt(mailboxId1(), DATE_4, Limit.limit(2), COLLAPSE_THREAD).collectList().block()) + .containsExactly(messageId3(), messageId2()); + } + + @Test + default void listMailboxContentBeforeSortedByReceivedAtShouldReturnLatestMessageOfThreadWhenCollapseThreads() { + testee().save(mailboxId1(), DATE_1, DATE_4, messageId1(), threadId1()).block(); + testee().save(mailboxId1(), DATE_2, DATE_3, messageId2(), threadId1()).block(); + testee().save(mailboxId1(), DATE_5, DATE_6, messageId3(), threadId1()).block(); + + assertThat(testee().listMailboxContentBeforeSortedByReceivedAt(mailboxId1(), DATE_4, Limit.limit(12), COLLAPSE_THREAD).collectList().block()) + .containsExactly(messageId1()); + } + + @Test + default void listMailboxContentBeforeSortedByReceivedAtShouldApplyLimitWithCollapseThreads() { + testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), threadId2()).block(); + testee().save(mailboxId1(), DATE_3, DATE_4, messageId2(), threadId1()).block(); + testee().save(mailboxId1(), DATE_5, DATE_6, messageId3(), threadId3()).block(); + testee().save(mailboxId1(), DATE_4, DATE_5, messageId4(), threadId1()).block(); + + assertThat(testee().listMailboxContentBeforeSortedByReceivedAt(mailboxId1(), DATE_5, Limit.limit(2), COLLAPSE_THREAD).collectList().block()) + .containsExactly(messageId4(), messageId1()); + } } diff --git a/server/data/data-jmap/src/test/java/org/apache/james/jmap/memory/projections/MemoryEmailQueryViewTest.java b/server/data/data-jmap/src/test/java/org/apache/james/jmap/memory/projections/MemoryEmailQueryViewTest.java index dfd48ce7ab..cbbdbaa453 100644 --- a/server/data/data-jmap/src/test/java/org/apache/james/jmap/memory/projections/MemoryEmailQueryViewTest.java +++ b/server/data/data-jmap/src/test/java/org/apache/james/jmap/memory/projections/MemoryEmailQueryViewTest.java @@ -19,8 +19,6 @@ package org.apache.james.jmap.memory.projections; -import static org.assertj.core.api.Assertions.assertThat; - import org.apache.james.jmap.api.projections.EmailQueryView; import org.apache.james.jmap.api.projections.EmailQueryViewContract; import org.apache.james.mailbox.model.MailboxId; @@ -28,12 +26,9 @@ import org.apache.james.mailbox.model.MessageId; import org.apache.james.mailbox.model.TestId; import org.apache.james.mailbox.model.TestMessageId; import org.apache.james.mailbox.model.ThreadId; -import org.apache.james.util.streams.Limit; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; public class MemoryEmailQueryViewTest implements EmailQueryViewContract { - private static final boolean COLLAPSE_THREAD = true; private MemoryEmailQueryView testee; @@ -73,133 +68,17 @@ public class MemoryEmailQueryViewTest implements EmailQueryViewContract { } @Override - public ThreadId threadId() { + public ThreadId threadId1() { return ThreadId.fromBaseMessageId(TestMessageId.of(1)); } - @Test - public void listMailboxContentSortedBySentAtShouldReturnLatestMessageOfThreadWhenCollapseThreads() { - testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), threadId()).block(); - testee().save(mailboxId1(), DATE_3, DATE_4, messageId2(), threadId()).block(); - testee().save(mailboxId1(), DATE_2, DATE_6, messageId3(), threadId()).block(); - - assertThat(testee().listMailboxContentSortedBySentAt(mailboxId1(), Limit.limit(12), COLLAPSE_THREAD).collectList().block()) - .containsExactly(messageId3()); - } - - @Test - public void listMailboxContentSortedBySentAtShouldApplyLimitWithCollapseThreads() { - testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), ThreadId.fromBaseMessageId(TestMessageId.of(2))).block(); - testee().save(mailboxId1(), DATE_3, DATE_4, messageId2(), threadId()).block(); - testee().save(mailboxId1(), DATE_2, DATE_6, messageId3(), ThreadId.fromBaseMessageId(TestMessageId.of(3))).block(); - testee().save(mailboxId1(), DATE_4, DATE_5, messageId4(), threadId()).block(); - - assertThat(testee().listMailboxContentSortedBySentAt(mailboxId1(), Limit.limit(2), COLLAPSE_THREAD).collectList().block()) - .containsExactly(messageId4(), messageId3()); - } - - @Test - public void listMailboxContentSinceReceivedAtShouldReturnLatestMessageOfThreadWhenCollapseThreads() { - testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), threadId()).block(); - testee().save(mailboxId1(), DATE_3, DATE_4, messageId2(), threadId()).block(); - testee().save(mailboxId1(), DATE_5, DATE_6, messageId3(), threadId()).block(); - - assertThat(testee().listMailboxContentSinceAfterSortedBySentAt(mailboxId1(), DATE_3, Limit.limit(12), COLLAPSE_THREAD).collectList().block()) - .containsExactly(messageId3()); - } - - @Test - public void listMailboxContentSinceReceivedAtShouldApplyLimitWithCollapseThreads() { - testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), ThreadId.fromBaseMessageId(TestMessageId.of(2))).block(); - testee().save(mailboxId1(), DATE_3, DATE_4, messageId2(), ThreadId.fromBaseMessageId(TestMessageId.of(3))).block(); - testee().save(mailboxId1(), DATE_5, DATE_6, messageId3(), threadId()).block(); - testee().save(mailboxId1(), DATE_4, DATE_5, messageId4(), threadId()).block(); - - assertThat(testee().listMailboxContentSinceAfterSortedBySentAt(mailboxId1(), DATE_1, Limit.limit(2), COLLAPSE_THREAD).collectList().block()) - .containsExactly(messageId3(), messageId2()); - } - - @Test - public void listMailboxContentSinceSentdAtShouldReturnLatestMessageOfThreadWhenCollapseThreads() { - testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), threadId()).block(); - testee().save(mailboxId1(), DATE_3, DATE_4, messageId2(), threadId()).block(); - testee().save(mailboxId1(), DATE_5, DATE_6, messageId3(), threadId()).block(); - - assertThat(testee().listMailboxContentSinceSentAt(mailboxId1(), DATE_2, Limit.limit(12), COLLAPSE_THREAD).collectList().block()) - .containsExactly(messageId3()); - } - - @Test - public void listMailboxContentSinceSentAtShouldApplyLimitWithCollapseThreads() { - testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), ThreadId.fromBaseMessageId(TestMessageId.of(2))).block(); - testee().save(mailboxId1(), DATE_3, DATE_4, messageId2(), ThreadId.fromBaseMessageId(TestMessageId.of(3))).block(); - testee().save(mailboxId1(), DATE_5, DATE_6, messageId3(), threadId()).block(); - testee().save(mailboxId1(), DATE_4, DATE_5, messageId4(), threadId()).block(); - - assertThat(testee().listMailboxContentSinceSentAt(mailboxId1(), DATE_1, Limit.limit(2), COLLAPSE_THREAD).collectList().block()) - .containsExactly(messageId3(), messageId2()); - } - - @Test - public void listMailboxContentSortedByReceivedAtShouldReturnLatestMessageOfThreadWhenCollapseThreads() { - testee().save(mailboxId1(), DATE_1, DATE_4, messageId1(), threadId()).block(); - testee().save(mailboxId1(), DATE_2, DATE_3, messageId2(), threadId()).block(); - testee().save(mailboxId1(), DATE_5, DATE_6, messageId3(), threadId()).block(); - - assertThat(testee().listMailboxContentSortedByReceivedAt(mailboxId1(), Limit.limit(12), COLLAPSE_THREAD).collectList().block()) - .containsExactly(messageId3()); - } - - @Test - public void listMailboxContentSortedByReceivedAtShouldApplyLimitWithCollapseThreads() { - testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), ThreadId.fromBaseMessageId(TestMessageId.of(2))).block(); - testee().save(mailboxId1(), DATE_3, DATE_4, messageId2(), ThreadId.fromBaseMessageId(TestMessageId.of(3))).block(); - testee().save(mailboxId1(), DATE_5, DATE_6, messageId3(), threadId()).block(); - testee().save(mailboxId1(), DATE_4, DATE_5, messageId4(), threadId()).block(); - - assertThat(testee().listMailboxContentSortedByReceivedAt(mailboxId1(), Limit.limit(2), COLLAPSE_THREAD).collectList().block()) - .containsExactly(messageId3(), messageId2()); - } - - @Test - public void listMailboxContentSinceSortedByReceivedAtShouldReturnLatestMessageOfThreadWhenCollapseThreads() { - testee().save(mailboxId1(), DATE_1, DATE_4, messageId1(), threadId()).block(); - testee().save(mailboxId1(), DATE_2, DATE_3, messageId2(), threadId()).block(); - testee().save(mailboxId1(), DATE_5, DATE_6, messageId3(), threadId()).block(); - - assertThat(testee().listMailboxContentSinceAfterSortedByReceivedAt(mailboxId1(), DATE_4, Limit.limit(12), COLLAPSE_THREAD).collectList().block()) - .containsExactly(messageId3()); - } - - @Test - public void listMailboxContentSinceSortedByReceivedAtShouldApplyLimitWithCollapseThreads() { - testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), ThreadId.fromBaseMessageId(TestMessageId.of(2))).block(); - testee().save(mailboxId1(), DATE_3, DATE_4, messageId2(), ThreadId.fromBaseMessageId(TestMessageId.of(3))).block(); - testee().save(mailboxId1(), DATE_5, DATE_6, messageId3(), threadId()).block(); - testee().save(mailboxId1(), DATE_4, DATE_5, messageId4(), threadId()).block(); - - assertThat(testee().listMailboxContentSinceAfterSortedByReceivedAt(mailboxId1(), DATE_4, Limit.limit(2), COLLAPSE_THREAD).collectList().block()) - .containsExactly(messageId3(), messageId2()); - } - - @Test - public void listMailboxContentBeforeSortedByReceivedAtShouldReturnLatestMessageOfThreadWhenCollapseThreads() { - testee().save(mailboxId1(), DATE_1, DATE_4, messageId1(), threadId()).block(); - testee().save(mailboxId1(), DATE_2, DATE_3, messageId2(), threadId()).block(); - testee().save(mailboxId1(), DATE_5, DATE_6, messageId3(), threadId()).block(); - - assertThat(testee().listMailboxContentBeforeSortedByReceivedAt(mailboxId1(), DATE_4, Limit.limit(12), COLLAPSE_THREAD).collectList().block()) - .containsExactly(messageId1()); + @Override + public ThreadId threadId2() { + return ThreadId.fromBaseMessageId(TestMessageId.of(2)); } - @Test - public void listMailboxContentBeforeSortedByReceivedAtShouldApplyLimitWithCollapseThreads() { - testee().save(mailboxId1(), DATE_1, DATE_2, messageId1(), ThreadId.fromBaseMessageId(TestMessageId.of(2))).block(); - testee().save(mailboxId1(), DATE_3, DATE_4, messageId2(), threadId()).block(); - testee().save(mailboxId1(), DATE_5, DATE_6, messageId3(), ThreadId.fromBaseMessageId(TestMessageId.of(3))).block(); - testee().save(mailboxId1(), DATE_4, DATE_5, messageId4(), threadId()).block(); - - assertThat(testee().listMailboxContentBeforeSortedByReceivedAt(mailboxId1(), DATE_5, Limit.limit(2), COLLAPSE_THREAD).collectList().block()) - .containsExactly(messageId4(), messageId1()); + @Override + public ThreadId threadId3() { + return ThreadId.fromBaseMessageId(TestMessageId.of(3)); } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
