This is an automated email from the ASF dual-hosted git repository. jhelou pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit b98bd1f01a55871df233d3d718fc29cc09d58546 Author: Jean Helou <j...@xn--gml-cma.com> AuthorDate: Fri Sep 27 00:08:00 2024 +0200 [JAMES-4064] use non durable subscriptions for filter streams Filter subscription names need to be unique, so each one is built with a random UUID suffix and a new subscription is created every time. With durable subscriptions, each of these subscriptions survives a server restart, and Pulsar has a limit on the total number of durable subscriptions, so it will eventually refuse new subscriptions. --- .../james/queue/pulsar/PulsarMailQueue.scala | 25 +++++++++++++--------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueue.scala b/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueue.scala index e77c432af3..cd70ad5aa0 100644 --- a/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueue.scala +++ b/server/queue/queue-pulsar/src/main/scala/org/apache/james/queue/pulsar/PulsarMailQueue.scala @@ -46,7 +46,7 @@ import org.apache.james.queue.pulsar.EnqueueId.EnqueueId import org.apache.james.server.core.MailImpl import org.apache.mailet._ import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException -import org.apache.pulsar.client.api.{DeadLetterPolicy, Schema, SubscriptionInitialPosition, SubscriptionType} +import org.apache.pulsar.client.api.{DeadLetterPolicy, Schema, SubscriptionInitialPosition, SubscriptionMode, SubscriptionType} import org.reactivestreams.Publisher import org.slf4j.LoggerFactory @@ -307,16 +307,20 @@ class PulsarMailQueue( private lazy val (dequeueControl: Control, dequeuePublisher: Publisher[MailQueueItem], scheduledConsumerControl: Control) = startDequeuing() private val enqueue: SourceQueueWithComplete[(Mail, Duration, Promise[Done])] = enqueueFlow.run() private val requeue: SourceQueueWithComplete[ProducerMessage[MessageAsJson]] = requeueFlow.run() + + private val filterSubscription = Subscription("filter-subscription-" + 
config.name.asString() + "-" + UUID.randomUUID().toString) + private val scheduledFilterSubscription = Subscription("filter-scheduled-subscription-" + config.name.asString() + "-" + UUID.randomUUID().toString) + private val filtersCommandFlowControl: Control = filtersCommandFlow( filterTopic, - Subscription("filter-subscription-" + config.name.asString() + "-" + UUID.randomUUID().toString), + filterSubscription, filterStage ).run() private val scheduledFiltersCommandFlowControl: Control = filtersCommandFlow( filterScheduledTopic, - Subscription("filter-scheduled-subscription-" + config.name.asString() + "-" + UUID.randomUUID().toString), + scheduledFilterSubscription, filterScheduledStage ).run() @@ -344,14 +348,15 @@ class PulsarMailQueue( streams.source(() => client.consumer( - ConsumerConfig( - subscriptionName = filterSubscription, - topics = Seq(topic), - subscriptionType = Some(SubscriptionType.Shared), - subscriptionInitialPosition = Some(SubscriptionInitialPosition.Earliest), + ConsumerConfig( + subscriptionName = filterSubscription, + topics = Seq(topic), + subscriptionType = Some(SubscriptionType.Shared), + subscriptionInitialPosition = Some(SubscriptionInitialPosition.Earliest), + subscriptionMode = Some(SubscriptionMode.NonDurable) + ) ) - ) - ).map(message => decode[Filter](message.value)) + ).map(message => decode[Filter](message.value)) .divertTo(logInvalidFilterPayload, when = _.isLeft) .map(_.toOption.get) .via(debugLogger("filterFlow")) --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For additional commands, e-mail: notifications-h...@james.apache.org