Hi When doing request/reply over JMS using camel-jms then Camel uses the concurrentConsumers settings you may have configured on the component / endpoint.
So if you only want 1 consumer, then set that on the endpoint Though I have logged a ticket to allow having separate options for regular concurrent consumers vs for request/reply https://issues.apache.org/jira/browse/CAMEL-8503 On Tue, Mar 17, 2015 at 5:09 PM, agentalpha <yogesh.l...@gmail.com> wrote: > Hi Everyone, > > We are using Camel 2.13.2 and ActiveMQ 5.9.0. > The configuration of activemq broker is as follows: > <bean id="activemq" > class="org.apache.activemq.camel.component.ActiveMQComponent"> > <property name="configuration" ref="jmsConfig" /> > <property name="transacted" value="false" /> > <property name="acceptMessagesWhileStopping" value="false" /> > <property name="cacheLevelName" value="CACHE_CONSUMER" /> > </bean> > > <bean id="jmsConnectionFactory" > class="org.apache.activemq.ActiveMQConnectionFactory"> > <property name="brokerURL" value="$[ren.brokerUrl]" /> > <property name="useAsyncSend" value="true" /> > </bean> > > <bean id="pooledConnectionFactory" > class="org.apache.activemq.pool.PooledConnectionFactory" > init-method="start" destroy-method="stop"> > <property name="maxConnections" value="10" /> > <property name="maximumActiveSessionPerConnection" value="-1" > /> > <property name="expiryTimeout" value="0" /> > <property name="idleTimeout" value="0" /> > <property name="connectionFactory" ref="jmsConnectionFactory" > /> > </bean> > > <bean id="jmsConfig" > class="org.apache.camel.component.jms.JmsConfiguration"> > <property name="connectionFactory" > ref="pooledConnectionFactory" /> > <property name="concurrentConsumers" value="15" /> > <property name="maxConcurrentConsumers" value="15" /> > <property name="maxMessagesPerTask" value="5" /> > <property name="idleTaskExecutionLimit" value="0" /> > <property name="idleConsumerLimit" value="0" /> > </bean> > > > As mentioned, we have concurrent and max concurrent consumers having value > 15. > > We have following route sample: > <route id="openApiRoute" trace="true" autoStartup="false" > xmlns="http://camel.apache.org/schema/blueprint"> > <from uri="openAPI" /> > <onException> > <exception>java.lang.Throwable</exception> > <handled> > <constant>true</constant> > </handled> > <bean ref="errorHandlerBean" /> > <convertBodyTo > type="com.company.openapi.SoapMap" /> > </onException> > <onException> > <exception>java.lang.Exception</exception> > <handled> > <constant>true</constant> > </handled> > <bean ref="errorHandlerBean" /> > <convertBodyTo > type="com.company.openapi.SoapMap" /> > </onException> > <onException> > > <exception>org.apache.camel.ExchangeTimedOutException</exception> > <redeliveryPolicy logRetryAttempted="true" > retryAttemptedLogLevel="WARN" > > maximumRedeliveries="{{ren.context.camel.openApiRoute.maximumRedeliveries}}" > > redeliveryDelay="{{ren.context.camel.openApiRoute.redeliveryDelay}}" /> > <handled> > <simple>${header.requestType} != > 'broadcast'</simple> > </handled> > <choice> > <when> > <simple>${header.requestType} > != 'broadcast'</simple> > <bean ref="errorHandlerBean" > /> > <convertBodyTo > type="com.company.openapi.SoapMap" /> > </when> > </choice> > </onException> > > <convertBodyTo type="java.util.Map" /> > <choice> > <when> > <simple>${header.requestType} == > 'broadcast'</simple> > <bean ref ="broadcasterBean"/> > <recipientList > parallelProcessing="true" > strategyRef="aggregatorStrategy" streaming="true" stopOnException="false" > prop:timeout="{{ren.context.camel.httpRoute.aggregatorTimeout}}"> > > <header>recipientList</header> > </recipientList> > <process > ref="broadcastResultProcessor" /> > </when> > <when> > <simple>${header.requestType} == > 'forward'</simple> > <bean ref="dynamicRouterBean" > method="route" /> > </when> > <otherwise> > <bean ref="operationTypeNotSupported" > method="throwOperationTypeNotSupportedException" /> > </otherwise> > </choice> > <convertBodyTo type="com.company.openapi.SoapMap" /> > </route> > > > What we are essentially doing, is fetching a request on CXF and forwarding > it to a jms endpoint using IN/OUT messaging. We use dynamic router for > performing this operation dynamically. > A sample dynamic url for this jms endpoint is : > > activemq:queue:soapRequestQ2?exchangePattern=InOut&asyncConsumer=true&useMessageIDAsCorrelationID=false&requestTimeout=5000&transacted=false&replyTo=responseQ&replyToType=Exclusive > > Now few problematic behaviors of automatic consumer creation on > responseQueue are: > 1. Whenever the route starts and requests are sent, 15 consumers are created > on the response queue. After that no matter how much the load is, these 15 > consumers are handling the requests properly (as per our tps expectations). > Now if we dynamically update the above sample jms url in our system (even > just the value of requestTimeout) and send another request, we see that the > reponseQ now has additional 15 (total of 30) consumers on it. And right > after this, most of the requests start failing for unable to map the > response to the request. > The message is sent in the request queue, picked up by the client, the > response is sent, which we can see being enqueued adn dequeued in the > response queue (from activeMQ web console), the response is fetched by our > application but QueueReplyManager fails to map it to original request, > mentioning Response received for unknown correlationId xyz. And just after a > few seconds we see the exchangetimeout happening for the message with same > correlationid. > Is this because the new set of 15 consumers got created in different session > than the original one and hence they are not able to share the > correlationId? > Even though I have maxConcurrentConsumers set to 15 for that specific queue, > why is it still increasing beyond that value? (for every change in url, it > keeps on increasing by 15). > > 2. We have same behaviour when we use the same queue/endpoint url for single > request and in receipient list. > The single forward request registers its own 15 consumers and when we > perform a receipient list url, it creates additional 15 consumers on the > queues (again maxconcurrentconsumers is 15 only). > > 3. When use the dynamic router (routing to the same jms in/out url) in > multiple routes, we always have 15 consumers on the response queue. > But we use receipient list in multiple routes (routing to same set of jms > in/out urls), it creates set of 15 consumers per route (meaning if we have 3 > routes it creates 45 consumers). > And again we start getting the same issue of response not being mapped to > the request for unknown correlation id. > > Is there any book/blog from where I can understand how and why is this > happening? How does camel/activemq creates and manages consumers on queues, > specially when we are using the building blocks of dynamic router, > receipient list, activemq endpoint etc. > > I have been trying to understand and find a correct solution for this since > couple of weeks now. > Please help me in this. > Thanks. > > BR! > Yogesh > > > > > > -- > View this message in context: > http://camel.465427.n5.nabble.com/Camel-ActiveMQ-In-OUT-endpoint-creates-additional-consumers-on-response-queue-which-are-more-than-mas-tp5764288.html > Sent from the Camel - Users mailing list archive at Nabble.com. -- Claus Ibsen ----------------- Red Hat, Inc. Email: cib...@redhat.com Twitter: davsclaus Blog: http://davsclaus.com Author of Camel in Action: http://www.manning.com/ibsen hawtio: http://hawt.io/ fabric8: http://fabric8.io/