This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 88f6d210a6f766c6bc0f3c3634372803fbbcc685 Author: Benoit Tellier <[email protected]> AuthorDate: Mon Nov 2 10:35:04 2020 +0700 [REFACTORING] Split EmailSetMethod and extract create/update/destroy --- .../jmap/method/EmailSetCreatePerformer.scala | 102 ++++++ .../jmap/method/EmailSetDeletePerformer.scala | 99 ++++++ .../apache/james/jmap/method/EmailSetMethod.scala | 361 +-------------------- .../jmap/method/EmailSetUpdatePerformer.scala | 249 ++++++++++++++ 4 files changed, 463 insertions(+), 348 deletions(-) diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetCreatePerformer.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetCreatePerformer.scala new file mode 100644 index 0000000..b7abebe --- /dev/null +++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetCreatePerformer.scala @@ -0,0 +1,102 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. 
* + ****************************************************************/ + +package org.apache.james.jmap.method + +import java.time.ZonedDateTime +import java.util.Date + +import javax.inject.Inject +import javax.mail.Flags +import org.apache.james.jmap.core.SetError.SetErrorDescription +import org.apache.james.jmap.core.{SetError, UTCDate} +import org.apache.james.jmap.json.EmailSetSerializer +import org.apache.james.jmap.mail.EmailSet.EmailCreationId +import org.apache.james.jmap.mail.{EmailCreationRequest, EmailCreationResponse, EmailSetRequest} +import org.apache.james.jmap.method.EmailSetCreatePerformer.{CreationFailure, CreationResult, CreationResults, CreationSuccess} +import org.apache.james.mailbox.MessageManager.AppendCommand +import org.apache.james.mailbox.exception.MailboxNotFoundException +import org.apache.james.mailbox.model.MailboxId +import org.apache.james.mailbox.{MailboxManager, MailboxSession} +import reactor.core.scala.publisher.{SFlux, SMono} +import reactor.core.scheduler.Schedulers + +object EmailSetCreatePerformer { + case class CreationResults(results: Seq[CreationResult]) { + def created: Option[Map[EmailCreationId, EmailCreationResponse]] = + Option(results.flatMap{ + case result: CreationSuccess => Some((result.clientId, result.response)) + case _ => None + }.toMap) + .filter(_.nonEmpty) + + def notCreated: Option[Map[EmailCreationId, SetError]] = { + Option(results.flatMap{ + case failure: CreationFailure => Some((failure.clientId, failure.asMessageSetError)) + case _ => None + } + .toMap) + .filter(_.nonEmpty) + } + } + trait CreationResult + case class CreationSuccess(clientId: EmailCreationId, response: EmailCreationResponse) extends CreationResult + case class CreationFailure(clientId: EmailCreationId, e: Throwable) extends CreationResult { + def asMessageSetError: SetError = e match { + case e: IllegalArgumentException => SetError.invalidArguments(SetErrorDescription(e.getMessage)) + case e: MailboxNotFoundException => 
SetError.notFound(SetErrorDescription("Mailbox " + e.getMessage)) + case _ => SetError.serverFail(SetErrorDescription(e.getMessage)) + } + } +} + +class EmailSetCreatePerformer @Inject()(serializer: EmailSetSerializer, + mailboxManager: MailboxManager) { + + def create(request: EmailSetRequest, mailboxSession: MailboxSession): SMono[CreationResults] = + SFlux.fromIterable(request.create.getOrElse(Map())) + .concatMap { + case (clientId, json) => serializer.deserializeCreationRequest(json) + .fold(e => SMono.just[CreationResult](CreationFailure(clientId, new IllegalArgumentException(e.toString))), + creationRequest => create(clientId, creationRequest, mailboxSession)) + }.collectSeq() + .map(CreationResults) + + private def create(clientId: EmailCreationId, request: EmailCreationRequest, mailboxSession: MailboxSession): SMono[CreationResult] = { + val mailboxIds: List[MailboxId] = request.mailboxIds.value + if (mailboxIds.size != 1) { + SMono.just(CreationFailure(clientId, new IllegalArgumentException("mailboxIds need to have size 1"))) + } else { + request.toMime4JMessage + .fold(e => SMono.just(CreationFailure(clientId, e)), + message => SMono.fromCallable[CreationResult](() => { + val appendResult = mailboxManager.getMailbox(mailboxIds.head, mailboxSession) + .appendMessage(AppendCommand.builder() + .recent() + .withFlags(request.keywords.map(_.asFlags).getOrElse(new Flags())) + .withInternalDate(Date.from(request.receivedAt.getOrElse(UTCDate(ZonedDateTime.now())).asUTC.toInstant)) + .build(message), + mailboxSession) + CreationSuccess(clientId, EmailCreationResponse(appendResult.getId.getMessageId)) + }) + .subscribeOn(Schedulers.elastic()) + .onErrorResume(e => SMono.just[CreationResult](CreationFailure(clientId, e)))) + } + } +} diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetDeletePerformer.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetDeletePerformer.scala new file 
mode 100644 index 0000000..da56ab5 --- /dev/null +++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetDeletePerformer.scala @@ -0,0 +1,99 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. 
* + ****************************************************************/ + +package org.apache.james.jmap.method + +import javax.inject.Inject +import org.apache.james.jmap.core.SetError +import org.apache.james.jmap.core.SetError.SetErrorDescription +import org.apache.james.jmap.mail.EmailSet.UnparsedMessageId +import org.apache.james.jmap.mail.{DestroyIds, EmailSet, EmailSetRequest} +import org.apache.james.jmap.method.EmailSetDeletePerformer.{DestroyFailure, DestroyResult, DestroyResults} +import org.apache.james.mailbox.model.{DeleteResult, MessageId} +import org.apache.james.mailbox.{MailboxSession, MessageIdManager} +import reactor.core.scala.publisher.SMono +import reactor.core.scheduler.Schedulers + +import scala.jdk.CollectionConverters._ + +object EmailSetDeletePerformer { + case class DestroyResults(results: Seq[DestroyResult]) { + def destroyed: Option[DestroyIds] = + Option(results.flatMap{ + case result: DestroySuccess => Some(result.messageId) + case _ => None + }.map(EmailSet.asUnparsed)) + .filter(_.nonEmpty) + .map(DestroyIds) + + def notDestroyed: Option[Map[UnparsedMessageId, SetError]] = + Option(results.flatMap{ + case failure: DestroyFailure => Some((failure.unparsedMessageId, failure.asMessageSetError)) + case _ => None + }.toMap) + .filter(_.nonEmpty) + } + object DestroyResult { + def from(deleteResult: DeleteResult): Seq[DestroyResult] = { + val success: Seq[DestroySuccess] = deleteResult.getDestroyed.asScala.toSeq + .map(DestroySuccess) + val notFound: Seq[DestroyResult] = deleteResult.getNotFound.asScala.toSeq + .map(id => DestroyFailure(EmailSet.asUnparsed(id), MessageNotFoundExeception(id))) + + success ++ notFound + } + } + trait DestroyResult + case class DestroySuccess(messageId: MessageId) extends DestroyResult + case class DestroyFailure(unparsedMessageId: UnparsedMessageId, e: Throwable) extends DestroyResult { + def asMessageSetError: SetError = e match { + case e: IllegalArgumentException => 
SetError.invalidArguments(SetErrorDescription(s"$unparsedMessageId is not a messageId: ${e.getMessage}")) + case e: MessageNotFoundExeception => SetError.notFound(SetErrorDescription(s"Cannot find message with messageId: ${e.messageId.serialize()}")) + case _ => SetError.serverFail(SetErrorDescription(e.getMessage)) + } + } +} + +class EmailSetDeletePerformer @Inject()(messageIdManager: MessageIdManager, + messageIdFactory: MessageId.Factory) { + def destroy(emailSetRequest: EmailSetRequest, mailboxSession: MailboxSession): SMono[DestroyResults] = { + if (emailSetRequest.destroy.isDefined) { + val messageIdsValidation: Seq[Either[DestroyFailure, MessageId]] = emailSetRequest.destroy.get.value + .map(unparsedId => EmailSet.parse(messageIdFactory)(unparsedId).toEither + .left.map(e => DestroyFailure(unparsedId, e))) + val messageIds: Seq[MessageId] = messageIdsValidation.flatMap { + case Right(messageId) => Some(messageId) + case _ => None + } + val parsingErrors: Seq[DestroyFailure] = messageIdsValidation.flatMap { + case Left(e) => Some(e) + case _ => None + } + + SMono.fromCallable(() => messageIdManager.delete(messageIds.toList.asJava, mailboxSession)) + .map(DestroyResult.from) + .subscribeOn(Schedulers.elastic()) + .onErrorResume(e => SMono.just(messageIds.map(id => DestroyFailure(EmailSet.asUnparsed(id), e)))) + .map(_ ++ parsingErrors) + .map(DestroyResults) + } else { + SMono.just(DestroyResults(Seq())) + } + } +} diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetMethod.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetMethod.scala index 5a2dd36..1bcc308 100644 --- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetMethod.scala +++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetMethod.scala @@ -18,142 +18,36 @@ ****************************************************************/ package 
org.apache.james.jmap.method -import java.time.ZonedDateTime -import java.util.Date -import java.util.function.Consumer - -import com.google.common.collect.ImmutableList import eu.timepit.refined.auto._ import javax.inject.Inject -import javax.mail.Flags import org.apache.james.jmap.core.CapabilityIdentifier.{CapabilityIdentifier, JMAP_CORE, JMAP_MAIL} import org.apache.james.jmap.core.Invocation.{Arguments, MethodName} -import org.apache.james.jmap.core.SetError.SetErrorDescription -import org.apache.james.jmap.core.{ClientId, Id, Invocation, ServerId, SetError, State, UTCDate} +import org.apache.james.jmap.core.{ClientId, Id, Invocation, ServerId, State} import org.apache.james.jmap.json.{EmailSetSerializer, ResponseSerializer} -import org.apache.james.jmap.mail.EmailSet.{EmailCreationId, UnparsedMessageId} -import org.apache.james.jmap.mail.KeywordsFactory.LENIENT_KEYWORDS_FACTORY -import org.apache.james.jmap.mail.{DestroyIds, EmailCreationRequest, EmailCreationResponse, EmailSet, EmailSetRequest, EmailSetResponse, MailboxIds, ValidatedEmailSetUpdate} +import org.apache.james.jmap.mail.{EmailSetRequest, EmailSetResponse} import org.apache.james.jmap.routes.SessionSupplier -import org.apache.james.mailbox.MessageManager.{AppendCommand, FlagsUpdateMode} -import org.apache.james.mailbox.exception.MailboxNotFoundException -import org.apache.james.mailbox.model.{ComposedMessageIdWithMetaData, DeleteResult, MailboxId, MessageId, MessageRange} -import org.apache.james.mailbox.{MailboxManager, MailboxSession, MessageIdManager, MessageManager} +import org.apache.james.mailbox.MailboxSession +import org.apache.james.mailbox.model.MessageId import org.apache.james.metrics.api.MetricFactory -import play.api.libs.json.{JsError, JsObject, JsSuccess} -import reactor.core.scala.publisher.{SFlux, SMono} -import reactor.core.scheduler.Schedulers - -import scala.jdk.CollectionConverters._ +import play.api.libs.json.{JsError, JsSuccess} +import reactor.core.scala.publisher.SMono 
case class MessageNotFoundExeception(messageId: MessageId) extends Exception class EmailSetMethod @Inject()(serializer: EmailSetSerializer, - messageIdManager: MessageIdManager, - mailboxManager: MailboxManager, - messageIdFactory: MessageId.Factory, val metricFactory: MetricFactory, - val sessionSupplier: SessionSupplier) extends MethodRequiringAccountId[EmailSetRequest] { - - case class DestroyResults(results: Seq[DestroyResult]) { - def destroyed: Option[DestroyIds] = - Option(results.flatMap{ - case result: DestroySuccess => Some(result.messageId) - case _ => None - }.map(EmailSet.asUnparsed)) - .filter(_.nonEmpty) - .map(DestroyIds) - - def notDestroyed: Option[Map[UnparsedMessageId, SetError]] = - Option(results.flatMap{ - case failure: DestroyFailure => Some((failure.unparsedMessageId, failure.asMessageSetError)) - case _ => None - }.toMap) - .filter(_.nonEmpty) - } - - object DestroyResult { - def from(deleteResult: DeleteResult): Seq[DestroyResult] = { - val success: Seq[DestroySuccess] = deleteResult.getDestroyed.asScala.toSeq - .map(DestroySuccess) - val notFound: Seq[DestroyResult] = deleteResult.getNotFound.asScala.toSeq - .map(id => DestroyFailure(EmailSet.asUnparsed(id), MessageNotFoundExeception(id))) - - success ++ notFound - } - } - - trait DestroyResult - case class DestroySuccess(messageId: MessageId) extends DestroyResult - case class DestroyFailure(unparsedMessageId: UnparsedMessageId, e: Throwable) extends DestroyResult { - def asMessageSetError: SetError = e match { - case e: IllegalArgumentException => SetError.invalidArguments(SetErrorDescription(s"$unparsedMessageId is not a messageId: ${e.getMessage}")) - case e: MessageNotFoundExeception => SetError.notFound(SetErrorDescription(s"Cannot find message with messageId: ${e.messageId.serialize()}")) - case _ => SetError.serverFail(SetErrorDescription(e.getMessage)) - } - } - - case class CreationResults(results: Seq[CreationResult]) { - def created: Option[Map[EmailCreationId, 
EmailCreationResponse]] = - Option(results.flatMap{ - case result: CreationSuccess => Some((result.clientId, result.response)) - case _ => None - }.toMap) - .filter(_.nonEmpty) - - def notCreated: Option[Map[EmailCreationId, SetError]] = { - Option(results.flatMap{ - case failure: CreationFailure => Some((failure.clientId, failure.asMessageSetError)) - case _ => None - } - .toMap) - .filter(_.nonEmpty) - } - } - trait CreationResult - case class CreationSuccess(clientId: EmailCreationId, response: EmailCreationResponse) extends CreationResult - case class CreationFailure(clientId: EmailCreationId, e: Throwable) extends CreationResult { - def asMessageSetError: SetError = e match { - case e: IllegalArgumentException => SetError.invalidArguments(SetErrorDescription(e.getMessage)) - case e: MailboxNotFoundException => SetError.notFound(SetErrorDescription("Mailbox " + e.getMessage)) - case _ => SetError.serverFail(SetErrorDescription(e.getMessage)) - } - } - - trait UpdateResult - case class UpdateSuccess(messageId: MessageId) extends UpdateResult - case class UpdateFailure(unparsedMessageId: UnparsedMessageId, e: Throwable) extends UpdateResult { - def asMessageSetError: SetError = e match { - case e: IllegalArgumentException => SetError.invalidPatch(SetErrorDescription(s"Message $unparsedMessageId update is invalid: ${e.getMessage}")) - case _: MailboxNotFoundException => SetError.notFound(SetErrorDescription(s"Mailbox not found")) - case e: MessageNotFoundExeception => SetError.notFound(SetErrorDescription(s"Cannot find message with messageId: ${e.messageId.serialize()}")) - case _ => SetError.serverFail(SetErrorDescription(e.getMessage)) - } - } - case class UpdateResults(results: Seq[UpdateResult]) { - def updated: Option[Map[MessageId, Unit]] = - Option(results.flatMap{ - case result: UpdateSuccess => Some(result.messageId, ()) - case _ => None - }.toMap) - .filter(_.nonEmpty) - - def notUpdated: Option[Map[UnparsedMessageId, SetError]] = - 
Option(results.flatMap{ - case failure: UpdateFailure => Some((failure.unparsedMessageId, failure.asMessageSetError)) - case _ => None - }.toMap) - .filter(_.nonEmpty) - } - + val sessionSupplier: SessionSupplier, + createPerformer: EmailSetCreatePerformer, + deletePerformer: EmailSetDeletePerformer, + updatePerformer: EmailSetUpdatePerformer) extends MethodRequiringAccountId[EmailSetRequest] { override val methodName: MethodName = MethodName("Email/set") override val requiredCapabilities: Set[CapabilityIdentifier] = Set(JMAP_CORE, JMAP_MAIL) override def doProcess(capabilities: Set[CapabilityIdentifier], invocation: InvocationWithContext, mailboxSession: MailboxSession, request: EmailSetRequest): SMono[InvocationWithContext] = { for { - destroyResults <- destroy(request, mailboxSession) - updateResults <- update(request, mailboxSession) - created <- create(request, mailboxSession) + destroyResults <- deletePerformer.destroy(request, mailboxSession) + updateResults <- updatePerformer.update(request, mailboxSession) + created <- createPerformer.create(request, mailboxSession) } yield InvocationWithContext( invocation = Invocation( methodName = invocation.invocation.methodName, @@ -183,233 +77,4 @@ class EmailSetMethod @Inject()(serializer: EmailSetSerializer, case JsSuccess(emailSetRequest, _) => SMono.just(emailSetRequest) case errors: JsError => SMono.raiseError(new IllegalArgumentException(ResponseSerializer.serialize(errors).toString)) } - - private def destroy(emailSetRequest: EmailSetRequest, mailboxSession: MailboxSession): SMono[DestroyResults] = { - if (emailSetRequest.destroy.isDefined) { - val messageIdsValidation: Seq[Either[DestroyFailure, MessageId]] = emailSetRequest.destroy.get.value - .map(unparsedId => EmailSet.parse(messageIdFactory)(unparsedId).toEither - .left.map(e => DestroyFailure(unparsedId, e))) - val messageIds: Seq[MessageId] = messageIdsValidation.flatMap { - case Right(messageId) => Some(messageId) - case _ => None - } - val 
parsingErrors: Seq[DestroyFailure] = messageIdsValidation.flatMap { - case Left(e) => Some(e) - case _ => None - } - - SMono.fromCallable(() => messageIdManager.delete(messageIds.toList.asJava, mailboxSession)) - .map(DestroyResult.from) - .subscribeOn(Schedulers.elastic()) - .onErrorResume(e => SMono.just(messageIds.map(id => DestroyFailure(EmailSet.asUnparsed(id), e)))) - .map(_ ++ parsingErrors) - .map(DestroyResults) - } else { - SMono.just(DestroyResults(Seq())) - } - } - - private def create(request: EmailSetRequest, mailboxSession: MailboxSession): SMono[CreationResults] = - SFlux.fromIterable(request.create.getOrElse(Map())) - .concatMap { - case (clientId, json) => serializer.deserializeCreationRequest(json) - .fold(e => SMono.just[CreationResult](CreationFailure(clientId, new IllegalArgumentException(e.toString))), - creationRequest => create(clientId, creationRequest, mailboxSession)) - }.collectSeq() - .map(CreationResults) - - private def create(clientId: EmailCreationId, request: EmailCreationRequest, mailboxSession: MailboxSession): SMono[CreationResult] = { - val mailboxIds: List[MailboxId] = request.mailboxIds.value - if (mailboxIds.size != 1) { - SMono.just(CreationFailure(clientId, new IllegalArgumentException("mailboxIds need to have size 1"))) - } else { - request.toMime4JMessage - .fold(e => SMono.just(CreationFailure(clientId, e)), - message => SMono.fromCallable[CreationResult](() => { - val appendResult = mailboxManager.getMailbox(mailboxIds.head, mailboxSession) - .appendMessage(AppendCommand.builder() - .recent() - .withFlags(request.keywords.map(_.asFlags).getOrElse(new Flags())) - .withInternalDate(Date.from(request.receivedAt.getOrElse(UTCDate(ZonedDateTime.now())).asUTC.toInstant)) - .build(message), - mailboxSession) - CreationSuccess(clientId, EmailCreationResponse(appendResult.getId.getMessageId)) - }) - .subscribeOn(Schedulers.elastic()) - .onErrorResume(e => SMono.just[CreationResult](CreationFailure(clientId, e)))) - } - } - - 
private def update(emailSetRequest: EmailSetRequest, mailboxSession: MailboxSession): SMono[UpdateResults] = { - emailSetRequest.update - .filter(_.nonEmpty) - .map(update(_, mailboxSession)) - .getOrElse(SMono.just(UpdateResults(Seq()))) - } - - private def update(updates: Map[UnparsedMessageId, JsObject], session: MailboxSession): SMono[UpdateResults] = { - val validatedUpdates: List[Either[UpdateFailure, (MessageId, ValidatedEmailSetUpdate)]] = updates - .map({ - case (unparsedMessageId, json) => EmailSet.parse(messageIdFactory)(unparsedMessageId) - .toEither - .left.map(e => UpdateFailure(unparsedMessageId, e)) - .flatMap(id => serializer.deserializeEmailSetUpdate(json) - .asEither.left.map(e => new IllegalArgumentException(e.toString)) - .flatMap(_.validate) - .fold(e => Left(UpdateFailure(unparsedMessageId, e)), - emailSetUpdate => Right((id, emailSetUpdate)))) - }) - .toList - val failures: List[UpdateFailure] = validatedUpdates.flatMap({ - case Left(e) => Some(e) - case _ => None - }) - val validUpdates: List[(MessageId, ValidatedEmailSetUpdate)] = validatedUpdates.flatMap({ - case Right(pair) => Some(pair) - case _ => None - }) - - for { - updates <- SFlux.fromPublisher(messageIdManager.messagesMetadata(validUpdates.map(_._1).asJavaCollection, session)) - .collectMultimap(metaData => metaData.getComposedMessageId.getMessageId) - .flatMap(metaData => { - doUpdate(validUpdates, metaData, session) - }) - } yield { - UpdateResults(updates ++ failures) - } - } - - private def doUpdate(validUpdates: List[(MessageId, ValidatedEmailSetUpdate)], - metaData: Map[MessageId, Traversable[ComposedMessageIdWithMetaData]], - session: MailboxSession): SMono[Seq[UpdateResult]] = { - val sameUpdate: Boolean = validUpdates.map(_._2).distinctBy(_.update).size == 1 - val singleMailbox: Boolean = metaData.values.flatten.map(_.getComposedMessageId.getMailboxId).toSet.size == 1 - - if (sameUpdate && singleMailbox && validUpdates.size > 3) { - val update: ValidatedEmailSetUpdate = 
validUpdates.map(_._2).headOption.get - val ranges: List[MessageRange] = asRanges(metaData) - val mailboxId: MailboxId = metaData.values.flatten.map(_.getComposedMessageId.getMailboxId).headOption.get - - if (update.update.isOnlyFlagAddition) { - updateFlagsByRange(mailboxId, update.update.keywordsToAdd.get.asFlags, ranges, metaData, FlagsUpdateMode.ADD, session) - } else if (update.update.isOnlyFlagRemoval) { - updateFlagsByRange(mailboxId, update.update.keywordsToRemove.get.asFlags, ranges, metaData, FlagsUpdateMode.REMOVE, session) - } else if (update.update.isOnlyMove) { - moveByRange(mailboxId, update, ranges, metaData, session) - } else { - updateEachMessage(validUpdates, metaData, session) - } - } else { - updateEachMessage(validUpdates, metaData, session) - } - } - - private def asRanges(metaData: Map[MessageId, Traversable[ComposedMessageIdWithMetaData]]) = - MessageRange.toRanges(metaData.values - .flatten.map(_.getComposedMessageId.getUid) - .toList.asJava) - .asScala.toList - - private def updateFlagsByRange(mailboxId: MailboxId, - flags: Flags, - ranges: List[MessageRange], - metaData: Map[MessageId, Traversable[ComposedMessageIdWithMetaData]], - updateMode: FlagsUpdateMode, - session: MailboxSession): SMono[Seq[UpdateResult]] = { - val mailboxMono: SMono[MessageManager] = SMono.fromCallable(() => mailboxManager.getMailbox(mailboxId, session)) - - mailboxMono.flatMap(mailbox => updateByRange(ranges, metaData, - range => mailbox.setFlags(flags, updateMode, range, session))) - .subscribeOn(Schedulers.elastic()) - } - - private def moveByRange(mailboxId: MailboxId, - update: ValidatedEmailSetUpdate, - ranges: List[MessageRange], - metaData: Map[MessageId, Traversable[ComposedMessageIdWithMetaData]], - session: MailboxSession): SMono[Seq[UpdateResult]] = { - val targetId: MailboxId = update.update.mailboxIds.get.value.headOption.get - - updateByRange(ranges, metaData, - range => mailboxManager.moveMessages(range, mailboxId, targetId, session)) - } - - 
private def updateByRange(ranges: List[MessageRange], - metaData: Map[MessageId, Traversable[ComposedMessageIdWithMetaData]], - operation: Consumer[MessageRange]): SMono[Seq[UpdateResult]] = { - - SFlux.fromIterable(ranges) - .concatMap(range => { - val messageIds = metaData.filter(entry => entry._2.exists(composedId => range.includes(composedId.getComposedMessageId.getUid))) - .keys - .toSeq - SMono.fromCallable[Seq[UpdateResult]](() => { - operation.accept(range) - messageIds.map(UpdateSuccess) - }) - .onErrorResume(e => SMono.just(messageIds.map(id => UpdateFailure(EmailSet.asUnparsed(id), e)))) - .subscribeOn(Schedulers.elastic()) - }) - .reduce(Seq(), _ ++ _) - } - - private def updateEachMessage(validUpdates: List[(MessageId, ValidatedEmailSetUpdate)], - metaData: Map[MessageId, Traversable[ComposedMessageIdWithMetaData]], - session: MailboxSession): SMono[Seq[UpdateResult]] = - SFlux.fromIterable(validUpdates) - .concatMap[UpdateResult]({ - case (messageId, updatePatch) => - updateSingleMessage(messageId, updatePatch, metaData.get(messageId).toList.flatten, session) - }) - .collectSeq() - - private def updateSingleMessage(messageId: MessageId, update: ValidatedEmailSetUpdate, storedMetaData: List[ComposedMessageIdWithMetaData], session: MailboxSession): SMono[UpdateResult] = { - val mailboxIds: MailboxIds = MailboxIds(storedMetaData.map(metaData => metaData.getComposedMessageId.getMailboxId)) - val originFlags: Flags = storedMetaData - .foldLeft[Flags](new Flags())((flags: Flags, m: ComposedMessageIdWithMetaData) => { - flags.add(m.getFlags) - flags - }) - - if (mailboxIds.value.isEmpty) { - SMono.just[UpdateResult](UpdateFailure(EmailSet.asUnparsed(messageId), MessageNotFoundExeception(messageId))) - } else { - updateFlags(messageId, update, mailboxIds, originFlags, session) - .flatMap { - case failure: UpdateFailure => SMono.just[UpdateResult](failure) - case _: UpdateSuccess => updateMailboxIds(messageId, update, mailboxIds, session) - } - 
.onErrorResume(e => SMono.just[UpdateResult](UpdateFailure(EmailSet.asUnparsed(messageId), e))) - .switchIfEmpty(SMono.just[UpdateResult](UpdateSuccess(messageId))) - } - } - - private def updateMailboxIds(messageId: MessageId, update: ValidatedEmailSetUpdate, mailboxIds: MailboxIds, session: MailboxSession): SMono[UpdateResult] = { - val targetIds = update.mailboxIdsTransformation.apply(mailboxIds) - if (targetIds.equals(mailboxIds)) { - SMono.just[UpdateResult](UpdateSuccess(messageId)) - } else { - SMono.fromCallable(() => messageIdManager.setInMailboxes(messageId, targetIds.value.asJava, session)) - .subscribeOn(Schedulers.elastic()) - .`then`(SMono.just[UpdateResult](UpdateSuccess(messageId))) - .onErrorResume(e => SMono.just[UpdateResult](UpdateFailure(EmailSet.asUnparsed(messageId), e))) - .switchIfEmpty(SMono.just[UpdateResult](UpdateSuccess(messageId))) - } - } - - private def updateFlags(messageId: MessageId, update: ValidatedEmailSetUpdate, mailboxIds: MailboxIds, originalFlags: Flags, session: MailboxSession): SMono[UpdateResult] = { - val newFlags = update.keywordsTransformation - .apply(LENIENT_KEYWORDS_FACTORY.fromFlags(originalFlags).get) - .asFlagsWithRecentAndDeletedFrom(originalFlags) - - if (newFlags.equals(originalFlags)) { - SMono.just[UpdateResult](UpdateSuccess(messageId)) - } else { - SMono.fromCallable(() => - messageIdManager.setFlags(newFlags, FlagsUpdateMode.REPLACE, messageId, ImmutableList.copyOf(mailboxIds.value.asJavaCollection), session)) - .subscribeOn(Schedulers.elastic()) - .`then`(SMono.just[UpdateResult](UpdateSuccess(messageId))) - } - } } \ No newline at end of file diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetUpdatePerformer.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetUpdatePerformer.scala new file mode 100644 index 0000000..662435d --- /dev/null +++ 
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSetUpdatePerformer.scala @@ -0,0 +1,249 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. 
*
 ****************************************************************/

package org.apache.james.jmap.method

import java.util.function.Consumer

import com.google.common.collect.ImmutableList
import javax.inject.Inject
import javax.mail.Flags
import org.apache.james.jmap.core.SetError
import org.apache.james.jmap.core.SetError.SetErrorDescription
import org.apache.james.jmap.json.EmailSetSerializer
import org.apache.james.jmap.mail.EmailSet.UnparsedMessageId
import org.apache.james.jmap.mail.KeywordsFactory.LENIENT_KEYWORDS_FACTORY
import org.apache.james.jmap.mail.{EmailSet, EmailSetRequest, MailboxIds, ValidatedEmailSetUpdate}
import org.apache.james.jmap.method.EmailSetUpdatePerformer.{EmailUpdateFailure, EmailUpdateResult, EmailUpdateResults, EmailUpdateSuccess}
import org.apache.james.mailbox.MessageManager.FlagsUpdateMode
import org.apache.james.mailbox.exception.MailboxNotFoundException
import org.apache.james.mailbox.model.{ComposedMessageIdWithMetaData, MailboxId, MessageId, MessageRange}
import org.apache.james.mailbox.{MailboxManager, MailboxSession, MessageIdManager, MessageManager}
import play.api.libs.json.JsObject
import reactor.core.scala.publisher.{SFlux, SMono}
import reactor.core.scheduler.Schedulers

import scala.jdk.CollectionConverters._

/** Result types produced by [[EmailSetUpdatePerformer]]. */
object EmailSetUpdatePerformer {
  /** Outcome of updating one message: either [[EmailUpdateSuccess]] or [[EmailUpdateFailure]]. */
  trait EmailUpdateResult

  /** The message identified by `messageId` was updated. */
  case class EmailUpdateSuccess(messageId: MessageId) extends EmailUpdateResult

  /**
   * The update targeting `unparsedMessageId` failed with `e`.
   *
   * @param unparsedMessageId identifier as supplied by the client
   * @param e                 cause of the failure
   */
  case class EmailUpdateFailure(unparsedMessageId: UnparsedMessageId, e: Throwable) extends EmailUpdateResult {
    /** Converts the underlying exception into the [[SetError]] reported to the client. */
    def asMessageSetError: SetError = e match {
      case invalid: IllegalArgumentException =>
        SetError.invalidPatch(SetErrorDescription(s"Message $unparsedMessageId update is invalid: ${invalid.getMessage}"))
      case _: MailboxNotFoundException =>
        SetError.notFound(SetErrorDescription("Mailbox not found"))
      case notFound: MessageNotFoundExeception =>
        SetError.notFound(SetErrorDescription(s"Cannot find message with messageId: ${notFound.messageId.serialize()}"))
      case other =>
        // Throwable.getMessage may be null (e.g. a bare NullPointerException);
        // fall back to toString so the description is never null.
        SetError.serverFail(SetErrorDescription(Option(other.getMessage).getOrElse(other.toString)))
    }
  }

  /** Aggregated outcome of an update request. */
  case class EmailUpdateResults(results: Seq[EmailUpdateResult]) {
    /** Successfully updated message ids, or `None` when there are none. */
    def updated: Option[Map[MessageId, Unit]] =
      Option(results.collect {
        case success: EmailUpdateSuccess => (success.messageId, ())
      }.toMap)
        .filter(_.nonEmpty)

    /** Failed updates keyed by the client supplied id, or `None` when there are none. */
    def notUpdated: Option[Map[UnparsedMessageId, SetError]] =
      Option(results.collect {
        case failure: EmailUpdateFailure => (failure.unparsedMessageId, failure.asMessageSetError)
      }.toMap)
        .filter(_.nonEmpty)
  }
}

/**
 * Applies the `update` part of an [[EmailSetRequest]].
 *
 * Updates are parsed and validated, then applied either message by message or,
 * when many messages of a single mailbox receive the very same patch, range by range.
 */
class EmailSetUpdatePerformer @Inject() (serializer: EmailSetSerializer,
                                         messageIdManager: MessageIdManager,
                                         mailboxManager: MailboxManager,
                                         messageIdFactory: MessageId.Factory) {

  // Range-based updates are only attempted when strictly more than this many updates are requested.
  private val rangeUpdateThreshold: Int = 3

  /**
   * Performs every update carried by `emailSetRequest`.
   *
   * @return the per-message results; an empty result set when the request carries no update
   */
  def update(emailSetRequest: EmailSetRequest, mailboxSession: MailboxSession): SMono[EmailUpdateResults] = {
    emailSetRequest.update
      .filter(_.nonEmpty)
      .map(update(_, mailboxSession))
      .getOrElse(SMono.just(EmailUpdateResults(Seq())))
  }

  /**
   * Parses ids and patches, then applies the valid ones. Parsing/validation failures are
   * reported alongside the results of the updates that were attempted.
   */
  private def update(updates: Map[UnparsedMessageId, JsObject], session: MailboxSession): SMono[EmailUpdateResults] = {
    val validatedUpdates: List[Either[EmailUpdateFailure, (MessageId, ValidatedEmailSetUpdate)]] = updates
      .map({
        case (unparsedMessageId, json) => EmailSet.parse(messageIdFactory)(unparsedMessageId)
          .toEither
          .left.map(e => EmailUpdateFailure(unparsedMessageId, e))
          .flatMap(id => serializer.deserializeEmailSetUpdate(json)
            .asEither.left.map(e => new IllegalArgumentException(e.toString))
            .flatMap(_.validate)
            .fold(e => Left(EmailUpdateFailure(unparsedMessageId, e)),
              emailSetUpdate => Right((id, emailSetUpdate))))
      })
      .toList
    val failures: List[EmailUpdateFailure] = validatedUpdates.collect {
      case Left(failure) => failure
    }
    val validUpdates: List[(MessageId, ValidatedEmailSetUpdate)] = validatedUpdates.collect {
      case Right(pair) => pair
    }

    SFlux.fromPublisher(messageIdManager.messagesMetadata(validUpdates.map(_._1).asJavaCollection, session))
      .collectMultimap(metaData => metaData.getComposedMessageId.getMessageId)
      .flatMap(metaData => doUpdate(validUpdates, metaData, session))
      .map(results => EmailUpdateResults(results ++ failures))
  }

  /**
   * Chooses between range-based and per-message updates.
   *
   * The range-based path is taken when all patches are equal, all stored metadata belongs
   * to a single mailbox and more than `rangeUpdateThreshold` updates are requested.
   */
  private def doUpdate(validUpdates: List[(MessageId, ValidatedEmailSetUpdate)],
                       metaData: Map[MessageId, Traversable[ComposedMessageIdWithMetaData]],
                       session: MailboxSession): SMono[Seq[EmailUpdateResult]] = {
    val sameUpdate: Boolean = validUpdates.map(_._2).distinctBy(_.update).size == 1
    val mailboxIds: Set[MailboxId] = metaData.values.flatten.map(_.getComposedMessageId.getMailboxId).toSet
    val singleMailbox: Boolean = mailboxIds.size == 1

    if (sameUpdate && singleMailbox && validUpdates.size > rangeUpdateThreshold) {
      // Both collections are non-empty here: the guard above requires it.
      val update: ValidatedEmailSetUpdate = validUpdates.head._2
      val mailboxId: MailboxId = mailboxIds.head
      val ranges: List[MessageRange] = asRanges(metaData)

      // Range operations only ever report on messages present in metaData. Requested ids
      // without stored metadata must still be answered, as the per-message path does,
      // otherwise they would appear in neither `updated` nor `notUpdated`.
      val notFound: Seq[EmailUpdateResult] = validUpdates
        .map(_._1)
        .filterNot(id => metaData.contains(id))
        .map(id => EmailUpdateFailure(EmailSet.asUnparsed(id), MessageNotFoundExeception(id)))

      // NOTE(review): the `.get` calls below assume the matching isOnly* predicate
      // guarantees the corresponding field is defined; confirm against the update model.
      if (update.update.isOnlyFlagAddition) {
        updateFlagsByRange(mailboxId, update.update.keywordsToAdd.get.asFlags, ranges, metaData, FlagsUpdateMode.ADD, session)
          .map(results => results ++ notFound)
      } else if (update.update.isOnlyFlagRemoval) {
        updateFlagsByRange(mailboxId, update.update.keywordsToRemove.get.asFlags, ranges, metaData, FlagsUpdateMode.REMOVE, session)
          .map(results => results ++ notFound)
      } else if (update.update.isOnlyMove) {
        moveByRange(mailboxId, update, ranges, metaData, session)
          .map(results => results ++ notFound)
      } else {
        updateEachMessage(validUpdates, metaData, session)
      }
    } else {
      updateEachMessage(validUpdates, metaData, session)
    }
  }

  /** Groups the uids of every stored message into `MessageRange`s via `MessageRange.toRanges`. */
  private def asRanges(metaData: Map[MessageId, Traversable[ComposedMessageIdWithMetaData]]) =
    MessageRange.toRanges(metaData.values
      .flatten.map(_.getComposedMessageId.getUid)
      .toList.asJava)
      .asScala.toList

  /**
   * Applies `flags` with `updateMode` on each range of the mailbox `mailboxId`.
   *
   * A failure to load the mailbox is reported as a failure of every message
   * rather than failing the whole request.
   */
  private def updateFlagsByRange(mailboxId: MailboxId,
                                 flags: Flags,
                                 ranges: List[MessageRange],
                                 metaData: Map[MessageId, Traversable[ComposedMessageIdWithMetaData]],
                                 updateMode: FlagsUpdateMode,
                                 session: MailboxSession): SMono[Seq[EmailUpdateResult]] = {
    val mailboxMono: SMono[MessageManager] = SMono.fromCallable(() => mailboxManager.getMailbox(mailboxId, session))

    mailboxMono.flatMap(mailbox => updateByRange(ranges, metaData,
      range => mailbox.setFlags(flags, updateMode, range, session)))
      // Per-range errors are already handled in updateByRange; this only catches getMailbox failing.
      .onErrorResume(e => SMono.just[Seq[EmailUpdateResult]](
        metaData.keys.toSeq.map(id => EmailUpdateFailure(EmailSet.asUnparsed(id), e))))
      .subscribeOn(Schedulers.elastic())
  }

  /** Moves each range from `mailboxId` to the first mailbox id carried by the update. */
  private def moveByRange(mailboxId: MailboxId,
                          update: ValidatedEmailSetUpdate,
                          ranges: List[MessageRange],
                          metaData: Map[MessageId, Traversable[ComposedMessageIdWithMetaData]],
                          session: MailboxSession): SMono[Seq[EmailUpdateResult]] = {
    val targetId: MailboxId = update.update.mailboxIds.get.value.headOption.get

    updateByRange(ranges, metaData,
      range => mailboxManager.moveMessages(range, mailboxId, targetId, session))
  }

  /**
   * Runs `operation` on each range in sequence. Every message whose uid falls in a range
   * is reported as a success when the operation completes and as a failure when it throws.
   */
  private def updateByRange(ranges: List[MessageRange],
                            metaData: Map[MessageId, Traversable[ComposedMessageIdWithMetaData]],
                            operation: Consumer[MessageRange]): SMono[Seq[EmailUpdateResult]] = {

    SFlux.fromIterable(ranges)
      .concatMap(range => {
        val messageIds = metaData.filter(entry => entry._2.exists(composedId => range.includes(composedId.getComposedMessageId.getUid)))
          .keys
          .toSeq
        SMono.fromCallable[Seq[EmailUpdateResult]](() => {
          operation.accept(range)
          messageIds.map(EmailUpdateSuccess)
        })
          .onErrorResume(e => SMono.just(messageIds.map(id => EmailUpdateFailure(EmailSet.asUnparsed(id), e))))
          .subscribeOn(Schedulers.elastic())
      })
      .reduce(Seq(), _ ++ _)
  }

  /** Applies each update on its own, one after the other. */
  private def updateEachMessage(validUpdates: List[(MessageId, ValidatedEmailSetUpdate)],
                                metaData: Map[MessageId, Traversable[ComposedMessageIdWithMetaData]],
                                session: MailboxSession): SMono[Seq[EmailUpdateResult]] =
    SFlux.fromIterable(validUpdates)
      .concatMap[EmailUpdateResult]({
        case (messageId, updatePatch) =>
          updateSingleMessage(messageId, updatePatch, metaData.get(messageId).toList.flatten, session)
      })
      .collectSeq()

  /**
   * Updates flags, then mailbox membership, of one message.
   *
   * A message without stored metadata yields a `MessageNotFoundExeception` failure.
   */
  private def updateSingleMessage(messageId: MessageId, update: ValidatedEmailSetUpdate, storedMetaData: List[ComposedMessageIdWithMetaData], session: MailboxSession): SMono[EmailUpdateResult] = {
    val mailboxIds: MailboxIds = MailboxIds(storedMetaData.map(metaData => metaData.getComposedMessageId.getMailboxId))
    // Union of the flags the message carries across all of its mailboxes.
    val originFlags: Flags = storedMetaData
      .foldLeft[Flags](new Flags())((flags: Flags, m: ComposedMessageIdWithMetaData) => {
        flags.add(m.getFlags)
        flags
      })

    if (mailboxIds.value.isEmpty) {
      SMono.just[EmailUpdateResult](EmailUpdateFailure(EmailSet.asUnparsed(messageId), MessageNotFoundExeception(messageId)))
    } else {
      updateFlags(messageId, update, mailboxIds, originFlags, session)
        .flatMap {
          case failure: EmailUpdateFailure => SMono.just[EmailUpdateResult](failure)
          case _: EmailUpdateSuccess => updateMailboxIds(messageId, update, mailboxIds, session)
        }
        .onErrorResume(e => SMono.just[EmailUpdateResult](EmailUpdateFailure(EmailSet.asUnparsed(messageId), e)))
        .switchIfEmpty(SMono.just[EmailUpdateResult](EmailUpdateSuccess(messageId)))
    }
  }

  /** Sets the mailboxes of the message to the transformed set, when it differs from the current one. */
  private def updateMailboxIds(messageId: MessageId, update: ValidatedEmailSetUpdate, mailboxIds: MailboxIds, session: MailboxSession): SMono[EmailUpdateResult] = {
    val targetIds = update.mailboxIdsTransformation.apply(mailboxIds)
    if (targetIds.equals(mailboxIds)) {
      SMono.just[EmailUpdateResult](EmailUpdateSuccess(messageId))
    } else {
      SMono.fromCallable(() => messageIdManager.setInMailboxes(messageId, targetIds.value.asJava, session))
        .subscribeOn(Schedulers.elastic())
        .`then`(SMono.just[EmailUpdateResult](EmailUpdateSuccess(messageId)))
        .onErrorResume(e => SMono.just[EmailUpdateResult](EmailUpdateFailure(EmailSet.asUnparsed(messageId), e)))
        .switchIfEmpty(SMono.just[EmailUpdateResult](EmailUpdateSuccess(messageId)))
    }
  }

  /** Replaces the flags of the message with the transformed keywords, when they differ from the current flags. */
  private def updateFlags(messageId: MessageId, update: ValidatedEmailSetUpdate, mailboxIds: MailboxIds, originalFlags: Flags, session: MailboxSession): SMono[EmailUpdateResult] = {
    val newFlags = update.keywordsTransformation
      .apply(LENIENT_KEYWORDS_FACTORY.fromFlags(originalFlags).get)
      .asFlagsWithRecentAndDeletedFrom(originalFlags)

    if (newFlags.equals(originalFlags)) {
      SMono.just[EmailUpdateResult](EmailUpdateSuccess(messageId))
    } else {
      SMono.fromCallable(() =>
        messageIdManager.setFlags(newFlags, FlagsUpdateMode.REPLACE, messageId, ImmutableList.copyOf(mailboxIds.value.asJavaCollection), session))
        .subscribeOn(Schedulers.elastic())
        .`then`(SMono.just[EmailUpdateResult](EmailUpdateSuccess(messageId)))
    }
  }
}

// ---------------------------------------------------------------------
// To unsubscribe, e-mail: [email protected]
// For additional commands, e-mail: [email protected]
