Hi,

when I set message reply_to="~" the messenger changes that to "amqp://<name>" in outward_munge.

is such address format for addressing the container by name required by some AMQP spec or is this a messenger/proton specific feature?

for something like this:

  m = Messenger("www.google.com:80")
  msg = Message()
  msg.address = "amqp://127.0.0.1:12345"
  msg.reply_to = "~"
  m.send(1)
  sys.exit(0)

the reply_to will be amqp://www.google.com:80

The server from the testcase will *appear* to work, because the messenger is not given a chance to notice connection teardown before it needs to resolve the reply_to address.

Apply the attached patch 0001-... and run:

# window 1 'server' that sends two replies to each request
$ PN_TRACE_FRM=1 ./recv -rr
# window 2 'google'
$ PN_TRACE_FRM=1 ./recv amqp://~127.0.0.1:23456/
# window 3 'client' with an unfortunate name that dies before it can receive the response
$ PN_TRACE_FRM=1 ./send -n127.0.0.1:23456 -rw

which forces server to send two replies, one by one and give the client the chance to go away.

I think it might be better for that address to read "amqp:///<name>" so that the 'magic' is signaled by empty domain which is otherwise not valid.

or maybe even "amqp:///~<name>/" so that it doesn't pollute the the 'normal' part of the address namespace?

The attached patch 0002-... implements the former solution.

Now the server dies with:
recv.c:162: unable to connect to amqp:///127.0.0.1:23456: connect: Address family not supported by protocol family

The other beneficial effect of this change is that 'server' has no more sender links with null address which also solves the problem from my previous question.

Bozzo
From a296f585c95b3d2f9001e9c60e4970769581fb04 Mon Sep 17 00:00:00 2001
From: Bozo Dragojevic <bozzo.dragoje...@gmail.com>
Date: Sun, 3 Nov 2013 17:07:18 +0100
Subject: [PATCH 1/2] allow sending and expecting more than one reply allow
 unclean exit after sending a message with expected reply

---
 examples/messenger/c/recv.c | 30 +++++++++++++++++++++++++-----
 examples/messenger/c/send.c | 45 +++++++++++++++++++++++++++++++++++++++------
 2 files changed, 64 insertions(+), 11 deletions(-)

diff --git a/examples/messenger/c/recv.c b/examples/messenger/c/recv.c
index fe7b153..5fac9df 100644
--- a/examples/messenger/c/recv.c
+++ b/examples/messenger/c/recv.c
@@ -40,13 +40,14 @@ void die(const char *file, int line, const char *message)
   exit(1);
 }
 
-void usage()
+void usage(const char *address)
 {
   printf("Usage: recv [options] <addr>\n");
   printf("-c    \tPath to the certificate file.\n");
   printf("-k    \tPath to the private key file.\n");
   printf("-p    \tPassword for the private key.\n");
-  printf("<addr>\tAn address.\n");
+  printf("-r    \treply if reply_to is set. (can be specified more than 
once)\n");
+  printf("<addr>\tAn address. (%s)\n", address);
   exit(0);
 }
 
@@ -55,18 +56,21 @@ int main(int argc, char** argv)
   char* certificate = NULL;
   char* privatekey = NULL;
   char* password = NULL;
+  int reply = 0;
   char* address = (char *) "amqp://~0.0.0.0";
   int c;
   opterr = 0;
 
-  while((c = getopt(argc, argv, "hc:k:p:")) != -1)
+  while((c = getopt(argc, argv, "hrc:k:p:")) != -1)
   {
     switch(c)
     {
     case 'h':
-      usage();
+      usage(address);
       break;
 
+    case 'r': reply++; break;
+
     case 'c': certificate = optarg; break;
     case 'k': privatekey = optarg; break;
     case 'p': password = optarg; break;
@@ -142,8 +146,24 @@ int main(int argc, char** argv)
 
       printf("Address: %s\n", pn_message_get_address(message));
       const char* subject = pn_message_get_subject(message);
-      printf("Subject: %s\n", subject ? subject : "(no subject)");
+      subject = subject ? subject : "(no subject)";
+      printf("Subject: %s\n", subject);
       printf("Content: %s\n", buffer);
+      for (int i = 0; i < reply; i++) {
+        const char *reply_to = pn_message_get_reply_to(message);
+        if (reply_to) {
+          printf("Reply %d/%d to %s\n", i, reply, reply_to);
+          pn_message_set_address(message, reply_to);
+          char subject_buffer[1024];
+          strcpy(subject_buffer, "Re: ");
+          strcat(subject_buffer, subject);
+          pn_message_set_subject(message, subject_buffer);
+          pn_messenger_put(messenger, message);
+          check(messenger);
+          pn_messenger_send(messenger, -1);
+          check(messenger);
+        }
+      }
     }
   }
 
