[jira] [Updated] (AIRFLOW-2007) task failed first and then become green

2018-01-15 Thread Minghao Hu (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-2007?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Minghao Hu updated AIRFLOW-2007:

Description: 
When I start a run of a DAG that contains only one task, the task fails 
immediately and then turns green later.

The task just calls a Linux shell command to copy a big file from one folder to 
another. When the task starts, it turns red (failed), and the DAG run turns red 
too, but the "copy" process keeps running normally in the shell background. 
Because the DAG run failed, the DAG then starts the next DAG run, which fails 
again.

After the process finishes normally, the task turns green, but the DAG run does not.



Checks:

1. I have checked the logs, but no error is logged.

2. In the Airflow database, I found that a failed record is inserted into the 
task_instance table when the task starts, and a running record is inserted into 
the dag_run table. After a while, the dag_run record becomes failed.

3. In the job table, a running record is inserted, and it becomes success after 
the shell command finishes (which is the normal process).



My questions:

1. Why does the task fail immediately when it starts?

2. Where does the Airflow code modify the task_instance table?
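One way to investigate the second question from the database side is to compare each task_instance row with the job row that ran it. The sketch below is only an illustration: it assumes a heavily simplified schema (just the columns shown; the real Airflow tables have many more, and the `task_instance.job_id` to `job.id` link is an assumption to verify against your Airflow version), and the demo rows are hypothetical, chosen to mimic the reported symptom.

```python
import sqlite3

# Simplified, assumed subset of the Airflow metadata schema (real tables have more columns).
TI_JOB_SCHEMA = """
CREATE TABLE job (id INTEGER PRIMARY KEY, dag_id TEXT, state TEXT);
CREATE TABLE task_instance (
    task_id TEXT, dag_id TEXT, execution_date TEXT, state TEXT, job_id INTEGER
);
"""

# Task instances whose recorded state differs from the state of the job that ran them.
MISMATCH_QUERY = """
SELECT ti.dag_id, ti.task_id, ti.execution_date, ti.state, j.state
FROM task_instance AS ti
JOIN job AS j ON j.id = ti.job_id
WHERE ti.state != j.state
ORDER BY ti.execution_date
"""


def find_state_mismatches(conn):
    """Return (dag_id, task_id, execution_date, ti_state, job_state) tuples."""
    return conn.execute(MISMATCH_QUERY).fetchall()


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.executescript(TI_JOB_SCHEMA)
    # Hypothetical rows: the task instance says "failed" while its job succeeded.
    conn.execute("INSERT INTO job VALUES (1, 'copy_dag', 'success')")
    conn.execute(
        "INSERT INTO task_instance VALUES "
        "('copy_file', 'copy_dag', '2018-01-15T00:00:00', 'failed', 1)"
    )
    print(find_state_mismatches(conn))
```

Against a real metadata database, the same SELECT (adapted to the actual column names) would list the task instances whose state disagrees with their job.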

> task failed first and then become green
> ---
>
> Key: AIRFLOW-2007
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2007
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: DagRun
>Affects Versions: Airflow 1.7.1
>Reporter: Minghao Hu
>Priority: Blocker
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (AIRFLOW-2007) task failed first and then become green

2018-01-15 Thread Minghao Hu (JIRA)
Minghao Hu created AIRFLOW-2007:
---

 Summary: task failed first and then become green
 Key: AIRFLOW-2007
 URL: https://issues.apache.org/jira/browse/AIRFLOW-2007
 Project: Apache Airflow
  Issue Type: Bug
  Components: DagRun
Affects Versions: Airflow 1.7.1
Reporter: Minghao Hu








[jira] [Resolved] (AIRFLOW-2003) Use flask caching instead of flask cache

2018-01-15 Thread Fokko Driesprong (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-2003?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fokko Driesprong resolved AIRFLOW-2003.
---
   Resolution: Fixed
Fix Version/s: 2.0.0

Issue resolved by pull request #2944
[https://github.com/apache/incubator-airflow/pull/2944]

> Use flask caching instead of flask cache 
> -
>
> Key: AIRFLOW-2003
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2003
> Project: Apache Airflow
>  Issue Type: Improvement
>Reporter: Bolke de Bruin
>Priority: Major
> Fix For: 2.0.0
>
>
> Flask cache has been unmaintained for over 3 years. Flask-caching is the 
> community-maintained version.





[jira] [Commented] (AIRFLOW-2003) Use flask caching instead of flask cache

2018-01-15 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-2003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326557#comment-16326557
 ] 

ASF subversion and git services commented on AIRFLOW-2003:
--

Commit 88130a5d7eec9e28c7e21a03e35170590e4bd0b6 in incubator-airflow's branch 
refs/heads/master from [~bolke]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=88130a5 ]

[AIRFLOW-2003] Use flask-caching instead of flask-cache

Flask-cache has been unmaintained for over three
years,
flask-caching is the community supported version.

Closes #2944 from bolkedebruin/AIRFLOW-2003


> Use flask caching instead of flask cache 
> -
>
> Key: AIRFLOW-2003
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2003
> Project: Apache Airflow
>  Issue Type: Improvement
>Reporter: Bolke de Bruin
>Priority: Major
> Fix For: 2.0.0
>
>
> Flask cache has been unmaintained for over 3 years. Flask-caching is the 
> community-maintained version.





[jira] [Commented] (AIRFLOW-2003) Use flask caching instead of flask cache

2018-01-15 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-2003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326558#comment-16326558
 ] 

ASF subversion and git services commented on AIRFLOW-2003:
--

Commit 88130a5d7eec9e28c7e21a03e35170590e4bd0b6 in incubator-airflow's branch 
refs/heads/master from [~bolke]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=88130a5 ]

[AIRFLOW-2003] Use flask-caching instead of flask-cache

Flask-cache has been unmaintained for over three
years,
flask-caching is the community supported version.

Closes #2944 from bolkedebruin/AIRFLOW-2003


> Use flask caching instead of flask cache 
> -
>
> Key: AIRFLOW-2003
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2003
> Project: Apache Airflow
>  Issue Type: Improvement
>Reporter: Bolke de Bruin
>Priority: Major
> Fix For: 2.0.0
>
>
> Flask cache has been unmaintained for over 3 years. Flask-caching is the 
> community-maintained version.





incubator-airflow git commit: [AIRFLOW-2003] Use flask-caching instead of flask-cache

2018-01-15 Thread fokko
Repository: incubator-airflow
Updated Branches:
  refs/heads/master a34a4865b -> 88130a5d7


[AIRFLOW-2003] Use flask-caching instead of flask-cache

Flask-cache has been unmaintained for over three
years,
flask-caching is the community supported version.

Closes #2944 from bolkedebruin/AIRFLOW-2003


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/88130a5d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/88130a5d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/88130a5d

Branch: refs/heads/master
Commit: 88130a5d7eec9e28c7e21a03e35170590e4bd0b6
Parents: a34a486
Author: Bolke de Bruin 
Authored: Mon Jan 15 21:12:03 2018 +0100
Committer: Fokko Driesprong 
Committed: Mon Jan 15 21:12:03 2018 +0100

--
 airflow/www/app.py | 2 +-
 setup.py   | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/88130a5d/airflow/www/app.py
--
diff --git a/airflow/www/app.py b/airflow/www/app.py
index 74e669a..0b71c17 100644
--- a/airflow/www/app.py
+++ b/airflow/www/app.py
@@ -17,7 +17,7 @@ import six
 
 from flask import Flask
 from flask_admin import Admin, base
-from flask_cache import Cache
+from flask_caching import Cache
 from flask_wtf.csrf import CSRFProtect
 csrf = CSRFProtect()
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/88130a5d/setup.py
--
diff --git a/setup.py b/setup.py
index f0e8406..9358090 100644
--- a/setup.py
+++ b/setup.py
@@ -208,7 +208,7 @@ def do_setup():
 'dill>=0.2.2, <0.3',
 'flask>=0.11, <0.12',
 'flask-admin==1.4.1',
-'flask-cache>=0.13.1, <0.14',
+'flask-caching>=1.3.3, <1.4.0',
 'flask-login==0.2.11',
 'flask-swagger==0.2.13',
 'flask-wtf>=0.14, <0.15',



[jira] [Resolved] (AIRFLOW-2002) Do not swallow exceptions when importing logging confs

2018-01-15 Thread Fokko Driesprong (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-2002?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fokko Driesprong resolved AIRFLOW-2002.
---
   Resolution: Fixed
Fix Version/s: (was: 1.10.0)
   2.0.0

Issue resolved by pull request #2945
[https://github.com/apache/incubator-airflow/pull/2945]

> Do not swallow exceptions when importing logging confs
> --
>
> Key: AIRFLOW-2002
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2002
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: logging
>Affects Versions: 1.9.0
>Reporter: Bolke de Bruin
>Priority: Major
> Fix For: 2.0.0
>
>






[jira] [Commented] (AIRFLOW-2002) Do not swallow exceptions when importing logging confs

2018-01-15 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-2002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326552#comment-16326552
 ] 

ASF subversion and git services commented on AIRFLOW-2002:
--

Commit a34a4865b118ffa8f5389ecb58ea3e0ee6970b04 in incubator-airflow's branch 
refs/heads/master from [~bolke]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=a34a486 ]

[AIRFLOW-2002] Do not swallow exception on logging import

Closes #2945 from bolkedebruin/AIRFLOW-2002
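The principle behind this change can be illustrated with a small, stdlib-only sketch: if a configured logging-config path cannot be imported, log the problem and re-raise instead of silently falling back to defaults. The helper name `import_logging_config` and the dotted-path convention are assumptions for illustration; this is not the code merged in #2945.

```python
import importlib
import logging

log = logging.getLogger(__name__)


def import_logging_config(dotted_path):
    """Import an object such as 'my_pkg.log_config.LOGGING_CONFIG' by dotted path.

    Hypothetical helper: on failure it logs the problem and re-raises, rather
    than swallowing the exception and continuing with a default configuration.
    """
    module_path, _, attr_name = dotted_path.rpartition(".")
    if not module_path:
        raise ValueError(
            "expected a dotted path like 'package.module.NAME', got %r" % (dotted_path,)
        )
    try:
        module = importlib.import_module(module_path)
        return getattr(module, attr_name)
    except (ImportError, AttributeError):
        log.error("Unable to load logging config from %s", dotted_path)
        raise  # propagate so the misconfiguration is visible at startup
```

The design choice is simply to keep the `raise`: a broken logging configuration then fails loudly at startup instead of producing silently misrouted logs.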


> Do not swallow exceptions when importing logging confs
> --
>
> Key: AIRFLOW-2002
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2002
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: logging
>Affects Versions: 1.9.0
>Reporter: Bolke de Bruin
>Priority: Major
> Fix For: 2.0.0
>
>






[jira] [Commented] (AIRFLOW-2002) Do not swallow exceptions when importing logging confs

2018-01-15 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-2002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326553#comment-16326553
 ] 

ASF subversion and git services commented on AIRFLOW-2002:
--

Commit a34a4865b118ffa8f5389ecb58ea3e0ee6970b04 in incubator-airflow's branch 
refs/heads/master from [~bolke]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=a34a486 ]

[AIRFLOW-2002] Do not swallow exception on logging import

Closes #2945 from bolkedebruin/AIRFLOW-2002


> Do not swallow exceptions when importing logging confs
> --
>
> Key: AIRFLOW-2002
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2002
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: logging
>Affects Versions: 1.9.0
>Reporter: Bolke de Bruin
>Priority: Major
> Fix For: 2.0.0
>
>






[jira] [Resolved] (AIRFLOW-2004) flask.login does not contain flash

2018-01-15 Thread Fokko Driesprong (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-2004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fokko Driesprong resolved AIRFLOW-2004.
---
   Resolution: Fixed
Fix Version/s: 2.0.0

Issue resolved by pull request #2943
[https://github.com/apache/incubator-airflow/pull/2943]

> flask.login does not contain flash
> --
>
> Key: AIRFLOW-2004
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2004
> Project: Apache Airflow
>  Issue Type: Bug
>Reporter: Bolke de Bruin
>Priority: Major
> Fix For: 2.0.0
>
>






incubator-airflow git commit: [AIRFLOW-2004] Import flash from flask not flask.login

2018-01-15 Thread fokko
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 1abe7f6d5 -> 7cf7cd7ca


[AIRFLOW-2004] Import flash from flask not flask.login

Closes #2943 from bolkedebruin/AIRFLOW-2004


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/7cf7cd7c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/7cf7cd7c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/7cf7cd7c

Branch: refs/heads/master
Commit: 7cf7cd7cae73c3e1e256a04bbd9588146a339f9b
Parents: 1abe7f6
Author: Bolke de Bruin 
Authored: Mon Jan 15 21:04:38 2018 +0100
Committer: Fokko Driesprong 
Committed: Mon Jan 15 21:04:38 2018 +0100

--
 airflow/www/views.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7cf7cd7c/airflow/www/views.py
--
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 716e9fe..cc73c8b 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -46,7 +46,7 @@ from flask_admin.contrib.sqla import ModelView
 from flask_admin.actions import action
 from flask_admin.babel import lazy_gettext
 from flask_admin.tools import iterdecode
-from flask_login import flash
+from flask import flash
 from flask._compat import PY2
 
 from jinja2.sandbox import ImmutableSandboxedEnvironment



[jira] [Commented] (AIRFLOW-2004) flask.login does not contain flash

2018-01-15 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-2004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326548#comment-16326548
 ] 

ASF subversion and git services commented on AIRFLOW-2004:
--

Commit 7cf7cd7cae73c3e1e256a04bbd9588146a339f9b in incubator-airflow's branch 
refs/heads/master from [~bolke]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=7cf7cd7 ]

[AIRFLOW-2004] Import flash from flask not flask.login

Closes #2943 from bolkedebruin/AIRFLOW-2004


> flask.login does not contain flash
> --
>
> Key: AIRFLOW-2004
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2004
> Project: Apache Airflow
>  Issue Type: Bug
>Reporter: Bolke de Bruin
>Priority: Major
>






[jira] [Commented] (AIRFLOW-2004) flask.login does not contain flash

2018-01-15 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-2004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326549#comment-16326549
 ] 

ASF subversion and git services commented on AIRFLOW-2004:
--

Commit 7cf7cd7cae73c3e1e256a04bbd9588146a339f9b in incubator-airflow's branch 
refs/heads/master from [~bolke]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=7cf7cd7 ]

[AIRFLOW-2004] Import flash from flask not flask.login

Closes #2943 from bolkedebruin/AIRFLOW-2004


> flask.login does not contain flash
> --
>
> Key: AIRFLOW-2004
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2004
> Project: Apache Airflow
>  Issue Type: Bug
>Reporter: Bolke de Bruin
>Priority: Major
>






[jira] [Commented] (AIRFLOW-1055) airflow/jobs.py:create_dag_run() exception for @once dag when catchup = False

2018-01-15 Thread Bolke de Bruin (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-1055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326374#comment-16326374
 ] 

Bolke de Bruin commented on AIRFLOW-1055:
-

 It is fixed in master.

> airflow/jobs.py:create_dag_run() exception for @once dag when catchup = False
> -
>
> Key: AIRFLOW-1055
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1055
> Project: Apache Airflow
>  Issue Type: Bug
>Affects Versions: Airflow 1.8
>Reporter: Siddharth Anand
>Assignee: Muhammad Ahmmad
>Priority: Critical
>  Labels: dagrun, once, scheduler, sla
> Fix For: 1.9.0
>
>
> Getting the following exception:
> {noformat}
> [2017-03-19 20:16:25,786] {jobs.py:354} DagFileProcessor2638 ERROR - Got an 
> exception! Propagating...
> Traceback (most recent call last):
>   File 
> "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/jobs.py", 
> line 346, in helper
> pickle_dags)
>   File 
> "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/utils/db.py",
>  line 53, in wrapper
> result = func(*args, **kwargs)
>   File 
> "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/jobs.py", 
> line 1581, in process_file
> self._process_dags(dagbag, dags, ti_keys_to_schedule)
>   File 
> "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/jobs.py", 
> line 1175, in _process_dags
> self.manage_slas(dag)
>   File 
> "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/utils/db.py",
>  line 53, in wrapper
> result = func(*args, **kwargs)
>   File 
> "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/jobs.py", 
> line 595, in manage_slas
> while dttm < datetime.now():
> TypeError: can't compare datetime.datetime to NoneType
> {noformat}
> The exception is raised in airflow/jobs.py:manage_slas():
> https://github.com/apache/incubator-airflow/blob/v1-8-stable/airflow/jobs.py#L595
> {code}
> ts = datetime.now()
> SlaMiss = models.SlaMiss
> for ti in max_tis:
>     task = dag.get_task(ti.task_id)
>     dttm = ti.execution_date
>     if task.sla:
>         dttm = dag.following_schedule(dttm)
>         while dttm < datetime.now():  # <<< exception raised here
>             following_schedule = dag.following_schedule(dttm)
>             if following_schedule + task.sla < datetime.now():
>                 session.merge(models.SlaMiss(
>                     task_id=ti.task_id,
> {code}
> It seems that dag.following_schedule() returns None for an @once DAG?
> Here's how the DAG is defined:
> {code}
> import datetime as dt
> import airflow
> from airflow.models import DAG
> from airflow.operators.dummy_operator import DummyOperator
> from datetime import datetime, timedelta
>
> def sla_alert_func(dag, task_list, blocking_task_list, slas, blocking_tis):
>     print('Executing SLA miss callback')
>
> now = datetime.now()
> now_to_the_hour = now.replace(hour=now.time().hour, minute=0, second=0,
>                               microsecond=0)
> START_DATE = now_to_the_hour + timedelta(hours=-3)
> DAG_NAME = 'manage_sla_once_dag'
> default_args = {
>     'owner': 'sanand',
>     'depends_on_past': False,
>     'start_date': START_DATE,
>     'sla': timedelta(hours=2)
> }
> dag = DAG(
>     dag_id='manage_sla_once_dag',
>     default_args=default_args,
>     catchup=False,
>     schedule_interval='@once',
>     sla_miss_callback=sla_alert_func
> )
> task1 = DummyOperator(task_id='task1', dag=dag)
> {code}
> The exception does not occur if catchup = True.
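The traceback is consistent with the reporter's guess: if `following_schedule()` returns `None` (as suspected for `@once` DAGs), the comparison `dttm < datetime.now()` raises `TypeError`. A minimal sketch of a `None` guard follows. Here `following_schedule` is a hypothetical stand-in that returns `None` for `'@once'` and otherwise adds a fixed `timedelta`; it is not Airflow's `DAG.following_schedule`, and the loop only collects schedule points rather than recording SLA misses.

```python
from datetime import datetime, timedelta


def following_schedule(dttm, schedule):
    """Hypothetical stand-in for DAG.following_schedule().

    Returns None when there is no next run (as suspected for '@once');
    otherwise `schedule` is a timedelta added to `dttm`.
    """
    if schedule == "@once":
        return None
    return dttm + schedule


def sla_check_points(execution_date, schedule, now):
    """Collect the schedule points before `now` that an SLA check would visit.

    The original manage_slas() loop compares `dttm < now` directly, which raises
    TypeError once `dttm` is None; here a None result simply ends the walk.
    """
    points = []
    dttm = following_schedule(execution_date, schedule)
    while dttm is not None and dttm < now:
        points.append(dttm)
        dttm = following_schedule(dttm, schedule)
    return points


if __name__ == "__main__":
    start = datetime(2017, 3, 19, 17, 0)
    now = datetime(2017, 3, 19, 20, 16)
    print(sla_check_points(start, "@once", now))                  # []
    print(len(sla_check_points(start, timedelta(hours=1), now)))  # 3
```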





[jira] [Work started] (AIRFLOW-2006) Add log catching capability to kubernetes operator

2018-01-15 Thread Daniel Imberman (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-2006?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on AIRFLOW-2006 started by Daniel Imberman.

> Add log catching capability to kubernetes operator
> --
>
> Key: AIRFLOW-2006
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2006
> Project: Apache Airflow
>  Issue Type: Sub-task
>Reporter: Daniel Imberman
>Assignee: Daniel Imberman
>Priority: Minor
>
> For the Kubernetes operator, we can use the Kubernetes logging API to gather 
> logs into the central Airflow instance so that they show up in the UI.





[jira] [Created] (AIRFLOW-2006) Add log catching capability to kubernetes operator

2018-01-15 Thread Daniel Imberman (JIRA)
Daniel Imberman created AIRFLOW-2006:


 Summary: Add log catching capability to kubernetes operator
 Key: AIRFLOW-2006
 URL: https://issues.apache.org/jira/browse/AIRFLOW-2006
 Project: Apache Airflow
  Issue Type: Sub-task
Reporter: Daniel Imberman
Assignee: Daniel Imberman


For the Kubernetes operator, we can use the Kubernetes logging API to gather 
logs into the central Airflow instance so that they show up in the UI.





[jira] [Commented] (AIRFLOW-1055) airflow/jobs.py:create_dag_run() exception for @once dag when catchup = False

2018-01-15 Thread Muhammad Ahmmad (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-1055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326320#comment-16326320
 ] 

Muhammad Ahmmad commented on AIRFLOW-1055:
--

[~bolke], I was wondering where and when the fix for this issue was submitted.

I'm working on a fix for this issue; indeed, it happens for @once DAGs with 
catchup enabled. The fix is simple, but it affects AIRFLOW-1013, which 
needs more time to fix.

> airflow/jobs.py:create_dag_run() exception for @once dag when catchup = False
> -
>
> Key: AIRFLOW-1055
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1055
> Project: Apache Airflow
>  Issue Type: Bug
>Affects Versions: Airflow 1.8
>Reporter: Siddharth Anand
>Assignee: Muhammad Ahmmad
>Priority: Critical
>  Labels: dagrun, once, scheduler, sla
> Fix For: 1.9.0
>
>





[jira] [Resolved] (AIRFLOW-1019) active_dagruns shouldn't include paused DAGs

2018-01-15 Thread Bolke de Bruin (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1019?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bolke de Bruin resolved AIRFLOW-1019.
-
   Resolution: Fixed
Fix Version/s: (was: 1.10.0)
   1.9.0

> active_dagruns shouldn't include paused DAGs
> 
>
> Key: AIRFLOW-1019
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1019
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: scheduler
>Affects Versions: 1.8.0
>Reporter: Dan Davydov
>Priority: Critical
> Fix For: 1.9.0
>
>
> Since 1.8.0, Airflow resets orphaned tasks (tasks that are in the DB but not 
> in the executor's memory). The problem is that Airflow counts dagruns in 
> paused DAGs as running as long as the dagrun's state is running. Instead, we 
> should join against non-paused DAGs everywhere we calculate active dagruns 
> (e.g. in _process_task_instances in the Scheduler class in jobs.py). If there 
> are enough paused DAGs, this brings the scheduler to a halt, especially on 
> scheduler restarts.
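The proposed change, counting only dagruns whose DAG is not paused, amounts to a join on the DAG table. A minimal sketch with an in-memory SQLite database; the two-table schema is a simplified assumption (Airflow's real `dag` and `dag_run` tables have more columns), and the query is illustrative, not the scheduler's actual code.

```python
import sqlite3

# Simplified, assumed subset of Airflow's metadata tables.
DAGRUN_SCHEMA = """
CREATE TABLE dag (dag_id TEXT PRIMARY KEY, is_paused INTEGER NOT NULL);
CREATE TABLE dag_run (dag_id TEXT, execution_date TEXT, state TEXT);
"""

# Count running dagruns, ignoring runs that belong to paused DAGs.
ACTIVE_DAGRUNS_QUERY = """
SELECT COUNT(*)
FROM dag_run AS dr
JOIN dag AS d ON d.dag_id = dr.dag_id
WHERE dr.state = 'running' AND d.is_paused = 0
"""


def count_active_dagruns(conn):
    return conn.execute(ACTIVE_DAGRUNS_QUERY).fetchone()[0]


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.executescript(DAGRUN_SCHEMA)
    conn.executemany("INSERT INTO dag VALUES (?, ?)",
                     [("active_dag", 0), ("paused_dag", 1)])
    conn.executemany(
        "INSERT INTO dag_run VALUES (?, ?, ?)",
        [("active_dag", "2018-01-15", "running"),
         ("paused_dag", "2018-01-15", "running"),
         ("paused_dag", "2018-01-14", "running")],
    )
    print(count_active_dagruns(conn))  # 1: both runs of the paused DAG are excluded
```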





[jira] [Closed] (AIRFLOW-1055) airflow/jobs.py:create_dag_run() exception for @once dag when catchup = False

2018-01-15 Thread Bolke de Bruin (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1055?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bolke de Bruin closed AIRFLOW-1055.
---
   Resolution: Fixed
Fix Version/s: (was: 1.10.0)
   1.9.0

> airflow/jobs.py:create_dag_run() exception for @once dag when catchup = False
> -
>
> Key: AIRFLOW-1055
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1055
> Project: Apache Airflow
>  Issue Type: Bug
>Affects Versions: Airflow 1.8
>Reporter: Siddharth Anand
>Assignee: Muhammad Ahmmad
>Priority: Critical
>  Labels: dagrun, once, scheduler, sla
> Fix For: 1.9.0
>
>





[jira] [Closed] (AIRFLOW-1617) XSS Vulnerability in Variable endpoint

2018-01-15 Thread Bolke de Bruin (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bolke de Bruin closed AIRFLOW-1617.
---
Resolution: Fixed

> XSS Vulnerability in Variable endpoint
> --
>
> Key: AIRFLOW-1617
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1617
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: webserver
>Affects Versions: 1.8.2
>Reporter: Bolke de Bruin
>Priority: Critical
>  Labels: security
> Fix For: 1.9.0
>
>
> Variable view has an XSS vulnerability when the Variable template does not 
> exist. The input is returned to the user as is, without escaping.
> Original report by Seth Long. CVE is pending
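A minimal sketch of the missing step: escape user input before echoing it back. It uses Python 3's `html.escape`; the helper `missing_variable_message` and its message text are hypothetical, not the actual Airflow view code or the fix that closed this issue. In a Flask/Jinja2 application the usual safeguards are template autoescaping or `markupsafe.escape`.

```python
from html import escape


def missing_variable_message(key):
    """Hypothetical helper for the 'variable does not exist' response.

    The user-supplied key is HTML-escaped before it is echoed back, so markup
    such as <script> is displayed as text instead of being executed.
    """
    return "Variable {} does not exist.".format(escape(key, quote=True))


if __name__ == "__main__":
    print(missing_variable_message('<script>alert("x")</script>'))
```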





[jira] [Reopened] (AIRFLOW-1617) XSS Vulnerability in Variable endpoint

2018-01-15 Thread Bolke de Bruin (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bolke de Bruin reopened AIRFLOW-1617:
-

> XSS Vulnerability in Variable endpoint
> --
>
> Key: AIRFLOW-1617
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1617
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: webserver
>Affects Versions: 1.8.2
>Reporter: Bolke de Bruin
>Priority: Critical
>  Labels: security
> Fix For: 1.9.0
>
>
> Variable view has an XSS vulnerability when the Variable template does not 
> exist. The input is returned to the user as is, without escaping.
> Original report by Seth Long. CVE is pending





[jira] [Updated] (AIRFLOW-1617) XSS Vulnerability in Variable endpoint

2018-01-15 Thread Bolke de Bruin (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bolke de Bruin updated AIRFLOW-1617:

Fix Version/s: (was: 1.10.0)
   1.9.0

> XSS Vulnerability in Variable endpoint
> --
>
> Key: AIRFLOW-1617
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1617
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: webserver
>Affects Versions: 1.8.2
>Reporter: Bolke de Bruin
>Priority: Critical
>  Labels: security
> Fix For: 1.9.0
>
>
> Variable view has an XSS vulnerability when the Variable template does not 
> exist. The input is returned to the user as is, without escaping.
> Original report by Seth Long. CVE is pending





[jira] [Closed] (AIRFLOW-1617) XSS Vulnerability in Variable endpoint

2018-01-15 Thread Bolke de Bruin (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bolke de Bruin closed AIRFLOW-1617.
---
Resolution: Fixed

> XSS Vulnerability in Variable endpoint
> --
>
> Key: AIRFLOW-1617
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1617
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: webserver
>Affects Versions: 1.8.2
>Reporter: Bolke de Bruin
>Priority: Critical
>  Labels: security
> Fix For: 1.10.0
>
>
> Variable view has an XSS vulnerability when the Variable template does not 
> exist. The input is returned to the user as is, without escaping.
> Original report by Seth Long. CVE is pending





[jira] [Closed] (AIRFLOW-894) Trigger Rules not functioning

2018-01-15 Thread Bolke de Bruin (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-894?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bolke de Bruin closed AIRFLOW-894.
--
Resolution: Fixed

Outdated

> Trigger Rules not functioning
> -
>
> Key: AIRFLOW-894
> URL: https://issues.apache.org/jira/browse/AIRFLOW-894
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: dependencies
>Affects Versions: Airflow 1.7.1.3
>Reporter: Matt Inwood
>Priority: Blocker
> Attachments: airflow_error.PNG
>
>
> The code below fails to schedule the join task. This happens with both the 
> all_done and one_success trigger rules. It seems to occur only when tasks are 
> generated dynamically.
> from airflow import DAG
> from airflow.operators import PythonOperator, BranchPythonOperator, DummyOperator
> from datetime import datetime, timedelta
> from slackclient import SlackClient
>
> default_args = {
>     'owner': 'analytics',
>     'depends_on_past': False,
>     # 'start_date': sixty_days_ago,
>     'start_date': datetime(2017, 2, 22),
>     'retries': 0,
>     # 'retry_delay': timedelta(seconds=30),
> }
>
> dag = DAG(
>     'Valet_Data',
>     default_args=default_args,
>     schedule_interval='*/5 * * * *',
>     dagrun_timeout=timedelta(seconds=60))
>
> def valet_function(locdata, ds, **kwargs):
>     if locdata == 'D':
>         # Deliberate failure for task D: intentionalFail is undefined (NameError).
>         print(intentionalFail)
>
> join = DummyOperator(
>     task_id='join',
>     trigger_rule='all_done',
>     dag=dag
> )
>
> list = ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'Z']
> for l in list:
>     # One PythonOperator per location, generated dynamically.
>     task = PythonOperator(
>         task_id='{0}_PANTS'.format(l),
>         provide_context=True,
>         python_callable=valet_function,
>         op_kwargs={'locdata': l},
>         # on_failure_callback=on_failure,
>         # on_success_callback=on_success,
>         dag=dag,
>     )
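The report expects `join` to run once its upstream tasks have finished, even though task D fails. The sketch below models only the two trigger rules mentioned, using plain strings for task states. It is a simplified illustration, not Airflow's own dependency-checking code. The set of "done" states it uses is an assumption.

```python
# States treated as "finished" in this simplified model (an assumption).
DONE_STATES = {"success", "failed", "upstream_failed", "skipped"}


def trigger_rule_met(rule, upstream_states):
    """Return True if a task with `rule` may run, given its upstream states."""
    if rule == "all_done":
        # Every upstream task has finished, whatever its outcome.
        return all(state in DONE_STATES for state in upstream_states)
    if rule == "one_success":
        # At least one upstream task succeeded; others may still be running.
        return any(state == "success" for state in upstream_states)
    raise ValueError("unsupported trigger rule: {}".format(rule))


# Eleven upstream tasks as in the report, with task D failed.
states = ["success"] * 10 + ["failed"]
print(trigger_rule_met("all_done", states))                  # True
print(trigger_rule_met("one_success", states))               # True
print(trigger_rule_met("all_done", ["success", "running"]))  # False
```

Under this model both rules should let `join` run once D has failed and the other tasks have succeeded.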





[jira] [Closed] (AIRFLOW-1103) How we can check our developer programmes are pick which environment variables

2018-01-15 Thread Bolke de Bruin (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bolke de Bruin closed AIRFLOW-1103.
---
Resolution: Invalid

Please ask this on the mailing list.

> How we can check our developer programmes are pick which environment variables
> --
>
> Key: AIRFLOW-1103
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1103
> Project: Apache Airflow
>  Issue Type: Task
>Reporter: udhay
>Priority: Blocker
>
> Hi,
> I have set environment variables in .bashrc and .profile, but they are not 
> picked up by programs run from Airflow.
> What command can I use to check which environment variables Airflow is 
> picking up? I tried:
> ps eww 22559
> Can you guide me on this command, or suggest another tool to find which 
> environment variables are being used?
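On Linux, both `ps eww <pid>` and `/proc/<pid>/environ` show the environment a running process was started with. Changes the process makes to its own environment afterwards may not appear there. Also note that `.bashrc` and `.profile` are read by interactive and login shells. An Airflow scheduler or worker started some other way may therefore never see variables set in those files. Below is a minimal sketch that reads `/proc/<pid>/environ`. It assumes a Linux host, and the function names are illustrative.

```python
def parse_environ(raw: bytes) -> dict:
    """Parse the NUL-separated KEY=VALUE records of /proc/<pid>/environ."""
    env = {}
    for record in raw.split(b"\0"):
        if not record:
            continue  # skip the empty piece after the trailing NUL
        key, sep, value = record.partition(b"=")
        if sep:
            env[key.decode(errors="replace")] = value.decode(errors="replace")
    return env


def read_process_environ(pid: int) -> dict:
    """Return the start-up environment of a running process (Linux only).

    Reading another user's process usually requires that user or root.
    """
    with open("/proc/{}/environ".format(pid), "rb") as fh:
        return parse_environ(fh.read())
```

For example, `read_process_environ(22559)` would return the variables the process with PID 22559 (the one inspected with `ps eww` above) was started with.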





[jira] [Closed] (AIRFLOW-1693) airflow initdb throws errors

2018-01-15 Thread Bolke de Bruin (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1693?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bolke de Bruin closed AIRFLOW-1693.
---
Resolution: Invalid

> airflow initdb throws errors
> 
>
> Key: AIRFLOW-1693
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1693
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: configuration, db
>Affects Versions: 1.8.0
> Environment: Mac OS, Python 3.6
>Reporter: Gauri K
>Assignee: Mark Grover
>Priority: Blocker
>  Labels: newbie
>
> I am having installation problems. The output of `airflow initdb` is:
> Traceback (most recent call last):
>   File "/Users/gauripradeep/ENVS/scheduler/bin/airflow", line 17, in <module>
>     from airflow import configuration
>   File "/Users/gauripradeep/ENVS/scheduler/lib/python3.6/site-packages/airflow/__init__.py", line 30, in <module>
>     from airflow import settings
>   File "/Users/gauripradeep/ENVS/scheduler/lib/python3.6/site-packages/airflow/settings.py", line 159, in <module>
>     configure_orm()
>   File "/Users/gauripradeep/ENVS/scheduler/lib/python3.6/site-packages/airflow/settings.py", line 147, in configure_orm
>     engine = create_engine(SQL_ALCHEMY_CONN, **engine_args)
>   File "/Users/gauripradeep/ENVS/scheduler/lib/python3.6/site-packages/sqlalchemy/engine/__init__.py", line 391, in create_engine
>     return strategy.create(*args, **kwargs)
>   File "/Users/gauripradeep/ENVS/scheduler/lib/python3.6/site-packages/sqlalchemy/engine/strategies.py", line 80, in create
>     dbapi = dialect_cls.dbapi(**dbapi_args)
>   File "/Users/gauripradeep/ENVS/scheduler/lib/python3.6/site-packages/sqlalchemy/dialects/mysql/mysqldb.py", line 110, in dbapi
>     return __import__('MySQLdb')
>   File "/Users/gauripradeep/ENVS/scheduler/lib/python3.6/site-packages/MySQLdb/__init__.py", line 19, in <module>
>     import _mysql
> ImportError: dlopen(/Users/gauripradeep/ENVS/scheduler/lib/python3.6/site-packages/_mysql.cpython-36m-darwin.so, 2): Library not loaded: /usr/local/opt/mysql/lib/libmysqlclient.20.dylib
>   Referenced from: /Users/gauripradeep/ENVS/scheduler/lib/python3.6/site-packages/_mysql.cpython-36m-darwin.so
>   Reason: image not found
> Other commands I ran to install dependencies:
> pip3 install mysqlclient
> pip3 install mysql-connector-python-rf
> pip3 install airflow[mysql]
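The traceback shows that the failure happens before Airflow touches the database. SQLAlchemy's mysqldb dialect imports `MySQLdb`, and that module's compiled `_mysql` extension cannot load `/usr/local/opt/mysql/lib/libmysqlclient.20.dylib`. Below is a small sketch for checking the driver outside Airflow, assuming it is run in the same virtualenv. `check_import` is an illustrative helper. If the import fails here too, the problem lies in the mysqlclient/MySQL client library installation rather than in Airflow.

```python
import importlib


def check_import(module_name: str):
    """Try to import a module and return (ok, message)."""
    try:
        importlib.import_module(module_name)
    except ImportError as exc:  # also raised when a shared library fails to load
        return False, "{}: {}".format(module_name, exc)
    return True, "{}: OK".format(module_name)


if __name__ == "__main__":
    # MySQLdb is the module the sqlalchemy mysqldb dialect imports (see traceback).
    print(check_import("MySQLdb")[1])
```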





[jira] [Updated] (AIRFLOW-1927) TaskInstance should also convert naive dates

2018-01-15 Thread Bolke de Bruin (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bolke de Bruin updated AIRFLOW-1927:

Fix Version/s: 1.10.0

> TaskInstance should also convert naive dates
> 
>
> Key: AIRFLOW-1927
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1927
> Project: Apache Airflow
>  Issue Type: Bug
>Reporter: Bolke de Bruin
>Assignee: Sumit Maheshwari
>Priority: Critical
> Fix For: 1.10.0, 2.0.0
>
>
> TaskInstances can no longer be saved without time zone information. Airflow 
> itself handles this correctly, but some users instantiate TaskInstances 
> themselves, so naive dates should be converted the same way as for DAGs.
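Below is a minimal, standard-library-only sketch of converting a naive date when the object is constructed. It assumes naive values are interpreted as UTC. Airflow's own conversion may use its configured default time zone instead. `make_aware` and `TaskInstanceSketch` are illustrative stand-ins, not the real `TaskInstance` code.

```python
from datetime import datetime, timezone


def make_aware(value: datetime, default_tz=timezone.utc) -> datetime:
    """Attach default_tz to a naive datetime; return aware values unchanged.

    Assumption for this sketch: naive datetimes are meant to be in UTC.
    """
    if value.tzinfo is None or value.tzinfo.utcoffset(value) is None:
        return value.replace(tzinfo=default_tz)
    return value


class TaskInstanceSketch:
    """Hypothetical stand-in for TaskInstance that normalizes its date."""

    def __init__(self, task_id: str, execution_date: datetime):
        self.task_id = task_id
        # Convert on construction so a naive date never reaches the database.
        self.execution_date = make_aware(execution_date)
```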





[jira] [Closed] (AIRFLOW-2005) Paramiko is dependency for sshoperator

2018-01-15 Thread Bolke de Bruin (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-2005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bolke de Bruin closed AIRFLOW-2005.
---
Resolution: Fixed

> Paramiko is dependency for sshoperator
> --
>
> Key: AIRFLOW-2005
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2005
> Project: Apache Airflow
>  Issue Type: Bug
>Affects Versions: 1.9.0
>Reporter: Bolke de Bruin
>Priority: Major
>






[jira] [Created] (AIRFLOW-2005) Paramiko is dependency for sshoperator

2018-01-15 Thread Bolke de Bruin (JIRA)
Bolke de Bruin created AIRFLOW-2005:
---

 Summary: Paramiko is dependency for sshoperator
 Key: AIRFLOW-2005
 URL: https://issues.apache.org/jira/browse/AIRFLOW-2005
 Project: Apache Airflow
  Issue Type: Bug
Affects Versions: 1.9.0
Reporter: Bolke de Bruin








[jira] [Created] (AIRFLOW-2004) flask.login does not contain flash

2018-01-15 Thread Bolke de Bruin (JIRA)
Bolke de Bruin created AIRFLOW-2004:
---

 Summary: flask.login does not contain flash
 Key: AIRFLOW-2004
 URL: https://issues.apache.org/jira/browse/AIRFLOW-2004
 Project: Apache Airflow
  Issue Type: Bug
Reporter: Bolke de Bruin








[jira] [Updated] (AIRFLOW-2003) Use flask caching instead of flask cache

2018-01-15 Thread Bolke de Bruin (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-2003?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bolke de Bruin updated AIRFLOW-2003:

Description: Flask cache has been unmaintained for over 3 years. 
Flask-Caching is the community-maintained version.

> Use flask caching instead of flask cache 
> -
>
> Key: AIRFLOW-2003
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2003
> Project: Apache Airflow
>  Issue Type: Improvement
>Reporter: Bolke de Bruin
>Priority: Major
>
> Flask cache has been unmaintained for over 3 years. Flask-Caching is the 
> community-maintained version.





[jira] [Created] (AIRFLOW-2003) Use flask caching instread

2018-01-15 Thread Bolke de Bruin (JIRA)
Bolke de Bruin created AIRFLOW-2003:
---

 Summary: Use flask caching instread
 Key: AIRFLOW-2003
 URL: https://issues.apache.org/jira/browse/AIRFLOW-2003
 Project: Apache Airflow
  Issue Type: Improvement
Reporter: Bolke de Bruin








[jira] [Updated] (AIRFLOW-2003) Use flask caching instead of flask cache

2018-01-15 Thread Bolke de Bruin (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-2003?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bolke de Bruin updated AIRFLOW-2003:

Summary: Use flask caching instead of flask cache   (was: Use flask caching 
instread)

> Use flask caching instead of flask cache 
> -
>
> Key: AIRFLOW-2003
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2003
> Project: Apache Airflow
>  Issue Type: Improvement
>Reporter: Bolke de Bruin
>Priority: Major
>






[jira] [Created] (AIRFLOW-2001) Make sensors relinquish their execution slots

2018-01-15 Thread Yati (JIRA)
Yati created AIRFLOW-2001:
-

 Summary: Make sensors relinquish their execution slots
 Key: AIRFLOW-2001
 URL: https://issues.apache.org/jira/browse/AIRFLOW-2001
 Project: Apache Airflow
  Issue Type: Bug
  Components: db, scheduler
Reporter: Yati
Assignee: Yati


A sensor task instance should not take up an execution slot for the entirety of 
its lifetime (as is currently the case). For the reasons outlined below, it 
would be better if the scheduler preempted sensor execution, parking the sensor 
away from its slot until the next poll.

Some sensors wait for a condition that is affected only by an external party 
(e.g., certain rows in a table being materialized by external means). By 
external, I mean external to the Airflow installation in question, such that 
the producing entity itself does not need an execution slot in an Airflow pool. 
If all sensors and their dependencies were of this nature, there would be no 
issue. Unfortunately, many real-world DAGs have sensor dependencies on results 
produced by another task, typically in some other DAG, but scheduled by the 
same Airflow scheduler.

Consider a simple example (arrow direction represents "must happen before", 
just like in Airflow): DAG1(a >> b) and DAG2(c:sensor(DAG1.b) >> d). In other 
words, the opening task c of the second DAG has a sensor dependency on the 
ending task b of the first DAG. Imagine we have a single pool with 10 execution 
slots, and somehow task instances for c fill up the pool while the 
corresponding task instances of DAG1.b have not had a chance to execute (in the 
real world this happens because of, say, backfills or reprocessing after 
clearing those sensor instances and their upstream tasks). This is a deadlock: 
no progress can be made, because the sensors fill the pool while waiting on 
tasks that themselves will never get a chance to run. This problem has been 
[acknowledged here|https://cwiki.apache.org/confluence/display/AIRFLOW/Common+Pitfalls].
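The deadlock can be reproduced with a toy tick-based model. All of the following are illustrative assumptions, not Airflow's scheduler:

- There is one pool, and every task needs one slot.
- A producer `b_i` finishes in the tick it gets a slot.
- Each sensor `c_i` waits for `b_i` and keeps its slot until then.
- The sensors are queued ahead of the producers, as after clearing them for a backfill.

`run_blocking` is a hypothetical name.

```python
def run_blocking(pool_size, n_sensors, n_producers, max_ticks=100):
    """Toy scheduler in which a sensor holds its slot until its condition holds.

    Returns the tick at which all work has finished, or None if the model
    detects that no further progress is possible (deadlock).
    """
    # Sensors are queued first, e.g. after being cleared for a backfill.
    queue = ["c%d" % i for i in range(n_sensors)] + ["b%d" % i for i in range(n_producers)]
    running, done = [], set()
    for tick in range(1, max_ticks + 1):
        # Fill free slots in queue order.
        while queue and len(running) < pool_size:
            running.append(queue.pop(0))
        finished = []
        for task in running:
            if task.startswith("b"):
                finished.append(task)  # a producer completes in one tick
            elif "b" + task[1:] in done:
                finished.append(task)  # the sensor's producer has completed
        if running and not finished:
            # Every slot is held by a sensor whose producer is still queued.
            return None
        for task in finished:
            running.remove(task)
            done.add(task)
        if not queue and not running:
            return tick
    return None
```

With 10 slots and 10 sensor/producer pairs, the sensors occupy the whole pool and the model reports no progress. A single extra slot is enough for it to finish.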

One way (suggested by Fokko) to solve this is to always run sensors in their 
own pool, and to be careful with the concurrency settings of sensor tasks. This 
is what a lot of users do now, but there are better solutions. Since all the 
sensor interface allows for is a poll, we can, after each poll, "park" the 
sensor and yield its execution slot to other tasks. In the above scenario, the 
pool would not fill up with sensor tasks: they would be polled, found to be 
still unfulfilled, and then parked, giving other tasks a chance to run.
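The parking idea can be illustrated with a toy tick-based model. It makes the following simplifying assumptions:

- There is one pool, and every task needs one slot per tick.
- A producer `b_i` finishes in the tick it runs.
- Each sensor `c_i` waits for `b_i`.
- The sensors are queued ahead of the producers.

After a failed poll, the sensor goes to the back of the queue, so its slot is free on the next tick. `run_parking` is a hypothetical name, and this is not how Airflow's scheduler is implemented.

```python
def run_parking(pool_size, n_sensors, n_producers, max_ticks=100):
    """Toy scheduler in which a sensor gives up its slot after each poll.

    Returns the tick at which all work has finished, or None if it does not
    finish within max_ticks (e.g. a sensor waits for a producer that never runs).
    """
    # Sensors are queued first, so without parking they would fill the pool.
    queue = ["c%d" % i for i in range(n_sensors)] + ["b%d" % i for i in range(n_producers)]
    done = set()
    for tick in range(1, max_ticks + 1):
        # Run at most pool_size tasks in this tick.
        batch, queue = queue[:pool_size], queue[pool_size:]
        for task in batch:
            if task.startswith("b") or "b" + task[1:] in done:
                done.add(task)      # producer ran, or the sensor's condition held
            else:
                queue.append(task)  # poll failed: park the sensor, freeing the slot
        if not queue:
            return tick
    return None
```

With a 10-slot pool and 10 sensor/producer pairs, the model finishes in three ticks. First the sensors poll and park, then the producers run, and finally the sensors succeed.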

This would likely require some changes to the DB and, of course, to the scheduler.


