On Mon, Jul 11, 2011 at 10:12 PM, Jim Newsham <jnews...@referentia.com> wrote: > > Hi Claus, > > Thanks for opening the jira issue, and for your comments. To answer your > questions: > > 1. We use fixed reply-to queues which are exclusive to Camel. > 2. We need a fixed reply-to queue to avoid losing reply messages in case of > disconnection (which would happen with temporary queues because they are > scoped to the lifetime of the connection). >
I think we should add an option to the JMS component so you can configure that the replyTo is to be consider as exclusive. ?replyTo=bar&exclusiveReplyTo=true Something like this. Anyone got a better idea for a name of the option? Likewise I have pondered about if we should add an option to have convention over configuration? So for example you can configure on the jms component, a pattern of the reply to names. jmsComponent.setExclusiveReplyTo(true); jmsComponent.setReplyToPattern("${name}.reply"); So in this example Camel will automatic name the reply to queues, as the request queue name + ".reply". So in the example above we can do from X to("jms:queue:bar") to Y So in this example since we configured the component with exclusive reply to, and a reply to pattern as well. Then what happens is that Camel will use a reply to queue named: bar.reply Of course you can still configure all the options on the endpoint as well if you like. And you can override the component settings so in case you want a special reply to name you can do: from X to("jms:queue:bar?replyTo=special") to Y Any thoughts? > Regards, > Jim > > On 7/9/2011 5:52 AM, Claus Ibsen wrote: >> >> Hi Jim >> >> I have created a ticket and posted some comments about the issue >> https://issues.apache.org/jira/browse/CAMEL-4202 >> >> Are you using a fixed reply to queue that is *exclusive* to the Camel >> route? >> Or is the queue used for other purposes as well? >> >> Is there a special reason why you want to use a fixed reply to queue? >> >> >> >> On Fri, Jul 8, 2011 at 11:14 PM, Jim Newsham<jnews...@referentia.com> >> wrote: >>> >>> Hi Claus, >>> >>> I enabled trace logging. I'm attaching the logs (for both client and >>> server; both with and without custom replyTo) as a zip file -- not sure >>> if >>> the mailing list will filter it, we'll see. >>> >>> I see that there are 5 messages in the client log which only appear when >>> a >>> custom replyTo is specified: "Running purge task to see if any entries >>> has >>> been timed out", "There are 1 in the timeout map", "did not receive a >>> message", etc. Here's an excerpt from each client log, for one exchange: >>> >>> From log for client without replyTo: >>> >>> 2011-07-08 10:55:32,354 [main] TRACE >>> org.apache.camel.component.jms.JmsProducer - Using inOut jms template >>> 2011-07-08 10:55:32,361 [main] DEBUG >>> org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate - >>> Executing >>> callback on JMS Session: ActiveMQSession >>> {id=ID:rsi-eng-newsham-61473-1310158531968-0:3:1,started=false} >>> 2011-07-08 10:55:32,361 [main] TRACE >>> org.apache.camel.component.jms.JmsBinding - Using JmsMessageType: Text >>> 2011-07-08 10:55:32,362 [main] DEBUG >>> org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate - >>> Sending >>> JMS message to: queue://dest with message: ActiveMQTextMessage {commandId >>> = >>> 0, responseRequired = false, messageId = null, originalDestination = >>> null, >>> originalTransactionId = null, producerId = null, destination = null, >>> transactionId = null, expiration = 0, timestamp = 0, arrival = 0, >>> brokerInTime = 0, brokerOutTime = 0, correlationId = >>> ID-rsi-eng-newsham-61472-1310158530715-0-4, replyTo = >>> temp-queue://ID:rsi-eng-newsham-61473-1310158531968-0:1:1, persistent = >>> false, type = null, priority = 0, groupID = null, groupSequence = 0, >>> targetConsumerId = null, compressed = false, userID = null, content = >>> null, >>> marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, >>> size = 0, properties = null, readOnlyProperties = false, readOnlyBody = >>> false, droppable = false, text = abc} >>> 2011-07-08 10:55:32,363 [main] DEBUG >>> org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate - Sent >>> JMS >>> message to: queue://dest with message: ActiveMQTextMessage {commandId = >>> 0, >>> responseRequired = false, messageId = >>> ID:rsi-eng-newsham-61473-1310158531968-0:3:1:1:1, originalDestination = >>> null, originalTransactionId = null, producerId = null, destination = >>> queue://dest, transactionId = null, expiration = 1310158552362, timestamp >>> = >>> 1310158532362, arrival = 0, brokerInTime = 0, brokerOutTime = 0, >>> correlationId = ID-rsi-eng-newsham-61472-1310158530715-0-4, replyTo = >>> temp-queue://ID:rsi-eng-newsham-61473-1310158531968-0:1:1, persistent = >>> true, type = null, priority = 4, groupID = null, groupSequence = 0, >>> targetConsumerId = null, compressed = false, userID = null, content = >>> null, >>> marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, >>> size = 0, properties = null, readOnlyProperties = false, readOnlyBody = >>> false, droppable = false, text = abc} >>> 2011-07-08 10:55:32,368 [DefaultMessageListenerContainer-1] DEBUG >>> org.apache.camel.component.jms.reply.TemporaryQueueReplyManager - >>> Received >>> reply message with correlationID: >>> ID-rsi-eng-newsham-61472-1310158530715-0-4 >>> -> ActiveMQTextMessage {commandId = 9, responseRequired = true, >>> messageId = >>> ID:rsi-eng-newsham-61470-1310158525476-2:1:1:1:2, originalDestination = >>> null, originalTransactionId = null, producerId = >>> ID:rsi-eng-newsham-61470-1310158525476-2:1:1:1, destination = >>> temp-queue://ID:rsi-eng-newsham-61473-1310158531968-0:1:1, transactionId >>> = >>> null, expiration = 0, timestamp = 1310158532367, arrival = 0, >>> brokerInTime = >>> 1310158532367, brokerOutTime = 1310158532367, correlationId = >>> ID-rsi-eng-newsham-61472-1310158530715-0-4, replyTo = null, persistent = >>> true, type = null, priority = 4, groupID = null, groupSequence = 0, >>> targetConsumerId = null, compressed = false, userID = null, content = >>> null, >>> marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, >>> size = 0, properties = null, readOnlyProperties = true, readOnlyBody = >>> true, >>> droppable = false, text = reply} >>> 2011-07-08 10:55:32,369 [DefaultMessageListenerContainer-1] TRACE >>> org.apache.camel.component.jms.JmsBinding - Extracting body as a >>> TextMessage >>> from JMS message: ActiveMQTextMessage {commandId = 9, responseRequired = >>> true, messageId = ID:rsi-eng-newsham-61470-1310158525476-2:1:1:1:2, >>> originalDestination = null, originalTransactionId = null, producerId = >>> ID:rsi-eng-newsham-61470-1310158525476-2:1:1:1, destination = >>> temp-queue://ID:rsi-eng-newsham-61473-1310158531968-0:1:1, transactionId >>> = >>> null, expiration = 0, timestamp = 1310158532367, arrival = 0, >>> brokerInTime = >>> 1310158532367, brokerOutTime = 1310158532367, correlationId = >>> ID-rsi-eng-newsham-61472-1310158530715-0-4, replyTo = null, persistent = >>> true, type = null, priority = 4, groupID = null, groupSequence = 0, >>> targetConsumerId = null, compressed = false, userID = null, content = >>> null, >>> marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, >>> size = 0, properties = null, readOnlyProperties = true, readOnlyBody = >>> true, >>> droppable = false, text = reply} >>> 2011-07-08 10:55:32,369 [DefaultMessageListenerContainer-1] DEBUG >>> org.apache.camel.component.jms.reply.TemporaryQueueReplyManager - Reply >>> received. Setting reply as OUT message: reply >>> received reply in: 0.015 s >>> >>> From log for client with replyTo: >>> >>> 2011-07-08 10:52:10,075 [main] TRACE >>> org.apache.camel.component.jms.JmsProducer - Using inOut jms template >>> 2011-07-08 10:52:10,081 [main] DEBUG >>> org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate - >>> Executing >>> callback on JMS Session: ActiveMQSession >>> {id=ID:rsi-eng-newsham-61455-1310158328671-0:3:1,started=false} >>> 2011-07-08 10:52:10,082 [main] TRACE >>> org.apache.camel.component.jms.JmsBinding - Using JmsMessageType: Text >>> 2011-07-08 10:52:10,082 [main] DEBUG >>> org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate - >>> Sending >>> JMS message to: queue://dest with message: ActiveMQTextMessage {commandId >>> = >>> 0, responseRequired = false, messageId = null, originalDestination = >>> null, >>> originalTransactionId = null, producerId = null, destination = null, >>> transactionId = null, expiration = 0, timestamp = 0, arrival = 0, >>> brokerInTime = 0, brokerOutTime = 0, correlationId = >>> ID-rsi-eng-newsham-61454-1310158327407-0-4, replyTo = queue://replyQueue, >>> persistent = false, type = null, priority = 0, groupID = null, >>> groupSequence >>> = 0, targetConsumerId = null, compressed = false, userID = null, content >>> = >>> null, marshalledProperties = null, dataStructure = null, >>> redeliveryCounter = >>> 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody >>> = >>> false, droppable = false, text = abc} >>> 2011-07-08 10:52:10,083 [main] DEBUG >>> org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate - Sent >>> JMS >>> message to: queue://dest with message: ActiveMQTextMessage {commandId = >>> 0, >>> responseRequired = false, messageId = >>> ID:rsi-eng-newsham-61455-1310158328671-0:3:1:1:1, originalDestination = >>> null, originalTransactionId = null, producerId = null, destination = >>> queue://dest, transactionId = null, expiration = 1310158350082, timestamp >>> = >>> 1310158330082, arrival = 0, brokerInTime = 0, brokerOutTime = 0, >>> correlationId = ID-rsi-eng-newsham-61454-1310158327407-0-4, replyTo = >>> queue://replyQueue, persistent = true, type = null, priority = 4, groupID >>> = >>> null, groupSequence = 0, targetConsumerId = null, compressed = false, >>> userID >>> = null, content = null, marshalledProperties = null, dataStructure = >>> null, >>> redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = >>> false, readOnlyBody = false, droppable = false, text = abc} >>> 2011-07-08 10:52:10,536 [Camel (camel-1) thread #0 - >>> JmsReplyManagerTimeoutChecker[dest] TRACE >>> org.apache.camel.component.jms.reply.CorrelationMap - Running purge task >>> to >>> see if any entries has been timed out >>> 2011-07-08 10:52:10,536 [Camel (camel-1) thread #0 - >>> JmsReplyManagerTimeoutChecker[dest] TRACE >>> org.apache.camel.component.jms.reply.CorrelationMap - There are 1 in the >>> timeout map >>> 2011-07-08 10:52:11,075 [DefaultMessageListenerContainer-1] DEBUG >>> >>> org.apache.camel.component.jms.reply.PersistentQueueReplyManager$PersistentQueueMessageListenerContainer >>> - Consumer [ActiveMQMessageConsumer { >>> value=ID:rsi-eng-newsham-61455-1310158328671-0:1:1:3, started=true }] of >>> session [ActiveMQSession >>> {id=ID:rsi-eng-newsham-61455-1310158328671-0:1:1,started=true}] did not >>> receive a message >>> 2011-07-08 10:52:11,075 [DefaultMessageListenerContainer-1] TRACE >>> org.apache.camel.component.jms.reply.PersistentQueueReplyManager - Using >>> >>> MessageSelector[JMSCorrelationID='ID-rsi-eng-newsham-61454-1310158327407-0-4'] >>> 2011-07-08 10:52:11,078 [DefaultMessageListenerContainer-1] DEBUG >>> >>> org.apache.camel.component.jms.reply.PersistentQueueReplyManager$PersistentQueueMessageListenerContainer >>> - Received message of type [class >>> org.apache.activemq.command.ActiveMQTextMessage] from consumer >>> [ActiveMQMessageConsumer { >>> value=ID:rsi-eng-newsham-61455-1310158328671-0:1:1:4, started=true }] of >>> session [ActiveMQSession >>> {id=ID:rsi-eng-newsham-61455-1310158328671-0:1:1,started=true}] >>> 2011-07-08 10:52:11,078 [DefaultMessageListenerContainer-1] DEBUG >>> org.apache.camel.component.jms.reply.PersistentQueueReplyManager - >>> Received >>> reply message with correlationID: >>> ID-rsi-eng-newsham-61454-1310158327407-0-4 >>> -> ActiveMQTextMessage {commandId = 9, responseRequired = true, >>> messageId = >>> ID:rsi-eng-newsham-61452-1310158320230-2:1:1:1:2, originalDestination = >>> null, originalTransactionId = null, producerId = >>> ID:rsi-eng-newsham-61452-1310158320230-2:1:1:1, destination = >>> queue://replyQueue, transactionId = null, expiration = 0, timestamp = >>> 1310158330085, arrival = 0, brokerInTime = 1310158330085, brokerOutTime = >>> 1310158331077, correlationId = >>> ID-rsi-eng-newsham-61454-1310158327407-0-4, >>> replyTo = null, persistent = true, type = null, priority = 4, groupID = >>> null, groupSequence = 0, targetConsumerId = null, compressed = false, >>> userID >>> = null, content = null, marshalledProperties = null, dataStructure = >>> null, >>> redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = >>> true, readOnlyBody = true, droppable = false, text = reply} >>> 2011-07-08 10:52:11,078 [DefaultMessageListenerContainer-1] TRACE >>> org.apache.camel.component.jms.JmsBinding - Extracting body as a >>> TextMessage >>> from JMS message: ActiveMQTextMessage {commandId = 9, responseRequired = >>> true, messageId = ID:rsi-eng-newsham-61452-1310158320230-2:1:1:1:2, >>> originalDestination = null, originalTransactionId = null, producerId = >>> ID:rsi-eng-newsham-61452-1310158320230-2:1:1:1, destination = >>> queue://replyQueue, transactionId = null, expiration = 0, timestamp = >>> 1310158330085, arrival = 0, brokerInTime = 1310158330085, brokerOutTime = >>> 1310158331077, correlationId = >>> ID-rsi-eng-newsham-61454-1310158327407-0-4, >>> replyTo = null, persistent = true, type = null, priority = 4, groupID = >>> null, groupSequence = 0, targetConsumerId = null, compressed = false, >>> userID >>> = null, content = null, marshalledProperties = null, dataStructure = >>> null, >>> redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = >>> true, readOnlyBody = true, droppable = false, text = reply} >>> 2011-07-08 10:52:11,078 [DefaultMessageListenerContainer-1] DEBUG >>> org.apache.camel.component.jms.reply.PersistentQueueReplyManager - Reply >>> received. Setting reply as OUT message: reply >>> received reply in: 1.004 s >>> >>> Thanks, >>> Jim >>> >>> >>> >>> On 7/7/2011 10:59 PM, Claus Ibsen wrote: >>>> >>>> Can you enable TRACE logging at org.apache.camel.component.jms and run >>>> it for both examples. >>>> To see what happens. >>>> >>>> >>>> >>>> On Fri, Jul 8, 2011 at 3:18 AM, Jim Newsham<jnews...@referentia.com> >>>> wrote: >>>>> >>>>> I'm using Camel 2.7.1 on top of ActiveMQ 5.5.0. For some reason, when >>>>> I >>>>> specify a custom replyTo destination on the endpoint url, the time it >>>>> takes >>>>> for the producer to receive a reply increases drastically. The curious >>>>> thing is that the time to receive a reply is almost exactly 1 second. >>>>> When >>>>> I remove the replyTo from the url, everything's fast again. >>>>> >>>>> I created a very simple, stand-alone test to demonstrate what I'm >>>>> seeing. >>>>> There is a server class [4] which runs an embedded instance of >>>>> ActiveMQ >>>>> and >>>>> simply replies to messages as they arrive; and a client [3] class which >>>>> simply sends messages to the server, and prints the elapsed time. The >>>>> USE_REPLY_TO symbolic constant in the client determines whether a >>>>> replyTo >>>>> value is added to the url or not. >>>>> >>>>> The client output when USE_REPLY_TO is false is shown as [1]. The >>>>> client >>>>> output when USE_REPLY_TO is true is shown as [2]. The code is pretty >>>>> trivial. Am I doing something wrong, or is this a Camel and/or >>>>> ActiveMQ >>>>> issue? >>>>> >>>>> Thanks! >>>>> Jim >>>>> >>>>> >>>>> [1] USE_REPLY_TO = false >>>>> >>>>> received reply in: 0.476 s >>>>> received reply in: 0.006 s >>>>> received reply in: 0.006 s >>>>> received reply in: 0.006 s >>>>> received reply in: 0.006 s >>>>> ... >>>>> >>>>> >>>>> [2] USE_REPLY_TO = true >>>>> >>>>> received reply in: 1.524 s >>>>> received reply in: 1.002 s >>>>> received reply in: 1.003 s >>>>> received reply in: 1.003 s >>>>> received reply in: 1.002 s >>>>> ... >>>>> >>>>> >>>>> [3] TestReplyToClient.java >>>>> >>>>> package test; >>>>> >>>>> import org.apache.activemq.ActiveMQConnectionFactory; >>>>> import org.apache.activemq.camel.component.ActiveMQComponent; >>>>> import org.apache.camel.CamelContext; >>>>> import org.apache.camel.ProducerTemplate; >>>>> import org.apache.camel.impl.DefaultCamelContext; >>>>> >>>>> public class TestReplyToClient { >>>>> >>>>> private static final boolean USE_REPLY_TO = false; >>>>> >>>>> public static void main(String... args) throws Exception { >>>>> // create camel context; configure activemq component for >>>>> tcp://localhost:7001 >>>>> CamelContext context = new DefaultCamelContext(); >>>>> ActiveMQComponent activemqComponent = >>>>> ActiveMQComponent.activeMQComponent(); >>>>> activemqComponent.setConnectionFactory(new >>>>> ActiveMQConnectionFactory( >>>>> null, null, "tcp://localhost:7001")); >>>>> context.addComponent("activemq", activemqComponent); >>>>> context.start(); >>>>> >>>>> // define url to send requests to >>>>> String sendUrl = "activemq:queue:dest"; >>>>> if (USE_REPLY_TO) { >>>>> sendUrl += "?replyTo=replyQueue"; >>>>> } >>>>> System.err.println("sending to url: " + sendUrl); >>>>> >>>>> // repeatedly send requests; measure elapsed time >>>>> ProducerTemplate template = context.createProducerTemplate(); >>>>> while (true) { >>>>> long startNanos = System.nanoTime(); >>>>> template.requestBody(sendUrl, "abc"); >>>>> long elapsedNanos = System.nanoTime() - startNanos; >>>>> System.err.println(String.format("received reply in: %.3f s", >>>>> elapsedNanos / 1000000000.0)); >>>>> } >>>>> } >>>>> >>>>> } >>>>> >>>>> >>>>> [4] TestReplyToServer.java >>>>> >>>>> package test; >>>>> >>>>> import org.apache.activemq.broker.BrokerService; >>>>> import org.apache.activemq.camel.component.ActiveMQComponent; >>>>> import org.apache.camel.CamelContext; >>>>> import org.apache.camel.Exchange; >>>>> import org.apache.camel.Processor; >>>>> import org.apache.camel.builder.RouteBuilder; >>>>> import org.apache.camel.impl.DefaultCamelContext; >>>>> >>>>> public class TestReplyToServer { >>>>> >>>>> private static final String BROKER_NAME = "thebroker"; >>>>> >>>>> public static void main(String... args) throws Exception { >>>>> startBroker(); >>>>> startCamel(); >>>>> Thread.sleep(Long.MAX_VALUE); >>>>> } >>>>> >>>>> private static void startBroker() throws Exception { >>>>> BrokerService brokerService = new BrokerService(); >>>>> brokerService.setBrokerName(BROKER_NAME); >>>>> brokerService.setSchedulerSupport(false); >>>>> brokerService.setPersistent(false); >>>>> brokerService.addConnector("tcp://0.0.0.0:7001"); >>>>> brokerService.start(); >>>>> brokerService.waitUntilStarted(); >>>>> } >>>>> >>>>> >>>>> private static void startCamel() throws Exception { >>>>> CamelContext context = new DefaultCamelContext(); >>>>> >>>>> ActiveMQComponent activemqComponent = >>>>> ActiveMQComponent.activeMQComponent(); >>>>> activemqComponent.setBrokerURL(String.format("vm://%s?create=false", >>>>> BROKER_NAME)); >>>>> context.addComponent("activemq", activemqComponent); >>>>> >>>>> final String receiveUrl = "activemq:queue:dest"; >>>>> context.addRoutes(new RouteBuilder() { >>>>> @Override >>>>> public void configure() throws Exception { >>>>> from(receiveUrl).process(new Processor() { >>>>> @Override >>>>> public void process(Exchange exchange) throws Exception { >>>>> System.err.println("received request"); >>>>> exchange.getOut().setBody("reply"); >>>>> } >>>>> }); >>>>> } >>>>> }); >>>>> >>>>> context.start(); >>>>> System.err.println("listening on url: " + receiveUrl); >>>>> } >>>>> >>>>> } >>>>> >>>>> >>>>> >>>>> >>>>> >>>> >>> >> >> > > -- Claus Ibsen ----------------- FuseSource Email: cib...@fusesource.com Web: http://fusesource.com Twitter: davsclaus, fusenews Blog: http://davsclaus.blogspot.com/ Author of Camel in Action: http://www.manning.com/ibsen/