On second thought, this is perhaps not a bug. I can't think of a way
that the broker could be modified to "fix" this.
To solve your problem, you might try setting the flow-stop threshold
significantly higher than the even threshold so that your fuse program
can delete the binding before the producer gets flow-stopped.
Also, the queue has a boolean statistic called "flowStopped" that
indicates whether it is applying back-pressure to producers. You can
check this value after deleting the binding and if True, either delete
the queue or remove enough messages such that the flow-resume threshold
is crossed downward.
-Ted
On 12/07/2011 03:04 PM, Ted Ross wrote:
Frase,
I think you've uncovered a bug in the broker.
If a congested queue causes a flow-control stop (i.e. the broker
withholds acks to the producer until the queue reduces in size),
unbinding the queue from the exchange will not cause a flow-resume
like deleting the queue will. This is why your producer is hanging.
The messages it produced into the queue above the flow-stop threshold
have not been acknowledged.
-Ted
On 12/04/2011 03:25 PM, Fraser Adams wrote:
Hi all,
I'm writing a little application called QueueFuse which is a QMF2
based application.
The idea is that it listens for the queueThresholdExceeded Event and
if one occurs it recovers the name of the queue that caused the Event
and "blows a fuse" to that queue.
I've got a couple of options when this occurs. My first option is to
use the queue name to trigger a QMF2 queue delete that goes something
like this:
QmfData arguments = new QmfData();
arguments.setValue("type", "queue");
arguments.setValue("name", queueName);
try
{
_broker.invokeMethod("delete", arguments);
}
catch (QmfException e)
{
System.out.println(e.getMessage());
}
This works really well and is kind of a QMF2 version of
https://issues.apache.org/jira/browse/QPID-3247, thought it's
obviously quite brutal.
I've also tried a slightly more subtle approach of removing bindings
to the offending queue by recovering the binding referencing the
queue that caused the event and dereferencing the exchange from the
binding to the the exchange name.
The unbind call is as follows:
String bindingIdentifier = exchangeName + "/" + queueName + "/" +
bindingKey;
QmfData arguments = new QmfData();
arguments.setValue("type", "binding");
arguments.setValue("name", bindingIdentifier);
try
{
_broker.invokeMethod("delete", arguments);
}
catch (QmfException e)
{
System.out.println(e.getMessage());
}
This *appears* to work initially and the producer carries on
producing for much longer than had I not triggered the binding
delete, however eventually the producer hangs with:
ItemProducer: exception: Exception when sending message
javax.jms.JMSException: Exception when sending message
at
org.apache.qpid.client.BasicMessageProducer_0_10.sendMessage(BasicMessageProducer_0_10.java:240)
at
org.apache.qpid.client.BasicMessageProducer.sendImpl(BasicMessageProducer.java:501)
at
org.apache.qpid.client.BasicMessageProducer.sendImpl(BasicMessageProducer.java:456)
at
org.apache.qpid.client.BasicMessageProducer.send(BasicMessageProducer.java:283)
at ItemProducer.<init>(ItemProducer.java:58)
at ItemProducer.main(ItemProducer.java:130)
Caused by: org.apache.qpid.transport.SessionException: timed out
waiting for completion
at org.apache.qpid.transport.Session.invoke(Session.java:688)
at org.apache.qpid.transport.Session.invoke(Session.java:559)
at
org.apache.qpid.transport.SessionInvoker.messageTransfer(SessionInvoker.java:96)
at
org.apache.qpid.client.BasicMessageProducer_0_10.sendMessage(BasicMessageProducer_0_10.java:226)
... 5 more
As I say if I delete the queue the producer carries on ad infinitum
(though clearly the messages are falling on the floor), but I can't
see why I should get the exception above by dynamically deleting the
binding.
Another thing I've noticed is that if the consumer isn't *too* much
slower than the producer, but slow enough to trigger the Event the
unbind works OK, but if the consumer is really slow (or non-existent)
I get the exception above.
I've tried using a sleep to deliberately slow down a consumer and I
did reach a point where the producer would hang for a while then
eventually carry on, but if I then slowed the consumer down further
then I get the exception.
Does anyone have any idea what should cause this exception and why it
should occur when I unbind a slow consumer from a fast producer.
Any neat thoughts for resolving this?
As a slight aside I can't unbind things bound to the default direct
exchange (I think that's illegal in AMQP) so is the only way to
protect producers from slow consumers bound to that exchange to
delete the queue? (obviously if the queue was a ring queue that would
work, but that's not what I'm trying to figure out :-) ).
Cheers,
Frase
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]