Looks like all three credits were given to the first link. Once one message arrived, its credit was given to the second link. The second link then transferred a message but the credit was given back to the second link where there were no more messages to transfer.

On 04/04/2013 03:06 PM, Michael Goulish wrote:
OK, I'm looking at trace from receiver, and I thought
I would post it here so I can't be accused of hogging
all the fun for myself.

( Remember, three senders all send to same receiver address,
   only two get 'accepted' replies.  Last sender ends up hanging in send(),
   while receiver (in infinite loop) blocks on recv(). )

I have marked the lines of application output with "APPLICATION OUTPUT:"


Note:

I see these 3 lines:
   Accepted from localhost:42468
   Accepted from localhost:42469
   Accepted from localhost:42470

But only two get closed:
   Closed localhost:42468
   Closed localhost:42469




----------------- begin trace -----------------------
Listening on 0.0.0.0:6666
APPLICATION OUTPUT:   receiving...
Accepted from localhost:42468
Accepted from localhost:42469
     <- SASL
[0x25013c0:0] <- SASL-INIT @65 [:ANONYMOUS, b""]
[0x25013c0:0] -> SASL-MECHANISMS @64 [@PN_SYMBOL[:ANONYMOUS]]
[0x25013c0:0] -> SASL-OUTCOME @68 [0]
     -> SASL
     -> AMQP
[0x24fae10:0] -> OPEN @16 ["a03b1f27-5053-47f0-ae85-c543782480b5", null, null, 
null, null, null, null, null, null]
Accepted from localhost:42470
     <- SASL
[0x253f490:0] <- SASL-INIT @65 [:ANONYMOUS, b""]
[0x253f490:0] -> SASL-MECHANISMS @64 [@PN_SYMBOL[:ANONYMOUS]]
[0x253f490:0] -> SASL-OUTCOME @68 [0]
     -> SASL
     -> AMQP
[0x2538e40:0] -> OPEN @16 ["a03b1f27-5053-47f0-ae85-c543782480b5", null, null, 
null, null, null, null, null, null]
     <- AMQP
[0x24fae10:0] <- OPEN @16 ["1425753e-bda0-48af-a60f-b8a23c0933d3", "0.0.0.0", 
null, null, null, null, null, null, null]
[0x24fae10:1] <- BEGIN @17 [null, 0, 1024, 1024]
[0x24fae10:1] <- ATTACH @18 ["sender-xxx", 1, false, null, null, @40 [null, 0, 
null, 0, false, null, null, null, null, null, null], @41 [null, 0, null, 0, false, null, 
null], null, null, 0]
[0x24fae10:1] -> BEGIN @17 [1, 0, 1024, 1024]
[0x24fae10:1] -> ATTACH @18 ["sender-xxx", 1, true, null, null, null, null, 
null, null, 0]
[0x24fae10:1] -> FLOW @19 [0, 1024, 0, 1024, 1, 0, 3, null, false]
     <- SASL
[0x2563350:0] <- SASL-INIT @65 [:ANONYMOUS, b""]
[0x2563350:0] -> SASL-MECHANISMS @64 [@PN_SYMBOL[:ANONYMOUS]]
[0x2563350:0] -> SASL-OUTCOME @68 [0]
     -> SASL
     -> AMQP
[0x255cd00:0] -> OPEN @16 ["a03b1f27-5053-47f0-ae85-c543782480b5", null, null, 
null, null, null, null, null, null]
     <- AMQP
[0x2538e40:0] <- OPEN @16 ["35806640-4a26-47a2-a6e2-7fe7505938cf", "0.0.0.0", 
null, null, null, null, null, null, null]
[0x2538e40:1] <- BEGIN @17 [null, 0, 1024, 1024]
[0x2538e40:1] <- ATTACH @18 ["sender-xxx", 1, false, null, null, @40 [null, 0, 
null, 0, false, null, null, null, null, null, null], @41 [null, 0, null, 0, false, null, 
null], null, null, 0]
[0x2538e40:1] -> BEGIN @17 [1, 0, 1024, 1024]
[0x2538e40:1] -> ATTACH @18 ["sender-xxx", 1, true, null, null, null, null, 
null, null, 0]
[0x2538e40:1] -> FLOW @19 [0, 1024, 0, 1024, 1, 0, 0, null, false]
     <- AMQP
