On Fri, Jul 8, 2011 at 11:04 AM, Sander Mak <sander...@gmail.com> wrote:
> Could this be caused by the polling nature of the underlying Spring
> DefaultMessageListenerContainer class (which polls at 1 second intervals by
> default, IIRC)? However, I don't know how the internals of reply-to
> work, or whether it uses the listener container.
>

Yes, the ReplyManager in camel-jms uses the Spring listener container
to listen/poll for replies.
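
To illustrate why polling would produce a delay of almost exactly 1 second,
here is a minimal sketch of the timing only. It is a simplified model, not the
actual camel-jms or Spring code: it assumes the listener notices newly expected
replies only at the start of each poll cycle. The class name
PollingDelaySketch, the method noticedAtMillis and the 250 ms interval are
hypothetical and chosen for illustration; the 6 ms round trip is taken from
the fast case in [1].

```java
public class PollingDelaySketch {

    /**
     * Simplified model (an assumption, not the real listener container logic):
     * poll cycles start at 0, interval, 2 * interval, ... and a reply that
     * arrives during a cycle is only noticed when the next cycle starts.
     * Returns the time, in ms since the first cycle started, at which the
     * reply is noticed.
     */
    public static long noticedAtMillis(long replyArrivalMillis, long pollIntervalMillis) {
        if (replyArrivalMillis < 0) {
            throw new IllegalArgumentException("replyArrivalMillis must be >= 0");
        }
        if (pollIntervalMillis <= 0) {
            throw new IllegalArgumentException("pollIntervalMillis must be > 0");
        }
        // Round the arrival time up to the next cycle boundary (integer ceiling).
        long cycles = (replyArrivalMillis + pollIntervalMillis - 1) / pollIntervalMillis;
        return cycles * pollIntervalMillis;
    }

    public static void main(String[] args) {
        long brokerRoundTripMillis = 6; // roughly the fast case shown in [1]
        for (long interval : new long[] {1000, 250}) {
            System.out.println("poll interval " + interval + " ms -> reply noticed after "
                + noticedAtMillis(brokerRoundTripMillis, interval) + " ms");
        }
    }
}
```

Under this model the reply is on the queue after about 6 ms but is not picked
up until the next cycle begins, so the elapsed time tracks the poll interval
rather than the broker round trip, which is consistent with the numbers in [2].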


>
> Sander
>
> On Fri, Jul 8, 2011 at 3:18 AM, Jim Newsham <jnews...@referentia.com> wrote:
>>
>> I'm using Camel 2.7.1 on top of ActiveMQ 5.5.0.  For some reason, when I
>> specify a custom replyTo destination on the endpoint url, the time it takes
>> for the producer to receive a reply increases drastically.  The curious
>> thing is that the time to receive a reply is almost exactly 1 second.  When
>> I remove the replyTo from the url, everything's fast again.
>>
>> I created a very simple, stand-alone test to demonstrate what I'm seeing.
>> There is a server class [4], which runs an embedded instance of ActiveMQ and
>> simply replies to messages as they arrive, and a client class [3], which
>> sends messages to the server and prints the elapsed time.  The
>> USE_REPLY_TO symbolic constant in the client determines whether a replyTo
>> value is added to the url or not.
>>
>> The client output when USE_REPLY_TO is false is shown as [1].  The client
>> output when USE_REPLY_TO is true is shown as [2].  The code is pretty
>> trivial.  Am I doing something wrong, or is this a Camel and/or ActiveMQ
>> issue?
>>
>> Thanks!
>> Jim
>>
>>
>> [1] USE_REPLY_TO = false
>>
>> received reply in: 0.476 s
>> received reply in: 0.006 s
>> received reply in: 0.006 s
>> received reply in: 0.006 s
>> received reply in: 0.006 s
>> ...
>>
>>
>> [2] USE_REPLY_TO = true
>>
>> received reply in: 1.524 s
>> received reply in: 1.002 s
>> received reply in: 1.003 s
>> received reply in: 1.003 s
>> received reply in: 1.002 s
>> ...
>>
>>
>> [3] TestReplyToClient.java
>>
>> package test;
>>
>> import org.apache.activemq.ActiveMQConnectionFactory;
>> import org.apache.activemq.camel.component.ActiveMQComponent;
>> import org.apache.camel.CamelContext;
>> import org.apache.camel.ProducerTemplate;
>> import org.apache.camel.impl.DefaultCamelContext;
>>
>> public class TestReplyToClient {
>>
>>  private static final boolean USE_REPLY_TO = false;
>>
>>  public static void main(String... args) throws Exception {
>>    // create camel context; configure activemq component for tcp://localhost:7001
>>    CamelContext context = new DefaultCamelContext();
>>    ActiveMQComponent activemqComponent = ActiveMQComponent.activeMQComponent();
>>    activemqComponent.setConnectionFactory(new ActiveMQConnectionFactory(
>>      null, null, "tcp://localhost:7001"));
>>    context.addComponent("activemq", activemqComponent);
>>    context.start();
>>
>>    // define url to send requests to
>>    String sendUrl = "activemq:queue:dest";
>>    if (USE_REPLY_TO) {
>>      sendUrl += "?replyTo=replyQueue";
>>    }
>>    System.err.println("sending to url: " + sendUrl);
>>
>>    // repeatedly send requests; measure elapsed time
>>    ProducerTemplate template = context.createProducerTemplate();
>>    while (true) {
>>      long startNanos = System.nanoTime();
>>      template.requestBody(sendUrl, "abc");
>>      long elapsedNanos = System.nanoTime() - startNanos;
>>      System.err.println(String.format("received reply in: %.3f s",
>>        elapsedNanos / 1000000000.0));
>>    }
>>  }
>>
>> }
>>
>>
>> [4] TestReplyToServer.java
>>
>> package test;
>>
>> import org.apache.activemq.broker.BrokerService;
>> import org.apache.activemq.camel.component.ActiveMQComponent;
>> import org.apache.camel.CamelContext;
>> import org.apache.camel.Exchange;
>> import org.apache.camel.Processor;
>> import org.apache.camel.builder.RouteBuilder;
>> import org.apache.camel.impl.DefaultCamelContext;
>>
>> public class TestReplyToServer {
>>
>>  private static final String BROKER_NAME = "thebroker";
>>
>>  public static void main(String... args) throws Exception {
>>    startBroker();
>>    startCamel();
>>    Thread.sleep(Long.MAX_VALUE);
>>  }
>>
>>  private static void startBroker() throws Exception {
>>    BrokerService brokerService = new BrokerService();
>>    brokerService.setBrokerName(BROKER_NAME);
>>    brokerService.setSchedulerSupport(false);
>>    brokerService.setPersistent(false);
>>    brokerService.addConnector("tcp://0.0.0.0:7001");
>>    brokerService.start();
>>    brokerService.waitUntilStarted();
>>  }
>>
>>
>>  private static void startCamel() throws Exception {
>>    CamelContext context = new DefaultCamelContext();
>>
>>    ActiveMQComponent activemqComponent = ActiveMQComponent.activeMQComponent();
>>    activemqComponent.setBrokerURL(String.format("vm://%s?create=false", BROKER_NAME));
>>    context.addComponent("activemq", activemqComponent);
>>
>>    final String receiveUrl = "activemq:queue:dest";
>>    context.addRoutes(new RouteBuilder() {
>>      @Override
>>      public void configure() throws Exception {
>>        from(receiveUrl).process(new Processor() {
>>          @Override
>>          public void process(Exchange exchange) throws Exception {
>>            System.err.println("received request");
>>            exchange.getOut().setBody("reply");
>>          }
>>        });
>>      }
>>    });
>>
>>    context.start();
>>    System.err.println("listening on url: " + receiveUrl);
>>  }
>>
>> }
>>
>>
>>
>>
>>
>



-- 
Claus Ibsen
-----------------
FuseSource
Email: cib...@fusesource.com
Web: http://fusesource.com
Twitter: davsclaus, fusenews
Blog: http://davsclaus.blogspot.com/
Author of Camel in Action: http://www.manning.com/ibsen/
