chibenwa commented on code in PR #2792:
URL: https://github.com/apache/james-project/pull/2792#discussion_r2577396186
##########
server/data/data-jmap-postgres/src/main/java/org/apache/james/jmap/postgres/projections/PostgresEmailQueryViewDAO.java:
##########
@@ -25,99 +25,118 @@
import static
org.apache.james.jmap.postgres.projections.PostgresEmailQueryViewDataDefinition.PostgresEmailQueryViewTable.RECEIVED_AT;
import static
org.apache.james.jmap.postgres.projections.PostgresEmailQueryViewDataDefinition.PostgresEmailQueryViewTable.SENT_AT;
import static
org.apache.james.jmap.postgres.projections.PostgresEmailQueryViewDataDefinition.PostgresEmailQueryViewTable.TABLE_NAME;
+import static
org.apache.james.jmap.postgres.projections.PostgresEmailQueryViewDataDefinition.PostgresEmailQueryViewTable.THREAD_ID;
+import java.time.Instant;
+import java.time.OffsetDateTime;
import java.time.ZonedDateTime;
+import java.util.UUID;
+import java.util.function.Function;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import org.apache.james.backends.postgres.utils.PostgresExecutor;
+import org.apache.james.jmap.api.projections.EmailQueryViewUtils;
+import org.apache.james.jmap.api.projections.EmailQueryViewUtils.EmailEntry;
import org.apache.james.mailbox.model.MessageId;
+import org.apache.james.mailbox.model.ThreadId;
import org.apache.james.mailbox.postgres.PostgresMailboxId;
import org.apache.james.mailbox.postgres.PostgresMessageId;
import org.apache.james.util.streams.Limit;
-
-import com.google.common.base.Preconditions;
+import org.jooq.Field;
+import org.jooq.Record;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class PostgresEmailQueryViewDAO {
- private PostgresExecutor postgresExecutor;
+ private final PostgresExecutor postgresExecutor;
@Inject
public
PostgresEmailQueryViewDAO(@Named(PostgresExecutor.BY_PASS_RLS_INJECT)
PostgresExecutor postgresExecutor) {
this.postgresExecutor = postgresExecutor;
}
- public Flux<MessageId> listMailboxContentSortedBySentAt(PostgresMailboxId
mailboxId, Limit limit) {
- Preconditions.checkArgument(!limit.isUnlimited(), "Limit should be
defined");
-
- return postgresExecutor.executeRows(dslContext ->
Flux.from(dslContext.select(MESSAGE_ID)
- .from(TABLE_NAME)
- .where(MAILBOX_ID.eq(mailboxId.asUuid()))
- .orderBy(SENT_AT.desc())
- .limit(limit.getLimit().get())))
- .map(record ->
PostgresMessageId.Factory.of(record.get(MESSAGE_ID)));
+ public Flux<MessageId> listMailboxContentSortedBySentAt(PostgresMailboxId
mailboxId, Limit limit, boolean collapseThreads) {
+ return EmailQueryViewUtils.QueryViewExtender.of(limit, collapseThreads)
+ .resolve(backendFetchLimit ->
postgresExecutor.executeRows(dslContext ->
Flux.from(dslContext.select(MESSAGE_ID, SENT_AT, THREAD_ID)
+ .from(TABLE_NAME)
+ .where(MAILBOX_ID.eq(mailboxId.asUuid()))
+ .orderBy(SENT_AT.desc())
+ .limit(backendFetchLimit.getLimit().get())))
+ .map(asEmailEntry(SENT_AT)));
}
- public Flux<MessageId>
listMailboxContentSortedByReceivedAt(PostgresMailboxId mailboxId, Limit limit) {
- Preconditions.checkArgument(!limit.isUnlimited(), "Limit should be
defined");
-
- return postgresExecutor.executeRows(dslContext ->
Flux.from(dslContext.select(MESSAGE_ID)
- .from(TABLE_NAME)
- .where(MAILBOX_ID.eq(mailboxId.asUuid()))
- .orderBy(RECEIVED_AT.desc())
- .limit(limit.getLimit().get())))
- .map(record ->
PostgresMessageId.Factory.of(record.get(MESSAGE_ID)));
+ public Flux<MessageId>
listMailboxContentSortedByReceivedAt(PostgresMailboxId mailboxId, Limit limit,
boolean collapseThreads) {
+ return EmailQueryViewUtils.QueryViewExtender.of(limit, collapseThreads)
+ .resolve(backendFetchLimit ->
postgresExecutor.executeRows(dslContext ->
Flux.from(dslContext.select(MESSAGE_ID, RECEIVED_AT, THREAD_ID)
+ .from(TABLE_NAME)
+ .where(MAILBOX_ID.eq(mailboxId.asUuid()))
+ .orderBy(RECEIVED_AT.desc())
+ .limit(backendFetchLimit.getLimit().get())))
+ .map(asEmailEntry(RECEIVED_AT)));
}
- public Flux<MessageId>
listMailboxContentSinceAfterSortedBySentAt(PostgresMailboxId mailboxId,
ZonedDateTime since, Limit limit) {
- Preconditions.checkArgument(!limit.isUnlimited(), "Limit should be
defined");
-
- return postgresExecutor.executeRows(dslContext ->
Flux.from(dslContext.select(MESSAGE_ID)
- .from(TABLE_NAME)
- .where(MAILBOX_ID.eq(mailboxId.asUuid()))
- .and(RECEIVED_AT.greaterOrEqual(since.toOffsetDateTime()))
- .orderBy(SENT_AT.desc())
- .limit(limit.getLimit().get())))
- .map(record ->
PostgresMessageId.Factory.of(record.get(MESSAGE_ID)));
+ public Flux<MessageId>
listMailboxContentSinceAfterSortedBySentAt(PostgresMailboxId mailboxId,
ZonedDateTime since, Limit limit, boolean collapseThreads) {
+ return EmailQueryViewUtils.QueryViewExtender.of(limit, collapseThreads)
+ .resolve(backendFetchLimit ->
postgresExecutor.executeRows(dslContext ->
Flux.from(dslContext.select(MESSAGE_ID, SENT_AT, THREAD_ID)
+ .from(TABLE_NAME)
+ .where(MAILBOX_ID.eq(mailboxId.asUuid()))
+ .and(RECEIVED_AT.greaterOrEqual(since.toOffsetDateTime()))
+ .orderBy(SENT_AT.desc())
+ .limit(backendFetchLimit.getLimit().get())))
+ .map(asEmailEntry(SENT_AT)));
}
- public Flux<MessageId>
listMailboxContentSinceAfterSortedByReceivedAt(PostgresMailboxId mailboxId,
ZonedDateTime since, Limit limit) {
- Preconditions.checkArgument(!limit.isUnlimited(), "Limit should be
defined");
-
- return postgresExecutor.executeRows(dslContext ->
Flux.from(dslContext.select(MESSAGE_ID)
- .from(TABLE_NAME)
- .where(MAILBOX_ID.eq(mailboxId.asUuid()))
- .and(RECEIVED_AT.greaterOrEqual(since.toOffsetDateTime()))
- .orderBy(RECEIVED_AT.desc())
- .limit(limit.getLimit().get())))
- .map(record ->
PostgresMessageId.Factory.of(record.get(MESSAGE_ID)));
+ public Flux<MessageId>
listMailboxContentSinceAfterSortedByReceivedAt(PostgresMailboxId mailboxId,
ZonedDateTime since, Limit limit, boolean collapseThreads) {
+ return EmailQueryViewUtils.QueryViewExtender.of(limit, collapseThreads)
+ .resolve(backendFetchLimit ->
postgresExecutor.executeRows(dslContext ->
Flux.from(dslContext.select(MESSAGE_ID, RECEIVED_AT, THREAD_ID)
+ .from(TABLE_NAME)
+ .where(MAILBOX_ID.eq(mailboxId.asUuid()))
+ .and(RECEIVED_AT.greaterOrEqual(since.toOffsetDateTime()))
+ .orderBy(RECEIVED_AT.desc())
+ .limit(backendFetchLimit.getLimit().get())))
+ .map(asEmailEntry(RECEIVED_AT)));
}
- public Flux<MessageId>
listMailboxContentBeforeSortedByReceivedAt(PostgresMailboxId mailboxId,
ZonedDateTime since, Limit limit) {
- Preconditions.checkArgument(!limit.isUnlimited(), "Limit should be
defined");
+ public Flux<MessageId>
listMailboxContentBeforeSortedByReceivedAt(PostgresMailboxId mailboxId,
ZonedDateTime since, Limit limit, boolean collapseThreads) {
+ return EmailQueryViewUtils.QueryViewExtender.of(limit, collapseThreads)
+ .resolve(backendFetchLimit ->
postgresExecutor.executeRows(dslContext ->
Flux.from(dslContext.select(MESSAGE_ID, RECEIVED_AT, THREAD_ID)
+ .from(TABLE_NAME)
+ .where(MAILBOX_ID.eq(mailboxId.asUuid()))
+ .and(RECEIVED_AT.lessOrEqual(since.toOffsetDateTime()))
+ .orderBy(RECEIVED_AT.desc())
+ .limit(backendFetchLimit.getLimit().get())))
+ .map(asEmailEntry(RECEIVED_AT)));
+ }
- return postgresExecutor.executeRows(dslContext ->
Flux.from(dslContext.select(MESSAGE_ID)
- .from(TABLE_NAME)
- .where(MAILBOX_ID.eq(mailboxId.asUuid()))
- .and(RECEIVED_AT.lessOrEqual(since.toOffsetDateTime()))
- .orderBy(RECEIVED_AT.desc())
- .limit(limit.getLimit().get())))
- .map(record ->
PostgresMessageId.Factory.of(record.get(MESSAGE_ID)));
+ public Flux<MessageId> listMailboxContentSinceSentAt(PostgresMailboxId
mailboxId, ZonedDateTime since, Limit limit, boolean collapseThreads) {
+ return EmailQueryViewUtils.QueryViewExtender.of(limit, collapseThreads)
+ .resolve(backendFetchLimit ->
postgresExecutor.executeRows(dslContext ->
Flux.from(dslContext.select(MESSAGE_ID, SENT_AT, THREAD_ID)
+ .from(TABLE_NAME)
+ .where(MAILBOX_ID.eq(mailboxId.asUuid()))
+ .and(SENT_AT.greaterOrEqual(since.toOffsetDateTime()))
+ .orderBy(SENT_AT.desc())
+ .limit(backendFetchLimit.getLimit().get())))
+ .map(asEmailEntry(SENT_AT)));
}
- public Flux<MessageId> listMailboxContentSinceSentAt(PostgresMailboxId
mailboxId, ZonedDateTime since, Limit limit) {
- Preconditions.checkArgument(!limit.isUnlimited(), "Limit should be
defined");
+ private Function<Record, EmailEntry> asEmailEntry(Field<OffsetDateTime>
dateField) {
+ return (Record record) -> {
+ PostgresMessageId messageId =
PostgresMessageId.Factory.of(record.get(MESSAGE_ID));
+ ThreadId threadId = getThreadIdFromRecord(record, messageId);
+ Instant messageDate = record.get(dateField).toInstant();
+ return new EmailEntry(messageId, threadId, messageDate);
+ };
+ }
- return postgresExecutor.executeRows(dslContext ->
Flux.from(dslContext.select(MESSAGE_ID)
- .from(TABLE_NAME)
- .where(MAILBOX_ID.eq(mailboxId.asUuid()))
- .and(SENT_AT.greaterOrEqual(since.toOffsetDateTime()))
- .orderBy(SENT_AT.desc())
- .limit(limit.getLimit().get())))
- .map(record ->
PostgresMessageId.Factory.of(record.get(MESSAGE_ID)));
+ private ThreadId getThreadIdFromRecord(Record record, MessageId messageId)
{
+ UUID threadIdUUID = record.get(THREAD_ID);
+ if (threadIdUUID == null) {
+ return ThreadId.fromBaseMessageId(messageId);
+ }
+ return
ThreadId.fromBaseMessageId(PostgresMessageId.Factory.of(threadIdUUID));
Review Comment:
Use Optional
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]