I'm using Camel 2.7.1 on top of ActiveMQ 5.5.0. For some reason, when I
specify a custom replyTo destination on the endpoint url, the time it
takes for the producer to receive a reply increases drastically. The
curious thing is that the time to receive a reply is almost exactly 1
second. When I remove the replyTo from the url, everything's fast again.
I created a very simple, stand-alone test to demonstrate what I'm
seeing. There is a server class [4] which runs an embedded instance of
ActiveMQ and simply replies to messages as they arrive; and a client [3]
class which simply sends messages to the server, and prints the elapsed
time. The USE_REPLY_TO symbolic constant in the client determines
whether a replyTo value is added to the url or not.
The client output when USE_REPLY_TO is false is shown as [1]. The
client output when USE_REPLY_TO is true is shown as [2]. The code is
pretty trivial. Am I doing something wrong, or is this a Camel and/or
ActiveMQ issue?
Thanks!
Jim
[1] USE_REPLY_TO = false
received reply in: 0.476 s
received reply in: 0.006 s
received reply in: 0.006 s
received reply in: 0.006 s
received reply in: 0.006 s
...
[2] USE_REPLY_TO = true
received reply in: 1.524 s
received reply in: 1.002 s
received reply in: 1.003 s
received reply in: 1.003 s
received reply in: 1.002 s
...
[3] TestReplyToClient.java
package test;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.camel.component.ActiveMQComponent;
import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.impl.DefaultCamelContext;
public class TestReplyToClient {
private static final boolean USE_REPLY_TO = false;
public static void main(String... args) throws Exception {
// create camel context; configure activemq component for
tcp://localhost:7001
CamelContext context = new DefaultCamelContext();
ActiveMQComponent activemqComponent =
ActiveMQComponent.activeMQComponent();
activemqComponent.setConnectionFactory(new ActiveMQConnectionFactory(
null, null, "tcp://localhost:7001"));
context.addComponent("activemq", activemqComponent);
context.start();
// define url to send requests to
String sendUrl = "activemq:queue:dest";
if (USE_REPLY_TO) {
sendUrl += "?replyTo=replyQueue";
}
System.err.println("sending to url: " + sendUrl);
// repeatedly send requests; measure elapsed time
ProducerTemplate template = context.createProducerTemplate();
while (true) {
long startNanos = System.nanoTime();
template.requestBody(sendUrl, "abc");
long elapsedNanos = System.nanoTime() - startNanos;
System.err.println(String.format("received reply in: %.3f s",
elapsedNanos / 1000000000.0));
}
}
}
[4] TestReplyToServer.java
package test;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.camel.component.ActiveMQComponent;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
public class TestReplyToServer {
private static final String BROKER_NAME = "thebroker";
public static void main(String... args) throws Exception {
startBroker();
startCamel();
Thread.sleep(Long.MAX_VALUE);
}
private static void startBroker() throws Exception {
BrokerService brokerService = new BrokerService();
brokerService.setBrokerName(BROKER_NAME);
brokerService.setSchedulerSupport(false);
brokerService.setPersistent(false);
brokerService.addConnector("tcp://0.0.0.0:7001");
brokerService.start();
brokerService.waitUntilStarted();
}
private static void startCamel() throws Exception {
CamelContext context = new DefaultCamelContext();
ActiveMQComponent activemqComponent =
ActiveMQComponent.activeMQComponent();
activemqComponent.setBrokerURL(String.format("vm://%s?create=false",
BROKER_NAME));
context.addComponent("activemq", activemqComponent);
final String receiveUrl = "activemq:queue:dest";
context.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
from(receiveUrl).process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
System.err.println("received request");
exchange.getOut().setBody("reply");
}
});
}
});
context.start();
System.err.println("listening on url: " + receiveUrl);
}
}