This is an automated email from the ASF dual-hosted git repository. bcall pushed a commit to branch 9.0.x in repository https://gitbox.apache.org/repos/asf/trafficserver.git
commit 16efdabf738a80af7edbb7566a354e5076b089e1 Author: bneradt <bner...@verizonmedia.com> AuthorDate: Thu Feb 27 23:41:28 2020 +0000 traffic_dump: Fixing content:size collection. traffic_dump was collecting the client-request body information too early, resulting in an incorrect content:size value (it was always zero). This holds off on collecting the body size until later so the value is accurate. A traffic_dump AuTest is being added to verify correct behavior with requests containing bodies. Also, this updates the Traffic Dump post_process.py script to add proxy-request and server-response nodes to transactions without them because the proxy replied locally to the request. This is useful in replay scenarios in which the test version of Traffic Server likely won't have the cached responses. (cherry picked from commit ddaf9e5f42930d01570904214cd6086c0f1de24b) --- plugins/experimental/traffic_dump/post_process.py | 59 +++++++++++++++++----- plugins/experimental/traffic_dump/traffic_dump.cc | 51 +++++++++++++------ .../traffic_dump/gold/post_with_body.gold | 8 +++ .../pluginTest/traffic_dump/traffic_dump.test.py | 45 +++++++++++++++++ .../pluginTest/traffic_dump/verify_replay.py | 28 +++++++++- 5 files changed, 162 insertions(+), 29 deletions(-) diff --git a/plugins/experimental/traffic_dump/post_process.py b/plugins/experimental/traffic_dump/post_process.py index dd4a7ea..d2dbe96 100755 --- a/plugins/experimental/traffic_dump/post_process.py +++ b/plugins/experimental/traffic_dump/post_process.py @@ -109,11 +109,14 @@ def verify_response(response): raise VerifyResponseError("Response did not have a status.") -def verify_transaction(transaction): +def verify_transaction(transaction, fabricate_proxy_requests=False): """ Function to verify that a transaction looks complete. Args: transaction (json object) + fabricate_proxy_requests (bool) Whether the post-processor should + fabricate proxy requests if they don't exist because the proxy served + the response locally. 
Raises: VerifySessionError if there is no transaction. @@ -128,12 +131,19 @@ def verify_transaction(transaction): else: verify_request(transaction["client-request"]) + if "proxy-request" not in transaction and fabricate_proxy_requests: + if "proxy-response" not in transaction: + raise VerifyRequestError('proxy-response not found in transaction with a client-request') + transaction["proxy-request"] = transaction["client-request"] + if "server-response" not in transaction: + transaction["server-response"] = transaction["proxy-response"] + # proxy-response nodes can be empty. if "proxy-response" not in transaction: raise VerifyResponseError('proxy-response not found in transaction') if "proxy-request" in transaction or "server-response" in transaction: - # proxy-request nodes can be empty. + # proxy-request nodes can be empty, so no need to verify_response. if "proxy-request" not in transaction: raise VerifyRequestError('proxy-request not found in transaction') @@ -143,13 +153,16 @@ def verify_transaction(transaction): verify_response(transaction["server-response"]) -def verify_session(session): +def verify_session(session, fabricate_proxy_requests=False): """ Function to verify that a session looks complete. A valid session contains a valid list of transactions. Args: transaction (json object) + fabricate_proxy_requests (bool) Whether the post-processor should + fabricate proxy requests if they don't exist because the proxy served + the response locally. Raises: VerifyError if there is a problem with the session. 
@@ -159,7 +172,7 @@ def verify_session(session): if "transactions" not in session or not session["transactions"]: raise VerifySessionError('No transactions found in session.') for transaction in session["transactions"]: - verify_transaction(transaction) + verify_transaction(transaction, fabricate_proxy_requests) def write_sessions(sessions, filename, indent): @@ -203,14 +216,14 @@ def parse_json(replay_file): try: parsed_json = json.load(fd) except Exception as e: - message = e.split(':')[0] - logging.exception("Failed to load %s as a JSON object.", replay_file) + message = e.msg.split(':')[0] + logging.error("Failed to load %s as a JSON object: %s", replay_file, e) raise ParseJSONError(message) return parsed_json -def readAndCombine(replay_dir, num_sessions_per_file, indent, out_dir): +def readAndCombine(replay_dir, num_sessions_per_file, indent, fabricate_proxy_requests, out_dir): """ Read raw dump files, filter out incomplete sessions, and merge them into output files. @@ -218,6 +231,9 @@ def readAndCombine(replay_dir, num_sessions_per_file, indent, out_dir): replay_dir (string) Full path to dumps num_sessions_per_file (int) number of sessions in each output file indent (int) The number of spaces per line in the output replay files. + fabricate_proxy_requests (bool) Whether the post-processor should + fabricate proxy requests if they don't exist because the proxy served + the response locally. out_dir (string) Output directory for post-processed json files. 
""" session_count = 0 @@ -236,12 +252,12 @@ def readAndCombine(replay_dir, num_sessions_per_file, indent, out_dir): try: parsed_json = parse_json(replay_file) except ParseJSONError as e: - error_count[e] += e + error_count[e.message] += 1 continue for session in parsed_json["sessions"]: try: - verify_session(session) + verify_session(session, fabricate_proxy_requests) except VerifyError as e: connection_time = session['connection-time'] if not connection_time: @@ -266,7 +282,7 @@ def readAndCombine(replay_dir, num_sessions_per_file, indent, out_dir): return session_count, transaction_count, error_count -def post_process(in_dir, subdir_q, out_dir, num_sessions_per_file, single_line, cnt_q): +def post_process(in_dir, subdir_q, out_dir, num_sessions_per_file, single_line, fabricate_proxy_requests, cnt_q): """ Function used to set up individual threads. Each thread loops over the subdir_q, pulls a directory from there, and @@ -284,6 +300,9 @@ def post_process(in_dir, subdir_q, out_dir, num_sessions_per_file, single_line, into a single replay file. single_line (bool) Whether to emit replay files as a single line. If false, the file is spaced out in a human readable fashion. + fabricate_proxy_requests (bool) Whether the post-processor should + fabricate proxy requests if they don't exist because the proxy served + the response locally. cnt_q (Queue) Session, transaction, error count queue populated by each thread. """ @@ -293,7 +312,7 @@ def post_process(in_dir, subdir_q, out_dir, num_sessions_per_file, single_line, indent = 2 if single_line: indent = None - cnt = readAndCombine(subdir_path, num_sessions_per_file, indent, out_dir) + cnt = readAndCombine(subdir_path, num_sessions_per_file, indent, fabricate_proxy_requests, out_dir) cnt_q.put(cnt) @@ -331,6 +350,19 @@ def parse_args(): files that are spaced out in a human readable format. 
This turns off that behavior and leaves the files as single-line entries.''') + parser.add_argument("--no-fabricate-proxy-requests", action="store_true", + help='''By default, post processor will fabricate proxy + requests and server responses for transactions served + out of the proxy. Presumably in replay conditions, + these fabricated requests and responses will not hurt + anything because the Proxy Verifier server will not + notice if the proxy replies locally in replay + conditions. However, if it doesn't reply locally, then + the server will not know how to reply to these + requests. Using this option turns off this fabrication + behavior.''') + parser.add_argument("-j", "--num_threads", type=int, default=32, + help='''The maximum number of threads to use.''') parser.add_argument("-d", "--debug", action="store_true", help="Enable debug level logging.") return parser.parse_args() @@ -352,13 +384,14 @@ def main(): subdir_q.put(subdir) threads = [] - nthreads = min(max(subdir_q.qsize(), 1), 32) + nthreads = min(max(subdir_q.qsize(), 1), args.num_threads) # Start up the threads. for i in range(nthreads): t = Thread(target=post_process, args=(args.in_dir, subdir_q, args.out_dir, - args.num_sessions, args.no_human_readable, cnt_q)) + args.num_sessions, args.no_human_readable, + not args.no_fabricate_proxy_requests, cnt_q)) t.start() threads.append(t) diff --git a/plugins/experimental/traffic_dump/traffic_dump.cc b/plugins/experimental/traffic_dump/traffic_dump.cc index f915ce5..b802c91 100644 --- a/plugins/experimental/traffic_dump/traffic_dump.cc +++ b/plugins/experimental/traffic_dump/traffic_dump.cc @@ -225,9 +225,21 @@ remove_scheme_prefix(std::string_view url) return url; } -/// Helper functions to collect txn information from TSMBuffer +/// Write the content node. 
+// +/// "content" +/// "encoding" +/// "size" std::string -collect_headers(TSMBuffer &buffer, TSMLoc &hdr_loc, int64_t body_bytes) +write_content_node(int64_t num_body_bytes) +{ + return std::string(R"(,"content":{"encoding":"plain","size":)" + std::to_string(num_body_bytes) + '}'); +} + +/// Read the txn information from TSMBuffer and write the header information. +/// This function does not write the content node. +std::string +write_message_node_no_content(TSMBuffer &buffer, TSMLoc &hdr_loc) { std::string result = "{"; int len = 0; @@ -243,12 +255,12 @@ collect_headers(TSMBuffer &buffer, TSMLoc &hdr_loc, int64_t body_bytes) TSAssert(TS_SUCCESS == TSHttpHdrUrlGet(buffer, hdr_loc, &url_loc)); // 2. "scheme": cp = TSUrlSchemeGet(buffer, url_loc, &len); - TSDebug(PLUGIN_NAME, "collect_headers(): found scheme %.*s ", len, cp); + TSDebug(PLUGIN_NAME, "write_message_node(): found scheme %.*s ", len, cp); result += "," + json_entry("scheme", cp, len); // 3. "method":(string) cp = TSHttpHdrMethodGet(buffer, hdr_loc, &len); - TSDebug(PLUGIN_NAME, "collect_headers(): found method %.*s ", len, cp); + TSDebug(PLUGIN_NAME, "write_message_node(): found method %.*s ", len, cp); result += "," + json_entry("method", cp, len); // 4. "url" @@ -267,7 +279,7 @@ collect_headers(TSMBuffer &buffer, TSMLoc &hdr_loc, int64_t body_bytes) url_string = remove_scheme_prefix(url_string); } - TSDebug(PLUGIN_NAME, "collect_headers(): found host target %.*s", static_cast<int>(url_string.size()), url_string.data()); + TSDebug(PLUGIN_NAME, "write_message_node(): found host target %.*s", static_cast<int>(url_string.size()), url_string.data()); result += "," + json_entry("url", url_string.data(), url_string.size()); TSfree(url); TSHandleMLocRelease(buffer, hdr_loc, url_loc); @@ -280,11 +292,6 @@ collect_headers(TSMBuffer &buffer, TSMLoc &hdr_loc, int64_t body_bytes) // 3. 
"encoding" } - // "content" - // "encoding" - // "size" - result += R"(,"content":{"encoding":"plain","size":)" + std::to_string(body_bytes) + '}'; - // "headers": [[name(string), value(string)]] result += R"(,"headers":{"encoding":"esc_json", "fields": [)"; TSMLoc field_loc = TSMimeHdrFieldGet(buffer, hdr_loc, 0); @@ -305,8 +312,17 @@ collect_headers(TSMBuffer &buffer, TSMLoc &hdr_loc, int64_t body_bytes) result += ","; } } + return result += "]}"; +} - return result + "]}}"; +/// Read the txn information from TSMBuffer and write the header information including +/// the content node describing the body characteristics. +std::string +write_message_node(TSMBuffer &buffer, TSMLoc &hdr_loc, int64_t num_body_bytes) +{ + std::string result = write_message_node_no_content(buffer, hdr_loc); + result += write_content_node(num_body_bytes); + return result + "}"; } // Per session AIO handler: update AIO counts and clean up @@ -405,7 +421,9 @@ session_txn_handler(TSCont contp, TSEvent event, void *edata) TSMLoc hdr_loc; if (TS_SUCCESS == TSHttpTxnClientReqGet(txnp, &buffer, &hdr_loc)) { TSDebug(PLUGIN_NAME, "Found client request"); - txn_info += R"(,"client-request":)" + collect_headers(buffer, hdr_loc, TSHttpTxnClientReqBodyBytesGet(txnp)); + // We don't have an accurate view of the body size until TXN_CLOSE so we hold + // off on writing the content:size node until then. 
+ txn_info += R"(,"client-request":)" + write_message_node_no_content(buffer, hdr_loc); TSHandleMLocRelease(buffer, TS_NULL_MLOC, hdr_loc); buffer = nullptr; } @@ -417,21 +435,24 @@ session_txn_handler(TSCont contp, TSEvent event, void *edata) // proxy-request/response headers TSMBuffer buffer; TSMLoc hdr_loc; + if (TS_SUCCESS == TSHttpTxnClientReqGet(txnp, &buffer, &hdr_loc)) { + txn_info += write_content_node(TSHttpTxnClientReqBodyBytesGet(txnp)) + "}"; + } if (TS_SUCCESS == TSHttpTxnServerReqGet(txnp, &buffer, &hdr_loc)) { TSDebug(PLUGIN_NAME, "Found proxy request"); - txn_info += R"(,"proxy-request":)" + collect_headers(buffer, hdr_loc, TSHttpTxnServerReqBodyBytesGet(txnp)); + txn_info += R"(,"proxy-request":)" + write_message_node(buffer, hdr_loc, TSHttpTxnServerReqBodyBytesGet(txnp)); TSHandleMLocRelease(buffer, TS_NULL_MLOC, hdr_loc); buffer = nullptr; } if (TS_SUCCESS == TSHttpTxnServerRespGet(txnp, &buffer, &hdr_loc)) { TSDebug(PLUGIN_NAME, "Found server response"); - txn_info += R"(,"server-response":)" + collect_headers(buffer, hdr_loc, TSHttpTxnServerRespBodyBytesGet(txnp)); + txn_info += R"(,"server-response":)" + write_message_node(buffer, hdr_loc, TSHttpTxnServerRespBodyBytesGet(txnp)); TSHandleMLocRelease(buffer, TS_NULL_MLOC, hdr_loc); buffer = nullptr; } if (TS_SUCCESS == TSHttpTxnClientRespGet(txnp, &buffer, &hdr_loc)) { TSDebug(PLUGIN_NAME, "Found proxy response"); - txn_info += R"(,"proxy-response":)" + collect_headers(buffer, hdr_loc, TSHttpTxnClientRespBodyBytesGet(txnp)); + txn_info += R"(,"proxy-response":)" + write_message_node(buffer, hdr_loc, TSHttpTxnClientRespBodyBytesGet(txnp)); TSHandleMLocRelease(buffer, TS_NULL_MLOC, hdr_loc); buffer = nullptr; } diff --git a/tests/gold_tests/pluginTest/traffic_dump/gold/post_with_body.gold b/tests/gold_tests/pluginTest/traffic_dump/gold/post_with_body.gold new file mode 100644 index 0000000..1dad3b3 --- /dev/null +++ b/tests/gold_tests/pluginTest/traffic_dump/gold/post_with_body.gold @@ -0,0 +1,8 
@@ +`` +> POST http://localhost:``/post_with_body HTTP/1.1 +> Host: www.example.com`` +> User-Agent: curl/`` +> Accept: */* +`` +< Server: ATS/`` +`` diff --git a/tests/gold_tests/pluginTest/traffic_dump/traffic_dump.test.py b/tests/gold_tests/pluginTest/traffic_dump/traffic_dump.test.py index b0d4d6d..afb75db 100644 --- a/tests/gold_tests/pluginTest/traffic_dump/traffic_dump.test.py +++ b/tests/gold_tests/pluginTest/traffic_dump/traffic_dump.test.py @@ -43,6 +43,13 @@ response_header = {"headers": "HTTP/1.1 200 OK" "\r\nConnection: close\r\nContent-Length: 0\r\n\r\n", "timestamp": "1469733493.993", "body": ""} server.addResponse("sessionfile.log", request_header, response_header) +request_header = {"headers": "GET /post_with_body HTTP/1.1\r\n" + "Host: www.example.com\r\nContent-Length: 0\r\n\r\n", + "timestamp": "1469733493.993", "body": ""} +response_header = {"headers": "HTTP/1.1 200 OK" + "\r\nConnection: close\r\nContent-Length: 0\r\n\r\n", + "timestamp": "1469733493.993", "body": ""} +server.addResponse("sessionfile.log", request_header, response_header) # Define ATS and configure ts = Test.MakeATSProcess("ts") @@ -69,6 +76,9 @@ ts.Streams.stderr = Testers.ContainsExpression( ts.Streams.stderr += Testers.ContainsExpression( "Initialized with sample pool size 1 bytes and disk limit 1000000000 bytes", "Verify traffic_dump initialized with the configured disk limit.") +ts.Streams.stderr += Testers.ContainsExpression( + "Finish a session with log file of.*bytes", + "Verify traffic_dump sees the end of sessions and accounts for it.") # Set up the json replay file expectations. 
replay_file_session_1 = os.path.join(replay_dir, "127", "0000000000000000") @@ -77,6 +87,8 @@ replay_file_session_2 = os.path.join(replay_dir, "127", "0000000000000001") ts.Disk.File(replay_file_session_2, exists=True) replay_file_session_3 = os.path.join(replay_dir, "127", "0000000000000002") ts.Disk.File(replay_file_session_3, exists=True) +replay_file_session_4 = os.path.join(replay_dir, "127", "0000000000000003") +ts.Disk.File(replay_file_session_4, exists=True) # # Test 1: Verify the correct behavior of two transactions across two sessions. @@ -153,3 +165,36 @@ tr.Processes.Default.Command = "python3 {0} {1} {2} --request-target '{3}'".form tr.Processes.Default.ReturnCode = 0 tr.StillRunningAfter = server tr.StillRunningAfter = ts + +# +# Test 3: Verify correct handling of a POST with body data. +# + +# Verify that an explicit path in the request line is recorded. +tr = Test.AddTestRun("Make a POST request with a body.") +request_target = "http://localhost:{0}/post_with_body".format(ts.Variables.port) + +# Send the replay file as the request body because it is conveniently already +# in the test run directory. 
+tr.Processes.Default.Command = ( + 'curl --data-binary @{0} --request-target "{1}" ' + 'http://127.0.0.1:{2} -H\'Host: www.example.com\' --verbose'.format( + verify_replay, request_target, ts.Variables.port)) +tr.Processes.Default.ReturnCode = 0 +tr.Processes.Default.Streams.stderr = "gold/post_with_body.gold" +tr.StillRunningAfter = server +tr.StillRunningAfter = ts + +tr = Test.AddTestRun("Verify the client-request size node has the expected value.") +tr.Setup.CopyAs(verify_replay, Test.RunDirectory) + +size_of_verify_replay_file = os.path.getsize(os.path.join(Test.TestDirectory, verify_replay)) +tr.Processes.Default.Command = \ + "python3 {0} {1} {2} --client-request-size {3}".format( + verify_replay, + os.path.join(Test.Variables.AtsTestToolsDir, 'lib', 'replay_schema.json'), + replay_file_session_4, + size_of_verify_replay_file) +tr.Processes.Default.ReturnCode = 0 +tr.StillRunningAfter = server +tr.StillRunningAfter = ts diff --git a/tests/gold_tests/pluginTest/traffic_dump/verify_replay.py b/tests/gold_tests/pluginTest/traffic_dump/verify_replay.py index 5a33061..532c92b 100644 --- a/tests/gold_tests/pluginTest/traffic_dump/verify_replay.py +++ b/tests/gold_tests/pluginTest/traffic_dump/verify_replay.py @@ -67,7 +67,27 @@ def verify_request_target(replay_json, request_target): print("The replay file did not have a first transaction with a url element.") return False - return url == request_target + if url != request_target: + print("Mismatched request target. Expected: {}, received: {}".format(request_target, url)) + return False + return True + + +def verify_client_request_size(replay_json, client_request_size): + """ + Verify that the content size element of the first client-request matches the expected size. 
+ """ + try: + size = int(replay_json['sessions'][0]['transactions'][0]['client-request']['content']['size']) + except KeyError: + print("The replay file did not have content size element in the first client-request.") + return False + + if size != client_request_size: + print("Mismatched client-request request size. Expected: {}, received: {}".format( + client_request_size, size)) + return False + return True def parse_args(): @@ -80,6 +100,9 @@ def parse_args(): help="The replay file to validate.") parser.add_argument("--request-target", help="The request target ('url' element) to expect in the replay file.") + parser.add_argument("--client-request-size", + type=int, + help="The expected size value in the client-request node.") return parser.parse_args() @@ -107,6 +130,9 @@ def main(): if args.request_target and not verify_request_target(replay_json, args.request_target): return 1 + if args.client_request_size and not verify_client_request_size(replay_json, args.client_request_size): + return 1 + return 0