On 7/11/2011 7:25 PM, Claus Ibsen wrote:
On Mon, Jul 11, 2011 at 10:12 PM, Jim Newsham<jnews...@referentia.com>  wrote:
Hi Claus,

Thanks for opening the jira issue, and for your comments.  To answer your
questions:

1.  We use fixed reply-to queues which are exclusive to Camel.
2.  We need a fixed reply-to queue to avoid losing reply messages in case of
disconnection (which would happen with temporary queues because they are
scoped to the lifetime of the connection).

I think we should add an option to the JMS component so you can
configure that the replyTo is to be consider as exclusive.

?replyTo=bar&exclusiveReplyTo=true

Something like this. Anyone got a better idea for a name of the option?
That would be fine for me. :) I don't have any better ideas regarding the parameter name.


Likewise I have pondered about if we should add an option to have
convention over configuration? So for example you can configure on the
jms component, a pattern of the reply to names.

jmsComponent.setExclusiveReplyTo(true);
jmsComponent.setReplyToPattern("${name}.reply");

So in this example Camel will automatic name the reply to queues, as
the request queue name + ".reply". So in the example above we can do

from X
to("jms:queue:bar")
to Y

So in this example since we configured the component with exclusive
reply to, and a reply to pattern as well. Then what happens is that
Camel will use a reply to queue named: bar.reply


Of course you can still configure all the options on the endpoint as
well if you like. And you can override the component settings so in
case you want a special reply to name you can do:

from X
to("jms:queue:bar?replyTo=special")
to Y

Any thoughts?

This wouldn't be useful for me since I don't have a one-to-one correspondence between request queues and reply queues, but I can see how it would be useful for people who do. Some people may want the ability to override the settings on a particular url, so that a temporary queue is used for that route.

Jim



Regards,
Jim

On 7/9/2011 5:52 AM, Claus Ibsen wrote:
Hi Jim

I have created a ticket and posted some comments about the issue
https://issues.apache.org/jira/browse/CAMEL-4202

Are you using a fixed reply to queue that is *exclusive* to the Camel
route?
Or is the queue used for other purposes as well?

Is there a special reason why you want to use a fixed reply to queue?



On Fri, Jul 8, 2011 at 11:14 PM, Jim Newsham<jnews...@referentia.com>
  wrote:
Hi Claus,

I enabled trace logging.  I'm attaching the logs (for both client and
server; both with and without custom replyTo) as a zip file -- not sure
if
the mailing list will filter it, we'll see.

I see that there are 5 messages in the client log which only appear when
a
custom replyTo is specified:  "Running purge task to see if any entries
has
been timed out", "There are 1 in the timeout map", "did not receive a
message", etc.  Here's an excerpt from each client log, for one exchange:

  From log for client without replyTo:

