Brent Villalobos wrote:
This is my first time using QPID and I'm trying to write a simple producer/consumer setup.

Great; thanks for giving it a try! We'll be grateful for any feedback on what we can improve. Please feel free to keep asking questions and we'll do our best to get the answers to you.

What am I doing wrong that all the messages in my queue are wiped out after I consume one message?

The issue is that the broker is sending the client as many messages as available and because acknowledgements are turned off (no_ack=True), it assumes that delivery is always successful so dequeues those messages immediately.

The no_ack mode is really intended for cases where every client has their own temporary queue bound to an exchange (i.e. a transient pub sub model).

You probably need to turn acking on in your case so that messages are only dequeued when the application acknowledges their receipt:

reply = channel.basic_consume(queue="message_queue", no_ack=False)

or even just:

reply = channel.basic_consume(queue="message_queue")

as no_ack is False by default.

You then need to ack every message you receive:

channel.basic_ack(delivery_tag=msg.delivery_tag)

or to acknowledge all messages up to a particular message:

channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True)

This will also make your application tolerant of client failures. Messages not yet acked will be redelivered once the client reconnects and consumes from the queue again.

You can also control the prefetching by sending basic_qos commands to the broker. This only has an effect when acking is turned on though. e.g.

channel.basic_qos(prefetch_count=1)# or whatever value seems appropriate

If you set it to 1 you have to acknowledge every message before another one is sent. This minimises redelivery in the event of a failure but will also slow things down.

A final option with 0-8 is to use basic_get instead of basic_consume (though in my view its a little more cumbersome). E.g.

reply = channel.basic_get(no_ack=False)
if (reply.method.name == "get_ok"):
   print "Consumer gets this message: ", msg.content.body
   channel.basic_ack(delivery_tag=reply.delivery_tag)
else:
   print "Queue was empty"


fyi: The 0-10 version of the protocol which the AMQP WG has recently voted through in its final form has a more powerful flow control mechanism and unifies the functionality of get and consume. Support for that is currently in progress on trunk (c++ broker and c++, python & java clients will be the first available, java broker following shortly after).



Reply via email to