This is an automated email from the ASF dual-hosted git repository.

matthieu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new 09e0d600ff  JAMES-3700: implement a dead letter queue for pulsar 
mailqueue #13  (#2355)
09e0d600ff is described below

commit 09e0d600ffdf82e8eaae448c199f610f22573243
Author: Matthieu Baechler <[email protected]>
AuthorDate: Thu Jul 18 14:06:05 2024 +0200

     JAMES-3700: implement a dead letter queue for pulsar mailqueue #13  (#2355)
---
 pom.xml                                            | 16 ++++++
 server/queue/queue-pulsar/pom.xml                  | 27 +++------
 .../org/apache/james/queue/pulsar/Filter.scala     | 12 ++--
 .../james/queue/pulsar/PulsarMailQueue.scala       | 65 +++++++++++++---------
 .../james/queue/pulsar/PulsarMailQueueTest.java    | 23 ++++----
 5 files changed, 84 insertions(+), 59 deletions(-)

diff --git a/pom.xml b/pom.xml
index 9ee34392d5..af05247c23 100644
--- a/pom.xml
+++ b/pom.xml
@@ -615,6 +615,7 @@
         <jakarta.activation-api.version>2.1.2</jakarta.activation-api.version>
         <slf4j.version>2.0.13</slf4j.version>
 
+        <circe.version>0.14.9</circe.version>
         <dnsjava.version>3.5.2</dnsjava.version>
         <junit.jupiter.version>5.10.2</junit.jupiter.version>
         <junit.platform.version>1.10.2</junit.platform.version>
@@ -2318,6 +2319,21 @@
                 <artifactId>refined_${scala.base}</artifactId>
                 <version>0.11.2</version>
             </dependency>
