[jira] [Commented] (AIRFLOW-1055) airflow/jobs.py:create_dag_run() exception for @once dag when catchup = False
[ https://issues.apache.org/jira/browse/AIRFLOW-1055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16326320#comment-16326320 ]

Muhammad Ahmmad commented on AIRFLOW-1055:
--

[~bolke], I was wondering where and when the fix for this issue was submitted. I'm working on a fix for this issue; indeed, it happens for @once DAGs when catchup is enabled. The fix is simple, but it affects AIRFLOW-1013, which needs more time to fix.

> airflow/jobs.py:create_dag_run() exception for @once dag when catchup = False
> ---
>
> Key: AIRFLOW-1055
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1055
> Project: Apache Airflow
> Issue Type: Bug
> Affects Versions: Airflow 1.8
> Reporter: Siddharth Anand
> Assignee: Muhammad Ahmmad
> Priority: Critical
> Labels: dagrun, once, scheduler, sla
> Fix For: 1.9.0
>
>
> Getting the following exception:
> {noformat}
> [2017-03-19 20:16:25,786] {jobs.py:354} DagFileProcessor2638 ERROR - Got an exception! Propagating...
> Traceback (most recent call last):
>   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/jobs.py", line 346, in helper
>     pickle_dags)
>   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in wrapper
>     result = func(*args, **kwargs)
>   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/jobs.py", line 1581, in process_file
>     self._process_dags(dagbag, dags, ti_keys_to_schedule)
>   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/jobs.py", line 1175, in _process_dags
>     self.manage_slas(dag)
>   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in wrapper
>     result = func(*args, **kwargs)
>   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/jobs.py", line 595, in manage_slas
>     while dttm < datetime.now():
> TypeError: can't compare datetime.datetime to NoneType
> {noformat}
> The exception is raised in airflow/jobs.py:manage_slas():
> https://github.com/apache/incubator-airflow/blob/v1-8-stable/airflow/jobs.py#L595
> {code}
> ts = datetime.now()
> SlaMiss = models.SlaMiss
> for ti in max_tis:
>     task = dag.get_task(ti.task_id)
>     dttm = ti.execution_date
>     if task.sla:
>         dttm = dag.following_schedule(dttm)
> >>>     while dttm < datetime.now():   <<< here
>             following_schedule = dag.following_schedule(dttm)
>             if following_schedule + task.sla < datetime.now():
>                 session.merge(models.SlaMiss(
>                     task_id=ti.task_id,
> {code}
> It seems that dag.following_schedule() returns None for an @once DAG?
> Here's how the DAG is defined:
> {code}
> import datetime as dt
> import airflow
> from airflow.models import DAG
> from airflow.operators.dummy_operator import DummyOperator
> from datetime import datetime, timedelta
>
> def sla_alert_func(dag, task_list, blocking_task_list, slas, blocking_tis):
>     print('Executing SLA miss callback')
>
> now = datetime.now()
> now_to_the_hour = now.replace(hour=now.time().hour, minute=0, second=0, microsecond=0)
> START_DATE = now_to_the_hour + timedelta(hours=-3)
> DAG_NAME = 'manage_sla_once_dag'
>
> default_args = {
>     'owner': 'sanand',
>     'depends_on_past': False,
>     'start_date': START_DATE,
>     'sla': timedelta(hours=2)
> }
>
> dag = DAG(
>     dag_id = 'manage_sla_once_dag',
>     default_args = default_args,
>     catchup = False,
>     schedule_interval = '@once',
>     sla_miss_callback = sla_alert_func
> )
>
> task1 = DummyOperator(task_id='task1', dag=dag)
> {code}
> This works if catchup = True.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
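The reporter's suspicion can be illustrated with a small Python 3 sketch. It assumes (the report only asks the question) that `'@once'` normalizes to "no recurring interval", so a simplified `following_schedule()` has nothing to add and returns `None`; comparing that `None` with `datetime.now()` then raises the same kind of `TypeError` seen in the traceback (the message text differs between Python 2.7 and 3). `normalize_schedule_interval` and this two-argument `following_schedule` are hypothetical stand-ins, not Airflow's `DAG.following_schedule()`.

```python
from datetime import datetime, timedelta
from typing import Optional, Union

# Hypothetical, simplified model of a DAG's schedule_interval. Only '@once'
# and timedelta intervals are modelled; cron strings are out of scope here.
ScheduleInterval = Union[str, timedelta]


def normalize_schedule_interval(schedule_interval: ScheduleInterval) -> Optional[timedelta]:
    """Assumption: '@once' has no recurring interval, so it normalizes to None."""
    if schedule_interval == '@once':
        return None
    if isinstance(schedule_interval, timedelta):
        return schedule_interval
    raise ValueError(f"unsupported schedule_interval in this sketch: {schedule_interval!r}")


def following_schedule(schedule_interval: ScheduleInterval, dttm: datetime) -> Optional[datetime]:
    """Return the next scheduled date after dttm, or None when there is none."""
    interval = normalize_schedule_interval(schedule_interval)
    if interval is None:
        return None  # a one-shot DAG has no "following" run
    return dttm + interval


execution_date = datetime(2017, 3, 19, 17, 0)
print(following_schedule(timedelta(hours=1), execution_date))  # 2017-03-19 18:00:00
dttm = following_schedule('@once', execution_date)
print(dttm)  # None

# The quoted manage_slas() loop then evaluates `dttm < datetime.now()`:
try:
    dttm < datetime.now()
except TypeError as exc:
    print("TypeError:", exc)
```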
[jira] [Assigned] (AIRFLOW-867) Tons of unit tests are ignored
[ https://issues.apache.org/jira/browse/AIRFLOW-867?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Muhammad Ahmmad reassigned AIRFLOW-867:
---

Assignee: Muhammad Ahmmad  (was: George Sakkis)

> Tons of unit tests are ignored
> --
>
> Key: AIRFLOW-867
> URL: https://issues.apache.org/jira/browse/AIRFLOW-867
> Project: Apache Airflow
> Issue Type: Bug
> Components: tests
> Reporter: George Sakkis
> Assignee: Muhammad Ahmmad
>
> I was poking around in tests and found out that lots of tests are not discovered by nosetests:
> {noformat}
> $ nosetests -q --collect-only
> --
> Ran 254 tests in 0.948s
> $ grep -R 'def test' tests/ | wc -l
> 360
> {noformat}
> Initially I thought it might be related to not having installed all extra dependencies, but it turns out that nosetests apparently expects the related modules to be imported explicitly instead of discovering them automatically (as py.test does). For example, after adding {{from .ti_deps.deps.runnable_exec_date_dep import *}} to {{tests/__init__.py}} it finds 260 tests, while with all imports in that module commented out it finds only 15!
> h4. Possible options
> * Quick fix: Add the necessary missing "import *" lines to discover all current tests.
> * Better fix: Rename all test modules to start with "test_".
> -Move from nosetests to py.test and get rid of the ugly error-prone 'import *' hack.-

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Assigned] (AIRFLOW-1055) airflow/jobs.py:create_dag_run() exception for @once dag when catchup = False
[ https://issues.apache.org/jira/browse/AIRFLOW-1055?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Muhammad Ahmmad reassigned AIRFLOW-1055:

Assignee: Muhammad Ahmmad  (was: Ruslan Dautkhanov)
[jira] [Assigned] (AIRFLOW-1013) airflow/jobs.py:manage_slas() exception for @once dag
[ https://issues.apache.org/jira/browse/AIRFLOW-1013?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Muhammad Ahmmad reassigned AIRFLOW-1013:

Assignee: Muhammad Ahmmad

> airflow/jobs.py:manage_slas() exception for @once dag
> ---
>
> Key: AIRFLOW-1013
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1013
> Project: Apache Airflow
> Issue Type: Bug
> Affects Versions: Airflow 1.8, 1.8.1, 1.8.0, 1.8.2
> Reporter: Ruslan Dautkhanov
> Assignee: Muhammad Ahmmad
> Priority: Blocker
> Labels: dagrun, once, scheduler, sla
> Fix For: 1.9.0
>
>
> Getting the following exception:
> {noformat}
> [2017-03-19 20:16:25,786] {jobs.py:354} DagFileProcessor2638 ERROR - Got an exception! Propagating...
> Traceback (most recent call last):
>   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/jobs.py", line 346, in helper
>     pickle_dags)
>   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in wrapper
>     result = func(*args, **kwargs)
>   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/jobs.py", line 1581, in process_file
>     self._process_dags(dagbag, dags, ti_keys_to_schedule)
>   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/jobs.py", line 1175, in _process_dags
>     self.manage_slas(dag)
>   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in wrapper
>     result = func(*args, **kwargs)
>   File "/opt/cloudera/parcels/Anaconda/lib/python2.7/site-packages/airflow/jobs.py", line 595, in manage_slas
>     while dttm < datetime.now():
> TypeError: can't compare datetime.datetime to NoneType
> {noformat}
> The exception is raised in airflow/jobs.py:manage_slas():
> https://github.com/apache/incubator-airflow/blob/v1-8-stable/airflow/jobs.py#L595
> {code}
> ts = datetime.now()
> SlaMiss = models.SlaMiss
> for ti in max_tis:
>     task = dag.get_task(ti.task_id)
>     dttm = ti.execution_date
>     if task.sla:
>         dttm = dag.following_schedule(dttm)
> >>>     while dttm < datetime.now():   <<< here
>             following_schedule = dag.following_schedule(dttm)
>             if following_schedule + task.sla < datetime.now():
>                 session.merge(models.SlaMiss(
>                     task_id=ti.task_id,
> {code}
> It seems that dag.following_schedule() returns None for an @once DAG?
> Here's how the DAG is defined:
> {code}
> main_dag = DAG(
>     dag_id = 'DISCOVER-Oracle-Load',
>     default_args = default_args,
>     user_defined_macros = dag_macros,
>     start_date = datetime.now(),
>     catchup = False,
>     schedule_interval = '@once',
>     concurrency = 2,
>     max_active_runs = 1,
>     dagrun_timeout = timedelta(days=4),
> )
> {code}
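A minimal Python 3 sketch of a None-safe version of the quoted `manage_slas()` loop, under stated assumptions: `sla_miss_dates`, `hourly` and `once` are hypothetical helpers; the schedule is passed as a callable that returns the next date or `None` (as `dag.following_schedule()` appears to do for @once DAGs); the quoted snippet is cut off before `dttm` is advanced, so the sketch assumes it moves to the following schedule on each iteration; and it returns the missed dates instead of merging `models.SlaMiss` rows. It is not the fix that was committed to Airflow.

```python
from datetime import datetime, timedelta
from typing import Callable, List, Optional

# Hypothetical signature: "next run after dttm", or None if there is none.
FollowingSchedule = Callable[[datetime], Optional[datetime]]


def sla_miss_dates(execution_date: datetime, sla: timedelta, now: datetime,
                   following_schedule: FollowingSchedule) -> List[datetime]:
    """Mirror the quoted manage_slas() loop, but stop instead of comparing
    None with a datetime when there is no following schedule."""
    misses: List[datetime] = []
    dttm = following_schedule(execution_date)
    while dttm is not None and dttm < now:
        nxt = following_schedule(dttm)
        if nxt is None:
            break
        if nxt + sla < now:
            misses.append(dttm)  # the quoted code records an SlaMiss here
        dttm = nxt  # assumed: advance to the following schedule
    return misses


def hourly(dttm: datetime) -> Optional[datetime]:
    return dttm + timedelta(hours=1)


def once(dttm: datetime) -> Optional[datetime]:
    return None  # an @once DAG has no following run


now = datetime(2017, 3, 19, 20, 16)
start = datetime(2017, 3, 19, 17, 0)
print(sla_miss_dates(start, timedelta(minutes=30), now, hourly))  # [datetime.datetime(2017, 3, 19, 18, 0)]
print(sla_miss_dates(start, timedelta(minutes=30), now, once))    # []
```

With this guard an @once DAG simply gets no SLA checks. Whether its single run should instead be checked against its SLA is a separate decision that the sketch leaves open.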