Is this a bug, or am I Doing Something Wrong ?
Scenario { My sender sends a single message, and hopes to see that the receiver has accepted it. I launch 3 copies of the sender very close together-- they all talk to the same address. My receiver receives in a loop, accepts every message that it receives. } Result { Sometimes my receiver gets 1 of the 3 messages. Usually it gets 2. It never gets all 3. The 3rd sender hangs in pn_messenger_send(). While the 3rd sender is hanging in send(), the receiver is patiently waiting in recv(). } Sender Code ############################################ /* Launch 3 of these from a script like so: ./sender 6666 & ./sender 6666 & ./sender 6666 & */ #include "proton/message.h" #include "proton/messenger.h" #include <getopt.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <ctype.h> char * status_2_str ( pn_status_t status ) { switch ( status ) { case PN_STATUS_UNKNOWN: return "unknown"; break; case PN_STATUS_PENDING: return "pending"; break; case PN_STATUS_ACCEPTED: return "accepted"; break; case PN_STATUS_REJECTED: return "rejected"; break; default: return "bad value"; break; } } pid_t my_pid = 0; void check ( char * label, int result ) { fprintf ( stderr, "%d %s result: %d\n", my_pid, label, result ); } int main(int argc, char** argv) { int c; char addr [ 1000 ]; char msgtext [ 100 ]; pn_message_t * message; pn_messenger_t * messenger; pn_data_t * body; pn_tracker_t tracker; pn_status_t status; int result; my_pid = getpid(); sprintf ( addr, "amqp://0.0.0.0:%s", argv[1] ); message = pn_message ( ); messenger = pn_messenger ( NULL ); pn_messenger_start ( messenger ) ; pn_messenger_set_outgoing_window ( messenger, 1 ); pn_message_set_address ( message, addr ); body = pn_message_body ( message ); sprintf ( msgtext, "Message from %d", getpid() ); pn_data_put_string ( body, pn_bytes ( strlen ( msgtext ), msgtext )); pn_messenger_put ( messenger, message ); tracker = pn_messenger_outgoing_tracker ( messenger ); pn_messenger_send ( messenger ); status = pn_messenger_status ( messenger, tracker ); fprintf ( stderr, "status : %s\n", status_2_str(status) ); pn_messenger_stop ( messenger ); pn_messenger_free ( messenger ); pn_message_free ( message ); return 0; } Receiver Code ######################################################## /* Launch like this: ./receiver 6666 */ #include <stdio.h> #include <stdlib.h> #include <ctype.h> #include "proton/message.h" #include "proton/messenger.h" #define BUFSIZE 1024 int main(int argc, char** argv) { size_t bufsize = BUFSIZE; char buffer [ BUFSIZE ]; char addr [ 1000 ]; pn_message_t * message; pn_messenger_t * messenger; pn_data_t * body; pn_tracker_t tracker; sprintf ( addr, "amqp://~0.0.0.0:%s", argv[1] ); message = pn_message(); messenger = pn_messenger ( NULL ); pn_messenger_start(messenger); pn_messenger_subscribe ( messenger, addr ); pn_messenger_set_incoming_window ( messenger, 5 ); /*--------------------------------- Receive and accept the message. ---------------------------------*/ while ( 1 ) { fprintf ( stderr, "receiving...\n" ); pn_messenger_recv ( messenger, 3 ); while ( pn_messenger_incoming ( messenger ) > 0 ) { fprintf ( stderr, "getting message...\n" ); pn_messenger_get ( messenger, message ); tracker = pn_messenger_incoming_tracker ( messenger ); pn_messenger_accept ( messenger, tracker, 0 ); body = pn_message_body ( message ); pn_data_format ( body, buffer, & bufsize ); fprintf ( stdout, "Address: %s\n", pn_message_get_address ( message ) ); fprintf ( stdout, "Content: %s\n", buffer); } } pn_messenger_stop(messenger); pn_messenger_free(messenger); return 0; }