On Mon, Jul 11, 2011 at 10:12 PM, Jim Newsham <jnews...@referentia.com> wrote:
>
> Hi Claus,
>
> Thanks for opening the jira issue, and for your comments.  To answer your
> questions:
>
> 1.  We use fixed reply-to queues which are exclusive to Camel.
> 2.  We need a fixed reply-to queue to avoid losing reply messages in case of
> disconnection (which would happen with temporary queues because they are
> scoped to the lifetime of the connection).
>

I think we should add an option to the JMS component so you can
configure the replyTo queue to be considered exclusive.

?replyTo=bar&exclusiveReplyTo=true

Something like this. Anyone got a better idea for a name of the option?

Likewise, I have been pondering whether we should add an option for
convention over configuration. For example, you could configure a
pattern for the reply-to queue names on the JMS component.

jmsComponent.setExclusiveReplyTo(true);
jmsComponent.setReplyToPattern("${name}.reply");

With this configuration Camel will automatically name each reply-to
queue as the request queue name + ".reply". A route can then simply do:

from X
to("jms:queue:bar")
to Y

Since we configured the component with an exclusive reply-to and a
reply-to pattern, Camel will use a reply-to queue named: bar.reply


Of course you can still configure all the options on the endpoint as
well if you like. Endpoint options override the component settings, so
if you want a special reply-to name you can do:

from X
to("jms:queue:bar?replyTo=special")
to Y
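
To make the naming convention and the endpoint override concrete, here
is a rough sketch in plain Java of how the reply-to name could be
resolved. Nothing in it is existing Camel API: the class
ReplyToNaming and the method resolveReplyTo are hypothetical names used
only for illustration, and the null fallback (meaning "use a temporary
queue") is an assumption.

```java
import java.util.Objects;

// Hypothetical helper, not part of Camel: illustrates the proposed convention only.
class ReplyToNaming {

    /**
     * Resolves the reply-to queue name for a request queue.
     *
     * @param requestQueue     name of the request queue, e.g. "bar"
     * @param componentPattern component-level pattern such as "${name}.reply", or null
     * @param endpointReplyTo  explicit replyTo given on the endpoint URI, or null
     * @return the reply-to queue name, or null if neither option is configured
     */
    static String resolveReplyTo(String requestQueue, String componentPattern, String endpointReplyTo) {
        Objects.requireNonNull(requestQueue, "requestQueue");

        // An explicit replyTo on the endpoint overrides the component setting.
        if (endpointReplyTo != null && !endpointReplyTo.isEmpty()) {
            return endpointReplyTo;
        }
        // No convention configured: assumed to mean "fall back to a temporary queue".
        if (componentPattern == null) {
            return null;
        }
        // String.replace does a literal (non-regex) substitution of the placeholder.
        return componentPattern.replace("${name}", requestQueue);
    }

    public static void main(String[] args) {
        System.out.println(resolveReplyTo("bar", "${name}.reply", null));      // prints "bar.reply"
        System.out.println(resolveReplyTo("bar", "${name}.reply", "special")); // prints "special"
    }
}
```

The endpoint value is checked first so that a per-endpoint replyTo
always wins, which matches the override behaviour described above.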

Any thoughts?


> Regards,
> Jim
>
> On 7/9/2011 5:52 AM, Claus Ibsen wrote:
>>
>> Hi Jim
>>
>> I have created a ticket and posted some comments about the issue
>> https://issues.apache.org/jira/browse/CAMEL-4202
>>
>> Are you using a fixed reply to queue that is *exclusive* to the Camel
>> route?
>> Or is the queue used for other purposes as well?
>>
>> Is there a special reason why you want to use a fixed reply to queue?
>>
>>
>>
>> On Fri, Jul 8, 2011 at 11:14 PM, Jim Newsham<jnews...@referentia.com>
>>  wrote:
>>>
>>> Hi Claus,
>>>
>>> I enabled trace logging.  I'm attaching the logs (for both client and
>>> server; both with and without custom replyTo) as a zip file -- not sure
>>> if
>>> the mailing list will filter it, we'll see.
>>>
>>> I see that there are 5 messages in the client log which only appear when
>>> a
>>> custom replyTo is specified:  "Running purge task to see if any entries
>>> has
>>> been timed out", "There are 1 in the timeout map", "did not receive a
>>> message", etc.  Here's an excerpt from each client log, for one exchange:
>>>
>>>  From log for client without replyTo:
>>>
>>> 2011-07-08 10:55:32,354 [main] TRACE
>>> org.apache.camel.component.jms.JmsProducer - Using inOut jms template
>>> 2011-07-08 10:55:32,361 [main] DEBUG
>>> org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate -
>>> Executing
>>> callback on JMS Session: ActiveMQSession
>>> {id=ID:rsi-eng-newsham-61473-1310158531968-0:3:1,started=false}
>>> 2011-07-08 10:55:32,361 [main] TRACE
>>> org.apache.camel.component.jms.JmsBinding - Using JmsMessageType: Text
>>> 2011-07-08 10:55:32,362 [main] DEBUG
>>> org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate -
>>> Sending
>>> JMS message to: queue://dest with message: ActiveMQTextMessage {commandId
>>> =
>>> 0, responseRequired = false, messageId = null, originalDestination =
>>> null,
>>> originalTransactionId = null, producerId = null, destination = null,
>>> transactionId = null, expiration = 0, timestamp = 0, arrival = 0,
>>> brokerInTime = 0, brokerOutTime = 0, correlationId =
>>> ID-rsi-eng-newsham-61472-1310158530715-0-4, replyTo =
>>> temp-queue://ID:rsi-eng-newsham-61473-1310158531968-0:1:1, persistent =
>>> false, type = null, priority = 0, groupID = null, groupSequence = 0,
>>> targetConsumerId = null, compressed = false, userID = null, content =
>>> null,
>>> marshalledProperties = null, dataStructure = null, redeliveryCounter = 0,
>>> size = 0, properties = null, readOnlyProperties = false, readOnlyBody =
>>> false, droppable = false, text = abc}
>>> 2011-07-08 10:55:32,363 [main] DEBUG
>>> org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate - Sent
>>> JMS
>>> message to: queue://dest with message: ActiveMQTextMessage {commandId =
>>> 0,
>>> responseRequired = false, messageId =
>>> ID:rsi-eng-newsham-61473-1310158531968-0:3:1:1:1, originalDestination =
>>> null, originalTransactionId = null, producerId = null, destination =
>>> queue://dest, transactionId = null, expiration = 1310158552362, timestamp
>>> =
>>> 1310158532362, arrival = 0, brokerInTime = 0, brokerOutTime = 0,
>>> correlationId = ID-rsi-eng-newsham-61472-1310158530715-0-4, replyTo =
>>> temp-queue://ID:rsi-eng-newsham-61473-1310158531968-0:1:1, persistent =
>>> true, type = null, priority = 4, groupID = null, groupSequence = 0,
>>> targetConsumerId = null, compressed = false, userID = null, content =
>>> null,
>>> marshalledProperties = null, dataStructure = null, redeliveryCounter = 0,
>>> size = 0, properties = null, readOnlyProperties = false, readOnlyBody =
>>> false, droppable = false, text = abc}
>>> 2011-07-08 10:55:32,368 [DefaultMessageListenerContainer-1] DEBUG
>>> org.apache.camel.component.jms.reply.TemporaryQueueReplyManager -
>>> Received
>>> reply message with correlationID:
>>> ID-rsi-eng-newsham-61472-1310158530715-0-4
>>> ->  ActiveMQTextMessage {commandId = 9, responseRequired = true,
>>> messageId =
>>> ID:rsi-eng-newsham-61470-1310158525476-2:1:1:1:2, originalDestination =
>>> null, originalTransactionId = null, producerId =
>>> ID:rsi-eng-newsham-61470-1310158525476-2:1:1:1, destination =
>>> temp-queue://ID:rsi-eng-newsham-61473-1310158531968-0:1:1, transactionId
>>> =
>>> null, expiration = 0, timestamp = 1310158532367, arrival = 0,
>>> brokerInTime =
>>> 1310158532367, brokerOutTime = 1310158532367, correlationId =
>>> ID-rsi-eng-newsham-61472-1310158530715-0-4, replyTo = null, persistent =
>>> true, type = null, priority = 4, groupID = null, groupSequence = 0,
>>> targetConsumerId = null, compressed = false, userID = null, content =
>>> null,
>>> marshalledProperties = null, dataStructure = null, redeliveryCounter = 0,
>>> size = 0, properties = null, readOnlyProperties = true, readOnlyBody =
>>> true,
>>> droppable = false, text = reply}
>>> 2011-07-08 10:55:32,369 [DefaultMessageListenerContainer-1] TRACE
>>> org.apache.camel.component.jms.JmsBinding - Extracting body as a
>>> TextMessage
>>> from JMS message: ActiveMQTextMessage {commandId = 9, responseRequired =
>>> true, messageId = ID:rsi-eng-newsham-61470-1310158525476-2:1:1:1:2,
>>> originalDestination = null, originalTransactionId = null, producerId =
>>> ID:rsi-eng-newsham-61470-1310158525476-2:1:1:1, destination =
>>> temp-queue://ID:rsi-eng-newsham-61473-1310158531968-0:1:1, transactionId
>>> =
>>> null, expiration = 0, timestamp = 1310158532367, arrival = 0,
>>> brokerInTime =
>>> 1310158532367, brokerOutTime = 1310158532367, correlationId =
>>> ID-rsi-eng-newsham-61472-1310158530715-0-4, replyTo = null, persistent =
>>> true, type = null, priority = 4, groupID = null, groupSequence = 0,
>>> targetConsumerId = null, compressed = false, userID = null, content =
>>> null,
>>> marshalledProperties = null, dataStructure = null, redeliveryCounter = 0,
>>> size = 0, properties = null, readOnlyProperties = true, readOnlyBody =
>>> true,
>>> droppable = false, text = reply}
>>> 2011-07-08 10:55:32,369 [DefaultMessageListenerContainer-1] DEBUG
>>> org.apache.camel.component.jms.reply.TemporaryQueueReplyManager - Reply
>>> received. Setting reply as OUT message: reply
>>> received reply in: 0.015 s
>>>
>>>  From log for client with replyTo:
>>>
>>> 2011-07-08 10:52:10,075 [main] TRACE
>>> org.apache.camel.component.jms.JmsProducer - Using inOut jms template
>>> 2011-07-08 10:52:10,081 [main] DEBUG
>>> org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate -
>>> Executing
>>> callback on JMS Session: ActiveMQSession
>>> {id=ID:rsi-eng-newsham-61455-1310158328671-0:3:1,started=false}
>>> 2011-07-08 10:52:10,082 [main] TRACE
>>> org.apache.camel.component.jms.JmsBinding - Using JmsMessageType: Text
>>> 2011-07-08 10:52:10,082 [main] DEBUG
>>> org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate -
>>> Sending
>>> JMS message to: queue://dest with message: ActiveMQTextMessage {commandId
>>> =
>>> 0, responseRequired = false, messageId = null, originalDestination =
>>> null,
>>> originalTransactionId = null, producerId = null, destination = null,
>>> transactionId = null, expiration = 0, timestamp = 0, arrival = 0,
>>> brokerInTime = 0, brokerOutTime = 0, correlationId =
>>> ID-rsi-eng-newsham-61454-1310158327407-0-4, replyTo = queue://replyQueue,
>>> persistent = false, type = null, priority = 0, groupID = null,
>>> groupSequence
>>> = 0, targetConsumerId = null, compressed = false, userID = null, content
>>> =
>>> null, marshalledProperties = null, dataStructure = null,
>>> redeliveryCounter =
>>> 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody
>>> =
>>> false, droppable = false, text = abc}
>>> 2011-07-08 10:52:10,083 [main] DEBUG
>>> org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate - Sent
>>> JMS
>>> message to: queue://dest with message: ActiveMQTextMessage {commandId =
>>> 0,
>>> responseRequired = false, messageId =
>>> ID:rsi-eng-newsham-61455-1310158328671-0:3:1:1:1, originalDestination =
>>> null, originalTransactionId = null, producerId = null, destination =
>>> queue://dest, transactionId = null, expiration = 1310158350082, timestamp
>>> =
>>> 1310158330082, arrival = 0, brokerInTime = 0, brokerOutTime = 0,
>>> correlationId = ID-rsi-eng-newsham-61454-1310158327407-0-4, replyTo =
>>> queue://replyQueue, persistent = true, type = null, priority = 4, groupID
>>> =
>>> null, groupSequence = 0, targetConsumerId = null, compressed = false,
>>> userID
>>> = null, content = null, marshalledProperties = null, dataStructure =
>>> null,
>>> redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties =
>>> false, readOnlyBody = false, droppable = false, text = abc}
>>> 2011-07-08 10:52:10,536 [Camel (camel-1) thread #0 -
>>> JmsReplyManagerTimeoutChecker[dest] TRACE
>>> org.apache.camel.component.jms.reply.CorrelationMap - Running purge task
>>> to
>>> see if any entries has been timed out
>>> 2011-07-08 10:52:10,536 [Camel (camel-1) thread #0 -
>>> JmsReplyManagerTimeoutChecker[dest] TRACE
>>> org.apache.camel.component.jms.reply.CorrelationMap - There are 1 in the
>>> timeout map
>>> 2011-07-08 10:52:11,075 [DefaultMessageListenerContainer-1] DEBUG
>>>
>>> org.apache.camel.component.jms.reply.PersistentQueueReplyManager$PersistentQueueMessageListenerContainer
>>> - Consumer [ActiveMQMessageConsumer {
>>> value=ID:rsi-eng-newsham-61455-1310158328671-0:1:1:3, started=true }] of
>>> session [ActiveMQSession
>>> {id=ID:rsi-eng-newsham-61455-1310158328671-0:1:1,started=true}] did not
>>> receive a message
>>> 2011-07-08 10:52:11,075 [DefaultMessageListenerContainer-1] TRACE
>>> org.apache.camel.component.jms.reply.PersistentQueueReplyManager - Using
>>>
>>> MessageSelector[JMSCorrelationID='ID-rsi-eng-newsham-61454-1310158327407-0-4']
>>> 2011-07-08 10:52:11,078 [DefaultMessageListenerContainer-1] DEBUG
>>>
>>> org.apache.camel.component.jms.reply.PersistentQueueReplyManager$PersistentQueueMessageListenerContainer
>>> - Received message of type [class
>>> org.apache.activemq.command.ActiveMQTextMessage] from consumer
>>> [ActiveMQMessageConsumer {
>>> value=ID:rsi-eng-newsham-61455-1310158328671-0:1:1:4, started=true }] of
>>> session [ActiveMQSession
>>> {id=ID:rsi-eng-newsham-61455-1310158328671-0:1:1,started=true}]
>>> 2011-07-08 10:52:11,078 [DefaultMessageListenerContainer-1] DEBUG
>>> org.apache.camel.component.jms.reply.PersistentQueueReplyManager -
>>> Received
>>> reply message with correlationID:
>>> ID-rsi-eng-newsham-61454-1310158327407-0-4
>>> ->  ActiveMQTextMessage {commandId = 9, responseRequired = true,
>>> messageId =
>>> ID:rsi-eng-newsham-61452-1310158320230-2:1:1:1:2, originalDestination =
>>> null, originalTransactionId = null, producerId =
>>> ID:rsi-eng-newsham-61452-1310158320230-2:1:1:1, destination =
>>> queue://replyQueue, transactionId = null, expiration = 0, timestamp =
>>> 1310158330085, arrival = 0, brokerInTime = 1310158330085, brokerOutTime =
>>> 1310158331077, correlationId =
>>> ID-rsi-eng-newsham-61454-1310158327407-0-4,
>>> replyTo = null, persistent = true, type = null, priority = 4, groupID =
>>> null, groupSequence = 0, targetConsumerId = null, compressed = false,
>>> userID
>>> = null, content = null, marshalledProperties = null, dataStructure =
>>> null,
>>> redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties =
>>> true, readOnlyBody = true, droppable = false, text = reply}
>>> 2011-07-08 10:52:11,078 [DefaultMessageListenerContainer-1] TRACE
>>> org.apache.camel.component.jms.JmsBinding - Extracting body as a
>>> TextMessage
>>> from JMS message: ActiveMQTextMessage {commandId = 9, responseRequired =
>>> true, messageId = ID:rsi-eng-newsham-61452-1310158320230-2:1:1:1:2,
>>> originalDestination = null, originalTransactionId = null, producerId =
>>> ID:rsi-eng-newsham-61452-1310158320230-2:1:1:1, destination =
>>> queue://replyQueue, transactionId = null, expiration = 0, timestamp =
>>> 1310158330085, arrival = 0, brokerInTime = 1310158330085, brokerOutTime =
>>> 1310158331077, correlationId =
>>> ID-rsi-eng-newsham-61454-1310158327407-0-4,
>>> replyTo = null, persistent = true, type = null, priority = 4, groupID =
>>> null, groupSequence = 0, targetConsumerId = null, compressed = false,
>>> userID
>>> = null, content = null, marshalledProperties = null, dataStructure =
>>> null,
>>> redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties =
>>> true, readOnlyBody = true, droppable = false, text = reply}
>>> 2011-07-08 10:52:11,078 [DefaultMessageListenerContainer-1] DEBUG
>>> org.apache.camel.component.jms.reply.PersistentQueueReplyManager - Reply
>>> received. Setting reply as OUT message: reply
>>> received reply in: 1.004 s
>>>
>>> Thanks,
>>> Jim
>>>
>>>
>>>
>>> On 7/7/2011 10:59 PM, Claus Ibsen wrote:
>>>>
>>>> Can you enable TRACE logging at org.apache.camel.component.jms and run
>>>> it for both examples.
>>>> To see what happens.
>>>>
>>>>
>>>>
>>>> On Fri, Jul 8, 2011 at 3:18 AM, Jim Newsham<jnews...@referentia.com>
>>>>  wrote:
>>>>>
>>>>> I'm using Camel 2.7.1 on top of ActiveMQ 5.5.0.  For some reason, when
>>>>> I
>>>>> specify a custom replyTo destination on the endpoint url, the time it
>>>>> takes
>>>>> for the producer to receive a reply increases drastically.  The curious
>>>>> thing is that the time to receive a reply is almost exactly 1 second.
>>>>>  When
>>>>> I remove the replyTo from the url, everything's fast again.
>>>>>
>>>>> I created a very simple, stand-alone test to demonstrate what I'm
>>>>> seeing.
>>>>>  There is a server class [4] which runs an embedded instance of
>>>>> ActiveMQ
>>>>> and
>>>>> simply replies to messages as they arrive; and a client [3] class which
>>>>> simply sends messages to the server, and prints the elapsed time.  The
>>>>> USE_REPLY_TO symbolic constant in the client determines whether a
>>>>> replyTo
>>>>> value is added to the url or not.
>>>>>
>>>>> The client output when USE_REPLY_TO is false is shown as [1].  The
>>>>> client
>>>>> output when USE_REPLY_TO is true is shown as [2].  The code is pretty
>>>>> trivial.  Am I doing something wrong, or is this a Camel and/or
>>>>> ActiveMQ
>>>>> issue?
>>>>>
>>>>> Thanks!
>>>>> Jim
>>>>>
>>>>>
>>>>> [1] USE_REPLY_TO = false
>>>>>
>>>>> received reply in: 0.476 s
>>>>> received reply in: 0.006 s
>>>>> received reply in: 0.006 s
>>>>> received reply in: 0.006 s
>>>>> received reply in: 0.006 s
>>>>> ...
>>>>>
>>>>>
>>>>> [2] USE_REPLY_TO = true
>>>>>
>>>>> received reply in: 1.524 s
>>>>> received reply in: 1.002 s
>>>>> received reply in: 1.003 s
>>>>> received reply in: 1.003 s
>>>>> received reply in: 1.002 s
>>>>> ...
>>>>>
>>>>>
>>>>> [3] TestReplyToClient.java
>>>>>
>>>>> package test;
>>>>>
>>>>> import org.apache.activemq.ActiveMQConnectionFactory;
>>>>> import org.apache.activemq.camel.component.ActiveMQComponent;
>>>>> import org.apache.camel.CamelContext;
>>>>> import org.apache.camel.ProducerTemplate;
>>>>> import org.apache.camel.impl.DefaultCamelContext;
>>>>>
>>>>> public class TestReplyToClient {
>>>>>
>>>>>  private static final boolean USE_REPLY_TO = false;
>>>>>
>>>>>  public static void main(String... args) throws Exception {
>>>>>    // create camel context; configure activemq component for
>>>>> tcp://localhost:7001
>>>>>    CamelContext context = new DefaultCamelContext();
>>>>>    ActiveMQComponent activemqComponent =
>>>>> ActiveMQComponent.activeMQComponent();
>>>>>    activemqComponent.setConnectionFactory(new
>>>>> ActiveMQConnectionFactory(
>>>>>      null, null, "tcp://localhost:7001"));
>>>>>    context.addComponent("activemq", activemqComponent);
>>>>>    context.start();
>>>>>
>>>>>    // define url to send requests to
>>>>>    String sendUrl = "activemq:queue:dest";
>>>>>    if (USE_REPLY_TO) {
>>>>>      sendUrl += "?replyTo=replyQueue";
>>>>>    }
>>>>>    System.err.println("sending to url: " + sendUrl);
>>>>>
>>>>>    // repeatedly send requests; measure elapsed time
>>>>>    ProducerTemplate template = context.createProducerTemplate();
>>>>>    while (true) {
>>>>>      long startNanos = System.nanoTime();
>>>>>      template.requestBody(sendUrl, "abc");
>>>>>      long elapsedNanos = System.nanoTime() - startNanos;
>>>>>      System.err.println(String.format("received reply in: %.3f s",
>>>>> elapsedNanos / 1000000000.0));
>>>>>    }
>>>>>  }
>>>>>
>>>>> }
>>>>>
>>>>>
>>>>> [4] TestReplyToServer.java
>>>>>
>>>>> package test;
>>>>>
>>>>> import org.apache.activemq.broker.BrokerService;
>>>>> import org.apache.activemq.camel.component.ActiveMQComponent;
>>>>> import org.apache.camel.CamelContext;
>>>>> import org.apache.camel.Exchange;
>>>>> import org.apache.camel.Processor;
>>>>> import org.apache.camel.builder.RouteBuilder;
>>>>> import org.apache.camel.impl.DefaultCamelContext;
>>>>>
>>>>> public class TestReplyToServer {
>>>>>
>>>>>  private static final String BROKER_NAME = "thebroker";
>>>>>
>>>>>  public static void main(String... args) throws Exception {
>>>>>    startBroker();
>>>>>    startCamel();
>>>>>    Thread.sleep(Long.MAX_VALUE);
>>>>>  }
>>>>>
>>>>>  private static void startBroker() throws Exception {
>>>>>    BrokerService brokerService = new BrokerService();
>>>>>    brokerService.setBrokerName(BROKER_NAME);
>>>>>    brokerService.setSchedulerSupport(false);
>>>>>    brokerService.setPersistent(false);
>>>>>    brokerService.addConnector("tcp://0.0.0.0:7001");
>>>>>    brokerService.start();
>>>>>    brokerService.waitUntilStarted();
>>>>>  }
>>>>>
>>>>>
>>>>>  private static void startCamel() throws Exception {
>>>>>    CamelContext context = new DefaultCamelContext();
>>>>>
>>>>>    ActiveMQComponent activemqComponent =
>>>>> ActiveMQComponent.activeMQComponent();
>>>>>    activemqComponent.setBrokerURL(String.format("vm://%s?create=false",
>>>>> BROKER_NAME));
>>>>>    context.addComponent("activemq", activemqComponent);
>>>>>
>>>>>    final String receiveUrl = "activemq:queue:dest";
>>>>>    context.addRoutes(new RouteBuilder() {
>>>>>      @Override
>>>>>      public void configure() throws Exception {
>>>>>        from(receiveUrl).process(new Processor() {
>>>>>          @Override
>>>>>          public void process(Exchange exchange) throws Exception {
>>>>>            System.err.println("received request");
>>>>>            exchange.getOut().setBody("reply");
>>>>>          }
>>>>>        });
>>>>>      }
>>>>>    });
>>>>>
>>>>>    context.start();
>>>>>    System.err.println("listening on url: " + receiveUrl);
>>>>>  }
>>>>>
>>>>> }
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>>
>
>



-- 
Claus Ibsen
-----------------
FuseSource
Email: cib...@fusesource.com
Web: http://fusesource.com
Twitter: davsclaus, fusenews
Blog: http://davsclaus.blogspot.com/
Author of Camel in Action: http://www.manning.com/ibsen/
