Hi Qpid users / experts,

I need to limit the number of consumers concurrently processing messages considered to be in the same group, across multiple queues, and was wondering if anyone has ideas about how to do it.

We’re using the Java broker and client, and have multiple queues, each with multiple listeners, each listener’s session listening to multiple queues. Some messages are associated with groups, and for a given group we want at most K listeners processing messages from the group at any given time. The messages are enqueued to multiple queues, and it’s possible for messages from the same group to be in different queues.

If messages in the same group could go into only one queue, the message groups feature would give us what we need (it’d work directly with K = 1, and with K > 1 we could tweak the grouping value, e.g., hash it to one of 1 to K and append the number to the grouping value). But since messages considered to be in the same group can be in different queues, the feature is not enough for our case.

Since it looks like the broker side doesn’t have exactly what we need, we’re thinking about how to do this from the client side. We’re thinking along the lines of having a semaphore object per group, shared between the different listeners. Whenever a listener receives a message, it will try to acquire a permit from the semaphore for that group. If it’s able to acquire a permit, it processes the message and releases the permit upon completion. If it’s not able to acquire a permit, it reenqueues the message in some way. For example:

1) Reenqueue the message back to the same queue so it can be retried right away. But this would lead to a lot of churning when permits are not available for a while, so we’ve ruled this out.

2) Same as #1, but sleep for a short while first so we wouldn’t have the high churning. But since each listener’s session is responsible for multiple queues, this can decrease the throughput of other queues.

3) Enqueue the message to a special queue that stores messages waiting for a permit, a queue that is not listened to by anyone. A periodic sweeper job will wake up once in a while, say every minute, pull all the messages off of the waiting queue, and reenqueue them to their respective original queues. But throughput would be limited by the sweeper interval.

4) Like #3, but don’t use a periodic sweeper. Instead, when a listener that was able to acquire a permit is done with a message, look up the next waiting message of the same group in the waiting queue using a JMS selector, and reenqueue it back to the original queue. But lookup performance might be bad if queue depth is high.

Each of these has some drawbacks. Does anyone have ideas about other possible approaches (maybe entirely different from the above), or has anyone done something similar?

Thanks,
Helen
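
P.S. To make the semaphore idea a bit more concrete, here is a rough sketch of the kind of shared per-group permit holder we have in mind. This is only an illustration, not working code from our system: the class name GroupPermits and its method names are made up, it assumes all listeners run in the same JVM, and it assumes each message carries its group as a string. The JMS parts (reading the group from the message, reenqueueing on failure) are left out.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;

/**
 * Hypothetical helper: one semaphore with K permits per message group,
 * shared by every listener in this JVM.
 */
final class GroupPermits {
    private final int maxConcurrentPerGroup; // K
    private final ConcurrentMap<String, Semaphore> semaphores = new ConcurrentHashMap<>();

    GroupPermits(int maxConcurrentPerGroup) {
        if (maxConcurrentPerGroup < 1) {
            throw new IllegalArgumentException("K must be at least 1");
        }
        this.maxConcurrentPerGroup = maxConcurrentPerGroup;
    }

    /** Non-blocking: returns true if a permit for the group was acquired. */
    boolean tryAcquire(String groupId) {
        return semaphores
                .computeIfAbsent(groupId, id -> new Semaphore(maxConcurrentPerGroup))
                .tryAcquire();
    }

    /** Returns a permit previously obtained with tryAcquire for the same group. */
    void release(String groupId) {
        Semaphore semaphore = semaphores.get(groupId);
        if (semaphore != null) {
            semaphore.release();
        }
    }

    /**
     * Runs the work if a permit is free and always releases it afterwards.
     * Returns false without running the work when no permit is available,
     * so the caller can reenqueue the message by whichever option it chooses.
     */
    boolean runIfPermitted(String groupId, Runnable work) {
        if (!tryAcquire(groupId)) {
            return false;
        }
        try {
            work.run();
            return true;
        } finally {
            release(groupId);
        }
    }
}
```

A listener would call runIfPermitted(groupId, ...) from its onMessage and treat a false return as "no permit, reenqueue". Note that this sketch never removes semaphores for groups that have gone idle, so with many short-lived groups the map would need some cleanup.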
