phani8996 closed pull request #3988: [AIRFLOW-620] Add a dropdown and refresh button to tail selected number of lines from worker log
URL: https://github.com/apache/incubator-airflow/pull/3988
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py index 09bd0c1806..b5f7528332 100644 --- a/airflow/bin/cli.py +++ b/airflow/bin/cli.py @@ -1023,17 +1023,41 @@ def scheduler(args): @cli_utils.action_logging def serve_logs(args): print("Starting flask") - import flask - flask_app = flask.Flask(__name__) + from flask import Flask, request, Response, stream_with_context, send_from_directory + flask_app = Flask(__name__) @flask_app.route('/log/<path:filename>') def serve_logs(filename): # noqa + def tail_logs(logdir, filename, num_lines): + logpath = "{logdir}/{filename}".format(logdir=logdir, filename=filename) + logsize = os.path.getsize(logpath) + if logsize >= 100 * 1024 * 1024: + p1 = subprocess.Popen(["tail", "-n " + str(num_lines), filename], + stdout=subprocess.PIPE, cwd=log) + out, err = p1.communicate() + out = "Tailing file\n\n" + out.decode("utf-8") + else: + fl = open("{log}//{filename}".format(log=log, filename=filename), "r") + lines = fl.readlines() + fl.close() + out = "".join(l for l in lines[-num_lines:]) + line = "***** Showing only last {num_lines} lines from {filename} *****" \ + "\n\n\n{out}".format(num_lines=num_lines, filename=filename, out=out) + yield line + num_lines = request.args.get("num_lines") + try: + num_lines = int(num_lines) + except ValueError or TypeError: + num_lines = None log = os.path.expanduser(conf.get('core', 'BASE_LOG_FOLDER')) - return flask.send_from_directory( - log, - filename, - mimetype="application/json", - as_attachment=False) + if num_lines: + return Response(stream_with_context(tail_logs(log, filename, num_lines))) + else: + return send_from_directory( + log, + filename, + mimetype="application/json", + as_attachment=False) WORKER_LOG_SERVER_PORT = 
\ int(conf.get('celery', 'WORKER_LOG_SERVER_PORT')) diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index bb4ab208d7..4a603b1b54 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -317,6 +317,10 @@ default_dag_run_display_number = 25 # Enable werkzeug `ProxyFix` middleware enable_proxy_fix = False +# Flag to enable tailing logs +tail_logs = True +num_lines = 100 +tail_lines_list = 50,100,200,500,1000 [email] email_backend = airflow.utils.email.send_email_smtp diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py index 113bd254ad..803c00e02b 100644 --- a/airflow/utils/log/file_task_handler.py +++ b/airflow/utils/log/file_task_handler.py @@ -20,6 +20,7 @@ import logging import os import requests +import subprocess from airflow import configuration as conf from airflow.configuration import AirflowConfigException @@ -94,17 +95,38 @@ def _read(self, ti, try_number, metadata=None): # is needed to get correct log path. 
log_relative_path = self._render_filename(ti, try_number) location = os.path.join(self.local_base, log_relative_path) + if metadata and "num_lines" in metadata: + num_lines = metadata.num_lines + else: + num_lines = None log = "" if os.path.exists(location): - try: - with open(location) as f: - log += "*** Reading local file: {}\n".format(location) - log += "".join(f.readlines()) - except Exception as e: - log = "*** Failed to load local log file: {}\n".format(location) - log += "*** {}\n".format(str(e)) + if num_lines: + try: + logsize = os.path.getsize(location) + if logsize >= 100 * 1024 * 1024: + p1 = subprocess.Popen(["tail", "-n " + str(num_lines), location], + stdout=subprocess.PIPE) + out, err = p1.communicate() + log = "Tailing file\n\n" + out.decode("utf-8") + else: + fl = open(location, "r") + lines = fl.readlines() + fl.close() + log = "".join(l for l in lines[-num_lines:]) + except Exception as e: + log = "*** Failed to load local log file: {}\n".format(location) + log += "*** {}\n".format(str(e)) + else: + try: + with open(location) as f: + log += "*** Reading local file: {}\n".format(location) + log += "".join(f.readlines()) + except Exception as e: + log = "*** Failed to load local log file: {}\n".format(location) + log += "*** {}\n".format(str(e)) else: url = os.path.join( "http://{ti.hostname}:{worker_log_server_port}/log", log_relative_path @@ -112,6 +134,8 @@ def _read(self, ti, try_number, metadata=None): ti=ti, worker_log_server_port=conf.get('celery', 'WORKER_LOG_SERVER_PORT') ) + if num_lines: + url = "{url}?num_lines={num_lines}".format(url=url, num_lines=num_lines) log += "*** Log file does not exist: {}\n".format(location) log += "*** Fetching from: {}\n".format(url) try: diff --git a/airflow/www/templates/airflow/ti_log.html b/airflow/www/templates/airflow/ti_log.html index 2b50061b62..91e55bdab9 100644 --- a/airflow/www/templates/airflow/ti_log.html +++ b/airflow/www/templates/airflow/ti_log.html @@ -29,6 +29,26 @@ <h4>{{ title }}</h4> 
</a> </li> {% endfor %} + <ul class="nav nav-pills pull-right"> + <li> + <div class="dropdown"> + <button class="btn btn-primary dropdown-toggle" type="button" data-toggle="dropdown" id="tailLinesList"> + <span id="selected">Tail Logs</span><span class="caret"></span> + </button> + <ul class="dropdown-menu" aria-labelledby="tailLinesList" id="linesList"> + {% for tail_line in tail_lines_list %} + <li class="dropdown-item"><a href="#" onClick="select_tail_lines(this)">{{ tail_line }}</a></li> + {% endfor %} + <li class="dropdown-item"><a href="#" onClick="select_tail_lines(this)">{{ full_log_title }}</a></li> + </ul> + </div> + </li> + <li> + <button class="btn btn-default pull-right refresh_button" id="refresh-btn"> + <span class="glyphicon glyphicon-refresh" aria-hidden="true" title="Refresh Logs"></span> + </button> + </li> + </ul> </ul> <div class="tab-content"> {% for log in logs %} @@ -52,6 +72,8 @@ <h4>{{ title }}</h4> const ANIMATION_SPEED = 1000; // Total number of tabs to show. const TOTAL_ATTEMPTS = "{{ logs|length }}"; + // Text filter to render full log + const FULL_LOG_TITLE = "{{ full_log_title }}"; // Recursively fetch logs from flask endpoint. function recurse(delay=DELAY) { @@ -71,7 +93,7 @@ <h4>{{ title }}</h4> } // Streaming log with auto-tailing. - function autoTailingLog(try_number, metadata=null, auto_tailing=false) { + function autoTailingLog(try_number, metadata=null, auto_tailing=false, append_log=true) { console.debug("Auto-tailing log for dag_id: {{ dag_id }}, task_id: {{ task_id }}, \ execution_date: {{ execution_date }}, try_number: " + try_number + ", metadata: " + JSON.stringify(metadata)); @@ -107,7 +129,11 @@ <h4>{{ title }}</h4> var should_scroll = true } // The message may contain HTML, so either have to escape it or write it as text. 
- document.getElementById(`try-${try_number}`).textContent += res.message + "\n"; + if(append_log){ + document.getElementById(`try-${try_number}`).textContent += res.message + "\n"; + }else{ + document.getElementById(`try-${try_number}`).textContent = res.message + "\n"; + } // Auto scroll window to the end if current window location is near the end. if(should_scroll) { $("html, body").animate({ scrollTop: $(document).height() }, ANIMATION_SPEED); @@ -130,11 +156,55 @@ <h4>{{ title }}</h4> // returns at most 10k documents. We want the ability // to display all logs in the front-end. // An optimization here is to render from latest attempt. + var metadata = {{ metadata|safe }} for(let i = TOTAL_ATTEMPTS; i >= 1; i--) { // Only auto_tailing the page when streaming the latest attempt. - autoTailingLog(i, null, auto_tailing=(i == TOTAL_ATTEMPTS)); + autoTailingLog(i, metadata, auto_tailing=(i == TOTAL_ATTEMPTS)); + } + var num_lines = parseInt(metadata.num_lines); + if(!isNaN(num_lines)){ + $("span#selected").text(num_lines); + $('span#selected').attr('data-original-title', `Tail last ${num_lines} lines`); + $("ul#linesList").prepend(`<li class="dropdown-item"><a href="#" onClick="select_tail_lines(this)">${num_lines}</a></li><li class="dropdown-divider"></li>`) + } + var text = $("span#selected").text() + var tail_lines = parseInt(text); + if(isNaN(tail_lines) && text != FULL_LOG_TITLE){ + $("#refresh-btn").attr("disabled", "disabled"); } }); + function select_tail_lines(obj){ + $('span#selected').text(obj.innerText); + if(!isNaN(parseInt(obj.innerText))){ + $('span#selected').attr('data-original-title', `Tail last ${obj.innerText} lines`); + } + if(obj.innerText === FULL_LOG_TITLE){ + $('span#selected').attr('data-original-title', "Fetch full log"); + } + if(isNaN(parseInt(obj.innerText)) && obj.innerText != FULL_LOG_TITLE){ + $("#refresh-btn").attr("disabled", "disabled"); + }else{ + $("#refresh-btn").prop("disabled", false); + } + } + $("#refresh-btn").click(() => 
{ + var active_tab = parseInt($("li.active[role=presentation]").children()[0].innerText); + var text = $("span#selected").text(); + var tail_lines = parseInt(text); + if(isNaN(tail_lines) && text != FULL_LOG_TITLE){ + alert(`${text} is not a valid integer`); + return; + }else if(isNaN(active_tab)){ + alert(`Invalid integer for active tab`) + return; + } + var metadata = {num_lines: tail_lines}; + if(text === FULL_LOG_TITLE){ + metadata = {} + } + autoTailingLog(active_tab, metadata, true, false); + }); + </script> {% endblock %} diff --git a/airflow/www/views.py b/airflow/www/views.py index d9078caa39..53e4e0424f 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -777,10 +777,13 @@ def get_logs_with_metadata(self, session=None): dttm = pendulum.parse(execution_date) try_number = int(request.args.get('try_number')) metadata = request.args.get('metadata') - metadata = json.loads(metadata) - # metadata may be null - if not metadata: + if metadata: + try: + metadata = json.loads(metadata) + except TypeError or json.decoder.JSONDecodeError: + metadata = {} + else: metadata = {} # Convert string datetime into actual datetime @@ -836,6 +839,23 @@ def log(self, session=None): dttm = pendulum.parse(execution_date) form = DateTimeForm(data={'execution_date': dttm}) dag = dagbag.get_dag(dag_id) + metadata = {} + try: + tailing_required = conf.getboolean('webserver', 'tail_logs') + if tailing_required: + num_lines = conf.getint('webserver', 'num_lines') + metadata = { + "num_lines": num_lines + } + except Exception: + pass + tail_lines_list = [100, 200, 500, 1000] + try: + tail_lines_list = conf.get('webserver', 'tail_lines_list') + tail_lines_list = list(map(int, tail_lines_list.split(','))) + except Exception: + pass + tail_lines_list.sort() ti = session.query(models.TaskInstance).filter( models.TaskInstance.dag_id == dag_id, @@ -847,7 +867,8 @@ def log(self, session=None): 'airflow/ti_log.html', logs=logs, dag=dag, title="Log by attempts", dag_id=dag.dag_id, 
task_id=task_id, - execution_date=execution_date, form=form) + execution_date=execution_date, form=form, full_log_title="Full Log", + metadata=json.dumps(metadata), tail_lines_list=tail_lines_list) @expose('/task') @login_required diff --git a/airflow/www_rbac/templates/airflow/ti_log.html b/airflow/www_rbac/templates/airflow/ti_log.html index c873f67727..d51744e747 100644 --- a/airflow/www_rbac/templates/airflow/ti_log.html +++ b/airflow/www_rbac/templates/airflow/ti_log.html @@ -29,6 +29,26 @@ <h4>{{ title }}</h4> </a> </li> {% endfor %} + <ul class="nav nav-pills pull-right"> + <li> + <div class="dropdown"> + <button class="btn btn-primary dropdown-toggle" type="button" data-toggle="dropdown" id="tailLinesList"> + <span id="selected">Tail Logs</span><span class="caret"></span> + </button> + <ul class="dropdown-menu" aria-labelledby="tailLinesList" id="linesList"> + {% for tail_line in tail_lines_list %} + <li class="dropdown-item"><a href="#" onClick="select_tail_lines(this)">{{ tail_line }}</a></li> + {% endfor %} + <li class="dropdown-item"><a href="#" onClick="select_tail_lines(this)">{{ full_log_title }}</a></li> + </ul> + </div> + </li> + <li> + <button class="btn btn-default pull-right refresh_button" id="refresh-btn"> + <span class="glyphicon glyphicon-refresh" aria-hidden="true" title="Refresh Logs"></span> + </button> + </li> + </ul> </ul> <div class="tab-content"> {% for log in logs %} @@ -52,6 +72,8 @@ <h4>{{ title }}</h4> const ANIMATION_SPEED = 1000; // Total number of tabs to show. const TOTAL_ATTEMPTS = "{{ logs|length }}"; + // Text filter to render full log + const FULL_LOG_TITLE = "{{ full_log_title }}"; // Recursively fetch logs from flask endpoint. function recurse(delay=DELAY) { @@ -71,7 +93,7 @@ <h4>{{ title }}</h4> } // Streaming log with auto-tailing. 
- function autoTailingLog(try_number, metadata=null, auto_tailing=false) { + function autoTailingLog(try_number, metadata=null, auto_tailing=false, append_log=true) { console.debug("Auto-tailing log for dag_id: {{ dag_id }}, task_id: {{ task_id }}, \ execution_date: {{ execution_date }}, try_number: " + try_number + ", metadata: " + JSON.stringify(metadata)); @@ -107,7 +129,11 @@ <h4>{{ title }}</h4> var should_scroll = true } // The message may contain HTML, so either have to escape it or write it as text. - document.getElementById(`try-${try_number}`).textContent += res.message + "\n"; + if(append_log){ + document.getElementById(`try-${try_number}`).textContent += res.message + "\n"; + }else{ + document.getElementById(`try-${try_number}`).textContent = res.message + "\n"; + } // Auto scroll window to the end if current window location is near the end. if(should_scroll) { $("html, body").animate({ scrollTop: $(document).height() }, ANIMATION_SPEED); @@ -130,11 +156,55 @@ <h4>{{ title }}</h4> // returns at most 10k documents. We want the ability // to display all logs in the front-end. // An optimization here is to render from latest attempt. + var metadata = {{ metadata|safe }} for(let i = TOTAL_ATTEMPTS; i >= 1; i--) { // Only auto_tailing the page when streaming the latest attempt. 
- autoTailingLog(i, null, auto_tailing=(i == TOTAL_ATTEMPTS)); + autoTailingLog(i, metadata, auto_tailing=(i == TOTAL_ATTEMPTS)); + } + var num_lines = parseInt(metadata.num_lines); + if(!isNaN(num_lines)){ + $("span#selected").text(num_lines); + $('span#selected').attr('data-original-title', `Tail last ${num_lines} lines`); + $("ul#linesList").prepend(`<li class="dropdown-item"><a href="#" onClick="select_tail_lines(this)">${num_lines}</a></li><li class="dropdown-divider"></li>`) + } + var text = $("span#selected").text() + var tail_lines = parseInt(text); + if(isNaN(tail_lines) && text != FULL_LOG_TITLE){ + $("#refresh-btn").attr("disabled", "disabled"); } }); + function select_tail_lines(obj){ + $('span#selected').text(obj.innerText); + if(!isNaN(parseInt(obj.innerText))){ + $('span#selected').attr('data-original-title', `Tail last ${obj.innerText} lines`); + } + if(obj.innerText === FULL_LOG_TITLE){ + $('span#selected').attr('data-original-title', "Fetch full log"); + } + if(isNaN(parseInt(obj.innerText)) && obj.innerText != FULL_LOG_TITLE){ + $("#refresh-btn").attr("disabled", "disabled"); + }else{ + $("#refresh-btn").prop("disabled", false); + } + } + $("#refresh-btn").click(() => { + var active_tab = parseInt($("li.active[role=presentation]").children()[0].innerText); + var text = $("span#selected").text(); + var tail_lines = parseInt(text); + if(isNaN(tail_lines) && text != FULL_LOG_TITLE){ + alert(`${text} is not a valid integer`); + return; + }else if(isNaN(active_tab)){ + alert(`Invalid integer for active tab`) + return; + } + var metadata = {num_lines: tail_lines}; + if(text === FULL_LOG_TITLE){ + metadata = {} + } + autoTailingLog(active_tab, metadata, true, false); + }); + </script> {% endblock %} diff --git a/airflow/www_rbac/views.py b/airflow/www_rbac/views.py index e6e505c41a..5ae70473e2 100644 --- a/airflow/www_rbac/views.py +++ b/airflow/www_rbac/views.py @@ -510,10 +510,13 @@ def get_logs_with_metadata(self, session=None): dttm = 
pendulum.parse(execution_date) try_number = int(request.args.get('try_number')) metadata = request.args.get('metadata') - metadata = json.loads(metadata) - # metadata may be null - if not metadata: + if metadata: + try: + metadata = json.loads(metadata) + except TypeError or json.decoder.JSONDecodeError: + metadata = {} + else: metadata = {} # Convert string datetime into actual datetime @@ -570,6 +573,23 @@ def log(self, session=None): dttm = pendulum.parse(execution_date) form = DateTimeForm(data={'execution_date': dttm}) dag = dagbag.get_dag(dag_id) + metadata = {} + try: + tailing_required = conf.getboolean('webserver', 'tail_logs') + if tailing_required: + num_lines = conf.getint('webserver', 'num_lines') + metadata = { + "num_lines": num_lines + } + except Exception: + pass + tail_lines_list = [100, 200, 500, 1000] + try: + tail_lines_list = conf.get('webserver', 'tail_lines_list') + tail_lines_list = list(map(int, tail_lines_list.split(','))) + except Exception: + pass + tail_lines_list.sort() ti = session.query(models.TaskInstance).filter( models.TaskInstance.dag_id == dag_id, @@ -581,7 +601,8 @@ def log(self, session=None): 'airflow/ti_log.html', logs=logs, dag=dag, title="Log by attempts", dag_id=dag.dag_id, task_id=task_id, - execution_date=execution_date, form=form) + execution_date=execution_date, form=form, full_log_title="Full Log", + metadata=json.dumps(metadata), tail_lines_list=tail_lines_list) @expose('/task') @has_dag_access(can_dag_read=True) ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services