Submitted https://issues.apache.org/activemq/browse/AMQ-2612 & attached test case.
Thanks, Pavel On Tue, Feb 16, 2010 at 4:04 PM, Pavel <[email protected]> wrote: > Hi Gary, > > prefetch=1 alone would not work for my case; > > We build CXF web services with JMS transport on top of ActiveMQ. > > And syncronous cyclic "(client)->A->B->A" calls sometimes lead to deadlock: > "B->A" message gets into prefetch buffer of the very same consumer that > currently handles "(client)->A" message. > We have load tests that highlight this issue. And polling consumer was a > workaround until we figured out the reconnect implications. > > I'll try to rework my example into unit test and then file a jira as you > suggested. > > Thanks, > Pavel > > > On Mon, Feb 15, 2010 at 10:13 PM, Gary Tully <[email protected]> wrote: > >> Have not looked into the test case yet, but when prefetch=0, a consumer is >> polling for messages. The broker will not actively dispatch messages to >> that >> consumer. On each call to receive, the consumer sends a pull command to >> the >> broker and then waits for a message dispatch. If failover occurs between >> the >> send of the pull and before the dispatch, the receive will remain blocked >> as >> the pull command will be lost and not replayed. >> >> One solution is to have the state tracker track message pull commands so >> that they can be replayed. Afaik that is not there at the moment. Can you >> raise an enhancement request for that? >> >> For your use case though, is prefetch=1 an option? >> >> On 15 February 2010 17:35, Pavel <[email protected]> wrote: >> >> > Here is another example, this time activemq + Spring; no CXF at all. >> > >> > Any ideas? >> > >> > Thanks, >> > Pavel >> > >> > >> > On Sat, Feb 13, 2010 at 3:04 PM, Pavel <[email protected]> wrote: >> > >> >> Hi, >> >> >> >> I'm hitting an issue with activeMQ reconnect. >> >> >> >> We are using CXF and SOAP over JMS with ActiveMQ. Also, at some point >> we >> >> started using consumer.prefetchSize=0. This is to prevent deadlocks >> when >> >> doing cyclic synchronous webservice calls, like >> A.foo()->B.bar()->A.baz(). >> >> >> >> So, attached is a small example, echo-like webservice called over JMS >> >> every 2 seconds. >> >> ActiveMQ 5.3 or Fuse 5.3.0.5. >> >> >> >> activemq.broker.url=failover:(tcp://localhost:61616) >> >> >> jms.destinationQueueName=SAMPLE___SERVICE___QUEUE?consumer.prefetchSize=0 >> >> >> >> >> >> All works well, until AMQ restart. Once restart happens, client seems >> to >> >> reconnect (I can see consumer in a web console), but starts timing out >> [1]. >> >> AMQ logs contain several messages like this: >> >> DEBUG | SAMPLE___SERVICE___QUEUE toPageIn: 1, Inflight: 0, >> >> pagedInMessages.size 5 >> >> >> >> at some point this follows by a bunch of >> >> DEBUG | Message expired Message >> >> ID:EPBYMINW0436-4688-1266064267343-0:0:22:1:1 dropped=false acked=false >> >> locked=false >> >> >> >> >> >> After application restart things are back to normal - until the next >> >> reconnect. >> >> >> >> If I simply start consumers with AMQ down, then start AMQ - connect >> works >> >> fine and messages start flowing. So it looks like the issue is with >> >> reconnect only. >> >> >> >> Am I hitting some known issue? Are there solutions/workarounds for >> that? >> >> >> >> >> >> [1] Client stacktrace. >> >> Caused by: java.lang.RuntimeException: Timeout receiving message with >> >> correlationId e10ec6062aec42d899e3c36d1171c6530000000000000032 >> >> at >> >> >> org.apache.cxf.transport.jms.JMSConduit.sendExchange(JMSConduit.java:206) >> >> at >> >> >> org.apache.cxf.transport.jms.JMSOutputStream.doClose(JMSOutputStream.java:56) >> >> at >> >> org.apache.cxf.io.CachedOutputStream.close(CachedOutputStream.java:185) >> >> at >> >> >> org.apache.cxf.io.CacheAndWriteOutputStream.postClose(CacheAndWriteOutputStream.java:47) >> >> at >> >> org.apache.cxf.io.CachedOutputStream.close(CachedOutputStream.java:188) >> >> at >> >> org.apache.cxf.transport.AbstractConduit.close(AbstractConduit.java:66) >> >> at >> >> >> org.apache.cxf.interceptor.MessageSenderInterceptor$MessageSenderEndingInterceptor.handleMessage(MessageSenderInterceptor.java:62) >> >> at >> >> >> org.apache.cxf.phase.PhaseInterceptorChain.doIntercept(PhaseInterceptorChain.java:236) >> >> at >> org.apache.cxf.endpoint.ClientImpl.invoke(ClientImpl.java:472) >> >> at >> org.apache.cxf.endpoint.ClientImpl.invoke(ClientImpl.java:302) >> >> at >> org.apache.cxf.endpoint.ClientImpl.invoke(ClientImpl.java:254) >> >> at >> >> org.apache.cxf.frontend.ClientProxy.invokeSync(ClientProxy.java:73) >> >> at >> >> org.apache.cxf.jaxws.JaxWsClientProxy.invoke(JaxWsClientProxy.java:123) >> >> >> >> >> >> Thanks, >> >> Pavel >> > >> > >> > >> > >> >> >> -- >> http://blog.garytully.com >> >> Open Source Integration >> http://fusesource.com >> > > >
