[jira] [Commented] (AIRFLOW-1488) Add a sensor operator to wait on DagRuns

2018-12-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-1488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16723488#comment-16723488
 ] 

ASF GitHub Bot commented on AIRFLOW-1488:
-

stale[bot] closed pull request #2500: [AIRFLOW-1488] Add the DagRunSensor 
operator.
URL: https://github.com/apache/incubator-airflow/pull/2500
 
 
   

This pull request comes from a forked repository.
As GitHub hides the original diff for such pull requests, it is displayed
below for the sake of provenance:

diff --git a/airflow/contrib/operators/dagrun_sensor.py b/airflow/contrib/operators/dagrun_sensor.py
new file mode 100644
index 00..f4465626af
--- /dev/null
+++ b/airflow/contrib/operators/dagrun_sensor.py
@@ -0,0 +1,86 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+from airflow import settings
+from airflow.utils.state import State
+from airflow.utils.decorators import apply_defaults
+from airflow.models import DagRun
+from airflow.operators.sensors import BaseSensorOperator
+
+
+class DagRunSensor(BaseSensorOperator):
+    """
+    Waits for a DAG run to complete.
+
+    :param external_dag_id: The dag_id that you want to wait for
+    :type external_dag_id: string
+    :param allowed_states: list of allowed states, default is ``['success']``
+    :type allowed_states: list
+    :param execution_delta: time difference with the previous execution to look
+    at, the default is the same execution_date as the current task.  For
+    yesterday, use [positive!] datetime.timedelta(days=1). Either
+    execution_delta or execution_date_fn can be passed to DagRunSensor, but not
+    both.
+    :type execution_delta: datetime.timedelta
+    :param execution_date_fn: function that receives the current execution date
+    and returns the desired execution dates to query. Either execution_delta or
+    execution_date_fn can be passed to DagRunSensor, but not both.
+    :type execution_date_fn: callable
+    """
+    @apply_defaults
+    def __init__(
+            self,
+            external_dag_id,
+            allowed_states=None,
+            execution_delta=None,
+            execution_date_fn=None,
+            *args, **kwargs):
+        super(DagRunSensor, self).__init__(*args, **kwargs)
+
+        if execution_delta is not None and execution_date_fn is not None:
+            raise ValueError(
+                'Only one of `execution_delta` or `execution_date_fn` may '
+                'be provided to DagRunSensor; not both.')
+
+        self.allowed_states = allowed_states or [State.SUCCESS]
+        self.execution_delta = execution_delta
+        self.execution_date_fn = execution_date_fn
+        self.external_dag_id = external_dag_id
+
+    def poke(self, context):
+        if self.execution_delta:
+            dttm = context['execution_date'] - self.execution_delta
+        elif self.execution_date_fn:
+            dttm = self.execution_date_fn(context['execution_date'])
+        else:
+            dttm = context['execution_date']
+
+        dttm_filter = dttm if isinstance(dttm, list) else [dttm]
+        serialized_dttm_filter = ','.join([datetime.isoformat() for datetime in
+                                           dttm_filter])
+
+        logging.info(
+            'Poking for '
+            '{self.external_dag_id}.'
+            '{serialized_dttm_filter} ... '.format(**locals()))
+
+        session = settings.Session()
+        count = session.query(DagRun).filter(
+            DagRun.dag_id == self.external_dag_id,
+            DagRun.state.in_(self.allowed_states),
+            DagRun.execution_date.in_(dttm_filter),
+        ).count()
+        session.commit()
+        session.close()
+        return count == len(dttm_filter)
diff --git a/tests/contrib/operators/test_dagrun_sensor.py b/tests/contrib/operators/test_dagrun_sensor.py
new file mode 100644
index 00..74e4d46ccb
--- /dev/null
+++ b/tests/contrib/operators/test_dagrun_sensor.py
@@ -0,0 +1,119 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# 

[jira] [Commented] (AIRFLOW-1488) Add a sensor operator to wait on DagRuns

2018-12-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-1488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16712260#comment-16712260
 ] 

ASF GitHub Bot commented on AIRFLOW-1488:
-

ybendana opened a new pull request #4291: [AIRFLOW-1488] Add the DagRunSensor 
operator
URL: https://github.com/apache/incubator-airflow/pull/4291
 
 
   ExternalTaskSensor allows one to wait on any valid combination of
   (dag_id, task_id). It is desirable though to be able to wait on entire
   DagRuns, as opposed to specific task instances in those DAGs.
   
   This pull request adds a new sensor in contrib called DagRunSensor.
   This version is a different approach from previous pull requests
   addressing the same issue.  In this case, the DagRunSensor takes a
   trigger_task_id, the id of a task that triggers DagRuns.  The trigger
   task returns a list of run_ids of the DagRuns it triggered and the
   DagRunSensor polls their status. For this purpose the
   TriggerDagRunOperator was modified so that it stores the run_id of the
   triggered DagRun.
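
The polling step described above can be sketched without Airflow roughly as follows. This is only an illustration under stated assumptions, not the code from this pull request: `get_state` is a hypothetical callable standing in for a DagRun state lookup by run_id, and the function name, the plain state strings, and the choice to raise on a failed run are all assumptions made for the example.

```python
from typing import Callable, Iterable, Optional


def triggered_runs_done(
    run_ids: Iterable[str],
    get_state: Callable[[str], Optional[str]],
    allowed_states=("success",),
    failed_states=("failed",),
) -> bool:
    """Return True once every triggered run is in an allowed state.

    Raises RuntimeError as soon as any run is in a failed state, so a
    waiting task can fail early instead of poking until it times out.
    (Failing early is an assumption of this sketch.)
    """
    # Look up the current state of each run the trigger task reported.
    states = {run_id: get_state(run_id) for run_id in run_ids}

    failed = sorted(r for r, s in states.items() if s in failed_states)
    if failed:
        raise RuntimeError("DagRuns failed: " + ", ".join(failed))

    # Runs that are unknown (None) or still running keep the sensor waiting.
    return all(s in allowed_states for s in states.values())
```

A sensor's poke method could call such a helper with the run_ids returned by the trigger task and keep waiting for as long as it returns False.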
   
   Make sure you have checked _all_ steps below.
   
   ### Jira
   
   - [ ] My PR addresses the following [Airflow 
Jira](https://issues.apache.org/jira/browse/AIRFLOW/) issues and references 
them in the PR title. For example, "\[AIRFLOW-XXX\] My Airflow PR"
 - https://issues.apache.org/jira/browse/AIRFLOW-XXX
 - In case you are fixing a typo in the documentation you can prepend your 
commit with \[AIRFLOW-XXX\], code changes always need a Jira issue.
   
   ### Description
   
   - [ ] Here are some details about my PR, including screenshots of any UI 
changes:
   See the above description.  I also copied quite a bit from the previous pull 
requests on this issue, #2500 and #3234.  Unlike those attempts, the focus here 
is on triggered DagRuns.  I realize it may not be generally applicable to 
everyone's workflows, but it has been very useful for us. At one point I 
considered using subdags, but since they have had their issues I think this is a 
good alternative.
   
   ### Tests
   
   - [ ] My PR adds the following unit tests __OR__ does not need testing for 
this extremely good reason:
   I included test_dagrun_sensor.py and test_dagrun_sensor_dag.py.  Currently, 
I'm running into an issue executing the test in the Docker environment because 
the sensor task has to wait on the triggered DagRun.  The DagRun is triggered 
but is never scheduled.  I think this is because the unit test uses the 
SequentialExecutor and SQLite.  If I run the scheduler manually, the triggered 
DagRun executes. I'm not sure what to do about this. For now, I've commented 
out the successful DagRun test and have only the failed DagRun test active.
   
   ### Commits
   
   - [ ] My commits all reference Jira issues in their subject lines, and I 
have squashed multiple commits if they address the same issue. In addition, my 
commits follow the guidelines from "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)":
 1. Subject is separated from body by a blank line
 1. Subject is limited to 50 characters (not including Jira issue reference)
 1. Subject does not end with a period
 1. Subject uses the imperative mood ("add", not "adding")
 1. Body wraps at 72 characters
 1. Body explains "what" and "why", not "how"
   
   ### Documentation
   
   - [ ] In case of new functionality, my PR adds documentation that describes 
how to use it.
 - When adding new operators/hooks/sensors, the autoclass documentation 
generation needs to be added.
 - All the public functions and the classes in the PR contain docstrings 
that explain what it does
   
   ### Code Quality
   
   - [ ] Passes `flake8`
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add a sensor operator to wait on DagRuns
> 
>
> Key: AIRFLOW-1488
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1488
> Project: Apache Airflow
>  Issue Type: New Feature
>  Components: contrib, operators
>Reporter: Yati
>Assignee: Yati
>Priority: Major
>
> The 
> [ExternalTaskSensor|https://airflow.incubator.apache.org/code.html#airflow.operators.ExternalTaskSensor]
>  operator already allows for encoding dependencies on tasks in external DAGs. 
> However, when you have teams, each owning multiple small-to-medium sized 
> DAGs, it is desirable to be able to wait on an external DagRun as a whole. 
> This allows the owners of an upstream DAG to refactor their code freely by 
> splitting/squashing task responsibilities, without 

[jira] [Commented] (AIRFLOW-1488) Add a sensor operator to wait on DagRuns

2018-09-02 Thread Apache Spark (JIRA)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-1488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16601625#comment-16601625
 ] 

Apache Spark commented on AIRFLOW-1488:
---

User 'milanvdm' has created a pull request for this issue:
https://github.com/apache/incubator-airflow/pull/3234

> Add a sensor operator to wait on DagRuns
> 
>
> Key: AIRFLOW-1488
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1488
> Project: Apache Airflow
>  Issue Type: New Feature
>  Components: contrib, operators
>Reporter: Yati
>Assignee: Yati
>Priority: Major
>
> The 
> [ExternalTaskSensor|https://airflow.incubator.apache.org/code.html#airflow.operators.ExternalTaskSensor]
>  operator already allows for encoding dependencies on tasks in external DAGs. 
> However, when you have teams, each owning multiple small-to-medium sized 
> DAGs, it is desirable to be able to wait on an external DagRun as a whole. 
> This allows the owners of an upstream DAG to refactor their code freely by 
> splitting/squashing task responsibilities, without worrying about dependent 
> DAGs breaking.
> I'll now enumerate the easiest ways of achieving this that come to mind:
> * Make all DAGs always have a join DummyOperator in the end, with a task id 
> that follows some convention, e.g., "{{ dag_id }}.__end__".
> * Make ExternalTaskSensor poke for a DagRun instead of TaskInstances when the 
> external_task_id argument is None.
> * Implement a separate DagRunSensor operator.
> After some consideration, we decided to implement a separate operator, which 
> we've been using in the team for our workflows, and I think it would make a 
> good addition to contrib.
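
As a rough illustration of the separate-operator option, the check made by the DagRunSensor in pull request #2500 can be reduced to two steps: resolve the execution dates to look for, then require a DagRun in an allowed state for each of them. The sketch below is plain Python under stated assumptions: `run_states` is a hypothetical mapping from execution date to state that stands in for the DagRun table, and both function names are invented for this example.

```python
from datetime import datetime, timedelta


def target_execution_dates(execution_date, execution_delta=None,
                           execution_date_fn=None):
    """Resolve which execution dates the sensor should look for."""
    if execution_delta is not None and execution_date_fn is not None:
        raise ValueError(
            "Pass execution_delta or execution_date_fn, not both.")
    if execution_delta is not None:
        # A positive delta looks back in time, e.g. one day for yesterday.
        dates = execution_date - execution_delta
    elif execution_date_fn is not None:
        dates = execution_date_fn(execution_date)
    else:
        dates = execution_date
    return dates if isinstance(dates, list) else [dates]


def dag_runs_ready(run_states, dates, allowed_states=("success",)):
    """Return True when every target date has a run in an allowed state.

    run_states is a hypothetical stand-in for a DagRun query result.
    """
    return all(run_states.get(d) in allowed_states for d in dates)


# Example: wait on yesterday's run of the upstream DAG.
today = datetime(2018, 1, 2)
dates = target_execution_dates(today, execution_delta=timedelta(days=1))
ready = dag_runs_ready({datetime(2018, 1, 1): "success"}, dates)
```

Because the check only looks at DagRun state, owners of the upstream DAG can split or merge tasks without touching the dependent DAGs.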



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)