Repository: incubator-airflow Updated Branches: refs/heads/master fedbacb0e -> b9efdc620
AIRFLOW-167: Add dag_state option in cli Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/9db00511 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/9db00511 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/9db00511 Branch: refs/heads/master Commit: 9db00511da22c731d10cdc4ea40942c77b1b4008 Parents: 456dada Author: Sumit Maheshwari <sum...@qubole.com> Authored: Tue May 24 21:25:26 2016 +0530 Committer: Sumit Maheshwari <sum...@qubole.com> Committed: Thu May 26 13:14:27 2016 +0530 ---------------------------------------------------------------------- airflow/bin/cli.py | 16 ++++++++++++++++ airflow/models.py | 7 ++++++- tests/core.py | 4 ++++ 3 files changed, 26 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9db00511/airflow/bin/cli.py ---------------------------------------------------------------------- diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py index 3184455..840c375 100755 --- a/airflow/bin/cli.py +++ b/airflow/bin/cli.py @@ -325,6 +325,18 @@ def task_state(args): print(ti.current_state()) +def dag_state(args): + """ + Returns the state of a DagRun at the command line. + + >>> airflow dag_state tutorial 2015-01-01T00:00:00.000000 + running + """ + dag = get_dag(args) + dr = DagRun.find(dag.dag_id, execution_date=args.execution_date) + print(dr[0].state if len(dr) > 0 else None) + + def list_dags(args): dagbag = DagBag(process_subdir(args.subdir)) s = textwrap.dedent("""\n @@ -886,6 +898,10 @@ class CLIFactory(object): 'help': "List all the DAGs", 'args': ('subdir', 'report'), }, { + 'func': dag_state, + 'help': "Get the status of a dag run", + 'args': ('dag_id', 'execution_date', 'subdir'), + }, { 'func': task_state, 'help': "Get the status of a task instance", 'args': ('dag_id', 'task_id', 'execution_date', 'subdir'), http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9db00511/airflow/models.py ---------------------------------------------------------------------- diff --git a/airflow/models.py b/airflow/models.py index 67958f2..461f3c3 100644 --- a/airflow/models.py +++ b/airflow/models.py @@ -3438,7 +3438,8 @@ class DagRun(Base): @staticmethod @provide_session - def find(dag_id, run_id=None, state=None, external_trigger=None, session=None): + def find(dag_id, run_id=None, state=None, external_trigger=None, session=None, + execution_date=None): """ Returns a set of dag runs for the given search criteria. :param run_id: defines the the run id for this dag run @@ -3449,6 +3450,8 @@ class DagRun(Base): :type external_trigger: bool :param session: database session :type session: Session + :param execution_date: execution date for the dag + :type execution_date: string """ DR = DagRun @@ -3459,6 +3462,8 @@ class DagRun(Base): qry = qry.filter(DR.state == state) if external_trigger: qry = qry.filter(DR.external_trigger == external_trigger) + if execution_date: + qry = qry.filter(DR.execution_date == execution_date) dr = qry.all() return dr http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9db00511/tests/core.py ---------------------------------------------------------------------- diff --git a/tests/core.py b/tests/core.py index 4b5d563..80ad477 100644 --- a/tests/core.py +++ b/tests/core.py @@ -678,6 +678,10 @@ class CliTests(unittest.TestCase): 'task_state', 'example_bash_operator', 'runme_0', DEFAULT_DATE.isoformat()])) + def test_dag_state(self): + self.assertEqual(None, cli.dag_state(self.parser.parse_args([ + 'dag_state', 'example_bash_operator', DEFAULT_DATE.isoformat()]))) + def test_pause(self): args = self.parser.parse_args([ 'pause', 'example_bash_operator'])