This is an automated email from the ASF dual-hosted git repository.
matthieu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push:
new 09e0d600ff JAMES-3700: implement a dead letter queue for pulsar
mailqueue #13 (#2355)
09e0d600ff is described below
commit 09e0d600ffdf82e8eaae448c199f610f22573243
Author: Matthieu Baechler <[email protected]>
AuthorDate: Thu Jul 18 14:06:05 2024 +0200
JAMES-3700: implement a dead letter queue for pulsar mailqueue #13 (#2355)
---
pom.xml | 16 ++++++
server/queue/queue-pulsar/pom.xml | 27 +++------
.../org/apache/james/queue/pulsar/Filter.scala | 12 ++--
.../james/queue/pulsar/PulsarMailQueue.scala | 65 +++++++++++++---------
.../james/queue/pulsar/PulsarMailQueueTest.java | 23 ++++----
5 files changed, 84 insertions(+), 59 deletions(-)
diff --git a/pom.xml b/pom.xml
index 9ee34392d5..af05247c23 100644
--- a/pom.xml
+++ b/pom.xml
@@ -615,6 +615,7 @@
<jakarta.activation-api.version>2.1.2</jakarta.activation-api.version>
<slf4j.version>2.0.13</slf4j.version>
+ <circe.version>0.14.9</circe.version>
<dnsjava.version>3.5.2</dnsjava.version>
<junit.jupiter.version>5.10.2</junit.jupiter.version>
<junit.platform.version>1.10.2</junit.platform.version>
@@ -2318,6 +2319,21 @@
<artifactId>refined_${scala.base}</artifactId>
<version>0.11.2</version>
</dependency>
+ <dependency>
+ <groupId>io.circe</groupId>
+ <artifactId>circe-core_${scala.base}</artifactId>
+ <version>${circe.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.circe</groupId>
+ <artifactId>circe-generic_${scala.base}</artifactId>
+ <version>${circe.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.circe</groupId>
+ <artifactId>circe-parser_${scala.base}</artifactId>
+ <version>${circe.version}</version>
+ </dependency>
<dependency>
<groupId>io.cucumber</groupId>
<artifactId>cucumber-guice</artifactId>
diff --git a/server/queue/queue-pulsar/pom.xml
b/server/queue/queue-pulsar/pom.xml
index 3e58bd508b..5027488e7c 100644
--- a/server/queue/queue-pulsar/pom.xml
+++ b/server/queue/queue-pulsar/pom.xml
@@ -100,21 +100,20 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>com.chuusai</groupId>
- <artifactId>shapeless_2.13</artifactId>
+ <groupId>eu.timepit</groupId>
+ <artifactId>refined_${scala.base}</artifactId>
</dependency>
<dependency>
- <groupId>com.clever-cloud.pulsar4s</groupId>
- <artifactId>pulsar4s-play-json_${scala.base}</artifactId>
- <version>2.9.0</version>
+ <groupId>io.circe</groupId>
+ <artifactId>circe-core_${scala.base}</artifactId>
</dependency>
<dependency>
- <groupId>com.typesafe.play</groupId>
- <artifactId>play-json_${scala.base}</artifactId>
+ <groupId>io.circe</groupId>
+ <artifactId>circe-generic_${scala.base}</artifactId>
</dependency>
<dependency>
- <groupId>eu.timepit</groupId>
- <artifactId>refined_${scala.base}</artifactId>
+ <groupId>io.circe</groupId>
+ <artifactId>circe-parser_${scala.base}</artifactId>
</dependency>
<dependency>
<groupId>jakarta.annotation</groupId>
@@ -124,16 +123,6 @@
<groupId>jakarta.inject</groupId>
<artifactId>jakarta.inject-api</artifactId>
</dependency>
- <dependency>
- <groupId>org.julienrf</groupId>
- <artifactId>play-json-derived-codecs_${scala.base}</artifactId>
- <exclusions>
- <exclusion>
- <groupId>com.chuusai</groupId>
- <artifactId>shapeless_2.13</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
diff --git
a/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/Filter.scala
b/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/Filter.scala
index 62ae58e14a..c10f303bf3 100644
---
a/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/Filter.scala
+++
b/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/Filter.scala
@@ -18,9 +18,9 @@
****************************************************************/
package org.apache.james.queue.pulsar
-import julienrf.json.derived
import com.sksamuel.pulsar4s.SequenceId
-import play.api.libs.json.{Json, OFormat}
+import io.circe.generic.semiauto._
+import io.circe.{Codec, Decoder, Encoder}
private[pulsar] sealed trait Filter {
def lastSequenceId: SequenceId
@@ -29,8 +29,12 @@ private[pulsar] sealed trait Filter {
}
private[pulsar] object Filter {
- implicit val sequenceIdFormat: OFormat[SequenceId] = Json.format[SequenceId]
- implicit val filterOFormat: OFormat[Filter] = derived.oformat()
+ implicit val sequenceIdFormat: Codec[SequenceId] =
+ Codec.from(
+ Decoder.decodeLong.map(SequenceId),
+ Encoder.encodeLong.contramap(_.value))
+
+ implicit val filterOFormat: Codec[Filter] = deriveCodec
case class ByName(name: String, lastSequenceId: SequenceId) extends Filter {
def matches(mailMetadata: MailMetadata): Boolean = mailMetadata.name ==
name
diff --git
a/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueue.scala
b/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueue.scala
index f11fe39506..28ce68cd18 100644
---
a/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueue.scala
+++
b/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueue.scala
@@ -19,10 +19,11 @@
package org.apache.james.queue.pulsar
+import cats.implicits.toShow
+
import java.time.{Instant, ZonedDateTime, Duration => JavaDuration}
import java.util.concurrent.TimeUnit
import java.util.{Date, UUID}
-
import org.apache.pekko.actor.{ActorRef, ActorSystem}
import org.apache.pekko.stream.scaladsl.{Flow, Keep, RunnableGraph, Sink,
Source, SourceQueueWithComplete, StreamConverters}
import org.apache.pekko.stream.{Attributes, OverflowStrategy}
@@ -45,10 +46,9 @@ import org.apache.james.queue.pulsar.EnqueueId.EnqueueId
import org.apache.james.server.core.MailImpl
import org.apache.mailet._
import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException
-import org.apache.pulsar.client.api.{Schema, SubscriptionInitialPosition,
SubscriptionType}
+import org.apache.pulsar.client.api.{DeadLetterPolicy, Schema,
SubscriptionInitialPosition, SubscriptionType}
import org.reactivestreams.Publisher
import org.slf4j.LoggerFactory
-import play.api.libs.json._
import scala.concurrent._
import scala.concurrent.duration._
@@ -56,16 +56,19 @@ import scala.jdk.CollectionConverters._
import scala.jdk.DurationConverters._
import scala.math.Ordered.orderingToOrdered
import scala.util.Failure
+import io.circe.generic.semiauto._
+import io.circe.syntax.EncoderOps
+import io.circe.{Codec, Decoder, Encoder}
+import io.circe._
+import io.circe.parser._
private[pulsar] object serializers {
- implicit val headerFormat: Format[Header] = Json.format[Header]
- implicit val enqueueIdFormat: Format[EnqueueId] = new Format[EnqueueId] {
- override def writes(o: EnqueueId): JsValue = JsString(o.value)
-
- override def reads(json: JsValue): JsResult[EnqueueId] =
- json.validate[String].map(EnqueueId.apply).flatMap(_.fold(JsError.apply,
JsSuccess(_)))
- }
- implicit val mailMetadataFormat: Format[MailMetadata] =
Json.format[MailMetadata]
+ implicit val headerCodec: Codec[Header] = deriveCodec
+ implicit val enqueueIdCodec: Codec[EnqueueId] = Codec.from(
+ Decoder.decodeString.emap(EnqueueId.apply),
+ Encoder.encodeString.contramap(_.value)
+ )
+ implicit val mailMetadataCodec: Codec[MailMetadata] = deriveCodec
}
private[pulsar] object schemas {
@@ -150,7 +153,7 @@ class PulsarMailQueue(
Source.fromPublisher(saveMimeMessage(mail.getMessage))
.map { partsId =>
val mailMetadata = MailMetadata.of(EnqueueId.generate(), mail, partsId)
- val payload = Json.stringify(Json.toJson(mailMetadata))
+ val payload = mailMetadata.asJson.noSpaces
(payload, duration, enqueued)
}
@@ -196,7 +199,8 @@ class PulsarMailQueue(
topics = Seq(topic),
subscriptionType = Some(SubscriptionType.Shared),
subscriptionInitialPosition =
Some(SubscriptionInitialPosition.Earliest),
- negativeAckRedeliveryDelay = Some(1.second)
+ negativeAckRedeliveryDelay = Some(1.second),
+ deadLetterPolicy =
Some(DeadLetterPolicy.builder().maxRedeliverCount(1).initialSubscriptionName("dead-letter-sub-name").build())
)
)
@@ -240,12 +244,20 @@ class PulsarMailQueue(
.toMat(Sink.asPublisher[MailQueue.MailQueueItem](true).withAttributes(Attributes.inputBuffer(initial
= 1, max = 1)))(Keep.both)
}
+ private def decodeOrFail(message: CommittableMessage[String]):
Source[(MailMetadata, CommittableMessage[MessageAsJson]), NotUsed] =
+ decode[MailMetadata](message.message.value).map(_ -> message) match {
+ case Right(value) => Source.single(value)
+ case Left(value) =>
+ logger.error("unable to parse message {}", value.show)
+ Source.lazyFuture(() => message.nack()).flatMapConcat(_ =>
Source.empty)
+ }
+
+
private def filteringFlow(filterActor: ActorRef) = {
implicit val timeout: Timeout = Timeout(1, TimeUnit.SECONDS)
- Flow.apply[CommittableMessage[String]].map(message =>
- (Json.fromJson[MailMetadata](Json.parse(message.message.value)).get,
- message)
- ).ask[(Option[MailMetadata], Option[MimeMessagePartsId],
CommittableMessage[String])](filterActor)
+ Flow.apply[CommittableMessage[String]]
+ .flatMapConcat(decodeOrFail)
+ .ask[(Option[MailMetadata], Option[MimeMessagePartsId],
CommittableMessage[String])](filterActor)
.flatMapConcat {
case (None, Some(partsId), committableMessage) =>
Source.lazyFuture(() => committableMessage.ack())
@@ -323,11 +335,12 @@ class PulsarMailQueue(
* @see [[FilterStage]]
*/
private def filtersCommandFlow(topic: Topic, filterSubscription:
Subscription, filteringStage: ActorRef) = {
- val logInvalidFilterPayload = Flow.apply[JsResult[Filter]]
- .collectType[JsError]
- .map(error => "unable to parse filter" +
Json.prettyPrint(JsError.toJson(error)))
+ val logInvalidFilterPayload = Flow.apply[Either[Error, Filter]]
+ .collect { case Left(error) => error }
+ .map(error => "unable to parse filter " + error.show)
.log("filterFlow")
- .addAttributes(Attributes.logLevels(onElement =
Attributes.LogLevels.Error)).to(Sink.ignore)
+ .addAttributes(Attributes.logLevels(onElement =
Attributes.LogLevels.Error))
+ .to(Sink.ignore)
streams.source(() =>
client.consumer(
@@ -338,9 +351,9 @@ class PulsarMailQueue(
subscriptionInitialPosition =
Some(SubscriptionInitialPosition.Earliest),
)
)
- ).map(message => Json.fromJson[Filter](Json.parse(message.value)))
- .divertTo(logInvalidFilterPayload, when = _.isError)
- .map(_.get)
+ ).map(message => decode[Filter](message.value))
+ .divertTo(logInvalidFilterPayload, when = _.isLeft)
+ .map(_.toOption.get)
.via(debugLogger("filterFlow"))
.to(Sink.foreach(filter => filteringStage ! filter))
}
@@ -550,11 +563,11 @@ class PulsarMailQueue(
// received through pulsar will be eliminated by the filter stage as
// filters are stored in a set @see
org.apache.james.queue.pulsar.FilterStage.filters
filterStage ! filter
- producer.send(Json.stringify(Json.toJson(filter)))
+ producer.send(filter.asJson.noSpaces)
}
private def jsonStringToMailMetadata(json: String): MailMetadata =
- Json.fromJson[MailMetadata](Json.parse(json)).get
+ decode[MailMetadata](json).toOption.get
/**
* @inheritdoc
diff --git
a/server/queue/queue-pulsar/src/test/java/org/apache/james/queue/pulsar/PulsarMailQueueTest.java
b/server/queue/queue-pulsar/src/test/java/org/apache/james/queue/pulsar/PulsarMailQueueTest.java
index 088088b169..4a2b468f4c 100644
---
a/server/queue/queue-pulsar/src/test/java/org/apache/james/queue/pulsar/PulsarMailQueueTest.java
+++
b/server/queue/queue-pulsar/src/test/java/org/apache/james/queue/pulsar/PulsarMailQueueTest.java
@@ -22,6 +22,7 @@ package org.apache.james.queue.pulsar;
import static org.apache.james.queue.api.Mails.defaultMail;
import static org.assertj.core.api.Assertions.assertThat;
+import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
@@ -68,7 +69,6 @@ import com.github.fge.lambdas.Throwing;
import com.sksamuel.pulsar4s.ConsumerMessage;
import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
import scala.jdk.javaapi.OptionConverters;
@Tag(Unstable.TAG)
@@ -150,7 +150,6 @@ public class PulsarMailQueueTest implements
MailQueueContract, MailQueueMetricCo
system);
}
- @Disabled("JAMES-3700 We need to define a deadletter policy for the Pulsar
MailQueue")
@Test
void badMessagesShouldNotAlterDelivery(DockerPulsarExtension.DockerPulsar
pulsar) throws Exception {
new JavaClient(pulsar.getConfiguration().brokerUri(),
@@ -161,11 +160,10 @@ public class PulsarMailQueueTest implements
MailQueueContract, MailQueueMetricCo
.name("name")
.build());
- MailQueue.MailQueueItem mail =
Flux.from(getMailQueue().deQueue()).onErrorResume(e ->
Mono.empty()).take(1).single().block();
+ MailQueue.MailQueueItem mail =
Flux.from(getMailQueue().deQueue()).take(1).single().block();
assertThat(mail.getMail().getName()).isEqualTo("name");
}
- @Disabled("JAMES-3700 We need to define a deadletter policy for the Pulsar
MailQueue")
@Test
void
badMessagesShouldBeMovedToADeadLetterTopic(DockerPulsarExtension.DockerPulsar
pulsar) throws Exception {
new JavaClient(pulsar.getConfiguration().brokerUri(),
@@ -176,13 +174,18 @@ public class PulsarMailQueueTest implements
MailQueueContract, MailQueueMetricCo
.name("name")
.build());
- try {
- Flux.from(getMailQueue().deQueue()).take(1).single().block();
- } catch (Exception e) {
- // Expected to fail
- }
+ getMailQueue().enQueue(defaultMail()
+ .name("name2")
+ .build());
+
+ Flux.from(getMailQueue().deQueue())
+ .delayElements(Duration.ofSeconds(3)) // retry is configured
to 1 second
+ .take(2) // we need to have 1 retry before BAD goes to DLQ
+ .collectList()
+ .block();
+
Optional<String> deadletterMessage = OptionConverters.toJava(new
JavaClient(pulsar.getConfiguration().brokerUri(),
- String.format("persistent://%s/James-%s/dead-letter",
pulsar.getConfiguration().namespace().asString(), mailQueueName.asString()))
+
String.format("persistent://%s/James-%s-subscription-%s-DLQ",
pulsar.getConfiguration().namespace().asString(), mailQueueName.asString(),
mailQueueName.asString()))
.consumeOne())
.map(ConsumerMessage::value);
assertThat(deadletterMessage).contains("BAD");
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]