[GitHub] XD-DENG commented on a change in pull request #4111: [AIRFLOW-3266] Add AWS Athena Operator and hook
XD-DENG commented on a change in pull request #4111: [AIRFLOW-3266] Add AWS Athena Operator and hook
URL: https://github.com/apache/incubator-airflow/pull/4111#discussion_r229559111

## File path: airflow/contrib/hooks/aws_athena_hook.py

```diff
@@ -0,0 +1,140 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from time import sleep
+from airflow.contrib.hooks.aws_hook import AwsHook
+from uuid import uuid4
+
+INTERMEDIATE_STATES = ('QUEUED', 'RUNNING',)
+FAILURE_STATES = ('FAILED', 'CANCELLED',)
+SUCCESS_STATES = ('SUCCEEDED',)
+
+
+class AWSAthenaHook(AwsHook):
+    """
+    Interact with AWS Athena to run, poll queries and return query results
+    """
+
+    def __init__(self, aws_conn_id='aws_default', *args, **kwargs):
+        super(AWSAthenaHook, self).__init__(aws_conn_id, **kwargs)
+        self.sleep_time = kwargs.get('sleep_time') or 30
+        self.conn = None
+
+    def get_conn(self):
+        """
+        check if aws conn exists already or create one and return it
+        :return: boto3 session
+        """
+        if not hasattr(self, 'conn'):
+            self.conn = self.get_client_type('athena')
+        return self.conn
+
+    def run_query(self, query, query_context, result_configuration, client_request_token=None):
+        """
+        Run Presto query on athena with provided config and return submitted query_execution_id
+        :param query: Presto query to run
```

Review comment: Minor stuff: there should be an empty line after line 50, otherwise the docstring can't be parsed properly by Sphinx. Also applicable for lines 72, 86, 103, and 135.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
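For readers following XD-DENG's Sphinx point: autodoc needs a blank line between the docstring summary and the `:param:` field list, or the fields are not parsed as a field list. A minimal sketch of the corrected layout (hypothetical standalone function, not the actual hook code):

```python
def run_query(query, query_context):
    """
    Run Presto query on Athena with provided config.

    :param query: Presto query to run
    :param query_context: context for the query (illustrative parameter)
    :return: the inputs, echoed back for demonstration purposes
    """
    return query, query_context
```

The blank line after the summary sentence is the whole fix; without it Sphinx treats the summary and the field list as one paragraph.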
[GitHub] codecov-io edited a comment on issue #4111: [AIRFLOW-3266] Add AWS Athena Operator and hook
codecov-io edited a comment on issue #4111: [AIRFLOW-3266] Add AWS Athena Operator and hook
URL: https://github.com/apache/incubator-airflow/pull/4111#issuecomment-433705746

# [Codecov](https://codecov.io/gh/apache/incubator-airflow/pull/4111?src=pr=h1) Report

> Merging [#4111](https://codecov.io/gh/apache/incubator-airflow/pull/4111?src=pr=desc) into [master](https://codecov.io/gh/apache/incubator-airflow/commit/c4e5151bcd095eae1cd6ca1b4e96b302df3a2166?src=pr=desc) will **not change** coverage.
> The diff coverage is `n/a`.

[![Impacted file tree graph](https://codecov.io/gh/apache/incubator-airflow/pull/4111/graphs/tree.svg?width=650=WdLKlKHOAU=150=pr)](https://codecov.io/gh/apache/incubator-airflow/pull/4111?src=pr=tree)

```diff
@@          Coverage Diff           @@
##           master   #4111   +/-  ##
=====================================
  Coverage   76.68%  76.68%
=====================================
  Files         199     199
  Lines       16189   16189
=====================================
  Hits        12414   12414
  Misses       3775    3775
```

--

[Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-airflow/pull/4111?src=pr=continue).

> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
> `Δ = absolute (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/incubator-airflow/pull/4111?src=pr=footer). Last update [c4e5151...b1105a7](https://codecov.io/gh/apache/incubator-airflow/pull/4111?src=pr=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
[GitHub] kaxil commented on issue #4102: [AIRFLOW-3262] Add param to log response when using SimpleHttpOperator
kaxil commented on issue #4102: [AIRFLOW-3262] Add param to log response when using SimpleHttpOperator
URL: https://github.com/apache/incubator-airflow/pull/4102#issuecomment-434544055

Added the test @Fokko
[GitHub] codecov-io edited a comment on issue #4102: [AIRFLOW-3262] Add param to log response when using SimpleHttpOperator
codecov-io edited a comment on issue #4102: [AIRFLOW-3262] Add param to log response when using SimpleHttpOperator
URL: https://github.com/apache/incubator-airflow/pull/4102#issuecomment-433663881

# [Codecov](https://codecov.io/gh/apache/incubator-airflow/pull/4102?src=pr=h1) Report

> Merging [#4102](https://codecov.io/gh/apache/incubator-airflow/pull/4102?src=pr=desc) into [master](https://codecov.io/gh/apache/incubator-airflow/commit/c4e5151bcd095eae1cd6ca1b4e96b302df3a2166?src=pr=desc) will **decrease** coverage by `0.01%`.
> The diff coverage is `66.66%`.

[![Impacted file tree graph](https://codecov.io/gh/apache/incubator-airflow/pull/4102/graphs/tree.svg?width=650=WdLKlKHOAU=150=pr)](https://codecov.io/gh/apache/incubator-airflow/pull/4102?src=pr=tree)

```diff
@@            Coverage Diff            @@
##           master   #4102      +/-  ##
========================================
- Coverage   76.68%  76.66%   -0.02%
========================================
  Files         199     199
  Lines       16189   16189
========================================
- Hits        12414   12412       -2
- Misses       3775    3777       +2
```

| [Impacted Files](https://codecov.io/gh/apache/incubator-airflow/pull/4102?src=pr=tree) | Coverage Δ | |
|---|---|---|
| [airflow/operators/http\_operator.py](https://codecov.io/gh/apache/incubator-airflow/pull/4102/diff?src=pr=tree#diff-YWlyZmxvdy9vcGVyYXRvcnMvaHR0cF9vcGVyYXRvci5weQ==) | `90.32% <66.66%> (-2.54%)` | :arrow_down: |
| [airflow/hooks/mysql\_hook.py](https://codecov.io/gh/apache/incubator-airflow/pull/4102/diff?src=pr=tree#diff-YWlyZmxvdy9ob29rcy9teXNxbF9ob29rLnB5) | `90% <0%> (-0.39%)` | :arrow_down: |
| [airflow/models.py](https://codecov.io/gh/apache/incubator-airflow/pull/4102/diff?src=pr=tree#diff-YWlyZmxvdy9tb2RlbHMucHk=) | `92.04% <0%> (-0.05%)` | :arrow_down: |

--

[Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-airflow/pull/4102?src=pr=continue).

> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
> `Δ = absolute (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/incubator-airflow/pull/4102?src=pr=footer). Last update [c4e5151...11917fc](https://codecov.io/gh/apache/incubator-airflow/pull/4102?src=pr=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
[GitHub] ashb commented on a change in pull request #4117: [AIRFLOW-3277] Correctly observe DST transitions for cron
ashb commented on a change in pull request #4117: [AIRFLOW-3277] Correctly observe DST transitions for cron
URL: https://github.com/apache/incubator-airflow/pull/4117#discussion_r229509804

## File path: tests/models.py

```diff
@@ -580,6 +580,29 @@ def test_cycle(self):
         with self.assertRaises(AirflowDagCycleException):
             dag.test_cycle()
 
+    def test_following_previous_schedule(self):
+        """
+        Make sure DST transitions are properly observed
+        """
+        local_tz = pendulum.timezone('Europe/Zurich')
+        start = datetime(2018, 10, 28, 3, 00, tzinfo=local_tz)
+        utc = timezone.convert_to_utc(start)
+
+        dag = DAG('tz_dag', start_date=start, schedule_interval="'*/5 * * * *'")
+        next = dag.following_schedule(utc)
+        next_local = local_tz.convert(next)
+
+        # should give 2018, 10, 28, 3, 5
+        self.assertEqual(next_local.hour, 3)
+        self.assertEqual(next_local.minute, 5)
+
+        prev = dag.previous_schedule(utc)
+        prev_local = local_tz.convert(prev)
+
+        # should give 2018, 10, 28, 1, 55
+        self.assertEqual(prev_local.hour, 1)
+        self.assertEqual(prev_local.minute, 55)
```

Review comment: Tests that cover the 3am case:

```python
start = local_tz.convert(datetime.datetime(2018, 10, 27, 3, 0))
utc = timezone.convert_to_utc(start)

dag = DAG('tz_dag', start_date=start, schedule_interval='0 3 * * *')
next = dag.following_schedule(utc)
next_local = local_tz.convert(next)

self.assertEqual(str(next_local), "2018-10-28 03:00:00+01:00")
self.assertEqual(str(next), "2018-10-28 02:00:00+00:00")

prev = dag.previous_schedule(next)
prev_local = local_tz.convert(prev)

self.assertEqual(str(next_local), "2018-10-27 03:00:00+02:00")  # This fails.
self.assertEqual(str(next), "2018-10-27 01:00:00+00:00")
```

Buuut croniter's get_prev is flaky around DST transition :(
[GitHub] ashb commented on a change in pull request #4117: [AIRFLOW-3277] Correctly observe DST transitions for cron
ashb commented on a change in pull request #4117: [AIRFLOW-3277] Correctly observe DST transitions for cron
URL: https://github.com/apache/incubator-airflow/pull/4117#discussion_r229494185

## File path: tests/models.py

```diff
@@ -580,6 +580,29 @@ def test_cycle(self):
         with self.assertRaises(AirflowDagCycleException):
             dag.test_cycle()
 
+    def test_following_previous_schedule(self):
+        """
+        Make sure DST transitions are properly observed
+        """
+        local_tz = pendulum.timezone('Europe/Zurich')
+        start = datetime(2018, 10, 28, 3, 00, tzinfo=local_tz)
+        utc = timezone.convert_to_utc(start)
+
+        dag = DAG('tz_dag', start_date=start, schedule_interval="'*/5 * * * *'")
+        next = dag.following_schedule(utc)
+        next_local = local_tz.convert(next)
+
+        # should give 2018, 10, 28, 3, 5
+        self.assertEqual(next_local.hour, 3)
+        self.assertEqual(next_local.minute, 5)
+
+        prev = dag.previous_schedule(utc)
+        prev_local = local_tz.convert(prev)
+
+        # should give 2018, 10, 28, 1, 55
+        self.assertEqual(prev_local.hour, 1)
+        self.assertEqual(prev_local.minute, 55)
```

Review comment: I'm not quite sure we're constructing the DTs right here (cos 2am local is ambiguous time).

This makes sure that the pre-condition time is correct:

```python
local_tz = pendulum.timezone('Europe/Zurich')

start = local_tz.convert(datetime.datetime(2018, 10, 28, 2, 55),
                         dst_rule=pendulum.PRE_TRANSITION)
self.assertEqual(str(start), "2018-10-28 02:55:00+02:00",
                 "Pre-condition: start date is in DST")

utc = timezone.convert_to_utc(start)

dag = DAG('tz_dag', start_date=start, schedule_interval='*/5 * * * *')
next = dag.following_schedule(utc)
next_local = local_tz.convert(next)

self.assertEqual(str(next), "2018-10-28 01:00:00+00:00")
self.assertEqual(str(next_local), "2018-10-28 02:00:00+01:00")

prev = dag.previous_schedule(utc)
prev_local = local_tz.convert(prev)

self.assertEqual(str(prev_local), "2018-10-28 02:50:00+02:00")

prev = dag.previous_schedule(next)
prev_local = local_tz.convert(prev)

self.assertEqual(str(prev_local), "2018-10-28 02:55:00+02:00")
self.assertEqual(prev, utc)
```
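The ambiguity discussed above (when clocks fall back, the 02:00–03:00 local hour occurs twice) can be reproduced with the standard library alone. This sketch uses `zoneinfo` and the PEP 495 `fold` attribute rather than pendulum's `dst_rule`, so it illustrates the underlying concept, not Airflow's actual implementation:

```python
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo

zurich = ZoneInfo("Europe/Zurich")

# 2018-10-28 02:30 local time happens twice in Zurich: once in CEST
# (before clocks fall back at 03:00) and once in CET (after).
first = datetime(2018, 10, 28, 2, 30, tzinfo=zurich, fold=0)
second = datetime(2018, 10, 28, 2, 30, tzinfo=zurich, fold=1)

print(first.utcoffset())   # 2:00:00 (CEST, the "pre-transition" reading)
print(second.utcoffset())  # 1:00:00 (CET, the "post-transition" reading)
```

`fold=0`/`fold=1` plays the same role as pendulum's `PRE_TRANSITION`/`POST_TRANSITION` rules: it disambiguates which of the two wall-clock occurrences is meant.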
[GitHub] ashb commented on a change in pull request #4117: [AIRFLOW-3277] Correctly observe DST transitions for cron
ashb commented on a change in pull request #4117: [AIRFLOW-3277] Correctly observe DST transitions for cron
URL: https://github.com/apache/incubator-airflow/pull/4117#discussion_r229495682

## File path: tests/models.py

```diff
@@ -580,6 +580,29 @@ def test_cycle(self):
         with self.assertRaises(AirflowDagCycleException):
             dag.test_cycle()
 
+    def test_following_previous_schedule(self):
+        """
+        Make sure DST transitions are properly observed
+        """
+        local_tz = pendulum.timezone('Europe/Zurich')
+        start = datetime(2018, 10, 28, 3, 00, tzinfo=local_tz)
+        utc = timezone.convert_to_utc(start)
+
+        dag = DAG('tz_dag', start_date=start, schedule_interval="'*/5 * * * *'")
+        next = dag.following_schedule(utc)
+        next_local = local_tz.convert(next)
+
+        # should give 2018, 10, 28, 3, 5
+        self.assertEqual(next_local.hour, 3)
+        self.assertEqual(next_local.minute, 5)
+
+        prev = dag.previous_schedule(utc)
+        prev_local = local_tz.convert(prev)
+
+        # should give 2018, 10, 28, 1, 55
+        self.assertEqual(prev_local.hour, 1)
+        self.assertEqual(prev_local.minute, 55)
```

Review comment: This change fixes it for hourly dags, but if (as was reported in Slack) you have a 3am daily dag I think this breaks the TZ feature:

```
>>> start = local_tz.convert(datetime.datetime(2018, 10, 27, 3, 0))
>>> utc = timezone.convert_to_utc(start)
>>> dag = DAG('tz_dag', start_date=start, schedule_interval='0 3 * * *')
>>> next = dag.following_schedule(utc)
>>> str(next)
2018-10-27 05:00:00+02:00
```

It is no longer 3am local time as requested :(

(*edit*: fix the cron schedule field order in the example)
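The edit note above corrects an easy slip: standard cron puts the minute field first, so `'3 0 * * *'` means 00:03 daily, not 3am. A tiny field-labelling sketch (plain Python, no croniter dependency; the helper name is illustrative only):

```python
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")

def label_cron_fields(expr):
    """Map the five standard cron fields of `expr` to their names."""
    values = expr.split()
    if len(values) != len(CRON_FIELDS):
        raise ValueError("expected 5 cron fields, got %d" % len(values))
    return dict(zip(CRON_FIELDS, values))

# '0 3 * * *' fires at 03:00 daily; '3 0 * * *' fires at 00:03 daily.
print(label_cron_fields("0 3 * * *"))
print(label_cron_fields("3 0 * * *"))
```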
[GitHub] feluelle commented on issue #3661: [AIRFLOW-2780] Adds IMAP Hook to interact with a mail server
feluelle commented on issue #3661: [AIRFLOW-2780] Adds IMAP Hook to interact with a mail server
URL: https://github.com/apache/incubator-airflow/pull/3661#issuecomment-434475901

> Will this hook be in 1.10.1?

I hope so. I will try to get it ready for review tomorrow, containing checking for escaping chars and symlinks.
[GitHub] ashb commented on a change in pull request #4117: [AIRFLOW-3277] Correctly observe DST transitions for cron
ashb commented on a change in pull request #4117: [AIRFLOW-3277] Correctly observe DST transitions for cron
URL: https://github.com/apache/incubator-airflow/pull/4117#discussion_r229488482

## File path: tests/models.py

```diff
@@ -580,6 +580,29 @@ def test_cycle(self):
         with self.assertRaises(AirflowDagCycleException):
             dag.test_cycle()
 
+    def test_following_previous_schedule(self):
+        """
+        Make sure DST transitions are properly observed
+        """
+        local_tz = pendulum.timezone('Europe/Zurich')
+        start = datetime(2018, 10, 28, 3, 00, tzinfo=local_tz)
```

Review comment:

```suggestion
        start = datetime.datetime(2018, 10, 28, 3, 00, tzinfo=local_tz)
```
[GitHub] ashb commented on a change in pull request #4117: [AIRFLOW-3277] Correctly observe DST transitions for cron
ashb commented on a change in pull request #4117: [AIRFLOW-3277] Correctly observe DST transitions for cron
URL: https://github.com/apache/incubator-airflow/pull/4117#discussion_r229488428

## File path: tests/models.py

```diff
@@ -580,6 +580,29 @@ def test_cycle(self):
         with self.assertRaises(AirflowDagCycleException):
             dag.test_cycle()
 
+    def test_following_previous_schedule(self):
+        """
+        Make sure DST transitions are properly observed
+        """
+        local_tz = pendulum.timezone('Europe/Zurich')
+        start = datetime(2018, 10, 28, 3, 00, tzinfo=local_tz)
+        utc = timezone.convert_to_utc(start)
+
+        dag = DAG('tz_dag', start_date=start, schedule_interval="'*/5 * * * *'")
```

Review comment:

```suggestion
        dag = DAG('tz_dag', start_date=start, schedule_interval='*/5 * * * *')
```
[jira] [Commented] (AIRFLOW-3271) Airflow RBAC Permissions modification via UI do not persist
[ https://issues.apache.org/jira/browse/AIRFLOW-3271?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16669322#comment-16669322 ]

ASF GitHub Bot commented on AIRFLOW-3271:
-----------------------------------------

smithakoduri opened a new pull request #4118: [AIRFLOW-3271]Fix is added to not reset the database whenever a new…
URL: https://github.com/apache/incubator-airflow/pull/4118

… process comes up (or init_roles is called). Otherwise Airflow RBAC permissions that are modified, do not persist for long

Make sure you have checked _all_ steps below.

### Jira
- [ ] My PR addresses the following [Airflow Jira](https://issues.apache.org/jira/browse/AIRFLOW/) issues and references them in the PR title. For example, "\[AIRFLOW-XXX\] My Airflow PR"
  - https://issues.apache.org/jira/browse/AIRFLOW-XXX
  - In case you are fixing a typo in the documentation you can prepend your commit with \[AIRFLOW-XXX\], code changes always need a Jira issue.

### Description
- [ ] Here are some details about my PR, including screenshots of any UI changes:

### Tests
- [ ] My PR adds the following unit tests __OR__ does not need testing for this extremely good reason:

### Commits
- [ ] My commits all reference Jira issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
  1. Subject is separated from body by a blank line
  1. Subject is limited to 50 characters (not including Jira issue reference)
  1. Subject does not end with a period
  1. Subject uses the imperative mood ("add", not "adding")
  1. Body wraps at 72 characters
  1. Body explains "what" and "why", not "how"

### Documentation
- [ ] In case of new functionality, my PR adds documentation that describes how to use it.
  - When adding new operators/hooks/sensors, the autoclass documentation generation needs to be added.

### Code Quality
- [ ] Passes `flake8`

> Airflow RBAC Permissions modification via UI do not persist
> -----------------------------------------------------------
>
>                 Key: AIRFLOW-3271
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-3271
>             Project: Apache Airflow
>          Issue Type: Bug
>            Reporter: Smitha Koduri
>            Assignee: Smitha Koduri
>            Priority: Major
>
> After upgrading Airflow to 1.10, we have noticed that when attempting to add a new permission-role mapping (via UI), initially it gets successfully added to db. But later, the entry doesn't persist in the db.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[GitHub] smithakoduri opened a new pull request #4118: [AIRFLOW-3271]Fix is added to not reset the database whenever a new…
smithakoduri opened a new pull request #4118: [AIRFLOW-3271]Fix is added to not reset the database whenever a new…
URL: https://github.com/apache/incubator-airflow/pull/4118

… process comes up (or init_roles is called). Otherwise Airflow RBAC permissions that are modified, do not persist for long

Make sure you have checked _all_ steps below.

### Jira
- [ ] My PR addresses the following [Airflow Jira](https://issues.apache.org/jira/browse/AIRFLOW/) issues and references them in the PR title. For example, "\[AIRFLOW-XXX\] My Airflow PR"
  - https://issues.apache.org/jira/browse/AIRFLOW-XXX
  - In case you are fixing a typo in the documentation you can prepend your commit with \[AIRFLOW-XXX\], code changes always need a Jira issue.

### Description
- [ ] Here are some details about my PR, including screenshots of any UI changes:

### Tests
- [ ] My PR adds the following unit tests __OR__ does not need testing for this extremely good reason:

### Commits
- [ ] My commits all reference Jira issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
  1. Subject is separated from body by a blank line
  1. Subject is limited to 50 characters (not including Jira issue reference)
  1. Subject does not end with a period
  1. Subject uses the imperative mood ("add", not "adding")
  1. Body wraps at 72 characters
  1. Body explains "what" and "why", not "how"

### Documentation
- [ ] In case of new functionality, my PR adds documentation that describes how to use it.
  - When adding new operators/hooks/sensors, the autoclass documentation generation needs to be added.

### Code Quality
- [ ] Passes `flake8`
[jira] [Commented] (AIRFLOW-3277) Invalid timezone transition handling for cron schedules
[ https://issues.apache.org/jira/browse/AIRFLOW-3277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16669308#comment-16669308 ]

ASF GitHub Bot commented on AIRFLOW-3277:
-----------------------------------------

bolkedebruin opened a new pull request #4117: [AIRFLOW-3277] Correctly observe DST transitions for cron
URL: https://github.com/apache/incubator-airflow/pull/4117

Make sure you have checked _all_ steps below.

### Jira
- [X] My PR addresses the following [Airflow Jira](https://issues.apache.org/jira/browse/AIRFLOW/) issues and references them in the PR title. For example, "\[AIRFLOW-XXX\] My Airflow PR"
  - https://issues.apache.org/jira/browse/AIRFLOW-3277
  - In case you are fixing a typo in the documentation you can prepend your commit with \[AIRFLOW-XXX\], code changes always need a Jira issue.

### Description
- [X] Here are some details about my PR, including screenshots of any UI changes:

  `following_schedule` converts to naive time by using the local time zone. In case of a DST transition, say 3AM -> 2AM ("summer time to winter time") we generate date times that could overlap with earlier schedules. Therefore a DAG that should run every 5 minutes will not do so if it has already seen the schedule.

### Tests
- [ ] My PR adds the following unit tests __OR__ does not need testing for this extremely good reason: added

### Commits
- [X] My commits all reference Jira issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
  1. Subject is separated from body by a blank line
  1. Subject is limited to 50 characters (not including Jira issue reference)
  1. Subject does not end with a period
  1. Subject uses the imperative mood ("add", not "adding")
  1. Body wraps at 72 characters
  1. Body explains "what" and "why", not "how"

### Documentation
- [X] In case of new functionality, my PR adds documentation that describes how to use it.
  - When adding new operators/hooks/sensors, the autoclass documentation generation needs to be added.

### Code Quality
- [X] Passes `flake8`

cc @ashb

> Invalid timezone transition handling for cron schedules
> -------------------------------------------------------
>
>                 Key: AIRFLOW-3277
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-3277
>             Project: Apache Airflow
>          Issue Type: Bug
>    Affects Versions: 1.10.0
>            Reporter: Bolke de Bruin
>            Priority: Blocker
>             Fix For: 1.10.1
>
> `following_schedule` converts to naive time by using the local time zone. In case of a DST transition, say 3AM -> 2AM ("summer time to winter time") we generate date times that could overlap with earlier schedules. Therefore a DAG that should run every 5 minutes will not do so if it has already seen the schedule.
> We should not convert to naive and keep UTC.
[GitHub] bolkedebruin opened a new pull request #4117: [AIRFLOW-3277] Correctly observe DST transitions for cron
bolkedebruin opened a new pull request #4117: [AIRFLOW-3277] Correctly observe DST transitions for cron
URL: https://github.com/apache/incubator-airflow/pull/4117

Make sure you have checked _all_ steps below.

### Jira
- [X] My PR addresses the following [Airflow Jira](https://issues.apache.org/jira/browse/AIRFLOW/) issues and references them in the PR title. For example, "\[AIRFLOW-XXX\] My Airflow PR"
  - https://issues.apache.org/jira/browse/AIRFLOW-3277
  - In case you are fixing a typo in the documentation you can prepend your commit with \[AIRFLOW-XXX\], code changes always need a Jira issue.

### Description
- [X] Here are some details about my PR, including screenshots of any UI changes:

  `following_schedule` converts to naive time by using the local time zone. In case of a DST transition, say 3AM -> 2AM ("summer time to winter time") we generate date times that could overlap with earlier schedules. Therefore a DAG that should run every 5 minutes will not do so if it has already seen the schedule.

### Tests
- [ ] My PR adds the following unit tests __OR__ does not need testing for this extremely good reason: added

### Commits
- [X] My commits all reference Jira issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
  1. Subject is separated from body by a blank line
  1. Subject is limited to 50 characters (not including Jira issue reference)
  1. Subject does not end with a period
  1. Subject uses the imperative mood ("add", not "adding")
  1. Body wraps at 72 characters
  1. Body explains "what" and "why", not "how"

### Documentation
- [X] In case of new functionality, my PR adds documentation that describes how to use it.
  - When adding new operators/hooks/sensors, the autoclass documentation generation needs to be added.

### Code Quality
- [X] Passes `flake8`

cc @ashb
[GitHub] jj-ian commented on a change in pull request #4083: [AIRFLOW-3211] Reattach to GCP Dataproc jobs upon Airflow restart
jj-ian commented on a change in pull request #4083: [AIRFLOW-3211] Reattach to GCP Dataproc jobs upon Airflow restart
URL: https://github.com/apache/incubator-airflow/pull/4083#discussion_r229476304

## File path: airflow/contrib/hooks/gcp_dataproc_hook.py

```diff
@@ -33,12 +33,82 @@ def __init__(self, dataproc_api, project_id, job, region='global',
         self.dataproc_api = dataproc_api
         self.project_id = project_id
         self.region = region
+
+        # Check if the job to submit is already running on the cluster.
+        # If so, don't resubmit the job.
+        try:
+            cluster_name = job['job']['placement']['clusterName']
+        except KeyError:
+            self.log.error('Job to submit is incorrectly configured.')
+            raise
+
+        jobs_on_cluster_response = dataproc_api.projects().regions().jobs().list(
+            projectId=self.project_id,
+            region=self.region,
+            clusterName=cluster_name).execute()
+
+        UUID_LENGTH = 9
```

Review comment: @fenglu-g Just wanted to clarify if my interpretation of your comment was in line with what you meant? If so I'll go ahead and implement it. Thanks!
[GitHub] jj-ian commented on a change in pull request #4083: [AIRFLOW-3211] Reattach to GCP Dataproc jobs upon Airflow restart
jj-ian commented on a change in pull request #4083: [AIRFLOW-3211] Reattach to GCP Dataproc jobs upon Airflow restart URL: https://github.com/apache/incubator-airflow/pull/4083#discussion_r228297977 ## File path: airflow/contrib/hooks/gcp_dataproc_hook.py ## @@ -33,12 +33,82 @@ def __init__(self, dataproc_api, project_id, job, region='global', self.dataproc_api = dataproc_api self.project_id = project_id self.region = region + +# Check if the job to submit is already running on the cluster. +# If so, don't resubmit the job. +try: +cluster_name = job['job']['placement']['clusterName'] +except KeyError: +self.log.error('Job to submit is incorrectly configured.') +raise + +jobs_on_cluster_response = dataproc_api.projects().regions().jobs().list( +projectId=self.project_id, +region=self.region, +clusterName=cluster_name).execute() + +UUID_LENGTH = 9 Review comment: Thank you @fenglu-g. Just to clarify, do you mean that I should modify the [`DataProcHook.submit`](https://github.com/apache/incubator-airflow/blob/0e8394fd23d067b7e226c011bb1825ff734219c5/airflow/contrib/hooks/gcp_dataproc_hook.py#L229) and [`_DataProcJob.__init__`](https://github.com/apache/incubator-airflow/blob/0e8394fd23d067b7e226c011bb1825ff734219c5/airflow/contrib/hooks/gcp_dataproc_hook.py#L31) functions to take in the new regex param like so? 
```
class _DataProcJob(LoggingMixin):
    def __init__(other args..., job_dedupe_regex=None):

class DataProcHook(GoogleCloudBaseHook):
    def submit(other args..., job_dedupe_regex=[by default, regex that dedupes by matching the task ID]):
```

And then have `submit()` pass the regex along to the `_DataProcJob` init [here](https://github.com/apache/incubator-airflow/blob/0e8394fd23d067b7e226c011bb1825ff734219c5/airflow/contrib/hooks/gcp_dataproc_hook.py#L230), because the deduping happens during the init:

```
def submit(other args..., job_dedupe_regex=[by default, regex that dedupes by matching the task ID]):
    submitted = _DataProcJob(other args..., job_dedupe_regex=job_dedupe_regex)
```
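A runnable sketch of the parameter threading proposed above. The class and argument names mirror the proposal; the default regex and the method bodies are illustrative assumptions, not the actual Airflow code:

```python
import re

class _DataProcJob:
    def __init__(self, task_id, job_dedupe_regex=None):
        # Default: dedupe by matching the task ID, as suggested above.
        self.job_dedupe_regex = job_dedupe_regex or re.escape(task_id)

class DataProcHook:
    def submit(self, task_id, job_dedupe_regex=None):
        # submit() only forwards the regex; the deduping itself would
        # happen inside _DataProcJob.__init__, where the job list is checked.
        return _DataProcJob(task_id, job_dedupe_regex=job_dedupe_regex)

job = DataProcHook().submit('my_task')
assert re.match(job.job_dedupe_regex, 'my_task')
```

The point of the design is that callers who never pass `job_dedupe_regex` keep today's behavior, while power users can override how resubmissions are detected.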
[GitHub] yegeniy commented on issue #2367: [AIRFLOW-1077] Warn about subdag deadlock case
yegeniy commented on issue #2367: [AIRFLOW-1077] Warn about subdag deadlock case URL: https://github.com/apache/incubator-airflow/pull/2367#issuecomment-434456547 This is a great warning for those who are not on the latest version, but the default executor for SubDags is now the SequentialExecutor.
[GitHub] feng-tao commented on issue #3997: [AIRFLOW-3153] send dag last_run to statsd
feng-tao commented on issue #3997: [AIRFLOW-3153] send dag last_run to statsd URL: https://github.com/apache/incubator-airflow/pull/3997#issuecomment-434451104 @ashb , @Fokko , sorry for the PR slipping through; will update.
[jira] [Created] (AIRFLOW-3277) Invalid timezone transition handling for cron schedules
Bolke de Bruin created AIRFLOW-3277: --- Summary: Invalid timezone transition handling for cron schedules Key: AIRFLOW-3277 URL: https://issues.apache.org/jira/browse/AIRFLOW-3277 Project: Apache Airflow Issue Type: Bug Affects Versions: 1.10.0 Reporter: Bolke de Bruin Fix For: 1.10.1 `following_schedule` converts to naive time using the local time zone. In case of a DST transition, say 3AM -> 2AM ("summer time to winter time"), we generate datetimes that can overlap with earlier schedules. As a result, a DAG that should run every 5 minutes will not do so once it has already seen the schedule. We should not convert to naive time; we should keep UTC. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
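The overlap described in the issue can be demonstrated without Airflow. A minimal sketch (the timezone and date are chosen as an example of a fall-back transition, not taken from the issue):

```python
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo  # Python 3.9+

tz = ZoneInfo("Europe/Amsterdam")

# Europe/Amsterdam fell back 03:00 -> 02:00 on 2018-10-28 at 01:00 UTC.
before = datetime(2018, 10, 28, 0, 55, tzinfo=ZoneInfo("UTC"))
after = before + timedelta(minutes=10)  # ten real minutes later

naive_before = before.astimezone(tz).replace(tzinfo=None)  # 02:55 local (CEST)
naive_after = after.astimezone(tz).replace(tzinfo=None)    # 02:05 local (CET)

# The later instant has an *earlier* naive wall-clock time, so a scheduler
# comparing naive local datetimes believes it has already seen this schedule.
assert naive_after < naive_before
```

Staying in UTC avoids the problem entirely, because UTC has no transitions: `after > before` always holds for aware UTC datetimes.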
[jira] [Commented] (AIRFLOW-2064) Polish timezone implementation
[ https://issues.apache.org/jira/browse/AIRFLOW-2064?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16669257#comment-16669257 ] Bolke de Bruin commented on AIRFLOW-2064: - [~phani8996] that is a setting in airflow.cfg (default_timezone = XXX, which defaults to UTC) > Polish timezone implementation > -- > > Key: AIRFLOW-2064 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2064 > Project: Apache Airflow > Issue Type: Improvement >Reporter: Bolke de Bruin >Assignee: Marcus Rehm >Priority: Blocker > Fix For: 1.10.0 > > > Couple of things are left over after moving to time zone support: > > # End_dates within dags should be converted to UTC by using the time zone of > start_date if naive > # Task instances that are instantiated without timezone information for > their execution_date should convert those to UTC by using the DAG's timezone > or configured > # Some doc polishing > # Tests should be added that cover more of the edge cases -- This message was sent by Atlassian JIRA (v7.6.3#76005)
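The setting mentioned above lives in `airflow.cfg`. A minimal sketch, assuming the `[core]` section placement and using an example IANA timezone name (per the comment, the value defaults to `utc`):

```ini
[core]
# Naive start_date / end_date values are interpreted in this timezone.
default_timezone = Europe/Amsterdam
```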
[jira] [Commented] (AIRFLOW-3036) Upgrading to Airflow 1.10 not possible using GCP Cloud SQL for MYSQL
[ https://issues.apache.org/jira/browse/AIRFLOW-3036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16669256#comment-16669256 ] Bolke de Bruin commented on AIRFLOW-3036: - Google is testing an alternative. Meanwhile you could try to replace `cur = conn.execute("SELECT @@explicit_defaults_for_timestamp")` in airflow/migrations/versions/0e2a74e0fc9f_add_time_zone_awareness.py with `cur = conn.execute("SET explicit_defaults_for_timestamp=1")` > Upgrading to Airflow 1.10 not possible using GCP Cloud SQL for MYSQL > > > Key: AIRFLOW-3036 > URL: https://issues.apache.org/jira/browse/AIRFLOW-3036 > Project: Apache Airflow > Issue Type: Bug > Components: core, db >Affects Versions: 1.10.0 > Environment: Google Cloud Platform, Google Kubernetes Engine, Airflow > 1.10 on Debian Stretch, Google Cloud SQL MySQL >Reporter: Smith Mathieu >Priority: Blocker > Labels: 1.10, google, google-cloud-sql > > The upgrade path to airflow 1.10 seems impossible for users of MySQL in > Google's Cloud SQL service given new mysql requirements for 1.10.
> When executing "airflow upgradedb":
> ```
> INFO [alembic.runtime.migration] Running upgrade d2ae31099d61 -> 0e2a74e0fc9f, Add time zone awareness
> Traceback (most recent call last):
>   File "/usr/local/bin/airflow", line 32, in
>     args.func(args)
>   File "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 1002, in initdb
>     db_utils.initdb(settings.RBAC)
>   File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 92, in initdb
>     upgradedb()
>   File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 346, in upgradedb
>     command.upgrade(config, 'heads')
>   File "/usr/local/lib/python3.6/site-packages/alembic/command.py", line 174, in upgrade
>     script.run_env()
>   File "/usr/local/lib/python3.6/site-packages/alembic/script/base.py", line 416, in run_env
>     util.load_python_file(self.dir, 'env.py')
>   File "/usr/local/lib/python3.6/site-packages/alembic/util/pyfiles.py", line 93, in load_python_file
>     module = load_module_py(module_id, path)
>   File "/usr/local/lib/python3.6/site-packages/alembic/util/compat.py", line 68, in load_module_py
>     module_id, path).load_module(module_id)
>   File "", line 399, in _check_name_wrapper
>   File "", line 823, in load_module
>   File "", line 682, in load_module
>   File "", line 265, in _load_module_shim
>   File "", line 684, in _load
>   File "", line 665, in _load_unlocked
>   File "", line 678, in exec_module
>   File "", line 219, in _call_with_frames_removed
>   File "/usr/local/lib/python3.6/site-packages/airflow/migrations/env.py", line 91, in
>     run_migrations_online()
>   File "/usr/local/lib/python3.6/site-packages/airflow/migrations/env.py", line 86, in run_migrations_online
>     context.run_migrations()
>   File "", line 8, in run_migrations
>   File "/usr/local/lib/python3.6/site-packages/alembic/runtime/environment.py", line 807, in run_migrations
>     self.get_context().run_migrations(**kw)
>   File "/usr/local/lib/python3.6/site-packages/alembic/runtime/migration.py", line 321, in run_migrations
>     step.migration_fn(**kw)
>   File "/usr/local/lib/python3.6/site-packages/airflow/migrations/versions/0e2a74e0fc9f_add_time_zone_awareness.py", line 46, in upgrade
>     raise Exception("Global variable explicit_defaults_for_timestamp needs to be on (1) for mysql")
> Exception: Global variable explicit_defaults_for_timestamp needs to be on (1) for mysql
> ```
>
> Reading the documentation for upgrading to airflow 1.10, it seems the requirement for explicit_defaults_for_timestamp=1 was intentional.
>
> However, MySQL on Google Cloud SQL does not support configuring this variable and it is off by default. Users of MySQL and Cloud SQL do not have an upgrade path to 1.10. Alas, so close to the mythical Kubernetes Executor. In GCP, Cloud SQL is _the_ hosted MySQL solution.
> [https://cloud.google.com/sql/docs/mysql/flags]

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] codecov-io commented on issue #4112: [AIRFLOW-3212] Add AwsGlueCatalogPartitionSensor
codecov-io commented on issue #4112: [AIRFLOW-3212] Add AwsGlueCatalogPartitionSensor
URL: https://github.com/apache/incubator-airflow/pull/4112#issuecomment-434413640

# [Codecov](https://codecov.io/gh/apache/incubator-airflow/pull/4112?src=pr=h1) Report
> Merging [#4112](https://codecov.io/gh/apache/incubator-airflow/pull/4112?src=pr=desc) into [master](https://codecov.io/gh/apache/incubator-airflow/commit/c4e5151bcd095eae1cd6ca1b4e96b302df3a2166?src=pr=desc) will **not change** coverage.
> The diff coverage is `n/a`.

[![Impacted file tree graph](https://codecov.io/gh/apache/incubator-airflow/pull/4112/graphs/tree.svg?width=650=WdLKlKHOAU=150=pr)](https://codecov.io/gh/apache/incubator-airflow/pull/4112?src=pr=tree)

```diff
@@           Coverage Diff           @@
##           master    #4112   +/-   ##
=======================================
  Coverage   76.68%   76.68%
=======================================
  Files         199      199
  Lines       16189    16189
=======================================
  Hits        12414    12414
  Misses       3775     3775
```

[Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-airflow/pull/4112?src=pr=continue).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
> `Δ = absolute (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/incubator-airflow/pull/4112?src=pr=footer). Last update [c4e5151...d5f6220](https://codecov.io/gh/apache/incubator-airflow/pull/4112?src=pr=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
[GitHub] mikemole commented on a change in pull request #4112: [AIRFLOW-3212] Add AwsGlueCatalogPartitionSensor
mikemole commented on a change in pull request #4112: [AIRFLOW-3212] Add AwsGlueCatalogPartitionSensor
URL: https://github.com/apache/incubator-airflow/pull/4112#discussion_r229415906

## File path: setup.py

```diff
@@ -245,7 +245,7 @@ def write_version(filename=os.path.join(*['airflow',
     'lxml>=4.0.0',
     'mock',
     'mongomock',
-    'moto==1.1.19',
+    'moto==1.3.5',
```

Review comment: Done.
[GitHub] mikemole commented on a change in pull request #4112: [AIRFLOW-3212] Add AwsGlueCatalogPartitionSensor
mikemole commented on a change in pull request #4112: [AIRFLOW-3212] Add AwsGlueCatalogPartitionSensor
URL: https://github.com/apache/incubator-airflow/pull/4112#discussion_r229415805

## File path: airflow/contrib/hooks/aws_glue_catalog_hook.py

```diff
@@ -0,0 +1,117 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+from airflow.contrib.hooks.aws_hook import AwsHook
+
+
+class AwsGlueCatalogHook(AwsHook):
+    """
+    Interact with AWS Glue Catalog
+
+    :param aws_conn_id: ID of the Airflow connection where
+        credentials and extra configuration are stored
+    :type aws_conn_id: str
+    :param region_name: aws region name (example: us-east-1)
+    :type region_name: str
+    """
+
+    def __init__(self,
+                 aws_conn_id='aws_default',
+                 region_name=None,
+                 *args,
+                 **kwargs):
+        self.region_name = region_name
+        super(AwsGlueCatalogHook, self).__init__(aws_conn_id=aws_conn_id, *args, **kwargs)
+
+    def get_conn(self):
+        """
+        Returns glue connection object.
+        """
+        self.conn = self.get_client_type('glue', self.region_name)
+        return self.conn
+
+    def get_partitions(self,
+                       database_name,
+                       table_name,
+                       expression='',
+                       page_size=None,
+                       max_items=None):
+        """
+        Retrieves the partition values for a table.
+
+        :param database_name: The name of the catalog database where the partitions reside.
```

Review comment: Thanks. Done.
[GitHub] mikemole commented on a change in pull request #4112: [AIRFLOW-3212] Add AwsGlueCatalogPartitionSensor
mikemole commented on a change in pull request #4112: [AIRFLOW-3212] Add AwsGlueCatalogPartitionSensor
URL: https://github.com/apache/incubator-airflow/pull/4112#discussion_r229415707

## File path: airflow/contrib/hooks/aws_glue_catalog_hook.py

```diff
@@ -0,0 +1,117 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+from airflow.contrib.hooks.aws_hook import AwsHook
+
+
+class AwsGlueCatalogHook(AwsHook):
+    """
+    Interact with AWS Glue Catalog
+
+    :param aws_conn_id: ID of the Airflow connection where
+        credentials and extra configuration are stored
+    :type aws_conn_id: str
+    :param region_name: aws region name (example: us-east-1)
+    :type region_name: str
+    """
+
+    def __init__(self,
+                 aws_conn_id='aws_default',
+                 region_name=None,
+                 *args,
+                 **kwargs):
+        self.region_name = region_name
+        super(AwsGlueCatalogHook, self).__init__(aws_conn_id=aws_conn_id, *args, **kwargs)
+
+    def get_conn(self):
+        """
+        Returns glue connection object.
+        """
+        self.conn = self.get_client_type('glue', self.region_name)
+        return self.conn
+
+    def get_partitions(self,
+                       database_name,
+                       table_name,
+                       expression='',
+                       page_size=None,
+                       max_items=None):
+        """
+        Retrieves the partition values for a table.
+
+        :param database_name: The name of the catalog database where the partitions reside.
+        :type database_name: str
+        :param table_name: The name of the partitions' table.
+        :type table_name: str
+        :param expression: An expression filtering the partitions to be returned.
+            Please see official AWS documentation for further information.
+            https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-catalog-partitions.html#aws-glue-api-catalog-partitions-GetPartitions
+        :type expression: str
+        :param page_size: pagination size
+        :type page_size: int
+        :param max_items: maximum items to return
+        :type max_items: int
+        :return: array of partition values where each value is itself an array as
+            a partition may be composed of multiple columns. For example:
+            [['2018-01-01','1'], ['2018-01-01','2']]
+        """
+        config = {
+            'PageSize': page_size,
+            'MaxItems': max_items,
+        }
+
+        paginator = self.get_conn().get_paginator('get_partitions')
+        response = paginator.paginate(
+            DatabaseName=database_name,
+            TableName=table_name,
+            Expression=expression,
+            PaginationConfig=config
+        )
+
+        partitions = []
```

Review comment: Ok, I switched to a set. I also changed partition values from lists to tuples, since lists are not hashable and therefore can't be added to a set. A tuple makes more sense anyway. I added a commit so you could see just the latest changes. If it looks ok, let me know, and I'll squash the commits.
[GitHub] phani8996 commented on a change in pull request #4111: [AIRFLOW-3266] Add AWS Athena Operator and hook
phani8996 commented on a change in pull request #4111: [AIRFLOW-3266] Add AWS Athena Operator and hook
URL: https://github.com/apache/incubator-airflow/pull/4111#discussion_r229398045

## File path: airflow/contrib/operators/aws_athena_operator.py

```diff
@@ -0,0 +1,69 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+from airflow.contrib.hooks.aws_athena_hook import AWSAthenaHook
+
+
+class AWSAthenaOperator(BaseOperator):
+
```

Review comment: @XD-DENG Docstrings have been added in all relevant places, and documentation for the new feature has been added. Please check.
[GitHub] ashb commented on issue #3770: [AIRFLOW-XXX] Fix Kubernetes operator with git-sync
ashb commented on issue #3770: [AIRFLOW-XXX] Fix Kubernetes operator with git-sync URL: https://github.com/apache/incubator-airflow/pull/3770#issuecomment-434366205 We need a Jira ticket for this (good rule of thumb: if it's a code change we should have a Jira ticket so that it gets included in the release notes)
[GitHub] ashb commented on a change in pull request #3770: [AIRFLOW-XXX] Fix Kubernetes operator with git-sync
ashb commented on a change in pull request #3770: [AIRFLOW-XXX] Fix Kubernetes operator with git-sync
URL: https://github.com/apache/incubator-airflow/pull/3770#discussion_r229375296

## File path: airflow/config_templates/default_airflow.cfg

```diff
@@ -593,12 +592,24 @@ logs_volume_subpath =
 # A shared volume claim for the logs
 logs_volume_claim =
+
+# For DAGs mounted via a hostPath volume (mutually exclusive with volume claim and git-sync)
+dags_volume_host =
```

Review comment: If we don't want people using this in prod, is it worth saying so in the template config?
[GitHub] noqcks commented on issue #3770: [AIRFLOW-XXX] Fix Kubernetes operator with git-sync
noqcks commented on issue #3770: [AIRFLOW-XXX] Fix Kubernetes operator with git-sync URL: https://github.com/apache/incubator-airflow/pull/3770#issuecomment-434361653 Any update on this? We attempted to set up Airflow on Kubernetes for the first time and went through hell trying to configure git-sync correctly, only to find out that it never worked in the first place.
[jira] [Created] (AIRFLOW-3276) Google Cloud SQL database insert / patch / delete operators
Szymon Przedwojski created AIRFLOW-3276: --- Summary: Google Cloud SQL database insert / patch / delete operators Key: AIRFLOW-3276 URL: https://issues.apache.org/jira/browse/AIRFLOW-3276 Project: Apache Airflow Issue Type: New Feature Reporter: Szymon Przedwojski Assignee: Szymon Przedwojski Operators allowing to invoke Google Cloud SQL's database methods: - CloudSqlInstanceDatabaseInsertOperator ([https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/databases/insert]) - CloudSqlInstanceDatabasePatchOperator ([https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/databases/patch]) - CloudSqlInstanceDatabaseDeleteOperator ([https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/databases/delete]) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] fenglu-g commented on a change in pull request #4097: [AIRFLOW-3231] Basic operators for Google Cloud SQL
fenglu-g commented on a change in pull request #4097: [AIRFLOW-3231] Basic operators for Google Cloud SQL
URL: https://github.com/apache/incubator-airflow/pull/4097#discussion_r229352604

## File path: airflow/contrib/hooks/gcp_sql_hook.py

```diff
@@ -0,0 +1,173 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import time
+from googleapiclient.discovery import build
+
+from airflow import AirflowException
+from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
+
+# Number of retries - used by googleapiclient method calls to perform retries
+# For requests that are "retriable"
+NUM_RETRIES = 5
+
+# Time to sleep between active checks of the operation results
+TIME_TO_SLEEP_IN_SECONDS = 1
+
+
+class CloudSqlOperationStatus:
+    PENDING = "PENDING"
+    RUNNING = "RUNNING"
+    DONE = "DONE"
+    UNKNOWN = "UNKNOWN"
+
+
+# noinspection PyAbstractClass
+class CloudSqlHook(GoogleCloudBaseHook):
+    """
+    Hook for Google Cloud SQL APIs.
+    """
+    _conn = None
+
+    def __init__(self,
+                 api_version,
+                 gcp_conn_id='google_cloud_default',
+                 delegate_to=None):
+        super(CloudSqlHook, self).__init__(gcp_conn_id, delegate_to)
+        self.api_version = api_version
+
+    def get_conn(self):
+        """
+        Retrieves connection to Cloud SQL.
+
+        :return: Google Cloud SQL services object.
+        :rtype: dict
+        """
+        if not self._conn:
+            http_authorized = self._authorize()
+            self._conn = build('sqladmin', self.api_version,
+                               http=http_authorized, cache_discovery=False)
+        return self._conn
+
+    def get_instance(self, project_id, instance):
+        """
+        Retrieves a resource containing information about a Cloud SQL instance.
+
+        :param project_id: Project ID of the project that contains the instance.
+        :type project_id: str
+        :param instance: Database instance ID. This does not include the project ID.
+        :type instance: str
+        :return: A Cloud SQL instance resource.
+        :rtype: dict
+        """
+        return self.get_conn().instances().get(
+            project=project_id,
+            instance=instance
+        ).execute(num_retries=NUM_RETRIES)
+
+    def insert_instance(self, project_id, body):
```

Review comment: "create" sounds good to me.
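The hook above defines `TIME_TO_SLEEP_IN_SECONDS` for actively checking operation results. A generic, self-contained sketch of such a status-polling loop; the `fetch_status` callable and the `max_polls` budget are illustrative stand-ins, not part of the quoted hook:

```python
import time

DONE = "DONE"  # mirrors CloudSqlOperationStatus.DONE above

def wait_for_operation(fetch_status, sleep_seconds=1, max_polls=60):
    """Poll fetch_status() until it reports DONE, sleeping between checks."""
    for _ in range(max_polls):
        status = fetch_status()
        if status == DONE:
            return status
        time.sleep(sleep_seconds)
    raise RuntimeError("operation did not reach DONE within the poll budget")

# Example: an operation that finishes on the third poll.
states = iter(["PENDING", "RUNNING", "DONE"])
result = wait_for_operation(lambda: next(states), sleep_seconds=0)
assert result == "DONE"
```

A bounded `max_polls` (rather than an unbounded `while True`) keeps a stuck Cloud SQL operation from hanging the task forever; the sleep interval plays the role of `TIME_TO_SLEEP_IN_SECONDS`.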
[GitHub] SamWildmo commented on issue #2367: [AIRFLOW-1077] Warn about subdag deadlock case
SamWildmo commented on issue #2367: [AIRFLOW-1077] Warn about subdag deadlock case URL: https://github.com/apache/incubator-airflow/pull/2367#issuecomment-434340311 I think I also encountered this, but it happened only once so I can't be sure.
[GitHub] SamWildmo commented on issue #3661: [AIRFLOW-2780] Adds IMAP Hook to interact with a mail server
SamWildmo commented on issue #3661: [AIRFLOW-2780] Adds IMAP Hook to interact with a mail server URL: https://github.com/apache/incubator-airflow/pull/3661#issuecomment-434337255 Will this hook be in 1.10.1?
[GitHub] zackmeso edited a comment on issue #4114: [AIRFLOW-3259] Fix internal server error when displaying charts
zackmeso edited a comment on issue #4114: [AIRFLOW-3259] Fix internal server error when displaying charts
URL: https://github.com/apache/incubator-airflow/pull/4114#issuecomment-433942970

@Fokko Hello. I created another PR in order to fully comply with the contribution guidelines. As for your question "Would it be possible to test this?": if you mean test the behavior of `sort_values` with a unit test, I am not sure. The `sort` call exists within the `chart_data` view, which is a big chunk of code that does a lot of things before rendering the template.

```python
@expose('/chart_data')
@data_profiling_required
@wwwutils.gzipped
# @cache.cached(timeout=3600, key_prefix=wwwutils.make_cache_key)
def chart_data(self):
    from airflow import macros
    import pandas as pd
    if conf.getboolean('core', 'secure_mode'):
        abort(404)
    with create_session() as session:
        chart_id = request.args.get('chart_id')
        csv = request.args.get('csv') == "true"
        chart = session.query(models.Chart).filter_by(id=chart_id).first()
        db = session.query(
            models.Connection).filter_by(conn_id=chart.conn_id).first()

    payload = {
        "state": "ERROR",
        "error": ""
    }

    # Processing templated fields
    try:
        args = ast.literal_eval(chart.default_params)
        if not isinstance(args, dict):
            raise AirflowException('Not a dict')
    except Exception:
        args = {}
        payload['error'] += (
            "Default params is not valid, string has to evaluate as "
            "a Python dictionary. ")

    request_dict = {k: request.args.get(k) for k in request.args}
    args.update(request_dict)
    args['macros'] = macros
    sandbox = ImmutableSandboxedEnvironment()
    sql = sandbox.from_string(chart.sql).render(**args)
    label = sandbox.from_string(chart.label).render(**args)
    payload['sql_html'] = Markup(highlight(
        sql,
        lexers.SqlLexer(),  # Lexer call
        HtmlFormatter(noclasses=True))
    )
    payload['label'] = label

    pd.set_option('display.max_colwidth', 100)
    hook = db.get_hook()
    try:
        df = hook.get_pandas_df(
            wwwutils.limit_sql(sql, CHART_LIMIT, conn_type=db.conn_type))
        df = df.fillna(0)
    except Exception as e:
        payload['error'] += "SQL execution failed. Details: " + str(e)

    if csv:
        return Response(
            response=df.to_csv(index=False),
            status=200,
            mimetype="application/text")

    if not payload['error'] and len(df) == CHART_LIMIT:
        payload['warning'] = (
            "Data has been truncated to {0}"
            " rows. Expect incomplete results.").format(CHART_LIMIT)

    if not payload['error'] and len(df) == 0:
        payload['error'] += "Empty result set. "
    elif (
            not payload['error'] and
            chart.sql_layout == 'series' and
            chart.chart_type != "datatable" and
            len(df.columns) < 3):
        payload['error'] += "SQL needs to return at least 3 columns. "
    elif (
            not payload['error'] and
            chart.sql_layout == 'columns' and
            len(df.columns) < 2):
        payload['error'] += "SQL needs to return at least 2 columns. "
    elif not payload['error']:
        import numpy as np
        chart_type = chart.chart_type
        data = None
        if chart.show_datatable or chart_type == "datatable":
            data = df.to_dict(orient="split")
            data['columns'] = [{'title': c} for c in data['columns']]
            payload['data'] = data

        # Trying to convert time to something Highcharts likes
        x_col = 1 if chart.sql_layout == 'series' else 0
        if chart.x_is_date:
            try:
                # From string to datetime
                df[df.columns[x_col]] = pd.to_datetime(
                    df[df.columns[x_col]])
                df[df.columns[x_col]] = df[df.columns[x_col]].apply(
                    lambda x: int(x.strftime("%s")) * 1000)
            except Exception as e:
                payload['error'] = "Time conversion failed"

        if chart_type == 'datatable':
            payload['state'] = 'SUCCESS'
            return wwwutils.json_response(payload)
        else:
            if
```
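For context on the `sort` in question: `DataFrame.sort` was deprecated and eventually removed from pandas, and `sort_values` is its replacement, which is the behavior the PR title refers to. A minimal, stand-alone illustration (this is not the Airflow code itself, just the pandas call):

```python
import pandas as pd

df = pd.DataFrame({"x": [3, 1, 2]})

# DataFrame.sort() no longer exists in current pandas; calling it raises
# AttributeError, which is the kind of internal server error a web view hits.
# sort_values is the supported replacement.
sorted_df = df.sort_values(by="x")

assert list(sorted_df["x"]) == [1, 2, 3]
```

Testing just this call in isolation is trivial; the hard part, as the comment notes, is that the call is buried inside a view that queries the metadata DB, renders templates, and talks to a hook.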
[GitHub] codecov-io edited a comment on issue #4111: [AIRFLOW-3266] Add AWS Athena Operator and hook
codecov-io edited a comment on issue #4111: [AIRFLOW-3266] Add AWS Athena Operator and hook
URL: https://github.com/apache/incubator-airflow/pull/4111#issuecomment-433705746

# [Codecov](https://codecov.io/gh/apache/incubator-airflow/pull/4111?src=pr=h1) Report
> Merging [#4111](https://codecov.io/gh/apache/incubator-airflow/pull/4111?src=pr=desc) into [master](https://codecov.io/gh/apache/incubator-airflow/commit/c4e5151bcd095eae1cd6ca1b4e96b302df3a2166?src=pr=desc) will **not change** coverage.
> The diff coverage is `n/a`.

[![Impacted file tree graph](https://codecov.io/gh/apache/incubator-airflow/pull/4111/graphs/tree.svg?width=650=WdLKlKHOAU=150=pr)](https://codecov.io/gh/apache/incubator-airflow/pull/4111?src=pr=tree)

```diff
@@           Coverage Diff           @@
##           master    #4111   +/-   ##
=======================================
  Coverage   76.68%   76.68%
=======================================
  Files         199      199
  Lines       16189    16189
=======================================
  Hits        12414    12414
  Misses       3775     3775
```

--

[Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-airflow/pull/4111?src=pr=continue).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
> `Δ = absolute (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/incubator-airflow/pull/4111?src=pr=footer). Last update [c4e5151...e3f888e](https://codecov.io/gh/apache/incubator-airflow/pull/4111?src=pr=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (AIRFLOW-3275) Google Cloud SQL Query operator
Jarek Potiuk created AIRFLOW-3275:

Summary: Google Cloud SQL Query operator
Key: AIRFLOW-3275
URL: https://issues.apache.org/jira/browse/AIRFLOW-3275
Project: Apache Airflow
Issue Type: New Feature
Components: contrib
Reporter: Jarek Potiuk

An operator that performs a DDL or DML SQL query in a Google Cloud SQL instance. DQL (retrieving data from Google Cloud SQL) is not supported - you may run SELECT queries, but their results are discarded. You should be able to specify various connectivity methods to connect to the running instance, ranging from a plain Public IP connection, through Public IP with SSL, to both TCP and socket connections via the Cloud SQL Proxy. The proxy should be downloaded and started/stopped dynamically as needed by the operator.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (AIRFLOW-3207) option to stop task pushing result to xcom
[ https://issues.apache.org/jira/browse/AIRFLOW-3207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16668661#comment-16668661 ] ASF GitHub Bot commented on AIRFLOW-3207:

itscaro closed pull request #4116: WIP [AIRFLOW-3207] option to stop task pushing result to xcom
URL: https://github.com/apache/incubator-airflow/pull/4116

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance. As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/airflow/contrib/operators/bigquery_get_data.py b/airflow/contrib/operators/bigquery_get_data.py
index f5e6e50f06..e9c7787dd3 100644
--- a/airflow/contrib/operators/bigquery_get_data.py
+++ b/airflow/contrib/operators/bigquery_get_data.py
@@ -66,6 +66,8 @@ class BigQueryGetDataOperator(BaseOperator):
         For this to work, the service account making the request must have
         domain-wide delegation enabled.
     :type delegate_to: str
+    :param do_xcom_push: return the result which also get set in XCOM
+    :type do_xcom_push: bool
     """
     template_fields = ('dataset_id', 'table_id', 'max_results')
     ui_color = '#e4f0e8'
@@ -78,6 +80,7 @@ def __init__(self,
                  selected_fields=None,
                  bigquery_conn_id='bigquery_default',
                  delegate_to=None,
+                 do_xcom_push=True,
                  *args,
                  **kwargs):
         super(BigQueryGetDataOperator, self).__init__(*args, **kwargs)
@@ -87,6 +90,7 @@ def __init__(self,
         self.selected_fields = selected_fields
         self.bigquery_conn_id = bigquery_conn_id
         self.delegate_to = delegate_to
+        self.do_xcom_push = do_xcom_push

     def execute(self, context):
         self.log.info('Fetching Data from:')
@@ -113,4 +117,5 @@ def execute(self, context):
                 single_row.append(fields['v'])
             table_data.append(single_row)
-        return table_data
+        if self.do_xcom_push:
+            return table_data
diff --git a/airflow/contrib/operators/dataflow_operator.py b/airflow/contrib/operators/dataflow_operator.py
index 5378735f94..dfb65761a1 100644
--- a/airflow/contrib/operators/dataflow_operator.py
+++ b/airflow/contrib/operators/dataflow_operator.py
@@ -311,7 +311,6 @@ def __init__(
             poll_sleep=10,
             *args,
             **kwargs):
-
         super(DataFlowPythonOperator, self).__init__(*args, **kwargs)

         self.py_file = py_file
@@ -335,9 +334,11 @@ def execute(self, context):
             poll_sleep=self.poll_sleep)
         dataflow_options = self.dataflow_default_options.copy()
         dataflow_options.update(self.options)
+        # Convert argument names from lowerCamelCase to snake case.
-        camel_to_snake = lambda name: re.sub(
-            r'[A-Z]', lambda x: '_' + x.group(0).lower(), name)
+        def camel_to_snake(name):
+            return re.sub(r'[A-Z]', lambda x: '_' + x.group(0).lower(), name)
+
         formatted_options = {camel_to_snake(key): dataflow_options[key]
                              for key in dataflow_options}
         hook.start_python_dataflow(
diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py
index 60fc2bcf15..8823c56c30 100644
--- a/airflow/contrib/operators/dataproc_operator.py
+++ b/airflow/contrib/operators/dataproc_operator.py
@@ -229,14 +229,14 @@ def _get_cluster(self, service):
         cluster = [c for c in cluster_list if c['clusterName'] == self.cluster_name]
         if cluster:
             return cluster[0]
-        return None
+        return

     def _get_cluster_state(self, service):
         cluster = self._get_cluster(service)
         if 'status' in cluster:
             return cluster['status']['state']
         else:
-            return None
+            return

     def _cluster_ready(self, state, service):
         if state == 'RUNNING':
@@ -407,7 +407,7 @@ def execute(self, context):
                 self.cluster_name
             )
             self._wait_for_done(service)
-            return True
+            return

         cluster_data = self._build_cluster_data()
         try:
@@ -425,7 +425,7 @@ def execute(self, context):
                 self.cluster_name
             )
             self._wait_for_done(service)
-            return True
+            return
         else:
             raise e
diff --git a/airflow/contrib/operators/emr_add_steps_operator.py b/airflow/contrib/operators/emr_add_steps_operator.py
index 959543e617..c0048681cf 100644
--- a/airflow/contrib/operators/emr_add_steps_operator.py
+++ b/airflow/contrib/operators/emr_add_steps_operator.py
@@ -32,6 +32,8 @@ class EmrAddStepsOperator(BaseOperator):
     :type aws_conn_id: str
     :param steps: boto3 style steps to be added to the jobflow. (templated)
     :type steps: list
+    :param do_xcom_push: return the Step IDs which also get set in XCOM
+    :type do_xcom_push: bool
     """
     template_fields =
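The dataflow_operator.py hunk above rewrites an assigned lambda into a named `camel_to_snake` function without changing its behaviour. A quick standalone check of what that conversion does (the `options` dict here is an illustrative example, not from the PR):

```python
import re

def camel_to_snake(name):
    # Prefix each uppercase letter with '_' and lowercase it,
    # turning lowerCamelCase keys into snake_case.
    return re.sub(r'[A-Z]', lambda x: '_' + x.group(0).lower(), name)

options = {'tempLocation': 'gs://bucket/tmp', 'numWorkers': 2}
formatted = {camel_to_snake(k): v for k, v in options.items()}
print(formatted)  # {'temp_location': 'gs://bucket/tmp', 'num_workers': 2}
```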
[jira] [Created] (AIRFLOW-3274) Add run_as_user and fs_group security context options for KubernetesExecutor
Philippe Gagnon created AIRFLOW-3274: Summary: Add run_as_user and fs_group security context options for KubernetesExecutor Key: AIRFLOW-3274 URL: https://issues.apache.org/jira/browse/AIRFLOW-3274 Project: Apache Airflow Issue Type: Improvement Components: kubernetes, scheduler Reporter: Philippe Gagnon At this time it is not possible to add `run_as_user` or `fs_group` securityContext options to worker pods when using KubernetesExecutor. This makes it harder to use KubernetesExecutor on clusters with pod security policies which do not allow containers to run as root. I have already implemented this functionality for my internal use and will propose a PR soon. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] itscaro commented on a change in pull request #4056: [AIRFLOW-3207] option to stop task pushing result to xcom
itscaro commented on a change in pull request #4056: [AIRFLOW-3207] option to stop task pushing result to xcom
URL: https://github.com/apache/incubator-airflow/pull/4056#discussion_r229279325

## File path: airflow/models.py ##

@@ -2473,6 +2476,7 @@ def __init__(
     run_as_user=None,
     task_concurrency=None,
     executor_config=None,
+    do_xcom_push=True,

Review comment: I missed the case of operators outside of this repository. We could make it default to False for operators in this repository, while for other modules the default stays True for backwards compatibility until a major version? Having to disable XCom in each operator call might not be what users want.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
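The behaviour being debated above can be pictured with a toy sketch -- this is NOT Airflow's actual TaskInstance code, and the class names are illustrative -- showing how a `do_xcom_push` flag could gate whether `execute()`'s return value lands in XCom:

```python
class FakeTaskInstance:
    def __init__(self):
        self.xcom = {}

    def run_operator(self, operator):
        result = operator.execute(context={})
        # Defaulting to True preserves backwards compatibility for custom
        # operators that simply return a value from execute().
        if getattr(operator, 'do_xcom_push', True) and result is not None:
            self.xcom['return_value'] = result
        return result

class QuietOperator:
    do_xcom_push = False

    def execute(self, context):
        return 'foo'

class ChattyOperator:  # relies on the backwards-compatible default
    def execute(self, context):
        return 'foo'

ti = FakeTaskInstance()
ti.run_operator(QuietOperator())
print(ti.xcom)  # {}
ti.run_operator(ChattyOperator())
print(ti.xcom)  # {'return_value': 'foo'}
```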
[GitHub] itscaro opened a new pull request #4116: WIP [AIRFLOW-3207] option to stop task pushing result to xcom
itscaro opened a new pull request #4116: WIP [AIRFLOW-3207] option to stop task pushing result to xcom URL: https://github.com/apache/incubator-airflow/pull/4116 Relates to #4056 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (AIRFLOW-3207) option to stop task pushing result to xcom
[ https://issues.apache.org/jira/browse/AIRFLOW-3207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16668604#comment-16668604 ] ASF GitHub Bot commented on AIRFLOW-3207: - itscaro opened a new pull request #4116: WIP [AIRFLOW-3207] option to stop task pushing result to xcom URL: https://github.com/apache/incubator-airflow/pull/4116 Relates to #4056 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > option to stop task pushing result to xcom > -- > > Key: AIRFLOW-3207 > URL: https://issues.apache.org/jira/browse/AIRFLOW-3207 > Project: Apache Airflow > Issue Type: Improvement > Components: models, operators >Reporter: Ben Marengo >Assignee: Ben Marengo >Priority: Major > > follows the completion of AIRFLOW-886, and closure (incomplete) of AIRFLOW-888 > i would actually like functionality similar to this, but i dont think it > necessitates the global config flag. > - BaseOperator should have an option to stop a task pushing the return value > of execute() to xcom. > - the default should be to push (preserves backward compat) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] itscaro commented on a change in pull request #4056: [AIRFLOW-3207] option to stop task pushing result to xcom
itscaro commented on a change in pull request #4056: [AIRFLOW-3207] option to stop task pushing result to xcom
URL: https://github.com/apache/incubator-airflow/pull/4056#discussion_r229268278

## File path: airflow/models.py ##

@@ -2473,6 +2476,7 @@ def __init__(
     run_as_user=None,
     task_concurrency=None,
     executor_config=None,
+    do_xcom_push=True,

Review comment: You can still make it backward compatible by adding `True` to the operators which have this behavior, as in what I tried to do in https://github.com/apache/incubator-airflow/pull/3981 or in a branch over this PR: https://github.com/apache/incubator-airflow/compare/master...itscaro:do_xcom_push There is no good reason to introduce this default behavior as `True` in the models.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (AIRFLOW-3270) Apache airflow 1.10.0 integration with LDAP anonymously
[ https://issues.apache.org/jira/browse/AIRFLOW-3270?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16668592#comment-16668592 ] Hari Krishna ADDEPALLI LN commented on AIRFLOW-3270:

Here - please respond! [~ashb] - please advise, we are using airflow 1.10.0

> Apache airflow 1.10.0 integration with LDAP anonymously
> ---
>
> Key: AIRFLOW-3270
> URL: https://issues.apache.org/jira/browse/AIRFLOW-3270
> Project: Apache Airflow
> Issue Type: Bug
> Components: authentication
> Affects Versions: 1.10.0
> Reporter: Hari Krishna ADDEPALLI LN
> Priority: Critical
>
> Please advise what to include in airflow.cfg when integrating with LDAP anonymously. We are using DS389 as the LDAP server vendor.
>
> {noformat}
> [webserver]
> authenticate = True
> auth_backend = airflow.contrib.auth.backends.ldap_auth
> {noformat}
>
> And
>
> {noformat}
> [ldap]
> uri = ldap://nsp-daf178e8.ad1.prd.us-phx.odc.im:389
> user_filter = memberOf=cn=rvs-all-prd_usphx,ou=groups,dc=odc,dc=im
> user_name_attr = uid
> group_member_attr =
> superuser_filter = memberOf=cn=rvd-sudo_all-prd_usphx,ou=groups,dc=odc,dc=im
> data_profiler_filter =
> bind_user =
> bind_password =
> basedn = ou=people,dc=odc,dc=im
> cacert = /opt/orchestration/airflow/ldap_ca.crt
> search_scope = LEVEL
> {noformat}
>
> I am hitting the exception below:
> {noformat}
> File "/usr/local/lib/python3.5/site-packages/ldap3/operation/search.py", line 215, in parse_filter
>     raise LDAPInvalidFilterError('malformed filter')
> ldap3.core.exceptions.LDAPInvalidFilterError: malformed filter
> {noformat}

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
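One plausible cause of the "malformed filter" in the report above is the blank filter values (`group_member_attr =`, `data_profiler_filter =`) in the config. This is a hypothetical illustration -- not Airflow's actual ldap_auth code -- of why naively wrapping a blank configured value yields an empty `()` term, which LDAP filter parsers such as ldap3's reject:

```python
def combine_filters(filters):
    # Naive combination: blank values become "()", a malformed filter term.
    parts = ['({0})'.format(f) for f in filters]
    return '(&{0})'.format(''.join(parts))

def combine_filters_safely(filters):
    # Skipping blank values avoids the empty "()" term.
    parts = ['({0})'.format(f) for f in filters if f]
    return '(&{0})'.format(''.join(parts))

print(combine_filters(['uid=jdoe', '']))         # (&(uid=jdoe)()) -- malformed
print(combine_filters_safely(['uid=jdoe', '']))  # (&(uid=jdoe))
```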
[GitHub] ashb commented on a change in pull request #4056: [AIRFLOW-3207] option to stop task pushing result to xcom
ashb commented on a change in pull request #4056: [AIRFLOW-3207] option to stop task pushing result to xcom
URL: https://github.com/apache/incubator-airflow/pull/4056#discussion_r229271642

## File path: airflow/models.py ##

@@ -2473,6 +2476,7 @@ def __init__(
     run_as_user=None,
     task_concurrency=None,
     executor_config=None,
+    do_xcom_push=True,

Review comment: Airflow currently operates as if this value is True by default -- if I write a custom operator and `return "foo"` from it, that will make it to XCom. This is the behaviour we need to keep.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] ashb commented on a change in pull request #4112: [AIRFLOW-3212] Add AwsGlueCatalogPartitionSensor
ashb commented on a change in pull request #4112: [AIRFLOW-3212] Add AwsGlueCatalogPartitionSensor
URL: https://github.com/apache/incubator-airflow/pull/4112#discussion_r229245346

## File path: airflow/contrib/hooks/aws_glue_catalog_hook.py ##

@@ -0,0 +1,117 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+from airflow.contrib.hooks.aws_hook import AwsHook
+
+
+class AwsGlueCatalogHook(AwsHook):
+    """
+    Interact with AWS Glue Catalog
+
+    :param aws_conn_id: ID of the Airflow connection where
+        credentials and extra configuration are stored
+    :type aws_conn_id: str
+    :param region_name: aws region name (example: us-east-1)
+    :type region_name: str
+    """
+
+    def __init__(self,
+                 aws_conn_id='aws_default',
+                 region_name=None,
+                 *args,
+                 **kwargs):
+        self.region_name = region_name
+        super(AwsGlueCatalogHook, self).__init__(aws_conn_id=aws_conn_id, *args, **kwargs)
+
+    def get_conn(self):
+        """
+        Returns glue connection object.
+        """
+        self.conn = self.get_client_type('glue', self.region_name)
+        return self.conn
+
+    def get_partitions(self,
+                       database_name,
+                       table_name,
+                       expression='',
+                       page_size=None,
+                       max_items=None):
+        """
+        Retrieves the partition values for a table.
+
+        :param database_name: The name of the catalog database where the partitions reside.
+        :type database_name: str
+        :param table_name: The name of the partitions' table.
+        :type table_name: str
+        :param expression: An expression filtering the partitions to be returned.
+            Please see official AWS documentation for further information.
+            https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-catalog-partitions.html#aws-glue-api-catalog-partitions-GetPartitions
+        :type expression: str
+        :param page_size: pagination size
+        :type page_size: int
+        :param max_items: maximum items to return
+        :type max_items: int
+        :return: array of partition values where each value is itself an array as
+            a partition may be composed of multiple columns. For example:
+            [['2018-01-01','1'], ['2018-01-01','2']]
+        """
+        config = {
+            'PageSize': page_size,
+            'MaxItems': max_items,
+        }
+
+        paginator = self.get_conn().get_paginator('get_partitions')
+        response = paginator.paginate(
+            DatabaseName=database_name,
+            TableName=table_name,
+            Expression=expression,
+            PaginationConfig=config
+        )
+
+        partitions = []

Review comment: (Unless the order is important, but I don't think it is, not to the Glue API itself?)

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] ashb commented on a change in pull request #4112: [AIRFLOW-3212] Add AwsGlueCatalogPartitionSensor
ashb commented on a change in pull request #4112: [AIRFLOW-3212] Add AwsGlueCatalogPartitionSensor
URL: https://github.com/apache/incubator-airflow/pull/4112#discussion_r229244536

## File path: airflow/contrib/hooks/aws_glue_catalog_hook.py ##

+        paginator = self.get_conn().get_paginator('get_partitions')
+        response = paginator.paginate(
+            DatabaseName=database_name,
+            TableName=table_name,
+            Expression=expression,
+            PaginationConfig=config
+        )
+
+        partitions = []

Review comment: You can have a set of lists can't you. i.e. so the end result is `{['string']}`?

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
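On ashb's question above: Python lists are unhashable, so a set of lists (`{['string']}`) would raise `TypeError`; converting each partition's values to a tuple makes a set workable. A small sketch against fake paginator pages (the page shape mirrors boto3's `get_partitions` responses; the data values are illustrative):

```python
# Fake paginator pages shaped like boto3 Glue get_partitions responses.
pages = [
    {'Partitions': [{'Values': ['2018-01-01', '1']},
                    {'Values': ['2018-01-01', '2']}]},
    {'Partitions': [{'Values': ['2018-01-01', '1']}]},  # duplicate value set
]

partitions = set()
for page in pages:
    for partition in page['Partitions']:
        # Lists are unhashable; tuples can live in a set and deduplicate.
        partitions.add(tuple(partition['Values']))

print(sorted(partitions))  # [('2018-01-01', '1'), ('2018-01-01', '2')]
```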
[GitHub] kaxil commented on issue #4102: [AIRFLOW-3262] Add param to log response when using SimpleHttpOperator
kaxil commented on issue #4102: [AIRFLOW-3262] Add param to log response when using SimpleHttpOperator URL: https://github.com/apache/incubator-airflow/pull/4102#issuecomment-434239916 Well, you can call me blind :D Thanks @Fokko, I will try this later today. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] Fokko commented on issue #4102: [AIRFLOW-3262] Add param to log response when using SimpleHttpOperator
Fokko commented on issue #4102: [AIRFLOW-3262] Add param to log response when using SimpleHttpOperator URL: https://github.com/apache/incubator-airflow/pull/4102#issuecomment-434238775 An example here: https://github.com/apache/incubator-airflow/blob/master/tests/contrib/hooks/test_spark_sql_hook.py#L96 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
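The test Fokko links to patches the logger and asserts on the recorded calls. A minimal sketch of that mock-based pattern -- `fetch_and_log` is a hypothetical stand-in for the operator's logging, not code from the linked test:

```python
from unittest import mock
import logging

def fetch_and_log(log, response_text, log_response=True):
    # Hypothetical stand-in for an operator that optionally logs a response.
    if log_response:
        log.info(response_text)
    return response_text

log = mock.MagicMock(spec=logging.Logger)
fetch_and_log(log, '{"status": "ok"}')
log.info.assert_called_once_with('{"status": "ok"}')
print(log.info.call_count)  # 1
```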
[GitHub] kaxil closed pull request #4115: [AIRFLOW-XXX] Minor fix of docs/scheduler.rst
kaxil closed pull request #4115: [AIRFLOW-XXX] Minor fix of docs/scheduler.rst
URL: https://github.com/apache/incubator-airflow/pull/4115

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance. As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/scheduler.rst b/docs/scheduler.rst
index dc43c111fe..eb09be8d3c 100644
--- a/docs/scheduler.rst
+++ b/docs/scheduler.rst
@@ -23,7 +23,7 @@ start date, at the END of the period.

 The scheduler starts an instance of the executor specified in the your
 ``airflow.cfg``. If it happens to be the ``LocalExecutor``, tasks will be
-executed as subprocesses; in the case of ``CeleryExecutor`` and
+executed as subprocesses; in the case of ``CeleryExecutor``, ``DaskExecutor``, and
 ``MesosExecutor``, tasks are executed remotely.

 To start a scheduler, simply run the command:

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] phani8996 commented on a change in pull request #4111: [AIRFLOW-3266] Add AWS Athena Operator and hook
phani8996 commented on a change in pull request #4111: [AIRFLOW-3266] Add AWS Athena Operator and hook
URL: https://github.com/apache/incubator-airflow/pull/4111#discussion_r229209605

File path: airflow/contrib/operators/aws_athena_operator.py

@@ -0,0 +1,69 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+from airflow.contrib.hooks.aws_athena_hook import AWSAthenaHook
+
+
+class AWSAthenaOperator(BaseOperator):
+
+    ui_color = '#44b5e2'
+    template_fields = ('query', 'database', 'output_location')
+
+    @apply_defaults
+    def __init__(self, query, database, output_location, aws_conn_id='aws_default', *args, **kwargs):
+        super(AWSAthenaOperator, self).__init__(*args, **kwargs)
+        self.query = query
+        self.database = database
+        self.output_location = output_location
+        self.aws_conn_id = aws_conn_id
+        self.client_request_token = kwargs.get('client_request_token')
+        self.query_execution_context = kwargs.get('query_execution_context') or {}
+        self.result_configuration = kwargs.get('result_configuration') or {}
+        self.query_execution_context['Database'] = self.database
+        self.result_configuration['OutputLocation'] = self.output_location
+        self.query_execution_id = None
+        self.hook = self.get_hook(*args, **kwargs)
+
+    def get_hook(self, *args, **kwargs):
+        return AWSAthenaHook(self.aws_conn_id, *args, **kwargs)
+
+    def execute(self, context):
+        self.query_execution_id = self.hook.run_query(self.query, self.query_execution_context,
+                                                      self.result_configuration, self.client_request_token)
+
+    def post_execute(self, context, result=None):
+        self.hook.poll_query_status(self.query_execution_id)
+
+    def on_kill(self):
+        self.log.info('⚰️⚰️⚰️ Received a kill Signal. Time to Die.️')

Review comment: This will be rendered as-is, since our logging supports Unicode. Please see the attached screenshots of the log above:

![img-20181030-wa](https://user-images.githubusercontent.com/12140904/47704786-cc6a2880-dc4a-11e8-96e0-30027db92f45.jpg)
![img-20181030-wa0001](https://user-images.githubusercontent.com/12140904/47705619-0dfbd300-dc4d-11e8-9245-f6c9e60ca4be.jpg)
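In the operator above, `execute` submits the query via `AWSAthenaHook.run_query` and `post_execute` blocks in `AWSAthenaHook.poll_query_status` until the query leaves the intermediate states. Below is a minimal, self-contained sketch of that polling pattern, using the state tuples from the hook; the `fetch_state` callable is a hypothetical stand-in for a real boto3 `get_query_execution` lookup, and `max_polls` is an illustrative safety bound not present in the original hook:

```python
from time import sleep

# State tuples as defined in airflow/contrib/hooks/aws_athena_hook.py
INTERMEDIATE_STATES = ('QUEUED', 'RUNNING',)
FAILURE_STATES = ('FAILED', 'CANCELLED',)
SUCCESS_STATES = ('SUCCEEDED',)


def poll_query_status(fetch_state, sleep_time=0, max_polls=10):
    """Poll until the query leaves an intermediate state, then return the
    final state. `fetch_state` is a zero-argument callable returning the
    current Athena query state as a string (a stand-in for boto3's
    get_query_execution response)."""
    state = None
    for _ in range(max_polls):
        state = fetch_state()
        if state in INTERMEDIATE_STATES:
            sleep(sleep_time)  # the real hook defaults to 30 seconds
            continue
        return state  # terminal: success or failure
    return state  # still intermediate after max_polls; caller decides
```

A caller would then branch on the returned state, e.g. raise if it lands in `FAILURE_STATES`, mirroring what the operator relies on the hook to do after `post_execute`.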