phani8996 closed pull request #3988: [AIRFLOW-620] Add a dropdown and refresh 
button to tail selected number of lines from worker log
URL: https://github.com/apache/incubator-airflow/pull/3988
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign (fork) pull request
once it is merged, the diff is reproduced below for the sake of provenance:

diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index 09bd0c1806..b5f7528332 100644
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -1023,17 +1023,41 @@ def scheduler(args):
 @cli_utils.action_logging
 def serve_logs(args):
     print("Starting flask")
-    import flask
-    flask_app = flask.Flask(__name__)
+    from flask import Flask, request, Response, stream_with_context, 
send_from_directory
+    flask_app = Flask(__name__)
 
     @flask_app.route('/log/<path:filename>')
     def serve_logs(filename):  # noqa
+        def tail_logs(logdir, filename, num_lines):
+            logpath = "{logdir}/{filename}".format(logdir=logdir, 
filename=filename)
+            logsize = os.path.getsize(logpath)
+            if logsize >= 100 * 1024 * 1024:
+                p1 = subprocess.Popen(["tail", "-n " + str(num_lines), 
filename],
+                                      stdout=subprocess.PIPE, cwd=log)
+                out, err = p1.communicate()
+                out = "Tailing file\n\n" + out.decode("utf-8")
+            else:
+                fl = open("{log}//{filename}".format(log=log, 
filename=filename), "r")
+                lines = fl.readlines()
+                fl.close()
+                out = "".join(l for l in lines[-num_lines:])
+            line = "***** Showing only last {num_lines} lines from {filename} 
*****" \
+                   "\n\n\n{out}".format(num_lines=num_lines, 
filename=filename, out=out)
+            yield line
+        num_lines = request.args.get("num_lines")
+        try:
+            num_lines = int(num_lines)
+        except ValueError or TypeError:
+            num_lines = None
         log = os.path.expanduser(conf.get('core', 'BASE_LOG_FOLDER'))
-        return flask.send_from_directory(
-            log,
-            filename,
-            mimetype="application/json",
-            as_attachment=False)
+        if num_lines:
+            return Response(stream_with_context(tail_logs(log, filename, 
num_lines)))
+        else:
+            return send_from_directory(
+                log,
+                filename,
+                mimetype="application/json",
+                as_attachment=False)
 
     WORKER_LOG_SERVER_PORT = \
         int(conf.get('celery', 'WORKER_LOG_SERVER_PORT'))
diff --git a/airflow/config_templates/default_airflow.cfg 
b/airflow/config_templates/default_airflow.cfg
index bb4ab208d7..4a603b1b54 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -317,6 +317,10 @@ default_dag_run_display_number = 25
 # Enable werkzeug `ProxyFix` middleware
 enable_proxy_fix = False
 
+# Flag to enable tailing logs
+tail_logs = True
+num_lines = 100
+tail_lines_list = 50,100,200,500,1000
 
 [email]
 email_backend = airflow.utils.email.send_email_smtp
diff --git a/airflow/utils/log/file_task_handler.py 
b/airflow/utils/log/file_task_handler.py
index 113bd254ad..803c00e02b 100644
--- a/airflow/utils/log/file_task_handler.py
+++ b/airflow/utils/log/file_task_handler.py
@@ -20,6 +20,7 @@
 import logging
 import os
 import requests
+import subprocess
 
 from airflow import configuration as conf
 from airflow.configuration import AirflowConfigException
@@ -94,17 +95,38 @@ def _read(self, ti, try_number, metadata=None):
         # is needed to get correct log path.
         log_relative_path = self._render_filename(ti, try_number)
         location = os.path.join(self.local_base, log_relative_path)
+        if metadata and "num_lines" in metadata:
+            num_lines = metadata.num_lines
+        else:
+            num_lines = None
 
         log = ""
 
         if os.path.exists(location):
