[jira] [Updated] (AIRFLOW-252) Raise SQLite exceptions when deleting task instances in WebUI

2016-06-16 Thread Chris Riccomini (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-252?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Riccomini updated AIRFLOW-252:

Affects Version/s: 1.7.1.3

> Raise SQLite exceptions when deleting task instances in WebUI
> -
>
> Key: AIRFLOW-252
> URL: https://issues.apache.org/jira/browse/AIRFLOW-252
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: ui
>Affects Versions: 1.7.1.3
> Environment: Linux 0f857c51a5ce 4.0.9-boot2docker #1 SMP Thu Sep 10 
> 20:39:20 UTC 2015 x86_64 x86_64 x86_64 GNU/Linux
>Reporter: ryanchou
>
> When I try to delete a task instance on the Browse -> Task Instances page 
> through the WebUI, it raises a SQLite exception.
> The Airflow version is v1.7.1.3; I haven't seen that version as a choice in the JIRA 
> issue.
> ```
>  File 
> "/usr/local/lib/python2.7/dist-packages/sqlalchemy/dialects/sqlite/base.py", 
> line 607, in process
> raise TypeError("SQLite DateTime type only accepts Python "
> StatementError: (exceptions.TypeError) SQLite DateTime type only accepts 
> Python datetime and date objects as input. [SQL: u'SELECT 
> task_instance.task_id AS task_instance_task_id, task_in
> ```





[jira] [Updated] (AIRFLOW-252) Raise SQLite exceptions when deleting task instances in WebUI

2016-06-16 Thread ryanchou (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-252?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ryanchou updated AIRFLOW-252:
-
Description: 
When I try to delete a task instance on the Browse -> Task Instances page through 
the WebUI, it raises a SQLite exception.

The Airflow version is v1.7.1.3; I haven't seen that version as a choice in the JIRA issue.

```
 File 
"/usr/local/lib/python2.7/dist-packages/sqlalchemy/dialects/sqlite/base.py", 
line 607, in process
raise TypeError("SQLite DateTime type only accepts Python "
StatementError: (exceptions.TypeError) SQLite DateTime type only accepts Python 
datetime and date objects as input. [SQL: u'SELECT task_instance.task_id AS 
task_instance_task_id, task_in
```

  was:
When I try to delete a task instance on the Browse -> Task Instances page through 
the WebUI, it raises a SQLite exception.

```
 File 
"/usr/local/lib/python2.7/dist-packages/sqlalchemy/dialects/sqlite/base.py", 
line 607, in process
raise TypeError("SQLite DateTime type only accepts Python "
StatementError: (exceptions.TypeError) SQLite DateTime type only accepts Python 
datetime and date objects as input. [SQL: u'SELECT task_instance.task_id AS 
task_instance_task_id, task_in
```


> Raise SQLite exceptions when deleting task instances in WebUI
> -
>
> Key: AIRFLOW-252
> URL: https://issues.apache.org/jira/browse/AIRFLOW-252
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: ui
> Environment: Linux 0f857c51a5ce 4.0.9-boot2docker #1 SMP Thu Sep 10 
> 20:39:20 UTC 2015 x86_64 x86_64 x86_64 GNU/Linux
>Reporter: ryanchou
>
> When I try to delete a task instance on the Browse -> Task Instances page 
> through the WebUI, it raises a SQLite exception.
> The Airflow version is v1.7.1.3; I haven't seen that version as a choice in the JIRA 
> issue.
> ```
>  File 
> "/usr/local/lib/python2.7/dist-packages/sqlalchemy/dialects/sqlite/base.py", 
> line 607, in process
> raise TypeError("SQLite DateTime type only accepts Python "
> StatementError: (exceptions.TypeError) SQLite DateTime type only accepts 
> Python datetime and date objects as input. [SQL: u'SELECT 
> task_instance.task_id AS task_instance_task_id, task_in
> ```





[jira] [Created] (AIRFLOW-252) Raise SQLite exceptions when deleting task instances in WebUI

2016-06-16 Thread ryanchou (JIRA)
ryanchou created AIRFLOW-252:


 Summary: Raise SQLite exceptions when deleting task instances in 
WebUI
 Key: AIRFLOW-252
 URL: https://issues.apache.org/jira/browse/AIRFLOW-252
 Project: Apache Airflow
  Issue Type: Bug
  Components: ui
Affects Versions: Airflow 1.7.1.2
 Environment: Linux 0f857c51a5ce 4.0.9-boot2docker #1 SMP Thu Sep 10 
20:39:20 UTC 2015 x86_64 x86_64 x86_64 GNU/Linux
Reporter: ryanchou


When I try to delete a task instance on the Browse -> Task Instances page through 
the WebUI, it raises a SQLite exception.

```
 File 
"/usr/local/lib/python2.7/dist-packages/sqlalchemy/dialects/sqlite/base.py", 
line 607, in process
raise TypeError("SQLite DateTime type only accepts Python "
StatementError: (exceptions.TypeError) SQLite DateTime type only accepts Python 
datetime and date objects as input. [SQL: u'SELECT task_instance.task_id AS 
task_instance_task_id, task_in
```
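
The error suggests that a plain string (rather than a Python datetime object) is being 
bound to a DateTime column somewhere in the delete path. A minimal, self-contained sketch 
that reproduces the same StatementError against an in-memory SQLite database (plain 
SQLAlchemy; the TaskInstance model below is only illustrative, not Airflow's actual model):

```
# Hypothetical reproduction, not Airflow code: binding a string to a DateTime
# column makes SQLAlchemy's SQLite dialect raise the quoted StatementError.
from datetime import datetime

from sqlalchemy import Column, DateTime, Integer, String, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Base = declarative_base()


class TaskInstance(Base):
    __tablename__ = 'task_instance'
    id = Column(Integer, primary_key=True)
    task_id = Column(String(250))
    execution_date = Column(DateTime)


engine = create_engine('sqlite://')
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()
session.add(TaskInstance(task_id='t1', execution_date=datetime(2016, 6, 16)))
session.commit()

# Filtering (or deleting) with a string instead of a datetime raises:
#   StatementError: (exceptions.TypeError) SQLite DateTime type only accepts
#   Python datetime and date objects as input.
session.query(TaskInstance).filter(
    TaskInstance.execution_date == '2016-06-16 00:00:00').all()
```

Converting the value back to a datetime before it reaches the query (or letting the view 
coerce the form data) avoids the TypeError.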





[jira] [Updated] (AIRFLOW-252) Raise SQLite exceptions when deleting task instances in WebUI

2016-06-16 Thread ryanchou (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-252?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ryanchou updated AIRFLOW-252:
-
Affects Version/s: (was: Airflow 1.7.1.2)

> Raise SQLite exceptions when deleting task instances in WebUI
> -
>
> Key: AIRFLOW-252
> URL: https://issues.apache.org/jira/browse/AIRFLOW-252
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: ui
> Environment: Linux 0f857c51a5ce 4.0.9-boot2docker #1 SMP Thu Sep 10 
> 20:39:20 UTC 2015 x86_64 x86_64 x86_64 GNU/Linux
>Reporter: ryanchou
>
> When I try to delete a task instance on the Browse -> Task Instances page 
> through the WebUI, it raises a SQLite exception.
> ```
>  File 
> "/usr/local/lib/python2.7/dist-packages/sqlalchemy/dialects/sqlite/base.py", 
> line 607, in process
> raise TypeError("SQLite DateTime type only accepts Python "
> StatementError: (exceptions.TypeError) SQLite DateTime type only accepts 
> Python datetime and date objects as input. [SQL: u'SELECT 
> task_instance.task_id AS task_instance_task_id, task_in
> ```





incubator-airflow git commit: Add Postmates to Airflow users list

2016-06-16 Thread davydov
Repository: incubator-airflow
Updated Branches:
  refs/heads/master ea2904610 -> ce362c312


Add Postmates to Airflow users list

Closes #1599 from Syeoryn/master

Add Postmates to Airflow users list


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/ce362c31
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/ce362c31
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/ce362c31

Branch: refs/heads/master
Commit: ce362c312ccb1bace7215d156909f42d7e51898a
Parents: ea29046
Author: Drew Cuthbertson 
Authored: Thu Jun 16 19:10:04 2016 -0700
Committer: Dan Davydov 
Committed: Thu Jun 16 19:10:04 2016 -0700

--
 README.md | 1 +
 1 file changed, 1 insertion(+)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ce362c31/README.md
--
diff --git a/README.md b/README.md
index 43c8818..f34ec7c 100644
--- a/README.md
+++ b/README.md
@@ -104,6 +104,7 @@ Currently **officially** using Airflow:
 * [Lucid](http://luc.id) [[@jbrownlucid](https://github.com/jbrownlucid) & 
[@kkourtchikov](https://github.com/kkourtchikov)]
 * 
[Lyft](https://www.lyft.com/)[[@SaurabhBajaj](https://github.com/SaurabhBajaj)]
 * [Nerdwallet](https://www.nerdwallet.com)
+* [Postmates](http://www.postmates.com) 
[[@syeoryn](https://github.com/syeoryn)]
 * [Qubole](https://qubole.com) [[@msumit](https://github.com/msumit)]
 * [Sense360](https://github.com/Sense360) 
[[@kamilmroczek](https://github.com/KamilMroczek)]
 * [Sidecar](https://hello.getsidecar.com/) 
[[@getsidecar](https://github.com/getsidecar)]



[jira] [Commented] (AIRFLOW-251) Add optional parameter SQL_ALCHEMY_SCHEMA to control schema for metadata repository

2016-06-16 Thread Ed Parcell (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-251?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15335105#comment-15335105
 ] 

Ed Parcell commented on AIRFLOW-251:


I created a pull request for this issue: 
https://github.com/apache/incubator-airflow/pull/1600

> Add optional parameter SQL_ALCHEMY_SCHEMA to control schema for metadata 
> repository
> ---
>
> Key: AIRFLOW-251
> URL: https://issues.apache.org/jira/browse/AIRFLOW-251
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: core
>Affects Versions: Airflow 2.0
>Reporter: Ed Parcell
>Priority: Minor
>
> When using SQL Server as the metadata database, it is preferable to group all 
> Airflow tables into a separate schema rather than using dbo. I propose 
> adding an optional parameter, SQL_ALCHEMY_SCHEMA, to control this.
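
For illustration, a minimal sketch of the idea (assuming the metadata tables hang off a 
shared SQLAlchemy declarative Base; the setting name and plumbing below are only 
illustrative, not the actual change in the pull request):

{code}
# Illustrative sketch only -- not the actual patch.
# Idea: read an optional schema from configuration and apply it to the
# MetaData object that all model tables are attached to.
from sqlalchemy import MetaData
from sqlalchemy.ext.declarative import declarative_base

# Hypothetical configuration lookup, e.g.:
#   sql_alchemy_schema = conf.get('core', 'SQL_ALCHEMY_SCHEMA')
sql_alchemy_schema = 'airflow'   # None keeps the dialect default (dbo on SQL Server)

metadata = MetaData(schema=sql_alchemy_schema or None)
Base = declarative_base(metadata=metadata)

# Every table defined against this Base is now emitted as
# <schema>.<table> (e.g. airflow.task_instance) instead of dbo.<table>.
{code}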





[jira] [Commented] (AIRFLOW-249) Refactor the SLA mechanism

2016-06-16 Thread Chris Riccomini (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15334642#comment-15334642
 ] 

Chris Riccomini commented on AIRFLOW-249:
-

This is really awesome analysis. Can you send a PR? (unless I missed it)

> Refactor the SLA mechanism
> -
>
> Key: AIRFLOW-249
> URL: https://issues.apache.org/jira/browse/AIRFLOW-249
> Project: Apache Airflow
>  Issue Type: Improvement
>Reporter: dud
>
> Hello
> I've noticed the SLA feature is currently behaving as follows:
> - it doesn't work on DAGs scheduled @once or None because they have no 
> dag.following_schedule property
> - it keeps endlessly checking for SLA misses without ever taking any 
> end_date into account. Worse, I noticed that emails are still being sent for runs 
> that will never happen because of end_date
> - it keeps checking recent TIs even if an SLA notification has already 
> been sent for them
> - the SLA logic only fires after following_schedule + sla has 
> elapsed; in other words, one has to wait for the next TI before having a 
> chance of getting any email. Also, the email reports the dag.following_schedule 
> time (I guess because it is close to TI.start_date), but unfortunately that 
> matches neither what the task instance shows nor the log filename
> - the SLA logic uses max(TI.execution_date) as the starting point of 
> its checks, which means that for a DAG whose SLA is longer than its schedule 
> period, if half of the TIs run longer than expected it will go 
> unnoticed. This can be demonstrated with a DAG like this one:
> {code}
> from airflow import DAG
> from airflow.operators import *
> from datetime import datetime, timedelta
> from time import sleep
> default_args = {
> 'owner': 'airflow',
> 'depends_on_past': False,
> 'start_date': datetime(2016, 6, 16, 12, 20),
> 'email': my_email,
> 'sla': timedelta(minutes=2),
> }
> dag = DAG('unnoticed_sla', default_args=default_args, 
> schedule_interval=timedelta(minutes=1))
> def alternating_sleep(**kwargs):
> minute = kwargs['execution_date'].strftime("%M")
> is_odd = int(minute) % 2
> if is_odd:
> sleep(300)
> else:
> sleep(10)
> return True
> PythonOperator(
> task_id='sla_miss',
> python_callable=alternating_sleep,
> provide_context=True,
> dag=dag)
> {code}
> I've tried to rework the SLA triggering mechanism by addressing the above 
> points; please [have a look at 
> it|https://github.com/dud225/incubator-airflow/commit/972260354075683a8d55a1c960d839c37e629e7d]
> I made some tests with this patch:
> - the fluctuating DAG shown above no longer makes Airflow skip any SLA event:
> {code}
>  task_id  |    dag_id     |   execution_date    | email_sent |         timestamp          | description | notification_sent 
> ----------+---------------+---------------------+------------+----------------------------+-------------+-------------------
>  sla_miss | dag_sla_miss1 | 2016-06-16 15:05:00 | t          | 2016-06-16 15:08:26.058631 |             | t
>  sla_miss | dag_sla_miss1 | 2016-06-16 15:07:00 | t          | 2016-06-16 15:10:06.093253 |             | t
>  sla_miss | dag_sla_miss1 | 2016-06-16 15:09:00 | t          | 2016-06-16 15:12:06.241773 |             | t
> {code}
> - on a normal DAG, the SLA is triggered more quickly:
> {code}
> // start_date = 2016-06-16 15:55:00
> // end_date = 2016-06-16 16:00:00
> // schedule_interval =  timedelta(minutes=1)
> // sla = timedelta(minutes=2)
>  task_id  |    dag_id     |   execution_date    | email_sent |         timestamp          | description | notification_sent 
> ----------+---------------+---------------------+------------+----------------------------+-------------+-------------------
>  sla_miss | dag_sla_miss1 | 2016-06-16 15:55:00 | t          | 2016-06-16 15:58:11.832299 |             | t
>  sla_miss | dag_sla_miss1 | 2016-06-16 15:56:00 | t          | 2016-06-16 15:59:09.663778 |             | t
>  sla_miss | dag_sla_miss1 | 2016-06-16 15:57:00 | t          | 2016-06-16 16:00:13.651422 |             | t
>  sla_miss | dag_sla_miss1 | 2016-06-16 15:58:00 | t          | 2016-06-16 16:01:08.576399 |             | t
>  sla_miss | dag_sla_miss1 | 2016-06-16 15:59:00 | t          | 2016-06-16 16:02:08.523486 |             | t
>  sla_miss | dag_sla_miss1 | 2016-06-16 16:00:00 | t          | 2016-06-16 16:03:08.538593 |             | t
> (6 rows)
> {code}
> than before (current master branch) :
> {code}
> // start_date = 2016-06-16 15:40:00
> // end_date = 2016-06-16 15:45:00
> // schedule_interval =  timedelta(minutes=1)
> // sla = timedelta(minutes=2)
>  task_id  |dag_id |   execution_date| email_sent | 
> timestamp  | description | notification_sent 
> 

[jira] [Closed] (AIRFLOW-224) Collect orphaned tasks in case of unclean shutdown

2016-06-16 Thread Chris Riccomini (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-224?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Riccomini closed AIRFLOW-224.
---
Resolution: Fixed
  Assignee: Bolke de Bruin

> Collect orphaned tasks in case of unclean shutdown
> --
>
> Key: AIRFLOW-224
> URL: https://issues.apache.org/jira/browse/AIRFLOW-224
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: scheduler
>Reporter: Bolke de Bruin
>Assignee: Bolke de Bruin
> Fix For: Airflow 1.8
>
>
> Tasks in a "scheduled" state can get orphaned if either the executor or the 
> scheduler is shut down uncleanly (kill -9).
> This should be addressed by making sure the scheduler confirms sending tasks 
> to the executor and by checking whether "scheduled" tasks are still in the queue 
> of the executor.





[jira] [Closed] (AIRFLOW-173) Create a FileSensor / NFSFileSensor sensor

2016-06-16 Thread Chris Riccomini (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Riccomini closed AIRFLOW-173.
---
   Resolution: Fixed
 Assignee: Andre
Fix Version/s: Airflow 1.8

> Create a FileSensor / NFSFileSensor sensor
> --
>
> Key: AIRFLOW-173
> URL: https://issues.apache.org/jira/browse/AIRFLOW-173
> Project: Apache Airflow
>  Issue Type: Improvement
>Reporter: Andre
>Assignee: Andre
>Priority: Minor
> Fix For: Airflow 1.8
>
>
> While HDFS and WebHDFS suit most organisations using Hadoop, for some shops 
> running MapR-FS, an Airflow implementation is simplified by using plain 
> files that point to MapR's NFS gateways.
> A FileSensor and/or an NFSFileSensor would assist the adoption of Airflow 
> within the MapR customer base, but more importantly, help those who are using 
> POSIX-compliant distributed filesystems that can be mounted on Unix-derivative 
> systems (e.g. MapR-FS (via NFS), CephFS, GlusterFS, etc.).
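
For illustration, a minimal sketch of such a sensor (assuming the 1.7-era layout where 
BaseSensorOperator lives in airflow.operators.sensors; the contrib FileSensor that was 
merged for this issue may be structured differently):

{code}
# Illustrative sketch only -- see airflow/contrib/operators/fs_operator.py for
# the implementation that was actually merged.
import logging
import os

from airflow.operators.sensors import BaseSensorOperator


class FileSensor(BaseSensorOperator):
    """Waits for a file to appear on a locally mounted (e.g. NFS) filesystem."""

    def __init__(self, filepath, *args, **kwargs):
        super(FileSensor, self).__init__(*args, **kwargs)
        self.filepath = filepath

    def poke(self, context):
        # Return True once the file exists; the sensor keeps poking
        # (every poke_interval seconds) until then or until it times out.
        logging.info('Poking for file %s', self.filepath)
        return os.path.exists(self.filepath)
{code}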





[jira] [Commented] (AIRFLOW-247) EMR Hook, Operators, Sensor

2016-06-16 Thread Chris Riccomini (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-247?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15334586#comment-15334586
 ] 

Chris Riccomini commented on AIRFLOW-247:
-

[~rfroetscher], totally fine. Go for it. Might want to ping [~artwr], as well, 
though.

> EMR Hook, Operators, Sensor
> ---
>
> Key: AIRFLOW-247
> URL: https://issues.apache.org/jira/browse/AIRFLOW-247
> Project: Apache Airflow
>  Issue Type: New Feature
>Reporter: Rob Froetscher
>Assignee: Rob Froetscher
>Priority: Minor
>
> Sub-story of https://issues.apache.org/jira/browse/AIRFLOW-115. It would be 
> nice to have an EMR hook and operators.
> A hook to interact with EMR in general.
> Operators to:
> * set up and start a job flow
> * add steps to an existing job flow
> A sensor to:
> * monitor the completion and status of EMR jobs
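
Assuming boto3 is used underneath (the hook and operators that eventually get merged may 
look different), the calls such a hook would wrap are roughly these; the EmrHook name and 
method names below are illustrative only:

{code}
# Illustrative sketch only.
import boto3


class EmrHook(object):
    """Thin wrapper around the boto3 EMR client."""

    def __init__(self, region_name='us-east-1'):
        self.client = boto3.client('emr', region_name=region_name)

    def create_job_flow(self, job_flow_overrides):
        # Start a cluster/job flow; job_flow_overrides follows the
        # boto3 run_job_flow keyword arguments.
        return self.client.run_job_flow(**job_flow_overrides)

    def add_steps(self, job_flow_id, steps):
        return self.client.add_job_flow_steps(JobFlowId=job_flow_id, Steps=steps)

    def get_step_state(self, cluster_id, step_id):
        # A sensor would poll this until the step reaches COMPLETED or FAILED.
        response = self.client.describe_step(ClusterId=cluster_id, StepId=step_id)
        return response['Step']['Status']['State']
{code}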





[jira] [Closed] (AIRFLOW-239) test_hdfs_sensor, etc not collected by nosetests

2016-06-16 Thread Chris Riccomini (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-239?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Riccomini closed AIRFLOW-239.
---
   Resolution: Fixed
Fix Version/s: Airflow 1.8

> test_hdfs_sensor, etc not collected by nosetests
> 
>
> Key: AIRFLOW-239
> URL: https://issues.apache.org/jira/browse/AIRFLOW-239
> Project: Apache Airflow
>  Issue Type: Bug
>Reporter: Li Xuanji
>Assignee: Li Xuanji
> Fix For: Airflow 1.8
>
>
> Due to the indentation at 
> https://github.com/apache/incubator-airflow/blob/5963ac8b7b7c0b3470ce6171b1cd928485bc9884/tests/core.py#L1605-L1623
> the methods test_hdfs_sensor and below are treated as inner functions of 
> test_presto_to_mysql and are not collected by nosetests:
> ```
> ➜  zodiac-airflow git:(master) export AIRFLOW_RUNALL_TESTS=1
> ➜  zodiac-airflow git:(master) nosetests -v --collect-only 2>&1 | grep 
> test_presto_to_mysql | wc -l
>1
> ➜  zodiac-airflow git:(master) nosetests -v --collect-only 2>&1 | grep 
> test_hdfs_sensor | wc -l
>0
> ➜  zodiac-airflow git:(master)
> ```
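
The underlying pitfall, reduced to a minimal (hypothetical) test class: a method indented 
one level too deep becomes a nested function of the previous test, so nose never collects it.

```
import unittest


class CoreTest(unittest.TestCase):

    def test_presto_to_mysql(self):
        self.assertTrue(True)

        # Over-indented: this is a local function inside test_presto_to_mysql,
        # so nose/unittest never discover or run it.
        def test_hdfs_sensor(self):
            self.assertTrue(False)  # would fail, but is silently skipped

    # Correct indentation: defined at class level, so it is collected.
    def test_hdfs_sensor_fixed(self):
        self.assertTrue(True)
```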





[jira] [Commented] (AIRFLOW-225) Better units for task duration graph

2016-06-16 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15334546#comment-15334546
 ] 

ASF subversion and git services commented on AIRFLOW-225:
-

Commit ea2904610651c53853d89e8296ab4c71e0926588 in incubator-airflow's branch 
refs/heads/master from [~criccomini]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=ea29046 ]

Merge pull request #1595 from sekikn/AIRFLOW-225


> Better units for task duration graph
> 
>
> Key: AIRFLOW-225
> URL: https://issues.apache.org/jira/browse/AIRFLOW-225
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: webserver
>Reporter: Jakob Homan
>Assignee: Kengo Seki
> Attachments: screenshot-1.png
>
>
> Right now the job duration window defaults to hours, which for short lived 
> tasks results in numbers out to five decimals.  Instead, it should adjust the 
> scale to hours, minutes, seconds, as appropriate.





[jira] [Closed] (AIRFLOW-225) Better units for task duration graph

2016-06-16 Thread Chris Riccomini (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-225?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Riccomini closed AIRFLOW-225.
---
   Resolution: Fixed
Fix Version/s: Airflow 1.8

+1 Merged. Thanks!

> Better units for task duration graph
> 
>
> Key: AIRFLOW-225
> URL: https://issues.apache.org/jira/browse/AIRFLOW-225
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: webserver
>Reporter: Jakob Homan
>Assignee: Kengo Seki
> Fix For: Airflow 1.8
>
> Attachments: screenshot-1.png
>
>
> Right now the job duration window defaults to hours, which for short lived 
> tasks results in numbers out to five decimals.  Instead, it should adjust the 
> scale to hours, minutes, seconds, as appropriate.





[jira] [Commented] (AIRFLOW-225) Better units for task duration graph

2016-06-16 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15334545#comment-15334545
 ] 

ASF subversion and git services commented on AIRFLOW-225:
-

Commit 1141165271b0e1546ca231bf8aa412793dc38201 in incubator-airflow's branch 
refs/heads/master from [~sekikn]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=1141165 ]

[AIRFLOW-225] Better units for task duration graph

Right now the job duration window defaults to hours, which for short lived tasks
results in numbers out to five decimals. This patch adjusts the scale of the 
Y-axis
in accordance with the maximum value of the durations to be shown.


> Better units for task duration graph
> 
>
> Key: AIRFLOW-225
> URL: https://issues.apache.org/jira/browse/AIRFLOW-225
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: webserver
>Reporter: Jakob Homan
>Assignee: Kengo Seki
> Attachments: screenshot-1.png
>
>
> Right now the job duration window defaults to hours, which for short lived 
> tasks results in numbers out to five decimals.  Instead, it should adjust the 
> scale to hours, minutes, seconds, as appropriate.





[1/2] incubator-airflow git commit: [AIRFLOW-225] Better units for task duration graph

2016-06-16 Thread criccomini
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 949479cf7 -> ea2904610


[AIRFLOW-225] Better units for task duration graph

Right now the job duration window defaults to hours, which for short lived tasks
results in numbers out to five decimals. This patch adjusts the scale of the 
Y-axis
in accordance with the maximum value of the durations to be shown.


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/11411652
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/11411652
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/11411652

Branch: refs/heads/master
Commit: 1141165271b0e1546ca231bf8aa412793dc38201
Parents: 8aa7160
Author: Kengo Seki 
Authored: Wed Jun 15 04:19:02 2016 +
Committer: Kengo Seki 
Committed: Wed Jun 15 04:19:02 2016 +

--
 airflow/www/views.py | 21 +++--
 1 file changed, 19 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/11411652/airflow/www/views.py
--
diff --git a/airflow/www/views.py b/airflow/www/views.py
index b468bc1..fc6819b 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -1477,6 +1477,7 @@ class Airflow(BaseView):
 include_downstream=False)
 
 all_data = []
+max_duration = 0
 for task in dag.tasks:
 data = []
 for ti in task.get_task_instances(session, start_date=min_date,
@@ -1484,11 +1485,27 @@ class Airflow(BaseView):
 if ti.duration:
 data.append([
 ti.execution_date.isoformat(),
-float(ti.duration) / (60*60)
+ti.duration
 ])
+if max_duration < ti.duration:
+max_duration = ti.duration
 if data:
 all_data.append({'data': data, 'name': task.task_id})
 
+def divide_durations(all_data, denom):
+for data in all_data:
+for d in data['data']:
+d[1] /= denom
+
+if 60*60 < max_duration:
+unit = 'hours'
+divide_durations(all_data, float(60*60))
+elif 60 < max_duration:
+unit = 'minutes'
+divide_durations(all_data, 60.0)
+else:
+unit = 'seconds'
+
 tis = dag.get_task_instances(
 session, start_date=min_date, end_date=base_date)
 dates = sorted(list({ti.execution_date for ti in tis}))
@@ -1503,7 +1520,7 @@ class Airflow(BaseView):
 'airflow/chart.html',
 dag=dag,
 data=json.dumps(all_data),
-chart_options={'yAxis': {'title': {'text': 'hours'}}},
+chart_options={'yAxis': {'title': {'text': unit}}},
 height="700px",
 demo_mode=conf.getboolean('webserver', 'demo_mode'),
 root=root,



[jira] [Commented] (AIRFLOW-244) Expose task/dag id/run data for ingestion by performance monitoring tools

2016-06-16 Thread Chris Riccomini (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15334530#comment-15334530
 ] 

Chris Riccomini commented on AIRFLOW-244:
-

[~pbramsen], could you add more description to this? Where do you want these 
items exposed?

> Expose task/dag id/run data for ingestion by performance monitoring tools
> -
>
> Key: AIRFLOW-244
> URL: https://issues.apache.org/jira/browse/AIRFLOW-244
> Project: Apache Airflow
>  Issue Type: New Feature
>Reporter: Paul Bramsen
>
> Expose task/dag id/run data for ingestion by performance analysis tools like 
> Dr. Elephant.





[jira] [Commented] (AIRFLOW-250) Add task-specific template searchpaths

2016-06-16 Thread Peter Attardo (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15334426#comment-15334426
 ] 

Peter Attardo commented on AIRFLOW-250:
---

I can put together a PR for this if it's seen as a worthwhile feature to add.

> Add task-specific template searchpaths
> --
>
> Key: AIRFLOW-250
> URL: https://issues.apache.org/jira/browse/AIRFLOW-250
> Project: Apache Airflow
>  Issue Type: Improvement
>Reporter: Peter Attardo
>
> Currently template searchpaths are pulled from the DAG, but there are many 
> instances where we would want a specific task to look in its own folder for 
> its template(s) to avoid name collisions, or to define the template location 
> in the task definition and not have to worry about what DAG it is added to. 
> Adding an optional template_searchpath parameter to BaseOperator that 
> operates similarly to, and has precedence over, the DAG's template_searchpath 
> would allow this functionality without impacting existing templating logic.





[jira] [Created] (AIRFLOW-250) Add task-specific template searchpaths

2016-06-16 Thread Peter Attardo (JIRA)
Peter Attardo created AIRFLOW-250:
-

 Summary: Add task-specific template searchpaths
 Key: AIRFLOW-250
 URL: https://issues.apache.org/jira/browse/AIRFLOW-250
 Project: Apache Airflow
  Issue Type: Bug
Reporter: Peter Attardo


Currently template searchpaths are pulled from the DAG, but there are many 
instances where we would want a specific task to look in its own folder for its 
template(s) to avoid name collisions, or to define the template location in the 
task definition and not have to worry about what DAG it is added to. Adding an 
optional template_searchpath parameter to BaseOperator that operates similarly 
to, and has precedence over, the DAG's template_searchpath would allow this 
functionality without impacting existing templating logic.
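
A hypothetical usage sketch of the proposal (the task-level template_searchpath argument 
shown below does not exist today; it is exactly what this issue suggests adding):

{code}
# Hypothetical example -- template_searchpath is not yet a BaseOperator argument.
from datetime import datetime

from airflow import DAG
from airflow.operators import BashOperator

dag = DAG('template_paths_demo',
          start_date=datetime(2016, 6, 16),
          template_searchpath='/etc/airflow/templates/common')

# Proposal: this task would resolve 'report_wrapper.sh' against its own
# searchpath first, falling back to the DAG-level one.
render_report = BashOperator(
    task_id='render_report',
    bash_command='report_wrapper.sh',
    template_searchpath='/etc/airflow/templates/reports',  # proposed parameter
    dag=dag)
{code}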





[jira] [Updated] (AIRFLOW-250) Add task-specific template searchpaths

2016-06-16 Thread Peter Attardo (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-250?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Peter Attardo updated AIRFLOW-250:
--
Issue Type: Improvement  (was: Bug)

> Add task-specific template searchpaths
> --
>
> Key: AIRFLOW-250
> URL: https://issues.apache.org/jira/browse/AIRFLOW-250
> Project: Apache Airflow
>  Issue Type: Improvement
>Reporter: Peter Attardo
>
> Currently template searchpaths are pulled from the DAG, but there are many 
> instances where we would want a specific task to look in its own folder for 
> its template(s) to avoid name collisions, or to define the template location 
> in the task definition and not have to worry about what DAG it is added to. 
> Adding an optional template_searchpath parameter to BaseOperator that 
> operates similarly to, and has precedence over, the DAG's template_searchpath 
> would allow this functionality without impacting existing templating logic.





[jira] [Created] (AIRFLOW-249) Refactor the SLA mechanism

2016-06-16 Thread dud (JIRA)
dud created AIRFLOW-249:
---

 Summary: Refactor the SLA mechanism
 Key: AIRFLOW-249
 URL: https://issues.apache.org/jira/browse/AIRFLOW-249
 Project: Apache Airflow
  Issue Type: Improvement
Reporter: dud


Hello

I've noticed the SLA feature is currently behaving as follows:
- it doesn't work on DAGs scheduled @once or None because they have no 
dag.following_schedule property
- it keeps endlessly checking for SLA misses without ever taking any 
end_date into account. Worse, I noticed that emails are still being sent for runs 
that will never happen because of end_date
- it keeps checking recent TIs even if an SLA notification has already 
been sent for them
- the SLA logic only fires after following_schedule + sla has elapsed; 
in other words, one has to wait for the next TI before having a chance of 
getting any email. Also, the email reports the dag.following_schedule time (I guess 
because it is close to TI.start_date), but unfortunately that matches neither 
what the task instance shows nor the log filename
- the SLA logic uses max(TI.execution_date) as the starting point of 
its checks, which means that for a DAG whose SLA is longer than its schedule 
period, if half of the TIs run longer than expected it will go 
unnoticed. This can be demonstrated with a DAG like this one:
{code}
from airflow import DAG
from airflow.operators import *
from datetime import datetime, timedelta
from time import sleep

default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2016, 6, 16, 12, 20),
'email': my_email,
'sla': timedelta(minutes=2),
}

dag = DAG('unnoticed_sla', default_args=default_args, 
schedule_interval=timedelta(minutes=1))

def alternating_sleep(**kwargs):
minute = kwargs['execution_date'].strftime("%M")
is_odd = int(minute) % 2
if is_odd:
sleep(300)
else:
sleep(10)
return True

PythonOperator(
task_id='sla_miss',
python_callable=alternating_sleep,
provide_context=True,
dag=dag)
{code}
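
To make the max(TI.execution_date) point concrete, here is a simplified, self-contained 
illustration (this is not the actual jobs.py logic, just the shape of the problem): because 
a fresh TI keeps arriving every minute, a deadline anchored on the latest execution_date 
keeps moving forward, while a per-TI check flags the older runs that are still going.

{code}
# Simplified illustration -- not the scheduler's actual SLA code.
from datetime import datetime, timedelta

schedule = timedelta(minutes=1)
sla = timedelta(minutes=2)
now = datetime(2016, 6, 16, 12, 27)

# execution_date -> has the TI finished?  (odd minutes sleep 300s, so still running)
tis = {datetime(2016, 6, 16, 12, m): (m % 2 == 0) for m in range(20, 27)}

# Check anchored on max(execution_date) only: nothing is reported yet.
latest = max(tis)
missed_max_based = [latest] if now > latest + schedule + sla else []

# Per-TI check: every unfinished TI past following_schedule + sla is reported.
missed_per_ti = sorted(dt for dt, finished in tis.items()
                       if not finished and now > dt + schedule + sla)

print(missed_max_based)  # [] -> the misses at 12:21 and 12:23 go unnoticed
print(missed_per_ti)     # [datetime(2016, 6, 16, 12, 21), datetime(2016, 6, 16, 12, 23)]
{code}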


I've tried to rework the SLA triggering mechanism by addressing the above 
points; please [have a look at 
it|https://github.com/dud225/incubator-airflow/commit/972260354075683a8d55a1c960d839c37e629e7d]

I made some tests with this patch:
- the fluctuating DAG shown above no longer makes Airflow skip any SLA event:
{code}
 task_id  |    dag_id     |   execution_date    | email_sent |         timestamp          | description | notification_sent 
----------+---------------+---------------------+------------+----------------------------+-------------+-------------------
 sla_miss | dag_sla_miss1 | 2016-06-16 15:05:00 | t          | 2016-06-16 15:08:26.058631 |             | t
 sla_miss | dag_sla_miss1 | 2016-06-16 15:07:00 | t          | 2016-06-16 15:10:06.093253 |             | t
 sla_miss | dag_sla_miss1 | 2016-06-16 15:09:00 | t          | 2016-06-16 15:12:06.241773 |             | t
{code}
- on a normal DAG, the SLA is triggered more quickly:
{code}
// start_date = 2016-06-16 15:55:00
// end_date = 2016-06-16 16:00:00
// schedule_interval =  timedelta(minutes=1)
// sla = timedelta(minutes=2)

 task_id  |    dag_id     |   execution_date    | email_sent |         timestamp          | description | notification_sent 
----------+---------------+---------------------+------------+----------------------------+-------------+-------------------
 sla_miss | dag_sla_miss1 | 2016-06-16 15:55:00 | t          | 2016-06-16 15:58:11.832299 |             | t
 sla_miss | dag_sla_miss1 | 2016-06-16 15:56:00 | t          | 2016-06-16 15:59:09.663778 |             | t
 sla_miss | dag_sla_miss1 | 2016-06-16 15:57:00 | t          | 2016-06-16 16:00:13.651422 |             | t
 sla_miss | dag_sla_miss1 | 2016-06-16 15:58:00 | t          | 2016-06-16 16:01:08.576399 |             | t
 sla_miss | dag_sla_miss1 | 2016-06-16 15:59:00 | t          | 2016-06-16 16:02:08.523486 |             | t
 sla_miss | dag_sla_miss1 | 2016-06-16 16:00:00 | t          | 2016-06-16 16:03:08.538593 |             | t
(6 rows)

{code}
than before (current master branch):
{code}
// start_date = 2016-06-16 15:40:00
// end_date = 2016-06-16 15:45:00
// schedule_interval =  timedelta(minutes=1)
// sla = timedelta(minutes=2)

 task_id  |    dag_id     |   execution_date    | email_sent |         timestamp          | description | notification_sent 
----------+---------------+---------------------+------------+----------------------------+-------------+-------------------
 sla_miss | dag_sla_miss1 | 2016-06-16 15:41:00 | t          | 2016-06-16 15:44:30.305287 |             | t
 sla_miss | dag_sla_miss1 | 2016-06-16 15:42:00 | t          | 2016-06-16 15:45:35.372118 |             | t
 sla_miss | dag_sla_miss1 | 2016-06-16 15:43:00 | t          | 2016-06-16 15:46:30.415744 |             | t
 sla_miss | dag_sla_miss1 | 2016-06-16 

[jira] [Resolved] (AIRFLOW-140) DagRun state not updated

2016-06-16 Thread dud (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-140?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dud resolved AIRFLOW-140.
-
Resolution: Fixed

This issue has been solved by commit 
[b18c9959142f3f1e2cb031c8709225af01192e32|https://github.com/apache/incubator-airflow/commit/b18c9959142f3f1e2cb031c8709225af01192e32].

[~bolke] many thanks for your work :)


> DagRun state not updated
> 
>
> Key: AIRFLOW-140
> URL: https://issues.apache.org/jira/browse/AIRFLOW-140
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: scheduler
> Environment: Airflow latest Git version
>Reporter: dud
>Priority: Minor
>
> Hello
> I've noticed a strange behaviour: when launching a DAG whose task execution 
> duration alternates between short and long runs, the DagRun state is only updated if 
> all previous DagRuns have ended.
> Here is a DAG that can trigger this behaviour:
> {code}
> from airflow import DAG
> from airflow.operators import *
> from datetime import datetime, timedelta
> from time import sleep
> default_args = {
> 'owner': 'airflow',
> 'depends_on_past': False,
> 'start_date': datetime(2016, 5, 19, 10, 15),
> 'end_date': datetime(2016, 5, 19, 10, 20),
> }
> dag = DAG('dagrun_not_updated', default_args=default_args, 
> schedule_interval=timedelta(minutes=1))
> def alternating_sleep(**kwargs):
> minute = kwargs['execution_date'].strftime("%M")
> is_odd = int(minute) % 2
> if is_odd:
> sleep(300)
> else:
> sleep(10)
> return True
> PythonOperator(
> task_id='alt_sleep',
> python_callable=alternating_sleep,
> provide_context=True,
> dag=dag)
> {code}
> When this operator is executed, a run at an even minute makes the TI finish 
> faster than one at an odd minute.
> I'm observing the following behaviour:
> - after some time, the second DagRun is still in a running state despite having 
> ended a while ago:
> {code}
> airflow=> SELECT * FROM task_instance WHERE dag_id = :dag_id ORDER BY execution_date ;  SELECT * FROM dag_run WHERE dag_id = :dag_id ;
>   task_id  |       dag_id       |   execution_date    |         start_date         |          end_date          | duration  |  state  | try_number | hostname  | unixname | job_id | pool |  queue  | priority_weight |    operator    | queued_dttm
> -----------+--------------------+---------------------+----------------------------+----------------------------+-----------+---------+------------+-----------+----------+--------+------+---------+-----------------+----------------+-------------
>  alt_sleep | dagrun_not_updated | 2016-05-19 10:15:00 | 2016-05-19 10:17:19.039565 |                            |           | running |          1 | localhost | airflow  |   3196 |      | default |               1 | PythonOperator | 
>  alt_sleep | dagrun_not_updated | 2016-05-19 10:16:00 | 2016-05-19 10:17:23.698928 | 2016-05-19 10:17:33.823066 | 10.124138 | success |          1 | localhost | airflow  |   3197 |      | default |               1 | PythonOperator | 
>  alt_sleep | dagrun_not_updated | 2016-05-19 10:17:00 | 2016-05-19 10:18:03.025546 |                            |           | running |          1 | localhost | airflow  |   3198 |      | default |               1 | PythonOperator | 
> (3 rows)
>   id  |       dag_id       |   execution_date    |  state  |             run_id             | external_trigger | conf | end_date |         start_date
> ------+--------------------+---------------------+---------+--------------------------------+------------------+------+----------+----------------------------
>  1479 | dagrun_not_updated | 2016-05-19 10:15:00 | running | scheduled__2016-05-19T10:15:00 | f                |      |          | 2016-05-19 10:17:06.563842
>  1480 | dagrun_not_updated | 2016-05-19 10:16:00 | running | scheduled__2016-05-19T10:16:00 | f                |      |          | 2016-05-19 10:17:12.188781
>  1481 | dagrun_not_updated | 2016-05-19 10:17:00 | running | scheduled__2016-05-19T10:17:00 | f                |      |          | 2016-05-19 10:18:01.550625
> (3 rows)
> {code}
> - after some time, all reportedly still-running DagRuns are marked as 
> successful at the same time:
> {code}
> 2016-05-19 10:23:11 UTC [12073-18] airflow@airflow LOG:  duration: 0.168 ms  
> statement: UPDATE dag_run SET state='success' WHERE dag_run.id = 1479
> 2016-05-19 10:23:11 UTC [12073-19] airflow@airflow LOG:  duration: 0.106 ms  
> statement: UPDATE dag_run SET state='success' WHERE dag_run.id = 1480
> 2016-05-19 10:23:11 UTC [12073-20] airflow@airflow LOG:  duration: 0.083 ms  
> statement: UPDATE dag_run SET state='success' WHERE dag_run.id = 1481
> 2016-05-19 10:23:11 UTC [12073-21] airflow@airflow LOG:  duration: 0.081 ms  
> statement: UPDATE dag_run SET state='success' WHERE 

[2/2] incubator-airflow git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-airflow

2016-06-16 Thread bolke
Merge branch 'master' of 
https://git-wip-us.apache.org/repos/asf/incubator-airflow


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/949479cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/949479cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/949479cf

Branch: refs/heads/master
Commit: 949479cf7b5c298a31a54e7e2ebe71bf782b5d5d
Parents: fb89276 06e70e2
Author: Bolke de Bruin 
Authored: Thu Jun 16 10:26:59 2016 +0200
Committer: Bolke de Bruin 
Committed: Thu Jun 16 10:26:59 2016 +0200

--
 airflow/bin/cli.py   | 11 +++--
 airflow/configuration.py |  7 ++-
 airflow/contrib/hooks/__init__.py|  3 +-
 airflow/contrib/hooks/fs_hook.py | 41 +
 airflow/contrib/operators/__init__.py|  3 +-
 airflow/contrib/operators/fs_operator.py | 57 
 airflow/utils/db.py  |  4 ++
 airflow/www/views.py |  1 +
 tests/contrib/operators/__init__.py  | 16 +++
 tests/contrib/operators/fs_operator.py   | 64 +++
 tests/core.py| 50 ++---
 11 files changed, 225 insertions(+), 32 deletions(-)
--




[1/2] incubator-airflow git commit: [AIRFLOW-224] Collect orphaned tasks and reschedule them

2016-06-16 Thread bolke
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 06e70e2d1 -> 949479cf7


[AIRFLOW-224] Collect orphaned tasks and reschedule them

Tasks can get orphaned if the scheduler is killed in the middle
of processing them or if the MQ queue is cleared before
a worker has picked them up. Tasks are now no longer set
to a scheduled state if they have not been sent to the
executor yet. In addition, a garbage collector scans the executor
for tasks that are no longer present and reschedules them if needed.


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/fb892767
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/fb892767
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/fb892767

Branch: refs/heads/master
Commit: fb89276750d23fc19a2d57036bc59d3aef846a26
Parents: 8aa7160
Author: Bolke de Bruin 
Authored: Sat Jun 11 13:53:32 2016 +0200
Committer: Bolke de Bruin 
Committed: Wed Jun 15 21:58:12 2016 +0200

--
 airflow/jobs.py | 62 +++-
 airflow/models.py   |  8 +++--
 tests/executor/__init__.py  | 13 
 tests/executor/test_executor.py | 33 +++
 tests/jobs.py   | 47 +++
 5 files changed, 146 insertions(+), 17 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fb892767/airflow/jobs.py
--
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 3a2d97a..1e583ac 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -518,12 +518,13 @@ class SchedulerJob(BaseJob):
 if run.execution_date > datetime.now():
 continue
 
-# todo: run.task is transient but needs to be set
+# todo: run.dag is transient but needs to be set
 run.dag = dag
 # todo: preferably the integrity check happens at dag collection 
time
 run.verify_integrity(session=session)
 run.update_state(session=session)
 if run.state == State.RUNNING:
+make_transient(run)
 active_dag_runs.append(run)
 
 for run in active_dag_runs:
@@ -546,20 +547,6 @@ class SchedulerJob(BaseJob):
 
 if ti.is_runnable(flag_upstream_failed=True):
 self.logger.debug('Queuing task: {}'.format(ti))
-
-ti.refresh_from_db(session=session, lock_for_update=True)
-# another scheduler could have picked this task
-# todo: UP_FOR_RETRY still could create a race condition
-if ti.state is State.SCHEDULED:
-session.commit()
-self.logger.debug("Task {} was picked up by another 
scheduler"
-  .format(ti))
-continue
-elif ti.state is State.NONE:
-ti.state = State.SCHEDULED
-session.merge(ti)
-
-session.commit()
 queue.put((ti.key, pickle_id))
 
 session.close()
@@ -676,7 +663,28 @@ class SchedulerJob(BaseJob):
 except Exception as e:
 self.logger.exception(e)
 
+@provide_session
+def _reset_state_for_orphaned_tasks(self, dag_run, session=None):
+"""
+This function checks for a DagRun if there are any tasks
+that have a scheduled state but are not known by the
+executor. If it finds those it will reset the state to None
+so they will get picked up again.
+"""
+queued_tis = self.executor.queued_tasks
+
+# also consider running as the state might not have changed in the db 
yet
+running = self.executor.running
+tis = dag_run.get_task_instances(state=State.SCHEDULED, 
session=session)
+for ti in tis:
+if ti.key not in queued_tis and ti.key not in running:
+ti.state = State.NONE
+self.logger.debug("Rescheduling orphaned task {}".format(ti))
+
+session.commit()
+
 def _execute(self):
+session = settings.Session()
 TI = models.TaskInstance
 
 pessimistic_connection_handling()
@@ -687,6 +695,16 @@ class SchedulerJob(BaseJob):
 dagbag = models.DagBag(self.subdir, sync_to_db=True)
 executor = self.executor = dagbag.executor
 executor.start()
+
+# grab orphaned tasks and make sure to reset their state
+active_runs = DagRun.find(
+state=State.RUNNING,
+external_trigger=False,
+session=session
+)
+for dr in 

[jira] [Created] (AIRFLOW-248) Add Apache license header to all files

2016-06-16 Thread Ajay Yadava (JIRA)
Ajay Yadava created AIRFLOW-248:
---

 Summary: Add Apache license header to all files
 Key: AIRFLOW-248
 URL: https://issues.apache.org/jira/browse/AIRFLOW-248
 Project: Apache Airflow
  Issue Type: Task
Reporter: Ajay Yadava
Assignee: Ajay Yadava


As part of getting ready for an Apache release we should add the Apache License 
header to all the files ([mailing list discussion | 
http://mail-archives.apache.org/mod_mbox/incubator-airflow-dev/201606.mbox/%3C43C19AB6-9A7F-4351-862E-540D75490396%40gmail.com%3E]
 ).
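
For reference, the standard Apache License 2.0 boilerplate that would go at the top of each 
Python file looks like this (shown as Python comments; other file types use their own 
comment syntax, and whether to use this short form or the longer ASF contributor-agreement 
form was part of the linked discussion):

{code}
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
{code}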