[0x255cd00:0] <- OPEN @16 ["c8b87edf-6971-4d73-9790-e6f44772cebb", "0.0.0.0", 
null, null, null, null, null, null, null]
[0x255cd00:1] <- BEGIN @17 [null, 0, 1024, 1024]
[0x255cd00:1] <- ATTACH @18 ["sender-xxx", 1, false, null, null, @40 [null, 0, 
null, 0, false, null, null, null, null, null, null], @41 [null, 0, null, 0, false, null, 
null], null, null, 0]
[0x255cd00:1] -> BEGIN @17 [1, 0, 1024, 1024]
[0x255cd00:1] -> ATTACH @18 ["sender-xxx", 1, true, null, null, null, null, 
null, null, 0]
[0x255cd00:1] -> FLOW @19 [0, 1024, 0, 1024, 1, 0, 0, null, false]
[0x24fae10:1] <- TRANSFER @20 [1, 0, b"\x00\x00\x00\x00\x00\x00\x00\x00", 0, false, 
false] (148) 
"\x00Sp\xd0\x00\x00\x00\x0b\x00\x00\x00\x05BP\x04@BR\x00\x00Ss\xd0\x00\x00\x00b\x00\x00\x00\x0d@@\xa1\x13amqp://0.0.0.0:6666@\xa1+amqp://1425753e-bda0-48af-a60f-b8a23c0933d3@@@\x83\x00\x00\x00\x00\x00\x00\x00\x00\x83\x00\x00\x00\x00\x00\x00\x00\x00@R\x00@\x00Sw\xa1\x12Message
 from 22470"
APPLICATION OUTPUT:   getting message...
APPLICATION OUTPUT:   Address: amqp://0.0.0.0:6666
APPLICATION OUTPUT:   Content: "Message from 22470"
APPLICATION OUTPUT:   receiving...
[0x24fae10:1] -> DISPOSITION @21 [true, 0, 0, false, @36 []]
[0x2538e40:1] -> FLOW @19 [0, 1024, 0, 1024, 1, 0, 1, null, false]
[0x24fae10:1] <- DISPOSITION @21 [false, 0, 0, false, @36 []]
[0x2538e40:1] <- TRANSFER @20 [1, 0, b"\x00\x00\x00\x00\x00\x00\x00\x00", 0, false, 
false] (148) 
"\x00Sp\xd0\x00\x00\x00\x0b\x00\x00\x00\x05BP\x04@BR\x00\x00Ss\xd0\x00\x00\x00b\x00\x00\x00\x0d@@\xa1\x13amqp://0.0.0.0:6666@\xa1+amqp://35806640-4a26-47a2-a6e2-7fe7505938cf@@@\x83\x00\x00\x00\x00\x00\x00\x00\x00\x83\x00\x00\x00\x00\x00\x00\x00\x00@R\x00@\x00Sw\xa1\x12Message
 from 22469"
getting message...
APPLICATION OUTPUT:   Address: amqp://0.0.0.0:6666
APPLICATION OUTPUT:   Content: "Message from 22469
APPLICATION OUTPUT:   receiving...
[0x2538e40:1] -> FLOW @19 [1, 1023, 0, 1024, 1, 1, 1, null, false]
[0x2538e40:1] -> DISPOSITION @21 [true, 0, 0, false, @36 []]
[0x2538e40:1] <- DISPOSITION @21 [false, 0, 0, false, @36 []]
[0x24fae10:1] <- DETACH @22 [1, true, null]
[0x24fae10:0] <- CLOSE @24 [null]
[0x24fae10:0] <- EOS
[0x24fae10:1] -> DETACH @22 [1, true, null]
[0x24fae10:0] -> CLOSE @24 [null]
[0x24fae10:0] -> EOS
Closed localhost:42468
[0x2538e40:1] <- DETACH @22 [1, true, null]
[0x2538e40:0] <- CLOSE @24 [null]
[0x2538e40:0] <- EOS
[0x2538e40:1] -> DETACH @22 [1, true, null]
[0x2538e40:0] -> CLOSE @24 [null]
[0x2538e40:0] -> EOS
Closed localhost:42469

----------------- end trace -------------------------










----- Original Message -----
Any clues from a trace of the receiver?

$ PN_TRACE_FRM=1 ./receiver 6666

-Ted

On 04/04/2013 02:09 PM, Michael Goulish wrote:
    Is this a bug, or am I  Doing  Something  Wrong ?



