[jira] [Commented] (AIRFLOW-3239) Test discovery partial fails due to incorrect name of the test files
[ https://issues.apache.org/jira/browse/AIRFLOW-3239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16658088#comment-16658088 ] ASF GitHub Bot commented on AIRFLOW-3239: - XD-DENG opened a new pull request #4074: [AIRFLOW-3239] Fix test recovery further URL: https://github.com/apache/incubator-airflow/pull/4074 ### Jira - https://issues.apache.org/jira/browse/AIRFLOW-3239 ### Description Prepend "test_" for - tests/executors/dask_executor.py - tests/security/kerberos.py ### Tests Have tested in my own branch and confirmed "re-enabling" these tests doesn't cause issue/break anything. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Test discovery partial fails due to incorrect name of the test files > > > Key: AIRFLOW-3239 > URL: https://issues.apache.org/jira/browse/AIRFLOW-3239 > Project: Apache Airflow > Issue Type: Bug > Components: tests >Reporter: Xiaodong DENG >Assignee: Xiaodong DENG >Priority: Major > > In PR [https://github.com/apache/incubator-airflow/pull/4049,] I have fixed > the incorrect name of some test files (resulting in partial failure in test > discovery). > There are some other scripts with this issue. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
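The rename matters because test runners collect files by name pattern. A minimal sketch (assuming the default `test_*.py` discovery pattern used by nose/pytest; the file list below mirrors the two files named in the PR plus one already-correct name):

```python
import fnmatch

# Hypothetical file list: the two files the PR renames, plus one file that
# already follows the convention. Only names matching "test_*.py" are
# collected by default discovery, so the first two are silently skipped.
files = [
    "tests/executors/dask_executor.py",
    "tests/security/kerberos.py",
    "tests/executors/test_dask_executor.py",
]
discovered = [f for f in files
              if fnmatch.fnmatch(f.rsplit("/", 1)[-1], "test_*.py")]
print(discovered)  # only the "test_"-prefixed file is collected
```

Prepending "test_" to the other two filenames brings them back into the discovered set.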
[GitHub] XD-DENG opened a new pull request #4074: [AIRFLOW-3239] Fix test recovery further
XD-DENG opened a new pull request #4074: [AIRFLOW-3239] Fix test recovery further URL: https://github.com/apache/incubator-airflow/pull/4074 ### Jira - https://issues.apache.org/jira/browse/AIRFLOW-3239 ### Description Prepend "test_" for - tests/executors/dask_executor.py - tests/security/kerberos.py ### Tests Have tested in my own branch and confirmed "re-enabling" these tests doesn't cause issue/break anything.
[jira] [Created] (AIRFLOW-3239) Test discovery partial fails due to incorrect name of the test files
Xiaodong DENG created AIRFLOW-3239: -- Summary: Test discovery partial fails due to incorrect name of the test files Key: AIRFLOW-3239 URL: https://issues.apache.org/jira/browse/AIRFLOW-3239 Project: Apache Airflow Issue Type: Bug Components: tests Reporter: Xiaodong DENG Assignee: Xiaodong DENG In PR [https://github.com/apache/incubator-airflow/pull/4049,] I have fixed the incorrect name of some test files (resulting in partial failure in test discovery). There are some other scripts with this issue.
[GitHub] codecov-io edited a comment on issue #3475: [AIRFLOW-2315] Improve S3Hook
codecov-io edited a comment on issue #3475: [AIRFLOW-2315] Improve S3Hook URL: https://github.com/apache/incubator-airflow/pull/3475#issuecomment-413254466

# [Codecov](https://codecov.io/gh/apache/incubator-airflow/pull/3475?src=pr=h1) Report
> Merging [#3475](https://codecov.io/gh/apache/incubator-airflow/pull/3475?src=pr=desc) into [master](https://codecov.io/gh/apache/incubator-airflow/commit/b8be322d3badfeadfa8f08e0bf92a12a6cd26418?src=pr=desc) will **increase** coverage by `1.89%`.
> The diff coverage is `93.75%`.

[![Impacted file tree graph](https://codecov.io/gh/apache/incubator-airflow/pull/3475/graphs/tree.svg?width=650=WdLKlKHOAU=150=pr)](https://codecov.io/gh/apache/incubator-airflow/pull/3475?src=pr=tree)

```diff
@@            Coverage Diff             @@
##           master    #3475      +/-   ##
==========================================
+ Coverage   75.79%   77.69%    +1.89%
==========================================
  Files         199      204        +5
  Lines       15946    15850       -96
==========================================
+ Hits        12086    12314      +228
+ Misses       3860     3536      -324
```

| [Impacted Files](https://codecov.io/gh/apache/incubator-airflow/pull/3475?src=pr=tree) | Coverage Δ | |
|---|---|---|
| [airflow/hooks/S3\_hook.py](https://codecov.io/gh/apache/incubator-airflow/pull/3475/diff?src=pr=tree#diff-YWlyZmxvdy9ob29rcy9TM19ob29rLnB5) | `96.77% <93.75%> (+2.44%)` | :arrow_up: |
| [airflow/sensors/s3\_key\_sensor.py](https://codecov.io/gh/apache/incubator-airflow/pull/3475/diff?src=pr=tree#diff-YWlyZmxvdy9zZW5zb3JzL3MzX2tleV9zZW5zb3IucHk=) | `31.03% <0%> (-68.97%)` | :arrow_down: |
| [airflow/sensors/s3\_prefix\_sensor.py](https://codecov.io/gh/apache/incubator-airflow/pull/3475/diff?src=pr=tree#diff-YWlyZmxvdy9zZW5zb3JzL3MzX3ByZWZpeF9zZW5zb3IucHk=) | `41.17% <0%> (-58.83%)` | :arrow_down: |
| [airflow/utils/helpers.py](https://codecov.io/gh/apache/incubator-airflow/pull/3475/diff?src=pr=tree#diff-YWlyZmxvdy91dGlscy9oZWxwZXJzLnB5) | `71.34% <0%> (-13.04%)` | :arrow_down: |
| [airflow/hooks/mysql\_hook.py](https://codecov.io/gh/apache/incubator-airflow/pull/3475/diff?src=pr=tree#diff-YWlyZmxvdy9ob29rcy9teXNxbF9ob29rLnB5) | `78% <0%> (-12%)` | :arrow_down: |
| [airflow/sensors/sql\_sensor.py](https://codecov.io/gh/apache/incubator-airflow/pull/3475/diff?src=pr=tree#diff-YWlyZmxvdy9zZW5zb3JzL3NxbF9zZW5zb3IucHk=) | `90.47% <0%> (-9.53%)` | :arrow_down: |
| [airflow/configuration.py](https://codecov.io/gh/apache/incubator-airflow/pull/3475/diff?src=pr=tree#diff-YWlyZmxvdy9jb25maWd1cmF0aW9uLnB5) | `84.07% <0%> (-5.19%)` | :arrow_down: |
| [airflow/utils/state.py](https://codecov.io/gh/apache/incubator-airflow/pull/3475/diff?src=pr=tree#diff-YWlyZmxvdy91dGlscy9zdGF0ZS5weQ==) | `93.33% <0%> (-3.34%)` | :arrow_down: |
| [airflow/models.py](https://codecov.io/gh/apache/incubator-airflow/pull/3475/diff?src=pr=tree#diff-YWlyZmxvdy9tb2RlbHMucHk=) | `88.74% <0%> (-2.97%)` | :arrow_down: |
| [airflow/utils/email.py](https://codecov.io/gh/apache/incubator-airflow/pull/3475/diff?src=pr=tree#diff-YWlyZmxvdy91dGlscy9lbWFpbC5weQ==) | `97.4% <0%> (-2.6%)` | :arrow_down: |
| ... and [61 more](https://codecov.io/gh/apache/incubator-airflow/pull/3475/diff?src=pr=tree-more) | | |

[Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-airflow/pull/3475?src=pr=continue).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
> `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/incubator-airflow/pull/3475?src=pr=footer). Last update [b8be322...5d4960d](https://codecov.io/gh/apache/incubator-airflow/pull/3475?src=pr=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
[jira] [Commented] (AIRFLOW-3238) Dags, removed from the filesystem, are not deactivated on initdb
[ https://issues.apache.org/jira/browse/AIRFLOW-3238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16658073#comment-16658073 ] ASF GitHub Bot commented on AIRFLOW-3238: - jason-udacity opened a new pull request #4073: [AIRFLOW-3238] Fix models.DAG.deactivate_unknown_dags URL: https://github.com/apache/incubator-airflow/pull/4073 Unknown dags are now deactivated on initdb Make sure you have checked _all_ steps below. ### Jira - [ ] My PR addresses the following [Airflow Jira](https://issues.apache.org/jira/browse/AIRFLOW/) issues and references them in the PR title. For example, "\[AIRFLOW-XXX\] My Airflow PR" - https://issues.apache.org/jira/browse/AIRFLOW-XXX - In case you are fixing a typo in the documentation you can prepend your commit with \[AIRFLOW-XXX\], code changes always need a Jira issue. ### Description - [ ] Here are some details about my PR, including screenshots of any UI changes: ### Tests - [ ] My PR adds the following unit tests __OR__ does not need testing for this extremely good reason: ### Commits - [ ] My commits all reference Jira issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)": 1. Subject is separated from body by a blank line 1. Subject is limited to 50 characters (not including Jira issue reference) 1. Subject does not end with a period 1. Subject uses the imperative mood ("add", not "adding") 1. Body wraps at 72 characters 1. Body explains "what" and "why", not "how" ### Documentation - [ ] In case of new functionality, my PR adds documentation that describes how to use it. - When adding new operators/hooks/sensors, the autoclass documentation generation needs to be added. ### Code Quality - [ ] Passes `flake8` This is an automated message from the Apache Git Service. 
> Dags, removed from the filesystem, are not deactivated on initdb > > Key: AIRFLOW-3238 > URL: https://issues.apache.org/jira/browse/AIRFLOW-3238 > Project: Apache Airflow > Issue Type: Bug > Components: DAG >Reporter: Jason Shao >Assignee: Jason Shao >Priority: Major > > Removed dags continue to show up in the airflow UI. This can only be > remedied, currently, by either deleting the dag or modifying the internal > meta db. Fix models.DAG.deactivate_unknown_dags so that removed dags are > automatically deactivated (hidden from the UI) on restart.
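The intended behaviour reduces to a set difference between the dag_ids the metadata DB still marks active and the dag_ids actually found on disk. A simplified sketch (the helper name and example dag_ids below are hypothetical; the real method is `models.DAG.deactivate_unknown_dags`):

```python
def dags_to_deactivate(active_in_db, found_on_disk):
    """Hypothetical helper: dag_ids that are marked active in the meta DB
    but no longer exist in any parsed DAG file should be deactivated
    (and thereby hidden from the UI)."""
    return sorted(set(active_in_db) - set(found_on_disk))

# Example: "old_report" was deleted from the dags folder but is still
# active in the DB, so it is the one to deactivate on initdb.
stale = dags_to_deactivate(
    active_in_db=["etl_daily", "old_report", "etl_hourly"],
    found_on_disk=["etl_daily", "etl_hourly"],
)
print(stale)
```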
[GitHub] jason-udacity opened a new pull request #4073: [AIRFLOW-3238] Fix models.DAG.deactivate_unknown_dags
jason-udacity opened a new pull request #4073: [AIRFLOW-3238] Fix models.DAG.deactivate_unknown_dags URL: https://github.com/apache/incubator-airflow/pull/4073 Unknown dags are now deactivated on initdb Make sure you have checked _all_ steps below. ### Jira - [ ] My PR addresses the following [Airflow Jira](https://issues.apache.org/jira/browse/AIRFLOW/) issues and references them in the PR title. For example, "\[AIRFLOW-XXX\] My Airflow PR" - https://issues.apache.org/jira/browse/AIRFLOW-XXX - In case you are fixing a typo in the documentation you can prepend your commit with \[AIRFLOW-XXX\], code changes always need a Jira issue. ### Description - [ ] Here are some details about my PR, including screenshots of any UI changes: ### Tests - [ ] My PR adds the following unit tests __OR__ does not need testing for this extremely good reason: ### Commits - [ ] My commits all reference Jira issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)": 1. Subject is separated from body by a blank line 1. Subject is limited to 50 characters (not including Jira issue reference) 1. Subject does not end with a period 1. Subject uses the imperative mood ("add", not "adding") 1. Body wraps at 72 characters 1. Body explains "what" and "why", not "how" ### Documentation - [ ] In case of new functionality, my PR adds documentation that describes how to use it. - When adding new operators/hooks/sensors, the autoclass documentation generation needs to be added. ### Code Quality - [ ] Passes `flake8`
[jira] [Created] (AIRFLOW-3238) Dags, removed from the filesystem, are not deactivated on initdb
Jason Shao created AIRFLOW-3238: --- Summary: Dags, removed from the filesystem, are not deactivated on initdb Key: AIRFLOW-3238 URL: https://issues.apache.org/jira/browse/AIRFLOW-3238 Project: Apache Airflow Issue Type: Bug Components: DAG Reporter: Jason Shao Assignee: Jason Shao Removed dags continue to show up in the airflow UI. This can only be remedied, currently, by either deleting the dag or modifying the internal meta db. Fix models.DAG.deactivate_unknown_dags so that removed dags are automatically deactivated (hidden from the UI) on restart.
[GitHub] XD-DENG commented on a change in pull request #3828: [AIRFLOW-2993] s3_to_sftp and sftp_to_s3 operators
XD-DENG commented on a change in pull request #3828: [AIRFLOW-2993] s3_to_sftp and sftp_to_s3 operators URL: https://github.com/apache/incubator-airflow/pull/3828#discussion_r226844985

## File path: airflow/contrib/operators/s3_to_sftp_operator.py

## @@ -0,0 +1,84 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.models import BaseOperator
+from airflow.hooks.S3_hook import S3Hook
+from airflow.contrib.hooks.ssh_hook import SSHHook
+from tempfile import NamedTemporaryFile
+from urllib.parse import urlparse
+from airflow.utils.decorators import apply_defaults
+
+
+class S3ToSFTPOperator(BaseOperator):
+"""
+This operator enables the transferring of files from S3 to a SFTP server
+:param sftp_conn_id:The sftp connection id. The name or

Review comment: I just happened to notice it. I am a bit sensitive to this because I kept making the same mistake myself. Running Sphinx may not surface it: no parsing exception is raised explicitly; instead, it silently breaks the rendering of the first parameter. @kaxil kindly helped fix quite a few such cases previously. So committers may need to pay extra attention to docstrings. Will let you know if I find any way to check this in CI later.
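The docstring pitfall the reviewer is pointing at can be illustrated as follows (the function and parameter names are placeholders, not the actual operator): Sphinx expects a blank line between the summary and the first `:param` field, and a space after the field's closing colon. A crude lint of the kind that could run in CI is also sketched below.

```python
import re

def transfer(sftp_conn_id):
    """
    Transfer files from S3 to an SFTP server.

    :param sftp_conn_id: The sftp connection id.
    """
    return sftp_conn_id

# Hypothetical CI-style check: flag any ":param name:" field that is not
# followed by a space (e.g. ":param sftp_conn_id:The sftp connection id"),
# since Sphinx renders such fields incorrectly without raising an error.
bad_fields = [line for line in transfer.__doc__.splitlines()
              if re.match(r"\s*:param [^:]+:\S", line)]
print(bad_fields)  # empty: the docstring above is well-formed
```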
[GitHub] XD-DENG commented on issue #4049: [AIRFLOW-3203] Fix DockerOperator & some operator test
XD-DENG commented on issue #4049: [AIRFLOW-3203] Fix DockerOperator & some operator test URL: https://github.com/apache/incubator-airflow/pull/4049#issuecomment-431630635 Thanks @Fokko
[GitHub] feng-tao commented on issue #3873: [Airflow-2760] Decouple DAG parsing loop from scheduler loop
feng-tao commented on issue #3873: [Airflow-2760] Decouple DAG parsing loop from scheduler loop URL: https://github.com/apache/incubator-airflow/pull/3873#issuecomment-431626151 Sorry @KevinYang21 , I was on vacation last month and didn't go through the whole PR. But I assume @Fokko has gone through. So I am ok from my side.
[GitHub] feng-tao edited a comment on issue #3873: [Airflow-2760] Decouple DAG parsing loop from scheduler loop
feng-tao edited a comment on issue #3873: [Airflow-2760] Decouple DAG parsing loop from scheduler loop URL: https://github.com/apache/incubator-airflow/pull/3873#issuecomment-431626151 Sorry @KevinYang21 , I was on vacation for the last month and didn't go through the whole PR. But I assume @Fokko has gone through. So I am ok from my side.
[jira] [Resolved] (AIRFLOW-2239) current docs refer to incorrect command reset db rather than resetdb found in 1.9.0
[ https://issues.apache.org/jira/browse/AIRFLOW-2239?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kaxil Naik resolved AIRFLOW-2239. - Resolution: Implemented > current docs refer to incorrect command reset db rather than resetdb found in > 1.9.0 > --- > > Key: AIRFLOW-2239 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2239 > Project: Apache Airflow > Issue Type: Bug > Components: Documentation >Reporter: Taylor Miller >Priority: Minor > Labels: documentation > > The docs on [https://airflow.apache.org/cli.html?highlight=reset] need to be > updated to match the default version installed by pip. > h2. Steps to Reproduce > # A new user installing airflow via `pip install airflow` currently gets > version 1.9.0. > # The current documentation on > [https://airflow.apache.org/cli.html?highlight=reset] refers to an older > version of the cli > # If the user tries the command `airflow reset db` it fails because the > command is now `airflow resetdb`
[jira] [Commented] (AIRFLOW-520) Show Airflow version on web page
[ https://issues.apache.org/jira/browse/AIRFLOW-520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16658015#comment-16658015 ] ASF GitHub Bot commented on AIRFLOW-520: kaxil opened a new pull request #4072: [AIRFLOW-520] Fix Version Info in Flask UI URL: https://github.com/apache/incubator-airflow/pull/4072 Make sure you have checked _all_ steps below. ### Jira - [x] My PR addresses the following [Airflow Jira](https://issues.apache.org/jira/browse/AIRFLOW/) issues and references them in the PR title. For example, "\[AIRFLOW-XXX\] My Airflow PR" - https://issues.apache.org/jira/browse/AIRFLOW-XXX - In case you are fixing a typo in the documentation you can prepend your commit with \[AIRFLOW-XXX\], code changes always need a Jira issue. ### Description - [x] Here are some details about my PR, including screenshots of any UI changes: ### Tests - [x] My PR adds the following unit tests __OR__ does not need testing for this extremely good reason: ### Commits - [x] My commits all reference Jira issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)": 1. Subject is separated from body by a blank line 1. Subject is limited to 50 characters (not including Jira issue reference) 1. Subject does not end with a period 1. Subject uses the imperative mood ("add", not "adding") 1. Body wraps at 72 characters 1. Body explains "what" and "why", not "how" ### Documentation - [x] In case of new functionality, my PR adds documentation that describes how to use it. - When adding new operators/hooks/sensors, the autoclass documentation generation needs to be added. ### Code Quality - [x] Passes `flake8` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
> Show Airflow version on web page > > Key: AIRFLOW-520 > URL: https://issues.apache.org/jira/browse/AIRFLOW-520 > Project: Apache Airflow > Issue Type: Improvement >Reporter: Jakob Homan >Priority: Major > > It would be nice if the currently running version were included somewhere on > the web page.
[GitHub] kaxil opened a new pull request #4072: [AIRFLOW-520] Fix Version Info in Flask UI
kaxil opened a new pull request #4072: [AIRFLOW-520] Fix Version Info in Flask UI URL: https://github.com/apache/incubator-airflow/pull/4072 Make sure you have checked _all_ steps below. ### Jira - [x] My PR addresses the following [Airflow Jira](https://issues.apache.org/jira/browse/AIRFLOW/) issues and references them in the PR title. For example, "\[AIRFLOW-XXX\] My Airflow PR" - https://issues.apache.org/jira/browse/AIRFLOW-XXX - In case you are fixing a typo in the documentation you can prepend your commit with \[AIRFLOW-XXX\], code changes always need a Jira issue. ### Description - [x] Here are some details about my PR, including screenshots of any UI changes: ### Tests - [x] My PR adds the following unit tests __OR__ does not need testing for this extremely good reason: ### Commits - [x] My commits all reference Jira issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)": 1. Subject is separated from body by a blank line 1. Subject is limited to 50 characters (not including Jira issue reference) 1. Subject does not end with a period 1. Subject uses the imperative mood ("add", not "adding") 1. Body wraps at 72 characters 1. Body explains "what" and "why", not "how" ### Documentation - [x] In case of new functionality, my PR adds documentation that describes how to use it. - When adding new operators/hooks/sensors, the autoclass documentation generation needs to be added. ### Code Quality - [x] Passes `flake8`
[jira] [Resolved] (AIRFLOW-1228) Fix readthedocs timeout
[ https://issues.apache.org/jira/browse/AIRFLOW-1228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kaxil Naik resolved AIRFLOW-1228. - Resolution: Implemented > Fix readthedocs timeout > --- > > Key: AIRFLOW-1228 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1228 > Project: Apache Airflow > Issue Type: Bug >Reporter: Maxime Beauchemin >Priority: Major > > RTD is a service that builds documentation automatically on the latest master > and allows for serving specific versions of the docs. This will allow us to > have each Apache release have its own documentation.
[GitHub] kaxil commented on a change in pull request #4071: [AIRFLOW-3237] Refactor example DAGs
kaxil commented on a change in pull request #4071: [AIRFLOW-3237] Refactor example DAGs URL: https://github.com/apache/incubator-airflow/pull/4071#discussion_r226841724

## File path: airflow/example_dags/example_branch_operator.py

## @@ -17,43 +17,47 @@
# specific language governing permissions and limitations
# under the License.
-import airflow
-from airflow.operators.python_operator import BranchPythonOperator
-from airflow.operators.dummy_operator import DummyOperator
-from airflow.models import DAG
import random
+import airflow
+from airflow.models import DAG
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.operators.python_operator import BranchPythonOperator
-args = {
-'owner': 'airflow',
-'start_date': airflow.utils.dates.days_ago(2)
-}
+args = {'owner': 'airflow', 'start_date': airflow.utils.dates.days_ago(2)}

Review comment: Same here
[GitHub] kaxil commented on a change in pull request #4071: [AIRFLOW-3237] Refactor example DAGs
kaxil commented on a change in pull request #4071: [AIRFLOW-3237] Refactor example DAGs URL: https://github.com/apache/incubator-airflow/pull/4071#discussion_r226841710

## File path: airflow/example_dags/example_bash_operator.py

## @@ -17,48 +17,54 @@
# specific language governing permissions and limitations
# under the License.
-import airflow
from builtins import range
-from airflow.operators.bash_operator import BashOperator
-from airflow.operators.dummy_operator import DummyOperator
-from airflow.models import DAG
from datetime import timedelta
+import airflow
+from airflow.models import DAG
+from airflow.operators.bash_operator import BashOperator
+from airflow.operators.dummy_operator import DummyOperator
-args = {
-'owner': 'airflow',
-'start_date': airflow.utils.dates.days_ago(2)
-}
+args = {'owner': 'airflow', 'start_date': airflow.utils.dates.days_ago(2)}

Review comment: Don't understand why we need this refactor? It looked better formatted.
[GitHub] stale[bot] commented on issue #2398: [AIRFLOW-450] Remove http example operator
stale[bot] commented on issue #2398: [AIRFLOW-450] Remove http example operator URL: https://github.com/apache/incubator-airflow/pull/2398#issuecomment-431619881 This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.
[jira] [Updated] (AIRFLOW-3237) Refactor example DAGs
[ https://issues.apache.org/jira/browse/AIRFLOW-3237?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bas Harenslak updated AIRFLOW-3237: --- Description: I think it's important to have the example DAGs reflect good programming standards and how to write clean and concise Airflow code. Therefore, refactor the example DAGs, which mostly involves: * Replace set_upstream() and set_downstream() by the bitshift operators, since they exist since Airflow 1.8 and everybody uses those now. * Optimise imports (grouping by stdlib/3rd party/own and alphabetic ordering). * Place function calls with arguments on single line if line is short enough, or put each function argument on a new line, but don't mix. * Passing list of tasks when setting dependencies instead of making multiple set_up/down-stream calls. * Switch upstream dependencies into downstream dependencies since these are more logical to read IMO. was: I think it's important to have the example DAGs reflect good programming standards and how to write clean and concise Airflow code. Therefore, refactor the example DAGs, which mostly involves: * Replace set_upstream() and set_downstream() by the bitshift operators, since they exist since Airflow 1.8 and everybody uses those now. * Optimise imports (grouping by stdlib/3rd party/own and alphabetic ordering). * Place function calls with arguments on single line if line is short enough, or put each function argument on a new line, but don't mix. * Passing list of tasks when setting dependencies instead of making multiple set_up/down-stream calls. > Refactor example DAGs > - > > Key: AIRFLOW-3237 > URL: https://issues.apache.org/jira/browse/AIRFLOW-3237 > Project: Apache Airflow > Issue Type: Improvement >Reporter: Bas Harenslak >Priority: Major > > I think it's important to have the example DAGs reflect good programming > standards and how to write clean and concise Airflow code. 
Therefore, > refactor the example DAGs, which mostly involves: > * Replace set_upstream() and set_downstream() by the bitshift operators, > since they exist since Airflow 1.8 and everybody uses those now. > * Optimise imports (grouping by stdlib/3rd party/own and alphabetic ordering). > * Place function calls with arguments on single line if line is short enough, > or put each function argument on a new line, but don't mix. > * Passing list of tasks when setting dependencies instead of making multiple > set_up/down-stream calls. > * Switch upstream dependencies into downstream dependencies since these are > more logical to read IMO.
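The first and fourth bullets can be sketched together. This is not Airflow itself but a minimal stand-in (the `Task` class and task_ids below are hypothetical) showing the bitshift dependency API the refactor standardises on: `a >> b` replaces `a.set_downstream(b)`, and a list operand wires several tasks in one call.

```python
class Task:
    """Tiny stand-in for an Airflow operator, enough to show `>>`."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def set_downstream(self, other):
        self.downstream.append(other.task_id)

    def __rshift__(self, other):          # a >> b   or   a >> [b, c]
        targets = other if isinstance(other, list) else [other]
        for t in targets:
            self.set_downstream(t)
        return other

run_this = Task("run_this_first")
branches = [Task("branch_a"), Task("branch_b")]
run_this >> branches                      # one call instead of two set_downstream() calls
print(run_this.downstream)
```

Accepting a list on the right-hand side is what lets a DAG author replace several `set_downstream()` calls with a single readable line.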
[GitHub] BasPH opened a new pull request #4071: [AIRFLOW-3237] Refactor example DAGs
BasPH opened a new pull request #4071: [AIRFLOW-3237] Refactor example DAGs URL: https://github.com/apache/incubator-airflow/pull/4071 Make sure you have checked _all_ steps below. ### Jira - [x] My PR addresses the following [Airflow Jira](https://issues.apache.org/jira/browse/AIRFLOW/) issues and references them in the PR title. For example, "\[AIRFLOW-XXX\] My Airflow PR" - https://issues.apache.org/jira/browse/AIRFLOW-XXX - In case you are fixing a typo in the documentation you can prepend your commit with \[AIRFLOW-XXX\], code changes always need a Jira issue. [AIRFLOW-3237](https://issues.apache.org/jira/browse/AIRFLOW-3237) ### Description - [x] Here are some details about my PR, including screenshots of any UI changes: I think it's important to have the example DAGs reflect good programming standards and how to write clean and concise Airflow code. Therefore, refactor the example DAGs, which mostly involves: - Replace set_upstream() and set_downstream() by the bitshift operators, since they exist since Airflow 1.8 and everybody uses those now. - Optimise imports (grouping by stdlib/3rd party/own and alphabetic ordering). - Place function calls with arguments on single line if line is short enough, or put each function argument on a new line, but don't mix. - Passing list of tasks when setting dependencies instead of making multiple set_up/down-stream calls. ### Tests - [x] My PR adds the following unit tests __OR__ does not need testing for this extremely good reason: Only refactored some code, all existing tests should pass. ### Commits - [x] My commits all reference Jira issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)": 1. Subject is separated from body by a blank line 1. Subject is limited to 50 characters (not including Jira issue reference) 1. 
Subject does not end with a period 1. Subject uses the imperative mood ("add", not "adding") 1. Body wraps at 72 characters 1. Body explains "what" and "why", not "how" ### Documentation - [x] In case of new functionality, my PR adds documentation that describes how to use it. - When adding new operators/hooks/sensors, the autoclass documentation generation needs to be added. No new functionality. ### Code Quality - [ ] Passes `flake8`
[jira] [Commented] (AIRFLOW-3237) Refactor example DAGs
[ https://issues.apache.org/jira/browse/AIRFLOW-3237?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16657968#comment-16657968 ] ASF GitHub Bot commented on AIRFLOW-3237: - BasPH opened a new pull request #4071: [AIRFLOW-3237] Refactor example DAGs URL: https://github.com/apache/incubator-airflow/pull/4071 Make sure you have checked _all_ steps below. ### Jira - [x] My PR addresses the following [Airflow Jira](https://issues.apache.org/jira/browse/AIRFLOW/) issues and references them in the PR title. For example, "\[AIRFLOW-XXX\] My Airflow PR" - https://issues.apache.org/jira/browse/AIRFLOW-XXX - In case you are fixing a typo in the documentation you can prepend your commit with \[AIRFLOW-XXX\], code changes always need a Jira issue. [AIRFLOW-3237](https://issues.apache.org/jira/browse/AIRFLOW-3237) ### Description - [x] Here are some details about my PR, including screenshots of any UI changes: I think it's important to have the example DAGs reflect good programming standards and how to write clean and concise Airflow code. Therefore, refactor the example DAGs, which mostly involves: - Replace set_upstream() and set_downstream() by the bitshift operators, since they exist since Airflow 1.8 and everybody uses those now. - Optimise imports (grouping by stdlib/3rd party/own and alphabetic ordering). - Place function calls with arguments on single line if line is short enough, or put each function argument on a new line, but don't mix. - Passing list of tasks when setting dependencies instead of making multiple set_up/down-stream calls. ### Tests - [x] My PR adds the following unit tests __OR__ does not need testing for this extremely good reason: Only refactored some code, all existing tests should pass. ### Commits - [x] My commits all reference Jira issues in their subject lines, and I have squashed multiple commits if they address the same issue. 
In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)": 1. Subject is separated from body by a blank line 1. Subject is limited to 50 characters (not including Jira issue reference) 1. Subject does not end with a period 1. Subject uses the imperative mood ("add", not "adding") 1. Body wraps at 72 characters 1. Body explains "what" and "why", not "how" ### Documentation - [x] In case of new functionality, my PR adds documentation that describes how to use it. - When adding new operators/hooks/sensors, the autoclass documentation generation needs to be added. No new functionality. ### Code Quality - [ ] Passes `flake8` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Refactor example DAGs > - > > Key: AIRFLOW-3237 > URL: https://issues.apache.org/jira/browse/AIRFLOW-3237 > Project: Apache Airflow > Issue Type: Improvement >Reporter: Bas Harenslak >Priority: Major > > I think it's important to have the example DAGs reflect good programming > standards and how to write clean and concise Airflow code. Therefore, > refactor the example DAGs, which mostly involves: > * Replace set_upstream() and set_downstream() by the bitshift operators, > since they exist since Airflow 1.8 and everybody uses those now. > * Optimise imports (grouping by stdlib/3rd party/own and alphabetic ordering). > * Place function calls with arguments on single line if line is short enough, > or put each function argument on a new line, but don't mix. > * Passing list of tasks when setting dependencies instead of making multiple > set_up/down-stream calls. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (AIRFLOW-3237) Refactor example DAGs
Bas Harenslak created AIRFLOW-3237:
--
Summary: Refactor example DAGs
Key: AIRFLOW-3237
URL: https://issues.apache.org/jira/browse/AIRFLOW-3237
Project: Apache Airflow
Issue Type: Improvement
Reporter: Bas Harenslak

I think it's important to have the example DAGs reflect good programming standards and show how to write clean and concise Airflow code. Therefore, refactor the example DAGs, which mostly involves:
* Replace set_upstream() and set_downstream() with the bitshift operators, since they have existed since Airflow 1.8 and everybody uses them now.
* Optimise imports (grouping by stdlib/3rd party/own and alphabetical ordering).
* Place a function call with its arguments on a single line if the line is short enough, or put each argument on its own line, but don't mix the two styles.
* Pass lists of tasks when setting dependencies instead of making multiple set_upstream/set_downstream calls.
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
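The bitshift refactor described above can be sketched without Airflow itself. In the sketch below, `Task` is a hypothetical stand-in for Airflow's `BaseOperator` (not the real class); it only illustrates how `>>` maps onto `set_downstream()` calls and how passing a list replaces multiple calls.

```python
class Task:
    """Hypothetical stand-in for an Airflow operator (not the real
    BaseOperator), illustrating the bitshift dependency syntax."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def set_downstream(self, other):
        # Accept a single task or a list of tasks, as Airflow does.
        tasks = other if isinstance(other, list) else [other]
        self.downstream.extend(tasks)

    def __rshift__(self, other):
        # t1 >> t2 is sugar for t1.set_downstream(t2); returning `other`
        # allows chaining: t1 >> t2 >> t3.
        self.set_downstream(other)
        return other


start, fetch, store = Task("start"), Task("fetch"), Task("store")

# Old style, two separate calls:
#   start.set_downstream(fetch)
#   fetch.set_downstream(store)
# New style, one readable chain:
start >> fetch >> store

# Passing a list sets several dependencies in one call:
cleanup = Task("cleanup")
store >> [Task("report"), cleanup]

print([t.task_id for t in store.downstream])  # -> ['report', 'cleanup']
```

In real DAG files the same chain reads `start >> fetch >> store`, which is why the PR considers it clearer than paired set_upstream/set_downstream calls.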
[jira] [Created] (AIRFLOW-3236) Create AzureDataLakeStorageListOperator
Brandon Kvarda created AIRFLOW-3236: --- Summary: Create AzureDataLakeStorageListOperator Key: AIRFLOW-3236 URL: https://issues.apache.org/jira/browse/AIRFLOW-3236 Project: Apache Airflow Issue Type: Improvement Components: operators Reporter: Brandon Kvarda Assignee: Brandon Kvarda Creates an Operator that is similar to GoogleCloudStorageListOperator and S3ListOperator that returns a list of files at some specified path. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
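The "list operator" pattern referenced above (as in GoogleCloudStorageListOperator and S3ListOperator) boils down to an execute() that returns the hook's listing, which Airflow then pushes to XCom. A rough sketch with hypothetical stand-in classes — `FakeADLHook` and `ListOperatorSketch` are illustrative only, not Airflow or azure-datalake-store code:

```python
import fnmatch


class FakeADLHook:
    """Hypothetical stand-in; a real AzureDataLakeHook would call the
    azure-datalake-store SDK instead of filtering an in-memory list."""

    def __init__(self, files):
        self._files = files

    def list(self, path):
        # Glob-style matching over stored paths.
        return [f for f in self._files if fnmatch.fnmatch(f, path)]


class ListOperatorSketch:
    """Sketch of the shared pattern: execute() returns the listing, so
    Airflow would push it to XCom for downstream tasks to consume."""

    def __init__(self, path, hook):
        self.path = path
        self.hook = hook

    def execute(self, context=None):
        return self.hook.list(self.path)


hook = FakeADLHook(["raw/2018/a.csv", "raw/2018/b.csv", "raw/2017/c.csv"])
op = ListOperatorSketch(path="raw/2018/*.csv", hook=hook)
print(op.execute())  # -> ['raw/2018/a.csv', 'raw/2018/b.csv']
```

The design choice worth noting is that the operator holds no listing logic of its own; everything storage-specific lives in the hook, which is why adding the hook-level list function (AIRFLOW-3235) is a prerequisite for this operator.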
[GitHub] codecov-io commented on issue #4070: [AIRFLOW-3235] Add list function in AzureDataLakeHook
codecov-io commented on issue #4070: [AIRFLOW-3235] Add list function in AzureDataLakeHook URL: https://github.com/apache/incubator-airflow/pull/4070#issuecomment-431608176

# [Codecov](https://codecov.io/gh/apache/incubator-airflow/pull/4070?src=pr=h1) Report
> :exclamation: No coverage uploaded for pull request base (`master@b156151`). [Click here to learn what that means](https://docs.codecov.io/docs/error-reference#section-missing-base-commit).
> The diff coverage is `n/a`.

[![Impacted file tree graph](https://codecov.io/gh/apache/incubator-airflow/pull/4070/graphs/tree.svg?width=650=WdLKlKHOAU=150=pr)](https://codecov.io/gh/apache/incubator-airflow/pull/4070?src=pr=tree)

```diff
@@           Coverage Diff            @@
##             master    #4070   +/-  ##
========================================
  Coverage          ?   77.69%
========================================
  Files             ?      199
  Lines             ?    15957
  Branches          ?        0
========================================
  Hits              ?    12397
  Misses            ?     3560
  Partials          ?        0
```

[Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-airflow/pull/4070?src=pr=continue).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
> `Δ = absolute (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/incubator-airflow/pull/4070?src=pr=footer). Last update [b156151...ac82b22](https://codecov.io/gh/apache/incubator-airflow/pull/4070?src=pr=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (AIRFLOW-3235) Add list function in AzureDataLakeHook
[ https://issues.apache.org/jira/browse/AIRFLOW-3235?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16657940#comment-16657940 ] ASF GitHub Bot commented on AIRFLOW-3235: - bkvarda opened a new pull request #4070: [AIRFLOW-3235] Add list function in AzureDataLakeHook URL: https://github.com/apache/incubator-airflow/pull/4070 Make sure you have checked _all_ steps below. ### Jira - [x] My PR addresses the following [Airflow Jira](https://issues.apache.org/jira/browse/AIRFLOW-3235) issues and references them in the PR title. For example, "\[AIRFLOW-XXX\] My Airflow PR" - https://issues.apache.org/jira/browse/AIRFLOW-3235 - In case you are fixing a typo in the documentation you can prepend your commit with \[AIRFLOW-XXX\], code changes always need a Jira issue. ### Description - This PR adds a list function that returns a list of objects at some specified Azure Data Lake Storage path, something other similar Hooks for S3 and GCS have. It will be useful for Operators developed that leverage this Hook. ### Tests - [x] My PR adds the following unit tests __OR__ does not need testing for this extremely good reason: test_list_glob test_list_walk ### Commits - [x] My commits all reference Jira issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)": 1. Subject is separated from body by a blank line 1. Subject is limited to 50 characters (not including Jira issue reference) 1. Subject does not end with a period 1. Subject uses the imperative mood ("add", not "adding") 1. Body wraps at 72 characters 1. Body explains "what" and "why", not "how" ### Documentation - [x] In case of new functionality, my PR adds documentation that describes how to use it. - When adding new operators/hooks/sensors, the autoclass documentation generation needs to be added. 
### Code Quality - [x] Passes `flake8` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add list function in AzureDataLakeHook > -- > > Key: AIRFLOW-3235 > URL: https://issues.apache.org/jira/browse/AIRFLOW-3235 > Project: Apache Airflow > Issue Type: Improvement > Components: hooks >Reporter: Brandon Kvarda >Assignee: Brandon Kvarda >Priority: Minor > > AzureDataLakeHook could use a list function similar to those of both the GCS > and S3 hooks. This could be the basis for an ADLS list operator (again, which > both GCS and S3 have). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (AIRFLOW-3193) Pin docker requirement version
[ https://issues.apache.org/jira/browse/AIRFLOW-3193?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16657938#comment-16657938 ] ASF GitHub Bot commented on AIRFLOW-3193:
- Fokko closed pull request #4042: [AIRFLOW-3193] Pin docker requirement version
URL: https://github.com/apache/incubator-airflow/pull/4042

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/setup.py b/setup.py
index 76f55ab01b..733e22f3b5 100644
--- a/setup.py
+++ b/setup.py
@@ -175,7 +175,7 @@ def write_version(filename=os.path.join(*['airflow',
     'sphinx-rtd-theme>=0.1.6',
     'Sphinx-PyPI-upload>=0.2.1'
 ]
-docker = ['docker>=2.0.0']
+docker = ['docker>=2.0.0,<3.0.0']
 druid = ['pydruid>=0.4.1']
 elasticsearch = [
     'elasticsearch>=5.0.0,<6.0.0',

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Pin docker requirement version
> --
>
> Key: AIRFLOW-3193
> URL: https://issues.apache.org/jira/browse/AIRFLOW-3193
> Project: Apache Airflow
> Issue Type: Improvement
> Reporter: Guoqiang Ding
> Assignee: Guoqiang Ding
> Priority: Major
>
> The method "create_container" in docker's APIClient has been incompatible since version 3.0.0.
>
> Usage in `_airflow.operators.docker_operator_` is as follows.
>
> {code:java}
> self.container = self.cli.create_container(
>     command=self.get_command(),
>     cpu_shares=cpu_shares,
>     environment=self.environment,
>     host_config=self.cli.create_host_config(
>         binds=self.volumes,
>         network_mode=self.network_mode,
>         shm_size=self.shm_size,
>         dns=self.dns,
>         dns_search=self.dns_search),
>     image=image,
>     mem_limit=self.mem_limit,
>     user=self.user,
>     working_dir=self.working_dir
> )
> {code}
>
> Arguments such as "cpu_shares" and "mem_limit" have been removed. In other words, since version 3.0.0 they should be passed to the `create_host_config` method instead.
>
> {quote}airflow usage code link:
> https://github.com/apache/incubator-airflow/blob/cdbdcae7c0645ac2987360fced43407202716b99/airflow/operators/docker_operator.py#L207
> {quote}
>
> {quote}version 3.0.0 code link:
> https://github.com/docker/docker-py/blob/91bc75cc92f578ae9d659ad7e8ed11a0877b70aa/docker/api/container.py#L206
> {quote}
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
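The incompatibility quoted above can be illustrated with a toy client. `FakeAPIClient` below only mimics the post-3.0 docker-py behaviour described in the issue; it is not the real `docker.APIClient`:

```python
class FakeAPIClient:
    """Toy model of docker-py >= 3.0, where cpu_shares and mem_limit are
    accepted by create_host_config() but no longer by create_container()."""

    def create_host_config(self, **kwargs):
        return dict(kwargs)

    def create_container(self, image, command, host_config=None, **kwargs):
        for moved in ("cpu_shares", "mem_limit"):
            if moved in kwargs:
                raise TypeError(
                    "%s must be passed via create_host_config() in docker>=3.0" % moved)
        return {"image": image, "command": command, "host_config": host_config}


cli = FakeAPIClient()

# Post-3.0 call shape (the one the <3.0.0 pin avoids having to adopt):
container = cli.create_container(
    image="ubuntu:latest",
    command="env",
    host_config=cli.create_host_config(cpu_shares=1024, mem_limit=None),
)
print(container["host_config"])  # -> {'cpu_shares': 1024, 'mem_limit': None}

# Pre-3.0 call shape, as used by DockerOperator at the time, now fails:
try:
    cli.create_container(image="ubuntu:latest", command="env", cpu_shares=1024)
except TypeError as err:
    print(err)
```

Pinning `docker>=2.0.0,<3.0.0` keeps the old call shape working; PR #4049 later updates the operator to the new shape instead.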
[GitHub] Fokko commented on issue #4042: [AIRFLOW-3193] Pin docker requirement version
Fokko commented on issue #4042: [AIRFLOW-3193] Pin docker requirement version URL: https://github.com/apache/incubator-airflow/pull/4042#issuecomment-431605000 @deagon Please check if the error persists with #4049 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Closed] (AIRFLOW-3193) Pin docker requirement version
[ https://issues.apache.org/jira/browse/AIRFLOW-3193?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fokko Driesprong closed AIRFLOW-3193.
- Resolution: Fixed

> Pin docker requirement version
> --
>
> Key: AIRFLOW-3193
> URL: https://issues.apache.org/jira/browse/AIRFLOW-3193
> Project: Apache Airflow
> Issue Type: Improvement
> Reporter: Guoqiang Ding
> Assignee: Guoqiang Ding
> Priority: Major
>
> The method "create_container" in docker's APIClient has been incompatible since version 3.0.0.
>
> Usage in `_airflow.operators.docker_operator_` is as follows.
>
> {code:java}
> self.container = self.cli.create_container(
>     command=self.get_command(),
>     cpu_shares=cpu_shares,
>     environment=self.environment,
>     host_config=self.cli.create_host_config(
>         binds=self.volumes,
>         network_mode=self.network_mode,
>         shm_size=self.shm_size,
>         dns=self.dns,
>         dns_search=self.dns_search),
>     image=image,
>     mem_limit=self.mem_limit,
>     user=self.user,
>     working_dir=self.working_dir
> )
> {code}
>
> Arguments such as "cpu_shares" and "mem_limit" have been removed. In other words, since version 3.0.0 they should be passed to the `create_host_config` method instead.
>
> {quote}airflow usage code link:
> https://github.com/apache/incubator-airflow/blob/cdbdcae7c0645ac2987360fced43407202716b99/airflow/operators/docker_operator.py#L207
> {quote}
>
> {quote}version 3.0.0 code link:
> https://github.com/docker/docker-py/blob/91bc75cc92f578ae9d659ad7e8ed11a0877b70aa/docker/api/container.py#L206
> {quote}
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] Fokko commented on issue #4049: [AIRFLOW-3203] Fix DockerOperator & some operator test
Fokko commented on issue #4049: [AIRFLOW-3203] Fix DockerOperator & some operator test URL: https://github.com/apache/incubator-airflow/pull/4049#issuecomment-431604925 @ashb Cherry-picked it onto the 1.10.1 branch This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (AIRFLOW-3203) Bugs in DockerOperator & Some operator test scripts were named incorrectly
[ https://issues.apache.org/jira/browse/AIRFLOW-3203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16657936#comment-16657936 ] ASF GitHub Bot commented on AIRFLOW-3203:
- Fokko closed pull request #4049: [AIRFLOW-3203] Fix DockerOperator & some operator test
URL: https://github.com/apache/incubator-airflow/pull/4049

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/airflow/operators/docker_operator.py b/airflow/operators/docker_operator.py
index 517199be51..07ad5f5c74 100644
--- a/airflow/operators/docker_operator.py
+++ b/airflow/operators/docker_operator.py
@@ -43,6 +43,7 @@ class DockerOperator(BaseOperator):
         be provided with the parameter ``docker_conn_id``.
     :param image: Docker image from which to create the container.
+        If image tag is omitted, "latest" will be used.
     :type image: str
     :param api_version: Remote API version. Set to ``auto`` to automatically
         detect the server's version.
@@ -62,7 +63,7 @@ class DockerOperator(BaseOperator):
     :type docker_url: str
     :param environment: Environment variables to set in the container. (templated)
     :type environment: dict
-    :param force_pull: Pull the docker image on every run. Default is false.
+    :param force_pull: Pull the docker image on every run. Default is False.
     :type force_pull: bool
     :param mem_limit: Maximum amount of memory the container can use.
         Either a float value, which represents the limit in bytes,
@@ -187,35 +188,28 @@ def execute(self, context):
             tls=tls_config
         )

-        if ':' not in self.image:
-            image = self.image + ':latest'
-        else:
-            image = self.image
-
-        if self.force_pull or len(self.cli.images(name=image)) == 0:
-            self.log.info('Pulling docker image %s', image)
-            for l in self.cli.pull(image, stream=True):
+        if self.force_pull or len(self.cli.images(name=self.image)) == 0:
+            self.log.info('Pulling docker image %s', self.image)
+            for l in self.cli.pull(self.image, stream=True):
                 output = json.loads(l.decode('utf-8'))
                 self.log.info("%s", output['status'])

-        cpu_shares = int(round(self.cpus * 1024))
-
         with TemporaryDirectory(prefix='airflowtmp') as host_tmp_dir:
             self.environment['AIRFLOW_TMP_DIR'] = self.tmp_dir
             self.volumes.append('{0}:{1}'.format(host_tmp_dir, self.tmp_dir))
             self.container = self.cli.create_container(
                 command=self.get_command(),
-                cpu_shares=cpu_shares,
                 environment=self.environment,
                 host_config=self.cli.create_host_config(
                     binds=self.volumes,
                     network_mode=self.network_mode,
                     shm_size=self.shm_size,
                     dns=self.dns,
-                    dns_search=self.dns_search),
-                image=image,
-                mem_limit=self.mem_limit,
+                    dns_search=self.dns_search,
+                    cpu_shares=int(round(self.cpus * 1024)),
+                    mem_limit=self.mem_limit),
+                image=self.image,
                 user=self.user,
                 working_dir=self.working_dir
             )
diff --git a/tests/operators/docker_operator.py b/tests/operators/test_docker_operator.py
similarity index 97%
rename from tests/operators/docker_operator.py
rename to tests/operators/test_docker_operator.py
index ea90c53c28..a7d63e4ebc 100644
--- a/tests/operators/docker_operator.py
+++ b/tests/operators/test_docker_operator.py
@@ -64,20 +64,22 @@ def test_execute(self, client_class_mock, mkdtemp_mock):
         client_class_mock.assert_called_with(base_url='unix://var/run/docker.sock',
                                              tls=None, version='1.19')
-        client_mock.create_container.assert_called_with(command='env', cpu_shares=1024,
+        client_mock.create_container.assert_called_with(command='env',
                                                         environment={
                                                             'AIRFLOW_TMP_DIR': '/tmp/airflow',
                                                             'UNIT': 'TEST'
                                                         },
                                                         host_config=host_config,
                                                         image='ubuntu:latest',
-                                                        mem_limit=None, user=None,
+                                                        user=None,
[jira] [Closed] (AIRFLOW-3203) Bugs in DockerOperator & Some operator test scripts were named incorrectly
[ https://issues.apache.org/jira/browse/AIRFLOW-3203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fokko Driesprong closed AIRFLOW-3203.
- Resolution: Fixed
Fix Version/s: 1.10.1

> Bugs in DockerOperator & Some operator test scripts were named incorrectly
> --
>
> Key: AIRFLOW-3203
> URL: https://issues.apache.org/jira/browse/AIRFLOW-3203
> Project: Apache Airflow
> Issue Type: Bug
> Components: operators, tests
> Affects Versions: 1.10.0
> Reporter: Xiaodong DENG
> Assignee: Xiaodong DENG
> Priority: Critical
> Fix For: 1.10.1
>
> Usage of `cpu_shares` and `mem_limit` is incorrect in DockerOperator, based on the documentation of the Python package "docker".
> In addition, its test was not actually being run due to an incorrect file name. The same applies to some other operator test scripts, which results in test discovery failure.
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
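The discovery failure described above comes from the default test-file pattern: Python's unittest loader (and comparable runners) only collects files matching `test*.py`, so a file named `docker_operator.py` is silently skipped. A minimal self-contained demonstration:

```python
import os
import tempfile
import unittest

tmp = tempfile.mkdtemp()
body = (
    "import unittest\n"
    "class T(unittest.TestCase):\n"
    "    def test_ok(self):\n"
    "        self.assertTrue(True)\n"
)
# The same test case under two names: only the test_-prefixed file is found.
for name in ("docker_operator.py", "test_docker_operator.py"):
    with open(os.path.join(tmp, name), "w") as f:
        f.write(body)

suite = unittest.defaultTestLoader.discover(tmp, pattern="test*.py")
print(suite.countTestCases())  # -> 1 (docker_operator.py is silently skipped)
```

Because skipped files produce no error, such mis-named test modules can sit in a repository for a long time while appearing green, which is exactly what AIRFLOW-3203 and AIRFLOW-3239 fix by renaming the files.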
[GitHub] Fokko commented on issue #4060: [AIRFLOW-3222] Remove bql keyword from BigQueryOperator
Fokko commented on issue #4060: [AIRFLOW-3222] Remove bql keyword from BigQueryOperator URL: https://github.com/apache/incubator-airflow/pull/4060#issuecomment-431604439 Thanks! This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] Fokko removed a comment on issue #4060: [AIRFLOW-3222] Remove bql keyword from BigQueryOperator
Fokko removed a comment on issue #4060: [AIRFLOW-3222] Remove bql keyword from BigQueryOperator URL: https://github.com/apache/incubator-airflow/pull/4060#issuecomment-431604439 Thanks! This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (AIRFLOW-3133) Implement xcom_push flag for contrib's operators
[ https://issues.apache.org/jira/browse/AIRFLOW-3133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16657933#comment-16657933 ] ASF GitHub Bot commented on AIRFLOW-3133:
- Fokko closed pull request #3981: [AIRFLOW-3133] Implement xcom_push flag for contrib's operators
URL: https://github.com/apache/incubator-airflow/pull/3981

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/airflow/contrib/operators/bigquery_get_data.py b/airflow/contrib/operators/bigquery_get_data.py
index f5e6e50f06..f96392945d 100644
--- a/airflow/contrib/operators/bigquery_get_data.py
+++ b/airflow/contrib/operators/bigquery_get_data.py
@@ -66,6 +66,8 @@ class BigQueryGetDataOperator(BaseOperator):
         For this to work, the service account making the request must have domain-wide
         delegation enabled.
     :type delegate_to: str
+    :param xcom_push: return the result which also get set in XCOM
+    :type xcom_push: bool
     """
     template_fields = ('dataset_id', 'table_id', 'max_results')
     ui_color = '#e4f0e8'
@@ -78,6 +80,7 @@ def __init__(self,
                  selected_fields=None,
                  bigquery_conn_id='bigquery_default',
                  delegate_to=None,
+                 xcom_push=True,
                  *args,
                  **kwargs):
         super(BigQueryGetDataOperator, self).__init__(*args, **kwargs)
@@ -87,6 +90,7 @@ def __init__(self,
         self.selected_fields = selected_fields
         self.bigquery_conn_id = bigquery_conn_id
         self.delegate_to = delegate_to
+        self.xcom_push_flag = xcom_push

     def execute(self, context):
         self.log.info('Fetching Data from:')
@@ -113,4 +117,5 @@ def execute(self, context):
                 single_row.append(fields['v'])
             table_data.append(single_row)
-        return table_data
+        if self.xcom_push_flag:
+            return table_data
diff --git a/airflow/contrib/operators/databricks_operator.py b/airflow/contrib/operators/databricks_operator.py
index 3ebc729f78..49b55b4bc8 100644
--- a/airflow/contrib/operators/databricks_operator.py
+++ b/airflow/contrib/operators/databricks_operator.py
@@ -63,11 +63,11 @@ def _handle_databricks_operator_execution(operator, hook, log, context):
     :param operator: Databricks operator being handled
     :param context: Airflow context
     """
-    if operator.do_xcom_push:
+    if operator.xcom_push:
         context['ti'].xcom_push(key=XCOM_RUN_ID_KEY, value=operator.run_id)
     log.info('Run submitted with run_id: %s', operator.run_id)
     run_page_url = hook.get_run_page_url(operator.run_id)
-    if operator.do_xcom_push:
+    if operator.xcom_push:
         context['ti'].xcom_push(key=XCOM_RUN_PAGE_URL_KEY, value=run_page_url)
     log.info('View run status, Spark UI, and logs at %s', run_page_url)
@@ -209,8 +209,8 @@ class DatabricksSubmitRunOperator(BaseOperator):
     :param databricks_retry_delay: Number of seconds to wait between retries (it
         might be a floating point number).
     :type databricks_retry_delay: float
-    :param do_xcom_push: Whether we should push run_id and run_page_url to xcom.
-    :type do_xcom_push: bool
+    :param xcom_push: Whether we should push run_id and run_page_url to xcom.
+    :type xcom_push: bool
     """
     # Used in airflow.models.BaseOperator
     template_fields = ('json',)
@@ -232,7 +232,7 @@ def __init__(
         polling_period_seconds=30,
         databricks_retry_limit=3,
         databricks_retry_delay=1,
-        do_xcom_push=False,
+        xcom_push=False,
         **kwargs):
         """
         Creates a new ``DatabricksSubmitRunOperator``.
@@ -263,7 +263,7 @@ def __init__(
         self.json = _deep_string_coerce(self.json)
         # This variable will be used in case our task gets killed.
         self.run_id = None
-        self.do_xcom_push = do_xcom_push
+        self.xcom_push_flag = xcom_push

     def get_hook(self):
         return DatabricksHook(
@@ -410,8 +410,8 @@ class DatabricksRunNowOperator(BaseOperator):
     :param databricks_retry_limit: Amount of times retry if the Databricks backend is
         unreachable. Its value must be greater than or equal to 1.
     :type databricks_retry_limit: int
-    :param do_xcom_push: Whether we should push run_id and run_page_url to xcom.
-    :type do_xcom_push: bool
+    :param xcom_push: Whether we should push run_id and run_page_url to xcom.
+    :type xcom_push: bool
     """
     # Used in airflow.models.BaseOperator
     template_fields = ('json',)
@@ -430,7 +430,7 @@ def __init__(
         polling_period_seconds=30,
         databricks_retry_limit=3,
[jira] [Closed] (AIRFLOW-3133) Implement xcom_push flag for contrib's operators
[ https://issues.apache.org/jira/browse/AIRFLOW-3133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fokko Driesprong closed AIRFLOW-3133. - Resolution: Won't Fix > Implement xcom_push flag for contrib's operators > - > > Key: AIRFLOW-3133 > URL: https://issues.apache.org/jira/browse/AIRFLOW-3133 > Project: Apache Airflow > Issue Type: Improvement > Components: contrib >Affects Versions: 1.10.0 >Reporter: Minh Quan TRAN >Priority: Minor > > disable `return True` for SSHOperator since there is already a flag to push > to XCOM the stdout, pushing True to XCOM automatically might be too noisy for > XCOM database. > There are other operators that do this as well, and it doesn't provide any > value - it just creates an entry in the XCom table that is redundant - we > already know that the task succeeded. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
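The issue above hinges on a behavior worth spelling out: the return value of an operator's `execute` is pushed to XCom automatically, so a bare `return True` creates a redundant row on every run. A minimal stand-in for that machinery (the `SimpleXCom` and `FetchOperator` names below are illustrative, not Airflow APIs) sketches how a flag can suppress the implicit push:

```python
# Illustration of why returning a value from execute() matters:
# the runner pushes execute()'s return value to XCom automatically,
# so an operator that always returns True fills the XCom table with
# redundant entries. Gating the return behind a flag avoids that.
# SimpleXCom / FetchOperator are stand-ins, not Airflow APIs.

class SimpleXCom:
    """Stand-in for the XCom table: task_id -> pushed value."""
    def __init__(self):
        self.store = {}

    def push(self, task_id, value):
        self.store[task_id] = value


class FetchOperator:
    """Operator that fetches rows and optionally exposes them via XCom."""
    def __init__(self, task_id, rows, enable_xcom_push=True):
        self.task_id = task_id
        self.rows = rows
        self.enable_xcom_push = enable_xcom_push

    def execute(self):
        # Returning None means "push nothing"; returning data means the
        # runner will push it for downstream tasks to pull.
        if self.enable_xcom_push:
            return self.rows
        return None


def run_task(operator, xcom):
    """Mimics the runner: push execute()'s return value if not None."""
    result = operator.execute()
    if result is not None:
        xcom.push(operator.task_id, result)
    return result


xcom = SimpleXCom()
run_task(FetchOperator("noisy", [1, 2, 3], enable_xcom_push=True), xcom)
run_task(FetchOperator("quiet", [4, 5, 6], enable_xcom_push=False), xcom)
print(sorted(xcom.store))  # prints ['noisy']
```

Only the task that opted in leaves a row behind, which is the behavior the issue asks for.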
[GitHub] Fokko commented on issue #3981: [AIRFLOW-3133] Implement xcom_push flag for contrib's operators
Fokko commented on issue #3981: [AIRFLOW-3133] Implement xcom_push flag for contrib's operators URL: https://github.com/apache/incubator-airflow/pull/3981#issuecomment-431604152 @itscaro Thanks for the PR, but let's work on a global implementation as provided in #2092 #4056 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] Fokko closed pull request #3981: [AIRFLOW-3133] Implement xcom_push flag for contrib's operators
Fokko closed pull request #3981: [AIRFLOW-3133] Implement xcom_push flag for contrib's operators URL: https://github.com/apache/incubator-airflow/pull/3981 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/airflow/contrib/operators/bigquery_get_data.py b/airflow/contrib/operators/bigquery_get_data.py index f5e6e50f06..f96392945d 100644 --- a/airflow/contrib/operators/bigquery_get_data.py +++ b/airflow/contrib/operators/bigquery_get_data.py @@ -66,6 +66,8 @@ class BigQueryGetDataOperator(BaseOperator): For this to work, the service account making the request must have domain-wide delegation enabled. :type delegate_to: str +:param xcom_push: return the result which also get set in XCOM +:type xcom_push: bool """ template_fields = ('dataset_id', 'table_id', 'max_results') ui_color = '#e4f0e8' @@ -78,6 +80,7 @@ def __init__(self, selected_fields=None, bigquery_conn_id='bigquery_default', delegate_to=None, + xcom_push=True, *args, **kwargs): super(BigQueryGetDataOperator, self).__init__(*args, **kwargs) @@ -87,6 +90,7 @@ def __init__(self, self.selected_fields = selected_fields self.bigquery_conn_id = bigquery_conn_id self.delegate_to = delegate_to +self.xcom_push_flag = xcom_push def execute(self, context): self.log.info('Fetching Data from:') @@ -113,4 +117,5 @@ def execute(self, context): single_row.append(fields['v']) table_data.append(single_row) -return table_data +if self.xcom_push_flag: +return table_data diff --git a/airflow/contrib/operators/databricks_operator.py b/airflow/contrib/operators/databricks_operator.py index 3ebc729f78..49b55b4bc8 100644 --- a/airflow/contrib/operators/databricks_operator.py +++ b/airflow/contrib/operators/databricks_operator.py @@ -63,11 +63,11 @@ def _handle_databricks_operator_execution(operator, hook, 
log, context): :param operator: Databricks operator being handled :param context: Airflow context """ -if operator.do_xcom_push: +if operator.xcom_push: context['ti'].xcom_push(key=XCOM_RUN_ID_KEY, value=operator.run_id) log.info('Run submitted with run_id: %s', operator.run_id) run_page_url = hook.get_run_page_url(operator.run_id) -if operator.do_xcom_push: +if operator.xcom_push: context['ti'].xcom_push(key=XCOM_RUN_PAGE_URL_KEY, value=run_page_url) log.info('View run status, Spark UI, and logs at %s', run_page_url) @@ -209,8 +209,8 @@ class DatabricksSubmitRunOperator(BaseOperator): :param databricks_retry_delay: Number of seconds to wait between retries (it might be a floating point number). :type databricks_retry_delay: float -:param do_xcom_push: Whether we should push run_id and run_page_url to xcom. -:type do_xcom_push: bool +:param xcom_push: Whether we should push run_id and run_page_url to xcom. +:type xcom_push: bool """ # Used in airflow.models.BaseOperator template_fields = ('json',) @@ -232,7 +232,7 @@ def __init__( polling_period_seconds=30, databricks_retry_limit=3, databricks_retry_delay=1, -do_xcom_push=False, +xcom_push=False, **kwargs): """ Creates a new ``DatabricksSubmitRunOperator``. @@ -263,7 +263,7 @@ def __init__( self.json = _deep_string_coerce(self.json) # This variable will be used in case our task gets killed. self.run_id = None -self.do_xcom_push = do_xcom_push +self.xcom_push_flag = xcom_push def get_hook(self): return DatabricksHook( @@ -410,8 +410,8 @@ class DatabricksRunNowOperator(BaseOperator): :param databricks_retry_limit: Amount of times retry if the Databricks backend is unreachable. Its value must be greater than or equal to 1. :type databricks_retry_limit: int -:param do_xcom_push: Whether we should push run_id and run_page_url to xcom. -:type do_xcom_push: bool +:param xcom_push: Whether we should push run_id and run_page_url to xcom. 
+:type xcom_push: bool """ # Used in airflow.models.BaseOperator template_fields = ('json',) @@ -430,7 +430,7 @@ def __init__( polling_period_seconds=30, databricks_retry_limit=3, databricks_retry_delay=1, -do_xcom_push=False, +xcom_push=False, **kwargs): """ @@ -455,7 +455,7 @@ def __init__( self.json = _deep_string_coerce(self.json) # This variable will be used in
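The Databricks part of the diff gates two explicit key/value pushes in `_handle_databricks_operator_execution` behind one operator flag — a different mechanism from the implicit return-value push. Roughly, with a stand-in task instance (`TaskInstanceStub` is illustrative, not an Airflow API):

```python
# Sketch of the flag-gated explicit pushes from the Databricks diff above.
# Both xcom_push calls are skipped when the flag is off, so a run that
# nothing downstream consumes leaves no rows in the XCom table.
# TaskInstanceStub is an illustrative stand-in, not an Airflow API.

XCOM_RUN_ID_KEY = "run_id"
XCOM_RUN_PAGE_URL_KEY = "run_page_url"


class TaskInstanceStub:
    def __init__(self):
        self.pushed = {}

    def xcom_push(self, key, value):
        self.pushed[key] = value


def handle_execution(ti, run_id, run_page_url, push_xcom):
    # Mirrors the structure of _handle_databricks_operator_execution:
    # each push is individually guarded by the same operator-level flag.
    if push_xcom:
        ti.xcom_push(key=XCOM_RUN_ID_KEY, value=run_id)
        ti.xcom_push(key=XCOM_RUN_PAGE_URL_KEY, value=run_page_url)


ti_on, ti_off = TaskInstanceStub(), TaskInstanceStub()
handle_execution(ti_on, 42, "https://example.com/run/42", push_xcom=True)
handle_execution(ti_off, 43, "https://example.com/run/43", push_xcom=False)
```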
[jira] [Resolved] (AIRFLOW-1867) sendgrid fails on python3 with attachments
[ https://issues.apache.org/jira/browse/AIRFLOW-1867?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fokko Driesprong resolved AIRFLOW-1867. --- Resolution: Fixed Assignee: Scott Kruger > sendgrid fails on python3 with attachments > -- > > Key: AIRFLOW-1867 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1867 > Project: Apache Airflow > Issue Type: Bug >Reporter: Scott Kruger >Assignee: Scott Kruger >Priority: Minor > Fix For: 2.0.0 > > > Sendgrid emails raise an exception on python 3 when attaching files due to > {{base64.b64encode}} returning {{bytes}} rather than {{unicode/string}} (see: > https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/utils/sendgrid.py#L69). > The fix is simple: decode the base64 data to `utf-8`. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
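The Python 3 failure described above comes down to `base64.b64encode` returning `bytes`: interpolating that into a string leaks a literal `b'...'` repr into the mail payload, while `.decode('utf-8')` yields the clean base64 text. A small self-contained demonstration:

```python
import base64

payload = b"attachment bytes"

encoded = base64.b64encode(payload)  # returns bytes on Python 3
assert isinstance(encoded, bytes)

# Naive interpolation leaks the bytes repr into the payload:
wrong = "%s" % encoded               # starts with "b'", not base64 text

# The fix applied in the PR: decode to utf-8 before building the payload.
right = encoded.decode("utf-8")

assert wrong.startswith("b'")
assert base64.b64decode(right) == payload  # clean round trip
```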
[jira] [Updated] (AIRFLOW-1867) sendgrid fails on python3 with attachments
[ https://issues.apache.org/jira/browse/AIRFLOW-1867?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fokko Driesprong updated AIRFLOW-1867: -- Fix Version/s: (was: 1.10.1) 2.0.0 > sendgrid fails on python3 with attachments > -- > > Key: AIRFLOW-1867 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1867 > Project: Apache Airflow > Issue Type: Bug >Reporter: Scott Kruger >Priority: Minor > Fix For: 2.0.0 > > > Sendgrid emails raise an exception on python 3 when attaching files due to > {{base64.b64encode}} returning {{bytes}} rather than {{unicode/string}} (see: > https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/utils/sendgrid.py#L69). > The fix is simple: decode the base64 data to `utf-8`. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (AIRFLOW-1867) sendgrid fails on python3 with attachments
[ https://issues.apache.org/jira/browse/AIRFLOW-1867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16657931#comment-16657931 ] ASF GitHub Bot commented on AIRFLOW-1867: - Fokko closed pull request #2824: [AIRFLOW-1867] Fix sendgrid py3k bug; add sandbox mode URL: https://github.com/apache/incubator-airflow/pull/2824 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/airflow/contrib/utils/sendgrid.py b/airflow/contrib/utils/sendgrid.py index ceb3718ff5..1b932bcb9b 100644 --- a/airflow/contrib/utils/sendgrid.py +++ b/airflow/contrib/utils/sendgrid.py @@ -27,16 +27,16 @@ import os import sendgrid -from sendgrid.helpers.mail import Attachment, Content, Email, Mail, \ -Personalization, CustomArg, Category +from sendgrid.helpers.mail import ( +Attachment, Content, Email, Mail, Personalization, CustomArg, Category, +MailSettings, SandBoxMode) from airflow.utils.email import get_email_address_list from airflow.utils.log.logging_mixin import LoggingMixin -def send_email(to, subject, html_content, files=None, - dryrun=False, cc=None, bcc=None, - mime_subtype='mixed', **kwargs): +def send_email(to, subject, html_content, files=None, dryrun=False, cc=None, + bcc=None, mime_subtype='mixed', sandbox_mode=False, **kwargs): """ Send an email with html content using sendgrid. @@ -50,11 +50,18 @@ def send_email(to, subject, html_content, files=None, SENDGRID_MAIL_FROM={your-mail-from} SENDGRID_API_KEY={your-sendgrid-api-key}. 
""" +if files is None: +files = [] + mail = Mail() from_email = kwargs.get('from_email') or os.environ.get('SENDGRID_MAIL_FROM') from_name = kwargs.get('from_name') or os.environ.get('SENDGRID_MAIL_SENDER') mail.from_email = Email(from_email, from_name) mail.subject = subject +mail.mail_settings = MailSettings() + +if sandbox_mode: +mail.mail_settings.sandbox_mode = SandBoxMode(enable=True) # Add the recipient list of to emails. personalization = Personalization() @@ -84,15 +91,18 @@ def send_email(to, subject, html_content, files=None, mail.add_category(Category(cat)) # Add email attachment. -for fname in files or []: +for fname in files: basename = os.path.basename(fname) + attachment = Attachment() +attachment.type = mimetypes.guess_type(basename)[0] +attachment.filename = basename +attachment.disposition = "attachment" +attachment.content_id = '<{0}>'.format(basename) + with open(fname, "rb") as f: -attachment.content = str(base64.b64encode(f.read()), 'utf-8') -attachment.type = mimetypes.guess_type(basename)[0] -attachment.filename = basename -attachment.disposition = "attachment" -attachment.content_id = '<%s>' % basename +attachment.content = base64.b64encode(f.read()).decode('utf-8') + mail.add_attachment(attachment) _post_sendgrid_mail(mail.get()) @@ -103,8 +113,8 @@ def _post_sendgrid_mail(mail_data): response = sg.client.mail.send.post(request_body=mail_data) # 2xx status code. 
if response.status_code >= 200 and response.status_code < 300: -log.info('Email with subject %s is successfully sent to recipients: %s' % - (mail_data['subject'], mail_data['personalizations'])) +log.info('Email with subject %s is successfully sent to recipients: %s', + mail_data['subject'], mail_data['personalizations']) else: -log.warning('Failed to send out email with subject %s, status code: %s' % -(mail_data['subject'], response.status_code)) +log.warning('Failed to send out email with subject %s, status code: %s', +mail_data['subject'], response.status_code) diff --git a/tests/contrib/utils/test_sendgrid.py b/tests/contrib/utils/test_sendgrid.py index 6710076c2d..d12c597d70 100644 --- a/tests/contrib/utils/test_sendgrid.py +++ b/tests/contrib/utils/test_sendgrid.py @@ -20,6 +20,8 @@ import copy import unittest +import tempfile +import os from airflow.contrib.utils.sendgrid import send_email @@ -41,58 +43,78 @@ def setUp(self): self.cc = ['foo...@foo.com', 'bar...@bar.com'] self.bcc = ['foo-...@foo.com', 'bar-...@bar.com'] self.expected_mail_data = { -'content': [{'type': u'text/html', 'value': 'Foo bar'}], +'content': [{'type': u'text/html', 'value': self.html_content}], 'personalizations': [ {'cc': [{'email': 'foo...@foo.com'}, {'email': 'bar...@bar.com'}],
[GitHub] Fokko commented on issue #2824: [AIRFLOW-1867] Fix sendgrid py3k bug; add sandbox mode
Fokko commented on issue #2824: [AIRFLOW-1867] Fix sendgrid py3k bug; add sandbox mode URL: https://github.com/apache/incubator-airflow/pull/2824#issuecomment-431602999 Thanks @thesquelched This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
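Besides the encoding fix, the merged diff normalizes `files=None` to an empty list once at the top of `send_email` instead of writing `files or []` at each use site. The idiom, shown here with a simplified stand-in function (`collect_attachments` is illustrative, not part of the sendgrid utility):

```python
# Sketch of the files=None normalization from the sendgrid diff above.
# None is the default (never a mutable [] default, which Python shares
# across calls); converting once keeps every later loop simple.

def collect_attachments(files=None):
    if files is None:      # normalize once, at the top
        files = []
    basenames = []
    for fname in files:    # no 'files or []' needed at each use site
        basenames.append(fname.rsplit("/", 1)[-1])
    return basenames


assert collect_attachments() == []
assert collect_attachments(["/tmp/a.txt", "b.csv"]) == ["a.txt", "b.csv"]
```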
[GitHub] Fokko closed pull request #2824: [AIRFLOW-1867] Fix sendgrid py3k bug; add sandbox mode
Fokko closed pull request #2824: [AIRFLOW-1867] Fix sendgrid py3k bug; add sandbox mode URL: https://github.com/apache/incubator-airflow/pull/2824 With regards, Apache Git Services
[GitHub] Fokko commented on issue #3873: [Airflow-2760] Decouple DAG parsing loop from scheduler loop
Fokko commented on issue #3873: [Airflow-2760] Decouple DAG parsing loop from scheduler loop URL: https://github.com/apache/incubator-airflow/pull/3873#issuecomment-431602418 @feng-tao @ashb @XD-DENG Any further comments? Otherwise I'll merge this one. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] Fokko commented on a change in pull request #4068: [AIRFLOW-2310]: Add AWS Glue Job Compatibility to Airflow
Fokko commented on a change in pull request #4068: [AIRFLOW-2310]: Add AWS Glue Job Compatibility to Airflow URL: https://github.com/apache/incubator-airflow/pull/4068#discussion_r226833970 ## File path: airflow/contrib/hooks/aws_glue_job_hook.py ## @@ -0,0 +1,130 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ + +from airflow.exceptions import AirflowException +from airflow.contrib.hooks.aws_hook import AwsHook +import time + + +class AwsGlueJobHook(AwsHook): +""" +Interact with AWS Glue - create job, trigger, crawler + +:param job_name: unique job name per AWS account +:type str +:param desc: job description +:type str +:param region_name: aws region name (example: us-east-1) +:type region_name: str +""" + +def __init__(self, + job_name=None, + desc=None, + aws_conn_id='aws_default', + region_name=None, *args, **kwargs): +self.job_name = job_name +self.desc = desc +self.aws_conn_id = aws_conn_id +self.region_name = region_name +super(AwsGlueJobHook, self).__init__(*args, **kwargs) + +def get_conn(self): +conn = self.get_client_type('glue', self.region_name) +return conn + +def list_jobs(self): +conn = self.get_conn() +return conn.get_jobs() + +def initialize_job(self, script_arguments=None): +""" +Initializes connection with AWS Glue +to run job +:return: +""" +glue_client = self.get_conn() + +try: +job_response = self.get_glue_job() +job_name = job_response['Name'] +job_run = glue_client.start_job_run( +JobName=job_name, +Arguments=script_arguments +) +return self.job_completion(job_name, job_run['JobRunId']) +except Exception as general_error: +raise AirflowException( +'Failed to run aws glue job, error: {error}'.format( +error=str(general_error) +) +) + +def job_completion(self, job_name=None, run_id=None): +""" +:param job_name: +:param run_id: +:return: +""" +glue_client = self.get_conn() +job_status = glue_client.get_job_run( +JobName=job_name, +RunId=run_id, +PredecessorsIncluded=True +) +job_run_state = job_status['JobRun']['JobRunState'] +failed = job_run_state == 'FAILED' +stopped = job_run_state == 'STOPPED' +completed = job_run_state == 'SUCCEEDED' + +while True: +if failed or stopped or completed: +self.log.info("Exiting Job {} Run State: {}" + .format(run_id, job_run_state)) +return {'JobRunState': job_run_state, 'JobRunId': run_id} +else: 
+self.log.info("Polling for AWS Glue Job {} current run state" + .format(job_name)) +time.sleep(6) Review comment: Why 6 seconds? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
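The "Why 6 seconds?" question points at the hard-coded `time.sleep(6)`. One way to structure such a wait (a sketch with a stand-in client, not the boto3 Glue API) makes the interval a parameter and re-fetches the run state on every iteration:

```python
import time

TERMINAL_STATES = {"FAILED", "STOPPED", "SUCCEEDED"}


class FakeGlueClient:
    """Stand-in client that walks through a fixed sequence of states."""
    def __init__(self, states):
        self.states = iter(states)

    def get_job_run_state(self, job_name, run_id):
        return next(self.states)


def wait_for_completion(client, job_name, run_id, poll_interval=6.0):
    """Poll until the run reaches a terminal state.

    poll_interval is a parameter (the default mirrors the 6 s in the
    PR) instead of a hard-coded sleep, and the state is re-fetched on
    every iteration so the loop actually observes progress.
    """
    while True:
        state = client.get_job_run_state(job_name, run_id)
        if state in TERMINAL_STATES:
            return {"JobRunState": state, "JobRunId": run_id}
        time.sleep(poll_interval)


client = FakeGlueClient(["RUNNING", "RUNNING", "SUCCEEDED"])
result = wait_for_completion(client, "demo-job", "run-1", poll_interval=0)
print(result["JobRunState"])  # prints SUCCEEDED
```

Passing `poll_interval=0` in tests keeps the loop fast, which is one practical reason reviewers prefer the interval to be injectable.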
[GitHub] Fokko commented on a change in pull request #4068: [AIRFLOW-2310]: Add AWS Glue Job Compatibility to Airflow
Fokko commented on a change in pull request #4068: [AIRFLOW-2310]: Add AWS Glue Job Compatibility to Airflow URL: https://github.com/apache/incubator-airflow/pull/4068#discussion_r226833946 ## File path: airflow/contrib/hooks/aws_glue_job_hook.py ## @@ -0,0 +1,130 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ + +from airflow.exceptions import AirflowException +from airflow.contrib.hooks.aws_hook import AwsHook +import time + + +class AwsGlueJobHook(AwsHook): +""" +Interact with AWS Glue - create job, trigger, crawler + +:param job_name: unique job name per AWS account +:type str +:param desc: job description +:type str +:param region_name: aws region name (example: us-east-1) +:type region_name: str +""" + +def __init__(self, + job_name=None, + desc=None, + aws_conn_id='aws_default', + region_name=None, *args, **kwargs): +self.job_name = job_name +self.desc = desc +self.aws_conn_id = aws_conn_id +self.region_name = region_name +super(AwsGlueJobHook, self).__init__(*args, **kwargs) + +def get_conn(self): +conn = self.get_client_type('glue', self.region_name) +return conn + +def list_jobs(self): +conn = self.get_conn() +return conn.get_jobs() + +def initialize_job(self, script_arguments=None): +""" +Initializes connection with AWS Glue +to run job +:return: +""" +glue_client = self.get_conn() + +try: +job_response = self.get_glue_job() +job_name = job_response['Name'] +job_run = glue_client.start_job_run( +JobName=job_name, +Arguments=script_arguments +) +return self.job_completion(job_name, job_run['JobRunId']) +except Exception as general_error: Review comment: Please narrow the exception, `Exception` is too broad. Why not propagate the Exception to Airflow? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
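The "too broad exception" remark is about catching only the errors the client call is documented to raise and letting everything else propagate. A sketch of the pattern — `ServiceError` stands in for a client-specific error class (for boto3 that would plausibly be `botocore.exceptions.ClientError`, an assumption here, not taken from the PR):

```python
# Sketch of narrowing an exception handler, per the review comment above.
# ServiceError stands in for a client library's specific error class;
# anything else (e.g. a TypeError from a bug) propagates unchanged.

class ServiceError(Exception):
    """Stand-in for a client-specific error type."""


class AirflowException(Exception):
    """Stand-in for airflow.exceptions.AirflowException."""


def start_job(client_call):
    try:
        return client_call()
    except ServiceError as service_error:
        # Only wrap the error class the client actually raises.
        raise AirflowException(
            "Failed to run job, error: {}".format(service_error)
        )
    # No bare 'except Exception': programming errors surface directly.


def failing_call():
    raise ServiceError("throttled")


def buggy_call():
    raise TypeError("bug")


# A service-level failure is wrapped into an AirflowException...
try:
    start_job(failing_call)
except AirflowException as err:
    wrapped = str(err)

# ...while an unrelated bug is NOT swallowed.
try:
    start_job(buggy_call)
except TypeError:
    propagated = True
```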
[GitHub] Fokko commented on a change in pull request #4068: [AIRFLOW-2310]: Add AWS Glue Job Compatibility to Airflow
Fokko commented on a change in pull request #4068: [AIRFLOW-2310]: Add AWS Glue Job Compatibility to Airflow URL: https://github.com/apache/incubator-airflow/pull/4068#discussion_r226833896 ## File path: airflow/contrib/hooks/aws_glue_job_hook.py ## @@ -0,0 +1,130 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + + +from airflow.exceptions import AirflowException +from airflow.contrib.hooks.aws_hook import AwsHook +import time + + +class AwsGlueJobHook(AwsHook): +""" +Interact with AWS Glue - create job, trigger, crawler + +:param job_name: unique job name per AWS account +:type str +:param desc: job description +:type str +:param region_name: aws region name (example: us-east-1) +:type region_name: str Review comment: Maybe store the `region_name` in the connection as well? This could be stored in the extra's field. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] Fokko commented on a change in pull request #4068: [AIRFLOW-2310]: Add AWS Glue Job Compatibility to Airflow
Fokko commented on a change in pull request #4068: [AIRFLOW-2310]: Add AWS Glue Job Compatibility to Airflow URL: https://github.com/apache/incubator-airflow/pull/4068#discussion_r226833921 ## File path: airflow/contrib/hooks/aws_glue_job_hook.py ## @@ -0,0 +1,130 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ + +from airflow.exceptions import AirflowException +from airflow.contrib.hooks.aws_hook import AwsHook +import time + + +class AwsGlueJobHook(AwsHook): +""" +Interact with AWS Glue - create job, trigger, crawler + +:param job_name: unique job name per AWS account +:type str +:param desc: job description +:type str +:param region_name: aws region name (example: us-east-1) +:type region_name: str +""" + +def __init__(self, + job_name=None, + desc=None, + aws_conn_id='aws_default', + region_name=None, *args, **kwargs): +self.job_name = job_name +self.desc = desc +self.aws_conn_id = aws_conn_id +self.region_name = region_name +super(AwsGlueJobHook, self).__init__(*args, **kwargs) + +def get_conn(self): +conn = self.get_client_type('glue', self.region_name) Review comment: I would change this in ```suggestion return self.get_client_type('glue', self.region_name) ``` Using the `conn` variable does not really add value from my perspective. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] Fokko commented on a change in pull request #4068: [AIRFLOW-2310]: Add AWS Glue Job Compatibility to Airflow
Fokko commented on a change in pull request #4068: [AIRFLOW-2310]: Add AWS Glue Job Compatibility to Airflow URL: https://github.com/apache/incubator-airflow/pull/4068#discussion_r226833975 ## File path: airflow/contrib/hooks/aws_glue_job_hook.py ## @@ -0,0 +1,130 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ + +from airflow.exceptions import AirflowException +from airflow.contrib.hooks.aws_hook import AwsHook +import time + + +class AwsGlueJobHook(AwsHook): +""" +Interact with AWS Glue - create job, trigger, crawler + +:param job_name: unique job name per AWS account +:type str +:param desc: job description +:type str +:param region_name: aws region name (example: us-east-1) +:type region_name: str +""" + +def __init__(self, + job_name=None, + desc=None, + aws_conn_id='aws_default', + region_name=None, *args, **kwargs): +self.job_name = job_name +self.desc = desc +self.aws_conn_id = aws_conn_id +self.region_name = region_name +super(AwsGlueJobHook, self).__init__(*args, **kwargs) + +def get_conn(self): +conn = self.get_client_type('glue', self.region_name) +return conn + +def list_jobs(self): +conn = self.get_conn() +return conn.get_jobs() + +def initialize_job(self, script_arguments=None): +""" +Initializes connection with AWS Glue +to run job +:return: +""" +glue_client = self.get_conn() + +try: +job_response = self.get_glue_job() +job_name = job_response['Name'] +job_run = glue_client.start_job_run( +JobName=job_name, +Arguments=script_arguments +) +return self.job_completion(job_name, job_run['JobRunId']) +except Exception as general_error: +raise AirflowException( +'Failed to run aws glue job, error: {error}'.format( +error=str(general_error) +) +) + +def job_completion(self, job_name=None, run_id=None): +""" +:param job_name: +:param run_id: +:return: +""" +glue_client = self.get_conn() +job_status = glue_client.get_job_run( +JobName=job_name, +RunId=run_id, +PredecessorsIncluded=True +) +job_run_state = job_status['JobRun']['JobRunState'] +failed = job_run_state == 'FAILED' +stopped = job_run_state == 'STOPPED' +completed = job_run_state == 'SUCCEEDED' + +while True: +if failed or stopped or completed: +self.log.info("Exiting Job {} Run State: {}" + .format(run_id, job_run_state)) +return {'JobRunState': job_run_state, 'JobRunId': run_id} +else: 
+self.log.info("Polling for AWS Glue Job {} current run state" + .format(job_name)) +time.sleep(6) + +def get_glue_job(self): +""" +Function should return glue job details +:return: +""" +try: +self.log.info( +"Retrieving AWS Glue Job: {job_name}" +.format(job_name=self.job_name) +) +glue_client = self.get_conn() +get_job_response = glue_client.get_job( +JobName=self.job_name +)['Job'] +self.logger.info("Found AWS Glue Job: {job_name}".format(job_name=self.job_name)) +self.logger.info("Job Creation Time: {}".format(get_job_response['CreatedOn'])) +self.logger.info("Last Job Modification Time: {}".format(get_job_response['LastModifiedOn'])) +return get_job_response +except Exception as general_error: Review comment: Too broad exception This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the
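Fokko's "Too broad exception" comment refers to the `except Exception` around `get_job`. A minimal sketch of the narrower pattern the review asks for, using stdlib stand-ins (`GlueClientError` stands in for the AWS client error such as `botocore.exceptions.ClientError`, and `AirflowException` is simplified to a plain subclass; both names here are illustrative, not Airflow's real classes):

```python
class GlueClientError(Exception):
    """Stand-in for the client error a failed AWS API call would raise."""


class AirflowException(Exception):
    """Simplified stand-in for airflow.exceptions.AirflowException."""


def start_job_run(glue_client, job_name, script_arguments):
    """Start a Glue job run, catching only API failures rather than Exception."""
    try:
        return glue_client.start_job_run(
            JobName=job_name,
            Arguments=script_arguments or {},
        )
    except GlueClientError as client_error:  # narrow: only AWS client errors
        raise AirflowException(
            'Failed to run AWS Glue job {}: {}'.format(job_name, client_error)
        )
```

Catching only the client error means programming mistakes (e.g. a `KeyError` on the response) surface with their real traceback instead of being re-wrapped.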
[GitHub] Fokko commented on a change in pull request #4068: [AIRFLOW-2310]: Add AWS Glue Job Compatibility to Airflow
Fokko commented on a change in pull request #4068: [AIRFLOW-2310]: Add AWS Glue Job Compatibility to Airflow URL: https://github.com/apache/incubator-airflow/pull/4068#discussion_r226833880 ## File path: airflow/contrib/hooks/aws_glue_job_hook.py ## @@ -0,0 +1,130 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + + +from airflow.exceptions import AirflowException +from airflow.contrib.hooks.aws_hook import AwsHook +import time + + +class AwsGlueJobHook(AwsHook): +""" +Interact with AWS Glue - create job, trigger, crawler + +:param job_name: unique job name per AWS account +:type str +:param desc: job description +:type str +:param region_name: aws region name (example: us-east-1) Review comment: Missing `aws_conn_id` in the docstring. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
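The comment above notes that `aws_conn_id` is missing from the hook's docstring. A hedged sketch of the docstring with that entry added (the wording and the `:type` lines naming each parameter are illustrative, not the merged text):

```python
class AwsGlueJobHookDoc:
    """
    Interact with AWS Glue - create job, trigger, crawler.

    :param job_name: unique job name per AWS account
    :type job_name: str
    :param desc: job description
    :type desc: str
    :param aws_conn_id: Airflow connection id holding the AWS credentials
    :type aws_conn_id: str
    :param region_name: aws region name (example: us-east-1)
    :type region_name: str
    """
```

Note that each `:type` field names its parameter (`:type job_name: str`), unlike the bare `:type str` lines in the diff, which Sphinx cannot attach to a parameter.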
[GitHub] Fokko commented on a change in pull request #4068: [AIRFLOW-2310]: Add AWS Glue Job Compatibility to Airflow
Fokko commented on a change in pull request #4068: [AIRFLOW-2310]: Add AWS Glue Job Compatibility to Airflow URL: https://github.com/apache/incubator-airflow/pull/4068#discussion_r226833966 ## File path: airflow/contrib/hooks/aws_glue_job_hook.py ## @@ -0,0 +1,130 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ + +from airflow.exceptions import AirflowException +from airflow.contrib.hooks.aws_hook import AwsHook +import time + + +class AwsGlueJobHook(AwsHook): +""" +Interact with AWS Glue - create job, trigger, crawler + +:param job_name: unique job name per AWS account +:type str +:param desc: job description +:type str +:param region_name: aws region name (example: us-east-1) +:type region_name: str +""" + +def __init__(self, + job_name=None, + desc=None, + aws_conn_id='aws_default', + region_name=None, *args, **kwargs): +self.job_name = job_name +self.desc = desc +self.aws_conn_id = aws_conn_id +self.region_name = region_name +super(AwsGlueJobHook, self).__init__(*args, **kwargs) + +def get_conn(self): +conn = self.get_client_type('glue', self.region_name) +return conn + +def list_jobs(self): +conn = self.get_conn() +return conn.get_jobs() + +def initialize_job(self, script_arguments=None): +""" +Initializes connection with AWS Glue +to run job +:return: +""" +glue_client = self.get_conn() + +try: +job_response = self.get_glue_job() +job_name = job_response['Name'] +job_run = glue_client.start_job_run( +JobName=job_name, +Arguments=script_arguments +) +return self.job_completion(job_name, job_run['JobRunId']) +except Exception as general_error: +raise AirflowException( +'Failed to run aws glue job, error: {error}'.format( +error=str(general_error) +) +) + +def job_completion(self, job_name=None, run_id=None): +""" +:param job_name: +:param run_id: +:return: +""" +glue_client = self.get_conn() +job_status = glue_client.get_job_run( +JobName=job_name, +RunId=run_id, +PredecessorsIncluded=True +) +job_run_state = job_status['JobRun']['JobRunState'] +failed = job_run_state == 'FAILED' +stopped = job_run_state == 'STOPPED' +completed = job_run_state == 'SUCCEEDED' + +while True: +if failed or stopped or completed: Review comment: ```python if job_run_state in {'FAILED', 'STOPPED', 'SUCCEEDED'}: ... ``` is much more readable than the construction above I'd say. 
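Fokko's suggested membership test can be combined with re-fetching the run state inside the loop (in the diff above, the `failed`/`stopped`/`completed` flags are computed once before `while True`, so the loop never observes a state change). A hedged sketch of the loop along those lines, with the client passed in for clarity:

```python
import time

# Terminal Glue job run states, per the states checked in the diff above.
TERMINAL_STATES = {'FAILED', 'STOPPED', 'SUCCEEDED'}


def job_completion(glue_client, job_name, run_id, poll_interval=6):
    """Poll a Glue job run until it reaches a terminal state."""
    while True:
        job_status = glue_client.get_job_run(
            JobName=job_name,
            RunId=run_id,
            PredecessorsIncluded=True,
        )
        job_run_state = job_status['JobRun']['JobRunState']
        if job_run_state in TERMINAL_STATES:
            return {'JobRunState': job_run_state, 'JobRunId': run_id}
        time.sleep(poll_interval)
```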
[GitHub] Fokko commented on a change in pull request #4068: [AIRFLOW-2310]: Add AWS Glue Job Compatibility to Airflow
Fokko commented on a change in pull request #4068: [AIRFLOW-2310]: Add AWS Glue Job Compatibility to Airflow URL: https://github.com/apache/incubator-airflow/pull/4068#discussion_r226834023 ## File path: tests/contrib/hooks/test_aws_glue_job_hook.py ## @@ -0,0 +1,84 @@ +# -*- coding: utf-8 -*- Review comment: Can't we mock this stuff with `moto`? With the current tests, a lot is mocked. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
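For context on the moto remark: `moto` runs an in-memory AWS backend so tests exercise real `boto3` calls, whereas the PR's tests patch the hook directly. A minimal stdlib-only sketch of the mock-based style being discussed (`GlueHookUnderTest` is a hypothetical stand-in, not Airflow's actual hook class):

```python
from unittest import mock


class GlueHookUnderTest:
    """Hypothetical stand-in mirroring the hook's get_conn/list_jobs shape."""

    def get_conn(self):
        raise NotImplementedError  # the real hook would build a boto3 client

    def list_jobs(self):
        return self.get_conn().get_jobs()


def test_list_jobs_returns_client_response():
    hook = GlueHookUnderTest()
    fake_client = mock.Mock()
    fake_client.get_jobs.return_value = {'Jobs': [{'Name': 'etl_job'}]}
    # Patch get_conn so no AWS call is made; only the hook's logic is tested.
    with mock.patch.object(GlueHookUnderTest, 'get_conn',
                           return_value=fake_client):
        assert hook.list_jobs() == {'Jobs': [{'Name': 'etl_job'}]}
    fake_client.get_jobs.assert_called_once_with()
```

The trade-off Fokko points at: with this style, nothing verifies that the mocked responses match what Glue actually returns, which a moto-backed test would catch.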
[GitHub] Fokko commented on a change in pull request #4068: [AIRFLOW-2310]: Add AWS Glue Job Compatibility to Airflow
Fokko commented on a change in pull request #4068: [AIRFLOW-2310]: Add AWS Glue Job Compatibility to Airflow URL: https://github.com/apache/incubator-airflow/pull/4068#discussion_r226833986 ## File path: airflow/contrib/operators/aws_glue_job_operator.py ## @@ -0,0 +1,80 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import unicode_literals + +from airflow.contrib.hooks.aws_glue_job_hook import AwsGlueJobHook +from airflow.models import BaseOperator +from airflow.utils.decorators import apply_defaults + + +class AWSGlueJobOperator(BaseOperator): +""" +Creates an AWS Glue Job. AWS Glue is a serverless Spark +ETL service for running Spark Jobs on the AWS cloud. 
+Language support: Python and Scala +:param job_name: unique job name per AWS Account +:type str +:param job_desc: job description details +:type str +:param script_args: etl script arguments and AWS Glue arguments +:type dict +:param region_name: aws region name (example: us-east-1) +:type region_name: str +""" +template_fields = () +template_ext = () +ui_color = '#ededed' + +@apply_defaults +def __init__(self, + job_name='aws_glue_default_job', + job_desc='AWS Glue Job with Airflow', + script_args={}, + aws_conn_id='aws_default', Review comment: Add `aws_conn_id` to the docstring. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
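Aside from the docstring point, the diff above uses `script_args={}` as a default. A mutable default is evaluated once and shared across every instance that doesn't pass the argument, so the idiomatic fix is `None` plus an in-constructor fallback. A sketch under that assumption (`GlueJobOperatorSketch` is an illustrative shell, not the real operator):

```python
class GlueJobOperatorSketch:
    """Illustrative stand-in showing the mutable-default fix only."""

    def __init__(self, job_name='aws_glue_default_job', script_args=None):
        self.job_name = job_name
        # Fall back to a fresh dict per instance instead of a shared default.
        self.script_args = script_args if script_args is not None else {}
```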
[jira] [Commented] (AIRFLOW-2310) Enable AWS Glue Job Integration
[ https://issues.apache.org/jira/browse/AIRFLOW-2310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16657924#comment-16657924 ] ASF GitHub Bot commented on AIRFLOW-2310: - Fokko closed pull request #3504: [AIRFLOW-2310]: Add AWS Glue Job Compatibility to Airflow URL: https://github.com/apache/incubator-airflow/pull/3504 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/airflow/contrib/aws_glue_job_hook.py b/airflow/contrib/aws_glue_job_hook.py new file mode 100644 index 00..323313400a --- /dev/null +++ b/airflow/contrib/aws_glue_job_hook.py @@ -0,0 +1,212 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ + +from airflow.exceptions import AirflowException +from airflow.contrib.hooks.aws_hook import AwsHook +import os.path +import time + + +class AwsGlueJobHook(AwsHook): +""" +Interact with AWS Glue - create job, trigger, crawler + +:param job_name: unique job name per AWS account +:type str +:param desc: job description +:type str +:param concurrent_run_limit: The maximum number of concurrent runs allowed for a job +:type int +:param script_location: path to etl script either on s3 or local +:type str +:param conns: A list of connections used by the job +:type list +:param retry_limit: Maximum number of times to retry this job if it fails +:type int +:param num_of_dpus: Number of AWS Glue DPUs to allocate to this Job +:type int +:param region_name: aws region name (example: us-east-1) +:type region_name: str +:param s3_bucket: S3 bucket where logs and local etl script will be uploaded +:type str +:param iam_role_name: AWS IAM Role for Glue Job +:type str +""" + +def __init__(self, + job_name=None, + desc=None, + concurrent_run_limit=None, + script_location=None, + conns=None, + retry_limit=None, + num_of_dpus=None, + aws_conn_id='aws_default', + region_name=None, + iam_role_name=None, + s3_bucket=None, *args, **kwargs): +self.job_name = job_name +self.desc = desc +self.concurrent_run_limit = concurrent_run_limit or 1 +self.script_location = script_location +self.conns = conns or ["s3"] +self.retry_limit = retry_limit or 0 +self.num_of_dpus = num_of_dpus or 10 +self.aws_conn_id = aws_conn_id +self.region_name = region_name +self.s3_bucket = s3_bucket +self.role_name = iam_role_name +self.S3_PROTOCOL = "s3://" +self.S3_ARTIFACTS_PREFIX = 'artifacts/glue-scripts/' +self.S3_GLUE_LOGS = 'logs/glue-logs/' +super(AwsGlueJobHook, self).__init__(*args, **kwargs) + +def get_conn(self): +conn = self.get_client_type('glue', self.region_name) +return conn + +def list_jobs(self): +conn = self.get_conn() +return conn.get_jobs() + +def get_iam_execution_role(self): +""" +:return: 
iam role for job execution +""" +iam_client = self.get_client_type('iam', self.region_name) + +try: +glue_execution_role = iam_client.get_role(RoleName=self.role_name) +self.log.info("Iam Role Name: {}".format(self.role_name)) +return glue_execution_role +except Exception as general_error: +raise AirflowException( +'Failed to create aws glue job, error: {error}'.format( +error=str(general_error) +) +) + +def initialize_job(self, script_arguments=None): +""" +Initializes connection with AWS Glue +to run job +:return: +""" +if self.s3_bucket is None: +raise AirflowException( +'Could not initialize
[GitHub] Fokko commented on issue #3504: [AIRFLOW-2310]: Add AWS Glue Job Compatibility to Airflow
Fokko commented on issue #3504: [AIRFLOW-2310]: Add AWS Glue Job Compatibility to Airflow URL: https://github.com/apache/incubator-airflow/pull/3504#issuecomment-431601530 Moved to #4068
[GitHub] Fokko closed pull request #3504: [AIRFLOW-2310]: Add AWS Glue Job Compatibility to Airflow
Fokko closed pull request #3504: [AIRFLOW-2310]: Add AWS Glue Job Compatibility to Airflow URL: https://github.com/apache/incubator-airflow/pull/3504 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/airflow/contrib/aws_glue_job_hook.py b/airflow/contrib/aws_glue_job_hook.py new file mode 100644 index 00..323313400a --- /dev/null +++ b/airflow/contrib/aws_glue_job_hook.py @@ -0,0 +1,212 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ + +from airflow.exceptions import AirflowException +from airflow.contrib.hooks.aws_hook import AwsHook +import os.path +import time + + +class AwsGlueJobHook(AwsHook): +""" +Interact with AWS Glue - create job, trigger, crawler + +:param job_name: unique job name per AWS account +:type str +:param desc: job description +:type str +:param concurrent_run_limit: The maximum number of concurrent runs allowed for a job +:type int +:param script_location: path to etl script either on s3 or local +:type str +:param conns: A list of connections used by the job +:type list +:param retry_limit: Maximum number of times to retry this job if it fails +:type int +:param num_of_dpus: Number of AWS Glue DPUs to allocate to this Job +:type int +:param region_name: aws region name (example: us-east-1) +:type region_name: str +:param s3_bucket: S3 bucket where logs and local etl script will be uploaded +:type str +:param iam_role_name: AWS IAM Role for Glue Job +:type str +""" + +def __init__(self, + job_name=None, + desc=None, + concurrent_run_limit=None, + script_location=None, + conns=None, + retry_limit=None, + num_of_dpus=None, + aws_conn_id='aws_default', + region_name=None, + iam_role_name=None, + s3_bucket=None, *args, **kwargs): +self.job_name = job_name +self.desc = desc +self.concurrent_run_limit = concurrent_run_limit or 1 +self.script_location = script_location +self.conns = conns or ["s3"] +self.retry_limit = retry_limit or 0 +self.num_of_dpus = num_of_dpus or 10 +self.aws_conn_id = aws_conn_id +self.region_name = region_name +self.s3_bucket = s3_bucket +self.role_name = iam_role_name +self.S3_PROTOCOL = "s3://" +self.S3_ARTIFACTS_PREFIX = 'artifacts/glue-scripts/' +self.S3_GLUE_LOGS = 'logs/glue-logs/' +super(AwsGlueJobHook, self).__init__(*args, **kwargs) + +def get_conn(self): +conn = self.get_client_type('glue', self.region_name) +return conn + +def list_jobs(self): +conn = self.get_conn() +return conn.get_jobs() + +def get_iam_execution_role(self): +""" +:return: 
iam role for job execution +""" +iam_client = self.get_client_type('iam', self.region_name) + +try: +glue_execution_role = iam_client.get_role(RoleName=self.role_name) +self.log.info("Iam Role Name: {}".format(self.role_name)) +return glue_execution_role +except Exception as general_error: +raise AirflowException( +'Failed to create aws glue job, error: {error}'.format( +error=str(general_error) +) +) + +def initialize_job(self, script_arguments=None): +""" +Initializes connection with AWS Glue +to run job +:return: +""" +if self.s3_bucket is None: +raise AirflowException( +'Could not initialize glue job, ' +'error: Specify Parameter `s3_bucket`' +) + +glue_client = self.get_conn() + +try: +job_response = self.get_or_create_glue_job() +job_name = job_response['Name'] +
[GitHub] Fokko commented on a change in pull request #3828: [AIRFLOW-2993] s3_to_sftp and sftp_to_s3 operators
Fokko commented on a change in pull request #3828: [AIRFLOW-2993] s3_to_sftp and sftp_to_s3 operators URL: https://github.com/apache/incubator-airflow/pull/3828#discussion_r226833629 ## File path: airflow/contrib/operators/s3_to_sftp_operator.py ## @@ -0,0 +1,84 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from airflow.models import BaseOperator +from airflow.hooks.S3_hook import S3Hook +from airflow.contrib.hooks.ssh_hook import SSHHook +from tempfile import NamedTemporaryFile +from urllib.parse import urlparse +from airflow.utils.decorators import apply_defaults + + +class S3ToSFTPOperator(BaseOperator): +""" +This operator enables the transferring of files from S3 to a SFTP server +:param sftp_conn_id:The sftp connection id. The name or Review comment: How do you test these kind of violations? Would it be possible to automatically check this in the CI? Maybe run Sphinx and check the number of errors? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] Fokko commented on a change in pull request #3828: [AIRFLOW-2993] s3_to_sftp and sftp_to_s3 operators
Fokko commented on a change in pull request #3828: [AIRFLOW-2993] s3_to_sftp and sftp_to_s3 operators URL: https://github.com/apache/incubator-airflow/pull/3828#discussion_r226833574 ## File path: airflow/contrib/operators/s3_to_sftp_operator.py ## @@ -0,0 +1,84 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from airflow.models import BaseOperator +from airflow.hooks.S3_hook import S3Hook +from airflow.contrib.hooks.ssh_hook import SSHHook +from tempfile import NamedTemporaryFile +from urllib.parse import urlparse +from airflow.utils.decorators import apply_defaults + + +class S3ToSFTPOperator(BaseOperator): +""" +This operator enables the transferring of files from S3 to a SFTP server +:param sftp_conn_id:The sftp connection id. The name or +identifier for establishing a connection to the SFTP server. +:type sftp_conn_id: string Review comment: Good spot @kaxil We should maybe see if we could automate the list of operators in the `integration.rst`, but this is for a new PR. For now we should add both of the operators to the list indeed. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
[GitHub] XD-DENG commented on issue #4049: [AIRFLOW-3203] Fix DockerOperator & some operator test
XD-DENG commented on issue #4049: [AIRFLOW-3203] Fix DockerOperator & some operator test URL: https://github.com/apache/incubator-airflow/pull/4049#issuecomment-431591819 Hi @Fokko, a gentle ping. Cheers!
[GitHub] XD-DENG commented on a change in pull request #3828: [AIRFLOW-2993] s3_to_sftp and sftp_to_s3 operators
XD-DENG commented on a change in pull request #3828: [AIRFLOW-2993] s3_to_sftp and sftp_to_s3 operators URL: https://github.com/apache/incubator-airflow/pull/3828#discussion_r226830809 ## File path: airflow/contrib/operators/s3_to_sftp_operator.py ## @@ -0,0 +1,84 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from airflow.models import BaseOperator +from airflow.hooks.S3_hook import S3Hook +from airflow.contrib.hooks.ssh_hook import SSHHook +from tempfile import NamedTemporaryFile +from urllib.parse import urlparse +from airflow.utils.decorators import apply_defaults + + +class S3ToSFTPOperator(BaseOperator): +""" +This operator enables the transferring of files from S3 to a SFTP server +:param sftp_conn_id:The sftp connection id. The name or Review comment: Hi @wmorris75 @Fokko , possibly this comment is coming in a bit late, but I would like to remind that an empty line may be needed after line 30 to make sure this docstring can be parsed properly, according to Sphinx standard. The same for the docstring in other files in this PR. CC @kaxil This is an automated message from the Apache Git Service. 
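XD-DENG's point is that Sphinx only parses the `:param`/`:type` fields when a blank line separates them from the summary sentence; `str` (rather than `string`) is the conventional type name in these fields, as also requested later in this thread. A hedged sketch of the layout being asked for (the class is an illustrative shell, not the real operator):

```python
class S3ToSFTPOperatorDoc:
    """
    This operator enables the transferring of files from S3 to a SFTP server.

    :param sftp_conn_id: The sftp connection id used to connect
        to the SFTP server.
    :type sftp_conn_id: str
    :param s3_conn_id: The s3 connection id.
    :type s3_conn_id: str
    """
```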
[GitHub] kaxil commented on a change in pull request #3828: [AIRFLOW-2993] s3_to_sftp and sftp_to_s3 operators
kaxil commented on a change in pull request #3828: [AIRFLOW-2993] s3_to_sftp and sftp_to_s3 operators URL: https://github.com/apache/incubator-airflow/pull/3828#discussion_r226830756 ## File path: airflow/contrib/operators/s3_to_sftp_operator.py ## @@ -0,0 +1,84 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from airflow.models import BaseOperator +from airflow.hooks.S3_hook import S3Hook +from airflow.contrib.hooks.ssh_hook import SSHHook +from tempfile import NamedTemporaryFile +from urllib.parse import urlparse +from airflow.utils.decorators import apply_defaults + + +class S3ToSFTPOperator(BaseOperator): +""" +This operator enables the transferring of files from S3 to a SFTP server +:param sftp_conn_id:The sftp connection id. The name or +identifier for establishing a connection to the SFTP server. +:type sftp_conn_id: string Review comment: You can follow it up with a new PR? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] kaxil commented on a change in pull request #3828: [AIRFLOW-2993] s3_to_sftp and sftp_to_s3 operators
kaxil commented on a change in pull request #3828: [AIRFLOW-2993] s3_to_sftp and sftp_to_s3 operators URL: https://github.com/apache/incubator-airflow/pull/3828#discussion_r226830746 ## File path: airflow/contrib/operators/s3_to_sftp_operator.py ## @@ -0,0 +1,84 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from airflow.models import BaseOperator +from airflow.hooks.S3_hook import S3Hook +from airflow.contrib.hooks.ssh_hook import SSHHook +from tempfile import NamedTemporaryFile +from urllib.parse import urlparse +from airflow.utils.decorators import apply_defaults + + +class S3ToSFTPOperator(BaseOperator): +""" +This operator enables the transferring of files from S3 to a SFTP server +:param sftp_conn_id:The sftp connection id. The name or +identifier for establishing a connection to the SFTP server. +:type sftp_conn_id: string Review comment: Can you please change `string` to `str` for all parameters? and and this Operator to `integration.rst`? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
[jira] [Resolved] (AIRFLOW-2993) Addition of S3_to_SFTP and SFTP_to_S3 Operators
[ https://issues.apache.org/jira/browse/AIRFLOW-2993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fokko Driesprong resolved AIRFLOW-2993. --- Resolution: Fixed > Addition of S3_to_SFTP and SFTP_to_S3 Operators > > > Key: AIRFLOW-2993 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2993 > Project: Apache Airflow > Issue Type: New Feature > Components: operators >Affects Versions: 1.9.0 >Reporter: Wayne Morris >Assignee: Wayne Morris >Priority: Major > Fix For: 2.0.0 > > > New features enable transferring of files or data from S3 to a SFTP remote > path and SFTP to S3 path. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (AIRFLOW-2993) Addition of S3_to_SFTP and SFTP_to_S3 Operators
[ https://issues.apache.org/jira/browse/AIRFLOW-2993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fokko Driesprong updated AIRFLOW-2993: -- Fix Version/s: (was: 1.9.0) 2.0.0 > Addition of S3_to_SFTP and SFTP_to_S3 Operators > > > Key: AIRFLOW-2993 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2993 > Project: Apache Airflow > Issue Type: New Feature > Components: operators >Affects Versions: 1.9.0 >Reporter: Wayne Morris >Assignee: Wayne Morris >Priority: Major > Fix For: 2.0.0 > > > New features enable transferring of files or data from S3 to a SFTP remote > path and SFTP to S3 path. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] Fokko commented on issue #3828: [AIRFLOW-2993] s3_to_sftp and sftp_to_s3 operators
Fokko commented on issue #3828: [AIRFLOW-2993] s3_to_sftp and sftp_to_s3 operators URL: https://github.com/apache/incubator-airflow/pull/3828#issuecomment-431583228 Thanks @wmorris75. Looks great. Merging to master!
[GitHub] Fokko closed pull request #3828: [AIRFLOW-2993] s3_to_sftp and sftp_to_s3 operators
Fokko closed pull request #3828: [AIRFLOW-2993] s3_to_sftp and sftp_to_s3 operators URL: https://github.com/apache/incubator-airflow/pull/3828

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/airflow/contrib/operators/s3_to_sftp_operator.py b/airflow/contrib/operators/s3_to_sftp_operator.py
new file mode 100644
index 00..7e02e97cd2
--- /dev/null
+++ b/airflow/contrib/operators/s3_to_sftp_operator.py
@@ -0,0 +1,84 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.models import BaseOperator
+from airflow.hooks.S3_hook import S3Hook
+from airflow.contrib.hooks.ssh_hook import SSHHook
+from tempfile import NamedTemporaryFile
+from urllib.parse import urlparse
+from airflow.utils.decorators import apply_defaults
+
+
+class S3ToSFTPOperator(BaseOperator):
+    """
+    This operator enables the transferring of files from S3 to an SFTP server.
+
+    :param sftp_conn_id: The sftp connection id. The name or
+        identifier for establishing a connection to the SFTP server.
+    :type sftp_conn_id: string
+    :param sftp_path: The sftp remote path. This is the specified
+        file path for uploading the file to the SFTP server.
+    :type sftp_path: string
+    :param s3_conn_id: The s3 connection id. The name or identifier for
+        establishing a connection to S3.
+    :type s3_conn_id: string
+    :param s3_bucket: The targeted s3 bucket. This is the S3 bucket
+        from where the file is downloaded.
+    :type s3_bucket: string
+    :param s3_key: The targeted s3 key. This is the specified file path
+        for downloading the file from S3.
+    :type s3_key: string
+    """
+
+    template_fields = ('s3_key', 'sftp_path')
+
+    @apply_defaults
+    def __init__(self,
+                 s3_bucket,
+                 s3_key,
+                 sftp_path,
+                 sftp_conn_id='ssh_default',
+                 s3_conn_id='aws_default',
+                 *args,
+                 **kwargs):
+        super(S3ToSFTPOperator, self).__init__(*args, **kwargs)
+        self.sftp_conn_id = sftp_conn_id
+        self.sftp_path = sftp_path
+        self.s3_bucket = s3_bucket
+        self.s3_key = s3_key
+        self.s3_conn_id = s3_conn_id
+
+    @staticmethod
+    def get_s3_key(s3_key):
+        """This parses the correct format for S3 keys
+        regardless of how the S3 url is passed."""
+        parsed_s3_key = urlparse(s3_key)
+        return parsed_s3_key.path.lstrip('/')
+
+    def execute(self, context):
+        self.s3_key = self.get_s3_key(self.s3_key)
+        ssh_hook = SSHHook(ssh_conn_id=self.sftp_conn_id)
+        s3_hook = S3Hook(self.s3_conn_id)
+
+        s3_client = s3_hook.get_conn()
+        sftp_client = ssh_hook.get_conn().open_sftp()
+
+        with NamedTemporaryFile("w") as f:
+            s3_client.download_file(self.s3_bucket, self.s3_key, f.name)
+            sftp_client.put(f.name, self.sftp_path)
diff --git a/airflow/contrib/operators/sftp_to_s3_operator.py b/airflow/contrib/operators/sftp_to_s3_operator.py
new file mode 100644
index 00..b0ed1e16a3
--- /dev/null
+++ b/airflow/contrib/operators/sftp_to_s3_operator.py
@@ -0,0 +1,89 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS,
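The `get_s3_key` static method in the diff above leans on `urllib.parse.urlparse`: for an `s3://` URL the bucket lands in `netloc` and the key in `path` (with a leading slash), while a bare key passes through unchanged. A minimal standalone sketch of that parsing step — the bucket and key names here are illustrative, not from the PR:

```python
from urllib.parse import urlparse


def get_s3_key(s3_key):
    """Return the bare object key whether a full s3:// URL or a plain key is given."""
    # For 's3://bucket/key', urlparse puts 'bucket' in netloc and '/key' in path;
    # for a plain 'key', path is the key itself and lstrip('/') is a no-op.
    return urlparse(s3_key).path.lstrip('/')


print(get_s3_key('s3://my-bucket/data/report.csv'))  # data/report.csv
print(get_s3_key('data/report.csv'))                 # data/report.csv
```

This is why the operator's `s3_key` parameter can be templated with either form: `execute` normalizes it before calling `download_file`, which expects bucket and key separately.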
[jira] [Commented] (AIRFLOW-2993) Addition of S3_to_SFTP and SFTP_to_S3 Operators
[ https://issues.apache.org/jira/browse/AIRFLOW-2993?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16657862#comment-16657862 ] ASF GitHub Bot commented on AIRFLOW-2993:
- Fokko closed pull request #3828: [AIRFLOW-2993] s3_to_sftp and sftp_to_s3 operators URL: https://github.com/apache/incubator-airflow/pull/3828
[GitHub] kaxil closed pull request #4066: [AIRFLOW-XXX] BigQuery Hook - Minor Refactoring
kaxil closed pull request #4066: [AIRFLOW-XXX] BigQuery Hook - Minor Refactoring URL: https://github.com/apache/incubator-airflow/pull/4066
[GitHub] codecov-io edited a comment on issue #3748: [AIRFLOW-2899] Hide sensitive data when Exporting Variables
codecov-io edited a comment on issue #3748: [AIRFLOW-2899] Hide sensitive data when Exporting Variables URL: https://github.com/apache/incubator-airflow/pull/3748#issuecomment-412912402

# [Codecov](https://codecov.io/gh/apache/incubator-airflow/pull/3748?src=pr=h1) Report

> Merging [#3748](https://codecov.io/gh/apache/incubator-airflow/pull/3748?src=pr=desc) into [master](https://codecov.io/gh/apache/incubator-airflow/commit/b8be322d3badfeadfa8f08e0bf92a12a6cd26418?src=pr=desc) will **increase** coverage by `1.85%`.
> The diff coverage is `0%`.

[![Impacted file tree graph](https://codecov.io/gh/apache/incubator-airflow/pull/3748/graphs/tree.svg?width=650=WdLKlKHOAU=150=pr)](https://codecov.io/gh/apache/incubator-airflow/pull/3748?src=pr=tree)

```diff
@@            Coverage Diff             @@
##           master    #3748      +/-   ##
==========================================
+ Coverage   75.79%   77.64%   +1.85%
==========================================
  Files         199      204       +5
  Lines       15946    15853      -93
==========================================
+ Hits        12086    12309     +223
+ Misses       3860     3544     -316
```

| [Impacted Files](https://codecov.io/gh/apache/incubator-airflow/pull/3748?src=pr=tree) | Coverage Δ | |
|---|---|---|
| [airflow/www/views.py](https://codecov.io/gh/apache/incubator-airflow/pull/3748/diff?src=pr=tree#diff-YWlyZmxvdy93d3cvdmlld3MucHk=) | `68.95% <0%> (+0.1%)` | :arrow_up: |
| [airflow/www\_rbac/views.py](https://codecov.io/gh/apache/incubator-airflow/pull/3748/diff?src=pr=tree#diff-YWlyZmxvdy93d3dfcmJhYy92aWV3cy5weQ==) | `72.61% <0%> (+0.56%)` | :arrow_up: |
| [airflow/sensors/s3\_key\_sensor.py](https://codecov.io/gh/apache/incubator-airflow/pull/3748/diff?src=pr=tree#diff-YWlyZmxvdy9zZW5zb3JzL3MzX2tleV9zZW5zb3IucHk=) | `31.03% <0%> (-68.97%)` | :arrow_down: |
| [airflow/sensors/s3\_prefix\_sensor.py](https://codecov.io/gh/apache/incubator-airflow/pull/3748/diff?src=pr=tree#diff-YWlyZmxvdy9zZW5zb3JzL3MzX3ByZWZpeF9zZW5zb3IucHk=) | `41.17% <0%> (-58.83%)` | :arrow_down: |
| [airflow/utils/helpers.py](https://codecov.io/gh/apache/incubator-airflow/pull/3748/diff?src=pr=tree#diff-YWlyZmxvdy91dGlscy9oZWxwZXJzLnB5) | `71.34% <0%> (-13.04%)` | :arrow_down: |
| [airflow/hooks/mysql\_hook.py](https://codecov.io/gh/apache/incubator-airflow/pull/3748/diff?src=pr=tree#diff-YWlyZmxvdy9ob29rcy9teXNxbF9ob29rLnB5) | `78% <0%> (-12%)` | :arrow_down: |
| [airflow/sensors/sql\_sensor.py](https://codecov.io/gh/apache/incubator-airflow/pull/3748/diff?src=pr=tree#diff-YWlyZmxvdy9zZW5zb3JzL3NxbF9zZW5zb3IucHk=) | `90.47% <0%> (-9.53%)` | :arrow_down: |
| [airflow/configuration.py](https://codecov.io/gh/apache/incubator-airflow/pull/3748/diff?src=pr=tree#diff-YWlyZmxvdy9jb25maWd1cmF0aW9uLnB5) | `83.95% <0%> (-5.31%)` | :arrow_down: |
| [airflow/utils/state.py](https://codecov.io/gh/apache/incubator-airflow/pull/3748/diff?src=pr=tree#diff-YWlyZmxvdy91dGlscy9zdGF0ZS5weQ==) | `93.33% <0%> (-3.34%)` | :arrow_down: |
| [airflow/models.py](https://codecov.io/gh/apache/incubator-airflow/pull/3748/diff?src=pr=tree#diff-YWlyZmxvdy9tb2RlbHMucHk=) | `88.78% <0%> (-2.93%)` | :arrow_down: |
| ... and [66 more](https://codecov.io/gh/apache/incubator-airflow/pull/3748/diff?src=pr=tree-more) | |

[Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-airflow/pull/3748?src=pr=continue).

> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
> `Δ = absolute (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/incubator-airflow/pull/3748?src=pr=footer). Last update [b8be322...8a73aa1](https://codecov.io/gh/apache/incubator-airflow/pull/3748?src=pr=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).