On 11/29/2013 11:17 AM, Toralf Lund wrote:
On 29/11/13 11:19, Gordon Sim wrote:
On 11/29/2013 09:08 AM, Toralf Lund wrote:
One option to reduce the number of distinct senders, and therefore
benefit more from caching, is to use a single a sender per exchange
(this is with AMQP 0-10) and just set the subject of the response
explicitly to match the required routing key.
Right.

It's not entirely clear to me how that might be done, though. I mean,
the reply address is just a queue - there is no explicit binding or
anything.


The one issue here at present is that there is no way to create a
sender for the default exchange. That has been requested before (I
can't find a JIRA though) and it would perhaps be worth exploring.
Ideally it would be done in a way that would translate fairly
naturally to 1.0 as well.
Yes. Maybe this is what I'd need for the "simple queue" address?


However, if you are able to control the types of reply-addresses you
use (e.g. amq.direct/# where # will be expanded to a uuid),

Yep. I'm still at a stage where I can easily change the addressing scheme.

So, in this setup, I'd use the "#" bit as subject?

When creating the receiver for responses, use an address like e.g. "amq.direct/#; {node:{type:topic}}", then in the service, get the reply to address and rather than directly creating a sender for it, create a sender for the name portion, e.g.:

  const Address& address = request.getReplyTo();
  if (address) {
      Sender sender = session.getSender(address.getName());
      Message response;
      /* fill in response object*/
      response.setSubject(address.getSubject());
      sender.send(response);
  }

You can then combine that with caching of the senders, e.g. using the approach suggested by Bruno. I've attached a patch to the current client/server example that shows this working.

What the use of the exchange in the reply-to does is allow you to use fewer senders overall since many reply-to addresses can use the same exchange, which may free you from worrying about when to close them.
diff --git a/qpid/cpp/examples/messaging/client.cpp b/qpid/cpp/examples/messaging/client.cpp
index 983f0a8..dc8c9d0 100644
--- a/qpid/cpp/examples/messaging/client.cpp
+++ b/qpid/cpp/examples/messaging/client.cpp
@@ -25,6 +25,7 @@
 #include <qpid/messaging/Receiver.h>
 #include <qpid/messaging/Sender.h>
 #include <qpid/messaging/Session.h>
+#include <qpid/types/Uuid.h>
 
 #include <cstdlib>
 #include <iostream>
@@ -48,8 +49,10 @@ int main(int argc, char** argv) {
         Sender sender = session.createSender("service_queue");
 
         //create temp queue & receiver...
-        Receiver receiver = session.createReceiver("#");
-        Address responseQueue = receiver.getAddress();
+        std::stringstream replyto;
+        replyto << "amq.direct/" << qpid::types::Uuid(true) << "; {node:{type:topic}}";
+        Address responseQueue(replyto.str());
+        Receiver receiver = session.createReceiver(responseQueue);
 
 	// Now send some messages ...
 	string s[] = {
diff --git a/qpid/cpp/examples/messaging/server.cpp b/qpid/cpp/examples/messaging/server.cpp
index aa271d9..ed2ab57 100644
--- a/qpid/cpp/examples/messaging/server.cpp
+++ b/qpid/cpp/examples/messaging/server.cpp
@@ -51,10 +51,18 @@ int main(int argc, char** argv) {
             Message request = receiver.fetch();
             const Address& address = request.getReplyTo();
             if (address) {
-                Sender sender = session.createSender(address);
+                Sender sender;
+                try {
+                    sender = session.getSender(address.getName());
+                    std::cout << "Using existing sender for " << address.getName() << std::endl;
+                } catch (KeyError& e) {
+                    sender = session.createSender(address.getName());
+                    std::cout << "Created new sender for " << address.getName() << std::endl;
+                }
                 std::string s = request.getContent();
                 std::transform(s.begin(), s.end(), s.begin(), toupper);
                 Message response(s);
+                response.setSubject(address.getSubject());
                 sender.send(response);
                 std::cout << "Processed request: " 
                           << request.getContent() 

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to