Repository: incubator-airflow Updated Branches: refs/heads/master 6e3bcd318 -> 3c450fbe1
[AIRFLOW-936] Add clear/mark success for DAG in the UI

This PR adds a modal popup that opens when clicking the circle DAG run icon
in the Airflow tree view UI. The modal can clear or mark success for an
entire DAG run, which is equivalent to individually clearing/marking each
task instance in that DAG run. The previous behavior of the icon (opening
the DAG run edit page) is moved to the modal's "Edit" button.

Closes #2339 from AllisonWang/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/3c450fbe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/3c450fbe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/3c450fbe

Branch: refs/heads/master
Commit: 3c450fbe1abad7b76b59b6b3b15c6e29b4ad8d0f
Parents: 6e3bcd3
Author: Allison Wang <allisonwang...@gmail.com>
Authored: Tue Jun 13 18:56:41 2017 -0700
Committer: Dan Davydov <dan.davy...@airbnb.com>
Committed: Tue Jun 13 18:56:44 2017 -0700

----------------------------------------------------------------------
 airflow/api/common/experimental/mark_tasks.py |  34 +++-
 airflow/www/templates/airflow/dag.html        |  59 +++++++
 airflow/www/templates/airflow/tree.html       |  11 +-
 airflow/www/views.py                          | 120 +++++++++---
 tests/api/common/mark_tasks.py                | 191 ++++++++++++++++++++-
 5 files changed, 381 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3c450fbe/airflow/api/common/experimental/mark_tasks.py
----------------------------------------------------------------------
diff --git a/airflow/api/common/experimental/mark_tasks.py b/airflow/api/common/experimental/mark_tasks.py
index 0ddbf98..82eb4b5 100644
--- a/airflow/api/common/experimental/mark_tasks.py
+++ b/airflow/api/common/experimental/mark_tasks.py
@@ -22,7 +22,6 @@ from airflow.utils.state import State
 from sqlalchemy import or_
 
 
