Would I also need to use maxTotalReceiverQueueSizeAcrossPartitions(1)?
On Fri, Nov 18, 2022 at 7:42 PM Tecno Brain <cerebrotecnolog...@gmail.com> wrote:

> Hello All,
>
> We have a Pulsar cluster with a topic that uses 3 partitions.
> We are using Pulsar as a queue, with a shared subscription.
> The number of consumers goes up and down constantly (we look at the size
> of the queue to scale our number of consumers up and down).
>
> We have observed that some messages are consumed with extreme delay (even
> when the median age is just a few milliseconds).
>
> The median age of the messages received is usually just a few
> milliseconds, with some probably at a few seconds (because we need to
> scale up our consumer cluster).
>
> But some messages are consumed minutes or even hours after they have been
> queued.
>
> Our app usually starts by consuming messages with a large age
> (redeliveries = 0) and then eventually processes messages with an age of
> just a few milliseconds.
>
> I found that whenever the age was very large (hours), it was a
> redelivery (so the large age at least has an explanation).
>
> My guess is that a new consumer is processing the entire history in the
> topic, looking for messages that had not been acknowledged (since the
> initial position is set to Earliest).
>
> a) How can I avoid these "gaps" of messages apparently not being consumed
> in a timely manner? Why are these not being delivered to the consumers
> already up and running?
>
> b) How can I reduce the time taken for a message to be redelivered?
>
> Here is how we create the Pulsar Consumer:
>
>     consumer =
>         client
>             .newConsumer(JSONSchema.of(Payload.class))
>             .topic(requestQueue)
>             .subscriptionType(SubscriptionType.Shared)
>             .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
>             .subscriptionName("queue-sample")
>             .receiverQueueSize(1)
>             .subscribe();
>
> Any pointers are welcome.
> Thank you
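
To make the reasoning behind the question about maxTotalReceiverQueueSizeAcrossPartitions concrete, here is a rough back-of-the-envelope sketch. It assumes (my reading of the client documentation, not something confirmed in this thread) that receiverQueueSize applies to each partition separately and that maxTotalReceiverQueueSizeAcrossPartitions caps the sum, with a default of 50000. The class and the maxPrefetched helper are hypothetical and not part of the Pulsar client; they only compute an upper bound and do not model how the client splits the cap between partitions.

```java
public class PrefetchBound {
    // Hypothetical helper, not a Pulsar API: upper bound on the number of
    // messages a single consumer can hold in its receiver queues, assuming
    // one receiver queue per partition and a cap on the total.
    static int maxPrefetched(int partitions, int receiverQueueSize, int maxTotalAcrossPartitions) {
        int uncapped = partitions * receiverQueueSize;
        return Math.min(uncapped, maxTotalAcrossPartitions);
    }

    public static void main(String[] args) {
        int partitions = 3;         // from the topic described above
        int receiverQueueSize = 1;  // from the consumer builder above

        // Assumed default cap of 50000: the per-partition queues dominate.
        System.out.println("default cap: " + maxPrefetched(partitions, receiverQueueSize, 50_000));

        // Cap lowered to 1, as asked in the question.
        System.out.println("cap of 1:    " + maxPrefetched(partitions, receiverQueueSize, 1));
    }
}
```

If that model is right, each running consumer could be holding up to 3 messages that no other consumer can pick up, which is why I am asking whether the cap is needed on top of receiverQueueSize(1).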