[ 
https://issues.apache.org/jira/browse/PROTON-222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13576830#comment-13576830
 ] 

michael goulish commented on PROTON-222:
----------------------------------------

Here is some sample code that demonstrates this bug, and possibly clarifies how 
it can show up.  

Running the receiver and sender in separate windows, I can see that the 
receiver does not get anything after ten calls (on the sender) to send().  

It is only when the sender finally calls stop() that the receiver gets all ten 
messages at once.



===================================== sender 
===========================================

#include "proton/message.h"
#include "proton/messenger.h"

#include <stdio.h>
#include <stdlib.h>
#include <string.h>



int
main(int argc, char** argv)
{
  int c;
  opterr = 0;
  char addr [ 1000 ];
  char content [ 1000 ];
  char subject [ 1000 ];

  sprintf ( addr, "amqp://0.0.0.0:%s", argv[1] );

  pn_message_t * message;
  pn_messenger_t * messenger;

  message = pn_message();
  messenger = pn_messenger(NULL);

  pn_messenger_start(messenger);

  int n_messages = 10;
  int sent_count;

  /*------------------------------------------
    Put and send a message every 1 second.
  ------------------------------------------*/
  for ( sent_count = 0 ; sent_count < n_messages; ++ sent_count )
  {
    sleep ( 1 );
    sprintf ( subject, "This is message %d.", sent_count + 1 );
    pn_message_set_address ( message, addr );
    pn_message_set_subject ( message, subject );
    pn_data_t *body = pn_message_body(message);
    sprintf ( content, "Hello, Proton!" );
    pn_data_put_string(body, pn_bytes(strlen(content), content));
    pn_messenger_put(messenger, message);
    pn_messenger_send(messenger);
    fprintf ( stderr, "sent %d messages.\n", sent_count + 1 );
  }


  // Countdown to stop, to give me time to see it ....
  fprintf ( stderr, "Calling stop in ...\n" );
  for ( int i = 5; i > 0; -- i )
  {
    fprintf ( stderr, "%d\n", i );
    sleep ( 1 );
  }

  pn_messenger_stop(messenger);
  pn_messenger_free(messenger);
  pn_message_free(message);

  return 0;
}



======================================  receiver  
===============================================

#include "proton/message.h"
#include "proton/messenger.h"

#include <stdio.h>
#include <stdlib.h>
#include <ctype.h>



#define BUFSIZE 1024


int
main(int argc, char** argv)
{
  char addr [ 1000 ];

  sprintf ( addr, "amqp://~0.0.0.0:%s", argv[1] );
  pn_message_t   * message;
  pn_messenger_t * messenger;

  message = pn_message();
  messenger = pn_messenger ( NULL );

  pn_messenger_start(messenger);
  pn_messenger_subscribe ( messenger, addr );

  int messages_wanted    = 10;
  int total_received     =  0;
  int received_this_time;

  pn_messenger_set_timeout ( messenger, 700 );

  int tries = 0;
  while ( total_received < messages_wanted )
  {
    ++ tries;
    pn_messenger_recv ( messenger, BUFSIZE );
    received_this_time = pn_messenger_incoming ( messenger );
    fprintf ( stderr,
              "try: %d received: %d   total: %d\n",
              tries,
              received_this_time,
              total_received
            );
    total_received += received_this_time;
  }

  int total_consumed = 0;

  for ( ; total_consumed < total_received; ++ total_consumed )
  {
    pn_messenger_get ( messenger, message );

    size_t bufsize = BUFSIZE;
    char buffer [ bufsize ];
    pn_data_t * body = pn_message_body ( message );
    pn_data_format ( body, buffer, & bufsize );

    printf ( "\n\nMessage %d ----------------------------\n", total_consumed + 
1 );
    printf ( "Address: %s\n", pn_message_get_address ( message ) );
    char const * subject = pn_message_get_subject(message);
    printf ( "Subject: %s\n", subject ? subject : "(no subject)" );
    printf("Content: %s\n", buffer);
  }
  printf ( "\n\n" );

  pn_messenger_stop(messenger);
  pn_messenger_free(messenger);

  return 0;
}


                
> pn_messenger_send returns before message data has been written to the wire
> --------------------------------------------------------------------------
>
>                 Key: PROTON-222
>                 URL: https://issues.apache.org/jira/browse/PROTON-222
>             Project: Qpid Proton
>          Issue Type: Bug
>          Components: proton-c, proton-j
>    Affects Versions: 0.3
>            Reporter: Rafael H. Schloming
>            Assignee: Ken Giusti
>             Fix For: 0.4
>
>         Attachments: transport.patch
>
>
> Currently, pn_messender_send will block until the engine reports there are no 
> queued messages being held. The problem arises because the queued message 
> count only reports message data that is being held by the engine due to 
> insufficient credit to send the messages. Messages may also be sitting in the 
> transport's encoded frame buffer waiting to be written to the wire, and 
> messages may also be held by the driver itself. This latter possibly is 
> problematic given the current transport interface because there is no way for 
> an application using the engine (in this case messenger) to know whether data 
> is being held by the driver without introducing an undesirable coupling 
> between the application and the driver implementation.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

Reply via email to