2011-07-08 10:55:32,354 [main] TRACE
org.apache.camel.component.jms.JmsProducer - Using inOut jms template
2011-07-08 10:55:32,361 [main] DEBUG
org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate -
Executing
callback on JMS Session: ActiveMQSession
{id=ID:rsi-eng-newsham-61473-1310158531968-0:3:1,started=false}
2011-07-08 10:55:32,361 [main] TRACE
org.apache.camel.component.jms.JmsBinding - Using JmsMessageType: Text
2011-07-08 10:55:32,362 [main] DEBUG
org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate -
Sending
JMS message to: queue://dest with message: ActiveMQTextMessage {commandId
=
0, responseRequired = false, messageId = null, originalDestination =
null,
originalTransactionId = null, producerId = null, destination = null,
transactionId = null, expiration = 0, timestamp = 0, arrival = 0,
brokerInTime = 0, brokerOutTime = 0, correlationId =
ID-rsi-eng-newsham-61472-1310158530715-0-4, replyTo =
temp-queue://ID:rsi-eng-newsham-61473-1310158531968-0:1:1, persistent =
false, type = null, priority = 0, groupID = null, groupSequence = 0,
targetConsumerId = null, compressed = false, userID = null, content =
null,
marshalledProperties = null, dataStructure = null, redeliveryCounter = 0,
size = 0, properties = null, readOnlyProperties = false, readOnlyBody =
false, droppable = false, text = abc}
2011-07-08 10:55:32,363 [main] DEBUG
org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate - Sent
JMS
message to: queue://dest with message: ActiveMQTextMessage {commandId =
0,
responseRequired = false, messageId =
ID:rsi-eng-newsham-61473-1310158531968-0:3:1:1:1, originalDestination =
null, originalTransactionId = null, producerId = null, destination =
queue://dest, transactionId = null, expiration = 1310158552362, timestamp
=
1310158532362, arrival = 0, brokerInTime = 0, brokerOutTime = 0,
correlationId = ID-rsi-eng-newsham-61472-1310158530715-0-4, replyTo =
temp-queue://ID:rsi-eng-newsham-61473-1310158531968-0:1:1, persistent =
true, type = null, priority = 4, groupID = null, groupSequence = 0,
targetConsumerId = null, compressed = false, userID = null, content =
null,
marshalledProperties = null, dataStructure = null, redeliveryCounter = 0,
size = 0, properties = null, readOnlyProperties = false, readOnlyBody =
false, droppable = false, text = abc}
2011-07-08 10:55:32,368 [DefaultMessageListenerContainer-1] DEBUG
org.apache.camel.component.jms.reply.TemporaryQueueReplyManager -
Received
reply message with correlationID:
ID-rsi-eng-newsham-61472-1310158530715-0-4
->    ActiveMQTextMessage {commandId = 9, responseRequired = true,
messageId =
ID:rsi-eng-newsham-61470-1310158525476-2:1:1:1:2, originalDestination =
null, originalTransactionId = null, producerId =
ID:rsi-eng-newsham-61470-1310158525476-2:1:1:1, destination =
temp-queue://ID:rsi-eng-newsham-61473-1310158531968-0:1:1, transactionId
=
null, expiration = 0, timestamp = 1310158532367, arrival = 0,
brokerInTime =
1310158532367, brokerOutTime = 1310158532367, correlationId =
ID-rsi-eng-newsham-61472-1310158530715-0-4, replyTo = null, persistent =
true, type = null, priority = 4, groupID = null, groupSequence = 0,
targetConsumerId = null, compressed = false, userID = null, content =
null,
marshalledProperties = null, dataStructure = null, redeliveryCounter = 0,
size = 0, properties = null, readOnlyProperties = true, readOnlyBody =
true,
droppable = false, text = reply}
2011-07-08 10:55:32,369 [DefaultMessageListenerContainer-1] TRACE
org.apache.camel.component.jms.JmsBinding - Extracting body as a
TextMessage
from JMS message: ActiveMQTextMessage {commandId = 9, responseRequired =
true, messageId = ID:rsi-eng-newsham-61470-1310158525476-2:1:1:1:2,
originalDestination = null, originalTransactionId = null, producerId =
ID:rsi-eng-newsham-61470-1310158525476-2:1:1:1, destination =
temp-queue://ID:rsi-eng-newsham-61473-1310158531968-0:1:1, transactionId
=
null, expiration = 0, timestamp = 1310158532367, arrival = 0,
brokerInTime =
1310158532367, brokerOutTime = 1310158532367, correlationId =
ID-rsi-eng-newsham-61472-1310158530715-0-4, replyTo = null, persistent =
true, type = null, priority = 4, groupID = null, groupSequence = 0,
targetConsumerId = null, compressed = false, userID = null, content =
null,
marshalledProperties = null, dataStructure = null, redeliveryCounter = 0,
size = 0, properties = null, readOnlyProperties = true, readOnlyBody =
true,
droppable = false, text = reply}
2011-07-08 10:55:32,369 [DefaultMessageListenerContainer-1] DEBUG
org.apache.camel.component.jms.reply.TemporaryQueueReplyManager - Reply
received. Setting reply as OUT message: reply
received reply in: 0.015 s

  From log for client with replyTo:

