[AIRFLOW-6] Remove dependency on Highcharts. Highcharts' license is not compatible with the Apache 2.0 license. This patch removes Highcharts in favor of d3; however, some charts are no longer supported.
* This brings Maxime Beauchemin's work to master Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/0a460081 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/0a460081 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/0a460081 Branch: refs/heads/master Commit: 0a460081bc7cba2d05434148f092b87d35aa8cd3 Parents: d243c00 Author: Bolke de Bruin <bo...@xs4all.nl> Authored: Mon Jun 20 14:19:34 2016 +0200 Committer: Bolke de Bruin <bo...@xs4all.nl> Committed: Mon Jun 20 14:53:30 2016 +0200 ---------------------------------------------------------------------- ` | 2347 ++++ airflow/bin/airflow | 13 + airflow/executors/base_executor.py | 14 + airflow/executors/celery_executor.py | 14 + airflow/hooks/__init__.py | 2 +- airflow/hooks/dbapi_hook.py | 1 - airflow/hooks/jdbc_hook.py | 3 +- airflow/hooks/oracle_hook.py | 1 + airflow/www/app.py | 4 +- airflow/www/blueprints.py | 6 - airflow/www/static/d3.tip.v0.6.3.js | 280 + airflow/www/static/d3.v3.min.js | 10 +- airflow/www/static/gantt-chart-d3v2.js | 247 + airflow/www/static/gantt.css | 38 + airflow/www/static/highcharts-more.js | 53 - airflow/www/static/highcharts.js | 308 - airflow/www/static/nv.d3.css | 769 ++ airflow/www/static/nv.d3.js | 14241 ++++++++++++++++++++ airflow/www/static/nvd3.tar.gz | Bin 0 -> 328377 bytes airflow/www/templates/airflow/chart.html | 37 +- airflow/www/templates/airflow/dag.html | 4 +- airflow/www/templates/airflow/gantt.html | 86 +- airflow/www/templates/airflow/highchart.html | 183 - airflow/www/templates/airflow/nvd3.html | 175 + airflow/www/utils.py | 11 +- airflow/www/views.py | 352 +- setup.py | 23 +- tests/core.py | 19 - 28 files changed, 18315 insertions(+), 926 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0a460081/` 
---------------------------------------------------------------------- diff --git a/` b/` new file mode 100644 index 0000000..6331805 --- /dev/null +++ b/` @@ -0,0 +1,2347 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import sys + +import os +import socket +import importlib + +from functools import wraps +from datetime import datetime, timedelta +import dateutil.parser +import copy +from itertools import chain, product + +from past.utils import old_div +from past.builtins import basestring + +import inspect +import traceback + +import sqlalchemy as sqla +from sqlalchemy import or_, desc, and_ + + +from flask import redirect, url_for, request, Markup, Response, current_app, render_template +from flask_admin import BaseView, expose, AdminIndexView +from flask_admin.contrib.sqla import ModelView +from flask_admin.actions import action +from flask_login import flash +from flask._compat import PY2 + +import jinja2 +import markdown +import json + +from wtforms import ( + Form, SelectField, TextAreaField, PasswordField, StringField) + +from pygments import highlight, lexers +from pygments.formatters import HtmlFormatter + +import airflow +from airflow import configuration as conf +from airflow import models +from airflow import settings +from airflow.exceptions import AirflowException +from airflow.settings import Session +from airflow.models import XCom + +from airflow.utils.json import json_ser +from airflow.utils.state import State +from 
airflow.utils.db import provide_session +from airflow.utils.helpers import alchemy_to_dict +from airflow.utils import logging as log_utils +from airflow.www import utils as wwwutils +from airflow.www.forms import DateTimeForm, DateTimeWithNumRunsForm + +QUERY_LIMIT = 100000 +CHART_LIMIT = 200000 + +dagbag = models.DagBag(os.path.expanduser(conf.get('core', 'DAGS_FOLDER'))) + +login_required = airflow.login.login_required +current_user = airflow.login.current_user +logout_user = airflow.login.logout_user + +FILTER_BY_OWNER = False +if conf.getboolean('webserver', 'FILTER_BY_OWNER'): + # filter_by_owner if authentication is enabled and filter_by_owner is true + FILTER_BY_OWNER = not current_app.config['LOGIN_DISABLED'] + + +def dag_link(v, c, m, p): + url = url_for( + 'airflow.graph', + dag_id=m.dag_id) + return Markup( + '<a href="{url}">{m.dag_id}</a>'.format(**locals())) + + +def log_link(v, c, m, p): + url = url_for( + 'airflow.log', + dag_id=m.dag_id, + task_id=m.task_id, + execution_date=m.execution_date.isoformat()) + return Markup( + '<a href="{url}">' + ' <span class="glyphicon glyphicon-book" aria-hidden="true">' + '</span></a>').format(**locals()) + + +def task_instance_link(v, c, m, p): + url = url_for( + 'airflow.task', + dag_id=m.dag_id, + task_id=m.task_id, + execution_date=m.execution_date.isoformat()) + url_root = url_for( + 'airflow.graph', + dag_id=m.dag_id, + root=m.task_id, + execution_date=m.execution_date.isoformat()) + return Markup( + """ + <span style="white-space: nowrap;"> + <a href="{url}">{m.task_id}</a> + <a href="{url_root}" title="Filter on this task and upstream"> + <span class="glyphicon glyphicon-filter" style="margin-left: 0px;" + aria-hidden="true"></span> + </a> + </span> + """.format(**locals())) + + +def state_token(state): + color = State.color(state) + return Markup( + '<span class="label" style="background-color:{color};">' + '{state}</span>'.format(**locals())) + + +def state_f(v, c, m, p): + return state_token(m.state) + 
+ +def duration_f(v, c, m, p): + if m.end_date and m.duration: + return timedelta(seconds=m.duration) + + +def datetime_f(v, c, m, p): + attr = getattr(m, p) + dttm = attr.isoformat() if attr else '' + if datetime.now().isoformat()[:4] == dttm[:4]: + dttm = dttm[5:] + return Markup("<nobr>{}</nobr>".format(dttm)) + + +def nobr_f(v, c, m, p): + return Markup("<nobr>{}</nobr>".format(getattr(m, p))) + + +def label_link(v, c, m, p): + try: + default_params = eval(m.default_params) + except: + default_params = {} + url = url_for( + 'airflow.chart', chart_id=m.id, iteration_no=m.iteration_no, + **default_params) + return Markup("<a href='{url}'>{m.label}</a>".format(**locals())) + + +def pool_link(v, c, m, p): + url = '/admin/taskinstance/?flt1_pool_equals=' + m.pool + return Markup("<a href='{url}'>{m.pool}</a>".format(**locals())) + + +def pygment_html_render(s, lexer=lexers.TextLexer): + return highlight( + s, + lexer(), + HtmlFormatter(linenos=True), + ) + + +def render(obj, lexer): + out = "" + if isinstance(obj, basestring): + out += pygment_html_render(obj, lexer) + elif isinstance(obj, (tuple, list)): + for i, s in enumerate(obj): + out += "<div>List item #{}</div>".format(i) + out += "<div>" + pygment_html_render(s, lexer) + "</div>" + elif isinstance(obj, dict): + for k, v in obj.items(): + out += '<div>Dict item "{}"</div>'.format(k) + out += "<div>" + pygment_html_render(v, lexer) + "</div>" + return out + + +def wrapped_markdown(s): + return '<div class="rich_doc">' + markdown.markdown(s) + "</div>" + + +attr_renderer = { + 'bash_command': lambda x: render(x, lexers.BashLexer), + 'hql': lambda x: render(x, lexers.SqlLexer), + 'sql': lambda x: render(x, lexers.SqlLexer), + 'doc': lambda x: render(x, lexers.TextLexer), + 'doc_json': lambda x: render(x, lexers.JsonLexer), + 'doc_rst': lambda x: render(x, lexers.RstLexer), + 'doc_yaml': lambda x: render(x, lexers.YamlLexer), + 'doc_md': wrapped_markdown, + 'python_callable': lambda x: render( + 
inspect.getsource(x), lexers.PythonLexer), +} + + +def data_profiling_required(f): + ''' + Decorator for views requiring data profiling access + ''' + @wraps(f) + def decorated_function(*args, **kwargs): + if ( + current_app.config['LOGIN_DISABLED'] or + (not current_user.is_anonymous() and current_user.data_profiling()) + ): + return f(*args, **kwargs) + else: + flash("This page requires data profiling privileges", "error") + return redirect(url_for('admin.index')) + return decorated_function + + +def fused_slots(v, c, m, p): + url = ( + '/admin/taskinstance/' + + '?flt1_pool_equals=' + m.pool + + '&flt2_state_equals=running') + return Markup("<a href='{0}'>{1}</a>".format(url, m.used_slots())) + + +def fqueued_slots(v, c, m, p): + url = ( + '/admin/taskinstance/' + + '?flt1_pool_equals=' + m.pool + + '&flt2_state_equals=queued&sort=10&desc=1') + return Markup("<a href='{0}'>{1}</a>".format(url, m.queued_slots())) + + +class Airflow(BaseView): + + def is_visible(self): + return False + + @expose('/') + @login_required + def index(self): + return self.render('airflow/dags.html') + + @expose('/chart_data') + @data_profiling_required + @wwwutils.gzipped + # @cache.cached(timeout=3600, key_prefix=wwwutils.make_cache_key) + def chart_data(self): + session = settings.Session() + chart_id = request.args.get('chart_id') + csv = request.args.get('csv') == "true" + chart = session.query(models.Chart).filter_by(id=chart_id).first() + db = session.query( + models.Connection).filter_by(conn_id=chart.conn_id).first() + session.expunge_all() + session.commit() + session.close() + + payload = {} + payload['state'] = 'ERROR' + payload['error'] = '' + + # Processing templated fields + try: + args = eval(chart.default_params) + if type(args) is not type(dict()): + raise AirflowException('Not a dict') + except: + args = {} + payload['error'] += ( + "Default params is not valid, string has to evaluate as " + "a Python dictionary. 
") + + request_dict = {k: request.args.get(k) for k in request.args} + from airflow import macros + args.update(request_dict) + args['macros'] = macros + sql = jinja2.Template(chart.sql).render(**args) + label = jinja2.Template(chart.label).render(**args) + payload['sql_html'] = Markup(highlight( + sql, + lexers.SqlLexer(), # Lexer call + HtmlFormatter(noclasses=True)) + ) + payload['label'] = label + + import pandas as pd + pd.set_option('display.max_colwidth', 100) + hook = db.get_hook() + try: + df = hook.get_pandas_df(wwwutils.limit_sql(sql, CHART_LIMIT, conn_type=db.conn_type)) + df = df.fillna(0) + except Exception as e: + payload['error'] += "SQL execution failed. Details: " + str(e) + + if csv: + return Response( + response=df.to_csv(index=False), + status=200, + mimetype="application/text") + + if not payload['error'] and len(df) == CHART_LIMIT: + payload['warning'] = ( + "Data has been truncated to {0}" + " rows. Expect incomplete results.").format(CHART_LIMIT) + + if not payload['error'] and len(df) == 0: + payload['error'] += "Empty result set. " + elif ( + not payload['error'] and + chart.sql_layout == 'series' and + chart.chart_type != "datatable" and + len(df.columns) < 3): + payload['error'] += "SQL needs to return at least 3 columns. " + elif ( + not payload['error'] and + chart.sql_layout == 'columns'and + len(df.columns) < 2): + payload['error'] += "SQL needs to return at least 2 columns. 
" + elif not payload['error']: + import numpy as np + chart_type = chart.chart_type + + data = None + if chart_type == "datatable": + chart.show_datatable = True + if chart.show_datatable: + data = df.to_dict(orient="split") + data['columns'] = [{'title': c} for c in data['columns']] + + # Trying to convert time to something Highcharts likes + x_col = 1 if chart.sql_layout == 'series' else 0 + if chart.x_is_date: + try: + # From string to datetime + df[df.columns[x_col]] = pd.to_datetime( + df[df.columns[x_col]]) + except Exception as e: + raise AirflowException(str(e)) + df[df.columns[x_col]] = df[df.columns[x_col]].apply( + lambda x: int(x.strftime("%s")) * 1000) + + series = [] + colorAxis = None + if chart_type == 'datatable': + payload['data'] = data + payload['state'] = 'SUCCESS' + return wwwutils.json_response(payload) + + elif chart_type == 'para': + df.rename(columns={ + df.columns[0]: 'name', + df.columns[1]: 'group', + }, inplace=True) + return Response( + response=df.to_csv(index=False), + status=200, + mimetype="application/text") + + elif chart_type == 'heatmap': + color_perc_lbound = float( + request.args.get('color_perc_lbound', 0)) + color_perc_rbound = float( + request.args.get('color_perc_rbound', 1)) + color_scheme = request.args.get('color_scheme', 'blue_red') + + if color_scheme == 'blue_red': + stops = [ + [color_perc_lbound, '#00D1C1'], + [ + color_perc_lbound + + ((color_perc_rbound - color_perc_lbound)/2), + '#FFFFCC' + ], + [color_perc_rbound, '#FF5A5F'] + ] + elif color_scheme == 'blue_scale': + stops = [ + [color_perc_lbound, '#FFFFFF'], + [color_perc_rbound, '#2222FF'] + ] + elif color_scheme == 'fire': + diff = float(color_perc_rbound - color_perc_lbound) + stops = [ + [color_perc_lbound, '#FFFFFF'], + [color_perc_lbound + 0.33*diff, '#FFFF00'], + [color_perc_lbound + 0.66*diff, '#FF0000'], + [color_perc_rbound, '#000000'] + ] + else: + stops = [ + [color_perc_lbound, '#FFFFFF'], + [ + color_perc_lbound + + ((color_perc_rbound - 
color_perc_lbound)/2), + '#888888' + ], + [color_perc_rbound, '#000000'], + ] + + xaxis_label = df.columns[1] + yaxis_label = df.columns[2] + data = [] + for row in df.itertuples(): + data.append({ + 'x': row[2], + 'y': row[3], + 'value': row[4], + }) + x_format = '{point.x:%Y-%m-%d}' \ + if chart.x_is_date else '{point.x}' + series.append({ + 'data': data, + 'borderWidth': 0, + 'colsize': 24 * 36e5, + 'turboThreshold': sys.float_info.max, + 'tooltip': { + 'headerFormat': '', + 'pointFormat': ( + df.columns[1] + ': ' + x_format + '<br/>' + + df.columns[2] + ': {point.y}<br/>' + + df.columns[3] + ': <b>{point.value}</b>' + ), + }, + }) + colorAxis = { + 'stops': stops, + 'minColor': '#FFFFFF', + 'maxColor': '#000000', + 'min': 50, + 'max': 2200, + } + else: + if chart.sql_layout == 'series': + # User provides columns (series, x, y) + xaxis_label = df.columns[1] + yaxis_label = df.columns[2] + df[df.columns[2]] = df[df.columns[2]].astype(np.float) + df = df.pivot_table( + index=df.columns[1], + columns=df.columns[0], + values=df.columns[2], aggfunc=np.sum) + else: + # User provides columns (x, y, metric1, metric2, ...) 
+ xaxis_label = df.columns[0] + yaxis_label = 'y' + df.index = df[df.columns[0]] + df = df.sort(df.columns[0]) + del df[df.columns[0]] + for col in df.columns: + df[col] = df[col].astype(np.float) + + for col in df.columns: + series.append({ + 'name': col, + 'data': [ + (k, df[col][k]) + for k in df[col].keys() + if not np.isnan(df[col][k])] + }) + series = [serie for serie in sorted( + series, key=lambda s: s['data'][0][1], reverse=True)] + + if chart_type == "stacked_area": + stacking = "normal" + chart_type = 'area' + elif chart_type == "percent_area": + stacking = "percent" + chart_type = 'area' + else: + stacking = None + hc = { + 'chart': { + 'type': chart_type + }, + 'plotOptions': { + 'series': { + 'marker': { + 'enabled': False + } + }, + 'area': {'stacking': stacking}, + }, + 'title': {'text': ''}, + 'xAxis': { + 'title': {'text': xaxis_label}, + 'type': 'datetime' if chart.x_is_date else None, + }, + 'yAxis': { + 'title': {'text': yaxis_label}, + }, + 'colorAxis': colorAxis, + 'tooltip': { + 'useHTML': True, + 'backgroundColor': None, + 'borderWidth': 0, + }, + 'series': series, + } + + if chart.y_log_scale: + hc['yAxis']['type'] = 'logarithmic' + hc['yAxis']['minorTickInterval'] = 0.1 + if 'min' in hc['yAxis']: + del hc['yAxis']['min'] + + payload['state'] = 'SUCCESS' + payload['hc'] = hc + payload['data'] = data + payload['request_dict'] = request_dict + return wwwutils.json_response(payload) + + @expose('/chart') + @data_profiling_required + def chart(self): + session = settings.Session() + chart_id = request.args.get('chart_id') + embed = request.args.get('embed') + chart = session.query(models.Chart).filter_by(id=chart_id).first() + session.expunge_all() + session.commit() + session.close() + if chart.chart_type == 'para': + return self.render('airflow/para/para.html', chart=chart) + + sql = "" + if chart.show_sql: + sql = Markup(highlight( + chart.sql, + lexers.SqlLexer(), # Lexer call + HtmlFormatter(noclasses=True)) + ) + return self.render( + 
'airflow/highchart.html', + chart=chart, + title="Airflow - Chart", + sql=sql, + label=chart.label, + embed=embed) + + @expose('/dag_stats') + #@login_required + def dag_stats(self): + states = [ + State.SUCCESS, + State.RUNNING, + State.FAILED, + State.UPSTREAM_FAILED, + State.UP_FOR_RETRY, + State.QUEUED, + ] + task_ids = [] + dag_ids = [] + for dag in dagbag.dags.values(): + task_ids += dag.task_ids + if not dag.is_subdag: + dag_ids.append(dag.dag_id) + + TI = models.TaskInstance + DagRun = models.DagRun + session = Session() + + LastDagRun = ( + session.query(DagRun.dag_id, sqla.func.max(DagRun.execution_date).label('execution_date')) + .group_by(DagRun.dag_id) + .subquery('last_dag_run') + ) + RunningDagRun = ( + session.query(DagRun.dag_id, DagRun.execution_date) + .filter(DagRun.state == State.RUNNING) + .subquery('running_dag_run') + ) + + # Select all task_instances from active dag_runs. + # If no dag_run is active, return task instances from most recent dag_run. + qry = ( + session.query(TI.dag_id, TI.state, sqla.func.count(TI.task_id)) + .outerjoin(RunningDagRun, and_( + RunningDagRun.c.dag_id == TI.dag_id, + RunningDagRun.c.execution_date == TI.execution_date) + ) + .outerjoin(LastDagRun, and_( + LastDagRun.c.dag_id == TI.dag_id, + LastDagRun.c.execution_date == TI.execution_date) + ) + .filter(TI.task_id.in_(task_ids)) + .filter(TI.dag_id.in_(dag_ids)) + .filter(or_( + RunningDagRun.c.dag_id != None, + LastDagRun.c.dag_id != None + )) + .group_by(TI.dag_id, TI.state) + ) + + data = {} + for dag_id, state, count in qry: + if dag_id not in data: + data[dag_id] = {} + data[dag_id][state] = count + session.commit() + session.close() + + payload = {} + for dag in dagbag.dags.values(): + payload[dag.safe_dag_id] = [] + for state in states: + try: + count = data[dag.dag_id][state] + except: + count = 0 + d = { + 'state': state, + 'count': count, + 'dag_id': dag.dag_id, + 'color': State.color(state) + } + payload[dag.safe_dag_id].append(d) + return 
wwwutils.json_response(payload) + + + @expose('/code') + @login_required + def code(self): + dag_id = request.args.get('dag_id') + dag = dagbag.get_dag(dag_id) + title = dag_id + try: + m = importlib.import_module(dag.module_name) + code = inspect.getsource(m) + html_code = highlight( + code, lexers.PythonLexer(), HtmlFormatter(linenos=True)) + except IOError as e: + html_code = str(e) + + return self.render( + 'airflow/dag_code.html', html_code=html_code, dag=dag, title=title, + root=request.args.get('root'), + demo_mode=conf.getboolean('webserver', 'demo_mode')) + + @expose('/dag_details') + @login_required + def dag_details(self): + dag_id = request.args.get('dag_id') + dag = dagbag.get_dag(dag_id) + title = "DAG details" + + session = settings.Session() + TI = models.TaskInstance + states = ( + session.query(TI.state, sqla.func.count(TI.dag_id)) + .filter(TI.dag_id == dag_id) + .group_by(TI.state) + .all() + ) + return self.render( + 'airflow/dag_details.html', + dag=dag, title=title, states=states, State=State) + + @current_app.errorhandler(404) + def circles(self): + return render_template( + 'airflow/circles.html', hostname=socket.gethostname()), 404 + + @current_app.errorhandler(500) + def show_traceback(self): + from airflow.utils import asciiart as ascii_ + return render_template( + 'airflow/traceback.html', + hostname=socket.gethostname(), + nukular=ascii_.nukular, + info=traceback.format_exc()), 500 + + @expose('/sandbox') + @login_required + def sandbox(self): + title = "Sandbox Suggested Configuration" + cfg_loc = conf.AIRFLOW_CONFIG + '.sandbox' + f = open(cfg_loc, 'r') + config = f.read() + f.close() + code_html = Markup(highlight( + config, + lexers.IniLexer(), # Lexer call + HtmlFormatter(noclasses=True)) + ) + return self.render( + 'airflow/code.html', + code_html=code_html, title=title, subtitle=cfg_loc) + + @expose('/noaccess') + def noaccess(self): + return self.render('airflow/noaccess.html') + + @expose('/headers') + def headers(self): + d = 
{ + 'headers': {k: v for k, v in request.headers}, + } + if hasattr(current_user, 'is_superuser'): + d['is_superuser'] = current_user.is_superuser() + d['data_profiling'] = current_user.data_profiling() + d['is_anonymous'] = current_user.is_anonymous() + d['is_authenticated'] = current_user.is_authenticated() + if hasattr(current_user, 'username'): + d['username'] = current_user.username + return wwwutils.json_response(d) + + @expose('/pickle_info') + def pickle_info(self): + d = {} + dag_id = request.args.get('dag_id') + dags = [dagbag.dags.get(dag_id)] if dag_id else dagbag.dags.values() + for dag in dags: + if not dag.is_subdag: + d[dag.dag_id] = dag.pickle_info() + return wwwutils.json_response(d) + + @expose('/login', methods=['GET', 'POST']) + def login(self): + return airflow.login.login(self, request) + + @expose('/logout') + def logout(self): + logout_user() + flash('You have been logged out.') + return redirect(url_for('admin.index')) + + @expose('/rendered') + @login_required + @wwwutils.action_logging + def rendered(self): + dag_id = request.args.get('dag_id') + task_id = request.args.get('task_id') + execution_date = request.args.get('execution_date') + dttm = dateutil.parser.parse(execution_date) + form = DateTimeForm(data={'execution_date': dttm}) + dag = dagbag.get_dag(dag_id) + task = copy.copy(dag.get_task(task_id)) + ti = models.TaskInstance(task=task, execution_date=dttm) + try: + ti.render_templates() + except Exception as e: + flash("Error rendering template: " + str(e), "error") + title = "Rendered Template" + html_dict = {} + for template_field in task.__class__.template_fields: + content = getattr(task, template_field) + if template_field in attr_renderer: + html_dict[template_field] = attr_renderer[template_field](content) + else: + html_dict[template_field] = ( + "<pre><code>" + str(content) + "</pre></code>") + + return self.render( + 'airflow/ti_code.html', + html_dict=html_dict, + dag=dag, + task_id=task_id, + 
execution_date=execution_date, + form=form, + title=title,) + + @expose('/log') + @login_required + @wwwutils.action_logging + def log(self): + BASE_LOG_FOLDER = os.path.expanduser( + conf.get('core', 'BASE_LOG_FOLDER')) + dag_id = request.args.get('dag_id') + task_id = request.args.get('task_id') + execution_date = request.args.get('execution_date') + dag = dagbag.get_dag(dag_id) + log_relative = "{dag_id}/{task_id}/{execution_date}".format( + **locals()) + loc = os.path.join(BASE_LOG_FOLDER, log_relative) + loc = loc.format(**locals()) + log = "" + TI = models.TaskInstance + session = Session() + dttm = dateutil.parser.parse(execution_date) + ti = session.query(TI).filter( + TI.dag_id == dag_id, TI.task_id == task_id, + TI.execution_date == dttm).first() + dttm = dateutil.parser.parse(execution_date) + form = DateTimeForm(data={'execution_date': dttm}) + + if ti: + host = ti.hostname + log_loaded = False + + if socket.gethostname() == host: + try: + f = open(loc) + log += "".join(f.readlines()) + f.close() + log_loaded = True + except: + log = "*** Local log file not found.\n".format(loc) + else: + WORKER_LOG_SERVER_PORT = \ + conf.get('celery', 'WORKER_LOG_SERVER_PORT') + url = os.path.join( + "http://{host}:{WORKER_LOG_SERVER_PORT}/log", log_relative + ).format(**locals()) + log += "*** Log file isn't local.\n" + log += "*** Fetching here: {url}\n".format(**locals()) + try: + import requests + log += '\n' + requests.get(url).text + log_loaded = True + except: + log += "*** Failed to fetch log file from worker.\n".format( + **locals()) + + if not log_loaded: + # load remote logs + remote_log_base = conf.get('core', 'REMOTE_BASE_LOG_FOLDER') + remote_log = os.path.join(remote_log_base, log_relative) + log += '\n*** Reading remote logs...\n' + + # S3 + if remote_log.startswith('s3:/'): + log += log_utils.S3Log().read(remote_log, return_error=True) + + # GCS + elif remote_log.startswith('gs:/'): + log += log_utils.GCSLog().read(remote_log, return_error=True) + + # 
unsupported + elif remote_log: + log += '*** Unsupported remote log location.' + + session.commit() + session.close() + + if PY2 and not isinstance(log, unicode): + log = log.decode('utf-8') + + title = "Log" + + return self.render( + 'airflow/ti_code.html', + code=log, dag=dag, title=title, task_id=task_id, + execution_date=execution_date, form=form) + + @expose('/task') + @login_required + @wwwutils.action_logging + def task(self): + dag_id = request.args.get('dag_id') + task_id = request.args.get('task_id') + # Carrying execution_date through, even though it's irrelevant for + # this context + execution_date = request.args.get('execution_date') + dttm = dateutil.parser.parse(execution_date) + form = DateTimeForm(data={'execution_date': dttm}) + dag = dagbag.get_dag(dag_id) + if not dag or task_id not in dag.task_ids: + flash( + "Task [{}.{}] doesn't seem to exist" + " at the moment".format(dag_id, task_id), + "error") + return redirect('/admin/') + task = dag.get_task(task_id) + task = copy.copy(task) + task.resolve_template_files() + + attributes = [] + for attr_name in dir(task): + if not attr_name.startswith('_'): + attr = getattr(task, attr_name) + if type(attr) != type(self.task) and \ + attr_name not in attr_renderer: + attributes.append((attr_name, str(attr))) + + title = "Task Details" + # Color coding the special attributes that are code + special_attrs_rendered = {} + for attr_name in attr_renderer: + if hasattr(task, attr_name): + source = getattr(task, attr_name) + special_attrs_rendered[attr_name] = attr_renderer[attr_name](source) + + return self.render( + 'airflow/task.html', + attributes=attributes, + task_id=task_id, + execution_date=execution_date, + special_attrs_rendered=special_attrs_rendered, + form=form, + dag=dag, title=title) + + @expose('/xcom') + @login_required + @wwwutils.action_logging + def xcom(self): + dag_id = request.args.get('dag_id') + task_id = request.args.get('task_id') + # Carrying execution_date through, even though it's 
irrelevant for + # this context + execution_date = request.args.get('execution_date') + dttm = dateutil.parser.parse(execution_date) + form = DateTimeForm(data={'execution_date': dttm}) + dag = dagbag.get_dag(dag_id) + if not dag or task_id not in dag.task_ids: + flash( + "Task [{}.{}] doesn't seem to exist" + " at the moment".format(dag_id, task_id), + "error") + return redirect('/admin/') + + session = Session() + xcomlist = session.query(XCom).filter( + XCom.dag_id == dag_id, XCom.task_id == task_id, + XCom.execution_date == dttm).all() + + attributes = [] + for xcom in xcomlist: + if not xcom.key.startswith('_'): + attributes.append((xcom.key, xcom.value)) + + title = "XCom" + return self.render( + 'airflow/xcom.html', + attributes=attributes, + task_id=task_id, + execution_date=execution_date, + form=form, + dag=dag, title=title)\ + + @expose('/run') + @login_required + @wwwutils.action_logging + @wwwutils.notify_owner + def run(self): + dag_id = request.args.get('dag_id') + task_id = request.args.get('task_id') + origin = request.args.get('origin') + dag = dagbag.get_dag(dag_id) + task = dag.get_task(task_id) + + execution_date = request.args.get('execution_date') + execution_date = dateutil.parser.parse(execution_date) + force = request.args.get('force') == "true" + deps = request.args.get('deps') == "true" + + try: + from airflow.executors import DEFAULT_EXECUTOR as executor + from airflow.executors import CeleryExecutor + if not isinstance(executor, CeleryExecutor): + flash("Only works with the CeleryExecutor, sorry", "error") + return redirect(origin) + except ImportError: + # in case CeleryExecutor cannot be imported it is not active either + flash("Only works with the CeleryExecutor, sorry", "error") + return redirect(origin) + + ti = models.TaskInstance(task=task, execution_date=execution_date) + executor.start() + executor.queue_task_instance( + ti, force=force, ignore_dependencies=deps) + executor.heartbeat() + flash( + "Sent {} to the message queue, 
" + "it should start any moment now.".format(ti)) + return redirect(origin) + + @expose('/clear') + @login_required + @wwwutils.action_logging + @wwwutils.notify_owner + def clear(self): + dag_id = request.args.get('dag_id') + task_id = request.args.get('task_id') + origin = request.args.get('origin') + dag = dagbag.get_dag(dag_id) + task = dag.get_task(task_id) + + execution_date = request.args.get('execution_date') + execution_date = dateutil.parser.parse(execution_date) + confirmed = request.args.get('confirmed') == "true" + upstream = request.args.get('upstream') == "true" + downstream = request.args.get('downstream') == "true" + future = request.args.get('future') == "true" + past = request.args.get('past') == "true" + + dag = dag.sub_dag( + task_regex=r"^{0}$".format(task_id), + include_downstream=downstream, + include_upstream=upstream) + + end_date = execution_date if not future else None + start_date = execution_date if not past else None + if confirmed: + count = dag.clear( + start_date=start_date, + end_date=end_date) + + flash("{0} task instances have been cleared".format(count)) + return redirect(origin) + else: + tis = dag.clear( + start_date=start_date, + end_date=end_date, + dry_run=True) + if not tis: + flash("No task instances to clear", 'error') + response = redirect(origin) + else: + details = "\n".join([str(t) for t in tis]) + + response = self.render( + 'airflow/confirm.html', + message=( + "Here's the list of task instances you are about " + "to clear:"), + details=details,) + + return response + + @expose('/blocked') + @login_required + def blocked(self): + session = settings.Session() + DR = models.DagRun + dags = ( + session.query(DR.dag_id, sqla.func.count(DR.id)) + .filter(DR.state == State.RUNNING) + .group_by(DR.dag_id) + .all() + ) + payload = [] + for dag_id, active_dag_runs in dags: + max_active_runs = 0 + if dag_id in dagbag.dags: + max_active_runs = dagbag.dags[dag_id].max_active_runs + payload.append({ + 'dag_id': dag_id, + 
'active_dag_run': active_dag_runs, + 'max_active_runs': max_active_runs, + }) + return wwwutils.json_response(payload) + + @expose('/success') + @login_required + @wwwutils.action_logging + @wwwutils.notify_owner + def success(self): + dag_id = request.args.get('dag_id') + task_id = request.args.get('task_id') + origin = request.args.get('origin') + dag = dagbag.get_dag(dag_id) + task = dag.get_task(task_id) + + execution_date = request.args.get('execution_date') + execution_date = dateutil.parser.parse(execution_date) + confirmed = request.args.get('confirmed') == "true" + upstream = request.args.get('upstream') == "true" + downstream = request.args.get('downstream') == "true" + future = request.args.get('future') == "true" + past = request.args.get('past') == "true" + MAX_PERIODS = 1000 + + # Flagging tasks as successful + session = settings.Session() + task_ids = [task_id] + end_date = ((dag.latest_execution_date or datetime.now()) + if future else execution_date) + + if 'start_date' in dag.default_args: + start_date = dag.default_args['start_date'] + elif dag.start_date: + start_date = dag.start_date + else: + start_date = execution_date + + start_date = execution_date if not past else start_date + + if downstream: + task_ids += [ + t.task_id + for t in task.get_flat_relatives(upstream=False)] + if upstream: + task_ids += [ + t.task_id + for t in task.get_flat_relatives(upstream=True)] + TI = models.TaskInstance + + if dag.schedule_interval == '@once': + dates = [start_date] + else: + dates = dag.date_range(start_date, end_date=end_date) + + tis = session.query(TI).filter( + TI.dag_id == dag_id, + TI.execution_date.in_(dates), + TI.task_id.in_(task_ids)).all() + tis_to_change = session.query(TI).filter( + TI.dag_id == dag_id, + TI.execution_date.in_(dates), + TI.task_id.in_(task_ids), + TI.state != State.SUCCESS).all() + tasks = list(product(task_ids, dates)) + tis_to_create = list( + set(tasks) - + set([(ti.task_id, ti.execution_date) for ti in tis])) + + 
tis_all_altered = list(chain( + [(ti.task_id, ti.execution_date) for ti in tis_to_change], + tis_to_create)) + + if len(tis_all_altered) > MAX_PERIODS: + flash("Too many tasks at once (>{0})".format( + MAX_PERIODS), 'error') + return redirect(origin) + + if confirmed: + for ti in tis_to_change: + ti.state = State.SUCCESS + session.commit() + + for task_id, task_execution_date in tis_to_create: + ti = TI( + task=dag.get_task(task_id), + execution_date=task_execution_date, + state=State.SUCCESS) + session.add(ti) + session.commit() + + session.commit() + session.close() + flash("Marked success on {} task instances".format( + len(tis_all_altered))) + + return redirect(origin) + else: + if not tis_all_altered: + flash("No task instances to mark as successful", 'error') + response = redirect(origin) + else: + tis = [] + for task_id, task_execution_date in tis_all_altered: + tis.append(TI( + task=dag.get_task(task_id), + execution_date=task_execution_date, + state=State.SUCCESS)) + details = "\n".join([str(t) for t in tis]) + + response = self.render( + 'airflow/confirm.html', + message=( + "Here's the list of task instances you are about " + "to mark as successful:"), + details=details,) + return response + + @expose('/tree') + @login_required + @wwwutils.gzipped + @wwwutils.action_logging + def tree(self): + dag_id = request.args.get('dag_id') + blur = conf.getboolean('webserver', 'demo_mode') + dag = dagbag.get_dag(dag_id) + root = request.args.get('root') + if root: + dag = dag.sub_dag( + task_regex=root, + include_downstream=False, + include_upstream=True) + + session = settings.Session() + + base_date = request.args.get('base_date') + num_runs = request.args.get('num_runs') + num_runs = int(num_runs) if num_runs else 25 + + if base_date: + base_date = dateutil.parser.parse(base_date) + else: + base_date = dag.latest_execution_date or datetime.now() + + dates = dag.date_range(base_date, num=-abs(num_runs)) + min_date = dates[0] if dates else datetime(2000, 1, 1) + + 
DR = models.DagRun + dag_runs = ( + session.query(DR) + .filter( + DR.dag_id==dag.dag_id, + DR.execution_date<=base_date, + DR.execution_date>=min_date) + .all() + ) + dag_runs = { + dr.execution_date: alchemy_to_dict(dr) for dr in dag_runs} + + tis = dag.get_task_instances( + session, start_date=min_date, end_date=base_date) + dates = sorted(list({ti.execution_date for ti in tis})) + max_date = max([ti.execution_date for ti in tis]) if dates else None + task_instances = {} + for ti in tis: + tid = alchemy_to_dict(ti) + dr = dag_runs.get(ti.execution_date) + tid['external_trigger'] = dr['external_trigger'] if dr else False + task_instances[(ti.task_id, ti.execution_date)] = tid + + expanded = [] + # The default recursion traces every path so that tree view has full + # expand/collapse functionality. After 5,000 nodes we stop and fall + # back on a quick DFS search for performance. See PR #320. + node_count = [0] + node_limit = 5000 / max(1, len(dag.roots)) + + def recurse_nodes(task, visited): + visited.add(task) + node_count[0] += 1 + + children = [ + recurse_nodes(t, visited) for t in task.upstream_list + if node_count[0] < node_limit or t not in visited] + + # D3 tree uses children vs _children to define what is + # expanded or not. The following block makes it such that + # repeated nodes are collapsed by default. 
+ children_key = 'children' + if task.task_id not in expanded: + expanded.append(task.task_id) + elif children: + children_key = "_children" + + return { + 'name': task.task_id, + 'instances': [ + task_instances.get((task.task_id, d)) or { + 'execution_date': d.isoformat(), + 'task_id': task.task_id + } + for d in dates], + children_key: children, + 'num_dep': len(task.upstream_list), + 'operator': task.task_type, + 'retries': task.retries, + 'owner': task.owner, + 'start_date': task.start_date, + 'end_date': task.end_date, + 'depends_on_past': task.depends_on_past, + 'ui_color': task.ui_color, + } + data = { + 'name': '[DAG]', + 'children': [recurse_nodes(t, set()) for t in dag.roots], + 'instances': [ + dag_runs.get(d) or {'execution_date': d.isoformat()} + for d in dates], + } + + data = json.dumps(data, indent=4, default=json_ser) + session.commit() + session.close() + + form = DateTimeWithNumRunsForm(data={'base_date': max_date, + 'num_runs': num_runs}) + return self.render( + 'airflow/tree.html', + operators=sorted( + list(set([op.__class__ for op in dag.tasks])), + key=lambda x: x.__name__ + ), + root=root, + form=form, + dag=dag, data=data, blur=blur) + + @expose('/graph') + @login_required + @wwwutils.gzipped + @wwwutils.action_logging + def graph(self): + session = settings.Session() + dag_id = request.args.get('dag_id') + blur = conf.getboolean('webserver', 'demo_mode') + arrange = request.args.get('arrange', "LR") + dag = dagbag.get_dag(dag_id) + if dag_id not in dagbag.dags: + flash('DAG "{0}" seems to be missing.'.format(dag_id), "error") + return redirect('/admin/') + + root = request.args.get('root') + if root: + dag = dag.sub_dag( + task_regex=root, + include_upstream=True, + include_downstream=False) + + nodes = [] + edges = [] + for task in dag.tasks: + nodes.append({ + 'id': task.task_id, + 'value': { + 'label': task.task_id, + 'labelStyle': "fill:{0};".format(task.ui_fgcolor), + 'style': "fill:{0};".format(task.ui_color), + } + }) + + def 
get_upstream(task): + for t in task.upstream_list: + edge = { + 'u': t.task_id, + 'v': task.task_id, + } + if edge not in edges: + edges.append(edge) + get_upstream(t) + + for t in dag.roots: + get_upstream(t) + + dttm = request.args.get('execution_date') + if dttm: + dttm = dateutil.parser.parse(dttm) + else: + dttm = dag.latest_execution_date or datetime.now().date() + + DR = models.DagRun + drs = ( + session.query(DR) + .filter_by(dag_id=dag_id) + .order_by(desc(DR.execution_date)).all() + ) + dr_choices = [] + dr_state = None + for dr in drs: + dr_choices.append((dr.execution_date.isoformat(), dr.run_id)) + if dttm == dr.execution_date: + dr_state = dr.state + + class GraphForm(Form): + execution_date = SelectField("DAG run", choices=dr_choices) + arrange = SelectField("Layout", choices=( + ('LR', "Left->Right"), + ('RL', "Right->Left"), + ('TB', "Top->Bottom"), + ('BT', "Bottom->Top"), + )) + form = GraphForm( + data={'execution_date': dttm.isoformat(), 'arrange': arrange}) + + task_instances = { + ti.task_id: alchemy_to_dict(ti) + for ti in dag.get_task_instances(session, dttm, dttm)} + tasks = { + t.task_id: { + 'dag_id': t.dag_id, + 'task_type': t.task_type, + } + for t in dag.tasks} + if not tasks: + flash("No tasks found", "error") + session.commit() + session.close() + doc_md = markdown.markdown(dag.doc_md) if hasattr(dag, 'doc_md') else '' + + return self.render( + 'airflow/graph.html', + dag=dag, + form=form, + width=request.args.get('width', "100%"), + height=request.args.get('height', "800"), + execution_date=dttm.isoformat(), + state_token=state_token(dr_state), + doc_md=doc_md, + arrange=arrange, + operators=sorted( + list(set([op.__class__ for op in dag.tasks])), + key=lambda x: x.__name__ + ), + blur=blur, + root=root or '', + task_instances=json.dumps(task_instances, indent=2), + tasks=json.dumps(tasks, indent=2), + nodes=json.dumps(nodes, indent=2), + edges=json.dumps(edges, indent=2),) + + @expose('/duration') + @login_required + 
@wwwutils.action_logging + def duration(self): + from nvd3 import lineChart + import time + session = settings.Session() + dag_id = request.args.get('dag_id') + dag = dagbag.get_dag(dag_id) + base_date = request.args.get('base_date') + num_runs = request.args.get('num_runs') + num_runs = int(num_runs) if num_runs else 25 + + if base_date: + base_date = dateutil.parser.parse(base_date) + else: + base_date = dag.latest_execution_date or datetime.now() + + dates = dag.date_range(base_date, num=-abs(num_runs)) + min_date = dates[0] if dates else datetime(2000, 1, 1) + + root = request.args.get('root') + if root: + dag = dag.sub_dag( + task_regex=root, + include_upstream=True, + include_downstream=False) + + chart = lineChart(name="lineChart", x_is_date=True, height=750, width=600) + for task in dag.tasks: + y = [] + x = [] + for ti in task.get_task_instances(session, start_date=min_date, + end_date=base_date): + if ti.duration: + dttm = int(time.mktime(ti.execution_date.timetuple())) * 1000 + x.append(dttm) + y.append(float(ti.duration) / (60*60)) + if x: + chart.add_serie(name=task.task_id, x=x, y=y) + + tis = dag.get_task_instances( + session, start_date=min_date, end_date=base_date) + dates = sorted(list({ti.execution_date for ti in tis})) + max_date = max([ti.execution_date for ti in tis]) if dates else None + + session.commit() + session.close() + + form = DateTimeWithNumRunsForm(data={'base_date': max_date, + 'num_runs': num_runs}) + chart.buildhtml() + return self.render( + 'airflow/chart.html', + dag=dag, + demo_mode=conf.getboolean('webserver', 'demo_mode'), + root=root, + form=form, + chart=chart, + ) + + @expose('/landing_times') + @login_required + @wwwutils.action_logging + def landing_times(self): + session = settings.Session() + dag_id = request.args.get('dag_id') + dag = dagbag.get_dag(dag_id) + base_date = request.args.get('base_date') + num_runs = request.args.get('num_runs') + num_runs = int(num_runs) if num_runs else 25 + + if base_date: + base_date 
= dateutil.parser.parse(base_date) + else: + base_date = dag.latest_execution_date or datetime.now() + + dates = dag.date_range(base_date, num=-abs(num_runs)) + min_date = dates[0] if dates else datetime(2000, 1, 1) + + root = request.args.get('root') + if root: + dag = dag.sub_dag( + task_regex=root, + include_upstream=True, + include_downstream=False) + + all_data = [] + for task in dag.tasks: + data = [] + for ti in task.get_task_instances(session, start_date=min_date, + end_date=base_date): + if ti.end_date: + ts = ti.execution_date + if dag.schedule_interval: + ts = dag.following_schedule(ts) + secs = old_div((ti.end_date - ts).total_seconds(), 60*60) + data.append([ti.execution_date.isoformat(), secs]) + all_data.append({'data': data, 'name': task.task_id}) + + tis = dag.get_task_instances( + session, start_date=min_date, end_date=base_date) + dates = sorted(list({ti.execution_date for ti in tis})) + max_date = max([ti.execution_date for ti in tis]) if dates else None + + session.commit() + session.close() + + form = DateTimeWithNumRunsForm(data={'base_date': max_date, + 'num_runs': num_runs}) + return self.render( + 'airflow/chart.html', + dag=dag, + data=json.dumps(all_data), + height="700px", + chart_options={'yAxis': {'title': {'text': 'hours after 00:00'}}}, + demo_mode=conf.getboolean('webserver', 'demo_mode'), + root=root, + form=form, + ) + + @expose('/paused') + @login_required + @wwwutils.action_logging + def paused(self): + DagModel = models.DagModel + dag_id = request.args.get('dag_id') + session = settings.Session() + orm_dag = session.query( + DagModel).filter(DagModel.dag_id == dag_id).first() + if request.args.get('is_paused') == 'false': + orm_dag.is_paused = True + else: + orm_dag.is_paused = False + session.merge(orm_dag) + session.commit() + session.close() + + dagbag.get_dag(dag_id) + return "OK" + + @expose('/refresh') + @login_required + @wwwutils.action_logging + def refresh(self): + DagModel = models.DagModel + dag_id = 
request.args.get('dag_id') + session = settings.Session() + orm_dag = session.query( + DagModel).filter(DagModel.dag_id == dag_id).first() + + if orm_dag: + orm_dag.last_expired = datetime.now() + session.merge(orm_dag) + session.commit() + session.close() + + dagbag.get_dag(dag_id) + flash("DAG [{}] is now fresh as a daisy".format(dag_id)) + return redirect('/') + + @expose('/refresh_all') + @login_required + @wwwutils.action_logging + def refresh_all(self): + dagbag.collect_dags(only_if_updated=False) + flash("All DAGs are now up to date") + return redirect('/') + + @expose('/gantt') + @login_required + @wwwutils.action_logging + def gantt(self): + + session = settings.Session() + dag_id = request.args.get('dag_id') + dag = dagbag.get_dag(dag_id) + demo_mode = conf.getboolean('webserver', 'demo_mode') + + root = request.args.get('root') + if root: + dag = dag.sub_dag( + task_regex=root, + include_upstream=True, + include_downstream=False) + + dttm = request.args.get('execution_date') + if dttm: + dttm = dateutil.parser.parse(dttm) + else: + dttm = dag.latest_execution_date or datetime.now().date() + + form = DateTimeForm(data={'execution_date': dttm}) + + tis = [ + ti + for ti in dag.get_task_instances(session, dttm, dttm) + if ti.start_date] + tis = sorted(tis, key=lambda ti: ti.start_date) + tasks = [] + data = [] + for i, ti in enumerate(tis): + end_date = ti.end_date or datetime.now() + tasks += [ti.task_id] + color = State.color(ti.state) + data.append({ + 'x': i, + 'low': int(ti.start_date.strftime('%s')) * 1000, + 'high': int(end_date.strftime('%s')) * 1000, + 'color': color, + }) + height = (len(tis) * 25) + 50 + session.commit() + session.close() + + hc = { + 'chart': { + 'type': 'columnrange', + 'inverted': True, + 'height': height, + }, + 'xAxis': {'categories': tasks, 'alternateGridColor': '#FAFAFA'}, + 'yAxis': {'type': 'datetime'}, + 'title': { + 'text': None + }, + 'plotOptions': { + 'series': { + 'cursor': 'pointer', + 'minPointLength': 4, + }, + 
}, + 'legend': { + 'enabled': False + }, + 'series': [{ + 'data': data + }] + } + return self.render( + 'airflow/gantt.html', + dag=dag, + execution_date=dttm.isoformat(), + form=form, + hc=json.dumps(hc, indent=4), + height=height, + demo_mode=demo_mode, + root=root, + ) + + @expose('/object/task_instances') + @login_required + @wwwutils.action_logging + def task_instances(self): + session = settings.Session() + dag_id = request.args.get('dag_id') + dag = dagbag.get_dag(dag_id) + + dttm = request.args.get('execution_date') + if dttm: + dttm = dateutil.parser.parse(dttm) + else: + return ("Error: Invalid execution_date") + + task_instances = { + ti.task_id: alchemy_to_dict(ti) + for ti in dag.get_task_instances(session, dttm, dttm)} + + return json.dumps(task_instances) + + @expose('/variables/<form>', methods=["GET", "POST"]) + @login_required + @wwwutils.action_logging + def variables(self, form): + try: + if request.method == 'POST': + data = request.json + if data: + session = settings.Session() + var = models.Variable(key=form, val=json.dumps(data)) + session.add(var) + session.commit() + return "" + else: + return self.render( + 'airflow/variables/{}.html'.format(form) + ) + except: + return ("Error: form airflow/variables/{}.html " + "not found.").format(form), 404 + + +class HomeView(AdminIndexView): + @expose("/") + @login_required + def index(self): + session = Session() + DM = models.DagModel + qry = None + # filter the dags if filter_by_owner and current user is not superuser + do_filter = FILTER_BY_OWNER and (not current_user.is_superuser()) + if do_filter: + qry = ( + session.query(DM) + .filter( + ~DM.is_subdag, DM.is_active, + DM.owners == current_user.username) + .all() + ) + else: + qry = session.query(DM).filter(~DM.is_subdag, DM.is_active).all() + orm_dags = {dag.dag_id: dag for dag in qry} + import_errors = session.query(models.ImportError).all() + for ie in import_errors: + flash( + "Broken DAG: [{ie.filename}] {ie.stacktrace}".format(ie=ie), 
+ "error") + session.expunge_all() + session.commit() + session.close() + dags = dagbag.dags.values() + if do_filter: + dags = { + dag.dag_id: dag + for dag in dags + if ( + dag.owner == current_user.username and (not dag.parent_dag) + ) + } + else: + dags = {dag.dag_id: dag for dag in dags if not dag.parent_dag} + all_dag_ids = sorted(set(orm_dags.keys()) | set(dags.keys())) + return self.render( + 'airflow/dags.html', + dags=dags, + orm_dags=orm_dags, + all_dag_ids=all_dag_ids) + + +class QueryView(wwwutils.DataProfilingMixin, BaseView): + @expose('/') + @wwwutils.gzipped + def query(self): + session = settings.Session() + dbs = session.query(models.Connection).order_by( + models.Connection.conn_id).all() + session.expunge_all() + db_choices = list( + ((db.conn_id, db.conn_id) for db in dbs if db.get_hook())) + conn_id_str = request.args.get('conn_id') + csv = request.args.get('csv') == "true" + sql = request.args.get('sql') + + class QueryForm(Form): + conn_id = SelectField("Layout", choices=db_choices) + sql = TextAreaField("SQL", widget=wwwutils.AceEditorWidget()) + data = { + 'conn_id': conn_id_str, + 'sql': sql, + } + results = None + has_data = False + error = False + if conn_id_str: + db = [db for db in dbs if db.conn_id == conn_id_str][0] + hook = db.get_hook() + try: + df = hook.get_pandas_df(wwwutils.limit_sql(sql, QUERY_LIMIT, conn_type=db.conn_type)) + # df = hook.get_pandas_df(sql) + has_data = len(df) > 0 + df = df.fillna('') + results = df.to_html( + classes=[ + 'table', 'table-bordered', 'table-striped', 'no-wrap'], + index=False, + na_rep='', + ) if has_data else '' + except Exception as e: + flash(str(e), 'error') + error = True + + if has_data and len(df) == QUERY_LIMIT: + flash( + "Query output truncated at " + str(QUERY_LIMIT) + + " rows", 'info') + + if not has_data and error: + flash('No data', 'error') + + if csv: + return Response( + response=df.to_csv(index=False), + status=200, + mimetype="application/text") + + form = 
QueryForm(request.form, data=data) + session.commit() + session.close() + return self.render( + 'airflow/query.html', form=form, + title="Ad Hoc Query", + results=results or '', + has_data=has_data) + + +class AirflowModelView(ModelView): + list_template = 'airflow/model_list.html' + edit_template = 'airflow/model_edit.html' + create_template = 'airflow/model_create.html' + column_display_actions = True + page_size = 500 + + +class ModelViewOnly(wwwutils.LoginMixin, AirflowModelView): + """ + Modifying the base ModelView class for non edit, browse only operations + """ + named_filter_urls = True + can_create = False + can_edit = False + can_delete = False + column_display_pk = True + + +class PoolModelView(wwwutils.SuperUserMixin, AirflowModelView): + column_list = ('pool', 'slots', 'used_slots', 'queued_slots') + column_formatters = dict( + pool=pool_link, used_slots=fused_slots, queued_slots=fqueued_slots) + named_filter_urls = True + + +class SlaMissModelView(wwwutils.SuperUserMixin, ModelViewOnly): + verbose_name_plural = "SLA misses" + verbose_name = "SLA miss" + column_list = ( + 'dag_id', 'task_id', 'execution_date', 'email_sent', 'timestamp') + column_formatters = dict( + task_id=task_instance_link, + execution_date=datetime_f, + timestamp=datetime_f, + dag_id=dag_link) + named_filter_urls = True + column_searchable_list = ('dag_id', 'task_id',) + column_filters = ( + 'dag_id', 'task_id', 'email_sent', 'timestamp', 'execution_date') + form_widget_args = { + 'email_sent': {'disabled': True}, + 'timestamp': {'disabled': True}, + } + +class ChartModelView(wwwutils.DataProfilingMixin, AirflowModelView): + verbose_name = "chart" + verbose_name_plural = "charts" + form_columns = ( + 'label', + 'owner', + 'conn_id', + 'chart_type', + 'show_datatable', + 'x_is_date', + 'y_log_scale', + 'show_sql', + 'height', + 'sql_layout', + 'sql', + 'default_params',) + column_list = ( + 'label', 'conn_id', 'chart_type', 'owner', 'last_modified',) + column_formatters = 
dict(label=label_link, last_modified=datetime_f) + column_default_sort = ('last_modified', True) + create_template = 'airflow/chart/create.html' + edit_template = 'airflow/chart/edit.html' + column_filters = ('label', 'owner.username', 'conn_id') + column_searchable_list = ('owner.username', 'label', 'sql') + column_descriptions = { + 'label': "Can include {{ templated_fields }} and {{ macros }}", + 'chart_type': "The type of chart to be displayed", + 'sql': "Can include {{ templated_fields }} and {{ macros }}.", + 'height': "Height of the chart, in pixels.", + 'conn_id': "Source database to run the query against", + 'x_is_date': ( + "Whether the X axis should be casted as a date field. Expect most " + "intelligible date formats to get casted properly." + ), + 'owner': ( + "The chart's owner, mostly used for reference and filtering in " + "the list view." + ), + 'show_datatable': + "Whether to display an interactive data table under the chart.", + 'default_params': ( + 'A dictionary of {"key": "values",} that define what the ' + 'templated fields (parameters) values should be by default. ' + 'To be valid, it needs to "eval" as a Python dict. ' + 'The key values will show up in the url\'s querystring ' + 'and can be altered there.' + ), + 'show_sql': "Whether to display the SQL statement as a collapsible " + "section in the chart page.", + 'y_log_scale': "Whether to use a log scale for the Y axis.", + 'sql_layout': ( + "Defines the layout of the SQL that the application should " + "expect. Depending on the tables you are sourcing from, it may " + "make more sense to pivot / unpivot the metrics." 
+ ), + } + column_labels = { + 'sql': "SQL", + 'height': "Chart Height", + 'sql_layout': "SQL Layout", + 'show_sql': "Display the SQL Statement", + 'default_params': "Default Parameters", + } + form_choices = { + 'chart_type': [ + ('line', 'Line Chart'), + ('spline', 'Spline Chart'), + ('bar', 'Bar Chart'), + ('para', 'Parallel Coordinates'), + ('column', 'Column Chart'), + ('area', 'Overlapping Area Chart'), + ('stacked_area', 'Stacked Area Chart'), + ('percent_area', 'Percent Area Chart'), + ('heatmap', 'Heatmap'), + ('datatable', 'No chart, data table only'), + ], + 'sql_layout': [ + ('series', 'SELECT series, x, y FROM ...'), + ('columns', 'SELECT x, y (series 1), y (series 2), ... FROM ...'), + ], + 'conn_id': [ + (c.conn_id, c.conn_id) + for c in ( + Session().query(models.Connection.conn_id) + .group_by(models.Connection.conn_id) + ) + ] + } + + def on_model_change(self, form, model, is_created=True): + if model.iteration_no is None: + model.iteration_no = 0 + else: + model.iteration_no += 1 + if not model.user_id and current_user and hasattr(current_user, 'id'): + model.user_id = current_user.id + model.last_modified = datetime.now() + + +class KnowEventView(wwwutils.DataProfilingMixin, AirflowModelView): + verbose_name = "known event" + verbose_name_plural = "known events" + form_columns = ( + 'label', + 'event_type', + 'start_date', + 'end_date', + 'reported_by', + 'description') + column_list = ( + 'label', 'event_type', 'start_date', 'end_date', 'reported_by') + column_default_sort = ("start_date", True) + + +class KnowEventTypeView(wwwutils.DataProfilingMixin, AirflowModelView): + pass + +''' +# For debugging / troubleshooting +mv = KnowEventTypeView( + models.KnownEventType, + Session, name="Known Event Types", category="Manage") +admin.add_view(mv) +class DagPickleView(SuperUserMixin, ModelView): + pass +mv = DagPickleView( + models.DagPickle, + Session, name="Pickles", category="Manage") +admin.add_view(mv) +''' + + +class 
VariableView(wwwutils.LoginMixin, AirflowModelView): + verbose_name = "Variable" + verbose_name_plural = "Variables" + form_columns = ( + 'key', + 'val', + ) + column_list = ('key', 'is_encrypted',) + column_filters = ('key', 'val') + column_searchable_list = ('key', 'val') + form_widget_args = { + 'is_encrypted': {'disabled': True}, + 'val': { + 'rows': 20, + } + } + + +class JobModelView(ModelViewOnly): + verbose_name_plural = "jobs" + verbose_name = "job" + column_default_sort = ('start_date', True) + column_filters = ( + 'job_type', 'dag_id', 'state', + 'unixname', 'hostname', 'start_date', 'end_date', 'latest_heartbeat') + column_formatters = dict( + start_date=datetime_f, + end_date=datetime_f, + hostname=nobr_f, + state=state_f, + latest_heartbeat=datetime_f) + + +class DagRunModelView(ModelViewOnly): + verbose_name_plural = "DAG Runs" + can_delete = True + can_edit = True + can_create = True + column_editable_list = ('state',) + verbose_name = "dag run" + column_default_sort = ('execution_date', True) + form_choices = { + 'state': [ + ('success', 'success'), + ('running', 'running'), + ('failed', 'failed'), + ], + } + column_list = ( + 'state', 'dag_id', 'execution_date', 'run_id', 'external_trigger') + column_filters = column_list + column_searchable_list = ('dag_id', 'state', 'run_id') + column_formatters = dict( + execution_date=datetime_f, + state=state_f, + start_date=datetime_f, + dag_id=dag_link) + + @action('set_running', "Set state to 'running'", None) + def action_set_running(self, ids): + self.set_dagrun_state(ids, State.RUNNING) + + @action('set_failed', "Set state to 'failed'", None) + def action_set_failed(self, ids): + self.set_dagrun_state(ids, State.FAILED) + + @action('set_success', "Set state to 'success'", None) + def action_set_success(self, ids): + self.set_dagrun_state(ids, State.SUCCESS) + + @provide_session + def set_dagrun_state(self, ids, target_state, session=None): + try: + DR = models.DagRun + count = 0 + for dr in 
session.query(DR).filter(DR.id.in_(ids)).all(): + count += 1 + dr.state = target_state + if target_state == State.RUNNING: + dr.start_date = datetime.now() + else: + dr.end_date = datetime.now() + session.commit() + flash( + "{count} dag runs were set to '{target_state}'".format(**locals())) + except Exception as ex: + if not self.handle_view_exception(ex): + raise Exception("Ooops") + flash('Failed to set state', 'error') + + +class LogModelView(ModelViewOnly): + verbose_name_plural = "logs" + verbose_name = "log" + column_default_sort = ('dttm', True) + column_filters = ('dag_id', 'task_id', 'execution_date') + column_formatters = dict( + dttm=datetime_f, execution_date=datetime_f, dag_id=dag_link) + + +class TaskInstanceModelView(ModelViewOnly): + verbose_name_plural = "task instances" + verbose_name = "task instance" + column_filters = ( + 'state', 'dag_id', 'task_id', 'execution_date', 'hostname', + 'queue', 'pool', 'operator', 'start_date', 'end_date') + named_filter_urls = True + column_formatters = dict( + log=log_link, task_id=task_instance_link, + hostname=nobr_f, + state=state_f, + execution_date=datetime_f, + start_date=datetime_f, + end_date=datetime_f, + queued_dttm=datetime_f, + dag_id=dag_link, duration=duration_f) + column_searchable_list = ('dag_id', 'task_id', 'state') + column_default_sort = ('start_date', True) + form_choices = { + 'state': [ + ('success', 'success'), + ('running', 'running'), + ('failed', 'failed'), + ], + } + column_list = ( + 'state', 'dag_id', 'task_id', 'execution_date', 'operator', + 'start_date', 'end_date', 'duration', 'job_id', 'hostname', + 'unixname', 'priority_weight', 'queue', 'queued_dttm', 'try_number', + 'pool', 'log') + can_delete = True + page_size = 500 + + @action('set_running', "Set state to 'running'", None) + def action_set_running(self, ids): + self.set_task_instance_state(ids, State.RUNNING) + + @action('set_failed', "Set state to 'failed'", None) + def action_set_failed(self, ids): + 
self.set_task_instance_state(ids, State.FAILED) + + @action('set_success', "Set state to 'success'", None) + def action_set_success(self, ids): + self.set_task_instance_state(ids, State.SUCCESS) + + @action('set_retry', "Set state to 'up_for_retry'", None) + def action_set_retry(self, ids): + self.set_task_instance_state(ids, State.UP_FOR_RETRY) + + @provide_session + def set_task_instance_state(self, ids, target_state, session=None): + try: + TI = models.TaskInstance + for count, id in enumerate(ids): + task_id, dag_id, execution_date = id.split(',') + execution_date = datetime.strptime(execution_date, '%Y-%m-%d %H:%M:%S') + ti = session.query(TI).filter(TI.task_id == task_id, + TI.dag_id == dag_id, + TI.execution_date == execution_date).one() + ti.state = target_state + count += 1 + session.commit() + flash( + "{count} task instances were set to '{target_state}'".format(**locals())) + except Exception as ex: + if not self.handle_view_exception(ex): + raise Exception("Ooops") + flash('Failed to set state', 'error') + + +class ConnectionModelView(wwwutils.SuperUserMixin, AirflowModelView): + create_template = 'airflow/conn_create.html' + edit_template = 'airflow/conn_edit.html' + list_template = 'airflow/conn_list.html' + form_columns = ( + 'conn_id', + 'conn_type', + 'host', + 'schema', + 'login', + 'password', + 'port', + 'extra', + 'extra__jdbc__drv_path', + 'extra__jdbc__drv_clsname', + 'extra__google_cloud_platform__project', + 'extra__google_cloud_platform__key_path', + 'extra__google_cloud_platform__service_account', + 'extra__google_cloud_platform__scope', + ) + verbose_name = "Connection" + verbose_name_plural = "Connections" + column_default_sort = ('conn_id', False) + column_list = ('conn_id', 'conn_type', 'host', 'port', 'is_encrypted', 'is_extra_encrypted',) + form_overrides = dict(_password=PasswordField) + form_widget_args = { + 'is_extra_encrypted': {'disabled': True}, + 'is_encrypted': {'disabled': True}, + } + # Used to customized the form, the 
forms elements get rendered + # and results are stored in the extra field as json. All of these + # need to be prefixed with extra__ and then the conn_type ___ as in + # extra__{conn_type}__name. You can also hide form elements and rename + # others from the connection_form.js file + form_extra_fields = { + 'extra__jdbc__drv_path' : StringField('Driver Path'), + 'extra__jdbc__drv_clsname': StringField('Driver Class'), + 'extra__google_cloud_platform__project': StringField('Project'), + 'extra__google_cloud_platform__key_path': StringField('Keyfile Path'), + 'extra__google_cloud_platform__service_account': StringField('Service Account'), + 'extra__google_cloud_platform__scope': StringField('Scopes (comma seperated)'), + + } + form_choices = { + 'conn_type': [ + ('bigquery', 'BigQuery',), + ('datastore', 'Google Datastore'), + ('ftp', 'FTP',), + ('google_cloud_storage', 'Google Cloud Storage'), + ('google_cloud_platform', 'Google Cloud Platform'), + ('hdfs', 'HDFS',), + ('http', 'HTTP',), + ('hive_cli', 'Hive Client Wrapper',), + ('hive_metastore', 'Hive Metastore Thrift',), + ('hiveserver2', 'Hive Server 2 Thrift',), + ('jdbc', 'Jdbc Connection',), + ('mysql', 'MySQL',), + ('postgres', 'Postgres',), + ('oracle', 'Oracle',), + ('vertica', 'Vertica',), + ('presto', 'Presto',), + ('s3', 'S3',), + ('samba', 'Samba',), + ('sqlite', 'Sqlite',), + ('ssh', 'SSH',), + ('cloudant', 'IBM Cloudant',), + ('mssql', 'Microsoft SQL Server'), + ('mesos_framework-id', 'Mesos Framework ID'), + ] + } + + def on_model_change(self, form, model, is_created): + formdata = form.data + if formdata['conn_type'] in ['jdbc', 'google_cloud_platform']: + extra = { + key:formdata[key] + for key in self.form_extra_fields.keys() if key in formdata} + model.extra = json.dumps(extra) + + @classmethod + def alert_fernet_key(cls): + return conf.get('core', 'fernet_key') is None + + @classmethod + def is_secure(self): + """ + Used to display a message in the Connection list view making it clear + that 
the passwords and `extra` field can't be encrypted. + """ + is_secure = False + try: + import cryptography + conf.get('core', 'fernet_key') + is_secure = True + except: + pass + return is_secure + + def on_form_prefill(self, form, id): + try: + d = json.loads(form.data.get('extra', '{}')) + except Exception as e: + d = {} + + for field in list(self.form_extra_fields.keys()): + value = d.get(field, '') + if value: + field = getattr(form, field) + field.data = value + + +class UserModelView(wwwutils.SuperUserMixin, AirflowModelView): + verbose_name = "User" + verbose_name_plural = "Users" + column_default_sort = 'username' + + +class ConfigurationView(wwwutils.SuperUserMixin, BaseView): + @expose('/') + def conf(self): + raw = request.args.get('raw') == "true" + title = "Airflow Configuration" + subtitle = conf.AIRFLOW_CONFIG + if conf.getboolean("webserver", "expose_config"): + with open(conf.AIRFLOW_CONFIG, 'r') as f: + config = f.read() + else: + config = ( + "# You Airflow administrator chose not to expose the " + "configuration, most likely for security reasons.") + if raw: + return Response( + response=config, + status=200, + mimetype="application/text") + else: + code_html = Markup(highlight( + config, + lexers.IniLexer(), # Lexer call + HtmlFormatter(noclasses=True)) + ) + return self.render( + 'airflow/code.html', + pre_subtitle=settings.HEADER + " v" + airflow.__version__, + code_html=code_html, title=title, subtitle=subtitle) + + +class DagModelView(wwwutils.SuperUserMixin, ModelView): + column_list = ('dag_id', 'owners') + column_editable_list = ('is_paused',) + form_excluded_columns = ('is_subdag', 'is_active') + column_searchable_list = ('dag_id',) + column_filters = ( + 'dag_id', 'owners', 'is_paused', 'is_active', 'is_subdag', + 'last_scheduler_run', 'last_expired') + form_widget_args = { + 'last_scheduler_run': {'disabled': True}, + 'fileloc': {'disabled': True}, + 'is_paused': {'disabled': True}, + 'last_pickled': {'disabled': True}, + 'pickle_id': 
{'disabled': True}, + 'last_loaded': {'disabled': True}, + 'last_expired': {'disabled': True}, + 'pickle_size': {'disabled': True}, + 'scheduler_lock': {'disabled': True}, + 'owners': {'disabled': True}, + } + column_formatters = dict( + dag_id=dag_link, + ) + can_delete = False + can_create = False + page_size = 50 + list_template = 'airflow/list_dags.html' + named_filter_urls = True + + def get_query(self): + """ + Default filters for model + """ + return ( + super(DagModelView, self) + .get_query() + .filter(or_(models.DagModel.is_active, models.DagModel.is_paused)) + .filter(~models.DagModel.is_subdag) + ) + + def get_count_query(self): + """ + Default filters for model + """ + return ( + super(DagModelView, self) + .get_count_query() + .filter(models.DagModel.is_active) + .filter(~models.DagModel.is_subdag) + ) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0a460081/airflow/bin/airflow ---------------------------------------------------------------------- diff --git a/airflow/bin/airflow b/airflow/bin/airflow index 80f1135..0598596 100755 --- a/airflow/bin/airflow +++ b/airflow/bin/airflow @@ -1,4 +1,17 @@ #!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
import logging import os from airflow import configuration http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0a460081/airflow/executors/base_executor.py ---------------------------------------------------------------------- diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py index 2e88fa9..ca63443 100644 --- a/airflow/executors/base_executor.py +++ b/airflow/executors/base_executor.py @@ -1,3 +1,17 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + from builtins import range from airflow import configuration http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0a460081/airflow/executors/celery_executor.py ---------------------------------------------------------------------- diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py index de56baf..04414fb 100644 --- a/airflow/executors/celery_executor.py +++ b/airflow/executors/celery_executor.py @@ -1,3 +1,17 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + from builtins import object import logging import subprocess