This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 941091f9b143b407445abfa676f0e54cf51e3970 Author: Rene Cordier <[email protected]> AuthorDate: Tue Nov 25 11:20:16 2025 +0700 JAMES-3340 Postgres implementation for collapseThreads handling with EmailQueryView --- .../projections/CassandraEmailQueryView.java | 83 +---------- .../projections/PostgresEmailQueryView.java | 14 +- .../projections/PostgresEmailQueryViewDAO.java | 152 +++++++++++++++++---- .../jmap/api/projections/EmailQueryViewUtils.java | 113 +++++++++++++++ 4 files changed, 251 insertions(+), 111 deletions(-) diff --git a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/projections/CassandraEmailQueryView.java b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/projections/CassandraEmailQueryView.java index de4942a37f..4ddb6f69ab 100644 --- a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/projections/CassandraEmailQueryView.java +++ b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/projections/CassandraEmailQueryView.java @@ -24,6 +24,8 @@ import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.bindMarker; import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.deleteFrom; import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.insertInto; import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.selectFrom; +import static org.apache.james.jmap.api.projections.EmailQueryViewUtils.backendLimitFetch; +import static org.apache.james.jmap.api.projections.EmailQueryViewUtils.messagesWithCollapseThreads; import static org.apache.james.jmap.cassandra.projections.table.CassandraEmailQueryViewTable.DATE_LOOKUP_TABLE; import static org.apache.james.jmap.cassandra.projections.table.CassandraEmailQueryViewTable.MAILBOX_ID; import static org.apache.james.jmap.cassandra.projections.table.CassandraEmailQueryViewTable.MESSAGE_ID; @@ -36,9 +38,6 @@ import static org.apache.james.jmap.cassandra.projections.table.CassandraEmailQu import java.time.Instant; import java.time.ZonedDateTime; import java.util.Comparator; -import java.util.HashSet; -import java.util.List; -import java.util.Objects; import java.util.UUID; import java.util.function.Function; @@ -46,6 +45,7 @@ import jakarta.inject.Inject; import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor; import org.apache.james.jmap.api.projections.EmailQueryView; +import org.apache.james.jmap.api.projections.EmailQueryViewUtils.EmailEntry; import org.apache.james.jmap.cassandra.projections.table.CassandraEmailQueryViewTable; import org.apache.james.mailbox.cassandra.ids.CassandraId; import org.apache.james.mailbox.cassandra.ids.CassandraMessageId; @@ -61,55 +61,12 @@ import com.datastax.oss.driver.api.core.cql.Row; import com.datastax.oss.driver.api.core.type.codec.TypeCodecs; import com.datastax.oss.driver.api.querybuilder.QueryBuilder; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public class CassandraEmailQueryView implements EmailQueryView { private static final String LIMIT_MARKER = "LIMIT_BIND_MARKER"; - private static final int COLLAPSE_THREADS_LIMIT_MULTIPLIER = 4; - - private class EmailEntry { - private final MessageId messageId; - private final ThreadId threadId; - private final Instant messageDate; - - EmailEntry(MessageId messageId, ThreadId threadId, Instant messageDate) { - this.messageId = messageId; - this.threadId = threadId; - this.messageDate = messageDate; - } - - public MessageId getMessageId() { - return messageId; - } - - public ThreadId getThreadId() { - return threadId; - } - - public Instant getMessageDate() { - return messageDate; - } - - @Override - public final boolean equals(Object o) { - if (o instanceof EmailEntry) { - EmailEntry entry = (EmailEntry) o; - - return Objects.equals(this.messageId, entry.messageId) - && Objects.equals(this.threadId, entry.threadId) - && Objects.equals(this.messageDate, entry.messageDate); - } - return false; - } - - @Override - public final int hashCode() { - return Objects.hash(messageId, threadId, messageDate); - } - } private final CassandraAsyncExecutor executor; private final PreparedStatement listMailboxContentBySentAt; @@ -250,33 +207,6 @@ public class CassandraEmailQueryView implements EmailQueryView { return baseEntries.map(EmailEntry::getMessageId); } - private Flux<MessageId> messagesWithCollapseThreads(Limit limit, Limit backendFetchLimit, Flux<EmailEntry> baseEntries, Function<Limit, Flux<MessageId>> listMessagesCallbackFunction) { - return baseEntries.collectList() - .flatMapMany(results -> { - List<EmailEntry> distinctByThreadId = distinctByThreadId(results); - boolean hasEnoughResults = distinctByThreadId.size() >= limit.getLimit().get(); - boolean isExhaustive = results.size() < backendFetchLimit.getLimit().get(); - if (hasEnoughResults || isExhaustive) { - return Flux.fromIterable(distinctByThreadId) - .take(limit.getLimit().get()) - .map(EmailEntry::getMessageId); - } - Limit newBackendFetchLimit = Limit.from(backendFetchLimit.getLimit().get() * COLLAPSE_THREADS_LIMIT_MULTIPLIER); - return listMessagesCallbackFunction.apply(newBackendFetchLimit); - }); - } - - private List<EmailEntry> distinctByThreadId(List<EmailEntry> emailEntries) { - ImmutableList.Builder<EmailEntry> list = ImmutableList.builder(); - HashSet<ThreadId> threadIdHashSet = new HashSet<>(); - emailEntries.forEach(emailEntry -> { - if (threadIdHashSet.add(emailEntry.getThreadId())) { - list.add(emailEntry); - } - }); - return list.build(); - } - @Override public Flux<MessageId> listMailboxContentSortedByReceivedAt(MailboxId mailboxId, Limit limit, boolean collapseThreads) { Preconditions.checkArgument(!limit.isUnlimited(), "Limit should be defined"); @@ -392,13 +322,6 @@ public class CassandraEmailQueryView implements EmailQueryView { return baseEntries.map(EmailEntry::getMessageId); } - private Limit backendLimitFetch(Limit limit, boolean collapseThreads) { - if (collapseThreads) { - return Limit.limit(limit.getLimit().get() * COLLAPSE_THREADS_LIMIT_MULTIPLIER); - } - return limit; - } - private Function<Row, EmailEntry> asEmailEntry(CqlIdentifier dateField) { return (Row row) -> { CassandraMessageId messageId = CassandraMessageId.Factory.of(row.getUuid(MESSAGE_ID)); diff --git a/server/data/data-jmap-postgres/src/main/java/org/apache/james/jmap/postgres/projections/PostgresEmailQueryView.java b/server/data/data-jmap-postgres/src/main/java/org/apache/james/jmap/postgres/projections/PostgresEmailQueryView.java index 4831d046c8..5398ef41c5 100644 --- a/server/data/data-jmap-postgres/src/main/java/org/apache/james/jmap/postgres/projections/PostgresEmailQueryView.java +++ b/server/data/data-jmap-postgres/src/main/java/org/apache/james/jmap/postgres/projections/PostgresEmailQueryView.java @@ -35,7 +35,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public class PostgresEmailQueryView implements EmailQueryView { - private PostgresEmailQueryViewDAO emailQueryViewDAO; + private final PostgresEmailQueryViewDAO emailQueryViewDAO; @Inject public PostgresEmailQueryView(PostgresEmailQueryViewDAO emailQueryViewDAO) { @@ -44,32 +44,32 @@ public class PostgresEmailQueryView implements EmailQueryView { @Override public Flux<MessageId> listMailboxContentSortedBySentAt(MailboxId mailboxId, Limit limit, boolean collapseThreads) { - return emailQueryViewDAO.listMailboxContentSortedBySentAt(PostgresMailboxId.class.cast(mailboxId), limit); + return emailQueryViewDAO.listMailboxContentSortedBySentAt(PostgresMailboxId.class.cast(mailboxId), limit, collapseThreads); } @Override public Flux<MessageId> listMailboxContentSortedByReceivedAt(MailboxId mailboxId, Limit limit, boolean collapseThreads) { - return emailQueryViewDAO.listMailboxContentSortedByReceivedAt(PostgresMailboxId.class.cast(mailboxId), limit); + return emailQueryViewDAO.listMailboxContentSortedByReceivedAt(PostgresMailboxId.class.cast(mailboxId), limit, collapseThreads); } @Override public Flux<MessageId> listMailboxContentSinceAfterSortedBySentAt(MailboxId mailboxId, ZonedDateTime since, Limit limit, boolean collapseThreads) { - return emailQueryViewDAO.listMailboxContentSinceAfterSortedBySentAt(PostgresMailboxId.class.cast(mailboxId), since, limit); + return emailQueryViewDAO.listMailboxContentSinceAfterSortedBySentAt(PostgresMailboxId.class.cast(mailboxId), since, limit, collapseThreads); } @Override public Flux<MessageId> listMailboxContentSinceAfterSortedByReceivedAt(MailboxId mailboxId, ZonedDateTime since, Limit limit, boolean collapseThreads) { - return emailQueryViewDAO.listMailboxContentSinceAfterSortedByReceivedAt(PostgresMailboxId.class.cast(mailboxId), since, limit); + return emailQueryViewDAO.listMailboxContentSinceAfterSortedByReceivedAt(PostgresMailboxId.class.cast(mailboxId), since, limit, collapseThreads); } @Override public Flux<MessageId> listMailboxContentBeforeSortedByReceivedAt(MailboxId mailboxId, ZonedDateTime since, Limit limit, boolean collapseThreads) { - return emailQueryViewDAO.listMailboxContentBeforeSortedByReceivedAt(PostgresMailboxId.class.cast(mailboxId), since, limit); + return emailQueryViewDAO.listMailboxContentBeforeSortedByReceivedAt(PostgresMailboxId.class.cast(mailboxId), since, limit, collapseThreads); } @Override public Flux<MessageId> listMailboxContentSinceSentAt(MailboxId mailboxId, ZonedDateTime since, Limit limit, boolean collapseThreads) { - return emailQueryViewDAO.listMailboxContentSinceSentAt(PostgresMailboxId.class.cast(mailboxId), since, limit); + return emailQueryViewDAO.listMailboxContentSinceSentAt(PostgresMailboxId.class.cast(mailboxId), since, limit, collapseThreads); } @Override diff --git a/server/data/data-jmap-postgres/src/main/java/org/apache/james/jmap/postgres/projections/PostgresEmailQueryViewDAO.java b/server/data/data-jmap-postgres/src/main/java/org/apache/james/jmap/postgres/projections/PostgresEmailQueryViewDAO.java index 609662856c..a8c0296c12 100644 --- a/server/data/data-jmap-postgres/src/main/java/org/apache/james/jmap/postgres/projections/PostgresEmailQueryViewDAO.java +++ b/server/data/data-jmap-postgres/src/main/java/org/apache/james/jmap/postgres/projections/PostgresEmailQueryViewDAO.java @@ -19,6 +19,8 @@ package org.apache.james.jmap.postgres.projections; +import static org.apache.james.jmap.api.projections.EmailQueryViewUtils.backendLimitFetch; +import static org.apache.james.jmap.api.projections.EmailQueryViewUtils.messagesWithCollapseThreads; import static org.apache.james.jmap.postgres.projections.PostgresEmailQueryViewDataDefinition.PostgresEmailQueryViewTable.MAILBOX_ID; import static org.apache.james.jmap.postgres.projections.PostgresEmailQueryViewDataDefinition.PostgresEmailQueryViewTable.MESSAGE_ID; import static org.apache.james.jmap.postgres.projections.PostgresEmailQueryViewDataDefinition.PostgresEmailQueryViewTable.PK_CONSTRAINT_NAME; @@ -27,17 +29,24 @@ import static org.apache.james.jmap.postgres.projections.PostgresEmailQueryViewD import static org.apache.james.jmap.postgres.projections.PostgresEmailQueryViewDataDefinition.PostgresEmailQueryViewTable.TABLE_NAME; import static org.apache.james.jmap.postgres.projections.PostgresEmailQueryViewDataDefinition.PostgresEmailQueryViewTable.THREAD_ID; +import java.time.Instant; +import java.time.OffsetDateTime; import java.time.ZonedDateTime; +import java.util.UUID; +import java.util.function.Function; import jakarta.inject.Inject; import jakarta.inject.Named; import org.apache.james.backends.postgres.utils.PostgresExecutor; +import org.apache.james.jmap.api.projections.EmailQueryViewUtils.EmailEntry; import org.apache.james.mailbox.model.MessageId; import org.apache.james.mailbox.model.ThreadId; import org.apache.james.mailbox.postgres.PostgresMailboxId; import org.apache.james.mailbox.postgres.PostgresMessageId; import org.apache.james.util.streams.Limit; +import org.jooq.Field; +import org.jooq.Record; import com.google.common.base.Preconditions; @@ -52,74 +61,169 @@ public class PostgresEmailQueryViewDAO { this.postgresExecutor = postgresExecutor; } - public Flux<MessageId> listMailboxContentSortedBySentAt(PostgresMailboxId mailboxId, Limit limit) { + public Flux<MessageId> listMailboxContentSortedBySentAt(PostgresMailboxId mailboxId, Limit limit, boolean collapseThreads) { Preconditions.checkArgument(!limit.isUnlimited(), "Limit should be defined"); - return postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select(MESSAGE_ID) + Limit backendFetchLimit = backendLimitFetch(limit, collapseThreads); + + return listMailboxContentSortedBySentAtWithBackendLimit(mailboxId, limit, collapseThreads, backendFetchLimit); + } + + private Flux<MessageId> listMailboxContentSortedBySentAtWithBackendLimit(PostgresMailboxId mailboxId, Limit limit, boolean collapseThreads, Limit backendFetchLimit) { + Flux<EmailEntry> baseEntries = postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select(MESSAGE_ID, SENT_AT, THREAD_ID) .from(TABLE_NAME) .where(MAILBOX_ID.eq(mailboxId.asUuid())) .orderBy(SENT_AT.desc()) - .limit(limit.getLimit().get()))) - .map(record -> PostgresMessageId.Factory.of(record.get(MESSAGE_ID))); + .limit(backendFetchLimit.getLimit().get()))) + .map(asEmailEntry(SENT_AT)); + + if (collapseThreads) { + return messagesWithCollapseThreads(limit, backendFetchLimit, baseEntries, + newLimit -> listMailboxContentSortedBySentAtWithBackendLimit(mailboxId, limit, collapseThreads, newLimit)); + } + + return baseEntries.map(EmailEntry::getMessageId); } - public Flux<MessageId> listMailboxContentSortedByReceivedAt(PostgresMailboxId mailboxId, Limit limit) { + public Flux<MessageId> listMailboxContentSortedByReceivedAt(PostgresMailboxId mailboxId, Limit limit, boolean collapseThreads) { Preconditions.checkArgument(!limit.isUnlimited(), "Limit should be defined"); - return postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select(MESSAGE_ID) + Limit backendFetchLimit = backendLimitFetch(limit, collapseThreads); + + return listMailboxContentSortedByReceivedAtWithBackendLimit(mailboxId, limit, collapseThreads, backendFetchLimit); + } + + private Flux<MessageId> listMailboxContentSortedByReceivedAtWithBackendLimit(PostgresMailboxId mailboxId, Limit limit, boolean collapseThreads, Limit backendFetchLimit) { + Flux<EmailEntry> baseEntries = postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select(MESSAGE_ID, RECEIVED_AT, THREAD_ID) .from(TABLE_NAME) .where(MAILBOX_ID.eq(mailboxId.asUuid())) .orderBy(RECEIVED_AT.desc()) - .limit(limit.getLimit().get()))) - .map(record -> PostgresMessageId.Factory.of(record.get(MESSAGE_ID))); + .limit(backendFetchLimit.getLimit().get()))) + .map(asEmailEntry(RECEIVED_AT)); + + if (collapseThreads) { + return messagesWithCollapseThreads(limit, backendFetchLimit, baseEntries, + newLimit -> listMailboxContentSortedByReceivedAtWithBackendLimit(mailboxId, limit, collapseThreads, newLimit)); + } + + return baseEntries.map(EmailEntry::getMessageId); } - public Flux<MessageId> listMailboxContentSinceAfterSortedBySentAt(PostgresMailboxId mailboxId, ZonedDateTime since, Limit limit) { + public Flux<MessageId> listMailboxContentSinceAfterSortedBySentAt(PostgresMailboxId mailboxId, ZonedDateTime since, Limit limit, boolean collapseThreads) { Preconditions.checkArgument(!limit.isUnlimited(), "Limit should be defined"); - return postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select(MESSAGE_ID) + Limit backendFetchLimit = backendLimitFetch(limit, collapseThreads); + + return listMailboxContentSinceAfterSortedBySentAtWithBackendLimit(mailboxId, since, limit, collapseThreads, backendFetchLimit); + } + + private Flux<MessageId> listMailboxContentSinceAfterSortedBySentAtWithBackendLimit(PostgresMailboxId mailboxId, ZonedDateTime since, Limit limit, boolean collapseThreads, Limit backendFetchLimit) { + Flux<EmailEntry> baseEntries = postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select(MESSAGE_ID, SENT_AT, THREAD_ID) .from(TABLE_NAME) .where(MAILBOX_ID.eq(mailboxId.asUuid())) .and(RECEIVED_AT.greaterOrEqual(since.toOffsetDateTime())) .orderBy(SENT_AT.desc()) - .limit(limit.getLimit().get()))) - .map(record -> PostgresMessageId.Factory.of(record.get(MESSAGE_ID))); + .limit(backendFetchLimit.getLimit().get()))) + .map(asEmailEntry(SENT_AT)); + + if (collapseThreads) { + return messagesWithCollapseThreads(limit, backendFetchLimit, baseEntries, + newLimit -> listMailboxContentSinceAfterSortedBySentAtWithBackendLimit(mailboxId, since, limit, collapseThreads, newLimit)); + } + + return baseEntries.map(EmailEntry::getMessageId); } - public Flux<MessageId> listMailboxContentSinceAfterSortedByReceivedAt(PostgresMailboxId mailboxId, ZonedDateTime since, Limit limit) { + public Flux<MessageId> listMailboxContentSinceAfterSortedByReceivedAt(PostgresMailboxId mailboxId, ZonedDateTime since, Limit limit, boolean collapseThreads) { Preconditions.checkArgument(!limit.isUnlimited(), "Limit should be defined"); - return postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select(MESSAGE_ID) + Limit backendFetchLimit = backendLimitFetch(limit, collapseThreads); + + return listMailboxContentSinceAfterSortedByReceivedAtWithBackendLimit(mailboxId, since, limit, collapseThreads, backendFetchLimit); + } + + private Flux<MessageId> listMailboxContentSinceAfterSortedByReceivedAtWithBackendLimit(PostgresMailboxId mailboxId, ZonedDateTime since, Limit limit, boolean collapseThreads, Limit backendFetchLimit) { + Flux<EmailEntry> baseEntries = postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select(MESSAGE_ID, RECEIVED_AT, THREAD_ID) .from(TABLE_NAME) .where(MAILBOX_ID.eq(mailboxId.asUuid())) .and(RECEIVED_AT.greaterOrEqual(since.toOffsetDateTime())) .orderBy(RECEIVED_AT.desc()) - .limit(limit.getLimit().get()))) - .map(record -> PostgresMessageId.Factory.of(record.get(MESSAGE_ID))); + .limit(backendFetchLimit.getLimit().get()))) + .map(asEmailEntry(RECEIVED_AT)); + + if (collapseThreads) { + return messagesWithCollapseThreads(limit, backendFetchLimit, baseEntries, + newLimit -> listMailboxContentSinceAfterSortedByReceivedAtWithBackendLimit(mailboxId, since, limit, collapseThreads, newLimit)); + } + + return baseEntries.map(EmailEntry::getMessageId); } - public Flux<MessageId> listMailboxContentBeforeSortedByReceivedAt(PostgresMailboxId mailboxId, ZonedDateTime since, Limit limit) { + public Flux<MessageId> listMailboxContentBeforeSortedByReceivedAt(PostgresMailboxId mailboxId, ZonedDateTime since, Limit limit, boolean collapseThreads) { Preconditions.checkArgument(!limit.isUnlimited(), "Limit should be defined"); - return postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select(MESSAGE_ID) + Limit backendFetchLimit = backendLimitFetch(limit, collapseThreads); + + return listMailboxContentBeforeSortedByReceivedAtWithBackendLimit(mailboxId, since, limit, collapseThreads, backendFetchLimit); + } + + private Flux<MessageId> listMailboxContentBeforeSortedByReceivedAtWithBackendLimit(PostgresMailboxId mailboxId, ZonedDateTime since, Limit limit, boolean collapseThreads, Limit backendFetchLimit) { + Flux<EmailEntry> baseEntries = postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select(MESSAGE_ID, RECEIVED_AT, THREAD_ID) .from(TABLE_NAME) .where(MAILBOX_ID.eq(mailboxId.asUuid())) .and(RECEIVED_AT.lessOrEqual(since.toOffsetDateTime())) .orderBy(RECEIVED_AT.desc()) - .limit(limit.getLimit().get()))) - .map(record -> PostgresMessageId.Factory.of(record.get(MESSAGE_ID))); + .limit(backendFetchLimit.getLimit().get()))) + .map(asEmailEntry(RECEIVED_AT)); + + if (collapseThreads) { + return messagesWithCollapseThreads(limit, backendFetchLimit, baseEntries, + newLimit -> listMailboxContentBeforeSortedByReceivedAtWithBackendLimit(mailboxId, since, limit, collapseThreads, newLimit)); + } + + return baseEntries.map(EmailEntry::getMessageId); } - public Flux<MessageId> listMailboxContentSinceSentAt(PostgresMailboxId mailboxId, ZonedDateTime since, Limit limit) { + public Flux<MessageId> listMailboxContentSinceSentAt(PostgresMailboxId mailboxId, ZonedDateTime since, Limit limit, boolean collapseThreads) { Preconditions.checkArgument(!limit.isUnlimited(), "Limit should be defined"); - return postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select(MESSAGE_ID) + Limit backendFetchLimit = backendLimitFetch(limit, collapseThreads); + + return listMailboxContentSinceSentAtWithBackendLimit(mailboxId, since, limit, collapseThreads, backendFetchLimit); + } + + private Flux<MessageId> listMailboxContentSinceSentAtWithBackendLimit(PostgresMailboxId mailboxId, ZonedDateTime since, Limit limit, boolean collapseThreads, Limit backendFetchLimit) { + Flux<EmailEntry> baseEntries = postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select(MESSAGE_ID, SENT_AT, THREAD_ID) .from(TABLE_NAME) .where(MAILBOX_ID.eq(mailboxId.asUuid())) .and(SENT_AT.greaterOrEqual(since.toOffsetDateTime())) .orderBy(SENT_AT.desc()) - .limit(limit.getLimit().get()))) - .map(record -> PostgresMessageId.Factory.of(record.get(MESSAGE_ID))); + .limit(backendFetchLimit.getLimit().get()))) + .map(asEmailEntry(SENT_AT)); + + if (collapseThreads) { + return messagesWithCollapseThreads(limit, backendFetchLimit, baseEntries, + newLimit -> listMailboxContentSinceSentAtWithBackendLimit(mailboxId, since, limit, collapseThreads, newLimit)); + } + + return baseEntries.map(EmailEntry::getMessageId); + } + + private Function<Record, EmailEntry> asEmailEntry(Field<OffsetDateTime> dateField) { + return (Record record) -> { + PostgresMessageId messageId = PostgresMessageId.Factory.of(record.get(MESSAGE_ID)); + ThreadId threadId = getThreadIdFromRecord(record, messageId); + Instant messageDate = record.get(dateField).toInstant(); + return new EmailEntry(messageId, threadId, messageDate); + }; + } + + private ThreadId getThreadIdFromRecord(Record record, MessageId messageId) { + UUID threadIdUUID = record.get(THREAD_ID); + if (threadIdUUID == null) { + return ThreadId.fromBaseMessageId(messageId); + } + return ThreadId.fromBaseMessageId(PostgresMessageId.Factory.of(threadIdUUID)); } public Mono<Void> delete(PostgresMailboxId mailboxId, PostgresMessageId messageId) { diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/projections/EmailQueryViewUtils.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/projections/EmailQueryViewUtils.java new file mode 100644 index 0000000000..42f8ee454f --- /dev/null +++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/projections/EmailQueryViewUtils.java @@ -0,0 +1,113 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.jmap.api.projections; + +import java.time.Instant; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.function.Function; + +import org.apache.james.mailbox.model.MessageId; +import org.apache.james.mailbox.model.ThreadId; +import org.apache.james.util.streams.Limit; + +import com.google.common.collect.ImmutableList; + +import reactor.core.publisher.Flux; + +public class EmailQueryViewUtils { + private static final int COLLAPSE_THREADS_LIMIT_MULTIPLIER = 4; + + public static class EmailEntry { + private final MessageId messageId; + private final ThreadId threadId; + private final Instant messageDate; + + public EmailEntry(MessageId messageId, ThreadId threadId, Instant messageDate) { + this.messageId = messageId; + this.threadId = threadId; + this.messageDate = messageDate; + } + + public MessageId getMessageId() { + return messageId; + } + + public ThreadId getThreadId() { + return threadId; + } + + public Instant getMessageDate() { + return messageDate; + } + + @Override + public final boolean equals(Object o) { + if (o instanceof EmailEntry) { + EmailEntry entry = (EmailEntry) o; + + return Objects.equals(this.messageId, entry.messageId) + && Objects.equals(this.threadId, entry.threadId) + && Objects.equals(this.messageDate, entry.messageDate); + } + return false; + } + + @Override + public final int hashCode() { + return Objects.hash(messageId, threadId, messageDate); + } + } + + public static Limit backendLimitFetch(Limit limit, boolean collapseThreads) { + if (collapseThreads) { + return Limit.limit(limit.getLimit().get() * COLLAPSE_THREADS_LIMIT_MULTIPLIER); + } + return limit; + } + + public static Flux<MessageId> messagesWithCollapseThreads(Limit limit, Limit backendFetchLimit, Flux<EmailEntry> baseEntries, Function<Limit, Flux<MessageId>> listMessagesCallbackFunction) { + return baseEntries.collectList() + .flatMapMany(results -> { + List<EmailEntry> distinctByThreadId = distinctByThreadId(results); + boolean hasEnoughResults = distinctByThreadId.size() >= limit.getLimit().get(); + boolean isExhaustive = results.size() < backendFetchLimit.getLimit().get(); + if (hasEnoughResults || isExhaustive) { + return Flux.fromIterable(distinctByThreadId) + .take(limit.getLimit().get()) + .map(EmailEntry::getMessageId); + } + Limit newBackendFetchLimit = Limit.from(backendFetchLimit.getLimit().get() * COLLAPSE_THREADS_LIMIT_MULTIPLIER); + return listMessagesCallbackFunction.apply(newBackendFetchLimit); + }); + } + + private static List<EmailEntry> distinctByThreadId(List<EmailEntry> emailEntries) { + ImmutableList.Builder<EmailEntry> list = ImmutableList.builder(); + HashSet<ThreadId> threadIdHashSet = new HashSet<>(); + emailEntries.forEach(emailEntry -> { + if (threadIdHashSet.add(emailEntry.getThreadId())) { + list.add(emailEntry); + } + }); + return list.build(); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