2011-07-08 10:52:10,075 [main] TRACE
org.apache.camel.component.jms.JmsProducer - Using inOut jms template
2011-07-08 10:52:10,081 [main] DEBUG
org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate -
Executing
callback on JMS Session: ActiveMQSession
{id=ID:rsi-eng-newsham-61455-1310158328671-0:3:1,started=false}
2011-07-08 10:52:10,082 [main] TRACE
org.apache.camel.component.jms.JmsBinding - Using JmsMessageType: Text
2011-07-08 10:52:10,082 [main] DEBUG
org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate -
Sending
JMS message to: queue://dest with message: ActiveMQTextMessage {commandId
=
0, responseRequired = false, messageId = null, originalDestination =
null,
originalTransactionId = null, producerId = null, destination = null,
transactionId = null, expiration = 0, timestamp = 0, arrival = 0,
brokerInTime = 0, brokerOutTime = 0, correlationId =
ID-rsi-eng-newsham-61454-1310158327407-0-4, replyTo = queue://replyQueue,
persistent = false, type = null, priority = 0, groupID = null,
groupSequence
= 0, targetConsumerId = null, compressed = false, userID = null, content
=
null, marshalledProperties = null, dataStructure = null,
redeliveryCounter =
0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody
=
false, droppable = false, text = abc}
2011-07-08 10:52:10,083 [main] DEBUG
org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate - Sent
JMS
message to: queue://dest with message: ActiveMQTextMessage {commandId =
0,
responseRequired = false, messageId =
ID:rsi-eng-newsham-61455-1310158328671-0:3:1:1:1, originalDestination =
null, originalTransactionId = null, producerId = null, destination =
queue://dest, transactionId = null, expiration = 1310158350082, timestamp
=
1310158330082, arrival = 0, brokerInTime = 0, brokerOutTime = 0,
correlationId = ID-rsi-eng-newsham-61454-1310158327407-0-4, replyTo =
queue://replyQueue, persistent = true, type = null, priority = 4, groupID
=
null, groupSequence = 0, targetConsumerId = null, compressed = false,
userID
= null, content = null, marshalledProperties = null, dataStructure =
null,
redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties =
false, readOnlyBody = false, droppable = false, text = abc}
2011-07-08 10:52:10,536 [Camel (camel-1) thread #0 -
JmsReplyManagerTimeoutChecker[dest] TRACE
org.apache.camel.component.jms.reply.CorrelationMap - Running purge task
to
see if any entries has been timed out
2011-07-08 10:52:10,536 [Camel (camel-1) thread #0 -
JmsReplyManagerTimeoutChecker[dest] TRACE
org.apache.camel.component.jms.reply.CorrelationMap - There are 1 in the
timeout map
2011-07-08 10:52:11,075 [DefaultMessageListenerContainer-1] DEBUG

org.apache.camel.component.jms.reply.PersistentQueueReplyManager$PersistentQueueMessageListenerContainer
- Consumer [ActiveMQMessageConsumer {
value=ID:rsi-eng-newsham-61455-1310158328671-0:1:1:3, started=true }] of
session [ActiveMQSession
{id=ID:rsi-eng-newsham-61455-1310158328671-0:1:1,started=true}] did not
receive a message
2011-07-08 10:52:11,075 [DefaultMessageListenerContainer-1] TRACE
org.apache.camel.component.jms.reply.PersistentQueueReplyManager - Using

MessageSelector[JMSCorrelationID='ID-rsi-eng-newsham-61454-1310158327407-0-4']
2011-07-08 10:52:11,078 [DefaultMessageListenerContainer-1] DEBUG

org.apache.camel.component.jms.reply.PersistentQueueReplyManager$PersistentQueueMessageListenerContainer
- Received message of type [class
org.apache.activemq.command.ActiveMQTextMessage] from consumer
[ActiveMQMessageConsumer {
value=ID:rsi-eng-newsham-61455-1310158328671-0:1:1:4, started=true }] of
session [ActiveMQSession
{id=ID:rsi-eng-newsham-61455-1310158328671-0:1:1,started=true}]
2011-07-08 10:52:11,078 [DefaultMessageListenerContainer-1] DEBUG
org.apache.camel.component.jms.reply.PersistentQueueReplyManager -
Received
reply message with correlationID:
ID-rsi-eng-newsham-61454-1310158327407-0-4
->    ActiveMQTextMessage {commandId = 9, responseRequired = true,
messageId =
ID:rsi-eng-newsham-61452-1310158320230-2:1:1:1:2, originalDestination =
null, originalTransactionId = null, producerId =
ID:rsi-eng-newsham-61452-1310158320230-2:1:1:1, destination =
queue://replyQueue, transactionId = null, expiration = 0, timestamp =
1310158330085, arrival = 0, brokerInTime = 1310158330085, brokerOutTime =
1310158331077, correlationId =
ID-rsi-eng-newsham-61454-1310158327407-0-4,
replyTo = null, persistent = true, type = null, priority = 4, groupID =
null, groupSequence = 0, targetConsumerId = null, compressed = false,
userID
= null, content = null, marshalledProperties = null, dataStructure =
null,
redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties =
true, readOnlyBody = true, droppable = false, text = reply}
2011-07-08 10:52:11,078 [DefaultMessageListenerContainer-1] TRACE
org.apache.camel.component.jms.JmsBinding - Extracting body as a
TextMessage
from JMS message: ActiveMQTextMessage {commandId = 9, responseRequired =
true, messageId = ID:rsi-eng-newsham-61452-1310158320230-2:1:1:1:2,
originalDestination = null, originalTransactionId = null, producerId =
ID:rsi-eng-newsham-61452-1310158320230-2:1:1:1, destination =
queue://replyQueue, transactionId = null, expiration = 0, timestamp =
1310158330085, arrival = 0, brokerInTime = 1310158330085, brokerOutTime =
1310158331077, correlationId =
ID-rsi-eng-newsham-61454-1310158327407-0-4,
replyTo = null, persistent = true, type = null, priority = 4, groupID =
null, groupSequence = 0, targetConsumerId = null, compressed = false,
userID
= null, content = null, marshalledProperties = null, dataStructure =
null,
redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties =
true, readOnlyBody = true, droppable = false, text = reply}
2011-07-08 10:52:11,078 [DefaultMessageListenerContainer-1] DEBUG
org.apache.camel.component.jms.reply.PersistentQueueReplyManager - Reply
received. Setting reply as OUT message: reply
received reply in: 1.004 s

Thanks,
Jim



On 7/7/2011 10:59 PM, Claus Ibsen wrote:
Can you enable TRACE logging at org.apache.camel.component.jms and run
it for both examples.
To see what happens.



On Fri, Jul 8, 2011 at 3:18 AM, Jim Newsham<jnews...@referentia.com>
  wrote:
I'm using Camel 2.7.1 on top of ActiveMQ 5.5.0.  For some reason, when
I
specify a custom replyTo destination on the endpoint url, the time it
takes
for the producer to receive a reply increases drastically.  The curious
thing is that the time to receive a reply is almost exactly 1 second.
  When
I remove the replyTo from the url, everything's fast again.

I created a very simple, stand-alone test to demonstrate what I'm
seeing.
  There is a server class [4] which runs an embedded instance of
ActiveMQ
and
simply replies to messages as they arrive; and a client [3] class which
simply sends messages to the server, and prints the elapsed time.  The
USE_REPLY_TO symbolic constant in the client determines whether a
replyTo
value is added to the url or not.

The client output when USE_REPLY_TO is false is shown as [1].  The
client
output when USE_REPLY_TO is true is shown as [2].  The code is pretty
trivial.  Am I doing something wrong, or is this a Camel and/or
ActiveMQ
issue?

Thanks!
Jim


[1] USE_REPLY_TO = false

received reply in: 0.476 s
received reply in: 0.006 s
received reply in: 0.006 s
received reply in: 0.006 s
received reply in: 0.006 s
...


[2] USE_REPLY_TO = true

received reply in: 1.524 s
received reply in: 1.002 s
received reply in: 1.003 s
received reply in: 1.003 s
received reply in: 1.002 s
...


[3] TestReplyToClient.java

package test;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.camel.component.ActiveMQComponent;
import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.impl.DefaultCamelContext;

public class TestReplyToClient {

  private static final boolean USE_REPLY_TO = false;

  public static void main(String... args) throws Exception {
    // create camel context; configure activemq component for
tcp://localhost:7001
    CamelContext context = new DefaultCamelContext();
    ActiveMQComponent activemqComponent =
ActiveMQComponent.activeMQComponent();
    activemqComponent.setConnectionFactory(new
ActiveMQConnectionFactory(
      null, null, "tcp://localhost:7001"));
    context.addComponent("activemq", activemqComponent);
    context.start();

    // define url to send requests to
    String sendUrl = "activemq:queue:dest";
    if (USE_REPLY_TO) {
      sendUrl += "?replyTo=replyQueue";
    }
    System.err.println("sending to url: " + sendUrl);

    // repeatedly send requests; measure elapsed time
    ProducerTemplate template = context.createProducerTemplate();
    while (true) {
      long startNanos = System.nanoTime();
      template.requestBody(sendUrl, "abc");
      long elapsedNanos = System.nanoTime() - startNanos;
      System.err.println(String.format("received reply in: %.3f s",
elapsedNanos / 1000000000.0));
    }
  }

}


[4] TestReplyToServer.java

package test;

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.camel.component.ActiveMQComponent;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;

public class TestReplyToServer {

  private static final String BROKER_NAME = "thebroker";

  public static void main(String... args) throws Exception {
    startBroker();
    startCamel();
    Thread.sleep(Long.MAX_VALUE);
  }

  private static void startBroker() throws Exception {
    BrokerService brokerService = new BrokerService();
    brokerService.setBrokerName(BROKER_NAME);
    brokerService.setSchedulerSupport(false);
    brokerService.setPersistent(false);
    brokerService.addConnector("tcp://0.0.0.0:7001");
    brokerService.start();
    brokerService.waitUntilStarted();
  }


  private static void startCamel() throws Exception {
    CamelContext context = new DefaultCamelContext();

    ActiveMQComponent activemqComponent =
ActiveMQComponent.activeMQComponent();
    activemqComponent.setBrokerURL(String.format("vm://%s?create=false",
BROKER_NAME));
    context.addComponent("activemq", activemqComponent);

    final String receiveUrl = "activemq:queue:dest";
    context.addRoutes(new RouteBuilder() {
      @Override
      public void configure() throws Exception {
        from(receiveUrl).process(new Processor() {
          @Override
          public void process(Exchange exchange) throws Exception {
            System.err.println("received request");
            exchange.getOut().setBody("reply");
          }
        });
      }
    });

    context.start();
    System.err.println("listening on url: " + receiveUrl);
  }

}










Reply via email to