[jira] [Created] (AIRFLOW-2172) Cast template_fields into a list if the operator's template_fields has only one element
Tao Feng created AIRFLOW-2172: - Summary: Cast template_fields into a list if the operator's template_fields has only one element Key: AIRFLOW-2172 URL: https://issues.apache.org/jira/browse/AIRFLOW-2172 Project: Apache Airflow Issue Type: Bug Reporter: Tao Feng Assignee: Tao Feng Normally a user-defined operator is derived from BaseOperator, and a Jinja-templated tuple is created for template_fields. If only one field is provided in the operator's template_fields, for example: template_fields = ('hql'), the resolve_template_files method in BaseOperator will actually throw an exception, because template_fields is now treated as a string instead of a list/tuple. In the example, 'h', 'q', and 'l' will each be considered a template field attribute. The workaround is to cast template_fields into a list when its actual type is a string. Not sure it is a real bug, but it took me some time to figure out this issue... -- This message was sent by Atlassian JIRA (v7.6.3#76005)
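The pitfall described above is plain Python tuple syntax rather than anything Airflow-specific; a minimal sketch (the `normalize_template_fields` helper is hypothetical, illustrating the cast the reporter suggests):

```python
# ('hql') is just a parenthesized string, not a tuple:
single = ('hql')
assert isinstance(single, str)
assert list(single) == ['h', 'q', 'l']  # iterating yields characters

# The trailing comma is what makes a one-element tuple:
correct = ('hql',)
assert isinstance(correct, tuple)

# The workaround described in the issue: cast to a list when a string slips in.
def normalize_template_fields(template_fields):
    if isinstance(template_fields, str):
        return [template_fields]
    return list(template_fields)

assert normalize_template_fields('hql') == ['hql']
assert normalize_template_fields(('hql', 'schema')) == ['hql', 'schema']
```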
[jira] [Resolved] (AIRFLOW-2170) The Implement Features section in the CONTRIBUTING.md is incomplete
[ https://issues.apache.org/jira/browse/AIRFLOW-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joy Gao resolved AIRFLOW-2170. -- Resolution: Fixed Fix Version/s: 1.10.0 Issue resolved by pull request #3089 [https://github.com/apache/incubator-airflow/pull/3089] > The Implement Features section in the CONTRIBUTING.md is incomplete > --- > > Key: AIRFLOW-2170 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2170 > Project: Apache Airflow > Issue Type: Improvement >Affects Versions: 1.9.0 >Reporter: Felix Uellendall >Assignee: Felix Uellendall >Priority: Trivial > Fix For: 1.10.0 > > > Currently it says: > {noformat} > Implement Features > Look through the Apache Jira for features. Any unassigned "Improvement" issue > is open to whoever wants to implement it. > We've created the operators, hooks, macros and executors we needed, but we > made sure that this part of Airflow is extensible. New operators, hooks and > operators are very welcomed!{noformat} > but it would probably be better to change the last sentence to: > {noformat} > New operators, hooks, macros and executors are very welcomed!{noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[2/2] incubator-airflow git commit: Merge pull request #3089 from feluelle/master
Merge pull request #3089 from feluelle/master Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/e28f6e22 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/e28f6e22 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/e28f6e22 Branch: refs/heads/master Commit: e28f6e224bfb25b500a22abfece9bf602002d6a9 Parents: f36ae3a 08891b2 Author: Joy Gao Authored: Fri Mar 2 15:36:21 2018 -0800 Committer: Joy Gao Committed: Fri Mar 2 15:36:21 2018 -0800 -- CONTRIBUTING.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e28f6e22/CONTRIBUTING.md --
[1/2] incubator-airflow git commit: docs: "Implement Features" section text changes
Repository: incubator-airflow Updated Branches: refs/heads/master f36ae3ac2 -> e28f6e224 docs: "Implement Features" section text changes Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/08891b2a Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/08891b2a Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/08891b2a Branch: refs/heads/master Commit: 08891b2a75f89f6ae293f47746d44c2daa593774 Parents: 6c93460 Author: FelixAuthored: Fri Mar 2 23:27:41 2018 +0100 Committer: GitHub Committed: Fri Mar 2 23:27:41 2018 +0100 -- CONTRIBUTING.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/08891b2a/CONTRIBUTING.md -- diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 6ac8c43..b25e4ac 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -42,7 +42,7 @@ Look through the GitHub issues for features. Anything tagged with We've created the operators, hooks, macros and executors we needed, but we made sure that this part of Airflow is extensible. New operators, -hooks and operators are very welcomed! +hooks, macros and executors are very welcomed! ### Improve Documentation
[jira] [Created] (AIRFLOW-2171) Base GCP hook delegate_to not actually being used
Zhiwei Zhao created AIRFLOW-2171: Summary: Base GCP hook delegate_to not actually being used Key: AIRFLOW-2171 URL: https://issues.apache.org/jira/browse/AIRFLOW-2171 Project: Apache Airflow Issue Type: Bug Reporter: Zhiwei Zhao Assignee: Zhiwei Zhao The GoogleCloudBaseHook has a variable named delegate_to for creating delegated credentials, but it is never actually used. Adding the logic back so the hook can create delegated credentials upon request. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (AIRFLOW-2170) The Implement Features section in the CONTRIBUTING.md is incomplete
[ https://issues.apache.org/jira/browse/AIRFLOW-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Felix Uellendall reassigned AIRFLOW-2170: - Assignee: Felix Uellendall > The Implement Features section in the CONTRIBUTING.md is incomplete > --- > > Key: AIRFLOW-2170 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2170 > Project: Apache Airflow > Issue Type: Improvement >Affects Versions: 1.9.0 >Reporter: Felix Uellendall >Assignee: Felix Uellendall >Priority: Trivial > > Currently it says: > {noformat} > Implement Features > Look through the Apache Jira for features. Any unassigned "Improvement" issue > is open to whoever wants to implement it. > We've created the operators, hooks, macros and executors we needed, but we > made sure that this part of Airflow is extensible. New operators, hooks and > operators are very welcomed!{noformat} > but it would probably be better to change the last sentence to: > {noformat} > New operators, hooks, macros and executors are very welcomed!{noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (AIRFLOW-2170) The Implement Features section in the CONTRIBUTING.md is incomplete
Felix Uellendall created AIRFLOW-2170: - Summary: The Implement Features section in the CONTRIBUTING.md is incomplete Key: AIRFLOW-2170 URL: https://issues.apache.org/jira/browse/AIRFLOW-2170 Project: Apache Airflow Issue Type: Improvement Affects Versions: 1.9.0 Reporter: Felix Uellendall Currently it says: {noformat} Implement Features Look through the Apache Jira for features. Any unassigned "Improvement" issue is open to whoever wants to implement it. We've created the operators, hooks, macros and executors we needed, but we made sure that this part of Airflow is extensible. New operators, hooks and operators are very welcomed!{noformat} but it would probably be better to change the last sentence to: {noformat} New operators, hooks, macros and executors are very welcomed!{noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (AIRFLOW-2166) BigQueryBaseCursor missing sql dialect parameter
[ https://issues.apache.org/jira/browse/AIRFLOW-2166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Riccomini resolved AIRFLOW-2166. -- Resolution: Fixed Fix Version/s: 1.10.0 > BigQueryBaseCursor missing sql dialect parameter > > > Key: AIRFLOW-2166 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2166 > Project: Apache Airflow > Issue Type: Bug >Reporter: Winston Huang >Assignee: Winston Huang >Priority: Major > Fix For: 1.10.0 > > > [https://github.com/apache/incubator-airflow/pull/2964] introduced a > backward-incompatible change to {{BigQueryBaseCursor}} by removing the > {{use_legacy_sql}} parameter from the {{run_query}} method. This parameter > should be restored and override the default cursor dialect when specified. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (AIRFLOW-2166) BigQueryBaseCursor missing sql dialect parameter
[ https://issues.apache.org/jira/browse/AIRFLOW-2166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16384266#comment-16384266 ] ASF subversion and git services commented on AIRFLOW-2166: -- Commit f36ae3ac2da745eacf2c99ae4ee8aa8dc4c8594f in incubator-airflow's branch refs/heads/master from [~ji-han] [ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=f36ae3a ] [AIRFLOW-2166] Restore BQ run_query dialect param Restores the use_legacy_sql parameter in the run_query method of BigQueryBaseCursor. This method was removed by commit d5d2c01f37f345458d9eeb8cdfbb0e77b55eb7ea, which introduced a backward-incompatible change for direct calls to the cursor methods. Closes #3087 from ji-han/AIRFLOW-2166 > BigQueryBaseCursor missing sql dialect parameter > > > Key: AIRFLOW-2166 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2166 > Project: Apache Airflow > Issue Type: Bug >Reporter: Winston Huang >Assignee: Winston Huang >Priority: Major > > [https://github.com/apache/incubator-airflow/pull/2964] introduced a > backward-incompatible change to {{BigQueryBaseCursor}} by removing the > {{use_legacy_sql}} parameter from the {{run_query}} method. This parameter > should be restored and override the default cursor dialect when specified. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (AIRFLOW-2166) BigQueryBaseCursor missing sql dialect parameter
[ https://issues.apache.org/jira/browse/AIRFLOW-2166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16384265#comment-16384265 ] ASF subversion and git services commented on AIRFLOW-2166: -- Commit f36ae3ac2da745eacf2c99ae4ee8aa8dc4c8594f in incubator-airflow's branch refs/heads/master from [~ji-han] [ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=f36ae3a ] [AIRFLOW-2166] Restore BQ run_query dialect param Restores the use_legacy_sql parameter in the run_query method of BigQueryBaseCursor. This method was removed by commit d5d2c01f37f345458d9eeb8cdfbb0e77b55eb7ea, which introduced a backward-incompatible change for direct calls to the cursor methods. Closes #3087 from ji-han/AIRFLOW-2166 > BigQueryBaseCursor missing sql dialect parameter > > > Key: AIRFLOW-2166 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2166 > Project: Apache Airflow > Issue Type: Bug >Reporter: Winston Huang >Assignee: Winston Huang >Priority: Major > > [https://github.com/apache/incubator-airflow/pull/2964] introduced a > backward-incompatible change to {{BigQueryBaseCursor}} by removing the > {{use_legacy_sql}} parameter from the {{run_query}} method. This parameter > should be restored and override the default cursor dialect when specified. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
incubator-airflow git commit: [AIRFLOW-2166] Restore BQ run_query dialect param
Repository: incubator-airflow Updated Branches: refs/heads/master b1deb3318 -> f36ae3ac2 [AIRFLOW-2166] Restore BQ run_query dialect param Restores the use_legacy_sql parameter in the run_query method of BigQueryBaseCursor. This method was removed by commit d5d2c01f37f345458d9eeb8cdfbb0e77b55eb7ea, which introduced a backward-incompatible change for direct calls to the cursor methods. Closes #3087 from ji-han/AIRFLOW-2166 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/f36ae3ac Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/f36ae3ac Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/f36ae3ac Branch: refs/heads/master Commit: f36ae3ac2da745eacf2c99ae4ee8aa8dc4c8594f Parents: b1deb33 Author: Winston HuangAuthored: Fri Mar 2 14:40:34 2018 -0800 Committer: Chris Riccomini Committed: Fri Mar 2 14:40:41 2018 -0800 -- airflow/contrib/hooks/bigquery_hook.py| 9 - tests/contrib/hooks/test_bigquery_hook.py | 15 +++ 2 files changed, 23 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f36ae3ac/airflow/contrib/hooks/bigquery_hook.py -- diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py index d937f1e..c6499d3 100644 --- a/airflow/contrib/hooks/bigquery_hook.py +++ b/airflow/contrib/hooks/bigquery_hook.py @@ -455,6 +455,7 @@ class BigQueryBaseCursor(LoggingMixin): allow_large_results=False, flatten_results=False, udf_config=False, + use_legacy_sql=None, maximum_billing_tier=None, maximum_bytes_billed=None, create_disposition='CREATE_IF_NEEDED', @@ -485,6 +486,9 @@ class BigQueryBaseCursor(LoggingMixin): :type flatten_results: boolean :param udf_config: The User Defined Function configuration for the query. See https://cloud.google.com/bigquery/user-defined-functions for details. +:param use_legacy_sql: Whether to use legacy SQL (true) or standard SQL (false). 
+If `None`, defaults to `self.use_legacy_sql`. +:type use_legacy_sql: boolean :type udf_config: list :param maximum_billing_tier: Positive integer that serves as a multiplier of the basic price. @@ -523,10 +527,13 @@ class BigQueryBaseCursor(LoggingMixin): "Please only use one or more of the following options: {1}" .format(schema_update_options, allowed_schema_update_options)) +if use_legacy_sql is None: +use_legacy_sql = self.use_legacy_sql + configuration = { 'query': { 'query': bql, -'useLegacySql': self.use_legacy_sql, +'useLegacySql': use_legacy_sql, 'maximumBillingTier': maximum_billing_tier, 'maximumBytesBilled': maximum_bytes_billed, 'priority': priority http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f36ae3ac/tests/contrib/hooks/test_bigquery_hook.py -- diff --git a/tests/contrib/hooks/test_bigquery_hook.py b/tests/contrib/hooks/test_bigquery_hook.py index a5dd595..6c6bed6 100644 --- a/tests/contrib/hooks/test_bigquery_hook.py +++ b/tests/contrib/hooks/test_bigquery_hook.py @@ -240,6 +240,21 @@ class TestBigQueryBaseCursor(unittest.TestCase): mock_jobs.cancel.assert_called_with(projectId=project_id, jobId=running_job_id) +@mock.patch.object(hook.BigQueryBaseCursor, 'run_with_configuration') +def test_run_query_sql_dialect_default(self, run_with_config): +cursor = hook.BigQueryBaseCursor(mock.Mock(), "project_id") +cursor.run_query('query') +args, kwargs = run_with_config.call_args +self.assertIs(args[0]['query']['useLegacySql'], True) + +@mock.patch.object(hook.BigQueryBaseCursor, 'run_with_configuration') +def test_run_query_sql_dialect_override(self, run_with_config): +for bool_val in [True, False]: +cursor = hook.BigQueryBaseCursor(mock.Mock(), "project_id") +cursor.run_query('query', use_legacy_sql=bool_val) +args, kwargs = run_with_config.call_args +self.assertIs(args[0]['query']['useLegacySql'], bool_val) + class TestTimePartitioningInRunJob(unittest.TestCase):
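The None-sentinel fallback at the heart of the patch above can be isolated as a tiny sketch (hypothetical helper name; the real code inlines this check in run_query):

```python
def resolve_use_legacy_sql(use_legacy_sql, cursor_default):
    # None means "not specified": fall back to the dialect configured on the
    # cursor, so existing callers keep their behavior while new callers can
    # override it per query.
    return cursor_default if use_legacy_sql is None else use_legacy_sql

assert resolve_use_legacy_sql(None, True) is True      # default preserved
assert resolve_use_legacy_sql(False, True) is False    # per-call override wins
```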
[jira] [Commented] (AIRFLOW-1642) An Alembic script not using scoped session causing deadlock
[ https://issues.apache.org/jira/browse/AIRFLOW-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16384225#comment-16384225 ] Joy Gao commented on AIRFLOW-1642: -- This one fell off my radar. I do have a PR out for it [https://github.com/apache/incubator-airflow/pull/2632] but it never got merged :( > An Alembic script not using scoped session causing deadlock > --- > > Key: AIRFLOW-1642 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1642 > Project: Apache Airflow > Issue Type: Bug >Reporter: Joy Gao >Priority: Minor > > The bug I'm about to describe is more of an obscure edge case, however I > think it's something still worth fixing. > After upgrading to airflow 1.9, while running `airflow resetdb` on my local > machine (with mysql), I encountered a deadlock on the final alembic revision > _d2ae31099d61 Increase text size for MySQL (not relevant for other DBs' text > types)_. > The deadlock turned out to be caused by another earlier session that was > created and left open in revision _cc1e65623dc7 add max tries column to task > instance_. Notably the code below: > {code} > sessionmaker = sa.orm.sessionmaker() > session = sessionmaker(bind=connection) > dagbag = DagBag(settings.DAGS_FOLDER) > {code} > The session created here was not a `scoped_session`, so when the DAGs were > being parsed in line 3 above, one of the DAG files makes a direct call to the > class method `Variable.get()` to acquire an env variable, which makes a db > query to the `variable` table, but raised a KeyError as the env variable was > non-existent, thus holding the lock on the `variable` table as a result of > that exception. > Later on, the latter alembic script `_cc1e65623dc7` needs to alter the > `Variable` table. Instead of creating its own Session object, it attempts to > reuse the same one as above. And because of the exception, it waits > indefinitely to acquire the lock on that table. 
> So the DAG file itself could have avoided the KeyError by providing a default > value when calling Variable.get(). However I think it would be a good idea to > avoid using unscoped sessions in general, as an exception could potentially > occur in the future elsewhere. The easiest fix is replacing *session = > sessionmaker(bind=connection)* with *session = settings.Session()*, which is > scoped. However, making a change on a migration script is going to make folks > anxious. > If anyone have any thoughts on this, let me know! Thanks :) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (AIRFLOW-2169) Fail to discern between VARBINARY and VARCHAR in MySQL
Hongyi Wang created AIRFLOW-2169: Summary: Fail to discern between VARBINARY and VARCHAR in MySQL Key: AIRFLOW-2169 URL: https://issues.apache.org/jira/browse/AIRFLOW-2169 Project: Apache Airflow Issue Type: Bug Components: db, operators Reporter: Hongyi Wang Assignee: Hongyi Wang The current MySqlToGoogleCloudStorageOperator has difficulty discerning between VARBINARY and VARCHAR in MySQL (and other similar field pairs such as CHAR/BINARY). "Binary-related" MySQL data types like VARBINARY should be mapped to "BYTES" in Google Cloud Storage rather than "STRING". -- This message was sent by Atlassian JIRA (v7.6.3#76005)
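The difficulty arises because MySQL reports VARCHAR and VARBINARY under the same protocol type code, so the mapping needs the column's binary flag (or charset) as a second input. A hedged sketch of that kind of two-input mapping (hypothetical helper, not the operator's actual schema code):

```python
def bq_type_for_mysql(mysql_type, is_binary):
    """Map a MySQL column to a BigQuery/GCS schema type (illustrative only).

    `is_binary` stands in for the binary flag / charset check needed to tell
    VARBINARY from VARCHAR, since both share a type code on the wire.
    """
    if mysql_type in ('CHAR', 'VARCHAR', 'BINARY', 'VARBINARY', 'BLOB', 'TEXT'):
        return 'BYTES' if is_binary else 'STRING'
    if mysql_type in ('TINYINT', 'SMALLINT', 'INT', 'BIGINT'):
        return 'INTEGER'
    return 'STRING'

assert bq_type_for_mysql('VARBINARY', True) == 'BYTES'
assert bq_type_for_mysql('VARCHAR', False) == 'STRING'
```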
[jira] [Work started] (AIRFLOW-2169) Fail to discern between VARBINARY and VARCHAR in MySQL
[ https://issues.apache.org/jira/browse/AIRFLOW-2169?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on AIRFLOW-2169 started by Hongyi Wang. > Fail to discern between VARBINARY and VARCHAR in MySQL > -- > > Key: AIRFLOW-2169 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2169 > Project: Apache Airflow > Issue Type: Bug > Components: db, operators >Reporter: Hongyi Wang >Assignee: Hongyi Wang >Priority: Major > > Current MySqlToGoogleCloudStorageOperator has difficulty to discern between > VARBINARY and VARCHAR in MySQL (and other similar fields–CHAR/BINARY, etc). > While "binary-related" MySQL data types, like VARBINARY, should be mapped to > "BYTES" in Google Cloud Storage, rather than "STRING". -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work started] (AIRFLOW-2168) Remote logging for Azure Blob Storage
[ https://issues.apache.org/jira/browse/AIRFLOW-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on AIRFLOW-2168 started by Marcus Rehm. > Remote logging for Azure Blob Storage > - > > Key: AIRFLOW-2168 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2168 > Project: Apache Airflow > Issue Type: New Feature > Components: logging >Affects Versions: 1.9.0 >Reporter: Marcus Rehm >Assignee: Marcus Rehm >Priority: Trivial > Labels: features, logging > > Today Airflow remote logging only supports Amazon S3 and GCS (Google Cloud > Storage). Would be nice if it have support to Azure Blob Storage too. > With the new logging structure available in Airflow 1.9 we can develop this > feature. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (AIRFLOW-2168) Remote logging for Azure Blob Storage
Marcus Rehm created AIRFLOW-2168: Summary: Remote logging for Azure Blob Storage Key: AIRFLOW-2168 URL: https://issues.apache.org/jira/browse/AIRFLOW-2168 Project: Apache Airflow Issue Type: New Feature Components: logging Affects Versions: 1.9.0 Reporter: Marcus Rehm Assignee: Marcus Rehm Today Airflow remote logging only supports Amazon S3 and GCS (Google Cloud Storage). It would be nice if it had support for Azure Blob Storage too. With the new logging structure available in Airflow 1.9, we can develop this feature. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (AIRFLOW-2167) Scheduler's clear_nonexistent_import_errors function should be called on first iteration
Casey created AIRFLOW-2167: -- Summary: Scheduler's clear_nonexistent_import_errors function should be called on first iteration Key: AIRFLOW-2167 URL: https://issues.apache.org/jira/browse/AIRFLOW-2167 Project: Apache Airflow Issue Type: Bug Components: scheduler Affects Versions: 1.9.0 Reporter: Casey Assignee: Casey Attachments: Screen Shot 2018-03-02 at 2.08.29 PM.png In `airflow/jobs.py`, the `clear_nonexistent_import_errors` function is not called until the number of seconds defined by `dag_dir_list_interval` has elapsed. If the scheduler is not alive for the duration of `dag_dir_list_interval` (300 seconds), this cleanup never occurs. In some environments this could result in error messages displaying on the UI permanently, even if the DAG has been removed from the environment. It was previously an Airflow best practice to have the scheduler run N times and terminate; the scheduler would then be started again by an auxiliary process like Docker or Supervisor. This situation is what brought the bug to my attention. My suggested fix is to tweak jobs.py to run the import error cleanup on the first iteration and then periodically as defined by `dag_dir_list_interval`. This way, a scheduler setup with a small number of runs will still have old errors cleaned up. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
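The suggested change boils down to a "run on first iteration, then on the interval" predicate; a minimal sketch with hypothetical names (not the actual jobs.py code):

```python
import time

def should_run_cleanup(last_run, interval, now=None):
    """True on the first iteration (last_run is None) or once `interval`
    seconds have elapsed since the previous cleanup."""
    now = time.time() if now is None else now
    return last_run is None or (now - last_run) >= interval

assert should_run_cleanup(None, 300, now=0.0)       # first iteration: run
assert not should_run_cleanup(0.0, 300, now=100.0)  # within interval: skip
assert should_run_cleanup(0.0, 300, now=300.0)      # interval elapsed: run
```

With this shape, a short-lived scheduler (one that runs a handful of iterations and is restarted by Docker or Supervisor) still performs the cleanup at least once.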
[jira] [Created] (AIRFLOW-2166) BigQueryBaseCursor missing sql dialect parameter
Winston Huang created AIRFLOW-2166: -- Summary: BigQueryBaseCursor missing sql dialect parameter Key: AIRFLOW-2166 URL: https://issues.apache.org/jira/browse/AIRFLOW-2166 Project: Apache Airflow Issue Type: Bug Reporter: Winston Huang Assignee: Winston Huang [https://github.com/apache/incubator-airflow/pull/2964] introduced a backward-incompatible change to {{BigQueryBaseCursor}} by removing the {{use_legacy_sql}} parameter from the {{run_query}} method. This parameter should be restored and override the default cursor dialect when specified. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (AIRFLOW-2165) XCOM values are being saved as bytestring
Cong Qin created AIRFLOW-2165: - Summary: XCOM values are being saved as bytestring Key: AIRFLOW-2165 URL: https://issues.apache.org/jira/browse/AIRFLOW-2165 Project: Apache Airflow Issue Type: Bug Components: xcom Affects Versions: 1.9.0 Environment: Ubuntu Airflow 1.9.0 from PIP Reporter: Cong Qin Attachments: Screen Shot 2018-03-02 at 11.09.15 AM.png I noticed after upgrading to 1.9.0 that XCOM values are now being saved as byte strings that cannot be decoded. Once I downgraded back to 1.8.2, the old behavior came back. It means that when I store certain values, I sometimes cannot pull them back out. I'm not sure if this was a documented change anywhere (I looked at the changelog between 1.8.2 and 1.9.0), and I couldn't find out whether this was a config-level change or something else. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
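Whatever the serialization path that changed in 1.9.0, the symptom on the caller's side is receiving `bytes` where 1.8.2 returned `str`; a defensive sketch of the decode a DAG author could apply (illustrative payload, not Airflow internals):

```python
# Stand-in for a value pulled via xcom_pull() that arrives as bytes:
value = b'{"rows": 42}'

# Defensive handling on the consumer side: decode bytes back to text.
if isinstance(value, bytes):
    value = value.decode('utf-8')

assert value == '{"rows": 42}'
```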
[jira] [Commented] (AIRFLOW-1246) Setting a Subdag task Instance State Throws exception
[ https://issues.apache.org/jira/browse/AIRFLOW-1246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16383884#comment-16383884 ] Zachary Lawson commented on AIRFLOW-1246: - This is resolved in AIRFLOW-2160. This ticket can be closed. > Setting a Subdag task Instance State Throws exception > -- > > Key: AIRFLOW-1246 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1246 > Project: Apache Airflow > Issue Type: Bug > Components: subdag >Affects Versions: Airflow 1.8 >Reporter: kraman >Assignee: Zachary Lawson >Priority: Major > > When there is a parent dag with a subdag trying to set the task instance > state of any of the task instances in the subdag to ANY of the > states"failed,running/success" throws the below exception. > This is nasty when running a local executor the process are not terminated > and lie around until manually killed. > ] > Traceback (most recent call last): > File "/usr/local/lib/python2.7/site-packages/flask/app.py", line 1988, in > wsgi_app > response = self.full_dispatch_request() > File "/usr/local/lib/python2.7/site-packages/flask/app.py", line 1641, in > full_dispatch_request > rv = self.handle_user_exception(e) > File "/usr/local/lib/python2.7/site-packages/flask/app.py", line 1544, in > handle_user_exception > reraise(exc_type, exc_value, tb) > File "/usr/local/lib/python2.7/site-packages/flask/app.py", line 1639, in > full_dispatch_request > rv = self.dispatch_request() > File "/usr/local/lib/python2.7/site-packages/flask/app.py", line 1625, in > dispatch_request > return self.view_functions[rule.endpoint](**req.view_args) > File "/usr/local/lib/python2.7/site-packages/flask_admin/base.py", line 69, > in inner > return self._run_view(f, *args, **kwargs) > File "/usr/local/lib/python2.7/site-packages/flask_admin/base.py", line > 368, in _run_view > return fn(self, *args, **kwargs) > File "/usr/local/lib/python2.7/site-packages/flask_admin/model/base.py", > line 2068, in action_view > return self.handle_action() > 
File "/usr/local/lib/python2.7/site-packages/flask_admin/actions.py", line > 113, in handle_action > response = handler[0](ids) > File "/usr/local/lib/python2.7/site-packages/airflow/www/views.py", line > 2341, in action_set_failed > self.set_task_instance_state(ids, State.FAILED) > File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 53, > in wrapper > result = func(*args, **kwargs) > File "/usr/local/lib/python2.7/site-packages/airflow/www/views.py", line > 2383, in set_task_instance_state > raise Exception("Ooops") > Exception: Ooops -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (AIRFLOW-2065) Worker logging can raise FileExistsError when more than one process execute concurrently
[ https://issues.apache.org/jira/browse/AIRFLOW-2065?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fokko Driesprong resolved AIRFLOW-2065. --- Resolution: Fixed Fix Version/s: 2.0.0 Issue resolved by pull request #3040 [https://github.com/apache/incubator-airflow/pull/3040] > Worker logging can raise FileExistsError when more than one process execute > concurrently > > > Key: AIRFLOW-2065 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2065 > Project: Apache Airflow > Issue Type: Bug > Components: executor, logging >Affects Versions: 1.9.0 >Reporter: Sébastien Brochet >Priority: Critical > Fix For: 2.0.0 > > > Hello, > > We started observing random failures during the execution of our dags after > upgrading to 1.9.0. After careful debugging, we noticed the following > exception in the worker logs: > {noformat} > Traceback (most recent call last): > File "/projects/airflow-hadoop/anaconda3/lib/python3.6/logging/config.py", > line 558, in configure > handler = self.configure_handler(handlers[name]) > File "/projects/airflow-hadoop/anaconda3/lib/python3.6/logging/config.py", > line 731, in configure_handler > result = factory(**kwargs) > File > "/projects/airflow-hadoop/anaconda3/lib/python3.6/site-packages/airflow/utils/log/file_processor_handler.py", > line 48, in __init__ > os.makedirs(self._get_log_directory()) > File "/projects/airflow-hadoop/anaconda3/lib/python3.6/os.py", line 220, > in makedirs > mkdir(name, mode) > FileExistsError: [Errno 17] File exists: > '/projects/airflow-hadoop/airflow/logs/scheduler/2018-02-05' > During handling of the above exception, another exception occurred: > Traceback (most recent call last): > File "/projects/airflow-hadoop/anaconda3/bin/airflow", line 16, in > from airflow import configuration > File > "/projects/airflow-hadoop/anaconda3/lib/python3.6/site-packages/airflow/__init__.py", > line 31, in > from airflow import settings > File > 
"/projects/airflow-hadoop/anaconda3/lib/python3.6/site-packages/airflow/settings.py", > line 148, in > configure_logging() > File > "/projects/airflow-hadoop/anaconda3/lib/python3.6/site-packages/airflow/logging_config.py", > line 75, in configure_logging > raise e > File > "/projects/airflow-hadoop/anaconda3/lib/python3.6/site-packages/airflow/logging_config.py", > line 70, in configure_logging > dictConfig(logging_config) > File "/projects/airflow-hadoop/anaconda3/lib/python3.6/logging/config.py", > line 795, in dictConfig > dictConfigClass(config).configure() > File "/projects/airflow-hadoop/anaconda3/lib/python3.6/logging/config.py", > line 566, in configure > '%r: %s' % (name, e)) > ValueError: Unable to configure handler 'file.processor': [Errno 17] File > exists: '/projects/airflow-hadoop/airflow/logs/scheduler/2018-02-05 > {noformat} > > As you can see, an exception is raised when trying to create the directory > where to store the executor logs. This can happen if two tasks are scheduled > are the exact same time on the same worker. It appears to be the case here : > > {noformat} > [2018-02-05 02:10:07,886] \{celery_executor.py:50} INFO - Executing command > in Celery: airflow run pairing_sensor_check 2018-02-04T02:10:00 --local > --pool sensor -sd /projects/airflow-hadoop/airflow/dags/flow.py > [2018-02-05 02:10:07,908] \{celery_executor.py:50} INFO - Executing command > in Celery: airflow run yyy pairing_sensor_check 2018-02-04T02:10:00 --local > --pool sensor -sd /projects/airflow-hadoop/airflow/dags/flow.py > {noformat} > Culprits is here: > [https://github.com/apache/incubator-airflow/blob/v1-9-stable/airflow/utils/log/file_processor_handler.py#L47-L48] > (not fixed in master) > A simple fix would be to wrap the {{makedirs}} command into a {{try}} / > {{catch}} block. > > Thanks, > > Sébastien -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (AIRFLOW-2065) Worker logging can raise FileExistsError when more than one process execute concurrently
[ https://issues.apache.org/jira/browse/AIRFLOW-2065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16383497#comment-16383497 ] ASF subversion and git services commented on AIRFLOW-2065: -- Commit b1deb3318f3fae4e21860bbd6e6463ef644aea8d in incubator-airflow's branch refs/heads/master from [~blinkseb] [ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=b1deb33 ] [AIRFLOW-2065] Fix race-conditions when creating loggers If two or more workers start at the same time, they will execute the same operations to create output directories for storing the log files. It can lead to race-conditions when, for example, worker A create the directory right after the non-existance check done by worker B; worker B will also try to create the directory while it does already exist. Closes #3040 from blinkseb/fix-airflow-2065 > Worker logging can raise FileExistsError when more than one process execute > concurrently > > > Key: AIRFLOW-2065 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2065 > Project: Apache Airflow > Issue Type: Bug > Components: executor, logging >Affects Versions: 1.9.0 >Reporter: Sébastien Brochet >Priority: Critical > Fix For: 2.0.0 > > > Hello, > > We started observing random failing during the execution of our dags after > upgrading to 1.9.0. 
After careful debugging, we noticing the following > exception in the worker logs: > {noformat} > Traceback (most recent call last): > File "/projects/airflow-hadoop/anaconda3/lib/python3.6/logging/config.py", > line 558, in configure > handler = self.configure_handler(handlers[name]) > File "/projects/airflow-hadoop/anaconda3/lib/python3.6/logging/config.py", > line 731, in configure_handler > result = factory(**kwargs) > File > "/projects/airflow-hadoop/anaconda3/lib/python3.6/site-packages/airflow/utils/log/file_processor_handler.py", > line 48, in __init__ > os.makedirs(self._get_log_directory()) > File "/projects/airflow-hadoop/anaconda3/lib/python3.6/os.py", line 220, > in makedirs > mkdir(name, mode) > FileExistsError: [Errno 17] File exists: > '/projects/airflow-hadoop/airflow/logs/scheduler/2018-02-05' > During handling of the above exception, another exception occurred: > Traceback (most recent call last): > File "/projects/airflow-hadoop/anaconda3/bin/airflow", line 16, in > from airflow import configuration > File > "/projects/airflow-hadoop/anaconda3/lib/python3.6/site-packages/airflow/__init__.py", > line 31, in > from airflow import settings > File > "/projects/airflow-hadoop/anaconda3/lib/python3.6/site-packages/airflow/settings.py", > line 148, in > configure_logging() > File > "/projects/airflow-hadoop/anaconda3/lib/python3.6/site-packages/airflow/logging_config.py", > line 75, in configure_logging > raise e > File > "/projects/airflow-hadoop/anaconda3/lib/python3.6/site-packages/airflow/logging_config.py", > line 70, in configure_logging > dictConfig(logging_config) > File "/projects/airflow-hadoop/anaconda3/lib/python3.6/logging/config.py", > line 795, in dictConfig > dictConfigClass(config).configure() > File "/projects/airflow-hadoop/anaconda3/lib/python3.6/logging/config.py", > line 566, in configure > '%r: %s' % (name, e)) > ValueError: Unable to configure handler 'file.processor': [Errno 17] File > exists: 
'/projects/airflow-hadoop/airflow/logs/scheduler/2018-02-05'
> {noformat}
> As you can see, an exception is raised when trying to create the directory where the executor logs are stored. This can happen if two tasks are scheduled at the exact same time on the same worker. It appears to be the case here:
> {noformat}
> [2018-02-05 02:10:07,886] \{celery_executor.py:50} INFO - Executing command in Celery: airflow run pairing_sensor_check 2018-02-04T02:10:00 --local --pool sensor -sd /projects/airflow-hadoop/airflow/dags/flow.py
> [2018-02-05 02:10:07,908] \{celery_executor.py:50} INFO - Executing command in Celery: airflow run yyy pairing_sensor_check 2018-02-04T02:10:00 --local --pool sensor -sd /projects/airflow-hadoop/airflow/dags/flow.py
> {noformat}
> The culprit is here: [https://github.com/apache/incubator-airflow/blob/v1-9-stable/airflow/utils/log/file_processor_handler.py#L47-L48] (not fixed in master)
> A simple fix would be to wrap the {{makedirs}} call in a {{try}} / {{except}} block.
>
> Thanks,
>
> Sébastien -- This message was sent by Atlassian JIRA (v7.6.3#76005)
incubator-airflow git commit: [AIRFLOW-2065] Fix race-conditions when creating loggers
Repository: incubator-airflow Updated Branches: refs/heads/master fedc5a092 -> b1deb3318 [AIRFLOW-2065] Fix race-conditions when creating loggers If two or more workers start at the same time, they will execute the same operations to create output directories for storing the log files. It can lead to race-conditions when, for example, worker A creates the directory right after the non-existence check done by worker B; worker B will then also try to create the directory even though it already exists. Closes #3040 from blinkseb/fix-airflow-2065 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b1deb331 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b1deb331 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b1deb331 Branch: refs/heads/master Commit: b1deb3318f3fae4e21860bbd6e6463ef644aea8d Parents: fedc5a0 Author: Sébastien Brochet Authored: Fri Mar 2 12:38:03 2018 +0100 Committer: Fokko Driesprong Committed: Fri Mar 2 12:38:03 2018 +0100
--
airflow/utils/log/file_processor_handler.py | 10 +-
1 file changed, 9 insertions(+), 1 deletion(-)
--
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b1deb331/airflow/utils/log/file_processor_handler.py
--
diff --git a/airflow/utils/log/file_processor_handler.py b/airflow/utils/log/file_processor_handler.py
index 176e316..f6d8d93 100644
--- a/airflow/utils/log/file_processor_handler.py
+++ b/airflow/utils/log/file_processor_handler.py
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import errno
 import logging
 import os

@@ -45,7 +46,14 @@ class FileProcessorHandler(logging.Handler):
         self._cur_date = datetime.today()

         if not os.path.exists(self._get_log_directory()):
-            os.makedirs(self._get_log_directory())
+            try:
+                os.makedirs(self._get_log_directory())
+            except OSError as e:
+                # only ignore the case where the directory already exists
+                if e.errno != errno.EEXIST:
+                    raise
+
+                logging.warning("%s already exists", self._get_log_directory())

         self._symlink_latest_log_directory()
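The pattern in this patch is easy to exercise outside Airflow. Below is a minimal stdlib-only sketch; the `ensure_dir` helper name and the temp-dir layout are illustrative, not Airflow code:

```python
import errno
import os
import tempfile

def ensure_dir(path):
    """Create `path`, tolerating a concurrent worker creating it first.

    Same errno.EEXIST pattern as the patch; on Python 3.2+ the one-liner
    os.makedirs(path, exist_ok=True) has the same effect.
    """
    try:
        os.makedirs(path)
    except OSError as e:
        if e.errno != errno.EEXIST:
            raise

log_dir = os.path.join(tempfile.mkdtemp(), "scheduler", "2018-02-05")
ensure_dir(log_dir)
ensure_dir(log_dir)  # simulates the racing worker: no FileExistsError raised
print(os.path.isdir(log_dir))  # True
```

Unlike checking `os.path.exists` first, the try/except closes the window between the existence check and the `mkdir` call, which is exactly the window the two workers were racing through.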
[jira] [Commented] (AIRFLOW-1642) An Alembic script not using scoped session causing deadlock
[ https://issues.apache.org/jira/browse/AIRFLOW-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16383360#comment-16383360 ] Rolf Schroeder commented on AIRFLOW-1642: - Hi, the following workaround allows applying Joy's patch on a one-time basis:
{code:bash}
# Go to the Airflow install dir
cd /path/to/venv/lib/python*/site-packages/airflow/migrations/versions
# Make a backup of the "faulty" revision
rsync -a cc1e65623dc7_add_max_tries_column_to_task_instance.py cc1e65623dc7_add_max_tries_column_to_task_instance.py.bak
# Apply the patch
sed -i 's/session = sessionmaker(bind=connection)/session = settings.Session()/' cc1e65623dc7_add_max_tries_column_to_task_instance.py
# Init the db
airflow initdb
# Restore the revision
rsync -av cc1e65623dc7_add_max_tries_column_to_task_instance.py.bak cc1e65623dc7_add_max_tries_column_to_task_instance.py
{code}
This is obviously not how things should get fixed, but it is a working solution until someone is bold enough to actually fix the migration ;) > An Alembic script not using scoped session causing deadlock > --- > > Key: AIRFLOW-1642 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1642 > Project: Apache Airflow > Issue Type: Bug >Reporter: Joy Gao >Priority: Minor > > The bug I'm about to describe is more of an obscure edge case, however I > think it's something still worth fixing. > After upgrading to airflow 1.9, while running `airflow resetdb` on my local > machine (with mysql), I encountered a deadlock on the final alembic revision > _d2ae31099d61 Increase text size for MySQL (not relevant for other DBs' text > types)_. > The deadlock turned out to be caused by another earlier session that was > created and left open in revision _cc1e65623dc7 add max tries column to task > instance_.
Notably the code below:
> {code}
> sessionmaker = sa.orm.sessionmaker()
> session = sessionmaker(bind=connection)
> dagbag = DagBag(settings.DAGS_FOLDER)
> {code}
> The session created here was not a `scoped_session`, so when the DAGs were being parsed in line 3 above, one of the DAG files made a direct call to the class method `Variable.get()` to acquire an env variable, which makes a db query to the `variable` table, but raised a KeyError as the env variable was non-existent, thus holding the lock on the `variable` table as a result of that exception. > Later on, the latter alembic script `_cc1e65623dc7` needs to alter the `Variable` table. Instead of creating its own Session object, it attempts to reuse the same one as above. And because of the exception, it waits indefinitely to acquire the lock on that table. > So the DAG file itself could have avoided the KeyError by providing a default value when calling Variable.get(). However I think it would be a good idea to avoid using unscoped sessions in general, as an exception could potentially occur in the future elsewhere. The easiest fix is replacing *session = sessionmaker(bind=connection)* with *session = settings.Session()*, which is scoped. However, making a change to a migration script is going to make folks anxious. > If anyone has any thoughts on this, let me know! Thanks :) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
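The scoped/unscoped distinction the report hinges on boils down to a thread-local registry that always hands back the same session. A rough stdlib-only sketch of that mechanism follows; the `SessionRegistry` name and the stand-in factory are hypothetical, and SQLAlchemy's real `scoped_session` does considerably more:

```python
import threading

class SessionRegistry:
    """Thread-local session registry, sketching what SQLAlchemy's
    scoped_session provides (names here are hypothetical)."""

    def __init__(self, factory):
        self._factory = factory          # e.g. a sessionmaker in SQLAlchemy
        self._local = threading.local()

    def __call__(self):
        # Every call in the same thread hands back the one registered
        # session, so later code reuses it instead of opening a new one.
        if not hasattr(self._local, "session"):
            self._local.session = self._factory()
        return self._local.session

    def remove(self):
        # Dispose of the thread's session, e.g. after a migration step.
        if hasattr(self._local, "session"):
            del self._local.session

plain_factory = object                     # stand-in for sessionmaker(bind=connection)
print(plain_factory() is plain_factory())  # False: two distinct sessions

registry = SessionRegistry(plain_factory)  # stand-in for settings.Session
print(registry() is registry())            # True: one shared, cleanable session
```

This is why the one-line `settings.Session()` fix works: the later migration script gets back the registered session instead of silently leaving the earlier one, and its locks, dangling.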
[jira] [Commented] (AIRFLOW-2147) Add 'sensors' attribute for plugins
[ https://issues.apache.org/jira/browse/AIRFLOW-2147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16383346#comment-16383346 ] ASF subversion and git services commented on AIRFLOW-2147: -- Commit fedc5a092c4ceb74c7d02ff932ad7de796705d43 in incubator-airflow's branch refs/heads/master from [~arcward] [ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=fedc5a0 ] [AIRFLOW-2147] Plugin manager: added 'sensors' attribute AirflowPlugin required both BaseOperator and BaseSensorOperator to be included in its `operators` attribute. Added a `sensors` attribute and updated the import logic so that anything added to the new attribute can be imported from `airflow.sensors.{plugin_name}`. The integration/`make_module` calls in `airflow.plugins_manager` for operators are also updated to maintain the ability to import sensors from `operators`, to avoid breaking existing plugins - Updated unit tests and documentation to reflect this - Added an exclusion for the flake8 "module level import not at top of file" check Closes #3075 from arcward/AIRFLOW-2147 > Add 'sensors' attribute for plugins > --- > > Key: AIRFLOW-2147 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2147 > Project: Apache Airflow > Issue Type: Improvement > Components: plugins >Affects Versions: Airflow 1.8 >Reporter: Edward Wells >Priority: Minor > Labels: pull-request-available > Fix For: 2.0.0 > > > I created a pull request: > https://github.com/apache/incubator-airflow/pull/3075 > {{airflow.plugins_manager.AirflowPlugin}} has attributes for operators, > hooks, executors, macros, admin_views, flask_blueprints and menu_links, but > not for sensors.
> Right now, operators/sensors can be imported like: > {{ from airflow.operators.test_plugin import PluginOperator}} > It would be intuitive to have sensors importable similarly: > {{ from airflow.sensors.test_plugin import PluginSensorOperator}} > The first time I wrote a plugin I instinctively did it this way until I > realized I had to bundle both operators and sensors into operators. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (AIRFLOW-2147) Add 'sensors' attribute for plugins
[ https://issues.apache.org/jira/browse/AIRFLOW-2147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fokko Driesprong resolved AIRFLOW-2147. --- Resolution: Fixed Fix Version/s: 2.0.0 Issue resolved by pull request #3075 [https://github.com/apache/incubator-airflow/pull/3075] > Add 'sensors' attribute for plugins > --- > > Key: AIRFLOW-2147 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2147 > Project: Apache Airflow > Issue Type: Improvement > Components: plugins >Affects Versions: Airflow 1.8 >Reporter: Edward Wells >Priority: Minor > Labels: pull-request-available > Fix For: 2.0.0 > > > I created a pull request: > https://github.com/apache/incubator-airflow/pull/3075 > {{airflow.plugins_manager.AirflowPlugin}} has attributes for operators, > hooks, executors, macros, admin_views, flask_blueprints and menu_links, but > not for sensors. > Right now, operators/sensors can be imported like: > {{ from airflow.operators.test_plugin import PluginOperator}} > It would be intuitive to have sensors importable similarly like: > {{ from airflow.sensors.test_plugin import PluginSensorOperator}} > The first time I wrote a plugin I instinctively did it this way until I > realized I had to bundle both operators and sensors into operators -- This message was sent by Atlassian JIRA (v7.6.3#76005)
incubator-airflow git commit: [AIRFLOW-2147] Plugin manager: added 'sensors' attribute
Repository: incubator-airflow Updated Branches: refs/heads/master d4dfe2654 -> fedc5a092 [AIRFLOW-2147] Plugin manager: added 'sensors' attribute AirflowPlugin required both BaseOperator and BaseSensorOperator to be included in its `operators` attribute. Added a `sensors` attribute and updated the import logic so that anything added to the new attribute can be imported from `airflow.sensors.{plugin_name}`. The integration/`make_module` calls in `airflow.plugins_manager` for operators are also updated to maintain the ability to import sensors from `operators`, to avoid breaking existing plugins - Updated unit tests and documentation to reflect this - Added an exclusion for the flake8 "module level import not at top of file" check Closes #3075 from arcward/AIRFLOW-2147 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/fedc5a09 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/fedc5a09 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/fedc5a09 Branch: refs/heads/master Commit: fedc5a092c4ceb74c7d02ff932ad7de796705d43 Parents: d4dfe26 Author: Edward Wells Authored: Fri Mar 2 09:29:06 2018 +0100 Committer: Fokko Driesprong Committed: Fri Mar 2 09:29:14 2018 +0100
--
airflow/__init__.py | 2 ++
airflow/plugins_manager.py | 5 +++--
airflow/sensors/__init__.py | 10 +-
docs/plugins.rst | 12 ++--
tests/plugins/test_plugin.py | 10 +-
tests/plugins_manager.py | 5 +
6 files changed, 34 insertions(+), 10 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fedc5a09/airflow/__init__.py
--
diff --git a/airflow/__init__.py b/airflow/__init__.py
index 3c5f24c..4c4509e 100644
--- a/airflow/__init__.py
+++ b/airflow/__init__.py
@@ -76,11 +76,13 @@ class AirflowMacroPlugin(object):
         self.namespace = namespace

 from airflow import operators
+from airflow import sensors  # noqa: E402
 from airflow import hooks
 from airflow import executors
 from airflow import macros
 operators._integrate_plugins()
+sensors._integrate_plugins()  # noqa: E402
 hooks._integrate_plugins()
 executors._integrate_plugins()
 macros._integrate_plugins()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fedc5a09/airflow/plugins_manager.py
--
diff --git a/airflow/plugins_manager.py b/airflow/plugins_manager.py
index 22a873c..aaae423 100644
--- a/airflow/plugins_manager.py
+++ b/airflow/plugins_manager.py
@@ -36,6 +36,7 @@ class AirflowPluginException(Exception):
 class AirflowPlugin(object):
     name = None
     operators = []
+    sensors = []
     hooks = []
     executors = []
     macros = []
@@ -115,9 +116,9 @@ menu_links = []
 for p in plugins:
     operators_modules.append(
-        make_module('airflow.operators.' + p.name, p.operators))
+        make_module('airflow.operators.' + p.name, p.operators + p.sensors))
     sensors_modules.append(
-        make_module('airflow.sensors.' + p.name, p.operators)
+        make_module('airflow.sensors.' + p.name, p.sensors)
     )
     hooks_modules.append(make_module('airflow.hooks.' + p.name, p.hooks))
     executors_modules.append(
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fedc5a09/airflow/sensors/__init__.py
--
diff --git a/airflow/sensors/__init__.py b/airflow/sensors/__init__.py
index 2239467..9c936f7 100644
--- a/airflow/sensors/__init__.py
+++ b/airflow/sensors/__init__.py
@@ -48,13 +48,13 @@ def _integrate_plugins():
     if not _os.environ.get('AIRFLOW_USE_NEW_IMPORTS', False):
         from zope.deprecation import deprecated as _deprecated
-        for _operator in sensors_module._objects:
-            operator_name = _operator.__name__
-            globals()[operator_name] = _operator
+        for _sensor in sensors_module._objects:
+            sensor_name = _sensor.__name__
+            globals()[sensor_name] = _sensor
             _deprecated(
-                operator_name,
+                sensor_name,
                 "Importing plugin operator '{i}' directly from "
                 "'airflow.operators' has been deprecated. Please "
                 "import from 'airflow.operators.[plugin_module]' "
                 "instead. Support for direct imports will be dropped "
-                "entirely in Airflow 2.0.".format(i=operator_name))
+                "entirely in Airflow 2.0.".format(i=sensor_name))
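The `make_module` mechanism this patch touches can be sketched in a few lines of plain Python. This is a simplified illustration of how the plugin manager exposes a plugin's classes under a dotted import path, not Airflow's actual implementation; `PluginSensorOperator` and the stub parent packages are hypothetical stand-ins:

```python
import sys
import types

def make_module(name, objects):
    # Sketch of the plugin manager's module synthesis (simplified from
    # airflow.plugins_manager.make_module): wrap a plugin's classes in a
    # synthetic module and register it under a dotted import path.
    module = types.ModuleType(name)
    module._objects = objects
    module.__dict__.update((obj.__name__, obj) for obj in objects)
    sys.modules[name] = module
    return module

class PluginSensorOperator:
    """Hypothetical sensor contributed via a plugin's `sensors` attribute."""

# Register stub parent packages so the dotted import below resolves
# (in real Airflow the parent packages already exist at import time).
make_module("airflow", [])
make_module("airflow.sensors", [])
make_module("airflow.sensors.test_plugin", [PluginSensorOperator])

# With the new `sensors` attribute, this import path now works:
from airflow.sensors.test_plugin import PluginSensorOperator as Imported
print(Imported is PluginSensorOperator)  # True
```

Before the patch, `make_module('airflow.sensors.' + p.name, ...)` was fed `p.operators`, which is why sensors had to be bundled into the `operators` attribute to become importable at all.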
[jira] [Resolved] (AIRFLOW-2059) taskinstance query is awful, un-indexed, and does not scale
[ https://issues.apache.org/jira/browse/AIRFLOW-2059?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fokko Driesprong resolved AIRFLOW-2059. --- Resolution: Fixed Fix Version/s: 2.0.0 Issue resolved by pull request #3086 [https://github.com/apache/incubator-airflow/pull/3086] > taskinstance query is awful, un-indexed, and does not scale > --- > > Key: AIRFLOW-2059 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2059 > Project: Apache Airflow > Issue Type: Bug > Components: db, webserver >Affects Versions: Airflow 1.8 > Environment: [nhanlon@ ~]$ nproc > 4 > [nhanlon@ ~]$ free -g > total used free sharedbuffers cached > Mem: 7 5 1 0 0 1 > -/+ buffers/cache: 4 3 > Swap:0 0 0 > [nhanlon@ ~]$ cat /etc/*release* > CentOS release 6.7 (Final) > CentOS release 6.7 (Final) > CentOS release 6.7 (Final) > cpe:/o:centos:linux:6:GA > [nhanlon@ ~]$ mysqld --version > mysqld Ver 5.6.31-77.0 for Linux on x86_64 (Percona Server (GPL), Release > 77.0, Revision 5c1061c) >Reporter: Neil Hanlon >Assignee: Tao Feng >Priority: Critical > Fix For: 2.0.0 > > > > The page at /admin/taskinstance/ can reach a point where it blocks loading > the page and crushes the database. It appears this is because the > task_instance.job_id column is unindexed. On our database, getting the > results for this query took over four minutes, locking the table for the > duration. 
> > 500 rows in set (4 min 8.93 sec) > > Query: > > {code:java} > SELECT task_instance.task_id AS task_instance_task_id, task_instance.dag_id > AS task_instance_dag_id, task_instance.execution_date AS > task_instance_execution_date, task_instance.start_date AS > task_instance_start_date, task_instance.end_date AS task_instance_end_date, > task_instance.duration AS task_instance_duration, task_instance.state AS > task_instance_state, task_instance.try_number AS task_instance_try_number, > task_instance.hostname AS task_instance_hostname, task_instance.unixname AS > task_instance_unixname, task_instance.job_id AS task_instance_job_id, > task_instance.pool AS task_instance_pool, task_instance.queue AS > task_instance_queue, task_instance.priority_weight AS > task_instance_priority_weight, task_instance.operator AS > task_instance_operator, task_instance.queued_dttm AS > task_instance_queued_dttm, task_instance.pid AS task_instance_pid > FROM task_instance ORDER BY task_instance.job_id DESC > LIMIT 500; > {code} > Profile, explain: > > {code:java} > :airflow> EXPLAIN SELECT task_instance.task_id AS > task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, > task_instance.execution_date AS task_instance_execution_date, > task_instance.start_date AS task_instance_start_date, task_instance.end_date > AS task_instance_end_date, task_instance.duration AS task_instance_duration, > task_instance.state AS task_instance_state, task_instance.try_number AS > task_instance_try_number, task_instance.hostname AS task_instance_hostname, > task_instance.unixname AS task_instance_unixname, task_instance.job_id AS > task_instance_job_id, task_instance.pool AS task_instance_pool, > task_instance.queue AS task_instance_queue, task_instance.priority_weight AS > task_instance_priority_weight, task_instance.operator AS > task_instance_operator, task_instance.queued_dttm AS > task_instance_queued_dttm, task_instance.pid AS task_instance_pid > -> FROM task_instance ORDER BY 
task_instance.job_id DESC
>     -> LIMIT 500;
> +----+-------------+---------------+------+---------------+------+---------+------+---------+----------------+
> | id | select_type | table         | type | possible_keys | key  | key_len | ref  | rows    | Extra          |
> +----+-------------+---------------+------+---------------+------+---------+------+---------+----------------+
> |  1 | SIMPLE      | task_instance | ALL  | NULL          | NULL | NULL    | NULL | 2542776 | Using filesort |
> +----+-------------+---------------+------+---------------+------+---------+------+---------+----------------+
> 1 row in set (0.00 sec)
> :airflow> select count(*) from task_instance;
> +----------+
> | count(*) |
> +----------+
> |  2984749 |
> +----------+
> 1 row in set (1.67 sec)
> :airflow> show profile for query 2;
> +----------------------+----------+
> | Status               | Duration |
> +----------------------+----------+
> | starting             | 0.000157 |
> | checking permissions | 0.17     |
> | Opening tables       | 0.33     |
> | init                 | 0.46     |
> | System lock          | 0.17     |
> | optimizing           | 0.10     |
> | statistics           | 0.22     |
> | preparing            | 0.20     |
> | Sorting result       | 0.10     |
> | executing            | 0.08     |
> | Sending data         |
[jira] [Commented] (AIRFLOW-2059) taskinstance query is awful, un-indexed, and does not scale
[ https://issues.apache.org/jira/browse/AIRFLOW-2059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16383340#comment-16383340 ] ASF subversion and git services commented on AIRFLOW-2059: -- Commit d4dfe2654e16d1ae2e7464e642a3520de04496e2 in incubator-airflow's branch refs/heads/master from Tao feng [ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=d4dfe26 ] [AIRFLOW-2059] taskinstance query is awful, un-indexed, and does not scale Closes #3086 from feng-tao/airflow-2059 > taskinstance query is awful, un-indexed, and does not scale > --- > > Key: AIRFLOW-2059 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2059 > Project: Apache Airflow > Issue Type: Bug > Components: db, webserver >Affects Versions: Airflow 1.8 > Environment: [nhanlon@ ~]$ nproc > 4 > [nhanlon@ ~]$ free -g > total used free sharedbuffers cached > Mem: 7 5 1 0 0 1 > -/+ buffers/cache: 4 3 > Swap:0 0 0 > [nhanlon@ ~]$ cat /etc/*release* > CentOS release 6.7 (Final) > CentOS release 6.7 (Final) > CentOS release 6.7 (Final) > cpe:/o:centos:linux:6:GA > [nhanlon@ ~]$ mysqld --version > mysqld Ver 5.6.31-77.0 for Linux on x86_64 (Percona Server (GPL), Release > 77.0, Revision 5c1061c) >Reporter: Neil Hanlon >Assignee: Tao Feng >Priority: Critical > > > The page at /admin/taskinstance/ can reach a point where it blocks loading > the page and crushes the database. It appears this is because the > task_instance.job_id column is unindexed. On our database, getting the > results for this query took over four minutes, locking the table for the > duration. 
> > 500 rows in set (4 min 8.93 sec) > > Query: > > {code:java} > SELECT task_instance.task_id AS task_instance_task_id, task_instance.dag_id > AS task_instance_dag_id, task_instance.execution_date AS > task_instance_execution_date, task_instance.start_date AS > task_instance_start_date, task_instance.end_date AS task_instance_end_date, > task_instance.duration AS task_instance_duration, task_instance.state AS > task_instance_state, task_instance.try_number AS task_instance_try_number, > task_instance.hostname AS task_instance_hostname, task_instance.unixname AS > task_instance_unixname, task_instance.job_id AS task_instance_job_id, > task_instance.pool AS task_instance_pool, task_instance.queue AS > task_instance_queue, task_instance.priority_weight AS > task_instance_priority_weight, task_instance.operator AS > task_instance_operator, task_instance.queued_dttm AS > task_instance_queued_dttm, task_instance.pid AS task_instance_pid > FROM task_instance ORDER BY task_instance.job_id DESC > LIMIT 500; > {code} > Profile, explain: > > {code:java} > :airflow> EXPLAIN SELECT task_instance.task_id AS > task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, > task_instance.execution_date AS task_instance_execution_date, > task_instance.start_date AS task_instance_start_date, task_instance.end_date > AS task_instance_end_date, task_instance.duration AS task_instance_duration, > task_instance.state AS task_instance_state, task_instance.try_number AS > task_instance_try_number, task_instance.hostname AS task_instance_hostname, > task_instance.unixname AS task_instance_unixname, task_instance.job_id AS > task_instance_job_id, task_instance.pool AS task_instance_pool, > task_instance.queue AS task_instance_queue, task_instance.priority_weight AS > task_instance_priority_weight, task_instance.operator AS > task_instance_operator, task_instance.queued_dttm AS > task_instance_queued_dttm, task_instance.pid AS task_instance_pid > -> FROM task_instance ORDER BY 
task_instance.job_id DESC > -> LIMIT 500; > ++-+---+--+---+--+-+--+-++ > | id | select_type | table | type | possible_keys | key | key_len | ref | > rows | Extra | > ++-+---+--+---+--+-+--+-++ > | 1 | SIMPLE | task_instance | ALL | NULL | NULL | NULL | NULL | 2542776 | > Using filesort | > ++-+---+--+---+--+-+--+-++ > 1 row in set (0.00 sec) > :airflow> select count(*) from task_instance; > +--+ > | count(*) | > +--+ > | 2984749 | > +--+ > 1 row in set (1.67 sec) > :airflow> show profile for query 2; > +--++ > | Status | Duration | > +--++ > | starting | 0.000157 | > | checking permissions | 0.17 | > | Opening tables | 0.33 | > | init |
incubator-airflow git commit: [AIRFLOW-2059] taskinstance query is awful, un-indexed, and does not scale
Repository: incubator-airflow Updated Branches: refs/heads/master c7e39683d -> d4dfe2654 [AIRFLOW-2059] taskinstance query is awful, un-indexed, and does not scale Closes #3086 from feng-tao/airflow-2059 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/d4dfe265 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/d4dfe265 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/d4dfe265 Branch: refs/heads/master Commit: d4dfe2654e16d1ae2e7464e642a3520de04496e2 Parents: c7e3968 Author: Tao feng Authored: Fri Mar 2 09:26:00 2018 +0100 Committer: Fokko Driesprong Committed: Fri Mar 2 09:26:00 2018 +0100
--
airflow/models.py | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
--
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d4dfe265/airflow/models.py
--
diff --git a/airflow/models.py b/airflow/models.py
index f2c3f2e..20996e0 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -780,7 +780,7 @@ class TaskInstance(Base, LoggingMixin):
     max_tries = Column(Integer)
     hostname = Column(String(1000))
     unixname = Column(String(1000))
-    job_id = Column(Integer)
+    job_id = Column(Integer, index=True)
     pool = Column(String(50))
     queue = Column(String(50))
     priority_weight = Column(Integer)
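On the database side, the one-line model change amounts to a plain `CREATE INDEX`. A small sketch with stdlib SQLite follows (table trimmed to a few columns; MySQL's planner differs in detail, but the trade-off is the same: full scan plus filesort versus an ordered index scan):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE task_instance (task_id TEXT, dag_id TEXT, job_id INTEGER)")

query = "SELECT * FROM task_instance ORDER BY task_instance.job_id DESC LIMIT 500"

# Without an index on job_id, satisfying ORDER BY needs an explicit sort step.
before = conn.execute("EXPLAIN QUERY PLAN " + query).fetchall()
print(before)  # plan includes a 'USE TEMP B-TREE FOR ORDER BY' step

# The fix (job_id = Column(Integer, index=True)) boils down to:
conn.execute("CREATE INDEX ti_job_id ON task_instance (job_id)")

# Now the planner walks the index in job_id order: no sort over all rows.
after = conn.execute("EXPLAIN QUERY PLAN " + query).fetchall()
print(after)  # plan scans task_instance using index ti_job_id
```

With ~3 million rows, as in the report, dropping the sort step is what turns the four-minute table-locking query back into a cheap top-500 fetch.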