Hi Everyone, 

We are using Camel 2.13.2 and ActiveMQ 5.9.0. 
The configuration of activemq broker is as follows: 
        <bean id="activemq"
class="org.apache.activemq.camel.component.ActiveMQComponent">
                <property name="configuration" ref="jmsConfig" />
                <property name="transacted" value="false" />
                <property name="acceptMessagesWhileStopping" value="false"
/>
                <property name="cacheLevelName" value="CACHE_CONSUMER" />
        </bean>
        
        <bean id="jmsConnectionFactory"
class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL" value="$[ren.brokerUrl]" />
                <property name="useAsyncSend" value="true" />
        </bean>

        <bean id="pooledConnectionFactory"
class="org.apache.activemq.pool.PooledConnectionFactory" 
                init-method="start" destroy-method="stop">
                <property name="maxConnections" value="10" />
                <property name="maximumActiveSessionPerConnection"
value="-1" />
                <property name="expiryTimeout" value="0" />
                <property name="idleTimeout" value="0" />
                <property name="connectionFactory"
ref="jmsConnectionFactory" />
        </bean>
        
        <bean id="jmsConfig"
class="org.apache.camel.component.jms.JmsConfiguration">
                <property name="connectionFactory"
ref="pooledConnectionFactory" />
                <property name="concurrentConsumers" value="15" />
                <property name="maxConcurrentConsumers" value="15" />
                <property name="maxMessagesPerTask" value="5" />
                <property name="idleTaskExecutionLimit" value="0" />
                <property name="idleConsumerLimit" value="0" />
        </bean>


As mentioned, we have concurrent and max concurrent consumers having value
15. 

We have following route sample: 
<route id="openApiRoute" trace="true" autoStartup="false" 
                        xmlns="http://camel.apache.org/schema/blueprint";>
                        <from uri="openAPI" />
                        <onException>
                                <exception>java.lang.Throwable</exception>
                                <handled>
                                        <constant>true</constant>
                                </handled>
                                <bean ref="errorHandlerBean" />
                                <convertBodyTo
type="com.company.openapi.SoapMap" />
                        </onException>
                        <onException>
                                <exception>java.lang.Exception</exception>
                                <handled>
                                        <constant>true</constant>
                                </handled>
                                <bean ref="errorHandlerBean" />
                                <convertBodyTo
type="com.company.openapi.SoapMap" />
                        </onException>
                        <onException>
                               
<exception>org.apache.camel.ExchangeTimedOutException</exception>
                                <redeliveryPolicy logRetryAttempted="true" 
                                        retryAttemptedLogLevel="WARN" 
                                       
maximumRedeliveries="{{ren.context.camel.openApiRoute.maximumRedeliveries}}" 
                                       
redeliveryDelay="{{ren.context.camel.openApiRoute.redeliveryDelay}}" />
                                <handled>
                                        <simple>${header.requestType} !=
'broadcast'</simple>
                                </handled>
                                <choice>
                                        <when>
                                               
<simple>${header.requestType} != 'broadcast'</simple>
                                                <bean ref="errorHandlerBean"
/>
                                                <convertBodyTo
type="com.company.openapi.SoapMap" />
                                        </when>
                                </choice>
                        </onException>
                        
                        <convertBodyTo type="java.util.Map" />
                        <choice>
                                <when>
                                        <simple>${header.requestType} ==
'broadcast'</simple>
                                        <bean ref ="broadcasterBean"/>
                                        <recipientList
parallelProcessing="true" strategyRef="aggregatorStrategy" streaming="true"
stopOnException="false"
prop:timeout="{{ren.context.camel.httpRoute.aggregatorTimeout}}">
                                                       
<header>recipientList</header>
                                        </recipientList>
                                        <process
ref="broadcastResultProcessor" />
                                </when>
                                <when>
                                        <simple>${header.requestType} ==
'forward'</simple>
                                        <bean ref="dynamicRouterBean"
method="route" />
                                </when>
                                <otherwise>
                                        <bean
ref="operationTypeNotSupported"
method="throwOperationTypeNotSupportedException" />
                                </otherwise>
                        </choice>
                        <convertBodyTo type="com.company.openapi.SoapMap" />
</route>


What we are essentially doing, is fetching a request on CXF and forwarding
it to a jms endpoint using IN/OUT messaging. We use dynamic router for
performing this operation dynamically. 
A sample dynamic url for this jms endpoint is : 

activemq:queue:soapRequestQ2?exchangePattern=InOut&asyncConsumer=true&useMessageIDAsCorrelationID=false&requestTimeout=5000&transacted=false&replyTo=responseQ&replyToType=Exclusive
 

Now few problematic behaviors of automatic consumer creation on
responseQueue are: 
1. Whenever the route starts and requests are sent, 15 consumers are created
on the response queue. After that no matter how much the load is, these 15
consumers are handling the requests properly (as per our tps expectations). 
Now if we dynamically update the above sample jms url in our system (even
just the value of requestTimeout) and send another request, we see that the
reponseQ now has additional 15 (total of 30) consumers on it. And right
after this, most of the requests start failing for unable to map the
response to the request. 
The message is sent in the request queue, picked up by the client, the
response is sent, which we can see being enqueued adn dequeued in the
response queue (from activeMQ web console), the response is fetched by our
application but QueueReplyManager fails to map it to original request,
mentioning Response received for unknown correlationId xyz. And just after a
few seconds we see the exchangetimeout happening for the message with same
correlationid. 
Is this because the new set of 15 consumers got created in different session
than the original one and hence they are not able to share the
correlationId? 
Even though I have maxConcurrentConsumers set to 15 for that specific queue,
why is it still increasing beyond that value? (for every change in url, it
keeps on increasing by 15). 

2. We have same behaviour when we use the same queue/endpoint url for single
request and in receipient list. 
The single forward request registers its own 15 consumers and when we
perform a receipient list url, it creates additional 15 consumers on the
queues (again maxconcurrentconsumers is 15 only). 

3. When use the dynamic router (routing to the same jms in/out url) in
multiple routes, we always have 15 consumers on the response queue. 
But we use receipient list in multiple routes (routing to same set of jms
in/out urls), it creates set of 15 consumers per route (meaning if we have 3
routes it creates 45 consumers). 
And again we start getting the same issue of response not being mapped to
the request for unknown correlation id. 

Is there any book/blog from where I can understand how and why is this
happening? How does camel/activemq creates and manages consumers on queues,
specially when we are using the building blocks of dynamic router,
receipient list, activemq endpoint etc. 

I have been trying to understand and find a correct solution for this since
couple of weeks now. 
Please help me in this. 
Thanks. 

BR! 
Yogesh 



--
View this message in context: 
http://activemq.2283324.n4.nabble.com/ActiveMQ-using-in-Camel-In-OUT-endpoint-creates-additional-consumers-on-response-queue-which-are-mors-tp4693377.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Reply via email to