Re: bug? interaction between two senders

2013-02-15 Thread Rafael Schloming
Can you post your code?

--Rafael

On Fri, Feb 15, 2013 at 5:22 AM, Michael Goulish <mgoul...@redhat.com> wrote:


 Have I found a bug ?


 scenario
 {
   receiver
   {
 I start a receiver and it subscribes to ports  and 6667.
 In a loop, it starts trying to recv messages.  I set timeout
 to 700 msec, so it keeps looping and tells me whenever it
 gets a message.
   }


   senders
   {
 From two separate shells, I start two senders simultaneously,
 node A sending to , and node B sending to 6667.
 Both senders will send 10 messages.
   }


   great expectation
   {
 I want to see the receiver's print-outs indicating that it
 is receiving messages from both senders, interleaved.
   }


   grim reality
   {
 The receiver gets messages only from sender A until
 sender A stops.
 Then the receiver gets all the messages from sender B.
 Sender B hangs at its first call to pn_messenger_send()
 until sender A calls stop().
   }


   comment
   {
 uh-oh.
   }

 }





Re: bug? interaction between two senders

2013-02-15 Thread Ted Ross

This is related to PROTON-200


Re: bug? interaction between two senders

2013-02-15 Thread Michael Goulish
Yes, I'll post the code below, but I just talked to Ted: I have rediscovered
PROTON-200, "Credit distribution by messenger is not balanced across all
links." The first sender gets all the credit, and the next sender doesn't get
any until the first sender calls stop.
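
To make the imbalance concrete, here is a minimal sketch in plain C. It is not Proton's actual credit code: both function names are hypothetical, and "credit" is modelled as a bare integer per link. The first function models the behaviour described above (one link takes everything); the second shows what an even split could look like.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical model of the observed behaviour: the first link is handed
   all of the available credit and every other link gets none. */
void distribute_credit_first_link(int total, int *credit, size_t n_links)
{
    assert(n_links > 0 && total >= 0);
    credit[0] = total;
    for (size_t i = 1; i < n_links; ++i)
        credit[i] = 0;
}

/* Hypothetical balanced alternative: split the credit as evenly as
   possible, giving any remainder to the lowest-numbered links. */
void distribute_credit_evenly(int total, int *credit, size_t n_links)
{
    assert(n_links > 0 && total >= 0);
    int base  = total / (int) n_links;
    int extra = total % (int) n_links;
    for (size_t i = 0; i < n_links; ++i)
        credit[i] = base + (i < (size_t) extra ? 1 : 0);
}
```

With 20 credits and two links, the first function yields 20 and 0 (the second sender has nothing to send against, so it blocks), while the second yields 10 and 10.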



to run:

 1. start receiver this way:   ./receiver 

 2. in a second window,
start sender A this way:   ./sender A 

 3. in a third window, within a few seconds,
start sender B this way:   ./sender B


result:

   receiver gets all 10 messages ( 1 per second ) from
   sender A.  While it is receiving those, sender B hangs
   at its first call to send().  Messages start coming from 
   sender B only after sender A calls stop().




---- start receiver ----

#include <proton/message.h>
#include <proton/messenger.h>

#include <stdio.h>
#include <stdlib.h>
#include <ctype.h>


#define BUFSIZE 1024


/* Fetch n messages from the messenger's incoming queue and print each one. */
void
consume_messages ( pn_messenger_t * messenger, int n, pn_message_t * message )
{
  int consume_count;
  for ( consume_count = 0; consume_count < n; ++ consume_count )
  {
    pn_messenger_get ( messenger, message );

    size_t bufsize = BUFSIZE;
    char buffer [ bufsize ];
    pn_data_t * body = pn_message_body ( message );
    pn_data_format ( body, buffer, & bufsize );

    printf ( "\n\nMessage \n" );
    printf ( "Address: %s\n", pn_message_get_address ( message ) );
    char const * subject = pn_message_get_subject ( message );
    printf ( "Subject: %s\n", subject ? subject : "(no subject)" );
    printf ( "Content: %s\n\n", buffer );
  }
}



int
main(int argc, char** argv)
{
  char addr [ 1000 ];

  /* The port to listen on is the first argument. */
  if ( argc < 2 )
  {
    fprintf ( stderr, "usage: %s port\n", argv[0] );
    return 1;
  }

  snprintf ( addr, sizeof addr, "amqp://~0.0.0.0:%s", argv[1] );

  pn_message_t   * message;
  pn_messenger_t * messenger;

  message   = pn_message ( );
  messenger = pn_messenger ( NULL );

  pn_messenger_start ( messenger );

  /*
    Subscribe to the address built from argv[1].
  */
  pn_messenger_subscribe ( messenger, addr );

  int messages_wanted = 20;
  int total_received  = 0;
  int received_this_time;

  pn_messenger_set_timeout ( messenger, 700 );

  /* Keep polling until all expected messages have arrived. */
  int tries = 0;
  while ( total_received < messages_wanted )
  {
    ++ tries;
    pn_messenger_recv ( messenger, messages_wanted );
    received_this_time = pn_messenger_incoming ( messenger );
    total_received += received_this_time;
    fprintf ( stderr,
              "try: %d received: %d total: %d\n",
              tries,
              received_this_time,
              total_received
            );
    consume_messages ( messenger, received_this_time, message );
  }

  pn_messenger_stop(messenger);
  pn_messenger_free(messenger);
  pn_message_free(message);

  return 0;
}

---- end receiver ----



---- start sender ----

#include <proton/message.h>
#include <proton/messenger.h>

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>



int
main(int argc, char** argv)
{
  char addr [ 1000 ];
  char content [ 1000 ];
  char subject [ 1000 ];
  char * name;

  /* First argument is the node name, second is the port to send to. */
  if ( argc < 3 )
  {
    fprintf ( stderr, "usage: %s name port\n", argv[0] );
    return 1;
  }

  snprintf ( addr, sizeof addr, "amqp://0.0.0.0:%s", argv[2] );

  pn_message_t * message;
  pn_messenger_t * messenger;

  message = pn_message();
  messenger = pn_messenger ( name = argv[1] );

  pn_messenger_set_outgoing_window ( messenger, 1 );
  fprintf ( stderr, "MDEBUG start \n" );
  pn_messenger_start ( messenger );

  int n_messages = 10;
  int sent_count;

  /*--
    Put and send a message every 1 second.
  --*/
  for ( sent_count = 0 ; sent_count < n_messages; ++ sent_count )
  {
    sleep ( 1 );
    snprintf ( subject, sizeof subject, "from node %s", name );
    pn_message_set_address ( message, addr );
    pn_message_set_subject ( message, subject );
    pn_data_t *body = pn_message_body(message);
    sprintf ( content, "Hello, Proton!" );
    pn_data_put_string(body, pn_bytes(strlen(content), content));
    pn_messenger_put(messenger, message);

    fprintf ( stderr, "MDEBUG send\n" );
    pn_messenger_send(messenger);

    fprintf ( stderr, "sent %d messages.\n", sent_count + 1 );
  }


  // Countdown to stop, to give me time to see it
  fprintf ( stderr, "Calling stop in ...\n" );
  int i;
  for ( i = 5; i > 0; -- i )
  {
    fprintf ( stderr, "%d\n", i );
    sleep ( 1 );
  }
  fprintf ( stderr, "stop.\n" );

  pn_messenger_stop(messenger);
  pn_messenger_free(messenger);
  pn_message_free(message);

  return 0;
}

---- end sender ----






Re: bug? interaction between two senders

2013-02-15 Thread Alan Conway
10 messages is not a lot. I would guess (hope!) that 10 messages can be
sent in less than the time it takes to start the second sender
process. Try 1 messages before you raise a bug.

Cheers,
Alan.