[jira] [Updated] (AIRFLOW-252) Raise Sqlite exceptions when deleting tasks instance in WebUI
[ https://issues.apache.org/jira/browse/AIRFLOW-252?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chris Riccomini updated AIRFLOW-252:
------------------------------------
    Affects Version/s: 1.7.1.3

> Raise Sqlite exceptions when deleting tasks instance in WebUI
> -------------------------------------------------------------
>
>                 Key: AIRFLOW-252
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-252
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: ui
>    Affects Versions: 1.7.1.3
>         Environment: Linux 0f857c51a5ce 4.0.9-boot2docker #1 SMP Thu Sep 10 20:39:20 UTC 2015 x86_64 x86_64 x86_64 GNU/Linux
>            Reporter: ryanchou
>
> When I try to delete task instances on the Browse -> Task Instances page through the WebUI, it raises a SQLite exception.
> The Airflow version is v1.7.1.3; I haven't seen that version choice in the JIRA issue tracker.
> ```
> File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/dialects/sqlite/base.py", line 607, in process
>     raise TypeError("SQLite DateTime type only accepts Python "
> StatementError: (exceptions.TypeError) SQLite DateTime type only accepts Python datetime and date objects as input. [SQL: u'SELECT task_instance.task_id AS task_instance_task_id, task_in
> ```

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Updated] (AIRFLOW-252) Raise Sqlite exceptions when deleting tasks instance in WebUI
[ https://issues.apache.org/jira/browse/AIRFLOW-252?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ryanchou updated AIRFLOW-252:
-----------------------------
    Description:
When I try to delete task instances on the Browse -> Task Instances page through the WebUI, it raises a SQLite exception.
The Airflow version is v1.7.1.3; I haven't seen that version choice in the JIRA issue tracker.
```
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/dialects/sqlite/base.py", line 607, in process
    raise TypeError("SQLite DateTime type only accepts Python "
StatementError: (exceptions.TypeError) SQLite DateTime type only accepts Python datetime and date objects as input. [SQL: u'SELECT task_instance.task_id AS task_instance_task_id, task_in
```

  was:
When I try to delete task instances on the Browse -> Task Instances page through the WebUI, it raises a SQLite exception.
```
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/dialects/sqlite/base.py", line 607, in process
    raise TypeError("SQLite DateTime type only accepts Python "
StatementError: (exceptions.TypeError) SQLite DateTime type only accepts Python datetime and date objects as input. [SQL: u'SELECT task_instance.task_id AS task_instance_task_id, task_in
```

> Raise Sqlite exceptions when deleting tasks instance in WebUI
> -------------------------------------------------------------
>
>                 Key: AIRFLOW-252
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-252
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: ui
>         Environment: Linux 0f857c51a5ce 4.0.9-boot2docker #1 SMP Thu Sep 10 20:39:20 UTC 2015 x86_64 x86_64 x86_64 GNU/Linux
>            Reporter: ryanchou
>
> When I try to delete task instances on the Browse -> Task Instances page through the WebUI, it raises a SQLite exception.
> The Airflow version is v1.7.1.3; I haven't seen that version choice in the JIRA issue tracker.
> ```
> File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/dialects/sqlite/base.py", line 607, in process
>     raise TypeError("SQLite DateTime type only accepts Python "
> StatementError: (exceptions.TypeError) SQLite DateTime type only accepts Python datetime and date objects as input. [SQL: u'SELECT task_instance.task_id AS task_instance_task_id, task_in
> ```
[jira] [Created] (AIRFLOW-252) Raise Sqlite exceptions when deleting tasks instance in WebUI
ryanchou created AIRFLOW-252:
--------------------------------

             Summary: Raise Sqlite exceptions when deleting tasks instance in WebUI
                 Key: AIRFLOW-252
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-252
             Project: Apache Airflow
          Issue Type: Bug
          Components: ui
    Affects Versions: Airflow 1.7.1.2
         Environment: Linux 0f857c51a5ce 4.0.9-boot2docker #1 SMP Thu Sep 10 20:39:20 UTC 2015 x86_64 x86_64 x86_64 GNU/Linux
            Reporter: ryanchou

When I try to delete task instances on the Browse -> Task Instances page through the WebUI, it raises a SQLite exception.

```
File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/dialects/sqlite/base.py", line 607, in process
    raise TypeError("SQLite DateTime type only accepts Python "
StatementError: (exceptions.TypeError) SQLite DateTime type only accepts Python datetime and date objects as input. [SQL: u'SELECT task_instance.task_id AS task_instance_task_id, task_in
```
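The traceback above is cut off, but the failing check in SQLAlchemy's SQLite dialect is straightforward to model. Here is a minimal sketch of the bind-parameter validation; the function name and structure are illustrative, not SQLAlchemy's actual code:

```python
from datetime import date, datetime


def sqlite_datetime_bind(value):
    """Model of the check that raises in sqlalchemy/dialects/sqlite/base.py.

    SQLite has no native DATETIME storage class, so SQLAlchemy serializes
    datetime values to strings on the way in and rejects anything that is
    not a datetime/date -- e.g. a raw string passed into a filter where a
    datetime was expected, which is what the WebUI delete action appears
    to trigger here.
    """
    if value is None:
        return None
    if not isinstance(value, (datetime, date)):
        raise TypeError("SQLite DateTime type only accepts Python "
                        "datetime and date objects as input.")
    if isinstance(value, datetime):
        return value.isoformat(sep=" ")
    return value.isoformat()
```

Presumably the fix on Airflow's side is to make sure the WebUI action passes real `datetime` objects into the query filter rather than strings.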
[jira] [Updated] (AIRFLOW-252) Raise Sqlite exceptions when deleting tasks instance in WebUI
[ https://issues.apache.org/jira/browse/AIRFLOW-252?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ryanchou updated AIRFLOW-252:
-----------------------------
    Affects Version/s: (was: Airflow 1.7.1.2)

> Raise Sqlite exceptions when deleting tasks instance in WebUI
> -------------------------------------------------------------
>
>                 Key: AIRFLOW-252
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-252
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: ui
>         Environment: Linux 0f857c51a5ce 4.0.9-boot2docker #1 SMP Thu Sep 10 20:39:20 UTC 2015 x86_64 x86_64 x86_64 GNU/Linux
>            Reporter: ryanchou
>
> When I try to delete task instances on the Browse -> Task Instances page through the WebUI, it raises a SQLite exception.
> ```
> File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/dialects/sqlite/base.py", line 607, in process
>     raise TypeError("SQLite DateTime type only accepts Python "
> StatementError: (exceptions.TypeError) SQLite DateTime type only accepts Python datetime and date objects as input. [SQL: u'SELECT task_instance.task_id AS task_instance_task_id, task_in
> ```
incubator-airflow git commit: Add Postmates to Airflow users list
Repository: incubator-airflow
Updated Branches:
  refs/heads/master ea2904610 -> ce362c312

Add Postmates to Airflow users list

Closes #1599 from Syeoryn/master

Add Postmates to Airflow users list

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/ce362c31
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/ce362c31
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/ce362c31

Branch: refs/heads/master
Commit: ce362c312ccb1bace7215d156909f42d7e51898a
Parents: ea29046
Author: Drew Cuthbertson
Authored: Thu Jun 16 19:10:04 2016 -0700
Committer: Dan Davydov
Committed: Thu Jun 16 19:10:04 2016 -0700

----------------------------------------------------------------------
 README.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ce362c31/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 43c8818..f34ec7c 100644
--- a/README.md
+++ b/README.md
@@ -104,6 +104,7 @@ Currently **officially** using Airflow:
 * [Lucid](http://luc.id) [[@jbrownlucid](https://github.com/jbrownlucid) & [@kkourtchikov](https://github.com/kkourtchikov)]
 * [Lyft](https://www.lyft.com/)[[@SaurabhBajaj](https://github.com/SaurabhBajaj)]
 * [Nerdwallet](https://www.nerdwallet.com)
+* [Postmates](http://www.postmates.com) [[@syeoryn](https://github.com/syeoryn)]
 * [Qubole](https://qubole.com) [[@msumit](https://github.com/msumit)]
 * [Sense360](https://github.com/Sense360) [[@kamilmroczek](https://github.com/KamilMroczek)]
 * [Sidecar](https://hello.getsidecar.com/) [[@getsidecar](https://github.com/getsidecar)]
[jira] [Commented] (AIRFLOW-251) Add optional parameter SQL_ALCHEMY_SCHEMA to control schema for metadata repository
[ https://issues.apache.org/jira/browse/AIRFLOW-251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15335105#comment-15335105 ] Ed Parcell commented on AIRFLOW-251: I created a pull request for this issue: https://github.com/apache/incubator-airflow/pull/1600 > Add optional parameter SQL_ALCHEMY_SCHEMA to control schema for metadata > repository > --- > > Key: AIRFLOW-251 > URL: https://issues.apache.org/jira/browse/AIRFLOW-251 > Project: Apache Airflow > Issue Type: Improvement > Components: core >Affects Versions: Airflow 2.0 >Reporter: Ed Parcell >Priority: Minor > > Using SQL Server as a database for metadata, it is preferable to group all > Airflow tables into a separate schema, rather than using dbo. I propose > adding an optional parameter SQL_ALCHEMY_SCHEMA to control this. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
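As a rough illustration of the AIRFLOW-251 proposal, the new knob would likely sit next to `sql_alchemy_conn` in `airflow.cfg`. The option name below comes from the ticket; it is not an existing Airflow 1.7 setting, and the connection string is a made-up example:

```ini
[core]
# Existing setting: connection string for the metadata database.
sql_alchemy_conn = mssql+pymssql://airflow:secret@dbhost/airflowdb

# Proposed (hypothetical) setting: schema to place Airflow's metadata
# tables in, instead of the SQL Server default "dbo".
sql_alchemy_schema = airflow
```

On the SQLAlchemy side this would presumably be threaded through as `MetaData(schema=...)` so every table is created and queried under that schema.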
[jira] [Commented] (AIRFLOW-249) Refactor the SLA mecanism
[ https://issues.apache.org/jira/browse/AIRFLOW-249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15334642#comment-15334642 ]

Chris Riccomini commented on AIRFLOW-249:
-----------------------------------------

This is really awesome analysis. Can you send a PR? (unless I missed it)

> Refactor the SLA mecanism
> -------------------------
>
>                 Key: AIRFLOW-249
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-249
>             Project: Apache Airflow
>          Issue Type: Improvement
>            Reporter: dud
>
> Hello
> I've noticed the SLA feature currently behaves as follows:
> - it doesn't work on DAGs scheduled @once or None, because they have no dag.following_schedule property
> - it keeps endlessly checking for SLA misses without ever considering any end_date. Worse, I noticed that emails are still being sent for runs that will never happen because of end_date
> - it keeps checking recent TIs even if an SLA notification has already been sent for them
> - the SLA logic only fires after following_schedule + sla has elapsed; in other words, one has to wait for the next TI before having a chance of getting any email. Also, the email reports the dag.following_schedule time (I guess because it is close to TI.start_date), but unfortunately that matches neither what the task instances view shows nor the log filename
> - the SLA logic is based on max(TI.execution_date) as the starting point of its checks, which means that for a DAG whose SLA is longer than its schedule period, if half of the TIs run longer than expected it will go unnoticed.
> This could be demonstrated with a DAG like this one:
> {code}
> from airflow import DAG
> from airflow.operators import *
> from datetime import datetime, timedelta
> from time import sleep
>
> default_args = {
>     'owner': 'airflow',
>     'depends_on_past': False,
>     'start_date': datetime(2016, 6, 16, 12, 20),
>     'email': my_email,
>     'sla': timedelta(minutes=2),
> }
>
> dag = DAG('unnoticed_sla', default_args=default_args,
>           schedule_interval=timedelta(minutes=1))
>
> def alternating_sleep(**kwargs):
>     minute = kwargs['execution_date'].strftime("%M")
>     is_odd = int(minute) % 2
>     if is_odd:
>         sleep(300)
>     else:
>         sleep(10)
>     return True
>
> PythonOperator(
>     task_id='sla_miss',
>     python_callable=alternating_sleep,
>     provide_context=True,
>     dag=dag)
> {code}
> I've tried to rework the SLA triggering mechanism by addressing the above points; please [have a look at it|https://github.com/dud225/incubator-airflow/commit/972260354075683a8d55a1c960d839c37e629e7d]
> I made some tests with this patch:
> - the fluctuating DAG shown above no longer makes Airflow skip any SLA event:
> {code}
>  task_id  |    dag_id     |   execution_date    | email_sent |         timestamp          | description | notification_sent
> ----------+---------------+---------------------+------------+----------------------------+-------------+-------------------
>  sla_miss | dag_sla_miss1 | 2016-06-16 15:05:00 | t          | 2016-06-16 15:08:26.058631 |             | t
>  sla_miss | dag_sla_miss1 | 2016-06-16 15:07:00 | t          | 2016-06-16 15:10:06.093253 |             | t
>  sla_miss | dag_sla_miss1 | 2016-06-16 15:09:00 | t          | 2016-06-16 15:12:06.241773 |             | t
> {code}
> - on a normal DAG, the SLA is triggered more quickly:
> {code}
> // start_date = 2016-06-16 15:55:00
> // end_date = 2016-06-16 16:00:00
> // schedule_interval = timedelta(minutes=1)
> // sla = timedelta(minutes=2)
>  task_id  |    dag_id     |   execution_date    | email_sent |         timestamp          | description | notification_sent
> ----------+---------------+---------------------+------------+----------------------------+-------------+-------------------
>  sla_miss | dag_sla_miss1 | 2016-06-16 15:55:00 | t          | 2016-06-16 15:58:11.832299 |             | t
>  sla_miss | dag_sla_miss1 | 2016-06-16 15:56:00 | t          | 2016-06-16 15:59:09.663778 |             | t
>  sla_miss | dag_sla_miss1 | 2016-06-16 15:57:00 | t          | 2016-06-16 16:00:13.651422 |             | t
>  sla_miss | dag_sla_miss1 | 2016-06-16 15:58:00 | t          | 2016-06-16 16:01:08.576399 |             | t
>  sla_miss | dag_sla_miss1 | 2016-06-16 15:59:00 | t          | 2016-06-16 16:02:08.523486 |             | t
>  sla_miss | dag_sla_miss1 | 2016-06-16 16:00:00 | t          | 2016-06-16 16:03:08.538593 |             | t
> (6 rows)
> {code}
> than before (current master branch):
> {code}
> // start_date = 2016-06-16 15:40:00
> // end_date = 2016-06-16 15:45:00
> // schedule_interval = timedelta(minutes=1)
> // sla = timedelta(minutes=2)
>  task_id  |    dag_id     |   execution_date    | email_sent |         timestamp          | description | notification_sent
>
[jira] [Closed] (AIRFLOW-224) Collect orphaned tasks in case of unclean shutdown
[ https://issues.apache.org/jira/browse/AIRFLOW-224?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Riccomini closed AIRFLOW-224. --- Resolution: Fixed Assignee: Bolke de Bruin > Collect orphaned tasks in case of unclean shutdown > -- > > Key: AIRFLOW-224 > URL: https://issues.apache.org/jira/browse/AIRFLOW-224 > Project: Apache Airflow > Issue Type: Improvement > Components: scheduler >Reporter: Bolke de Bruin >Assignee: Bolke de Bruin > Fix For: Airflow 1.8 > > > Tasks in a "scheduled" state can get orphaned if either the executor or the > scheduler is shutdown uncleanly (kill -9). > This should be addressed by making sure the scheduler confirms sending tasks > to the executor and checking whether "scheduled" tasks are still in the queue > of the executor. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (AIRFLOW-173) Create a FileSensor / NFSFileSensor sensor
[ https://issues.apache.org/jira/browse/AIRFLOW-173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Riccomini closed AIRFLOW-173. --- Resolution: Fixed Assignee: Andre Fix Version/s: Airflow 1.8 > Create a FileSensor / NFSFileSensor sensor > -- > > Key: AIRFLOW-173 > URL: https://issues.apache.org/jira/browse/AIRFLOW-173 > Project: Apache Airflow > Issue Type: Improvement >Reporter: Andre >Assignee: Andre >Priority: Minor > Fix For: Airflow 1.8 > > > While HDFS and WebHDFS suit most organisations using Hadoop, for some shops > running MapR-FS, Airflow implementation is simplified by the use of plain > files pointing to MapR's NFS gateways. > A FileSensor and/or a NFSFileSensor would assist the adoption of Airflow > within the MapR customer base, but more importantly, help those who are using > POSIX compliant distributed filesystems that can be mounted on Unix > derivative systems (e.g. as MapR-FS (via NFS), CephFS, GlusterFS, etc). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (AIRFLOW-247) EMR Hook, Operators, Sensor
[ https://issues.apache.org/jira/browse/AIRFLOW-247?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15334586#comment-15334586 ] Chris Riccomini commented on AIRFLOW-247: - [~rfroetscher], totally fine. Go for it. Might want to ping [~artwr], as well, though. > EMR Hook, Operators, Sensor > --- > > Key: AIRFLOW-247 > URL: https://issues.apache.org/jira/browse/AIRFLOW-247 > Project: Apache Airflow > Issue Type: New Feature >Reporter: Rob Froetscher >Assignee: Rob Froetscher >Priority: Minor > > Substory of https://issues.apache.org/jira/browse/AIRFLOW-115. It would be > nice to have an EMR hook and operators. > Hook to generally interact with EMR. > Operators to: > * setup and start a job flow > * add steps to an existing jobflow > A sensor to: > * monitor completion and status of EMR jobs -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (AIRFLOW-239) test_hdfs_sensor, etc not collected by nosetests
[ https://issues.apache.org/jira/browse/AIRFLOW-239?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Riccomini closed AIRFLOW-239. --- Resolution: Fixed Fix Version/s: Airflow 1.8 > test_hdfs_sensor, etc not collected by nosetests > > > Key: AIRFLOW-239 > URL: https://issues.apache.org/jira/browse/AIRFLOW-239 > Project: Apache Airflow > Issue Type: Bug >Reporter: Li Xuanji >Assignee: Li Xuanji > Fix For: Airflow 1.8 > > > Due to the indentation at > https://github.com/apache/incubator-airflow/blob/5963ac8b7b7c0b3470ce6171b1cd928485bc9884/tests/core.py#L1605-L1623 > The methods test_hdfs_sensor and below are treated as internal functions of > test_presto_to_mysql and not collected by nosetests > ``` > ➜ zodiac-airflow git:(master) export AIRFLOW_RUNALL_TESTS=1 > ➜ zodiac-airflow git:(master) nosetests -v --collect-only 2>&1 | grep > test_presto_to_mysql | wc -l >1 > ➜ zodiac-airflow git:(master) nosetests -v --collect-only 2>&1 | grep > test_hdfs_sensor | wc -l >0 > ➜ zodiac-airflow git:(master) > ``` -- This message was sent by Atlassian JIRA (v6.3.4#6332)
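The indentation bug behind AIRFLOW-239 is easy to reproduce in miniature: a test `def` nested one level too deep becomes a local function of the enclosing method, so it never appears as a class attribute for nose (or any collector) to find. A self-contained sketch, with class and method names borrowed from the ticket for illustration:

```python
import inspect


class TestCore(object):
    def test_presto_to_mysql(self):
        pass

        # Indented one level too far: this is a local function created each
        # time test_presto_to_mysql runs, not a method of TestCore.
        def test_hdfs_sensor(self):
            pass


# Collectors find tests by scanning class attributes, so only the
# correctly indented method shows up:
collected = sorted(name for name, _ in inspect.getmembers(TestCore)
                   if name.startswith("test"))
```

Here `collected` contains only `test_presto_to_mysql`, matching the `nosetests --collect-only` output quoted above.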
[jira] [Commented] (AIRFLOW-225) Better units for task duration graph
[ https://issues.apache.org/jira/browse/AIRFLOW-225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15334546#comment-15334546 ] ASF subversion and git services commented on AIRFLOW-225: - Commit ea2904610651c53853d89e8296ab4c71e0926588 in incubator-airflow's branch refs/heads/master from [~criccomini] [ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=ea29046 ] Merge pull request #1595 from sekikn/AIRFLOW-225 > Better units for task duration graph > > > Key: AIRFLOW-225 > URL: https://issues.apache.org/jira/browse/AIRFLOW-225 > Project: Apache Airflow > Issue Type: Improvement > Components: webserver >Reporter: Jakob Homan >Assignee: Kengo Seki > Attachments: screenshot-1.png > > > Right now the job duration window defaults to hours, which for short lived > tasks results in numbers out to five decimals. Instead, it should adjust the > scale to hours, minutes, seconds, as appropriate. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (AIRFLOW-225) Better units for task duration graph
[ https://issues.apache.org/jira/browse/AIRFLOW-225?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Riccomini closed AIRFLOW-225. --- Resolution: Fixed Fix Version/s: Airflow 1.8 +1 Merged. Thanks! > Better units for task duration graph > > > Key: AIRFLOW-225 > URL: https://issues.apache.org/jira/browse/AIRFLOW-225 > Project: Apache Airflow > Issue Type: Improvement > Components: webserver >Reporter: Jakob Homan >Assignee: Kengo Seki > Fix For: Airflow 1.8 > > Attachments: screenshot-1.png > > > Right now the job duration window defaults to hours, which for short lived > tasks results in numbers out to five decimals. Instead, it should adjust the > scale to hours, minutes, seconds, as appropriate. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (AIRFLOW-225) Better units for task duration graph
[ https://issues.apache.org/jira/browse/AIRFLOW-225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15334545#comment-15334545 ] ASF subversion and git services commented on AIRFLOW-225: - Commit 1141165271b0e1546ca231bf8aa412793dc38201 in incubator-airflow's branch refs/heads/master from [~sekikn] [ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=1141165 ] [AIRFLOW-225] Better units for task duration graph Right now the job duration window defaults to hours, which for short lived tasks results in numbers out to five decimals. This patch adjusts the scale of the Y-axis in accordance with the maximum value of the durations to be shown. > Better units for task duration graph > > > Key: AIRFLOW-225 > URL: https://issues.apache.org/jira/browse/AIRFLOW-225 > Project: Apache Airflow > Issue Type: Improvement > Components: webserver >Reporter: Jakob Homan >Assignee: Kengo Seki > Attachments: screenshot-1.png > > > Right now the job duration window defaults to hours, which for short lived > tasks results in numbers out to five decimals. Instead, it should adjust the > scale to hours, minutes, seconds, as appropriate. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[1/2] incubator-airflow git commit: [AIRFLOW-225] Better units for task duration graph
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 949479cf7 -> ea2904610

[AIRFLOW-225] Better units for task duration graph

Right now the job duration window defaults to hours, which for short lived tasks results in numbers out to five decimals. This patch adjusts the scale of the Y-axis in accordance with the maximum value of the durations to be shown.

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/11411652
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/11411652
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/11411652

Branch: refs/heads/master
Commit: 1141165271b0e1546ca231bf8aa412793dc38201
Parents: 8aa7160
Author: Kengo Seki
Authored: Wed Jun 15 04:19:02 2016 +
Committer: Kengo Seki
Committed: Wed Jun 15 04:19:02 2016 +

----------------------------------------------------------------------
 airflow/www/views.py | 21 +++++++++++++++++++--
 1 file changed, 19 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/11411652/airflow/www/views.py
----------------------------------------------------------------------
diff --git a/airflow/www/views.py b/airflow/www/views.py
index b468bc1..fc6819b 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -1477,6 +1477,7 @@ class Airflow(BaseView):
                 include_downstream=False)
 
         all_data = []
+        max_duration = 0
         for task in dag.tasks:
             data = []
             for ti in task.get_task_instances(session, start_date=min_date,
@@ -1484,11 +1485,27 @@
                 if ti.duration:
                     data.append([
                         ti.execution_date.isoformat(),
-                        float(ti.duration) / (60*60)
+                        ti.duration
                     ])
+                    if max_duration < ti.duration:
+                        max_duration = ti.duration
             if data:
                 all_data.append({'data': data, 'name': task.task_id})
 
+        def divide_durations(all_data, denom):
+            for data in all_data:
+                for d in data['data']:
+                    d[1] /= denom
+
+        if 60*60 < max_duration:
+            unit = 'hours'
+            divide_durations(all_data, float(60*60))
+        elif 60 < max_duration:
+            unit = 'minutes'
+            divide_durations(all_data, 60.0)
+        else:
+            unit = 'seconds'
+
         tis = dag.get_task_instances(
             session, start_date=min_date, end_date=base_date)
         dates = sorted(list({ti.execution_date for ti in tis}))
@@ -1503,7 +1520,7 @@
             'airflow/chart.html',
             dag=dag,
             data=json.dumps(all_data),
-            chart_options={'yAxis': {'title': {'text': 'hours'}}},
+            chart_options={'yAxis': {'title': {'text': unit}}},
             height="700px",
             demo_mode=conf.getboolean('webserver', 'demo_mode'),
             root=root,
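The unit-selection cascade in the patch can be summarized as a small standalone helper. This refactoring is mine, not part of the commit, but the thresholds match the diff above:

```python
def pick_duration_unit(max_duration_seconds):
    """Return (axis_label, divisor) for the duration chart's Y-axis.

    Mirrors the cascade in the AIRFLOW-225 patch: anything over an hour is
    plotted in hours, over a minute in minutes, otherwise in seconds.
    """
    if max_duration_seconds > 60 * 60:
        return "hours", 60.0 * 60
    elif max_duration_seconds > 60:
        return "minutes", 60.0
    return "seconds", 1.0
```

So a 10-second task now charts as 10 on a seconds axis instead of roughly 0.00278 on an hours axis, which is exactly the five-decimals complaint in the ticket.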
[jira] [Commented] (AIRFLOW-244) Expose task/dag id/run data for ingestion by performance monitoring tools
[ https://issues.apache.org/jira/browse/AIRFLOW-244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15334530#comment-15334530 ] Chris Riccomini commented on AIRFLOW-244: - [~pbramsen], could you add more description to this? Where do you want these items exposed? > Expose task/dag id/run data for ingestion by performance monitoring tools > - > > Key: AIRFLOW-244 > URL: https://issues.apache.org/jira/browse/AIRFLOW-244 > Project: Apache Airflow > Issue Type: New Feature >Reporter: Paul Bramsen > > Expose task/dag id/run data for ingestion by performance analysis tools like > Dr. Elephant. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (AIRFLOW-250) Add task-specific template searchpaths
[ https://issues.apache.org/jira/browse/AIRFLOW-250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15334426#comment-15334426 ] Peter Attardo commented on AIRFLOW-250: --- I can put together a PR for this if it's seen as a worthwhile feature to add. > Add task-specific template searchpaths > -- > > Key: AIRFLOW-250 > URL: https://issues.apache.org/jira/browse/AIRFLOW-250 > Project: Apache Airflow > Issue Type: Improvement >Reporter: Peter Attardo > > Currently template searchpaths are pulled from the DAG, but there are many > instances where we would want a specific task to look in its own folder for > its template(s) to avoid name collisions, or to define the template location > in the task definition and not have to worry about what DAG it is added to. > Adding an optional template_searchpath parameter to BaseOperator that > operates similar to, and has precedence over, the DAG's template_searchpath > would allow this functionality without impacting existing templating logic. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (AIRFLOW-250) Add task-specific template searchpaths
Peter Attardo created AIRFLOW-250: - Summary: Add task-specific template searchpaths Key: AIRFLOW-250 URL: https://issues.apache.org/jira/browse/AIRFLOW-250 Project: Apache Airflow Issue Type: Bug Reporter: Peter Attardo Currently template searchpaths are pulled from the DAG, but there are many instances where we would want a specific task to look in its own folder for its template(s) to avoid name collisions, or to define the template location in the task definition and not have to worry about what DAG it is added to. Adding an optional template_searchpath parameter to BaseOperator that operates similar to, and has precedence over, the DAG's template_searchpath would allow this functionality without impacting existing templating logic. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (AIRFLOW-250) Add task-specific template searchpaths
[ https://issues.apache.org/jira/browse/AIRFLOW-250?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Peter Attardo updated AIRFLOW-250: -- Issue Type: Improvement (was: Bug) > Add task-specific template searchpaths > -- > > Key: AIRFLOW-250 > URL: https://issues.apache.org/jira/browse/AIRFLOW-250 > Project: Apache Airflow > Issue Type: Improvement >Reporter: Peter Attardo > > Currently template searchpaths are pulled from the DAG, but there are many > instances where we would want a specific task to look in its own folder for > its template(s) to avoid name collisions, or to define the template location > in the task definition and not have to worry about what DAG it is added to. > Adding an optional template_searchpath parameter to BaseOperator that > operates similar to, and has precedence over, the DAG's template_searchpath > would allow this functionality without impacting existing templating logic. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
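To make the proposed precedence concrete, here is a rough sketch of the lookup order AIRFLOW-250 implies: task-level search paths are consulted first and shadow the DAG's `template_searchpath` on name collisions. The resolver function is hypothetical, not Airflow code:

```python
import os


def resolve_template(filename, task_searchpath=(), dag_searchpath=()):
    """Find a template file, preferring task-level directories.

    Hypothetical resolution order per the proposal: the task's own
    template_searchpath wins over the DAG's, so two tasks can each ship a
    'query.sql' in their own folder without colliding.
    """
    for directory in list(task_searchpath) + list(dag_searchpath):
        candidate = os.path.join(directory, filename)
        if os.path.isfile(candidate):
            return candidate
    raise IOError("template not found: %s" % filename)
```

In practice this would presumably be wired into the Jinja environment (e.g. via a per-task loader ahead of the DAG's `FileSystemLoader`), leaving DAGs without task-level paths untouched.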
[jira] [Created] (AIRFLOW-249) Refactor the SLA mecanism
dud created AIRFLOW-249:
----------------------------

             Summary: Refactor the SLA mecanism
                 Key: AIRFLOW-249
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-249
             Project: Apache Airflow
          Issue Type: Improvement
            Reporter: dud

Hello
I've noticed the SLA feature currently behaves as follows:
- it doesn't work on DAGs scheduled @once or None, because they have no dag.following_schedule property
- it keeps endlessly checking for SLA misses without ever considering any end_date. Worse, I noticed that emails are still being sent for runs that will never happen because of end_date
- it keeps checking recent TIs even if an SLA notification has already been sent for them
- the SLA logic only fires after following_schedule + sla has elapsed; in other words, one has to wait for the next TI before having a chance of getting any email. Also, the email reports the dag.following_schedule time (I guess because it is close to TI.start_date), but unfortunately that matches neither what the task instances view shows nor the log filename
- the SLA logic is based on max(TI.execution_date) as the starting point of its checks, which means that for a DAG whose SLA is longer than its schedule period, if half of the TIs run longer than expected it will go unnoticed.
This could be demonstrated with a DAG like this one:
{code}
from airflow import DAG
from airflow.operators import *
from datetime import datetime, timedelta
from time import sleep

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2016, 6, 16, 12, 20),
    'email': my_email,
    'sla': timedelta(minutes=2),
}

dag = DAG('unnoticed_sla', default_args=default_args,
          schedule_interval=timedelta(minutes=1))

def alternating_sleep(**kwargs):
    minute = kwargs['execution_date'].strftime("%M")
    is_odd = int(minute) % 2
    if is_odd:
        sleep(300)
    else:
        sleep(10)
    return True

PythonOperator(
    task_id='sla_miss',
    python_callable=alternating_sleep,
    provide_context=True,
    dag=dag)
{code}
I've tried to rework the SLA triggering mechanism by addressing the above points; please [have a look at it|https://github.com/dud225/incubator-airflow/commit/972260354075683a8d55a1c960d839c37e629e7d]
I made some tests with this patch:
- the fluctuating DAG shown above no longer makes Airflow skip any SLA event:
{code}
 task_id  |    dag_id     |   execution_date    | email_sent |         timestamp          | description | notification_sent
----------+---------------+---------------------+------------+----------------------------+-------------+-------------------
 sla_miss | dag_sla_miss1 | 2016-06-16 15:05:00 | t          | 2016-06-16 15:08:26.058631 |             | t
 sla_miss | dag_sla_miss1 | 2016-06-16 15:07:00 | t          | 2016-06-16 15:10:06.093253 |             | t
 sla_miss | dag_sla_miss1 | 2016-06-16 15:09:00 | t          | 2016-06-16 15:12:06.241773 |             | t
{code}
- on a normal DAG, the SLA is triggered more quickly:
{code}
// start_date = 2016-06-16 15:55:00
// end_date = 2016-06-16 16:00:00
// schedule_interval = timedelta(minutes=1)
// sla = timedelta(minutes=2)
 task_id  |    dag_id     |   execution_date    | email_sent |         timestamp          | description | notification_sent
----------+---------------+---------------------+------------+----------------------------+-------------+-------------------
 sla_miss | dag_sla_miss1 | 2016-06-16 15:55:00 | t          | 2016-06-16 15:58:11.832299 |             | t
 sla_miss | dag_sla_miss1 | 2016-06-16 15:56:00 | t          | 2016-06-16 15:59:09.663778 |             | t
 sla_miss | dag_sla_miss1 | 2016-06-16 15:57:00 | t          | 2016-06-16 16:00:13.651422 |             | t
 sla_miss | dag_sla_miss1 | 2016-06-16 15:58:00 | t          | 2016-06-16 16:01:08.576399 |             | t
 sla_miss | dag_sla_miss1 | 2016-06-16 15:59:00 | t          | 2016-06-16 16:02:08.523486 |             | t
 sla_miss | dag_sla_miss1 | 2016-06-16 16:00:00 | t          | 2016-06-16 16:03:08.538593 |             | t
(6 rows)
{code}
than before (current master branch):
{code}
// start_date = 2016-06-16 15:40:00
// end_date = 2016-06-16 15:45:00
// schedule_interval = timedelta(minutes=1)
// sla = timedelta(minutes=2)
 task_id  |    dag_id     |   execution_date    | email_sent |         timestamp          | description | notification_sent
----------+---------------+---------------------+------------+----------------------------+-------------+-------------------
 sla_miss | dag_sla_miss1 | 2016-06-16 15:41:00 | t          | 2016-06-16 15:44:30.305287 |             | t
 sla_miss | dag_sla_miss1 | 2016-06-16 15:42:00 | t          | 2016-06-16 15:45:35.372118 |             | t
 sla_miss | dag_sla_miss1 | 2016-06-16 15:43:00 | t          | 2016-06-16 15:46:30.415744 |             | t
 sla_miss | dag_sla_miss1 | 2016-06-16
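The max(TI.execution_date) blind spot from the last bullet can be modeled in a few lines. This is a toy simulation of the behaviour described with the alternating_sleep DAG above, not Airflow's actual scheduler code:

```python
from datetime import datetime

# One run per minute; even-minute runs finish fast, odd-minute runs hang
# (the alternating_sleep pattern from the DAG above).
runs = [datetime(2016, 6, 16, 15, m) for m in range(6)]
finished = [ts for ts in runs if ts.minute % 2 == 0]

# The SLA checker starts its scan from the latest execution_date it has
# seen, so the fast runs keep pushing this watermark forward...
watermark = max(finished)

# ...and the slow runs older than the watermark are never inspected:
unnoticed = [ts for ts in runs if ts < watermark and ts not in finished]
```

Under these assumptions the 15:01 and 15:03 runs fall behind the 15:04 watermark and their SLA misses go unreported, which is the gap the linked patch tries to close.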
[jira] [Resolved] (AIRFLOW-140) DagRun state not updated
[ https://issues.apache.org/jira/browse/AIRFLOW-140?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

dud resolved AIRFLOW-140.
-------------------------
    Resolution: Fixed

This issue has been solved by commit [b18c9959142f3f1e2cb031c8709225af01192e32|https://github.com/apache/incubator-airflow/commit/b18c9959142f3f1e2cb031c8709225af01192e32]. [~bolke] many thanks for your work :)

> DagRun state not updated
> ------------------------
>
>                 Key: AIRFLOW-140
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-140
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: scheduler
>         Environment: Airflow latest Git version
>            Reporter: dud
>            Priority: Minor
>
> Hello
> I've noticed a strange behaviour: when launching a DAG whose task execution duration is alternatingly slower and longer, the DagRun state is only updated once all previous DagRuns have ended.
> Here is a DAG that can trigger this behaviour:
> {code}
> from airflow import DAG
> from airflow.operators import *
> from datetime import datetime, timedelta
> from time import sleep
>
> default_args = {
>     'owner': 'airflow',
>     'depends_on_past': False,
>     'start_date': datetime(2016, 5, 19, 10, 15),
>     'end_date': datetime(2016, 5, 19, 10, 20),
> }
>
> dag = DAG('dagrun_not_updated', default_args=default_args,
>           schedule_interval=timedelta(minutes=1))
>
> def alternating_sleep(**kwargs):
>     minute = kwargs['execution_date'].strftime("%M")
>     is_odd = int(minute) % 2
>     if is_odd:
>         sleep(300)
>     else:
>         sleep(10)
>     return True
>
> PythonOperator(
>     task_id='alt_sleep',
>     python_callable=alternating_sleep,
>     provide_context=True,
>     dag=dag)
> {code}
> When this operator is executed, a run at an even minute makes the TI run faster than one at an odd minute.
> I'm observing the following behaviour:
> - after some time, the second DagRun is still in running state despite it having ended a while ago:
> {code}
> airflow=> SELECT * FROM task_instance WHERE dag_id = :dag_id ORDER BY execution_date ; SELECT * FROM dag_run WHERE dag_id = :dag_id ;
>  task_id   | dag_id             | execution_date      | start_date                 | end_date                   | duration  | state   | try_number | hostname  | unixname | job_id | pool | queue   | priority_weight | operator       | queued_dttm
> -----------+--------------------+---------------------+----------------------------+----------------------------+-----------+---------+------------+-----------+----------+--------+------+---------+-----------------+----------------+-------------
>  alt_sleep | dagrun_not_updated | 2016-05-19 10:15:00 | 2016-05-19 10:17:19.039565 |                            |           | running |          1 | localhost | airflow  |   3196 |      | default |               1 | PythonOperator |
>  alt_sleep | dagrun_not_updated | 2016-05-19 10:16:00 | 2016-05-19 10:17:23.698928 | 2016-05-19 10:17:33.823066 | 10.124138 | success |          1 | localhost | airflow  |   3197 |      | default |               1 | PythonOperator |
>  alt_sleep | dagrun_not_updated | 2016-05-19 10:17:00 | 2016-05-19 10:18:03.025546 |                            |           | running |          1 | localhost | airflow  |   3198 |      | default |               1 | PythonOperator |
> (3 rows)
>
>   id  | dag_id             | execution_date      | state   | run_id                         | external_trigger | conf | end_date | start_date
> ------+--------------------+---------------------+---------+--------------------------------+------------------+------+----------+----------------------------
>  1479 | dagrun_not_updated | 2016-05-19 10:15:00 | running | scheduled__2016-05-19T10:15:00 | f                |      |          | 2016-05-19 10:17:06.563842
>  1480 | dagrun_not_updated | 2016-05-19 10:16:00 | running | scheduled__2016-05-19T10:16:00 | f                |      |          | 2016-05-19 10:17:12.188781
>  1481 | dagrun_not_updated | 2016-05-19 10:17:00 | running | scheduled__2016-05-19T10:17:00 | f                |      |          | 2016-05-19 10:18:01.550625
> (3 rows)
> {code}
> - after some time, all reportedly still-running DagRuns are marked successful at the same time:
> {code}
> 2016-05-19 10:23:11 UTC [12073-18] airflow@airflow LOG: duration: 0.168 ms statement: UPDATE dag_run SET state='success' WHERE dag_run.id = 1479
> 2016-05-19 10:23:11 UTC [12073-19] airflow@airflow LOG: duration: 0.106 ms statement: UPDATE dag_run SET state='success' WHERE dag_run.id = 1480
> 2016-05-19 10:23:11 UTC [12073-20] airflow@airflow LOG: duration: 0.083 ms statement: UPDATE dag_run SET state='success' WHERE dag_run.id = 1481
> 2016-05-19 10:23:11 UTC [12073-21] airflow@airflow LOG: duration: 0.081 ms statement: UPDATE dag_run SET state='success' WHERE
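The behaviour reported above is consistent with DagRun state only being re-evaluated when the scheduler gets around to iterating over its active runs: a run flips to success once all of its task instances have succeeded, and several long-finished runs then flip in one batch. A minimal stdlib-only sketch of that state-update rule follows; the function and data shapes are illustrative assumptions, not Airflow's actual API.

```python
def update_dagrun_states(dag_runs, task_states):
    """Mark a running DagRun 'success' once all of its task instances
    have succeeded; otherwise leave it 'running' (illustrative only)."""
    updated = []
    for run_id, state in dag_runs.items():
        if state != 'running':
            continue
        if all(s == 'success' for s in task_states[run_id]):
            dag_runs[run_id] = 'success'
            updated.append(run_id)
    return updated


# Mirrors the report: three runs, two finished long ago, but nothing
# changes in the database until the scheduler evaluates all of them.
dag_runs = {1479: 'running', 1480: 'running', 1481: 'running'}
task_states = {1479: ['success'], 1480: ['success'], 1481: ['running']}
print(update_dagrun_states(dag_runs, task_states))  # -> [1479, 1480]
```

If the evaluation pass is delayed (for example by a task instance sleeping for five minutes), all eligible runs are updated together, producing the burst of UPDATE statements seen in the log.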
[2/2] incubator-airflow git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-airflow
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-airflow

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/949479cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/949479cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/949479cf

Branch: refs/heads/master
Commit: 949479cf7b5c298a31a54e7e2ebe71bf782b5d5d
Parents: fb89276 06e70e2
Author: Bolke de Bruin
Authored: Thu Jun 16 10:26:59 2016 +0200
Committer: Bolke de Bruin
Committed: Thu Jun 16 10:26:59 2016 +0200

----------------------------------------------------------------------
 airflow/bin/cli.py                       | 11 +++--
 airflow/configuration.py                 |  7 ++-
 airflow/contrib/hooks/__init__.py        |  3 +-
 airflow/contrib/hooks/fs_hook.py         | 41 +
 airflow/contrib/operators/__init__.py    |  3 +-
 airflow/contrib/operators/fs_operator.py | 57
 airflow/utils/db.py                      |  4 ++
 airflow/www/views.py                     |  1 +
 tests/contrib/operators/__init__.py      | 16 +++
 tests/contrib/operators/fs_operator.py   | 64 +++
 tests/core.py                            | 50 ++---
 11 files changed, 225 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
[1/2] incubator-airflow git commit: [AIRFLOW-224] Collect orphaned tasks and reschedule them
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 06e70e2d1 -> 949479cf7

[AIRFLOW-224] Collect orphaned tasks and reschedule them

Tasks can become orphaned if the scheduler is killed while processing them, or if the MQ queue is cleared before a worker has picked them up. Tasks are now no longer set to a scheduled state if they have not yet been sent to the executor. In addition, a garbage collector scans the executor for tasks that are no longer present and reschedules them if needed.

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/fb892767
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/fb892767
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/fb892767

Branch: refs/heads/master
Commit: fb89276750d23fc19a2d57036bc59d3aef846a26
Parents: 8aa7160
Author: Bolke de Bruin
Authored: Sat Jun 11 13:53:32 2016 +0200
Committer: Bolke de Bruin
Committed: Wed Jun 15 21:58:12 2016 +0200

----------------------------------------------------------------------
 airflow/jobs.py                 | 62 +++-
 airflow/models.py               |  8 +++--
 tests/executor/__init__.py      | 13
 tests/executor/test_executor.py | 33 +++
 tests/jobs.py                   | 47 +++
 5 files changed, 146 insertions(+), 17 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fb892767/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 3a2d97a..1e583ac 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -518,12 +518,13 @@ class SchedulerJob(BaseJob):
             if run.execution_date > datetime.now():
                 continue

-            # todo: run.task is transient but needs to be set
+            # todo: run.dag is transient but needs to be set
             run.dag = dag
             # todo: preferably the integrity check happens at dag collection time
             run.verify_integrity(session=session)
             run.update_state(session=session)
             if run.state == State.RUNNING:
+                make_transient(run)
                 active_dag_runs.append(run)

         for run in active_dag_runs:
@@ -546,20 +547,6 @@ class SchedulerJob(BaseJob):
             if ti.is_runnable(flag_upstream_failed=True):
                 self.logger.debug('Queuing task: {}'.format(ti))
-
-                ti.refresh_from_db(session=session, lock_for_update=True)
-                # another scheduler could have picked this task
-                # todo: UP_FOR_RETRY still could create a race condition
-                if ti.state is State.SCHEDULED:
-                    session.commit()
-                    self.logger.debug("Task {} was picked up by another scheduler"
-                                      .format(ti))
-                    continue
-                elif ti.state is State.NONE:
-                    ti.state = State.SCHEDULED
-                    session.merge(ti)
-
-                session.commit()
                 queue.put((ti.key, pickle_id))

         session.close()
@@ -676,7 +663,28 @@ class SchedulerJob(BaseJob):
         except Exception as e:
             self.logger.exception(e)

+    @provide_session
+    def _reset_state_for_orphaned_tasks(self, dag_run, session=None):
+        """
+        This function checks for a DagRun if there are any tasks
+        that have a scheduled state but are not known by the
+        executor. If it finds those it will reset the state to None
+        so they will get picked up again.
+        """
+        queued_tis = self.executor.queued_tasks
+
+        # also consider running as the state might not have changed in the db yet
+        running = self.executor.running
+        tis = dag_run.get_task_instances(state=State.SCHEDULED, session=session)
+        for ti in tis:
+            if ti.key not in queued_tis and ti.key not in running:
+                ti.state = State.NONE
+                self.logger.debug("Rescheduling orphaned task {}".format(ti))
+
+        session.commit()
+
     def _execute(self):
+        session = settings.Session()
         TI = models.TaskInstance

         pessimistic_connection_handling()
@@ -687,6 +695,16 @@ class SchedulerJob(BaseJob):
         dagbag = models.DagBag(self.subdir, sync_to_db=True)
         executor = self.executor = dagbag.executor
         executor.start()
+
+        # grab orphaned tasks and make sure to reset their state
+        active_runs = DagRun.find(
+            state=State.RUNNING,
+            external_trigger=False,
+            session=session
+        )
+        for dr in
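The core of `_reset_state_for_orphaned_tasks` in the diff above can be exercised in isolation: any task instance in SCHEDULED state whose key is neither queued nor running in the executor is an orphan and gets its state reset so the scheduler picks it up again. Below is a stdlib-only sketch of that selection logic; plain tuples stand in for Airflow's TaskInstance keys and executor bookkeeping.

```python
def find_orphaned_tasks(scheduled_keys, queued_keys, running_keys):
    """Return the keys of SCHEDULED task instances that the executor
    neither has queued nor running. The scheduler would reset the
    state of these to None so they are scheduled again."""
    orphans = []
    for key in scheduled_keys:
        if key not in queued_keys and key not in running_keys:
            orphans.append(key)
    return orphans


# Three scheduled task keys; the executor only knows about two of them.
scheduled = [('dag', 'a', '2016-06-15'),
             ('dag', 'b', '2016-06-15'),
             ('dag', 'c', '2016-06-15')]
queued = {('dag', 'a', '2016-06-15')}
running = {('dag', 'b', '2016-06-15')}

print(find_orphaned_tasks(scheduled, queued, running))
```

Task 'c' is reported as orphaned: it was marked scheduled but the executor lost track of it, which is exactly the situation the commit describes after a scheduler crash or a cleared MQ queue.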
[jira] [Created] (AIRFLOW-248) Add Apache license header to all files
Ajay Yadava created AIRFLOW-248:
---

Summary: Add Apache license header to all files
Key: AIRFLOW-248
URL: https://issues.apache.org/jira/browse/AIRFLOW-248
Project: Apache Airflow
Issue Type: Task
Reporter: Ajay Yadava
Assignee: Ajay Yadava

As part of getting ready for an Apache release we should add the Apache License header to all files ([mailing list discussion|http://mail-archives.apache.org/mod_mbox/incubator-airflow-dev/201606.mbox/%3C43C19AB6-9A7F-4351-862E-540D75490396%40gmail.com%3E]).

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)