Hi, maybe this ticket can give some hints: https://issues.apache.org/activemq/browse/CAMEL-490
On Wed, Oct 21, 2009 at 2:15 PM, Eric Bouer <[email protected]> wrote: > > Hello. > In order to optimize my InOut throughput I added an explicit replyTo on my JMS > endpoint. > This doesn't seem to behave well with Camel threads and Spring JMS > caching. > The following test case shows the problem. > Remove the explicit replyTo and everything works fine. > Your comments are welcome. > > ================ > > import java.util.concurrent.ExecutorService; > import java.util.concurrent.Executors; > import java.util.concurrent.Callable; > import javax.jms.TextMessage; > import javax.jms.Destination; > import javax.jms.Message; > import javax.jms.Session; > import javax.jms.JMSException; > > import org.apache.activemq.ActiveMQConnectionFactory; > import org.apache.camel.Exchange; > import org.apache.camel.Processor; > import org.apache.camel.CamelContext; > import org.apache.camel.component.mock.MockEndpoint; > import org.apache.camel.builder.RouteBuilder; > import org.apache.activemq.camel.component.ActiveMQComponent; > import org.apache.camel.ExchangePattern; > import org.apache.camel.test.CamelTestSupport; > import org.springframework.jms.connection.CachingConnectionFactory; > import org.springframework.jms.core.JmsTemplate; > import org.springframework.jms.core.MessageCreator; > import static > org.apache.activemq.camel.component.ActiveMQComponent.activeMQComponent; > > /** > * Unit test using a fixed replyTo specified on the JMS endpoint > * > * @version $Revision: 791824 $ > */ > public class JmsJMSReplyToEndpointUsingInOutTest extends CamelTestSupport { > > private ActiveMQComponent amq; > private static String MQURI = "failover:(tcp://localhost:61616)"; > // private static String MQURI = > "vm://localhost?broker.persistent=false&broker.useJmx=false"; > > public class Replier implements Callable { > > @Override > public Object call() throws Exception { > log.info("replier started"); > JmsTemplate jms = new > 
JmsTemplate(amq.getConfiguration().getConnectionFactory()); > > final TextMessage msg = (TextMessage) > jms.receive("nameRequestor"); > assertEquals("What's your name", msg.getText()); > > // there should be a JMSReplyTo so we know where to send the > reply > final Destination replyTo = msg.getJMSReplyTo(); > > // send reply > // Thread.sleep(10000); > jms.send(replyTo, new MessageCreator() { > public Message createMessage(Session session) throws > JMSException { > TextMessage replyMsg = session.createTextMessage(); > replyMsg.setText("My name is Arnio"); > > replyMsg.setJMSCorrelationID(msg.getJMSCorrelationID()); > return replyMsg; > } > }); > return null; > } > }; > public void testCustomJMSReplyToInOut() throws Exception { > > MockEndpoint mock = getMockEndpoint("mock:result"); > mock.expectedBodiesReceived("My name is Arnio"); > mock.expectedMessageCount(10); > // do not use Camel to send and receive to simulate a non Camel > client > // use another thread to listen and send the reply > ExecutorService newFixedThreadPool = > Executors.newFixedThreadPool(20); > > for (int i = 0 ; i < 10 ; i++) { > newFixedThreadPool.submit(new Replier()); > } > > // now get started and send the first message that gets the ball > rolling > JmsTemplate jms = new > JmsTemplate(amq.getConfiguration().getConnectionFactory()); > for (int i = 0 ; i < 10 ; i++) { > jms.send("hello", new MessageCreator() { > public Message createMessage(Session session) throws > JMSException { > TextMessage msg = session.createTextMessage(); > msg.setText("Hello, I'm here"); > return msg; > } > > }); > > log.info("Hello sent"); > Thread.sleep(10); > } > > Thread.sleep(5000); > assertMockEndpointsSatisfied(); > } > > protected RouteBuilder createRouteBuilder() throws Exception { > return new RouteBuilder() { > > public void configure() throws Exception { > from("activemq:queue:hello") > .threads() > .process(new Processor() { > public void process(Exchange exchange) throws > Exception { > > 
exchange.getOut().setBody("What's your name"); > } > }) > // use in out to get a reply as well > > .to(ExchangePattern.InOut, > "activemq:queue:nameRequestor?replyTo=queue:namedReplyQueue) // Remove the > replyTo and eveything works well > .to("direct:replyprocessor"); > > from("direct:replyprocessor").process(new Processor() { > public void process(Exchange exchange) throws > Exception { > System.out.println("Here I am processing the > reply " + exchange.getIn().getBody()); > } > }).to("mock:result"); > } > }; > } > > protected CamelContext createCamelContext() throws Exception { > CamelContext camelContext = super.createCamelContext(); > amq = activeMQComponent(MQURI); > ActiveMQConnectionFactory targetFactory = new > ActiveMQConnectionFactory(MQURI); > log.info("Using MQ CachingConnectionFactory"); > CachingConnectionFactory cachedAMQConnectionFactory = new > CachingConnectionFactory(targetFactory); > > camelContext.addComponent("activemq", > ActiveMQComponent.jmsComponent(cachedAMQConnectionFactory)); > > return camelContext; > } > > > } > > =========== > -- > View this message in context: > http://www.nabble.com/explicitly-setting-replyTo-doesn%27t-scale-with-.threads%28%29-and-JMS-cache.-tp25991334p25991334.html > Sent from the Camel - Users (activemq) mailing list archive at Nabble.com. > > -- Claus Ibsen Apache Camel Committer Open Source Integration: http://fusesource.com Blog: http://davsclaus.blogspot.com/ Twitter: http://twitter.com/davsclaus
