Would I need to also use

maxTotalReceiverQueueSizeAcrossPartitions(1) ?


On Fri, Nov 18, 2022 at 7:42 PM Tecno Brain <cerebrotecnolog...@gmail.com>
wrote:

> Hello All,
>
> We have a Pulsar cluster with a topic that uses 3 partitions.
> We are using Pulsar as a queue, a shared subscription.
> The number of consumers goes up and down constantly (we look at the size
> of the queue to scale up and down our number of consumers)
>
> We have observed that some messages are consumed extremely delayed (even
> when the median age is just a few milliseconds)
>
> The median age of the messages received is usually just a few
> milliseconds, with some probably in a few seconds (caused by we need to
> scale up our consumer cluster)
>
> But some messages are consumed minutes or even hours after they have been
> queued.
>
> Our app usually starts consuming messages with a large age (redeliveries
> =0) and then it eventually processes messages with an age of just a few
> milliseconds.
>
> I found that whenever the age was very very large (hours),  it was  a
> redelivery. (therefore, the large age at least has an explanation)
>
> My guess is that a new consumer is processing the entire history in the
> topic, looking for messages that had not been acknowledged. (since the
> initial position is set to Earliest)
>
> a) How can I avoid these "gaps" of messages apparently not being consumed
> timely?  Why are these not being delivered to the consumers already up and
> running?
>
> b) How can I reduce the time taken for a message to be redelivered?
>
> Here is how we create the Pulsar Consumer:
>
>  consumer =
>         client
>             .newConsumer(JSONSchema.of(Payload.class))
>             .topic(requestQueue)
>             .subscriptionType(SubscriptionType.Shared)
>
> .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
>             .subscriptionName("queue-sample")
>             .receiverQueueSize(1)
>             .subscribe();
>
> Any pointers are welcome.
> Thank you
>
>

Reply via email to