in order for the producer to block the send on the client, you need to
configure a producer window size that is less than the store limit.
If it s 2*max message size, then there can be two pending sends on the
broker and the third will block the producer before the store limit
kicks in and blocks the destination.

ActiveMQConnectionFactory connctionFactory = ...
connctionFactory.setProducerWindowSize(1024000);

see: http://activemq.apache.org/producer-flow-control.html


On 19 September 2012 19:26, Oleg Dulin <oleg.du...@gmail.com> wrote:
> Here is my flow:
>
> Queue A has a store limit set to 15% and producerFlowControl turned on.
>
> Thread "foo" publishes on queue A
> Thread "bar" listens for A, does something, and publishes on queue B
> Thread "bat" listens for B, does something and publishes on queue C
> Thread "boo" listens for C, does something and only ocassionally publishes
> on A
>
> Store reaches 15%.
>
> Foo gets stopped, rightfully so.
>
> The whole thing is now blocked. Using Jconsole:
>
> foo is blocked on this:
>
> Stack trace:
> sun.misc.Unsafe.park(Native Method)
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
> java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:315)
> org.apache.activemq.transport.FutureResponse.getResult(FutureResponse.java:40)
> org.apache.activemq.transport.ResponseCorrelator.request(ResponseCorrelator.java:87)
> org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1284)
> org.apache.activemq.ActiveMQSession.send(ActiveMQSession.java:1760)
>   - locked java.lang.Object@407f434d
> org.apache.activemq.ActiveMQMessageProducer.send(ActiveMQMessageProducer.java:231)
> org.apache.activemq.ActiveMQMessageProducerSupport.send(ActiveMQMessageProducerSupport.java:241)
>
> bar is now blocked on this as well it seems waiting for foo to free up this
> lock:
>
> Stack trace:
> sun.misc.Unsafe.park(Native Method)
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
> java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:315)
> org.apache.activemq.transport.FutureResponse.getResult(FutureResponse.java:40)
> org.apache.activemq.transport.ResponseCorrelator.request(ResponseCorrelator.java:87)
> org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1284)
> org.apache.activemq.ActiveMQSession.send(ActiveMQSession.java:1760)
>   - locked java.lang.Object@202ed609
> org.apache.activemq.ActiveMQMessageProducer.send(ActiveMQMessageProducer.java:231)
> org.apache.activemq.ActiveMQMessageProducerSupport.send(ActiveMQMessageProducerSupport.java:241)
>
> and boo is blocked trying to send it back to queue A, as expected.
>
> So, store is not getting freed up.
>
> Why ? This has got to be a misconfiguration...
>
>
>
> --
> Regards,
> Oleg Dulin
> NYC Java Big Data Engineer
> http://www.olegdulin.com/
>
>



-- 
http://fusesource.com
http://blog.garytully.com

Reply via email to