Hi Are you sending as persistent and as a durable topic. http://activemq.apache.org/why-do-i-not-receive-messages-on-my-durable-topic-subscription.html http://activemq.apache.org/how-do-durable-queues-and-topics-work.html
On Thu, Jul 17, 2014 at 12:22 PM, Richa <[email protected]> wrote: > Hi, > > I have a camel route having input endpoint as activemq topic. I have added a > selector to the topic for checking a header. But it looks like I am losing > messages on the topic. My topic configuration is as follows: > > ActiveMQConnectionFactory connectionFactory = new > ActiveMQConnectionFactory(); > > if(!routeDetails.getInputEndPoint().startsWith("failover")){ > //This method will generate the hostname which will > act as broker url in > this case. > getBasicUrl(ACTIVE_MQ_TOPIC_PREFIX, true, INPUT); > if(!Util.isNullOrEmpty(hostName,port)){ > connectionFactory.setBrokerURL(TCP + > hostName + COLON + port); > }else{ > throw new Exception("Broker url could > not be set as IP address/Port > could not be obtained from input."); > } > } > else{ > > connectionFactory.setBrokerURL(routeDetails.getInputEndPoint()); > } > StringBuilder inputLocations = new > StringBuilder(ACTIVE_MQ_TOPIC_PREFIX); > inputLocations.append(inputProperties[1]); > > inputLocations.append(QUESTION_MARK+"maxConcurrentConsumers=" + > (MIN_CONSUMERS+(int)(Math.random()*MAX_CONSUMERS))); > jmsEndpoint = (JmsEndpoint) > getContext().getEndpoint(inputLocations.toString()); > > PooledConnectionFactory pooledConnectionFactory = new > PooledConnectionFactory(); > pooledConnectionFactory.setMaxConnections(1); > > pooledConnectionFactory.setConnectionFactory(connectionFactory); > > ActiveMQConfiguration jmsConfiguration = new > ActiveMQConfiguration(); > jmsConfiguration.setPreserveMessageQos(true); > jmsConfiguration.setUsePooledConnection(false); > jmsConfiguration.setUseSingleConnection(true); > //Set the consumer type to custom which will help us > in getting our own > messageListenerContainer. > jmsConfiguration.setConsumerType(ConsumerType.Custom); > //Override the DMLC by creating our own > messageListenerContainer which > will help > //in cases when jms is down and we want to configure > the number of > retries. > > jmsConfiguration.setMessageListenerContainerFactory(new > MessageListenerContainer()); > > jmsConfiguration.setConnectionFactory(pooledConnectionFactory); > > jmsConfiguration.createMessageListenerContainer(jmsEndpoint); > > jmsEndpoint.setConfiguration(jmsConfiguration); > jmsEndpoint.setTransacted(true); > jmsEndpoint.setSelector("header1='abc'"); > > I am pushing two messages with the header1 = abc but only one of them is > picked by the route. The route does some processing on the first message and > then sleeps. > > Can someone tell me if there is some issue with the configuration of the jms > endpoint? > > Thanks, > Richa > > > > > -- > View this message in context: > http://camel.465427.n5.nabble.com/ActiveMQ-topic-as-input-endpoint-in-Apache-Camel-tp5753957.html > Sent from the Camel - Users mailing list archive at Nabble.com. -- Claus Ibsen ----------------- Red Hat, Inc. Email: [email protected] Twitter: davsclaus Blog: http://davsclaus.com Author of Camel in Action: http://www.manning.com/ibsen hawtio: http://hawt.io/ fabric8: http://fabric8.io/
