Yes, I'll post code below -- but I just talked to Ted, and it turns out I have
rediscovered PROTON-200, "Credit distribution by messenger is not balanced
across all links." The first sender gets all the credit, and the next sender
doesn't get any until the first sender calls stop.
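To run (the receiver reads the port from its first argument, the sender from
its second argument):
1. start the receiver this way: ./receiver PORT
2. in a second window,
start sender A this way: ./sender A PORT
3. in a third window, within a few seconds,
start sender B this way: ./sender B PORT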
result:
receiver gets all 10 messages ( 1 per second ) from
sender A. While it is receiving those, sender B hangs
at its first call to send(). Messages start coming from
sender B only after sender A calls stop().
-- start receiver
#include <proton/message.h>
#include <proton/messenger.h>
#include <stdio.h>
#include <stdlib.h>
#include <ctype.h>
#define BUFSIZE 1024
/*
 * Take n messages off the messenger's incoming queue and print the
 * address, subject and formatted body of each one to stdout.
 *
 * messenger: messenger whose incoming queue is read.
 * n:         number of messages to get; main() passes the count it read
 *            from pn_messenger_incoming().
 * message:   scratch message object that every pn_messenger_get() call
 *            overwrites.
 *
 * Stops early, after reporting on stderr, if pn_messenger_get() returns
 * a non-zero code.
 */
void
consume_messages ( pn_messenger_t * messenger, int n, pn_message_t * message )
{
  int consume_count;

  for ( consume_count = 0; consume_count < n; ++ consume_count )
  {
    int err = pn_messenger_get ( messenger, message );
    if ( err )
    {
      fprintf ( stderr, "pn_messenger_get failed: %d\n", err );
      return;
    }

    /*
     * Zero-initialised, and one byte is held back from the formatter, so
     * the buffer is always a valid C string for the printf below even if
     * formatting fails or fills the space it was given.
     */
    char buffer [ BUFSIZE ] = { 0 };
    size_t bufsize = sizeof buffer - 1;
    pn_data_t * body = pn_message_body ( message );

    /* pn_data_format takes the buffer size by pointer. */
    pn_data_format ( body, buffer, & bufsize );

    printf ( "\n\nMessage \n" );
    printf ( "Address: %s\n", pn_message_get_address ( message ) );
    char const * subject = pn_message_get_subject ( message );
    printf ( "Subject: %s\n", subject ? subject : "(no subject)" );
    printf ( "Content: %s\n\n", buffer );
  }
}
int
main(int argc, char** argv)
{
char addr [ 1000 ];
sprintf ( addr, amqp://~0.0.0.0:%s, argv[1] );
pn_message_t * message;
pn_messenger_t * messenger;
message = pn_message ( );
messenger = pn_messenger ( NULL );
pn_messenger_start ( messenger );
/*
Subscribe to two addresses.
*/
pn_messenger_subscribe ( messenger, addr );
int messages_wanted = 20;
int total_received = 0;
int received_this_time;
pn_messenger_set_timeout ( messenger, 700 );
int tries = 0;
while ( total_received messages_wanted )
{
++ tries;
pn_messenger_recv ( messenger, messages_wanted );
received_this_time = pn_messenger_incoming ( messenger );
total_received += received_this_time;
fprintf ( stderr,
try: %d received: %d total: %d\n,
tries,
received_this_time,
total_received
);
consume_messages ( messenger, received_this_time, message );
}
pn_messenger_stop(messenger);
pn_messenger_free(messenger);
return 0;
}
-end receiver --
--- start sender -
#include <proton/message.h>
#include <proton/messenger.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
int
main(int argc, char** argv)
{
int c;
char addr [ 1000 ];
char content [ 1000 ];
char subject [ 1000 ];
char * name;
sprintf ( addr, amqp://0.0.0.0:%s, argv[2] );
pn_message_t * message;
pn_messenger_t * messenger;
message = pn_message();
messenger = pn_messenger ( name = argv[1] );
pn_messenger_set_outgoing_window ( messenger, 1 );
fprintf ( stderr, MDEBUG start \n );
pn_messenger_start ( messenger );
int n_messages = 10;
int sent_count;
/*--
Put and send a message every 1 second.
--*/
for ( sent_count = 0 ; sent_count n_messages; ++ sent_count )
{
sleep ( 1 );
sprintf ( subject, from node %s, name );
pn_message_set_address ( message, addr );
pn_message_set_subject ( message, subject );
pn_data_t *body = pn_message_body(message);
sprintf ( content, Hello, Proton! );
pn_data_put_string(body, pn_bytes(strlen(content), content));
pn_messenger_put(messenger, message);
fprintf ( stderr, MDEBUG send\n );
pn_messenger_send(messenger);
fprintf ( stderr, sent %d messages.\n, sent_count + 1 );
}
// Countdown to stop, to give me time to see it
fprintf ( stderr, Calling stop in ...\n );
int i;
for ( i = 5; i 0; -- i )
{
fprintf ( stderr, %d\n, i );
sleep ( 1 );
}
fprintf ( stderr, stop.\n);
pn_messenger_stop(messenger);
pn_messenger_free(messenger);
pn_message_free(message);
return 0;
}
--- end sender
- Original Message -
From: Rafael Schloming r...@alum.mit.edu
To: proton@qpid.apache.org
Sent: Friday, February 15, 2013 9:15:38 AM
Subject: Re: bug? interaction between two senders
Can you post your code?
--Rafael
On Fri, Feb 15, 2013 at 5:22 AM, Michael Goulish mgoul...@redhat.com wrote:
Have I found a bug ?
scenario
{
receiver
{
I start a receiver and it subscribes to ports and 6667