Das, Kapali Tejeswar wrote:
Carl,
I didn't find queue browsing mentioned in the AMQP 0.8 spec. That
explains the fact that there is no implementation in Qpid to browse the
messages in the queue.
My work is essentially to implement the JMS QueueBrowser feature that
would return a java.util.Enumeration which the caller uses to browse the
messages. JMS doesn't provide any guideline as to how it should be
implemented. Ideally, we may want to take the snapshot of all the
messages in the queue and present to the user. That way it (browsing)
won't have any effect on any other threads that are actively
producing/consuming messages.
In order to build the snapshot of the messages, we need to be able to
read a message without removing it from the queue. SynchronousQueue
doesn't support this.
Any feedback is greatly appreciated.
Thanks and regards
Tej
The SynchronousQueue is, I think, only used in BasicMessageConsumer for
the case where the client is pulling messages from the consumer rather
than having them pushed to a MessageListener implementation. The queue
that holds all received messages for the session is of type
FlowControllingBlockingQueue (in the util package).
Regardless, as I think has been covered, the browse behaviour should
enumerate the messages on a broker queue without consuming them (i.e. it
shouldn't just browse the list of messages received but not delivered).
I don't believe there is a way of retrieving messages from a queue
without consuming them in the current protocol, so some change will need
to be proposed. There was a discussion about such a proposal a long time
ago amongst some members of this list. I've attached a very rough
outline of what I remember of that design.
The attached approach attempts to add a browsing function to AMQP that
can be used to implement the JMS queue browsing, but gives slightly
stronger semantics than required in order to potentially be more useful.
It allows a client to detect that the queue has been altered as it
browses, though whether it cares about this is implementation dependent.
Other designs are of course possible; this is just a suggestion as a
starting point (one useful modification could be the ability to retrieve
batches rather than just individual messages).
The broker would maintain start and end indices for each queue (on
enqueue, increment end and on dequeue, increment start).
Clients would issue browse requests, similar to get requests but with
three additional fields: start, end and offset. The start and end
would indicate the client's view of the state of the queue (if start
and end are both zero it implies the client has no view of the state
of the queue and browsing should start at the beginning). The offset
would indicate the item it wanted to retrieve relative to the start.
The broker would respond by sending its view of the start and end
position along with the requested message, a later message if the
requested position had already been consumed or an empty indicator if
there were no more messages. The offset of any returned message would
also be supplied.
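
To make the request/response exchange concrete, here is a minimal
in-memory sketch in Java of how a broker might answer browse requests
under this outline. It is not Qpid code and not part of AMQP: the names
BrowseSketch, BrowsableQueue, BrowseReply and browse are invented for
illustration, messages are plain strings, and expiry from the middle of
the queue is ignored.

```java
import java.util.ArrayDeque;
import java.util.Iterator;

// Hypothetical sketch of the browse outline; none of these names
// exist in Qpid or in the AMQP specification.
class BrowseSketch {

    // Broker's reply: its view of start and end, the offset of the
    // returned message relative to start, and the message itself
    // (null stands in for the "empty" indicator).
    static final class BrowseReply {
        final long start;
        final long end;
        final long offset;
        final String message;

        BrowseReply(long start, long end, long offset, String message) {
            this.start = start;
            this.end = end;
            this.offset = offset;
            this.message = message;
        }
    }

    static final class BrowsableQueue {
        private final ArrayDeque<String> messages = new ArrayDeque<>();
        private long start = 0; // incremented on every dequeue
        private long end = 0;   // incremented on every enqueue

        synchronized void enqueue(String message) {
            messages.addLast(message);
            end++;
        }

        synchronized String dequeue() {
            String message = messages.pollFirst();
            if (message != null) {
                start++;
            }
            return message;
        }

        synchronized BrowseReply browse(long clientStart, long clientEnd, long offset) {
            // start == end == 0 means the client has no view of the
            // queue, so browsing begins at the current head.
            boolean noView = clientStart == 0 && clientEnd == 0;
            long requested = noView ? start : clientStart + offset;
            // A position that was already consumed falls forward to the head.
            long position = Math.max(requested, start);
            if (position >= end) {
                return new BrowseReply(start, end, 0, null);
            }
            Iterator<String> it = messages.iterator();
            String message = it.next();
            for (long i = start; i < position; i++) {
                message = it.next();
            }
            return new BrowseReply(start, end, position - start, message);
        }
    }

    public static void main(String[] args) {
        BrowsableQueue queue = new BrowsableQueue();
        for (String m : new String[] {"a", "b", "c", "d", "e"}) {
            queue.enqueue(m);
        }
        BrowseReply first = queue.browse(0, 0, 0);
        // prints "a start=0 end=5 offset=0"
        System.out.println(first.message + " start=" + first.start
                + " end=" + first.end + " offset=" + first.offset);

        queue.dequeue(); // another consumer removes a
        queue.dequeue(); // ... and b
        BrowseReply second = queue.browse(first.start, first.end, 1);
        // prints "c start=2 end=5 offset=0"
        System.out.println(second.message + " start=" + second.start
                + " end=" + second.end + " offset=" + second.offset);
    }
}
```

The client's end value is only used here to recognise the "no view"
case; the broker does not otherwise need it, since it exists so that the
client can compare it with the end value in the reply.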
In the simple case where no one is consuming from or publishing to the
queue, the start and end positions of the queue don't change and the
client can move the offset up to retrieve each message in turn.
The start and end indicators are only needed when the queue contents
are changing as it is being browsed.
An example might help make this clearer:
Imagine a queue that has 5 messages enqueued. Let's assume these
are the first 5 messages received by that queue and they are
identified by a, b, c, d and e. The queue will have set its end
counter to 5, but the start counter will still be 0.
The client now starts to browse the queue in question and issues a
browse request with start, end and offset all set to 0.
On receiving this request the broker recognises that the client is
assuming nothing about the state of the queue and sends back
message a, indicating start = 0, end = 5 and offset = 0.
The client gets this response and now knows that a is the first
message on the queue and that at the time of asking there were 5
messages in total.
It then moves to the next message by issuing a further browse
request with start = 0, end = 5, offset = 1.
But let's assume at this point another consumer has removed
messages a and b from the queue. The broker will have updated its
start count to 2. It then gets the browsing client's latest request
and realises that the client is attempting to get a message that
has already been removed (b). So it sends the next available
message instead, i.e. c, along with start = 2, end = 5 and offset =
0.
On receiving this response the client knows that it missed a
message, but has the next available message anyway. Let's say it
decides to continue browsing and issues a new request, start = 2,
end = 5 and offset = 1.
In the meantime, c, d and e have been consumed, but f, g and h
have been enqueued. The broker's counts are start = 5, end = 8. It
handles the client's request by sending back the first available
message, f, along with the updated counts and an offset of 0.
The client can again detect that it has missed some messages
and also that the queue has had new messages appended to it.
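
The detection the client performs in this walkthrough is simple
arithmetic on the values it sent and the values it got back. A sketch in
Java, using a hypothetical BrowseDelta helper (not part of any existing
API) and assuming messages only ever leave from the head of the queue:

```java
// Hypothetical client-side helper; the name and shape are assumptions.
final class BrowseDelta {
    final long missed;   // messages skipped between the request and the reply
    final long appended; // messages enqueued since the client's last view

    BrowseDelta(long missed, long appended) {
        this.missed = missed;
        this.appended = appended;
    }

    static BrowseDelta between(long reqStart, long reqEnd, long reqOffset,
                               long replyStart, long replyEnd, long replyOffset) {
        long expected = reqStart + reqOffset;     // absolute position asked for
        long received = replyStart + replyOffset; // absolute position returned
        return new BrowseDelta(received - expected, replyEnd - reqEnd);
    }

    public static void main(String[] args) {
        // Asked for b (start=0, end=5, offset=1), got c (start=2, end=5, offset=0).
        BrowseDelta d1 = between(0, 5, 1, 2, 5, 0);
        System.out.println(d1.missed + " missed, " + d1.appended + " appended");
        // prints "1 missed, 0 appended"

        // Asked for d (start=2, end=5, offset=1), got f (start=5, end=8, offset=0).
        BrowseDelta d2 = between(2, 5, 1, 5, 8, 0);
        System.out.println(d2.missed + " missed, " + d2.appended + " appended");
        // prints "2 missed, 3 appended"
    }
}
```

Whether the client acts on a non-zero result (for example by restarting
the browse) or simply carries on is left to the client.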
The broker should even be able to handle expiration of messages from
the middle of the queue by just returning the next message and
indicating its position as appropriate. That scenario could do with
being thought through in more detail though.
We would also need to decide on the size of the counters and whether
they could wrap around etc.
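
As one possible answer to the wrap-around question (an assumption on my
part, not something that was settled in the earlier discussion),
fixed-width counters can be allowed to wrap as long as all comparisons
are done on differences rather than on raw values. The Java sketch below
uses 32-bit ints, which wrap silently on overflow; WrappingCounters and
its methods are hypothetical names, and the approach only holds while
fewer than 2^31 messages separate the two positions being compared.

```java
// Hypothetical helper showing wrap-tolerant counter arithmetic.
final class WrappingCounters {

    // Number of positions from 'from' to 'to', correct across overflow.
    static int distance(int from, int to) {
        return to - from;
    }

    // True if position a comes before position b, even if b has wrapped.
    static boolean isBefore(int a, int b) {
        return a - b < 0;
    }

    public static void main(String[] args) {
        int start = Integer.MAX_VALUE - 1;
        int end = start + 3; // overflows and becomes negative

        System.out.println(end < start);            // prints "true": raw comparison is misleading
        System.out.println(distance(start, end));   // prints "3": three messages on the queue
        System.out.println(isBefore(start, end));   // prints "true"
    }
}
```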