diff --git a/examples/messenger/c/send.c b/examples/messenger/c/send.c
index 9b969d2..e482635 100644
--- a/examples/messenger/c/send.c
+++ b/examples/messenger/c/send.c
@@ -41,10 +41,13 @@ void die(const char *file, int line, const char *message)
   exit(1);
 }
 
-void usage()
+void usage(const char *address)
 {
   printf("Usage: send [-a addr] [message]\n");
-  printf("-a     \tThe target address [amqp[s]://domain[/name]]\n");
+  printf("-a     \tThe target address [amqp[s]://domain[/name]] (%s)\n", 
address);
+  printf("-n     \tThe messenger name (default uuid)\n");
+  printf("-r     \trequest reply and expect it (can be specified more than 
once)\n");
+  printf("-w     \tuncleanly exit before last reply is expected (can be 
specified more than once)\n");
   printf("message\tA text string to send.\n");
   exit(0);
 }
@@ -53,16 +56,21 @@ int main(int argc, char** argv)
 {
   int c;
   opterr = 0;
+  int wait_reply = 0;
+  int reply = 0;
   char * address = (char *) "amqp://0.0.0.0";
   char * msgtext = (char *) "Hello World!";
+  char * name = NULL;
 
-  while((c = getopt(argc, argv, "ha:b:c:")) != -1)
+  while((c = getopt(argc, argv, "hrwa:n:")) != -1)
   {
     switch(c)
     {
+    case 'h': usage(address); break;
+    case 'r': reply++; wait_reply++; break;
+    case 'w': wait_reply--; break;
     case 'a': address = optarg; break;
-    case 'h': usage(); break;
-
+    case 'n': name = optarg; break;
     case '?':
       if(optopt == 'a')
       {
@@ -88,17 +96,42 @@ int main(int argc, char** argv)
   pn_messenger_t * messenger;
 
   message = pn_message();
-  messenger = pn_messenger(NULL);
+  messenger = pn_messenger(name);
 
   pn_messenger_start(messenger);
 
   pn_message_set_address(message, address);
+  if (reply)
+    pn_message_set_reply_to(message, "~");
   pn_data_t *body = pn_message_body(message);
   pn_data_put_string(body, pn_bytes(strlen(msgtext), msgtext));
   pn_messenger_put(messenger, message);
   check(messenger);
   pn_messenger_send(messenger, -1);
   check(messenger);
+  for (int i = 0; i < reply; i++) {
+    if ( i < wait_reply) {
+      printf("wait reply %d/%d/%d\n", i, wait_reply, reply);
+      pn_messenger_recv(messenger, 1);
+      check(messenger);
+      pn_messenger_get(messenger, message);
+      check(messenger);
+
+      char buffer[1024];
+      size_t buffsize = sizeof(buffer);
+      pn_data_t *body = pn_message_body(message);
+      pn_data_format(body, buffer, &buffsize);
+
+      printf("Address: %s\n", pn_message_get_address(message));
+      const char* subject = pn_message_get_subject(message);
+      subject = subject ? subject : "(no subject)";
+      printf("Subject: %s\n", subject);
+      printf("Content: %s\n", buffer);
+    } else if (reply) {
+      printf("Unclean exit before reply %d/%d/%d\n", i, wait_reply, reply);
+      exit(0);
+    }
+  }
 
   pn_messenger_stop(messenger);
   pn_messenger_free(messenger);
-- 
1.8.3.4

From 5c20f9a5765c1f6429141c8212e40370a6b6c5b9 Mon Sep 17 00:00:00 2001
From: Bozo Dragojevic <bozzo.dragoje...@gmail.com>
Date: Sun, 3 Nov 2013 17:22:18 +0100
Subject: [PATCH 2/2] Change default reply_to address to the container name

---
 proton-c/src/messenger/messenger.c | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/proton-c/src/messenger/messenger.c 
b/proton-c/src/messenger/messenger.c
index 999fe64..d7a2b70 100644
--- a/proton-c/src/messenger/messenger.c
+++ b/proton-c/src/messenger/messenger.c
@@ -1040,7 +1040,7 @@ pn_connection_t *pn_messenger_resolve(pn_messenger_t 
*messenger, const char *add
       return connection;
     }
     const char *container = pn_connection_remote_container(connection);
-    if (pn_streq(container, domain)) {
+    if (pn_streq(container, *name) && pn_streq(domain, "")) {
       return connection;
     }
     ctor = pn_connector_next(ctor);
@@ -1188,7 +1188,7 @@ static void outward_munge(pn_messenger_t *mng, 
pn_message_t *msg)
       heapbuf = (char *) malloc(needed);
       buf = heapbuf;
     }
-    sprintf(buf, "amqp://%s", mng->name);
+    sprintf(buf, "amqp:///%s", mng->name);
     pn_message_set_reply_to(msg, buf);
   }
   if (heapbuf) free (heapbuf);
-- 
1.8.3.4

Reply via email to