Thank you for the responses.

Essentially there are groups and each group has a set of messages
associated with it that need to get processed in order.  I want only
one consumer processing a particular group at a time.  I'd like to
distribute the work among a number of consumers.  I occasionally would
like to take a number of messages (jobs) for a particular group and
consolidate the operations.  For example, when I see a sequence of jobs
together for a particular group, I could potentially trim some
redundant work.
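As a rough illustration of that consolidation step (plain Python, no
broker involved): the sketch below assumes that "redundant" means
consecutive identical jobs within one group, which is only one possible
rule. The function name `consolidate` and the job names are made up for
the example.

```python
from itertools import groupby


def consolidate(jobs):
    """Collapse each run of consecutive identical jobs into a single job.

    Order is preserved, and a job that reappears after a different job is
    kept, because it may no longer be redundant at that point.
    """
    return [job for job, _run in groupby(jobs)]


# Jobs received in order for one group (hypothetical names).
pending = ["reindex", "reindex", "resize", "reindex", "reindex"]
trimmed = consolidate(pending)
print(trimmed)
```

This only works if the consumer can see several messages for the group
at once, which is why a per-group queue is attractive.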

So the idea was to create a queue per group if it doesn't exist, and to
have another queue (hasWork) that lists the queues that have work.  All
workers subscribe to the shared hasWork queue; when a worker receives a
queue name from it, it subscribes to that work queue.  If consumer A
crashes, the queues it was subscribed to become available again on the
hasWork queue and other consumers can continue.  When the work from a
queue is gone, I can delete the queue, accept the hasWork message and
unsubscribe from it, freeing resources.
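To make the scheme concrete, here is a minimal in-memory sketch of it.
`ToyBroker` and its methods are hypothetical stand-ins, not the qpid
API, and everything runs in one thread, so crashes and redelivery are
not modelled.

```python
from collections import deque


class ToyBroker:
    """In-memory stand-in for a broker; NOT the qpid client API."""

    def __init__(self):
        self.queues = {}         # group name -> deque of jobs, in send order
        self.has_work = deque()  # names of group queues with pending work

    def produce(self, group, job):
        # Create the group queue on first use and announce it exactly once.
        if group not in self.queues:
            self.queues[group] = deque()
            self.has_work.append(group)
        self.queues[group].append(job)

    def consume_one_group(self):
        """Claim one group from has_work, drain it in order, delete its queue."""
        if not self.has_work:
            return None
        group = self.has_work.popleft()
        jobs = list(self.queues[group])
        del self.queues[group]
        return group, jobs


broker = ToyBroker()
broker.produce("g1", "job-a")
broker.produce("g2", "job-x")
broker.produce("g1", "job-b")

first = broker.consume_one_group()   # one worker claims g1 and all its jobs
second = broker.consume_one_group()  # another worker claims g2
print(first, second)
```

Because each group name appears only once on has_work, only one worker
handles a group at a time, and the jobs for that group stay in order.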

My concern was deciding a queue was empty, going to delete it, and
losing messages because of a race between the producer (checking to
see whether the queue still exists) and the consumer (deleting it for
being empty).
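The interleaving I'm worried about can be written out step by step.
This is a single-threaded toy with a dict standing in for the broker
(an assumption for illustration only); the steps are simply executed in
the unlucky order.

```python
# Toy broker state: queue name -> list of messages.
queues = {"g1": []}

# Step 1 (consumer): observes that the queue is empty and decides to delete it.
consumer_saw_empty = len(queues["g1"]) == 0

# Step 2 (producer): checks that the queue exists, then sends to it.
producer_saw_queue = "g1" in queues
if producer_saw_queue:
    queues["g1"].append("job-c")

# Step 3 (consumer): acts on its stale observation and deletes unconditionally.
lost = []
if consumer_saw_empty:
    lost = queues.pop("g1")

print(lost)  # whatever arrived between step 1 and step 3 goes down with the queue
```

Both sides made a correct check at the time, but neither check is
atomic with the action that follows it.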

ActiveMQ has JMSGroupID that accomplishes some of this (consumer group
exclusivity with fail-over) but I want to use qpid (for numerous
reasons).

I didn't see that qpid had message selectors currently.

After your responses I thought I could

1) use the if_empty etc. delete flags.  The consumer deletes the queue
when it is done and, if there's no exception, unsubscribes.

2) unbind the queue from the exchange, process any messages, then
unsubscribe and delete.  If the producer tries to send to the unbound
queue, won't it throw an exception?

3) set an alternate exchange for the messages, which the producers will
subscribe to (this was my idea but it seems more complicated).  Then if
any messages get lost, the producers will get them back and can
re-create the work queue and the hasWork message, then re-send.

4) something else?  If I have the queue auto-delete, won't I lose
messages if the consumer that is subscribed to that work queue
crashes?  Alternatively, put all the work in one queue and have
consumers release messages that they aren't responsible for; group
responsibility could be managed via another tool or some sort of
algorithm where messages are broadcast to consumers.
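Option 1 could look roughly like the sketch below. It is a toy model:
`ToyBroker`, `QueueNotEmpty` and `finish_group` are hypothetical names,
and the `if_empty` argument only imitates the behaviour described in
the quoted reply (the delete is refused with an exception when the
condition isn't met). It is not the qpid client API.

```python
class QueueNotEmpty(Exception):
    """Raised by the toy broker when a conditional delete is refused."""


class ToyBroker:
    """In-memory stand-in for a broker; NOT the qpid client API."""

    def __init__(self):
        self.queues = {}  # queue name -> list of messages

    def declare(self, name):
        self.queues.setdefault(name, [])

    def send(self, name, msg):
        self.queues[name].append(msg)

    def delete(self, name, if_empty=False):
        # The emptiness check and the delete happen as one broker-side step.
        if if_empty and self.queues[name]:
            raise QueueNotEmpty(name)
        del self.queues[name]


def finish_group(broker, name, process):
    """Drain the queue, then delete it only if it is still empty."""
    while True:
        while broker.queues[name]:
            process(broker.queues[name].pop(0))
        try:
            broker.delete(name, if_empty=True)
            return
        except QueueNotEmpty:
            continue  # a message arrived after the drain; go round again


broker = ToyBroker()
broker.declare("g1")
broker.send("g1", "job-a")

try:
    broker.delete("g1", if_empty=True)
    refused = False
except QueueNotEmpty:
    refused = True  # the pending job blocks the delete

processed = []
finish_group(broker, "g1", processed.append)
print(refused, processed)
```

In this toy the conditional delete closes the consumer's half of the
race. The producer's half remains: a producer that checked for the
queue just before the delete still has to cope with sending to a queue
that is no longer there, which is where options 2 and 3 come in.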

I was going to try some of this pretty soon and I just wanted to
bounce it off you guys before getting underway.

Thanks again,

Adam

On Mon, Jan 5, 2009 at 3:16 PM, Alan Conway <[email protected]> wrote:
> Carl Trieloff wrote:
>>
>> Adam Chase wrote:
>>>
>>> Premature sending (sorry) (the tab key killed me)
>>>
>>> So I have a system where I have groups of messages that need to be
>>> accumulated and then processed exclusively.  I might get a message for
>>> any group at any time.  The space of possible groups is quite large,
>>> but at any time I expect there will be a reasonable number of groups
>>> active.
>>>
>>> My plan was:
>>>
>>> produce(group, job)
>>> {
>>>      query_queue(group) //to see if it exists
>>>      if (exists)
>>>           send_message(group)
>>>      else
>>>           declare_queue(group, exclusive)
>>>           send_message(group)
>>>           send_message(queues_to_process, group)
>>>
>>> }
>>>
>>> consume()
>>> {
>>>      queueName = queue.receive(queues_to_process)
>>>      queue.subscribe(queueName)
>>>      queue.receive(queueName) //to get a bunch of messages
>>>
>>>      //later when the queue has been idle (for a while)
>>>      queue_delete(queueName)
>>>
>>> }
>>>
>>> Not sure if I've explained myself well here or not, but my question is
>>> 1) Is there a better way to accomplish this sort of thing and 2) can
>>> you think of a way around the race condition between queue_query and
>>> queue_delete?  Does the alternate_exchange setting help any?
>>>
>>> Thanks,
>>>
>>> Adam
>>>
>>
>> Are you trying/wanting to make sure messages are not sent to a queue that
>> is going to be deleted?
>>
>> Carl.
>>
>>
>
> You can pass the if-empty and if-unused flags to queue.delete to avoid
> deleting a non-empty or in-use (has consumers) queue respectively. If the
> conditions for delete aren't all met you'll get an exception.
>
>
