This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit a452c744e71b9e8590a93c839aeee56f9381369b Author: Quan Tran <[email protected]> AuthorDate: Mon Mar 9 11:37:23 2026 +0700 JAMES-4187 Add internalDate to FlagsUpdated event And the caller(s) should build FlagsUpdated event with internalDate. --- .../apache/james/mailbox/model/UpdatedFlags.java | 33 ++++++++++-- .../cassandra/mail/CassandraMessageIdMapper.java | 31 +++++++---- .../cassandra/mail/CassandraMessageMapper.java | 51 +++++++++++------- .../task/SolveMessageInconsistenciesService.java | 1 + .../scala/org/apache/james/event/json/DTOs.scala | 6 ++- .../event/json/FlagsUpdatedSerializationTest.java | 63 ++++++++++++++++++++++ .../james/mailbox/jpa/mail/MessageUtils.java | 1 + .../inmemory/mail/InMemoryMessageIdMapper.java | 1 + .../postgres/mail/PostgresMessageIdMapper.java | 24 +++++---- .../postgres/mail/PostgresMessageMapper.java | 21 +++++--- .../mail/dao/PostgresMailboxMessageDAO.java | 12 +++++ .../mailbox/store/mail/AbstractMessageMapper.java | 15 +++--- .../store/mail/model/MessageIdMapperTest.java | 7 +++ .../store/mail/model/MessageMapperTest.java | 5 ++ 14 files changed, 211 insertions(+), 60 deletions(-) diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/model/UpdatedFlags.java b/mailbox/api/src/main/java/org/apache/james/mailbox/model/UpdatedFlags.java index 39c0b67248..d46359d3b2 100644 --- a/mailbox/api/src/main/java/org/apache/james/mailbox/model/UpdatedFlags.java +++ b/mailbox/api/src/main/java/org/apache/james/mailbox/model/UpdatedFlags.java @@ -20,6 +20,7 @@ package org.apache.james.mailbox.model; import java.util.Arrays; +import java.util.Date; import java.util.Iterator; import java.util.List; import java.util.Objects; @@ -50,6 +51,7 @@ public class UpdatedFlags { private Flags oldFlags; private Flags newFlags; private Optional<ModSeq> modSeq = Optional.empty(); + private Optional<Date> internalDate = Optional.empty(); private Builder() { } @@ -84,12 +86,22 @@ public class UpdatedFlags { return this; } + public Builder 
internalDate(Date internalDate) { + this.internalDate = Optional.of(internalDate); + return this; + } + + public Builder internalDate(Optional<Date> internalDate) { + this.internalDate = internalDate; + return this; + } + public UpdatedFlags build() { Preconditions.checkNotNull(uid); Preconditions.checkNotNull(newFlags); Preconditions.checkNotNull(oldFlags); Preconditions.checkState(modSeq.isPresent()); - return new UpdatedFlags(uid, messageId, modSeq.get(), oldFlags, newFlags); + return new UpdatedFlags(uid, messageId, modSeq.get(), oldFlags, newFlags, internalDate); } } @@ -146,13 +158,18 @@ public class UpdatedFlags { private final Flags newFlags; private final Flags modifiedFlags; private final ModSeq modSeq; + /** + * The usage of Optional here is for backward compatibility (to be able to still dequeue older events) + */ + private final Optional<Date> internalDate; - private UpdatedFlags(MessageUid uid, Optional<MessageId> messageId, ModSeq modSeq, Flags oldFlags, Flags newFlags) { + private UpdatedFlags(MessageUid uid, Optional<MessageId> messageId, ModSeq modSeq, Flags oldFlags, Flags newFlags, Optional<Date> internalDate) { this.uid = uid; - this.messageId = messageId; + this.messageId = Optional.ofNullable(messageId).orElse(Optional.empty()); this.modSeq = modSeq; this.oldFlags = oldFlags; this.newFlags = newFlags; + this.internalDate = Optional.ofNullable(internalDate).orElse(Optional.empty()); this.modifiedFlags = new Flags(); addModifiedSystemFlags(oldFlags, newFlags, modifiedFlags); addModifiedUserFlags(oldFlags, newFlags, modifiedFlags); @@ -203,6 +220,10 @@ public class UpdatedFlags { return messageId; } + public Optional<Date> getInternalDate() { + return internalDate; + } + /** * Gets an iterator for the system flags changed. 
* @@ -272,12 +293,13 @@ public class UpdatedFlags { Objects.equals(messageId, that.messageId) && Objects.equals(oldFlags, that.oldFlags) && Objects.equals(newFlags, that.newFlags) && - Objects.equals(modSeq, that.modSeq); + Objects.equals(modSeq, that.modSeq) && + Objects.equals(internalDate, that.internalDate); } @Override public final int hashCode() { - return Objects.hash(uid, messageId, oldFlags, newFlags, modSeq); + return Objects.hash(uid, messageId, oldFlags, newFlags, modSeq, internalDate); } @Override @@ -288,6 +310,7 @@ public class UpdatedFlags { .add("oldFlags", oldFlags) .add("newFlags", newFlags) .add("modSeq", modSeq) + .add("internalDate", internalDate) .toString(); } } diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java index efd8b2da85..7cd8064c5e 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java @@ -299,11 +299,13 @@ public class CassandraMessageIdMapper implements MessageIdMapper { .map(pair -> buildUpdatedFlags(pair.getRight(), pair.getLeft())); } - private Pair<MailboxId, UpdatedFlags> buildUpdatedFlags(ComposedMessageIdWithMetaData composedMessageIdWithMetaData, Flags oldFlags) { + private Pair<MailboxId, UpdatedFlags> buildUpdatedFlags(CassandraMessageMetadata metadata, Flags oldFlags) { + ComposedMessageIdWithMetaData composedMessageIdWithMetaData = metadata.getComposedMessageId(); return Pair.of(composedMessageIdWithMetaData.getComposedMessageId().getMailboxId(), UpdatedFlags.builder() .uid(composedMessageIdWithMetaData.getComposedMessageId().getUid()) .messageId(composedMessageIdWithMetaData.getComposedMessageId().getMessageId()) + .internalDate(metadata.getInternalDate()) 
.modSeq(composedMessageIdWithMetaData.getModSeq()) .oldFlags(oldFlags) .newFlags(composedMessageIdWithMetaData.getFlags()) @@ -316,19 +318,19 @@ public class CassandraMessageIdMapper implements MessageIdMapper { .thenReturn(pair); } - private Mono<List<Pair<Flags, ComposedMessageIdWithMetaData>>> updateFlags(MailboxId mailboxId, MessageId messageId, Flags newState, MessageManager.FlagsUpdateMode updateMode) { + private Mono<List<Pair<Flags, CassandraMessageMetadata>>> updateFlags(MailboxId mailboxId, MessageId messageId, Flags newState, MessageManager.FlagsUpdateMode updateMode) { CassandraId cassandraId = (CassandraId) mailboxId; return imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.of(cassandraId), chooseReadConsistencyUponWrites()) - .map(CassandraMessageMetadata::getComposedMessageId) - .flatMap(oldComposedId -> updateFlags(newState, updateMode, cassandraId, oldComposedId), ReactorUtils.DEFAULT_CONCURRENCY) + .flatMap(oldMetadata -> updateFlags(newState, updateMode, cassandraId, oldMetadata), ReactorUtils.DEFAULT_CONCURRENCY) .switchIfEmpty(Mono.error(MailboxDeleteDuringUpdateException::new)) .collectList(); } - private Mono<Pair<Flags, ComposedMessageIdWithMetaData>> updateFlags(Flags newState, MessageManager.FlagsUpdateMode updateMode, CassandraId cassandraId, ComposedMessageIdWithMetaData oldComposedId) { + private Mono<Pair<Flags, CassandraMessageMetadata>> updateFlags(Flags newState, MessageManager.FlagsUpdateMode updateMode, CassandraId cassandraId, CassandraMessageMetadata oldMetadata) { + ComposedMessageIdWithMetaData oldComposedId = oldMetadata.getComposedMessageId(); Flags newFlags = new FlagsUpdateCalculator(newState, updateMode).buildNewFlags(oldComposedId.getFlags()); if (identicalFlags(oldComposedId, newFlags)) { - return Mono.just(Pair.of(oldComposedId.getFlags(), oldComposedId)); + return Mono.just(Pair.of(oldComposedId.getFlags(), oldMetadata)); } else { return modSeqProvider.nextModSeqReactive(cassandraId) .map(modSeq -> new 
ComposedMessageIdWithMetaData( @@ -336,7 +338,15 @@ public class CassandraMessageIdMapper implements MessageIdMapper { newFlags, modSeq, oldComposedId.getThreadId())) - .flatMap(newComposedId -> updateFlags(oldComposedId, newComposedId)); + .map(newComposedId -> CassandraMessageMetadata.builder() + .ids(newComposedId) + .internalDate(oldMetadata.getInternalDate()) + .saveDate(oldMetadata.getSaveDate()) + .bodyStartOctet(oldMetadata.getBodyStartOctet()) + .size(oldMetadata.getSize()) + .headerContent(oldMetadata.getHeaderContent()) + .build()) + .flatMap(newMetadata -> updateFlags(oldMetadata, newMetadata)); } } @@ -344,7 +354,9 @@ public class CassandraMessageIdMapper implements MessageIdMapper { return oldComposedId.getFlags().equals(newFlags); } - private Mono<Pair<Flags, ComposedMessageIdWithMetaData>> updateFlags(ComposedMessageIdWithMetaData oldComposedId, ComposedMessageIdWithMetaData newComposedId) { + private Mono<Pair<Flags, CassandraMessageMetadata>> updateFlags(CassandraMessageMetadata oldMetadata, CassandraMessageMetadata newMetadata) { + ComposedMessageIdWithMetaData oldComposedId = oldMetadata.getComposedMessageId(); + ComposedMessageIdWithMetaData newComposedId = newMetadata.getComposedMessageId(); ComposedMessageId composedMessageId = newComposedId.getComposedMessageId(); ModSeq previousModseq = oldComposedId.getModSeq(); UpdatedFlags updatedFlags = UpdatedFlags.builder() @@ -353,12 +365,13 @@ public class CassandraMessageIdMapper implements MessageIdMapper { .oldFlags(oldComposedId.getFlags()) .newFlags(newComposedId.getFlags()) .uid(composedMessageId.getUid()) + .internalDate(newMetadata.getInternalDate()) .build(); return imapUidDAO.updateMetadata(composedMessageId, updatedFlags, previousModseq) .filter(FunctionalUtils.identityPredicate()) .flatMap(any -> messageIdDAO.updateMetadata(composedMessageId, updatedFlags) - .thenReturn(Pair.of(oldComposedId.getFlags(), newComposedId))) + .thenReturn(Pair.of(oldComposedId.getFlags(), newMetadata))) 
.single(); } } diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java index 2ded9ae3ab..c8fc6ee5ef 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java @@ -652,29 +652,34 @@ public class CassandraMessageMapper implements MessageMapper { Flags oldFlags = oldMetaData.getFlags(); Flags newFlags = flagUpdateCalculator.buildNewFlags(oldFlags); - if (identicalFlags(oldFlags, newFlags)) { - return Mono.just(FlagsUpdateStageResult.success(UpdatedFlags.builder() - .uid(oldMetaData.getComposedMessageId().getUid()) - .messageId(oldMetaData.getComposedMessageId().getMessageId()) - .modSeq(oldMetaData.getModSeq()) - .oldFlags(oldFlags) - .newFlags(newFlags) - .build())); - } - - return updateFlags(oldMetaData, newFlags, newModSeq) - .map(success -> { - if (success) { - return FlagsUpdateStageResult.success(UpdatedFlags.builder() + return retrieveInternalDate(oldMetaData) + .flatMap(internalDate -> { + if (identicalFlags(oldFlags, newFlags)) { + return Mono.just(FlagsUpdateStageResult.success(UpdatedFlags.builder() .uid(oldMetaData.getComposedMessageId().getUid()) .messageId(oldMetaData.getComposedMessageId().getMessageId()) - .modSeq(newModSeq) + .internalDate(internalDate) + .modSeq(oldMetaData.getModSeq()) .oldFlags(oldFlags) .newFlags(newFlags) - .build()); - } else { - return FlagsUpdateStageResult.fail(oldMetaData.getComposedMessageId()); + .build())); } + + return updateFlags(oldMetaData, newFlags, newModSeq, internalDate) + .map(success -> { + if (success) { + return FlagsUpdateStageResult.success(UpdatedFlags.builder() + .uid(oldMetaData.getComposedMessageId().getUid()) + .messageId(oldMetaData.getComposedMessageId().getMessageId()) + 
.internalDate(internalDate) + .modSeq(newModSeq) + .oldFlags(oldFlags) + .newFlags(newFlags) + .build()); + } else { + return FlagsUpdateStageResult.fail(oldMetaData.getComposedMessageId()); + } + }); }); } @@ -682,7 +687,14 @@ public class CassandraMessageMapper implements MessageMapper { return oldFlags.equals(newFlags); } - private Mono<Boolean> updateFlags(ComposedMessageIdWithMetaData oldMetadata, Flags newFlags, ModSeq newModSeq) { + private Mono<Optional<Date>> retrieveInternalDate(ComposedMessageIdWithMetaData oldMetaData) { + CassandraId mailboxId = (CassandraId) oldMetaData.getComposedMessageId().getMailboxId(); + MessageUid uid = oldMetaData.getComposedMessageId().getUid(); + return messageIdDAO.retrieve(mailboxId, uid) + .map(cassandraMessageMetadata -> cassandraMessageMetadata.flatMap(CassandraMessageMetadata::getInternalDate)); + } + + private Mono<Boolean> updateFlags(ComposedMessageIdWithMetaData oldMetadata, Flags newFlags, ModSeq newModSeq, Optional<Date> internalDate) { ComposedMessageIdWithMetaData newMetadata = ComposedMessageIdWithMetaData.builder() .composedMessageId(oldMetadata.getComposedMessageId()) .modSeq(newModSeq) @@ -694,6 +706,7 @@ public class CassandraMessageMapper implements MessageMapper { ModSeq previousModseq = oldMetadata.getModSeq(); UpdatedFlags updatedFlags = UpdatedFlags.builder() .messageId(composedMessageId.getMessageId()) + .internalDate(internalDate) .modSeq(newMetadata.getModSeq()) .oldFlags(oldMetadata.getFlags()) .newFlags(newMetadata.getFlags()) diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.java index 7f1a55d44d..0aee0509a9 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.java +++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.java @@ -127,6 +127,7 @@ public class SolveMessageInconsistenciesService { .modSeq(id.getModSeq()) .messageId(id.getComposedMessageId().getMessageId()) .uid(id.getComposedMessageId().getUid()) + .internalDate(messageFromImapUid.getInternalDate()) .build()) .doOnSuccess(any -> notifySuccess(context)) .thenReturn(Task.Result.COMPLETED) diff --git a/mailbox/event/json/src/main/scala/org/apache/james/event/json/DTOs.scala b/mailbox/event/json/src/main/scala/org/apache/james/event/json/DTOs.scala index 33a935e7db..311ddafc1e 100644 --- a/mailbox/event/json/src/main/scala/org/apache/james/event/json/DTOs.scala +++ b/mailbox/event/json/src/main/scala/org/apache/james/event/json/DTOs.scala @@ -153,16 +153,18 @@ object DTOs { javaUpdatedFlags.getMessageId.toScala, javaUpdatedFlags.getModSeq, Flags.fromJavaFlags(javaUpdatedFlags.getOldFlags), - Flags.fromJavaFlags(javaUpdatedFlags.getNewFlags)) + Flags.fromJavaFlags(javaUpdatedFlags.getNewFlags), + javaUpdatedFlags.getInternalDate.map(_.toInstant).toScala) } - case class UpdatedFlags(uid: MessageUid, messageId: Option[MessageId], modSeq: ModSeq, oldFlags: Flags, newFlags: Flags) { + case class UpdatedFlags(uid: MessageUid, messageId: Option[MessageId], modSeq: ModSeq, oldFlags: Flags, newFlags: Flags, internalDate: Option[Instant]) { def toJava: JavaUpdatedFlags = JavaUpdatedFlags.builder() .uid(uid) .messageId(messageId.toJava) .modSeq(modSeq) .oldFlags(Flags.toJavaFlags(oldFlags)) .newFlags(Flags.toJavaFlags(newFlags)) + .internalDate(internalDate.map(Date.from).toJava) .build() } } diff --git a/mailbox/event/json/src/test/java/org/apache/james/event/json/FlagsUpdatedSerializationTest.java b/mailbox/event/json/src/test/java/org/apache/james/event/json/FlagsUpdatedSerializationTest.java index 058e4af195..11fc5c184f 100644 --- 
a/mailbox/event/json/src/test/java/org/apache/james/event/json/FlagsUpdatedSerializationTest.java +++ b/mailbox/event/json/src/test/java/org/apache/james/event/json/FlagsUpdatedSerializationTest.java @@ -25,6 +25,8 @@ import static org.apache.james.event.json.SerializerFixture.EVENT_SERIALIZER; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import java.time.Instant; +import java.util.Date; import java.util.List; import java.util.NoSuchElementException; @@ -176,6 +178,9 @@ class FlagsUpdatedSerializationTest { @Nested class WithMessageId { + private static final Date INTERNAL_DATE_1 = Date.from(Instant.parse("2024-01-01T01:02:03.000Z")); + private static final Date INTERNAL_DATE_2 = Date.from(Instant.parse("2024-01-02T01:02:03.000Z")); + private final MessageId messageId1 = TestMessageId.of(23456); private final MessageId messageId2 = TestMessageId.of(78901); @@ -185,6 +190,7 @@ class FlagsUpdatedSerializationTest { .modSeq(MOD_SEQ_1) .oldFlags(OLD_FLAGS_1) .newFlags(NEW_FLAGS_1) + .internalDate(INTERNAL_DATE_1) .build(); private final UpdatedFlags updatedFlagsWithMessageId2 = UpdatedFlags.builder() .uid(MESSAGE_UID_2) @@ -192,6 +198,7 @@ class FlagsUpdatedSerializationTest { .modSeq(MOD_SEQ_2) .oldFlags(OLD_FLAGS_2) .newFlags(NEW_FLAGS_2) + .internalDate(INTERNAL_DATE_2) .build(); private final List<UpdatedFlags> updatedFlagsListWithMessageIds = ImmutableList.of(updatedFlagsWithMessageId1, updatedFlagsWithMessageId2); @@ -200,6 +207,39 @@ class FlagsUpdatedSerializationTest { MAILBOX_PATH, MAILBOX_ID, updatedFlagsListWithMessageIds, EVENT_ID); private static final String EVENT_WITH_MESSAGE_IDS_JSON = + "{" + + " \"FlagsUpdated\": {" + + " \"eventId\":\"6e0dd59d-660e-4d9b-b22f-0354479f47b4\"," + + " \"path\": {" + + " \"namespace\": \"#private\"," + + " \"user\": \"user\"," + + " \"name\": \"mailboxName\"" + + " }," + + " \"mailboxId\": \"18\"," + + " \"sessionId\": 42," + + " 
\"updatedFlags\": [" + + " {" + + " \"uid\": 123456," + + " \"messageId\": \"23456\"," + + " \"modSeq\": 35," + + " \"internalDate\": \"2024-01-01T01:02:03Z\"," + + " \"oldFlags\": {\"systemFlags\":[\"Deleted\",\"Seen\"],\"userFlags\":[\"Old Flag 1\"]}," + + " \"newFlags\": {\"systemFlags\":[\"Answered\",\"Draft\"],\"userFlags\":[\"New Flag 1\"]}" + + " }," + + " {" + + " \"uid\": 654321," + + " \"messageId\": \"78901\"," + + " \"modSeq\": 36," + + " \"internalDate\": \"2024-01-02T01:02:03Z\"," + + " \"oldFlags\": {\"systemFlags\":[\"Flagged\",\"Recent\"],\"userFlags\":[\"Old Flag 2\"]}," + + " \"newFlags\": {\"systemFlags\":[\"Answered\",\"Seen\"],\"userFlags\":[\"New Flag 2\"]}" + + " }" + + " ]," + + " \"user\": \"user\"" + + " }" + + "}"; + + private static final String EVENT_WITH_MESSAGE_IDS_WITHOUT_INTERNAL_DATE_JSON = "{" + " \"FlagsUpdated\": {" + " \"eventId\":\"6e0dd59d-660e-4d9b-b22f-0354479f47b4\"," + @@ -242,6 +282,29 @@ class FlagsUpdatedSerializationTest { assertThat(EVENT_SERIALIZER.fromJson(EVENT_WITH_MESSAGE_IDS_JSON).get()) .isEqualTo(eventWithMessageIds); } + + @Test + void flagsUpdatedShouldDeserializeWhenInternalDateIsMissing() { + UpdatedFlags updatedFlagsWithoutInternalDate1 = UpdatedFlags.builder() + .uid(MESSAGE_UID_1) + .messageId(messageId1) + .modSeq(MOD_SEQ_1) + .oldFlags(OLD_FLAGS_1) + .newFlags(NEW_FLAGS_1) + .build(); + UpdatedFlags updatedFlagsWithoutInternalDate2 = UpdatedFlags.builder() + .uid(MESSAGE_UID_2) + .messageId(messageId2) + .modSeq(MOD_SEQ_2) + .oldFlags(OLD_FLAGS_2) + .newFlags(NEW_FLAGS_2) + .build(); + FlagsUpdated eventWithoutInternalDate = new FlagsUpdated(SESSION_ID, USERNAME, + MAILBOX_PATH, MAILBOX_ID, ImmutableList.of(updatedFlagsWithoutInternalDate1, updatedFlagsWithoutInternalDate2), EVENT_ID); + + assertThat(EVENT_SERIALIZER.fromJson(EVENT_WITH_MESSAGE_IDS_WITHOUT_INTERNAL_DATE_JSON).get()) + .isEqualTo(eventWithoutInternalDate); + } } @Nested diff --git 
a/mailbox/jpa/src/main/java/org/apache/james/mailbox/jpa/mail/MessageUtils.java b/mailbox/jpa/src/main/java/org/apache/james/mailbox/jpa/mail/MessageUtils.java index bc2d8d0182..c0966be22a 100644 --- a/mailbox/jpa/src/main/java/org/apache/james/mailbox/jpa/mail/MessageUtils.java +++ b/mailbox/jpa/src/main/java/org/apache/james/mailbox/jpa/mail/MessageUtils.java @@ -74,6 +74,7 @@ class MessageUtils { updatedFlags.add(UpdatedFlags.builder() .uid(member.getUid()) + .internalDate(member.getInternalDate()) .modSeq(member.getModSeq()) .newFlags(newFlags) .oldFlags(originalFlags) diff --git a/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java b/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java index 4d90d1e877..61c38d3148 100644 --- a/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java +++ b/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java @@ -153,6 +153,7 @@ public class InMemoryMessageIdMapper implements MessageIdMapper { .modSeq(message.getModSeq()) .uid(message.getUid()) .messageId(message.getMessageId()) + .internalDate(message.getInternalDate()) .oldFlags(message.createFlags()) .newFlags(newState) .build(); diff --git a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageIdMapper.java b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageIdMapper.java index 961b51fb53..e8f910eaa2 100644 --- a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageIdMapper.java +++ b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageIdMapper.java @@ -193,18 +193,22 @@ public class PostgresMessageIdMapper implements MessageIdMapper { return Mono.empty(); }) .flatMapIterable(Function.identity()) - .map(pair -> buildUpdatedFlags(pair.getRight(), pair.getLeft())); + 
.flatMap(pair -> buildUpdatedFlags(pair.getRight(), pair.getLeft())); } - private Pair<MailboxId, UpdatedFlags> buildUpdatedFlags(ComposedMessageIdWithMetaData composedMessageIdWithMetaData, Flags oldFlags) { - return Pair.of(composedMessageIdWithMetaData.getComposedMessageId().getMailboxId(), - UpdatedFlags.builder() - .uid(composedMessageIdWithMetaData.getComposedMessageId().getUid()) - .messageId(composedMessageIdWithMetaData.getComposedMessageId().getMessageId()) - .modSeq(composedMessageIdWithMetaData.getModSeq()) - .oldFlags(oldFlags) - .newFlags(composedMessageIdWithMetaData.getFlags()) - .build()); + private Mono<Pair<MailboxId, UpdatedFlags>> buildUpdatedFlags(ComposedMessageIdWithMetaData composedMessageIdWithMetaData, Flags oldFlags) { + PostgresMailboxId mailboxId = (PostgresMailboxId) composedMessageIdWithMetaData.getComposedMessageId().getMailboxId(); + + return mailboxMessageDAO.retrieveInternalDate(mailboxId, composedMessageIdWithMetaData.getComposedMessageId().getUid()) + .map(internalDate -> Pair.of(composedMessageIdWithMetaData.getComposedMessageId().getMailboxId(), + UpdatedFlags.builder() + .uid(composedMessageIdWithMetaData.getComposedMessageId().getUid()) + .messageId(composedMessageIdWithMetaData.getComposedMessageId().getMessageId()) + .internalDate(internalDate) + .modSeq(composedMessageIdWithMetaData.getModSeq()) + .oldFlags(oldFlags) + .newFlags(composedMessageIdWithMetaData.getFlags()) + .build())); } private Mono<List<Pair<Flags, ComposedMessageIdWithMetaData>>> updateFlags(MailboxId mailboxId, MessageId messageId, Flags newState, MessageManager.FlagsUpdateMode updateMode) { diff --git a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMapper.java b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMapper.java index f824b8bdcf..b4e225ed92 100644 --- a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMapper.java +++ 
b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMessageMapper.java @@ -301,10 +301,12 @@ public class PostgresMessageMapper implements MessageMapper { ModSeq newModSeq) { Flags oldFlags = currentMetaData.getFlags(); ComposedMessageId composedMessageId = currentMetaData.getComposedMessageId(); + Mono<Optional<Date>> internalDate = mailboxMessageDAO.retrieveInternalDate((PostgresMailboxId) composedMessageId.getMailboxId(), composedMessageId.getUid()); if (oldFlags.equals(flagsUpdateCalculator.buildNewFlags(oldFlags))) { - return Mono.just(UpdatedFlags.builder() + return internalDate.map(date -> UpdatedFlags.builder() .messageId(composedMessageId.getMessageId()) + .internalDate(date) .oldFlags(oldFlags) .newFlags(oldFlags) .uid(composedMessageId.getUid()) @@ -323,13 +325,15 @@ public class PostgresMessageMapper implements MessageMapper { default: return Mono.error(() -> new RuntimeException("Unknown MessageRange type " + mode)); } - }).map(updatedFlags -> UpdatedFlags.builder() - .messageId(composedMessageId.getMessageId()) - .oldFlags(oldFlags) - .newFlags(updatedFlags) - .uid(composedMessageId.getUid()) - .modSeq(newModSeq) - .build()); + }).flatMap(updatedFlags -> internalDate + .map(date -> UpdatedFlags.builder() + .messageId(composedMessageId.getMessageId()) + .internalDate(date) + .oldFlags(oldFlags) + .newFlags(updatedFlags) + .uid(composedMessageId.getUid()) + .modSeq(newModSeq) + .build())); } } @@ -353,6 +357,7 @@ public class PostgresMessageMapper implements MessageMapper { .flatMapMany(newModSeq -> mailboxMessageDAO.resetRecentFlag(mailboxId, List.copyOf(uidMapping.keySet()), newModSeq)) .map(newMetaData -> UpdatedFlags.builder() .messageId(newMetaData.getMessageId()) + .internalDate(newMetaData.getInternalDate()) .modSeq(newMetaData.getModSeq()) .oldFlags(uidMapping.get(newMetaData.getUid()).getFlags()) .newFlags(newMetaData.getFlags()) diff --git 
a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxMessageDAO.java b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxMessageDAO.java index fdc5c1ec62..8ca50e936d 100644 --- a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxMessageDAO.java +++ b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxMessageDAO.java @@ -21,6 +21,7 @@ package org.apache.james.mailbox.postgres.mail.dao; import static org.apache.james.backends.postgres.PostgresCommons.DATE_TO_LOCAL_DATE_TIME; import static org.apache.james.backends.postgres.PostgresCommons.IN_CLAUSE_MAX_SIZE; +import static org.apache.james.backends.postgres.PostgresCommons.LOCAL_DATE_TIME_DATE_FUNCTION; import static org.apache.james.backends.postgres.PostgresCommons.UNNEST_FIELD; import static org.apache.james.backends.postgres.PostgresCommons.tableField; import static org.apache.james.backends.postgres.utils.PostgresExecutor.EAGER_FETCH; @@ -49,7 +50,9 @@ import static org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxMessageD import static org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxMessageDAOUtils.RECORD_TO_MESSAGE_METADATA_FUNCTION; import static org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxMessageDAOUtils.RECORD_TO_MESSAGE_UID_FUNCTION; +import java.time.LocalDateTime; import java.util.Collection; +import java.util.Date; import java.util.List; import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; @@ -677,4 +680,13 @@ public class PostgresMailboxMessageDAO { .map(RECORD_TO_COMPOSED_MESSAGE_ID_WITH_META_DATA_FUNCTION); } + public Mono<Optional<Date>> retrieveInternalDate(PostgresMailboxId mailboxId, MessageUid uid) { + return postgresExecutor.executeRow(dslContext -> Mono.from(dslContext.select(INTERNAL_DATE) + .from(TABLE_NAME) + .where(MAILBOX_ID.eq(mailboxId.asUuid())) + 
.and(MESSAGE_UID.eq(uid.asLong())))) + .map(record -> Optional.of(LOCAL_DATE_TIME_DATE_FUNCTION.apply(record.get(INTERNAL_DATE, LocalDateTime.class)))) + .defaultIfEmpty(Optional.empty()); + } + } diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/AbstractMessageMapper.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/AbstractMessageMapper.java index d1d5fb4116..d2302f3b4e 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/AbstractMessageMapper.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/AbstractMessageMapper.java @@ -103,13 +103,14 @@ public abstract class AbstractMessageMapper extends TransactionalMapper implemen save(mailbox, member); } - updatedFlags.add(UpdatedFlags.builder() - .uid(member.getUid()) - .messageId(member.getMessageId()) - .modSeq(member.getModSeq()) - .newFlags(newFlags) - .oldFlags(originalFlags) - .build()); + updatedFlags.add(UpdatedFlags.builder() + .uid(member.getUid()) + .messageId(member.getMessageId()) + .internalDate(member.getInternalDate()) + .modSeq(member.getModSeq()) + .newFlags(newFlags) + .oldFlags(originalFlags) + .build()); } diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageIdMapperTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageIdMapperTest.java index 0f26680875..618baacec9 100644 --- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageIdMapperTest.java +++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageIdMapperTest.java @@ -364,6 +364,7 @@ public abstract class MessageIdMapperTest { UpdatedFlags expectedUpdatedFlags = UpdatedFlags.builder() .uid(message1.getUid()) .messageId(messageId) + .internalDate(message1.getInternalDate()) .modSeq(modSeq) .oldFlags(new Flags()) .newFlags(newFlags) @@ -392,6 +393,7 @@ public abstract class MessageIdMapperTest { UpdatedFlags expectedUpdatedFlags = 
UpdatedFlags.builder() .uid(message1.getUid()) .messageId(messageId) + .internalDate(message1.getInternalDate()) .modSeq(modSeq) .oldFlags(messageFlags) .newFlags(newFlags) @@ -422,6 +424,7 @@ public abstract class MessageIdMapperTest { UpdatedFlags expectedUpdatedFlags = UpdatedFlags.builder() .uid(message1.getUid()) .messageId(messageId) + .internalDate(message1.getInternalDate()) .modSeq(modSeq) .oldFlags(messageFlags) .newFlags(new Flags(Flags.Flag.RECENT)) @@ -495,6 +498,7 @@ public abstract class MessageIdMapperTest { UpdatedFlags expectedUpdatedFlags = UpdatedFlags.builder() .uid(message1.getUid()) .messageId(messageId) + .internalDate(message1.getInternalDate()) .modSeq(modSeq) .oldFlags(initialFlags) .newFlags(newFlags) @@ -523,6 +527,7 @@ public abstract class MessageIdMapperTest { UpdatedFlags expectedUpdatedFlags = UpdatedFlags.builder() .uid(message1.getUid()) .messageId(messageId) + .internalDate(message1.getInternalDate()) .modSeq(modSeqBenwaInboxMailbox) .oldFlags(new Flags()) .newFlags(newFlags) @@ -530,6 +535,7 @@ public abstract class MessageIdMapperTest { UpdatedFlags expectedUpdatedFlags2 = UpdatedFlags.builder() .uid(message1InOtherMailbox.getUid()) .messageId(messageId) + .internalDate(message1InOtherMailbox.getInternalDate()) .modSeq(modSeqBenwaWorkMailbox) .oldFlags(new Flags()) .newFlags(newFlags) @@ -874,6 +880,7 @@ public abstract class MessageIdMapperTest { .modSeq(modSeq) .uid(message1.getUid()) .messageId(message1.getMessageId()) + .internalDate(message1.getInternalDate()) .newFlags(flags) .oldFlags(flags) .build()))); diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageMapperTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageMapperTest.java index bef9c55b43..d3f0bfa9cb 100644 --- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageMapperTest.java +++ 
b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageMapperTest.java @@ -764,6 +764,7 @@ public abstract class MessageMapperTest { .contains(UpdatedFlags.builder() .uid(message1.getUid()) .messageId(message1.getMessageId()) + .internalDate(message1.getInternalDate()) .modSeq(modSeq.next()) .oldFlags(new Flags()) .newFlags(new Flags(Flags.Flag.FLAGGED)) @@ -779,6 +780,7 @@ public abstract class MessageMapperTest { .contains(UpdatedFlags.builder() .uid(message1.getUid()) .messageId(message1.getMessageId()) + .internalDate(message1.getInternalDate()) .modSeq(modSeq.next()) .oldFlags(new Flags(Flags.Flag.FLAGGED)) .newFlags(new FlagsBuilder().add(Flags.Flag.SEEN, Flags.Flag.FLAGGED).build()) @@ -813,6 +815,7 @@ public abstract class MessageMapperTest { UpdatedFlags.builder() .uid(message1.getUid()) .messageId(message1.getMessageId()) + .internalDate(message1.getInternalDate()) .modSeq(modSeq.next()) .oldFlags(new FlagsBuilder().add(Flags.Flag.SEEN, Flags.Flag.FLAGGED).build()) .newFlags(new Flags(Flags.Flag.FLAGGED)) @@ -984,6 +987,7 @@ public abstract class MessageMapperTest { UpdatedFlags.builder() .uid(message1.getUid()) .messageId(message1.getMessageId()) + .internalDate(message1.getInternalDate()) .modSeq(modSeq.next()) .oldFlags(new Flags()) .newFlags(new Flags(USER_FLAG)) @@ -1001,6 +1005,7 @@ public abstract class MessageMapperTest { UpdatedFlags.builder() .uid(message1.getUid()) .messageId(message1.getMessageId()) + .internalDate(message1.getInternalDate()) .modSeq(message1.getModSeq()) .oldFlags(new Flags()) .newFlags(new Flags()) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
