[jira] [Commented] (AIRFLOW-1055) airflow/jobs.py:create_dag_run() exception for @once dag when catchup = False

2018-01-15 Thread Muhammad Ahmmad (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-1055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326320#comment-16326320
 ] 

Muhammad Ahmmad commented on AIRFLOW-1055:
--

[~bolke], I was wondering where/when the fix for this issue was submitted?

I'm working on a fix for this issue, indeed it happens for once dags with 
catchup is enabled. The fix is simple; but it affects  AIRFLOW-1013, which 
needs more time to be fixed.

> airflow/jobs.py:create_dag_run() exception for @once dag when catchup = False
> -
>
> Key: AIRFLOW-1055
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1055
> Project: Apache Airflow
>  Issue Type: Bug
>Affects Versions: Airflow 1.8
>Reporter: Siddharth Anand
>Assignee: Muhammad Ahmmad
>Priority: Critical
>  Labels: dagrun, once, scheduler, sla
> Fix For: 1.9.0
>
>
> Getting following exception 
> {noformat}
> [2017-03-19 20:16:25,786] {jobs.py:354} DagFileProcessor2638 ERROR - Got an 
> exception! Propagating...
> Traceback (most recent call last):
>   File 
> "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/jobs.py", 
> line 346, in helper
> pickle_dags)
>   File 
> "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/utils/db.py",
>  line 53, in wrapper
> result = func(*args, **kwargs)
>   File 
> "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/jobs.py", 
> line 1581, in process_file
> self._process_dags(dagbag, dags, ti_keys_to_schedule)
>   File 
> "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/jobs.py", 
> line 1175, in _process_dags
> self.manage_slas(dag)
>   File 
> "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/utils/db.py",
>  line 53, in wrapper
> result = func(*args, **kwargs)
>   File 
> "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/jobs.py", 
> line 595, in manage_slas
> while dttm < datetime.now():
> TypeError: can't compare datetime.datetime to NoneType
> {noformat}
> Exception is in airflow/jobs.py:manage_slas() :
> https://github.com/apache/incubator-airflow/blob/v1-8-stable/airflow/jobs.py#L595
> {code}
> ts = datetime.now()
> SlaMiss = models.SlaMiss
> for ti in max_tis:
> task = dag.get_task(ti.task_id)
> dttm = ti.execution_date
> if task.sla:
> dttm = dag.following_schedule(dttm)
>   >>>   while dttm < datetime.now():  <<< here
> following_schedule = dag.following_schedule(dttm)
> if following_schedule + task.sla < datetime.now():
> session.merge(models.SlaMiss(
> task_id=ti.task_id,
> {code}
> It seems that dag.following_schedule() returns None for @once dag?
> Here's how dag is defined:
> {code}
> import datetime as dt
> import airflow
> from airflow.models import DAG
> from airflow.operators.dummy_operator import DummyOperator
> from datetime import datetime, timedelta
> def sla_alert_func(dag, task_list, blocking_task_list, slas, blocking_tis):
> print('Executing SLA miss callback')
> now = datetime.now()
> now_to_the_hour = now.replace(hour=now.time().hour, minute=0, second=0, 
> microsecond=0)
> START_DATE = now_to_the_hour + timedelta(hours=-3)
> DAG_NAME = 'manage_sla_once_dag'
> default_args = {
> 'owner': 'sanand',
> 'depends_on_past': False,
> 'start_date': START_DATE,
> 'sla': timedelta(hours=2)
> }
> dag = DAG(
> dag_id = 'manage_sla_once_dag',
> default_args = default_args,
> catchup = False,  
> schedule_interval = '@once',
> sla_miss_callback = sla_alert_func
> )
> task1 = DummyOperator(task_id='task1', dag=dag)
> {code}
> This issue works if  catchup = True. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (AIRFLOW-867) Tons of unit tests are ignored

2017-09-20 Thread Muhammad Ahmmad (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-867?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Muhammad Ahmmad reassigned AIRFLOW-867:
---

Assignee: Muhammad Ahmmad  (was: George Sakkis)

> Tons of unit tests are ignored
> --
>
> Key: AIRFLOW-867
> URL: https://issues.apache.org/jira/browse/AIRFLOW-867
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: tests
>Reporter: George Sakkis
>Assignee: Muhammad Ahmmad
>
> I was poking around in tests and found out that lots of tests are not 
> discovered by nosetests:
> {noformat}
> $ nosetests -q --collect-only 
> --
> Ran 254 tests in 0.948s
> $ grep -R 'def test' tests/ | wc -l
> 360
> {noformat}
> Initially I thought it might be related to not having installed all extra 
> dependencies but it turns out it's because apparently nosetests expects 
> explicit import of the related modules instead of discovering them 
> automatically (like py.test). For example, when adding an {{from 
> .ti_deps.deps.runnable_exec_date_dep import *}} in {{tests/__init__.py}} it 
> finds 260 tests, while when commenting out all imports in this module it 
> finds only 15!
> h4. Possible options
> * Quick fix: Add the necessary missing "import *" to discover all current 
> tests.
> * Better fix: Rename all test modules to start with "test_"
>   -Move from nosetests to py.test and get rid of the ugly error-prone 'import 
> *' hack.-



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (AIRFLOW-1055) airflow/jobs.py:create_dag_run() exception for @once dag when catchup = False

2017-09-08 Thread Muhammad Ahmmad (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1055?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Muhammad Ahmmad reassigned AIRFLOW-1055:


Assignee: Muhammad Ahmmad  (was: Ruslan Dautkhanov)

> airflow/jobs.py:create_dag_run() exception for @once dag when catchup = False
> -
>
> Key: AIRFLOW-1055
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1055
> Project: Apache Airflow
>  Issue Type: Bug
>Affects Versions: Airflow 1.8
>Reporter: Siddharth Anand
>Assignee: Muhammad Ahmmad
>Priority: Critical
>  Labels: dagrun, once, scheduler, sla
> Fix For: 1.9.0
>
>
> Getting following exception 
> {noformat}
> [2017-03-19 20:16:25,786] {jobs.py:354} DagFileProcessor2638 ERROR - Got an 
> exception! Propagating...
> Traceback (most recent call last):
>   File 
> "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/jobs.py", 
> line 346, in helper
> pickle_dags)
>   File 
> "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/utils/db.py",
>  line 53, in wrapper
> result = func(*args, **kwargs)
>   File 
> "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/jobs.py", 
> line 1581, in process_file
> self._process_dags(dagbag, dags, ti_keys_to_schedule)
>   File 
> "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/jobs.py", 
> line 1175, in _process_dags
> self.manage_slas(dag)
>   File 
> "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/utils/db.py",
>  line 53, in wrapper
> result = func(*args, **kwargs)
>   File 
> "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/jobs.py", 
> line 595, in manage_slas
> while dttm < datetime.now():
> TypeError: can't compare datetime.datetime to NoneType
> {noformat}
> Exception is in airflow/jobs.py:manage_slas() :
> https://github.com/apache/incubator-airflow/blob/v1-8-stable/airflow/jobs.py#L595
> {code}
> ts = datetime.now()
> SlaMiss = models.SlaMiss
> for ti in max_tis:
> task = dag.get_task(ti.task_id)
> dttm = ti.execution_date
> if task.sla:
> dttm = dag.following_schedule(dttm)
>   >>>   while dttm < datetime.now():  <<< here
> following_schedule = dag.following_schedule(dttm)
> if following_schedule + task.sla < datetime.now():
> session.merge(models.SlaMiss(
> task_id=ti.task_id,
> {code}
> It seems that dag.following_schedule() returns None for @once dag?
> Here's how dag is defined:
> {code}
> import datetime as dt
> import airflow
> from airflow.models import DAG
> from airflow.operators.dummy_operator import DummyOperator
> from datetime import datetime, timedelta
> def sla_alert_func(dag, task_list, blocking_task_list, slas, blocking_tis):
> print('Executing SLA miss callback')
> now = datetime.now()
> now_to_the_hour = now.replace(hour=now.time().hour, minute=0, second=0, 
> microsecond=0)
> START_DATE = now_to_the_hour + timedelta(hours=-3)
> DAG_NAME = 'manage_sla_once_dag'
> default_args = {
> 'owner': 'sanand',
> 'depends_on_past': False,
> 'start_date': START_DATE,
> 'sla': timedelta(hours=2)
> }
> dag = DAG(
> dag_id = 'manage_sla_once_dag',
> default_args = default_args,
> catchup = False,  
> schedule_interval = '@once',
> sla_miss_callback = sla_alert_func
> )
> task1 = DummyOperator(task_id='task1', dag=dag)
> {code}
> This issue works if  catchup = True. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (AIRFLOW-1013) airflow/jobs.py:manage_slas() exception for @once dag

2017-09-08 Thread Muhammad Ahmmad (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1013?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Muhammad Ahmmad reassigned AIRFLOW-1013:


Assignee: Muhammad Ahmmad

> airflow/jobs.py:manage_slas() exception for @once dag
> -
>
> Key: AIRFLOW-1013
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1013
> Project: Apache Airflow
>  Issue Type: Bug
>Affects Versions: Airflow 1.8, 1.8.1, 1.8.0, 1.8.2
>Reporter: Ruslan Dautkhanov
>Assignee: Muhammad Ahmmad
>Priority: Blocker
>  Labels: dagrun, once, scheduler, sla
> Fix For: 1.9.0
>
>
> Getting following exception 
> {noformat}
> [2017-03-19 20:16:25,786] {jobs.py:354} DagFileProcessor2638 ERROR - Got an 
> exception! Propagating...
> Traceback (most recent call last):
>   File 
> "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/jobs.py", 
> line 346, in helper
> pickle_dags)
>   File 
> "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/utils/db.py",
>  line 53, in wrapper
> result = func(*args, **kwargs)
>   File 
> "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/jobs.py", 
> line 1581, in process_file
> self._process_dags(dagbag, dags, ti_keys_to_schedule)
>   File 
> "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/jobs.py", 
> line 1175, in _process_dags
> self.manage_slas(dag)
>   File 
> "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/utils/db.py",
>  line 53, in wrapper
> result = func(*args, **kwargs)
>   File 
> "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/jobs.py", 
> line 595, in manage_slas
> while dttm < datetime.now():
> TypeError: can't compare datetime.datetime to NoneType
> {noformat}
> Exception is in airflow/jobs.py:manage_slas() :
> https://github.com/apache/incubator-airflow/blob/v1-8-stable/airflow/jobs.py#L595
> {code}
> ts = datetime.now()
> SlaMiss = models.SlaMiss
> for ti in max_tis:
> task = dag.get_task(ti.task_id)
> dttm = ti.execution_date
> if task.sla:
> dttm = dag.following_schedule(dttm)
>   >>>   while dttm < datetime.now():  <<< here
> following_schedule = dag.following_schedule(dttm)
> if following_schedule + task.sla < datetime.now():
> session.merge(models.SlaMiss(
> task_id=ti.task_id,
> {code}
> It seems that dag.following_schedule() returns None for @once dag?
> Here's how dag is defined:
> {code}
> main_dag = DAG(
> dag_id = 'DISCOVER-Oracle-Load',
> default_args   = default_args,   
> user_defined_macros= dag_macros,   
> start_date = datetime.now(), 
> catchup= False,  
> schedule_interval  = '@once',
> concurrency= 2,  
> max_active_runs= 1,  
> dagrun_timeout = timedelta(days=4),  
> )
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)