Hi,

This looks more like an ActiveMQ issue.
Did you send it to [email protected]?

Willem

dcheckoway wrote:
> Help!
> 
> I have a transactional consumer invoked by Camel for messages on
> "testQueueA" -- it ends up producing/sending thousands of messages to the
> JMS queue "testQueueB".  Depending upon how many messages need to be sent,
> this scenario hangs.  This happens when talking to ActiveMQ via TCP.
> 
> I wrote a simplified test case, and one interesting thing I found was that
> the "bigger" my messages were, the fewer would be sent before everything
> hangs.  It implied to me that there was some sort of buffer limit being
> imposed.
> 
> Here's the code:
> 
> import java.util.logging.Logger;
> import org.apache.camel.*;
> import org.springframework.beans.factory.annotation.*;
> 
> public class ConsumerThatAlsoProduces {
>     Logger logger = Logger.getLogger(getClass().getName());
>     @Autowired
>     ProducerTemplate producerTemplate;
>     @Autowired
>     @Qualifier("testQueueB")
>     Endpoint testQueueB;
>     
>     public void onTestMessage(TestMessage msg) {
>         logger.info("Received from A: " + msg);
>         final int numToProduce = 20000;
>         logger.info("Producing " + numToProduce + " messages on queueB");
>         for (int k = 1; k <= numToProduce; ++k) {
>             logger.info("Sending " + k + " of " + numToProduce);
>             producerTemplate.sendBody(testQueueB, new TestMessage(k));
>         }
>     }
> }
> 
> When I force a thread dump, this is what I'm seeing for the thread of
> interest:
> 
> "DefaultMessageListenerContainer-1" prio=5 tid=0x00000001019ff800
> nid=0x111cae000 runnable [0x0000000111cab000..0x0000000111cadad0]
>    java.lang.Thread.State: RUNNABLE
>       at java.net.SocketOutputStream.socketWrite0(Native Method)
>       at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:92)
>       at java.net.SocketOutputStream.write(SocketOutputStream.java:136)
>       at
> org.apache.activemq.transport.tcp.TcpBufferedOutputStream.write(TcpBufferedOutputStream.java:96)
>       at java.io.DataOutputStream.write(DataOutputStream.java:90)
>       - locked <0x000000010703aa28> (a java.io.DataOutputStream)
>       at
> org.apache.activemq.openwire.v3.BaseDataStreamMarshaller.tightMarshalByteSequence2(BaseDataStreamMarshaller.java:432)
>       at
> org.apache.activemq.openwire.v3.MessageMarshaller.tightMarshal2(MessageMarshaller.java:173)
>       at
> org.apache.activemq.openwire.v3.ActiveMQMessageMarshaller.tightMarshal2(ActiveMQMessageMarshaller.java:90)
>       at
> org.apache.activemq.openwire.v3.ActiveMQObjectMessageMarshaller.tightMarshal2(ActiveMQObjectMessageMarshaller.java:90)
>       at
> org.apache.activemq.openwire.OpenWireFormat.marshal(OpenWireFormat.java:240)
>       - locked <0x000000010701bb78> (a
> org.apache.activemq.openwire.OpenWireFormat)
>       at
> org.apache.activemq.transport.tcp.TcpTransport.oneway(TcpTransport.java:166)
>       at
> org.apache.activemq.transport.InactivityMonitor.oneway(InactivityMonitor.java:237)
>       - locked <0x000000010705a298> (a 
> java.util.concurrent.atomic.AtomicBoolean)
>       at
> org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:83)
>       at
> org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:104)
>       at
> org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40)
>       - locked <0x0000000106fdd0e8> (a java.lang.Object)
>       at
> org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
>       at
> org.apache.activemq.ActiveMQConnection.doAsyncSendPacket(ActiveMQConnection.java:1225)
>       at
> org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1219)
>       at org.apache.activemq.ActiveMQSession.send(ActiveMQSession.java:1676)
>       - locked <0x0000000106fe9f00> (a java.lang.Object)
>       at
> org.apache.activemq.ActiveMQMessageProducer.send(ActiveMQMessageProducer.java:231)
>       at org.apache.activemq.pool.PooledProducer.send(PooledProducer.java:74)
>       - locked <0x0000000107095d88> (a
> org.apache.activemq.ActiveMQMessageProducer)
>       at org.apache.activemq.pool.PooledProducer.send(PooledProducer.java:59)
>       at org.springframework.jms.core.JmsTemplate.doSend(JmsTemplate.java:597)
>       at
> org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.doSend(JmsConfiguration.java:237)
>       at org.springframework.jms.core.JmsTemplate.doSend(JmsTemplate.java:574)
>       at 
> org.springframework.jms.core.JmsTemplate$4.doInJms(JmsTemplate.java:551)
>       at 
> org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:471)
>       at org.springframework.jms.core.JmsTemplate.send(JmsTemplate.java:548)
>       at 
> org.apache.camel.component.jms.JmsProducer.process(JmsProducer.java:301)
>       at
> org.apache.camel.impl.ProducerCache$1.doInProducer(ProducerCache.java:165)
>       at
> org.apache.camel.impl.ProducerCache$1.doInProducer(ProducerCache.java:151)
>       at 
> org.apache.camel.impl.ProducerCache.doInProducer(ProducerCache.java:136)
>       at 
> org.apache.camel.impl.ProducerCache.sendExchange(ProducerCache.java:150)
>       at org.apache.camel.impl.ProducerCache.send(ProducerCache.java:86)
>       at
> org.apache.camel.impl.DefaultProducerTemplate.send(DefaultProducerTemplate.java:98)
>       at
> org.apache.camel.impl.DefaultProducerTemplate.sendBody(DefaultProducerTemplate.java:111)
>       at 
> ConsumerThatAlsoProduces.onTestMessage(ConsumerThatAlsoProduces.java:19)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>       at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>       at java.lang.reflect.Method.invoke(Method.java:597)
>       at 
> org.apache.camel.component.bean.MethodInfo.invoke(MethodInfo.java:173)
>       at 
> org.apache.camel.component.bean.MethodInfo$1.proceed(MethodInfo.java:95)
>       at
> org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:111)
>       at
> org.apache.camel.processor.interceptor.StreamCachingInterceptor.process(StreamCachingInterceptor.java:52)
>       at
> org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:61)
>       at
> org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:61)
>       at
> org.apache.camel.processor.DefaultErrorHandler.process(DefaultErrorHandler.java:52)
>       at
> org.apache.camel.processor.DefaultChannel.process(DefaultChannel.java:147)
>       at
> org.apache.camel.spring.spi.TransactionErrorHandler$1.doInTransactionWithoutResult(TransactionErrorHandler.java:110)
>       at
> org.springframework.transaction.support.TransactionCallbackWithoutResult.doInTransaction(TransactionCallbackWithoutResult.java:33)
>       at
> org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:128)
>       at
> org.apache.camel.spring.spi.TransactionErrorHandler.process(TransactionErrorHandler.java:80)
>       at
> org.apache.camel.processor.interceptor.StreamCachingInterceptor.process(StreamCachingInterceptor.java:52)
>       at
> org.apache.camel.processor.DefaultErrorHandler.process(DefaultErrorHandler.java:52)
>       at
> org.apache.camel.processor.DefaultChannel.process(DefaultChannel.java:147)
>       at
> org.apache.camel.processor.UnitOfWorkProcessor.processNext(UnitOfWorkProcessor.java:54)
>       at
> org.apache.camel.processor.DelegateProcessor.process(DelegateProcessor.java:48)
>       at
> org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:76)
>       at
> org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:543)
>       at
> org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:482)
>       at
> org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:451)
>       at
> org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:323)
>       at
> org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:241)
>       at
> org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:982)
>       at
> org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:881)
>       at java.lang.Thread.run(Thread.java:637)
> 
> 
> I'm pretty sure the problem is that since my consumer is running inside a
> JMS transaction, the messages it tries to send are being "buffered" by the
> ActiveMQ server until the transaction commits.  Thus the catch...the
> transaction won't commit until my consumer returns, which won't happen until
> all of the messages are sent.
> 
> On this page, http://activemq.apache.org/how-do-transactions-work.html, I
> noticed this comment:
> 
> "Now the operations carried out on a transacted session inside a
> transaction, like a send message or acknowledge message, do not really
> perform a real send or acknowledge until the commit occurs. So the Broker
> explicitly handles these cases separately - essentially buffering up the
> commands until the commit occurs when the messages are really sent or
> acknowledged."
> 
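
The hypothesis quoted above (sends made inside a transaction are held until
commit, and the hang appears once some size limit is reached) can be
illustrated with a toy model. This is only a plain-Java sketch, not ActiveMQ
code: the class name, the method names and the 64 KB limit are all made up for
illustration, and it says nothing about where ActiveMQ actually enforces a
limit. It only shows why a buffer that is emptied solely on commit would stall
a consumer that sends everything before returning, and why larger messages
would stall after fewer sends.

```java
import java.util.concurrent.Semaphore;

/** Toy model of the hypothesis only; none of this is ActiveMQ API. */
final class TransactedBufferSketch {
    // Hypothetical buffer limit, counted in bytes.
    private final Semaphore freeBytes;
    // Bytes held by the transaction that is still open.
    private int heldBytes = 0;

    TransactedBufferSketch(int bufferBytes) {
        this.freeBytes = new Semaphore(bufferBytes);
    }

    /**
     * Tries to buffer one message. A real blocking send would wait here
     * until space is freed; the sketch returns false instead so it terminates.
     */
    boolean send(int messageBytes) {
        if (!freeBytes.tryAcquire(messageBytes)) {
            return false;
        }
        heldBytes += messageBytes;
        return true;
    }

    /** Space is only given back when the transaction commits. */
    void commit() {
        freeBytes.release(heldBytes);
        heldBytes = 0;
    }

    /** Mirrors the loop in onTestMessage: send everything, commit afterwards. */
    static int sendsBeforeStall(int bufferBytes, int messageBytes, int numToProduce) {
        TransactedBufferSketch buffer = new TransactedBufferSketch(bufferBytes);
        for (int k = 1; k <= numToProduce; ++k) {
            if (!buffer.send(messageBytes)) {
                return k - 1; // commit is never reached, so nothing is freed
            }
        }
        buffer.commit();
        return numToProduce;
    }

    public static void main(String[] args) {
        System.out.println("1 KB messages: " + sendsBeforeStall(64 * 1024, 1024, 20000));
        System.out.println("4 KB messages: " + sendsBeforeStall(64 * 1024, 4096, 20000));
    }
}
```

With the made-up 64 KB limit, the model stalls after 64 one-kilobyte messages
but after only 16 four-kilobyte ones, which matches the shape of the symptom
described earlier (bigger messages, fewer sent). Whether the real cause is
the same is for the ActiveMQ list to confirm.
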
> What I find interesting is that if I run an embedded ActiveMQ broker and
> talk via vm://localhost instead of tcp://localhost:61616, the problem goes
> away.  This ONLY happens when talking TCP.
> 
> FWIW, I've been using Camel 1.6.1 with ActiveMQ 5.2.0, but I have also
> tested this against ActiveMQ 5.3-SNAPSHOT (June 1, 2009), which uses Camel
> 2.0.  Both straight out of the box.  Same results with both old & new
> versions.
> 
> Is this a bug in ActiveMQ?  Does it really impose some weenie-small limit on
> what you can post back to the server while still in a transaction?  Is it a
> hard limit?  Configurable?  Have I screwed up my configuration?  Should I be
> using some different method to have my consumer in turn produce more
> messages transactionally?
> 
> I'm attaching a zip of my test (minus the "lib" dir, which contains all the
> typical stuff you'd expect in an ActiveMQ/Camel setup).  Maybe somebody can
> take a peek and let me know what I'm doing wrong.
> 
> http://www.nabble.com/file/p23824159/camel-activemq-bug.tar.gz
> http://www.nabble.com/file/p23824159/camel-activemq-bug.zip
> 
> Help!  Thanks!!
