[ https://issues.apache.org/jira/browse/PROTON-222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13576893#comment-13576893 ]
michael goulish commented on PROTON-222: ---------------------------------------- I am able to get my example working the way I want to by using a tracker, with window size 1, on the sender, and calling pn_messenger_status() after every message sent. new code: ==================================== sender ============================================= #include "proton/message.h" #include "proton/messenger.h" #include <stdio.h> #include <stdlib.h> #include <string.h> int main(int argc, char** argv) { int c; opterr = 0; char addr [ 1000 ]; char content [ 1000 ]; char subject [ 1000 ]; sprintf ( addr, "amqp://0.0.0.0:%s", argv[1] ); pn_message_t * message; pn_messenger_t * messenger; message = pn_message(); messenger = pn_messenger(NULL); pn_messenger_set_outgoing_window ( messenger, 1 ); pn_messenger_start(messenger); int n_messages = 10; int sent_count; /*------------------------------------------ Put and send a message every 1 second. ------------------------------------------*/ for ( sent_count = 0 ; sent_count < n_messages; ++ sent_count ) { sleep ( 1 ); sprintf ( subject, "This is message %d.", sent_count + 1 ); pn_message_set_address ( message, addr ); pn_message_set_subject ( message, subject ); pn_data_t *body = pn_message_body(message); sprintf ( content, "Hello, Proton!" ); pn_data_put_string(body, pn_bytes(strlen(content), content)); pn_messenger_put(messenger, message); pn_tracker_t tracker; tracker = pn_messenger_outgoing_tracker ( messenger ); pn_messenger_send(messenger); pn_messenger_status ( messenger, tracker ); fprintf ( stderr, "sent %d messages.\n", sent_count + 1 ); } // Countdown to stop, to give me time to see it .... 
fprintf ( stderr, "Calling stop in ...\n" ); for ( int i = 5; i > 0; -- i ) { fprintf ( stderr, "%d\n", i ); sleep ( 1 ); } fprintf ( stderr, "stop.\n"); pn_messenger_stop(messenger); pn_messenger_free(messenger); pn_message_free(message); return 0; } ===================================== receiver ========================================= #include "proton/message.h" #include "proton/messenger.h" #include <stdio.h> #include <stdlib.h> #include <ctype.h> #define BUFSIZE 1024 void consume_messages ( pn_messenger_t * messenger, int n, pn_message_t * message ) { for ( int consume_count = 0; consume_count < n; ++ consume_count ) { pn_messenger_get ( messenger, message ); size_t bufsize = BUFSIZE; char buffer [ bufsize ]; pn_data_t * body = pn_message_body ( message ); pn_data_format ( body, buffer, & bufsize ); printf ( "\n\nMessage ----------------------------\n"); printf ( "Address: %s\n", pn_message_get_address ( message ) ); char const * subject = pn_message_get_subject(message); printf ( "Subject: %s\n", subject ? 
subject : "(no subject)" ); printf("Content: %s\n\n", buffer); } } int main(int argc, char** argv) { char addr [ 1000 ]; sprintf ( addr, "amqp://~0.0.0.0:%s", argv[1] ); pn_message_t * message; pn_messenger_t * messenger; message = pn_message(); messenger = pn_messenger ( NULL ); pn_messenger_start(messenger); pn_messenger_subscribe ( messenger, addr ); int messages_wanted = 10; int total_received = 0; int received_this_time; pn_messenger_set_timeout ( messenger, 700 ); int tries = 0; while ( total_received < messages_wanted ) { ++ tries; pn_messenger_recv ( messenger, BUFSIZE ); received_this_time = pn_messenger_incoming ( messenger ); fprintf ( stderr, "try: %d received: %d total: %d\n", tries, received_this_time, total_received ); consume_messages ( messenger, received_this_time, message ); total_received += received_this_time; } pn_messenger_stop(messenger); pn_messenger_free(messenger); return 0; } > pn_messenger_send returns before message data has been written to the wire > -------------------------------------------------------------------------- > > Key: PROTON-222 > URL: https://issues.apache.org/jira/browse/PROTON-222 > Project: Qpid Proton > Issue Type: Bug > Components: proton-c, proton-j > Affects Versions: 0.3 > Reporter: Rafael H. Schloming > Assignee: Ken Giusti > Fix For: 0.4 > > Attachments: transport.patch > > > Currently, pn_messender_send will block until the engine reports there are no > queued messages being held. The problem arises because the queued message > count only reports message data that is being held by the engine due to > insufficient credit to send the messages. Messages may also be sitting in the > transport's encoded frame buffer waiting to be written to the wire, and > messages may also be held by the driver itself. 
This latter possibility is > problematic given the current transport interface because there is no way for > an application using the engine (in this case messenger) to know whether data > is being held by the driver without introducing an undesirable coupling > between the application and the driver implementation. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators. For more information on JIRA, see: http://www.atlassian.com/software/jira