Hi Everyone,

We are using Camel 2.13.2 and ActiveMQ 5.9.0. The configuration of the ActiveMQ component is as follows:

    <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
        <property name="configuration" ref="jmsConfig" />
        <property name="transacted" value="false" />
        <property name="acceptMessagesWhileStopping" value="false" />
        <property name="cacheLevelName" value="CACHE_CONSUMER" />
    </bean>

    <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="$[ren.brokerUrl]" />
        <property name="useAsyncSend" value="true" />
    </bean>

    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" init-method="start" destroy-method="stop">
        <property name="maxConnections" value="10" />
        <property name="maximumActiveSessionPerConnection" value="-1" />
        <property name="expiryTimeout" value="0" />
        <property name="idleTimeout" value="0" />
        <property name="connectionFactory" ref="jmsConnectionFactory" />
    </bean>

    <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
        <property name="connectionFactory" ref="pooledConnectionFactory" />
        <property name="concurrentConsumers" value="15" />
        <property name="maxConcurrentConsumers" value="15" />
        <property name="maxMessagesPerTask" value="5" />
        <property name="idleTaskExecutionLimit" value="0" />
        <property name="idleConsumerLimit" value="0" />
    </bean>

As shown, both concurrentConsumers and maxConcurrentConsumers are set to 15. Here is a sample route:

    <route id="openApiRoute" trace="true" autoStartup="false" xmlns="http://camel.apache.org/schema/blueprint">
        <from uri="openAPI" />
        <onException>
            <exception>java.lang.Throwable</exception>
            <handled>
                <constant>true</constant>
            </handled>
            <bean ref="errorHandlerBean" />
            <convertBodyTo type="com.company.openapi.SoapMap" />
        </onException>
        <onException>
            <exception>java.lang.Exception</exception>
            <handled>
                <constant>true</constant>
            </handled>
            <bean ref="errorHandlerBean" />
            <convertBodyTo type="com.company.openapi.SoapMap" />
        </onException>
        <onException>
            <exception>org.apache.camel.ExchangeTimedOutException</exception>
            <redeliveryPolicy logRetryAttempted="true" retryAttemptedLogLevel="WARN" maximumRedeliveries="{{ren.context.camel.openApiRoute.maximumRedeliveries}}" redeliveryDelay="{{ren.context.camel.openApiRoute.redeliveryDelay}}" />
            <handled>
                <simple>${header.requestType} != 'broadcast'</simple>
            </handled>
            <choice>
                <when>
                    <simple>${header.requestType} != 'broadcast'</simple>
                    <bean ref="errorHandlerBean" />
                    <convertBodyTo type="com.company.openapi.SoapMap" />
                </when>
            </choice>
        </onException>
        <convertBodyTo type="java.util.Map" />
        <choice>
            <when>
                <simple>${header.requestType} == 'broadcast'</simple>
                <bean ref="broadcasterBean" />
                <recipientList parallelProcessing="true" strategyRef="aggregatorStrategy" streaming="true" stopOnException="false" prop:timeout="{{ren.context.camel.httpRoute.aggregatorTimeout}}">
                    <header>recipientList</header>
                </recipientList>
                <process ref="broadcastResultProcessor" />
            </when>
            <when>
                <simple>${header.requestType} == 'forward'</simple>
                <bean ref="dynamicRouterBean" method="route" />
            </when>
            <otherwise>
                <bean ref="operationTypeNotSupported" method="throwOperationTypeNotSupportedException" />
            </otherwise>
        </choice>
        <convertBodyTo type="com.company.openapi.SoapMap" />
    </route>

What we are essentially doing is receiving a request over CXF and forwarding it to a JMS endpoint using InOut messaging. We use a dynamic router so that the target endpoint can be chosen at runtime. A sample dynamic URL for this JMS endpoint is:

    activemq:queue:soapRequestQ2?exchangePattern=InOut&asyncConsumer=true&useMessageIDAsCorrelationID=false&requestTimeout=5000&transacted=false&replyTo=responseQ&replyToType=Exclusive

The automatic creation of consumers on the response queue shows a few problematic behaviours:

1. Whenever the route starts and requests are sent, 15 consumers are created on the response queue. After that, no matter how heavy the load is, these 15 consumers handle the requests properly (as per our TPS expectations). If we then dynamically update the above sample JMS URL in our system (even just the value of requestTimeout) and send another request, we see that responseQ now has an additional 15 consumers (30 in total). Right after this, most of the requests start failing because the response cannot be mapped to the request.
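To make the consumer arithmetic above concrete, here is a minimal plain-Java model of what we appear to be observing. It is not Camel code: ReplyConsumerModel, send and totalReplyConsumers are hypothetical names, the URI is shortened for readability, and 6000 is just an arbitrary changed value. The key assumption, that every distinct endpoint URI string ends up with its own set of reply consumers, is our reading of the behaviour and not something we have confirmed in the Camel source.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java model of the observed behaviour, NOT Camel code.
// All class and method names are hypothetical.
public class ReplyConsumerModel {
    // Assumption: one set of reply consumers per distinct endpoint URI string.
    private final Map<String, Integer> consumersByUri = new LinkedHashMap<>();
    private final int concurrentConsumers;

    public ReplyConsumerModel(int concurrentConsumers) {
        this.concurrentConsumers = concurrentConsumers;
    }

    // Simulates sending a request through the endpoint identified by uri.
    // A URI seen for the first time registers a fresh set of consumers.
    public void send(String uri) {
        consumersByUri.putIfAbsent(uri, concurrentConsumers);
    }

    // Total consumers attached to the reply queue across all endpoint URIs.
    public int totalReplyConsumers() {
        int total = 0;
        for (int n : consumersByUri.values()) {
            total += n;
        }
        return total;
    }

    public static void main(String[] args) {
        ReplyConsumerModel model = new ReplyConsumerModel(15);
        // Shortened version of the sample URL, for illustration only.
        String base = "activemq:queue:soapRequestQ2?replyTo=responseQ&replyToType=Exclusive";

        model.send(base + "&requestTimeout=5000");
        model.send(base + "&requestTimeout=5000"); // same URI: no new consumers
        System.out.println(model.totalReplyConsumers()); // prints 15

        model.send(base + "&requestTimeout=6000"); // changed URI: another 15
        System.out.println(model.totalReplyConsumers()); // prints 30
    }
}
```

If this model is right, maxConcurrentConsumers would be a limit per endpoint URI rather than per queue, which would match the count growing by 15 on every URL change.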
The message is sent to the request queue and picked up by the client, and the response is sent; we can see it being enqueued and dequeued on the response queue (in the ActiveMQ web console). The response is fetched by our application, but QueueReplyManager fails to map it to the original request, reporting: Response received for unknown correlationId xyz. A few seconds later we see the exchange timeout for the message with the same correlationId.

Is this because the new set of 15 consumers was created in a different session from the original one, so that they cannot share the correlationId? And even though maxConcurrentConsumers is set to 15 for that specific queue, why does the count still grow beyond that value? (It increases by 15 for every change in the URL.)

2. We see the same behaviour when we use the same queue/endpoint URL both for a single request and in a recipient list. The single forward request registers its own 15 consumers, and when we send through a recipient list with the same URL, it creates an additional 15 consumers on the queue (again, maxConcurrentConsumers is only 15).

3. When we use the dynamic router (routing to the same JMS InOut URL) in multiple routes, we always have 15 consumers on the response queue. But when we use the recipient list in multiple routes (routing to the same set of JMS InOut URLs), it creates a set of 15 consumers per route (so with 3 routes it creates 45 consumers). And again we start getting the same issue of the response not being mapped to the request because of an unknown correlation ID.

Is there any book or blog from which I can understand how and why this is happening? How do Camel and ActiveMQ create and manage consumers on queues, especially when we combine building blocks such as the dynamic router, recipient list and ActiveMQ endpoint?

I have been trying to understand this and find a correct solution for a couple of weeks now. Please help me with this.

Thanks.
BR!
Yogesh