Any clues from a trace of the receiver?

$ PN_TRACE_FRM=1 ./receiver 6666

-Ted

On 04/04/2013 02:09 PM, Michael Goulish wrote:

   Is this a bug, or am I  Doing  Something  Wrong ?



Scenario
{
   My sender sends a single message, and hopes to see
   that the receiver has accepted it.

   I launch 3 copies of the sender very close together--
   they all talk to the same address.

   My receiver receives in a loop, accepts every message
   that it receives.
}




Result
{
   Sometimes my receiver gets 1 of the 3 messages.
   Usually it gets 2.
   It never gets all 3.

   The 3rd sender hangs in pn_messenger_send().

   While the 3rd sender is hanging in send(), the receiver
   is patiently waiting in recv().
}






Sender Code ############################################

/*
   Launch 3 of these from a script like so:
   ./sender 6666 &
   ./sender 6666 &
   ./sender 6666 &
*/


#include "proton/message.h"
#include "proton/messenger.h"

#include <getopt.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <ctype.h>


char *
status_2_str ( pn_status_t status )
{
   switch ( status )
   {
     case PN_STATUS_UNKNOWN:
       return "unknown";
       break;

     case PN_STATUS_PENDING:
       return "pending";
       break;

     case PN_STATUS_ACCEPTED:
       return "accepted";
       break;

     case PN_STATUS_REJECTED:
       return "rejected";
       break;

     default:
       return "bad value";
       break;
   }
}



pid_t my_pid = 0;


void
check ( char * label, int result )
{
   fprintf ( stderr, "%d %s result: %d\n", my_pid, label, result );
}



int
main(int argc, char** argv)
{
   int c;
   char addr [ 1000 ];
   char msgtext [ 100 ];
   pn_message_t   * message;
   pn_messenger_t * messenger;
   pn_data_t      * body;
   pn_tracker_t     tracker;
   pn_status_t      status;
   int              result;

   my_pid = getpid();

   sprintf ( addr, "amqp://0.0.0.0:%s", argv[1] );


   message = pn_message ( );
   messenger = pn_messenger ( NULL );
   pn_messenger_start ( messenger ) ;
   pn_messenger_set_outgoing_window ( messenger, 1 );


   pn_message_set_address ( message, addr );
   body = pn_message_body ( message );


   sprintf ( msgtext, "Message from %d", getpid() );
   pn_data_put_string ( body, pn_bytes ( strlen ( msgtext ), msgtext ));
   pn_messenger_put ( messenger, message );
   tracker = pn_messenger_outgoing_tracker ( messenger );
   pn_messenger_send ( messenger );


   status = pn_messenger_status ( messenger, tracker );
   fprintf ( stderr, "status : %s\n", status_2_str(status) );


   pn_messenger_stop ( messenger );
   pn_messenger_free ( messenger );
   pn_message_free ( message );

   return 0;
}




Receiver Code ########################################################

/*

   Launch like this:
   ./receiver 6666
*/

#include <stdio.h>
#include <stdlib.h>
#include <ctype.h>

#include "proton/message.h"
#include "proton/messenger.h"



#define BUFSIZE 1024



int
main(int argc, char** argv)
{
   size_t bufsize = BUFSIZE;
   char buffer [ BUFSIZE ];
   char addr [ 1000 ];
   pn_message_t   * message;
   pn_messenger_t * messenger;
   pn_data_t      * body;
   pn_tracker_t     tracker;


   sprintf ( addr, "amqp://~0.0.0.0:%s", argv[1] );

   message = pn_message();
   messenger = pn_messenger ( NULL );

   pn_messenger_start(messenger);
   pn_messenger_subscribe ( messenger, addr );
   pn_messenger_set_incoming_window ( messenger, 5 );

   /*---------------------------------
     Receive and accept the message.
   ---------------------------------*/
   while ( 1 )
   {
     fprintf ( stderr, "receiving...\n" );
     pn_messenger_recv ( messenger, 3 );

     while ( pn_messenger_incoming ( messenger ) > 0 )
     {
       fprintf ( stderr, "getting message...\n" );
       pn_messenger_get ( messenger, message );
       tracker = pn_messenger_incoming_tracker ( messenger );
       pn_messenger_accept ( messenger, tracker, 0 );
       body = pn_message_body ( message );
       pn_data_format ( body, buffer, & bufsize );
       fprintf ( stdout, "Address: %s\n", pn_message_get_address ( message ) );
       fprintf ( stdout, "Content: %s\n", buffer);
     }
   }

   pn_messenger_stop(messenger);
   pn_messenger_free(messenger);

   return 0;
}



Reply via email to