Hi Tecno Brain,

> We have observed that some messages are consumed extremely delayed (even
> when the median age is just a few milliseconds)
It sounds like you might have some consumer applications that are
shutting down without acknowledging some messages. This is not
necessarily a problem, but it likely explains your observations.

> Our app usually starts consuming messages with a large age (redeliveries =0)
> and then it eventually processes messages with an age of just a few
> milliseconds.

This is how Pulsar consumers work. For a shared subscription, when a
consumer connects, the broker delivers the oldest unacknowledged message
that is not already delivered to a connected consumer. Note that when a
consumer receives a message, that message will only be sent to another
consumer if one of the following happens:

1. The consumer negatively acknowledges the message. (The ack timeout
   triggers a negative acknowledgement in the client.)
2. The consumer disconnects from the broker. (Note that this can
   currently happen due to load balancing.)

Once a message is acknowledged, any subsequent redelivery would
generally be considered a bug.

> My guess is that a new consumer is processing the entire history in the
> topic, looking for messages that had not been acknowledged. (since the
> initial position is set to Earliest)

If you did not reset the cursor, a new consumer does not go back to the
earliest position. The initial position only applies when the
subscription is first created.

> a) How can I avoid these "gaps" of messages apparently not being consumed
> timely? Why are these not being delivered to the consumers already up and
> running?

My first note is that you'll want to understand how the receiverQueueSize
impacts your workload. The receiverQueueSize is the number of messages a
consumer fetches and buffers internally. Unless condition 1 or 2 above is
met, these buffered messages cannot be delivered to another consumer.
Note also that the ack timeout does not start counting until the client
application receives a message by calling "receive". As a result, an
application that stops processing messages but is still able to keep its
broker connection alive will hold on to these buffered messages.

If you would like an application to process only one message at a time,
you can set receiverQueueSize to 0. That can decrease processing
efficiency and also requires using unbatched messages, which removes
some key performance enhancements. Here is some documentation on the
setting:
https://pulsar.apache.org/docs/2.10.x/client-libraries-java#configure-consumer

In order to debug a bit more, it might be helpful to look at the topic's
"internal stats". You can do this by running one of the following:

pulsar-admin topics stats-internal <topic>

or

pulsar-admin topics partitioned-stats-internal <topic>

The result includes a map of all cursors (essentially subscriptions) for
the topic, and in those maps you'll see "individuallyDeletedMessages".
Those are the ranges of messages that have already been acknowledged.
You'll likely see that you have holes in your subscription, and those
holes map to the messages that are getting redelivered.

> maxTotalReceiverQueueSizeAcrossPartitions(1) ?

This could help, but it will probably increase the latency associated
with calling `receive`. I've put a couple of rough sketches below that
illustrate these settings.
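To make the settings above concrete, here is a rough sketch of how they
fit together with the Java client. Treat it as an illustration rather
than a recipe: the service URL, topic name, timeout values, and the
Payload/process placeholders are all assumptions, and the right numbers
depend on your workload.

import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;

public class QueueConsumerSketch {

    // Stand-in for your JSON payload class.
    public static class Payload {
        public String id;
    }

    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")   // placeholder service URL
                .build();

        Consumer<Payload> consumer = client
                .newConsumer(Schema.JSON(Payload.class))
                .topic("persistent://public/default/request-queue") // placeholder topic
                .subscriptionType(SubscriptionType.Shared)
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscriptionName("queue-sample")
                // Small in-client buffer: a slow or stalled consumer holds
                // fewer undelivered messages away from its peers.
                .receiverQueueSize(1)
                // Caps the combined buffer across all partitions of the topic.
                .maxTotalReceiverQueueSizeAcrossPartitions(3)
                // If the application receives a message but never acks it,
                // the broker redelivers it after this timeout instead of
                // waiting for the consumer to disconnect.
                .ackTimeout(30, TimeUnit.SECONDS)
                // Delay before a negatively acknowledged message is redelivered.
                .negativeAckRedeliveryDelay(1, TimeUnit.MINUTES)
                .subscribe();

        while (true) {
            Message<Payload> msg = consumer.receive();
            try {
                process(msg.getValue());           // your processing logic
                consumer.acknowledge(msg);         // acked messages are never redelivered
            } catch (Exception e) {
                consumer.negativeAcknowledge(msg); // hand it back for prompt redelivery
            }
        }
    }

    private static void process(Payload payload) {
        // Placeholder for the real work.
    }
}

The general trade-off is that smaller buffers and shorter timeouts limit
how long a message can sit on a struggling consumer, at the cost of
throughput and some extra redeliveries, so tune the values against your
latency target.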
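If you'd rather look at the internal stats programmatically instead of
through pulsar-admin, here is a small sketch using the Java admin
client. The admin URL and topic name are placeholders, and I'm assuming
the stats shape described above (a cursors map whose entries expose
individuallyDeletedMessages); double-check the field names against the
client version you are running.

import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;

public class SubscriptionHoleCheck {
    public static void main(String[] args) throws Exception {
        PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl("http://localhost:8080") // placeholder admin URL
                .build();

        // For a partitioned topic, look at each partition individually,
        // e.g. ...-partition-0, ...-partition-1, ...
        String topic = "persistent://public/default/request-queue-partition-0";

        PersistentTopicInternalStats stats = admin.topics().getInternalStats(topic);

        // individuallyDeletedMessages lists the ranges that have already been
        // acknowledged; the gaps between those ranges are the unacknowledged
        // messages that can end up being redelivered.
        stats.cursors.forEach((subscription, cursor) ->
                System.out.println(subscription + " -> " + cursor.individuallyDeletedMessages));

        admin.close();
    }
}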
- Michael

On Fri, Nov 18, 2022 at 6:50 PM Tecno Brain <cerebrotecnolog...@gmail.com> wrote:
>
> Would I need to also use
>
> maxTotalReceiverQueueSizeAcrossPartitions(1) ?
>
>
> On Fri, Nov 18, 2022 at 7:42 PM Tecno Brain <cerebrotecnolog...@gmail.com>
> wrote:
>>
>> Hello All,
>>
>> We have a Pulsar cluster with a topic that uses 3 partitions.
>> We are using Pulsar as a queue, with a shared subscription.
>> The number of consumers goes up and down constantly (we look at the
>> size of the queue to scale our number of consumers up and down).
>>
>> We have observed that some messages are consumed extremely delayed
>> (even when the median age is just a few milliseconds).
>>
>> The median age of the messages received is usually just a few
>> milliseconds, with some taking a few seconds (caused by the time we
>> need to scale up our consumer cluster).
>>
>> But some messages are consumed minutes or even hours after they have
>> been queued.
>>
>> Our app usually starts consuming messages with a large age
>> (redeliveries =0) and then it eventually processes messages with an
>> age of just a few milliseconds.
>>
>> I found that whenever the age was very, very large (hours), it was a
>> redelivery. (Therefore, the large age at least has an explanation.)
>>
>> My guess is that a new consumer is processing the entire history in
>> the topic, looking for messages that had not been acknowledged. (since
>> the initial position is set to Earliest)
>>
>> a) How can I avoid these "gaps" of messages apparently not being
>> consumed timely? Why are these not being delivered to the consumers
>> already up and running?
>>
>> b) How can I reduce the time taken for a message to be redelivered?
>>
>> Here is how we create the Pulsar consumer:
>>
>> consumer =
>>     client
>>         .newConsumer(JSONSchema.of(Payload.class))
>>         .topic(requestQueue)
>>         .subscriptionType(SubscriptionType.Shared)
>>         .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
>>         .subscriptionName("queue-sample")
>>         .receiverQueueSize(1)
>>         .subscribe();
>>
>> Any pointers are welcome.
>> Thank you
>>