I believe I've noticed something similar using a topic... any ideas? Thanks, mrh
DominicTulley wrote: > > Hi, I've been investigating Camel recently and today I was taking a look > at message throughput in ActiveMQ when Camel routing is being used. > > I have been doing simplistic testing so far, just using the example > message Producer class to pump lots of text messages through a queue. > When it's a simple queue with no consumer, things seem fine. > > However, if I have a Camel RouteBuilder as a consumer of this queue, > forwarding to another queue in the same ActiveMQ instance then it seems > possible to reproducibly make the queue hang. This is a permanent hang, > with a restart of activemq required to fix the problem as far as I can > tell. > > I was in two minds about if this should be raised in the Camel forum or > this one, but the problem seems to lie in ActiveMQ (judging by the absence > of camel classes in any of the stack traces) and I suspect Camel is just a > useful way to introduce an unknown situation which causes the problem. > > The story briefly is: > > Start ActiveMQ (see version info later) with a simple camel route enabled > (from activemq:a to activemq:b for instance) > Start up a modified Producer class that just sends lots of messages (flat > out) to "a". > Each message is 25500 characters (~50KB) > After a while the ActiveMQ console will indicate that the Kaha store is > being used. (happens after a few thousand for me but will be machine > specific) > Shortly after this, the producer will stall. > Restarting the producer does not help. > Running a consumer to clear the queue does not help. > Restarting ActiveMQ does help (of course!). > > I have tried this with three AMQs so far (4.1.1 hangs with no apparent > reason, 5.0 Snapshot from 12th august gets a thread deadlock and 5.0 > snapshot from 15th august hangs for no apparent reason). > > In each case, the producer is stuck writing to a socket (I've seen this > reported against AMQ 4.0.1 but no fix was put in place as the cause was > not identified). Stacktrace is : > > "main" prio=6 tid=0x00038278 nid=0xe9c runnable [0x0007f000..0x0007fc3c] > at java.net.SocketOutputStream.socketWrite0(Native Method) > at > java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:92) > at java.net.SocketOutputStream.write(SocketOutputStream.java:136) > at > org.apache.activemq.transport.tcp.TcpBufferedOutputStream.flush(TcpBu > fferedOutputStream.java:105) > at java.io.DataOutputStream.flush(DataOutputStream.java:106) > at > org.apache.activemq.transport.tcp.TcpTransport.oneway(TcpTransport.ja > va:120) > at > org.apache.activemq.transport.InactivityMonitor.oneway(InactivityMoni > tor.java:144) > - locked <0x22f99060> (a > org.apache.activemq.transport.InactivityMonitor > $2) > at > org.apache.activemq.transport.TransportFilter.oneway(TransportFilter. > java:82) > at > org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatN > egotiator.java:90) > at > org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.ja > va:40) > - locked <0x22f9c5f0> (a java.lang.Object) > at > org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorre > lator.java:59) > at > org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnec > tion.java:1148) > at > org.apache.activemq.ActiveMQSession.send(ActiveMQSession.java:1583) > - locked <0x22fa0118> (a java.lang.Object) > at > org.apache.activemq.ActiveMQMessageProducer.send(ActiveMQMessageProdu > cer.java:226) > at > org.apache.activemq.ActiveMQMessageProducerSupport.send(ActiveMQMessa > geProducerSupport.java:240) > at ProducerTool.sendLoop(ProducerTool.java:137) > at ProducerTool.run(ProducerTool.java:99) > at ProducerTool.main(ProducerTool.java:60) > > > The deadlock found in AMQ 5 from 12th august is as follows: > > Found one Java-level deadlock: > ============================= > "ActiveMQ Transport: tcp:///127.0.0.1:4445": > waiting to lock monitor 0x0003f84c (object 0x094f3700, a > java.lang.Object), > which is held by "ActiveMQ Transport: tcp:///127.0.0.1:4443" > "ActiveMQ Transport: tcp:///127.0.0.1:4443": > waiting to lock monitor 0x0003f72c (object 0x094f5ee8, a > org.apache.activemq.b > roker.region.cursors.FilePendingMessageCursor), > which is held by "ActiveMQ Transport: tcp:///127.0.0.1:4445" > > Java stack information for the threads listed above: > =================================================== > "ActiveMQ Transport: tcp:///127.0.0.1:4445": > at > org.apache.activemq.memory.UsageManager.increaseUsage(UsageManager.ja > va:157) > - waiting to lock <0x094f3700> (a java.lang.Object) > at > org.apache.activemq.command.Message.incrementReferenceCount(Message.j > ava:585) > at > org.apache.activemq.broker.region.cursors.FilePendingMessageCursor.ne > xt(FilePendingMessageCursor.java:192) > - locked <0x094f5ee8> (a > org.apache.activemq.broker.region.cursors.FileP > endingMessageCursor) > at > org.apache.activemq.broker.region.cursors.StoreQueueCursor.next(Store > QueueCursor.java:129) > - locked <0x094f3908> (a > org.apache.activemq.broker.region.cursors.Store > QueueCursor) > at > org.apache.activemq.broker.region.Queue.doPageIn(Queue.java:1015) > - locked <0x094f3908> (a > org.apache.activemq.broker.region.cursors.Store > QueueCursor) > at > org.apache.activemq.broker.region.Queue.pageInMessages(Queue.java:106 > 4) > - locked <0x094f3998> (a java.lang.Object) > at > org.apache.activemq.broker.region.Queue.sendMessage(Queue.java:993) > at > org.apache.activemq.broker.region.Queue.doMessageSend(Queue.java:478) > > at org.apache.activemq.broker.region.Queue.send(Queue.java:436) > at > org.apache.activemq.broker.region.AbstractRegion.send(AbstractRegion. > java:328) > at > org.apache.activemq.broker.region.RegionBroker.send(RegionBroker.java > :402) > at > org.apache.activemq.broker.TransactionBroker.send(TransactionBroker.j > ava:221) > at > org.apache.activemq.broker.BrokerFilter.send(BrokerFilter.java:125) > at > org.apache.activemq.broker.CompositeDestinationBroker.send(CompositeD > estinationBroker.java:95) > at > org.apache.activemq.broker.MutableBrokerFilter.send(MutableBrokerFilt > er.java:135) > at > org.apache.activemq.broker.TransportConnection.processMessage(Transpo > rtConnection.java:474) > at > org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.jav > a:623) > at > org.apache.activemq.broker.TransportConnection.service(TransportConne > ction.java:320) > at > org.apache.activemq.broker.TransportConnection$1.onCommand(TransportC > onnection.java:216) > at > org.apache.activemq.transport.TransportFilter.onCommand(TransportFilt > er.java:67) > at > org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireForm > atNegotiator.java:129) > at > org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityM > onitor.java:124) > - locked <0x09502968> (a > org.apache.activemq.transport.InactivityMonitor > $1) > at > org.apache.activemq.transport.TransportSupport.doConsume(TransportSup > port.java:83) > at > org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.jav > a:150) > at > org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java: > 137) > at java.lang.Thread.run(Thread.java:595) > "ActiveMQ Transport: tcp:///127.0.0.1:4443": > at > org.apache.activemq.broker.region.cursors.FilePendingMessageCursor.on > MemoryUseChanged(FilePendingMessageCursor.java:249) > - waiting to lock <0x094f5ee8> (a > org.apache.activemq.broker.region.curs > ors.FilePendingMessageCursor) > at > org.apache.activemq.memory.UsageManager.fireEvent(UsageManager.java:3 > 58) > at > org.apache.activemq.memory.UsageManager.setPercentUsage(UsageManager. > java:328) > - locked <0x094f3700> (a java.lang.Object) > at > org.apache.activemq.memory.UsageManager.decreaseUsage(UsageManager.ja > va:181) > at > org.apache.activemq.command.Message.decrementReferenceCount(Message.j > ava:602) > - locked <0x0961f980> (a > org.apache.activemq.command.ActiveMQTextMessage > ) > at > org.apache.activemq.broker.region.IndirectMessageReference.drop(Indir > ectMessageReference.java:137) > - locked <0x09612fe0> (a > org.apache.activemq.broker.region.IndirectMessa > geReference) > at > org.apache.activemq.broker.region.QueueSubscription.acknowledge(Queue > Subscription.java:56) > at > org.apache.activemq.broker.region.PrefetchSubscription.acknowledge(Pr > efetchSubscription.java:193) > - locked <0x094f0178> (a > org.apache.activemq.broker.region.QueueSubscrip > tion) > at > org.apache.activemq.broker.region.AbstractRegion.acknowledge(Abstract > Region.java:340) > at > org.apache.activemq.broker.region.RegionBroker.acknowledge(RegionBrok > er.java:427) > at > org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionB > roker.java:191) > at > org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java > :73) > at > org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java > :73) > at > org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBro > kerFilter.java:87) > at > org.apache.activemq.broker.TransportConnection.processMessageAck(Tran > sportConnection.java:480) > at > org.apache.activemq.command.MessageAck.visit(MessageAck.java:184) > at > org.apache.activemq.broker.TransportConnection.service(TransportConne > ction.java:320) > at > org.apache.activemq.broker.TransportConnection$1.onCommand(TransportC > onnection.java:216) > at > org.apache.activemq.transport.TransportFilter.onCommand(TransportFilt > er.java:67) > at > org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireForm > atNegotiator.java:129) > at > org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityM > onitor.java:124) > - locked <0x094e9eb0> (a > org.apache.activemq.transport.InactivityMonitor > $1) > at > org.apache.activemq.transport.TransportSupport.doConsume(TransportSup > port.java:83) > at > org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.jav > a:150) > at > org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java: > 137) > at java.lang.Thread.run(Thread.java:595) > > Found 1 deadlock. > > The "Hang for no reason" thread dump is in this file > http://www.nabble.com/file/p12181133/AMQ5%2BThreadDump.txt > AMQ5+ThreadDump.txt . In this one, I am posting to a queue called > DataQueue and this is being forwarded to a queue called TOOL.DEFAULT. > > I'm not an expert on AMQ so perhaps I've just not configured something > correctly. It seems to me that when the number of entries in the "a" > queue gets too long, AMQ decides to buffer them in a persistent store > (kaha) and this is where things fall over. The use of Camel as a router > slows down the consumption of messages from queue "a" sufficiently to > allow the backlog to build up and cause kaha to kick in. > > Some things don't make sense to me though - for instance if I just send > messages to a queue and don't consume them then should they not also end > up in a kaha store once I've sent enough? My first experiment once I > found this problem was to send messages to queue "a" without any consumer > - this didn't cause a problem until AMQ ran out of memory. This suggests > that kaha didn't kick in at all? > > Anyhow, sorry for the long post but this seems a significant problem. I > realise the message throughput I have used is pretty extreme but there > does seem to be some nasty race condition between message arrival, > consumption and the kaha store which could perhaps arise at any time. > > Thanks, > > -Dominic > > > > > > > > > > -- View this message in context: http://www.nabble.com/Queue-%22hangs%22-when-Kaha-Store-kicks-in.-Restart-required.-tp12181133s2354p14257739.html Sent from the ActiveMQ - User mailing list archive at Nabble.com.