Hi all,
I'm writing a little application called QueueFuse which is a QMF2 based
application.
The idea is that it listens for the queueThresholdExceeded Event and if
one occurs it recovers the name of the queue that caused the Event and
"blows a fuse" to that queue.
I've got a couple of options when this occurs. My first option is to use
the queue name to trigger a QMF2 queue delete that goes something like this:
QmfData arguments = new QmfData();
arguments.setValue("type", "queue");
arguments.setValue("name", queueName);
try
{
_broker.invokeMethod("delete", arguments);
}
catch (QmfException e)
{
System.out.println(e.getMessage());
}
This works really well and is kind of a QMF2 version of
https://issues.apache.org/jira/browse/QPID-3247, thought it's obviously
quite brutal.
I've also tried a slightly more subtle approach of removing bindings to
the offending queue by recovering the binding referencing the queue that
caused the event and dereferencing the exchange from the binding to the
the exchange name.
The unbind call is as follows:
String bindingIdentifier = exchangeName + "/" + queueName + "/" +
bindingKey;
QmfData arguments = new QmfData();
arguments.setValue("type", "binding");
arguments.setValue("name", bindingIdentifier);
try
{
_broker.invokeMethod("delete", arguments);
}
catch (QmfException e)
{
System.out.println(e.getMessage());
}
This *appears* to work initially and the producer carries on producing
for much longer than had I not triggered the binding delete, however
eventually the producer hangs with:
ItemProducer: exception: Exception when sending message
javax.jms.JMSException: Exception when sending message
at
org.apache.qpid.client.BasicMessageProducer_0_10.sendMessage(BasicMessageProducer_0_10.java:240)
at
org.apache.qpid.client.BasicMessageProducer.sendImpl(BasicMessageProducer.java:501)
at
org.apache.qpid.client.BasicMessageProducer.sendImpl(BasicMessageProducer.java:456)
at
org.apache.qpid.client.BasicMessageProducer.send(BasicMessageProducer.java:283)
at ItemProducer.<init>(ItemProducer.java:58)
at ItemProducer.main(ItemProducer.java:130)
Caused by: org.apache.qpid.transport.SessionException: timed out waiting
for completion
at org.apache.qpid.transport.Session.invoke(Session.java:688)
at org.apache.qpid.transport.Session.invoke(Session.java:559)
at
org.apache.qpid.transport.SessionInvoker.messageTransfer(SessionInvoker.java:96)
at
org.apache.qpid.client.BasicMessageProducer_0_10.sendMessage(BasicMessageProducer_0_10.java:226)
... 5 more
As I say if I delete the queue the producer carries on ad infinitum
(though clearly the messages are falling on the floor), but I can't see
why I should get the exception above by dynamically deleting the binding.
Another thing I've noticed is that if the consumer isn't *too* much
slower than the producer, but slow enough to trigger the Event the
unbind works OK, but if the consumer is really slow (or non-existent) I
get the exception above.
I've tried using a sleep to deliberately slow down a consumer and I did
reach a point where the producer would hang for a while then eventually
carry on, but if I then slowed the consumer down further then I get the
exception.
Does anyone have any idea what should cause this exception and why it
should occur when I unbind a slow consumer from a fast producer.
Any neat thoughts for resolving this?
As a slight aside I can't unbind things bound to the default direct
exchange (I think that's illegal in AMQP) so is the only way to protect
producers from slow consumers bound to that exchange to delete the
queue? (obviously if the queue was a ring queue that would work, but
that's not what I'm trying to figure out :-) ).
Cheers,
Frase
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]