Dang - hit send too soon:

https://issues.apache.org/jira/browse/PROTON-200

Fixing that issue depends on having the ability to revoke credit that has 
already been issued, which is still outstanding AFAIK.



----- Original Message -----
> From: "Ken Giusti" <kgiu...@redhat.com>
> To: proton@qpid.apache.org
> Sent: Thursday, April 4, 2013 5:46:04 PM
> Subject: Re: problem with multiple senders
> 
> FWIW: there's a JIRA that's tracking this:
> 
> 
> ----- Original Message -----
> > From: "Michael Goulish" <mgoul...@redhat.com>
> > To: proton@qpid.apache.org
> > Sent: Thursday, April 4, 2013 4:45:03 PM
> > Subject: Re: problem with multiple senders
> > 
> > Yes!   -1 did it.  Thanks!
> > 
> > 
> > 
> > ----- Original Message -----
> > > I think this is the same bug we've seen before with passing fixed
> > > (positive) credit limits to recv. The implementation isn't smart enough
> > > to
> > > pay attention to who actually is offering messages when it allocates
> > > credit, and so it ends up giving out all of its credit to a sender that
> > > has
> > > no use for it instead of to the senders that are blocked. I suspect if
> > > you
> > > replace your 3 with -1 in your call to pn_messenger_recv, then you will
> > > see
> > > the hang go away.
> > > 
> > > --Rafael
> > > 
> > > 
> > > On Thu, Apr 4, 2013 at 3:06 PM, Michael Goulish <mgoul...@redhat.com>
> > > wrote:
> > > 
> > > > OK, I'm looking at trace from receiver, and I thought
> > > > I would post it here so I can't be accused of hogging
> > > > all the fun for myself.
> > > >
> > > > ( Remember, three senders all send to same receiver address,
> > > >   only two get 'accepted' replies.  Last sender ends up hanging in
> > > >   send(),
> > > >   while receiver (in infinite loop) blocks on recv(). )
> > > >
> > > > I have marked the lines of application output with "APPLICATION
> > > > OUTPUT:"
> > > >
> > > >
> > > > Note:
> > > >
> > > > I see these 3 lines:
> > > >   Accepted from localhost:42468
> > > >   Accepted from localhost:42469
> > > >   Accepted from localhost:42470
> > > >
> > > > But only two get closed:
> > > >   Closed localhost:42468
> > > >   Closed localhost:42469
> > > >
> > > >
> > > >
> > > >
> > > > ----------------- begin trace -----------------------
> > > > Listening on 0.0.0.0:6666
> > > > APPLICATION OUTPUT:   receiving...
> > > > Accepted from localhost:42468
> > > > Accepted from localhost:42469
> > > >     <- SASL
> > > > [0x25013c0:0] <- SASL-INIT @65 [:ANONYMOUS, b""]
> > > > [0x25013c0:0] -> SASL-MECHANISMS @64 [@PN_SYMBOL[:ANONYMOUS]]
> > > > [0x25013c0:0] -> SASL-OUTCOME @68 [0]
> > > >     -> SASL
> > > >     -> AMQP
> > > > [0x24fae10:0] -> OPEN @16 ["a03b1f27-5053-47f0-ae85-c543782480b5",
> > > > null,
> > > > null, null, null, null, null, null, null]
> > > > Accepted from localhost:42470
> > > >     <- SASL
> > > > [0x253f490:0] <- SASL-INIT @65 [:ANONYMOUS, b""]
> > > > [0x253f490:0] -> SASL-MECHANISMS @64 [@PN_SYMBOL[:ANONYMOUS]]
> > > > [0x253f490:0] -> SASL-OUTCOME @68 [0]
> > > >     -> SASL
> > > >     -> AMQP
> > > > [0x2538e40:0] -> OPEN @16 ["a03b1f27-5053-47f0-ae85-c543782480b5",
> > > > null,
> > > > null, null, null, null, null, null, null]
> > > >     <- AMQP
> > > > [0x24fae10:0] <- OPEN @16 ["1425753e-bda0-48af-a60f-b8a23c0933d3",
> > > > "0.0.0.0", null, null, null, null, null, null, null]
> > > > [0x24fae10:1] <- BEGIN @17 [null, 0, 1024, 1024]
> > > > [0x24fae10:1] <- ATTACH @18 ["sender-xxx", 1, false, null, null, @40
> > > > [null, 0, null, 0, false, null, null, null, null, null, null], @41
> > > > [null,
> > > > 0, null, 0, false, null, null], null, null, 0]
> > > > [0x24fae10:1] -> BEGIN @17 [1, 0, 1024, 1024]
> > > > [0x24fae10:1] -> ATTACH @18 ["sender-xxx", 1, true, null, null, null,
> > > > null, null, null, 0]
> > > > [0x24fae10:1] -> FLOW @19 [0, 1024, 0, 1024, 1, 0, 3, null, false]
> > > >     <- SASL
> > > > [0x2563350:0] <- SASL-INIT @65 [:ANONYMOUS, b""]
> > > > [0x2563350:0] -> SASL-MECHANISMS @64 [@PN_SYMBOL[:ANONYMOUS]]
> > > > [0x2563350:0] -> SASL-OUTCOME @68 [0]
> > > >     -> SASL
> > > >     -> AMQP
> > > > [0x255cd00:0] -> OPEN @16 ["a03b1f27-5053-47f0-ae85-c543782480b5",
> > > > null,
> > > > null, null, null, null, null, null, null]
> > > >     <- AMQP
> > > > [0x2538e40:0] <- OPEN @16 ["35806640-4a26-47a2-a6e2-7fe7505938cf",
> > > > "0.0.0.0", null, null, null, null, null, null, null]
> > > > [0x2538e40:1] <- BEGIN @17 [null, 0, 1024, 1024]
> > > > [0x2538e40:1] <- ATTACH @18 ["sender-xxx", 1, false, null, null, @40
> > > > [null, 0, null, 0, false, null, null, null, null, null, null], @41
> > > > [null,
> > > > 0, null, 0, false, null, null], null, null, 0]
> > > > [0x2538e40:1] -> BEGIN @17 [1, 0, 1024, 1024]
> > > > [0x2538e40:1] -> ATTACH @18 ["sender-xxx", 1, true, null, null, null,
> > > > null, null, null, 0]
> > > > [0x2538e40:1] -> FLOW @19 [0, 1024, 0, 1024, 1, 0, 0, null, false]
> > > >     <- AMQP
> > > > [0x255cd00:0] <- OPEN @16 ["c8b87edf-6971-4d73-9790-e6f44772cebb",
> > > > "0.0.0.0", null, null, null, null, null, null, null]
> > > > [0x255cd00:1] <- BEGIN @17 [null, 0, 1024, 1024]
> > > > [0x255cd00:1] <- ATTACH @18 ["sender-xxx", 1, false, null, null, @40
> > > > [null, 0, null, 0, false, null, null, null, null, null, null], @41
> > > > [null,
> > > > 0, null, 0, false, null, null], null, null, 0]
> > > > [0x255cd00:1] -> BEGIN @17 [1, 0, 1024, 1024]
> > > > [0x255cd00:1] -> ATTACH @18 ["sender-xxx", 1, true, null, null, null,
> > > > null, null, null, 0]
> > > > [0x255cd00:1] -> FLOW @19 [0, 1024, 0, 1024, 1, 0, 0, null, false]
> > > > [0x24fae10:1] <- TRANSFER @20 [1, 0,
> > > > b"\x00\x00\x00\x00\x00\x00\x00\x00",
> > > > 0, false, false] (148)
> > > > "\x00Sp\xd0\x00\x00\x00\x0b\x00\x00\x00\x05BP\x04@BR
> > > > \x00\x00Ss\xd0\x00\x00\x00b\x00\x00\x00\x0d@@\xa1\x13amqp://0.0.0.0:6666
> > > > @\xa1+amqp://1425753e-bda0-48af-a60f-b8a23c0933d3@
> > > > @@\x83\x00\x00\x00\x00\x00\x00\x00\x00\x83\x00\x00\x00\x00\x00\x00\x00\x00@R
> > > > \x00@\x00Sw\xa1\x12Message from 22470"
> > > > APPLICATION OUTPUT:   getting message...
> > > > APPLICATION OUTPUT:   Address: amqp://0.0.0.0:6666
> > > > APPLICATION OUTPUT:   Content: "Message from 22470"
> > > > APPLICATION OUTPUT:   receiving...
> > > > [0x24fae10:1] -> DISPOSITION @21 [true, 0, 0, false, @36 []]
> > > > [0x2538e40:1] -> FLOW @19 [0, 1024, 0, 1024, 1, 0, 1, null, false]
> > > > [0x24fae10:1] <- DISPOSITION @21 [false, 0, 0, false, @36 []]
> > > > [0x2538e40:1] <- TRANSFER @20 [1, 0,
> > > > b"\x00\x00\x00\x00\x00\x00\x00\x00",
> > > > 0, false, false] (148)
> > > > "\x00Sp\xd0\x00\x00\x00\x0b\x00\x00\x00\x05BP\x04@BR
> > > > \x00\x00Ss\xd0\x00\x00\x00b\x00\x00\x00\x0d@@\xa1\x13amqp://0.0.0.0:6666
> > > > @\xa1+amqp://35806640-4a26-47a2-a6e2-7fe7505938cf@
> > > > @@\x83\x00\x00\x00\x00\x00\x00\x00\x00\x83\x00\x00\x00\x00\x00\x00\x00\x00@R
> > > > \x00@\x00Sw\xa1\x12Message from 22469"
> > > > > APPLICATION OUTPUT:   getting message...
> > > > APPLICATION OUTPUT:   Address: amqp://0.0.0.0:6666
> > > > APPLICATION OUTPUT:   Content: "Message from 22469
> > > > APPLICATION OUTPUT:   receiving...
> > > > [0x2538e40:1] -> FLOW @19 [1, 1023, 0, 1024, 1, 1, 1, null, false]
> > > > [0x2538e40:1] -> DISPOSITION @21 [true, 0, 0, false, @36 []]
> > > > [0x2538e40:1] <- DISPOSITION @21 [false, 0, 0, false, @36 []]
> > > > [0x24fae10:1] <- DETACH @22 [1, true, null]
> > > > [0x24fae10:0] <- CLOSE @24 [null]
> > > > [0x24fae10:0] <- EOS
> > > > [0x24fae10:1] -> DETACH @22 [1, true, null]
> > > > [0x24fae10:0] -> CLOSE @24 [null]
> > > > [0x24fae10:0] -> EOS
> > > > Closed localhost:42468
> > > > [0x2538e40:1] <- DETACH @22 [1, true, null]
> > > > [0x2538e40:0] <- CLOSE @24 [null]
> > > > [0x2538e40:0] <- EOS
> > > > [0x2538e40:1] -> DETACH @22 [1, true, null]
> > > > [0x2538e40:0] -> CLOSE @24 [null]
> > > > [0x2538e40:0] -> EOS
> > > > Closed localhost:42469
> > > >
> > > > ----------------- end trace -------------------------
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > ----- Original Message -----
> > > > Any clues from a trace of the receiver?
> > > >
> > > > $ PN_TRACE_FRM=1 ./receiver 6666
> > > >
> > > > -Ted
> > > >
> > > > On 04/04/2013 02:09 PM, Michael Goulish wrote:
> > > > >
> > > > >    Is this a bug, or am I  Doing  Something  Wrong ?
> > > > >
> > > > >
> > > > >
> > > > > Scenario
> > > > > {
> > > > >    My sender sends a single message, and hopes to see
> > > > >    that the receiver has accepted it.
> > > > >
> > > > >    I launch 3 copies of the sender very close together--
> > > > >    they all talk to the same address.
> > > > >
> > > > >    My receiver receives in a loop, accepts every message
> > > > >    that it receives.
> > > > > }
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > Result
> > > > > {
> > > > >    Sometimes my receiver gets 1 of the 3 messages.
> > > > >    Usually it gets 2.
> > > > >    It never gets all 3.
> > > > >
> > > > >    The 3rd sender hangs in pn_messenger_send().
> > > > >
> > > > >    While the 3rd sender is hanging in send(), the receiver
> > > > >    is patiently waiting in recv().
> > > > > }
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > Sender Code ############################################
> > > > >
> > > > > /*
> > > > >    Launch 3 of these from a script like so:
> > > > >    ./sender 6666 &
> > > > >    ./sender 6666 &
> > > > >    ./sender 6666 &
> > > > > */
> > > > >
> > > > >
> > > > > #include "proton/message.h"
> > > > > #include "proton/messenger.h"
> > > > >
> > > > > #include <getopt.h>
> > > > > #include <stdio.h>
> > > > > #include <stdlib.h>
> > > > > #include <string.h>
> > > > > #include <ctype.h>
> > > > >
> > > > >
> > > > > char *
> > > > > status_2_str ( pn_status_t status )
> > > > > {
> > > > >    switch ( status )
> > > > >    {
> > > > >      case PN_STATUS_UNKNOWN:
> > > > >        return "unknown";
> > > > >        break;
> > > > >
> > > > >      case PN_STATUS_PENDING:
> > > > >        return "pending";
> > > > >        break;
> > > > >
> > > > >      case PN_STATUS_ACCEPTED:
> > > > >        return "accepted";
> > > > >        break;
> > > > >
> > > > >      case PN_STATUS_REJECTED:
> > > > >        return "rejected";
> > > > >        break;
> > > > >
> > > > >      default:
> > > > >        return "bad value";
> > > > >        break;
> > > > >    }
> > > > > }
> > > > >
> > > > >
> > > > >
> > > > > pid_t my_pid = 0;
> > > > >
> > > > >
> > > > > void
> > > > > check ( char * label, int result )
> > > > > {
> > > > >    fprintf ( stderr, "%d %s result: %d\n", my_pid, label, result );
> > > > > }
> > > > >
> > > > >
> > > > >
> > > > > int
> > > > > main(int argc, char** argv)
> > > > > {
> > > > >    int c;
> > > > >    char addr [ 1000 ];
> > > > >    char msgtext [ 100 ];
> > > > >    pn_message_t   * message;
> > > > >    pn_messenger_t * messenger;
> > > > >    pn_data_t      * body;
> > > > >    pn_tracker_t     tracker;
> > > > >    pn_status_t      status;
> > > > >    int              result;
> > > > >
> > > > >    my_pid = getpid();
> > > > >
> > > > >    sprintf ( addr, "amqp://0.0.0.0:%s", argv[1] );
> > > > >
> > > > >
> > > > >    message = pn_message ( );
> > > > >    messenger = pn_messenger ( NULL );
> > > > >    pn_messenger_start ( messenger ) ;
> > > > >    pn_messenger_set_outgoing_window ( messenger, 1 );
> > > > >
> > > > >
> > > > >    pn_message_set_address ( message, addr );
> > > > >    body = pn_message_body ( message );
> > > > >
> > > > >
> > > > >    sprintf ( msgtext, "Message from %d", getpid() );
> > > > >    pn_data_put_string ( body, pn_bytes ( strlen ( msgtext ), msgtext
> > > > >    ));
> > > > >    pn_messenger_put ( messenger, message );
> > > > >    tracker = pn_messenger_outgoing_tracker ( messenger );
> > > > >    pn_messenger_send ( messenger );
> > > > >
> > > > >
> > > > >    status = pn_messenger_status ( messenger, tracker );
> > > > >    fprintf ( stderr, "status : %s\n", status_2_str(status) );
> > > > >
> > > > >
> > > > >    pn_messenger_stop ( messenger );
> > > > >    pn_messenger_free ( messenger );
> > > > >    pn_message_free ( message );
> > > > >
> > > > >    return 0;
> > > > > }
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > Receiver Code
> > > > > ########################################################
> > > > >
> > > > > /*
> > > > >
> > > > >    Launch like this:
> > > > >    ./receiver 6666
> > > > > */
> > > > >
> > > > > #include <stdio.h>
> > > > > #include <stdlib.h>
> > > > > #include <ctype.h>
> > > > >
> > > > > #include "proton/message.h"
> > > > > #include "proton/messenger.h"
> > > > >
> > > > >
> > > > >
> > > > > #define BUFSIZE 1024
> > > > >
> > > > >
> > > > >
> > > > > int
> > > > > main(int argc, char** argv)
> > > > > {
> > > > >    size_t bufsize = BUFSIZE;
> > > > >    char buffer [ BUFSIZE ];
> > > > >    char addr [ 1000 ];
> > > > >    pn_message_t   * message;
> > > > >    pn_messenger_t * messenger;
> > > > >    pn_data_t      * body;
> > > > >    pn_tracker_t     tracker;
> > > > >
> > > > >
> > > > >    sprintf ( addr, "amqp://~0.0.0.0:%s", argv[1] );
> > > > >
> > > > >    message = pn_message();
> > > > >    messenger = pn_messenger ( NULL );
> > > > >
> > > > >    pn_messenger_start(messenger);
> > > > >    pn_messenger_subscribe ( messenger, addr );
> > > > >    pn_messenger_set_incoming_window ( messenger, 5 );
> > > > >
> > > > >    /*---------------------------------
> > > > >      Receive and accept the message.
> > > > >    ---------------------------------*/
> > > > >    while ( 1 )
> > > > >    {
> > > > >      fprintf ( stderr, "receiving...\n" );
> > > > >      pn_messenger_recv ( messenger, 3 );
> > > > >
> > > > >      while ( pn_messenger_incoming ( messenger ) > 0 )
> > > > >      {
> > > > >        fprintf ( stderr, "getting message...\n" );
> > > > >        pn_messenger_get ( messenger, message );
> > > > >        tracker = pn_messenger_incoming_tracker ( messenger );
> > > > >        pn_messenger_accept ( messenger, tracker, 0 );
> > > > >        body = pn_message_body ( message );
> > > > >        pn_data_format ( body, buffer, & bufsize );
> > > > >        fprintf ( stdout, "Address: %s\n", pn_message_get_address (
> > > > message ) );
> > > > >        fprintf ( stdout, "Content: %s\n", buffer);
> > > > >      }
> > > > >    }
> > > > >
> > > > >    pn_messenger_stop(messenger);
> > > > >    pn_messenger_free(messenger);
> > > > >
> > > > >    return 0;
> > > > > }
> > > > >
> > > > >
> > > >
> > > >
> > > 
> > 
> 
> --
> -K
> 

-- 
-K

Reply via email to