quantranhong1999 commented on code in PR #1561:
URL: https://github.com/apache/james-project/pull/1561#discussion_r1222589280
##########
server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java:
##########
@@ -176,6 +170,11 @@ public Publisher<Void> enqueueReactive(Mail mail) {
return Mono.fromRunnable(Throwing.runnable(() ->
enQueue(mail)).sneakyThrow());
}
+ @Override
+ public Publisher<Void> enqueueReactive(Mail mail, Duration delay) {
+ return Mono.fromRunnable(Throwing.runnable(() -> enQueue(mail,
delay)).sneakyThrow());
+ }
+
Review Comment:
There is no need for this method, since we already have a default method for this, IMO.
##########
server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueue.scala:
##########
@@ -666,5 +666,4 @@ class PulsarMailQueue(
Source.fromPublisher(doDelete())
}
-
Review Comment:
This change is not needed.
##########
server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSubmissionSetMethod.scala:
##########
@@ -250,31 +253,67 @@ class EmailSubmissionSetMethod @Inject()(serializer:
EmailSubmissionSetSerialize
private def sendEmail(mailboxSession: MailboxSession,
request: EmailSubmissionCreationRequest):
SMono[(EmailSubmissionCreationResponse, MessageId)] =
- for {
+ for {
message <-
SFlux(messageIdManager.getMessagesReactive(List(request.emailId).asJava,
FetchGroup.FULL_CONTENT, mailboxSession))
.next
.switchIfEmpty(SMono.error(MessageNotFoundException(request.emailId)))
submissionId = EmailSubmissionId.generate
message <- SMono.fromTry(toMimeMessage(submissionId.value, message))
envelope <- SMono.fromTry(resolveEnvelope(message, request.envelope))
_ <- validate(mailboxSession)(message, envelope)
+ parameters = envelope.mailFrom.parameters
+ _ <- SMono.fromTry(validateFromParameters(parameters))
+ delay <- SMono.fromTry(retrieveDelay(parameters))
+ _ <- SMono.fromTry(validateDelay(delay))
+
mail = {
val mailImpl = MailImpl.builder()
.name(submissionId.value)
- .addRecipients(envelope.rcptTo.map(_.email).asJava)
+ .addRecipients(envelope.rcptTo.filter(m => m.parameters.isEmpty ||
!(m.parameters.get.contains(ParameterName.holdFor) ||
m.parameters.get.contains(ParameterName.holdUntil))).map(_.email).asJava)
.sender(envelope.mailFrom.email)
.addAttribute(new Attribute(MAIL_METADATA_USERNAME_ATTRIBUTE,
AttributeValue.of(mailboxSession.getUser.asString())))
.build()
mailImpl.setMessageNoCopy(message)
mailImpl
}
- _ <- SMono(queue.enqueueReactive(mail))
+
+ _ <- SMono(queue.enqueueReactive(mail, delay))
.`then`(SMono.fromCallable(() =>
LifecycleUtil.dispose(mail)).subscribeOn(Schedulers.boundedElastic()))
.`then`(SMono.just(submissionId))
+ sendAt = UTCDate(ZonedDateTime.now(clock).plus(delay))
} yield {
- EmailSubmissionCreationResponse(submissionId) -> request.emailId
+ EmailSubmissionCreationResponse(submissionId, sendAt) -> request.emailId
+ }
+
+ private def retrieveDelay(mailParameters: Option[Map[ParameterName,
Option[ParameterValue]]]): Try[Duration] = mailParameters match {
+ case None => Success(Duration.ZERO)
Review Comment:
Do you think `NO_DELAY` is more readable than `Duration.ZERO`?
##########
server/queue/queue-api/src/main/java/org/apache/james/queue/api/MailQueue.java:
##########
@@ -59,6 +63,7 @@
* </ul>
* </p>
*/
+@SuppressWarnings("checkstyle:EmptyLineSeparator")
Review Comment:
Why?
Please remove this.
##########
server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSubmissionSetMethod.scala:
##########
@@ -250,31 +253,67 @@ class EmailSubmissionSetMethod @Inject()(serializer:
EmailSubmissionSetSerialize
private def sendEmail(mailboxSession: MailboxSession,
request: EmailSubmissionCreationRequest):
SMono[(EmailSubmissionCreationResponse, MessageId)] =
- for {
+ for {
message <-
SFlux(messageIdManager.getMessagesReactive(List(request.emailId).asJava,
FetchGroup.FULL_CONTENT, mailboxSession))
.next
.switchIfEmpty(SMono.error(MessageNotFoundException(request.emailId)))
submissionId = EmailSubmissionId.generate
message <- SMono.fromTry(toMimeMessage(submissionId.value, message))
envelope <- SMono.fromTry(resolveEnvelope(message, request.envelope))
_ <- validate(mailboxSession)(message, envelope)
+ parameters = envelope.mailFrom.parameters
+ _ <- SMono.fromTry(validateFromParameters(parameters))
+ delay <- SMono.fromTry(retrieveDelay(parameters))
+ _ <- SMono.fromTry(validateDelay(delay))
+
mail = {
val mailImpl = MailImpl.builder()
.name(submissionId.value)
- .addRecipients(envelope.rcptTo.map(_.email).asJava)
+ .addRecipients(envelope.rcptTo.filter(m => m.parameters.isEmpty ||
!(m.parameters.get.contains(ParameterName.holdFor) ||
m.parameters.get.contains(ParameterName.holdUntil))).map(_.email).asJava)
.sender(envelope.mailFrom.email)
.addAttribute(new Attribute(MAIL_METADATA_USERNAME_ATTRIBUTE,
AttributeValue.of(mailboxSession.getUser.asString())))
.build()
mailImpl.setMessageNoCopy(message)
mailImpl
}
- _ <- SMono(queue.enqueueReactive(mail))
+
+ _ <- SMono(queue.enqueueReactive(mail, delay))
.`then`(SMono.fromCallable(() =>
LifecycleUtil.dispose(mail)).subscribeOn(Schedulers.boundedElastic()))
.`then`(SMono.just(submissionId))
+ sendAt = UTCDate(ZonedDateTime.now(clock).plus(delay))
} yield {
- EmailSubmissionCreationResponse(submissionId) -> request.emailId
+ EmailSubmissionCreationResponse(submissionId, sendAt) -> request.emailId
+ }
+
+ private def retrieveDelay(mailParameters: Option[Map[ParameterName,
Option[ParameterValue]]]): Try[Duration] = mailParameters match {
+ case None => Success(Duration.ZERO)
+ case Some(aMap) if aMap.contains(ParameterName.holdFor) =>
Try(Duration.ofSeconds(aMap(ParameterName.holdFor).getOrElse(ParameterValue("0")).value.toLong))
+ case Some(aMap) if aMap.contains(ParameterName.holdUntil) &&
aMap(ParameterName.holdUntil).nonEmpty =>
Try(Duration.between(LocalDateTime.now(clock),
LocalDateTime.parse(aMap(ParameterName.holdUntil).get.value, formatter)))
+ case Some(aMap) if aMap.contains(ParameterName.holdUntil) &&
aMap(ParameterName.holdUntil).isEmpty => Success(Duration.ZERO)
+ case _ => Success(Duration.ZERO)
}
+ def validateDelay(delay: Duration): Try[Duration] =
+ if (delay.getSeconds >= 0 && delay.getSeconds <=
SubmissionCapabilityFactory.maximumDelays.getSeconds) {
+ Success(delay)
+ } else {
+ Failure(new IllegalArgumentException("Invalid Arguments"))
+ }
+
+ def validateFromParameters(mailParameters: Option[Map[ParameterName,
Option[ParameterValue]]]): Try[Option[Map[ParameterName,
Option[ParameterValue]]]] = {
+ val keySet: Set[ParameterName] = mailParameters.getOrElse(Map()).keySet
+ val invalidEntries = keySet -- Set(ParameterName.holdFor,
ParameterName.holdUntil)
+ if (invalidEntries.isEmpty) {
+ if (keySet.contains(ParameterName.holdFor) &&
keySet.contains(ParameterName.holdUntil)) {
+ Failure(new IllegalArgumentException("Can't specify holdFor and
holdUntil simultaneously"))
+ } else {
+ Success(mailParameters)
+ }
+ } else {
+ Failure(new IllegalArgumentException("Unsupported parameterName"))
+ }
+ }
Review Comment:
This does not follow the Open/Closed principle (the O in SOLID: open for
extension but closed for modification) IMO.
With this code, it is hard to support more parameters in the future: adding
one requires a lot of modifications (more nested if/else).
##########
server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSubmissionSetMethod.scala:
##########
@@ -250,31 +253,67 @@ class EmailSubmissionSetMethod @Inject()(serializer:
EmailSubmissionSetSerialize
private def sendEmail(mailboxSession: MailboxSession,
request: EmailSubmissionCreationRequest):
SMono[(EmailSubmissionCreationResponse, MessageId)] =
- for {
+ for {
message <-
SFlux(messageIdManager.getMessagesReactive(List(request.emailId).asJava,
FetchGroup.FULL_CONTENT, mailboxSession))
.next
.switchIfEmpty(SMono.error(MessageNotFoundException(request.emailId)))
submissionId = EmailSubmissionId.generate
message <- SMono.fromTry(toMimeMessage(submissionId.value, message))
envelope <- SMono.fromTry(resolveEnvelope(message, request.envelope))
_ <- validate(mailboxSession)(message, envelope)
+ parameters = envelope.mailFrom.parameters
+ _ <- SMono.fromTry(validateFromParameters(parameters))
+ delay <- SMono.fromTry(retrieveDelay(parameters))
+ _ <- SMono.fromTry(validateDelay(delay))
+
mail = {
val mailImpl = MailImpl.builder()
.name(submissionId.value)
- .addRecipients(envelope.rcptTo.map(_.email).asJava)
+ .addRecipients(envelope.rcptTo.filter(m => m.parameters.isEmpty ||
!(m.parameters.get.contains(ParameterName.holdFor) ||
m.parameters.get.contains(ParameterName.holdUntil))).map(_.email).asJava)
.sender(envelope.mailFrom.email)
.addAttribute(new Attribute(MAIL_METADATA_USERNAME_ATTRIBUTE,
AttributeValue.of(mailboxSession.getUser.asString())))
.build()
mailImpl.setMessageNoCopy(message)
mailImpl
}
- _ <- SMono(queue.enqueueReactive(mail))
+
+ _ <- SMono(queue.enqueueReactive(mail, delay))
.`then`(SMono.fromCallable(() =>
LifecycleUtil.dispose(mail)).subscribeOn(Schedulers.boundedElastic()))
.`then`(SMono.just(submissionId))
+ sendAt = UTCDate(ZonedDateTime.now(clock).plus(delay))
} yield {
- EmailSubmissionCreationResponse(submissionId) -> request.emailId
+ EmailSubmissionCreationResponse(submissionId, sendAt) -> request.emailId
+ }
+
+ private def retrieveDelay(mailParameters: Option[Map[ParameterName,
Option[ParameterValue]]]): Try[Duration] = mailParameters match {
+ case None => Success(Duration.ZERO)
+ case Some(aMap) if aMap.contains(ParameterName.holdFor) =>
Try(Duration.ofSeconds(aMap(ParameterName.holdFor).getOrElse(ParameterValue("0")).value.toLong))
+ case Some(aMap) if aMap.contains(ParameterName.holdUntil) &&
aMap(ParameterName.holdUntil).nonEmpty =>
Try(Duration.between(LocalDateTime.now(clock),
LocalDateTime.parse(aMap(ParameterName.holdUntil).get.value, formatter)))
+ case Some(aMap) if aMap.contains(ParameterName.holdUntil) &&
aMap(ParameterName.holdUntil).isEmpty => Success(Duration.ZERO)
+ case _ => Success(Duration.ZERO)
}
+ def validateDelay(delay: Duration): Try[Duration] =
+ if (delay.getSeconds >= 0 && delay.getSeconds <=
SubmissionCapabilityFactory.maximumDelays.getSeconds) {
+ Success(delay)
+ } else {
+ Failure(new IllegalArgumentException("Invalid Arguments"))
Review Comment:
The error message should be more precise IMO. What is invalid?
##########
server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/EmailSubmissionSetMethod.scala:
##########
@@ -250,31 +253,67 @@ class EmailSubmissionSetMethod @Inject()(serializer:
EmailSubmissionSetSerialize
private def sendEmail(mailboxSession: MailboxSession,
request: EmailSubmissionCreationRequest):
SMono[(EmailSubmissionCreationResponse, MessageId)] =
- for {
+ for {
message <-
SFlux(messageIdManager.getMessagesReactive(List(request.emailId).asJava,
FetchGroup.FULL_CONTENT, mailboxSession))
.next
.switchIfEmpty(SMono.error(MessageNotFoundException(request.emailId)))
submissionId = EmailSubmissionId.generate
message <- SMono.fromTry(toMimeMessage(submissionId.value, message))
envelope <- SMono.fromTry(resolveEnvelope(message, request.envelope))
_ <- validate(mailboxSession)(message, envelope)
+ parameters = envelope.mailFrom.parameters
+ _ <- SMono.fromTry(validateFromParameters(parameters))
+ delay <- SMono.fromTry(retrieveDelay(parameters))
+ _ <- SMono.fromTry(validateDelay(delay))
+
mail = {
val mailImpl = MailImpl.builder()
.name(submissionId.value)
- .addRecipients(envelope.rcptTo.map(_.email).asJava)
+ .addRecipients(envelope.rcptTo.filter(m => m.parameters.isEmpty ||
!(m.parameters.get.contains(ParameterName.holdFor) ||
m.parameters.get.contains(ParameterName.holdUntil))).map(_.email).asJava)
.sender(envelope.mailFrom.email)
.addAttribute(new Attribute(MAIL_METADATA_USERNAME_ATTRIBUTE,
AttributeValue.of(mailboxSession.getUser.asString())))
.build()
mailImpl.setMessageNoCopy(message)
mailImpl
}
- _ <- SMono(queue.enqueueReactive(mail))
+
+ _ <- SMono(queue.enqueueReactive(mail, delay))
.`then`(SMono.fromCallable(() =>
LifecycleUtil.dispose(mail)).subscribeOn(Schedulers.boundedElastic()))
.`then`(SMono.just(submissionId))
+ sendAt = UTCDate(ZonedDateTime.now(clock).plus(delay))
} yield {
- EmailSubmissionCreationResponse(submissionId) -> request.emailId
+ EmailSubmissionCreationResponse(submissionId, sendAt) -> request.emailId
+ }
+
+ private def retrieveDelay(mailParameters: Option[Map[ParameterName,
Option[ParameterValue]]]): Try[Duration] = mailParameters match {
+ case None => Success(Duration.ZERO)
+ case Some(aMap) if aMap.contains(ParameterName.holdFor) =>
Try(Duration.ofSeconds(aMap(ParameterName.holdFor).getOrElse(ParameterValue("0")).value.toLong))
Review Comment:
What is `ParameterValue("0")`?
IMO something like this would be more readable:
```
aMap(ParameterName.holdFor)
.map(string -> long)
.getOrElse(NO_DELAY)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]