[
https://issues.apache.org/jira/browse/PROTON-222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13576893#comment-13576893
]
michael goulish commented on PROTON-222:
----------------------------------------
I am able to get my example working the way I want to by using a tracker, with
window size 1, on the sender, and calling pn_messenger_status() after every
message sent.
new code:
==================================== sender
=============================================
#include "proton/message.h"
#include "proton/messenger.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
/*
 * Sender example.
 *
 * Usage: <program> <port>
 *
 * Puts and sends n_messages messages, one per second, to the address
 * "amqp://0.0.0.0:<port>", where <port> is argv[1].  The outgoing window is
 * set to 1 and pn_messenger_status() is called on the outgoing tracker after
 * every send.
 *
 * Returns 0 when every put/send call reported success, EXIT_FAILURE when the
 * port argument is missing, the address does not fit its buffer, the Proton
 * objects could not be created, or any put/send call returned non-zero.
 */
int
main(int argc, char** argv)
{
  char addr [ 1000 ];
  char content [ 1000 ];
  char subject [ 1000 ];
  int exit_status = 0;

  /* argv[1] is only valid when at least one argument was supplied. */
  if ( argc < 2 )
  {
    fprintf ( stderr, "usage: %s <port>\n", argc > 0 ? argv[0] : "sender" );
    return EXIT_FAILURE;
  }

  /* Bounded formatting: a long argument must not overrun addr. */
  int addr_len = snprintf ( addr, sizeof addr, "amqp://0.0.0.0:%s", argv[1] );
  if ( addr_len < 0 || (size_t) addr_len >= sizeof addr )
  {
    fprintf ( stderr, "port argument is too long.\n" );
    return EXIT_FAILURE;
  }

  pn_message_t * message = pn_message();
  pn_messenger_t * messenger = pn_messenger(NULL);
  if ( ! message || ! messenger )
  {
    fprintf ( stderr, "could not create message or messenger.\n" );
    if ( messenger ) pn_messenger_free(messenger);
    if ( message ) pn_message_free(message);
    return EXIT_FAILURE;
  }

  pn_messenger_set_outgoing_window ( messenger, 1 );
  pn_messenger_start(messenger);

  int n_messages = 10;
  int sent_count;

  /*------------------------------------------
    Put and send a message every 1 second.
  ------------------------------------------*/
  for ( sent_count = 0 ; sent_count < n_messages; ++ sent_count )
  {
    sleep ( 1 );

    snprintf ( subject, sizeof subject, "This is message %d.", sent_count + 1 );
    pn_message_set_address ( message, addr );
    pn_message_set_subject ( message, subject );

    /* The same message object is reused on every iteration, so drop the
       previous body before adding this iteration's string. */
    pn_data_t *body = pn_message_body(message);
    pn_data_clear(body);
    snprintf ( content, sizeof content, "Hello, Proton!" );
    pn_data_put_string(body, pn_bytes(strlen(content), content));

    int err = pn_messenger_put(messenger, message);
    if ( err )
    {
      fprintf ( stderr, "pn_messenger_put failed: %d\n", err );
      exit_status = EXIT_FAILURE;
      continue;
    }

    pn_tracker_t tracker;
    tracker = pn_messenger_outgoing_tracker ( messenger );

    err = pn_messenger_send(messenger);

    /* Called after every message sent; the returned status is not used. */
    pn_messenger_status ( messenger, tracker );

    if ( err )
    {
      fprintf ( stderr, "pn_messenger_send failed: %d\n", err );
      exit_status = EXIT_FAILURE;
    }
    else
    {
      fprintf ( stderr, "sent %d messages.\n", sent_count + 1 );
    }
  }

  // Countdown to stop, to give me time to see it ....
  fprintf ( stderr, "Calling stop in ...\n" );
  for ( int i = 5; i > 0; -- i )
  {
    fprintf ( stderr, "%d\n", i );
    sleep ( 1 );
  }
  fprintf ( stderr, "stop.\n");

  pn_messenger_stop(messenger);
  pn_messenger_free(messenger);
  pn_message_free(message);
  return exit_status;
}
===================================== receiver
=========================================
#include "proton/message.h"
#include "proton/messenger.h"
#include <stdio.h>
#include <stdlib.h>
#include <ctype.h>
#define BUFSIZE 1024
/*
 * Gets n messages from the messenger, one at a time into `message`, and
 * prints each one's address, subject and formatted body to stdout.
 *
 * messenger  messenger to call pn_messenger_get() on
 * n          number of get attempts to make
 * message    message object that each get overwrites
 *
 * A get that returns non-zero is reported on stderr and that message is
 * skipped.  A missing address or subject is printed as a placeholder.
 */
