Hi,
This looks more like an ActiveMQ issue.
Have you sent it to the [email protected] list?
Willem
dcheckoway wrote:
> Help!
>
> I have a transactional consumer invoked by Camel for messages on
> "testQueueA" -- it ends up producing/sending thousands of messages to the
> JMS queue "testQueueB". Depending upon how many messages need to be sent,
> this scenario hangs. This happens when talking to ActiveMQ via TCP.
>
> I wrote a simplified test case, and one interesting thing I found was that
> the "bigger" my messages were, the fewer would be sent before everything
> hangs. It implied to me that there was some sort of buffer limit being
> imposed.
>
> Here's the code:
>
> import java.util.logging.Logger;
> import org.apache.camel.*;
> import org.springframework.beans.factory.annotation.*;
>
> public class ConsumerThatAlsoProduces {
>     Logger logger = Logger.getLogger(getClass().getName());
>     @Autowired
>     ProducerTemplate producerTemplate;
>     @Autowired
>     @Qualifier("testQueueB")
>     Endpoint testQueueB;
>
>     public void onTestMessage(TestMessage msg) {
>         logger.info("Received from A: " + msg);
>         final int numToProduce = 20000;
>         logger.info("Producing " + numToProduce + " messages on queueB");
>         for (int k = 1; k <= numToProduce; ++k) {
>             logger.info("Sending " + k + " of " + numToProduce);
>             producerTemplate.sendBody(testQueueB, new TestMessage(k));
>         }
>     }
> }
>
> When I force a thread dump, this is what I'm seeing for the thread of
> interest:
>
> "DefaultMessageListenerContainer-1" prio=5 tid=0x00000001019ff800
> nid=0x111cae000 runnable [0x0000000111cab000..0x0000000111cadad0]
> java.lang.Thread.State: RUNNABLE
> at java.net.SocketOutputStream.socketWrite0(Native Method)
> at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:92)
> at java.net.SocketOutputStream.write(SocketOutputStream.java:136)
> at
> org.apache.activemq.transport.tcp.TcpBufferedOutputStream.write(TcpBufferedOutputStream.java:96)
> at java.io.DataOutputStream.write(DataOutputStream.java:90)
> - locked <0x000000010703aa28> (a java.io.DataOutputStream)
> at
> org.apache.activemq.openwire.v3.BaseDataStreamMarshaller.tightMarshalByteSequence2(BaseDataStreamMarshaller.java:432)
> at
> org.apache.activemq.openwire.v3.MessageMarshaller.tightMarshal2(MessageMarshaller.java:173)
> at
> org.apache.activemq.openwire.v3.ActiveMQMessageMarshaller.tightMarshal2(ActiveMQMessageMarshaller.java:90)
> at
> org.apache.activemq.openwire.v3.ActiveMQObjectMessageMarshaller.tightMarshal2(ActiveMQObjectMessageMarshaller.java:90)
> at
> org.apache.activemq.openwire.OpenWireFormat.marshal(OpenWireFormat.java:240)
> - locked <0x000000010701bb78> (a
> org.apache.activemq.openwire.OpenWireFormat)
> at
> org.apache.activemq.transport.tcp.TcpTransport.oneway(TcpTransport.java:166)
> at
> org.apache.activemq.transport.InactivityMonitor.oneway(InactivityMonitor.java:237)
> - locked <0x000000010705a298> (a
> java.util.concurrent.atomic.AtomicBoolean)
> at
> org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:83)
> at
> org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:104)
> at
> org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40)
> - locked <0x0000000106fdd0e8> (a java.lang.Object)
> at
> org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
> at
> org.apache.activemq.ActiveMQConnection.doAsyncSendPacket(ActiveMQConnection.java:1225)
> at
> org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1219)
> at org.apache.activemq.ActiveMQSession.send(ActiveMQSession.java:1676)
> - locked <0x0000000106fe9f00> (a java.lang.Object)
> at
> org.apache.activemq.ActiveMQMessageProducer.send(ActiveMQMessageProducer.java:231)
> at org.apache.activemq.pool.PooledProducer.send(PooledProducer.java:74)
> - locked <0x0000000107095d88> (a
> org.apache.activemq.ActiveMQMessageProducer)
> at org.apache.activemq.pool.PooledProducer.send(PooledProducer.java:59)
> at org.springframework.jms.core.JmsTemplate.doSend(JmsTemplate.java:597)
> at
> org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.doSend(JmsConfiguration.java:237)
> at org.springframework.jms.core.JmsTemplate.doSend(JmsTemplate.java:574)
> at
> org.springframework.jms.core.JmsTemplate$4.doInJms(JmsTemplate.java:551)
> at
> org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:471)
> at org.springframework.jms.core.JmsTemplate.send(JmsTemplate.java:548)
> at
> org.apache.camel.component.jms.JmsProducer.process(JmsProducer.java:301)
> at
> org.apache.camel.impl.ProducerCache$1.doInProducer(ProducerCache.java:165)
> at
> org.apache.camel.impl.ProducerCache$1.doInProducer(ProducerCache.java:151)
> at
> org.apache.camel.impl.ProducerCache.doInProducer(ProducerCache.java:136)
> at
> org.apache.camel.impl.ProducerCache.sendExchange(ProducerCache.java:150)
> at org.apache.camel.impl.ProducerCache.send(ProducerCache.java:86)
> at
> org.apache.camel.impl.DefaultProducerTemplate.send(DefaultProducerTemplate.java:98)
> at
> org.apache.camel.impl.DefaultProducerTemplate.sendBody(DefaultProducerTemplate.java:111)
> at
> ConsumerThatAlsoProduces.onTestMessage(ConsumerThatAlsoProduces.java:19)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> at java.lang.reflect.Method.invoke(Method.java:597)
> at
> org.apache.camel.component.bean.MethodInfo.invoke(MethodInfo.java:173)
> at
> org.apache.camel.component.bean.MethodInfo$1.proceed(MethodInfo.java:95)
> at
> org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:111)
> at
> org.apache.camel.processor.interceptor.StreamCachingInterceptor.process(StreamCachingInterceptor.java:52)
> at
> org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:61)
> at
> org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:61)
> at
> org.apache.camel.processor.DefaultErrorHandler.process(DefaultErrorHandler.java:52)
> at
> org.apache.camel.processor.DefaultChannel.process(DefaultChannel.java:147)
> at
> org.apache.camel.spring.spi.TransactionErrorHandler$1.doInTransactionWithoutResult(TransactionErrorHandler.java:110)
> at
> org.springframework.transaction.support.TransactionCallbackWithoutResult.doInTransaction(TransactionCallbackWithoutResult.java:33)
> at
> org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:128)
> at
> org.apache.camel.spring.spi.TransactionErrorHandler.process(TransactionErrorHandler.java:80)
> at
> org.apache.camel.processor.interceptor.StreamCachingInterceptor.process(StreamCachingInterceptor.java:52)
> at
> org.apache.camel.processor.DefaultErrorHandler.process(DefaultErrorHandler.java:52)
> at
> org.apache.camel.processor.DefaultChannel.process(DefaultChannel.java:147)
> at
> org.apache.camel.processor.UnitOfWorkProcessor.processNext(UnitOfWorkProcessor.java:54)
> at
> org.apache.camel.processor.DelegateProcessor.process(DelegateProcessor.java:48)
> at
> org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:76)
> at
> org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:543)
> at
> org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:482)
> at
> org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:451)
> at
> org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:323)
> at
> org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:241)
> at
> org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:982)
> at
> org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:881)
> at java.lang.Thread.run(Thread.java:637)
>
>
> I'm pretty sure the problem is that since my consumer is running inside a
> JMS transaction, the messages it tries to send are being "buffered" by the
> ActiveMQ server until the transaction commits. Thus the catch...the
> transaction won't commit until my consumer returns, which won't happen until
> all of the messages are sent.
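
To make that hypothesis concrete, here is a rough, self-contained sketch in plain Java. It is only a model of the theory above, not ActiveMQ's actual behaviour: the class BufferedSendSketch, the method sendUntilStalled and the 64 KB buffer size are all made up for illustration. It assumes some fixed-size buffer sits between the client and the broker and that nothing drains it until the transaction commits.

```java
import java.util.concurrent.ArrayBlockingQueue;

public class BufferedSendSketch {

    /**
     * Hypothetical model: counts how many messages fit into a bounded
     * buffer that is never drained while the sending loop is running
     * (the drain would only happen on commit, after the loop returns).
     */
    static int sendUntilStalled(int bufferBytes, int messageBytes, int numToProduce) {
        // Capacity of the assumed buffer, expressed in whole messages.
        int capacity = Math.max(1, bufferBytes / messageBytes);
        ArrayBlockingQueue<byte[]> buffer = new ArrayBlockingQueue<>(capacity);

        int sent = 0;
        for (int k = 1; k <= numToProduce; ++k) {
            // offer() returns false when the buffer is full; a real
            // blocking send would simply hang at this point instead.
            if (!buffer.offer(new byte[messageBytes])) {
                break;
            }
            sent++;
        }
        return sent;
    }

    public static void main(String[] args) {
        // Same assumed 64 KB buffer, two different message sizes.
        System.out.println(sendUntilStalled(64 * 1024, 1024, 20000));
        System.out.println(sendUntilStalled(64 * 1024, 4096, 20000));
    }
}
```

With the assumed 64 KB buffer, the loop stalls after 64 messages of 1 KB but after only 16 messages of 4 KB, which matches the pattern you describe (bigger messages, fewer sent before the hang). Whether ActiveMQ really has such a limit on the TCP path, and where it lives, is something the ActiveMQ list would have to confirm.
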
>
> On this page, http://activemq.apache.org/how-do-transactions-work.html, I
> noticed this comment:
>
> "Now the operations carried out on a transacted session inside a
> transaction, like a send message or acknowledge message, do not really
> perform a real send or acknowledge until the commit occurs. So the Broker
> explicitly handles these cases separately - essentially buffering up the
> commands until the commit occurs when the messages are really sent or
> acknowledged."
>
> What I find interesting is that if I run an embedded ActiveMQ broker and
> talk via vm://localhost instead of tcp://localhost:61616, the problem goes
> away. This ONLY happens when talking TCP.
>
> FWIW, I've been using Camel 1.6.1 with ActiveMQ 5.2.0, but I have also
> tested this against ActiveMQ 5.3-SNAPSHOT (June 1, 2009), which uses Camel
> 2.0. Both straight out of the box. Same results with both old & new
> versions.
>
> Is this a bug in ActiveMQ? Does it really impose some weenie-small limit on
> what you can post back to the server while still in a transaction? Is it a
> hard limit? Configurable? Have I screwed up my configuration? Should I be
> using some different method to have my consumer in turn produce more
> messages transactionally?
>
> I'm attaching a zip of my test (minus the "lib" dir, which contains all the
> typical stuff you'd expect in an ActiveMQ/Camel setup). Maybe somebody can
> take a peek and let me know what I'm doing wrong.
>
> http://www.nabble.com/file/p23824159/camel-activemq-bug.tar.gz
> camel-activemq-bug.tar.gz
> http://www.nabble.com/file/p23824159/camel-activemq-bug.zip
> camel-activemq-bug.zip
>
> Help! Thanks!!