Is this a bug, or am I doing something wrong?



Scenario
{
  My sender sends a single message, and hopes to see
  that the receiver has accepted it.

  I launch 3 copies of the sender in quick succession;
  they all send to the same address.

  My receiver receives in a loop and accepts every message
  that it receives.
}




Result
{
  Sometimes my receiver gets 1 of the 3 messages.
  Usually it gets 2.
  It never gets all 3.

  The 3rd sender hangs in pn_messenger_send().

  While the 3rd sender is hanging in send(), the receiver
  is patiently waiting in recv().
}






Sender Code ############################################

/*
  Launch 3 of these from a script like so:
  ./sender 6666 &
  ./sender 6666 &
  ./sender 6666 &
*/


#include "proton/message.h"
#include "proton/messenger.h"

#include <sys/types.h>
#include <ctype.h>
#include <getopt.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>


/*
  Translate a messenger delivery status into a short printable name.

  status : the status value to describe.

  Returns a pointer to a string literal (never NULL); the caller must
  not modify or free it.  Any value other than the four statuses
  handled below yields "bad value".
*/
char *
status_2_str ( pn_status_t status )
{
  char * name = "bad value";

  if ( status == PN_STATUS_UNKNOWN )
    name = "unknown";
  else if ( status == PN_STATUS_PENDING )
    name = "pending";
  else if ( status == PN_STATUS_ACCEPTED )
    name = "accepted";
  else if ( status == PN_STATUS_REJECTED )
    name = "rejected";

  return name;
}



pid_t my_pid = 0;


/*
  Report the outcome of a call on stderr, prefixed with this
  process's pid so that the output of several concurrently running
  senders can be told apart.

  label  : short text naming the call being reported (not modified).
  result : the integer result code returned by that call.
*/
void
check ( const char * label, int result )
{
  /* pid_t is not guaranteed to be int; cast so it matches %d. */
  fprintf ( stderr, "%d %s result: %d\n", (int) my_pid, label, result );
}



/*
  Sender entry point.

  Usage: sender <port>

  Builds one message addressed to amqp://0.0.0.0:<port>, hands it to
  the messenger, sends it, prints the tracker's delivery status and
  shuts down.  The result code of every Proton call that returns one
  is reported through check(), so that a failing or stalling step can
  be identified in the output.

  Returns EXIT_SUCCESS when every reported call returned 0, otherwise
  EXIT_FAILURE.
  NOTE(review): assumes these Proton calls return 0 on success and a
  non-zero error code on failure -- confirm against the Proton
  version in use.
*/
int
main(int argc, char** argv)
{
  char addr [ 1000 ];
  char msgtext [ 100 ];
  pn_message_t   * message;
  pn_messenger_t * messenger;
  pn_data_t      * body;
  pn_tracker_t     tracker;
  pn_status_t      status;
  int              result;
  int              written;
  int              exit_code = EXIT_SUCCESS;

  /* The port is mandatory; without it argv[1] must not be touched. */
  if ( argc < 2 )
  {
    fprintf ( stderr, "usage: %s <port>\n", argc > 0 ? argv[0] : "sender" );
    return EXIT_FAILURE;
  }

  my_pid = getpid();

  /* Bounded formatting: argv[1] is caller-controlled and may be long. */
  written = snprintf ( addr, sizeof addr, "amqp://0.0.0.0:%s", argv[1] );
  if ( written < 0 || (size_t) written >= sizeof addr )
  {
    fprintf ( stderr, "port argument too long\n" );
    return EXIT_FAILURE;
  }

  message = pn_message ( );
  messenger = pn_messenger ( NULL );
  if ( message == NULL || messenger == NULL )
  {
    fprintf ( stderr, "could not create message or messenger\n" );
    if ( messenger != NULL )
      pn_messenger_free ( messenger );
    if ( message != NULL )
      pn_message_free ( message );
    return EXIT_FAILURE;
  }

  result = pn_messenger_start ( messenger );
  check ( "start", result );
  if ( result != 0 )
    exit_code = EXIT_FAILURE;

  /* An outgoing window of 1 keeps the status of the one message
     sent here available through its tracker. */
  result = pn_messenger_set_outgoing_window ( messenger, 1 );
  check ( "set_outgoing_window", result );
  if ( result != 0 )
    exit_code = EXIT_FAILURE;

  result = pn_message_set_address ( message, addr );
  check ( "set_address", result );
  if ( result != 0 )
    exit_code = EXIT_FAILURE;

  body = pn_message_body ( message );

  snprintf ( msgtext, sizeof msgtext, "Message from %d", (int) my_pid );
  result = pn_data_put_string ( body, pn_bytes ( strlen ( msgtext ), msgtext ));
  check ( "put_string", result );
  if ( result != 0 )
    exit_code = EXIT_FAILURE;

  result = pn_messenger_put ( messenger, message );
  check ( "put", result );
  if ( result != 0 )
    exit_code = EXIT_FAILURE;

  /* The tracker must be fetched right after put: it refers to the
     most recently queued outgoing message. */
  tracker = pn_messenger_outgoing_tracker ( messenger );

  result = pn_messenger_send ( messenger );
  check ( "send", result );
  if ( result != 0 )
    exit_code = EXIT_FAILURE;

  status = pn_messenger_status ( messenger, tracker );
  fprintf ( stderr, "status : %s\n", status_2_str(status) );

  result = pn_messenger_stop ( messenger );
  check ( "stop", result );
  if ( result != 0 )
    exit_code = EXIT_FAILURE;

  pn_messenger_free ( messenger );
  pn_message_free ( message );

  return exit_code;
}




