This is an automated email from the ASF dual-hosted git repository.

bcall pushed a commit to branch 9.0.x
in repository https://gitbox.apache.org/repos/asf/trafficserver.git
commit 801243747c28224a22737a714e1125e8754a207e
Author: bneradt <bner...@verizonmedia.com>
AuthorDate: Tue Feb 25 16:48:07 2020 +0000

    traffic_dump post_process.py

    (cherry picked from commit 17c48bfc16980496129cff2925ce46b6b0b3b05c)
---
 plugins/experimental/traffic_dump/post_process.py | 392 ++++++++++++++++++++++
 1 file changed, 392 insertions(+)

diff --git a/plugins/experimental/traffic_dump/post_process.py b/plugins/experimental/traffic_dump/post_process.py
new file mode 100755
index 0000000..dd4a7ea
--- /dev/null
+++ b/plugins/experimental/traffic_dump/post_process.py
@@ -0,0 +1,392 @@
+#!/usr/bin/env python3
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+'''
+
+from collections import defaultdict
+from copy import deepcopy
+from queue import Empty, Queue
+from threading import Thread
+import argparse
+import json
+import logging
+import os
+import sys
+
+description = '''
+Post process replay files produced by traffic_dump and clean them of any
+incomplete transactions or sessions which got partially written because Traffic
+Server was interrupted mid-connection. This also merges sessions belonging to
+the same client and, by default, formats the output files with human readable
+spacing.
+'''
+
+
+# Base replay file template with basic elements.
+TEMPLATE = json.loads('{"meta": {"version":"1.0"},"sessions":[]}')
+
+
+class PostProcessError(Exception):
+    ''' Base class for post processing errors.
+    '''
+    def __init__(self, message=None):
+        self.message = message
+
+    def __str__(self, *args):
+        if self.message:
+            return self.message
+        else:
+            return 'PostProcessError raised'
+
+
+class VerifyError(PostProcessError):
+    ''' Base class for node verification errors.
+    '''
+    pass
+
+
+class VerifyRequestError(VerifyError):
+    ''' There was a problem verifying a request node.
+    '''
+    pass
+
+
+class VerifyResponseError(VerifyError):
+    ''' There was a problem verifying a response node.
+    '''
+    pass
+
+
+class VerifySessionError(VerifyError):
+    ''' There was a problem verifying a session node.
+    '''
+    pass
+
+
+def verify_request(request):
+    """ Verify that a request node has a method, a url, and headers.
+
+    Args:
+        request (json object)
+
+    Raises:
+        VerifyRequestError if there is a problem with the request.
+    """
+    if not request:
+        raise VerifyRequestError('No request found.')
+    if "method" not in request or not request["method"]:
+        raise VerifyRequestError("Request did not have a method.")
+    if "url" not in request or not request["url"]:
+        raise VerifyRequestError("Request did not have a url.")
+    if "headers" not in request or not request["headers"]:
+        raise VerifyRequestError("Request did not have headers.")
+
+
+def verify_response(response):
+    """ Verify that a response node has a status.
+
+    Args:
+        response (json object)
+
+    Raises:
+        VerifyResponseError if there is a problem with the response.
+ """ + if not response: + raise VerifyResponseError("No response found.") + if "status" not in response or not response["status"]: + raise VerifyResponseError("Response did not have a status.") + + +def verify_transaction(transaction): + """ Function to verify that a transaction looks complete. + + Args: + transaction (json object) + + Raises: + VerifySessionError if there is no transaction. + VerifyRequestError if there is a problem with a request. + VerifyResponseError if there is a problem with a response. + """ + if not transaction: + raise VerifySessionError('No transaction found in the session.') + + if "client-request" not in transaction: + raise VerifyRequestError('client-request not found in transaction') + else: + verify_request(transaction["client-request"]) + + # proxy-response nodes can be empty. + if "proxy-response" not in transaction: + raise VerifyResponseError('proxy-response not found in transaction') + + if "proxy-request" in transaction or "server-response" in transaction: + # proxy-request nodes can be empty. + if "proxy-request" not in transaction: + raise VerifyRequestError('proxy-request not found in transaction') + + if "server-response" not in transaction: + raise VerifyResponseError('server-response not found in transaction') + else: + verify_response(transaction["server-response"]) + + +def verify_session(session): + """ Function to verify that a session looks complete. + + A valid session contains a valid list of transactions. + + Args: + transaction (json object) + + Raises: + VerifyError if there is a problem with the session. + """ + if not session: + raise VerifySessionError('Session not found.') + if "transactions" not in session or not session["transactions"]: + raise VerifySessionError('No transactions found in session.') + for transaction in session["transactions"]: + verify_transaction(transaction) + + +def write_sessions(sessions, filename, indent): + """ Write the JSON sessions to the given filename. + + Args: + sessions The parsed JSON sessions to dump into filename. + filename (string) The path to the file to write the parsed JSON file to. + indent (int) The number of spaces per line to write to the file. A + value of None causes the whole JSON file to be written as a single line. + """ + new_json = deepcopy(TEMPLATE) + new_json["sessions"] = deepcopy(sessions) + with open(filename, "w") as f: + json.dump(new_json, f, ensure_ascii=False, indent=indent) + logging.debug("{} has {} sessions".format(filename, len(sessions))) + + +class ParseJSONError(PostProcessError): + ''' There was an error opening or parsing the replay file. + ''' + pass + + +def parse_json(replay_file): + """ Open and parse the replay_file. + + Args: + replay_file (string) The file with JSON content to parse. + + Return: + The json package parsed JSON file or None if there was a problem + parsing the file. + """ + try: + fd = open(replay_file, 'r') + except Exception as e: + logging.exception("Failed to open %s.", replay_file) + raise ParseJSONError(e) + + try: + parsed_json = json.load(fd) + except Exception as e: + message = e.split(':')[0] + logging.exception("Failed to load %s as a JSON object.", replay_file) + raise ParseJSONError(message) + + return parsed_json + + +def readAndCombine(replay_dir, num_sessions_per_file, indent, out_dir): + """ Read raw dump files, filter out incomplete sessions, and merge + them into output files. 
+
+    Args:
+        replay_dir (string) Full path to the directory of raw dump files.
+        num_sessions_per_file (int) The maximum number of sessions in each
+        output file.
+        indent (int) The number of spaces per indentation level in the output
+        replay files.
+        out_dir (string) Output directory for post-processed JSON files.
+
+    Return:
+        A tuple of (session_count, transaction_count, error_count).
+    """
+    session_count = 0
+    batch_count = 0
+    transaction_count = 0
+    error_count = defaultdict(int)
+
+    base_name = os.path.basename(replay_dir)
+
+    sessions = []
+    for f in os.listdir(replay_dir):
+        replay_file = os.path.join(replay_dir, f)
+        if not os.path.isfile(replay_file):
+            continue
+
+        try:
+            parsed_json = parse_json(replay_file)
+        except ParseJSONError as e:
+            error_count[str(e)] += 1
+            continue
+
+        for session in parsed_json["sessions"]:
+            try:
+                verify_session(session)
+            except VerifyError as e:
+                connection_time = session.get('connection-time')
+                if not connection_time:
+                    connection_time = session.get('start-time')
+                if connection_time:
+                    logging.debug("Omitting session in %s with connection-time: %d: %s",
+                                  replay_file, connection_time, e)
+                else:
+                    logging.debug("Omitting a session in %s, could not find a connection time: %s",
+                                  replay_file, e)
+                continue
+            sessions.append(session)
+            session_count += 1
+            transaction_count += len(session["transactions"])
+            if len(sessions) >= num_sessions_per_file:
+                write_sessions(sessions, "{}/{}_{}.json".format(out_dir, base_name, batch_count), indent)
+                sessions = []
+                batch_count += 1
+    if sessions:
+        write_sessions(sessions, "{}/{}_{}.json".format(out_dir, base_name, batch_count), indent)
+
+    return session_count, transaction_count, error_count
+
+
+def post_process(in_dir, subdir_q, out_dir, num_sessions_per_file, single_line, cnt_q):
+    """ Function used to run an individual worker thread.
+
+    Each thread loops over subdir_q, pulls a directory from it, and
+    processes the replay files in that directory. The threads finish when the
+    subdir queue is empty, meaning each subdir has been processed.
+
+    Args:
+        in_dir (string) Path to the parent of the subdirectories in subdir_q.
+        subdir_q (Queue) Queue of subdirs to read from.
+        out_dir (string) The directory into which the post processed replay
+        files are placed.
+        num_sessions_per_file (int) traffic_dump will emit a separate file per
+        session. This mechanism merges sessions within a single subdir.
+        num_sessions_per_file is the limit to the number of sessions merged
+        into a single replay file.
+        single_line (bool) Whether to emit replay files as a single line. If
+        false, the file is spaced out in a human readable fashion.
+        cnt_q (Queue) Session, transaction, and error count queue populated by
+        each thread.
+    """
+    while True:
+        # Use get_nowait() rather than checking empty() before get(): another
+        # thread could drain the queue between those two calls, leaving this
+        # thread blocked forever.
+        try:
+            subdir = subdir_q.get_nowait()
+        except Empty:
+            break
+        subdir_path = os.path.join(in_dir, subdir)
+        indent = None if single_line else 2
+        cnt = read_and_combine(subdir_path, num_sessions_per_file, indent, out_dir)
+        cnt_q.put(cnt)
+
+
+def configure_logging(use_debug=False):
+    ''' Configure the logging mechanism.
+
+    Args:
+        use_debug (bool) Whether to configure debug-level logging.
+    '''
+    log_format = '%(levelname)s: %(message)s'
+    if use_debug:
+        logging.basicConfig(format=log_format, level=logging.DEBUG)
+    else:
+        logging.basicConfig(format=log_format, level=logging.INFO)
+
+
+def parse_args():
+    ''' Parse the command line arguments.
+    '''
+    parser = argparse.ArgumentParser(description=description)
+
+    parser.add_argument("in_dir", type=str,
+                        help='''The input directory of traffic_dump replay
+                        files. The expectation is that this will contain
+                        sub-directories that themselves contain replay files.
+                        This is written to accommodate the directory populated
+                        by traffic_dump via the --logdir option.''')
+    parser.add_argument("out_dir", type=str,
+                        help="The output directory of post processed replay files.")
+    parser.add_argument("-n", "--num_sessions", type=int, default=10,
+                        help='''The maximum number of sessions merged into a
+                        single replay output file. The default is 10.''')
+    parser.add_argument("--no-human-readable", action="store_true",
+                        help='''By default, the post processor will generate
+                        replay files that are spaced out in a human readable
+                        format. This turns off that behavior and leaves the
+                        files as single-line entries.''')
+    parser.add_argument("-d", "--debug", action="store_true",
+                        help="Enable debug level logging.")
+    return parser.parse_args()
+
+
+def main():
+    args = parse_args()
+    configure_logging(use_debug=args.debug)
+    logging.debug("Original options: %s", " ".join(sys.argv))
+
+    if not os.path.exists(args.out_dir):
+        os.mkdir(args.out_dir)
+
+    # Generate the thread arguments.
+    subdir_q = Queue()
+    cnt_q = Queue()
+    for subdir in os.listdir(args.in_dir):
+        if os.path.isdir(os.path.join(args.in_dir, subdir)):
+            subdir_q.put(subdir)
+
+    threads = []
+    nthreads = min(max(subdir_q.qsize(), 1), 32)
+
+    # Start up the threads.
+    for _ in range(nthreads):
+        t = Thread(target=post_process,
+                   args=(args.in_dir, subdir_q, args.out_dir,
+                         args.num_sessions, args.no_human_readable, cnt_q))
+        t.start()
+        threads.append(t)
+
+    # Wait for them to finish.
+    for t in threads:
+        t.join()
+
+    # Retrieve the counts. All threads have joined, so it is safe to read the
+    # queue's underlying deque directly.
+    session_count = 0
+    transaction_count = 0
+    errors = defaultdict(int)
+    for count_tuple in list(cnt_q.queue):
+        session_count += count_tuple[0]
+        transaction_count += count_tuple[1]
+        for e, count in count_tuple[2].items():
+            errors[e] += count
+    summary = "Total {} sessions and {} transactions.".format(session_count, transaction_count)
+    logging.info(summary)
+    if errors:
+        logging.info("Total errors:")
+        for e, count in errors.items():
+            logging.info("{}: {}".format(e, count))
+    else:
+        logging.info("Total errors: 0")
+
+    with open("{}/summary.txt".format(args.out_dir), "w", encoding="ascii") as f:
+        f.write("{}\n".format(summary))
+
+
+if __name__ == "__main__":
+    sys.exit(main())
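
The script would be run against a traffic_dump --logdir directory, e.g.
./post_process.py /path/to/logdir /path/to/out_dir -n 20 (paths illustrative).
As a quick sanity check of the verifier, here is a minimal sketch assuming
post_process.py is on the import path; the field values and the headers layout
below are made-up illustrations for what verify_session() accepts, not real
traffic_dump output:

    # Illustrative only: a hand-built session in the minimal shape that
    # verify_session() accepts. Field values and the headers layout are
    # assumptions for demonstration, not real traffic_dump output.
    import post_process  # assumes post_process.py is on the import path

    session = {
        "transactions": [{
            "client-request": {
                "method": "GET",
                "url": "/example",
                "headers": {"fields": [["Host", "example.com"]]},
            },
            # proxy-response only has to be present; it may be empty.
            "proxy-response": {},
        }]
    }

    # Raises a VerifyError subclass if the session is incomplete.
    post_process.verify_session(session)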