I've now changed my activemq.xml to the listing below, made the session transacted and set the acknowledge mode to SESSION_TRANSACTED.
Things were going well for me for a while, with the system processing 3,2 million messages without a hitch, and then everything stopped because the first component in the chain got lots of these: javax.jms.InvalidClientIDException: Broker: broker - Client: ID:rhost-59116-1263927611185-1:445 already connected from /127.0.0.1:56560 And for an hour now, since it stopped processing messages, the broker has been eating up almost 100% of the cpu for some reason I can't quite fathom (disk utilization is very low, and there is no message traffic passing through the broker). <beans xmlns="http://www.springframework.org/schema/beans" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd http://mortbay.com/schemas/jetty/1.0 http://jetty.mortbay.org/jetty.xsd"> <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> <property name="location" value="file:/etc/broker.properties" /> </bean> <broker id="broker" useJmx="true" brokerName="${broker.name}" start="true" xmlns="http://activemq.apache.org/schema/core" dataDirectory="${activemq.data}"> <destinationPolicy> <policyMap> <policyEntries> <policyEntry queue=">" memoryLimit="32mb" strictOrderDispatch="true" producerFlowControl="false"> <pendingQueuePolicy> <vmQueueCursor /> </pendingQueuePolicy> </policyEntry> <policyEntry topic=">" memoryLimit="32mb" producerFlowControl="true" /> </policyEntries> </policyMap> </destinationPolicy> <managementContext> <managementContext useMBeanServer="true" jmxDomainName="org.apache.activemq" createMBeanServer="true" createConnector="false" connectorPort="1100" connectorPath="/jmxrmi"/> </managementContext> <persistenceAdapter> <kahaDB directory="${activemq.data}/${broker.name}" journalMaxFileLength="32mb" enableJournalDiskSyncs="false" indexWriteBatchSize="1000" indexCacheSize="1000"/> </persistenceAdapter> <systemUsage> <systemUsage> <memoryUsage> <memoryUsage limit="512mb" /> </memoryUsage> </systemUsage> </systemUsage> <transportConnectors> <transportConnector uri="nio://0.0.0.0:61616" /> </transportConnectors> </broker> <jetty xmlns="http://mortbay.com/schemas/jetty/1.0"> <connectors> <nioConnector port="61617"/> </connectors> <handlers> <webAppContext contextPath="/admin" resourceBase="${activemq.base}/webapps/admin" logUrlOnStart="true"/> </handlers> </jetty> </beans> rajdavies wrote: > > Yes fusemq is based on activemq - though its on a different release > cycle - if this a bug in activemq we'll fix it asap > On 18 Jan 2010, at 14:33, Geir Magnusson Jr. wrote: > >> >> On Jan 18, 2010, at 8:05 AM, Maarten_D wrote: >> >>> >>> Hi Rob, >>> Thanks for the reply. I'll give that article a read and see if I >>> can post >>> some client code. >>> >>> Geir: we're in the process of testing the same setup with Fuse >>> Message >>> Broker, which should give us some indication of where the problem >>> lies. >> >> Isn't that based on ActiveMQ? :) >> >> geir >> >>> >>> Regards, >>> Maarten >>> >>> >>> rajdavies wrote: >>>> >>>> I think we'd really need to see the client code for the consumer >>>> too - >>>> its likely that the consumers are not acknowledging messages >>>> properly >>>> - so no more messages will be dispatched - as the broker thinks that >>>> all the consumers are still busy processing. >>>> >>>> Things to be aware of when using Spring's JmsTemplate - >>>> http://activemq.apache.org/jmstemplate-gotchas.html >>>> and here http://activemq.apache.org/spring-support.html - but this >>>> article is extremely informative too - >>>> http://codedependents.com/2009/10/16/efficient-lightweight-jms-with-spring-and-activemq/ >>>> >>>> cheers, >>>> >>>> Rob >>>> On 18 Jan 2010, at 09:44, Maarten_D wrote: >>>> >>>>> >>>>> Anyone have any ideas? >>>>> >>>>> >>>>> Maarten_D wrote: >>>>>> >>>>>> Hi, >>>>>> I've somehow gotten myself into the situation where I've got >>>>>> 50,000+ >>>>>> messages stuck in a queue for no apparent reason. Allow me to >>>>>> explain: >>>>>> >>>>>> I've got a system where a component rapidly produces messages that >>>>>> are put >>>>>> on topic (around 400 per second). A second component listens to >>>>>> the >>>>>> topic, >>>>>> takes the information from the messages that it sees, repackages >>>>>> the >>>>>> information in another message which it puts a on queue. A third >>>>>> component >>>>>> eats from the queue, and processes the information in the >>>>>> messages. >>>>>> >>>>>> Under large load (upwards of 1.5m messages) we were experiencing >>>>>> system >>>>>> hangs (no more messages were getting through), so I let loose the >>>>>> usual >>>>>> stable of performance analysis tools (JConsole and Sar) to see >>>>>> what >>>>>> was >>>>>> going one. Using the graphs produced with the sar info, you can >>>>>> clearly >>>>>> see the points at which the producer flow control kicks in for the >>>>>> topic: >>>>>> the cpu io-wait skyrockets and the JMX counters for the topic >>>>>> grind >>>>>> to a >>>>>> halt. What's troubling, however, is that nothing else seems to be >>>>>> working >>>>>> either (ie the downstream queues that were filled from the topic >>>>>> don't >>>>>> seem to be emptied). >>>>>> >>>>>> Things got even stranger when I killed the producer, thereby >>>>>> cutting the >>>>>> influx of new messages to the topic. The system is now in a stable >>>>>> state, >>>>>> with the amount of enqueued messages on the topic equal to the >>>>>> dequeued >>>>>> amount. However, there are more than 50,000 messages in the first >>>>>> queue, >>>>>> waiting to be processed. All of the listeners that are supposed to >>>>>> eat >>>>>> from this queue are blocked with the following stacktrace: >>>>>> >>>>>> java.lang.Thread.State: TIMED_WAITING on java.lang.obj...@6e186c3f >>>>>> at java.lang.Object.wait(Native Method) >>>>>> at >>>>>> org >>>>>> .apache >>>>>> .activemq >>>>>> .MessageDispatchChannel.dequeue(MessageDispatchChannel.java:77) >>>>>> at >>>>>> org >>>>>> .apache >>>>>> .activemq >>>>>> .ActiveMQMessageConsumer.dequeue(ActiveMQMessageConsumer.java:428) >>>>>> at >>>>>> org >>>>>> .apache >>>>>> .activemq >>>>>> .ActiveMQMessageConsumer.receive(ActiveMQMessageConsumer.java:554) >>>>>> at >>>>>> org >>>>>> .springframework >>>>>> .jms >>>>>> .listener >>>>>> .AbstractPollingMessageListenerContainer >>>>>> .receiveMessage(AbstractPollingMessageListenerContainer.java:405) >>>>>> at >>>>>> org >>>>>> .springframework >>>>>> .jms >>>>>> .listener >>>>>> .AbstractPollingMessageListenerContainer >>>>>> .doReceiveAndExecute(AbstractPollingMessageListenerContainer.java: >>>>>> 308) >>>>>> at >>>>>> org >>>>>> .springframework >>>>>> .jms >>>>>> .listener >>>>>> .AbstractPollingMessageListenerContainer >>>>>> .receiveAndExecute(AbstractPollingMessageListenerContainer.java: >>>>>> 261) >>>>>> at >>>>>> org.springframework.jms.listener.DefaultMessageListenerContainer >>>>>> $ >>>>>> AsyncMessageListenerInvoker >>>>>> .invokeListener(DefaultMessageListenerContainer.java:982) >>>>>> at >>>>>> org.springframework.jms.listener.DefaultMessageListenerContainer >>>>>> $ >>>>>> AsyncMessageListenerInvoker >>>>>> .executeOngoingLoop(DefaultMessageListenerContainer.java:974) >>>>>> at >>>>>> org.springframework.jms.listener.DefaultMessageListenerContainer >>>>>> $ >>>>>> AsyncMessageListenerInvoker >>>>>> .run(DefaultMessageListenerContainer.java:876) >>>>>> at java.lang.Thread.run(Thread.java:619) >>>>>> >>>>>> I've used JConsole to stop and start the (tcp) connector several >>>>>> times, >>>>>> and each time (strangely) around 2075 messages have been eaten >>>>>> from >>>>>> the >>>>>> queue by the consumers, after which things freeze again. If I >>>>>> restart the >>>>>> entire broker, around 800 messages are eaten from the queue before >>>>>> things >>>>>> stagnate again. >>>>>> >>>>>> My basic question is: what is going on, and how can I prevent >>>>>> those >>>>>> messages from getting stuck in the queue? >>>>>> >>>>>> To make matters even more interesting, I ran another test a >>>>>> while ago >>>>>> where 10 million messages were fed to the same setup without a >>>>>> hitch. The >>>>>> only difference between that test and the one I'm running now >>>>>> was the >>>>>> nature of the message. The average size of a message in both >>>>>> sets is >>>>>> almost equal, but during the current test message sizes vary more >>>>>> than >>>>>> during the succesfull one. >>>>>> >>>>>> Anyone have any ideas? Below are a bunch of relevant settings an >>>>>> my >>>>>> activemq.xml config. We code to activemq using the Spring >>>>>> JMSTemplate. >>>>>> >>>>>> Thanks in advance, >>>>>> Maarten >>>>>> >>>>>> ActiveMQ: 5.3.0 >>>>>> Java: 1.6.0_17 >>>>>> Spring: 2.5.6 >>>>>> Connector URL: tcp://localhost:61616 >>>>>> JMS receivetimeout: 30000 >>>>>> JMS Acknowledgemode: CLIENT_ACKNOWLEDGE >>>>>> JMS Session transacted: false >>>>>> >>>>>> <beans xmlns="http://www.springframework.org/schema/beans" >>>>>> xmlns:amq="http://activemq.apache.org/schema/core" >>>>>> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" >>>>>> xsi:schemaLocation="http://www.springframework.org/schema/beans >>>>>> http://www.springframework.org/schema/beans/spring-beans-2.0.xsd >>>>>> http://activemq.apache.org/schema/core >>>>>> http://activemq.apache.org/schema/core/activemq-core.xsd >>>>>> http://mortbay.com/schemas/jetty/1.0 >>>>>> http://jetty.mortbay.org/jetty.xsd >>>>>> "> >>>>>> >>>>>> <broker id="broker" useJmx="true" brokerName="testbroker" >>>>>> start="true" >>>>>> xmlns="http://activemq.apache.org/schema/core" >>>>>> dataDirectory="/var/amqdata"> >>>>>> >>>>>> <destinationPolicy> >>>>>> <policyMap> >>>>>> <policyEntries> >>>>>> <policyEntry queue=">" memoryLimit="32 mb" >>>>>> producerFlowControl="true" /> >>>>>> <policyEntry topic=">" memoryLimit="32 mb" >>>>>> producerFlowControl="true" /> >>>>>> </policyEntries> >>>>>> </policyMap> >>>>>> </destinationPolicy> >>>>>> >>>>>> <managementContext> >>>>>> <managementContext >>>>>> useMBeanServer="true" >>>>>> jmxDomainName="org.apache.activemq" >>>>>> createMBeanServer="true" >>>>>> createConnector="false" >>>>>> connectorPort="1100" >>>>>> connectorPath="/jmxrmi"/> >>>>>> </managementContext> >>>>>> >>>>>> <persistenceAdapter> >>>>>> <amqPersistenceAdapter >>>>>> syncOnWrite="false" >>>>>> directory="/var/amqdata/testbroker" >>>>>> indexBinSize="8192" >>>>>> cleanupInterval="300000" >>>>>> indexPageSize="64 kb" >>>>>> maxFileLength="256 mb" >>>>>> archiveDataLogs="false"/> >>>>>> </persistenceAdapter> >>>>>> >>>>>> <systemUsage> >>>>>> <systemUsage> >>>>>> <memoryUsage> >>>>>> <memoryUsage limit="512 mb" /> >>>>>> </memoryUsage> >>>>>> </systemUsage> >>>>>> </systemUsage> >>>>>> >>>>>> <transportConnectors> >>>>>> <transportConnector uri="tcp://localhost:61616" /> >>>>>> </transportConnectors> >>>>>> >>>>>> </broker> >>>>>> >>>>>> </beans> >>>>>> >>>>> >>>>> -- >>>>> View this message in context: >>>>> http://old.nabble.com/50k-%2B-messages-stuck-in-queue-with-all-consumers-blocking-on-receive-tp27162095p27208079.html >>>>> Sent from the ActiveMQ - User mailing list archive at Nabble.com. >>>>> >>>> >>>> Rob Davies >>>> http://twitter.com/rajdavies >>>> I work here: http://fusesource.com >>>> My Blog: http://rajdavies.blogspot.com/ >>>> I'm writing this: http://www.manning.com/snyder/ >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>> >>> -- >>> View this message in context: >>> http://old.nabble.com/50k-%2B-messages-stuck-in-queue-with-all-consumers-blocking-on-receive-tp27162095p27210211.html >>> Sent from the ActiveMQ - User mailing list archive at Nabble.com. >>> >> > > Rob Davies > http://twitter.com/rajdavies > I work here: http://fusesource.com > My Blog: http://rajdavies.blogspot.com/ > I'm writing this: http://www.manning.com/snyder/ > > > > > > > -- View this message in context: http://old.nabble.com/50k-%2B-messages-stuck-in-queue-with-all-consumers-blocking-on-receive-tp27162095p27261165.html Sent from the ActiveMQ - User mailing list archive at Nabble.com.
