[ 
https://issues.apache.org/jira/browse/PROTON-222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13576893#comment-13576893
 ] 

michael goulish commented on PROTON-222:
----------------------------------------

I am able to get my example working the way I want to by using a tracker, with 
window size 1, on the sender, and calling  pn_messenger_status() after every 
message sent.

new code:

==================================== sender 
=============================================

#include "proton/message.h"
#include "proton/messenger.h"

#include <stdio.h>
#include <stdlib.h>
#include <string.h>



int
main(int argc, char** argv)
{
  int c;
  opterr = 0;
  char addr [ 1000 ];
  char content [ 1000 ];
  char subject [ 1000 ];

  sprintf ( addr, "amqp://0.0.0.0:%s", argv[1] );

  pn_message_t * message;
  pn_messenger_t * messenger;

  message = pn_message();
  messenger = pn_messenger(NULL);
  pn_messenger_set_outgoing_window ( messenger, 1 );

  pn_messenger_start(messenger);

  int n_messages = 10;
  int sent_count;

  /*------------------------------------------
    Put and send a message every 1 second.
  ------------------------------------------*/
  for ( sent_count = 0 ; sent_count < n_messages; ++ sent_count )
  {
    sleep ( 1 );
    sprintf ( subject, "This is message %d.", sent_count + 1 );
    pn_message_set_address ( message, addr );
    pn_message_set_subject ( message, subject );
    pn_data_t *body = pn_message_body(message);
    sprintf ( content, "Hello, Proton!" );
    pn_data_put_string(body, pn_bytes(strlen(content), content));
    pn_messenger_put(messenger, message);

    pn_tracker_t tracker;
    tracker = pn_messenger_outgoing_tracker ( messenger );

    pn_messenger_send(messenger);

    pn_messenger_status ( messenger, tracker );

    fprintf ( stderr, "sent %d messages.\n", sent_count + 1 );
  }


  // Countdown to stop, to give me time to see it ....
  fprintf ( stderr, "Calling stop in ...\n" );
  for ( int i = 5; i > 0; -- i )
  {
    fprintf ( stderr, "%d\n", i );
    sleep ( 1 );
  }
  fprintf ( stderr, "stop.\n");

  pn_messenger_stop(messenger);
  pn_messenger_free(messenger);
  pn_message_free(message);

  return 0;
}



===================================== receiver 
=========================================
#include "proton/message.h"
#include "proton/messenger.h"

#include <stdio.h>
#include <stdlib.h>
#include <ctype.h>



#define BUFSIZE 1024


void
consume_messages ( pn_messenger_t * messenger, int n, pn_message_t * message )
{
  for ( int consume_count = 0; consume_count < n; ++ consume_count )
  {
    pn_messenger_get ( messenger, message );

    size_t bufsize = BUFSIZE;
    char buffer [ bufsize ];
    pn_data_t * body = pn_message_body ( message );
    pn_data_format ( body, buffer, & bufsize );

    printf ( "\n\nMessage ----------------------------\n");
    printf ( "Address: %s\n", pn_message_get_address ( message ) );
    char const * subject = pn_message_get_subject(message);
    printf ( "Subject: %s\n", subject ? subject : "(no subject)" );
    printf("Content: %s\n\n", buffer);
  }
}


int
main(int argc, char** argv)
{
  char addr [ 1000 ];

  sprintf ( addr, "amqp://~0.0.0.0:%s", argv[1] );
  pn_message_t   * message;
  pn_messenger_t * messenger;

  message = pn_message();
  messenger = pn_messenger ( NULL );

  pn_messenger_start(messenger);
  pn_messenger_subscribe ( messenger, addr );

  int messages_wanted    = 10;
  int total_received     =  0;
  int received_this_time;

  pn_messenger_set_timeout ( messenger, 700 );

  int tries = 0;
  while ( total_received < messages_wanted )
  {
    ++ tries;
    pn_messenger_recv ( messenger, BUFSIZE );
    received_this_time = pn_messenger_incoming ( messenger );
    fprintf ( stderr,
              "try: %d received: %d   total: %d\n",
              tries,
              received_this_time,
              total_received
            );
    consume_messages ( messenger, received_this_time, message );
    total_received += received_this_time;
  }

  pn_messenger_stop(messenger);
  pn_messenger_free(messenger);

  return 0;
}








                
> pn_messenger_send returns before message data has been written to the wire
> --------------------------------------------------------------------------
>
>                 Key: PROTON-222
>                 URL: https://issues.apache.org/jira/browse/PROTON-222
>             Project: Qpid Proton
>          Issue Type: Bug
>          Components: proton-c, proton-j
>    Affects Versions: 0.3
>            Reporter: Rafael H. Schloming
>            Assignee: Ken Giusti
>             Fix For: 0.4
>
>         Attachments: transport.patch
>
>
> Currently, pn_messender_send will block until the engine reports there are no 
> queued messages being held. The problem arises because the queued message 
> count only reports message data that is being held by the engine due to 
> insufficient credit to send the messages. Messages may also be sitting in the 
> transport's encoded frame buffer waiting to be written to the wire, and 
> messages may also be held by the driver itself. This latter possibly is 
> problematic given the current transport interface because there is no way for 
> an application using the engine (in this case messenger) to know whether data 
> is being held by the driver without introducing an undesirable coupling 
> between the application and the driver implementation.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

Reply via email to