void
consume_messages ( pn_messenger_t * messenger, int n, pn_message_t * message )
{
  for ( int consume_count = 0; consume_count < n; ++ consume_count )
  {
    int err = pn_messenger_get ( messenger, message );
    if ( err )
    {
      fprintf ( stderr, "pn_messenger_get failed: %d\n", err );
      continue;
    }

    /* Fixed-size, zero-filled buffer so it is always a valid string,
       even when formatting fails. */
    char buffer [ BUFSIZE ] = { 0 };
    size_t bufsize = sizeof buffer;
    pn_data_t * body = pn_message_body ( message );
    if ( pn_data_format ( body, buffer, & bufsize ) )
    {
      /* Formatting failed: print an empty body, not partial data. */
      buffer [ 0 ] = '\0';
    }
    /* Guarantee termination even if the formatter filled the whole buffer. */
    buffer [ sizeof buffer - 1 ] = '\0';

    printf ( "\n\nMessage ----------------------------\n");

    /* Guard the address the same way as the subject: passing NULL to %s is
       undefined behaviour. */
    char const * address = pn_message_get_address ( message );
    printf ( "Address: %s\n", address ? address : "(no address)" );

    char const * subject = pn_message_get_subject(message);
    printf ( "Subject: %s\n", subject ? subject : "(no subject)" );

    printf("Content: %s\n\n", buffer);
  }
}
/*
 * Receiver example.
 *
 * Usage: <program> <port>
 *
 * Subscribes to "amqp://~0.0.0.0:<port>", where <port> is argv[1], then
 * repeatedly calls pn_messenger_recv() with a 700 timeout until
 * messages_wanted messages have been counted.  Each batch is printed via
 * consume_messages().
 *
 * Returns 0 on normal completion, EXIT_FAILURE when the port argument is
 * missing, the address does not fit its buffer, or the Proton objects could
 * not be created.
 */
int
main(int argc, char** argv)
{
  char addr [ 1000 ];

  /* argv[1] is only valid when at least one argument was supplied. */
  if ( argc < 2 )
  {
    fprintf ( stderr, "usage: %s <port>\n", argc > 0 ? argv[0] : "receiver" );
    return EXIT_FAILURE;
  }

  /* Bounded formatting: a long argument must not overrun addr. */
  int addr_len = snprintf ( addr, sizeof addr, "amqp://~0.0.0.0:%s", argv[1] );
  if ( addr_len < 0 || (size_t) addr_len >= sizeof addr )
  {
    fprintf ( stderr, "port argument is too long.\n" );
    return EXIT_FAILURE;
  }

  pn_message_t * message = pn_message();
  pn_messenger_t * messenger = pn_messenger ( NULL );
  if ( ! message || ! messenger )
  {
    fprintf ( stderr, "could not create message or messenger.\n" );
    if ( messenger ) pn_messenger_free(messenger);
    if ( message ) pn_message_free(message);
    return EXIT_FAILURE;
  }

  pn_messenger_start(messenger);
  pn_messenger_subscribe ( messenger, addr );

  int messages_wanted = 10;
  int total_received = 0;
  int received_this_time;

  pn_messenger_set_timeout ( messenger, 700 );

  int tries = 0;
  while ( total_received < messages_wanted )
  {
    ++ tries;

    /* The recv result is not inspected; the incoming count queried next
       decides how many messages to consume on this try. */
    pn_messenger_recv ( messenger, BUFSIZE );
    received_this_time = pn_messenger_incoming ( messenger );

    /* "total" here is the count before this try's messages are added. */
    fprintf ( stderr,
              "try: %d received: %d total: %d\n",
              tries,
              received_this_time,
              total_received
            );

    consume_messages ( messenger, received_this_time, message );
    total_received += received_this_time;
  }

  pn_messenger_stop(messenger);
  pn_messenger_free(messenger);
  /* Release the message created above. */
  pn_message_free(message);
  return 0;
}
> pn_messenger_send returns before message data has been written to the wire
> --------------------------------------------------------------------------
>
> Key: PROTON-222
> URL: https://issues.apache.org/jira/browse/PROTON-222
> Project: Qpid Proton
> Issue Type: Bug
> Components: proton-c, proton-j
> Affects Versions: 0.3
> Reporter: Rafael H. Schloming
> Assignee: Ken Giusti
> Fix For: 0.4
>
> Attachments: transport.patch
>
>
> Currently, pn_messenger_send will block until the engine reports there are no
> queued messages being held. The problem arises because the queued message
> count only reports message data that is being held by the engine due to
> insufficient credit to send the messages. Messages may also be sitting in the
> transport's encoded frame buffer waiting to be written to the wire, and
> messages may also be held by the driver itself. This latter possibility is
> problematic given the current transport interface because there is no way for
> an application using the engine (in this case messenger) to know whether data
> is being held by the driver without introducing an undesirable coupling
> between the application and the driver implementation.
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira