Repository: qpid-proton Updated Branches: refs/heads/master dc724c140 -> bdd986a1d (forced update)
PROTON-1823: [c] pn_message_send() simplified message sending for C Encapsulates the awkward allocate-encode-expand dance required by pn_message_encode() Supports the following 2 scenarios: 1. Simple: don't care about allocations, just send `pn_message_t *msg` and forget it: pn_message_send(msg, sender, NULL) 2. Efficient: re-use a buffer, buffer is allocated and expanded as required: pn_rwbytes_t buffer={0}; // Zero initialize, library will do the allocation ... pn_message_send(msg, sender, &buffer); // Expand as needed pn_message_send(msg2, sender2, &buffer); // etc. ... free(buffer.start); // Application must do final free of buffer Note: scenario 2 assumes use of malloc/realloc/free; apps that need custom allocation can use the original pn_message_encode() API or we could add a version that takes a pointer to a function equivalent to realloc() Updated examples to use this API. Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/bdd986a1 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/bdd986a1 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/bdd986a1 Branch: refs/heads/master Commit: bdd986a1d8da5b5b0811aa244d974f48a757e3d7 Parents: 3cb7a5c Author: Alan Conway <acon...@redhat.com> Authored: Fri Apr 6 15:54:11 2018 -0400 Committer: Alan Conway <acon...@redhat.com> Committed: Fri Apr 6 17:45:38 2018 -0400 ---------------------------------------------------------------------- c/examples/direct.c | 37 ++++++++---------------------------- c/examples/send.c | 41 +++++++++++----------------------------- c/include/proton/message.h | 24 ++++++++++++++++++++++- c/src/core/message.c | 28 +++++++++++++++++++++++++++ c/tests/connection_driver.c | 8 +++----- 5 files changed, 73 insertions(+), 65 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bdd986a1/c/examples/direct.c 
---------------------------------------------------------------------- diff --git a/c/examples/direct.c b/c/examples/direct.c index 6d8642c..d2a4ae3 100644 --- a/c/examples/direct.c +++ b/c/examples/direct.c @@ -76,7 +76,7 @@ static void check_condition(pn_event_t *e, pn_condition_t *cond, app_data_t *app } /* Create a message with a map { "sequence" : number } encode it and return the encoded buffer. */ -static pn_bytes_t encode_message(app_data_t* app) { +static void send_message(app_data_t *app, pn_link_t *sender) { /* Construct a message with the map { "sequence": app.sent } */ pn_message_t* message = pn_message(); pn_data_t* body = pn_message_body(message); @@ -86,29 +86,11 @@ static pn_bytes_t encode_message(app_data_t* app) { pn_data_put_string(body, pn_bytes(sizeof("sequence")-1, "sequence")); pn_data_put_int(body, app->sent); /* The sequence number */ pn_data_exit(body); - - /* encode the message, expanding the encode buffer as needed */ - if (app->msgout.start == NULL) { - static const size_t initial_size = 128; - app->msgout = pn_rwbytes(initial_size, (char*)malloc(initial_size)); - } - /* app->msgout is the total buffer space available. 
*/ - /* mbuf wil point at just the portion used by the encoded message */ - { - pn_rwbytes_t mbuf = pn_rwbytes(app->msgout.size, app->msgout.start); - int status = 0; - while ((status = pn_message_encode(message, mbuf.start, &mbuf.size)) == PN_OVERFLOW) { - app->msgout.size *= 2; - app->msgout.start = (char*)realloc(app->msgout.start, app->msgout.size); - mbuf.size = app->msgout.size; - } - if (status != 0) { - fprintf(stderr, "error encoding message: %s\n", pn_error_text(pn_message_error(message))); - exit(1); + if (pn_message_send(message, sender, &app->msgout) < 0) { + fprintf(stderr, "send error: %s\n", pn_error_text(pn_message_error(message))); + exit_code = 1; } pn_message_free(message); - return pn_bytes(mbuf.size, mbuf.start); - } } static void decode_message(pn_rwbytes_t data) { @@ -124,7 +106,7 @@ static void decode_message(pn_rwbytes_t data) { pn_message_free(m); free(data.start); } else { - fprintf(stderr, "decode_message: %s\n", pn_code(err)); + fprintf(stderr, "decode error: %s\n", pn_error_text(pn_message_error(m))); exit_code = 1; } } @@ -142,7 +124,7 @@ static void handle_receive(app_data_t *app, pn_event_t* event) { case PN_DELIVERY: { /* Incoming message data */ pn_delivery_t *d = pn_event_delivery(event); if (pn_delivery_readable(d)) { - pn_link_t *l = pn_delivery_link(d); + pn_link_t *l = pn_delivery_link(d); size_t size = pn_delivery_pending(d); pn_rwbytes_t* m = &app->msgin; /* Append data to incoming message buffer */ ssize_t recv; @@ -198,11 +180,8 @@ static void handle_send(app_data_t* app, pn_event_t* event) { ++app->sent; /* Use sent counter as unique delivery tag. 
*/ pn_delivery(sender, pn_dtag((const char *)&app->sent, sizeof(app->sent))); - { - pn_bytes_t msgbuf = encode_message(app); - pn_link_send(sender, msgbuf.start, msgbuf.size); + send_message(app, sender); pn_link_advance(sender); - } } break; } @@ -308,7 +287,7 @@ static bool handle(app_data_t* app, pn_event_t* event) { } } } - return true; + return exit_code == 0; } void run(app_data_t *app) { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bdd986a1/c/examples/send.c ---------------------------------------------------------------------- diff --git a/c/examples/send.c b/c/examples/send.c index 9e8cc4a..8d979e6 100644 --- a/c/examples/send.c +++ b/c/examples/send.c @@ -38,6 +38,7 @@ typedef struct app_data_t { int message_count; pn_proactor_t *proactor; + pn_message_t *message; pn_rwbytes_t message_buffer; int sent; int acknowledged; @@ -55,40 +56,21 @@ static void check_condition(pn_event_t *e, pn_condition_t *cond) { } /* Create a message with a map { "sequence" : number } encode it and return the encoded buffer. 
*/ -static pn_bytes_t encode_message(app_data_t* app) { +static void send_message(app_data_t* app, pn_link_t *sender) { /* Construct a message with the map { "sequence": app.sent } */ - pn_message_t* message = pn_message(); - pn_data_t* body = pn_message_body(message); - pn_data_put_int(pn_message_id(message), app->sent); /* Set the message_id also */ + pn_data_t* body; + pn_message_clear(app->message); + body = pn_message_body(app->message); + pn_data_put_int(pn_message_id(app->message), app->sent); /* Set the message_id also */ pn_data_put_map(body); pn_data_enter(body); pn_data_put_string(body, pn_bytes(sizeof("sequence")-1, "sequence")); pn_data_put_int(body, app->sent); /* The sequence number */ pn_data_exit(body); - - /* encode the message, expanding the encode buffer as needed */ - if (app->message_buffer.start == NULL) { - static const size_t initial_size = 128; - app->message_buffer = pn_rwbytes(initial_size, (char*)malloc(initial_size)); - } - /* app->message_buffer is the total buffer space available. 
*/ - /* mbuf wil point at just the portion used by the encoded message */ - { - pn_rwbytes_t mbuf = pn_rwbytes(app->message_buffer.size, app->message_buffer.start); - int status = 0; - while ((status = pn_message_encode(message, mbuf.start, &mbuf.size)) == PN_OVERFLOW) { - app->message_buffer.size *= 2; - app->message_buffer.start = (char*)realloc(app->message_buffer.start, app->message_buffer.size); - mbuf.size = app->message_buffer.size; - mbuf.start = app->message_buffer.start; - } - if (status != 0) { - fprintf(stderr, "error encoding message: %s\n", pn_error_text(pn_message_error(message))); + if (pn_message_send(app->message, sender, &app->message_buffer) < 0) { + fprintf(stderr, "error sending message: %s\n", pn_error_text(pn_message_error(app->message))); exit(1); } - pn_message_free(message); - return pn_bytes(mbuf.size, mbuf.start); - } } /* Returns true to continue, false if finished */ @@ -116,10 +98,7 @@ static bool handle(app_data_t* app, pn_event_t* event) { ++app->sent; /* Use sent counter as unique delivery tag. */ pn_delivery(sender, pn_dtag((const char *)&app->sent, sizeof(app->sent))); - { - pn_bytes_t msgbuf = encode_message(app); - pn_link_send(sender, msgbuf.start, msgbuf.size); - } + send_message(app, sender); pn_link_advance(sender); } break; @@ -193,6 +172,7 @@ int main(int argc, char **argv) { app.port = (argc > 2) ? argv[2] : "amqp"; app.amqp_address = (argc > 3) ? argv[3] : "examples"; app.message_count = (argc > 4) ? 
atoi(argv[4]) : 10; + app.message = pn_message(); app.proactor = pn_proactor(); pn_proactor_addr(addr, sizeof(addr), app.host, app.port); @@ -200,5 +180,6 @@ int main(int argc, char **argv) { run(&app); pn_proactor_free(app.proactor); free(app.message_buffer.start); + pn_message_free(app.message); return exit_code; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bdd986a1/c/include/proton/message.h ---------------------------------------------------------------------- diff --git a/c/include/proton/message.h b/c/include/proton/message.h index f9d62b0..fd69688 100644 --- a/c/include/proton/message.h +++ b/c/include/proton/message.h @@ -723,7 +723,7 @@ PN_EXTERN pn_data_t *pn_message_body(pn_message_t *msg); PN_EXTERN int pn_message_decode(pn_message_t *msg, const char *bytes, size_t size); /** - * Encode/save message content as AMQP formatted binary data. + * Encode a message as AMQP formatted binary data. * * If the buffer space provided is insufficient to store the content * held in the message, the operation will fail and return a @@ -737,6 +737,28 @@ PN_EXTERN int pn_message_decode(pn_message_t *msg, const char *bytes, size_t siz */ PN_EXTERN int pn_message_encode(pn_message_t *msg, char *bytes, size_t *size); +struct pn_link_t; + +/** + * Encode and send a message on a sender link. + * + * @param[in] msg A message object. + * @param[in] sender A sending link. + * The message will be encoded and sent with pn_link_send() + * @param[inout] buf Used to encode the message. + * - if buf == NULL, temporary space will be allocated and freed with malloc()/free() + * - if buf->start != NULL and buf->size is large enough, the message is encoded to + * buf->start + * - if buf->start == NULL or buf->size is not enough, the buffer will be extended like this: + * + * buf->size = new_size; buf->start = realloc(buf->start, new_size) + * + * it is possible for the buffer to be extended more than once. + * @return The number of bytes encoded and sent on success. 
+ * Returns an error code (< 0) on failure and sets pn_message_error() on msg + */ +PN_EXTERN ssize_t pn_message_send(pn_message_t *msg, pn_link_t *sender, pn_rwbytes_t *buf); + /** * Save message content into a pn_data_t object data. The data object will first be cleared. */ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bdd986a1/c/src/core/message.c ---------------------------------------------------------------------- diff --git a/c/src/core/message.c b/c/src/core/message.c index 8f0bcf7..3312f8c 100644 --- a/c/src/core/message.c +++ b/c/src/core/message.c @@ -26,6 +26,7 @@ #include "protocol.h" #include "util.h" +#include <proton/link.h> #include <proton/object.h> #include <proton/codec.h> #include <proton/error.h> @@ -895,3 +896,30 @@ pn_data_t *pn_message_body(pn_message_t *msg) { return msg ? msg->body : NULL; } + +PN_EXTERN ssize_t pn_message_send(pn_message_t *msg, pn_link_t *sender, pn_rwbytes_t *buffer) { + static const size_t initial_size = 256; + pn_rwbytes_t local_buf = { 0 }; + ssize_t err = 0; + size_t size = 0; + + if (buffer == NULL) buffer = &local_buf; + if (buffer->start == NULL) { + buffer->start = (char*)malloc(initial_size); + buffer->size = initial_size; + } + if (buffer->start == NULL) return PN_OUT_OF_MEMORY; + size = buffer->size; + while ((err = pn_message_encode(msg, buffer->start, &size)) == PN_OVERFLOW) { + buffer->size *= 2; + buffer->start = (char*)realloc(buffer->start, buffer->size); + if (buffer->start == NULL) return PN_OUT_OF_MEMORY; + size = buffer->size; + } + if (err == 0) { + err = pn_link_send(sender, buffer->start, size); + if (err < 0) pn_error_copy(pn_message_error(msg), pn_link_error(sender)); + } + free(local_buf.start); + return err; +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bdd986a1/c/tests/connection_driver.c ---------------------------------------------------------------------- diff --git a/c/tests/connection_driver.c b/c/tests/connection_driver.c index f152e89..7e4489a 100644 --- 
a/c/tests/connection_driver.c +++ b/c/tests/connection_driver.c @@ -102,11 +102,10 @@ static void test_message_transfer(test_t *t) { /* Encode and send a message */ pn_message_t *m = pn_message(); pn_data_put_string(pn_message_body(m), pn_bytes(4, "abc")); /* Include trailing NULL */ - pn_rwbytes_t buf = { 0 }; - ssize_t size = message_encode(m, &buf); - pn_message_free(m); pn_delivery(snd, PN_BYTES_LITERAL(x)); - TEST_INT_EQUAL(t, size, pn_link_send(snd, buf.start, size)); + pn_message_send(m, snd, NULL); + pn_message_free(m); + TEST_CHECK(t, pn_link_advance(snd)); test_connection_drivers_run(&client, &server); TEST_HANDLER_EXPECT(&server.handler, PN_TRANSPORT, PN_DELIVERY, 0); @@ -125,7 +124,6 @@ static void test_message_transfer(test_t *t) { TEST_STR_EQUAL(t, "abc", pn_data_get_string(pn_message_body(m2)).start); pn_message_free(m2); - free(buf.start); free(buf2.start); test_connection_driver_destroy(&client); test_connection_driver_destroy(&server); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org