This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 2bbde614cd5b6c8463038a492167eb18312c1967 Author: Benoit Tellier <[email protected]> AuthorDate: Thu Oct 22 14:34:53 2020 +0700 JAMES-3277 Optimize range message updates for RFC-8621 --- .../rfc8621/contract/EmailSetMethodContract.scala | 312 +++++++++++++++++++++ .../org/apache/james/jmap/mail/EmailSet.scala | 19 +- .../apache/james/jmap/method/EmailSetMethod.scala | 104 ++++++- 3 files changed, 419 insertions(+), 16 deletions(-) diff --git a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/EmailSetMethodContract.scala b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/EmailSetMethodContract.scala index 8a92b53..4029127 100644 --- a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/EmailSetMethodContract.scala +++ b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/EmailSetMethodContract.scala @@ -670,6 +670,318 @@ trait EmailSetMethodContract { } @Test + def rangeFlagsAdditionShouldUpdateStoredFlags(server: GuiceJamesServer): Unit = { + val message: Message = Fixture.createTestMessage + + val flags: Flags = FlagsBuilder.builder() + .add(Flags.Flag.ANSWERED) + .build() + + val bobPath = MailboxPath.inbox(BOB) + server.getProbe(classOf[MailboxProbeImpl]).createMailbox(bobPath) + val messageId1: MessageId = server.getProbe(classOf[MailboxProbeImpl]).appendMessage(BOB.asString(), bobPath, AppendCommand.builder() + .withFlags(flags).build(message)).getMessageId + val messageId2: MessageId = server.getProbe(classOf[MailboxProbeImpl]).appendMessage(BOB.asString(), bobPath, AppendCommand.builder() + .withFlags(flags).build(message)).getMessageId + val messageId3: MessageId = server.getProbe(classOf[MailboxProbeImpl]).appendMessage(BOB.asString(), bobPath, AppendCommand.builder() + .withFlags(flags).build(message)).getMessageId + val messageId4: MessageId = server.getProbe(classOf[MailboxProbeImpl]).appendMessage(BOB.asString(), bobPath, AppendCommand.builder() + .withFlags(flags).build(message)).getMessageId + + val request = String.format( + s"""{ + | "using": ["urn:ietf:params:jmap:core", "urn:ietf:params:jmap:mail"], + | "methodCalls": [ + | ["Email/set", { + | "accountId": "$ACCOUNT_ID", + | "update": { + | "${messageId1.serialize}":{ + | "keywords/music": true + | }, + | "${messageId2.serialize}":{ + | "keywords/music": true + | }, + | "${messageId3.serialize}":{ + | "keywords/music": true + | }, + | "${messageId4.serialize}":{ + | "keywords/music": true + | } + | } + | }, "c1"], + | ["Email/get", + | { + | "accountId": "$ACCOUNT_ID", + | "ids": ["${messageId1.serialize}", "${messageId2.serialize}", "${messageId3.serialize}", "${messageId4.serialize}"], + | "properties": ["keywords"] + | }, + | "c2"]] + |}""".stripMargin, "$Seen") + + val response = `given` + .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER) + .body(request) + .when + .post + .`then` + .statusCode(SC_OK) + .contentType(JSON) + .extract + .body + .asString + + assertThatJson(response) + .inPath("methodResponses[0][1].updated") + .isEqualTo(s"""{ + | "${messageId1.serialize}": null, + | "${messageId2.serialize}": null, + | "${messageId3.serialize}": null, + | "${messageId4.serialize}": null + |} + """.stripMargin) + assertThatJson(response) + .inPath("methodResponses[1][1].list") + .isEqualTo(String.format( + """[ + |{ + | "id":"%s", + | "keywords": { + | "$Answered": true, + | "music": true + | } + |}, + |{ + | "id":"%s", + | "keywords": { + | "$Answered": true, + | "music": true + | } + |}, + |{ + | "id":"%s", + | "keywords": { + | "$Answered": true, + | "music": true + | } + |}, + |{ + | "id":"%s", + | "keywords": { + | "$Answered": true, + | "music": true + | } + |} + |] + """.stripMargin, messageId1.serialize, messageId2.serialize, messageId3.serialize, messageId4.serialize)) + } + + @Test + def rangeFlagsRemovalShouldUpdateStoredFlags(server: GuiceJamesServer): Unit = { + val message: Message = Fixture.createTestMessage + + val flags: Flags = FlagsBuilder.builder() + .add(Flags.Flag.ANSWERED) + .add("music") + .build() + + val bobPath = MailboxPath.inbox(BOB) + server.getProbe(classOf[MailboxProbeImpl]).createMailbox(bobPath) + val messageId1: MessageId = server.getProbe(classOf[MailboxProbeImpl]).appendMessage(BOB.asString(), bobPath, AppendCommand.builder() + .withFlags(flags).build(message)).getMessageId + val messageId2: MessageId = server.getProbe(classOf[MailboxProbeImpl]).appendMessage(BOB.asString(), bobPath, AppendCommand.builder() + .withFlags(flags).build(message)).getMessageId + val messageId3: MessageId = server.getProbe(classOf[MailboxProbeImpl]).appendMessage(BOB.asString(), bobPath, AppendCommand.builder() + .withFlags(flags).build(message)).getMessageId + val messageId4: MessageId = server.getProbe(classOf[MailboxProbeImpl]).appendMessage(BOB.asString(), bobPath, AppendCommand.builder() + .withFlags(flags).build(message)).getMessageId + + val request = String.format( + s"""{ + | "using": ["urn:ietf:params:jmap:core", "urn:ietf:params:jmap:mail"], + | "methodCalls": [ + | ["Email/set", { + | "accountId": "$ACCOUNT_ID", + | "update": { + | "${messageId1.serialize}":{ + | "keywords/music": null + | }, + | "${messageId2.serialize}":{ + | "keywords/music": null + | }, + | "${messageId3.serialize}":{ + | "keywords/music": null + | }, + | "${messageId4.serialize}":{ + | "keywords/music": null + | } + | } + | }, "c1"], + | ["Email/get", + | { + | "accountId": "$ACCOUNT_ID", + | "ids": ["${messageId1.serialize}", "${messageId2.serialize}", "${messageId3.serialize}", "${messageId4.serialize}"], + | "properties": ["keywords"] + | }, + | "c2"]] + |}""".stripMargin, "$Seen") + + val response = `given` + .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER) + .body(request) + .when + .post + .`then` + .statusCode(SC_OK) + .contentType(JSON) + .extract + .body + .asString + + assertThatJson(response) + .inPath("methodResponses[0][1].updated") + .isEqualTo(s"""{ + | "${messageId1.serialize}": null, + | "${messageId2.serialize}": null, + | "${messageId3.serialize}": null, + | "${messageId4.serialize}": null + |} + """.stripMargin) + assertThatJson(response) + .inPath("methodResponses[1][1].list") + .isEqualTo(String.format( + """[ + |{ + | "id":"%s", + | "keywords": { + | "$Answered": true + | } + |}, + |{ + | "id":"%s", + | "keywords": { + | "$Answered": true + | } + |}, + |{ + | "id":"%s", + | "keywords": { + | "$Answered": true + | } + |}, + |{ + | "id":"%s", + | "keywords": { + | "$Answered": true + | } + |} + |] + """.stripMargin, messageId1.serialize, messageId2.serialize, messageId3.serialize, messageId4.serialize)) + } + + @Test + def rangeMoveShouldUpdateMailboxId(server: GuiceJamesServer): Unit = { + val message: Message = Fixture.createTestMessage + + val flags: Flags = FlagsBuilder.builder() + .add(Flags.Flag.ANSWERED) + .add("music") + .build() + + val bobPath = MailboxPath.inbox(BOB) + server.getProbe(classOf[MailboxProbeImpl]).createMailbox(bobPath) + val newId = server.getProbe(classOf[MailboxProbeImpl]).createMailbox(MailboxPath.forUser(BOB, "other")) + val messageId1: MessageId = server.getProbe(classOf[MailboxProbeImpl]).appendMessage(BOB.asString(), bobPath, AppendCommand.builder() + .withFlags(flags).build(message)).getMessageId + val messageId2: MessageId = server.getProbe(classOf[MailboxProbeImpl]).appendMessage(BOB.asString(), bobPath, AppendCommand.builder() + .withFlags(flags).build(message)).getMessageId + val messageId3: MessageId = server.getProbe(classOf[MailboxProbeImpl]).appendMessage(BOB.asString(), bobPath, AppendCommand.builder() + .withFlags(flags).build(message)).getMessageId + val messageId4: MessageId = server.getProbe(classOf[MailboxProbeImpl]).appendMessage(BOB.asString(), bobPath, AppendCommand.builder() + .withFlags(flags).build(message)).getMessageId + + val request = String.format( + s"""{ + | "using": ["urn:ietf:params:jmap:core", "urn:ietf:params:jmap:mail"], + | "methodCalls": [ + | ["Email/set", { + | "accountId": "$ACCOUNT_ID", + | "update": { + | "${messageId1.serialize}":{ + | "mailboxIds": { "${newId.serialize()}" : true} + | }, + | "${messageId2.serialize}":{ + | "mailboxIds": { "${newId.serialize()}" : true} + | }, + | "${messageId3.serialize}":{ + | "mailboxIds": { "${newId.serialize()}" : true} + | }, + | "${messageId4.serialize}":{ + | "mailboxIds": { "${newId.serialize()}" : true} + | } + | } + | }, "c1"], + | ["Email/get", + | { + | "accountId": "$ACCOUNT_ID", + | "ids": ["${messageId1.serialize}", "${messageId2.serialize}", "${messageId3.serialize}", "${messageId4.serialize}"], + | "properties": ["mailboxIds"] + | }, + | "c2"]] + |}""".stripMargin, "$Seen") + + val response = `given` + .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER) + .body(request) + .when + .post + .`then` + .statusCode(SC_OK) + .contentType(JSON) + .extract + .body + .asString + + assertThatJson(response) + .inPath("methodResponses[0][1].updated") + .isEqualTo(s"""{ + | "${messageId1.serialize}": null, + | "${messageId2.serialize}": null, + | "${messageId3.serialize}": null, + | "${messageId4.serialize}": null + |} + """.stripMargin) + assertThatJson(response) + .inPath("methodResponses[1][1].list") + .isEqualTo(s"""[ + |{ + | "id":"${messageId1.serialize}", + | "mailboxIds": { + | "${newId.serialize}": true + | } + |}, + |{ + | "id":"${messageId2.serialize}", + | "mailboxIds": { + | "${newId.serialize}": true + | } + |}, + |{ + | "id":"${messageId3.serialize}", + | "mailboxIds": { + | "${newId.serialize}": true + | } + |}, + |{ + | "id":"${messageId4.serialize}", + | "mailboxIds": { + | "${newId.serialize}": true + | } + |} + |] + """.stripMargin) + } + + @Test def emailSetShouldRejectPartiallyUpdateAndResetKeywordsAtTheSameTime(server: GuiceJamesServer): Unit = { val message: Message = Fixture.createTestMessage diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/mail/EmailSet.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/mail/EmailSet.scala index 8e36b33..ddeac39 100644 --- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/mail/EmailSet.scala +++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/mail/EmailSet.scala @@ -23,13 +23,12 @@ import eu.timepit.refined.api.Refined import eu.timepit.refined.collection.NonEmpty import org.apache.james.jmap.mail.EmailSet.UnparsedMessageId import org.apache.james.jmap.method.WithAccountId -import org.apache.james.jmap.model.KeywordsFactory.STRICT_KEYWORDS_FACTORY import org.apache.james.jmap.model.State.State import org.apache.james.jmap.model.{AccountId, Keywords, SetError} import org.apache.james.mailbox.model.MessageId import play.api.libs.json.JsObject -import scala.util.{Failure, Right, Success, Try} +import scala.util.{Right, Try} object EmailSet { type UnparsedMessageIdConstraint = NonEmpty @@ -97,13 +96,23 @@ case class EmailSetUpdate(keywords: Option[Keywords], .compose(keywordsRemoval) .compose(keywordsReset) - Right(ValidatedEmailSetUpdate(keywordsTransformation, mailboxIdsTransformation)) + Right(ValidatedEmailSetUpdate(keywordsTransformation, mailboxIdsTransformation, this)) } } + + def isOnlyMove: Boolean = mailboxIds.isDefined && mailboxIds.get.value.size == 1 && + keywords.isEmpty && keywordsToAdd.isEmpty && keywordsToRemove.isEmpty + + def isOnlyFlagAddition: Boolean = keywordsToAdd.isDefined && keywordsToRemove.isEmpty && mailboxIds.isEmpty && + mailboxIdsToAdd.isEmpty && mailboxIdsToRemove.isEmpty + + def isOnlyFlagRemoval: Boolean = keywordsToRemove.isDefined && keywordsToAdd.isEmpty && mailboxIds.isEmpty && + mailboxIdsToAdd.isEmpty && mailboxIdsToRemove.isEmpty } -case class ValidatedEmailSetUpdate private (keywords: Function[Keywords, Keywords], - mailboxIdsTransformation: Function[MailboxIds, MailboxIds]) +case class ValidatedEmailSetUpdate private (keywordsTransformation: Function[Keywords, Keywords], + mailboxIdsTransformation: Function[MailboxIds, MailboxIds], + update: EmailSetUpdate) class EmailUpdateValidationException() extends IllegalArgumentException case class InvalidEmailPropertyException(property: String, cause: String) extends EmailUpdateValidationException diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetMethod.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetMethod.scala index 051d435..8c83e61 100644 --- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetMethod.scala +++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetMethod.scala @@ -18,6 +18,8 @@ ****************************************************************/ package org.apache.james.jmap.method +import java.util.function.Consumer + import com.google.common.collect.ImmutableList import eu.timepit.refined.auto._ import javax.inject.Inject @@ -34,8 +36,8 @@ import org.apache.james.jmap.model.SetError.SetErrorDescription import org.apache.james.jmap.model.{Capabilities, Invocation, SetError, State} import org.apache.james.mailbox.MessageManager.FlagsUpdateMode import org.apache.james.mailbox.exception.MailboxNotFoundException -import org.apache.james.mailbox.model.{ComposedMessageIdWithMetaData, DeleteResult, MessageId} -import org.apache.james.mailbox.{MailboxSession, MessageIdManager} +import org.apache.james.mailbox.model.{ComposedMessageIdWithMetaData, DeleteResult, MailboxId, MessageId, MessageRange} +import org.apache.james.mailbox.{MailboxManager, MailboxSession, MessageIdManager, MessageManager} import org.apache.james.metrics.api.MetricFactory import play.api.libs.json.{JsError, JsObject, JsSuccess} import reactor.core.scala.publisher.{SFlux, SMono} @@ -47,6 +49,7 @@ case class MessageNotFoundExeception(messageId: MessageId) extends Exception class EmailSetMethod @Inject()(serializer: EmailSetSerializer, messageIdManager: MessageIdManager, + mailboxManager: MailboxManager, messageIdFactory: MessageId.Factory, val metricFactory: MetricFactory, val sessionSupplier: SessionSupplier) extends MethodRequiringAccountId[EmailSetRequest] { @@ -137,7 +140,7 @@ class EmailSetMethod @Inject()(serializer: EmailSetSerializer, override def doProcess(capabilities: Set[CapabilityIdentifier], invocation: InvocationWithContext, mailboxSession: MailboxSession, request: EmailSetRequest): SMono[InvocationWithContext] = { for { destroyResults <- destroy(request, mailboxSession) - updateResults <- update(request, mailboxSession) + updateResults <- update(request, mailboxSession).doOnError(e => e.printStackTrace()) } yield InvocationWithContext( invocation = Invocation( methodName = invocation.invocation.methodName, @@ -199,19 +202,98 @@ class EmailSetMethod @Inject()(serializer: EmailSetSerializer, updates <- SFlux.fromPublisher(messageIdManager.messagesMetadata(validUpdates.map(_._1).asJavaCollection, session)) .collectMultimap(metaData => metaData.getComposedMessageId.getMessageId) .flatMap(metaData => { - SFlux.fromIterable(validUpdates) - .concatMap[UpdateResult]({ - case (messageId, updatePatch) => - doUpdate(messageId, updatePatch, metaData.get(messageId).toList.flatten, session) - }) - .collectSeq() + doUpdate(validUpdates, metaData, session) }) } yield { UpdateResults(updates ++ failures) } } - private def doUpdate(messageId: MessageId, update: ValidatedEmailSetUpdate, storedMetaData: List[ComposedMessageIdWithMetaData], session: MailboxSession): SMono[UpdateResult] = { + private def doUpdate(validUpdates: List[(MessageId, ValidatedEmailSetUpdate)], + metaData: Map[MessageId, Traversable[ComposedMessageIdWithMetaData]], + session: MailboxSession): SMono[Seq[UpdateResult]] = { + val sameUpdate: Boolean = validUpdates.map(_._2).distinctBy(_.update).size == 1 + val singleMailbox: Boolean = metaData.values.flatten.map(_.getComposedMessageId.getMailboxId).toSet.size == 1 + + if (sameUpdate && singleMailbox && validUpdates.size > 3) { + val update: ValidatedEmailSetUpdate = validUpdates.map(_._2).headOption.get + val ranges: List[MessageRange] = asRanges(metaData) + val mailboxId: MailboxId = metaData.values.flatten.map(_.getComposedMessageId.getMailboxId).headOption.get + + if (update.update.isOnlyFlagAddition) { + updateFlagsByRange(mailboxId, update.update.keywordsToAdd.get.asFlags, ranges, metaData, FlagsUpdateMode.ADD, session) + } else if (update.update.isOnlyFlagRemoval) { + updateFlagsByRange(mailboxId, update.update.keywordsToRemove.get.asFlags, ranges, metaData, FlagsUpdateMode.REMOVE, session) + } else if (update.update.isOnlyMove) { + moveByRange(mailboxId, update, ranges, metaData, session) + } else { + updateEachMessage(validUpdates, metaData, session) + } + } else { + updateEachMessage(validUpdates, metaData, session) + } + } + + private def asRanges(metaData: Map[MessageId, Traversable[ComposedMessageIdWithMetaData]]) = + MessageRange.toRanges(metaData.values + .flatten.map(_.getComposedMessageId.getUid) + .toList.asJava) + .asScala.toList + + private def updateFlagsByRange(mailboxId: MailboxId, + flags: Flags, + ranges: List[MessageRange], + metaData: Map[MessageId, Traversable[ComposedMessageIdWithMetaData]], + updateMode: FlagsUpdateMode, + session: MailboxSession): SMono[Seq[UpdateResult]] = { + val mailboxMono: SMono[MessageManager] = SMono.fromCallable(() => mailboxManager.getMailbox(mailboxId, session)) + + mailboxMono.flatMap(mailbox => updateByRange(ranges, metaData, + range => mailbox.setFlags(flags, updateMode, range, session))) + .subscribeOn(Schedulers.elastic()) + } + + private def moveByRange(mailboxId: MailboxId, + update: ValidatedEmailSetUpdate, + ranges: List[MessageRange], + metaData: Map[MessageId, Traversable[ComposedMessageIdWithMetaData]], + session: MailboxSession): SMono[Seq[UpdateResult]] = { + val targetId: MailboxId = update.update.mailboxIds.get.value.headOption.get + + updateByRange(ranges, metaData, + range => mailboxManager.moveMessages(range, mailboxId, targetId, session)) + } + + private def updateByRange(ranges: List[MessageRange], + metaData: Map[MessageId, Traversable[ComposedMessageIdWithMetaData]], + operation: Consumer[MessageRange]): SMono[Seq[UpdateResult]] = { + + SFlux.fromIterable(ranges) + .concatMap(range => { + val messageIds = metaData.filter(entry => entry._2.exists(composedId => range.includes(composedId.getComposedMessageId.getUid))) + .keys + .toSeq + SMono.fromCallable[Seq[UpdateResult]](() => { + operation.accept(range) + messageIds.map(UpdateSuccess) + }) + .onErrorResume(e => SMono.just(messageIds.map(id => UpdateFailure(EmailSet.asUnparsed(id), e)))) + .subscribeOn(Schedulers.elastic()) + }) + .reduce(Seq(), _ ++ _) + } + + private def updateEachMessage(validUpdates: List[(MessageId, ValidatedEmailSetUpdate)], + metaData: Map[MessageId, Traversable[ComposedMessageIdWithMetaData]], + session: MailboxSession): SMono[Seq[UpdateResult]] = + SFlux.fromIterable(validUpdates) + .concatMap[UpdateResult]({ + case (messageId, updatePatch) => + updateSingleMessage(messageId, updatePatch, metaData.get(messageId).toList.flatten, session) + }) + .collectSeq() + + private def updateSingleMessage(messageId: MessageId, update: ValidatedEmailSetUpdate, storedMetaData: List[ComposedMessageIdWithMetaData], session: MailboxSession): SMono[UpdateResult] = { val mailboxIds: MailboxIds = MailboxIds(storedMetaData.map(metaData => metaData.getComposedMessageId.getMailboxId)) val originFlags: Flags = storedMetaData .foldLeft[Flags](new Flags())((flags: Flags, m: ComposedMessageIdWithMetaData) => { @@ -246,7 +328,7 @@ class EmailSetMethod @Inject()(serializer: EmailSetSerializer, } private def updateFlags(messageId: MessageId, update: ValidatedEmailSetUpdate, mailboxIds: MailboxIds, originalFlags: Flags, session: MailboxSession): SMono[UpdateResult] = { - val newFlags = update.keywords + val newFlags = update.keywordsTransformation .apply(LENIENT_KEYWORDS_FACTORY.fromFlags(originalFlags).get) .asFlagsWithRecentAndDeletedFrom(originalFlags) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
