[jira] [Created] (AIRFLOW-257) airflow command fails with "ImportError: No module named zope.deprecation"
Kengo Seki created AIRFLOW-257: -- Summary: airflow command fails with "ImportError: No module named zope.deprecation" Key: AIRFLOW-257 URL: https://issues.apache.org/jira/browse/AIRFLOW-257 Project: Apache Airflow Issue Type: Bug Reporter: Kengo Seki Priority: Critical After AIRFLOW-31 was merged, the airflow command fails as follows: {code}
$ airflow webserver
[2016-06-18 00:56:50,367] {__init__.py:36} INFO - Using executor SequentialExecutor
[2016-06-18 00:56:50,492] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/Grammar.txt
[2016-06-18 00:56:50,529] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/PatternGrammar.txt
Traceback (most recent call last):
  File "/home/sekikn/.virtualenvs/e/bin/airflow", line 6, in <module>
    exec(compile(open(__file__).read(), __file__, 'exec'))
  File "/home/sekikn/dev/incubator-airflow/airflow/bin/airflow", line 4, in <module>
    from airflow import configuration
  File "/home/sekikn/dev/incubator-airflow/airflow/__init__.py", line 76, in <module>
    from airflow import operators
  File "/home/sekikn/dev/incubator-airflow/airflow/operators/__init__.py", line 24, in <module>
    from .check_operator import (
  File "/home/sekikn/dev/incubator-airflow/airflow/operators/check_operator.py", line 20, in <module>
    from airflow.hooks import BaseHook
  File "/home/sekikn/dev/incubator-airflow/airflow/hooks/__init__.py", line 66, in <module>
    from zope.deprecation import deprecated as _deprecated
ImportError: No module named zope.deprecation
{code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
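The likely immediate workaround, assuming the failure is only the new import-time dependency that AIRFLOW-31 introduced in airflow/hooks/__init__.py (a sketch, not the committed fix — the proper fix would declare the package in setup.py's install_requires):
{code}
# Install the missing dependency into the same environment that runs airflow:
$ pip install zope.deprecation
{code}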
[jira] [Commented] (AIRFLOW-234) make task that aren't `running` self-terminate
[ https://issues.apache.org/jira/browse/AIRFLOW-234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15336999#comment-15336999 ] ASF subversion and git services commented on AIRFLOW-234: - Commit 7c0f8373f59b0554d5ba15bb0e5e8669f0830313 in incubator-airflow's branch refs/heads/master from [~maxime.beauche...@apache.org] [ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=7c0f837 ] [AIRFLOW-234] make task that aren't `running` self-terminate Closes #1585 from mistercrunch/undeads > make task that aren't `running` self-terminate > -- > > Key: AIRFLOW-234 > URL: https://issues.apache.org/jira/browse/AIRFLOW-234 > Project: Apache Airflow > Issue Type: Bug >Reporter: Maxime Beauchemin > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
incubator-airflow git commit: [AIRFLOW-234] make task that aren't `running` self-terminate
Repository: incubator-airflow Updated Branches: refs/heads/master d243c003b -> 7c0f8373f [AIRFLOW-234] make task that aren't `running` self-terminate Closes #1585 from mistercrunch/undeads Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/7c0f8373 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/7c0f8373 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/7c0f8373 Branch: refs/heads/master Commit: 7c0f8373f59b0554d5ba15bb0e5e8669f0830313 Parents: d243c00 Author: Maxime Beauchemin Authored: Fri Jun 17 14:30:55 2016 -0700 Committer: Maxime Beauchemin Committed: Fri Jun 17 14:30:55 2016 -0700 -- airflow/example_dags/docker_copy_data.py| 13 ++ airflow/example_dags/example_bash_operator.py | 13 ++ airflow/example_dags/example_branch_operator.py | 13 ++ airflow/example_dags/example_docker_operator.py | 13 ++ airflow/example_dags/example_http_operator.py | 13 ++ airflow/example_dags/example_python_operator.py | 13 ++ .../example_short_circuit_operator.py | 13 ++ airflow/example_dags/example_subdag_operator.py | 13 ++ .../example_trigger_controller_dag.py | 14 +- .../example_dags/example_trigger_target_dag.py | 13 ++ airflow/example_dags/example_xcom.py| 13 ++ airflow/example_dags/test_utils.py | 29 airflow/jobs.py | 46 airflow/models.py | 27 +--- tests/core.py | 42 +- 15 files changed, 263 insertions(+), 25 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c0f8373/airflow/example_dags/docker_copy_data.py -- diff --git a/airflow/example_dags/docker_copy_data.py b/airflow/example_dags/docker_copy_data.py index ccf84c1..f0789b1 100644 --- a/airflow/example_dags/docker_copy_data.py +++ b/airflow/example_dags/docker_copy_data.py @@ -1,3 +1,16 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. ''' This sample "listen to directory". move the new file and print it, using docker-containers. The following operators are being used: DockerOperator, BashOperator & ShortCircuitOperator. http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c0f8373/airflow/example_dags/example_bash_operator.py -- diff --git a/airflow/example_dags/example_bash_operator.py b/airflow/example_dags/example_bash_operator.py index 4ab9144..c759f4d 100644 --- a/airflow/example_dags/example_bash_operator.py +++ b/airflow/example_dags/example_bash_operator.py @@ -1,3 +1,16 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
from builtins import range from airflow.operators import BashOperator, DummyOperator from airflow.models import DAG http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7c0f8373/airflow/example_dags/example_branch_operator.py -- diff --git a/airflow/example_dags/example_branch_operator.py b/airflow/example_dags/example_branch_operator.py index f576d20..edd177a 100644 --- a/airflow/example_dags/example_branch_operator.py +++ b/airflow/example_dags/example_branch_operator.py @@ -1,3 +1,16 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in wr
[jira] [Commented] (AIRFLOW-247) EMR Hook, Operators, Sensor
[ https://issues.apache.org/jira/browse/AIRFLOW-247?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15336796#comment-15336796 ] Greg Reda commented on AIRFLOW-247: --- Happy to help on this if need be. > EMR Hook, Operators, Sensor > --- > > Key: AIRFLOW-247 > URL: https://issues.apache.org/jira/browse/AIRFLOW-247 > Project: Apache Airflow > Issue Type: New Feature >Reporter: Rob Froetscher >Assignee: Rob Froetscher >Priority: Minor > > Substory of https://issues.apache.org/jira/browse/AIRFLOW-115. It would be > nice to have an EMR hook and operators. > Hook to generally interact with EMR. > Operators to: > * setup and start a job flow > * add steps to an existing jobflow > A sensor to: > * monitor completion and status of EMR jobs -- This message was sent by Atlassian JIRA (v6.3.4#6332)
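For reference, the AWS primitives such a hook would presumably wrap look roughly like this (a sketch using boto3 directly; the region, release label, instance settings, and step contents are illustrative assumptions, not the eventual Airflow class design):
{code}
import boto3

emr = boto3.client('emr', region_name='us-east-1')

# Operator: set up and start a job flow
job_flow = emr.run_job_flow(
    Name='airflow-emr-example',
    ReleaseLabel='emr-4.7.0',
    Instances={
        'MasterInstanceType': 'm3.xlarge',
        'InstanceCount': 1,
        'KeepJobFlowAliveWhenNoSteps': True,
    },
    JobFlowRole='EMR_EC2_DefaultRole',
    ServiceRole='EMR_DefaultRole',
)
job_flow_id = job_flow['JobFlowId']

# Operator: add steps to an existing job flow
emr.add_job_flow_steps(
    JobFlowId=job_flow_id,
    Steps=[{
        'Name': 'example-step',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {'Jar': 'command-runner.jar', 'Args': ['ls', '/']},
    }],
)

# Sensor: poll completion and status of the cluster
state = emr.describe_cluster(ClusterId=job_flow_id)['Cluster']['Status']['State']
{code}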
[jira] [Commented] (AIRFLOW-256) test_scheduler_reschedule fails due to heartrate check
[ https://issues.apache.org/jira/browse/AIRFLOW-256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15336763#comment-15336763 ] ASF subversion and git services commented on AIRFLOW-256: - Commit ab2d71be199708918c1f6d85f0c48c51c777f1e4 in incubator-airflow's branch refs/heads/master from [~bolke] [ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=ab2d71b ] [AIRFLOW-256] Fix test_scheduler_reschedule heartrate test_scheduler_reschedule runs two SchedulerJobs in quick succession; this is sometimes faster than the heartrate allows, so the tasks do not get rescheduled and the test fails. Fixed by setting the heartrate to 0. > test_scheduler_reschedule fails due to heartrate check > -- > > Key: AIRFLOW-256 > URL: https://issues.apache.org/jira/browse/AIRFLOW-256 > Project: Apache Airflow > Issue Type: Bug >Reporter: Bolke de Bruin > > test_scheduler_reschedule can fail due to the heartrate check -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[1/2] incubator-airflow git commit: [AIRFLOW-256] Fix test_scheduler_reschedule heartrate
Repository: incubator-airflow Updated Branches: refs/heads/master adcccfa26 -> d243c003b

[AIRFLOW-256] Fix test_scheduler_reschedule heartrate

test_scheduler_reschedule runs two SchedulerJobs in quick succession; this is sometimes faster than the heartrate allows, so the tasks do not get rescheduled and the test fails. Fixed by setting the heartrate to 0.

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/ab2d71be Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/ab2d71be Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/ab2d71be Branch: refs/heads/master Commit: ab2d71be199708918c1f6d85f0c48c51c777f1e4 Parents: ce362c3 Author: Bolke de Bruin Authored: Fri Jun 17 20:52:38 2016 +0200 Committer: Bolke de Bruin Committed: Fri Jun 17 20:52:38 2016 +0200 -- tests/jobs.py | 1 + 1 file changed, 1 insertion(+) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ab2d71be/tests/jobs.py --

diff --git a/tests/jobs.py b/tests/jobs.py
index 3618ce4..0619f3d 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -770,6 +770,7 @@ class SchedulerJobTest(unittest.TestCase):
         @mock.patch('airflow.models.DagBag.collect_dags')
         def do_schedule(function, function2):
             scheduler = SchedulerJob(num_runs=1, executor=executor,)
+            scheduler.heartrate = 0
             scheduler.run()

         do_schedule()
[2/2] incubator-airflow git commit: Merge remote-tracking branch 'apache/master'
Merge remote-tracking branch 'apache/master' Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/d243c003 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/d243c003 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/d243c003 Branch: refs/heads/master Commit: d243c003b09b3f1ff395592890f32d5269b8f27b Parents: ab2d71b adcccfa Author: Bolke de Bruin Authored: Fri Jun 17 21:20:23 2016 +0200 Committer: Bolke de Bruin Committed: Fri Jun 17 21:20:23 2016 +0200 -- airflow/__init__.py | 6 +- .../contrib/example_dags/example_twitter_dag.py | 4 +- airflow/contrib/hooks/__init__.py | 33 +- airflow/contrib/operators/__init__.py | 42 ++- airflow/contrib/operators/fs_operator.py| 3 +- airflow/contrib/operators/mysql_to_gcs.py | 2 +- airflow/contrib/operators/vertica_to_hive.py| 2 +- .../contrib/plugins/metastore_browser/main.py | 4 +- airflow/example_dags/example_http_operator.py | 3 +- airflow/hooks/__init__.py | 73 +++- airflow/hooks/base_hook.py | 14 + airflow/hooks/dbapi_hook.py | 14 + airflow/hooks/druid_hook.py | 14 + airflow/hooks/hdfs_hook.py | 14 + airflow/hooks/http_hook.py | 2 +- airflow/hooks/jdbc_hook.py | 14 + airflow/hooks/mssql_hook.py | 14 + airflow/hooks/mysql_hook.py | 14 + airflow/hooks/oracle_hook.py| 14 + airflow/hooks/pig_hook.py | 14 + airflow/hooks/postgres_hook.py | 14 + airflow/hooks/presto_hook.py| 14 + airflow/hooks/samba_hook.py | 14 + airflow/hooks/sqlite_hook.py| 14 + airflow/hooks/webhdfs_hook.py | 14 + airflow/macros/__init__.py | 38 ++- airflow/macros/hive.py | 4 +- airflow/models.py | 38 ++- airflow/operators/__init__.py | 120 +-- airflow/operators/bash_operator.py | 14 + airflow/operators/check_operator.py | 14 + airflow/operators/dagrun_operator.py| 14 + airflow/operators/docker_operator.py| 14 + airflow/operators/dummy_operator.py | 14 + airflow/operators/email_operator.py | 14 + airflow/operators/generic_transfer.py | 14 + airflow/operators/hive_operator.py | 16 +- airflow/operators/hive_stats_operator.py| 18 +- airflow/operators/hive_to_druid.py | 17 +- airflow/operators/hive_to_mysql.py | 17 +- airflow/operators/hive_to_samba_operator.py | 17 +- airflow/operators/http_operator.py | 14 + airflow/operators/jdbc_operator.py | 14 + airflow/operators/mssql_operator.py | 16 +- airflow/operators/mssql_to_hive.py | 17 +- airflow/operators/mysql_operator.py | 16 +- airflow/operators/mysql_to_hive.py | 17 +- airflow/operators/oracle_operator.py| 2 +- airflow/operators/pig_operator.py | 16 +- airflow/operators/postgres_operator.py | 16 +- airflow/operators/presto_check_operator.py | 16 +- airflow/operators/presto_to_mysql.py| 17 +- airflow/operators/python_operator.py| 14 + airflow/operators/s3_file_transform_operator.py | 16 +- airflow/operators/s3_to_hive_operator.py| 17 +- airflow/operators/sensors.py| 31 +- airflow/operators/slack_operator.py | 14 + airflow/operators/sqlite_operator.py| 14 + airflow/plugins_manager.py | 30 +- airflow/utils/email.py | 18 +- airflow/utils/logging.py| 2 +- airflow/utils/tests.py | 23 ++ dags/testdruid.py | 2 +- run_unit_tests.sh | 3 + setup.py| 14 + tests/__init__.py | 2 +- tests/core.py | 340 +-- tests/operators/__init__.py | 17 + tests/operators/hive_operator.py| 209 tests/operators/operators.py| 174 ++ tests/operators/sensor.py | 39 --- tests/operators/sensors.py | 39 +++ 72 files changed, 1458 insertions(+), 474 deletions(-) --
[jira] [Updated] (AIRFLOW-256) test_scheduler_reschedule fails due to heartrate check
[ https://issues.apache.org/jira/browse/AIRFLOW-256?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bolke de Bruin updated AIRFLOW-256: --- External issue URL: https://github.com/apache/incubator-airflow/pull/1606 > test_scheduler_reschedule fails due to heartrate check > -- > > Key: AIRFLOW-256 > URL: https://issues.apache.org/jira/browse/AIRFLOW-256 > Project: Apache Airflow > Issue Type: Bug >Reporter: Bolke de Bruin > > test_scheduler_reschedule can fail due to the heartrate check -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (AIRFLOW-256) test_scheduler_reschedule fails due to heartrate check
Bolke de Bruin created AIRFLOW-256: -- Summary: test_scheduler_reschedule fails due to heartrate check Key: AIRFLOW-256 URL: https://issues.apache.org/jira/browse/AIRFLOW-256 Project: Apache Airflow Issue Type: Bug Reporter: Bolke de Bruin test_scheduler_reschedule can fail due to the heartrate check -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (AIRFLOW-246) dag_stats endpoint has a terrible query
[ https://issues.apache.org/jira/browse/AIRFLOW-246?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kengo Seki reassigned AIRFLOW-246: -- Assignee: Kengo Seki > dag_stats endpoint has a terrible query > --- > > Key: AIRFLOW-246 > URL: https://issues.apache.org/jira/browse/AIRFLOW-246 > Project: Apache Airflow > Issue Type: Bug > Components: webserver >Affects Versions: Airflow 1.7.1 > Environment: MySQL Backend through sqlalchemy >Reporter: Neil Hanlon >Assignee: Kengo Seki > > Hitting this endpoint creates a series of queries on the database which take > over 20 seconds to run, causing the page to not load for that entire time. > Luckily the main page (which includes this under "Recent Statuses") loads > this synchronously, but still... waiting almost half a minute (at times more) > to see the statuses for dags is really not fun. > We have less than a million rows in the task_instance table--so it's not even > a problem with that. > Here's a query profile for the query: > https://gist.github.com/NeilHanlon/613f12724e802bc51c23fca7d46d28bf > We've done some optimizations on the database, but to no avail. > The query: > {code:sql} > SELECT task_instance.dag_id AS task_instance_dag_id, task_instance.state AS > task_instance_state, count(task_instance.task_id) AS count_1 FROM > task_instance LEFT OUTER JOIN (SELECT dag_run.dag_id AS dag_id, > dag_run.execution_date AS execution_date FROM dag_run WHERE dag_run.state = > 'running') AS running_dag_run ON running_dag_run.dag_id = > task_instance.dag_id AND running_dag_run.execution_date = > task_instance.execution_date LEFT OUTER JOIN (SELECT dag_run.dag_id AS > dag_id, max(dag_run.execution_date) AS execution_date FROM dag_run GROUP BY > dag_run.dag_id) AS last_dag_run ON last_dag_run.dag_id = task_instance.dag_id > AND last_dag_run.execution_date = task_instance.execution_date WHERE > task_instance.task_id IN ... AND (running_dag_run.dag_id IS NOT NULL OR > last_dag_run.dag_id IS NOT NULL) GROUP BY task_instance.dag_id, > task_instance.state; > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (AIRFLOW-246) dag_stats endpoint has a terrible query
[ https://issues.apache.org/jira/browse/AIRFLOW-246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15336365#comment-15336365 ] Kengo Seki commented on AIRFLOW-246: bq. Looks good to me. Query only takes ~4 seconds to run on our db (~750k rows). I'm glad to hear that. Thanks for confirming ;) bq. maybe it should also be looked at whether or not the whole thing should be rearchitected... both of the queries (original, and the one you wrote) are pretty ugly. Well, I'm not sure whether it should be, since I'm a newbie to Airflow; I only started evaluating and contributing to it a month ago. I hope Airflow experts can give us some advice, workarounds, etc. I agree that these queries are not very readable, but I also think that's somewhat inevitable as long as an ORM such as SQLAlchemy is used. The original Python code that generates the query in question is not that difficult for me to read. bq. Let me know if you get a code fix for this and I'd be happy to test it. Sure. I hope I can submit the first PR this weekend. > dag_stats endpoint has a terrible query > --- > > Key: AIRFLOW-246 > URL: https://issues.apache.org/jira/browse/AIRFLOW-246 > Project: Apache Airflow > Issue Type: Bug > Components: webserver >Affects Versions: Airflow 1.7.1 > Environment: MySQL Backend through sqlalchemy >Reporter: Neil Hanlon > > Hitting this endpoint creates a series of queries on the database which take > over 20 seconds to run, causing the page to not load for that entire time. > Luckily the main page (which includes this under "Recent Statuses") loads > this synchronously, but still... waiting almost half a minute (at times more) > to see the statuses for dags is really not fun. > We have less than a million rows in the task_instance table--so it's not even > a problem with that. > Here's a query profile for the query: > https://gist.github.com/NeilHanlon/613f12724e802bc51c23fca7d46d28bf > We've done some optimizations on the database, but to no avail. > The query: > {code:sql} > SELECT task_instance.dag_id AS task_instance_dag_id, task_instance.state AS > task_instance_state, count(task_instance.task_id) AS count_1 FROM > task_instance LEFT OUTER JOIN (SELECT dag_run.dag_id AS dag_id, > dag_run.execution_date AS execution_date FROM dag_run WHERE dag_run.state = > 'running') AS running_dag_run ON running_dag_run.dag_id = > task_instance.dag_id AND running_dag_run.execution_date = > task_instance.execution_date LEFT OUTER JOIN (SELECT dag_run.dag_id AS > dag_id, max(dag_run.execution_date) AS execution_date FROM dag_run GROUP BY > dag_run.dag_id) AS last_dag_run ON last_dag_run.dag_id = task_instance.dag_id > AND last_dag_run.execution_date = task_instance.execution_date WHERE > task_instance.task_id IN ... AND (running_dag_run.dag_id IS NOT NULL OR > last_dag_run.dag_id IS NOT NULL) GROUP BY task_instance.dag_id, > task_instance.state; > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
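The UNION ALL rewrite could be expressed in SQLAlchemy along these lines (a minimal sketch assuming Airflow's TaskInstance and DagRun models; not the actual PR):
{code}
from sqlalchemy import and_, func
from airflow.models import DagRun, TaskInstance

def dag_stats(session, task_ids):
    # Subquery of currently running dag runs
    running = (session.query(DagRun.dag_id, DagRun.execution_date)
               .filter(DagRun.state == 'running')
               .subquery())
    # Subquery of each dag's most recent run
    last = (session.query(DagRun.dag_id,
                          func.max(DagRun.execution_date).label('execution_date'))
            .group_by(DagRun.dag_id)
            .subquery())

    def tis_for(dag_run_sq):
        # INNER JOIN task instances to one of the dag_run subqueries
        return (session.query(TaskInstance.dag_id.label('dag_id'),
                              TaskInstance.state.label('state'))
                .join(dag_run_sq,
                      and_(dag_run_sq.c.dag_id == TaskInstance.dag_id,
                           dag_run_sq.c.execution_date == TaskInstance.execution_date))
                .filter(TaskInstance.task_id.in_(task_ids)))

    # UNION ALL the two halves, then aggregate once at the top
    union = tis_for(running).union_all(tis_for(last)).subquery()
    return (session.query(union.c.dag_id, union.c.state, func.count())
            .group_by(union.c.dag_id, union.c.state)
            .all())
{code}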
[jira] [Comment Edited] (AIRFLOW-246) dag_stats endpoint has a terrible query
[ https://issues.apache.org/jira/browse/AIRFLOW-246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15336229#comment-15336229 ] Neil Hanlon edited comment on AIRFLOW-246 at 6/17/16 2:43 PM: -- [~sekikn] Looks good to me. Query only takes ~4 seconds to run on our db (~750k rows). Obviously with more task instances the actual size of this query will increase which will require database tuning to allow it to run (max_allowed_packet) and all that... so maybe it should also be looked at whether or not the whole thing should be rearchitected... both of the queries (original, and the one you wrote) are pretty ugly. I'm not saying all queries have to be pretty, but... Let me know if you get a code fix for this and I'd be happy to test it. (new query profile for posterity) {code:sql} airflow-rwdb01:airflow> show profile for query 1; ++--+ | Status | Duration | ++--+ | starting | 0.33 | | Waiting for query cache lock | 0.05 | | Waiting on query cache mutex | 0.04 | | checking query cache for query | 0.002673 | | checking permissions | 0.06 | | checking permissions | 0.04 | | checking permissions | 0.03 | | checking permissions | 0.06 | | Opening tables | 0.25 | | System lock| 0.001740 | | optimizing | 0.05 | | statistics | 0.17 | | preparing | 0.08 | | executing | 0.07 | | Sorting result | 0.04 | | Sending data | 0.000855 | | optimizing | 0.09 | | statistics | 0.09 | | preparing | 0.10 | | executing | 0.04 | | Sending data | 0.000622 | | optimizing | 0.73 | | statistics | 1.371602 | | preparing | 0.000239 | | executing | 0.08 | | Sending data | 0.407134 | | optimizing | 0.000145 | | statistics | 1.176737 | | preparing | 0.000174 | | executing | 0.06 | | Sending data | 0.395361 | | optimizing | 0.11 | | statistics | 0.14 | | preparing | 0.12 | | executing | 0.05 | | Sending data | 0.000654 | | removing tmp table | 0.21 | | Sending data | 0.13 | | Waiting for query cache lock | 0.05 | | Waiting on query cache mutex | 0.05 | | Sending data | 0.001048 | | init | 0.23 | | optimizing | 0.06 | | statistics | 0.07 | | preparing | 0.08 | | Creating tmp table | 0.000386 | | executing | 0.06 | | Copying to tmp table | 0.005354 | | Sorting result | 0.91 | | Sending data | 0.90 | | end| 0.04 | | removing tmp table | 0.000169 | | end| 0.06 | | query end | 0.07 | | closing tables | 0.04 | | removing tmp table | 0.07 | | closing tables | 0.04 | | removing tmp table | 0.07 | | closing tables | 0.04 | | removing tmp table | 0.05 | | closing tables | 0.13 | | freeing items | 0.000144 | | Waiting for query cache lock | 0.04 | | Waiting on query cache mutex | 0.03 | | freeing items | 0.57 | | Waiting for query cache lock | 0.04 | | Waiting on query cache mutex | 0.05 | | freeing items | 0.04 | | storing result in query cache | 0.07 | | logging slow query | 0.04 | | logging slow query | 0.000221 | | cleaning up| 0.13 | ++--+ 72 rows in set (0.00 sec) {code} was (Author: nhanlon): [~sekikn] Looks good to me. Query only takes ~4 seconds to run on our db (~750k rows). Obviously with more task instances the actual size of this query will increase which will require database tuning to allow it to run (max_allowed_packet) and all that... so maybe it should also be looked at whether or not the whole thing should be rearchitected... both of the queries (original, and the one you wrote) are pretty ugly. I'm not saying all queries have to be pretty, but... Let me k
[jira] [Commented] (AIRFLOW-246) dag_stats endpoint has a terrible query
[ https://issues.apache.org/jira/browse/AIRFLOW-246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15336229#comment-15336229 ] Neil Hanlon commented on AIRFLOW-246: - [~sekikn] Looks good to me. Query only takes ~4 seconds to run on our db (~750k rows). Obviously with more task instances the actual size of this query will increase which will require database tuning to allow it to run (max_allowed_packet) and all that... so maybe it should also be looked at whether or not the whole thing should be rearchitected... both of the queries (original, and the one you wrote) are pretty ugly. I'm not saying all queries have to be pretty, but... Let me know if you get a code fix for this and I'd be happy to test it. (new query profile for posterity) {pre} airflow-rwdb01:airflow> show profile for query 1; ++--+ | Status | Duration | ++--+ | starting | 0.33 | | Waiting for query cache lock | 0.05 | | Waiting on query cache mutex | 0.04 | | checking query cache for query | 0.002673 | | checking permissions | 0.06 | | checking permissions | 0.04 | | checking permissions | 0.03 | | checking permissions | 0.06 | | Opening tables | 0.25 | | System lock| 0.001740 | | optimizing | 0.05 | | statistics | 0.17 | | preparing | 0.08 | | executing | 0.07 | | Sorting result | 0.04 | | Sending data | 0.000855 | | optimizing | 0.09 | | statistics | 0.09 | | preparing | 0.10 | | executing | 0.04 | | Sending data | 0.000622 | | optimizing | 0.73 | | statistics | 1.371602 | | preparing | 0.000239 | | executing | 0.08 | | Sending data | 0.407134 | | optimizing | 0.000145 | | statistics | 1.176737 | | preparing | 0.000174 | | executing | 0.06 | | Sending data | 0.395361 | | optimizing | 0.11 | | statistics | 0.14 | | preparing | 0.12 | | executing | 0.05 | | Sending data | 0.000654 | | removing tmp table | 0.21 | | Sending data | 0.13 | | Waiting for query cache lock | 0.05 | | Waiting on query cache mutex | 0.05 | | Sending data | 0.001048 | | init | 0.23 | | optimizing | 0.06 | | statistics | 0.07 | | preparing | 0.08 | | Creating tmp table | 0.000386 | | executing | 0.06 | | Copying to tmp table | 0.005354 | | Sorting result | 0.91 | | Sending data | 0.90 | | end| 0.04 | | removing tmp table | 0.000169 | | end| 0.06 | | query end | 0.07 | | closing tables | 0.04 | | removing tmp table | 0.07 | | closing tables | 0.04 | | removing tmp table | 0.07 | | closing tables | 0.04 | | removing tmp table | 0.05 | | closing tables | 0.13 | | freeing items | 0.000144 | | Waiting for query cache lock | 0.04 | | Waiting on query cache mutex | 0.03 | | freeing items | 0.57 | | Waiting for query cache lock | 0.04 | | Waiting on query cache mutex | 0.05 | | freeing items | 0.04 | | storing result in query cache | 0.07 | | logging slow query | 0.04 | | logging slow query | 0.000221 | | cleaning up| 0.13 | ++--+ 72 rows in set (0.00 sec) {pre} > dag_stats endpoint has a terrible query > --- > > Key: AIRFLOW-246 > URL: https://issues.apache.org/jira/browse/AIRFLOW-246 > Project: Apache Airflow > Issue Type: Bug > Components: webserver >Affects Versions: Airflow 1.7.1 > Environment: MySQL Backend through sqlalchemy >Reporter: Neil Hanlon > > Hitting this endpoint creates a series of queries on the database which take > over 20 seconds to run, causing the page to not load for that enti
[jira] [Commented] (AIRFLOW-255) schedule_dag shouldn't return early if dagrun_timeout is given
[ https://issues.apache.org/jira/browse/AIRFLOW-255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15336223#comment-15336223 ] Kevin Lin commented on AIRFLOW-255: --- Took a stab at creating a PR for the fix: https://github.com/apache/incubator-airflow/pull/1604 > schedule_dag shouldn't return early if dagrun_timeout is given > -- > > Key: AIRFLOW-255 > URL: https://issues.apache.org/jira/browse/AIRFLOW-255 > Project: Apache Airflow > Issue Type: Bug >Reporter: Kevin Lin > > In > https://github.com/apache/incubator-airflow/blob/master/airflow/jobs.py#L396, > schedule_dag returns as long as `len(active_runs) >= dag.max_active_runs`. It > should also take into account dag.dagrun_timeout -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (AIRFLOW-255) schedule_dag shouldn't return early if dagrun_timeout is given
Kevin Lin created AIRFLOW-255: - Summary: schedule_dag shouldn't return early if dagrun_timeout is given Key: AIRFLOW-255 URL: https://issues.apache.org/jira/browse/AIRFLOW-255 Project: Apache Airflow Issue Type: Bug Reporter: Kevin Lin In https://github.com/apache/incubator-airflow/blob/master/airflow/jobs.py#L396, schedule_dag returns as long as `len(active_runs) >= dag.max_active_runs`. It should also take into account dag.dagrun_timeout -- This message was sent by Atlassian JIRA (v6.3.4#6332)
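The shape of the proposed change, sketched from the issue description (an illustration only, not the code in PR 1604): before comparing against max_active_runs, time out any run that has outlived dag.dagrun_timeout so it stops blocking new runs.
{code}
from airflow.utils.state import State

def active_runs_after_timeout(dag, active_runs, now):
    """Fail runs that exceeded dag.dagrun_timeout; return those still active (sketch)."""
    still_active = []
    for run in active_runs:
        if (dag.dagrun_timeout and run.start_date
                and now - run.start_date > dag.dagrun_timeout):
            run.state = State.FAILED  # assumption: timed-out runs are marked failed
        else:
            still_active.append(run)
    return still_active

# schedule_dag would then return early only when
# len(active_runs_after_timeout(dag, active_runs, now)) >= dag.max_active_runs
{code}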
[jira] [Updated] (AIRFLOW-254) Webserver should refresh all workers in case of a dag refresh / update
[ https://issues.apache.org/jira/browse/AIRFLOW-254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bolke de Bruin updated AIRFLOW-254: --- External issue URL: https://github.com/apache/incubator-airflow/pull/1603 > Webserver should refresh all workers in case of a dag refresh / update > -- > > Key: AIRFLOW-254 > URL: https://issues.apache.org/jira/browse/AIRFLOW-254 > Project: Apache Airflow > Issue Type: Bug > Components: webserver >Reporter: Bolke de Bruin > > The webserver only refreshes one process in case a dag refresh is demanded or > an update is made to a dag. This is annoying as you might end up with old > code in the views or the dreaded "scheduler has put this in the db, but the > webserver hasn't got it yet". -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (AIRFLOW-254) Webserver should refresh all workers in case of a dag refresh / update
Bolke de Bruin created AIRFLOW-254: -- Summary: Webserver should refresh all workers in case of a dag refresh / update Key: AIRFLOW-254 URL: https://issues.apache.org/jira/browse/AIRFLOW-254 Project: Apache Airflow Issue Type: Bug Components: webserver Reporter: Bolke de Bruin The webserver only refreshes one process in case a dag refresh is demanded or an update is made to a dag. This is annoying as you might end up with old code in the views or the dreaded "scheduler has put this in the db, but the webserver hasn't got it yet". -- This message was sent by Atlassian JIRA (v6.3.4#6332)
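One plausible mechanism for such a fix (an assumption about the approach, not the code in PR 1603): signal the gunicorn master rather than a single worker, since the master replaces all of its workers on SIGHUP.
{code}
import os
import signal

def refresh_all_workers(gunicorn_master_pid):
    # SIGHUP makes the gunicorn master reload and re-fork every worker,
    # so all webserver processes pick up the refreshed DagBag.
    os.kill(gunicorn_master_pid, signal.SIGHUP)
{code}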
[jira] [Updated] (AIRFLOW-32) Remove deprecated features prior to releasing Airflow 2.0
[ https://issues.apache.org/jira/browse/AIRFLOW-32?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeremiah Lowin updated AIRFLOW-32: -- Description: A number of features have been marked for deprecation in Airflow 2.0. They need to be deleted prior to release. Usually the error message or comments will mention Airflow 2.0 with either a #TODO or #FIXME. Tracking list (not necessarily complete!): JIRA: AIRFLOW-31 GitHub: https://github.com/airbnb/airflow/pull/1137/files#diff-1c2404a3a60f829127232842250ff406R233 https://github.com/airbnb/airflow/pull/1219 https://github.com/airbnb/airflow/pull/1285 was: A number of features have been marked for deprecation in Airflow 2.0. They need to be deleted prior to release. Usually the error message or comments will mention Airflow 2.0 with either a #TODO or #FIXME. Tracking list (not necessarily complete!): https://github.com/airbnb/airflow/pull/1137/files#diff-1c2404a3a60f829127232842250ff406R233 https://github.com/airbnb/airflow/pull/1219 https://github.com/airbnb/airflow/pull/1272 https://github.com/airbnb/airflow/pull/1285 > Remove deprecated features prior to releasing Airflow 2.0 > - > > Key: AIRFLOW-32 > URL: https://issues.apache.org/jira/browse/AIRFLOW-32 > Project: Apache Airflow > Issue Type: Improvement >Reporter: Jeremiah Lowin > Labels: deprecated > Fix For: Airflow 2.0 > > > A number of features have been marked for deprecation in Airflow 2.0. They > need to be deleted prior to release. > Usually the error message or comments will mention Airflow 2.0 with either a > #TODO or #FIXME. > Tracking list (not necessarily complete!): > JIRA: > AIRFLOW-31 > GitHub: > https://github.com/airbnb/airflow/pull/1137/files#diff-1c2404a3a60f829127232842250ff406R233 > https://github.com/airbnb/airflow/pull/1219 > https://github.com/airbnb/airflow/pull/1285 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (AIRFLOW-31) Use standard imports for hooks/operators
[ https://issues.apache.org/jira/browse/AIRFLOW-31?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeremiah Lowin closed AIRFLOW-31. - Resolution: Fixed Fix Version/s: (was: Airflow 2.0) Airflow 1.8 Merged in https://github.com/apache/incubator-airflow/pull/1272 > Use standard imports for hooks/operators > > > Key: AIRFLOW-31 > URL: https://issues.apache.org/jira/browse/AIRFLOW-31 > Project: Apache Airflow > Issue Type: Improvement > Components: operators >Reporter: Jeremiah Lowin >Assignee: Jeremiah Lowin > Labels: enhancement > Fix For: Airflow 1.8 > > > (Migrated from https://github.com/airbnb/airflow/issues/1238) > Currently, Airflow uses a relatively complex import mechanism to import hooks > and operators without polluting the namespace with submodules. I would like > to propose that Airflow abandon that system and use standard Python importing. > Here are a few major reasons why I think the current system has run its > course. > h3. Polluting namespace > The biggest advantage of the current system, as I understand it, is that only > Operators appear in the `airflow.operators` namespace. The submodules that > actually contain the operators do not. > So for example while `airflow.operators.python_operator.PythonOperator` is a > thing, `PythonOperator` is in the `airflow.operators` namespace but > `python_operator` is not. > I think this sort of namespace pollution was helpful when Airflow was a > smaller project, but as the number of hooks/operators grows -- and especially > as the `contrib` hooks/operators grow -- I'd argue that namespacing is a > *good thing*. It provides structure and organization, and opportunities for > documentation (through module docstrings). > In fact, I'd argue that the current namespace is itself getting quite > polluted -- the only way to know what's available is to use something like > Ipython tab-completion to browse an alphabetical list of Operator names, or > to load the source file and grok the import definition (which no one > installing from pypi is likely to do). > h3. Conditional imports > There's a second advantage to the current system that any module that fails > to import is silently ignored. It makes it easy to have optional > dependencies. For example, if someone doesn't have `boto` installed, then > they don't have an `S3Hook` either. Same for a HiveOperator > Again, as Airflow grows and matures, I think this is a little too magic. If > my environment is missing a dependency, I want to hear about it. > On the other hand, the `contrib` namespace sort of depends on this -- we > don't want users to have to install every single dependency. So I propose > that contrib modules all live in their submodules: `from > airflow.contrib.operators.my_operator import MyOperator`. As mentioned > previously, having structure and namespacing is a good thing as the project > gets more complex. > Other ways to handle this include putting "non-standard" dependencies inside > the operator/hook rather than the module (see `HiveOperator`/`HiveHook`), so > it can be imported but not used. Another is judicious use of `try`/`except > ImportError`. The simplest is to make people import things explicitly from > submodules. > h3. Operator dependencies > Right now, operators can't depend on each other if they aren't in the same > file. This is for the simple reason that there is no guarantee on what order > the operators will be loaded. It all comes down to which dictionary key gets > loaded first. 
One day Operator B could be loaded after Operator A; the next > day it might be loaded before. Consequently, A and B can't depend on each > other. Worse, if a user makes two operators that do depend on each other, > they won't get an error message when one fails to import. > For contrib modules in particular, this is sort of killer. > h3. Ease of use > It's *hard* to set up imports for a new operator. The dictionary-based import > instructions aren't obvious for new users, and errors are silently dismissed > which makes debugging difficult. > h3. Identity > Surprisingly, `airflow.operators.SubDagOperator != > airflow.operators.subdag_operator.SubDagOperator`. See #1168. > h2. Proposal > Use standard python importing for hooks/operators/etc. > - `__init__.py` files use straightforward, standard Python imports > - major operators are available at `airflow.operators.OperatorName` or > `airflow.operators.operator_module.OperatorName`. > - contrib operators are only available at > `airflow.contrib.operators.operator_module.OperatorName` in order to manage > dependencies > - operator authors are encouraged to use `__all__` to define their module's > exports > Possibly delete namespace afterward > - in `operators/_
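Concretely, the import styles at issue look like this (an illustrative sketch; the contrib class name is a stand-in, not necessarily what ships):
{code}
# Old "magic" namespace style, deprecated by this change:
from airflow.operators import PythonOperator

# Standard imports proposed here, through the operator's own submodule:
from airflow.operators.python_operator import PythonOperator

# Contrib operators resolve only through their submodule, so their optional
# dependencies are pulled in only when actually used:
from airflow.contrib.operators.fs_operator import FileSensor
{code}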
[jira] [Commented] (AIRFLOW-31) Use standard imports for hooks/operators
[ https://issues.apache.org/jira/browse/AIRFLOW-31?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15335919#comment-15335919 ] ASF subversion and git services commented on AIRFLOW-31: Commit 851adc5547597ec51743be4bc47d634c77d6dc17 in incubator-airflow's branch refs/heads/master from jlowin [ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=851adc5 ] [AIRFLOW-31] Use standard imports for hooks/operators > Use standard imports for hooks/operators > > > Key: AIRFLOW-31 > URL: https://issues.apache.org/jira/browse/AIRFLOW-31 > Project: Apache Airflow > Issue Type: Improvement > Components: operators >Reporter: Jeremiah Lowin >Assignee: Jeremiah Lowin > Labels: enhancement > Fix For: Airflow 2.0 > > > (Migrated from https://github.com/airbnb/airflow/issues/1238) > Currently, Airflow uses a relatively complex import mechanism to import hooks > and operators without polluting the namespace with submodules. I would like > to propose that Airflow abandon that system and use standard Python importing. > Here are a few major reasons why I think the current system has run its > course. > h3. Polluting namespace > The biggest advantage of the current system, as I understand it, is that only > Operators appear in the `airflow.operators` namespace. The submodules that > actually contain the operators do not. > So for example while `airflow.operators.python_operator.PythonOperator` is a > thing, `PythonOperator` is in the `airflow.operators` namespace but > `python_operator` is not. > I think this sort of namespace pollution was helpful when Airflow was a > smaller project, but as the number of hooks/operators grows -- and especially > as the `contrib` hooks/operators grow -- I'd argue that namespacing is a > *good thing*. It provides structure and organization, and opportunities for > documentation (through module docstrings). > In fact, I'd argue that the current namespace is itself getting quite > polluted -- the only way to know what's available is to use something like > Ipython tab-completion to browse an alphabetical list of Operator names, or > to load the source file and grok the import definition (which no one > installing from pypi is likely to do). > h3. Conditional imports > There's a second advantage to the current system that any module that fails > to import is silently ignored. It makes it easy to have optional > dependencies. For example, if someone doesn't have `boto` installed, then > they don't have an `S3Hook` either. Same for a HiveOperator > Again, as Airflow grows and matures, I think this is a little too magic. If > my environment is missing a dependency, I want to hear about it. > On the other hand, the `contrib` namespace sort of depends on this -- we > don't want users to have to install every single dependency. So I propose > that contrib modules all live in their submodules: `from > airflow.contrib.operators.my_operator import MyOperator`. As mentioned > previously, having structure and namespacing is a good thing as the project > gets more complex. > Other ways to handle this include putting "non-standard" dependencies inside > the operator/hook rather than the module (see `HiveOperator`/`HiveHook`), so > it can be imported but not used. Another is judicious use of `try`/`except > ImportError`. The simplest is to make people import things explicitly from > submodules. > h3. Operator dependencies > Right now, operators can't depend on each other if they aren't in the same > file. 
This is for the simple reason that there is no guarantee on what order > the operators will be loaded. It all comes down to which dictionary key gets > loaded first. One day Operator B could be loaded after Operator A; the next > day it might be loaded before. Consequently, A and B can't depend on each > other. Worse, if a user makes two operators that do depend on each other, > they won't get an error message when one fails to import. > For contrib modules in particular, this is sort of killer. > h3. Ease of use > It's *hard* to set up imports for a new operator. The dictionary-based import > instructions aren't obvious for new users, and errors are silently dismissed > which makes debugging difficult. > h3. Identity > Surprisingly, `airflow.operators.SubDagOperator != > airflow.operators.subdag_operator.SubDagOperator`. See #1168. > h2. Proposal > Use standard python importing for hooks/operators/etc. > - `__init__.py` files use straightforward, standard Python imports > - major operators are available at `airflow.operators.OperatorName` or > `airflow.operators.operator_module.OperatorName`. > - contrib operators are only available at > `airflow.contrib.operators.operator_module.OperatorName` in orde
[1/4] incubator-airflow git commit: [AIRFLOW-31] Use standard imports for hooks/operators
Repository: incubator-airflow Updated Branches: refs/heads/master ce362c312 -> adcccfa26 http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/851adc55/tests/__init__.py -- diff --git a/tests/__init__.py b/tests/__init__.py index ca8150b..4a79d0f 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -1,9 +1,9 @@ from __future__ import absolute_import from .configuration import * +from .contrib import * from .core import * from .jobs import * from .models import * from .operators import * -from .contrib import * from .utils import * http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/851adc55/tests/core.py -- diff --git a/tests/core.py b/tests/core.py index af791e3..5e6a4fd 100644 --- a/tests/core.py +++ b/tests/core.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + from __future__ import print_function import doctest @@ -289,7 +290,7 @@ class CoreTest(unittest.TestCase): assert hash(self.dag) != hash(dag_subclass) def test_time_sensor(self): -t = operators.TimeSensor( +t = operators.sensors.TimeSensor( task_id='time_sensor_check', target_time=time(0), dag=self.dag) @@ -380,21 +381,22 @@ class CoreTest(unittest.TestCase): t.dry_run() def test_sqlite(self): -t = operators.SqliteOperator( +import airflow.operators.sqlite_operator +t = airflow.operators.sqlite_operator.SqliteOperator( task_id='time_sqlite', sql="CREATE TABLE IF NOT EXISTS unitest (dummy VARCHAR(20))", dag=self.dag) t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True) def test_timedelta_sensor(self): -t = operators.TimeDeltaSensor( +t = operators.sensors.TimeDeltaSensor( task_id='timedelta_sensor_check', delta=timedelta(seconds=2), dag=self.dag) t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True) def test_external_task_sensor(self): -t = operators.ExternalTaskSensor( +t = operators.sensors.ExternalTaskSensor( task_id='test_external_task_sensor_check', external_dag_id=TEST_DAG_ID, external_task_id='time_sensor_check', @@ -402,7 +404,7 @@ class CoreTest(unittest.TestCase): t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True) def test_external_task_sensor_delta(self): -t = operators.ExternalTaskSensor( +t = operators.sensors.ExternalTaskSensor( task_id='test_external_task_sensor_check_delta', external_dag_id=TEST_DAG_ID, external_task_id='time_sensor_check', @@ -1077,97 +1079,6 @@ class WebLdapAuthTest(unittest.TestCase): session.close() configuration.conf.set("webserver", "authenticate", "False") - -if 'MySqlOperator' in dir(operators): -# Only testing if the operator is installed -class MySqlTest(unittest.TestCase): -def setUp(self): -configuration.test_mode() -args = { -'owner': 'airflow', -'mysql_conn_id': 'airflow_db', -'start_date': DEFAULT_DATE -} -dag = DAG(TEST_DAG_ID, default_args=args) -self.dag = dag - -def mysql_operator_test(self): -sql = """ -CREATE TABLE IF NOT EXISTS test_airflow ( -dummy VARCHAR(50) -); -""" -t = operators.MySqlOperator( -task_id='basic_mysql', -sql=sql, -mysql_conn_id='airflow_db', -dag=self.dag) -t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True) - -def mysql_operator_test_multi(self): -sql = [ -"TRUNCATE TABLE test_airflow", -"INSERT INTO test_airflow VALUES ('X')", -] -t = operators.MySqlOperator( -task_id='mysql_operator_test_multi', -mysql_conn_id='airflow_db', -sql=sql, dag=self.dag) -t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True) - -def 
test_mysql_to_mysql(self): -sql = "SELECT * FROM INFORMATION_SCHEMA.TABLES LIMIT 100;" -t = operators.GenericTransfer( -task_id='test_m2m', -preoperator=[ -"DROP TABLE IF EXISTS test_mysql_to_mysql", -"CREATE TABLE IF NOT EXISTS " -"test_mysql_to_mysql LIKE INFORMATION_SCHEMA.TABLES" -], -source_conn_id='airflow_db', -de
[3/4] incubator-airflow git commit: Add Python 3 compatibility fix
Add Python 3 compatibility fix

In Python 3, errors don't have a `message` attribute

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/f26b7a25 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/f26b7a25 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/f26b7a25 Branch: refs/heads/master Commit: f26b7a25d9c4632af68d9d64ac5f4a929a44f426 Parents: 851adc5 Author: jlowin Authored: Thu Jun 16 16:53:27 2016 -0400 Committer: jlowin Committed: Thu Jun 16 16:53:27 2016 -0400 -- airflow/operators/sensors.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f26b7a25/airflow/operators/sensors.py --

diff --git a/airflow/operators/sensors.py b/airflow/operators/sensors.py
index 6d87b44..5276f6e 100644
--- a/airflow/operators/sensors.py
+++ b/airflow/operators/sensors.py
@@ -532,7 +532,7 @@ class HttpSensor(BaseSensorOperator):
             # run content check on response
             return self.response_check(response)
         except AirflowException as ae:
-            if ae.message.startswith("404"):
+            if str(ae).startswith("404"):
                 return False

             raise ae
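The incompatibility that this one-liner fixes, shown in isolation (a minimal sketch):
{code}
# Python 2 gave exceptions a .message attribute; Python 3 removed it.
# str(exc) works on both, which is why the sensor now matches on str(ae).
try:
    raise Exception("404 Not Found")
except Exception as e:
    print(str(e).startswith("404"))  # True on Python 2 and 3
    # e.message                      # AttributeError on Python 3
{code}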
[4/4] incubator-airflow git commit: Merge pull request #1272 from jlowin/standard-imports
Merge pull request #1272 from jlowin/standard-imports Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/adcccfa2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/adcccfa2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/adcccfa2 Branch: refs/heads/master Commit: adcccfa26ff666f4a787670441caf4c49f8ccef5 Parents: ce362c3 f26b7a2 Author: jlowin Authored: Fri Jun 17 07:57:06 2016 -0400 Committer: jlowin Committed: Fri Jun 17 07:57:06 2016 -0400 -- airflow/__init__.py | 6 +- .../contrib/example_dags/example_twitter_dag.py | 4 +- airflow/contrib/hooks/__init__.py | 33 +- airflow/contrib/operators/__init__.py | 42 ++- airflow/contrib/operators/fs_operator.py| 3 +- airflow/contrib/operators/mysql_to_gcs.py | 2 +- airflow/contrib/operators/vertica_to_hive.py| 2 +- .../contrib/plugins/metastore_browser/main.py | 4 +- airflow/example_dags/example_http_operator.py | 3 +- airflow/hooks/__init__.py | 73 +++- airflow/hooks/base_hook.py | 14 + airflow/hooks/dbapi_hook.py | 14 + airflow/hooks/druid_hook.py | 14 + airflow/hooks/hdfs_hook.py | 14 + airflow/hooks/http_hook.py | 2 +- airflow/hooks/jdbc_hook.py | 14 + airflow/hooks/mssql_hook.py | 14 + airflow/hooks/mysql_hook.py | 14 + airflow/hooks/oracle_hook.py| 14 + airflow/hooks/pig_hook.py | 14 + airflow/hooks/postgres_hook.py | 14 + airflow/hooks/presto_hook.py| 14 + airflow/hooks/samba_hook.py | 14 + airflow/hooks/sqlite_hook.py| 14 + airflow/hooks/webhdfs_hook.py | 14 + airflow/macros/__init__.py | 38 ++- airflow/macros/hive.py | 4 +- airflow/models.py | 38 ++- airflow/operators/__init__.py | 120 +-- airflow/operators/bash_operator.py | 14 + airflow/operators/check_operator.py | 14 + airflow/operators/dagrun_operator.py| 14 + airflow/operators/docker_operator.py| 14 + airflow/operators/dummy_operator.py | 14 + airflow/operators/email_operator.py | 14 + airflow/operators/generic_transfer.py | 14 + airflow/operators/hive_operator.py | 16 +- airflow/operators/hive_stats_operator.py| 18 +- airflow/operators/hive_to_druid.py | 17 +- airflow/operators/hive_to_mysql.py | 17 +- airflow/operators/hive_to_samba_operator.py | 17 +- airflow/operators/http_operator.py | 14 + airflow/operators/jdbc_operator.py | 14 + airflow/operators/mssql_operator.py | 16 +- airflow/operators/mssql_to_hive.py | 17 +- airflow/operators/mysql_operator.py | 16 +- airflow/operators/mysql_to_hive.py | 17 +- airflow/operators/oracle_operator.py| 2 +- airflow/operators/pig_operator.py | 16 +- airflow/operators/postgres_operator.py | 16 +- airflow/operators/presto_check_operator.py | 16 +- airflow/operators/presto_to_mysql.py| 17 +- airflow/operators/python_operator.py| 14 + airflow/operators/s3_file_transform_operator.py | 16 +- airflow/operators/s3_to_hive_operator.py| 17 +- airflow/operators/sensors.py| 31 +- airflow/operators/slack_operator.py | 14 + airflow/operators/sqlite_operator.py| 14 + airflow/plugins_manager.py | 30 +- airflow/utils/email.py | 18 +- airflow/utils/logging.py| 2 +- airflow/utils/tests.py | 23 ++ dags/testdruid.py | 2 +- run_unit_tests.sh | 3 + setup.py| 14 + tests/__init__.py | 2 +- tests/core.py | 340 +-- tests/operators/__init__.py | 17 + tests/operators/hive_operator.py| 209 tests/operators/operators.py| 174 ++ tests/operators/sensor.py | 39 --- tests/operators/sensors.py | 39 +++ 72 files changed, 1458 insertions(+), 474 deletions(-) -- http://git-wip-us.apach
[2/4] incubator-airflow git commit: [AIRFLOW-31] Use standard imports for hooks/operators
[AIRFLOW-31] Use standard imports for hooks/operators Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/851adc55 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/851adc55 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/851adc55 Branch: refs/heads/master Commit: 851adc5547597ec51743be4bc47d634c77d6dc17 Parents: 06e70e2 Author: jlowin Authored: Wed Jun 15 17:39:12 2016 -0400 Committer: jlowin Committed: Thu Jun 16 14:55:07 2016 -0400 -- airflow/__init__.py | 6 +- .../contrib/example_dags/example_twitter_dag.py | 4 +- airflow/contrib/hooks/__init__.py | 33 +- airflow/contrib/operators/__init__.py | 42 ++- airflow/contrib/operators/fs_operator.py| 3 +- airflow/contrib/operators/mysql_to_gcs.py | 2 +- airflow/contrib/operators/vertica_to_hive.py| 2 +- .../contrib/plugins/metastore_browser/main.py | 4 +- airflow/example_dags/example_http_operator.py | 3 +- airflow/hooks/__init__.py | 73 +++- airflow/hooks/base_hook.py | 14 + airflow/hooks/dbapi_hook.py | 14 + airflow/hooks/druid_hook.py | 14 + airflow/hooks/hdfs_hook.py | 14 + airflow/hooks/http_hook.py | 2 +- airflow/hooks/jdbc_hook.py | 14 + airflow/hooks/mssql_hook.py | 14 + airflow/hooks/mysql_hook.py | 14 + airflow/hooks/oracle_hook.py| 14 + airflow/hooks/pig_hook.py | 14 + airflow/hooks/postgres_hook.py | 14 + airflow/hooks/presto_hook.py| 14 + airflow/hooks/samba_hook.py | 14 + airflow/hooks/sqlite_hook.py| 14 + airflow/hooks/webhdfs_hook.py | 14 + airflow/macros/__init__.py | 38 ++- airflow/macros/hive.py | 4 +- airflow/models.py | 38 ++- airflow/operators/__init__.py | 120 +-- airflow/operators/bash_operator.py | 14 + airflow/operators/check_operator.py | 14 + airflow/operators/dagrun_operator.py| 14 + airflow/operators/docker_operator.py| 14 + airflow/operators/dummy_operator.py | 14 + airflow/operators/email_operator.py | 14 + airflow/operators/generic_transfer.py | 14 + airflow/operators/hive_operator.py | 16 +- airflow/operators/hive_stats_operator.py| 18 +- airflow/operators/hive_to_druid.py | 17 +- airflow/operators/hive_to_mysql.py | 17 +- airflow/operators/hive_to_samba_operator.py | 17 +- airflow/operators/http_operator.py | 14 + airflow/operators/jdbc_operator.py | 14 + airflow/operators/mssql_operator.py | 16 +- airflow/operators/mssql_to_hive.py | 17 +- airflow/operators/mysql_operator.py | 16 +- airflow/operators/mysql_to_hive.py | 17 +- airflow/operators/oracle_operator.py| 2 +- airflow/operators/pig_operator.py | 16 +- airflow/operators/postgres_operator.py | 16 +- airflow/operators/presto_check_operator.py | 16 +- airflow/operators/presto_to_mysql.py| 17 +- airflow/operators/python_operator.py| 14 + airflow/operators/s3_file_transform_operator.py | 16 +- airflow/operators/s3_to_hive_operator.py| 17 +- airflow/operators/sensors.py| 29 +- airflow/operators/slack_operator.py | 14 + airflow/operators/sqlite_operator.py| 14 + airflow/plugins_manager.py | 30 +- airflow/utils/email.py | 18 +- airflow/utils/logging.py| 2 +- airflow/utils/tests.py | 23 ++ dags/testdruid.py | 2 +- run_unit_tests.sh | 3 + setup.py| 14 + tests/__init__.py | 2 +- tests/core.py | 340 +-- tests/operators/__init__.py | 17 + tests/operators/hive_operator.py| 209 tests/operators/operators.py| 174 ++ tests/operators/sensor.py | 39 --- tests/operators/sensors.py | 39 +++ 72 files changed, 1457 insertions(+), 473 deletions(-) -- http://git-wip-us.apache.org/re
[jira] [Commented] (AIRFLOW-246) dag_stats endpoint has a terrible query
[ https://issues.apache.org/jira/browse/AIRFLOW-246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15335724#comment-15335724 ] Kengo Seki commented on AIRFLOW-246: Multiple left outer joins seem to affect performance. I think we can rewrite the query in question by replacing left outer join with inner join and union, such as: {code:sql} SELECT dag_id AS task_instance_dag_id, state AS task_instance_state, count(*) as count_1 FROM ( SELECT task_instance.dag_id, task_instance.state FROM task_instance JOIN ( SELECT dag_run.dag_id AS dag_id, dag_run.execution_date AS execution_date FROM dag_run WHERE dag_run.state = 'running' ) AS running_dag_run ON running_dag_run.dag_id = task_instance.dag_id AND running_dag_run.execution_date = task_instance.execution_date WHERE task_id IN ... UNION ALL SELECT task_instance.dag_id, task_instance.state FROM task_instance JOIN ( SELECT dag_run.dag_id AS dag_id, max(dag_run.execution_date) AS execution_date FROM dag_run GROUP BY dag_run.dag_id ) AS last_dag_run ON last_dag_run.dag_id = task_instance.dag_id AND last_dag_run.execution_date = task_instance.execution_date WHERE task_id IN ... ) t GROUP BY dag_id, state; {code} I compared these queries with some dummy data, and got x3-4 improvement. {code} mysql> select count(*) from dag_run; +--+ | count(*) | +--+ | 3417 | +--+ 1 row in set (0.00 sec) mysql> select count(*) from task_instance; +--+ | count(*) | +--+ | 229089 | +--+ 1 row in set (0.00 sec) mysql> SELECT task_instance.dag_id AS task_instance_dag_id, task_instance.state AS task_instance_state, count(task_instance.task_id) AS count_1 FROM task_instance LEFT OUTER JOIN (SELECT dag_run.dag_id AS dag_id, dag_run.execution_date AS execution_date FROM dag_run WHERE dag_run.state = 'running') AS running_dag_run ON running_dag_run.dag_id = task_instance.dag_id AND running_dag_run.execution_date = task_instance.execution_date LEFT OUTER JOIN (SELECT dag_run.dag_id AS dag_id, max(dag_run.execution_date) AS execution_date FROM dag_run GROUP BY dag_run.dag_id) AS last_dag_run ON last_dag_run.dag_id = task_instance.dag_id AND last_dag_run.execution_date = task_instance.execution_date WHERE task_instance.task_id IN ('all_success', 'also_run_this', 'always_true_1', 'always_true_2', 'bash_task', 'branching', 'branch_a', 'branch_b', 'branch_c', 'branch_d', 'condition', 'condition_is_False', 'condition_is_True', 'del_op', 'end', 'false_1', 'false_2', 'final_1', 'final_2', 'follow_branch_a', 'follow_branch_b', 'follow_branch_c', 'follow_branch_d', 'get_op', 'http_sensor_check', 'join', 'one_success', 'oper_1', 'oper_2', 'post_op', 'post_op_formenc', 'print_date', 'puller', 'push', 'push_by_returning', 'put_op', 'runme_0', 'runme_1', 'runme_2', 'run_after_loop', 'run_this', 'run_this_first', 'run_this_last', 'section-1', 'section-1-task-1', 'section-1-task-2', 'section-1-task-3', 'section-1-task-4', 'section-1-task-5', 'section-2', 'section-2-task-1', 'section-2-task-2', 'section-2-task-3', 'section-2-task-4', 'section-2-task-5', 'skip_operator_1', 'skip_operator_2', 'sleep', 'some-other-task', 'start', 'templated', 'test_trigger_dagrun', 'true_1', 'true_2') AND (running_dag_run.dag_id IS NOT NULL OR last_dag_run.dag_id IS NOT NULL) GROUP BY task_instance.dag_id, task_instance.state; +-+-+-+ | task_instance_dag_id| task_instance_state | count_1 | +-+-+-+ | example_bash_operator | success | 6 | | example_branch_dop_operator_v3 | NULL| 3 | | example_branch_operator | skipped | 6 | | example_branch_operator | success | 5 | | 
example_http_operator | failed | 1 | | example_http_operator | upstream_failed | 5 | | example_passing_params_via_test_command | success | 2 | | example_short_circuit_operator | skipped | 2 | | example_short_circuit_operator | success | 4 | | example_skip_dag| skipped | 4 | | example_skip_dag| success | 4 | | example_subdag_operator | success | 5 | | example_trigger_controller_dag | success | 1 | | example_trigger_target_dag | success | 2 | | example_xcom| success | 3 | | tutorial
[jira] [Commented] (AIRFLOW-249) Refactor the SLA mechanism
[ https://issues.apache.org/jira/browse/AIRFLOW-249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15335653#comment-15335653 ] dud commented on AIRFLOW-249: - Here you are: https://github.com/apache/incubator-airflow/pull/1601
> Refactor the SLA mechanism
> -
>
> Key: AIRFLOW-249
> URL: https://issues.apache.org/jira/browse/AIRFLOW-249
> Project: Apache Airflow
> Issue Type: Improvement
> Reporter: dud
>
> Hello
> I've noticed that the SLA feature currently behaves as follows:
> - it doesn't work on DAGs scheduled @once or None, because they have no dag.following_schedule property
> - it keeps endlessly checking for SLA misses without ever worrying about any end_date. Worse, I noticed that emails are still being sent for runs that never happen because of end_date
> - it keeps checking recent TIs even if an SLA notification has already been sent for them
> - the SLA logic is only fired after following_schedule + sla has elapsed; in other words, one has to wait for the next TI before having a chance of getting any email. Also, the email reports the dag.following_schedule time (I guess because it is close to TI.start_date), but unfortunately that matches neither what the task instances show nor the log filename
> - the SLA logic is based on max(TI.execution_date) as the starting point of its checks; that means that for a DAG whose SLA is longer than its schedule period, if half of the TIs run longer than expected it will go unnoticed. This can be demonstrated with a DAG like this one:
> {code}
> from airflow import DAG
> from airflow.operators import *
> from datetime import datetime, timedelta
> from time import sleep
>
> default_args = {
>     'owner': 'airflow',
>     'depends_on_past': False,
>     'start_date': datetime(2016, 6, 16, 12, 20),
>     'email': my_email,
>     'sla': timedelta(minutes=2),
> }
>
> dag = DAG('unnoticed_sla', default_args=default_args,
>           schedule_interval=timedelta(minutes=1))
>
> def alternating_sleep(**kwargs):
>     minute = kwargs['execution_date'].strftime("%M")
>     is_odd = int(minute) % 2
>     if is_odd:
>         sleep(300)
>     else:
>         sleep(10)
>     return True
>
> PythonOperator(
>     task_id='sla_miss',
>     python_callable=alternating_sleep,
>     provide_context=True,
>     dag=dag)
> {code}
> I've tried to rework the SLA triggering mechanism to address the above points; please [have a look at it|https://github.com/dud225/incubator-airflow/commit/972260354075683a8d55a1c960d839c37e629e7d]
> I made some tests with this patch:
> - the fluctuating DAG shown above no longer makes Airflow skip any SLA event:
> {code}
> task_id  |    dag_id     |   execution_date    | email_sent |         timestamp          | description | notification_sent
> ---------+---------------+---------------------+------------+----------------------------+-------------+-------------------
> sla_miss | dag_sla_miss1 | 2016-06-16 15:05:00 | t          | 2016-06-16 15:08:26.058631 |             | t
> sla_miss | dag_sla_miss1 | 2016-06-16 15:07:00 | t          | 2016-06-16 15:10:06.093253 |             | t
> sla_miss | dag_sla_miss1 | 2016-06-16 15:09:00 | t          | 2016-06-16 15:12:06.241773 |             | t
> {code}
> - on a normal DAG, the SLA is triggered more quickly:
> {code}
> // start_date = 2016-06-16 15:55:00
> // end_date = 2016-06-16 16:00:00
> // schedule_interval = timedelta(minutes=1)
> // sla = timedelta(minutes=2)
> task_id  |    dag_id     |   execution_date    | email_sent |         timestamp          | description | notification_sent
> ---------+---------------+---------------------+------------+----------------------------+-------------+-------------------
> sla_miss | dag_sla_miss1 | 2016-06-16 15:55:00 | t          | 2016-06-16 15:58:11.832299 |             | t
> sla_miss | dag_sla_miss1 | 2016-06-16 15:56:00 | t          | 2016-06-16 15:59:09.663778 |             | t
> sla_miss | dag_sla_miss1 | 2016-06-16 15:57:00 | t          | 2016-06-16 16:00:13.651422 |             | t
> sla_miss | dag_sla_miss1 | 2016-06-16 15:58:00 | t          | 2016-06-16 16:01:08.576399 |             | t
> sla_miss | dag_sla_miss1 | 2016-06-16 15:59:00 | t          | 2016-06-16 16:02:08.523486 |             | t
> sla_miss | dag_sla_miss1 | 2016-06-16 16:00:00 | t          | 2016-06-16 16:03:08.538593 |             | t
> (6 rows)
> {code}
> than before (current master branch):
> {code}
> // start_date = 2016-06-16 15:40:00
> // end_date = 2016-06-16 15:45:00
> // schedule_interval = timedelta(minutes=1)
> // sla = timedelta(minutes=2)
> task_id  |    dag_id     |   execution_date    | email_sent |         timestamp          | description | notification_sent
> ---------+---------------+---
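The reworked walk described above could be sketched like this (an illustration of the described behavior, not the code in PR 1601):
{code}
# Sketch: enumerate every schedule between start_date and min(end_date, now)
# instead of anchoring on max(TI.execution_date), so runs that can never happen
# after end_date are not reported and mid-history misses are not skipped.
def iter_overdue_schedules(dag, sla, now):
    dttm = dag.start_date
    horizon = min(dag.end_date, now) if dag.end_date else now
    while dttm and dttm <= horizon:
        following = dag.following_schedule(dttm)  # None for @once/None schedules
        if following and following + sla < now:
            yield dttm
        dttm = following
{code}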