Hi,

I have a camel route having input endpoint as activemq topic. I have added a
selector to the topic for checking a header. But it looks like I am losing
messages on the topic. My topic configuration is as follows: 

ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory();
                        
if(!routeDetails.getInputEndPoint().startsWith("failover")){
                        //This method will generate the hostname which will act 
as broker url in
this case.
                        getBasicUrl(ACTIVE_MQ_TOPIC_PREFIX, true, INPUT);
                                if(!Util.isNullOrEmpty(hostName,port)){
                                        connectionFactory.setBrokerURL(TCP + 
hostName + COLON + port);
                                }else{
                                        throw new Exception("Broker url could 
not be set as IP address/Port
could not be obtained from input.");
                                }
                        }
                        else{
                                
connectionFactory.setBrokerURL(routeDetails.getInputEndPoint());
                        }
                        StringBuilder inputLocations = new 
StringBuilder(ACTIVE_MQ_TOPIC_PREFIX);
                        inputLocations.append(inputProperties[1]);
                        
inputLocations.append(QUESTION_MARK+"maxConcurrentConsumers=" + 
(MIN_CONSUMERS+(int)(Math.random()*MAX_CONSUMERS)));
                        jmsEndpoint = (JmsEndpoint)
getContext().getEndpoint(inputLocations.toString());
                        
                        PooledConnectionFactory pooledConnectionFactory = new
PooledConnectionFactory();
                        pooledConnectionFactory.setMaxConnections(1);
                        
pooledConnectionFactory.setConnectionFactory(connectionFactory);
                        
                        ActiveMQConfiguration jmsConfiguration = new 
ActiveMQConfiguration();
                        jmsConfiguration.setPreserveMessageQos(true);
                        jmsConfiguration.setUsePooledConnection(false);
                        jmsConfiguration.setUseSingleConnection(true);
                        //Set the consumer type to custom which will help us in 
getting our own
messageListenerContainer.
                        jmsConfiguration.setConsumerType(ConsumerType.Custom);
                        //Override the DMLC by creating our own 
messageListenerContainer which
will help
                        //in cases when jms is down and we want to configure 
the number of
retries.
                        jmsConfiguration.setMessageListenerContainerFactory(new
MessageListenerContainer());
                        
jmsConfiguration.setConnectionFactory(pooledConnectionFactory);
                        
jmsConfiguration.createMessageListenerContainer(jmsEndpoint);
                        
                        jmsEndpoint.setConfiguration(jmsConfiguration);
                        jmsEndpoint.setTransacted(true);
                        jmsEndpoint.setSelector("header1='abc'");

I am pushing two messages with the header1 = abc but only one of them is
picked by the route. The route does some processing on the first message and
then sleeps.

Can someone tell me if there is some issue with the configuration of the jms
endpoint?

Thanks,
Richa 




--
View this message in context: 
http://camel.465427.n5.nabble.com/ActiveMQ-topic-as-input-endpoint-in-Apache-Camel-tp5753957.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Reply via email to