Yes!   -1 did it.  Thanks!


----- Original Message -----
> I think this is the same bug we've seen before with passing fixed
> (positive) credit limits to recv. The implementation isn't smart enough to
> pay attention to who actually is offering messages when it allocates
> credit, and so it ends up giving out all of its credit to a sender that has
> no use for it instead of to the senders that are blocked. I suspect if you
> replace your 3 with -1 in your call to pn_messenger_recv, then you will see
> the hang go away.
> 
> --Rafael
> 
> 
> On Thu, Apr 4, 2013 at 3:06 PM, Michael Goulish <mgoul...@redhat.com> wrote:
> 
> > OK, I'm looking at trace from receiver, and I thought
> > I would post it here so I can't be accused of hogging
> > all the fun for myself.
> >
> > ( Remember, three senders all send to same receiver address,
> >   only two get 'accepted' replies.  Last sender ends up hanging in send(),
> >   while receiver (in infinite loop) blocks on recv(). )
> >
> > I have marked the lines of application output with "APPLICATION OUTPUT:"
> >
> >
> > Note:
> >
> > I see these 3 lines:
> >   Accepted from localhost:42468
> >   Accepted from localhost:42469
> >   Accepted from localhost:42470
> >
> > But only two get closed:
> >   Closed localhost:42468
> >   Closed localhost:42469
> >
> >
> >
> >
> > ----------------- begin trace -----------------------
> > Listening on 0.0.0.0:6666
> > APPLICATION OUTPUT:   receiving...
> > Accepted from localhost:42468
> > Accepted from localhost:42469
> >     <- SASL
> > [0x25013c0:0] <- SASL-INIT @65 [:ANONYMOUS, b""]
> > [0x25013c0:0] -> SASL-MECHANISMS @64 [@PN_SYMBOL[:ANONYMOUS]]
> > [0x25013c0:0] -> SASL-OUTCOME @68 [0]
> >     -> SASL
> >     -> AMQP
> > [0x24fae10:0] -> OPEN @16 ["a03b1f27-5053-47f0-ae85-c543782480b5", null,
> > null, null, null, null, null, null, null]
> > Accepted from localhost:42470
> >     <- SASL
> > [0x253f490:0] <- SASL-INIT @65 [:ANONYMOUS, b""]
> > [0x253f490:0] -> SASL-MECHANISMS @64 [@PN_SYMBOL[:ANONYMOUS]]
> > [0x253f490:0] -> SASL-OUTCOME @68 [0]
> >     -> SASL
> >     -> AMQP
> > [0x2538e40:0] -> OPEN @16 ["a03b1f27-5053-47f0-ae85-c543782480b5", null,
> > null, null, null, null, null, null, null]
> >     <- AMQP
> > [0x24fae10:0] <- OPEN @16 ["1425753e-bda0-48af-a60f-b8a23c0933d3",
> > "0.0.0.0", null, null, null, null, null, null, null]
> > [0x24fae10:1] <- BEGIN @17 [null, 0, 1024, 1024]
> > [0x24fae10:1] <- ATTACH @18 ["sender-xxx", 1, false, null, null, @40
> > [null, 0, null, 0, false, null, null, null, null, null, null], @41 [null,
> > 0, null, 0, false, null, null], null, null, 0]
> > [0x24fae10:1] -> BEGIN @17 [1, 0, 1024, 1024]
> > [0x24fae10:1] -> ATTACH @18 ["sender-xxx", 1, true, null, null, null,
> > null, null, null, 0]
> > [0x24fae10:1] -> FLOW @19 [0, 1024, 0, 1024, 1, 0, 3, null, false]
> >     <- SASL
> > [0x2563350:0] <- SASL-INIT @65 [:ANONYMOUS, b""]
> > [0x2563350:0] -> SASL-MECHANISMS @64 [@PN_SYMBOL[:ANONYMOUS]]
> > [0x2563350:0] -> SASL-OUTCOME @68 [0]
> >     -> SASL
> >     -> AMQP
> > [0x255cd00:0] -> OPEN @16 ["a03b1f27-5053-47f0-ae85-c543782480b5", null,
> > null, null, null, null, null, null, null]
> >     <- AMQP
> > [0x2538e40:0] <- OPEN @16 ["35806640-4a26-47a2-a6e2-7fe7505938cf",
> > "0.0.0.0", null, null, null, null, null, null, null]
> > [0x2538e40:1] <- BEGIN @17 [null, 0, 1024, 1024]
> > [0x2538e40:1] <- ATTACH @18 ["sender-xxx", 1, false, null, null, @40
> > [null, 0, null, 0, false, null, null, null, null, null, null], @41 [null,
> > 0, null, 0, false, null, null], null, null, 0]
> > [0x2538e40:1] -> BEGIN @17 [1, 0, 1024, 1024]
> > [0x2538e40:1] -> ATTACH @18 ["sender-xxx", 1, true, null, null, null,
> > null, null, null, 0]
> > [0x2538e40:1] -> FLOW @19 [0, 1024, 0, 1024, 1, 0, 0, null, false]
> >     <- AMQP
> > [0x255cd00:0] <- OPEN @16 ["c8b87edf-6971-4d73-9790-e6f44772cebb",
> > "0.0.0.0", null, null, null, null, null, null, null]
> > [0x255cd00:1] <- BEGIN @17 [null, 0, 1024, 1024]
> > [0x255cd00:1] <- ATTACH @18 ["sender-xxx", 1, false, null, null, @40
> > [null, 0, null, 0, false, null, null, null, null, null, null], @41 [null,
> > 0, null, 0, false, null, null], null, null, 0]
> > [0x255cd00:1] -> BEGIN @17 [1, 0, 1024, 1024]
> > [0x255cd00:1] -> ATTACH @18 ["sender-xxx", 1, true, null, null, null,
> > null, null, null, 0]
> > [0x255cd00:1] -> FLOW @19 [0, 1024, 0, 1024, 1, 0, 0, null, false]
> > [0x24fae10:1] <- TRANSFER @20 [1, 0, b"\x00\x00\x00\x00\x00\x00\x00\x00",
> > 0, false, false] (148) "\x00Sp\xd0\x00\x00\x00\x0b\x00\x00\x00\x05BP\x04@BR
> > \x00\x00Ss\xd0\x00\x00\x00b\x00\x00\x00\x0d@@\xa1\x13amqp://0.0.0.0:6666
> > @\xa1+amqp://1425753e-bda0-48af-a60f-b8a23c0933d3@
> > @@\x83\x00\x00\x00\x00\x00\x00\x00\x00\x83\x00\x00\x00\x00\x00\x00\x00\x00@R
> > \x00@\x00Sw\xa1\x12Message from 22470"
> > APPLICATION OUTPUT:   getting message...
> > APPLICATION OUTPUT:   Address: amqp://0.0.0.0:6666
> > APPLICATION OUTPUT:   Content: "Message from 22470"
> > APPLICATION OUTPUT:   receiving...
> > [0x24fae10:1] -> DISPOSITION @21 [true, 0, 0, false, @36 []]
> > [0x2538e40:1] -> FLOW @19 [0, 1024, 0, 1024, 1, 0, 1, null, false]
> > [0x24fae10:1] <- DISPOSITION @21 [false, 0, 0, false, @36 []]
> > [0x2538e40:1] <- TRANSFER @20 [1, 0, b"\x00\x00\x00\x00\x00\x00\x00\x00",
> > 0, false, false] (148) "\x00Sp\xd0\x00\x00\x00\x0b\x00\x00\x00\x05BP\x04@BR
> > \x00\x00Ss\xd0\x00\x00\x00b\x00\x00\x00\x0d@@\xa1\x13amqp://0.0.0.0:6666
> > @\xa1+amqp://35806640-4a26-47a2-a6e2-7fe7505938cf@
> > @@\x83\x00\x00\x00\x00\x00\x00\x00\x00\x83\x00\x00\x00\x00\x00\x00\x00\x00@R
> > \x00@\x00Sw\xa1\x12Message from 22469"
> > getting message...
> > APPLICATION OUTPUT:   Address: amqp://0.0.0.0:6666
> > APPLICATION OUTPUT:   Content: "Message from 22469
> > APPLICATION OUTPUT:   receiving...
> > [0x2538e40:1] -> FLOW @19 [1, 1023, 0, 1024, 1, 1, 1, null, false]
> > [0x2538e40:1] -> DISPOSITION @21 [true, 0, 0, false, @36 []]
> > [0x2538e40:1] <- DISPOSITION @21 [false, 0, 0, false, @36 []]
> > [0x24fae10:1] <- DETACH @22 [1, true, null]
> > [0x24fae10:0] <- CLOSE @24 [null]
> > [0x24fae10:0] <- EOS
> > [0x24fae10:1] -> DETACH @22 [1, true, null]
> > [0x24fae10:0] -> CLOSE @24 [null]
> > [0x24fae10:0] -> EOS
> > Closed localhost:42468
> > [0x2538e40:1] <- DETACH @22 [1, true, null]
> > [0x2538e40:0] <- CLOSE @24 [null]
> > [0x2538e40:0] <- EOS
> > [0x2538e40:1] -> DETACH @22 [1, true, null]
> > [0x2538e40:0] -> CLOSE @24 [null]
> > [0x2538e40:0] -> EOS
> > Closed localhost:42469
> >
> > ----------------- end trace -------------------------
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > ----- Original Message -----
> > Any clues from a trace of the receiver?
> >
> > $ PN_TRACE_FRM=1 ./receiver 6666
> >
> > -Ted
> >
> > On 04/04/2013 02:09 PM, Michael Goulish wrote:
> > >
> > >    Is this a bug, or am I  Doing  Something  Wrong ?
> > >
> > >
> > >
> > > Scenario
> > > {
> > >    My sender sends a single message, and hopes to see
> > >    that the receiver has accepted it.
> > >
> > >    I launch 3 copies of the sender very close together--
> > >    they all talk to the same address.
> > >
> > >    My receiver receives in a loop, accepts every message
> > >    that it receives.
> > > }
> > >
> > >
> > >
> > >
> > > Result
> > > {
> > >    Sometimes my receiver gets 1 of the 3 messages.
> > >    Usually it gets 2.
> > >    It never gets all 3.
> > >
> > >    The 3rd sender hangs in pn_messenger_send().
> > >
> > >    While the 3rd sender is hanging in send(), the receiver
> > >    is patiently waiting in recv().
> > > }
> > >
> > >
> > >
> > >
> > >
> > >
> > > Sender Code ############################################
> > >
> > > /*
> > >    Launch 3 of these from a script like so:
> > >    ./sender 6666 &
> > >    ./sender 6666 &
> > >    ./sender 6666 &
> > > */
> > >
> > >
> > > #include "proton/message.h"
> > > #include "proton/messenger.h"
> > >
> > > #include <getopt.h>
> > > #include <stdio.h>
> > > #include <stdlib.h>
> > > #include <string.h>
> > > #include <ctype.h>
> > >
> > >
> > > char *
> > > status_2_str ( pn_status_t status )
> > > {
> > >    switch ( status )
> > >    {
> > >      case PN_STATUS_UNKNOWN:
> > >        return "unknown";
> > >        break;
> > >
> > >      case PN_STATUS_PENDING:
> > >        return "pending";
> > >        break;
> > >
> > >      case PN_STATUS_ACCEPTED:
> > >        return "accepted";
> > >        break;
> > >
> > >      case PN_STATUS_REJECTED:
> > >        return "rejected";
> > >        break;
> > >
> > >      default:
> > >        return "bad value";
> > >        break;
> > >    }
> > > }
> > >
> > >
> > >
> > > pid_t my_pid = 0;
> > >
> > >
> > > void
> > > check ( char * label, int result )
> > > {
> > >    fprintf ( stderr, "%d %s result: %d\n", my_pid, label, result );
> > > }
> > >
> > >
> > >
> > > int
> > > main(int argc, char** argv)
> > > {
> > >    int c;
> > >    char addr [ 1000 ];
> > >    char msgtext [ 100 ];
> > >    pn_message_t   * message;
> > >    pn_messenger_t * messenger;
> > >    pn_data_t      * body;
> > >    pn_tracker_t     tracker;
> > >    pn_status_t      status;
> > >    int              result;
> > >
> > >    my_pid = getpid();
> > >
> > >    sprintf ( addr, "amqp://0.0.0.0:%s", argv[1] );
> > >
> > >
> > >    message = pn_message ( );
> > >    messenger = pn_messenger ( NULL );
> > >    pn_messenger_start ( messenger ) ;
> > >    pn_messenger_set_outgoing_window ( messenger, 1 );
> > >
> > >
> > >    pn_message_set_address ( message, addr );
> > >    body = pn_message_body ( message );
> > >
> > >
> > >    sprintf ( msgtext, "Message from %d", getpid() );
> > >    pn_data_put_string ( body, pn_bytes ( strlen ( msgtext ), msgtext ));
> > >    pn_messenger_put ( messenger, message );
> > >    tracker = pn_messenger_outgoing_tracker ( messenger );
> > >    pn_messenger_send ( messenger );
> > >
> > >
> > >    status = pn_messenger_status ( messenger, tracker );
> > >    fprintf ( stderr, "status : %s\n", status_2_str(status) );
> > >
> > >
> > >    pn_messenger_stop ( messenger );
> > >    pn_messenger_free ( messenger );
> > >    pn_message_free ( message );
> > >
> > >    return 0;
> > > }
> > >
> > >
> > >
> > >
> > > Receiver Code ########################################################
> > >
> > > /*
> > >
> > >    Launch like this:
> > >    ./receiver 6666
> > > */
> > >
> > > #include <stdio.h>
> > > #include <stdlib.h>
> > > #include <ctype.h>
> > >
> > > #include "proton/message.h"
> > > #include "proton/messenger.h"
> > >
> > >
> > >
> > > #define BUFSIZE 1024
> > >
> > >
> > >
> > > int
> > > main(int argc, char** argv)
> > > {
> > >    size_t bufsize = BUFSIZE;
> > >    char buffer [ BUFSIZE ];
> > >    char addr [ 1000 ];
> > >    pn_message_t   * message;
> > >    pn_messenger_t * messenger;
> > >    pn_data_t      * body;
> > >    pn_tracker_t     tracker;
> > >
> > >
> > >    sprintf ( addr, "amqp://~0.0.0.0:%s", argv[1] );
> > >
> > >    message = pn_message();
> > >    messenger = pn_messenger ( NULL );
> > >
> > >    pn_messenger_start(messenger);
> > >    pn_messenger_subscribe ( messenger, addr );
> > >    pn_messenger_set_incoming_window ( messenger, 5 );
> > >
> > >    /*---------------------------------
> > >      Receive and accept the message.
> > >    ---------------------------------*/
> > >    while ( 1 )
> > >    {
> > >      fprintf ( stderr, "receiving...\n" );
> > >      pn_messenger_recv ( messenger, 3 );
> > >
> > >      while ( pn_messenger_incoming ( messenger ) > 0 )
> > >      {
> > >        fprintf ( stderr, "getting message...\n" );
> > >        pn_messenger_get ( messenger, message );
> > >        tracker = pn_messenger_incoming_tracker ( messenger );
> > >        pn_messenger_accept ( messenger, tracker, 0 );
> > >        body = pn_message_body ( message );
> > >        pn_data_format ( body, buffer, & bufsize );
> > >        fprintf ( stdout, "Address: %s\n", pn_message_get_address (
> > message ) );
> > >        fprintf ( stdout, "Content: %s\n", buffer);
> > >      }
> > >    }
> > >
> > >    pn_messenger_stop(messenger);
> > >    pn_messenger_free(messenger);
> > >
> > >    return 0;
> > > }
> > >
> > >
> >
> >
> 

Reply via email to