Yes, I'll post code below -- but I just talked to Ted -- I have rediscovered 
PROTON-200 "Credit distribution by messenger is not balanced across all links". 
 The first sender is getting all the credit, and the next sender doesn't get 
any until the first sender calls stop.



to run:

 1. start receiver this way:   ./receiver 6666

 2. in a second window,
    start sender A this way:   ./sender A 6666

 3. in a third window, within a few seconds,
    start sender B this way:   ./sender B 6666


result:

   receiver gets all 10 messages ( 1 per second ) from
   sender A.  While it is receiving those, sender B hangs
   at its first call to send().  Messages start coming from 
   sender B only after sender A calls stop().




------------------ start receiver ----------------------------

#include "proton/message.h"
#include "proton/messenger.h"

#include <stdio.h>
#include <stdlib.h>
#include <ctype.h>



#define BUFSIZE 1024


/*
 * Fetch n messages from the messenger's incoming queue and print each one.
 *
 * messenger : messenger to fetch from with pn_messenger_get().
 * n         : how many messages to fetch.
 * message   : scratch message object; every fetched message is written
 *             into it, so its previous contents are overwritten.
 *
 * For each message the address, subject and formatted body are printed
 * to stdout.  A failed fetch is reported on stderr and that message is
 * skipped; a missing address/subject or an unformattable body is printed
 * as a placeholder instead of handing printf a bad pointer.
 */
void
consume_messages ( pn_messenger_t * messenger, int n, pn_message_t * message )
{
  int consume_count;
  for ( consume_count = 0; consume_count < n; ++ consume_count )
  {
    int err = pn_messenger_get ( messenger, message );
    if ( err )
    {
      fprintf ( stderr, "pn_messenger_get failed (error %d)\n", err );
      continue;
    }

    /* Fixed-size, zero-filled buffer: always a valid C string. */
    char   buffer [ BUFSIZE ] = { 0 };
    size_t bufsize = sizeof buffer;
    pn_data_t * body = pn_message_body ( message );

    err = pn_data_format ( body, buffer, & bufsize );
    if ( err )
    {
      snprintf ( buffer, sizeof buffer, "(body could not be formatted, error %d)", err );
    }
    /* Guarantee termination whatever the formatter left in the last byte. */
    buffer [ sizeof buffer - 1 ] = '\0';

    char const * address = pn_message_get_address ( message );
    char const * subject = pn_message_get_subject ( message );

    printf ( "\n\nMessage ----------------------------\n");
    printf ( "Address: %s\n", address ? address : "(no address)" );
    printf ( "Subject: %s\n", subject ? subject : "(no subject)" );
    printf ( "Content: %s\n\n", buffer );
  }
}



/*
 * Receiver program.
 *
 * usage: receiver <port>
 *
 * Subscribes to amqp://~0.0.0.0:<port> and keeps calling
 * pn_messenger_recv() until messages_wanted messages have been counted,
 * printing each one through consume_messages().
 *
 * Returns 0 on normal completion, EXIT_FAILURE on bad arguments or when
 * the message/messenger objects cannot be created.
 */
int
main(int argc, char** argv)
{
  char addr [ 1000 ];

  if ( argc < 2 )
  {
    fprintf ( stderr, "usage: %s <port>\n", argc > 0 ? argv[0] : "receiver" );
    return EXIT_FAILURE;
  }

  /* Bounded formatting: the port comes straight from the command line. */
  int len = snprintf ( addr, sizeof addr, "amqp://~0.0.0.0:%s", argv[1] );
  if ( len < 0 || (size_t) len >= sizeof addr )
  {
    fprintf ( stderr, "port argument too long\n" );
    return EXIT_FAILURE;
  }

  pn_message_t   * message;
  pn_messenger_t * messenger;

  message   = pn_message ( );
  messenger = pn_messenger ( NULL );

  if ( ! message || ! messenger )
  {
    fprintf ( stderr, "could not create message or messenger\n" );
    if ( messenger ) pn_messenger_free ( messenger );
    if ( message )   pn_message_free ( message );
    return EXIT_FAILURE;
  }

  pn_messenger_start ( messenger );

  /*--------------------------------
    Subscribe to the one address
    built from the port argument.
  --------------------------------*/
  pn_messenger_subscribe ( messenger, addr );

  int messages_wanted = 20;
  int total_received  = 0;
  int received_this_time;

  /* 700 msec timeout, so the loop below keeps cycling and reporting
     even while nothing is arriving. */
  pn_messenger_set_timeout ( messenger, 700 );

  int tries = 0;
  while ( total_received < messages_wanted )
  {
    ++ tries;
    /* The result of recv is deliberately not treated as fatal: a pass
       that brings nothing in is expected and simply shows up as
       "received: 0" in the trace. */
    pn_messenger_recv ( messenger, messages_wanted );
    received_this_time = pn_messenger_incoming ( messenger );
    total_received += received_this_time;
    fprintf ( stderr,
              "try: %d received: %d total: %d\n",
              tries,
              received_this_time,
              total_received
            );
    consume_messages ( messenger, received_this_time, message );
  }

  pn_messenger_stop(messenger);
  pn_messenger_free(messenger);
  pn_message_free(message);   /* was leaked before */

  return 0;
}

