This is an automated email from the ASF dual-hosted git repository.

bcall pushed a commit to branch 9.0.x
in repository https://gitbox.apache.org/repos/asf/trafficserver.git

commit 801243747c28224a22737a714e1125e8754a207e
Author: bneradt <bner...@verizonmedia.com>
AuthorDate: Tue Feb 25 16:48:07 2020 +0000

    traffic_dump post_process.py
    
    (cherry picked from commit 17c48bfc16980496129cff2925ce46b6b0b3b05c)
---
 plugins/experimental/traffic_dump/post_process.py | 392 ++++++++++++++++++++++
 1 file changed, 392 insertions(+)

diff --git a/plugins/experimental/traffic_dump/post_process.py b/plugins/experimental/traffic_dump/post_process.py
new file mode 100755
index 0000000..dd4a7ea
--- /dev/null
+++ b/plugins/experimental/traffic_dump/post_process.py
@@ -0,0 +1,392 @@
+#!/usr/bin/env python3
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+'''
+
+from collections import defaultdict
+from copy import deepcopy
+from queue import Queue
+from threading import Thread
+import argparse
+import json
+import logging
+import os
+import sys
+
+# Program description handed to argparse.ArgumentParser in parse_args().
+description = '''
+Post process replay files produced by traffic_dump and clean it of any
+incomplete transactions or sessions which got partially written because Traffic
+Server was interrupted mid-connection. This also merges sessions to the same
+client and, by default, formats the output files with human readable spacing.
+'''
+
+
+# Skeleton of an output replay file: version metadata plus an empty list of
+# sessions. write_sessions() fills in the sessions on a private copy.
+TEMPLATE = {
+    "meta": {"version": "1.0"},
+    "sessions": [],
+}
+
+
+class PostProcessError(Exception):
+    ''' Base class for post processing errors.
+
+    Args:
+        message: Optional description of the failure. It may be a string or
+          any other object (for example, a caught exception); it is converted
+          with str() whenever the error is rendered.
+    '''
+    def __init__(self, message=None):
+        # Forward the message to Exception as well so that e.args and repr()
+        # behave the same as for any built-in exception.
+        if message is None:
+            super().__init__()
+        else:
+            super().__init__(message)
+        self.message = message
+
+    def __str__(self, *args):
+        if self.message:
+            # __str__ must return a real string; the message may be a
+            # non-string object such as a caught exception.
+            return str(self.message)
+        return 'PostProcessError raised'
+
+
+class VerifyError(PostProcessError):
+    ''' Common parent of the replay node verification errors, so that any
+    verification failure can be caught with a single except clause.
+    '''
+
+
+class VerifyRequestError(VerifyError):
+    ''' Raised when a request node is missing or incomplete.
+    '''
+
+
+class VerifyResponseError(VerifyError):
+    ''' Raised when a response node is missing or incomplete.
+    '''
+
+
+class VerifySessionError(VerifyError):
+    ''' Raised when a session node is missing or has no usable transactions.
+    '''
+
+
+def verify_request(request):
+    """ Check that a request node carries a method, a url, and headers.
+
+    Args:
+        request (json object) The request node to inspect.
+
+    Raises:
+        VerifyRequestError if the node is empty, or if any of those fields is
+        missing or empty.
+    """
+    if not request:
+        raise VerifyRequestError('No request found.')
+
+    # Checked in this order; the first missing or empty field is reported.
+    required_fields = (
+        ("method", "Request did not have a method."),
+        ("url", "Request did not have a url."),
+        ("headers", "Request did not have headers."),
+    )
+    for field, complaint in required_fields:
+        if field not in request or not request[field]:
+            raise VerifyRequestError(complaint)
+
+
+def verify_response(response):
+    """ Check that a response node carries a status.
+
+    Args:
+        response (json object) The response node to inspect.
+
+    Raises:
+        VerifyResponseError if the node is empty, or if its status is missing
+        or empty.
+    """
+    if not response:
+        raise VerifyResponseError("No response found.")
+
+    has_status = "status" in response and response["status"]
+    if not has_status:
+        raise VerifyResponseError("Response did not have a status.")
+
+
+def verify_transaction(transaction):
+    """ Function to verify that a transaction looks complete.
+
+    A complete transaction has a valid client-request node and a
+    proxy-response node. If either the proxy-request or the server-response
+    node is present, both must be present and the server-response must be
+    valid.
+
+    Args:
+        transaction (json object)
+
+    Raises:
+        VerifySessionError if there is no transaction.
+        VerifyRequestError if there is a problem with a request.
+        VerifyResponseError if there is a problem with a response.
+    """
+    if not transaction:
+        raise VerifySessionError('No transaction found in the session.')
+
+    if "client-request" not in transaction:
+        raise VerifyRequestError('client-request not found in transaction')
+    verify_request(transaction["client-request"])
+
+    # proxy-response nodes can be empty, so only their presence is checked.
+    if "proxy-response" not in transaction:
+        raise VerifyResponseError('proxy-response not found in transaction')
+
+    # These two nodes are optional as a pair, but one without the other is
+    # rejected.
+    if "proxy-request" in transaction or "server-response" in transaction:
+        # proxy-request nodes can be empty.
+        if "proxy-request" not in transaction:
+            raise VerifyRequestError('proxy-request not found in transaction')
+
+        if "server-response" not in transaction:
+            raise VerifyResponseError('server-response not found in transaction')
+        verify_response(transaction["server-response"])
+
+
+def verify_session(session):
+    """ Check that a session node looks complete.
+
+    A session is complete when it holds a non-empty list of transactions and
+    every one of them passes verify_transaction.
+
+    Args:
+        session (json object)
+
+    Raises:
+        VerifyError if there is a problem with the session.
+    """
+    if not session:
+        raise VerifySessionError('Session not found.')
+
+    has_transactions = "transactions" in session and session["transactions"]
+    if not has_transactions:
+        raise VerifySessionError('No transactions found in session.')
+
+    for txn in session["transactions"]:
+        verify_transaction(txn)
+
+
+def write_sessions(sessions, filename, indent):
+    """ Write the JSON sessions to the given filename.
+
+    Args:
+        sessions The parsed JSON sessions to dump into filename.
+        filename (string) The path to the file to write the JSON to.
+        indent (int) The number of spaces per indentation level in the file.
+          A value of None causes the whole JSON file to be written as a
+          single line.
+    """
+    # Copy the template so the shared module-level object is never modified.
+    # The sessions are only read by json.dump, so they need no copy.
+    new_json = deepcopy(TEMPLATE)
+    new_json["sessions"] = sessions
+    # ensure_ascii=False emits non-ASCII characters verbatim, so the encoding
+    # is pinned to UTF-8 rather than left to the platform default.
+    with open(filename, "w", encoding="utf-8") as f:
+        json.dump(new_json, f, ensure_ascii=False, indent=indent)
+    logging.debug("%s has %d sessions", filename, len(sessions))
+
+
+class ParseJSONError(PostProcessError):
+    ''' Raised when a replay file cannot be opened or parsed as JSON.
+    '''
+
+
+def parse_json(replay_file):
+    """ Open and parse the replay_file.
+
+    Args:
+        replay_file (string) The file with JSON content to parse.
+
+    Return:
+        The content of the file as parsed by the json package.
+
+    Raises:
+        ParseJSONError if the file cannot be opened or does not hold valid
+        JSON. The error message is a short description of the failure that
+        omits the file name and position.
+    """
+    # Every failure is reported as ParseJSONError so that the caller has a
+    # single exception type to handle per file.
+    try:
+        fd = open(replay_file, 'r', encoding='utf-8')
+    except Exception as e:
+        logging.exception("Failed to open %s.", replay_file)
+        # Prefer the bare OS reason (e.g. "Permission denied") over str(e),
+        # which also embeds the file name.
+        raise ParseJSONError(getattr(e, 'strerror', None) or str(e)) from e
+
+    # The with block closes the file whether or not parsing succeeds.
+    with fd:
+        try:
+            return json.load(fd)
+        except Exception as e:
+            # Keep only the text before the first colon, e.g. "Expecting
+            # value" from "Expecting value: line 1 column 1 (char 0)".
+            message = str(e).split(':')[0]
+            logging.exception("Failed to load %s as a JSON object.", replay_file)
+            raise ParseJSONError(message) from e
+
+
+def readAndCombine(replay_dir, num_sessions_per_file, indent, out_dir):
+    """ Read raw dump files, filter out incomplete sessions, and merge
+    them into output files.
+
+    Output files are named <basename of replay_dir>_<batch number>.json.
+
+    Args:
+        replay_dir (string) Full path to dumps
+        num_sessions_per_file (int) maximum number of sessions in each output
+          file
+        indent (int) The number of spaces per indentation level in the output
+          replay files.
+        out_dir (string) Output directory for post-processed json files.
+
+    Return:
+        A (session_count, transaction_count, error_count) tuple, where
+        error_count maps an error description to its number of occurrences.
+    """
+    session_count = 0
+    batch_count = 0
+    transaction_count = 0
+    error_count = defaultdict(int)
+
+    # normpath drops any trailing separator, which would otherwise make
+    # basename return an empty string.
+    base_name = os.path.basename(os.path.normpath(replay_dir))
+
+    def batch_path(batch_number):
+        # Path of the output file for the given batch.
+        return os.path.join(out_dir, "{}_{}.json".format(base_name, batch_number))
+
+    sessions = []
+    for f in os.listdir(replay_dir):
+        replay_file = os.path.join(replay_dir, f)
+        if not os.path.isfile(replay_file):
+            continue
+
+        try:
+            parsed_json = parse_json(replay_file)
+        except ParseJSONError as e:
+            # Tally by message text: exception objects hash by identity, so
+            # using them as keys would never group identical failures.
+            error_count[str(e)] += 1
+            continue
+
+        # Valid JSON that is not a replay file (no "sessions" list) is counted
+        # as an error rather than raising out of this function.
+        file_sessions = None
+        if isinstance(parsed_json, dict):
+            file_sessions = parsed_json.get("sessions")
+        if not isinstance(file_sessions, list):
+            logging.debug("Omitting %s, it has no sessions list.", replay_file)
+            error_count["No sessions list found"] += 1
+            continue
+
+        for session in file_sessions:
+            try:
+                verify_session(session)
+            except VerifyError as e:
+                # A rejected session may lack either timestamp, or may not
+                # even be a JSON object, so look the times up defensively.
+                connection_time = None
+                if isinstance(session, dict):
+                    connection_time = (session.get('connection-time') or
+                                       session.get('start-time'))
+                if connection_time:
+                    logging.debug("Omitting session in %s with connection-time: %s: %s",
+                                  replay_file, connection_time, e)
+                else:
+                    logging.debug("Omitting a session in %s, could not find a connection time: %s",
+                                  replay_file, e)
+                continue
+            sessions.append(session)
+            session_count += 1
+            transaction_count += len(session["transactions"])
+            # Flush as soon as the batch is full so that no output file
+            # exceeds num_sessions_per_file, even when a single input file
+            # holds several sessions.
+            if len(sessions) >= num_sessions_per_file:
+                write_sessions(sessions, batch_path(batch_count), indent)
+                sessions = []
+                batch_count += 1
+    # Write whatever is left over as a final, smaller batch.
+    if sessions:
+        write_sessions(sessions, batch_path(batch_count), indent)
+
+    return session_count, transaction_count, error_count
+
+
+def post_process(in_dir, subdir_q, out_dir, num_sessions_per_file, single_line, cnt_q):
+    """ Function used to set up individual threads.
+
+    Each thread loops over the subdir_q, pulls a directory from there, and
+    process the replay files in that directory. The threads finish when the
+    subdir queue is empty, meaning each subdir has been processed.
+
+    Args:
+        in_dir (string) Path to parent of the subdirectories in subdir_q.
+        subdir_q (Queue) Queue of subdir to read from.
+        out_dir (string) The directory into which the post processed replay
+          files are placed.
+        num_sessions_per_file (int) traffic_dump will emit a separate file per
+          session. This mechanism merges sessions within a single subdir.
+          num_sessions_per_file is the limit to the number of sessions merged
+          into a single replay file.
+        single_line (bool) Whether to emit replay files as a single line. If
+          false, the file is spaced out in a human readable fashion.
+        cnt_q (Queue) Session, transaction, error count queue populated by each
+          thread.
+    """
+    # Imported locally: the module-level imports only bring in Queue.
+    from queue import Empty
+
+    # An indent of None writes each file as a single line; 2 gives the human
+    # readable layout.
+    indent = None if single_line else 2
+    while True:
+        # empty() followed by a blocking get() is racy with several workers:
+        # another thread can take the last item in between and leave this one
+        # blocked forever. A non-blocking get closes that window.
+        try:
+            subdir = subdir_q.get_nowait()
+        except Empty:
+            return
+        subdir_path = os.path.join(in_dir, subdir)
+        cnt = readAndCombine(subdir_path, num_sessions_per_file, indent, out_dir)
+        cnt_q.put(cnt)
+
+
+def configure_logging(use_debug=False):
+    ''' Set up logging with a "LEVEL: message" line format.
+
+    Args:
+        use_debug (bool) Log at DEBUG level when true, at INFO level
+          otherwise.
+    '''
+    level = logging.DEBUG if use_debug else logging.INFO
+    logging.basicConfig(format='%(levelname)s: %(message)s', level=level)
+
+
+def parse_args(argv=None):
+    ''' Parse the command line arguments.
+
+    Args:
+        argv (list of strings) The arguments to parse. The default of None
+          makes argparse read them from sys.argv.
+
+    Return:
+        The argparse namespace with the in_dir, out_dir, num_sessions,
+        no_human_readable, and debug attributes.
+    '''
+    parser = argparse.ArgumentParser(description=description)
+
+    parser.add_argument("in_dir", type=str,
+                        help='''The input directory of traffic_dump replay
+                        files.  The expectation is that this will contain
+                        sub-directories that themselves contain replay files.
+                        This is written to accommodate the directory populated
+                        by traffic_dump via the --logdir option.''')
+    parser.add_argument("out_dir", type=str,
+                        help="The output directory of post processed replay files.")
+    parser.add_argument("-n", "--num_sessions", type=int, default=10,
+                        help='''The maximum number of sessions merged into
+                        single replay output files. The default is 10.''')
+    parser.add_argument("--no-human-readable", action="store_true",
+                        help='''By default, post processor will generate replay
+                        files that are spaced out in a human readable format.
+                        This turns off that behavior and leaves the files as
+                        single-line entries.''')
+    parser.add_argument("-d", "--debug", action="store_true",
+                        help="Enable debug level logging.")
+    return parser.parse_args(argv)
+
+
+def main():
+    ''' Post process every sub-directory of the input directory with a pool
+    of threads, then log the totals and write them to summary.txt in the
+    output directory.
+    '''
+    args = parse_args()
+    configure_logging(use_debug=args.debug)
+    logging.debug("Original options: %s", " ".join(sys.argv))
+
+    # makedirs also creates missing parent directories, and exist_ok avoids
+    # failing when the directory is already there.
+    os.makedirs(args.out_dir, exist_ok=True)
+
+    # generate thread arguments
+    subdir_q = Queue()
+    cnt_q = Queue()
+    for subdir in os.listdir(args.in_dir):
+        if os.path.isdir(os.path.join(args.in_dir, subdir)):
+            subdir_q.put(subdir)
+
+    # One thread per sub-directory, with at least 1 and at most 32 threads.
+    nthreads = min(max(subdir_q.qsize(), 1), 32)
+    threads = [
+        Thread(target=post_process,
+               args=(args.in_dir, subdir_q, args.out_dir,
+                     args.num_sessions, args.no_human_readable, cnt_q))
+        for _ in range(nthreads)
+    ]
+
+    # Start up the threads.
+    for t in threads:
+        t.start()
+
+    # Wait for them to finish.
+    for t in threads:
+        t.join()
+
+    # Retrieve the counts. Every worker has been joined, so nothing else
+    # touches cnt_q while it is drained here.
+    session_count = 0
+    transaction_count = 0
+    errors = defaultdict(int)
+    while not cnt_q.empty():
+        sessions, transactions, error_count = cnt_q.get_nowait()
+        session_count += sessions
+        transaction_count += transactions
+        for reason, occurrences in error_count.items():
+            errors[reason] += occurrences
+
+    summary = "Total {} sessions and {} transactions.".format(session_count, transaction_count)
+    logging.info(summary)
+    if errors:
+        logging.info("Total errors:")
+        for reason, occurrences in errors.items():
+            logging.info("%s: %s", reason, occurrences)
+    else:
+        logging.info("Total errors: 0")
+
+    summary_path = os.path.join(args.out_dir, "summary.txt")
+    with open(summary_path, "w", encoding="ascii") as f:
+        f.write("{}\n".format(summary))
+
+
+# Run only when executed as a script. main() has no return statement, so
+# sys.exit() receives None and the exit status is 0 unless an exception
+# propagates.
+if __name__ == "__main__":
+    sys.exit(main())

Reply via email to