-
 def _create_dagruns(dag, execution_dates, state, run_id_template):
     """
     Infers from the dates which dag runs need to be created and does so.
@@ -181,7 +180,40 @@ def set_state(task, execution_date, upstream=False, downstream=False,
         if len(sub_dag_ids) > 0:
             tis_altered += qry_sub_dag.all()
 
+    session.expunge_all()
     session.close()
 
     return tis_altered
 
+
+def set_dag_run_state(dag, execution_date, state=State.SUCCESS, commit=False):
+    """
+    Set the state of a dag run and all task instances associated with the dag
+    run for a specific execution date. Returns [] if dag or execution_date is falsy.
+    :param dag: the DAG of which to alter state
+    :param execution_date: the execution date of the dag run to alter
+    :param state: the state to which the dag run and its tasks need to be set
+    :param commit: commit the altered task instances and dag run to the database
+    :return: list of task instances that were altered (or would be, if not commit)
+    :raises: AssertionError from set_state, e.g. if no dag run exists for execution_date
+    """
+    res = []
+
+    if not dag or not execution_date:
+        return res
+
+    # Mark all task instances in the dag run
+    for task in dag.tasks:
+        task.dag = dag  # bind the task to this DAG object before set_state uses it
+        altered = set_state(task=task, execution_date=execution_date,
+                            state=state, commit=commit)
+        res.extend(altered)
+
+    # Let each matching dag run update its own state (via DagRun.update_state)
+    if commit:
+        drs = DagRun.find(dag.dag_id, execution_date=execution_date)
+        for dr in drs:
+            dr.dag = dag
+            dr.update_state()
+
+    return res

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3c450fbe/airflow/www/templates/airflow/dag.html
----------------------------------------------------------------------
diff --git a/airflow/www/templates/airflow/dag.html b/airflow/www/templates/airflow/dag.html
index e5a305c..706ed32 100644
--- a/airflow/www/templates/airflow/dag.html
+++ b/airflow/www/templates/airflow/dag.html
@@ -216,6 +216,36 @@
       </div>
     </div>
   </div>
+  <!-- Modal for dag -->
+  <div class="modal fade" id="dagModal"
+    tabindex="-1" role="dialog"
+    aria-labelledby="dagModalLabel" aria-hidden="true">
+    <div class="modal-dialog">
+      <div class="modal-content">
+        <div class="modal-header">
+          <h4 class="modal-title" id="dagModalLabel">
+            <span id='dag_id'></span>
+          </h4>
+        </div>
+        <div class="modal-body">
+          <button id="btn_edit_dagrun" type="button" class="btn btn-primary">
+            Edit
+          </button>
+          <button id="btn_dagrun_clear" type="button" class="btn btn-primary">
+            Clear
+          </button>
+          <button id="btn_dagrun_success" type="button" class="btn btn-primary">
+            Mark Success
+          </button>
+        </div>
+        <div class="modal-footer">
+          <button type="button" class="btn btn-default" data-dismiss="modal">
+            Close
+          </button>
+        </div>
+      </div>
+    </div>
+  </div>
 {% endblock %}
 {% block tail %}
 {{ lib.form_js() }}
@@ -239,6 +269,7 @@ function updateQueryStringParameter(uri, key, value) {
       $('.never_active').removeClass('active');
     });
 
+    var dag_run_id = '';
     var dag_id = '{{ dag.dag_id }}';
     var task_id = '';
     var exection_date = '';
@@ -263,6 +294,14 @@ function updateQueryStringParameter(uri, key, value) {
       }
     }
 
+    function call_modal_dag(dag) {
+      dag_run_id = dag && dag.id;
+      execution_date = dag && dag.execution_date;
+      $('#dag_id').text(dag_id);
+      $('#dagModal').modal({});
+      $("#dagModal").css("margin-top","0px");
+    }
+
     $("#btn_rendered").click(function(){
       url = "{{ url_for('airflow.rendered') }}" +
         "?task_id=" + task_id +
@@ -328,6 +367,14 @@ function updateQueryStringParameter(uri, key, value) {
       window.location = url;
     });
 
+    $("#btn_dagrun_clear").click(function(){
+      url = "{{ url_for('airflow.dagrun_clear') }}" +
+        "?dag_id=" + encodeURIComponent(dag_id) +
+        "&execution_date=" + encodeURIComponent(execution_date) +
+        "&origin=" + encodeURIComponent(window.location);
+      window.location = url;
+    });
+
     $("#btn_success").click(function(){
       url = "{{ url_for('airflow.success') }}" +
         "?task_id=" + encodeURIComponent(task_id) +
@@ -342,6 +389,14 @@ function updateQueryStringParameter(uri, key, value) {
       window.location = url;
     });
 
+    $('#btn_dagrun_success').click(function(){
+      url = "{{ url_for('airflow.dagrun_success') }}" +
+        "?dag_id=" + encodeURIComponent(dag_id) +
+        "&execution_date=" + encodeURIComponent(execution_date) +
+        "&origin=" + encodeURIComponent(window.location);
+      window.location = url;
+    });
+
     $("#btn_gantt").click(function(){
       url = "{{ url_for('airflow.gantt') }}" +
         "?dag_id=" + dag_id +
@@ -367,5 +422,9 @@ function updateQueryStringParameter(uri, key, value) {
       $.post(url);
     });
 
+    $('#btn_edit_dagrun').click(function(){
+      window.location = '/admin/dagrun/edit/?id=' + dag_run_id;
+    });
+
   </script>
 {% endblock %}

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3c450fbe/airflow/www/templates/airflow/tree.html
----------------------------------------------------------------------
diff --git a/airflow/www/templates/airflow/tree.html b/airflow/www/templates/airflow/tree.html
index be3655a..b570fae 100644
--- a/airflow/www/templates/airflow/tree.html
+++ b/airflow/www/templates/airflow/tree.html
@@ -1,13 +1,13 @@
-{# 
+{#
   Licensed to the Apache Software Foundation (ASF) under one or more
   contributor license agreements.  See the NOTICE file distributed with
   this work for additional information regarding copyright ownership.
   The ASF licenses this file to You under the Apache License, Version 2.0
   (the "License"); you may not use this file except in compliance with
   the License.  You may obtain a copy of the License at
-  
+
     http://www.apache.org/licenses/LICENSE-2.0
-  
+
   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -232,9 +232,8 @@ function update(source) {
       .enter()
       .append('rect')
       .on("click", function(d){
-        if(d.task_id === undefined){
-          window.location = '/admin/dagrun/edit/?id=' + d.id;
-        }
+        if(d.task_id === undefined)
+          call_modal_dag(d);
         else if(nodeobj[d.task_id].operator=='SubDagOperator')
           call_modal(d.task_id, d.execution_date, true);
         else

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3c450fbe/airflow/www/views.py
----------------------------------------------------------------------
diff --git a/airflow/www/views.py b/airflow/www/views.py
index b401434..541c3ff 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -58,6 +58,7 @@ import airflow
 from airflow import configuration as conf
 from airflow import models
 from airflow import settings
+from airflow.api.common.experimental.mark_tasks import set_dag_run_state
 from airflow.exceptions import AirflowException
 from airflow.settings import Session
 from airflow.models import XCom, DagRun
@@ -1026,6 +1027,36 @@ class Airflow(BaseView):
             "it should start any moment now.".format(dag_id))
         return redirect(origin)
 
+    def _clear_dag_tis(self, dag, start_date, end_date, origin,
+                       recursive=False, confirmed=False):
+        if confirmed:
+            count = dag.clear(
+                start_date=start_date,
+                end_date=end_date,
+                include_subdags=recursive)
+
+            flash("{0} task instances have been cleared".format(count))
+            return redirect(origin)
+
+        tis = dag.clear(
+            start_date=start_date,
+            end_date=end_date,
+            include_subdags=recursive,
+            dry_run=True)
+        if not tis:
+            flash("No task instances to clear", 'error')
+            response = redirect(origin)
+        else:
+            details = "\n".join([str(t) for t in tis])
+
+            response = self.render(
+                'airflow/confirm.html',
+                message=("Here's the list of task instances you are about "
+                         "to clear:"),
+                details=details)
+
+        return response
+
     @expose('/clear')
     @login_required
     @wwwutils.action_logging
@@ -1052,34 +1083,35 @@ class Airflow(BaseView):
 
         end_date = execution_date if not future else None
         start_date = execution_date if not past else None
-        if confirmed:
-            count = dag.clear(
-                start_date=start_date,
-                end_date=end_date,
-                include_subdags=recursive)
 
-            flash("{0} task instances have been cleared".format(count))
-            return redirect(origin)
-        else:
-            tis = dag.clear(
-                start_date=start_date,
-                end_date=end_date,
-                include_subdags=recursive,
-                dry_run=True)
-            if not tis:
-                flash("No task instances to clear", 'error')
-                response = redirect(origin)
-            else:
-                details = "\n".join([str(t) for t in tis])
+        return self._clear_dag_tis(dag, start_date, end_date, origin,
+                                   recursive=recursive, confirmed=confirmed)
 
-                response = self.render(
-                    'airflow/confirm.html',
-                    message=(
-                        "Here's the list of task instances you are about "
-                        "to clear:"),
-                    details=details,)
+    @expose('/dagrun_clear')
+    @login_required
+    @wwwutils.action_logging
+    @wwwutils.notify_owner
+    def dagrun_clear(self):
+        dag_id = request.args.get('dag_id')
+        origin = request.args.get('origin')
+        execution_date = request.args.get('execution_date')
+        confirmed = request.args.get('confirmed') == "true"
 
-        return response
+        if not execution_date:
+            flash('Invalid execution date', 'error')
+            return redirect(origin)
+
+        execution_date = dateutil.parser.parse(execution_date)
+        dag = dagbag.get_dag(dag_id)
+        if not dag:
+            flash('Cannot find DAG: {}'.format(dag_id), 'error')
+            return redirect(origin)
+
+        start_date = execution_date
+        end_date = execution_date
+
+        return self._clear_dag_tis(dag, start_date, end_date, origin,
+                                   recursive=True, confirmed=confirmed)
 
     @expose('/blocked')
     @login_required
@@ -1104,6 +1136,44 @@ class Airflow(BaseView):
         })
         return wwwutils.json_response(payload)
 
+    @expose('/dagrun_success')
+    @login_required
+    @wwwutils.action_logging
+    @wwwutils.notify_owner
+    def dagrun_success(self):
+        dag_id = request.args.get('dag_id')
+        execution_date = request.args.get('execution_date')
+        confirmed = request.args.get('confirmed') == 'true'
+        origin = request.args.get('origin')
+
+        if not execution_date:
+            flash('Invalid execution date', 'error')
+            return redirect(origin)
+
+        execution_date = dateutil.parser.parse(execution_date)
+        dag = dagbag.get_dag(dag_id)
+
+        if not dag:
+            flash('Cannot find DAG: {}'.format(dag_id), 'error')
+            return redirect(origin)
+
+        altered_tis = set_dag_run_state(dag, execution_date, state=State.SUCCESS,
+                                        commit=confirmed)
+
+        if confirmed:
+            flash('Marked success on {} task instances'.format(len(altered_tis)))
+            return redirect(origin)
+
+        else:
+            details = '\n'.join([str(t) for t in altered_tis])
+
+            response = self.render('airflow/confirm.html',
+                                   message=("Here's the list of task instances you are "
+                                            "about to mark as successful:"),
+                                   details=details)
+
+            return response
+
     @expose('/success')
     @login_required
     @wwwutils.action_logging

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3c450fbe/tests/api/common/mark_tasks.py
----------------------------------------------------------------------
diff --git a/tests/api/common/mark_tasks.py b/tests/api/common/mark_tasks.py
index e01f3ad..8a3759f 100644
--- a/tests/api/common/mark_tasks.py
+++ b/tests/api/common/mark_tasks.py
@@ -16,11 +16,12 @@
 import unittest
 
 from airflow import models
-from airflow.api.common.experimental.mark_tasks import set_state, _create_dagruns
+from airflow.api.common.experimental.mark_tasks import (
+    set_state, _create_dagruns, set_dag_run_state)
 from airflow.settings import Session
 from airflow.utils.dates import days_ago
 from airflow.utils.state import State
-
+from datetime import datetime, timedelta
 
 DEV_NULL = "/dev/null"
 
@@ -207,5 +208,191 @@ class TestMarkTasks(unittest.TestCase):
         self.session.close()
 
 
+class TestMarkDAGRun(unittest.TestCase):
+    def setUp(self):
+        self.dagbag = models.DagBag(include_examples=True)
+        self.dag1 = self.dagbag.dags['test_example_bash_operator']
+        self.dag2 = self.dagbag.dags['example_subdag_operator']
+
+        self.execution_dates = [days_ago(3), days_ago(2), days_ago(1)]
+
+        self.session = Session()
+
+    def verify_dag_run_states(self, dag, date, state=State.SUCCESS):
+        drs = models.DagRun.find(dag_id=dag.dag_id, execution_date=date)
+        dr = drs[0]
+        self.assertEqual(dr.get_state(), state)
+        tis = dr.get_task_instances(session=self.session)
+        for ti in tis:
+            self.assertEqual(ti.state, state)
+
+    def test_set_running_dag_run_state(self):
+        date = self.execution_dates[0]
+        dr = self.dag1.create_dagrun(
+            run_id='manual__' + datetime.now().isoformat(),
+            state=State.RUNNING,
+            execution_date=date,
+            session=self.session
+        )
+        for ti in dr.get_task_instances(session=self.session):
+            ti.set_state(State.RUNNING, self.session)
+
+        altered = set_dag_run_state(self.dag1, date, state=State.SUCCESS, commit=True)
+
+        # All of the tasks should be altered
+        self.assertEqual(len(altered), len(self.dag1.tasks))
+        self.verify_dag_run_states(self.dag1, date)
+
+    def test_set_success_dag_run_state(self):
+        date = self.execution_dates[0]
+
+        dr = self.dag1.create_dagrun(
+            run_id='manual__' + datetime.now().isoformat(),
+            state=State.SUCCESS,
+            execution_date=date,
+            session=self.session
+        )
+        for ti in dr.get_task_instances(session=self.session):
+            ti.set_state(State.SUCCESS, self.session)
+
+        altered = set_dag_run_state(self.dag1, date, state=State.SUCCESS, commit=True)
+
+        # None of the tasks should be altered
+        self.assertEqual(len(altered), 0)
+        self.verify_dag_run_states(self.dag1, date)
+
+    def test_set_failed_dag_run_state(self):
+        date = self.execution_dates[0]
+        dr = self.dag1.create_dagrun(
+            run_id='manual__' + datetime.now().isoformat(),
+            state=State.FAILED,
+            execution_date=date,
+            session=self.session
+        )
+        dr.get_task_instance('runme_0').set_state(State.FAILED, self.session)
+
+        altered = set_dag_run_state(self.dag1, date, state=State.SUCCESS, commit=True)
+
+        # All of the tasks should be altered
+        self.assertEqual(len(altered), len(self.dag1.tasks))
+        self.verify_dag_run_states(self.dag1, date)
+
+    def test_set_mixed_dag_run_state(self):
+        """
+        This test checks function set_dag_run_state with mixed task instance
+        states.
+        """
+        date = self.execution_dates[0]
+        dr = self.dag1.create_dagrun(
+            run_id='manual__' + datetime.now().isoformat(),
+            state=State.FAILED,
+            execution_date=date,
+            session=self.session
+        )
+        # success task
+        dr.get_task_instance('runme_0').set_state(State.SUCCESS, self.session)
+        # skipped task
+        dr.get_task_instance('runme_1').set_state(State.SKIPPED, self.session)
+        # retry task
+        dr.get_task_instance('runme_2').set_state(State.UP_FOR_RETRY, self.session)
+        # queued task
+        dr.get_task_instance('also_run_this').set_state(State.QUEUED, self.session)
+        # running task
+        dr.get_task_instance('run_after_loop').set_state(State.RUNNING, self.session)
+        # failed task
+        dr.get_task_instance('run_this_last').set_state(State.FAILED, self.session)
+
+        altered = set_dag_run_state(self.dag1, date, state=State.SUCCESS, commit=True)
+
+        self.assertEqual(len(altered), len(self.dag1.tasks) - 1)  # runme_0 already succeeded
+        self.verify_dag_run_states(self.dag1, date)
+
+    def test_set_state_without_commit(self):
+        date = self.execution_dates[0]
+
+        # Running dag run and task instances
+        dr = self.dag1.create_dagrun(
+            run_id='manual__' + datetime.now().isoformat(),
+            state=State.RUNNING,
+            execution_date=date,
+            session=self.session
+        )
+        for ti in dr.get_task_instances(session=self.session):
+            ti.set_state(State.RUNNING, self.session)
+
+        altered = set_dag_run_state(self.dag1, date, state=State.SUCCESS, commit=False)
+
+        # All of the tasks should be reported as altered
+        self.assertEqual(len(altered), len(self.dag1.tasks))
+
+        # Both dag run and task instances' states should remain the same
+        self.verify_dag_run_states(self.dag1, date, State.RUNNING)
+
+    def test_set_state_with_multiple_dagruns(self):
+        self.dag2.create_dagrun(
+            run_id='manual__' + datetime.now().isoformat(),
+            state=State.FAILED,
+            execution_date=self.execution_dates[0],
+            session=self.session
+        )
+        self.dag2.create_dagrun(
+            run_id='manual__' + datetime.now().isoformat(),
+            state=State.FAILED,
+            execution_date=self.execution_dates[1],
+            session=self.session
+        )
+        self.dag2.create_dagrun(
+            run_id='manual__' + datetime.now().isoformat(),
+            state=State.RUNNING,
+            execution_date=self.execution_dates[2],
+            session=self.session
+        )
+
+        altered = set_dag_run_state(self.dag2, self.execution_dates[1],
+                                    state=State.SUCCESS, commit=True)
+
+        # Recursively count number of tasks in the dag
+        def count_dag_tasks(dag):
+            count = len(dag.tasks)
+            subdag_counts = [count_dag_tasks(subdag) for subdag in dag.subdags]
+            count += sum(subdag_counts)
+            return count
+
+        self.assertEqual(len(altered), count_dag_tasks(self.dag2))
+        self.verify_dag_run_states(self.dag2, self.execution_dates[1])
+
+        # Make sure the states of the other dag runs are not changed
+        dr1 = models.DagRun.find(dag_id=self.dag2.dag_id, execution_date=self.execution_dates[0])
+        dr1 = dr1[0]
+        self.assertEqual(dr1.get_state(), State.FAILED)
+        dr3 = models.DagRun.find(dag_id=self.dag2.dag_id, execution_date=self.execution_dates[2])
+        dr3 = dr3[0]
+        self.assertEqual(dr3.get_state(), State.RUNNING)
+
+    def test_set_dag_run_state_edge_cases(self):
+        # Dag does not exist
+        altered = set_dag_run_state(None, self.execution_dates[0])
+        self.assertEqual(len(altered), 0)
+
+        # Invalid execution date
+        altered = set_dag_run_state(self.dag1, None)
+        self.assertEqual(len(altered), 0)
+        self.assertRaises(AssertionError, set_dag_run_state, self.dag1, timedelta(microseconds=-1))
+
+        # DagRun does not exist
+        # This will throw AssertionError since dag.latest_execution_date does not exist
+        self.assertRaises(AssertionError, set_dag_run_state, self.dag1, self.execution_dates[0])
+
+    def tearDown(self):
+        self.dag1.clear()
+        self.dag2.clear()
+
+        self.session.query(models.DagRun).delete()
+        self.session.query(models.TaskInstance).delete()
+        self.session.query(models.DagStat).delete()
+        self.session.commit()
+        self.session.close()
+
+
 if __name__ == '__main__':
     unittest.main()