Hi folks,
I am struggling trying to configure the java broker (0.32) with message group
queues with the documentation
(https://qpid.apache.org/releases/qpid-0.32/java-broker/book/index.html)
Can anyone shed some light on how to configure message group queues in the java
broker and nodejs clients/producers (https://github.com/postwait/node-amqp)?
Any help or ideas would be much appreciated.
Note I can and have been passing messages from the producer to the consumers
just fine. Now I'd like to force certain messages to be consumed/ordered by
using message group queues.
I'm using:
QPID java broker 0.32 configured with a json file where I setup the
group_header_key to "group_id"
I set the context group_header_key using the broker management interface.
{
"id" : "5c6aaf7f-0b21-4b2e-be6a-effec29202bf",
"name" : "alert.condition.detected",
"type" : "standard",
"durable" : true,
"lifetimePolicy" : "PERMANENT",
"context" : {
"qpid.group_header_key" : "group_id"
},
"exclusive" : "NONE",
"messageDurability" : "DEFAULT",
"owner" : null,
"lastUpdatedBy" : null,
"lastUpdatedTime" : 0,
"createdBy" : null,
"createdTime" : 0,
"bindings" : [ {
"exchange" : "8128f935-e5e5-4953-9e47-6c0fc3cfa1bf",
"id" : "08c5d3d4-0454-4107-8498-b579713bf90a",
"name" : "alert.condition.detected",
"durable" : true,
"lastUpdatedBy" : "ANONYMOUS",
"lastUpdatedTime" : 1430868669580,
"createdBy" : "ANONYMOUS",
"createdTime" : 1430868669580
}, {
"exchange" : "8128f935-e5e5-4953-9e47-6c0fc3cfa1bf",
"id" : "64ab6b8e-b58a-453d-a341-dc0dbf5f91d9",
"name" : "condition.detected",
"durable" : true,
"lastUpdatedBy" : "admin",
"lastUpdatedTime" : 1433803383881,
"createdBy" : "admin",
"createdTime" : 1433803383881
} ]
},
In the node js client I create the queue with the arguments option.
Connection.queue( topic, {
autoDelete: false,
durable: true,
arguments: {
'qpid.group_header_key': 'group_id',
'qpid.shared_msg_group': 0
})
In the node js producer I send (publish the msg on the AMQP exchange) including
the headers option
Exchange.publish(<topic>, body, {
headers: {
group_id: 5
})
When I publish without consuming I can view the message in the QPID broker
management tool. And it shows the header being set.
[cid:[email protected]]
However when I start up my two consumers the messages are distributed to which
ever consumer is available.
- Produce 6 messages 3 in group 1, 3 in group 5
- Two consumers one set to delay 60 seconds before sending the ack
back to qpid
- Prefetch for each consumer is 1.
- The second consumer ends up processing 5 messages, 3 from group 1
and the two remaining from group 5.
- I was expecting the group 5 messages to be held up in the queue
until the delayed consumer acked the first group 5 message.
Again, any help would be much appreciated.
Thanks,
Scott