Repository: incubator-airflow Updated Branches: refs/heads/master 98b4df945 -> 9f2c16a0a
[AIRFLOW-1681] Add batch clear in task instance view Allow users to batch clear selected task instance(s) in the task instance view. Only the state(s) of the selected task instance(s) will be cleared--neither upstream nor downstream task instances will be affected. The DAG(s) involved will be set to the "RUNNING" state, same as the existing "clear" operation. Both the "Delete" and "Clear" operations are kept for a smoother transition of user habits, and the pop-up informs the user of the DAG state change (check screenshots). Closes #2681 from yrqls21/add-batch-clear-in-task-instance-view Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/9f2c16a0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/9f2c16a0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/9f2c16a0 Branch: refs/heads/master Commit: 9f2c16a0ac261888fe2ee4671538201c273f82d5 Parents: 98b4df9 Author: Kevin Yang <kevin.y...@airbnb.com> Authored: Wed Oct 11 14:05:33 2017 -0700 Committer: Dan Davydov <dan.davy...@airbnb.com> Committed: Wed Oct 11 14:05:35 2017 -0700 ---------------------------------------------------------------------- airflow/www/views.py | 62 +++++++++++++++++++++++------------------------ 1 file changed, 31 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9f2c16a0/airflow/www/views.py ---------------------------------------------------------------------- diff --git a/airflow/www/views.py b/airflow/www/views.py index bc63b5b..81ee61f 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -37,7 +37,7 @@ import sqlalchemy as sqla from sqlalchemy import or_, desc, and_, union_all from flask import ( - abort, redirect, url_for, request, Markup, Response, current_app, render_template, + abort, redirect, url_for, request, Markup, Response, current_app, render_template, make_response) from flask_admin 
import BaseView, expose, AdminIndexView from flask_admin.contrib.sqla import ModelView @@ -2488,7 +2488,6 @@ class TaskInstanceModelView(ModelViewOnly): 'start_date', 'end_date', 'duration', 'job_id', 'hostname', 'unixname', 'priority_weight', 'queue', 'queued_dttm', 'try_number', 'pool', 'log_url') - can_delete = True page_size = PAGE_SIZE @action('set_running', "Set state to 'running'", None) @@ -2507,58 +2506,59 @@ class TaskInstanceModelView(ModelViewOnly): def action_set_retry(self, ids): self.set_task_instance_state(ids, State.UP_FOR_RETRY) - @action('delete', - lazy_gettext('Delete'), - lazy_gettext('Are you sure you want to delete selected records?')) - def action_delete(self, ids): - """ - As a workaround for AIRFLOW-277, this method overrides Flask-Admin's ModelView.action_delete(). - - TODO: this method should be removed once the below bug is fixed on Flask-Admin side. - https://github.com/flask-admin/flask-admin/issues/1226 - """ - if 'sqlite' in conf.get('core', 'sql_alchemy_conn'): - self.delete_task_instances(ids) - else: - super(TaskInstanceModelView, self).action_delete(ids) - @provide_session - def set_task_instance_state(self, ids, target_state, session=None): + @action('clear', + lazy_gettext('Clear'), + lazy_gettext( + 'Are you sure you want to clear the state of the selected task instance(s)' + ' and set their dagruns to the running state?')) + def action_clear(self, ids, session=None): try: TI = models.TaskInstance - count = len(ids) + + dag_to_tis = {} + for id in ids: task_id, dag_id, execution_date = id.split(',') - execution_date = datetime.strptime(execution_date, '%Y-%m-%d %H:%M:%S') + ti = session.query(TI).filter(TI.task_id == task_id, TI.dag_id == dag_id, TI.execution_date == execution_date).one() - ti.state = target_state + + dag = dagbag.get_dag(dag_id) + tis = dag_to_tis.setdefault(dag, []) + tis.append(ti) + + for dag, tis in dag_to_tis.items(): + models.clear_task_instances(tis, session, dag=dag) + session.commit() - flash( - 
"{count} task instances were set to '{target_state}'".format(**locals())) + flash("{0} task instances have been cleared".format(len(ids))) + except Exception as ex: if not self.handle_view_exception(ex): raise Exception("Ooops") - flash('Failed to set state', 'error') + flash('Failed to clear task instances', 'error') @provide_session - def delete_task_instances(self, ids, session=None): + def set_task_instance_state(self, ids, target_state, session=None): try: TI = models.TaskInstance - count = 0 + count = len(ids) for id in ids: task_id, dag_id, execution_date = id.split(',') execution_date = datetime.strptime(execution_date, '%Y-%m-%d %H:%M:%S') - count += session.query(TI).filter(TI.task_id == task_id, - TI.dag_id == dag_id, - TI.execution_date == execution_date).delete() + ti = session.query(TI).filter(TI.task_id == task_id, + TI.dag_id == dag_id, + TI.execution_date == execution_date).one() + ti.state = target_state session.commit() - flash("{count} task instances were deleted".format(**locals())) + flash( + "{count} task instances were set to '{target_state}'".format(**locals())) except Exception as ex: if not self.handle_view_exception(ex): raise Exception("Ooops") - flash('Failed to delete', 'error') + flash('Failed to set state', 'error') def get_one(self, id): """