This is an automated email from the ASF dual-hosted git repository.

jhelou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit b98bd1f01a55871df233d3d718fc29cc09d58546
Author: Jean Helou <j...@xn--gml-cma.com>
AuthorDate: Fri Sep 27 00:08:00 2024 +0200

    [JAMES-4064] use non durable subscriptions for filter streams
    
    Filter subscription names need to be unique (they embed a random UUID),
    so a durable subscription created for one run survives a server restart
    while the next run subscribes under a new name. Pulsar has a limit on the
    total number of durable subscriptions, so these leftovers accumulate until
    it eventually refuses new subscriptions.
---
 .../james/queue/pulsar/PulsarMailQueue.scala       | 25 +++++++++++++---------
 1 file changed, 15 insertions(+), 10 deletions(-)

diff --git 
a/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueue.scala
 
b/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueue.scala
index e77c432af3..cd70ad5aa0 100644
--- 
a/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueue.scala
+++ 
b/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueue.scala
@@ -46,7 +46,7 @@ import org.apache.james.queue.pulsar.EnqueueId.EnqueueId
 import org.apache.james.server.core.MailImpl
 import org.apache.mailet._
 import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException
-import org.apache.pulsar.client.api.{DeadLetterPolicy, Schema, 
SubscriptionInitialPosition, SubscriptionType}
+import org.apache.pulsar.client.api.{DeadLetterPolicy, Schema, 
SubscriptionInitialPosition, SubscriptionMode, SubscriptionType}
 import org.reactivestreams.Publisher
 import org.slf4j.LoggerFactory
 
@@ -307,16 +307,20 @@ class PulsarMailQueue(
   private lazy val (dequeueControl: Control, dequeuePublisher: 
Publisher[MailQueueItem], scheduledConsumerControl: Control) = startDequeuing()
   private val enqueue: SourceQueueWithComplete[(Mail, Duration, 
Promise[Done])] = enqueueFlow.run()
   private val requeue: SourceQueueWithComplete[ProducerMessage[MessageAsJson]] 
= requeueFlow.run()
+
+  private val filterSubscription = Subscription("filter-subscription-" + 
config.name.asString() + "-" + UUID.randomUUID().toString)
+  private val scheduledFilterSubscription = 
Subscription("filter-scheduled-subscription-" + config.name.asString() + "-" + 
UUID.randomUUID().toString)
+
   private val filtersCommandFlowControl: Control =
     filtersCommandFlow(
       filterTopic,
-      Subscription("filter-subscription-" + config.name.asString() + "-" + 
UUID.randomUUID().toString),
+      filterSubscription,
       filterStage
     ).run()
   private val scheduledFiltersCommandFlowControl: Control =
     filtersCommandFlow(
       filterScheduledTopic,
-      Subscription("filter-scheduled-subscription-" + config.name.asString() + 
"-" + UUID.randomUUID().toString),
+      scheduledFilterSubscription,
       filterScheduledStage
     ).run()
 
@@ -344,14 +348,15 @@ class PulsarMailQueue(
 
     streams.source(() =>
       client.consumer(
-        ConsumerConfig(
-          subscriptionName = filterSubscription,
-          topics = Seq(topic),
-          subscriptionType = Some(SubscriptionType.Shared),
-          subscriptionInitialPosition = 
Some(SubscriptionInitialPosition.Earliest),
+          ConsumerConfig(
+            subscriptionName = filterSubscription,
+            topics = Seq(topic),
+            subscriptionType = Some(SubscriptionType.Shared),
+            subscriptionInitialPosition = 
Some(SubscriptionInitialPosition.Earliest),
+            subscriptionMode = Some(SubscriptionMode.NonDurable)
+          )
         )
-      )
-    ).map(message => decode[Filter](message.value))
+      ).map(message => decode[Filter](message.value))
       .divertTo(logInvalidFilterPayload, when = _.isLeft)
       .map(_.toOption.get)
       .via(debugLogger("filterFlow"))


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org
For additional commands, e-mail: notifications-h...@james.apache.org

Reply via email to