On 03/14/2012 06:13 PM, Jeff Armstrong wrote:
Jeff: What happens if a session has unacked messages, the client dies
(blade goes down), and then the client on the other blade becomes
active and creates a session before the broker realizes that the old
session should be gone? I'm assuming there is some sort of session
timeout and this could occur. If so, I would expect the new session
not to immediately get the redeliveries until AFTER the broker times
out the old session - is there a mechanism to handle this case?
Yes, that is the case. You need the broker to discover the old session
has failed before the client subscribed on a new session gives up
waiting for new messages. You can control the time to detect a failed
connection using heartbeats (see ConnectionSettings).
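
To make the timing concrete, here is a toy model of that sequence. It is not broker code: ToyBroker, its members and failoverScenario are all hypothetical names, time is a plain integer, and the only assumption is that unaccepted deliveries go back on the queue when the broker decides the old session is dead.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Toy model only: none of these types are Qpid classes.
struct ToyBroker {
    std::deque<std::string> queue;                    // messages available for delivery
    std::map<int, std::vector<std::string>> unacked;  // per session: delivered, not accepted
    std::map<int, int> lastHeartbeat;                 // per session: time of last heartbeat
    int heartbeatTimeout;                             // silence (in ticks) before a session is failed

    explicit ToyBroker(int timeout) : heartbeatTimeout(timeout) {}

    void openSession(int id, int now) { lastHeartbeat[id] = now; unacked[id]; }
    void heartbeat(int id, int now) { lastHeartbeat[id] = now; }

    // Deliver everything currently queued to one session, without acceptance.
    std::size_t deliver(int id) {
        std::size_t n = queue.size();
        for (const auto& m : queue) unacked[id].push_back(m);
        queue.clear();
        return n;
    }

    // Fail silent sessions and put their unaccepted messages back on the queue.
    void tick(int now) {
        for (auto it = lastHeartbeat.begin(); it != lastHeartbeat.end();) {
            if (now - it->second >= heartbeatTimeout) {
                auto& msgs = unacked[it->first];
                queue.insert(queue.begin(), msgs.begin(), msgs.end());
                unacked.erase(it->first);
                it = lastHeartbeat.erase(it);
            } else {
                ++it;
            }
        }
    }
};

// Old session 1 takes three messages and dies at t=0; new session 2 connects at t=1.
// Returns what session 2 receives {right away, once the timeout has elapsed}.
std::pair<std::size_t, std::size_t> failoverScenario(int heartbeatTimeout) {
    ToyBroker broker(heartbeatTimeout);
    broker.queue = {"m1", "m2", "m3"};
    broker.openSession(1, 0);
    broker.deliver(1);                      // delivered, never accepted
    broker.openSession(2, 1);
    broker.tick(1);
    std::size_t before = broker.deliver(2);
    broker.heartbeat(2, heartbeatTimeout);  // the new client stays alive
    broker.tick(heartbeatTimeout);
    std::size_t after = broker.deliver(2);
    return {before, after};
}
```

With a long timeout the new session gets nothing at first and the redeliveries only once the timeout has passed; with a short one it gets them straight away. That is the trade-off the heartbeat interval controls.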
Jeff: Also, when I examined the queue in RingQueuePolicy (member 'queue') and the
one in SemanticState (member 'unacked'), both appeared to have the
same messages - the ones that should have been redelivered but
weren't.
Which session's unacked messages were you examining? If the old one's,
that would certainly imply the broker has not detected that it has failed
(as once it does so, that list of unacked deliveries will no longer exist).
Jeff: But, if a message is deleted due to the ring queue size being exceeded,
shouldn't that message be fully deleted and removed from all lists? Otherwise
the message never truly gets deleted.
It is deleted in the sense that it is removed from the store and will
never get redelivered.
The subscriber's session tracks all the messages it has delivered. Those
records are still needed, though once the message is deleted from the
queue there would be no need for them to reference the actual message
data. Doing that would require some messy code and, to my mind, is not a
critical issue. As I say, there have been some changes to the queue code
that with a little extra work would allow a much more efficient
implementation of ring queues.
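
As a rough illustration of why the message data can still be visible from the delivery records after the ring has dropped it, here is a toy model using shared ownership. It is only a sketch: ToyMessage, ToyRingQueue and dataSurvivesEviction are hypothetical names, not the broker's RingQueuePolicy or SemanticState, and for simplicity a delivered message stays on the toy queue until it is evicted.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <memory>
#include <string>

// Toy model only: these are not broker classes.
struct ToyMessage { std::string data; };

struct ToyRingQueue {
    std::size_t maxCount;
    std::deque<std::shared_ptr<ToyMessage>> messages;

    explicit ToyRingQueue(std::size_t n) : maxCount(n) {}

    // Enqueue, dropping the oldest message once the ring is full.
    void enqueue(const std::string& data) {
        if (messages.size() == maxCount) messages.pop_front();
        messages.push_back(std::make_shared<ToyMessage>(ToyMessage{data}));
    }
};

// Deliver the first message, then overflow the ring so that it is evicted.
// Returns true if the message data is still alive afterwards.
bool dataSurvivesEviction(bool recordHoldsStrongRef) {
    ToyRingQueue q(2);
    q.enqueue("first");
    std::weak_ptr<ToyMessage> observer = q.messages.front();  // watches, does not own
    std::shared_ptr<ToyMessage> deliveryRecord;               // stands in for an unacked entry
    if (recordHoldsStrongRef) deliveryRecord = q.messages.front();
    q.enqueue("second");
    q.enqueue("third");                                       // evicts "first" from the ring
    return !observer.expired();
}
```

In this sketch dataSurvivesEviction(true) corresponds to the current behaviour described above (the record keeps the data reachable), and dataSurvivesEviction(false) to the tidier variant where the record no longer references the message data.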
Jeff: I basically copied the same options/setup.
1) I create an exchange:
qpid-config add exchange direct EXCHANGENAME
2) I start 2 clients that each create a queue and bind to the same binding and
then subscribe to it:
    QueueOptions queueOptions;
    queueOptions.setSizePolicy(RING, 524288000, 0); // 500 MB
    session.queueDeclare(arg::queue = queueName,
                         arg::arguments = queueOptions);
    session.exchangeBind(queueName, exchangeName, bindingName);
    SubscriptionSettings subSettings;
    subSettings.autoAck = 0;
    subSettings.acceptMode = ACCEPT_MODE_EXPLICIT;
    subSettings.completionMode = COMPLETE_ON_ACCEPT;
    subscriptions.subscribe(localQueue, queueName, subSettings);
3) I start a sender client that continuously sends messages to the binding that
I set up:
    Message msg;
    msg.setData(msgStr);
    msg.getDeliveryProperties().setRoutingKey(bindingName);
    msg.getDeliveryProperties().setDeliveryMode(qpid::framing::PERSISTENT);
    uint32_t messageCount = 0;
    while (true) {
        session.messageTransfer(qpid::client::arg::content=msg,
                                qpid::client::arg::destination=exchangeName);
        ++messageCount;
    }
4) The 2 receiver clients will acquire 100k messages each and never accept
them. Then they will enter a loop where they acquire 200k messages, then ack
them all.
5) In gdb, I examine the queue during the accept() on the broker, and it does
not have a policy, so it doesn't call RingQueuePolicy::dequeued().
The only thing I can think of there is that perhaps a queue of that name
already exists.
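
To spell out why an existing queue would explain the missing policy, here is a toy model. It rests on an assumption, namely that declaring a name that already exists hands back the existing queue and leaves its settings untouched; ToyPolicy and ToyQueueRegistry are hypothetical names, not broker classes.

```cpp
#include <cassert>
#include <map>
#include <string>

// Toy model only: assumes a declare of an existing name does not reconfigure it.
enum class ToyPolicy { None, Ring };

struct ToyQueueRegistry {
    std::map<std::string, ToyPolicy> queues;

    // Returns true if the queue was created, false if the name already existed.
    // An existing queue keeps whatever policy it was first created with.
    bool declare(const std::string& name, ToyPolicy policy) {
        return queues.emplace(name, policy).second;
    }
};
```

If that is what is happening, listing the queues (qpid-config queues) before starting the clients should show whether one of that name is already there.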