This is an automated email from the ASF dual-hosted git repository. kgiusti pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
commit 2876ccd736da700d8d03739cd85e5c8d495f2391 Author: Kenneth Giusti <kgiu...@apache.org> AuthorDate: Wed Aug 21 14:11:50 2019 -0400 DISPATCH-1394: Fix qd_message_check() to detect truncated message headers This closes #557 --- include/qpid/dispatch/message.h | 11 +- src/message.c | 421 ++++++++++++++--------- src/python_embedded.c | 2 +- src/router_core/exchange_bindings.c | 2 +- src/router_core/modules/edge_router/addr_proxy.c | 2 +- src/router_node.c | 13 +- tests/message_test.c | 337 ++++++++++++++++-- tests/run_unit_tests_size.c | 5 + 8 files changed, 579 insertions(+), 214 deletions(-) diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h index 26ddb4b..0d669c4 100644 --- a/include/qpid/dispatch/message.h +++ b/include/qpid/dispatch/message.h @@ -232,8 +232,17 @@ void qd_message_send(qd_message_t *msg, qd_link_t *link, bool strip_outbound_ann /** * Check that the message is well-formed up to a certain depth. Any part of the message that is * beyond the specified depth is not checked for validity. + * + * Note: some message sections are optional - QD_MESSAGE_OK is returned if the + * optional section is not present, as that is valid. */ -int qd_message_check(qd_message_t *msg, qd_message_depth_t depth); +typedef enum { + QD_MESSAGE_DEPTH_INVALID, // corrupt or malformed message detected + QD_MESSAGE_DEPTH_OK, // valid up to depth, including 'depth' if not optional + QD_MESSAGE_DEPTH_INCOMPLETE // have not received up to 'depth', or partial depth +} qd_message_depth_status_t; + +qd_message_depth_status_t qd_message_check_depth(const qd_message_t *msg, qd_message_depth_t depth); /** * Return an iterator for the requested message field. If the field is not in the message, diff --git a/src/message.c b/src/message.c index ddc49bb..0325c14 100644 --- a/src/message.c +++ b/src/message.c @@ -69,6 +69,18 @@ static const unsigned char * const TAGS_ANY = (unsigned char "\xa1\xb1\xa3\xb3\xe0\xf0" "\x40\x56\x41\x42\x50\x60\x70\x52\x43\x80\x53\x44\x51\x61\x71\x54\x81\x55\x72\x82\x74\x84\x94\x73\x83\x98"; + +static const char * const section_names[QD_DEPTH_ALL + 1] = { + [QD_DEPTH_NONE] = "none", + [QD_DEPTH_HEADER] = "header", + [QD_DEPTH_DELIVERY_ANNOTATIONS] = "delivery annotations", + [QD_DEPTH_MESSAGE_ANNOTATIONS] = "message annotations", + [QD_DEPTH_PROPERTIES] = "properties", + [QD_DEPTH_APPLICATION_PROPERTIES] = "application properties", + [QD_DEPTH_BODY] = "body", + [QD_DEPTH_ALL] = "footer" +}; + PN_HANDLE(PN_DELIVERY_CTX) ALLOC_DEFINE_CONFIG(qd_message_t, sizeof(qd_message_pvt_t), 0, 0); @@ -328,7 +340,9 @@ static void print_field( static const char REPR_END[] = "}\0"; char* qd_message_repr(qd_message_t *msg, char* buffer, size_t len, qd_log_bits flags) { - if (flags == 0 || !qd_message_check(msg, QD_DEPTH_APPLICATION_PROPERTIES)) { + if (flags == 0 + || qd_message_check_depth(msg, QD_DEPTH_APPLICATION_PROPERTIES) != QD_MESSAGE_DEPTH_OK + || !((qd_message_pvt_t *)msg)->content->section_application_properties.parsed) { return NULL; } char *begin = buffer; @@ -357,20 +371,22 @@ char* qd_message_repr(qd_message_t *msg, char* buffer, size_t len, qd_log_bits f /** * Advance cursor through buffer chain by 'consume' bytes. * Cursor and buffer args are advanced to point to new position in buffer chain. - * - if the number of bytes in the buffer chain is less than or equal to - * the consume number then return a null buffer and cursor. + * - if the number of bytes in the buffer chain is less than or equal to + * the consume number then set *cursor and *buffer to NULL and + * return the number of missing bytes * - the original buffer chain is not changed or freed. * * @param cursor Pointer into current buffer content * @param buffer pointer to current buffer * @param consume number of bytes to advance + * @return 0 if all bytes consumed, != 0 if not enough bytes available */ -static void advance(unsigned char **cursor, qd_buffer_t **buffer, int consume) +static int advance(unsigned char **cursor, qd_buffer_t **buffer, int consume) { unsigned char *local_cursor = *cursor; qd_buffer_t *local_buffer = *buffer; - int remaining = qd_buffer_size(local_buffer) - (local_cursor - qd_buffer_base(local_buffer)); + int remaining = qd_buffer_cursor(local_buffer) - local_cursor; while (consume > 0) { if (consume < remaining) { local_cursor += consume; @@ -383,12 +399,14 @@ static void advance(unsigned char **cursor, qd_buffer_t **buffer, int consume) break; } local_cursor = qd_buffer_base(local_buffer); - remaining = qd_buffer_size(local_buffer) - (local_cursor - qd_buffer_base(local_buffer)); + remaining = qd_buffer_size(local_buffer); } } *cursor = local_cursor; *buffer = local_buffer; + + return consume; } @@ -556,33 +574,47 @@ static int start_list(unsigned char **cursor, qd_buffer_t **buffer) } +// Validate a message section (header, body, etc). This determines whether or +// not a given section is present and complete at the start of the buffer chain. +// +// The section is identified by a 'pattern' (a descriptor identifier, such as +// "MESSAGE_ANNOTATION_LONG" above). The descriptor also provides a type +// 'tag', which MUST match else the section is invalid. // -// Check the buffer chain, starting at cursor to see if it matches the pattern. -// If the pattern matches, check the next tag to see if it's in the set of expected -// tags. If not, return zero. If so, set the location descriptor to the good -// tag and advance the cursor (and buffer, if needed) to the end of the matched section. +// Non-Body message sections are optional. So if the pattern does NOT match +// then the section that the pattern represents is not present. Whether or not +// this is acceptable is left to the caller. // -// If there is no match, don't advance the cursor. +// If the pattern and tag match, extract the length and verify that the entire +// section is present in the buffer chain. If this is the case then store the +// start of the section in 'location' and advance '*buffer' and '*cursor' to +// the next section. // -// Return 0 if the pattern matches but the following tag is unexpected -// Return 0 if the pattern matches and the location already has a pointer (duplicate section) -// Return 1 if the pattern matches and we've advanced the cursor/buffer -// Return 1 if the pattern does not match +// if there is not enough of the section present in the buffer chain we need to +// wait until more data arrives and try again. // -static int qd_check_and_advance(qd_buffer_t **buffer, - unsigned char **cursor, - const unsigned char *pattern, - int pattern_length, - const unsigned char *expected_tags, - qd_field_location_t *location) +// +typedef enum { + QD_SECTION_INVALID, // invalid section (tag mismatch, duplicate section, etc). + QD_SECTION_MATCH, + QD_SECTION_NO_MATCH, + QD_SECTION_NEED_MORE // not enough data in the buffer chain - try again +} qd_section_status_t; + +static qd_section_status_t message_section_check(qd_buffer_t **buffer, + unsigned char **cursor, + const unsigned char *pattern, + int pattern_length, + const unsigned char *expected_tags, + qd_field_location_t *location) { qd_buffer_t *test_buffer = *buffer; unsigned char *test_cursor = *cursor; if (!test_cursor) - return 1; // no match + return QD_SECTION_NEED_MORE; - unsigned char *end_of_buffer = qd_buffer_base(test_buffer) + qd_buffer_size(test_buffer); + unsigned char *end_of_buffer = qd_buffer_cursor(test_buffer); int idx = 0; while (idx < pattern_length && *test_cursor == pattern[idx]) { @@ -591,14 +623,14 @@ static int qd_check_and_advance(qd_buffer_t **buffer, if (test_cursor == end_of_buffer) { test_buffer = test_buffer->next; if (test_buffer == 0) - return 1; // Pattern didn't match + return QD_SECTION_NEED_MORE; test_cursor = qd_buffer_base(test_buffer); end_of_buffer = test_cursor + qd_buffer_size(test_buffer); } } if (idx < pattern_length) - return 1; // Pattern didn't match + return QD_SECTION_NO_MATCH; // // Pattern matched, check the tag @@ -606,10 +638,10 @@ static int qd_check_and_advance(qd_buffer_t **buffer, while (*expected_tags && *test_cursor != *expected_tags) expected_tags++; if (*expected_tags == 0) - return 0; // Unexpected tag + return QD_SECTION_INVALID; // Error: Unexpected tag if (location->parsed) - return 0; // Duplicate section + return QD_SECTION_INVALID; // Error: Duplicate section // // Pattern matched and tag is expected. Mark the beginning of the section. @@ -620,18 +652,22 @@ static int qd_check_and_advance(qd_buffer_t **buffer, location->hdr_length = pattern_length; // - // Advance the pointers to consume the whole section. + // Check that the full section is present, if so advance the pointers to + // consume the whole section. // int pre_consume = 1; // Count the already extracted tag - int consume = 0; + uint32_t consume = 0; unsigned char tag = next_octet(&test_cursor, &test_buffer); - unsigned char tag_subcat = tag & 0xF0; + + // if there is no more data the only valid data type is a null type (0x40), + // size is implied as 0 if (!test_cursor && tag_subcat != 0x40) - return 0; + return QD_SECTION_NEED_MORE; switch (tag_subcat) { - case 0x40: break; + // fixed sizes: + case 0x40: /* null */ break; case 0x50: consume = 1; break; case 0x60: consume = 2; break; case 0x70: consume = 4; break; @@ -641,37 +677,43 @@ static int qd_check_and_advance(qd_buffer_t **buffer, case 0xB0: case 0xD0: case 0xF0: + // uint32_t size field: pre_consume += 3; - consume |= ((int) next_octet(&test_cursor, &test_buffer)) << 24; - if (!test_cursor) return 0; - consume |= ((int) next_octet(&test_cursor, &test_buffer)) << 16; - if (!test_cursor) return 0; - consume |= ((int) next_octet(&test_cursor, &test_buffer)) << 8; - if (!test_cursor) return 0; + consume |= ((uint32_t) next_octet(&test_cursor, &test_buffer)) << 24; + if (!test_cursor) return QD_SECTION_NEED_MORE; + consume |= ((uint32_t) next_octet(&test_cursor, &test_buffer)) << 16; + if (!test_cursor) return QD_SECTION_NEED_MORE; + consume |= ((uint32_t) next_octet(&test_cursor, &test_buffer)) << 8; + if (!test_cursor) return QD_SECTION_NEED_MORE; // Fall through to the next case... case 0xA0: case 0xC0: case 0xE0: + // uint8_t size field pre_consume += 1; - consume |= (int) next_octet(&test_cursor, &test_buffer); - if (!test_cursor) return 0; + consume |= (uint32_t) next_octet(&test_cursor, &test_buffer); + if (!test_cursor) return QD_SECTION_NEED_MORE; break; } location->length = pre_consume + consume; - if (consume) - advance(&test_cursor, &test_buffer, consume); + if (consume) { + if (advance(&test_cursor, &test_buffer, consume) != 0) { + return QD_SECTION_NEED_MORE; // whole section not fully received + } + } // // increment the reference count of the parsed section as location now - // references it. Note that the cursor has advanced to the octet after the - // parsed section, so be careful not to include an extra buffer past the - // end + // references it. Note that the cursor may have advanced to the octet after + // the parsed section, so be careful not to include an extra buffer past + // the end. And cursor + buffer will be null if the parsed section ends at + // the end of the buffer chain, so be careful of that, too! // qd_buffer_t *start = *buffer; qd_buffer_t *last = test_buffer; - if (last != start && last != 0) { + if (last && last != start) { if (test_cursor == qd_buffer_base(last)) { // last does not include octets for the current section last = DEQ_PREV(last); @@ -689,7 +731,7 @@ static int qd_check_and_advance(qd_buffer_t **buffer, *cursor = test_cursor; *buffer = test_buffer; - return 1; + return QD_SECTION_MATCH; } @@ -762,7 +804,7 @@ static qd_field_location_t *qd_message_properties_field(qd_message_t *msg, qd_me qd_message_content_t *content = MSG_CONTENT(msg); if (!content->section_message_properties.parsed) { - if (!qd_message_check(msg, QD_DEPTH_PROPERTIES) || !content->section_message_properties.parsed) + if (qd_message_check_depth(msg, QD_DEPTH_PROPERTIES) != QD_MESSAGE_DEPTH_OK || !content->section_message_properties.parsed) return 0; } @@ -834,7 +876,7 @@ static qd_field_location_t *qd_message_field_location(qd_message_t *msg, qd_mess switch (section) { case QD_FIELD_HEADER: if (content->section_message_header.parsed || - (qd_message_check(msg, QD_DEPTH_HEADER) && content->section_message_header.parsed)) + (qd_message_check_depth(msg, QD_DEPTH_HEADER) == QD_MESSAGE_DEPTH_OK && content->section_message_header.parsed)) return &content->section_message_header; break; @@ -843,31 +885,31 @@ static qd_field_location_t *qd_message_field_location(qd_message_t *msg, qd_mess case QD_FIELD_DELIVERY_ANNOTATION: if (content->section_delivery_annotation.parsed || - (qd_message_check(msg, QD_DEPTH_DELIVERY_ANNOTATIONS) && content->section_delivery_annotation.parsed)) + (qd_message_check_depth(msg, QD_DEPTH_DELIVERY_ANNOTATIONS) == QD_MESSAGE_DEPTH_OK && content->section_delivery_annotation.parsed)) return &content->section_delivery_annotation; break; case QD_FIELD_MESSAGE_ANNOTATION: if (content->section_message_annotation.parsed || - (qd_message_check(msg, QD_DEPTH_MESSAGE_ANNOTATIONS) && content->section_message_annotation.parsed)) + (qd_message_check_depth(msg, QD_DEPTH_MESSAGE_ANNOTATIONS) == QD_MESSAGE_DEPTH_OK && content->section_message_annotation.parsed)) return &content->section_message_annotation; break; case QD_FIELD_APPLICATION_PROPERTIES: if (content->section_application_properties.parsed || - (qd_message_check(msg, QD_DEPTH_APPLICATION_PROPERTIES) && content->section_application_properties.parsed)) + (qd_message_check_depth(msg, QD_DEPTH_APPLICATION_PROPERTIES) == QD_MESSAGE_DEPTH_OK && content->section_application_properties.parsed)) return &content->section_application_properties; break; case QD_FIELD_BODY: if (content->section_body.parsed || - (qd_message_check(msg, QD_DEPTH_BODY) && content->section_body.parsed)) + (qd_message_check_depth(msg, QD_DEPTH_BODY) == QD_MESSAGE_DEPTH_OK && content->section_body.parsed)) return &content->section_body; break; case QD_FIELD_FOOTER: if (content->section_footer.parsed || - (qd_message_check(msg, QD_DEPTH_ALL) && content->section_footer.parsed)) + (qd_message_check_depth(msg, QD_DEPTH_ALL) == QD_MESSAGE_DEPTH_OK && content->section_footer.parsed)) return &content->section_footer; break; @@ -1754,154 +1796,201 @@ void qd_message_send(qd_message_t *in_msg, } -static int qd_check_field_LH(qd_message_content_t *content, - qd_message_depth_t depth, - const unsigned char *long_pattern, - const unsigned char *short_pattern, - const unsigned char *expected_tags, - qd_field_location_t *location, - int more) +static qd_message_depth_status_t message_check_depth_LH(qd_message_content_t *content, + qd_message_depth_t depth, + const unsigned char *long_pattern, + const unsigned char *short_pattern, + const unsigned char *expected_tags, + qd_field_location_t *location, + bool optional) { #define LONG 10 #define SHORT 3 - if (depth > content->parse_depth) { - if (0 == qd_check_and_advance(&content->parse_buffer, &content->parse_cursor, long_pattern, LONG, expected_tags, location)) - return 0; - if (0 == qd_check_and_advance(&content->parse_buffer, &content->parse_cursor, short_pattern, SHORT, expected_tags, location)) - return 0; - if (!more) - content->parse_depth = depth; + if (depth <= content->parse_depth) + return QD_MESSAGE_DEPTH_OK; + + qd_section_status_t rc; + rc = message_section_check(&content->parse_buffer, &content->parse_cursor, short_pattern, SHORT, expected_tags, location); + if (rc == QD_SECTION_NO_MATCH) // try the alternative + rc = message_section_check(&content->parse_buffer, &content->parse_cursor, long_pattern, LONG, expected_tags, location); + + if (rc == QD_SECTION_MATCH || (optional && rc == QD_SECTION_NO_MATCH)) { + content->parse_depth = depth; + return QD_MESSAGE_DEPTH_OK; } - return 1; + + if (rc == QD_SECTION_NEED_MORE) { + if (!content->receive_complete) + return QD_MESSAGE_DEPTH_INCOMPLETE; + + // no more data is going to come. OK if at the end and optional: + if (!content->parse_cursor && optional) + return QD_MESSAGE_DEPTH_OK; + + // otherwise we've got an invalid (truncated) header + } + + // if QD_SECTION_NO_MATCH && !optional => INVALID; + // QD_SECTION_INVALID => INVALID; + + return QD_MESSAGE_DEPTH_INVALID; } -static bool qd_message_check_LH(qd_message_content_t *content, qd_message_depth_t depth) +static qd_message_depth_status_t qd_message_check_LH(qd_message_content_t *content, qd_message_depth_t depth) { qd_error_clear(); - // - // In the case of a streaming or multi buffer message, there is a chance that some buffers might be freed before the entire - // message has arrived in which case we cannot reliably check the message using the depth. - // - if (content->buffers_freed) - return true; + if (depth <= content->parse_depth || depth == QD_DEPTH_NONE) + return QD_MESSAGE_DEPTH_OK; // We've already parsed at least this deep qd_buffer_t *buffer = DEQ_HEAD(content->buffers); - if (!buffer) { - return false; + return content->receive_complete ? QD_MESSAGE_DEPTH_INVALID : QD_MESSAGE_DEPTH_INCOMPLETE; } - if (depth <= content->parse_depth) - return true; // We've already parsed at least this deep - if (content->parse_buffer == 0) { content->parse_buffer = buffer; content->parse_cursor = qd_buffer_base(content->parse_buffer); } - if (depth == QD_DEPTH_NONE) - return true; + qd_message_depth_status_t rc = QD_MESSAGE_DEPTH_OK; + int last_section = QD_DEPTH_NONE; - // - // MESSAGE HEADER - // - if (0 == qd_check_field_LH(content, QD_DEPTH_HEADER, - MSG_HDR_LONG, MSG_HDR_SHORT, TAGS_LIST, &content->section_message_header, 0)) { - qd_error(QD_ERROR_MESSAGE, "Invalid header"); - return false; - } - if (depth == QD_DEPTH_HEADER) - return true; + switch (content->parse_depth + 1) { // start checking at the next unparsed section + case QD_DEPTH_HEADER: + // + // MESSAGE HEADER (optional) + // + last_section = QD_DEPTH_HEADER; + rc = message_check_depth_LH(content, QD_DEPTH_HEADER, + MSG_HDR_LONG, MSG_HDR_SHORT, TAGS_LIST, + &content->section_message_header, true); + if (rc != QD_MESSAGE_DEPTH_OK || depth == QD_DEPTH_HEADER) + break; - // - // DELIVERY ANNOTATION - // - if (0 == qd_check_field_LH(content, QD_DEPTH_DELIVERY_ANNOTATIONS, - DELIVERY_ANNOTATION_LONG, DELIVERY_ANNOTATION_SHORT, TAGS_MAP, &content->section_delivery_annotation, 0)) { - qd_error(QD_ERROR_MESSAGE, "Invalid delivery-annotations"); - return false; - } - if (depth == QD_DEPTH_DELIVERY_ANNOTATIONS) - return true; + // fallthrough - // - // MESSAGE ANNOTATION - // - if (0 == qd_check_field_LH(content, QD_DEPTH_MESSAGE_ANNOTATIONS, - MESSAGE_ANNOTATION_LONG, MESSAGE_ANNOTATION_SHORT, TAGS_MAP, &content->section_message_annotation, 0)) { - qd_error(QD_ERROR_MESSAGE, "Invalid annotations"); - return false; - } - if (depth == QD_DEPTH_MESSAGE_ANNOTATIONS) - return true; + case QD_DEPTH_DELIVERY_ANNOTATIONS: + // + // DELIVERY ANNOTATIONS (optional) + // + last_section = QD_DEPTH_DELIVERY_ANNOTATIONS; + rc = message_check_depth_LH(content, QD_DEPTH_DELIVERY_ANNOTATIONS, + DELIVERY_ANNOTATION_LONG, DELIVERY_ANNOTATION_SHORT, TAGS_MAP, + &content->section_delivery_annotation, true); + if (rc != QD_MESSAGE_DEPTH_OK || depth == QD_DEPTH_DELIVERY_ANNOTATIONS) + break; - // - // PROPERTIES - // - if (0 == qd_check_field_LH(content, QD_DEPTH_PROPERTIES, - PROPERTIES_LONG, PROPERTIES_SHORT, TAGS_LIST, &content->section_message_properties, 0)) { - qd_error(QD_ERROR_MESSAGE, "Invalid message properties"); - return false; - } - if (depth == QD_DEPTH_PROPERTIES) - return true; + // fallthrough - // - // APPLICATION PROPERTIES - // - if (0 == qd_check_field_LH(content, QD_DEPTH_APPLICATION_PROPERTIES, - APPLICATION_PROPERTIES_LONG, APPLICATION_PROPERTIES_SHORT, TAGS_MAP, &content->section_application_properties, 0)) { - qd_error(QD_ERROR_MESSAGE, "Invalid application-properties"); - return false; - } - if (depth == QD_DEPTH_APPLICATION_PROPERTIES) - return true; + case QD_DEPTH_MESSAGE_ANNOTATIONS: + // + // MESSAGE ANNOTATION (optional) + // + last_section = QD_DEPTH_MESSAGE_ANNOTATIONS; + rc = message_check_depth_LH(content, QD_DEPTH_MESSAGE_ANNOTATIONS, + MESSAGE_ANNOTATION_LONG, MESSAGE_ANNOTATION_SHORT, TAGS_MAP, + &content->section_message_annotation, true); + if (rc != QD_MESSAGE_DEPTH_OK || depth == QD_DEPTH_MESSAGE_ANNOTATIONS) + break; - // - // BODY - // Note that this function expects a limited set of types in a VALUE section. This is - // not a problem for messages passing through Dispatch because through-only messages won't - // be parsed to BODY-depth. - // - if (0 == qd_check_field_LH(content, QD_DEPTH_BODY, - BODY_DATA_LONG, BODY_DATA_SHORT, TAGS_BINARY, &content->section_body, 1)) { - qd_error(QD_ERROR_MESSAGE, "Invalid body data"); - return false; - } - if (0 == qd_check_field_LH(content, QD_DEPTH_BODY, - BODY_SEQUENCE_LONG, BODY_SEQUENCE_SHORT, TAGS_LIST, &content->section_body, 1)) { - qd_error(QD_ERROR_MESSAGE, "Invalid body sequence"); - return false; - } - if (0 == qd_check_field_LH(content, QD_DEPTH_BODY, - BODY_VALUE_LONG, BODY_VALUE_SHORT, TAGS_ANY, &content->section_body, 0)) { - qd_error(QD_ERROR_MESSAGE, "Invalid body value"); - return false; - } - if (depth == QD_DEPTH_BODY) - return true; + // fallthough - // - // FOOTER - // - if (0 == qd_check_field_LH(content, QD_DEPTH_ALL, - FOOTER_LONG, FOOTER_SHORT, TAGS_MAP, &content->section_footer, 0)) { + case QD_DEPTH_PROPERTIES: + // + // PROPERTIES (optional) + // + last_section = QD_DEPTH_PROPERTIES; + rc = message_check_depth_LH(content, QD_DEPTH_PROPERTIES, + PROPERTIES_LONG, PROPERTIES_SHORT, TAGS_LIST, + &content->section_message_properties, true); + if (rc != QD_MESSAGE_DEPTH_OK || depth == QD_DEPTH_PROPERTIES) + break; - qd_error(QD_ERROR_MESSAGE, "Invalid footer"); - return false; + // fallthrough + + case QD_DEPTH_APPLICATION_PROPERTIES: + // + // APPLICATION PROPERTIES (optional) + // + last_section = QD_DEPTH_APPLICATION_PROPERTIES; + rc = message_check_depth_LH(content, QD_DEPTH_APPLICATION_PROPERTIES, + APPLICATION_PROPERTIES_LONG, APPLICATION_PROPERTIES_SHORT, TAGS_MAP, + &content->section_application_properties, true); + if (rc != QD_MESSAGE_DEPTH_OK || depth == QD_DEPTH_APPLICATION_PROPERTIES) + break; + + // fallthrough + + case QD_DEPTH_BODY: + // In the case of multi-buffer streaming we may discard buffers that + // contain only the Body or Footer section for those messages that are + // through-only. We really cannot validate those sections if that should happen + // + if (content->buffers_freed) + return QD_MESSAGE_DEPTH_OK; + + // + // BODY (not optional, but proton allows it - see PROTON-2085) + // + // AMQP 1.0 defines 3 valid Body types: Binary, Sequence (list), or Value (any type) + // Since the body is mandatory, we need to match one of these. Setting + // the optional flag to false will force us to check each one until a match is found. + // + last_section = QD_DEPTH_BODY; + rc = message_check_depth_LH(content, QD_DEPTH_BODY, + BODY_VALUE_LONG, BODY_VALUE_SHORT, TAGS_ANY, + &content->section_body, false); + if (rc == QD_MESSAGE_DEPTH_INVALID) { // may be a different body type, need to check: + rc = message_check_depth_LH(content, QD_DEPTH_BODY, + BODY_DATA_LONG, BODY_DATA_SHORT, TAGS_BINARY, + &content->section_body, false); + if (rc == QD_MESSAGE_DEPTH_INVALID) { + rc = message_check_depth_LH(content, QD_DEPTH_BODY, + BODY_SEQUENCE_LONG, BODY_SEQUENCE_SHORT, TAGS_LIST, + &content->section_body, true); // PROTON-2085 + } + } + + if (rc != QD_MESSAGE_DEPTH_OK || depth == QD_DEPTH_BODY) + break; + + // fallthrough + + case QD_DEPTH_ALL: + // + // FOOTER (optional) + // + if (content->buffers_freed) // see above + return QD_MESSAGE_DEPTH_OK; + + last_section = QD_DEPTH_ALL; + rc = message_check_depth_LH(content, QD_DEPTH_ALL, + FOOTER_LONG, FOOTER_SHORT, TAGS_MAP, + &content->section_footer, true); + break; + + default: + assert(false); // should not happen! + qd_error(QD_ERROR_MESSAGE, "BUG! Invalid message depth specified: %d", + content->parse_depth + 1); + return QD_MESSAGE_DEPTH_INVALID; } - return true; + if (rc == QD_MESSAGE_DEPTH_INVALID) + qd_error(QD_ERROR_MESSAGE, "Invalid message: %s section invalid", + section_names[last_section]); + + return rc; } -int qd_message_check(qd_message_t *in_msg, qd_message_depth_t depth) +qd_message_depth_status_t qd_message_check_depth(const qd_message_t *in_msg, qd_message_depth_t depth) { qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg; qd_message_content_t *content = msg->content; - int result; + qd_message_depth_status_t result; LOCK(content->lock); result = qd_message_check_LH(content, depth); diff --git a/src/python_embedded.c b/src/python_embedded.c index 243ab0a..c822925 100644 --- a/src/python_embedded.c +++ b/src/python_embedded.c @@ -520,7 +520,7 @@ static void qd_io_rx_handler(void *context, qd_message_t *msg, int link_id, int // // Parse the message through the body and exit if the message is not well formed. // - if (!qd_message_check(msg, QD_DEPTH_BODY)) + if (qd_message_check_depth(msg, QD_DEPTH_BODY) != QD_MESSAGE_DEPTH_OK) return; // This is called from non-python threads so we need to acquire the GIL to use python APIS. diff --git a/src/router_core/exchange_bindings.c b/src/router_core/exchange_bindings.c index 0f5908c..dc3b495 100644 --- a/src/router_core/exchange_bindings.c +++ b/src/router_core/exchange_bindings.c @@ -192,7 +192,7 @@ int qdr_forward_exchange_CT(qdr_core_t *core, if (!presettled) in_delivery->settled = true; - qd_iterator_t *subject = qd_message_check(msg, QD_DEPTH_PROPERTIES) + qd_iterator_t *subject = qd_message_check_depth(msg, QD_DEPTH_PROPERTIES) == QD_MESSAGE_DEPTH_OK ? qd_message_field_iterator(msg, QD_FIELD_SUBJECT) : NULL; next_hop_list_t transmit_list; diff --git a/src/router_core/modules/edge_router/addr_proxy.c b/src/router_core/modules/edge_router/addr_proxy.c index 31f9fdc..61fbf59 100644 --- a/src/router_core/modules/edge_router/addr_proxy.c +++ b/src/router_core/modules/edge_router/addr_proxy.c @@ -428,7 +428,7 @@ static void on_transfer(void *link_context, // // Validate the message // - if (qd_message_check(msg, QD_DEPTH_BODY)) { + if (qd_message_check_depth(msg, QD_DEPTH_BODY) == QD_MESSAGE_DEPTH_OK) { // // Get the message body. It must be a list with two elements. The first is an address // and the second is a boolean indicating whether that address has upstream destinations. diff --git a/src/router_node.c b/src/router_node.c index 1e5ee6a..ef1ce17 100644 --- a/src/router_node.c +++ b/src/router_node.c @@ -465,20 +465,19 @@ static bool AMQP_rx_handler(void* context, qd_link_t *link) // using the address from the link target. // qd_message_depth_t validation_depth = (anonymous_link || check_user) ? QD_DEPTH_PROPERTIES : QD_DEPTH_MESSAGE_ANNOTATIONS; - bool valid_message = qd_message_check(msg, validation_depth); + qd_message_depth_status_t depth_valid = qd_message_check_depth(msg, validation_depth); - if (!valid_message) { - if (receive_complete) { - // - // The entire message has been received and the message is still invalid. Reject the message. - // + if (depth_valid != QD_MESSAGE_DEPTH_OK) { + if (depth_valid == QD_MESSAGE_DEPTH_INVALID) { qd_message_set_discard(msg, true); pn_link_flow(pn_link, 1); pn_delivery_update(pnd, PN_REJECTED); pn_delivery_settle(pnd); qd_message_free(msg); + } else { + // otherwise wait until more data arrives and re-try the validation + assert(depth_valid == QD_MESSAGE_DEPTH_INCOMPLETE); } - // otherwise wait until more data arrives and re-try the validation return next_delivery; } diff --git a/tests/message_test.c b/tests/message_test.c index fa30d94..bc786a1 100644 --- a/tests/message_test.c +++ b/tests/message_test.c @@ -25,12 +25,12 @@ #include <qpid/dispatch/amqp.h> #include <proton/message.h> -static char buffer[10000]; +static unsigned char buffer[10000]; static size_t flatten_bufs(qd_message_content_t *content) { - char *cursor = buffer; - qd_buffer_t *buf = DEQ_HEAD(content->buffers); + unsigned char *cursor = buffer; + qd_buffer_t *buf = DEQ_HEAD(content->buffers); while (buf) { memcpy(cursor, qd_buffer_base(buf), qd_buffer_size(buf)); @@ -42,9 +42,9 @@ static size_t flatten_bufs(qd_message_content_t *content) } -static void set_content(qd_message_content_t *content, size_t len) +static void set_content(qd_message_content_t *content, unsigned char *buffer, size_t len) { - char *cursor = buffer; + unsigned char *cursor = buffer; qd_buffer_t *buf; while (len > (size_t) (cursor - buffer)) { @@ -58,6 +58,7 @@ static void set_content(qd_message_content_t *content, size_t len) qd_buffer_insert(buf, segment); DEQ_INSERT_TAIL(content->buffers, buf); } + content->receive_complete = true; } @@ -85,7 +86,7 @@ static char* test_send_to_messenger(void *context) pn_message_t *pn_msg = pn_message(); size_t len = flatten_bufs(content); - int result = pn_message_decode(pn_msg, buffer, len); + int result = pn_message_decode(pn_msg, (char *)buffer, len); if (result != 0) { pn_message_free(pn_msg); qd_message_free(msg); @@ -111,7 +112,7 @@ static char* test_receive_from_messenger(void *context) pn_message_set_address(pn_msg, "test_addr_1"); size_t size = 10000; - int result = pn_message_encode(pn_msg, buffer, &size); + int result = pn_message_encode(pn_msg, (char *)buffer, &size); if (result != 0) { pn_message_free(pn_msg); return "Error in pn_message_encode"; @@ -120,13 +121,12 @@ static char* test_receive_from_messenger(void *context) qd_message_t *msg = qd_message(); qd_message_content_t *content = MSG_CONTENT(msg); - set_content(content, size); + set_content(content, buffer, size); - int valid = qd_message_check(msg, QD_DEPTH_ALL); - if (!valid) { + if (qd_message_check_depth(msg, QD_DEPTH_ALL) != QD_MESSAGE_DEPTH_OK) { pn_message_free(pn_msg); qd_message_free(msg); - return "qd_message_check returns 'invalid'"; + return "qd_message_check_depth returns 'invalid'"; } qd_iterator_t *iter = qd_message_field_iterator(msg, QD_FIELD_TO); @@ -195,7 +195,7 @@ static char* test_message_properties(void *context) pn_message_set_correlation_id(pn_msg, cid); size_t size = 10000; - int result = pn_message_encode(pn_msg, buffer, &size); + int result = pn_message_encode(pn_msg, (char *)buffer, &size); pn_message_free(pn_msg); if (result != 0) return "Error in pn_message_encode"; @@ -203,7 +203,7 @@ static char* test_message_properties(void *context) qd_message_t *msg = qd_message(); qd_message_content_t *content = MSG_CONTENT(msg); - set_content(content, size); + set_content(content, buffer, size); qd_iterator_t *iter = qd_message_field_iterator(msg, QD_FIELD_CORRELATION_ID); if (!iter) { @@ -266,42 +266,93 @@ static char* test_message_properties(void *context) } +// run qd_message_check_depth against different legal AMQP message +// +static char* _check_all_depths(qd_message_t *msg) +{ + static const qd_message_depth_t depths[] = { + // yep: purposely out of order + QD_DEPTH_MESSAGE_ANNOTATIONS, + QD_DEPTH_DELIVERY_ANNOTATIONS, + QD_DEPTH_PROPERTIES, + QD_DEPTH_HEADER, + QD_DEPTH_APPLICATION_PROPERTIES, + QD_DEPTH_BODY + }; + static const int n_depths = 6; + + static char err[1024]; + + for (int i = 0; i < n_depths; ++i) { + if (qd_message_check_depth(msg, depths[i]) != QD_MESSAGE_DEPTH_OK) { + snprintf(err, 1023, + "qd_message_check_depth returned 'invalid' for section 0x%X", (unsigned int)depths[i]); + err[1023] = 0; + return err; + } + } + return 0; +} + + static char* test_check_multiple(void *context) { + // case 1: a minimal encoded message + // pn_message_t *pn_msg = pn_message(); - pn_message_set_address(pn_msg, "test_addr_2"); size_t size = 10000; - int result = pn_message_encode(pn_msg, buffer, &size); + int result = pn_message_encode(pn_msg, (char *)buffer, &size); pn_message_free(pn_msg); if (result != 0) return "Error in pn_message_encode"; qd_message_t *msg = qd_message(); qd_message_content_t *content = MSG_CONTENT(msg); - set_content(content, size); - - int valid = qd_message_check(msg, QD_DEPTH_DELIVERY_ANNOTATIONS); - if (!valid) { - qd_message_free(msg); - return "qd_message_check returns 'invalid' for DELIVERY_ANNOTATIONS"; - } - - valid = qd_message_check(msg, QD_DEPTH_BODY); - if (!valid) { - qd_message_free(msg); - return "qd_message_check returns 'invalid' for BODY"; - } - - valid = qd_message_check(msg, QD_DEPTH_PROPERTIES); - if (!valid) { - qd_message_free(msg); - return "qd_message_check returns 'invalid' for PROPERTIES"; - } - + set_content(content, buffer, size); + char *rc = _check_all_depths(msg); qd_message_free(msg); + if (rc) return rc; - return 0; + // case 2: minimal, with address field in header + // + pn_msg = pn_message(); + pn_message_set_address(pn_msg, "test_addr_2"); + size = 10000; + result = pn_message_encode(pn_msg, (char *)buffer, &size); + pn_message_free(pn_msg); + if (result != 0) return "Error in pn_message_encode"; + msg = qd_message(); + set_content(MSG_CONTENT(msg), buffer, size); + rc = _check_all_depths(msg); + qd_message_free(msg); + if (rc) return rc; + + // case 3: null body + // + pn_msg = pn_message(); + pn_data_t *body = pn_message_body(pn_msg); + pn_data_put_null(body); + size = 10000; + result = pn_message_encode(pn_msg, (char *)buffer, &size); + pn_message_free(pn_msg); + if (result != 0) return "Error in pn_message_encode"; + msg = qd_message(); + set_content(MSG_CONTENT(msg), buffer, size); + rc = _check_all_depths(msg); + qd_message_free(msg); + if (rc) return rc; + + // case 4: minimal legal AMQP 1.0 message (as defined by the standard) + // A single body field with a null value + const unsigned char null_body[] = {0x00, 0x53, 0x77, 0x40}; + size = sizeof(null_body); + memcpy(buffer, null_body, size); + msg = qd_message(); + set_content(MSG_CONTENT(msg), buffer, size); + rc = _check_all_depths(msg); + qd_message_free(msg); + return rc; } @@ -334,7 +385,7 @@ static char* test_send_message_annotations(void *context) pn_message_t *pn_msg = pn_message(); size_t len = flatten_bufs(content); - int result = pn_message_decode(pn_msg, buffer, len); + int result = pn_message_decode(pn_msg, (char *)buffer, len); if (result != 0) { qd_message_free(msg); return "Error in pn_message_decode"; @@ -415,6 +466,216 @@ static char* test_q2_input_holdoff_sensing(void *context) } +// verify that message check does not incorrectly validate a message section +// that has not been completely received. +// +static char *test_incomplete_annotations(void *context) +{ + const char big_string[] = + "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789" + "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789" + "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789" + "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789" + "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789" + "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789" + "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789" + "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789" + "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789" + "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789"; + + char *result = 0; + qd_message_t *msg = 0; + pn_message_t *out_message = pn_message(); + + pn_data_t *body = pn_message_body(out_message); + pn_data_clear(body); + pn_data_put_list(body); + pn_data_enter(body); + pn_data_put_long(body, 1); + pn_data_put_long(body, 2); + pn_data_put_long(body, 3); + pn_data_exit(body); + + // Add a bunch 'o user message annotations + pn_data_t *annos = pn_message_annotations(out_message); + pn_data_clear(annos); + pn_data_put_map(annos); + pn_data_enter(annos); + + pn_data_put_symbol(annos, pn_bytes(strlen("my-key"), "my-key")); + pn_data_put_string(annos, pn_bytes(strlen("my-data"), "my-data")); + + pn_data_put_symbol(annos, pn_bytes(strlen("my-other-key"), "my-other-key")); + pn_data_put_string(annos, pn_bytes(strlen("my-other-data"), "my-other-data")); + + // embedded map + pn_data_put_symbol(annos, pn_bytes(strlen("my-map"), "my-map")); + pn_data_put_map(annos); + pn_data_enter(annos); + pn_data_put_symbol(annos, pn_bytes(strlen("my-map-key1"), "my-map-key1")); + pn_data_put_char(annos, 'X'); + pn_data_put_symbol(annos, pn_bytes(strlen("my-map-key2"), "my-map-key2")); + pn_data_put_byte(annos, 0x12); + pn_data_put_symbol(annos, pn_bytes(strlen("my-map-key3"), "my-map-key3")); + pn_data_put_string(annos, pn_bytes(strlen("Are We Not Men?"), "Are We Not Men?")); + pn_data_put_symbol(annos, pn_bytes(strlen("my-last-key"), "my-last-key")); + pn_data_put_binary(annos, pn_bytes(sizeof(big_string), big_string)); + pn_data_exit(annos); + + pn_data_put_symbol(annos, pn_bytes(strlen("my-ulong"), "my-ulong")); + pn_data_put_ulong(annos, 0xDEADBEEFCAFEBEEF); + + // embedded list + pn_data_put_symbol(annos, pn_bytes(strlen("my-list"), "my-list")); + pn_data_put_list(annos); + pn_data_enter(annos); + pn_data_put_string(annos, pn_bytes(sizeof(big_string), big_string)); + pn_data_put_double(annos, 3.1415); + pn_data_put_short(annos, 1966); + pn_data_exit(annos); + + pn_data_put_symbol(annos, pn_bytes(strlen("my-bool"), "my-bool")); + pn_data_put_bool(annos, false); + + pn_data_exit(annos); + + // now encode it + + size_t encode_len = sizeof(buffer); + int rc = pn_message_encode(out_message, (char *)buffer, &encode_len); + if (rc) { + if (rc == PN_OVERFLOW) + result = "Error: sizeof(buffer) in message_test.c too small - update it!"; + else + result = "Error encoding message"; + goto exit; + } + + assert(encode_len > 100); // you broke the test! + + // Verify that the message check fails unless the entire annotations are + // present. First copy in only the first 100 bytes: enough for the MA + // section header but not the whole section + + msg = qd_message(); + qd_message_content_t *content = MSG_CONTENT(msg); + set_content(content, buffer, 100); + content->receive_complete = false; // more data coming! + if (qd_message_check_depth(msg, QD_DEPTH_MESSAGE_ANNOTATIONS) != QD_MESSAGE_DEPTH_INCOMPLETE) { + result = "Error: incomplete message was not detected!"; + goto exit; + } + + // now complete the message + set_content(content, &buffer[100], encode_len - 100); + if (qd_message_check_depth(msg, QD_DEPTH_MESSAGE_ANNOTATIONS) != QD_MESSAGE_DEPTH_OK) { + result = "Error: expected message to be valid!"; + } + +exit: + + if (out_message) pn_message_free(out_message); + if (msg) qd_message_free(msg); + + return result; +} + + +static char *test_check_weird_messages(void *context) +{ + char *result = 0; + qd_message_t *msg = qd_message(); + + // case 1: + // delivery annotations with empty map + unsigned char da_map[] = {0x00, 0x80, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x71, + 0xc1, 0x01, 0x00}; + // first test an incomplete pattern: + set_content(MSG_CONTENT(msg), da_map, 4); + MSG_CONTENT(msg)->receive_complete = false; + qd_message_depth_status_t mc = qd_message_check_depth(msg, QD_DEPTH_DELIVERY_ANNOTATIONS); + if (mc != QD_MESSAGE_DEPTH_INCOMPLETE) { + result = "Expected INCOMPLETE status"; + goto exit; + } + + // full pattern, but no tag + set_content(MSG_CONTENT(msg), &da_map[4], 6); + MSG_CONTENT(msg)->receive_complete = false; + mc = qd_message_check_depth(msg, QD_DEPTH_DELIVERY_ANNOTATIONS); + if (mc != QD_MESSAGE_DEPTH_INCOMPLETE) { + result = "Expected INCOMPLETE status"; + goto exit; + } + + // add tag, but incomplete field: + set_content(MSG_CONTENT(msg), &da_map[10], 1); + MSG_CONTENT(msg)->receive_complete = false; + mc = qd_message_check_depth(msg, QD_DEPTH_DELIVERY_ANNOTATIONS); + if (mc != QD_MESSAGE_DEPTH_INCOMPLETE) { + result = "Expected INCOMPLETE status"; + goto exit; + } + + // and finish up + set_content(MSG_CONTENT(msg), &da_map[11], 2); + mc = qd_message_check_depth(msg, QD_DEPTH_DELIVERY_ANNOTATIONS); + if (mc != QD_MESSAGE_DEPTH_OK) { + result = "Expected OK status"; + goto exit; + } + + // case 2: negative test - detect invalid tag + unsigned char bad_hdr[] = {0x00, 0x53, 0x70, 0xC1}; // 0xc1 == map, not list! + qd_message_free(msg); + msg = qd_message(); + set_content(MSG_CONTENT(msg), bad_hdr, sizeof(bad_hdr)); + MSG_CONTENT(msg)->receive_complete = false; + mc = qd_message_check_depth(msg, QD_DEPTH_DELIVERY_ANNOTATIONS); // looking _past_ header! + if (mc != QD_MESSAGE_DEPTH_INVALID) { + result = "Bad tag not detected!"; + goto exit; + } + + // case 3: check the valid body types + unsigned char body_bin[] = {0x00, 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x75, + 0xA0, 0x03, 0x00, 0x01, 0x02}; + qd_message_free(msg); + msg = qd_message(); + set_content(MSG_CONTENT(msg), body_bin, sizeof(body_bin)); + mc = qd_message_check_depth(msg, QD_DEPTH_ALL); // looking _past_ header! + if (mc != QD_MESSAGE_DEPTH_OK) { + result = "Expected OK bin body"; + goto exit; + } + + unsigned char body_seq[] = {0x00, 0x53, 0x76, 0x45}; + qd_message_free(msg); + msg = qd_message(); + set_content(MSG_CONTENT(msg), body_seq, sizeof(body_seq)); + mc = qd_message_check_depth(msg, QD_DEPTH_BODY); + if (mc != QD_MESSAGE_DEPTH_OK) { + result = "Expected OK seq body"; + goto exit; + } + + unsigned char body_value[] = {0x00, 0x53, 0x77, 0x51, 0x99}; + qd_message_free(msg); + msg = qd_message(); + set_content(MSG_CONTENT(msg), body_value, sizeof(body_value)); + mc = qd_message_check_depth(msg, QD_DEPTH_BODY); + if (mc != QD_MESSAGE_DEPTH_OK) { + result = "Expected OK value body"; + goto exit; + } + +exit: + if (msg) qd_message_free(msg); + return result; +} + + int message_tests(void) { int result = 0; @@ -426,6 +687,8 @@ int message_tests(void) TEST_CASE(test_check_multiple, 0); TEST_CASE(test_send_message_annotations, 0); TEST_CASE(test_q2_input_holdoff_sensing, 0); + TEST_CASE(test_incomplete_annotations, 0); + TEST_CASE(test_check_weird_messages, 0); return result; } diff --git a/tests/run_unit_tests_size.c b/tests/run_unit_tests_size.c index 25c467f..ec4f88f 100644 --- a/tests/run_unit_tests_size.c +++ b/tests/run_unit_tests_size.c @@ -20,6 +20,9 @@ #include <qpid/dispatch/buffer.h> #include <qpid/dispatch/alloc.h> +void qd_log_initialize(void); +void qd_error_initialize(); + int message_tests(); int field_tests(); int parse_tests(); @@ -36,6 +39,8 @@ int main(int argc, char** argv) } qd_alloc_initialize(); + qd_log_initialize(); + qd_error_initialize(); qd_buffer_set_size(buffer_size); int result = 0; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org