Hi,
I've somehow gotten myself into a situation where I've got 50,000+ messages stuck in a queue for no apparent reason. Allow me to explain:
I've got a system where one component rapidly produces messages that are put on a topic (around 400 per second). A second component listens to the topic, takes the information from the messages that it sees, and repackages that information in another message, which it puts on a queue. A third component eats from the queue and processes the information in the messages.
Under large load (upwards of 1.5m messages) we were experiencing system hangs (no more messages were getting through), so I let loose the usual stable of performance analysis tools (JConsole and sar) to see what was going on. In the graphs produced from the sar data, you can clearly see the points at which producer flow control kicks in for the topic: the CPU I/O wait skyrockets and the JMX counters for the topic grind to a halt. What's troubling, however, is that nothing else seems to be working either (i.e. the downstream queues that were filled from the topic don't seem to be emptied).
Things got even stranger when I killed the producer, thereby cutting off the influx of new messages to the topic. The system is now in a stable state, with the number of enqueued messages on the topic equal to the number of dequeued messages. However, there are more than 50,000 messages in the first queue, waiting to be processed. All of the listeners that are supposed to eat from this queue are blocked with the following stack trace:
java.lang.Thread.State: TIMED_WAITING on java.lang.obj...@6e186c3f
    at java.lang.Object.wait(Native Method)
    at org.apache.activemq.MessageDispatchChannel.dequeue(MessageDispatchChannel.java:77)
    at org.apache.activemq.ActiveMQMessageConsumer.dequeue(ActiveMQMessageConsumer.java:428)
    at org.apache.activemq.ActiveMQMessageConsumer.receive(ActiveMQMessageConsumer.java:554)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveMessage(AbstractPollingMessageListenerContainer.java:405)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:308)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:261)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:982)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:974)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:876)
    at java.lang.Thread.run(Thread.java:619)
I've used JConsole to stop and start the (TCP) connector several times, and each time (strangely) around 2075 messages have been eaten from the queue by the consumers, after which things freeze again. If I restart the entire broker, around 800 messages are eaten from the queue before things stagnate again.
My basic question is: what is going on, and how can I prevent those messages from getting stuck in the queue?
To make matters even more interesting, I ran another test a while ago where 10 million messages were fed to the same setup without a hitch. The only difference between that test and the one I'm running now is the nature of the messages. The average message size in both sets is almost equal, but in the current test message sizes vary more than in the successful one.
Does anyone have any ideas? Below are a bunch of relevant settings and my activemq.xml config. We code against ActiveMQ using the Spring JmsTemplate.
Thanks in advance,
Maarten
ActiveMQ: 5.3.0
Java: 1.6.0_17
Spring: 2.5.6
Connector URL: tcp://localhost:61616
JMS receivetimeout: 30000
JMS Acknowledgemode: CLIENT_ACKNOWLEDGE
JMS Session transacted: false
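For anyone trying to reproduce this, the client settings listed above would look roughly like the following in Spring XML. This is a sketch, not a copy of our application context: the bean ids, the queue name `example.queue` and the `exampleListener` reference are placeholders, and only the connector URL, receive timeout, acknowledge mode and transacted flag come from the list above.

```xml
<!-- Sketch only: bean ids, queue name and listener reference are placeholders -->
<bean id="connectionFactory"
      class="org.apache.activemq.ActiveMQConnectionFactory">
  <property name="brokerURL" value="tcp://localhost:61616" />
</bean>

<!-- Listener container of the kind that appears in the stack trace above -->
<bean id="queueListenerContainer"
      class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  <property name="connectionFactory" ref="connectionFactory" />
  <property name="destinationName" value="example.queue" />
  <property name="messageListener" ref="exampleListener" />
  <!-- receive timeout in milliseconds -->
  <property name="receiveTimeout" value="30000" />
  <property name="sessionAcknowledgeModeName" value="CLIENT_ACKNOWLEDGE" />
  <property name="sessionTransacted" value="false" />
</bean>
```

With a receive timeout like this, a listener thread that has nothing delivered to it sits in a timed `receive()` call, which would be consistent with the TIMED_WAITING state in the stack trace.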
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
                           http://activemq.apache.org/schema/core
                           http://activemq.apache.org/schema/core/activemq-core.xsd
                           http://mortbay.com/schemas/jetty/1.0 http://jetty.mortbay.org/jetty.xsd
                           ">
  <broker id="broker" useJmx="true" brokerName="testbroker"
          start="true"
          xmlns="http://activemq.apache.org/schema/core"
          dataDirectory="/var/amqdata">
    <destinationPolicy>
      <policyMap>
        <policyEntries>
          <policyEntry queue=">" memoryLimit="32 mb"
                       producerFlowControl="true" />
          <policyEntry topic=">" memoryLimit="32 mb"
                       producerFlowControl="true" />
        </policyEntries>
      </policyMap>
    </destinationPolicy>
    <managementContext>
      <managementContext
          useMBeanServer="true"
          jmxDomainName="org.apache.activemq"
          createMBeanServer="true"
          createConnector="false"
          connectorPort="1100"
          connectorPath="/jmxrmi"/>
    </managementContext>
    <persistenceAdapter>
      <amqPersistenceAdapter
          syncOnWrite="false"
          directory="/var/amqdata/testbroker"
          indexBinSize="8192"
          cleanupInterval="300000"
          indexPageSize="64 kb"
          maxFileLength="256 mb"
          archiveDataLogs="false"/>
    </persistenceAdapter>
    <systemUsage>
      <systemUsage>
        <memoryUsage>
          <memoryUsage limit="512 mb" />
        </memoryUsage>
      </systemUsage>
    </systemUsage>
    <transportConnectors>
      <transportConnector uri="tcp://localhost:61616" />
    </transportConnectors>
  </broker>
</beans>