Repository: incubator-airflow
Updated Branches:
  refs/heads/master fedbacb0e -> b9efdc620


AIRFLOW-167: Add dag_state option in cli


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/9db00511
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/9db00511
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/9db00511

Branch: refs/heads/master
Commit: 9db00511da22c731d10cdc4ea40942c77b1b4008
Parents: 456dada
Author: Sumit Maheshwari <sum...@qubole.com>
Authored: Tue May 24 21:25:26 2016 +0530
Committer: Sumit Maheshwari <sum...@qubole.com>
Committed: Thu May 26 13:14:27 2016 +0530

----------------------------------------------------------------------
 airflow/bin/cli.py | 16 ++++++++++++++++
 airflow/models.py  |  7 ++++++-
 tests/core.py      |  4 ++++
 3 files changed, 26 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9db00511/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index 3184455..840c375 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -325,6 +325,18 @@ def task_state(args):
     print(ti.current_state())
 
 
+def dag_state(args):
+    """
+    Returns the state of a DagRun at the command line.
+
+    >>> airflow dag_state tutorial 2015-01-01T00:00:00.000000
+    running
+    """
+    dag = get_dag(args)
+    dr = DagRun.find(dag.dag_id, execution_date=args.execution_date)
+    print(dr[0].state if len(dr) > 0 else None)
+
+
 def list_dags(args):
     dagbag = DagBag(process_subdir(args.subdir))
     s = textwrap.dedent("""\n
@@ -886,6 +898,10 @@ class CLIFactory(object):
             'help': "List all the DAGs",
             'args': ('subdir', 'report'),
         }, {
+            'func': dag_state,
+            'help': "Get the status of a dag run",
+            'args': ('dag_id', 'execution_date', 'subdir'),
+        }, {
             'func': task_state,
             'help': "Get the status of a task instance",
             'args': ('dag_id', 'task_id', 'execution_date', 'subdir'),

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9db00511/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 67958f2..461f3c3 100644
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -3438,7 +3438,8 @@ class DagRun(Base):
 
     @staticmethod
     @provide_session
-    def find(dag_id, run_id=None, state=None, external_trigger=None, 
session=None):
+    def find(dag_id, run_id=None, state=None, external_trigger=None, 
session=None,
+             execution_date=None):
         """
         Returns a set of dag runs for the given search criteria.
         :param run_id: defines the the run id for this dag run
@@ -3449,6 +3450,8 @@ class DagRun(Base):
         :type external_trigger: bool
         :param session: database session
         :type session: Session
+        :param execution_date: execution date for the dag
+        :type execution_date: string
         """
         DR = DagRun
 
@@ -3459,6 +3462,8 @@ class DagRun(Base):
             qry = qry.filter(DR.state == state)
         if external_trigger:
             qry = qry.filter(DR.external_trigger == external_trigger)
+        if execution_date:
+            qry = qry.filter(DR.execution_date == execution_date)
         dr = qry.all()
 
         return dr

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9db00511/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index 4b5d563..80ad477 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -678,6 +678,10 @@ class CliTests(unittest.TestCase):
             'task_state', 'example_bash_operator', 'runme_0',
             DEFAULT_DATE.isoformat()]))
 
+    def test_dag_state(self):
+        self.assertEqual(None, cli.dag_state(self.parser.parse_args([
+            'dag_state', 'example_bash_operator', DEFAULT_DATE.isoformat()])))
+
     def test_pause(self):
         args = self.parser.parse_args([
             'pause', 'example_bash_operator'])

Reply via email to