-----------------end receiver ------------------------------



------------------- start sender ---------------------------------

#include "proton/message.h"
#include "proton/messenger.h"

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>



/*
 * Sender program.
 *
 * usage: sender <name> <port>
 *
 * Creates a messenger called <name> and sends n_messages messages, one
 * per second, to amqp://0.0.0.0:<port>.  Each message carries the subject
 * "from node <name>" and the body "Hello, Proton!".  After the last send
 * it counts down five seconds before stopping the messenger, so the
 * moment of the stop is easy to spot when watching the receiver.
 *
 * Returns 0 on normal completion, EXIT_FAILURE on bad arguments or when
 * the message/messenger objects cannot be created.
 */
int
main(int argc, char** argv)
{
  char addr [ 1000 ];
  char content [] = "Hello, Proton!";
  char subject [ 1000 ];
  char * name;

  if ( argc < 3 )
  {
    fprintf ( stderr, "usage: %s <name> <port>\n", argc > 0 ? argv[0] : "sender" );
    return EXIT_FAILURE;
  }
  name = argv[1];

  /* Bounded formatting: both pieces come straight from the command line. */
  int len = snprintf ( addr, sizeof addr, "amqp://0.0.0.0:%s", argv[2] );
  if ( len < 0 || (size_t) len >= sizeof addr )
  {
    fprintf ( stderr, "port argument too long\n" );
    return EXIT_FAILURE;
  }

  len = snprintf ( subject, sizeof subject, "from node %s", name );
  if ( len < 0 || (size_t) len >= sizeof subject )
  {
    fprintf ( stderr, "name argument too long\n" );
    return EXIT_FAILURE;
  }

  pn_message_t * message;
  pn_messenger_t * messenger;

  message = pn_message();
  messenger = pn_messenger ( name );

  if ( ! message || ! messenger )
  {
    fprintf ( stderr, "could not create message or messenger\n" );
    if ( messenger ) pn_messenger_free ( messenger );
    if ( message )   pn_message_free ( message );
    return EXIT_FAILURE;
  }

  pn_messenger_set_outgoing_window ( messenger, 1 );
  fprintf ( stderr, "MDEBUG start \n" );
  pn_messenger_start ( messenger );

  int n_messages = 10;
  int sent_count;

  /*------------------------------------------
    Put and send a message every 1 second.
  ------------------------------------------*/
  for ( sent_count = 0 ; sent_count < n_messages; ++ sent_count )
  {
    sleep ( 1 );
    pn_message_set_address ( message, addr );
    pn_message_set_subject ( message, subject );

    /* The same message object is reused on every pass, so empty the
       body first; otherwise each pass adds one more string to whatever
       the previous passes already put there. */
    pn_data_t *body = pn_message_body(message);
    pn_data_clear ( body );
    pn_data_put_string(body, pn_bytes(strlen(content), content));

    int err = pn_messenger_put(messenger, message);
    if ( err )
      fprintf ( stderr, "pn_messenger_put failed (error %d)\n", err );

    fprintf ( stderr, "MDEBUG send\n" );
    err = pn_messenger_send(messenger);
    if ( err )
      fprintf ( stderr, "pn_messenger_send failed (error %d)\n", err );

    fprintf ( stderr, "sent %d messages.\n", sent_count + 1 );
  }


  // Countdown to stop, to give me time to see it ....
  fprintf ( stderr, "Calling stop in ...\n" );
  int i;
  for ( i = 5; i > 0; -- i )
  {
    fprintf ( stderr, "%d\n", i );
    sleep ( 1 );
  }
  fprintf ( stderr, "stop.\n");

  pn_messenger_stop(messenger);
  pn_messenger_free(messenger);
  pn_message_free(message);

  return 0;
}






------------------- end sender ----------------------------





----- Original Message -----
From: "Rafael Schloming" <r...@alum.mit.edu>
To: proton@qpid.apache.org
Sent: Friday, February 15, 2013 9:15:38 AM
Subject: Re: bug? interaction between two senders

Can you post your code?

--Rafael

On Fri, Feb 15, 2013 at 5:22 AM, Michael Goulish <mgoul...@redhat.com>wrote:

>
> Have I found a bug ?
>
>
> scenario
> {
>   receiver
>   {
>     I start a receiver and it subscribes to ports 6666 and 6667.
>     In a loop, it starts trying to recv messages.  I set timeout
>     to 700 msec, so it keeps looping and tells me whenever it
>     gets a message.
>   }
>
>
>   senders
>   {
>     From two separate shells, I start two senders simultaneously,
>     node A sending to 6666, and node B sending to 6667.
>     Both senders will send 10 messages.
>   }
>
>
>   great expectation
>   {
>     I want to see the receiver's print-outs indicating that it
>     is receiving messages from both senders, interleaved.
>   }
>
>
>   grim reality
>   {
>     The receiver gets messages only from sender A until
>     sender A stops.
>     Then the receiver gets all the messages from sender B.
>     Sender B hangs at its first call to pn_messenger_send()
>     until sender A calls stop().
>   }
>
>
>   comment
>   {
>     uh-oh.
>   }
>
> }
>
>
>

Reply via email to