[jira] [Created] (AIRFLOW-257) airflow command fails with "ImportError: No module named zope.deprecation"

2016-06-17 Thread Kengo Seki (JIRA)
Kengo Seki created AIRFLOW-257:
--

 Summary: airflow command fails with "ImportError: No module named 
zope.deprecation"
 Key: AIRFLOW-257
 URL: https://issues.apache.org/jira/browse/AIRFLOW-257
 Project: Apache Airflow
  Issue Type: Bug
Reporter: Kengo Seki
Priority: Critical


After AIRFLOW-31 was merged, the airflow command fails as follows:

{code}
$ airflow webserver
[2016-06-18 00:56:50,367] {__init__.py:36} INFO - Using executor 
SequentialExecutor
[2016-06-18 00:56:50,492] {driver.py:120} INFO - Generating grammar tables from 
/usr/lib/python2.7/lib2to3/Grammar.txt
[2016-06-18 00:56:50,529] {driver.py:120} INFO - Generating grammar tables from 
/usr/lib/python2.7/lib2to3/PatternGrammar.txt
Traceback (most recent call last):
  File "/home/sekikn/.virtualenvs/e/bin/airflow", line 6, in <module>
    exec(compile(open(__file__).read(), __file__, 'exec'))
  File "/home/sekikn/dev/incubator-airflow/airflow/bin/airflow", line 4, in <module>
    from airflow import configuration
  File "/home/sekikn/dev/incubator-airflow/airflow/__init__.py", line 76, in <module>
    from airflow import operators
  File "/home/sekikn/dev/incubator-airflow/airflow/operators/__init__.py", line 24, in <module>
    from .check_operator import (
  File "/home/sekikn/dev/incubator-airflow/airflow/operators/check_operator.py", line 20, in <module>
    from airflow.hooks import BaseHook
  File "/home/sekikn/dev/incubator-airflow/airflow/hooks/__init__.py", line 66, in <module>
    from zope.deprecation import deprecated as _deprecated
ImportError: No module named zope.deprecation
{code}
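
For context, the import that now fails comes from the deprecation shim that AIRFLOW-31 added to airflow/hooks/__init__.py, which makes zope.deprecation a hard runtime dependency (installing it manually, e.g. via pip, works around the error until it is declared in setup.py). A minimal sketch of the shim pattern, with names and messages illustrative rather than copied from the Airflow source:

{code}
# Sketch of the deprecation-shim pattern introduced by AIRFLOW-31
# (illustrative; not the exact airflow/hooks/__init__.py source).
from zope.deprecation import deprecated as _deprecated

from airflow.hooks.base_hook import BaseHook

# Re-export BaseHook at its old location but warn on access:
# zope.deprecation wraps this module so that reading the named
# attribute emits a DeprecationWarning with the given reason.
_deprecated('BaseHook', "Import from airflow.hooks.base_hook instead.")
{code}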



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (AIRFLOW-234) make task that aren't `running` self-terminate

2016-06-17 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15336999#comment-15336999
 ] 

ASF subversion and git services commented on AIRFLOW-234:
-

Commit 7c0f8373f59b0554d5ba15bb0e5e8669f0830313 in incubator-airflow's branch 
refs/heads/master from [~maxime.beauche...@apache.org]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=7c0f837 ]

[AIRFLOW-234] make task that aren't `running` self-terminate

Closes #1585 from mistercrunch/undeads


> make task that aren't `running` self-terminate
> --
>
> Key: AIRFLOW-234
> URL: https://issues.apache.org/jira/browse/AIRFLOW-234
> Project: Apache Airflow
>  Issue Type: Bug
>Reporter: Maxime Beauchemin
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


incubator-airflow git commit: [AIRFLOW-234] make task that aren't `running` self-terminate

2016-06-17 Thread maximebeauchemin
Repository: incubator-airflow
Updated Branches:
  refs/heads/master d243c003b -> 7c0f8373f


[AIRFLOW-234] make task that aren't `running` self-terminate

Closes #1585 from mistercrunch/undeads


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/7c0f8373
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/7c0f8373
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/7c0f8373

Branch: refs/heads/master
Commit: 7c0f8373f59b0554d5ba15bb0e5e8669f0830313
Parents: d243c00
Author: Maxime Beauchemin 
Authored: Fri Jun 17 14:30:55 2016 -0700
Committer: Maxime Beauchemin 
Committed: Fri Jun 17 14:30:55 2016 -0700

--
 airflow/example_dags/docker_copy_data.py| 13 ++
 airflow/example_dags/example_bash_operator.py   | 13 ++
 airflow/example_dags/example_branch_operator.py | 13 ++
 airflow/example_dags/example_docker_operator.py | 13 ++
 airflow/example_dags/example_http_operator.py   | 13 ++
 airflow/example_dags/example_python_operator.py | 13 ++
 .../example_short_circuit_operator.py   | 13 ++
 airflow/example_dags/example_subdag_operator.py | 13 ++
 .../example_trigger_controller_dag.py   | 14 +-
 .../example_dags/example_trigger_target_dag.py  | 13 ++
 airflow/example_dags/example_xcom.py| 13 ++
 airflow/example_dags/test_utils.py  | 29 
 airflow/jobs.py | 46 
 airflow/models.py   | 27 +---
 tests/core.py   | 42 +-
 15 files changed, 263 insertions(+), 25 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c0f8373/airflow/example_dags/docker_copy_data.py
--
diff --git a/airflow/example_dags/docker_copy_data.py 
b/airflow/example_dags/docker_copy_data.py
index ccf84c1..f0789b1 100644
--- a/airflow/example_dags/docker_copy_data.py
+++ b/airflow/example_dags/docker_copy_data.py
@@ -1,3 +1,16 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
 '''
 This sample "listen to directory". move the new file and print it, using 
docker-containers.
 The following operators are being used: DockerOperator, BashOperator & 
ShortCircuitOperator.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c0f8373/airflow/example_dags/example_bash_operator.py
--
diff --git a/airflow/example_dags/example_bash_operator.py 
b/airflow/example_dags/example_bash_operator.py
index 4ab9144..c759f4d 100644
--- a/airflow/example_dags/example_bash_operator.py
+++ b/airflow/example_dags/example_bash_operator.py
@@ -1,3 +1,16 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
 from builtins import range
 from airflow.operators import BashOperator, DummyOperator
 from airflow.models import DAG

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c0f8373/airflow/example_dags/example_branch_operator.py
--
diff --git a/airflow/example_dags/example_branch_operator.py 
b/airflow/example_dags/example_branch_operator.py
index f576d20..edd177a 100644
--- a/airflow/example_dags/example_branch_operator.py
+++ b/airflow/example_dags/example_branch_operator.py
@@ -1,3 +1,16 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0

[jira] [Commented] (AIRFLOW-256) test_scheduler_reschedule fails due to heartrate check

2016-06-17 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15336763#comment-15336763
 ] 

ASF subversion and git services commented on AIRFLOW-256:
-

Commit ab2d71be199708918c1f6d85f0c48c51c777f1e4 in incubator-airflow's branch 
refs/heads/master from [~bolke]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=ab2d71b ]

[AIRFLOW-256] Fix test_scheduler_reschedule heartrate

test_scheduler_reschedule runs two SchedulerJobs in quick
succession; this is sometimes faster than the heartrate
allows, so the tasks will not get rescheduled and the
test will fail. Fixed by setting the heartrate to 0.


> test_scheduler_reschedule fails due to heartrate check
> --
>
> Key: AIRFLOW-256
> URL: https://issues.apache.org/jira/browse/AIRFLOW-256
> Project: Apache Airflow
>  Issue Type: Bug
>Reporter: Bolke de Bruin
>
> test_scheduler_reschedule can fail due to the heartrate check



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[1/2] incubator-airflow git commit: [AIRFLOW-256] Fix test_scheduler_reschedule heartrate

2016-06-17 Thread bolke
Repository: incubator-airflow
Updated Branches:
  refs/heads/master adcccfa26 -> d243c003b


[AIRFLOW-256] Fix test_scheduler_reschedule heartrate

test_scheduler_reschedule runs two SchedulerJobs in quick
succession; this is sometimes faster than the heartrate
allows, so the tasks will not get rescheduled and the
test will fail. Fixed by setting the heartrate to 0.


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/ab2d71be
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/ab2d71be
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/ab2d71be

Branch: refs/heads/master
Commit: ab2d71be199708918c1f6d85f0c48c51c777f1e4
Parents: ce362c3
Author: Bolke de Bruin 
Authored: Fri Jun 17 20:52:38 2016 +0200
Committer: Bolke de Bruin 
Committed: Fri Jun 17 20:52:38 2016 +0200

--
 tests/jobs.py | 1 +
 1 file changed, 1 insertion(+)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ab2d71be/tests/jobs.py
--
diff --git a/tests/jobs.py b/tests/jobs.py
index 3618ce4..0619f3d 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -770,6 +770,7 @@ class SchedulerJobTest(unittest.TestCase):
         @mock.patch('airflow.models.DagBag.collect_dags')
         def do_schedule(function, function2):
             scheduler = SchedulerJob(num_runs=1, executor=executor,)
+            scheduler.heartrate = 0
             scheduler.run()
 
         do_schedule()



[jira] [Updated] (AIRFLOW-256) test_scheduler_reschedule fails due to heartrate check

2016-06-17 Thread Bolke de Bruin (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-256?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bolke de Bruin updated AIRFLOW-256:
---
External issue URL: https://github.com/apache/incubator-airflow/pull/1606

> test_scheduler_reschedule fails due to heartrate check
> --
>
> Key: AIRFLOW-256
> URL: https://issues.apache.org/jira/browse/AIRFLOW-256
> Project: Apache Airflow
>  Issue Type: Bug
>Reporter: Bolke de Bruin
>
> test_scheduler_reschedule can fail due to the heartrate check



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (AIRFLOW-256) test_scheduler_reschedule fails due to heartrate check

2016-06-17 Thread Bolke de Bruin (JIRA)
Bolke de Bruin created AIRFLOW-256:
--

 Summary: test_scheduler_reschedule fails due to heartrate check
 Key: AIRFLOW-256
 URL: https://issues.apache.org/jira/browse/AIRFLOW-256
 Project: Apache Airflow
  Issue Type: Bug
Reporter: Bolke de Bruin


test_scheduler_reschedule can fail due to the heartrate check



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (AIRFLOW-246) dag_stats endpoint has a terrible query

2016-06-17 Thread Kengo Seki (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-246?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kengo Seki reassigned AIRFLOW-246:
--

Assignee: Kengo Seki

> dag_stats endpoint has a terrible query
> ---
>
> Key: AIRFLOW-246
> URL: https://issues.apache.org/jira/browse/AIRFLOW-246
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: webserver
>Affects Versions: Airflow 1.7.1
> Environment: MySQL Backend through sqlalchemy
>Reporter: Neil Hanlon
>Assignee: Kengo Seki
>
> Hitting this endpoint creates a series of queries on the database which take 
> over 20 seconds to run, causing the page to not load for that entire time. 
> Luckily the main page (which includes this under "Recent Statuses") loads 
> this asynchronously, but still... waiting almost half a minute (at times more) 
> to see the statuses for dags is really not fun.
> We have less than a million rows in the task_instance table--so it's not even 
> a problem with that.
> Here's a query profile for the query:
> https://gist.github.com/NeilHanlon/613f12724e802bc51c23fca7d46d28bf
> We've done some optimizations on the database, but to no avail.
> The query:
> {code:sql}
> SELECT task_instance.dag_id AS task_instance_dag_id, task_instance.state AS 
> task_instance_state, count(task_instance.task_id) AS count_1 FROM 
> task_instance LEFT OUTER JOIN (SELECT dag_run.dag_id AS dag_id, 
> dag_run.execution_date AS execution_date FROM dag_run WHERE dag_run.state = 
> 'running') AS running_dag_run ON running_dag_run.dag_id = 
> task_instance.dag_id AND running_dag_run.execution_date = 
> task_instance.execution_date LEFT OUTER JOIN (SELECT dag_run.dag_id AS 
> dag_id, max(dag_run.execution_date) AS execution_date FROM dag_run GROUP BY 
> dag_run.dag_id) AS last_dag_run ON last_dag_run.dag_id = task_instance.dag_id 
> AND last_dag_run.execution_date = task_instance.execution_date WHERE 
> task_instance.task_id IN ... AND (running_dag_run.dag_id IS NOT NULL OR 
> last_dag_run.dag_id IS NOT NULL) GROUP BY task_instance.dag_id, 
> task_instance.state;
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (AIRFLOW-246) dag_stats endpoint has a terrible query

2016-06-17 Thread Neil Hanlon (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15336229#comment-15336229
 ] 

Neil Hanlon edited comment on AIRFLOW-246 at 6/17/16 2:43 PM:
--

[~sekikn] Looks good to me. Query only takes ~4 seconds to run on our db (~750k 
rows).

Obviously with more task instances the actual size of this query will increase 
which will require database tuning to allow it to run (max_allowed_packet) and 
all that... so maybe it should also be looked at whether or not the whole thing 
should be rearchitected... both of the queries (original, and the one you 
wrote) are pretty ugly. I'm not saying all queries have to be pretty, but...

Let me know if you get a code fix for this and I'd be happy to test it.

(new query profile for posterity)

{code:sql}
airflow-rwdb01:airflow> show profile for query 1;
++--+
| Status | Duration |
++--+
| starting   | 0.33 |
| Waiting for query cache lock   | 0.05 |
| Waiting on query cache mutex   | 0.04 |
| checking query cache for query | 0.002673 |
| checking permissions   | 0.06 |
| checking permissions   | 0.04 |
| checking permissions   | 0.03 |
| checking permissions   | 0.06 |
| Opening tables | 0.25 |
| System lock| 0.001740 |
| optimizing | 0.05 |
| statistics | 0.17 |
| preparing  | 0.08 |
| executing  | 0.07 |
| Sorting result | 0.04 |
| Sending data   | 0.000855 |
| optimizing | 0.09 |
| statistics | 0.09 |
| preparing  | 0.10 |
| executing  | 0.04 |
| Sending data   | 0.000622 |
| optimizing | 0.73 |
| statistics | 1.371602 |
| preparing  | 0.000239 |
| executing  | 0.08 |
| Sending data   | 0.407134 |
| optimizing | 0.000145 |
| statistics | 1.176737 |
| preparing  | 0.000174 |
| executing  | 0.06 |
| Sending data   | 0.395361 |
| optimizing | 0.11 |
| statistics | 0.14 |
| preparing  | 0.12 |
| executing  | 0.05 |
| Sending data   | 0.000654 |
| removing tmp table | 0.21 |
| Sending data   | 0.13 |
| Waiting for query cache lock   | 0.05 |
| Waiting on query cache mutex   | 0.05 |
| Sending data   | 0.001048 |
| init   | 0.23 |
| optimizing | 0.06 |
| statistics | 0.07 |
| preparing  | 0.08 |
| Creating tmp table | 0.000386 |
| executing  | 0.06 |
| Copying to tmp table   | 0.005354 |
| Sorting result | 0.91 |
| Sending data   | 0.90 |
| end| 0.04 |
| removing tmp table | 0.000169 |
| end| 0.06 |
| query end  | 0.07 |
| closing tables | 0.04 |
| removing tmp table | 0.07 |
| closing tables | 0.04 |
| removing tmp table | 0.07 |
| closing tables | 0.04 |
| removing tmp table | 0.05 |
| closing tables | 0.13 |
| freeing items  | 0.000144 |
| Waiting for query cache lock   | 0.04 |
| Waiting on query cache mutex   | 0.03 |
| freeing items  | 0.57 |
| Waiting for query cache lock   | 0.04 |
| Waiting on query cache mutex   | 0.05 |
| freeing items  | 0.04 |
| storing result in query cache  | 0.07 |
| logging slow query | 0.04 |
| logging slow query | 0.000221 |
| cleaning up| 0.13 |
++--+
72 rows in set (0.00 sec)
{code}


was (Author: nhanlon):
[~sekikn] Looks good to me. Query only takes ~4 seconds to run on our db (~750k 
rows).

Obviously with more task instances the actual size of this query will increase 
which will require database tuning to allow it to run (max_allowed_packet) and 
all that... so maybe it should also be looked at whether or not the whole thing 
should be rearchitected... both of the queries (original, and the one you 
wrote) are pretty ugly. I'm not saying all queries have to be pretty, but...

Let me know if you get a 

[jira] [Commented] (AIRFLOW-246) dag_stats endpoint has a terrible query

2016-06-17 Thread Neil Hanlon (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15336229#comment-15336229
 ] 

Neil Hanlon commented on AIRFLOW-246:
-

[~sekikn] Looks good to me. Query only takes ~4 seconds to run on our db (~750k 
rows).

Obviously with more task instances the actual size of this query will increase 
which will require database tuning to allow it to run (max_allowed_packet) and 
all that... so maybe it should also be looked at whether or not the whole thing 
should be rearchitected... both of the queries (original, and the one you 
wrote) are pretty ugly. I'm not saying all queries have to be pretty, but...

Let me know if you get a code fix for this and I'd be happy to test it.

(new query profile for posterity)

{pre}
airflow-rwdb01:airflow> show profile for query 1;
++--+
| Status | Duration |
++--+
| starting   | 0.33 |
| Waiting for query cache lock   | 0.05 |
| Waiting on query cache mutex   | 0.04 |
| checking query cache for query | 0.002673 |
| checking permissions   | 0.06 |
| checking permissions   | 0.04 |
| checking permissions   | 0.03 |
| checking permissions   | 0.06 |
| Opening tables | 0.25 |
| System lock| 0.001740 |
| optimizing | 0.05 |
| statistics | 0.17 |
| preparing  | 0.08 |
| executing  | 0.07 |
| Sorting result | 0.04 |
| Sending data   | 0.000855 |
| optimizing | 0.09 |
| statistics | 0.09 |
| preparing  | 0.10 |
| executing  | 0.04 |
| Sending data   | 0.000622 |
| optimizing | 0.73 |
| statistics | 1.371602 |
| preparing  | 0.000239 |
| executing  | 0.08 |
| Sending data   | 0.407134 |
| optimizing | 0.000145 |
| statistics | 1.176737 |
| preparing  | 0.000174 |
| executing  | 0.06 |
| Sending data   | 0.395361 |
| optimizing | 0.11 |
| statistics | 0.14 |
| preparing  | 0.12 |
| executing  | 0.05 |
| Sending data   | 0.000654 |
| removing tmp table | 0.21 |
| Sending data   | 0.13 |
| Waiting for query cache lock   | 0.05 |
| Waiting on query cache mutex   | 0.05 |
| Sending data   | 0.001048 |
| init   | 0.23 |
| optimizing | 0.06 |
| statistics | 0.07 |
| preparing  | 0.08 |
| Creating tmp table | 0.000386 |
| executing  | 0.06 |
| Copying to tmp table   | 0.005354 |
| Sorting result | 0.91 |
| Sending data   | 0.90 |
| end| 0.04 |
| removing tmp table | 0.000169 |
| end| 0.06 |
| query end  | 0.07 |
| closing tables | 0.04 |
| removing tmp table | 0.07 |
| closing tables | 0.04 |
| removing tmp table | 0.07 |
| closing tables | 0.04 |
| removing tmp table | 0.05 |
| closing tables | 0.13 |
| freeing items  | 0.000144 |
| Waiting for query cache lock   | 0.04 |
| Waiting on query cache mutex   | 0.03 |
| freeing items  | 0.57 |
| Waiting for query cache lock   | 0.04 |
| Waiting on query cache mutex   | 0.05 |
| freeing items  | 0.04 |
| storing result in query cache  | 0.07 |
| logging slow query | 0.04 |
| logging slow query | 0.000221 |
| cleaning up| 0.13 |
++--+
72 rows in set (0.00 sec)
{pre}

> dag_stats endpoint has a terrible query
> ---
>
> Key: AIRFLOW-246
> URL: https://issues.apache.org/jira/browse/AIRFLOW-246
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: webserver
>Affects Versions: Airflow 1.7.1
> Environment: MySQL Backend through sqlalchemy
>Reporter: Neil Hanlon
>
> Hitting this endpoint creates a series of queries on the database which take 
> over 20 seconds to run, causing the page to not load for that entire time. 
> 

[jira] [Commented] (AIRFLOW-255) schedule_dag shouldn't return early if dagrun_timeout is given

2016-06-17 Thread Kevin Lin (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15336223#comment-15336223
 ] 

Kevin Lin commented on AIRFLOW-255:
---

Took a stab at creating a PR for the fix:
https://github.com/apache/incubator-airflow/pull/1604

> schedule_dag shouldn't return early if dagrun_timeout is given
> --
>
> Key: AIRFLOW-255
> URL: https://issues.apache.org/jira/browse/AIRFLOW-255
> Project: Apache Airflow
>  Issue Type: Bug
>Reporter: Kevin Lin
>
> In 
> https://github.com/apache/incubator-airflow/blob/master/airflow/jobs.py#L396, 
> schedule_dag returns as long as `len(active_runs) >= dag.max_active_runs`. It 
> should also take into account dag.dagrun_timeout



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (AIRFLOW-255) schedule_dag shouldn't return early if dagrun_timeout is given

2016-06-17 Thread Kevin Lin (JIRA)
Kevin Lin created AIRFLOW-255:
-

 Summary: schedule_dag shouldn't return early if dagrun_timeout is 
given
 Key: AIRFLOW-255
 URL: https://issues.apache.org/jira/browse/AIRFLOW-255
 Project: Apache Airflow
  Issue Type: Bug
Reporter: Kevin Lin


In 
https://github.com/apache/incubator-airflow/blob/master/airflow/jobs.py#L396, 
schedule_dag returns as long as `len(active_runs) >= dag.max_active_runs`. It 
should also take into account dag.dagrun_timeout
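
A hedged sketch of what taking dag.dagrun_timeout into account could look like (illustrative only; the helper and its failure handling are assumptions, not the change that ends up merged):

{code}
from datetime import datetime

def prune_timed_out_runs(dag, active_runs):
    # Expire runs whose dagrun_timeout has elapsed so they stop
    # counting against dag.max_active_runs.
    if not dag.dagrun_timeout:
        return active_runs
    now = datetime.now()
    live_runs = []
    for run in active_runs:
        if run.start_date and now - run.start_date > dag.dagrun_timeout:
            run.state = 'failed'  # time out the stale run
        else:
            live_runs.append(run)
    return live_runs

# schedule_dag would then check the pruned list instead:
# if len(prune_timed_out_runs(dag, active_runs)) >= dag.max_active_runs:
#     return
{code}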



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (AIRFLOW-254) Webserver should refresh all workers in case of a dag refresh / update

2016-06-17 Thread Bolke de Bruin (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bolke de Bruin updated AIRFLOW-254:
---
External issue URL: https://github.com/apache/incubator-airflow/pull/1603

> Webserver should refresh all workers in case of a dag refresh / update
> --
>
> Key: AIRFLOW-254
> URL: https://issues.apache.org/jira/browse/AIRFLOW-254
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: webserver
>Reporter: Bolke de Bruin
>
> The webserver only refreshes one process when a dag refresh is demanded or 
> an update is made to a dag. This is annoying, as you might end up with old 
> code in the views or the dreaded "scheduler has put this in the db, but the 
> webserver hasn't got it yet".



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (AIRFLOW-254) Webserver should refresh all workers in case of a dag refresh / update

2016-06-17 Thread Bolke de Bruin (JIRA)
Bolke de Bruin created AIRFLOW-254:
--

 Summary: Webserver should refresh all workers in case of a dag 
refresh / update
 Key: AIRFLOW-254
 URL: https://issues.apache.org/jira/browse/AIRFLOW-254
 Project: Apache Airflow
  Issue Type: Bug
  Components: webserver
Reporter: Bolke de Bruin


The webserver only refreshes one process when a dag refresh is demanded or 
an update is made to a dag. This is annoying, as you might end up with old code 
in the views or the dreaded "scheduler has put this in the db, but the 
webserver hasn't got it yet".
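
One plausible mechanism for a fix, assuming the webserver runs under gunicorn (a sketch, not necessarily what the linked PR implements): signal the gunicorn master so that every worker reloads, instead of refreshing a single process.

{code}
import os
import signal

def refresh_all_workers(gunicorn_master_pid):
    # gunicorn restarts all of its workers when the master process
    # receives SIGHUP, so each webserver process picks up the new dag code.
    os.kill(gunicorn_master_pid, signal.SIGHUP)
{code}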



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (AIRFLOW-31) Use standard imports for hooks/operators

2016-06-17 Thread Jeremiah Lowin (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-31?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeremiah Lowin closed AIRFLOW-31.
-
   Resolution: Fixed
Fix Version/s: (was: Airflow 2.0)
   Airflow 1.8

Merged in https://github.com/apache/incubator-airflow/pull/1272

> Use standard imports for hooks/operators
> 
>
> Key: AIRFLOW-31
> URL: https://issues.apache.org/jira/browse/AIRFLOW-31
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: operators
>Reporter: Jeremiah Lowin
>Assignee: Jeremiah Lowin
>  Labels: enhancement
> Fix For: Airflow 1.8
>
>
> (Migrated from https://github.com/airbnb/airflow/issues/1238)
> Currently, Airflow uses a relatively complex import mechanism to import hooks 
> and operators without polluting the namespace with submodules. I would like 
> to propose that Airflow abandon that system and use standard Python importing.
> Here are a few major reasons why I think the current system has run its 
> course.
> h3. Polluting namespace
> The biggest advantage of the current system, as I understand it, is that only 
> Operators appear in the `airflow.operators` namespace.  The submodules that 
> actually contain the operators do not.
> So for example while `airflow.operators.python_operator.PythonOperator` is a 
> thing, `PythonOperator` is in the `airflow.operators` namespace but 
> `python_operator` is not.
> I think this sort of namespace pollution was helpful when Airflow was a 
> smaller project, but as the number of hooks/operators grows -- and especially 
> as the `contrib` hooks/operators grow -- I'd argue that namespacing is a 
> *good thing*. It provides structure and organization, and opportunities for 
> documentation (through module docstrings).
> In fact, I'd argue that the current namespace is itself getting quite 
> polluted -- the only way to know what's available is to use something like 
> Ipython tab-completion to browse an alphabetical list of Operator names, or 
> to load the source file and grok the import definition (which no one 
> installing from pypi is likely to do).
> h3. Conditional imports
> There's a second advantage to the current system that any module that fails 
> to import is silently ignored. It makes it easy to have optional 
> dependencies. For example, if someone doesn't have `boto` installed, then 
> they don't have an `S3Hook` either. Same for a HiveOperator
> Again, as Airflow grows and matures, I think this is a little too magic. If 
> my environment is missing a dependency, I want to hear about it.
> On the other hand, the `contrib` namespace sort of depends on this -- we 
> don't want users to have to install every single dependency. So I propose 
> that contrib modules all live in their submodules: `from 
> airflow.contrib.operators.my_operator import MyOperator`. As mentioned 
> previously, having structure and namespacing is a good thing as the project 
> gets more complex.
> Other ways to handle this include putting "non-standard" dependencies inside 
> the operator/hook rather than the module (see `HiveOperator`/`HiveHook`), so 
> it can be imported but not used. Another is judicious use of `try`/`except 
> ImportError`. The simplest is to make people import things explicitly from 
> submodules.
> h3. Operator dependencies
> Right now, operators can't depend on each other if they aren't in the same 
> file. This is for the simple reason that there is no guarantee on what order 
> the operators will be loaded. It all comes down to which dictionary key gets 
> loaded first. One day Operator B could be loaded after Operator A; the next 
> day it might be loaded before. Consequently, A and B can't depend on each 
> other. Worse, if a user makes two operators that do depend on each other, 
> they won't get an error message when one fails to import.
> For contrib modules in particular, this is sort of killer.
> h3. Ease of use
> It's *hard* to set up imports for a new operator. The dictionary-based import 
> instructions aren't obvious for new users, and errors are silently dismissed 
> which makes debugging difficult.
> h3. Identity
> Surprisingly, `airflow.operators.SubDagOperator != 
> airflow.operators.subdag_operator.SubDagOperator`. See #1168.
> h2. Proposal
> Use standard python importing for hooks/operators/etc.
> - `__init__.py` files use straightforward, standard Python imports
> - major operators are available at `airflow.operators.OperatorName` or 
> `airflow.operators.operator_module.OperatorName`.
> - contrib operators are only available at 
> `airflow.contrib.operators.operator_module.OperatorName` in order to manage 
> dependencies
> - operator authors are encouraged to use `__all__` to define their module's 
> exports
> Possibly delete namespace afterward
> - in 
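
To make the "judicious use of try/except ImportError" option above concrete, a hedged sketch (the module path and constructor argument are illustrative, not a prescription for Airflow's layout):

{code}
# Optional-dependency pattern: the import fails loudly only when the
# feature is actually used, not at package import time.
try:
    from airflow.hooks.S3_hook import S3Hook  # requires boto
except ImportError:
    S3Hook = None

def make_s3_hook(conn_id):
    if S3Hook is None:
        raise ImportError("S3Hook requires boto; install it to use S3 features")
    return S3Hook(s3_conn_id=conn_id)
{code}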

[jira] [Commented] (AIRFLOW-31) Use standard imports for hooks/operators

2016-06-17 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-31?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15335919#comment-15335919
 ] 

ASF subversion and git services commented on AIRFLOW-31:


Commit 851adc5547597ec51743be4bc47d634c77d6dc17 in incubator-airflow's branch 
refs/heads/master from jlowin
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=851adc5 ]

[AIRFLOW-31] Use standard imports for hooks/operators


> Use standard imports for hooks/operators
> 
>
> Key: AIRFLOW-31
> URL: https://issues.apache.org/jira/browse/AIRFLOW-31
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: operators
>Reporter: Jeremiah Lowin
>Assignee: Jeremiah Lowin
>  Labels: enhancement
> Fix For: Airflow 2.0
>
>
> (Migrated from https://github.com/airbnb/airflow/issues/1238)
> Currently, Airflow uses a relatively complex import mechanism to import hooks 
> and operators without polluting the namespace with submodules. I would like 
> to propose that Airflow abandon that system and use standard Python importing.
> Here are a few major reasons why I think the current system has run its 
> course.
> h3. Polluting namespace
> The biggest advantage of the current system, as I understand it, is that only 
> Operators appear in the `airflow.operators` namespace.  The submodules that 
> actually contain the operators do not.
> So for example while `airflow.operators.python_operator.PythonOperator` is a 
> thing, `PythonOperator` is in the `airflow.operators` namespace but 
> `python_operator` is not.
> I think this sort of namespace pollution was helpful when Airflow was a 
> smaller project, but as the number of hooks/operators grows -- and especially 
> as the `contrib` hooks/operators grow -- I'd argue that namespacing is a 
> *good thing*. It provides structure and organization, and opportunities for 
> documentation (through module docstrings).
> In fact, I'd argue that the current namespace is itself getting quite 
> polluted -- the only way to know what's available is to use something like 
> Ipython tab-completion to browse an alphabetical list of Operator names, or 
> to load the source file and grok the import definition (which no one 
> installing from pypi is likely to do).
> h3. Conditional imports
> There's a second advantage to the current system that any module that fails 
> to import is silently ignored. It makes it easy to have optional 
> dependencies. For example, if someone doesn't have `boto` installed, then 
> they don't have an `S3Hook` either. Same for a HiveOperator
> Again, as Airflow grows and matures, I think this is a little too magic. If 
> my environment is missing a dependency, I want to hear about it.
> On the other hand, the `contrib` namespace sort of depends on this -- we 
> don't want users to have to install every single dependency. So I propose 
> that contrib modules all live in their submodules: `from 
> airflow.contrib.operators.my_operator import MyOperator`. As mentioned 
> previously, having structure and namespacing is a good thing as the project 
> gets more complex.
> Other ways to handle this include putting "non-standard" dependencies inside 
> the operator/hook rather than the module (see `HiveOperator`/`HiveHook`), so 
> it can be imported but not used. Another is judicious use of `try`/`except 
> ImportError`. The simplest is to make people import things explicitly from 
> submodules.
> h3. Operator dependencies
> Right now, operators can't depend on each other if they aren't in the same 
> file. This is for the simple reason that there is no guarantee on what order 
> the operators will be loaded. It all comes down to which dictionary key gets 
> loaded first. One day Operator B could be loaded after Operator A; the next 
> day it might be loaded before. Consequently, A and B can't depend on each 
> other. Worse, if a user makes two operators that do depend on each other, 
> they won't get an error message when one fails to import.
> For contrib modules in particular, this is sort of killer.
> h3. Ease of use
> It's *hard* to set up imports for a new operator. The dictionary-based import 
> instructions aren't obvious for new users, and errors are silently dismissed 
> which makes debugging difficult.
> h3. Identity
> Surprisingly, `airflow.operators.SubDagOperator != 
> airflow.operators.subdag_operator.SubDagOperator`. See #1168.
> h2. Proposal
> Use standard python importing for hooks/operators/etc.
> - `__init__.py` files use straightforward, standard Python imports
> - major operators are available at `airflow.operators.OperatorName` or 
> `airflow.operators.operator_module.OperatorName`.
> - contrib operators are only available at 
> `airflow.contrib.operators.operator_module.OperatorName` in order to manage 
> 

[1/4] incubator-airflow git commit: [AIRFLOW-31] Use standard imports for hooks/operators

2016-06-17 Thread jlowin
Repository: incubator-airflow
Updated Branches:
  refs/heads/master ce362c312 -> adcccfa26


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/851adc55/tests/__init__.py
--
diff --git a/tests/__init__.py b/tests/__init__.py
index ca8150b..4a79d0f 100644
--- a/tests/__init__.py
+++ b/tests/__init__.py
@@ -1,9 +1,9 @@
 from __future__ import absolute_import
 
 from .configuration import *
+from .contrib import *
 from .core import *
 from .jobs import *
 from .models import *
 from .operators import *
-from .contrib import *
 from .utils import *

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/851adc55/tests/core.py
--
diff --git a/tests/core.py b/tests/core.py
index af791e3..5e6a4fd 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -11,6 +11,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
 from __future__ import print_function
 
 import doctest
@@ -289,7 +290,7 @@ class CoreTest(unittest.TestCase):
 assert hash(self.dag) != hash(dag_subclass)
 
 def test_time_sensor(self):
-t = operators.TimeSensor(
+t = operators.sensors.TimeSensor(
 task_id='time_sensor_check',
 target_time=time(0),
 dag=self.dag)
@@ -380,21 +381,22 @@ class CoreTest(unittest.TestCase):
 t.dry_run()
 
 def test_sqlite(self):
-t = operators.SqliteOperator(
+import airflow.operators.sqlite_operator
+t = airflow.operators.sqlite_operator.SqliteOperator(
 task_id='time_sqlite',
 sql="CREATE TABLE IF NOT EXISTS unitest (dummy VARCHAR(20))",
 dag=self.dag)
 t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
 
 def test_timedelta_sensor(self):
-t = operators.TimeDeltaSensor(
+t = operators.sensors.TimeDeltaSensor(
 task_id='timedelta_sensor_check',
 delta=timedelta(seconds=2),
 dag=self.dag)
 t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
 
 def test_external_task_sensor(self):
-t = operators.ExternalTaskSensor(
+t = operators.sensors.ExternalTaskSensor(
 task_id='test_external_task_sensor_check',
 external_dag_id=TEST_DAG_ID,
 external_task_id='time_sensor_check',
@@ -402,7 +404,7 @@ class CoreTest(unittest.TestCase):
 t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
 
 def test_external_task_sensor_delta(self):
-t = operators.ExternalTaskSensor(
+t = operators.sensors.ExternalTaskSensor(
 task_id='test_external_task_sensor_check_delta',
 external_dag_id=TEST_DAG_ID,
 external_task_id='time_sensor_check',
@@ -1077,97 +1079,6 @@ class WebLdapAuthTest(unittest.TestCase):
 session.close()
 configuration.conf.set("webserver", "authenticate", "False")
 
-
-if 'MySqlOperator' in dir(operators):
-# Only testing if the operator is installed
-class MySqlTest(unittest.TestCase):
-def setUp(self):
-configuration.test_mode()
-args = {
-'owner': 'airflow',
-'mysql_conn_id': 'airflow_db',
-'start_date': DEFAULT_DATE
-}
-dag = DAG(TEST_DAG_ID, default_args=args)
-self.dag = dag
-
-def mysql_operator_test(self):
-sql = """
-CREATE TABLE IF NOT EXISTS test_airflow (
-dummy VARCHAR(50)
-);
-"""
-t = operators.MySqlOperator(
-task_id='basic_mysql',
-sql=sql,
-mysql_conn_id='airflow_db',
-dag=self.dag)
-t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
-
-def mysql_operator_test_multi(self):
-sql = [
-"TRUNCATE TABLE test_airflow",
-"INSERT INTO test_airflow VALUES ('X')",
-]
-t = operators.MySqlOperator(
-task_id='mysql_operator_test_multi',
-mysql_conn_id='airflow_db',
-sql=sql, dag=self.dag)
-t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
-
-def test_mysql_to_mysql(self):
-sql = "SELECT * FROM INFORMATION_SCHEMA.TABLES LIMIT 100;"
-t = operators.GenericTransfer(
-task_id='test_m2m',
-preoperator=[
-"DROP TABLE IF EXISTS test_mysql_to_mysql",
-"CREATE TABLE IF NOT EXISTS "
-"test_mysql_to_mysql LIKE INFORMATION_SCHEMA.TABLES"
-],
-source_conn_id='airflow_db',
-

[3/4] incubator-airflow git commit: Add Python 3 compatibility fix

2016-06-17 Thread jlowin
Add Python 3 compatibility fix

In Python 3, errors don’t have a `message` attribute
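
A one-line illustration of the incompatibility this commit works around:

{code}
e = Exception("404 Not Found")
str(e).startswith("404")  # True on both Python 2 and 3
e.message                 # works on Python 2, AttributeError on Python 3
{code}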


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/f26b7a25
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/f26b7a25
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/f26b7a25

Branch: refs/heads/master
Commit: f26b7a25d9c4632af68d9d64ac5f4a929a44f426
Parents: 851adc5
Author: jlowin 
Authored: Thu Jun 16 16:53:27 2016 -0400
Committer: jlowin 
Committed: Thu Jun 16 16:53:27 2016 -0400

--
 airflow/operators/sensors.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f26b7a25/airflow/operators/sensors.py
--
diff --git a/airflow/operators/sensors.py b/airflow/operators/sensors.py
index 6d87b44..5276f6e 100644
--- a/airflow/operators/sensors.py
+++ b/airflow/operators/sensors.py
@@ -532,7 +532,7 @@ class HttpSensor(BaseSensorOperator):
                 # run content check on response
                 return self.response_check(response)
         except AirflowException as ae:
-            if ae.message.startswith("404"):
+            if str(ae).startswith("404"):
                 return False
 
             raise ae



[4/4] incubator-airflow git commit: Merge pull request #1272 from jlowin/standard-imports

2016-06-17 Thread jlowin
Merge pull request #1272 from jlowin/standard-imports


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/adcccfa2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/adcccfa2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/adcccfa2

Branch: refs/heads/master
Commit: adcccfa26ff666f4a787670441caf4c49f8ccef5
Parents: ce362c3 f26b7a2
Author: jlowin 
Authored: Fri Jun 17 07:57:06 2016 -0400
Committer: jlowin 
Committed: Fri Jun 17 07:57:06 2016 -0400

--
 airflow/__init__.py |   6 +-
 .../contrib/example_dags/example_twitter_dag.py |   4 +-
 airflow/contrib/hooks/__init__.py   |  33 +-
 airflow/contrib/operators/__init__.py   |  42 ++-
 airflow/contrib/operators/fs_operator.py|   3 +-
 airflow/contrib/operators/mysql_to_gcs.py   |   2 +-
 airflow/contrib/operators/vertica_to_hive.py|   2 +-
 .../contrib/plugins/metastore_browser/main.py   |   4 +-
 airflow/example_dags/example_http_operator.py   |   3 +-
 airflow/hooks/__init__.py   |  73 +++-
 airflow/hooks/base_hook.py  |  14 +
 airflow/hooks/dbapi_hook.py |  14 +
 airflow/hooks/druid_hook.py |  14 +
 airflow/hooks/hdfs_hook.py  |  14 +
 airflow/hooks/http_hook.py  |   2 +-
 airflow/hooks/jdbc_hook.py  |  14 +
 airflow/hooks/mssql_hook.py |  14 +
 airflow/hooks/mysql_hook.py |  14 +
 airflow/hooks/oracle_hook.py|  14 +
 airflow/hooks/pig_hook.py   |  14 +
 airflow/hooks/postgres_hook.py  |  14 +
 airflow/hooks/presto_hook.py|  14 +
 airflow/hooks/samba_hook.py |  14 +
 airflow/hooks/sqlite_hook.py|  14 +
 airflow/hooks/webhdfs_hook.py   |  14 +
 airflow/macros/__init__.py  |  38 ++-
 airflow/macros/hive.py  |   4 +-
 airflow/models.py   |  38 ++-
 airflow/operators/__init__.py   | 120 +--
 airflow/operators/bash_operator.py  |  14 +
 airflow/operators/check_operator.py |  14 +
 airflow/operators/dagrun_operator.py|  14 +
 airflow/operators/docker_operator.py|  14 +
 airflow/operators/dummy_operator.py |  14 +
 airflow/operators/email_operator.py |  14 +
 airflow/operators/generic_transfer.py   |  14 +
 airflow/operators/hive_operator.py  |  16 +-
 airflow/operators/hive_stats_operator.py|  18 +-
 airflow/operators/hive_to_druid.py  |  17 +-
 airflow/operators/hive_to_mysql.py  |  17 +-
 airflow/operators/hive_to_samba_operator.py |  17 +-
 airflow/operators/http_operator.py  |  14 +
 airflow/operators/jdbc_operator.py  |  14 +
 airflow/operators/mssql_operator.py |  16 +-
 airflow/operators/mssql_to_hive.py  |  17 +-
 airflow/operators/mysql_operator.py |  16 +-
 airflow/operators/mysql_to_hive.py  |  17 +-
 airflow/operators/oracle_operator.py|   2 +-
 airflow/operators/pig_operator.py   |  16 +-
 airflow/operators/postgres_operator.py  |  16 +-
 airflow/operators/presto_check_operator.py  |  16 +-
 airflow/operators/presto_to_mysql.py|  17 +-
 airflow/operators/python_operator.py|  14 +
 airflow/operators/s3_file_transform_operator.py |  16 +-
 airflow/operators/s3_to_hive_operator.py|  17 +-
 airflow/operators/sensors.py|  31 +-
 airflow/operators/slack_operator.py |  14 +
 airflow/operators/sqlite_operator.py|  14 +
 airflow/plugins_manager.py  |  30 +-
 airflow/utils/email.py  |  18 +-
 airflow/utils/logging.py|   2 +-
 airflow/utils/tests.py  |  23 ++
 dags/testdruid.py   |   2 +-
 run_unit_tests.sh   |   3 +
 setup.py|  14 +
 tests/__init__.py   |   2 +-
 tests/core.py   | 340 +--
 tests/operators/__init__.py |  17 +
 tests/operators/hive_operator.py| 209 
 tests/operators/operators.py| 174 ++
 tests/operators/sensor.py   |  39 ---
 tests/operators/sensors.py  |  39 +++
 72 files changed, 1458 insertions(+), 474 deletions(-)

[2/4] incubator-airflow git commit: [AIRFLOW-31] Use standard imports for hooks/operators

2016-06-17 Thread jlowin
[AIRFLOW-31] Use standard imports for hooks/operators


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/851adc55
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/851adc55
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/851adc55

Branch: refs/heads/master
Commit: 851adc5547597ec51743be4bc47d634c77d6dc17
Parents: 06e70e2
Author: jlowin 
Authored: Wed Jun 15 17:39:12 2016 -0400
Committer: jlowin 
Committed: Thu Jun 16 14:55:07 2016 -0400

--
 airflow/__init__.py |   6 +-
 .../contrib/example_dags/example_twitter_dag.py |   4 +-
 airflow/contrib/hooks/__init__.py   |  33 +-
 airflow/contrib/operators/__init__.py   |  42 ++-
 airflow/contrib/operators/fs_operator.py|   3 +-
 airflow/contrib/operators/mysql_to_gcs.py   |   2 +-
 airflow/contrib/operators/vertica_to_hive.py|   2 +-
 .../contrib/plugins/metastore_browser/main.py   |   4 +-
 airflow/example_dags/example_http_operator.py   |   3 +-
 airflow/hooks/__init__.py   |  73 +++-
 airflow/hooks/base_hook.py  |  14 +
 airflow/hooks/dbapi_hook.py |  14 +
 airflow/hooks/druid_hook.py |  14 +
 airflow/hooks/hdfs_hook.py  |  14 +
 airflow/hooks/http_hook.py  |   2 +-
 airflow/hooks/jdbc_hook.py  |  14 +
 airflow/hooks/mssql_hook.py |  14 +
 airflow/hooks/mysql_hook.py |  14 +
 airflow/hooks/oracle_hook.py|  14 +
 airflow/hooks/pig_hook.py   |  14 +
 airflow/hooks/postgres_hook.py  |  14 +
 airflow/hooks/presto_hook.py|  14 +
 airflow/hooks/samba_hook.py |  14 +
 airflow/hooks/sqlite_hook.py|  14 +
 airflow/hooks/webhdfs_hook.py   |  14 +
 airflow/macros/__init__.py  |  38 ++-
 airflow/macros/hive.py  |   4 +-
 airflow/models.py   |  38 ++-
 airflow/operators/__init__.py   | 120 +--
 airflow/operators/bash_operator.py  |  14 +
 airflow/operators/check_operator.py |  14 +
 airflow/operators/dagrun_operator.py|  14 +
 airflow/operators/docker_operator.py|  14 +
 airflow/operators/dummy_operator.py |  14 +
 airflow/operators/email_operator.py |  14 +
 airflow/operators/generic_transfer.py   |  14 +
 airflow/operators/hive_operator.py  |  16 +-
 airflow/operators/hive_stats_operator.py|  18 +-
 airflow/operators/hive_to_druid.py  |  17 +-
 airflow/operators/hive_to_mysql.py  |  17 +-
 airflow/operators/hive_to_samba_operator.py |  17 +-
 airflow/operators/http_operator.py  |  14 +
 airflow/operators/jdbc_operator.py  |  14 +
 airflow/operators/mssql_operator.py |  16 +-
 airflow/operators/mssql_to_hive.py  |  17 +-
 airflow/operators/mysql_operator.py |  16 +-
 airflow/operators/mysql_to_hive.py  |  17 +-
 airflow/operators/oracle_operator.py|   2 +-
 airflow/operators/pig_operator.py   |  16 +-
 airflow/operators/postgres_operator.py  |  16 +-
 airflow/operators/presto_check_operator.py  |  16 +-
 airflow/operators/presto_to_mysql.py|  17 +-
 airflow/operators/python_operator.py|  14 +
 airflow/operators/s3_file_transform_operator.py |  16 +-
 airflow/operators/s3_to_hive_operator.py|  17 +-
 airflow/operators/sensors.py|  29 +-
 airflow/operators/slack_operator.py |  14 +
 airflow/operators/sqlite_operator.py|  14 +
 airflow/plugins_manager.py  |  30 +-
 airflow/utils/email.py  |  18 +-
 airflow/utils/logging.py|   2 +-
 airflow/utils/tests.py  |  23 ++
 dags/testdruid.py   |   2 +-
 run_unit_tests.sh   |   3 +
 setup.py|  14 +
 tests/__init__.py   |   2 +-
 tests/core.py   | 340 +--
 tests/operators/__init__.py |  17 +
 tests/operators/hive_operator.py| 209 
 tests/operators/operators.py| 174 ++
 tests/operators/sensor.py   |  39 ---
 tests/operators/sensors.py  |  39 +++
 72 files changed, 1457 insertions(+), 473 deletions(-)

[jira] [Commented] (AIRFLOW-246) dag_stats endpoint has a terrible query

2016-06-17 Thread Kengo Seki (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15335724#comment-15335724
 ] 

Kengo Seki commented on AIRFLOW-246:


Multiple left outer joins seem to affect performance. I think we can rewrite 
the query in question by replacing left outer join with inner join and union, 
such as:

{code:sql}
SELECT
  dag_id AS task_instance_dag_id,
  state AS task_instance_state,
  count(*) as count_1
FROM (
  SELECT
task_instance.dag_id,
task_instance.state
  FROM
task_instance
  JOIN (
SELECT
  dag_run.dag_id AS dag_id,
  dag_run.execution_date AS execution_date
FROM
  dag_run
WHERE
  dag_run.state = 'running'
  ) AS running_dag_run
  ON
running_dag_run.dag_id = task_instance.dag_id
  AND
running_dag_run.execution_date = task_instance.execution_date
  WHERE
task_id IN ...

  UNION ALL

  SELECT
task_instance.dag_id,
task_instance.state
  FROM
task_instance
  JOIN (
SELECT
  dag_run.dag_id AS dag_id,
  max(dag_run.execution_date) AS execution_date
FROM
  dag_run
GROUP BY
  dag_run.dag_id
  ) AS last_dag_run
  ON
last_dag_run.dag_id = task_instance.dag_id
  AND
last_dag_run.execution_date = task_instance.execution_date
  WHERE
task_id IN ...
) t
GROUP BY
  dag_id,
  state;
{code}

I compared these queries with some dummy data, and got a 3-4x improvement.

{code}
mysql> select count(*) from dag_run;
+--+
| count(*) |
+--+
| 3417 |
+--+
1 row in set (0.00 sec)

mysql> select count(*) from task_instance;
+--+
| count(*) |
+--+
|   229089 |
+--+
1 row in set (0.00 sec)

mysql> SELECT task_instance.dag_id AS task_instance_dag_id, task_instance.state 
AS task_instance_state, count(task_instance.task_id) AS count_1 FROM 
task_instance LEFT OUTER JOIN (SELECT dag_run.dag_id AS dag_id, 
dag_run.execution_date AS execution_date FROM dag_run WHERE dag_run.state = 
'running') AS running_dag_run ON running_dag_run.dag_id = task_instance.dag_id 
AND running_dag_run.execution_date = task_instance.execution_date LEFT OUTER 
JOIN (SELECT dag_run.dag_id AS dag_id, max(dag_run.execution_date) AS 
execution_date FROM dag_run GROUP BY dag_run.dag_id) AS last_dag_run ON 
last_dag_run.dag_id = task_instance.dag_id AND last_dag_run.execution_date = 
task_instance.execution_date WHERE task_instance.task_id IN ('all_success', 
'also_run_this', 'always_true_1', 'always_true_2', 'bash_task', 'branching', 
'branch_a', 'branch_b', 'branch_c', 'branch_d', 'condition', 
'condition_is_False', 'condition_is_True', 'del_op', 'end', 'false_1', 
'false_2', 'final_1', 'final_2', 'follow_branch_a', 'follow_branch_b', 
'follow_branch_c', 'follow_branch_d', 'get_op', 'http_sensor_check', 'join', 
'one_success', 'oper_1', 'oper_2', 'post_op', 'post_op_formenc', 'print_date', 
'puller', 'push', 'push_by_returning', 'put_op', 'runme_0', 'runme_1', 
'runme_2', 'run_after_loop', 'run_this', 'run_this_first', 'run_this_last', 
'section-1', 'section-1-task-1', 'section-1-task-2', 'section-1-task-3', 
'section-1-task-4', 'section-1-task-5', 'section-2', 'section-2-task-1', 
'section-2-task-2', 'section-2-task-3', 'section-2-task-4', 'section-2-task-5', 
'skip_operator_1', 'skip_operator_2', 'sleep', 'some-other-task', 'start', 
'templated', 'test_trigger_dagrun', 'true_1', 'true_2') AND 
(running_dag_run.dag_id IS NOT NULL OR last_dag_run.dag_id IS NOT NULL) GROUP 
BY task_instance.dag_id, task_instance.state; 
+-+-+-+
| task_instance_dag_id| task_instance_state | count_1 |
+-+-+-+
| example_bash_operator   | success |   6 |
| example_branch_dop_operator_v3  | NULL|   3 |
| example_branch_operator | skipped |   6 |
| example_branch_operator | success |   5 |
| example_http_operator   | failed  |   1 |
| example_http_operator   | upstream_failed |   5 |
| example_passing_params_via_test_command | success |   2 |
| example_short_circuit_operator  | skipped |   2 |
| example_short_circuit_operator  | success |   4 |
| example_skip_dag| skipped |   4 |
| example_skip_dag| success |   4 |
| example_subdag_operator | success |   5 |
| example_trigger_controller_dag  | success |   1 |
| example_trigger_target_dag  | success |   2 |
| example_xcom| success |   3 |
| tutorial| 

[jira] [Created] (AIRFLOW-253) Job does not find CONN_ID environmental variable

2016-06-17 Thread Nathan Scully (JIRA)
Nathan Scully created AIRFLOW-253:
-

 Summary: Job does not find CONN_ID environmental variable
 Key: AIRFLOW-253
 URL: https://issues.apache.org/jira/browse/AIRFLOW-253
 Project: Apache Airflow
  Issue Type: Bug
  Components: hooks
Affects Versions: 1.7.1.3
 Environment: OS: Ubuntu 14.04
airflow@ip-10-0-40-185:~$ pip show airflow
---
Name: airflow
Version: 1.7.1.3
Reporter: Nathan Scully


I am not sure if this is directly an airflow issue or something causing havoc 
in my environment, but when the scheduler triggers a test job I have set up 
(@hourly) to ping a Redshift server with 'select * from x limit 1;', I receive 
a "no conn_id found" error. However, I can echo the environment variable, and 
if I run the job as a test or query it manually from a Python console, it 
resolves. Any ideas what might be causing it to be missed? 

Error from AIRFLOW scheduled run:
{code}
[2016-06-17 06:00:04,402] {models.py:1286} ERROR - The conn_id `OF_REDSHIFT` isn't defined
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1245, in run
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/dist-packages/airflow/operators/python_operator.py", line 66, in execute
    return_value = self.python_callable(*self.op_args, **self.op_kwargs)
  File "/usr/local/airflow/dags/ping_redshift.py", line 23, in ping_redshift_query
    df = ph.get_records(sql)
  File "/usr/local/lib/python2.7/dist-packages/airflow/hooks/dbapi_hook.py", line 72, in get_records
    conn = self.get_conn()
  File "/usr/local/lib/python2.7/dist-packages/airflow/hooks/postgres_hook.py", line 18, in get_conn
    conn = self.get_connection(self.postgres_conn_id)
  File "/usr/local/lib/python2.7/dist-packages/airflow/hooks/base_hook.py", line 51, in get_connection
    conn = random.choice(cls.get_connections(conn_id))
  File "/usr/local/lib/python2.7/dist-packages/airflow/hooks/base_hook.py", line 39, in get_connections
    "The conn_id `{0}` isn't defined".format(conn_id))
AirflowException: The conn_id `OF_REDSHIFT` isn't defined
[2016-06-17 06:00:04,403] {models.py:1306} INFO - Marking task as FAILED.
[2016-06-17 06:00:04,695] {models.py:1327} ERROR - The conn_id `OF_REDSHIFT` isn't defined
{code}
I can echo the env variable:
{code}
airflow@ip-xx-x-xx-xxx:~$ echo $AIRFLOW_CONN_OF_REDSHIFT
postgres://airflow:xxx@xxx:5439/xxx
{code}

And I can also get it through the Python console:
{code}
airflow@ip-xx-x-xx-xxx:~$ python
Python 2.7.6 (default, Jun 22 2015, 17:58:13) 
[GCC 4.8.2] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import os
>>> CONN_ENV_PREFIX = 'AIRFLOW_CONN_'
>>> conn_id ='OF_REDSHIFT'
>>> os.environ.get(CONN_ENV_PREFIX + conn_id.upper())
'postgres://airflow:xxx@xxx:5439/xxx'
{code}
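
For reference, a paraphrased sketch of the lookup order implied by the base_hook.py frames in the traceback (the db_lookup fallback below is hypothetical, not Airflow's actual code): the environment variable is consulted first and the metadata DB second, so the error suggests the scheduler's environment simply does not contain AIRFLOW_CONN_OF_REDSHIFT. A process launched by init or a supervisor does not inherit variables exported only in an interactive shell profile, which would explain why the manual tests succeed while the scheduled run fails.

{code}
import os

CONN_ENV_PREFIX = 'AIRFLOW_CONN_'

def get_connection_uri(conn_id, db_lookup=None):
    # Paraphrased lookup order, not the exact 1.7.1.3 source:
    # the environment variable wins, the metadata DB is the fallback.
    uri = os.environ.get(CONN_ENV_PREFIX + conn_id.upper())
    if uri:
        return uri
    if db_lookup is not None:
        uri = db_lookup(conn_id)  # hypothetical metadata-DB query
        if uri:
            return uri
    raise LookupError("The conn_id `{0}` isn't defined".format(conn_id))

# The interactive test in this report exercises only the first branch:
# get_connection_uri('OF_REDSHIFT') succeeds in a shell where
# AIRFLOW_CONN_OF_REDSHIFT is exported, but not in the scheduler's env.
{code}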



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)