Hi

Maybe this ticket can give some hints
https://issues.apache.org/activemq/browse/CAMEL-490

On Wed, Oct 21, 2009 at 2:15 PM, Eric Bouer <[email protected]> wrote:
>
> Hello.
> In order to optimize my InOut throughput I added explicit replyTo  on my JMS
> endpoint.
> This doesn't seems to be well behaving with camel threads and spring JMS
> caching.
> The following test case shows the problem.
> remove the explicit replyTo and everything works fine.
> your comments are welcome.
>
> ================
>
> import java.util.concurrent.ExecutorService;
> import java.util.concurrent.Executors;
> import java.util.concurrent.Callable;
> import javax.jms.TextMessage;
> import javax.jms.Destination;
> import javax.jms.Message;
> import javax.jms.Session;
> import javax.jms.JMSException;
>
> import org.apache.activemq.ActiveMQConnectionFactory;
> import org.apache.camel.Exchange;
> import org.apache.camel.Processor;
> import org.apache.camel.CamelContext;
> import org.apache.camel.component.mock.MockEndpoint;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.activemq.camel.component.ActiveMQComponent;
> import org.apache.camel.ExchangePattern;
> import org.apache.camel.test.CamelTestSupport;
> import org.springframework.jms.connection.CachingConnectionFactory;
> import org.springframework.jms.core.JmsTemplate;
> import org.springframework.jms.core.MessageCreator;
> import static
> org.apache.activemq.camel.component.ActiveMQComponent.activeMQComponent;
>
> /**
>  * Unit test using a fixed replyTo specified on the JMS endpoint
>  *
>  * @version $Revision: 791824 $
>  */
> public class JmsJMSReplyToEndpointUsingInOutTest extends CamelTestSupport {
>
>    private ActiveMQComponent amq;
>    private static String MQURI = "failover:(tcp://localhost:61616)";
>   // private static String MQURI =
> "vm://localhost?broker.persistent=false&broker.useJmx=false";
>
>    public class Replier implements Callable {
>
>       �...@override
>                public Object call() throws Exception {
>                log.info("replier started");
>                JmsTemplate jms = new
> JmsTemplate(amq.getConfiguration().getConnectionFactory());
>
>                final TextMessage msg = (TextMessage)
> jms.receive("nameRequestor");
>                assertEquals("What's your name", msg.getText());
>
>                // there should be a JMSReplyTo so we know where to send the
> reply
>                final Destination replyTo = msg.getJMSReplyTo();
>
>                // send reply
>               // Thread.sleep(10000);
>                jms.send(replyTo, new MessageCreator() {
>                    public Message createMessage(Session session) throws
> JMSException {
>                        TextMessage replyMsg = session.createTextMessage();
>                        replyMsg.setText("My name is Arnio");
>
> replyMsg.setJMSCorrelationID(msg.getJMSCorrelationID());
>                        return replyMsg;
>                    }
>                });
>                return null;
>            }
>    };
>    public void testCustomJMSReplyToInOut() throws Exception {
>
>        MockEndpoint mock = getMockEndpoint("mock:result");
>        mock.expectedBodiesReceived("My name is Arnio");
>        mock.expectedMessageCount(10);
>        // do not use Camel to send and receive to simulate a non Camel
> client
>        // use another thread to listen and send the reply
>        ExecutorService newFixedThreadPool =
> Executors.newFixedThreadPool(20);
>
>        for (int i = 0 ; i < 10 ; i++) {
>          newFixedThreadPool.submit(new Replier());
>        }
>
>        // now get started and send the first message that gets the ball
> rolling
>        JmsTemplate jms = new
> JmsTemplate(amq.getConfiguration().getConnectionFactory());
>        for (int i = 0 ; i < 10 ; i++) {
>        jms.send("hello", new MessageCreator() {
>            public Message createMessage(Session session) throws
> JMSException {
>                TextMessage msg = session.createTextMessage();
>                msg.setText("Hello, I'm here");
>                return msg;
>            }
>
>        });
>
>        log.info("Hello sent");
>        Thread.sleep(10);
>        }
>
>        Thread.sleep(5000);
>        assertMockEndpointsSatisfied();
>    }
>
>    protected RouteBuilder createRouteBuilder() throws Exception {
>        return new RouteBuilder() {
>
>            public void configure() throws Exception {
>                from("activemq:queue:hello")
>                   .threads()
>                    .process(new Processor() {
>                        public void process(Exchange exchange) throws
> Exception {
>
>                            exchange.getOut().setBody("What's your name");
>                        }
>                    })
>                    // use in out to get a reply as well
>
>                    .to(ExchangePattern.InOut,
> "activemq:queue:nameRequestor?replyTo=queue:namedReplyQueue) // Remove the
> replyTo and eveything works well
>                    .to("direct:replyprocessor");
>
>                    from("direct:replyprocessor").process(new Processor() {
>                            public void process(Exchange exchange) throws
> Exception {
>                                System.out.println("Here I am processing the
> reply " + exchange.getIn().getBody());
>                            }
>                    }).to("mock:result");
>            }
>        };
>    }
>
>    protected CamelContext createCamelContext() throws Exception {
>         CamelContext camelContext = super.createCamelContext();
>        amq = activeMQComponent(MQURI);
>        ActiveMQConnectionFactory targetFactory = new
> ActiveMQConnectionFactory(MQURI);
>         log.info("Using MQ CachingConnectionFactory");
>            CachingConnectionFactory cachedAMQConnectionFactory = new
> CachingConnectionFactory(targetFactory);
>
>        camelContext.addComponent("activemq",
> ActiveMQComponent.jmsComponent(cachedAMQConnectionFactory));
>
>        return camelContext;
>    }
>
>
> }
>
> ===========
> --
> View this message in context: 
> http://www.nabble.com/explicitly-setting-replyTo-doesn%27t-scale-with-.threads%28%29-and-JMS-cache.-tp25991334p25991334.html
> Sent from the Camel - Users (activemq) mailing list archive at Nabble.com.
>
>



-- 
Claus Ibsen
Apache Camel Committer

Open Source Integration: http://fusesource.com
Blog: http://davsclaus.blogspot.com/
Twitter: http://twitter.com/davsclaus

Reply via email to