Scenario
{
    My sender sends a single message, and hopes to see
    that the receiver has accepted it.

    I launch 3 copies of the sender very close together--
    they all talk to the same address.

    My receiver receives in a loop, accepts every message
    that it receives.
}




Result
{
    Sometimes my receiver gets 1 of the 3 messages.
    Usually it gets 2.
    It never gets all 3.

    The 3rd sender hangs in pn_messenger_send().

    While the 3rd sender is hanging in send(), the receiver
    is patiently waiting in recv().
}






Sender Code ############################################

/*
    Launch 3 of these from a script like so:
    ./sender 6666 &
    ./sender 6666 &
    ./sender 6666 &
*/


#include "proton/message.h"
#include "proton/messenger.h"

#include <getopt.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <ctype.h>


char *
status_2_str ( pn_status_t status )
{
    switch ( status )
    {
      case PN_STATUS_UNKNOWN:
        return "unknown";
        break;

      case PN_STATUS_PENDING:
        return "pending";
        break;

      case PN_STATUS_ACCEPTED:
        return "accepted";
        break;

      case PN_STATUS_REJECTED:
        return "rejected";
        break;

      default:
        return "bad value";
        break;
    }
}



pid_t my_pid = 0;


void
check ( char * label, int result )
{
    fprintf ( stderr, "%d %s result: %d\n", my_pid, label, result );
}



int
main(int argc, char** argv)
{
    int c;
    char addr [ 1000 ];
    char msgtext [ 100 ];
    pn_message_t   * message;
    pn_messenger_t * messenger;
    pn_data_t      * body;
    pn_tracker_t     tracker;
    pn_status_t      status;
    int              result;

    my_pid = getpid();

    sprintf ( addr, "amqp://0.0.0.0:%s", argv[1] );


    message = pn_message ( );
    messenger = pn_messenger ( NULL );
    pn_messenger_start ( messenger ) ;
    pn_messenger_set_outgoing_window ( messenger, 1 );


    pn_message_set_address ( message, addr );
    body = pn_message_body ( message );


    sprintf ( msgtext, "Message from %d", getpid() );
    pn_data_put_string ( body, pn_bytes ( strlen ( msgtext ), msgtext ));
    pn_messenger_put ( messenger, message );
    tracker = pn_messenger_outgoing_tracker ( messenger );
    pn_messenger_send ( messenger );


    status = pn_messenger_status ( messenger, tracker );
    fprintf ( stderr, "status : %s\n", status_2_str(status) );


    pn_messenger_stop ( messenger );
    pn_messenger_free ( messenger );
    pn_message_free ( message );

    return 0;
}




Receiver Code ########################################################

/*

    Launch like this:
    ./receiver 6666
*/

#include <stdio.h>
#include <stdlib.h>
#include <ctype.h>

#include "proton/message.h"
#include "proton/messenger.h"



#define BUFSIZE 1024



int
main(int argc, char** argv)
{
    size_t bufsize = BUFSIZE;
    char buffer [ BUFSIZE ];
    char addr [ 1000 ];
    pn_message_t   * message;
    pn_messenger_t * messenger;
    pn_data_t      * body;
    pn_tracker_t     tracker;


    sprintf ( addr, "amqp://~0.0.0.0:%s", argv[1] );

    message = pn_message();
    messenger = pn_messenger ( NULL );

    pn_messenger_start(messenger);
    pn_messenger_subscribe ( messenger, addr );
    pn_messenger_set_incoming_window ( messenger, 5 );

    /*---------------------------------
      Receive and accept the message.
    ---------------------------------*/
    while ( 1 )
    {
      fprintf ( stderr, "receiving...\n" );
      pn_messenger_recv ( messenger, 3 );

      while ( pn_messenger_incoming ( messenger ) > 0 )
      {
        fprintf ( stderr, "getting message...\n" );
        pn_messenger_get ( messenger, message );
        tracker = pn_messenger_incoming_tracker ( messenger );
        pn_messenger_accept ( messenger, tracker, 0 );
        body = pn_message_body ( message );
        pn_data_format ( body, buffer, & bufsize );
        fprintf ( stdout, "Address: %s\n", pn_message_get_address ( message ) );
        fprintf ( stdout, "Content: %s\n", buffer);
      }
    }

    pn_messenger_stop(messenger);
    pn_messenger_free(messenger);

    return 0;
}



Reply via email to