Hi,

I've been investigating Camel recently, and today I was looking at message throughput in ActiveMQ when Camel routing is in use. My testing so far has been simplistic: I use the example message Producer class to pump lots of text messages through a queue. With a simple queue and no consumer, things seem fine. However, if a Camel RouteBuilder consumes from this queue and forwards to another queue in the same ActiveMQ instance, I can reproducibly make the queue hang. The hang is permanent; as far as I can tell, only a restart of ActiveMQ clears it.

I was in two minds about whether to raise this in the Camel forum or this one, but the problem seems to lie in ActiveMQ (judging by the absence of Camel classes in any of the stack traces), and I suspect Camel is just a convenient way of triggering some as-yet-unknown condition that causes the problem.

The story, briefly:

1. Start ActiveMQ (see version info below) with a simple Camel route enabled (from activemq:a to activemq:b, for instance).
2. Start a modified Producer class that just sends lots of messages (flat out) to "a". Each message is 25500 characters (~50KB).
3. After a while the ActiveMQ console will indicate that the Kaha store is being used. (This happens after a few thousand messages for me, but will be machine specific.)
4. Shortly after this, the producer will stall.

Restarting the producer does not help.
Running a consumer to clear the queue does not help.
Restarting ActiveMQ does help (of course!).

I have tried this with three ActiveMQ versions so far: 4.1.1 hangs for no apparent reason, the 5.0 snapshot from 12th August gets a thread deadlock, and the 5.0 snapshot from 15th August hangs for no apparent reason.

In each case, the producer is stuck writing to a socket. (I've seen this reported against AMQ 4.0.1, but no fix was put in place as the cause was not identified.) The stack trace is:

"main" prio=6 tid=0x00038278 nid=0xe9c runnable [0x0007f000..0x0007fc3c]
    at java.net.SocketOutputStream.socketWrite0(Native Method)
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:92)
    at java.net.SocketOutputStream.write(SocketOutputStream.java:136)
    at org.apache.activemq.transport.tcp.TcpBufferedOutputStream.flush(TcpBufferedOutputStream.java:105)
    at java.io.DataOutputStream.flush(DataOutputStream.java:106)
    at org.apache.activemq.transport.tcp.TcpTransport.oneway(TcpTransport.java:120)
    at org.apache.activemq.transport.InactivityMonitor.oneway(InactivityMonitor.java:144)
    - locked <0x22f99060> (a org.apache.activemq.transport.InactivityMonitor$2)
    at org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:82)
    at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:90)
    at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40)
    - locked <0x22f9c5f0> (a java.lang.Object)
    at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:59)
    at org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1148)
    at org.apache.activemq.ActiveMQSession.send(ActiveMQSession.java:1583)
    - locked <0x22fa0118> (a java.lang.Object)
    at org.apache.activemq.ActiveMQMessageProducer.send(ActiveMQMessageProducer.java:226)
    at org.apache.activemq.ActiveMQMessageProducerSupport.send(ActiveMQMessageProducerSupport.java:240)
    at ProducerTool.sendLoop(ProducerTool.java:137)
    at ProducerTool.run(ProducerTool.java:99)
    at ProducerTool.main(ProducerTool.java:60)

The deadlock found in the AMQ 5.0 snapshot from 12th August is as follows:

Found one Java-level deadlock:
=============================
"ActiveMQ Transport: tcp:///127.0.0.1:4445":
  waiting to lock monitor 0x0003f84c (object 0x094f3700, a java.lang.Object),
  which is held by "ActiveMQ Transport: tcp:///127.0.0.1:4443"
"ActiveMQ Transport: tcp:///127.0.0.1:4443":
  waiting to lock monitor 0x0003f72c (object 0x094f5ee8, a org.apache.activemq.broker.region.cursors.FilePendingMessageCursor),
  which is held by "ActiveMQ Transport: tcp:///127.0.0.1:4445"

Java stack information for the threads listed above:
===================================================
"ActiveMQ Transport: tcp:///127.0.0.1:4445":
    at org.apache.activemq.memory.UsageManager.increaseUsage(UsageManager.java:157)
    - waiting to lock <0x094f3700> (a java.lang.Object)
    at org.apache.activemq.command.Message.incrementReferenceCount(Message.java:585)
    at org.apache.activemq.broker.region.cursors.FilePendingMessageCursor.next(FilePendingMessageCursor.java:192)
    - locked <0x094f5ee8> (a org.apache.activemq.broker.region.cursors.FilePendingMessageCursor)
    at org.apache.activemq.broker.region.cursors.StoreQueueCursor.next(StoreQueueCursor.java:129)
    - locked <0x094f3908> (a org.apache.activemq.broker.region.cursors.StoreQueueCursor)
    at org.apache.activemq.broker.region.Queue.doPageIn(Queue.java:1015)
    - locked <0x094f3908> (a org.apache.activemq.broker.region.cursors.StoreQueueCursor)
    at org.apache.activemq.broker.region.Queue.pageInMessages(Queue.java:1064)
    - locked <0x094f3998> (a java.lang.Object)
    at org.apache.activemq.broker.region.Queue.sendMessage(Queue.java:993)
    at org.apache.activemq.broker.region.Queue.doMessageSend(Queue.java:478)
    at org.apache.activemq.broker.region.Queue.send(Queue.java:436)
    at org.apache.activemq.broker.region.AbstractRegion.send(AbstractRegion.java:328)
    at org.apache.activemq.broker.region.RegionBroker.send(RegionBroker.java:402)
    at org.apache.activemq.broker.TransactionBroker.send(TransactionBroker.java:221)
    at org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:125)
    at org.apache.activemq.broker.CompositeDestinationBroker.send(CompositeDestinationBroker.java:95)
    at org.apache.activemq.broker.MutableBrokerFilter.send(MutableBrokerFilter.java:135)
    at org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:474)
    at org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:623)
    at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:320)
    at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:216)
    at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:67)
    at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:129)
    at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:124)
    - locked <0x09502968> (a org.apache.activemq.transport.InactivityMonitor$1)
    at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
    at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:150)
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:137)
    at java.lang.Thread.run(Thread.java:595)
"ActiveMQ Transport: tcp:///127.0.0.1:4443":
    at org.apache.activemq.broker.region.cursors.FilePendingMessageCursor.onMemoryUseChanged(FilePendingMessageCursor.java:249)
    - waiting to lock <0x094f5ee8> (a org.apache.activemq.broker.region.cursors.FilePendingMessageCursor)
    at org.apache.activemq.memory.UsageManager.fireEvent(UsageManager.java:358)
    at org.apache.activemq.memory.UsageManager.setPercentUsage(UsageManager.java:328)
    - locked <0x094f3700> (a java.lang.Object)
    at org.apache.activemq.memory.UsageManager.decreaseUsage(UsageManager.java:181)
    at org.apache.activemq.command.Message.decrementReferenceCount(Message.java:602)
    - locked <0x0961f980> (a org.apache.activemq.command.ActiveMQTextMessage)
    at org.apache.activemq.broker.region.IndirectMessageReference.drop(IndirectMessageReference.java:137)
    - locked <0x09612fe0> (a org.apache.activemq.broker.region.IndirectMessageReference)
    at org.apache.activemq.broker.region.QueueSubscription.acknowledge(QueueSubscription.java:56)
    at org.apache.activemq.broker.region.PrefetchSubscription.acknowledge(PrefetchSubscription.java:193)
    - locked <0x094f0178> (a org.apache.activemq.broker.region.QueueSubscription)
    at org.apache.activemq.broker.region.AbstractRegion.acknowledge(AbstractRegion.java:340)
    at org.apache.activemq.broker.region.RegionBroker.acknowledge(RegionBroker.java:427)
    at org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionBroker.java:191)
    at org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:73)
    at org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:73)
    at org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBrokerFilter.java:87)
    at org.apache.activemq.broker.TransportConnection.processMessageAck(TransportConnection.java:480)
    at org.apache.activemq.command.MessageAck.visit(MessageAck.java:184)
    at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:320)
    at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:216)
    at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:67)
    at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:129)
    at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:124)
    - locked <0x094e9eb0> (a org.apache.activemq.transport.InactivityMonitor$1)
    at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
    at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:150)
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:137)
    at java.lang.Thread.run(Thread.java:595)

Found 1 deadlock.
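
Reading the dump, the cycle looks like a classic lock-ordering inversion: the send path (port 4445) holds the FilePendingMessageCursor monitor and wants the UsageManager lock, while the acknowledge path (port 4443) holds the UsageManager lock and wants the cursor monitor. The following is only a minimal stand-alone sketch of that pattern using plain JDK classes, not ActiveMQ code; the class name, the lock objects and the thread names are all made up for illustration.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.CountDownLatch;

public class LockOrderDeadlockSketch {

    /**
     * Starts two daemon threads that take two locks in opposite order and
     * returns how many of them the JVM reports as monitor-deadlocked
     * (0 if no deadlock was detected within about five seconds).
     */
    static int reproduce() {
        // Hypothetical stand-ins for the two monitors named in the dump.
        Object cursorLock = new Object(); // plays the FilePendingMessageCursor monitor
        Object usageLock = new Object();  // plays the java.lang.Object lock in UsageManager
        CountDownLatch bothHoldFirstLock = new CountDownLatch(2);

        // "send path": cursor first, then usage (like the 4445 thread).
        Thread sendPath = startDaemon("send-path", cursorLock, usageLock, bothHoldFirstLock);
        // "ack path": usage first, then cursor (like the 4443 thread).
        Thread ackPath = startDaemon("ack-path", usageLock, cursorLock, bothHoldFirstLock);

        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        for (int attempt = 0; attempt < 100; attempt++) {
            long[] ids = threads.findMonitorDeadlockedThreads(); // null when no deadlock
            int ours = 0;
            if (ids != null) {
                for (long id : ids) {
                    if (id == sendPath.getId() || id == ackPath.getId()) {
                        ours++;
                    }
                }
            }
            if (ours == 2) {
                return ours;
            }
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return 0;
            }
        }
        return 0;
    }

    static Thread startDaemon(String name, Object first, Object second, CountDownLatch latch) {
        Thread t = new Thread(() -> {
            synchronized (first) {
                // Wait until the other thread also holds its first lock,
                // so the opposite-order acquisition below always collides.
                latch.countDown();
                try {
                    latch.await();
                } catch (InterruptedException e) {
                    return;
                }
                synchronized (second) {
                    // Never reached: the other thread holds this monitor.
                }
            }
        }, name);
        t.setDaemon(true); // lets the JVM exit even though the threads stay stuck
        t.start();
        return t;
    }

    public static void main(String[] args) {
        System.out.println("deadlocked threads: " + reproduce());
    }
}
```

If the real cause is the same inversion, the usual remedies would be to acquire the two locks in one consistent order, or to avoid firing the memory-usage listener while the usage lock is held; whether either applies to the ActiveMQ code is an assumption on my part.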
The "hang for no apparent reason" thread dump is in this file: http://www.nabble.com/file/p12181133/AMQ5%2BThreadDump.txt (AMQ5+ThreadDump.txt). In this one, I am posting to a queue called DataQueue, and this is being forwarded to a queue called TOOL.DEFAULT.

I'm not an expert on AMQ, so perhaps I've just not configured something correctly. It seems to me that when the backlog in queue "a" gets too long, AMQ decides to buffer the messages in a persistent store (Kaha), and this is where things fall over. The use of Camel as a router slows down the consumption of messages from queue "a" sufficiently to allow the backlog to build up and cause Kaha to kick in.
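
To make that hypothesis concrete, here is a toy model of a producer outpacing a forwarding consumer until the in-memory backlog crosses a limit. It is only a sketch of the reasoning, not of how ActiveMQ accounts for memory: the 20 MB limit, the per-tick rates and the two-bytes-per-character payload size are all assumptions chosen for illustration.

```java
public class BacklogSketch {

    /**
     * Simulates fixed-rate production and consumption in discrete ticks.
     * Returns the number of messages sent when the backlog first exceeds
     * memoryLimitBytes, or -1 if that never happens within maxTicks.
     */
    static long messagesUntilSpill(long memoryLimitBytes, int messageBytes,
                                   int producedPerTick, int consumedPerTick,
                                   long maxTicks) {
        long backlog = 0; // messages waiting in the queue
        long sent = 0;    // total messages produced so far
        for (long tick = 0; tick < maxTicks; tick++) {
            backlog += producedPerTick;
            sent += producedPerTick;
            backlog -= Math.min(backlog, consumedPerTick);
            if (backlog * messageBytes > memoryLimitBytes) {
                return sent;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        int messageBytes = 25_500 * 2;         // 25500 chars, assuming 2 bytes each (~50KB)
        long memoryLimit = 20L * 1024 * 1024;  // hypothetical 20 MB in-memory limit

        // Slow forwarding consumer: 10 messages in, 8 out per tick.
        System.out.println("slow consumer: "
                + messagesUntilSpill(memoryLimit, messageBytes, 10, 8, 1_000_000));
        // Consumer keeping pace: the backlog never grows.
        System.out.println("matched consumer: "
                + messagesUntilSpill(memoryLimit, messageBytes, 10, 10, 1_000_000));
    }
}
```

With these made-up numbers the slow-consumer case crosses the limit after 2060 messages, and the matched case never does (-1). The actual point at which the store kicks in will depend on the broker's configuration and the real consumption rate.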

Some things don't make sense to me, though. For instance, if I just send messages to a queue and don't consume them, should they not also end up in a Kaha store once I've sent enough? My first experiment once I found this problem was to send messages to queue "a" without any consumer; this didn't cause a problem until AMQ ran out of memory. Does this suggest that Kaha didn't kick in at all?

Anyhow, sorry for the long post, but this seems a significant problem. I realise the message throughput I have used is pretty extreme, but there does seem to be some nasty race condition between message arrival, consumption and the Kaha store, which could perhaps arise at any time.

Thanks,
-Dominic