-            try:
-                with open(location) as f:
-                    log += "*** Reading local file: {}\n".format(location)
-                    log += "".join(f.readlines())
-            except Exception as e:
-                log = "*** Failed to load local log file: 
{}\n".format(location)
-                log += "*** {}\n".format(str(e))
+            if num_lines:
+                try:
+                    logsize = os.path.getsize(location)
+                    if logsize >= 100 * 1024 * 1024:
+                        p1 = subprocess.Popen(["tail", "-n " + str(num_lines), 
location],
+                                              stdout=subprocess.PIPE)
+                        out, err = p1.communicate()
+                        log = "Tailing file\n\n" + out.decode("utf-8")
+                    else:
+                        fl = open(location, "r")
+                        lines = fl.readlines()
+                        fl.close()
+                        log = "".join(l for l in lines[-num_lines:])
+                except Exception as e:
+                    log = "*** Failed to load local log file: 
{}\n".format(location)
+                    log += "*** {}\n".format(str(e))
+            else:
+                try:
+                    with open(location) as f:
+                        log += "*** Reading local file: {}\n".format(location)
+                        log += "".join(f.readlines())
+                except Exception as e:
+                    log = "*** Failed to load local log file: 
{}\n".format(location)
+                    log += "*** {}\n".format(str(e))
         else:
             url = os.path.join(
                 "http://{ti.hostname}:{worker_log_server_port}/log", 
log_relative_path
@@ -112,6 +134,8 @@ def _read(self, ti, try_number, metadata=None):
                 ti=ti,
                 worker_log_server_port=conf.get('celery', 
'WORKER_LOG_SERVER_PORT')
             )
+            if num_lines:
+                url = "{url}?num_lines={num_lines}".format(url=url, 
num_lines=num_lines)
             log += "*** Log file does not exist: {}\n".format(location)
             log += "*** Fetching from: {}\n".format(url)
             try:
diff --git a/airflow/www/templates/airflow/ti_log.html 
b/airflow/www/templates/airflow/ti_log.html
index 2b50061b62..91e55bdab9 100644
--- a/airflow/www/templates/airflow/ti_log.html
+++ b/airflow/www/templates/airflow/ti_log.html
@@ -29,6 +29,26 @@ <h4>{{ title }}</h4>
     </a>
   </li>
   {% endfor %}
+  <ul class="nav nav-pills pull-right">
+    <li>
+      <div class="dropdown">
+        <button class="btn btn-primary dropdown-toggle" type="button" 
data-toggle="dropdown" id="tailLinesList">
+          <span id="selected">Tail Logs</span><span class="caret"></span>
+        </button>
+        <ul class="dropdown-menu" aria-labelledby="tailLinesList" 
id="linesList">
+          {% for tail_line in tail_lines_list %}
+            <li class="dropdown-item"><a href="#" 
onClick="select_tail_lines(this)">{{ tail_line }}</a></li>
+          {% endfor %}
+          <li class="dropdown-item"><a href="#" 
onClick="select_tail_lines(this)">{{ full_log_title }}</a></li>
+        </ul>
+      </div>
+    </li>
+    <li>
+      <button class="btn btn-default pull-right refresh_button" 
id="refresh-btn">
+        <span class="glyphicon glyphicon-refresh" aria-hidden="true" 
title="Refresh Logs"></span>
+      </button>
+    </li>
+  </ul>
 </ul>
 <div class="tab-content">
   {% for log in logs %}
@@ -52,6 +72,8 @@ <h4>{{ title }}</h4>
     const ANIMATION_SPEED = 1000;
     // Total number of tabs to show.
     const TOTAL_ATTEMPTS = "{{ logs|length }}";
+    // Text filter to render full log
+    const FULL_LOG_TITLE = "{{ full_log_title }}";
 
     // Recursively fetch logs from flask endpoint.
     function recurse(delay=DELAY) {
@@ -71,7 +93,7 @@ <h4>{{ title }}</h4>
     }
 
     // Streaming log with auto-tailing.
-    function autoTailingLog(try_number, metadata=null, auto_tailing=false) {
+    function autoTailingLog(try_number, metadata=null, auto_tailing=false, 
append_log=true) {
       console.debug("Auto-tailing log for dag_id: {{ dag_id }}, task_id: {{ 
task_id }}, \
        execution_date: {{ execution_date }}, try_number: " + try_number + ", 
metadata: " + JSON.stringify(metadata));
 
@@ -107,7 +129,11 @@ <h4>{{ title }}</h4>
               var should_scroll = true
             }
             // The message may contain HTML, so either have to escape it or 
write it as text.
-            document.getElementById(`try-${try_number}`).textContent += 
res.message + "\n";
+            if(append_log){
+             document.getElementById(`try-${try_number}`).textContent += 
res.message + "\n";
+            }else{
+              document.getElementById(`try-${try_number}`).textContent = 
res.message + "\n";
+            }
             // Auto scroll window to the end if current window location is 
