[GitHub] [airflow] potiuk commented on pull request #8807: Added automated release notes generation for backport operators
potiuk commented on pull request #8807: URL: https://github.com/apache/airflow/pull/8807#issuecomment-629042359

Hey @ashb @kaxil @mik-laj @feluelle. I think I have finally solved all the small quirks, including some extras (like apache-beam) that were messing with the installation. This one contains all the fixes discussed above, plus:

* We no longer have to modify setup_backport_packages.py to change the version or add rc1/rc2. The package version is now the same as the generated README version, and you can add `--version-suffix rc1` manually when preparing packages. This way we can generate rc1, rc2, etc. from exactly the same commit without modifying anything (we could use the same technique to generate the main airflow package, BTW).
* The documentation in BACKPORT_PACKAGES.md is updated to describe this simplified process.
* We excluded some of the Google packages (the BigQuery ones) from the Google provider release. We (@turbaszek) are now running a big rewrite of the BigQuery operators, and the rewritten operators will not be backwards compatible, so we think it is better to exclude them from this release. We will release them, together with some new operators, in a few weeks, at the same time testing single-package releases.
* When you prepare packages without providing a date, the release date defaults to +5 days. This is mostly used in the CI job. The CI job will prepare (and upload to files.io) READMEs for all packages, so we can see the release notes for packages that need to be released.

I will improve it further to only show and summarize the packages that "need release", i.e. those that have had changes since the last release (but I will add that later). I want a single page/report saying "these are the changed backport packages; maybe we should release some of them?"

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
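The versioning scheme described in that comment can be sketched as a small helper. This is a minimal sketch, assuming a date-based `YYYY.MM.DD` version (matching the `Release: 2020.05.19` header in the generated READMEs) with an optional suffix appended verbatim; the function name `backport_package_version` and its parameters are hypothetical and are not the actual interface of the Airflow release scripts.

```python
from datetime import date, timedelta
from typing import Optional

# Assumption: mirrors the "+5 days" default described in the comment.
DEFAULT_RELEASE_OFFSET_DAYS = 5


def backport_package_version(release_date: Optional[date] = None,
                             version_suffix: str = "",
                             today: Optional[date] = None) -> str:
    """Return a date-based version such as '2020.05.19' or '2020.05.19rc1'.

    Hypothetical helper: when no release date is given, the date defaults
    to `today` (or the real current date) plus DEFAULT_RELEASE_OFFSET_DAYS.
    """
    if release_date is None:
        base = today if today is not None else date.today()
        release_date = base + timedelta(days=DEFAULT_RELEASE_OFFSET_DAYS)
    return release_date.strftime("%Y.%m.%d") + version_suffix


print(backport_package_version(date(2020, 5, 19), version_suffix="rc1"))  # prints 2020.05.19rc1
```

Because the suffix is passed in rather than stored in the source, the same commit can produce `2020.05.19rc1`, `2020.05.19rc2` and the final `2020.05.19` by changing only the argument.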
[GitHub] [airflow] j-y-matsubara commented on pull request #8867: Add argument to BaseSensorOperator to control skipping all downstream tasks or not
j-y-matsubara commented on pull request #8867: URL: https://github.com/apache/airflow/pull/8867#issuecomment-629042421 @potiuk @kaxil Would you mind reviewing this PR?
[GitHub] [airflow] mik-laj commented on a change in pull request #8807: Added automated release notes generation for backport operators
mik-laj commented on a change in pull request #8807: URL: https://github.com/apache/airflow/pull/8807#discussion_r425580282
## File path: airflow/providers/apache/cassandra/README.md
## @@ -0,0 +1,105 @@
+
+
+
+# Package apache-airflow-backport-providers-apache-cassandra
+
+Release: 2020.05.19
+
+**Table of contents**
+
+- [Backport package](#backport-package)
+- [Installation](#installation)
+- [Compatibility](#compatibility)
+- [PIP requirements](#pip-requirements)
+- [Provider class summary](#provider-class-summary)
+- [Sensors](#sensors)
+- [Moved sensors](#moved-sensors)
+- [Hooks](#hooks)
+- [Moved hooks](#moved-hooks)
+- [Releases](#releases)
+- [Release 2020.05.19](#release-20200519)
+
+## Backport package
+
+This is a backport providers package for `` provider. All classes for this provider package
Review comment: Missing provider name.
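One way to catch the empty provider name flagged in that review is to validate the value before rendering. This is a minimal sketch, assuming the README is produced from a `string.Template`-style template; `README_LINE`, `render_readme_line` and the example id `apache.cassandra` are hypothetical. `Template.substitute` raises `KeyError` only for a missing placeholder, so an empty string would otherwise render silently as an empty code span.

```python
from string import Template

# Hypothetical template fragment modelled on the README line under review.
README_LINE = Template(
    "This is a backport providers package for `$provider_package_id` provider."
)


def render_readme_line(provider_package_id: str) -> str:
    """Render the README line, refusing to emit an empty provider name."""
    if not provider_package_id.strip():
        # substitute() raises KeyError only for a *missing* key; an empty
        # value would silently produce the empty code span seen in the review.
        raise ValueError("provider_package_id must not be empty")
    return README_LINE.substitute(provider_package_id=provider_package_id)


print(render_readme_line("apache.cassandra"))
```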
[GitHub] [airflow] stale[bot] commented on pull request #7761: [AIRFLOW-7086] /dags/paused should be a POST
stale[bot] commented on pull request #7761: URL: https://github.com/apache/airflow/pull/7761#issuecomment-629035026 This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.
[jira] [Commented] (AIRFLOW-7086) Change the dags-paused API endpoint to a POST
[ https://issues.apache.org/jira/browse/AIRFLOW-7086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17107953#comment-17107953 ] ASF GitHub Bot commented on AIRFLOW-7086: - stale[bot] commented on pull request #7761: URL: https://github.com/apache/airflow/pull/7761#issuecomment-629035026 This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions. > Change the dags-paused API endpoint to a POST > - > > Key: AIRFLOW-7086 > URL: https://issues.apache.org/jira/browse/AIRFLOW-7086 > Project: Apache Airflow > Issue Type: Improvement > Components: api >Affects Versions: 1.10.9 >Reporter: Daniel Imberman >Assignee: Daniel Imberman >Priority: Minor > Fix For: 2.0.0 > > > We shouldn't be changing state using a GET request. We should change the > /dags/paused endpoint to a POST where the data contains the desired pause > state. -- This message was sent by Atlassian Jira (v8.3.4#803005)
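The change requested in AIRFLOW-7086 (state changes via POST, with the desired pause state in the request body) can be illustrated with a framework-free WSGI app. This is a sketch only: the path `/dags/<dag_id>/paused`, the `is_paused` JSON field, the in-memory `PAUSED` dict and the `call` test driver are assumptions for illustration, not Airflow's actual API.

```python
import json
from io import BytesIO

# Hypothetical in-memory store standing in for the DAG model's is_paused flag.
PAUSED = {}


def dags_paused_app(environ, start_response):
    """WSGI app for a hypothetical POST /dags/<dag_id>/paused endpoint."""
    parts = environ.get("PATH_INFO", "").strip("/").split("/")
    if len(parts) != 3 or parts[0] != "dags" or parts[2] != "paused":
        start_response("404 Not Found", [("Content-Type", "text/plain")])
        return [b"not found"]
    if environ["REQUEST_METHOD"] != "POST":
        # Changing state over GET is exactly what the issue objects to.
        start_response("405 Method Not Allowed",
                       [("Allow", "POST"), ("Content-Type", "text/plain")])
        return [b"use POST"]
    length = int(environ.get("CONTENT_LENGTH") or 0)
    try:
        body = json.loads(environ["wsgi.input"].read(length) or b"{}")
    except ValueError:  # includes json.JSONDecodeError
        body = None
    if not isinstance(body, dict) or not isinstance(body.get("is_paused"), bool):
        start_response("400 Bad Request", [("Content-Type", "text/plain")])
        return [b'body must be JSON like {"is_paused": true}']
    dag_id = parts[1]
    PAUSED[dag_id] = body["is_paused"]
    start_response("200 OK", [("Content-Type", "application/json")])
    return [json.dumps({"dag_id": dag_id, "is_paused": PAUSED[dag_id]}).encode()]


def call(method, path, payload=None):
    """Tiny test driver: invoke the app directly, without a server."""
    data = b"" if payload is None else json.dumps(payload).encode()
    environ = {"REQUEST_METHOD": method, "PATH_INFO": path,
               "CONTENT_LENGTH": str(len(data)), "wsgi.input": BytesIO(data)}
    statuses = []
    body = b"".join(dags_paused_app(environ, lambda status, headers: statuses.append(status)))
    return statuses[0], body
```

For example, `call("GET", "/dags/example_dag/paused")` yields a `405` status, while a POST with `{"is_paused": true}` updates the flag and echoes it back.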
[jira] [Commented] (AIRFLOW-6831) Support for various ssh keys and certificates
[ https://issues.apache.org/jira/browse/AIRFLOW-6831?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17107954#comment-17107954 ] ASF GitHub Bot commented on AIRFLOW-6831: - stale[bot] commented on pull request #7452: URL: https://github.com/apache/airflow/pull/7452#issuecomment-629035038 This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions. > Support for various ssh keys and certificates > - > > Key: AIRFLOW-6831 > URL: https://issues.apache.org/jira/browse/AIRFLOW-6831 > Project: Apache Airflow > Issue Type: New Feature > Components: hooks >Affects Versions: 2.0.0 >Reporter: Bert Desmet >Priority: Major > > For our use case we need to support both ed25519 keys and 'MFA', using a signed > key. > This pull request adds support for both. > > When creating a hook, some extra parameters become available in the > 'extra_parameters' hash, notably: > * key_type: (str) can be rsa, ecdsa or ed25519; defaults to rsa > * cert_file: (str) path to the certificate. Defaults to None > > Bert
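The two extra parameters described in AIRFLOW-6831 could be parsed and validated roughly as follows. This is a minimal sketch, assuming the extras arrive as a plain dict; `SSHKeyOptions` and `parse_ssh_key_options` are hypothetical names, and the step that actually loads the key (for example with paramiko's `RSAKey`, `ECDSAKey` or `Ed25519Key` classes) is left out.

```python
from dataclasses import dataclass
from typing import Mapping, Optional

# Key types listed in AIRFLOW-6831; rsa is the documented default.
SUPPORTED_KEY_TYPES = ("rsa", "ecdsa", "ed25519")


@dataclass(frozen=True)
class SSHKeyOptions:
    key_type: str = "rsa"
    cert_file: Optional[str] = None


def parse_ssh_key_options(extra: Mapping[str, object]) -> SSHKeyOptions:
    """Validate the key_type / cert_file extras described in the issue."""
    key_type = str(extra.get("key_type", "rsa")).lower()
    if key_type not in SUPPORTED_KEY_TYPES:
        raise ValueError(
            f"unsupported key_type {key_type!r}; expected one of {SUPPORTED_KEY_TYPES}"
        )
    cert_file = extra.get("cert_file")
    # An absent or empty cert_file means "no certificate", matching the None default.
    return SSHKeyOptions(key_type=key_type,
                         cert_file=str(cert_file) if cert_file else None)
```

Keeping validation separate from key loading means a bad `key_type` fails fast with a clear message before any SSH connection is attempted.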
[GitHub] [airflow] stale[bot] commented on pull request #8002: DST bug #7999 failing unit test
stale[bot] commented on pull request #8002: URL: https://github.com/apache/airflow/pull/8002#issuecomment-629035048 This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.
[GitHub] [airflow] stale[bot] commented on pull request #7452: [AIRFLOW-6831] Support for various ssh keys and certificates
stale[bot] commented on pull request #7452: URL: https://github.com/apache/airflow/pull/7452#issuecomment-629035038 This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.
[GitHub] [airflow] samuelkhtu commented on issue #8525: SQLBranchOperator
samuelkhtu commented on issue #8525: URL: https://github.com/apache/airflow/issues/8525#issuecomment-629029045 Hey @jeffolsi, quick question for you. In the existing Airflow Python branching operator (BranchPythonOperator), the Python callable returns a 'task_id' or a list of 'task_ids' that selects the branch(es) to follow. Would you like the SQL query to select the branches in the same way? For example, the SQL query "SELECT 'branch_a', 'branch_b'" returns 2 columns, and the SQLBranchOperator would follow branch_a and branch_b (branch_a and branch_b are task_ids within the DAG). Or do you expect the SQL query to return multiple rows, with each row representing a task_id within the DAG? Thanks
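Both interpretations in that question can be expressed as a small conversion from query results to task_ids. This is a minimal sketch using the standard-library `sqlite3` module as a stand-in database; `branches_from_rows` and its `mode` parameter are hypothetical and not part of any Airflow operator.

```python
import sqlite3
from typing import Iterable, List, Sequence


def branches_from_rows(rows: Iterable[Sequence[object]], mode: str = "columns") -> List[str]:
    """Turn query results into the task_ids to follow.

    mode="columns": every non-NULL column of the first row is a task_id.
    mode="rows":    the first column of every row is a task_id.
    """
    rows = list(rows)
    if mode == "columns":
        return [str(value) for value in rows[0] if value is not None] if rows else []
    if mode == "rows":
        return [str(row[0]) for row in rows if row and row[0] is not None]
    raise ValueError(f"unknown mode {mode!r}")


conn = sqlite3.connect(":memory:")
# One row, two columns: follow both branches.
print(branches_from_rows(conn.execute("SELECT 'branch_a', 'branch_b'").fetchall()))
# Two rows, one column each: same task_ids, row-oriented.
print(branches_from_rows(
    conn.execute("SELECT 'branch_a' UNION ALL SELECT 'branch_b'").fetchall(), mode="rows"))
conn.close()
```

Either way the result is a list of task_ids, which is the same shape the Python branching callable already returns.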
[GitHub] [airflow] ashb commented on pull request #8873: Fix list formatting of plugins doc.
ashb commented on pull request #8873: URL: https://github.com/apache/airflow/pull/8873#issuecomment-628920686 For reference this is what it looks like in our published docs: ![image](https://user-images.githubusercontent.com/34150/81992446-5ca86c80-963b-11ea-8948-c219ab723924.png)
[GitHub] [airflow] ashb opened a new pull request #8873: Fix list formatting of plugins doc.
ashb opened a new pull request #8873: URL: https://github.com/apache/airflow/pull/8873

This was causing it to be picked up as a `/` containing a list, instead of a paragraph and a list.

```
This will create a hook, and an operator accessible at:
airflow.hooks.my_namespace.MyHook
airflow.operators.my_namespace.MyOperator
```

I have tested this locally and looked at the HTML produced; this fixes the problem. (I can't include a screenshot, as it doesn't look noticeably different without the theme installed.)

---
Make sure to mark the boxes below before creating PR: [x]

- [x] Description above provides context of the change
- [x] Unit tests coverage for changes (not needed for documentation changes)
- [x] Target Github ISSUE in description if exists
- [x] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)"
- [x] Relevant documentation is updated including usage instructions.
- [x] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).

---
In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed. In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x). In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md). Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information.
[GitHub] [airflow] dimberman commented on a change in pull request #8829: Fix KubernetesPodOperator pod name length validation
dimberman commented on a change in pull request #8829: URL: https://github.com/apache/airflow/pull/8829#discussion_r425467323
## File path: airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
## @@ -322,5 +322,5 @@ def _set_resources(resources):
     def _set_name(self, name):
         if self.pod_template_file or self.full_pod_spec:
             return None
-        validate_key(name, max_length=63)
+        validate_key(name, max_length=220)
Review comment: LGTM, just would like to see some tests.
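A sketch of the kind of tests requested in that review. It assumes a stand-alone validator rather than Airflow's `validate_key`: `validate_pod_name` is hypothetical and checks the DNS-1123 subdomain format that Kubernetes requires for pod names, plus the `max_length=220` cap from the diff. Tests against the real `_set_name` would need to construct a `KubernetesPodOperator` instead.

```python
import re
import unittest

# Hypothetical validator (not Airflow's validate_key). Kubernetes pod names must
# be DNS-1123 subdomains: lowercase alphanumerics, '-' and '.', starting and
# ending with an alphanumeric character.
DNS_1123_SUBDOMAIN = re.compile(
    r"[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*"
)


def validate_pod_name(name: str, max_length: int = 220) -> None:
    """Raise ValueError unless `name` is a valid pod name of at most max_length chars."""
    if len(name) > max_length:
        raise ValueError(f"pod name has {len(name)} characters; limit is {max_length}")
    if not DNS_1123_SUBDOMAIN.fullmatch(name):
        raise ValueError(f"{name!r} is not a valid DNS-1123 subdomain")


class TestValidatePodName(unittest.TestCase):
    def test_accepts_name_longer_than_old_limit(self):
        validate_pod_name("a" * 64)  # would have failed with max_length=63

    def test_accepts_name_at_new_limit(self):
        validate_pod_name("a" * 220)

    def test_rejects_name_over_new_limit(self):
        with self.assertRaises(ValueError):
            validate_pod_name("a" * 221)

    def test_rejects_invalid_characters(self):
        with self.assertRaises(ValueError):
            validate_pod_name("My_Pod")
```

The boundary cases (64, 220 and 221 characters) pin down both the old limit that was being lifted and the new one, which is usually what a reviewer wants to see for a length-validation change. Run it with `python -m unittest`.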
[airflow] branch master updated (fe42191 -> 4813b94)
This is an automated email from the ASF dual-hosted git repository.

ash pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/airflow.git.

from fe42191 Don't use ProcessorAgent to test ProcessorManager (#8871)
add 4813b94 Create log file w/abs path so tests pass on MacOS (#8820)

No new revisions were added by this update.

Summary of changes:
airflow/utils/log/file_processor_handler.py | 13 +++--
tests/jobs/test_scheduler_job.py | 14 --
tests/test_utils/config.py | 20
3 files changed, 35 insertions(+), 12 deletions(-)
[GitHub] [airflow] ashb merged pull request #8820: Create log file w abs path so tests pass on MacOS
ashb merged pull request #8820: URL: https://github.com/apache/airflow/pull/8820
[GitHub] [airflow] ashb commented on pull request #8820: Create log file w abs path so tests pass on MacOS
ashb commented on pull request #8820: URL: https://github.com/apache/airflow/pull/8820#issuecomment-628915037

One failure:
```
2020-05-14T21:36:29.9829995Z FAILED tests/utils/test_serve_logs.py::TestServeLogs::test_should_serve_file
```
Perhaps that should be set up to retry, or moved to quarantine. Merging this now.
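If the flaky test were set up to retry, a plain-Python decorator is one option. This is a minimal sketch; `retry` is a hypothetical helper, and in a pytest suite a plugin such as pytest-rerunfailures (`@pytest.mark.flaky(reruns=...)`) provides the same behaviour without custom code.

```python
import functools
import time
from typing import Callable, Tuple, Type


def retry(times: int = 3, delay: float = 0.0,
          exceptions: Tuple[Type[BaseException], ...] = (AssertionError,)) -> Callable:
    """Decorator: call the wrapped function up to `times` times before failing."""
    if times < 1:
        raise ValueError("times must be at least 1")

    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, times + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions:
                    if attempt == times:
                        raise  # out of attempts: surface the last failure
                    time.sleep(delay)
        return wrapper
    return decorator
```

Applied as `@retry(times=3)` on a flaky test method, a single transient failure no longer fails the build. Quarantining is the more conservative choice, since retries can hide a real regression that only fails intermittently.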
[GitHub] [airflow] kaxil commented on a change in pull request #8829: Fix KubernetesPodOperator pod name length validation
kaxil commented on a change in pull request #8829: URL: https://github.com/apache/airflow/pull/8829#discussion_r425456845
## File path: airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
## @@ -322,5 +322,5 @@ def _set_resources(resources):
     def _set_name(self, name):
         if self.pod_template_file or self.full_pod_spec:
             return None
-        validate_key(name, max_length=63)
+        validate_key(name, max_length=220)
Review comment: Can we add tests too, please? @dimberman, can you please take a look at this PR?
[jira] [Commented] (AIRFLOW-4052) To allow filtering using "event" and "owner" in "Log" view
[ https://issues.apache.org/jira/browse/AIRFLOW-4052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17107724#comment-17107724 ] ASF GitHub Bot commented on AIRFLOW-4052: - XD-DENG commented on pull request #4881: URL: https://github.com/apache/airflow/pull/4881#issuecomment-628910082 Thanks @kaxil ! > To allow filtering using "event" and "owner" in "Log" view > -- > > Key: AIRFLOW-4052 > URL: https://issues.apache.org/jira/browse/AIRFLOW-4052 > Project: Apache Airflow > Issue Type: Improvement > Components: ui >Affects Versions: 1.10.2 >Reporter: Xiaodong Deng >Assignee: Xiaodong Deng >Priority: Minor > Labels: webapp > Fix For: 1.10.11 > > > In the RBAC UI, users can check Logs. But they can only use "dag id", "task > id", "execution date", or "extra" to filter, while filtering using "event" > and "owner" will be very useful (to allow users to check specific events > happened, or check what a specific user did).
[GitHub] [airflow] XD-DENG commented on pull request #4881: [AIRFLOW-4052] Allow filtering using "event" and "owner" in "Log" view
XD-DENG commented on pull request #4881: URL: https://github.com/apache/airflow/pull/4881#issuecomment-628910082 Thanks @kaxil !
[airflow] branch v1-10-test updated: [AIRFLOW-4052] Allow filtering using "event" and "owner" in "Log" view (#4881)
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test in repository https://gitbox.apache.org/repos/asf/airflow.git

The following commit(s) were added to refs/heads/v1-10-test by this push:
new 7eba52b [AIRFLOW-4052] Allow filtering using "event" and "owner" in "Log" view (#4881)

7eba52b is described below

commit 7eba52b460d398fb8c39196f23ee6a25a8cd4866
Author: Xiaodong
AuthorDate: Sat Mar 9 07:24:51 2019 +0800

    [AIRFLOW-4052] Allow filtering using "event" and "owner" in "Log" view (#4881)

    In the RBAC UI, users can check Logs. But they could only use "dag_id", "task_id", "execution_date", or "extra" to filter, while filtering using "event" and "owner" will be very useful (to allow users to check specific events that happened, or check what a specific user did).
---
 airflow/www_rbac/views.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/airflow/www_rbac/views.py b/airflow/www_rbac/views.py
index 6ea21c1..49cb7d2 100644
--- a/airflow/www_rbac/views.py
+++ b/airflow/www_rbac/views.py
@@ -2622,7 +2622,7 @@ class LogModelView(AirflowModelView):
     list_columns = ['id', 'dttm', 'dag_id', 'task_id', 'event', 'execution_date', 'owner', 'extra']
-    search_columns = ['dag_id', 'task_id', 'execution_date', 'extra']
+    search_columns = ['dag_id', 'task_id', 'event', 'execution_date', 'owner', 'extra']
     base_order = ('dttm', 'desc')
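To illustrate what adding `event` and `owner` to `search_columns` enables, here is a plain-Python analogue of filtering the log table on those columns. It is a sketch only: `LogEntry` and `filter_logs` are hypothetical, and in the real view Flask-AppBuilder builds the search filters from `search_columns` and runs them against the database.

```python
from dataclasses import dataclass
from typing import Iterable, List, Optional

# search_columns after the change in #4881.
SEARCH_COLUMNS = ['dag_id', 'task_id', 'event', 'execution_date', 'owner', 'extra']


@dataclass
class LogEntry:
    """Hypothetical in-memory stand-in for a row of the Log table."""
    dag_id: str
    task_id: Optional[str]
    event: str
    execution_date: Optional[str]
    owner: Optional[str]
    extra: Optional[str] = None


def filter_logs(entries: Iterable[LogEntry], **criteria: object) -> List[LogEntry]:
    """Return entries whose fields equal every criterion; only searchable columns allowed."""
    unknown = set(criteria) - set(SEARCH_COLUMNS)
    if unknown:
        raise ValueError(f"not searchable: {sorted(unknown)}")
    return [e for e in entries
            if all(getattr(e, column) == value for column, value in criteria.items())]
```

With the new columns, "what did user bob do?" becomes `filter_logs(logs, owner="bob")`, and a specific event type can be isolated with `event=...`.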
[jira] [Commented] (AIRFLOW-4052) To allow filtering using "event" and "owner" in "Log" view
[ https://issues.apache.org/jira/browse/AIRFLOW-4052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17107720#comment-17107720 ] ASF subversion and git services commented on AIRFLOW-4052: - Commit 7eba52b460d398fb8c39196f23ee6a25a8cd4866 in airflow's branch refs/heads/v1-10-test from Xiaodong Deng [ https://gitbox.apache.org/repos/asf?p=airflow.git;h=7eba52b ] [AIRFLOW-4052] Allow filtering using "event" and "owner" in "Log" view (#4881) In the RBAC UI, users can check Logs. But they could only use "dag_id", "task_id", "execution_date", or "extra" to filter, while filtering using "event" and "owner" will be very useful (to allow users to check specific events that happened, or check what a specific user did).
[jira] [Commented] (AIRFLOW-4052) To allow filtering using "event" and "owner" in "Log" view
[ https://issues.apache.org/jira/browse/AIRFLOW-4052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17107718#comment-17107718 ] ASF GitHub Bot commented on AIRFLOW-4052: - kaxil edited a comment on pull request #4881: URL: https://github.com/apache/airflow/pull/4881#issuecomment-628907026 Thanks for the note @XD-DENG - I have added it to 1.10.11 milestone and cherry-picked it too :)
[GitHub] [airflow] kaxil edited a comment on pull request #4881: [AIRFLOW-4052] Allow filtering using "event" and "owner" in "Log" view
kaxil edited a comment on pull request #4881: URL: https://github.com/apache/airflow/pull/4881#issuecomment-628907026 Thanks for the note @XD-DENG - I have added it to 1.10.11 milestone and cherry-picked it too :)
[jira] [Updated] (AIRFLOW-4052) To allow filtering using "event" and "owner" in "Log" view
[ https://issues.apache.org/jira/browse/AIRFLOW-4052?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kaxil Naik updated AIRFLOW-4052: Fix Version/s: (was: 1.10.3) 1.10.11
[GitHub] [airflow] kaxil commented on pull request #4881: [AIRFLOW-4052] Allow filtering using "event" and "owner" in "Log" view
kaxil commented on pull request #4881: URL: https://github.com/apache/airflow/pull/4881#issuecomment-628907026 Thanks for the note @XD-DENG - I have added it to 1.10.11 milestone and will cherry-pick it once I start the process :)
[jira] [Commented] (AIRFLOW-4052) To allow filtering using "event" and "owner" in "Log" view
[ https://issues.apache.org/jira/browse/AIRFLOW-4052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17107716#comment-17107716 ] ASF GitHub Bot commented on AIRFLOW-4052: - kaxil commented on pull request #4881: URL: https://github.com/apache/airflow/pull/4881#issuecomment-628907026 Thanks for the note @XD-DENG - I have added it to 1.10.11 milestone and will cherry-pick it once I start the process :) > To allow filtering using "event" and "owner" in "Log" view > -- > > Key: AIRFLOW-4052 > URL: https://issues.apache.org/jira/browse/AIRFLOW-4052 > Project: Apache Airflow > Issue Type: Improvement > Components: ui >Affects Versions: 1.10.2 >Reporter: Xiaodong Deng >Assignee: Xiaodong Deng >Priority: Minor > Labels: webapp > Fix For: 1.10.3 > > > In the RBAC UI, users can check Logs. But they can only use "dag id", "task > id", "execution date", or "extra" to filter, while filtering using "event" > and "owner" will be very useful (to allow users to check specific events > happened, or check what a specific user did).
[GitHub] [airflow] kaxil commented on pull request #8807: Added automated release notes generation for backport operators
kaxil commented on pull request #8807: URL: https://github.com/apache/airflow/pull/8807#issuecomment-628906632 Static checks are failing; maybe it needs rebasing on master?
[jira] [Commented] (AIRFLOW-6497) Scheduler creates DagBag in the same process with outdated info
[ https://issues.apache.org/jira/browse/AIRFLOW-6497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17107714#comment-17107714 ] ASF GitHub Bot commented on AIRFLOW-6497: - mik-laj edited a comment on pull request #7597: URL: https://github.com/apache/airflow/pull/7597#issuecomment-628641784

1. For executor events
https://github.com/apache/airflow/blob/99d237b684bbcdcf808451dfbfad425c0c7f772b/airflow/jobs/scheduler_job.py#L1430
Send to DagFileProcessorManager via DagFileProcessorAgent
https://github.com/apache/airflow/blob/99d237b684bbcdcf808451dfbfad425c0c7f772b/airflow/jobs/scheduler_job.py#L1464-L1469
https://github.com/apache/airflow/blob/99d237b684bbcdcf808451dfbfad425c0c7f772b/airflow/utils/dag_processing.py#L670-L673
Send to queue
https://github.com/apache/airflow/blob/99d237b684bbcdcf808451dfbfad425c0c7f772b/airflow/utils/dag_processing.py#L737-L742
Start a new DagFileProcessor via DagFileProcessorProcess
https://github.com/apache/airflow/blob/99d237b684bbcdcf808451dfbfad425c0c7f772b/airflow/utils/dag_processing.py#L1045-L1057
Execute callback
https://github.com/apache/airflow/blob/99d237b684bbcdcf808451dfbfad425c0c7f772b/airflow/jobs/scheduler_job.py#L835-L838

2. For zombies
Find zombies
https://github.com/apache/airflow/blob/99d237b684bbcdcf808451dfbfad425c0c7f772b/airflow/utils/dag_processing.py#L670-L673
Send to queue
https://github.com/apache/airflow/blob/99d237b684bbcdcf808451dfbfad425c0c7f772b/airflow/utils/dag_processing.py#L1139
Start a new DagFileProcessor via DagFileProcessorProcess
https://github.com/apache/airflow/blob/99d237b684bbcdcf808451dfbfad425c0c7f772b/airflow/utils/dag_processing.py#L1045-L1057
Execute callback
https://github.com/apache/airflow/blob/99d237b684bbcdcf808451dfbfad425c0c7f772b/airflow/jobs/scheduler_job.py#L835-L838

> Scheduler creates DagBag in the same process with outdated info
> ---------------------------------------------------------------
>
>                 Key: AIRFLOW-6497
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-6497
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: scheduler
>    Affects Versions: 1.10.7
>            Reporter: Qian Yu
>            Assignee: Kamil Bregula
>            Priority: Major
>             Fix For: 2.0.0
>
>
> The following code in scheduler_job.py seems to be called in the same process
> as the scheduler. It creates a DagBag. But since scheduler is a long running
> process, it does not pick up the latest changes made to DAGs. For example,
> changes to retries count, on_failure_callback, newly added tasks, etc are not
> reflected.
>
>
> {code:python}
> if ti.try_number == try_number and ti.state == State.QUEUED:
>     msg = ("Executor reports task instance {} finished ({}) "
>            "although the task says its {}. Was the task "
>            "killed externally?".format(ti, state, ti.state))
>     Stats.incr('scheduler.tasks.killed_externally')
>     self.log.error(msg)
>     try:
>         simple_dag = simple_dag_bag.get_dag(dag_id)
>         dagbag = models.DagBag(simple_dag.full_filepath)
>         dag = dagbag.get_dag(dag_id)
>         ti.task = dag.get_task(task_id)
>         ti.handle_failure(msg)
>     except Exception:
>         self.log.error("Cannot load the dag bag to handle failure for %s"
>                        ". Setting task to FAILED without callbacks or "
>                        "retries. Do you have enough resources?", ti)
>         ti.state = State.FAILED
>         session.merge(ti)
>         session.commit()
> {code}
> This causes errors such as AttributeError due to stale code being hit. E.g.
> when someone added a .join attribute to CustomOperator without bouncing the
> scheduler, this is what he would get after a CeleryWorker timeout error
> causes this line to be hit:
> {code}
> [2020-01-05 22:25:45,951] {dagbag.py:207} ERROR - Failed to import:
> /dags/dag1.py
> Traceback (most recent call last):
>   File "/lib/python3.6/site-packages/airflow/models/dagbag.py", line 204, in
> process_file
>     m = imp.load_source(mod_name, filepath)
>   File "/usr/lib/python3.6/imp.py", line 172, in load_source
>     module = _load(spec)
>   File "", line 684, in
[GitHub] [airflow] mik-laj edited a comment on pull request #7597: [AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop
mik-laj edited a comment on pull request #7597: URL: https://github.com/apache/airflow/pull/7597#issuecomment-628641784

1. For executor events
https://github.com/apache/airflow/blob/99d237b684bbcdcf808451dfbfad425c0c7f772b/airflow/jobs/scheduler_job.py#L1430
Send to DagFileProcessorManager via DagFileProcessorAgent
https://github.com/apache/airflow/blob/99d237b684bbcdcf808451dfbfad425c0c7f772b/airflow/jobs/scheduler_job.py#L1464-L1469
https://github.com/apache/airflow/blob/99d237b684bbcdcf808451dfbfad425c0c7f772b/airflow/utils/dag_processing.py#L670-L673
Send to queue
https://github.com/apache/airflow/blob/99d237b684bbcdcf808451dfbfad425c0c7f772b/airflow/utils/dag_processing.py#L737-L742
Start a new DagFileProcessor via DagFileProcessorProcess
https://github.com/apache/airflow/blob/99d237b684bbcdcf808451dfbfad425c0c7f772b/airflow/utils/dag_processing.py#L1045-L1057
Execute callback
https://github.com/apache/airflow/blob/99d237b684bbcdcf808451dfbfad425c0c7f772b/airflow/jobs/scheduler_job.py#L835-L838

2. For zombies
Find zombies
https://github.com/apache/airflow/blob/99d237b684bbcdcf808451dfbfad425c0c7f772b/airflow/utils/dag_processing.py#L670-L673
Send to queue
https://github.com/apache/airflow/blob/99d237b684bbcdcf808451dfbfad425c0c7f772b/airflow/utils/dag_processing.py#L1139
Start a new DagFileProcessor via DagFileProcessorProcess
https://github.com/apache/airflow/blob/99d237b684bbcdcf808451dfbfad425c0c7f772b/airflow/utils/dag_processing.py#L1045-L1057
Execute callback
https://github.com/apache/airflow/blob/99d237b684bbcdcf808451dfbfad425c0c7f772b/airflow/jobs/scheduler_job.py#L835-L838
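The flow outlined in that comment (the scheduler only enqueues callback requests, and a separate processor re-parses the DAG file before running them) can be modelled in a few lines. This is a simplified, single-process sketch: `CallbackRequest`, `enqueue_callbacks` and `process_pending` are hypothetical stand-ins for the DagFileProcessorAgent, DagFileProcessorManager and DagFileProcessor machinery, and `parse_file` plays the role of loading a fresh DagBag.

```python
import queue
from collections import defaultdict
from dataclasses import dataclass
from typing import Callable, Dict, List, Tuple

# Hypothetical types: a parsed DAG file maps dag_id -> failure callback(task_id, msg).
FailureCallback = Callable[[str, str], None]
ParseFile = Callable[[str], Dict[str, FailureCallback]]


@dataclass(frozen=True)
class CallbackRequest:
    """What the scheduler knows about a failure without loading any DAG."""
    full_filepath: str
    dag_id: str
    task_id: str
    msg: str


def enqueue_callbacks(requests: "queue.Queue[CallbackRequest]",
                      failures: List[CallbackRequest]) -> None:
    """Scheduler side: executor events and zombies become queued requests."""
    for request in failures:
        requests.put(request)


def process_pending(requests: "queue.Queue[CallbackRequest]",
                    parse_file: ParseFile) -> List[Tuple[str, str]]:
    """Processor side: group requests per file, re-parse each file, run callbacks."""
    by_file: Dict[str, List[CallbackRequest]] = defaultdict(list)
    while True:
        try:
            request = requests.get_nowait()
        except queue.Empty:
            break
        by_file[request.full_filepath].append(request)

    handled = []
    for path, file_requests in by_file.items():
        dags = parse_file(path)  # fresh parse, so edited DAG code is picked up
        for request in file_requests:
            dags[request.dag_id](request.task_id, request.msg)
            handled.append((request.dag_id, request.task_id))
    return handled
```

Because `parse_file` runs for every batch, an edited DAG file is seen by the next callback, which is the behaviour the stale in-process DagBag in AIRFLOW-6497 lacked.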
[GitHub] [airflow] timhealz commented on a change in pull request #8849: Add multiple file upload functionality to GCS hook
timhealz commented on a change in pull request #8849:
URL: https://github.com/apache/airflow/pull/8849#discussion_r425450247

## File path: tests/providers/google/cloud/operators/test_local_to_gcs.py ##
@@ -42,6 +67,10 @@ def setUp(self):
             'start_date': datetime.datetime(2017, 1, 1)
         }
         self.dag = DAG('test_dag_id', default_args=args)
+        with open('/tmp/fake1.csv', 'wb') as f:
+            f.write(b"x" * 393216)
+        with open('/tmp/fake2.csv', 'wb') as f:
+            f.write(b"x" * 393216)

Review comment:
Thanks @turbaszek, use of temp files has been implemented in the latest commit.
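The review asked for temporary files instead of hard-coded `/tmp/fake1.csv` paths. The PR's final test code is not shown here, so the following is only a sketch of that idea using the standard-library `tempfile` module; `make_fake_files` is a hypothetical helper name, and the size 393216 is taken from the diff above.

```python
import os
import tempfile


def make_fake_files(directory, names, size=393216):
    """Create files of `size` bytes in `directory` and return their paths."""
    paths = []
    for name in names:
        path = os.path.join(directory, name)
        with open(path, "wb") as f:
            f.write(b"x" * size)
        paths.append(path)
    return paths


with tempfile.TemporaryDirectory() as tmp_dir:
    fake_files = make_fake_files(tmp_dir, ["fake1.csv", "fake2.csv"])
    print([os.path.getsize(p) for p in fake_files])  # [393216, 393216]
# The directory and both files are removed when the `with` block exits.
```

In a `unittest.TestCase`, the same effect can be had by creating `tmp = tempfile.TemporaryDirectory()` in `setUp` and registering `self.addCleanup(tmp.cleanup)`, so no files leak between tests.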
[GitHub] [airflow] potiuk opened a new issue #8872: Add dependencies on build image
potiuk opened a new issue #8872:
URL: https://github.com/apache/airflow/issues/8872

**Description**

Add dependencies using the ONBUILD feature of Docker build.

**Use case / motivation**

Some dependencies can be added easily using the ONBUILD feature of Docker build.
[GitHub] [airflow] potiuk commented on issue #8605: Add Production-ready docker compose for the production image
potiuk commented on issue #8605: URL: https://github.com/apache/airflow/issues/8605#issuecomment-628864609 You should also be able to build a new image using the ONBUILD feature, for building images that depend on the base one. Added a separate issue here: #8872
[jira] [Commented] (AIRFLOW-6981) Move Google Cloud Build from Discovery API to Python Library
[ https://issues.apache.org/jira/browse/AIRFLOW-6981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17107494#comment-17107494 ] ASF GitHub Bot commented on AIRFLOW-6981: - mik-laj commented on a change in pull request #8575: URL: https://github.com/apache/airflow/pull/8575#discussion_r425316910
[GitHub] [airflow] mik-laj commented on a change in pull request #8575: [AIRFLOW-6981] Move Google Cloud Build from Discovery API to Python Library
mik-laj commented on a change in pull request #8575: URL: https://github.com/apache/airflow/pull/8575#discussion_r425316910 ## File path: airflow/providers/google/cloud/operators/cloud_build.py ## @@ -15,186 +15,678 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -"""Operators that integrat with Google Cloud Build service.""" -import re -from copy import deepcopy -from typing import Any, Dict, Iterable, Optional -from urllib.parse import unquote, urlparse -from airflow.exceptions import AirflowException -from airflow.models import BaseOperator -from airflow.providers.google.cloud.hooks.cloud_build import CloudBuildHook -from airflow.utils.decorators import apply_defaults - -REGEX_REPO_PATH = re.compile(r"^/p/(?P[^/]+)/r/(?P[^/]+)") +"""Operators that integrates with Google Cloud Build service.""" +from typing import Dict, Optional, Sequence, Tuple, Union -class BuildProcessor: -""" -Processes build configurations to add additional functionality to support the use of operators. +from google.api_core.retry import Retry +from google.cloud.devtools.cloudbuild_v1.types import Build, BuildTrigger, RepoSource +from google.protobuf.json_format import MessageToDict -The following improvements are made: +from airflow.models import BaseOperator +from airflow.providers.google.cloud.hooks.cloud_build import CloudBuildHook # noqa +from airflow.utils.decorators import apply_defaults -* It is required to provide the source and only one type can be given, -* It is possible to provide the source as the URL address instead dict. -:param body: The request body. -See: https://cloud.google.com/cloud-build/docs/api/reference/rest/Shared.Types/Build -:type body: dict +class CloudBuildCancelBuildOperator(BaseOperator): +""" +Cancels a build in progress. + +:param id_: The ID of the build. +:type id_: str +:param project_id: Optional, Google Cloud Project project_id where the function belongs. 
+If set to None or missing, the default project_id from the GCP connection is used. +:type project_id: Optional[str] +:param retry: Optional, a retry object used to retry requests. If `None` is specified, requests +will not be retried. +:type retry: Optional[Retry] +:param timeout: Optional, the amount of time, in seconds, to wait for the request to complete. +Note that if `retry` is specified, the timeout applies to each individual attempt. +:type timeout: Optional[float] +:param metadata: Optional, additional metadata that is provided to the method. +:type metadata: Optional[Sequence[Tuple[str, str]]] +:param gcp_conn_id: Optional, the connection ID used to connect to Google Cloud Platform. +:type gcp_conn_id: Optional[str] + +:rtype: dict """ -def __init__(self, body: Dict) -> None: -self.body = deepcopy(body) - -def _verify_source(self): -is_storage = "storageSource" in self.body["source"] -is_repo = "repoSource" in self.body["source"] - -sources_count = sum([is_storage, is_repo]) -if sources_count != 1: -raise AirflowException( -"The source could not be determined. Please choose one data source from: " -"storageSource and repoSource." 
-) +template_fields = ("project_id", "id_", "gcp_conn_id") -def _reformat_source(self): -self._reformat_repo_source() -self._reformat_storage_source() +@apply_defaults +def __init__( +self, +id_: str, +project_id: Optional[str] = None, +retry: Optional[Retry] = None, +timeout: Optional[float] = None, +metadata: Optional[Sequence[Tuple[str, str]]] = None, +gcp_conn_id: str = "google_cloud_default", +*args, +**kwargs +) -> None: +super().__init__(*args, **kwargs) +self.id_ = id_ +self.project_id = project_id +self.retry = retry +self.timeout = timeout +self.metadata = metadata +self.gcp_conn_id = gcp_conn_id -def _reformat_repo_source(self): -if "repoSource" not in self.body["source"]: -return +def execute(self, context): +hook = CloudBuildHook(gcp_conn_id=self.gcp_conn_id) +result = hook.cancel_build( +id_=self.id_, +project_id=self.project_id, +retry=self.retry, +timeout=self.timeout, +metadata=self.metadata, +) +return MessageToDict(result) + + +class CloudBuildCreateBuildOperator(BaseOperator): +""" +Starts a build with the specified configuration. -source = self.body["source"]["repoSource"] +:param build: The build resource to create. If a dict is provided, it must be of the same form +as the protobuf message
[GitHub] [airflow] XD-DENG commented on pull request #4881: [AIRFLOW-4052] Allow filtering using "event" and "owner" in "Log" view
XD-DENG commented on pull request #4881: URL: https://github.com/apache/airflow/pull/4881#issuecomment-628776701 Hi @ashb and @kaxil, would you mind helping cherry-pick this commit into `v-10`? It was labelled to be released in 1.10.3, but as of today it is still only in the `master` branch. It's a very simple change but may be useful.
[jira] [Commented] (AIRFLOW-4052) To allow filtering using "event" and "owner" in "Log" view
[ https://issues.apache.org/jira/browse/AIRFLOW-4052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17107486#comment-17107486 ] ASF GitHub Bot commented on AIRFLOW-4052: - XD-DENG commented on pull request #4881: URL: https://github.com/apache/airflow/pull/4881#issuecomment-628776701

> To allow filtering using "event" and "owner" in "Log" view
> --
>
>                 Key: AIRFLOW-4052
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-4052
>             Project: Apache Airflow
>          Issue Type: Improvement
>          Components: ui
>    Affects Versions: 1.10.2
>            Reporter: Xiaodong Deng
>            Assignee: Xiaodong Deng
>            Priority: Minor
>              Labels: webapp
>             Fix For: 1.10.3
>
>
> In the RBAC UI, users can check Logs. But they can only use "dag id", "task
> id", "execution date", or "extra" to filter, while filtering using "event"
> and "owner" will be very useful (to allow users to check specific events
> happened, or check what a specific user did).

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[GitHub] [airflow] xinbinhuang commented on a change in pull request #8863: Add EMR operators howto docs
xinbinhuang commented on a change in pull request #8863: URL: https://github.com/apache/airflow/pull/8863#discussion_r425261116 ## File path: docs/howto/operator/amazon/aws/emr.rst ## @@ -0,0 +1,121 @@ + .. Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. + + +.. _howto/operator:EMROperators: + +Amazon EMR Operators + + +.. contents:: + :depth: 1 + :local: + +Prerequisite Tasks +-- + +.. include:: _partials/prerequisite_tasks.rst + +Overview + + +Airflow to AWS EMR integraion provides several operators to create and interact with EMR service. Review comment: Thanks for spotting this! Fixed.
[airflow] branch master updated (961c710 -> fe42191)
This is an automated email from the ASF dual-hosted git repository.

ash pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git.

    from 961c710  Make Custom XCom backend a subsection of XCom docs (#8869)
     add fe42191  Don't use ProcessorAgent to test ProcessorManager (#8871)

No new revisions were added by this update.

Summary of changes:
 airflow/utils/dag_processing.py    |  4 +++
 tests/utils/test_dag_processing.py | 64 +-
 2 files changed, 47 insertions(+), 21 deletions(-)
[GitHub] [airflow] ashb merged pull request #8871: Don't use ProcessorAgent to test ProcessorManager
ashb merged pull request #8871: URL: https://github.com/apache/airflow/pull/8871
[GitHub] [airflow] ashb commented on issue #8674: Fix problems with, and test against, Python 3.8
ashb commented on issue #8674:
URL: https://github.com/apache/airflow/issues/8674#issuecomment-628719122

> For development we also have Breeze environment which I think long term might become the default

Let's separate developing Airflow itself from developing DAGs etc.

For the former: maybe, but it should never be a requirement for contributors. (And as long as I'm working on Airflow my default's gonna be direct development, not using Docker :grinning:. I am on Linux now though.)

For the latter: being able to use OSX (and yes, eventually Windows) as a user writing DAGs and running a development Airflow instance without Docker is a critical on-boarding goal for new users, because:

1. Running multiple Docker containers at once can seriously impact performance on less powerful laptops.
2. Docker is still quite confusing if you aren't familiar with it, especially when it comes to rebuilding and volume mounts etc.

We can solve some of that, but I am set on still supporting running directly for users on OSX.

> I believe this is a very small subset of tests.

Yes, it is. The only real issue with spawn vs. fork comes when we use the `conf_vars` decorator in a test: the temporarily changed config values don't affect any spawned subprocess. The fix for that is quite easy (we can also set the `AIRFLOW__*` environment variable in the `conf_vars` decorator), so I wouldn't even say we need a special decorator for it.

And the config mocking issue aside, I think being able to test on OSX/with spawn is useful in getting us _towards_ testing that it runs on OSX/Windows.
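The fix suggested in that comment, also exporting an `AIRFLOW__{SECTION}__{KEY}` environment variable while a config override is active so that spawned subprocesses see it, could look roughly like this. It is a sketch under assumptions: `CONFIG` is a plain dict standing in for Airflow's configuration object, and this `conf_vars` is a hypothetical reimplementation, not the helper in Airflow's test utilities. Only the environment-variable naming convention is Airflow's.

```python
import contextlib
import os
import subprocess
import sys

# Hypothetical in-process config store standing in for Airflow's config object.
CONFIG = {("core", "parallelism"): "32"}


def env_var_name(section, key):
    # Airflow reads overrides from variables named AIRFLOW__{SECTION}__{KEY}.
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


@contextlib.contextmanager
def conf_vars(overrides):
    """Temporarily override config values in-process AND in os.environ."""
    saved_conf = {k: CONFIG.get(k) for k in overrides}
    saved_env = {env_var_name(*k): os.environ.get(env_var_name(*k)) for k in overrides}
    for (section, key), value in overrides.items():
        CONFIG[(section, key)] = value
        # Child processes (fork or spawn) inherit the environment at start-up.
        os.environ[env_var_name(section, key)] = value
    try:
        yield
    finally:
        for k, old in saved_conf.items():
            if old is None:
                CONFIG.pop(k, None)
            else:
                CONFIG[k] = old
        for name, old in saved_env.items():
            if old is None:
                os.environ.pop(name, None)
            else:
                os.environ[name] = old


def read_env_in_child(name):
    # Start a brand-new interpreter, similar to what the "spawn" start method does.
    code = f"import os; print(os.environ.get({name!r}, ''))"
    out = subprocess.run([sys.executable, "-c", code], capture_output=True, text=True, check=True)
    return out.stdout.strip()


with conf_vars({("core", "parallelism"): "4"}):
    print(CONFIG[("core", "parallelism")], read_env_in_child("AIRFLOW__CORE__PARALLELISM"))  # 4 4
```

A fresh interpreter never sees in-memory patches made in the parent, which is why the override has to travel through the environment as well.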
[GitHub] [airflow] PeWu commented on a change in pull request #8721: Add OpenAPI specification (II)
PeWu commented on a change in pull request #8721: URL: https://github.com/apache/airflow/pull/8721#discussion_r425211003 ## File path: openapi.yaml ## @@ -0,0 +1,2203 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +--- +openapi: 3.0.3 + +info: + title: "Airflow API (Stable)" + description: Apache Airflow management API. + version: '1.0.0' + license: +name: Apache 2.0 +url: http://www.apache.org/licenses/LICENSE-2.0.html + contact: +name: Apache Foundation +url: https://airflow.apache.org +email: d...@airflow.apache.org + +servers: + - url: /api/v1 +description: Airfow Stable API. + +paths: + # Database entities + /connections: +get: + summary: Get all connection entries + operationId: getConnections + tags: [Connection] + parameters: +- $ref: '#/components/parameters/PageLimit' +- $ref: '#/components/parameters/PageOffset' + responses: +'200': + description: List of connection entry. 
+ content: +application/json: + schema: +allOf: + - $ref: '#/components/schemas/ConnectionCollection' + - $ref: '#/components/schemas/CollectionInfo' +'401': + $ref: '#/components/responses/Unauthenticated' +'403': + $ref: '#/components/responses/PermissionDenied' + +post: + summary: Create connection entry + operationId: createConnection + tags: [Connection] + requestBody: +required: true +content: + application/json: +schema: + $ref: '#/components/schemas/Connection' + responses: +'200': + description: Successful response. + content: +application/json: + schema: +$ref: '#/components/schemas/Connection' +'400': + $ref: '#/components/responses/BadRequest' +'401': + $ref: '#/components/responses/Unauthenticated' +'403': + $ref: '#/components/responses/PermissionDenied' + + /connections/{connection_id}: +parameters: + - $ref: '#/components/parameters/ConnectionID' + +get: + summary: Get a connection entry + operationId: getConnection + tags: [Connection] + responses: +'200': + description: Successful response. + content: +application/json: + schema: +$ref: '#/components/schemas/Connection' +'401': + $ref: '#/components/responses/Unauthenticated' +'403': + $ref: '#/components/responses/PermissionDenied' +'404': + $ref: '#/components/responses/NotFound' + +patch: + summary: Update a connection entry + operationId: updaateConnection + tags: [Connection] + parameters: +- $ref: '#/components/parameters/UpdateMask' + requestBody: +required: true +content: + application/json: +schema: + $ref: '#/components/schemas/Connection' + + responses: +'200': + description: Successful response. 
+ content: +application/json: + schema: +$ref: '#/components/schemas/Connection' +'400': + $ref: '#/components/responses/BadRequest' +'401': + $ref: '#/components/responses/Unauthenticated' +'403': + $ref: '#/components/responses/PermissionDenied' +'404': + $ref: '#/components/responses/NotFound' + +delete: + summary: Delete a connection entry + operationId: deleteConnection + tags: [Connection] + responses: +'204': + description: No content. +'400': + $ref: '#/components/responses/BadRequest' +'401': + $ref: '#/components/responses/Unauthenticated' +'403': + $ref: '#/components/responses/PermissionDenied' + + /dags: +get: + summary: Get all DAGs + operationId: getDags + tags: [DAG] + parameters: +- $ref: '#/components/parameters/PageLimit' +- $ref: '#/components/parameters/PageOffset' +
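The `getConnections` operation in this draft spec is a paginated `GET` on `/connections` under the `/api/v1` server prefix. The excerpt only references the `PageLimit` and `PageOffset` parameter components without showing their definitions, so the query-parameter names `limit` and `offset` below are an assumption, as is the example host. This is a small sketch of building such a request URL with the standard library, not a client for the final API.

```python
from urllib.parse import urlencode, urljoin

# Assumed base URL; the spec only fixes the relative server path "/api/v1".
BASE_URL = "http://localhost:8080/api/v1/"


def connections_url(limit=None, offset=None):
    """Build a URL for the getConnections operation (GET /connections)."""
    # Assumption: PageLimit/PageOffset map to "limit"/"offset" query params.
    params = {k: v for k, v in (("limit", limit), ("offset", offset)) if v is not None}
    url = urljoin(BASE_URL, "connections")
    return f"{url}?{urlencode(params)}" if params else url


print(connections_url(limit=25, offset=50))
# http://localhost:8080/api/v1/connections?limit=25&offset=50
```

Per the `200` response in the draft, a successful call returns a body combining `ConnectionCollection` and `CollectionInfo` (the `allOf`), while `401` and `403` map to the shared `Unauthenticated` and `PermissionDenied` responses.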
[GitHub] [airflow] ashb commented on a change in pull request #8871: Don't use ProcessorAgent to test ProcessorManager
ashb commented on a change in pull request #8871:
URL: https://github.com/apache/airflow/pull/8871#discussion_r425184805

## File path: tests/utils/test_dag_processing.py ##
@@ -85,6 +87,23 @@ class TestDagFileProcessorManager(unittest.TestCase):
     def setUp(self):
         clear_db_runs()
+    def run_processor_manager_one_loop(self, manager, parent_pipe):
+        if not manager._async_mode:
+            parent_pipe.send(DagParsingSignal.AGENT_RUN_ONCE)
+
+        results = []
+
+        while True:  # pylint: disable=too-many-nested-blocks
+            manager._run_parsing_loop()
+
+            while parent_pipe.poll(timeout=0.01):
+                obj = parent_pipe.recv()
+                if isinstance(obj, DagParsingStat):
+                    if obj.done:
+                        return results
+                    continue
+                results.append(obj)

Review comment:
Done.
[GitHub] [airflow] chamcca commented on a change in pull request #8749: add AWS StepFunctions integrations to the aws provider
chamcca commented on a change in pull request #8749: URL: https://github.com/apache/airflow/pull/8749#discussion_r425185039 ## File path: airflow/providers/amazon/aws/operators/step_function_get_execution_output.py ## @@ -0,0 +1,61 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import json + +from airflow.exceptions import AirflowException +from airflow.models import BaseOperator +from airflow.providers.amazon.aws.hooks.step_function import StepFunctionHook +from airflow.utils.decorators import apply_defaults + + +class StepFunctionGetExecutionOutputOperator(BaseOperator): +""" +An Operator that begins execution of an Step Function State Machine + +Additional arguments may be specified and are passed down to the underlying BaseOperator. + +.. 
seealso:: +:class:`~airflow.models.BaseOperator` + +:param execution_arn: ARN of the Step Function State Machine Execution +:type execution_arn: str +:param aws_conn_id: aws connection to use, defaults to 'aws_default' +:type aws_conn_id: str +""" +template_fields = ['execution_arn'] +template_ext = () +ui_color = '#f9c915' + +@apply_defaults +def __init__(self, execution_arn: str, aws_conn_id='aws_default', region_name=None, *args, **kwargs): +if kwargs.get('xcom_push') is not None: +raise AirflowException("'xcom_push' was deprecated, use 'do_xcom_push' instead") Review comment: I took that from another, probably older operator I used as an example. If unnecessary I can remove it. I'll hold off for a bit in case there are additional comments on it.
[GitHub] [airflow] ashb commented on a change in pull request #8871: Don't use ProcessorAgent to test ProcessorManager
ashb commented on a change in pull request #8871:
URL: https://github.com/apache/airflow/pull/8871#discussion_r425184513

## File path: tests/utils/test_dag_processing.py ##
@@ -85,6 +87,23 @@ class TestDagFileProcessorManager(unittest.TestCase):
     def setUp(self):
         clear_db_runs()
+    def run_processor_manager_one_loop(self, manager, parent_pipe):
+        if not manager._async_mode:
+            parent_pipe.send(DagParsingSignal.AGENT_RUN_ONCE)
+
+        results = []
+
+        while True:  # pylint: disable=too-many-nested-blocks
+            manager._run_parsing_loop()
+
+            while parent_pipe.poll(timeout=0.01):
+                obj = parent_pipe.recv()
+                if isinstance(obj, DagParsingStat):
+                    if obj.done:
+                        return results
+                    continue
+                results.append(obj)

Review comment:
Yes okay, I was being lazy :)

That would please pylint, but you've inverted one case.

```python
if not isinstance(obj, DagParsingStat):
    results.append(obj)
elif obj.done:
    return results
```
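To check the branch order ashb proposes, here is a self-contained version of the helper's polling loop. It swaps the real `multiprocessing` connection for a hypothetical in-memory `FakePipe`, and `DagParsingStat` is reduced to a hypothetical one-field stand-in; only the `if not isinstance(...)` / `elif obj.done` logic mirrors the comment.

```python
from collections import deque
from typing import NamedTuple


class DagParsingStat(NamedTuple):
    """Simplified stand-in: only the `done` flag matters here."""
    done: bool


class FakePipe:
    """In-memory stand-in for a multiprocessing Connection (poll/recv only)."""

    def __init__(self, items):
        self._items = deque(items)

    def poll(self, timeout=0.0):
        return bool(self._items)

    def recv(self):
        return self._items.popleft()


def run_one_loop(run_parsing_loop, parent_pipe):
    results = []
    while True:
        run_parsing_loop()
        while parent_pipe.poll(timeout=0.01):
            obj = parent_pipe.recv()
            if not isinstance(obj, DagParsingStat):
                results.append(obj)  # ordinary result: keep it
            elif obj.done:
                return results  # final stat: stop collecting
            # a stat with done=False falls through and is skipped


pipe = FakePipe(["dag_a", DagParsingStat(done=False), "dag_b", DagParsingStat(done=True), "late"])
print(run_one_loop(lambda: None, pipe))  # ['dag_a', 'dag_b']
```

With this ordering the original trailing `continue` is no longer needed: an in-progress stat matches neither branch and the loop simply moves on to the next message. Like the original helper, it keeps looping until a `done` stat arrives.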
[GitHub] [airflow] cmccaughey-ias commented on a change in pull request #8749: add AWS StepFunctions integrations to the aws provider
cmccaughey-ias commented on a change in pull request #8749: URL: https://github.com/apache/airflow/pull/8749#discussion_r425182749 ## File path: airflow/providers/amazon/aws/operators/step_function_get_execution_output.py ## @@ -0,0 +1,61 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import json + +from airflow.exceptions import AirflowException +from airflow.models import BaseOperator +from airflow.providers.amazon.aws.hooks.step_function import StepFunctionHook +from airflow.utils.decorators import apply_defaults + + +class StepFunctionGetExecutionOutputOperator(BaseOperator): +""" +An Operator that begins execution of an Step Function State Machine + +Additional arguments may be specified and are passed down to the underlying BaseOperator. + +.. 
seealso:: +:class:`~airflow.models.BaseOperator` + +:param execution_arn: ARN of the Step Function State Machine Execution +:type execution_arn: str +:param aws_conn_id: aws connection to use, defaults to 'aws_default' +:type aws_conn_id: str +""" +template_fields = ['execution_arn'] +template_ext = () +ui_color = '#f9c915' + +@apply_defaults +def __init__(self, execution_arn: str, aws_conn_id='aws_default', region_name=None, *args, **kwargs): +if kwargs.get('xcom_push') is not None: +raise AirflowException("'xcom_push' was deprecated, use 'do_xcom_push' instead") Review comment: I took that from another, probably older operator I used as an example. If unnecessary I can remove it. I'll hold off for a bit in case there are additional comments on it.
[airflow] branch master updated (fc862a3 -> 961c710)
This is an automated email from the ASF dual-hosted git repository.

turbaszek pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git.

    from fc862a3  Do not create a separate process for one task in CeleryExecutor (#8855)
     add 961c710  Make Custom XCom backend a subsection of XCom docs (#8869)

No new revisions were added by this update.

Summary of changes:
 docs/concepts.rst | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
[GitHub] [airflow] turbaszek merged pull request #8869: Make Custom XCom backend a subsection of XCom docs
turbaszek merged pull request #8869: URL: https://github.com/apache/airflow/pull/8869
[GitHub] [airflow] turbaszek commented on a change in pull request #8871: Don't use ProcessorAgent to test ProcessorManager
turbaszek commented on a change in pull request #8871:
URL: https://github.com/apache/airflow/pull/8871#discussion_r425180376

## File path: tests/utils/test_dag_processing.py ##
@@ -85,6 +87,23 @@ class TestDagFileProcessorManager(unittest.TestCase):
     def setUp(self):
         clear_db_runs()
+    def run_processor_manager_one_loop(self, manager, parent_pipe):
+        if not manager._async_mode:
+            parent_pipe.send(DagParsingSignal.AGENT_RUN_ONCE)
+
+        results = []
+
+        while True:  # pylint: disable=too-many-nested-blocks
+            manager._run_parsing_loop()
+
+            while parent_pipe.poll(timeout=0.01):
+                obj = parent_pipe.recv()
+                if isinstance(obj, DagParsingStat):
+                    if obj.done:
+                        return results
+                    continue
+                results.append(obj)

Review comment:
```suggestion
                if not isinstance(obj, DagParsingStat):
                    continue
                elif obj.done:
                    return results
                results.append(obj)
```
This may please pylint
[GitHub] [airflow] turbaszek commented on a change in pull request #8871: Don't use ProcessorAgent to test ProcessorManager
turbaszek commented on a change in pull request #8871: URL: https://github.com/apache/airflow/pull/8871#discussion_r425177568
## File path: tests/utils/test_dag_processing.py ##
@@ -85,6 +87,23 @@ class TestDagFileProcessorManager(unittest.TestCase):
     def setUp(self):
         clear_db_runs()
 
+    def run_processor_manager_one_loop(self, manager, parent_pipe):
+        if not manager._async_mode:
+            parent_pipe.send(DagParsingSignal.AGENT_RUN_ONCE)
+
+        results = []
+
+        while True:  # pylint: disable=too-many-nested-blocks
+            manager._run_parsing_loop()
+
+            while parent_pipe.poll(timeout=0.01):
+                obj = parent_pipe.recv()
+                if isinstance(obj, DagParsingStat):
+                    if obj.done:
+                        return results
+                    continue
Review comment:
```suggestion
                if isinstance(obj, DagParsingStat) and obj.done:
                    return results
```
This should please pylint.
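Both suggestions aim to remove one level of nesting. For comparison, a flattening that keeps the behaviour of the original helper (stop on a finished `DagParsingStat`, drop any other `DagParsingStat`, collect everything else) might look like the sketch below. As written, the first suggestion's `if not isinstance(...): continue` would skip the results instead of collecting them, and the second would start appending unfinished `DagParsingStat` objects to `results`. `FakePipe`, `drain_pipe` and the `DagParsingStat` dataclass are hypothetical stand-ins so the sketch runs without Airflow.

```python
from collections import deque
from dataclasses import dataclass


@dataclass
class DagParsingStat:
    """Hypothetical stand-in for Airflow's DagParsingStat; only `done` is modelled."""
    done: bool


class FakePipe:
    """Hypothetical in-memory replacement for the parent end of a multiprocessing pipe."""

    def __init__(self, messages):
        self._messages = deque(messages)

    def poll(self, timeout=0.0):
        # A real Connection.poll waits up to `timeout`; here data is either queued or not.
        return bool(self._messages)

    def recv(self):
        return self._messages.popleft()


def drain_pipe(parent_pipe):
    """Collect results until a finished DagParsingStat arrives.

    Returns (results, done); `done` is False if the pipe ran dry first.
    """
    results = []
    while parent_pipe.poll(timeout=0.01):
        obj = parent_pipe.recv()
        if not isinstance(obj, DagParsingStat):
            results.append(obj)  # a processing result: keep it
        elif obj.done:
            return results, True  # the manager finished its loop
        # any other DagParsingStat is a progress update and is dropped
    return results, False
```

In the test helper, a drain like this would sit inside the outer `while True:` loop, right after `manager._run_parsing_loop()`.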
[GitHub] [airflow] ashb commented on a change in pull request #8871: Don't use ProcessorAgent to test ProcessorManager
ashb commented on a change in pull request #8871: URL: https://github.com/apache/airflow/pull/8871#discussion_r425173106
## File path: tests/utils/test_dag_processing.py ##
@@ -149,10 +168,11 @@ def test_find_zombies(self):
             ti = TI(task, DEFAULT_DATE, State.RUNNING)
             local_job = LJ(ti)
             local_job.state = State.SHUTDOWN
-            local_job.id = 1
-            ti.job_id = local_job.id
 
             session.add(local_job)
+            session.commit()
+
+            ti.job_id = local_job.id
Review comment:
This change is _somewhat_ unrelated; I can split it out if people would like. When re-running individual tests I got a unique constraint violation on the DB because we aren't using the autoincrement PK on the "job" table here.
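The underlying problem can be reproduced with plain `sqlite3` instead of SQLAlchemy (an assumption made to keep the sketch self-contained): inserting a hard-coded primary key a second time violates the unique constraint, whereas letting the database assign the key and reading it back afterwards keeps working. In SQLAlchemy terms, reading `lastrowid` corresponds to accessing `local_job.id` after `session.commit()`, as the change above does. The `insert_job` and `demo` helpers are hypothetical.

```python
import sqlite3


def insert_job(conn, state):
    """Insert a job row and let the database choose the primary key."""
    cur = conn.execute("INSERT INTO job (state) VALUES (?)", (state,))
    conn.commit()
    return cur.lastrowid  # the id the database assigned


def demo():
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE job (id INTEGER PRIMARY KEY AUTOINCREMENT, state TEXT)")

    # Hard-coding id=1 works once; repeating it (e.g. re-running a test) fails.
    conn.execute("INSERT INTO job (id, state) VALUES (1, 'shutdown')")
    try:
        conn.execute("INSERT INTO job (id, state) VALUES (1, 'shutdown')")
        duplicate_rejected = False
    except sqlite3.IntegrityError:
        duplicate_rejected = True

    # Letting the database assign ids never collides with existing rows.
    first = insert_job(conn, "shutdown")
    second = insert_job(conn, "shutdown")
    conn.close()
    return duplicate_rejected, first, second
```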
[GitHub] [airflow] ashb opened a new pull request #8871: Don't use ProcessorAgent to test ProcessorManager
ashb opened a new pull request #8871: URL: https://github.com/apache/airflow/pull/8871
Some of our tests (when I was looking at another change) were using the ProcessorAgent to run and test the behaviour of our ProcessorManager in certain cases. Having that extra process in the middle is not critical for the tests, and makes it harder to debug when something breaks. (We already test the Agent elsewhere.)
To make this possible I have made a small refactor to the loop of DagFileProcessorManager (to give us a method we can call in tests that doesn't do `os.setsid`).
---
Make sure to mark the boxes below before creating PR:
- [x] Description above provides context of the change
- [x] Unit tests coverage for changes (not needed for documentation changes)
- [x] Target Github ISSUE in description if exists
- [x] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)"
- [x] Relevant documentation is updated including usage instructions.
- [x] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
---
In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information.
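The refactor described above amounts to keeping process-level setup in the production entry point and moving the loop into a method that tests can call directly. A minimal sketch of that split, using a hypothetical class rather than Airflow's actual `DagFileProcessorManager`:

```python
import os


class HypotheticalProcessorManager:
    """Illustrative only: not Airflow's DagFileProcessorManager."""

    def __init__(self, max_loops=None):
        self.loops_run = 0
        self.max_loops = max_loops  # None means loop forever (production)

    def start(self):
        # Production entry point: detach into a new session, then loop.
        # os.setsid() is POSIX-only and can raise PermissionError when the caller
        # already leads a process group, which is one reason to keep it out of tests.
        os.setsid()
        self._run_parsing_loop()

    def _run_parsing_loop(self):
        # The loop body lives here, so tests can call it without os.setsid().
        while self.max_loops is None or self.loops_run < self.max_loops:
            self.loops_run += 1
```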
[jira] [Commented] (AIRFLOW-6497) Scheduler creates DagBag in the same process with outdated info
[ https://issues.apache.org/jira/browse/AIRFLOW-6497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17107327#comment-17107327 ] ASF GitHub Bot commented on AIRFLOW-6497: -
ashb commented on pull request #7597: URL: https://github.com/apache/airflow/pull/7597#issuecomment-628660872
Oh sorry, I misread this section of tests: https://github.com/apache/airflow/blob/fc862a3edd010e65b9b3fe586855fe81807ee4e8/tests/utils/test_dag_processing.py#L225-L229
I thought it was expecting the failure callback to also come back, but that's not what it's checking.
> Scheduler creates DagBag in the same process with outdated info
> ---
>
> Key: AIRFLOW-6497
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6497
> Project: Apache Airflow
> Issue Type: Bug
> Components: scheduler
> Affects Versions: 1.10.7
> Reporter: Qian Yu
> Assignee: Kamil Bregula
> Priority: Major
> Fix For: 2.0.0
>
>
> The following code in scheduler_job.py seems to be called in the same process
> as the scheduler. It creates a DagBag. But since scheduler is a long running
> process, it does not pick up the latest changes made to DAGs. For example,
> changes to retries count, on_failure_callback, newly added tasks, etc are not
> reflected.
>
> {code:python}
> if ti.try_number == try_number and ti.state == State.QUEUED:
>     msg = ("Executor reports task instance {} finished ({}) "
>            "although the task says its {}. Was the task "
>            "killed externally?".format(ti, state, ti.state))
>     Stats.incr('scheduler.tasks.killed_externally')
>     self.log.error(msg)
>     try:
>         simple_dag = simple_dag_bag.get_dag(dag_id)
>         dagbag = models.DagBag(simple_dag.full_filepath)
>         dag = dagbag.get_dag(dag_id)
>         ti.task = dag.get_task(task_id)
>         ti.handle_failure(msg)
>     except Exception:
>         self.log.error("Cannot load the dag bag to handle failure for %s"
>                        ". Setting task to FAILED without callbacks or "
>                        "retries. Do you have enough resources?", ti)
>         ti.state = State.FAILED
>         session.merge(ti)
>         session.commit()
> {code}
> This causes errors such as AttributeError due to stale code being hit. E.g.
> when someone added a .join attribute to CustomOperator without bouncing the
> scheduler, this is what he would get after a CeleryWorker timeout error
> causes this line to be hit:
> {code}
> [2020-01-05 22:25:45,951] {dagbag.py:207} ERROR - Failed to import: /dags/dag1.py
> Traceback (most recent call last):
>   File "/lib/python3.6/site-packages/airflow/models/dagbag.py", line 204, in process_file
>     m = imp.load_source(mod_name, filepath)
>   File "/usr/lib/python3.6/imp.py", line 172, in load_source
>     module = _load(spec)
>   File "", line 684, in _load
>   File "", line 665, in _load_unlocked
>   File "", line 678, in exec_module
>   File "", line 219, in _call_with_frames_removed
>   File "/dags/dag1.py", line 280, in
>     task1 >> task2.join
> AttributeError: 'CustomOperator' object has no attribute 'join'
> [2020-01-05 22:25:45,951] {scheduler_job.py:1314} ERROR - Cannot load the dag bag to handle failure for [queued]>. Setting task to FAILED without callbacks or retries. Do you have enough resou
> {code}
--
This message was sent by Atlassian Jira (v8.3.4#803005)
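One mechanism consistent with the traceback above (our reading; the issue does not spell it out): building a new `DagBag` re-executes the DAG file, but modules that file imports, such as the one defining `CustomOperator`, stay cached in `sys.modules` for the lifetime of the scheduler process. The sketch below reproduces that effect with a hypothetical module, `hypothetical_operator_module`, written to a temporary directory.

```python
import importlib
import sys
import tempfile
from pathlib import Path

# Do not write .pyc files, so reload() always recompiles from the source on disk.
sys.dont_write_bytecode = True

MODULE = "hypothetical_operator_module"


def stale_vs_reloaded():
    """Return (attribute visible after re-import, attribute visible after reload)."""
    sys.modules.pop(MODULE, None)
    with tempfile.TemporaryDirectory() as tmp:
        source = Path(tmp) / f"{MODULE}.py"
        source.write_text("class CustomOperator:\n    pass\n")
        sys.path.insert(0, tmp)
        importlib.invalidate_caches()
        try:
            module = importlib.import_module(MODULE)
            # Simulate a deploy that adds a `join` attribute to the operator.
            source.write_text("class CustomOperator:\n    join = 'new'\n")
            # A repeat import just returns the cached module object from sys.modules.
            cached = importlib.import_module(MODULE)
            stale_has_join = hasattr(cached.CustomOperator, "join")
            # Only an explicit reload (or a fresh process) sees the new source.
            fresh_has_join = hasattr(importlib.reload(module).CustomOperator, "join")
            return stale_has_join, fresh_has_join
        finally:
            sys.path.remove(tmp)
            sys.modules.pop(MODULE, None)
```

A freshly started process does not share that module cache, which is why handling the callback outside the long-running scheduler process avoids the stale code.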
[GitHub] [airflow] ashb commented on pull request #7597: [AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop
ashb commented on pull request #7597: URL: https://github.com/apache/airflow/pull/7597#issuecomment-628660872
Oh sorry, I misread this section of tests: https://github.com/apache/airflow/blob/fc862a3edd010e65b9b3fe586855fe81807ee4e8/tests/utils/test_dag_processing.py#L225-L229
I thought it was expecting the failure callback to also come back, but that's not what it's checking.
[GitHub] [airflow] ewjmulder commented on issue #8484: Airflow 1.10.7+ suppresses Operator logs
ewjmulder commented on issue #8484: URL: https://github.com/apache/airflow/issues/8484#issuecomment-628656975 Yes, I guess the issue is related to upgrading from an older version where you also (try to) migrate the log config, but somehow the old and new configs are not fully compatible and this issue appears. If someone does a deep dive into the diffs in the logging config and code between 1.10.6 and 1.10.7, I'm sure the underlying reason will pop up. But I'm happy with the resolution that we have found :)
[GitHub] [airflow] davido912 commented on issue #8484: Airflow 1.10.7+ suppresses Operator logs
davido912 commented on issue #8484: URL: https://github.com/apache/airflow/issues/8484#issuecomment-628654368 @ewjmulder Seems to work, nice job! We have 3 remote Airflow instances; I basically wiped one and reinstalled it without any custom logging config, and that worked as well.
[GitHub] [airflow] ewjmulder commented on issue #8484: Airflow 1.10.7+ suppresses Operator logs
ewjmulder commented on issue #8484: URL: https://github.com/apache/airflow/issues/8484#issuecomment-628644762 @davido912 In the Airflow project the example config is indeed in the airflow_local_settings.py file. But you can customize this with the `logging_config_class = ...` setting in airflow.cfg (as we have done). Either way, it boils down to a dictionary variable called something like `(DEFAULT_)LOGGING_CONFIG` that contains (among others) the `formatters`, `handlers` and `loggers` properties. The various loggers are defined inside that `loggers` property, and that is where you should add an entry keyed by the Airflow package whose logging is missing (for instance `airflow.contrib`); see the example snippet in my comment above.
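A trimmed-down version of such a dictionary might look like the sketch below. Everything except the extra `airflow.contrib` entry under `loggers` is a hypothetical minimal setup (Airflow's default config defines more formatters and handlers). Airflow itself applies the dictionary named by `logging_config_class`; here `logging.config.dictConfig` is called directly so the sketch runs standalone.

```python
import logging
import logging.config

LOGGING_CONFIG = {
    "version": 1,
    # Keep loggers that were created before this config was applied.
    "disable_existing_loggers": False,
    "formatters": {
        "airflow": {
            "format": "[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s",
        },
    },
    "handlers": {
        "console": {
            "class": "logging.StreamHandler",
            "formatter": "airflow",
            "stream": "ext://sys.stdout",
        },
    },
    "loggers": {
        # The extra entry: records from any `airflow.contrib.*` module go to the console.
        "airflow.contrib": {
            "handlers": ["console"],
            "level": "INFO",
            "propagate": False,
        },
    },
}

logging.config.dictConfig(LOGGING_CONFIG)
```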
[jira] [Commented] (AIRFLOW-6497) Scheduler creates DagBag in the same process with outdated info
[ https://issues.apache.org/jira/browse/AIRFLOW-6497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17107308#comment-17107308 ] ASF GitHub Bot commented on AIRFLOW-6497: -
mik-laj commented on pull request #7597: URL: https://github.com/apache/airflow/pull/7597#issuecomment-628643638
For processor events, the scheduler sends the callback to DagFileProcessorManager; DagFileProcessorManager starts a DagFileProcessor via DagFileProcessorProcess, which processes the callback:
Scheduler => DagFileProcessorManager => DagFileProcessor => DagFileProcessorProcess
For zombies:
DagFileProcessorManager => DagFileProcessor => DagFileProcessorProcess
[GitHub] [airflow] mik-laj commented on pull request #7597: [AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop
mik-laj commented on pull request #7597: URL: https://github.com/apache/airflow/pull/7597#issuecomment-628643638
For processor events, the scheduler sends the callback to DagFileProcessorManager; DagFileProcessorManager starts a DagFileProcessor via DagFileProcessorProcess, which processes the callback:
Scheduler => DagFileProcessorManager => DagFileProcessor => DagFileProcessorProcess
For zombies:
DagFileProcessorManager => DagFileProcessor => DagFileProcessorProcess
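The routing described above can be sketched as a queue owned by the manager: the scheduler only enqueues a request, and each request is handled by a processor created for that DAG file, so the scheduler never builds a `DagBag` itself. For zombies, the manager would enqueue the same kind of request on its own. All class names here are hypothetical, and creating a new object stands in for what Airflow does in a separate process.

```python
from collections import deque
from dataclasses import dataclass


@dataclass
class CallbackRequest:
    """Hypothetical message: run a failure callback for one task of one DAG file."""
    dag_file: str
    task_id: str


class HypotheticalProcessor:
    """Stands in for a DagFileProcessor that parses `dag_file` from scratch."""

    def __init__(self, dag_file):
        self.dag_file = dag_file

    def execute_callback(self, request):
        return f"{self.dag_file}:{request.task_id}"


class HypotheticalManager:
    """Stands in for DagFileProcessorManager: queues requests, runs each in a new processor."""

    def __init__(self, processor_factory):
        self._queue = deque()
        self._processor_factory = processor_factory

    def send_callback(self, request):
        # Called by the scheduler (executor events) or by the manager itself (zombies).
        self._queue.append(request)

    def run_once(self):
        handled = []
        while self._queue:
            request = self._queue.popleft()
            processor = self._processor_factory(request.dag_file)  # fresh per request
            handled.append(processor.execute_callback(request))
        return handled
```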
[jira] [Commented] (AIRFLOW-6497) Scheduler creates DagBag in the same process with outdated info
[ https://issues.apache.org/jira/browse/AIRFLOW-6497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17107307#comment-17107307 ] ASF GitHub Bot commented on AIRFLOW-6497: -
mik-laj commented on pull request #7597: URL: https://github.com/apache/airflow/pull/7597#issuecomment-628641784
1. For executor events
https://github.com/PolideaInternal/airflow/blob/99d237b684bbcdcf808451dfbfad425c0c7f772b/airflow/jobs/scheduler_job.py#L1430
Send to DagFileProcessorManager via DagFileProcessorAgent:
https://github.com/PolideaInternal/airflow/blob/99d237b684bbcdcf808451dfbfad425c0c7f772b/airflow/jobs/scheduler_job.py#L1464-L1469
https://github.com/PolideaInternal/airflow/blob/99d237b684bbcdcf808451dfbfad425c0c7f772b/airflow/utils/dag_processing.py#L670-L673
Send to queue:
https://github.com/PolideaInternal/airflow/blob/99d237b684bbcdcf808451dfbfad425c0c7f772b/airflow/utils/dag_processing.py#L737-L742
Start a new DagFileProcessor via DagFileProcessorProcess:
https://github.com/PolideaInternal/airflow/blob/99d237b684bbcdcf808451dfbfad425c0c7f772b/airflow/utils/dag_processing.py#L1045-L1057
Execute callback:
https://github.com/PolideaInternal/airflow/blob/99d237b684bbcdcf808451dfbfad425c0c7f772b/airflow/jobs/scheduler_job.py#L835-L838
2. For zombies
Find zombies:
https://github.com/PolideaInternal/airflow/blob/99d237b684bbcdcf808451dfbfad425c0c7f772b/airflow/utils/dag_processing.py#L670-L673
Send to queue:
https://github.com/PolideaInternal/airflow/blob/99d237b684bbcdcf808451dfbfad425c0c7f772b/airflow/utils/dag_processing.py#L1139
Start a new DagFileProcessor via DagFileProcessorProcess:
https://github.com/PolideaInternal/airflow/blob/99d237b684bbcdcf808451dfbfad425c0c7f772b/airflow/utils/dag_processing.py#L1045-L1057
Execute callback:
https://github.com/PolideaInternal/airflow/blob/99d237b684bbcdcf808451dfbfad425c0c7f772b/airflow/jobs/scheduler_job.py#L835-L838
[GitHub] [airflow] turbaszek commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg
turbaszek commented on a change in pull request #8805: URL: https://github.com/apache/airflow/pull/8805#discussion_r425141185
## File path: airflow/models/baseoperator.py ##
@@ -634,6 +634,43 @@ def deps(self) -> Set[BaseTIDep]:
             NotPreviouslySkippedDep(),
         }
 
+    def set_xcomargs_dependencies(self) -> None:
Review comment:
Hm, I'm not sure how this should be approached. What I would do is:
```python
def set_xcomargs_dependencies(self) -> None:
    from airflow.models.xcom_arg import XComArg

    def apply_set_upstream_and_lineage(arg: Any):
        if isinstance(arg, XComArg):
            op: BaseOperator = arg.operator
            self.set_upstream(op)
            # Picks up any outlets from direct upstream tasks that have outlets defined,
            # as such that if A -> B and B does not have inlets but A has outlets, then
            # these are provided as inlets to B.
            self.add_inlets(op.get_outlet_defs())
        ...

    for field in self.template_fields:
        arg = getattr(self, field)
        apply_set_upstream_and_lineage(arg)
```
However, I am not sure this is the right approach. Not all outlets of A have to be inlets of B. The proposed behaviour is what we get when we use `inlets=AUTO`, and I'm not sure this is something that should be done out of the box.
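How nested template values should be handled is left open in the comment. One possible way to collect dependencies from lists, tuples, sets and dicts is a recursive walk, sketched below with hypothetical stand-ins (`Op`, `XComArgStub`) rather than Airflow's `BaseOperator` and `XComArg`. It deliberately sets only the upstream dependency and leaves out the `add_inlets` lineage step that the comment questions.

```python
from typing import Any


class XComArgStub:
    """Hypothetical stand-in for XComArg: remembers which operator produces the value."""

    def __init__(self, operator):
        self.operator = operator


class Op:
    """Hypothetical minimal operator: only template fields and upstream tracking."""

    template_fields = ("sql", "params_list")

    def __init__(self, task_id, sql=None, params_list=None):
        self.task_id = task_id
        self.sql = sql
        self.params_list = params_list
        self.upstream_task_ids = set()

    def set_upstream(self, other):
        self.upstream_task_ids.add(other.task_id)

    def set_xcomargs_dependencies(self) -> None:
        def visit(arg: Any) -> None:
            if isinstance(arg, XComArgStub):
                self.set_upstream(arg.operator)
            elif isinstance(arg, (list, tuple, set)):
                for item in arg:
                    visit(item)
            elif isinstance(arg, dict):
                for value in arg.values():
                    visit(value)
            # any other value (str, int, None, ...) carries no dependency

        for field in self.template_fields:
            visit(getattr(self, field))
```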
[GitHub] [airflow] potiuk commented on a change in pull request #8807: Added automated release notes generation for backport operators
potiuk commented on a change in pull request #8807: URL: https://github.com/apache/airflow/pull/8807#discussion_r425137911
## File path: airflow/providers/apache/druid/README.md ##
@@ -0,0 +1,123 @@
+
+
+
+# Package apache-airflow-backport-providers-apache-druid
+
+**Table of contents**
+
+- [Backport package](#backport-package)
+- [Installation](#installation)
+- [Compatibility](#compatibility)
+- [PIP requirements](#pip-requirements)
+- [Cross provider package dependencies](#cross-provider-package-dependencies)
+- [Provider class summary](#provider-class-summary)
+- [Operators](#operators)
+- [Moved operators](#moved-operators)
+- [Hooks](#hooks)
+- [Moved hooks](#moved-hooks)
+- [Releases](#releases)
+- [Release 2020.05.11](#release-20200511)
+
+## Backport package
+
+This is a backport providers package for `apache.druid` provider. All classes for this provider package
+are in `airflow.providers.apache.druid` python package.
+
+## Installation
+
+You can install this package on top of an existing airflow 1.10.* installation via
+`pip install apache-airflow-backport-providers-apache-druid`
+
+## Compatibility
+
+For full compatibility and test status of the backport packages check
+[Airflow Backport Package Compatibility](https://cwiki.apache.org/confluence/display/AIRFLOW/Backported+providers+packages+for+Airflow+1.10.*+series)
+
+## PIP requirements
+
+| PIP package | Version required |
+|:--|:---|
+| pydruid | =0.4.1,=0.5.8|
+
+## Cross provider package dependencies
+
+Those are dependencies that might be needed in order to use all the features of the package.
+You need to install the specified backport providers package in order to use them.
+
+You can install such cross-provider dependencies when installing from PyPI. For example:
+
+```bash
+pip install apache-airflow-backport-providers-apache-druid[apache.hive]
+```
+
+| Dependent package | Extra |
+|:-|:|
+| [apache-airflow-backport-providers-apache-hive](https://github.com/apache/airflow/tree/master/airflow/providers/apache/hive) | apache.hive |
+
+# Provider class summary
+
+All classes in Airflow 2.0 are in `airflow.providers.apache.druid` package.
+
+
+## Operators
+
+
+
+
+### Moved operators
+
+| Airflow 2.0 operators: `airflow.providers.apache.druid` package | Airflow 1.10.* previous location (usually `airflow.contrib`) |
+|:---|:---|
+| [operators.druid.DruidOperator](https://github.com/apache/airflow/blob/master/airflow/providers/apache/druid/operators/druid.py) | [operators.druid_operator.DruidOperator](https://github.com/apache/airflow/blob/v1-10-stable/airflow/contrib/operators/druid_operator.py) |
+| [operators.druid_check.DruidCheckOperator](https://github.com/apache/airflow/blob/master/airflow/providers/apache/druid/operators/druid_check.py) | [airflow.operators.druid_check_operator.DruidCheckOperator](https://github.com/apache/airflow/blob/v1-10-stable/airflow/operators/druid_check_operator.py) |
+| [operators.hive_to_druid.HiveToDruidTransfer](https://github.com/apache/airflow/blob/master/airflow/providers/apache/druid/operators/hive_to_druid.py) | [airflow.operators.hive_to_druid.HiveToDruidTransfer](https://github.com/apache/airflow/blob/v1-10-stable/airflow/operators/hive_to_druid.py) |
+
+
+
+
+
+## Hooks
+
+
+
+### Moved hooks
+
+| Airflow 2.0 hooks: `airflow.providers.apache.druid` package | Airflow 1.10.* previous location (usually `airflow.contrib`) |
+|:--|:---|
+| [hooks.druid.DruidDbApiHook](https://github.com/apache/airflow/blob/master/airflow/providers/apache/druid/hooks/druid.py) | [airflow.hooks.druid_hook.DruidDbApiHook](https://github.com/apache/airflow/blob/v1-10-stable/airflow/hooks/druid_hook.py) |
+|
[GitHub] [airflow] potiuk commented on a change in pull request #8807: Added automated release notes generation for backport operators
potiuk commented on a change in pull request #8807: URL: https://github.com/apache/airflow/pull/8807#discussion_r425137699
## File path: BREEZE.rst ##
@@ -870,6 +872,78 @@ This is the current syntax for `./breeze <./breeze>`_:
+ Detailed usage for command: generate-backport-readme
+
+ breeze [FLAGS] generate-backport-readme --
+
+Prepares README.md files for backport packages. You can provide (after --) optional version
+in the form of .MM.DD, optionally followed by the list of packages to generate readme for.
+If the first parameter is not formatted as a date, then today is used as version.
+If no packages are specified, readme for all packages are generated.
+
+Examples:
+
+'breeze generate-backport-readme' or
+'breeze generate-backport-readme -- 2020.05.10' or
+'breeze generate-backport-readme -- 2020.05.10 https google amazon'
+
+General form:
+
+'breeze generate-backport-readme -- .MM.DD ...'
+
+* .MM.DD - is the CALVER version of the package to prepare. Note that this date
+  cannot be earlier than the already released version (the script will fail if it
+  will be). It can be set in the future anticipating the future release date.
+
+* is usually directory in the airflow/providers folder (for example
+  'google' but in several cases, it might be one level deeper separated with
+  '.' for example 'apache.hive'
+
+ Flags:
+
+ -v, --verbose
+ Show verbose information about executed commands (enabled by default for running test).
+ Note that you can further increase verbosity and see all the commands executed by breeze
+ by running 'export VERBOSE_COMMANDS="true"' before running breeze.
+
+
+
+
+
+ Detailed usage for command: prepare-backport-packages
+
+ breeze [FLAGS] prepare-backport-packages --
+
+Builds backport packages. You can provide (after --) optional list of packages to prepare.
+If no packages are specified, readme for all packages are generated.
+
+Make sure to set the right version in './backport_packages/setup_backport_packages.py'
+
+Examples:
+
+'breeze prepare-backport-packages' or
+'breeze prepare-backport-packages' -- google' or
+'breeze prepare-backport-packages' -- http google amazon'
Review comment:
Resolved.
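The argument rules in the help text (an optional CALVER date such as `2020.05.10` first, otherwise today's date; an empty package list meaning all packages; a version that must not be earlier than the last release) can be sketched as a small parser. `parse_readme_args` and its `last_released` parameter are hypothetical; this is not the code Breeze actually runs.

```python
import datetime
import re

_CALVER = re.compile(r"\d{4}\.\d{2}\.\d{2}")


def parse_readme_args(args, today=None, last_released=None):
    """Return (version, packages); packages is None when all packages are meant."""
    today = today or datetime.date.today()
    if args and _CALVER.fullmatch(args[0]):
        # Reject strings like 2020.13.45 that have the right shape but are not dates.
        datetime.datetime.strptime(args[0], "%Y.%m.%d")
        version, packages = args[0], list(args[1:])
    else:
        # First argument is not a date: use today and treat all args as packages.
        version, packages = today.strftime("%Y.%m.%d"), list(args)
    # Zero-padded year.month.day strings compare in date order.
    if last_released is not None and version < last_released:
        raise ValueError(f"version {version} is earlier than released {last_released}")
    return version, (packages or None)
```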
[GitHub] [airflow] potiuk commented on a change in pull request #8807: Added automated release notes generation for backport operators
potiuk commented on a change in pull request #8807: URL: https://github.com/apache/airflow/pull/8807#discussion_r425137237 ## File path: breeze ## @@ -1181,6 +1221,32 @@ $(flag_airflow_variants) " # shellcheck disable=SC2090 export DETAILED_USAGE_INITIALIZE_LOCAL_VIRTUALENV +# shellcheck disable=SC2089 +DETAILED_USAGE_PREPARE_BACKPORT_PACKAGES=" + Builds backport packages. You can provide (after --) optional list of packages to prepare. + If no packages are specified, readme for all packages are generated. + + Make sure to set the right version in './backport_packages/setup_backport_packages.py' + + Examples: + + '${CMDNAME} prepare-backport-packages' or + '${CMDNAME} prepare-backport-packages' -- google' or + '${CMDNAME} prepare-backport-packages' -- http google amazon' Review comment: Yep, thanks. Resolved!
[GitHub] [airflow] dinigo opened a new pull request #8870: Add template_fields to GCSFileTransformOperator
dinigo opened a new pull request #8870: URL: https://github.com/apache/airflow/pull/8870 `source_object` and `destination_object` were not listed in `template_fields`, so they were not templated and therefore not rendered correctly. --- Make sure to mark the boxes below before creating PR: [x] - [x] Description above provides context of the change - [ ] Unit tests coverage for changes (not needed for documentation changes) - [ ] Target Github ISSUE in description if exists - [x] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)" - [ ] Relevant documentation is updated including usage instructions. - [x] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
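Why a missing `template_fields` entry leaves a value unrendered can be shown with a pure-Python simulation. The class below is hypothetical and is not the real GCSFileTransformOperator. Its naive `{{ name }}` substitution stands in for Jinja, and only attributes named in `template_fields` are rendered.

```python
import re
from typing import Any, Dict

# Matches simple Jinja-style placeholders such as "{{ ds }}" (simplified stand-in for Jinja).
PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


class TransformOperatorSketch:
    """Hypothetical stand-in for an operator; not the real GCSFileTransformOperator."""

    # Only attributes named here are rendered; the PR adds the two object names.
    template_fields = ("source_object", "destination_object")

    def __init__(self, source_object: str, destination_object: str, source_bucket: str):
        self.source_object = source_object
        self.destination_object = destination_object
        self.source_bucket = source_bucket  # deliberately not templated in this sketch

    def render_template_fields(self, context: Dict[str, Any]) -> None:
        # Replace each placeholder in the listed fields with its value from the context.
        for field in self.template_fields:
            raw = getattr(self, field)
            rendered = PLACEHOLDER.sub(lambda m: str(context[m.group(1)]), raw)
            setattr(self, field, rendered)
```

After rendering with `{"ds": "2020-05-14"}`, `source_object` becomes `in/2020-05-14.csv`, while `source_bucket` keeps its literal `{{ ds }}`.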
[GitHub] [airflow] turbaszek opened a new pull request #8869: Make Custom XCom backend a subsection of XCom docs
turbaszek opened a new pull request #8869: URL: https://github.com/apache/airflow/pull/8869 --- Make sure to mark the boxes below before creating PR: [x] - [x] Description above provides context of the change - [x] Unit tests coverage for changes (not needed for documentation changes) - [x] Target Github ISSUE in description if exists - [x] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)" - [x] Relevant documentation is updated including usage instructions. - [x] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example). --- In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed. In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x). In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md). Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information.
[GitHub] [airflow] turbaszek opened a new pull request #8868: Add BigQueryInsertJobOperator
turbaszek opened a new pull request #8868: URL: https://github.com/apache/airflow/pull/8868 Depends on: #8858 --- Make sure to mark the boxes below before creating PR: [x] - [x] Description above provides context of the change - [x] Unit tests coverage for changes (not needed for documentation changes) - [x] Target Github ISSUE in description if exists - [x] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)" - [ ] Relevant documentation is updated including usage instructions. - [x] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example). --- In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed. In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x). In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md). Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information.
[GitHub] [airflow] ashb commented on pull request #8259: [AIRFLOW-XXX] Optimize airflow scheduler
ashb commented on pull request #8259: URL: https://github.com/apache/airflow/pull/8259#issuecomment-628613314 We don't accept PRs against 1.9, sorry.
[GitHub] [airflow] ashb closed pull request #8259: [AIRFLOW-XXX] Optimize airflow scheduler
ashb closed pull request #8259: URL: https://github.com/apache/airflow/pull/8259
[jira] [Commented] (AIRFLOW-6497) Scheduler creates DagBag in the same process with outdated info
[ https://issues.apache.org/jira/browse/AIRFLOW-6497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17107265#comment-17107265 ] ASF GitHub Bot commented on AIRFLOW-6497: - ashb commented on pull request #7597: URL: https://github.com/apache/airflow/pull/7597#issuecomment-628612706 @mik-laj Am I following the path of `FailureCallbackRequest` correctly for zombie requests? Does it start in the DagFileProcessorManager, get sent back via the Agent to the scheduler, only to then get sent _back_ to the ProcessorManager? > Scheduler creates DagBag in the same process with outdated info > --- > > Key: AIRFLOW-6497 > URL: https://issues.apache.org/jira/browse/AIRFLOW-6497 > Project: Apache Airflow > Issue Type: Bug > Components: scheduler >Affects Versions: 1.10.7 >Reporter: Qian Yu >Assignee: Kamil Bregula >Priority: Major > Fix For: 2.0.0 > > > The following code in scheduler_job.py seems to be called in the same process > as the scheduler. It creates a DagBag. But since scheduler is a long running > process, it does not pick up the latest changes made to DAGs. For example, > changes to retries count, on_failure_callback, newly added tasks, etc are not > reflected. > > {code:python} > if ti.try_number == try_number and ti.state == State.QUEUED: > msg = ("Executor reports task instance {} finished ({}) " >"although the task says its {}. 
Was the task " >"killed externally?".format(ti, state, ti.state)) > Stats.incr('scheduler.tasks.killed_externally') > self.log.error(msg) > try: > simple_dag = simple_dag_bag.get_dag(dag_id) > dagbag = models.DagBag(simple_dag.full_filepath) > dag = dagbag.get_dag(dag_id) > ti.task = dag.get_task(task_id) > ti.handle_failure(msg) > except Exception: > self.log.error("Cannot load the dag bag to handle > failure for %s" >". Setting task to FAILED without > callbacks or " >"retries. Do you have enough > resources?", ti) > ti.state = State.FAILED > session.merge(ti) > session.commit() > {code} > This causes errors such as AttributeError due to stale code being hit. E.g. > when someone added a .join attribute to CustomOperator without bouncing the > scheduler, this is what he would get after a CeleryWorker timeout error > causes this line to be hit: > {code} > [2020-01-05 22:25:45,951] {dagbag.py:207} ERROR - Failed to import: > /dags/dag1.py > Traceback (most recent call last): > File "/lib/python3.6/site-packages/airflow/models/dagbag.py", line 204, in > process_file > m = imp.load_source(mod_name, filepath) > File "/usr/lib/python3.6/imp.py", line 172, in load_source > module = _load(spec) > File "", line 684, in _load > File "", line 665, in _load_unlocked > File "", line 678, in exec_module > File "", line 219, in _call_with_frames_removed > File "/dags/dag1.py", line 280, in > task1 >> task2.join > AttributeError: 'CustomOperator' object has no attribute 'join' > [2020-01-05 22:25:45,951] {scheduler_job.py:1314} ERROR - Cannot load the dag > bag to handle failure for [queued]>. Setting task to FAILED without callbacks or retries. Do you have > enough resou > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [airflow] ashb commented on pull request #7597: [AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop
ashb commented on pull request #7597: URL: https://github.com/apache/airflow/pull/7597#issuecomment-628612706 @mik-laj Am I following the path of `FailureCallbackRequest` correctly for zombie requests? Does it start in the DagFileProcessorManager, get sent back via the Agent to the scheduler, only to then get sent _back_ to the ProcessorManager?
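The idea behind #7597 can be sketched as routing failure callbacks to wherever the DAG file is parsed afresh, rather than loading a DagBag in the long-running scheduler. Requests are queued per DAG file and handed over when that file is next processed. This is a simplified illustration only; `FailureCallbackRequestSketch`, its fields, and `ProcessorManagerSketch` are hypothetical and do not mirror Airflow's actual classes.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, List


@dataclass(frozen=True)
class FailureCallbackRequestSketch:
    """Hypothetical, simplified callback request; the field names are assumptions."""

    full_filepath: str
    task_id: str
    msg: str


class ProcessorManagerSketch:
    """Queues callbacks per DAG file so they run in the next parsing pass for that file.

    The callback then executes where the DAG file is loaded afresh, instead of
    in the long-running scheduler process with possibly stale DAG code.
    """

    def __init__(self) -> None:
        self._pending: Dict[str, List[FailureCallbackRequestSketch]] = defaultdict(list)

    def add_callback(self, request: FailureCallbackRequestSketch) -> None:
        self._pending[request.full_filepath].append(request)

    def take_callbacks_for(self, filepath: str) -> List[FailureCallbackRequestSketch]:
        # Hand the queued requests to the processor for this file and forget them.
        return self._pending.pop(filepath, [])
```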
[GitHub] [airflow] j-y-matsubara edited a comment on pull request #8867: Add argument to BaseSensorOperator to control whether to skipping all downstream tasks or not
j-y-matsubara edited a comment on pull request #8867: URL: https://github.com/apache/airflow/pull/8867#issuecomment-628604933 @jeffolsi Thank you for your comments. > This behavior needs to be triple checked. You are assuming that the downstream has only one branch which doesn't join with other branches. > If there is a join there is a question of should the skip cascade or stop at the join. Everything will be skipped, including downstream tasks of the downstream branches. (This is the same behavior as when soft_fail is set to True in the current BaseSensorOperator.) Added a test case.
[GitHub] [airflow] j-y-matsubara edited a comment on pull request #8867: Add argument to BaseSensorOperator to control whether to skipping all downstream tasks or not
j-y-matsubara edited a comment on pull request #8867: URL: https://github.com/apache/airflow/pull/8867#issuecomment-628604933 @jeffolsi Thank you for your comments. > This behavior needs to be triple checked. You are assuming that the downstream has only one branch which doesn't join with other branches. > If there is a join there is a question of should the skip cascade or stop at the join. Everything will be skipped, including downstream tasks of the downstream branches. (This is the same behavior as when soft_fail is set to True in the current BaseSensorOperator.) Added a test case.
[GitHub] [airflow] ashb commented on a change in pull request #8777: Add Production Helm chart support
ashb commented on a change in pull request #8777: URL: https://github.com/apache/airflow/pull/8777#discussion_r425104759 ## File path: chart/templates/configmap.yaml ## @@ -0,0 +1,108 @@ + +## Airflow ConfigMap +# +apiVersion: v1 +kind: ConfigMap +metadata: + name: {{ template "airflow_config" . }} + labels: +tier: airflow +component: config +release: {{ .Release.Name }} +chart: "{{ .Chart.Name }}-{{ .Chart.Version }}" +heritage: {{ .Release.Service }} +{{- with .Values.labels }} +{{ toYaml . | indent 4 }} +{{- end }} +data: + # These are system-specified config overrides. + airflow.cfg: | Review comment: Perhaps, but you can set any config option through environment variables (for instance `AIRFLOW__CORE__STORE_SERIALIZED_DAGS`) via the top-level `env` key in Values, so it's not needed. It might be a good idea, but it's not needed.
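The environment-variable route uses Airflow's `AIRFLOW__{SECTION}__{KEY}` naming. The sketch below turns a dict of config overrides into name/value items shaped like the chart's `env` list in values.yaml. The two helper functions are hypothetical illustrations, not part of the chart.

```python
from typing import Dict, List


def airflow_env_var(section: str, key: str) -> str:
    """Build the environment variable name that overrides [section] key in airflow.cfg."""
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


def chart_env_entries(overrides: Dict[str, Dict[str, str]]) -> List[Dict[str, str]]:
    """Turn {section: {key: value}} into name/value items for the chart's `env` list."""
    return [
        {"name": airflow_env_var(section, key), "value": value}
        for section, options in overrides.items()
        for key, value in options.items()
    ]
```

For instance, `chart_env_entries({"core": {"store_serialized_dags": "True"}})` yields a single entry named `AIRFLOW__CORE__STORE_SERIALIZED_DAGS`.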
[GitHub] [airflow] ashb commented on a change in pull request #8777: Add Production Helm chart support
ashb commented on a change in pull request #8777: URL: https://github.com/apache/airflow/pull/8777#discussion_r425104164 ## File path: chart/templates/workers/worker-horizontalpodautoscaler.yaml ## @@ -0,0 +1,34 @@ + +## Airflow Worker HorizontalPodAutoscaler Review comment: We should remove this now that we have KEDA, as this never worked that well.
[GitHub] [airflow] ashb commented on a change in pull request #8777: Add Production Helm chart support
ashb commented on a change in pull request #8777: URL: https://github.com/apache/airflow/pull/8777#discussion_r425103758 ## File path: chart/values.yaml ## @@ -0,0 +1,438 @@ +# Default values for airflow. +# This is a YAML-formatted file. +# Declare variables to be passed into your templates. + +# User and group of airflow user +uid: 100 +gid: 101 + +# Select certain nodes for airflow pods. +nodeSelector: {} +affinity: {} +tolerations: [] + +# Add common labels to all objects and pods defined in this chart. +labels: {} + +# Ingress configuration +ingress: + # Enable ingress resource + enabled: false + + # Enable for cert-manager or kube-lego + acme: false + + # Name of tls secret to use on ingress + tlsSecretName: ~ + + # Annotations always injected when configuring webserver Ingress ojbect + webserverAnnotations: {} + + # Annotations always injected when configuring Flower Ingress object + flowerAnnotations: {} + + # Base domain for ingress vhosts + baseDomain: ~ + +# Network policy configuration +networkPolicies: + # Enabled network policies + enabled: false + +# Airflow home directory +# Used for mount paths +airflowHome: "/usr/local/airflow" + +# Extra annotations to apply to all +# Airflow pods +airflowPodAnnotations: {} + +# Enable RBAC (default on most clusters these days) +rbacEnabled: true + +# Airflow executor +# Options: SequentialExecutor, LocalExecutor, CeleryExecutor, KubernetesExecutor +executor: "KubernetesExecutor" + +# If this is true and using LocalExecutor/SequentialExecutor/KubernetesExecutor, the scheudler's +# service account will have access to communicate with the api-server and launch pods. +# If this is true and using the CeleryExecutor, the workers will be able to launch pods. 
+allowPodLaunching: true + +# Default airflow repository +defaultAirflowRepository: astronomerinc/ap-airflow + +# Default airflow tag to deploy +defaultAirflowTag: 1.10.7-alpine3.10 + +# Images +images: + airflow: +repository: astronomerinc/ap-airflow +tag: ~ +pullPolicy: IfNotPresent + flower: +repository: astronomerinc/ap-airflow +tag: ~ +pullPolicy: IfNotPresent + statsd: +repository: astronomerinc/ap-statsd-exporter +tag: 0.11.0 +pullPolicy: IfNotPresent + redis: +repository: astronomerinc/ap-redis +tag: 0.11.0 +pullPolicy: IfNotPresent + pgbouncer: +repository: astronomerinc/ap-pgbouncer +tag: 0.11.0 +pullPolicy: IfNotPresent + pgbouncerExporter: +repository: astronomerinc/ap-pgbouncer-exporter +tag: 0.11.0 +pullPolicy: IfNotPresent + +# Environment variables for all airflow containers +env: [] +# - name: "" +# value: "" + +# Secrets for all airflow containers +secret: [] +# - envName: "" +# secretName: "" +# secretKey: "" + +# Astronomer Airflow database config Review comment: ```suggestion # Airflow database config ```
[GitHub] [airflow] j-y-matsubara edited a comment on pull request #8867: Add argument to BaseSensorOperator to control whether to skipping all downstream tasks or not
j-y-matsubara edited a comment on pull request #8867: URL: https://github.com/apache/airflow/pull/8867#issuecomment-628604933 Thank you for your comments. > This behavior needs to be triple checked. You are assuming that the downstream has only one branch which doesn't join with other branches. > If there is a join there is a question of should the skip cascade or stop at the join. Everything will be skipped, including downstream tasks of the downstream branches. (This is the same behavior as when soft_fail is set to True in the current BaseSensorOperator.) Added a test case.
[GitHub] [airflow] j-y-matsubara commented on pull request #8867: Add argument to BaseSensorOperator to control whether to skipping all downstream tasks or not
j-y-matsubara commented on pull request #8867: URL: https://github.com/apache/airflow/pull/8867#issuecomment-628604933 Thank you for your comments. > This behavior needs to be triple checked. You are assuming that the downstream has only one branch which doesn't join with other branches. > If there is a join there is a question of should the skip cascade or stop at the join. Everything will be skipped, including downstream tasks of the downstream branches. (This is the same behavior as when soft_fail is set to True in the current BaseSensorOperator.)
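The cascade described in the reply ("everything will be skipped, including downstream tasks of the downstream branches") amounts to taking the full transitive downstream set of the sensor. A join task is included even when it has other, unskipped upstream branches. The breadth-first sketch below works over a hypothetical adjacency dict and is not Airflow's SkipMixin code.

```python
from collections import deque
from typing import Dict, List, Set


def all_downstream(graph: Dict[str, List[str]], start: str) -> Set[str]:
    """Return every task reachable downstream of `start` (breadth-first traversal)."""
    seen: Set[str] = set()
    queue = deque(graph.get(start, []))
    while queue:
        task = queue.popleft()
        if task in seen:
            continue
        seen.add(task)
        queue.extend(graph.get(task, []))
    return seen
```

With `sensor -> a -> join -> end` and `other -> join`, the result is `{"a", "join", "end"}`. The join is skipped although `other` is not, which is exactly the case jeffolsi raised.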
[GitHub] [airflow] turbaszek commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg
turbaszek commented on a change in pull request #8805: URL: https://github.com/apache/airflow/pull/8805#discussion_r425101458 ## File path: airflow/models/dag.py ## @@ -1468,6 +1468,10 @@ def create_dagrun(self, :param session: database session :type session: sqlalchemy.orm.session.Session """ +# Resolve relationship between task set by XComArgs +for task in self.tasks: +task.set_xcomargs_dependencies() Review comment: That's my main concern. I will add tests for serialization and see what can be done. The problem is that we would have to keep this logic in more than one place (the DagBag, and somewhere around DAG creation without a DagBag).
[GitHub] [airflow] turbaszek commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg
turbaszek commented on a change in pull request #8805: URL: https://github.com/apache/airflow/pull/8805#discussion_r425100781 ## File path: airflow/providers/google/marketing_platform/operators/display_video.py ## @@ -565,7 +565,7 @@ class GoogleDisplayVideo360SDFtoGCSOperator(BaseOperator): :type delegate_to: str """ -template_fields = ("operation_name", "bucket_name", "object_name", "body_request") +template_fields = ("operation_name", "bucket_name", "object_name") Review comment: I will rebase; it was fixed in another PR.
[GitHub] [airflow] turbaszek commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg
turbaszek commented on a change in pull request #8805: URL: https://github.com/apache/airflow/pull/8805#discussion_r425100679 ## File path: airflow/models/baseoperator.py ## @@ -1141,7 +1178,7 @@ def set_upstream(self, task_or_task_list: Union['BaseOperator', List['BaseOperat @property def output(self): Review comment: As far as I know you can use `op.output["your key"]` because `XComArg` implements `__getitem__`.
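A `__getitem__` that returns a new reference with a different key is enough to make `op.output["your key"]` work. The class below is a hypothetical simplification, not Airflow's `XComArg`. It identifies the task by `task_id` and resolves values from a plain dict that stands in for the XCom table. `output` itself defaults to the `return_value` key.

```python
from typing import Any, Dict, Tuple

XCOM_RETURN_KEY = "return_value"  # default key referenced by `output`


class XComArgSketch:
    """Hypothetical, simplified reference to an XCom value pushed by a task."""

    def __init__(self, task_id: str, key: str = XCOM_RETURN_KEY) -> None:
        self.task_id = task_id
        self.key = key

    def __getitem__(self, key: str) -> "XComArgSketch":
        # Same task, different XCom key: op.output["your key"].
        return XComArgSketch(self.task_id, key)

    def resolve(self, xcom_store: Dict[Tuple[str, str], Any]) -> Any:
        # Look up the value by (task_id, key) in a dict standing in for the XCom table.
        return xcom_store[(self.task_id, self.key)]
```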
[GitHub] [airflow] turbaszek commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg
turbaszek commented on a change in pull request #8805: URL: https://github.com/apache/airflow/pull/8805#discussion_r425099398 ## File path: airflow/models/baseoperator.py ## @@ -634,6 +634,43 @@ def deps(self) -> Set[BaseTIDep]: NotPreviouslySkippedDep(), } +def set_xcomargs_dependencies(self) -> None: Review comment: No, it only resolves upstream/downstream dependencies. I will take a look at the lineage.
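What a `set_xcomargs_dependencies` pass does can be sketched in plain Python. It walks every template field, including nested dicts and lists, and records each task whose output is referenced as an upstream dependency. `XComRef` and `TaskSketch` are hypothetical stand-ins for `XComArg` and `BaseOperator`, and the sketch only records task ids instead of calling `set_upstream`. The scan sees only references present when it runs. In the example DAG, `task3.op_kwargs` is assigned after construction, which is why the timing debated in this thread matters.

```python
from typing import Any, Set


class XComRef:
    """Hypothetical marker meaning "the output of another task"."""

    def __init__(self, operator: "TaskSketch") -> None:
        self.operator = operator


class TaskSketch:
    """Hypothetical, minimal operator with one templated field."""

    template_fields = ("op_kwargs",)

    def __init__(self, task_id: str, op_kwargs: Any = None) -> None:
        self.task_id = task_id
        self.op_kwargs = op_kwargs or {}
        self.upstream_task_ids: Set[str] = set()

    @property
    def output(self) -> XComRef:
        return XComRef(self)

    def set_xcomargs_dependencies(self) -> None:
        """Record every task referenced in a template field as an upstream task."""

        def walk(value: Any) -> None:
            if isinstance(value, XComRef):
                self.upstream_task_ids.add(value.operator.task_id)
            elif isinstance(value, dict):
                for item in value.values():
                    walk(item)
            elif isinstance(value, (list, tuple, set)):
                for item in value:
                    walk(item)

        for field in self.template_fields:
            walk(getattr(self, field))
```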
[GitHub] [airflow] turbaszek commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg
turbaszek commented on a change in pull request #8805: URL: https://github.com/apache/airflow/pull/8805#discussion_r425099860 ## File path: airflow/providers/google/cloud/operators/automl.py ## @@ -558,7 +558,7 @@ class AutoMLTablesUpdateDatasetOperator(BaseOperator): :type gcp_conn_id: str """ -template_fields = ("dataset", "update_mask", "location", "project_id") +template_fields = ("dataset", "update_mask", "location") Review comment: I will rebase; it was fixed in another PR.
[GitHub] [airflow] turbaszek commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg
turbaszek commented on a change in pull request #8805: URL: https://github.com/apache/airflow/pull/8805#discussion_r425098996 ## File path: airflow/example_dags/example_xcomargs.py ## @@ -0,0 +1,58 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Example DAG demonstrating the usage of the XComArgs.""" + +from airflow import DAG +from airflow.operators.python import PythonOperator +from airflow.utils.dates import days_ago + +args = { +'owner': 'airflow', +'start_date': days_ago(2), +} + + +def dummy(*args, **kwargs): +"""Dummy function""" +return "pass" + + +with DAG( +dag_id='example_xcom_args', +default_args=args, +schedule_interval=None, +tags=['example'] +) as dag: +task1 = PythonOperator( +task_id='task1', +python_callable=dummy, +) + +task2 = PythonOperator( +task_id='task2', +python_callable=dummy, +op_kwargs={"dummy": task1.output}, +) + +task3 = PythonOperator( +task_id='task3', +python_callable=dummy, +) + +task3.op_kwargs = {"dummy": task2.output} Review comment: Yes, I will move it to the test DAGs folder. I need a file because I have to create a DagBag.
[GitHub] [airflow] jeffolsi commented on pull request #8867: Add argument to BaseSensorOperator to control whether to skipping all downstream tasks or not
jeffolsi commented on pull request #8867: URL: https://github.com/apache/airflow/pull/8867#issuecomment-628585641 This behavior needs to be triple-checked. You are assuming that the downstream has only one branch, which doesn't join with other branches. If there is a join, the question is whether the skip should cascade or stop at the join.
[GitHub] [airflow] davido912 edited a comment on issue #8484: Airflow 1.10.7+ suppresses Operator logs
davido912 edited a comment on issue #8484: URL: https://github.com/apache/airflow/issues/8484#issuecomment-628583109 @ewjmulder Did you add this to airflow_local_settings.py? And for airflow.contrib, did you import something there? To be honest, I'm failing to find where and what you did. I would appreciate it if you could be a bit more specific.
[GitHub] [airflow] davido912 commented on issue #8484: Airflow 1.10.7+ suppresses Operator logs
davido912 commented on issue #8484: URL: https://github.com/apache/airflow/issues/8484#issuecomment-628583109 @ewjmulder Did you add this to airflow_local_settings.py? And for airflow.contrib, did you import something there? To be honest, I'm failing to find where and what you did.
[GitHub] [airflow] KimchaC commented on issue #8605: Add Production-ready docker compose for the production image
KimchaC commented on issue #8605: URL: https://github.com/apache/airflow/issues/8605#issuecomment-628575340 Oh, that's super cool. But does that mean you have to rebuild the entire Airflow image? Can you just add the build arg in the docker-compose file and have it propagate through to the published Airflow image?
[GitHub] [airflow] j-y-matsubara opened a new pull request #8867: Add argument to BaseSensorOperator to control whether to skipping all downstream tasks or not
j-y-matsubara opened a new pull request #8867: URL: https://github.com/apache/airflow/pull/8867 This PR prevents all downstream tasks from being forcibly skipped when soft_fail=True is set. Conversely, it also allows skipping all downstream tasks even when soft_fail is not set or is set to False. Related issue: https://github.com/apache/airflow/issues/8696 --- Make sure to mark the boxes below before creating PR: [x] - [x] Description above provides context of the change - [x] Unit tests coverage for changes (not needed for documentation changes) - [x] Target Github ISSUE in description if exists - [x] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)" - [ ] Relevant documentation is updated including usage instructions. - [x] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example). --- In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed. In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x). In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md). Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information.
[GitHub] [airflow] ashb commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg
ashb commented on a change in pull request #8805: URL: https://github.com/apache/airflow/pull/8805#discussion_r425012045 ## File path: airflow/models/baseoperator.py ## @@ -1141,7 +1178,7 @@ def set_upstream(self, task_or_task_list: Union['BaseOperator', List['BaseOperat @property def output(self): Review comment: What about XCom values with keys other than the default `return_value`?
[GitHub] [airflow] ashb commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg
ashb commented on a change in pull request #8805: URL: https://github.com/apache/airflow/pull/8805#discussion_r425011133 ## File path: airflow/providers/google/cloud/operators/automl.py ## @@ -558,7 +558,7 @@ class AutoMLTablesUpdateDatasetOperator(BaseOperator): :type gcp_conn_id: str """ -template_fields = ("dataset", "update_mask", "location", "project_id") +template_fields = ("dataset", "update_mask", "location") Review comment: Unrelated change? ## File path: airflow/providers/google/marketing_platform/operators/display_video.py ## @@ -565,7 +565,7 @@ class GoogleDisplayVideo360SDFtoGCSOperator(BaseOperator): :type delegate_to: str """ -template_fields = ("operation_name", "bucket_name", "object_name", "body_request") +template_fields = ("operation_name", "bucket_name", "object_name") Review comment: Unrelated change?
[GitHub] [airflow] ashb commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg
ashb commented on a change in pull request #8805: URL: https://github.com/apache/airflow/pull/8805#discussion_r425010809

## File path: airflow/models/dag.py
## @@ -1468,6 +1468,10 @@ def create_dagrun(self,
         :param session: database session
         :type session: sqlalchemy.orm.session.Session
         """
+        # Resolve relationship between task set by XComArgs
+        for task in self.tasks:
+            task.set_xcomargs_dependencies()

Review comment:
This is going to be too late to affect the Graph view of the webserver, isn't it? (This either needs to go much, much earlier, to DAG creation time, so that it is correctly reflected in the serialized representation, or it is not needed here at all.)
[GitHub] [airflow] ashb commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg
ashb commented on a change in pull request #8805: URL: https://github.com/apache/airflow/pull/8805#discussion_r425009815

## File path: airflow/models/baseoperator.py
## @@ -634,6 +634,43 @@ def deps(self) -> Set[BaseTIDep]:
             NotPreviouslySkippedDep(),
         }
 
+    def set_xcomargs_dependencies(self) -> None:

Review comment:
Does this function set lineage/inlets/outlets too? If not, please make it do so.
[GitHub] [airflow] ashb commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg
ashb commented on a change in pull request #8805: URL: https://github.com/apache/airflow/pull/8805#discussion_r425009313

## File path: airflow/example_dags/example_xcomargs.py
## @@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Example DAG demonstrating the usage of the XComArgs."""
+
+from airflow import DAG
+from airflow.operators.python import PythonOperator
+from airflow.utils.dates import days_ago
+
+args = {
+    'owner': 'airflow',
+    'start_date': days_ago(2),
+}
+
+
+def dummy(*args, **kwargs):
+    """Dummy function"""
+    return "pass"
+
+
+with DAG(
+    dag_id='example_xcom_args',
+    default_args=args,
+    schedule_interval=None,
+    tags=['example']
+) as dag:
+    task1 = PythonOperator(
+        task_id='task1',
+        python_callable=dummy,
+    )
+
+    task2 = PythonOperator(
+        task_id='task2',
+        python_callable=dummy,
+        op_kwargs={"dummy": task1.output},
+    )
+
+    task3 = PythonOperator(
+        task_id='task3',
+        python_callable=dummy,
+    )
+
+    task3.op_kwargs = {"dummy": task2.output}

Review comment:
Is this set afterwards to check that it works? If so, we should have that in a test, but not in an example DAG: we don't want to encourage users to do this, as they often copy what our example DAGs do.
[GitHub] [airflow] ashb commented on a change in pull request #8805: Resolve upstream tasks when template field is XComArg
ashb commented on a change in pull request #8805: URL: https://github.com/apache/airflow/pull/8805#discussion_r425008181

## File path: airflow/models/baseoperator.py
## @@ -634,6 +650,43 @@ def deps(self) -> Set[BaseTIDep]:
             NotPreviouslySkippedDep(),
         }
 
+    def _set_xcomargs_dependencies(self) -> None:
+        """
+        Resolves upstream dependencies of a task. In this way passing an ``XComArg``
+        as value for a template field will result in creating upstream relation between
+        two tasks.
+
+        **Example**: ::
+
+            with DAG(...):
+                generate_content = GenerateContentOperator(task_id="generate_content")
+                send_email = EmailOperator(..., html_content=generate_content.output)
+
+            # This is equivalent to
+            with DAG(...):
+                generate_content = GenerateContentOperator(task_id="generate_content")
+                send_email = EmailOperator(
+                    ..., html_content="{{ task_instance.xcom_pull('generate_content') }}"
+                )
+                generate_content >> send_email
+
+        """
+        from airflow.models.xcom_arg import XComArg
+
+        def apply_set_upstream(arg: Any):
+            if isinstance(arg, XComArg):
+                self.set_upstream(arg.operator)
+            elif isinstance(arg, (tuple, set, list)):
+                for elem in arg:
+                    apply_set_upstream(elem)
+            elif isinstance(arg, dict):

Review comment:
https://github.com/apache/airflow/commit/d567f9ab8d5fdd6d18509fd8d623b26795eca25c

Example of such a class from the docs/changelog:

```python
class MyDataReader:
    template_fields = ['path']

    def __init__(self, my_path):
        self.path = my_path

    # [additional code here...]


t = PythonOperator(
    task_id='transform_data',
    python_callable=transform_data,
    op_args=[
        MyDataReader('/tmp/{{ ds }}/my_file')
    ],
    dag=dag)
```
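The example class declares its own `template_fields`, so an `XComArg` could end up nested inside such an object rather than directly inside a list, tuple, set or dict. A minimal, self-contained sketch of a recursive walker that also descends into those objects; `ToyOperator`, `ToyXComArg` and this module-level `apply_set_upstream` are hypothetical stand-ins rather than Airflow classes, and the cycle guard is an added assumption:

```python
from typing import Any, Optional, Set


class ToyOperator:
    """Hypothetical minimal operator that only records its upstream task ids."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id
        self.upstream_task_ids: Set[str] = set()

    def set_upstream(self, other: "ToyOperator") -> None:
        self.upstream_task_ids.add(other.task_id)


class ToyXComArg:
    """Hypothetical stand-in for XComArg: remembers which operator produces the value."""

    def __init__(self, operator: ToyOperator) -> None:
        self.operator = operator


def apply_set_upstream(task: ToyOperator, arg: Any, seen: Optional[Set[int]] = None) -> None:
    """Walk `arg` recursively and add an upstream edge for every ToyXComArg found."""
    if seen is None:
        seen = set()
    if id(arg) in seen:  # guard against reference cycles between nested objects
        return
    seen.add(id(arg))

    if isinstance(arg, ToyXComArg):
        task.set_upstream(arg.operator)
    elif isinstance(arg, (tuple, set, list)):
        for elem in arg:
            apply_set_upstream(task, elem, seen)
    elif isinstance(arg, dict):
        for value in arg.values():
            apply_set_upstream(task, value, seen)
    elif hasattr(arg, "template_fields"):
        # Custom objects opt in to templating by declaring template_fields,
        # so descend into exactly those attributes.
        for field in arg.template_fields:
            apply_set_upstream(task, getattr(arg, field, None), seen)


class MyDataReader:
    template_fields = ["path"]

    def __init__(self, my_path: Any) -> None:
        self.path = my_path


extract = ToyOperator("extract")
config = ToyOperator("config")
transform = ToyOperator("transform")

op_args = [MyDataReader(ToyXComArg(extract))]
op_kwargs = {"settings": ToyXComArg(config)}
apply_set_upstream(transform, [op_args, op_kwargs])
print(sorted(transform.upstream_task_ids))  # ['config', 'extract']
```

Descending only into the attributes listed in `template_fields` mirrors what templating itself would touch, so dependencies are created for exactly the values that would be rendered.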
[GitHub] [airflow] potiuk opened a new issue #8866: Additional extras
potiuk opened a new issue #8866: URL: https://github.com/apache/airflow/issues/8866

**Description**

It would be great to be able to pass an "ADDITIONAL_EXTRAS" build arg that adds extras to the "base" extras of the production Docker image.

**Use case / motivation**

Someone who wants to add just one extra should be able to specify only that extra, rather than copy the whole list.

**Related Issues**

It's been discussed in https://github.com/apache/airflow/issues/8605#issuecomment-628520489
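The requested behaviour amounts to appending one comma-separated list to the default one. A sketch of that merge in Python, assuming duplicates should be dropped and the original order kept; `merge_extras` is a hypothetical helper, the base list is the default `AIRFLOW_EXTRAS` value quoted in issue #8605, and a real implementation would more likely live in the Dockerfile or build scripts:

```python
from typing import List

# Default AIRFLOW_EXTRAS value from the production Dockerfile, as quoted in issue #8605.
BASE_EXTRAS = (
    "async,aws,azure,celery,dask,elasticsearch,gcp,kubernetes,"
    "mysql,postgres,redis,slack,ssh,statsd,virtualenv"
)


def merge_extras(base: str, additional: str = "") -> str:
    """Append `additional` extras to `base`, skipping blanks and duplicates, keeping order."""
    merged: List[str] = []
    for extra in f"{base},{additional}".split(","):
        extra = extra.strip()
        if extra and extra not in merged:
            merged.append(extra)
    return ",".join(merged)


print(merge_extras(BASE_EXTRAS, "hdfs"))
```

With an empty `additional` value the base list comes back unchanged, so such an argument could default to empty without affecting existing builds.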
[GitHub] [airflow] potiuk commented on issue #8605: Add Production-ready docker compose for the production image
potiuk commented on issue #8605: URL: https://github.com/apache/airflow/issues/8605#issuecomment-628520489

I think the preferred way is to set the AIRFLOW_EXTRAS variable properly and pass it as a --build-arg. The extras are defined like this in the Dockerfile:

```
ARG AIRFLOW_EXTRAS="async,aws,azure,celery,dask,elasticsearch,gcp,kubernetes,mysql,postgres,redis,slack,ssh,statsd,virtualenv"
```

and when building the image you can set them with `--build-arg AIRFLOW_EXTRAS=""`.

I think it may be worth having "additional extras" and appending them, though.
[GitHub] [airflow] ewjmulder edited a comment on issue #8484: Airflow 1.10.7+ suppresses Operator logs
ewjmulder edited a comment on issue #8484: URL: https://github.com/apache/airflow/issues/8484#issuecomment-628512253

We experienced a similar issue after upgrading from 1.10.3 to 1.10.9: task logging was no longer visible in the UI. In our case it affected the KubernetesPodOperator specifically, but not, for instance, the BashOperator. We fixed it by adding a logger for the airflow.contrib package (in the main Airflow logging configuration file, see the snippet below), since the KubernetesPodOperator lives there. So if the operator whose logs you are missing is also from the contrib package, you can try this fix to see if the logs reappear. It probably also works for other operators, as long as you add the package they live in as an extra logger. My guess is that the combination of forking instead of starting a whole new process, plus having no specific logger for the package, triggered this particular issue.

Added log config snippet:

```
'airflow.contrib': {
    'handlers': bucket_handler,
    'level': LOG_LEVEL,
    'propagate': True,
}
```
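The fix relies on the standard Python logging hierarchy: a record emitted by a logger such as `airflow.contrib.operators.kubernetes_pod_operator` is passed to the handlers attached to its ancestor `airflow.contrib`. A self-contained sketch using `logging.config.dictConfig`; the in-memory `StreamHandler` named `task` is an assumption standing in for the comment's `bucket_handler` (whose definition is not shown), and the module name is illustrative:

```python
import io
import logging
import logging.config

LOG_LEVEL = "INFO"  # assumption: mirrors the LOG_LEVEL used in the Airflow config
captured = io.StringIO()  # in-memory stream so the result can be inspected

LOGGING_CONFIG = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {"plain": {"format": "%(name)s %(levelname)s %(message)s"}},
    "handlers": {
        # Hypothetical handler standing in for the comment's bucket_handler.
        "task": {"class": "logging.StreamHandler", "formatter": "plain", "stream": captured},
    },
    "loggers": {
        # Extra logger covering the whole contrib package.
        "airflow.contrib": {"handlers": ["task"], "level": LOG_LEVEL, "propagate": True},
    },
}
logging.config.dictConfig(LOGGING_CONFIG)

# A logger deeper in the package inherits the handler and level from airflow.contrib.
logging.getLogger("airflow.contrib.operators.kubernetes_pod_operator").info("pod started")
print(captured.getvalue(), end="")
# airflow.contrib.operators.kubernetes_pod_operator INFO pod started
```

Because child loggers need no configuration of their own, a single entry per package covers every operator module inside it.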