+            <dependency>
+                <groupId>io.circe</groupId>
+                <artifactId>circe-core_${scala.base}</artifactId>
+                <version>${circe.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>io.circe</groupId>
+                <artifactId>circe-generic_${scala.base}</artifactId>
+                <version>${circe.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>io.circe</groupId>
+                <artifactId>circe-parser_${scala.base}</artifactId>
+                <version>${circe.version}</version>
+            </dependency>
             <dependency>
                 <groupId>io.cucumber</groupId>
                 <artifactId>cucumber-guice</artifactId>
diff --git a/server/queue/queue-pulsar/pom.xml 
b/server/queue/queue-pulsar/pom.xml
index 3e58bd508b..5027488e7c 100644
--- a/server/queue/queue-pulsar/pom.xml
+++ b/server/queue/queue-pulsar/pom.xml
@@ -100,21 +100,20 @@
             <scope>test</scope>
         </dependency>
         <dependency>
-            <groupId>com.chuusai</groupId>
-            <artifactId>shapeless_2.13</artifactId>
+            <groupId>eu.timepit</groupId>
+            <artifactId>refined_${scala.base}</artifactId>
         </dependency>
         <dependency>
-            <groupId>com.clever-cloud.pulsar4s</groupId>
-            <artifactId>pulsar4s-play-json_${scala.base}</artifactId>
-            <version>2.9.0</version>
+            <groupId>io.circe</groupId>
+            <artifactId>circe-core_${scala.base}</artifactId>
         </dependency>
         <dependency>
-            <groupId>com.typesafe.play</groupId>
-            <artifactId>play-json_${scala.base}</artifactId>
+            <groupId>io.circe</groupId>
+            <artifactId>circe-generic_${scala.base}</artifactId>
         </dependency>
         <dependency>
-            <groupId>eu.timepit</groupId>
-            <artifactId>refined_${scala.base}</artifactId>
+            <groupId>io.circe</groupId>
+            <artifactId>circe-parser_${scala.base}</artifactId>
         </dependency>
         <dependency>
             <groupId>jakarta.annotation</groupId>
@@ -124,16 +123,6 @@
             <groupId>jakarta.inject</groupId>
             <artifactId>jakarta.inject-api</artifactId>
         </dependency>
-        <dependency>
-            <groupId>org.julienrf</groupId>
-            <artifactId>play-json-derived-codecs_${scala.base}</artifactId>
-            <exclusions>
-                <exclusion>
-                    <groupId>com.chuusai</groupId>
-                    <artifactId>shapeless_2.13</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
         <dependency>
             <groupId>org.mockito</groupId>
             <artifactId>mockito-core</artifactId>
diff --git 
a/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/Filter.scala
 
b/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/Filter.scala
index 62ae58e14a..c10f303bf3 100644
--- 
a/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/Filter.scala
+++ 
b/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/Filter.scala
@@ -18,9 +18,9 @@
  ****************************************************************/
 package org.apache.james.queue.pulsar
 
-import julienrf.json.derived
 import com.sksamuel.pulsar4s.SequenceId
-import play.api.libs.json.{Json, OFormat}
+import io.circe.generic.semiauto._
+import io.circe.{Codec, Decoder, Encoder}
 
 private[pulsar] sealed trait Filter {
   def lastSequenceId: SequenceId
@@ -29,8 +29,12 @@ private[pulsar] sealed trait Filter {
 }
 
 private[pulsar] object Filter {
-  implicit val sequenceIdFormat: OFormat[SequenceId] = Json.format[SequenceId]
-  implicit val filterOFormat: OFormat[Filter] = derived.oformat()
+  implicit val sequenceIdFormat: Codec[SequenceId] =
+    Codec.from(
+      Decoder.decodeLong.map(SequenceId),
+      Encoder.encodeLong.contramap(_.value))
+
+  implicit val filterOFormat: Codec[Filter] = deriveCodec
 
   case class ByName(name: String, lastSequenceId: SequenceId) extends Filter {
     def matches(mailMetadata: MailMetadata): Boolean = mailMetadata.name == 
name
diff --git 
a/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueue.scala
 
b/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueue.scala
index f11fe39506..28ce68cd18 100644
--- 
a/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueue.scala
+++ 
b/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueue.scala
@@ -19,10 +19,11 @@
 
 package org.apache.james.queue.pulsar
 
+import cats.implicits.toShow
+
 import java.time.{Instant, ZonedDateTime, Duration => JavaDuration}
 import java.util.concurrent.TimeUnit
 import java.util.{Date, UUID}
-
 import org.apache.pekko.actor.{ActorRef, ActorSystem}
 import org.apache.pekko.stream.scaladsl.{Flow, Keep, RunnableGraph, Sink, 
Source, SourceQueueWithComplete, StreamConverters}
 import org.apache.pekko.stream.{Attributes, OverflowStrategy}
@@ -45,10 +46,9 @@ import org.apache.james.queue.pulsar.EnqueueId.EnqueueId
 import org.apache.james.server.core.MailImpl
 import org.apache.mailet._
 import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException
-import org.apache.pulsar.client.api.{Schema, SubscriptionInitialPosition, 
SubscriptionType}
+import org.apache.pulsar.client.api.{DeadLetterPolicy, Schema, 
SubscriptionInitialPosition, SubscriptionType}
 import org.reactivestreams.Publisher
 import org.slf4j.LoggerFactory
-import play.api.libs.json._
 
 import scala.concurrent._
 import scala.concurrent.duration._
@@ -56,16 +56,19 @@ import scala.jdk.CollectionConverters._
 import scala.jdk.DurationConverters._
 import scala.math.Ordered.orderingToOrdered
 import scala.util.Failure
+import io.circe.generic.semiauto._
+import io.circe.syntax.EncoderOps
+import io.circe.{Codec, Decoder, Encoder}
+import io.circe._
+import io.circe.parser._
 
 private[pulsar] object serializers {
-  implicit val headerFormat: Format[Header] = Json.format[Header]
-  implicit val enqueueIdFormat: Format[EnqueueId] = new Format[EnqueueId] {
-    override def writes(o: EnqueueId): JsValue = JsString(o.value)
-
-    override def reads(json: JsValue): JsResult[EnqueueId] =
-      json.validate[String].map(EnqueueId.apply).flatMap(_.fold(JsError.apply, 
JsSuccess(_)))
-  }
-  implicit val mailMetadataFormat: Format[MailMetadata] = 
Json.format[MailMetadata]
+  implicit val headerCodec: Codec[Header] = deriveCodec
+  implicit val enqueueIdCodec: Codec[EnqueueId] = Codec.from(
+    Decoder.decodeString.emap(EnqueueId.apply),
+    Encoder.encodeString.contramap(_.value)
+  )
+  implicit val mailMetadataCodec: Codec[MailMetadata] = deriveCodec
 }
 
 private[pulsar] object schemas {
@@ -150,7 +153,7 @@ class PulsarMailQueue(
     Source.fromPublisher(saveMimeMessage(mail.getMessage))
       .map { partsId =>
         val mailMetadata = MailMetadata.of(EnqueueId.generate(), mail, partsId)
-        val payload = Json.stringify(Json.toJson(mailMetadata))
+        val payload = mailMetadata.asJson.noSpaces
         (payload, duration, enqueued)
       }
 
@@ -196,7 +199,8 @@ class PulsarMailQueue(
         topics = Seq(topic),
         subscriptionType = Some(SubscriptionType.Shared),
         subscriptionInitialPosition = 
Some(SubscriptionInitialPosition.Earliest),
-        negativeAckRedeliveryDelay = Some(1.second)
+        negativeAckRedeliveryDelay = Some(1.second),
+        deadLetterPolicy = 
Some(DeadLetterPolicy.builder().maxRedeliverCount(1).initialSubscriptionName("dead-letter-sub-name").build())
       )
     )
 
@@ -240,12 +244,20 @@ class PulsarMailQueue(
       
.toMat(Sink.asPublisher[MailQueue.MailQueueItem](true).withAttributes(Attributes.inputBuffer(initial
 = 1, max = 1)))(Keep.both)
   }
 
+  private def decodeOrFail(message: CommittableMessage[String]): 
Source[(MailMetadata, CommittableMessage[MessageAsJson]), NotUsed] =
+    decode[MailMetadata](message.message.value).map(_ -> message) match {
+      case Right(value) => Source.single(value)
+      case Left(value) =>
+        logger.error("unable to parse message {}", value.show)
+        Source.lazyFuture(() => message.nack()).flatMapConcat(_ => 
Source.empty)
+    }
+
+
   private def filteringFlow(filterActor: ActorRef) = {
     implicit val timeout: Timeout = Timeout(1, TimeUnit.SECONDS)
-    Flow.apply[CommittableMessage[String]].map(message =>
-      (Json.fromJson[MailMetadata](Json.parse(message.message.value)).get,
-        message)
-    ).ask[(Option[MailMetadata], Option[MimeMessagePartsId], 
CommittableMessage[String])](filterActor)
+    Flow.apply[CommittableMessage[String]]
+      .flatMapConcat(decodeOrFail)
+      .ask[(Option[MailMetadata], Option[MimeMessagePartsId], 
CommittableMessage[String])](filterActor)
       .flatMapConcat {
         case (None, Some(partsId), committableMessage) =>
           Source.lazyFuture(() => committableMessage.ack())
@@ -323,11 +335,12 @@ class PulsarMailQueue(
    * @see [[FilterStage]]
    */
   private def filtersCommandFlow(topic: Topic, filterSubscription: 
Subscription, filteringStage: ActorRef) = {
-    val logInvalidFilterPayload = Flow.apply[JsResult[Filter]]
-      .collectType[JsError]
-      .map(error => "unable to parse filter" + 
Json.prettyPrint(JsError.toJson(error)))
+    val logInvalidFilterPayload = Flow.apply[Either[Error, Filter]]
+      .collect { case Left(error) => error }
+      .map(error => "unable to parse filter " + error.show)
       .log("filterFlow")
-      .addAttributes(Attributes.logLevels(onElement = 
Attributes.LogLevels.Error)).to(Sink.ignore)
+      .addAttributes(Attributes.logLevels(onElement = 
Attributes.LogLevels.Error))
+      .to(Sink.ignore)
 
     streams.source(() =>
       client.consumer(
@@ -338,9 +351,9 @@ class PulsarMailQueue(
           subscriptionInitialPosition = 
Some(SubscriptionInitialPosition.Earliest),
         )
       )
-    ).map(message => Json.fromJson[Filter](Json.parse(message.value)))
-      .divertTo(logInvalidFilterPayload, when = _.isError)
-      .map(_.get)
+    ).map(message => decode[Filter](message.value))
+      .divertTo(logInvalidFilterPayload, when = _.isLeft)
+      .map(_.toOption.get)
       .via(debugLogger("filterFlow"))
       .to(Sink.foreach(filter => filteringStage ! filter))
   }
@@ -550,11 +563,11 @@ class PulsarMailQueue(
     // received through pulsar will be eliminated by the filter stage as
     // filters are stored in a set @see 
org.apache.james.queue.pulsar.FilterStage.filters
     filterStage ! filter
-    producer.send(Json.stringify(Json.toJson(filter)))
+    producer.send(filter.asJson.noSpaces)
   }
 
   private def jsonStringToMailMetadata(json: String): MailMetadata =
-    Json.fromJson[MailMetadata](Json.parse(json)).get
+    decode[MailMetadata](json).toOption.get
 
   /**
    * @inheritdoc
diff --git 
a/server/queue/queue-pulsar/src/test/java/org/apache/james/queue/pulsar/PulsarMailQueueTest.java
 
b/server/queue/queue-pulsar/src/test/java/org/apache/james/queue/pulsar/PulsarMailQueueTest.java
index 088088b169..4a2b468f4c 100644
--- 
a/server/queue/queue-pulsar/src/test/java/org/apache/james/queue/pulsar/PulsarMailQueueTest.java
+++ 
b/server/queue/queue-pulsar/src/test/java/org/apache/james/queue/pulsar/PulsarMailQueueTest.java
@@ -22,6 +22,7 @@ package org.apache.james.queue.pulsar;
 import static org.apache.james.queue.api.Mails.defaultMail;
 import static org.assertj.core.api.Assertions.assertThat;
 
+import java.time.Duration;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
@@ -68,7 +69,6 @@ import com.github.fge.lambdas.Throwing;
 import com.sksamuel.pulsar4s.ConsumerMessage;
 
 import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
 import scala.jdk.javaapi.OptionConverters;
 
 @Tag(Unstable.TAG)
@@ -150,7 +150,6 @@ public class PulsarMailQueueTest implements 
MailQueueContract, MailQueueMetricCo
                 system);
     }
 
-    @Disabled("JAMES-3700 We need to define a deadletter policy for the Pulsar 
MailQueue")
     @Test
     void badMessagesShouldNotAlterDelivery(DockerPulsarExtension.DockerPulsar 
pulsar) throws Exception {
         new JavaClient(pulsar.getConfiguration().brokerUri(),
@@ -161,11 +160,10 @@ public class PulsarMailQueueTest implements 
MailQueueContract, MailQueueMetricCo
                 .name("name")
                 .build());
 
-        MailQueue.MailQueueItem mail = 
Flux.from(getMailQueue().deQueue()).onErrorResume(e -> 
Mono.empty()).take(1).single().block();
+        MailQueue.MailQueueItem mail = 
Flux.from(getMailQueue().deQueue()).take(1).single().block();
         assertThat(mail.getMail().getName()).isEqualTo("name");
     }
 
-    @Disabled("JAMES-3700 We need to define a deadletter policy for the Pulsar 
MailQueue")
     @Test
     void 
badMessagesShouldBeMovedToADeadLetterTopic(DockerPulsarExtension.DockerPulsar 
pulsar) throws Exception {
         new JavaClient(pulsar.getConfiguration().brokerUri(),
@@ -176,13 +174,18 @@ public class PulsarMailQueueTest implements 
MailQueueContract, MailQueueMetricCo
                 .name("name")
                 .build());
 
-        try {
-            Flux.from(getMailQueue().deQueue()).take(1).single().block();
-        } catch (Exception e) {
-            // Expected to fail
-        }
+        getMailQueue().enQueue(defaultMail()
+                .name("name2")
+                .build());
+
+        Flux.from(getMailQueue().deQueue())
+                .delayElements(Duration.ofSeconds(3)) // retry is configured 
to 1 second
+                .take(2) // we need to have 1 retry before BAD goes to DLQ
+                .collectList()
+                .block();
+
         Optional<String> deadletterMessage = OptionConverters.toJava(new 
JavaClient(pulsar.getConfiguration().brokerUri(),
-                        String.format("persistent://%s/James-%s/dead-letter", 
pulsar.getConfiguration().namespace().asString(), mailQueueName.asString()))
+                        
String.format("persistent://%s/James-%s-subscription-%s-DLQ", 
pulsar.getConfiguration().namespace().asString(), mailQueueName.asString(), 
mailQueueName.asString()))
                         .consumeOne())
                 .map(ConsumerMessage::value);
         assertThat(deadletterMessage).contains("BAD");


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to