near the end.
             if(should_scroll) {
               $("html, body").animate({ scrollTop: $(document).height() }, 
ANIMATION_SPEED);
@@ -130,11 +156,55 @@ <h4>{{ title }}</h4>
       // returns at most 10k documents. We want the ability
       // to display all logs in the front-end.
       // An optimization here is to render from latest attempt.
+      var metadata = {{ metadata|safe }}
       for(let i = TOTAL_ATTEMPTS; i >= 1; i--) {
         // Only auto_tailing the page when streaming the latest attempt.
-        autoTailingLog(i, null, auto_tailing=(i == TOTAL_ATTEMPTS));
+        autoTailingLog(i, metadata, auto_tailing=(i == TOTAL_ATTEMPTS));
+      }
+      var num_lines = parseInt(metadata.num_lines);
+      if(!isNaN(num_lines)){
+        $("span#selected").text(num_lines);
+        $('span#selected').attr('data-original-title', `Tail last ${num_lines} 
lines`);
+        $("ul#linesList").prepend(`<li class="dropdown-item"><a href="#" 
onClick="select_tail_lines(this)">${num_lines}</a></li><li 
class="dropdown-divider"></li>`)
+      }
+      var text = $("span#selected").text()
+      var tail_lines = parseInt(text);
+      if(isNaN(tail_lines) && text != FULL_LOG_TITLE){
+        $("#refresh-btn").attr("disabled", "disabled");
       }
     });
+    function select_tail_lines(obj){
+      $('span#selected').text(obj.innerText);
+      if(!isNaN(parseInt(obj.innerText))){
+        $('span#selected').attr('data-original-title', `Tail last 
${obj.innerText} lines`);
+      }
+      if(obj.innerText === FULL_LOG_TITLE){
+        $('span#selected').attr('data-original-title', "Fetch full log");
+      }
+      if(isNaN(parseInt(obj.innerText)) && obj.innerText != FULL_LOG_TITLE){
+        $("#refresh-btn").attr("disabled", "disabled");
+      }else{
+        $("#refresh-btn").prop("disabled", false);
+      }
+    }
+    $("#refresh-btn").click(() => {
+      var active_tab = 
parseInt($("li.active[role=presentation]").children()[0].innerText);
+      var text = $("span#selected").text();
+      var tail_lines = parseInt(text);
+      if(isNaN(tail_lines) && text != FULL_LOG_TITLE){
+        alert(`${text} is not a valid integer`);
+        return;
+      }else if(isNaN(active_tab)){
+        alert(`Invalid integer for active tab`)
+        return;
+      }
+      var metadata = {num_lines: tail_lines};
+      if(text === FULL_LOG_TITLE){
+        metadata = {}
+      }
+      autoTailingLog(active_tab, metadata, true, false);
+    });
+
 
 </script>
 {% endblock %}
diff --git a/airflow/www/views.py b/airflow/www/views.py
index d9078caa39..53e4e0424f 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -777,10 +777,13 @@ def get_logs_with_metadata(self, session=None):
         dttm = pendulum.parse(execution_date)
         try_number = int(request.args.get('try_number'))
         metadata = request.args.get('metadata')
-        metadata = json.loads(metadata)
-
         # metadata may be null
-        if not metadata:
+        if metadata:
+            try:
+                metadata = json.loads(metadata)
+            except TypeError or json.decoder.JSONDecodeError:
+                metadata = {}
+        else:
             metadata = {}
 
         # Convert string datetime into actual datetime
@@ -836,6 +839,23 @@ def log(self, session=None):
         dttm = pendulum.parse(execution_date)
         form = DateTimeForm(data={'execution_date': dttm})
         dag = dagbag.get_dag(dag_id)
