I think this is the same bug we've seen before when passing a fixed
(positive) credit limit to recv. The implementation isn't smart enough to
pay attention to which senders are actually offering messages when it
allocates credit, so it ends up giving all of its credit to a sender that
has no use for it instead of to the senders that are blocked. I suspect
that if you replace the 3 with -1 in your call to pn_messenger_recv, you
will see the hang go away.

--Rafael


On Thu, Apr 4, 2013 at 3:06 PM, Michael Goulish <mgoul...@redhat.com> wrote:

> OK, I'm looking at the trace from the receiver, and I thought
> I would post it here so I can't be accused of hogging
> all the fun for myself.
>
> ( Remember, three senders all send to same receiver address,
>   only two get 'accepted' replies.  Last sender ends up hanging in send(),
>   while receiver (in infinite loop) blocks on recv(). )
>
> I have marked the lines of application output with "APPLICATION OUTPUT:"
>
>
> Note:
>
> I see these 3 lines:
>   Accepted from localhost:42468
>   Accepted from localhost:42469
>   Accepted from localhost:42470
>
> But only two get closed:
>   Closed localhost:42468
>   Closed localhost:42469
>
>
>
>
> ----------------- begin trace -----------------------
> Listening on 0.0.0.0:6666
> APPLICATION OUTPUT:   receiving...
> Accepted from localhost:42468
> Accepted from localhost:42469
>     <- SASL
> [0x25013c0:0] <- SASL-INIT @65 [:ANONYMOUS, b""]
> [0x25013c0:0] -> SASL-MECHANISMS @64 [@PN_SYMBOL[:ANONYMOUS]]
> [0x25013c0:0] -> SASL-OUTCOME @68 [0]
>     -> SASL
>     -> AMQP
> [0x24fae10:0] -> OPEN @16 ["a03b1f27-5053-47f0-ae85-c543782480b5", null,
> null, null, null, null, null, null, null]
> Accepted from localhost:42470
>     <- SASL
> [0x253f490:0] <- SASL-INIT @65 [:ANONYMOUS, b""]
> [0x253f490:0] -> SASL-MECHANISMS @64 [@PN_SYMBOL[:ANONYMOUS]]
> [0x253f490:0] -> SASL-OUTCOME @68 [0]
>     -> SASL
>     -> AMQP
> [0x2538e40:0] -> OPEN @16 ["a03b1f27-5053-47f0-ae85-c543782480b5", null,
> null, null, null, null, null, null, null]
>     <- AMQP
> [0x24fae10:0] <- OPEN @16 ["1425753e-bda0-48af-a60f-b8a23c0933d3",
> "0.0.0.0", null, null, null, null, null, null, null]
> [0x24fae10:1] <- BEGIN @17 [null, 0, 1024, 1024]
> [0x24fae10:1] <- ATTACH @18 ["sender-xxx", 1, false, null, null, @40
> [null, 0, null, 0, false, null, null, null, null, null, null], @41 [null,
> 0, null, 0, false, null, null], null, null, 0]
> [0x24fae10:1] -> BEGIN @17 [1, 0, 1024, 1024]
> [0x24fae10:1] -> ATTACH @18 ["sender-xxx", 1, true, null, null, null,
> null, null, null, 0]
> [0x24fae10:1] -> FLOW @19 [0, 1024, 0, 1024, 1, 0, 3, null, false]
>     <- SASL
> [0x2563350:0] <- SASL-INIT @65 [:ANONYMOUS, b""]
> [0x2563350:0] -> SASL-MECHANISMS @64 [@PN_SYMBOL[:ANONYMOUS]]
> [0x2563350:0] -> SASL-OUTCOME @68 [0]
>     -> SASL
>     -> AMQP
> [0x255cd00:0] -> OPEN @16 ["a03b1f27-5053-47f0-ae85-c543782480b5", null,
> null, null, null, null, null, null, null]
>     <- AMQP
> [0x2538e40:0] <- OPEN @16 ["35806640-4a26-47a2-a6e2-7fe7505938cf",
> "0.0.0.0", null, null, null, null, null, null, null]
> [0x2538e40:1] <- BEGIN @17 [null, 0, 1024, 1024]
> [0x2538e40:1] <- ATTACH @18 ["sender-xxx", 1, false, null, null, @40
> [null, 0, null, 0, false, null, null, null, null, null, null], @41 [null,
> 0, null, 0, false, null, null], null, null, 0]
> [0x2538e40:1] -> BEGIN @17 [1, 0, 1024, 1024]
> [0x2538e40:1] -> ATTACH @18 ["sender-xxx", 1, true, null, null, null,
> null, null, null, 0]
> [0x2538e40:1] -> FLOW @19 [0, 1024, 0, 1024, 1, 0, 0, null, false]
>     <- AMQP
> [0x255cd00:0] <- OPEN @16 ["c8b87edf-6971-4d73-9790-e6f44772cebb",
> "0.0.0.0", null, null, null, null, null, null, null]
> [0x255cd00:1] <- BEGIN @17 [null, 0, 1024, 1024]
> [0x255cd00:1] <- ATTACH @18 ["sender-xxx", 1, false, null, null, @40
> [null, 0, null, 0, false, null, null, null, null, null, null], @41 [null,
> 0, null, 0, false, null, null], null, null, 0]
> [0x255cd00:1] -> BEGIN @17 [1, 0, 1024, 1024]
> [0x255cd00:1] -> ATTACH @18 ["sender-xxx", 1, true, null, null, null,
> null, null, null, 0]
> [0x255cd00:1] -> FLOW @19 [0, 1024, 0, 1024, 1, 0, 0, null, false]
> [0x24fae10:1] <- TRANSFER @20 [1, 0, b"\x00\x00\x00\x00\x00\x00\x00\x00",
> 0, false, false] (148) "\x00Sp\xd0\x00\x00\x00\x0b\x00\x00\x00\x05BP\x04@BR
> \x00\x00Ss\xd0\x00\x00\x00b\x00\x00\x00\x0d@@\xa1\x13amqp://0.0.0.0:6666
> @\xa1+amqp://1425753e-bda0-48af-a60f-b8a23c0933d3@
> @@\x83\x00\x00\x00\x00\x00\x00\x00\x00\x83\x00\x00\x00\x00\x00\x00\x00\x00@R
> \x00@\x00Sw\xa1\x12Message from 22470"
> APPLICATION OUTPUT:   getting message...
> APPLICATION OUTPUT:   Address: amqp://0.0.0.0:6666
> APPLICATION OUTPUT:   Content: "Message from 22470"
> APPLICATION OUTPUT:   receiving...
> [0x24fae10:1] -> DISPOSITION @21 [true, 0, 0, false, @36 []]
> [0x2538e40:1] -> FLOW @19 [0, 1024, 0, 1024, 1, 0, 1, null, false]
> [0x24fae10:1] <- DISPOSITION @21 [false, 0, 0, false, @36 []]
> [0x2538e40:1] <- TRANSFER @20 [1, 0, b"\x00\x00\x00\x00\x00\x00\x00\x00",
> 0, false, false] (148) "\x00Sp\xd0\x00\x00\x00\x0b\x00\x00\x00\x05BP\x04@BR
> \x00\x00Ss\xd0\x00\x00\x00b\x00\x00\x00\x0d@@\xa1\x13amqp://0.0.0.0:6666
> @\xa1+amqp://35806640-4a26-47a2-a6e2-7fe7505938cf@
> @@\x83\x00\x00\x00\x00\x00\x00\x00\x00\x83\x00\x00\x00\x00\x00\x00\x00\x00@R
> \x00@\x00Sw\xa1\x12Message from 22469"
> APPLICATION OUTPUT:   getting message...
> APPLICATION OUTPUT:   Address: amqp://0.0.0.0:6666
> APPLICATION OUTPUT:   Content: "Message from 22469"
> APPLICATION OUTPUT:   receiving...
> [0x2538e40:1] -> FLOW @19 [1, 1023, 0, 1024, 1, 1, 1, null, false]
> [0x2538e40:1] -> DISPOSITION @21 [true, 0, 0, false, @36 []]
> [0x2538e40:1] <- DISPOSITION @21 [false, 0, 0, false, @36 []]
> [0x24fae10:1] <- DETACH @22 [1, true, null]
> [0x24fae10:0] <- CLOSE @24 [null]
> [0x24fae10:0] <- EOS
> [0x24fae10:1] -> DETACH @22 [1, true, null]
> [0x24fae10:0] -> CLOSE @24 [null]
> [0x24fae10:0] -> EOS
> Closed localhost:42468
> [0x2538e40:1] <- DETACH @22 [1, true, null]
> [0x2538e40:0] <- CLOSE @24 [null]
> [0x2538e40:0] <- EOS
> [0x2538e40:1] -> DETACH @22 [1, true, null]
> [0x2538e40:0] -> CLOSE @24 [null]
> [0x2538e40:0] -> EOS
> Closed localhost:42469
>
> ----------------- end trace -------------------------
>
>
>
>
>
>
>
>
>
>
> ----- Original Message -----
> Any clues from a trace of the receiver?
>
> $ PN_TRACE_FRM=1 ./receiver 6666
>
> -Ted
>
> On 04/04/2013 02:09 PM, Michael Goulish wrote:
> >
> >    Is this a bug, or am I  Doing  Something  Wrong ?
> >
> >
> >
> > Scenario
> > {
> >    My sender sends a single message, and hopes to see
> >    that the receiver has accepted it.
> >
> >    I launch 3 copies of the sender very close together--
> >    they all talk to the same address.
> >
> >    My receiver receives in a loop, accepts every message
> >    that it receives.
> > }
> >
> >
> >
> >
> > Result
> > {
> >    Sometimes my receiver gets 1 of the 3 messages.
> >    Usually it gets 2.
> >    It never gets all 3.
> >
> >    The 3rd sender hangs in pn_messenger_send().
> >
> >    While the 3rd sender is hanging in send(), the receiver
> >    is patiently waiting in recv().
> > }
> >
> >
> >
> >
> >
> >
> > Sender Code ############################################
> >
> > /*
> >    Launch 3 of these from a script like so:
> >    ./sender 6666 &
> >    ./sender 6666 &
> >    ./sender 6666 &
> > */
> >
> >
> > #include "proton/message.h"
> > #include "proton/messenger.h"
> >
> > #include <getopt.h>
> > #include <stdio.h>
> > #include <stdlib.h>
> > #include <string.h>
> > #include <ctype.h>
> >
> >
> > char *
> > status_2_str ( pn_status_t status )
> > {
> >    switch ( status )
> >    {
> >      case PN_STATUS_UNKNOWN:
> >        return "unknown";
> >        break;
> >
> >      case PN_STATUS_PENDING:
> >        return "pending";
> >        break;
> >
> >      case PN_STATUS_ACCEPTED:
> >        return "accepted";
> >        break;
> >
> >      case PN_STATUS_REJECTED:
> >        return "rejected";
> >        break;
> >
> >      default:
> >        return "bad value";
> >        break;
> >    }
> > }
> >
> >
> >
> > pid_t my_pid = 0;
> >
> >
> > void
> > check ( char * label, int result )
> > {
> >    fprintf ( stderr, "%d %s result: %d\n", my_pid, label, result );
> > }
> >
> >
> >
> > int
> > main(int argc, char** argv)
> > {
> >    int c;
> >    char addr [ 1000 ];
> >    char msgtext [ 100 ];
> >    pn_message_t   * message;
> >    pn_messenger_t * messenger;
> >    pn_data_t      * body;
> >    pn_tracker_t     tracker;
> >    pn_status_t      status;
> >    int              result;
> >
> >    my_pid = getpid();
> >
> >    sprintf ( addr, "amqp://0.0.0.0:%s", argv[1] );
> >
> >
> >    message = pn_message ( );
> >    messenger = pn_messenger ( NULL );
> >    pn_messenger_start ( messenger ) ;
> >    pn_messenger_set_outgoing_window ( messenger, 1 );
> >
> >
> >    pn_message_set_address ( message, addr );
> >    body = pn_message_body ( message );
> >
> >
> >    sprintf ( msgtext, "Message from %d", getpid() );
> >    pn_data_put_string ( body, pn_bytes ( strlen ( msgtext ), msgtext ));
> >    pn_messenger_put ( messenger, message );
> >    tracker = pn_messenger_outgoing_tracker ( messenger );
> >    pn_messenger_send ( messenger );
> >
> >
> >    status = pn_messenger_status ( messenger, tracker );
> >    fprintf ( stderr, "status : %s\n", status_2_str(status) );
> >
> >
> >    pn_messenger_stop ( messenger );
> >    pn_messenger_free ( messenger );
> >    pn_message_free ( message );
> >
> >    return 0;
> > }
> >
> >
> >
> >
> > Receiver Code ########################################################
> >
> > /*
> >
> >    Launch like this:
> >    ./receiver 6666
> > */
> >
> > #include <stdio.h>
> > #include <stdlib.h>
> > #include <ctype.h>
> >
> > #include "proton/message.h"
> > #include "proton/messenger.h"
> >
> >
> >
> > #define BUFSIZE 1024
> >
> >
> >
> > int
> > main(int argc, char** argv)
> > {
> >    size_t bufsize = BUFSIZE;
> >    char buffer [ BUFSIZE ];
> >    char addr [ 1000 ];
> >    pn_message_t   * message;
> >    pn_messenger_t * messenger;
> >    pn_data_t      * body;
> >    pn_tracker_t     tracker;
> >
> >
> >    sprintf ( addr, "amqp://~0.0.0.0:%s", argv[1] );
> >
> >    message = pn_message();
> >    messenger = pn_messenger ( NULL );
> >
> >    pn_messenger_start(messenger);
> >    pn_messenger_subscribe ( messenger, addr );
> >    pn_messenger_set_incoming_window ( messenger, 5 );
> >
> >    /*---------------------------------
> >      Receive and accept the message.
> >    ---------------------------------*/
> >    while ( 1 )
> >    {
> >      fprintf ( stderr, "receiving...\n" );
> >      pn_messenger_recv ( messenger, 3 );
> >
> >      while ( pn_messenger_incoming ( messenger ) > 0 )
> >      {
> >        fprintf ( stderr, "getting message...\n" );
> >        pn_messenger_get ( messenger, message );
> >        tracker = pn_messenger_incoming_tracker ( messenger );
> >        pn_messenger_accept ( messenger, tracker, 0 );
> >        body = pn_message_body ( message );
> >        pn_data_format ( body, buffer, & bufsize );
> > >        fprintf ( stdout, "Address: %s\n", pn_message_get_address ( message ) );
> >        fprintf ( stdout, "Content: %s\n", buffer);
> >      }
> >    }
> >
> >    pn_messenger_stop(messenger);
> >    pn_messenger_free(messenger);
> >
> >    return 0;
> > }
> >
> >
>
>

Reply via email to