Repository: incubator-airflow Updated Branches: refs/heads/master 2c931abb3 -> a89d1015c
[AIRFLOW-356][AIRFLOW-355][AIRFLOW-354] Replace nobr, enable DAG only exists locally message, change edit DAG icon

Addresses the following issues:
- [https://issues.apache.org/jira/browse/AIRFLOW-356](https://issues.apache.org/jira/browse/AIRFLOW-356)
- [https://issues.apache.org/jira/browse/AIRFLOW-355](https://issues.apache.org/jira/browse/AIRFLOW-355)
- [https://issues.apache.org/jira/browse/AIRFLOW-354](https://issues.apache.org/jira/browse/AIRFLOW-354)

- Replace `<nobr>` with `flexbox`
- "This DAG seems to be existing only locally" now shows up
- Change edit DAG icon from info to edit
- Rename `dttm` variable to `file_last_changed_on_disk`
- Rename `dags` variable to `webserver_dags`
- Adds a comment clarifying what `self.file_last_changed` is
- Clarifies what the `dag.last_expired` column represents
- Refactors some previously very nested logic in `views.py` and adds comments
- Properly indents `dags.html` and adds comments to it
- Edit DAG icon changed
- Home page now sort of responsive, no longer fixed width
- User will occasionally see "This DAG seems to be existing only locally" message
- Verify that edit dag button is now an edit icon and click on it
- Resized home page, check that last column does not wrap

![image](https://cloud.githubusercontent.com/assets/130362/17126889/2e7adb12-52b6-11e6-9a18-b31e424e4be8.png)

Clean up html, replace nobr with flexbox

Refactor HomeView

Rename variables and update comments

Closes #1678 from zodiac/xuanji_refactor

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/a89d1015
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/a89d1015
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/a89d1015

Branch: refs/heads/master
Commit: a89d1015ceb6abccd53572bf38a44544d0182ca9
Parents: 2c931ab
Author: Li Xuanji <xua...@gmail.com>
Authored: Tue Jul 26 10:30:02 2016 -0700
Committer: Dan Davydov
<dan.davy...@airbnb.com> Committed: Tue Jul 26 10:30:11 2016 -0700 ---------------------------------------------------------------------- airflow/models.py | 20 ++--- airflow/www/templates/airflow/dags.html | 109 +++++++++++++++++---------- airflow/www/views.py | 82 ++++++++++---------- 3 files changed, 126 insertions(+), 85 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a89d1015/airflow/models.py ---------------------------------------------------------------------- diff --git a/airflow/models.py b/airflow/models.py index 4e6eb0f..f589b3e 100644 --- a/airflow/models.py +++ b/airflow/models.py @@ -163,6 +163,7 @@ class DagBag(LoggingMixin): self.dag_folder = dag_folder self.dags = {} self.sync_to_db = sync_to_db + # the file's last modified timestamp when we last read it self.file_last_changed = {} self.executor = executor self.import_errors = {} @@ -192,7 +193,7 @@ class DagBag(LoggingMixin): if dag.is_subdag: root_dag_id = dag.parent_dag.dag_id - # If the root_dag_id is absent or expired + # If the dag corresponding to root_dag_id is absent or expired orm_dag = DagModel.get_current(root_dag_id) if orm_dag and ( root_dag_id not in self.dags or @@ -201,10 +202,11 @@ class DagBag(LoggingMixin): dag.last_loaded < orm_dag.last_expired ) ): - # Reprocessing source file + # Reprocess source file found_dags = self.process_file( filepath=orm_dag.fileloc, only_if_updated=False) + # If the source file no longer exports `dag_id`, delete it from self.dags if found_dags and dag_id in [dag.dag_id for dag in found_dags]: return self.dags[dag_id] elif dag_id in self.dags: @@ -225,10 +227,10 @@ class DagBag(LoggingMixin): try: # This failed before in what may have been a git sync # race condition - dttm = datetime.fromtimestamp(os.path.getmtime(filepath)) + file_last_changed_on_disk = datetime.fromtimestamp(os.path.getmtime(filepath)) if only_if_updated \ and filepath in 
self.file_last_changed \ - and dttm == self.file_last_changed[filepath]: + and file_last_changed_on_disk == self.file_last_changed[filepath]: return found_dags except Exception as e: @@ -257,7 +259,7 @@ class DagBag(LoggingMixin): except Exception as e: self.logger.exception("Failed to import: " + filepath) self.import_errors[filepath] = str(e) - self.file_last_changed[filepath] = dttm + self.file_last_changed[filepath] = file_last_changed_on_disk else: zip_file = zipfile.ZipFile(filepath) @@ -288,7 +290,7 @@ class DagBag(LoggingMixin): except Exception as e: self.logger.exception("Failed to import: " + filepath) self.import_errors[filepath] = str(e) - self.file_last_changed[filepath] = dttm + self.file_last_changed[filepath] = file_last_changed_on_disk for m in mods: for dag in list(m.__dict__.values()): @@ -301,7 +303,7 @@ class DagBag(LoggingMixin): found_dags.append(dag) found_dags += dag.subdags - self.file_last_changed[filepath] = dttm + self.file_last_changed[filepath] = file_last_changed_on_disk return found_dags @provide_session @@ -2441,8 +2443,8 @@ class DagModel(Base): last_scheduler_run = Column(DateTime) # Last time this DAG was pickled last_pickled = Column(DateTime) - # When the DAG received a refreshed signal last, used to know when - # we need to force refresh + # Time when the DAG last received a refresh signal + # (e.g. 
the DAG's "refresh" button was clicked in the web UI) last_expired = Column(DateTime) # Whether (one of) the scheduler is scheduling this DAG at the moment scheduler_lock = Column(Boolean) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a89d1015/airflow/www/templates/airflow/dags.html ---------------------------------------------------------------------- diff --git a/airflow/www/templates/airflow/dags.html b/airflow/www/templates/airflow/dags.html index 4d5bf3a..c579726 100644 --- a/airflow/www/templates/airflow/dags.html +++ b/airflow/www/templates/airflow/dags.html @@ -1,13 +1,13 @@ -{# +{# Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - + http://www.apache.org/licenses/LICENSE-2.0 - + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
@@ -46,22 +46,27 @@ </thead> <tbody> {% for dag_id in all_dag_ids %} - {% set dag = dags[dag_id] if dag_id in dags else None %} + {% set dag = webserver_dags[dag_id] if dag_id in webserver_dags else None %} <tr> + <!-- Column 1: Edit dag --> <td class="text-center" style="width:10px;"> {% if dag_id in orm_dags %} <a href="/admin/dagmodel/edit/?id={{ dag_id }}" title="Info"> - <span class="glyphicon glyphicon-info-sign" aria-hidden="true"></span> + <span class="glyphicon glyphicon-edit" aria-hidden="true"></span> </a> {% endif %} </td> + + <!-- Column 2: Turn dag on/off --> <td> {% if dag_id in orm_dags %} <input id="toggle-{{ dag_id }}" dag_id="{{ dag_id }}" type="checkbox" {{ "checked" if not orm_dags[dag_id].is_paused else "" }} data-toggle="toggle" data-size="mini"> {% endif %} </td> + + <!-- Column 3: Name --> <td> - {% if dag %} + {% if dag_id in webserver_dags %} <a href="{{ url_for('airflow.tree', dag_id=dag.dag_id) }}"> {{ dag_id }} </a> @@ -69,47 +74,75 @@ {{ dag_id }} <span class="glyphicon glyphicon-info-sign" class="info" aria-hidden="true" title="This DAG isn't available in the web server's DagBag object. It shows up in this list because the scheduler marked it as active in the metadata database."></span> {% endif %} - {% if dag_id not in orm_dags and False %} + {% if dag_id not in orm_dags %} <span class="glyphicon glyphicon-info-sign" class="info" aria-hidden="true" title="This DAG seems to be existing only locally. 
The master scheduler doesn't seem to be aware of its existence."></span> {% endif %} </td> + + <!-- Column 4: Dag Schedule --> + <td> + {% if dag_id in webserver_dags %} + <a class="label label-default schedule {{ dag.dag_id }}" href="/admin/dagrun/?flt2_dag_id_equals={{ dag.dag_id }}"> + {{ dag.schedule_interval }} + </a> + {% endif %} + </td> + + <!-- Column 5: Dag Owners --> <td> - <a class="label label-default schedule {{ dag.dag_id }}" href="/admin/dagrun/?flt2_dag_id_equals={{ dag.dag_id }}"> - {{ dag.schedule_interval }}</a></td> - <td>{{ dag.owner if dag else orm_dags[dag_id].owners }}</td> + {{ dag.owner if dag else orm_dags[dag_id].owners }} + </td> + + <!-- Column 6: Recent Statuses --> <td style="padding:0px; width:200px; height:10px;"> <svg height="10" width="10" id='dag-{{ dag.safe_dag_id }}' style="display: block;"></svg> </td> - <td class="text-center" style="width:160px;"><nobr> + + <!-- Column 7: Links --> + <td class="text-center" style="display:flex; flex-direction:row; justify-content:space-around;"> {% if dag %} - <a href="{{ url_for("airflow.tree", dag_id=dag.dag_id, num_runs=25) }}" title="Tree View"> - <span class="glyphicon glyphicon-tree-deciduous" aria-hidden="true"></span> - </a> - <a href="{{ url_for("airflow.graph", dag_id=dag.dag_id) }}" title="Graph View"> - <span class="glyphicon glyphicon-certificate" aria-hidden="true"></span> - </a> - <a href="{{ url_for("airflow.duration", dag_id=dag.dag_id) }}" title="Tasks Duration"> - <span class="glyphicon glyphicon-stats" aria-hidden="true"></span> - </a> - <a href="{{ url_for("airflow.landing_times", dag_id=dag.dag_id) }}" title="Landing Times"> - <span class="glyphicon glyphicon-plane" aria-hidden="true"></span> - </a> - <a href="{{ url_for("airflow.gantt", dag_id=dag.dag_id) }}" title="Gantt View"> - <span class="glyphicon glyphicon-align-left" aria-hidden="true"></span> - <i class="icon-align-left"></i> - </a> - <a href="{{ url_for("airflow.code", dag_id=dag.dag_id) }}" title="Code 
View"> - <span class="glyphicon glyphicon-flash" aria-hidden="true"></span> - </a> - <a href="/admin/log/?sort=1&desc=1&flt1_dag_id_equals={{ dag.dag_id }}" title="Logs"> - <i class="icon-list"></i> - <span class="glyphicon glyphicon-align-justify" aria-hidden="true"></span> - </a> + + <!-- Tree --> + <a href="{{ url_for('airflow.tree', dag_id=dag.dag_id, num_runs=25) }}" title="Tree View"> + <span class="glyphicon glyphicon-tree-deciduous" aria-hidden="true"></span> + </a> + + <!-- Graph --> + <a href="{{ url_for('airflow.graph', dag_id=dag.dag_id) }}" title="Graph View"> + <span class="glyphicon glyphicon-certificate" aria-hidden="true"></span> + </a> + + <!-- Duration --> + <a href="{{ url_for('airflow.duration', dag_id=dag.dag_id) }}" title="Tasks Duration"> + <span class="glyphicon glyphicon-stats" aria-hidden="true"></span> + </a> + + <!-- Landing Times --> + <a href="{{ url_for("airflow.landing_times", dag_id=dag.dag_id) }}" title="Landing Times"> + <span class="glyphicon glyphicon-plane" aria-hidden="true"></span> + </a> + + <!-- Gantt --> + <a href="{{ url_for("airflow.gantt", dag_id=dag.dag_id) }}" title="Gantt View"> + <span class="glyphicon glyphicon-align-left" aria-hidden="true"></span> + </a> + + <!-- Code --> + <a href="{{ url_for("airflow.code", dag_id=dag.dag_id) }}" title="Code View"> + <span class="glyphicon glyphicon-flash" aria-hidden="true"></span> + </a> + + <!-- Logs --> + <a href="/admin/log/?sort=1&desc=1&flt1_dag_id_equals={{ dag.dag_id }}" title="Logs"> + <span class="glyphicon glyphicon-align-justify" aria-hidden="true"></span> + </a> {% endif %} - <a href="{{ url_for("airflow.refresh", dag_id=dag_id) }}" title="Refresh"> - <span class="glyphicon glyphicon-refresh" aria-hidden="true"></span> - </a> - </nobr> + + <!-- Refresh --> + <a href="{{ url_for("airflow.refresh", dag_id=dag_id) }}" title="Refresh"> + <span class="glyphicon glyphicon-refresh" aria-hidden="true"></span> + </a> + </td> </tr> {% endfor %} 
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a89d1015/airflow/www/views.py ---------------------------------------------------------------------- diff --git a/airflow/www/views.py b/airflow/www/views.py index aff6b54..cffbb1d 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -1620,32 +1620,33 @@ class HomeView(AdminIndexView): session = Session() DM = models.DagModel qry = None - # filter the dags if filter_by_owner and current user is not superuser + + # restrict the dags shown if filter_by_owner and current user is not superuser do_filter = FILTER_BY_OWNER and (not current_user.is_superuser()) owner_mode = conf.get('webserver', 'OWNER_MODE').strip().lower() + # read orm_dags from the db + qry = session.query(DM) qry_fltr = [] - if do_filter: - if owner_mode == 'ldapgroup': - qry_fltr = ( - qry.filter( - ~DM.is_subdag, DM.is_active, - DM.owners.in_(current_user.ldap_groups)) - .all() - ) - elif owner_mode == 'user': - qry_fltr = ( - qry.filter( - ~DM.is_subdag, DM.is_active, - DM.owners == current_user.user.username) - .all() - ) + if do_filter and owner_mode == 'ldapgroup': + qry_fltr = qry.filter( + ~DM.is_subdag, DM.is_active, + DM.owners.in_(current_user.ldap_groups) + ).all() + elif do_filter and owner_mode == 'user': + qry_fltr = qry.filter( + ~DM.is_subdag, DM.is_active, + DM.owners == current_user.user.username + ).all() else: - qry_fltr = qry.filter(~DM.is_subdag, DM.is_active).all() + qry_fltr = qry.filter( + ~DM.is_subdag, DM.is_active + ).all() orm_dags = {dag.dag_id: dag for dag in qry_fltr} + import_errors = session.query(models.ImportError).all() for ie in import_errors: flash( @@ -1654,30 +1655,35 @@ class HomeView(AdminIndexView): session.expunge_all() session.commit() session.close() - dags = dagbag.dags.values() - if do_filter: - if owner_mode == 'ldapgroup': - dags = { - dag.dag_id: dag - for dag in dags - if ( - dag.owner in current_user.ldap_groups and (not dag.parent_dag) - ) - } - elif owner_mode == 'user': - 
dags = { - dag.dag_id: dag - for dag in dags - if ( - dag.owner == current_user.user.username and (not dag.parent_dag) - ) - } + + # get a list of all non-subdag dags visible to everyone + unfiltered_webserver_dags = [dag for dag in dagbag.dags.values() if not dag.parent_dag] + + # optionally filter to get only dags that the user should see + if do_filter and owner_mode == 'ldapgroup': + # only show dags owned by someone in @current_user.ldap_groups + webserver_dags = { + dag.dag_id: dag + for dag in unfiltered_webserver_dags + if dag.owner in current_user.ldap_groups + } + elif do_filter and owner_mode == 'user': + # only show dags owned by @current_user.user.username + webserver_dags = { + dag.dag_id: dag + for dag in unfiltered_webserver_dags + if dag.owner == current_user.user.username + } else: - dags = {dag.dag_id: dag for dag in dags if not dag.parent_dag} - all_dag_ids = sorted(set(orm_dags.keys()) | set(dags.keys())) + webserver_dags = { + dag.dag_id: dag + for dag in unfiltered_webserver_dags + } + + all_dag_ids = sorted(set(orm_dags.keys()) | set(webserver_dags.keys())) return self.render( 'airflow/dags.html', - dags=dags, + webserver_dags=webserver_dags, orm_dags=orm_dags, all_dag_ids=all_dag_ids)