+        metadata = {}
+        try:
+            tailing_required = conf.getboolean('webserver', 'tail_logs')
+            if tailing_required:
+                num_lines = conf.getint('webserver', 'num_lines')
+                metadata = {
+                    "num_lines": num_lines
+                }
+        except Exception:
+            pass
+        tail_lines_list = [100, 200, 500, 1000]
+        try:
+            tail_lines_list = conf.get('webserver', 'tail_lines_list')
+            tail_lines_list = list(map(int, tail_lines_list.split(',')))
+        except Exception:
+            pass
+        tail_lines_list.sort()
 
         ti = session.query(models.TaskInstance).filter(
             models.TaskInstance.dag_id == dag_id,
@@ -847,7 +867,8 @@ def log(self, session=None):
             'airflow/ti_log.html',
             logs=logs, dag=dag, title="Log by attempts",
             dag_id=dag.dag_id, task_id=task_id,
-            execution_date=execution_date, form=form)
+            execution_date=execution_date, form=form, full_log_title="Full 
Log",
+            metadata=json.dumps(metadata), tail_lines_list=tail_lines_list)
 
     @expose('/task')
     @login_required
diff --git a/airflow/www_rbac/templates/airflow/ti_log.html 
b/airflow/www_rbac/templates/airflow/ti_log.html
index c873f67727..d51744e747 100644
--- a/airflow/www_rbac/templates/airflow/ti_log.html
+++ b/airflow/www_rbac/templates/airflow/ti_log.html
@@ -29,6 +29,26 @@ <h4>{{ title }}</h4>
     </a>
   </li>
   {% endfor %}
+  <ul class="nav nav-pills pull-right">
+    <li>
+      <div class="dropdown">
+        <button class="btn btn-primary dropdown-toggle" type="button" 
data-toggle="dropdown" id="tailLinesList">
+          <span id="selected">Tail Logs</span><span class="caret"></span>
+        </button>
+        <ul class="dropdown-menu" aria-labelledby="tailLinesList" 
id="linesList">
+          {% for tail_line in tail_lines_list %}
+            <li class="dropdown-item"><a href="#" 
onClick="select_tail_lines(this)">{{ tail_line }}</a></li>
+          {% endfor %}
+          <li class="dropdown-item"><a href="#" 
onClick="select_tail_lines(this)">{{ full_log_title }}</a></li>
+        </ul>
+      </div>
+    </li>
+    <li>
+      <button class="btn btn-default pull-right refresh_button" 
id="refresh-btn">
+        <span class="glyphicon glyphicon-refresh" aria-hidden="true" 
title="Refresh Logs"></span>
+      </button>
+    </li>
+  </ul>
 </ul>
 <div class="tab-content">
   {% for log in logs %}
@@ -52,6 +72,8 @@ <h4>{{ title }}</h4>
     const ANIMATION_SPEED = 1000;
     // Total number of tabs to show.
     const TOTAL_ATTEMPTS = "{{ logs|length }}";
+    // Text filter to render full log
+    const FULL_LOG_TITLE = "{{ full_log_title }}";
 
     // Recursively fetch logs from flask endpoint.
     function recurse(delay=DELAY) {
@@ -71,7 +93,7 @@ <h4>{{ title }}</h4>
     }
 
     // Streaming log with auto-tailing.
-    function autoTailingLog(try_number, metadata=null, auto_tailing=false) {
+    function autoTailingLog(try_number, metadata=null, auto_tailing=false, 
append_log=true) {
       console.debug("Auto-tailing log for dag_id: {{ dag_id }}, task_id: {{ 
task_id }}, \
        execution_date: {{ execution_date }}, try_number: " + try_number + ", 
metadata: " + JSON.stringify(metadata));
 
@@ -107,7 +129,11 @@ <h4>{{ title }}</h4>
               var should_scroll = true
             }
             // The message may contain HTML, so either have to escape it or 
write it as text.
-            document.getElementById(`try-${try_number}`).textContent += 
res.message + "\n";
+            if(append_log){
+             document.getElementById(`try-${try_number}`).textContent += 
res.message + "\n";
+            }else{
+              document.getElementById(`try-${try_number}`).textContent = 
res.message + "\n";
+            }
             // Auto scroll window to the end if current window location is 
