I'm using Camel 2.7.1 on top of ActiveMQ 5.5.0. For some reason, when I specify a custom replyTo destination on the endpoint url, the time it takes for the producer to receive a reply increases drastically. The curious thing is that the time to receive a reply is almost exactly 1 second. When I remove the replyTo from the url, everything's fast again.

I created a very simple, stand-alone test to demonstrate what I'm seeing. There is a server class [4] which runs an embedded instance of ActiveMQ and simply replies to messages as they arrive; and a client [3] class which simply sends messages to the server, and prints the elapsed time. The USE_REPLY_TO symbolic constant in the client determines whether a replyTo value is added to the url or not.

The client output when USE_REPLY_TO is false is shown as [1]. The client output when USE_REPLY_TO is true is shown as [2]. The code is pretty trivial. Am I doing something wrong, or is this a Camel and/or ActiveMQ issue?

Thanks!
Jim


[1] USE_REPLY_TO = false

received reply in: 0.476 s
received reply in: 0.006 s
received reply in: 0.006 s
received reply in: 0.006 s
received reply in: 0.006 s
...


[2] USE_REPLY_TO = true

received reply in: 1.524 s
received reply in: 1.002 s
received reply in: 1.003 s
received reply in: 1.003 s
received reply in: 1.002 s
...


[3] TestReplyToClient.java

package test;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.camel.component.ActiveMQComponent;
import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.impl.DefaultCamelContext;

public class TestReplyToClient {

  private static final boolean USE_REPLY_TO = false;

  public static void main(String... args) throws Exception {
// create camel context; configure activemq component for tcp://localhost:7001
    CamelContext context = new DefaultCamelContext();
ActiveMQComponent activemqComponent = ActiveMQComponent.activeMQComponent();
    activemqComponent.setConnectionFactory(new ActiveMQConnectionFactory(
      null, null, "tcp://localhost:7001"));
    context.addComponent("activemq", activemqComponent);
    context.start();

    // define url to send requests to
    String sendUrl = "activemq:queue:dest";
    if (USE_REPLY_TO) {
      sendUrl += "?replyTo=replyQueue";
    }
    System.err.println("sending to url: " + sendUrl);

    // repeatedly send requests; measure elapsed time
    ProducerTemplate template = context.createProducerTemplate();
    while (true) {
      long startNanos = System.nanoTime();
      template.requestBody(sendUrl, "abc");
      long elapsedNanos = System.nanoTime() - startNanos;
System.err.println(String.format("received reply in: %.3f s", elapsedNanos / 1000000000.0));
    }
  }

}


[4] TestReplyToServer.java

package test;

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.camel.component.ActiveMQComponent;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;

public class TestReplyToServer {

  private static final String BROKER_NAME = "thebroker";

  public static void main(String... args) throws Exception {
    startBroker();
    startCamel();
    Thread.sleep(Long.MAX_VALUE);
  }

  private static void startBroker() throws Exception {
    BrokerService brokerService = new BrokerService();
    brokerService.setBrokerName(BROKER_NAME);
    brokerService.setSchedulerSupport(false);
    brokerService.setPersistent(false);
    brokerService.addConnector("tcp://0.0.0.0:7001");
    brokerService.start();
    brokerService.waitUntilStarted();
  }


  private static void startCamel() throws Exception {
    CamelContext context = new DefaultCamelContext();

ActiveMQComponent activemqComponent = ActiveMQComponent.activeMQComponent(); activemqComponent.setBrokerURL(String.format("vm://%s?create=false", BROKER_NAME));
    context.addComponent("activemq", activemqComponent);

    final String receiveUrl = "activemq:queue:dest";
    context.addRoutes(new RouteBuilder() {
      @Override
      public void configure() throws Exception {
        from(receiveUrl).process(new Processor() {
          @Override
          public void process(Exchange exchange) throws Exception {
            System.err.println("received request");
            exchange.getOut().setBody("reply");
          }
        });
      }
    });

    context.start();
    System.err.println("listening on url: " + receiveUrl);
  }

}




Reply via email to