Receiver Code ########################################################

/*

  Launch like this:
  ./receiver 6666
*/

#include <stdio.h>
#include <stdlib.h>
#include <ctype.h>

#include "proton/message.h"
#include "proton/messenger.h"



#define BUFSIZE 1024



/*
  Receiver entry point.

  Usage: receiver <port>

  Subscribes to amqp://~0.0.0.0:<port> and then loops: receive up to
  3 messages, and for each one queued, fetch it, accept it, and print
  its address and a formatted rendering of its body on stdout.

  The loop only ends when a receive or get call reports an error; in
  that case the messenger is stopped, all objects are freed, and
  EXIT_FAILURE is returned.
  NOTE(review): assumes pn_messenger_recv / pn_messenger_get /
  pn_data_format return 0 on success and non-zero on failure --
  confirm against the Proton version in use.
*/
int
main(int argc, char** argv)
{
  char buffer [ BUFSIZE ];
  char addr [ 1000 ];
  pn_message_t   * message;
  pn_messenger_t * messenger;
  pn_data_t      * body;
  pn_tracker_t     tracker;
  int              result;
  int              written;
  int              exit_code = EXIT_SUCCESS;

  /* The port is mandatory; without it argv[1] must not be touched. */
  if ( argc < 2 )
  {
    fprintf ( stderr, "usage: %s <port>\n", argc > 0 ? argv[0] : "receiver" );
    return EXIT_FAILURE;
  }

  /* Bounded formatting: argv[1] is caller-controlled and may be long. */
  written = snprintf ( addr, sizeof addr, "amqp://~0.0.0.0:%s", argv[1] );
  if ( written < 0 || (size_t) written >= sizeof addr )
  {
    fprintf ( stderr, "port argument too long\n" );
    return EXIT_FAILURE;
  }

  message = pn_message();
  messenger = pn_messenger ( NULL );
  if ( message == NULL || messenger == NULL )
  {
    fprintf ( stderr, "could not create message or messenger\n" );
    if ( messenger != NULL )
      pn_messenger_free ( messenger );
    if ( message != NULL )
      pn_message_free ( message );
    return EXIT_FAILURE;
  }

  result = pn_messenger_start(messenger);
  if ( result != 0 )
  {
    fprintf ( stderr, "pn_messenger_start failed: %d\n", result );
    exit_code = EXIT_FAILURE;
    goto cleanup;
  }

  pn_messenger_subscribe ( messenger, addr );
  pn_messenger_set_incoming_window ( messenger, 5 );

  /*---------------------------------
    Receive and accept the message.
  ---------------------------------*/
  while ( 1 )
  {
    fprintf ( stderr, "receiving...\n" );
    result = pn_messenger_recv ( messenger, 3 );
    if ( result != 0 )
    {
      /* Without this check a persistent error would make the loop
         spin forever printing "receiving...". */
      fprintf ( stderr, "pn_messenger_recv failed: %d\n", result );
      exit_code = EXIT_FAILURE;
      goto cleanup;
    }

    while ( pn_messenger_incoming ( messenger ) > 0 )
    {
      /* pn_data_format treats the size as in/out: capacity on entry,
         bytes used on return.  It must therefore be reset for every
         message.  One byte is held back so the buffer can always be
         terminated below, whether or not the library does so. */
      size_t bufsize = sizeof buffer - 1;

      fprintf ( stderr, "getting message...\n" );
      result = pn_messenger_get ( messenger, message );
      if ( result != 0 )
      {
        /* A failed get may leave the incoming count unchanged, so
           continuing the inner loop could never terminate. */
        fprintf ( stderr, "pn_messenger_get failed: %d\n", result );
        exit_code = EXIT_FAILURE;
        goto cleanup;
      }

      tracker = pn_messenger_incoming_tracker ( messenger );
      pn_messenger_accept ( messenger, tracker, 0 );
      body = pn_message_body ( message );

      result = pn_data_format ( body, buffer, & bufsize );
      if ( result != 0 )
      {
        fprintf ( stderr, "pn_data_format failed: %d\n", result );
        bufsize = 0;
      }
      else if ( bufsize > sizeof buffer - 1 )
      {
        /* Defensive: never index past the byte reserved above. */
        bufsize = sizeof buffer - 1;
      }
      buffer [ bufsize ] = '\0';

      fprintf ( stdout, "Address: %s\n", pn_message_get_address ( message ) );
      fprintf ( stdout, "Content: %s\n", buffer);
    }
  }

cleanup:
  pn_messenger_stop(messenger);
  pn_messenger_free(messenger);
  pn_message_free(message);

  return exit_code;
}


Reply via email to