near the end.
             if(should_scroll) {
               $("html, body").animate({ scrollTop: $(document).height() }, 
ANIMATION_SPEED);
@@ -130,11 +156,55 @@ <h4>{{ title }}</h4>
       // returns at most 10k documents. We want the ability
       // to display all logs in the front-end.
       // An optimization here is to render from latest attempt.
+      var metadata = {{ metadata|safe }}
       for(let i = TOTAL_ATTEMPTS; i >= 1; i--) {
         // Only auto_tailing the page when streaming the latest attempt.
-        autoTailingLog(i, null, auto_tailing=(i == TOTAL_ATTEMPTS));
+        autoTailingLog(i, metadata, auto_tailing=(i == TOTAL_ATTEMPTS));
+      }
+      var num_lines = parseInt(metadata.num_lines);
+      if(!isNaN(num_lines)){
+        $("span#selected").text(num_lines);
+        $('span#selected').attr('data-original-title', `Tail last ${num_lines} 
lines`);
+        $("ul#linesList").prepend(`<li class="dropdown-item"><a href="#" 
onClick="select_tail_lines(this)">${num_lines}</a></li><li 
class="dropdown-divider"></li>`)
+      }
+      var text = $("span#selected").text()
+      var tail_lines = parseInt(text);
+      if(isNaN(tail_lines) && text != FULL_LOG_TITLE){
+        $("#refresh-btn").attr("disabled", "disabled");
       }
     });
+    function select_tail_lines(obj){
+      $('span#selected').text(obj.innerText);
+      if(!isNaN(parseInt(obj.innerText))){
+        $('span#selected').attr('data-original-title', `Tail last 
${obj.innerText} lines`);
+      }
+      if(obj.innerText === FULL_LOG_TITLE){
+        $('span#selected').attr('data-original-title', "Fetch full log");
+      }
+      if(isNaN(parseInt(obj.innerText)) && obj.innerText != FULL_LOG_TITLE){
+        $("#refresh-btn").attr("disabled", "disabled");
+      }else{
+        $("#refresh-btn").prop("disabled", false);
+      }
+    }
+    $("#refresh-btn").click(() => {
+      var active_tab = 
parseInt($("li.active[role=presentation]").children()[0].innerText);
+      var text = $("span#selected").text();
+      var tail_lines = parseInt(text);
+      if(isNaN(tail_lines) && text != FULL_LOG_TITLE){
+        alert(`${text} is not a valid integer`);
+        return;
+      }else if(isNaN(active_tab)){
+        alert(`Invalid integer for active tab`)
+        return;
+      }
+      var metadata = {num_lines: tail_lines};
+      if(text === FULL_LOG_TITLE){
+        metadata = {}
+      }
+      autoTailingLog(active_tab, metadata, true, false);
+    });
+
 
 </script>
 {% endblock %}
diff --git a/airflow/www_rbac/views.py b/airflow/www_rbac/views.py
index e6e505c41a..5ae70473e2 100644
--- a/airflow/www_rbac/views.py
+++ b/airflow/www_rbac/views.py
@@ -510,10 +510,13 @@ def get_logs_with_metadata(self, session=None):
         dttm = pendulum.parse(execution_date)
         try_number = int(request.args.get('try_number'))
         metadata = request.args.get('metadata')
-        metadata = json.loads(metadata)
-
         # metadata may be null
-        if not metadata:
+        if metadata:
+            try:
+                metadata = json.loads(metadata)
+            except TypeError or json.decoder.JSONDecodeError:
+                metadata = {}
+        else:
             metadata = {}
 
         # Convert string datetime into actual datetime
@@ -570,6 +573,23 @@ def log(self, session=None):
         dttm = pendulum.parse(execution_date)
         form = DateTimeForm(data={'execution_date': dttm})
         dag = dagbag.get_dag(dag_id)
+        metadata = {}
+        try:
+            tailing_required = conf.getboolean('webserver', 'tail_logs')
+            if tailing_required:
+                num_lines = conf.getint('webserver', 'num_lines')
+                metadata = {
+                    "num_lines": num_lines
+                }
+        except Exception:
+            pass
+        tail_lines_list = [100, 200, 500, 1000]
+        try:
+            tail_lines_list = conf.get('webserver', 'tail_lines_list')
+            tail_lines_list = list(map(int, tail_lines_list.split(',')))
+        except Exception:
+            pass
+        tail_lines_list.sort()
 
         ti = session.query(models.TaskInstance).filter(
             models.TaskInstance.dag_id == dag_id,
@@ -581,7 +601,8 @@ def log(self, session=None):
             'airflow/ti_log.html',
             logs=logs, dag=dag, title="Log by attempts",
             dag_id=dag.dag_id, task_id=task_id,
-            execution_date=execution_date, form=form)
+            execution_date=execution_date, form=form, full_log_title="Full 
Log",
+            metadata=json.dumps(metadata), tail_lines_list=tail_lines_list)
 
     @expose('/task')
     @has_dag_access(can_dag_read=True)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to