[GitHub] [airflow] abdulbasitds commented on pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration
abdulbasitds commented on pull request #6007:
URL: https://github.com/apache/airflow/pull/6007#issuecomment-625666407

> I ran some end-to-end tests using this code in one of my dag files. It's working ok if you make those changes following my comments.
>
> Right now I'm trying to retrieve the logs generated by Glue jobs so that we could print out the logs in Airflow. Without the logs in Airflow, debugging failed jobs is a hassle.
>
> Update:
> Obtaining logs is easy, just add this in the operator class:
>
> ```python
> GLUE_LOGS_GROUP = "/aws-glue/jobs/output"
> GLUE_ERRS_GROUP = "/aws-glue/jobs/error"
> ```
>
> as class attributes
>
> ```python
> def get_glue_logs(self, log_group_name, log_stream_name):
>     """Glue logs are too chatty, only get the ones that have errors"""
>     self.log.info('Glue job logs output from group %s:', log_group_name)
>     for event in self.get_logs_hook().get_log_events(
>         log_group_name,
>         log_stream_name,
>     ):
>         event_dt = datetime.fromtimestamp(event['timestamp'] / 1000.0)
>         event_msg = event['message']
>         # Glue logs are extremely chatty, we only get log entries that have "error"
>         if "error" in event_msg:
>             self.log.info("[%s] %s", event_dt.isoformat(), event_msg)
>
> def get_logs_hook(self):
>     """Create and return an AwsLogsHook."""
>     return AwsLogsHook(
>         aws_conn_id=self.aws_conn_id,
>         region_name=self.awslogs_region
>     )
> ```
>
> as class methods, and call it in `execute()`
>
> ```python
> glue_job_run_id = glue_job_run['JobRunId']
>
> self.get_glue_logs(self.GLUE_LOGS_GROUP, glue_job_run_id)
> ```

@zachliu maybe we can extend logging later; for now it would be difficult for me to provide a description for the argument and make sure the code is correct, as I haven't used logging much.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[jira] [Commented] (AIRFLOW-2310) Enable AWS Glue Job Integration
[ https://issues.apache.org/jira/browse/AIRFLOW-2310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17102317#comment-17102317 ]

ASF GitHub Bot commented on AIRFLOW-2310:
-----------------------------------------

abdulbasitds commented on pull request #6007:
URL: https://github.com/apache/airflow/pull/6007#issuecomment-625666407

> I ran some end-to-end tests using this code in one of my dag files. It's working ok if you make those changes following my comments.
>
> Right now I'm trying to retrieve the logs generated by Glue jobs so that we could print out the logs in Airflow. Without the logs in Airflow, debugging failed jobs is a hassle.
>
> Update:
> Obtaining logs is easy, just add this in the operator class:
>
> ```python
> GLUE_LOGS_GROUP = "/aws-glue/jobs/output"
> GLUE_ERRS_GROUP = "/aws-glue/jobs/error"
> ```
>
> as class attributes
>
> ```python
> def get_glue_logs(self, log_group_name, log_stream_name):
>     """Glue logs are too chatty, only get the ones that have errors"""
>     self.log.info('Glue job logs output from group %s:', log_group_name)
>     for event in self.get_logs_hook().get_log_events(
>         log_group_name,
>         log_stream_name,
>     ):
>         event_dt = datetime.fromtimestamp(event['timestamp'] / 1000.0)
>         event_msg = event['message']
>         # Glue logs are extremely chatty, we only get log entries that have "error"
>         if "error" in event_msg:
>             self.log.info("[%s] %s", event_dt.isoformat(), event_msg)
>
> def get_logs_hook(self):
>     """Create and return an AwsLogsHook."""
>     return AwsLogsHook(
>         aws_conn_id=self.aws_conn_id,
>         region_name=self.awslogs_region
>     )
> ```
>
> as class methods, and call it in `execute()`
>
> ```python
> glue_job_run_id = glue_job_run['JobRunId']
>
> self.get_glue_logs(self.GLUE_LOGS_GROUP, glue_job_run_id)
> ```

@zachliu maybe we can extend logging later; for now it would be difficult for me to provide a description for the argument and make sure the code is correct, as I haven't used logging much.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Enable AWS Glue Job Integration
> -------------------------------
>
>                 Key: AIRFLOW-2310
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2310
>             Project: Apache Airflow
>          Issue Type: Improvement
>          Components: contrib
>            Reporter: Olalekan Elesin
>            Assignee: Olalekan Elesin
>            Priority: Major
>              Labels: AWS
>
> Would it be possible to integrate AWS Glue into Airflow, such that Glue jobs and ETL pipelines can be orchestrated with Airflow

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
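The snippet quoted above leaves its imports implicit. A minimal standalone sketch of the same log-fetching idea, assuming `AwsLogsHook` from the amazon provider package; the connection id, region, and Glue job run id below are hypothetical placeholders:

```python
from datetime import datetime

from airflow.providers.amazon.aws.hooks.logs import AwsLogsHook

# Hypothetical values for illustration only; per the comment above, Glue
# uses the JobRunId as the CloudWatch log stream name.
hook = AwsLogsHook(aws_conn_id="aws_default", region_name="us-east-1")
for event in hook.get_log_events("/aws-glue/jobs/output", "jr_0123456789abcdef"):
    event_dt = datetime.fromtimestamp(event["timestamp"] / 1000.0)
    # Keep only the noisy stream's error entries, as the comment suggests
    if "error" in event["message"]:
        print(event_dt.isoformat(), event["message"])
```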
[jira] [Commented] (AIRFLOW-2310) Enable AWS Glue Job Integration
[ https://issues.apache.org/jira/browse/AIRFLOW-2310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17102313#comment-17102313 ]

ASF GitHub Bot commented on AIRFLOW-2310:
-----------------------------------------

abdulbasitds commented on a change in pull request #6007:
URL: https://github.com/apache/airflow/pull/6007#discussion_r421971459

##########
File path: airflow/providers/amazon/aws/operators/glue.py
##########
@@ -0,0 +1,117 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import unicode_literals
+
+from airflow.providers.amazon.aws.hooks.glue import AwsGlueJobHook
+from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+import os.path
+
+
+class AWSGlueJobOperator(BaseOperator):
+    """
+    Creates an AWS Glue Job. AWS Glue is a serverless Spark
+    ETL service for running Spark Jobs on the AWS cloud.
+    Language support: Python and Scala
+
+    :param job_name: unique job name per AWS Account
+    :type job_name: Optional[str]
+    :param script_location: location of ETL script. Must be a local or S3 path
+    :type script_location: Optional[str]
+    :param job_desc: job description details
+    :type job_desc: Optional[str]
+    :param concurrent_run_limit: The maximum number of concurrent runs allowed for a job
+    :type concurrent_run_limit: Optional[int]
+    :param script_args: etl script arguments and AWS Glue arguments
+    :type script_args: dict
+    :param connections: AWS Glue connections to be used by the job.
+    :type connections: list
+    :param retry_limit: The maximum number of times to retry this job if it fails
+    :type retry_limit: Optional[int]
+    :param num_of_dpus: Number of AWS Glue DPUs to allocate to this Job.
+    :type num_of_dpus: int
+    :param region_name: aws region name (example: us-east-1)
+    :type region_name: str
+    :param s3_bucket: S3 bucket where logs and local etl script will be uploaded
+    :type s3_bucket: Optional[str]
+    :param iam_role_name: AWS IAM Role for Glue Job Execution
+    :type iam_role_name: Optional[str]
+    """
+    template_fields = ()
+    template_ext = ()
+    ui_color = '#ededed'
+
+    @apply_defaults
+    def __init__(self,
+                 job_name='aws_glue_default_job',
+                 job_desc='AWS Glue Job with Airflow',
+                 script_location=None,
+                 concurrent_run_limit=None,
+                 script_args=None,
+                 connections=None,
+                 retry_limit=None,
+                 num_of_dpus=6,
+                 aws_conn_id='aws_default',
+                 region_name=None,
+                 s3_bucket=None,
+                 iam_role_name=None,
+                 *args, **kwargs
+                 ):
+        super(AWSGlueJobOperator, self).__init__(*args, **kwargs)
+        self.job_name = job_name
+        self.job_desc = job_desc
+        self.script_location = script_location
+        self.concurrent_run_limit = concurrent_run_limit
+        self.script_args = script_args or {}
+        self.connections = connections or []
+        self.retry_limit = retry_limit
+        self.num_of_dpus = num_of_dpus
+        self.aws_conn_id = aws_conn_id,
+        self.region_name = region_name
+        self.s3_bucket = s3_bucket
+        self.iam_role_name = iam_role_name
+        self.S3_PROTOCOL = "s3://"
+        self.S3_ARTIFACTS_PREFIX = 'artifacts/glue-scripts/'
+
+    def execute(self, context):
+        """
+        Executes AWS Glue Job from Airflow
+
+        :return: the id of the current glue job.
+        """
+        if not self.script_location.startsWith(self.S3_PROTOCOL):

Review comment:
       @zachliu I have made this change.
       @feluelle script_location is already Optional[str]

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Enable AWS Glue Job Integration
> -------------------------------
>
>                 Key: AIRFLOW-2310
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2310
>             Project: Apache Airflow
>          Issue Type: Improvement
>          Components: contrib
>            Reporter: Olalekan Elesin
>            Assignee: Olalekan Elesin
>            Priority: Major
>              Labels: AWS
>
> Would it be possible to integrate AWS Glue into Airflow, such that Glue jobs and ETL pipelines can be orchestrated with Airflow

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
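The line flagged in the diff uses `startsWith`, which is the Java/JavaScript spelling; Python strings only have `str.startswith`, so the check as written would raise `AttributeError` at runtime. A minimal sketch of the corrected guard, assuming the requested change was along these lines:

```python
S3_PROTOCOL = "s3://"

def is_s3_path(script_location):
    """Return True when the ETL script already lives on S3."""
    # str.startswith (all lowercase) is the real method; `startsWith` as
    # written in the diff above would raise AttributeError at runtime.
    return script_location is not None and script_location.startswith(S3_PROTOCOL)

assert is_s3_path("s3://my-bucket/glue/etl_job.py")
assert not is_s3_path("/tmp/etl_job.py")
```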
[GitHub] [airflow] abdulbasitds commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration
abdulbasitds commented on a change in pull request #6007:
URL: https://github.com/apache/airflow/pull/6007#discussion_r421971459

##########
File path: airflow/providers/amazon/aws/operators/glue.py
##########
@@ -0,0 +1,117 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import unicode_literals
+
+from airflow.providers.amazon.aws.hooks.glue import AwsGlueJobHook
+from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+import os.path
+
+
+class AWSGlueJobOperator(BaseOperator):
+    """
+    Creates an AWS Glue Job. AWS Glue is a serverless Spark
+    ETL service for running Spark Jobs on the AWS cloud.
+    Language support: Python and Scala
+
+    :param job_name: unique job name per AWS Account
+    :type job_name: Optional[str]
+    :param script_location: location of ETL script. Must be a local or S3 path
+    :type script_location: Optional[str]
+    :param job_desc: job description details
+    :type job_desc: Optional[str]
+    :param concurrent_run_limit: The maximum number of concurrent runs allowed for a job
+    :type concurrent_run_limit: Optional[int]
+    :param script_args: etl script arguments and AWS Glue arguments
+    :type script_args: dict
+    :param connections: AWS Glue connections to be used by the job.
+    :type connections: list
+    :param retry_limit: The maximum number of times to retry this job if it fails
+    :type retry_limit: Optional[int]
+    :param num_of_dpus: Number of AWS Glue DPUs to allocate to this Job.
+    :type num_of_dpus: int
+    :param region_name: aws region name (example: us-east-1)
+    :type region_name: str
+    :param s3_bucket: S3 bucket where logs and local etl script will be uploaded
+    :type s3_bucket: Optional[str]
+    :param iam_role_name: AWS IAM Role for Glue Job Execution
+    :type iam_role_name: Optional[str]
+    """
+    template_fields = ()
+    template_ext = ()
+    ui_color = '#ededed'
+
+    @apply_defaults
+    def __init__(self,
+                 job_name='aws_glue_default_job',
+                 job_desc='AWS Glue Job with Airflow',
+                 script_location=None,
+                 concurrent_run_limit=None,
+                 script_args=None,
+                 connections=None,
+                 retry_limit=None,
+                 num_of_dpus=6,
+                 aws_conn_id='aws_default',
+                 region_name=None,
+                 s3_bucket=None,
+                 iam_role_name=None,
+                 *args, **kwargs
+                 ):
+        super(AWSGlueJobOperator, self).__init__(*args, **kwargs)
+        self.job_name = job_name
+        self.job_desc = job_desc
+        self.script_location = script_location
+        self.concurrent_run_limit = concurrent_run_limit
+        self.script_args = script_args or {}
+        self.connections = connections or []
+        self.retry_limit = retry_limit
+        self.num_of_dpus = num_of_dpus
+        self.aws_conn_id = aws_conn_id,
+        self.region_name = region_name
+        self.s3_bucket = s3_bucket
+        self.iam_role_name = iam_role_name
+        self.S3_PROTOCOL = "s3://"
+        self.S3_ARTIFACTS_PREFIX = 'artifacts/glue-scripts/'
+
+    def execute(self, context):
+        """
+        Executes AWS Glue Job from Airflow
+
+        :return: the id of the current glue job.
+        """
+        if not self.script_location.startsWith(self.S3_PROTOCOL):

Review comment:
       @zachliu I have made this change.
       @feluelle script_location is already Optional[str]

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
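For the non-S3 branch of that check, the operator imports `S3Hook` and defines `S3_ARTIFACTS_PREFIX`, which suggests local scripts are uploaded to `s3_bucket` before the Glue job is created. A hypothetical sketch of that step (the helper name and return convention are illustrative, not the PR's code):

```python
import os.path

from airflow.providers.amazon.aws.hooks.s3 import S3Hook

S3_PROTOCOL = "s3://"
S3_ARTIFACTS_PREFIX = "artifacts/glue-scripts/"

def ensure_script_on_s3(script_location, s3_bucket, aws_conn_id="aws_default"):
    """Upload a local ETL script to S3 and return its s3:// URL."""
    if script_location.startswith(S3_PROTOCOL):
        return script_location  # already on S3, nothing to do
    key = S3_ARTIFACTS_PREFIX + os.path.basename(script_location)
    S3Hook(aws_conn_id=aws_conn_id).load_file(
        filename=script_location,
        key=key,
        bucket_name=s3_bucket,
        replace=True,
    )
    return "{}{}/{}".format(S3_PROTOCOL, s3_bucket, key)
```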
[GitHub] [airflow] boring-cyborg[bot] commented on issue #8779: Webserver OOM killed by Kubernetes when using KubernetesExecutor
boring-cyborg[bot] commented on issue #8779:
URL: https://github.com/apache/airflow/issues/8779#issuecomment-625652250

Thanks for opening your first issue here! Be sure to follow the issue template!

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[GitHub] [airflow] LarsAlmgren opened a new issue #8779: Webserver OOM killed by Kubernetes when using KubernetesExecutor
LarsAlmgren opened a new issue #8779:
URL: https://github.com/apache/airflow/issues/8779

**Apache Airflow version**:

**Kubernetes version (if you are using kubernetes)** (use `kubectl version`): 1.17.5

**Environment**:
- **Cloud provider or hardware configuration**: AWS
- **OS** (e.g. from /etc/os-release): Ubuntu 16.04.6 LTS
- **Kernel** (e.g. `uname -a`): `Linux salt 4.4.0-1106-aws #117-Ubuntu SMP Wed Apr 8 09:52:02 UTC 2020 x86_64 x86_64 x86_64 GNU/Linux`

**What happened**:

We switched from `LocalExecutor` to `KubernetesExecutor` and noticed that the webserver started to get OOM killed by Kubernetes. We solved it by increasing the memory resources / limit in Kubernetes.

**What you expected to happen**:

I did not expect the webserver to be dependent on what executor is used.

**How to reproduce it**:

We ran the webserver before using `KubernetesExecutor` with:

```
resources:
  requests:
    memory: "1Gi"
    cpu: "500m"
  limits:
    memory: "1Gi"
    cpu: "500m"
```

Run the webserver with minimal resources and then switch to `KubernetesExecutor`.

**Anything else we need to know**:

We have noticed this in both environments where we started using `KubernetesExecutor`.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[GitHub] [airflow] ephraimbuddy opened a new pull request #8778: Add separate example DAGs and system tests for google cloud speech
ephraimbuddy opened a new pull request #8778:
URL: https://github.com/apache/airflow/pull/8778

---
This PR fixes some of the issues in #8280
- Separated the speech system tests into different modules
- Separated the guides into different files
- Added different examples for the speech operators

Make sure to mark the boxes below before creating PR:
- [x] Description above provides context of the change
- [x] Unit tests coverage for changes (not needed for documentation changes)
- [x] Target Github ISSUE in description if exists
- [x] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)"
- [x] Relevant documentation is updated including usage instructions.
- [x] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).

---
In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[airflow] branch master updated (6e4f5fa -> b7566e1)
This is an automated email from the ASF dual-hosted git repository.

kamilbregula pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git.

    from 6e4f5fa  [AIRFLOW-4568]The ExternalTaskSensor should be configurable to raise an Airflow Exception in case the poked external task reaches a disallowed state, such as f.i. failed (#8509)
     add b7566e1  Add SQL query tracking for pytest (#8754)

No new revisions were added by this update.

Summary of changes:
 TESTING.rst                         | 30
 scripts/perf/perf_kit/__init__.py   |  2 +-
 scripts/perf/perf_kit/sqlalchemy.py | 91 +++--
 tests/conftest.py                   | 70
 4 files changed, 168 insertions(+), 25 deletions(-)
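The commit above adds SQL query tracking to pytest runs via `perf_kit/sqlalchemy.py`. A minimal sketch of the underlying SQLAlchemy mechanism (an illustration of the general technique, not the perf_kit code itself): register an `after_cursor_execute` listener on the engine and count statements inside a context manager.

```python
from contextlib import contextmanager

from sqlalchemy import create_engine, event, text

@contextmanager
def count_queries(engine):
    """Yield a mutable counter of SQL statements executed on the engine."""
    counter = {"queries": 0}

    def after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
        counter["queries"] += 1

    event.listen(engine, "after_cursor_execute", after_cursor_execute)
    try:
        yield counter
    finally:
        # Detach the listener so the counter only covers the with-block
        event.remove(engine, "after_cursor_execute", after_cursor_execute)

engine = create_engine("sqlite://")
with count_queries(engine) as counter:
    with engine.connect() as conn:
        conn.execute(text("SELECT 1"))
print(counter["queries"])  # 1
```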
[GitHub] [airflow] TeddyHartanto commented on a change in pull request #7735: [AIRFLOW-4549] Allow skipped tasks to satisfy wait_for_downstream
TeddyHartanto commented on a change in pull request #7735:
URL: https://github.com/apache/airflow/pull/7735#discussion_r421923733

##########
File path: tests/dags/test_issue_1225.py
##########
@@ -47,12 +47,7 @@ def fail():
     dag=dag1,
     pool='test_backfill_pooled_task_pool',)
 
-# DAG tests depends_on_past dependencies
-dag2 = DAG(dag_id='test_depends_on_past', default_args=default_args)
-dag2_task1 = DummyOperator(
-    task_id='test_dop_task',
-    dag=dag2,
-    depends_on_past=True,)
+# dag2 has been moved to test_past_dagrun_deps.py

Review comment:
       Sharp eyes!

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[jira] [Commented] (AIRFLOW-4549) wait_for_downstream does not respect skipped tasks
[ https://issues.apache.org/jira/browse/AIRFLOW-4549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17102228#comment-17102228 ]

ASF GitHub Bot commented on AIRFLOW-4549:
-----------------------------------------

TeddyHartanto commented on a change in pull request #7735:
URL: https://github.com/apache/airflow/pull/7735#discussion_r421923733

##########
File path: tests/dags/test_issue_1225.py
##########
@@ -47,12 +47,7 @@ def fail():
     dag=dag1,
     pool='test_backfill_pooled_task_pool',)
 
-# DAG tests depends_on_past dependencies
-dag2 = DAG(dag_id='test_depends_on_past', default_args=default_args)
-dag2_task1 = DummyOperator(
-    task_id='test_dop_task',
-    dag=dag2,
-    depends_on_past=True,)
+# dag2 has been moved to test_past_dagrun_deps.py

Review comment:
       Sharp eyes!

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> wait_for_downstream does not respect skipped tasks
> --------------------------------------------------
>
>                 Key: AIRFLOW-4549
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-4549
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: scheduler
>            Reporter: Dima Kamalov
>            Assignee: Teddy Hartanto
>            Priority: Major
>
> See [http://mail-archives.apache.org/mod_mbox/airflow-dev/201609.mbox/%3ccaheep7utgpjvkgww9_9n5fupnu+pskf3rmbvxugk5dxb6bh...@mail.gmail.com%3E]

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Commented] (AIRFLOW-4549) wait_for_downstream does not respect skipped tasks
[ https://issues.apache.org/jira/browse/AIRFLOW-4549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17102226#comment-17102226 ]

ASF GitHub Bot commented on AIRFLOW-4549:
-----------------------------------------

TeddyHartanto commented on a change in pull request #7735:
URL: https://github.com/apache/airflow/pull/7735#discussion_r421923688

##########
File path: tests/models/test_dagrun.py
##########
@@ -552,3 +561,131 @@ def with_all_tasks_removed(dag):
         dagrun.verify_integrity()
         flaky_ti.refresh_from_db()
         self.assertEqual(State.NONE, flaky_ti.state)
+
+    def test_depends_on_past(self):
+        # dag_id = 'test_depends_on_past'

Review comment:
       OK, got it. Since the tests pass anyway without any scheduler deadlock, there shouldn't be any problem. I will remove the comments.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> wait_for_downstream does not respect skipped tasks
> --------------------------------------------------
>
>                 Key: AIRFLOW-4549
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-4549
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: scheduler
>            Reporter: Dima Kamalov
>            Assignee: Teddy Hartanto
>            Priority: Major
>
> See [http://mail-archives.apache.org/mod_mbox/airflow-dev/201609.mbox/%3ccaheep7utgpjvkgww9_9n5fupnu+pskf3rmbvxugk5dxb6bh...@mail.gmail.com%3E]

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[GitHub] [airflow] TeddyHartanto commented on a change in pull request #7735: [AIRFLOW-4549] Allow skipped tasks to satisfy wait_for_downstream
TeddyHartanto commented on a change in pull request #7735:
URL: https://github.com/apache/airflow/pull/7735#discussion_r421923688

##########
File path: tests/models/test_dagrun.py
##########
@@ -552,3 +561,131 @@ def with_all_tasks_removed(dag):
         dagrun.verify_integrity()
         flaky_ti.refresh_from_db()
         self.assertEqual(State.NONE, flaky_ti.state)
+
+    def test_depends_on_past(self):
+        # dag_id = 'test_depends_on_past'

Review comment:
       OK, got it. Since the tests pass anyway without any scheduler deadlock, there shouldn't be any problem. I will remove the comments.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
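For context on the feature under review: with `wait_for_downstream=True`, a task instance also waits for the immediate downstream tasks of its previous DAG run, and PR #7735 makes a *skipped* downstream task satisfy that check instead of blocking it. A hypothetical DAG (Airflow 1.10-style imports; names are illustrative) that produces the skipped state in question via `ShortCircuitOperator`:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import ShortCircuitOperator

with DAG(dag_id="example_wait_for_downstream",
         start_date=datetime(2020, 1, 1),
         schedule_interval="@daily") as dag:
    # wait_for_downstream makes each run's task wait for the previous
    # run's immediate downstream task to finish (it implies depends_on_past)
    upstream = DummyOperator(task_id="upstream", wait_for_downstream=True)
    # ShortCircuitOperator skips its downstream tasks when the callable
    # returns False, producing the "skipped" state at issue here
    maybe_skip = ShortCircuitOperator(task_id="maybe_skip",
                                      python_callable=lambda: False)
    downstream = DummyOperator(task_id="downstream")
    upstream >> maybe_skip >> downstream
```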
[GitHub] [airflow] schnie opened a new pull request #8777: Add helm chart
schnie opened a new pull request #8777:
URL: https://github.com/apache/airflow/pull/8777

This PR adds a default helm chart for Airflow. This chart is based on the one we use at Astronomer to manage hundreds of production deployments. The chart has been cleaned up to remove any references to the Astronomer platform or anything specific to the surrounding infrastructure.

The `bin` directory contains some scripts to help run this chart locally in [kind](https://kind.sigs.k8s.io/docs/user/quick-start/), as well as packaging and releasing it to a helm repository. The scripts reference the Astronomer helm repo right now, but I figured we could use it as a starting point and tweak these scripts, or remove them completely to fit into how we want to manage this going forward.

Opening this as a draft PR for now to start some discussion.

---
Make sure to mark the boxes below before creating PR:
- [x] Description above provides context of the change
- [ ] Unit tests coverage for changes (not needed for documentation changes)
- [ ] Target Github ISSUE in description if exists
- [x] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)"
- [ ] Relevant documentation is updated including usage instructions.
- [x] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).

---
In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[airflow] tag nightly-master updated (bd29ee3 -> 6e4f5fa)
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a change to tag nightly-master
in repository https://gitbox.apache.org/repos/asf/airflow.git.

*** WARNING: tag nightly-master was modified! ***

    from bd29ee3  (commit)
      to 6e4f5fa  (commit)

    from bd29ee3  Ensure test_logging_config.test_reload_module works in spawn mode. (#8741)
     add d15839d  Latest debian-buster release broke image build (#8758)
     add ff5b701  Add google_api_to_s3_transfer example dags and system tests (#8581)
     add 7c04604  Add google_api_to_s3_transfer docs howto link (#8761)
     add 723c52c  Add documentation for SpannerDeployInstanceOperator (#8750)
     add 6e4f5fa  [AIRFLOW-4568]The ExternalTaskSensor should be configurable to raise an Airflow Exception in case the poked external task reaches a disallowed state, such as f.i. failed (#8509)

No new revisions were added by this update.

Summary of changes:
 Dockerfile.ci                                      |   1 +
 TESTING.rst                                        |  16 +--
 .../example_external_task_marker_dag.py            |  14 ++
 .../example_google_api_to_s3_transfer_advanced.py  | 132 +++
 .../example_google_api_to_s3_transfer_basic.py     |  56
 .../providers/google/cloud/operators/spanner.py    |   4 +
 airflow/sensors/external_task_sensor.py            |  68 --
 backport_packages/setup_backport_packages.py       |   7 +
 .../amazon/aws/google_api_to_s3_transfer.rst       | 141 +
 docs/howto/operator/external_task_sensor.rst       |   5 +
 docs/howto/operator/gcp/spanner.rst                |  38 ++
 docs/operators-and-hooks-ref.rst                   |   4 +-
 .../test_google_api_to_s3_transfer_system.py       |  65 ++
 tests/sensors/test_external_task_sensor.py         |  58 -
 tests/test_utils/amazon_system_helpers.py          |  83
 15 files changed, 667 insertions(+), 25 deletions(-)
 create mode 100644 airflow/providers/amazon/aws/example_dags/example_google_api_to_s3_transfer_advanced.py
 create mode 100644 airflow/providers/amazon/aws/example_dags/example_google_api_to_s3_transfer_basic.py
 create mode 100644 docs/howto/operator/amazon/aws/google_api_to_s3_transfer.rst
 create mode 100644 tests/providers/amazon/aws/operators/test_google_api_to_s3_transfer_system.py
 create mode 100644 tests/test_utils/amazon_system_helpers.py
[jira] [Commented] (AIRFLOW-4549) wait_for_downstream does not respect skipped tasks
[ https://issues.apache.org/jira/browse/AIRFLOW-4549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17102169#comment-17102169 ]

ASF GitHub Bot commented on AIRFLOW-4549:
-----------------------------------------

kaxil commented on a change in pull request #7735:
URL: https://github.com/apache/airflow/pull/7735#discussion_r421894092

##########
File path: tests/models/test_dagrun.py
##########
@@ -552,3 +561,131 @@ def with_all_tasks_removed(dag):
         dagrun.verify_integrity()
         flaky_ti.refresh_from_db()
         self.assertEqual(State.NONE, flaky_ti.state)
+
+    def test_depends_on_past(self):
+        # dag_id = 'test_depends_on_past'

Review comment:
       yes, remove the comments in the PR, should be fine

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> wait_for_downstream does not respect skipped tasks
> --------------------------------------------------
>
>                 Key: AIRFLOW-4549
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-4549
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: scheduler
>            Reporter: Dima Kamalov
>            Assignee: Teddy Hartanto
>            Priority: Major
>
> See [http://mail-archives.apache.org/mod_mbox/airflow-dev/201609.mbox/%3ccaheep7utgpjvkgww9_9n5fupnu+pskf3rmbvxugk5dxb6bh...@mail.gmail.com%3E]

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Commented] (AIRFLOW-4549) wait_for_downstream does not respect skipped tasks
[ https://issues.apache.org/jira/browse/AIRFLOW-4549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17102165#comment-17102165 ]

ASF GitHub Bot commented on AIRFLOW-4549:
-----------------------------------------

kaxil commented on a change in pull request #7735:
URL: https://github.com/apache/airflow/pull/7735#discussion_r421893773

##########
File path: tests/models/test_dagrun.py
##########
@@ -552,3 +561,131 @@ def with_all_tasks_removed(dag):
         dagrun.verify_integrity()
         flaky_ti.refresh_from_db()
         self.assertEqual(State.NONE, flaky_ti.state)
+
+    def test_depends_on_past(self):
+        # dag_id = 'test_depends_on_past'

Review comment:
       Scheduler deadlock in which case? There are known issues of scheduler deadlock with MySQL; other than that, I am not sure there are any.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> wait_for_downstream does not respect skipped tasks
> --------------------------------------------------
>
>                 Key: AIRFLOW-4549
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-4549
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: scheduler
>            Reporter: Dima Kamalov
>            Assignee: Teddy Hartanto
>            Priority: Major
>
> See [http://mail-archives.apache.org/mod_mbox/airflow-dev/201609.mbox/%3ccaheep7utgpjvkgww9_9n5fupnu+pskf3rmbvxugk5dxb6bh...@mail.gmail.com%3E]

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[GitHub] [airflow] kaxil commented on a change in pull request #7735: [AIRFLOW-4549] Allow skipped tasks to satisfy wait_for_downstream
kaxil commented on a change in pull request #7735:
URL: https://github.com/apache/airflow/pull/7735#discussion_r421894092

##########
File path: tests/models/test_dagrun.py
##########
@@ -552,3 +561,131 @@ def with_all_tasks_removed(dag):
         dagrun.verify_integrity()
         flaky_ti.refresh_from_db()
         self.assertEqual(State.NONE, flaky_ti.state)
+
+    def test_depends_on_past(self):
+        # dag_id = 'test_depends_on_past'

Review comment:
       yes, remove the comments in the PR, should be fine

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[GitHub] [airflow] kaxil commented on a change in pull request #7735: [AIRFLOW-4549] Allow skipped tasks to satisfy wait_for_downstream
kaxil commented on a change in pull request #7735:
URL: https://github.com/apache/airflow/pull/7735#discussion_r421893773

##########
File path: tests/models/test_dagrun.py
##########
@@ -552,3 +561,131 @@ def with_all_tasks_removed(dag):
         dagrun.verify_integrity()
         flaky_ti.refresh_from_db()
         self.assertEqual(State.NONE, flaky_ti.state)
+
+    def test_depends_on_past(self):
+        # dag_id = 'test_depends_on_past'

Review comment:
       Scheduler deadlock in which case? There are known issues of scheduler deadlock with MySQL; other than that, I am not sure there are any.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[jira] [Commented] (AIRFLOW-4549) wait_for_downstream does not respect skipped tasks
[ https://issues.apache.org/jira/browse/AIRFLOW-4549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17102164#comment-17102164 ]

ASF GitHub Bot commented on AIRFLOW-4549:
-----------------------------------------

kaxil commented on a change in pull request #7735:
URL: https://github.com/apache/airflow/pull/7735#discussion_r421893536

##########
File path: tests/dags/test_issue_1225.py
##########
@@ -47,12 +47,7 @@ def fail():
     dag=dag1,
     pool='test_backfill_pooled_task_pool',)
 
-# DAG tests depends_on_past dependencies
-dag2 = DAG(dag_id='test_depends_on_past', default_args=default_args)
-dag2_task1 = DummyOperator(
-    task_id='test_dop_task',
-    dag=dag2,
-    depends_on_past=True,)
+# dag2 has been moved to test_past_dagrun_deps.py

Review comment:
```suggestion
# dag2 has been moved to test_prev_dagrun_deps.py
```

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> wait_for_downstream does not respect skipped tasks
> --------------------------------------------------
>
>                 Key: AIRFLOW-4549
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-4549
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: scheduler
>            Reporter: Dima Kamalov
>            Assignee: Teddy Hartanto
>            Priority: Major
>
> See [http://mail-archives.apache.org/mod_mbox/airflow-dev/201609.mbox/%3ccaheep7utgpjvkgww9_9n5fupnu+pskf3rmbvxugk5dxb6bh...@mail.gmail.com%3E]

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[GitHub] [airflow] kaxil commented on a change in pull request #7735: [AIRFLOW-4549] Allow skipped tasks to satisfy wait_for_downstream
kaxil commented on a change in pull request #7735:
URL: https://github.com/apache/airflow/pull/7735#discussion_r421893536

##########
File path: tests/dags/test_issue_1225.py
##########
@@ -47,12 +47,7 @@ def fail():
     dag=dag1,
     pool='test_backfill_pooled_task_pool',)
 
-# DAG tests depends_on_past dependencies
-dag2 = DAG(dag_id='test_depends_on_past', default_args=default_args)
-dag2_task1 = DummyOperator(
-    task_id='test_dop_task',
-    dag=dag2,
-    depends_on_past=True,)
+# dag2 has been moved to test_past_dagrun_deps.py

Review comment:
```suggestion
# dag2 has been moved to test_prev_dagrun_deps.py
```

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[jira] [Commented] (AIRFLOW-4543) Update slack operator to support slackclient v2
[ https://issues.apache.org/jira/browse/AIRFLOW-4543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17102162#comment-17102162 ]

ASF GitHub Bot commented on AIRFLOW-4543:
-----------------------------------------

kaxil commented on a change in pull request #5519:
URL: https://github.com/apache/airflow/pull/5519#discussion_r421892844

##########
File path: tests/providers/slack/hooks/test_slack.py
##########
@@ -19,82 +19,154 @@
 import unittest
 
 import mock
+from slack.errors import SlackApiError
 
 from airflow.exceptions import AirflowException
 from airflow.providers.slack.hooks.slack import SlackHook
 
 
 class TestSlackHook(unittest.TestCase):
-    def test_init_with_token_only(self):
+
+    def test___get_token_with_token_only(self):
+        """ tests `__get_token` method when only token is provided """
+        # Given
         test_token = 'test_token'
-        slack_hook = SlackHook(token=test_token, slack_conn_id=None)
+        test_conn_id = None
+
+        # Creating a dummy subclass is the easiest way to avoid running init
+        # which is actually using the method we are testing
+        class DummySlackHook(SlackHook):
+            def __init__(self, *args, **kwargs):  # pylint: disable=W0231
+                pass
 
-        self.assertEqual(slack_hook.token, test_token)
+        # Run
+        hook = DummySlackHook()
+
+        # Assert
+        output = hook._SlackHook__get_token(test_token, test_conn_id)  # pylint: disable=E1101
+        expected = test_token
+        self.assertEqual(output, expected)
 
     @mock.patch('airflow.providers.slack.hooks.slack.SlackHook.get_connection')
-    def test_init_with_valid_slack_conn_id_only(self, get_connection_mock):
+    def test___get_token_with_valid_slack_conn_id_only(self, get_connection_mock):
+        """ tests `__get_token` method when only connection is provided """
+        # Given
+        test_token = None
+        test_conn_id = 'x'
         test_password = 'test_password'
+
+        # Mock
         get_connection_mock.return_value = mock.Mock(password=test_password)
 
-        test_slack_conn_id = 'test_slack_conn_id'
-        slack_hook = SlackHook(token=None, slack_conn_id=test_slack_conn_id)
+        # Creating a dummy subclass is the easiest way to avoid running init
+        # which is actually using the method we are testing
+        class DummySlackHook(SlackHook):
+            def __init__(self, *args, **kwargs):  # pylint: disable=W0231
+                pass

Review comment:
       Pushed a commit to PR: https://github.com/apache/airflow/pull/5519/commits/d81d6e947ba364b0dd9e730507e51ff2a0d3d3e2

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Update slack operator to support slackclient v2
> -----------------------------------------------
>
>                 Key: AIRFLOW-4543
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-4543
>             Project: Apache Airflow
>          Issue Type: Improvement
>          Components: hooks, operators
>            Reporter: Sergio Kef
>            Assignee: Sergio Kef
>            Priority: Major
>
> Official [Slack API for python|https://pypi.org/project/slackclient/] has recently released [v.2|https://github.com/slackapi/python-slackclient/wiki/Migrating-to-2.x0]
> Among others some important points:
> * Async IO
> * SSL and Proxy
> * Dropping 2.7 support
> Opening this ticket to work on the upgrade. Current functionalities will be migrated and will try to extend functionalities, if possible.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[GitHub] [airflow] kaxil commented on a change in pull request #5519: [AIRFLOW-4543] Update slack operator to support slackclient v2
kaxil commented on a change in pull request #5519:
URL: https://github.com/apache/airflow/pull/5519#discussion_r421892844

##########
File path: tests/providers/slack/hooks/test_slack.py
##########
@@ -19,82 +19,154 @@
 import unittest
 
 import mock
+from slack.errors import SlackApiError
 
 from airflow.exceptions import AirflowException
 from airflow.providers.slack.hooks.slack import SlackHook
 
 
 class TestSlackHook(unittest.TestCase):
-    def test_init_with_token_only(self):
+
+    def test___get_token_with_token_only(self):
+        """ tests `__get_token` method when only token is provided """
+        # Given
         test_token = 'test_token'
-        slack_hook = SlackHook(token=test_token, slack_conn_id=None)
+        test_conn_id = None
+
+        # Creating a dummy subclass is the easiest way to avoid running init
+        # which is actually using the method we are testing
+        class DummySlackHook(SlackHook):
+            def __init__(self, *args, **kwargs):  # pylint: disable=W0231
+                pass
 
-        self.assertEqual(slack_hook.token, test_token)
+        # Run
+        hook = DummySlackHook()
+
+        # Assert
+        output = hook._SlackHook__get_token(test_token, test_conn_id)  # pylint: disable=E1101
+        expected = test_token
+        self.assertEqual(output, expected)
 
     @mock.patch('airflow.providers.slack.hooks.slack.SlackHook.get_connection')
-    def test_init_with_valid_slack_conn_id_only(self, get_connection_mock):
+    def test___get_token_with_valid_slack_conn_id_only(self, get_connection_mock):
+        """ tests `__get_token` method when only connection is provided """
+        # Given
+        test_token = None
+        test_conn_id = 'x'
         test_password = 'test_password'
+
+        # Mock
        get_connection_mock.return_value = mock.Mock(password=test_password)
 
-        test_slack_conn_id = 'test_slack_conn_id'
-        slack_hook = SlackHook(token=None, slack_conn_id=test_slack_conn_id)
+        # Creating a dummy subclass is the easiest way to avoid running init
+        # which is actually using the method we are testing
+        class DummySlackHook(SlackHook):
+            def __init__(self, *args, **kwargs):  # pylint: disable=W0231
+                pass

Review comment:
       Pushed a commit to PR: https://github.com/apache/airflow/pull/5519/commits/d81d6e947ba364b0dd9e730507e51ff2a0d3d3e2

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[GitHub] [airflow] kaxil commented on a change in pull request #5519: [AIRFLOW-4543] Update slack operator to support slackclient v2
kaxil commented on a change in pull request #5519:
URL: https://github.com/apache/airflow/pull/5519#discussion_r421892088

##########
File path: airflow/providers/slack/hooks/slack.py
##########
@@ -27,40 +28,88 @@
 # noinspection PyAbstractClass
 class SlackHook(BaseHook):
     """
+    Creates a Slack connection, to be used for calls.
     Takes both Slack API token directly and connection that has Slack API token.
-    If both supplied, Slack API token will be used.
+    Exposes also the rest of slack.WebClient args.
+
+    Examples:
+
+    .. code-block:: python
+
+        # Create hook
+        slack_hook = SlackHook(token="xxx")  # or slack_hook = SlackHook(slack_conn_id="slack")
+
+        # Call generic API with parameters (errors are handled by hook)
+        # For more details check https://api.slack.com/methods/chat.postMessage
+        slack_hook.call("chat.postMessage", json={"channel": "#random", "text": "Hello world!"})
+
+        # Call method from Slack SDK (you have to handle errors yourself)
+        # For more details check https://slack.dev/python-slackclient/basic_usage.html#sending-a-message
+        slack_hook.client.chat_postMessage(channel="#random", text="Hello world!")
 
     :param token: Slack API token
+    :type token: str
     :param slack_conn_id: connection that has Slack API token in the password field
+    :type slack_conn_id: str
+    :param use_session: A boolean specifying if the client should take advantage of
+        connection pooling. Default is True.
+    :type use_session: bool
+    :param base_url: A string representing the Slack API base URL. Default is
+        ``https://www.slack.com/api/``
+    :type base_url: str
+    :param timeout: The maximum number of seconds the client will wait
+        to connect and receive a response from Slack. Default is 30 seconds.
+    :type timeout: int
     """
-    def __init__(self, token: Optional[str] = None, slack_conn_id: Optional[str] = None) -> None:
+
+    def __init__(
+        self,
+        token: Optional[str] = None,
+        slack_conn_id: Optional[str] = None,
+        **client_args: Any,
+    ) -> None:
         super().__init__()
-        self.token = self.__get_token(token, slack_conn_id)
+        token = self.__get_token(token, slack_conn_id)

Review comment:
```suggestion
        self.token = self.__get_token(token, slack_conn_id)
```

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[jira] [Commented] (AIRFLOW-4543) Update slack operator to support slackclient v2
[ https://issues.apache.org/jira/browse/AIRFLOW-4543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17102157#comment-17102157 ]

ASF GitHub Bot commented on AIRFLOW-4543:
-----------------------------------------

kaxil commented on a change in pull request #5519:
URL: https://github.com/apache/airflow/pull/5519#discussion_r421892088

##########
File path: airflow/providers/slack/hooks/slack.py
##########
@@ -27,40 +28,88 @@
 # noinspection PyAbstractClass
 class SlackHook(BaseHook):
     """
+    Creates a Slack connection, to be used for calls.
     Takes both Slack API token directly and connection that has Slack API token.
-    If both supplied, Slack API token will be used.
+    Exposes also the rest of slack.WebClient args.
+
+    Examples:
+
+    .. code-block:: python
+
+        # Create hook
+        slack_hook = SlackHook(token="xxx")  # or slack_hook = SlackHook(slack_conn_id="slack")
+
+        # Call generic API with parameters (errors are handled by hook)
+        # For more details check https://api.slack.com/methods/chat.postMessage
+        slack_hook.call("chat.postMessage", json={"channel": "#random", "text": "Hello world!"})
+
+        # Call method from Slack SDK (you have to handle errors yourself)
+        # For more details check https://slack.dev/python-slackclient/basic_usage.html#sending-a-message
+        slack_hook.client.chat_postMessage(channel="#random", text="Hello world!")
 
     :param token: Slack API token
+    :type token: str
     :param slack_conn_id: connection that has Slack API token in the password field
+    :type slack_conn_id: str
+    :param use_session: A boolean specifying if the client should take advantage of
+        connection pooling. Default is True.
+    :type use_session: bool
+    :param base_url: A string representing the Slack API base URL. Default is
+        ``https://www.slack.com/api/``
+    :type base_url: str
+    :param timeout: The maximum number of seconds the client will wait
+        to connect and receive a response from Slack. Default is 30 seconds.
+    :type timeout: int
     """
-    def __init__(self, token: Optional[str] = None, slack_conn_id: Optional[str] = None) -> None:
+
+    def __init__(
+        self,
+        token: Optional[str] = None,
+        slack_conn_id: Optional[str] = None,
+        **client_args: Any,
+    ) -> None:
         super().__init__()
-        self.token = self.__get_token(token, slack_conn_id)
+        token = self.__get_token(token, slack_conn_id)

Review comment:
```suggestion
        self.token = self.__get_token(token, slack_conn_id)
```

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Update slack operator to support slackclient v2
> -----------------------------------------------
>
>                 Key: AIRFLOW-4543
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-4543
>             Project: Apache Airflow
>          Issue Type: Improvement
>          Components: hooks, operators
>            Reporter: Sergio Kef
>            Assignee: Sergio Kef
>            Priority: Major
>
> Official [Slack API for python|https://pypi.org/project/slackclient/] has recently released [v.2|https://github.com/slackapi/python-slackclient/wiki/Migrating-to-2.x0]
> Among others some important points:
> * Async IO
> * SSL and Proxy
> * Dropping 2.7 support
> Opening this ticket to work on the upgrade. Current functionalities will be migrated and will try to extend functionalities, if possible.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
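The docstring above promises that `slack_hook.call(...)` handles errors on the caller's behalf. A hedged sketch of that pattern, delegating to `slack.WebClient.api_call`, which raises `SlackApiError` on a non-ok response; this is not necessarily the exact body of `SlackHook.call` in the PR:

```python
from slack import WebClient
from slack.errors import SlackApiError

def call(token, api_method, **kwargs):
    """Post to any Slack Web API method, surfacing Slack-side errors."""
    client = WebClient(token=token)
    try:
        # api_call accepts the method name plus json/params/data payloads
        return client.api_call(api_method, **kwargs)
    except SlackApiError as err:
        # err.response["error"] carries Slack's error code, e.g. "invalid_auth"
        raise RuntimeError("Slack API call failed: {}".format(err)) from err

# Usage mirroring the docstring example:
# call("xxx", "chat.postMessage", json={"channel": "#random", "text": "Hello world!"})
```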
[jira] [Commented] (AIRFLOW-4543) Update slack operator to support slackclient v2
[ https://issues.apache.org/jira/browse/AIRFLOW-4543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17102156#comment-17102156 ]

ASF GitHub Bot commented on AIRFLOW-4543:
-----------------------------------------

kaxil commented on a change in pull request #5519:
URL: https://github.com/apache/airflow/pull/5519#discussion_r421892012

##########
File path: tests/providers/slack/hooks/test_slack.py
##########
@@ -19,82 +19,154 @@
 import unittest
 
 import mock
+from slack.errors import SlackApiError
 
 from airflow.exceptions import AirflowException
 from airflow.providers.slack.hooks.slack import SlackHook
 
 
 class TestSlackHook(unittest.TestCase):
-    def test_init_with_token_only(self):
+
+    def test___get_token_with_token_only(self):
+        """ tests `__get_token` method when only token is provided """
+        # Given
         test_token = 'test_token'
-        slack_hook = SlackHook(token=test_token, slack_conn_id=None)
+        test_conn_id = None
+
+        # Creating a dummy subclass is the easiest way to avoid running init
+        # which is actually using the method we are testing
+        class DummySlackHook(SlackHook):
+            def __init__(self, *args, **kwargs):  # pylint: disable=W0231
+                pass
 
-        self.assertEqual(slack_hook.token, test_token)
+        # Run
+        hook = DummySlackHook()
+
+        # Assert
+        output = hook._SlackHook__get_token(test_token, test_conn_id)  # pylint: disable=E1101
+        expected = test_token
+        self.assertEqual(output, expected)
 
     @mock.patch('airflow.providers.slack.hooks.slack.SlackHook.get_connection')
-    def test_init_with_valid_slack_conn_id_only(self, get_connection_mock):
+    def test___get_token_with_valid_slack_conn_id_only(self, get_connection_mock):
+        """ tests `__get_token` method when only connection is provided """
+        # Given
+        test_token = None
+        test_conn_id = 'x'
         test_password = 'test_password'
+
+        # Mock
         get_connection_mock.return_value = mock.Mock(password=test_password)
 
-        test_slack_conn_id = 'test_slack_conn_id'
-        slack_hook = SlackHook(token=None, slack_conn_id=test_slack_conn_id)
+        # Creating a dummy subclass is the easiest way to avoid running init
+        # which is actually using the method we are testing
+        class DummySlackHook(SlackHook):
+            def __init__(self, *args, **kwargs):  # pylint: disable=W0231
+                pass

Review comment:
       There is no need for dummy subclasses; you can initialize the SlackHook directly. It is not calling the Slack API in `__init__`, is it?

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Update slack operator to support slackclient v2
> -----------------------------------------------
>
>                 Key: AIRFLOW-4543
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-4543
>             Project: Apache Airflow
>          Issue Type: Improvement
>          Components: hooks, operators
>            Reporter: Sergio Kef
>            Assignee: Sergio Kef
>            Priority: Major
>
> Official [Slack API for python|https://pypi.org/project/slackclient/] has recently released [v.2|https://github.com/slackapi/python-slackclient/wiki/Migrating-to-2.x0]
> Among others some important points:
> * Async IO
> * SSL and Proxy
> * Dropping 2.7 support
> Opening this ticket to work on the upgrade. Current functionalities will be migrated and will try to extend functionalities, if possible.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[GitHub] [airflow] kaxil commented on a change in pull request #5519: [AIRFLOW-4543] Update slack operator to support slackclient v2
kaxil commented on a change in pull request #5519: URL: https://github.com/apache/airflow/pull/5519#discussion_r421892012 ## File path: tests/providers/slack/hooks/test_slack.py ## @@ -19,82 +19,154 @@ import unittest import mock +from slack.errors import SlackApiError from airflow.exceptions import AirflowException from airflow.providers.slack.hooks.slack import SlackHook class TestSlackHook(unittest.TestCase): -def test_init_with_token_only(self): + +def test___get_token_with_token_only(self): +""" tests `__get_token` method when only token is provided """ +# Given test_token = 'test_token' -slack_hook = SlackHook(token=test_token, slack_conn_id=None) +test_conn_id = None + +# Creating a dummy subclass is the easiest way to avoid running init +# which is actually using the method we are testing +class DummySlackHook(SlackHook): +def __init__(self, *args, **kwargs): # pylint: disable=W0231 +pass -self.assertEqual(slack_hook.token, test_token) +# Run +hook = DummySlackHook() + +# Assert +output = hook._SlackHook__get_token(test_token, test_conn_id) # pylint: disable=E1101 +expected = test_token +self.assertEqual(output, expected) @mock.patch('airflow.providers.slack.hooks.slack.SlackHook.get_connection') -def test_init_with_valid_slack_conn_id_only(self, get_connection_mock): +def test___get_token_with_valid_slack_conn_id_only(self, get_connection_mock): +""" tests `__get_token` method when only connection is provided """ +# Given +test_token = None +test_conn_id = 'x' test_password = 'test_password' + +# Mock get_connection_mock.return_value = mock.Mock(password=test_password) -test_slack_conn_id = 'test_slack_conn_id' -slack_hook = SlackHook(token=None, slack_conn_id=test_slack_conn_id) +# Creating a dummy subclass is the easiest way to avoid running init +# which is actually using the method we are testing +class DummySlackHook(SlackHook): +def __init__(self, *args, **kwargs): # pylint: disable=W0231 +pass Review comment: There is no need for dummy subclasses; you can initialize the SlackHook directly. It is not calling the Slack API in `__init__`, is it? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
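A minimal sketch of the direct approach being suggested (hypothetical test bodies; they assume, as the comment does, that `SlackHook.__init__` makes no Slack API call, and that the hook keeps the resolved token on `self.token`):

```python
import unittest

import mock

from airflow.providers.slack.hooks.slack import SlackHook


class TestSlackHook(unittest.TestCase):
    def test_init_with_token_only(self):
        # No dummy subclass needed: construct the real hook and assert
        # on the token it resolved.
        hook = SlackHook(token='test_token', slack_conn_id=None)
        self.assertEqual(hook.token, 'test_token')

    @mock.patch('airflow.providers.slack.hooks.slack.SlackHook.get_connection')
    def test_init_with_valid_slack_conn_id_only(self, get_connection_mock):
        get_connection_mock.return_value = mock.Mock(password='test_password')
        hook = SlackHook(token=None, slack_conn_id='test_slack_conn_id')
        self.assertEqual(hook.token, 'test_password')
```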
[GitHub] [airflow] jaketf commented on issue #8673: Data Fusion Hook Start pipeline will succeed before pipeline is in RUNNING state
jaketf commented on issue #8673: URL: https://github.com/apache/airflow/issues/8673#issuecomment-625581362 I will reach out to my team and see if there are folks interested in joining the Airflow community by patching this. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] mik-laj commented on issue #8673: Data Fusion Hook Start pipeline will succeed before pipeline is in RUNNING state
mik-laj commented on issue #8673: URL: https://github.com/apache/airflow/issues/8673#issuecomment-625579505 We also currently have priorities set for other tasks. If it is very important, we can change our plans, but we would prefer to avoid it. Every new contributor is very welcome in this project and I will be able to find time to review the patch and verify it using system tests. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Work started] (AIRFLOW-3369) Un-pausing a DAG with catchup =False creates an extra DAG run (1.10)
[ https://issues.apache.org/jira/browse/AIRFLOW-3369?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on AIRFLOW-3369 started by Kaxil Naik. --- > Un-pausing a DAG with catchup =False creates an extra DAG run (1.10) > > > Key: AIRFLOW-3369 > URL: https://issues.apache.org/jira/browse/AIRFLOW-3369 > Project: Apache Airflow > Issue Type: Bug > Components: scheduler >Affects Versions: 1.10.0 >Reporter: Andrew Harmon >Assignee: Kaxil Naik >Priority: Major > Attachments: image.png > > > If you create a DAG with catchup=False, when it is un-paused, it creates 2 > dag runs. One for the most recent scheduled interval (expected) and one for > the interval before that (unexpected). > *Sample DAG* > {code:java} > from airflow import DAG > from datetime import datetime > from airflow.operators.dummy_operator import DummyOperator > dag = DAG( > dag_id='DummyTest', > start_date=datetime(2018,1,1), > catchup=False > ) > do = DummyOperator( > task_id='dummy_task', > dag=dag > ) > {code} > *Result:* > 2 DAG runs are created. 2018-11-18 and 2018-11-17 > *Expected Result:* > Only 1 DAG run should have been created (2018-11-18) > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (AIRFLOW-3369) Un-pausing a DAG with catchup =False creates an extra DAG run (1.10)
[ https://issues.apache.org/jira/browse/AIRFLOW-3369?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kaxil Naik reassigned AIRFLOW-3369: --- Assignee: Kaxil Naik > Un-pausing a DAG with catchup =False creates an extra DAG run (1.10) > > > Key: AIRFLOW-3369 > URL: https://issues.apache.org/jira/browse/AIRFLOW-3369 > Project: Apache Airflow > Issue Type: Bug > Components: scheduler >Affects Versions: 1.10.0 >Reporter: Andrew Harmon >Assignee: Kaxil Naik >Priority: Major > Attachments: image.png > > > If you create a DAG with catchup=False, when it is un-paused, it creates 2 > dag runs. One for the most recent scheduled interval (expected) and one for > the interval before that (unexpected). > *Sample DAG* > {code:java} > from airflow import DAG > from datetime import datetime > from airflow.operators.dummy_operator import DummyOperator > dag = DAG( > dag_id='DummyTest', > start_date=datetime(2018,1,1), > catchup=False > ) > do = DummyOperator( > task_id='dummy_task', > dag=dag > ) > {code} > *Result:* > 2 DAG runs are created. 2018-11-18 and 2018-11-17 > *Expected Result:* > Only 1 DAG run should have been created (2018-11-18) > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (AIRFLOW-3369) Un-pausing a DAG with catchup =False creates an extra DAG run (1.10)
[ https://issues.apache.org/jira/browse/AIRFLOW-3369?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17102146#comment-17102146 ] ASF GitHub Bot commented on AIRFLOW-3369: - kaxil opened a new pull request #8776: URL: https://github.com/apache/airflow/pull/8776 https://issues.apache.org/jira/browse/AIRFLOW-3369 --- Make sure to mark the boxes below before creating PR: [x] - [x] Description above provides context of the change - [x] Unit tests coverage for changes (not needed for documentation changes) - [x] Target Github ISSUE in description if exists - [x] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)" - [x] Relevant documentation is updated including usage instructions. - [x] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example). --- In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed. In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x). In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md). Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Un-pausing a DAG with catchup =False creates an extra DAG run (1.10) > > > Key: AIRFLOW-3369 > URL: https://issues.apache.org/jira/browse/AIRFLOW-3369 > Project: Apache Airflow > Issue Type: Bug > Components: scheduler >Affects Versions: 1.10.0 >Reporter: Andrew Harmon >Priority: Major > Attachments: image.png > > > If you create a DAG with catchup=False, when it is un-paused, it creates 2 > dag runs. One for the most recent scheduled interval (expected) and one for > the interval before that (unexpected). > *Sample DAG* > {code:java} > from airflow import DAG > from datetime import datetime > from airflow.operators.dummy_operator import DummyOperator > dag = DAG( > dag_id='DummyTest', > start_date=datetime(2018,1,1), > catchup=False > ) > do = DummyOperator( > task_id='dummy_task', > dag=dag > ) > {code} > *Result:* > 2 DAG runs are created. 2018-11-18 and 2018-11-17 > *Expected Result:* > Only 1 DAG run should have been created (2018-11-18) > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [airflow] kaxil opened a new pull request #8776: WIP: [AIRFLOW-3369] BugFix: Unpausing a DAG with catchup=False creates an extra DAG run
kaxil opened a new pull request #8776: URL: https://github.com/apache/airflow/pull/8776 https://issues.apache.org/jira/browse/AIRFLOW-3369 --- Make sure to mark the boxes below before creating PR: [x] - [x] Description above provides context of the change - [x] Unit tests coverage for changes (not needed for documentation changes) - [x] Target Github ISSUE in description if exists - [x] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)" - [x] Relevant documentation is updated including usage instructions. - [x] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example). --- In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed. In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x). In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md). Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] jaketf commented on issue #8673: Data Fusion Hook Start pipeline will succeed before pipeline is in RUNNING state
jaketf commented on issue #8673: URL: https://github.com/apache/airflow/issues/8673#issuecomment-625578250 CDAP / Data Fusion is not a priority for me right now. I will not be able to find cycles for this in the next few weeks. Perhaps you could find a user who cares about this integration, suggest it for a first time contributor, or reach out to data fusion engineering if your team does not have cycles for it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (AIRFLOW-7104) Add Secret backend for GCP Secrets Manager
[ https://issues.apache.org/jira/browse/AIRFLOW-7104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17102098#comment-17102098 ] ASF GitHub Bot commented on AIRFLOW-7104: - sethvargo commented on a change in pull request #7795: URL: https://github.com/apache/airflow/pull/7795#discussion_r421850039 ## File path: airflow/providers/google/cloud/secrets/secrets_manager.py ## @@ -0,0 +1,132 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Objects relating to sourcing connections from GCP Secrets Manager +""" +from typing import List, Optional + +from cached_property import cached_property +from google.api_core.exceptions import NotFound +from google.api_core.gapic_v1.client_info import ClientInfo +from google.cloud.secretmanager_v1 import SecretManagerServiceClient + +from airflow import version +from airflow.models import Connection +from airflow.providers.google.cloud.utils.credentials_provider import ( +_get_scopes, get_credentials_and_project_id, +) +from airflow.secrets import BaseSecretsBackend +from airflow.utils.log.logging_mixin import LoggingMixin + + +class CloudSecretsManagerSecretsBackend(BaseSecretsBackend, LoggingMixin): +""" +Retrieves Connection object from GCP Secrets Manager + +Configurable via ``airflow.cfg`` as follows: + +.. code-block:: ini + +[secrets] +backend = airflow.providers.google.cloud.secrets.secrets_manager.CloudSecretsManagerSecretsBackend +backend_kwargs = {"connections_prefix": "airflow/connections"} + +For example, if secret id is ``airflow/connections/smtp_default``, this would be accessible +if you provide ``{"connections_prefix": "airflow/connections"}`` and request conn_id ``smtp_default``. + +:param connections_prefix: Specifies the prefix of the secret to read to get Connections. +:type connections_prefix: str +:param gcp_key_path: Path to GCP Credential JSON file; +use default credentials in the current environment if not provided.
+:type gcp_key_path: str +:param gcp_scopes: Comma-separated string containing GCP scopes +:type gcp_scopes: str +""" +def __init__( +self, +connections_prefix: str = "airflow/connections", +gcp_key_path: Optional[str] = None, +gcp_scopes: Optional[str] = None, +**kwargs +): +self.connections_prefix = connections_prefix.rstrip("/") +self.gcp_key_path = gcp_key_path +self.gcp_scopes = gcp_scopes +self.credentials: Optional[str] = None +self.project_id: Optional[str] = None +super().__init__(**kwargs) + +@cached_property +def client(self) -> SecretManagerServiceClient: +""" +Create an authenticated KMS client +""" +scopes = _get_scopes(self.gcp_scopes) +self.credentials, self.project_id = get_credentials_and_project_id( +key_path=self.gcp_key_path, +scopes=scopes +) +_client = SecretManagerServiceClient( +credentials=self.credentials, +client_info=ClientInfo(client_library_version='airflow_v' + version.version) +) +return _client + +def build_secret_id(self, conn_id: str) -> str: +""" +Given conn_id, build path for Secrets Manager + +:param conn_id: connection id +:type conn_id: str +""" +secret_id = f"{self.connections_prefix}/{conn_id}" +return secret_id + +def get_conn_uri(self, conn_id: str) -> Optional[str]: +""" +Get secret value from Secrets Manager. + +:param conn_id: connection id +:type conn_id: str +""" +secret_id = self.build_secret_id(conn_id=conn_id) +# always return the latest version of the secret +secret_version = "latest" Review comment: Hey @kaxil - [here's the protocol](https://github.com/spring-cloud/spring-cloud-gcp/pull/2302) we'd like most integrators to try and align on. Latest is a floating alias that resolves to "the highest numbered secret version" for the secret. Often times, the team creating the secret
[GitHub] [airflow] sethvargo commented on a change in pull request #7795: [AIRFLOW-7104] Add Secret backend for GCP Secrets Manager
sethvargo commented on a change in pull request #7795: URL: https://github.com/apache/airflow/pull/7795#discussion_r421850039 ## File path: airflow/providers/google/cloud/secrets/secrets_manager.py ## @@ -0,0 +1,132 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Objects relating to sourcing connections from GCP Secrets Manager +""" +from typing import List, Optional + +from cached_property import cached_property +from google.api_core.exceptions import NotFound +from google.api_core.gapic_v1.client_info import ClientInfo +from google.cloud.secretmanager_v1 import SecretManagerServiceClient + +from airflow import version +from airflow.models import Connection +from airflow.providers.google.cloud.utils.credentials_provider import ( +_get_scopes, get_credentials_and_project_id, +) +from airflow.secrets import BaseSecretsBackend +from airflow.utils.log.logging_mixin import LoggingMixin + + +class CloudSecretsManagerSecretsBackend(BaseSecretsBackend, LoggingMixin): +""" +Retrieves Connection object from GCP Secrets Manager + +Configurable via ``airflow.cfg`` as follows: + +.. code-block:: ini + +[secrets] +backend = airflow.providers.google.cloud.secrets.secrets_manager.CloudSecretsManagerSecretsBackend +backend_kwargs = {"connections_prefix": "airflow/connections"} + +For example, if secret id is ``airflow/connections/smtp_default``, this would be accessible +if you provide ``{"connections_prefix": "airflow/connections"}`` and request conn_id ``smtp_default``. + +:param connections_prefix: Specifies the prefix of the secret to read to get Connections. +:type connections_prefix: str +:param gcp_key_path: Path to GCP Credential JSON file; +use default credentials in the current environment if not provided.
+:type gcp_key_path: str +:param gcp_scopes: Comma-separated string containing GCP scopes +:type gcp_scopes: str +""" +def __init__( +self, +connections_prefix: str = "airflow/connections", +gcp_key_path: Optional[str] = None, +gcp_scopes: Optional[str] = None, +**kwargs +): +self.connections_prefix = connections_prefix.rstrip("/") +self.gcp_key_path = gcp_key_path +self.gcp_scopes = gcp_scopes +self.credentials: Optional[str] = None +self.project_id: Optional[str] = None +super().__init__(**kwargs) + +@cached_property +def client(self) -> SecretManagerServiceClient: +""" +Create an authenticated KMS client +""" +scopes = _get_scopes(self.gcp_scopes) +self.credentials, self.project_id = get_credentials_and_project_id( +key_path=self.gcp_key_path, +scopes=scopes +) +_client = SecretManagerServiceClient( +credentials=self.credentials, +client_info=ClientInfo(client_library_version='airflow_v' + version.version) +) +return _client + +def build_secret_id(self, conn_id: str) -> str: +""" +Given conn_id, build path for Secrets Manager + +:param conn_id: connection id +:type conn_id: str +""" +secret_id = f"{self.connections_prefix}/{conn_id}" +return secret_id + +def get_conn_uri(self, conn_id: str) -> Optional[str]: +""" +Get secret value from Secrets Manager. + +:param conn_id: connection id +:type conn_id: str +""" +secret_id = self.build_secret_id(conn_id=conn_id) +# always return the latest version of the secret +secret_version = "latest" Review comment: Hey @kaxil - [here's the protocol](https://github.com/spring-cloud/spring-cloud-gcp/pull/2302) we'd like most integrators to try and align on. Latest is a floating alias that resolves to "the highest numbered secret version" for the secret. Often times, the team creating the secret is separate from the team consuming the secret. In these cases, the latest version may not be active or usable yet. Generally, you want a workload to be coupled to a specific secret so you can roll forward and roll backward in the event something bre
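To make the pinning concrete, a rough sketch of reading an explicit version instead of `latest` (assuming the `google-cloud-secret-manager` 1.x client imported in the diff above; project, secret, and version values are hypothetical):

```python
from google.cloud.secretmanager_v1 import SecretManagerServiceClient

client = SecretManagerServiceClient()

# Pin the workload to a known-good version so it can be rolled forward
# or backward independently of whatever "latest" currently resolves to.
name = client.secret_version_path("my-project", "airflow-connections-smtp_default", "5")
response = client.access_secret_version(name)
secret_value = response.payload.data.decode("UTF-8")
```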
[GitHub] [airflow] ashb commented on a change in pull request #8739: Test that DagFileProcessor can operator against on a Serialized DAG
ashb commented on a change in pull request #8739: URL: https://github.com/apache/airflow/pull/8739#discussion_r421844181 ## File path: tests/jobs/test_scheduler_job.py ## @@ -2253,7 +2307,17 @@ def evaluate_dagrun( self.null_exec.mock_task_fail(dag_id, tid, ex_date) try: -dag.run(start_date=ex_date, end_date=ex_date, executor=self.null_exec, **run_kwargs) +# This needs a _REAL_ dag, not the serialized version +if not isinstance(dag, SerializedDAG): +real_dag = dag +else: +# It may not be loaded. This "could" live in DagBag, but it's +# only really needed here in tests, not in normal code. +if dag_id not in self.non_serialized_dagbag.dag_ids: +self.non_serialized_dagbag.process_file(dag.fileloc) + +real_dag = self.non_serialized_dagbag.get_dag(dag_id) Review comment: Probably fixed by https://github.com/apache/airflow/pull/8775 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] ashb commented on a change in pull request #8775: Correctly restore upstream_task_ids when deserializing Operators
ashb commented on a change in pull request #8775: URL: https://github.com/apache/airflow/pull/8775#discussion_r421843524 ## File path: airflow/providers/google/cloud/example_dags/example_gcs.py ## @@ -126,8 +126,8 @@ ) # [START howto_operator_gcs_delete_bucket] -delete_bucket_1 = GCSDeleteBucketOperator(task_id="delete_bucket", bucket_name=BUCKET_1) -delete_bucket_2 = GCSDeleteBucketOperator(task_id="delete_bucket", bucket_name=BUCKET_2) +delete_bucket_1 = GCSDeleteBucketOperator(task_id="delete_bucket_1", bucket_name=BUCKET_1) +delete_bucket_2 = GCSDeleteBucketOperator(task_id="delete_bucket_2", bucket_name=BUCKET_2) Review comment: @potiuk @mik-laj This change should be okay, right? (I can't think of any reason why it wouldn't be) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] ashb opened a new pull request #8775: Correctly restore upstream_task_ids when deserializing Operators
ashb opened a new pull request #8775: URL: https://github.com/apache/airflow/pull/8775 This test exposed a bug in one of the example dags that wasn't caught by #6549. That will be fixed in a separate issue, but it caused the round-trip tests to fail here. Should fix #8720 (but I haven't tested this; I was actually fixing another bug caused by the same place). (Depends on #8774 for the test rework only. Ignore the first commit in this PR when reviewing for now.) --- Make sure to mark the boxes below before creating PR: [x] - [x] Description above provides context of the change - [x] Unit tests coverage for changes (not needed for documentation changes) - [x] Target Github ISSUE in description if exists - [x] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)" - [x] Relevant documentation is updated including usage instructions. - [x] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example). --- In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed. In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x). In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md). Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] ashb commented on issue #8703: Support for set in XCom serialization
ashb commented on issue #8703: URL: https://github.com/apache/airflow/issues/8703#issuecomment-625537935 XCom used to be pickle by default (and still is on 1.10?); that'll be why it has gone unnoticed for so long. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
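The difference is easy to demonstrate with the standard library alone (illustrative snippet, not Airflow code):

```python
import json
import pickle

value = {'a', 'b'}

# Pickle (the old XCom default) round-trips a set transparently.
assert pickle.loads(pickle.dumps(value)) == value

# JSON (the newer default) cannot encode a set at all.
try:
    json.dumps(value)
except TypeError as err:
    print(err)  # Object of type set is not JSON serializable
```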
[GitHub] [airflow] ashb commented on a change in pull request #8754: Add SQL query tracking for pytest
ashb commented on a change in pull request #8754: URL: https://github.com/apache/airflow/pull/8754#discussion_r421835369 ## File path: scripts/perf/perf_kit/sqlalchemy.py ## @@ -15,31 +15,59 @@ # specific language governing permissions and limitations # under the License. -import contextlib import os import time import traceback +from typing import Callable from sqlalchemy import event -@contextlib.contextmanager -def trace_queries(display_time=True, display_trace=True, display_sql=False, display_parameters=True): +def _pretty_format_sql(text: str): +import pygments + +from pygments.formatters.terminal import TerminalFormatter +from pygments.lexers.sql import SqlLexer +text = pygments.highlight( +code=text, formatter=TerminalFormatter(), lexer=SqlLexer() +).rstrip() +return text + + +class TraceQueries: """ -Tracking SQL queries in a code block. The result is displayed directly on the screen - ``print`` +Tracking SQL queries in a code block. +:param display_no: If True, displays the query number. :param display_time: If True, displays the query execution time. :param display_trace: If True, displays the simplified (one-line) stack trace :param display_sql: If True, displays the SQL statements :param display_parameters: If True, display SQL statement parameters -:return: +:param print_fn: The function used to display the text. By default,``builtins.print`` """ -import airflow.settings - -def before_cursor_execute(conn, cursor, statement, parameters, context, executemany): +def __init__( +self, +*, Review comment: Yeah, very useful. Py3 only of course. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
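For readers unfamiliar with the bare `*` in the diff above, a small illustration of keyword-only arguments (generic sketch; the flag names are made up):

```python
class TraceQueries:
    # Everything after the bare ``*`` must be passed by keyword, which
    # keeps call sites readable when there are many boolean flags.
    # Note: this syntax is Python 3 only; on Python 2 it is a SyntaxError.
    def __init__(self, *, display_time=True, display_sql=False):
        self.display_time = display_time
        self.display_sql = display_sql


TraceQueries(display_sql=True)   # OK: keywords are explicit at the call site
# TraceQueries(True, False)      # TypeError: positional arguments are rejected
```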
[GitHub] [airflow] turbaszek edited a comment on pull request #8652: [AIP-31] Implement XComArg model to functionally pass output from one operator to the next
turbaszek edited a comment on pull request #8652: URL: https://github.com/apache/airflow/pull/8652#issuecomment-625532608 @jonathanshir it seems that some of the tests are flaky or something: ``` File "/opt/airflow/tests/models/test_xcom_arg.py", line 130 in test_xcom_pass_to_op ``` In general, we are not running DAGs on CI, so I would suggest either marking `TestXComArgRuntime` with the system marker or removing/mocking this test. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] turbaszek commented on pull request #8652: [AIP-31] Implement XComArg model to functionally pass output from one operator to the next
turbaszek commented on pull request #8652: URL: https://github.com/apache/airflow/pull/8652#issuecomment-625532608 It seems that some of the tests are flaky or something: ``` File "/opt/airflow/tests/models/test_xcom_arg.py", line 130 in test_xcom_pass_to_op ``` In general, we are not running DAGs on CI, so I would suggest either marking `TestXComArgRuntime` with the system marker or removing/mocking this test. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
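A minimal sketch of the first option (it assumes the parametrized `system` marker that Airflow's test suite registers for opt-in system tests; the marker argument here is hypothetical):

```python
import pytest


@pytest.mark.system("core")  # hypothetical marker argument; skipped on regular CI runs
class TestXComArgRuntime:
    def test_xcom_pass_to_op(self):
        ...  # runs a real DAG, so it should not execute on every CI build
```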
[GitHub] [airflow] ashb commented on issue #8774: Move singularity out of main CI tests and into separate docker image/system test
ashb commented on issue #8774: URL: https://github.com/apache/airflow/issues/8774#issuecomment-625532382 Wait, I've just looked at the singularity operator tests - it already seems to mock it, so does anyone know why we include this in our CI image? @potiuk ? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] ashb opened a new issue #8774: Move singularity out of main CI tests and into separate docker image/system test
ashb opened a new issue #8774: URL: https://github.com/apache/airflow/issues/8774 A CI build failed because we couldn't download a release from https://github.com/sylabs/singularity -- which, while it was a random network blip, shows there is no _need_ for the tests for the singularity operators to live in our CI image. We should instead create "system tests" for singularity, and ensure that the unit tests use mocking. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] ashb commented on pull request #8728: Show Deprecation warning on duplicate Task ids
ashb commented on pull request #8728: URL: https://github.com/apache/airflow/pull/8728#issuecomment-625528936 Only cos it's related to this PR, not that it's a bug with this PR And I think I've just found a problem in our example dags caused by us using `!=` in master: https://github.com/apache/airflow/blob/6e4f5fa66ebe2d8252829c67e79f895fa5029b5a/airflow/providers/google/cloud/example_dags/example_gcs.py#L129-L130 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] ashb commented on pull request #8718: Set store_serialized_dags from config in UI
ashb commented on pull request #8718: URL: https://github.com/apache/airflow/pull/8718#issuecomment-625526747 No problem @anitakar, it's good to check these things, and it's often easier to talk in code! (And sorry for being a bit curt!) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] houqp opened a new pull request #8773: fix typing errors reported by dmypy
houqp opened a new pull request #8773: URL: https://github.com/apache/airflow/pull/8773 fixes 2 new errors reported by dmypy. --- Make sure to mark the boxes below before creating PR: [x] - [x] Description above provides context of the change - [x] Unit tests coverage for changes (not needed for documentation changes) - [x] Target Github ISSUE in description if exists - [x] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)" - [x] Relevant documentation is updated including usage instructions. - [x] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example). --- In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed. In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x). In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md). Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] ashb commented on a change in pull request #8772: Correctly store non-default Nones in serialized tasks/dags
ashb commented on a change in pull request #8772: URL: https://github.com/apache/airflow/pull/8772#discussion_r421826962 ## File path: airflow/serialization/serialized_objects.py ## @@ -395,6 +411,59 @@ def deserialize_operator(cls, encoded_op: Dict[str, Any]) -> BaseOperator: return op +@classmethod +def _is_constructor_param(cls, attrname: str, instance: Any) -> bool: +# Check all super classes too +return any( +attrname in cls.__constructor_params_for_subclass(typ) +for typ in type(instance).mro() +) + +@classmethod +def _value_is_hardcoded_default(cls, attrname: str, value: Any, instance: Any) -> bool: +""" +Check if ``value`` is the default value for ``attrname`` as set by the +constructor of ``instance``, or any of its parent classes up +to-and-including BaseOperator. + +.. seealso:: + +:py:meth:`BaseSerialization._value_is_hardcoded_default` +""" + +def _is_default(): +nonlocal ctor_params, attrname, value Review comment: I went through a number of iterations of ways of doing this; it may be clearer just to pass them in anyway on Py3. WDYT? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] ashb commented on a change in pull request #8772: Correctly store non-default Nones in serialized tasks/dags
ashb commented on a change in pull request #8772: URL: https://github.com/apache/airflow/pull/8772#discussion_r421826729 ## File path: airflow/serialization/serialized_objects.py ## @@ -395,6 +411,59 @@ def deserialize_operator(cls, encoded_op: Dict[str, Any]) -> BaseOperator: return op +@classmethod +def _is_constructor_param(cls, attrname: str, instance: Any) -> bool: +# Check all super classes too +return any( +attrname in cls.__constructor_params_for_subclass(typ) +for typ in type(instance).mro() +) + +@classmethod +def _value_is_hardcoded_default(cls, attrname: str, value: Any, instance: Any) -> bool: +""" +Check if ``value`` is the default value for ``attrname`` as set by the +constructor of ``instance``, or any of its parent classes up +to-and-including BaseOperator. + +.. seealso:: + +:py:meth:`BaseSerialization._value_is_hardcoded_default` +""" + +def _is_default(): +nonlocal ctor_params, attrname, value Review comment: This won't work on Python 2; instead we'd have to do ``` def _is_default(ctor_params, attrname, value): ``` and pass them in at the call site. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
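A generic illustration of the two variants under discussion (not the PR's actual logic): `nonlocal` is Python 3-only syntax, while passing the values in explicitly works on both interpreters.

```python
# Python 3 only: ``nonlocal`` lets the nested function rebind names
# from the enclosing scope.
def check_py3(ctor_params, attrname, value):
    result = None

    def _is_default():
        nonlocal result
        result = ctor_params.get(attrname) == value

    _is_default()
    return result


# Python 2 compatible: pass the values in at the call site instead.
def check_py2(ctor_params, attrname, value):
    def _is_default(params, name, val):
        return params.get(name) == val

    return _is_default(ctor_params, attrname, value)


assert check_py3({'retries': 0}, 'retries', 0)
assert check_py2({'retries': 0}, 'retries', 0)
```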
[GitHub] [airflow] ashb opened a new pull request #8772: Correctly store non-default Nones in serialized tasks/dags
ashb opened a new pull request #8772: URL: https://github.com/apache/airflow/pull/8772 The default schedule_interval for a DAG is `@daily`, so `schedule_interval=None` is actually not the default, but we were not storing _any_ null attributes previously. This meant that upon re-inflating the DAG the schedule_interval would become @daily. This fixes that problem, and extends the test to look at _all_ the serialized attributes in our round-trip tests, rather than just the few that the webserver cared about. It doesn't change the serialization format, it just changes what/when values were stored. This solution was more complex than I hoped for, but the test case in test_operator_subclass_changing_base_defaults is a real one that the round trip tests discovered from the DatabricksSubmitRunOperator -- I have just captured it in this test in case that specific operator changes in future. --- Make sure to mark the boxes below before creating PR: [x] - [x] Description above provides context of the change - [x] Unit tests coverage for changes (not needed for documentation changes) - [x] Target Github ISSUE in description if exists - [x] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)" - [x] Relevant documentation is updated including usage instructions. - [x] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example). --- In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed. In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x). In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md). Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
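A minimal sketch of the failure mode described above (hypothetical round-trip using the serializer this PR touches; before the fix the final assertion fails):

```python
from airflow import DAG
from airflow.serialization.serialized_objects import SerializedDAG

# schedule_interval=None is NOT the default ('@daily' is), but null
# attributes were previously never stored in the serialized blob.
dag = DAG(dag_id="example", schedule_interval=None)

roundtripped = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))

# Prior to this PR the attribute was missing from the blob, so
# deserialization silently fell back to the '@daily' default.
assert roundtripped.schedule_interval is None
```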
[jira] [Commented] (AIRFLOW-7104) Add Secret backend for GCP Secrets Manager
[ https://issues.apache.org/jira/browse/AIRFLOW-7104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17102064#comment-17102064 ] ASF GitHub Bot commented on AIRFLOW-7104: - kaxil commented on a change in pull request #7795: URL: https://github.com/apache/airflow/pull/7795#discussion_r421825954 ## File path: airflow/providers/google/cloud/secrets/secrets_manager.py ## @@ -0,0 +1,132 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Objects relating to sourcing connections from GCP Secrets Manager +""" +from typing import List, Optional + +from cached_property import cached_property +from google.api_core.exceptions import NotFound +from google.api_core.gapic_v1.client_info import ClientInfo +from google.cloud.secretmanager_v1 import SecretManagerServiceClient + +from airflow import version +from airflow.models import Connection +from airflow.providers.google.cloud.utils.credentials_provider import ( +_get_scopes, get_credentials_and_project_id, +) +from airflow.secrets import BaseSecretsBackend +from airflow.utils.log.logging_mixin import LoggingMixin + + +class CloudSecretsManagerSecretsBackend(BaseSecretsBackend, LoggingMixin): +""" +Retrieves Connection object from GCP Secrets Manager + +Configurable via ``airflow.cfg`` as follows: + +.. code-block:: ini + +[secrets] +backend = airflow.providers.google.cloud.secrets.secrets_manager.CloudSecretsManagerSecretsBackend +backend_kwargs = {"connections_prefix": "airflow/connections"} + +For example, if secret id is ``airflow/connections/smtp_default``, this would be accessible +if you provide ``{"connections_prefix": "airflow/connections"}`` and request conn_id ``smtp_default``. + +:param connections_prefix: Specifies the prefix of the secret to read to get Connections. +:type connections_prefix: str +:param gcp_key_path: Path to GCP Credential JSON file; +use default credentials in the current environment if not provided.
+:type gcp_key_path: str +:param gcp_scopes: Comma-separated string containing GCP scopes +:type gcp_scopes: str +""" +def __init__( +self, +connections_prefix: str = "airflow/connections", +gcp_key_path: Optional[str] = None, +gcp_scopes: Optional[str] = None, +**kwargs +): +self.connections_prefix = connections_prefix.rstrip("/") +self.gcp_key_path = gcp_key_path +self.gcp_scopes = gcp_scopes +self.credentials: Optional[str] = None +self.project_id: Optional[str] = None +super().__init__(**kwargs) + +@cached_property +def client(self) -> SecretManagerServiceClient: +""" +Create an authenticated KMS client +""" +scopes = _get_scopes(self.gcp_scopes) +self.credentials, self.project_id = get_credentials_and_project_id( +key_path=self.gcp_key_path, +scopes=scopes +) +_client = SecretManagerServiceClient( +credentials=self.credentials, +client_info=ClientInfo(client_library_version='airflow_v' + version.version) +) +return _client + +def build_secret_id(self, conn_id: str) -> str: +""" +Given conn_id, build path for Secrets Manager + +:param conn_id: connection id +:type conn_id: str +""" +secret_id = f"{self.connections_prefix}/{conn_id}" +return secret_id + +def get_conn_uri(self, conn_id: str) -> Optional[str]: +""" +Get secret value from Secrets Manager. + +:param conn_id: connection id +:type conn_id: str +""" +secret_id = self.build_secret_id(conn_id=conn_id) +# always return the latest version of the secret +secret_version = "latest" Review comment: @sethvargo Thanks for letting us know. Why is it not recommended, at least from Airflow's perspective, our users would want to pull the secrets with the most recent version (hence 'latest'). Can you give us some context to let us know how a user who does not know how many versions of a
[GitHub] [airflow] kaxil commented on a change in pull request #7795: [AIRFLOW-7104] Add Secret backend for GCP Secrets Manager
kaxil commented on a change in pull request #7795: URL: https://github.com/apache/airflow/pull/7795#discussion_r421825954 ## File path: airflow/providers/google/cloud/secrets/secrets_manager.py ## @@ -0,0 +1,132 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Objects relating to sourcing connections from GCP Secrets Manager +""" +from typing import List, Optional + +from cached_property import cached_property +from google.api_core.exceptions import NotFound +from google.api_core.gapic_v1.client_info import ClientInfo +from google.cloud.secretmanager_v1 import SecretManagerServiceClient + +from airflow import version +from airflow.models import Connection +from airflow.providers.google.cloud.utils.credentials_provider import ( +_get_scopes, get_credentials_and_project_id, +) +from airflow.secrets import BaseSecretsBackend +from airflow.utils.log.logging_mixin import LoggingMixin + + +class CloudSecretsManagerSecretsBackend(BaseSecretsBackend, LoggingMixin): +""" +Retrieves Connection object from GCP Secrets Manager + +Configurable via ``airflow.cfg`` as follows: + +.. code-block:: ini + +[secrets] +backend = airflow.providers.google.cloud.secrets.secrets_manager.CloudSecretsManagerSecretsBackend +backend_kwargs = {"connections_prefix": "airflow/connections"} + +For example, if secret id is ``airflow/connections/smtp_default``, this would be accessible +if you provide ``{"connections_prefix": "airflow/connections"}`` and request conn_id ``smtp_default``. + +:param connections_prefix: Specifies the prefix of the secret to read to get Connections. +:type connections_prefix: str +:param gcp_key_path: Path to GCP Credential JSON file; +use default credentials in the current environment if not provided.
+:type gcp_key_path: str +:param gcp_scopes: Comma-separated string containing GCP scopes +:type gcp_scopes: str +""" +def __init__( +self, +connections_prefix: str = "airflow/connections", +gcp_key_path: Optional[str] = None, +gcp_scopes: Optional[str] = None, +**kwargs +): +self.connections_prefix = connections_prefix.rstrip("/") +self.gcp_key_path = gcp_key_path +self.gcp_scopes = gcp_scopes +self.credentials: Optional[str] = None +self.project_id: Optional[str] = None +super().__init__(**kwargs) + +@cached_property +def client(self) -> SecretManagerServiceClient: +""" +Create an authenticated KMS client +""" +scopes = _get_scopes(self.gcp_scopes) +self.credentials, self.project_id = get_credentials_and_project_id( +key_path=self.gcp_key_path, +scopes=scopes +) +_client = SecretManagerServiceClient( +credentials=self.credentials, +client_info=ClientInfo(client_library_version='airflow_v' + version.version) +) +return _client + +def build_secret_id(self, conn_id: str) -> str: +""" +Given conn_id, build path for Secrets Manager + +:param conn_id: connection id +:type conn_id: str +""" +secret_id = f"{self.connections_prefix}/{conn_id}" +return secret_id + +def get_conn_uri(self, conn_id: str) -> Optional[str]: +""" +Get secret value from Secrets Manager. + +:param conn_id: connection id +:type conn_id: str +""" +secret_id = self.build_secret_id(conn_id=conn_id) +# always return the latest version of the secret +secret_version = "latest" Review comment: @sethvargo Thanks for letting us know. Why is it not recommended, at least from Airflow's perspective, our users would want to pull the secrets with the most recent version (hence 'latest'). Can you give us some context to let us know how a user who does not know how many versions of a secret exists pull it? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For q
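For context on the question, a rough sketch of how the backend gets exercised from the Airflow side (based on the diff above; users address secrets by `conn_id`, never by version, which is why the version string is hard-coded):

```python
from airflow.providers.google.cloud.secrets.secrets_manager import CloudSecretsManagerSecretsBackend

backend = CloudSecretsManagerSecretsBackend(
    connections_prefix="airflow/connections",
)

# Airflow hands the backend a conn_id; build_secret_id() maps it to a
# secret name and get_conn_uri() reads a fixed version of that secret.
uri = backend.get_conn_uri(conn_id="smtp_default")
# -> accesses secret "airflow/connections/smtp_default" at version "latest"
```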
[GitHub] [airflow] Acehaidrey commented on pull request #8702: Add context to execution_date_fn in ExternalTaskSensor
Acehaidrey commented on pull request #8702: URL: https://github.com/apache/airflow/pull/8702#issuecomment-625516604 @jhtimmins @dimberman can you please comment when you have a minute? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] mik-laj commented on a change in pull request #7795: [AIRFLOW-7104] Add Secret backend for GCP Secrets Manager
mik-laj commented on a change in pull request #7795: URL: https://github.com/apache/airflow/pull/7795#discussion_r421816660 ## File path: airflow/providers/google/cloud/secrets/secrets_manager.py ## @@ -0,0 +1,132 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Objects relating to sourcing connections from GCP Secrets Manager +""" +from typing import List, Optional + +from cached_property import cached_property +from google.api_core.exceptions import NotFound +from google.api_core.gapic_v1.client_info import ClientInfo +from google.cloud.secretmanager_v1 import SecretManagerServiceClient + +from airflow import version +from airflow.models import Connection +from airflow.providers.google.cloud.utils.credentials_provider import ( +_get_scopes, get_credentials_and_project_id, +) +from airflow.secrets import BaseSecretsBackend +from airflow.utils.log.logging_mixin import LoggingMixin + + +class CloudSecretsManagerSecretsBackend(BaseSecretsBackend, LoggingMixin): +""" +Retrieves Connection object from GCP Secrets Manager + +Configurable via ``airflow.cfg`` as follows: + +.. code-block:: ini + +[secrets] +backend = airflow.providers.google.cloud.secrets.secrets_manager.CloudSecretsManagerSecretsBackend +backend_kwargs = {"connections_prefix": "airflow/connections"} + +For example, if secret id is ``airflow/connections/smtp_default``, this would be accessible +if you provide ``{"connections_prefix": "airflow/connections"}`` and request conn_id ``smtp_default``. + +:param connections_prefix: Specifies the prefix of the secret to read to get Connections. +:type connections_prefix: str +:param gcp_key_path: Path to GCP Credential JSON file; +use default credentials in the current environment if not provided.
+:type gcp_key_path: str +:param gcp_scopes: Comma-separated string containing GCP scopes +:type gcp_scopes: str +""" +def __init__( +self, +connections_prefix: str = "airflow/connections", +gcp_key_path: Optional[str] = None, +gcp_scopes: Optional[str] = None, +**kwargs +): +self.connections_prefix = connections_prefix.rstrip("/") +self.gcp_key_path = gcp_key_path +self.gcp_scopes = gcp_scopes +self.credentials: Optional[str] = None +self.project_id: Optional[str] = None +super().__init__(**kwargs) + +@cached_property +def client(self) -> SecretManagerServiceClient: +""" +Create an authenticated KMS client +""" +scopes = _get_scopes(self.gcp_scopes) +self.credentials, self.project_id = get_credentials_and_project_id( +key_path=self.gcp_key_path, +scopes=scopes +) +_client = SecretManagerServiceClient( +credentials=self.credentials, +client_info=ClientInfo(client_library_version='airflow_v' + version.version) +) +return _client + +def build_secret_id(self, conn_id: str) -> str: +""" +Given conn_id, build path for Secrets Manager + +:param conn_id: connection id +:type conn_id: str +""" +secret_id = f"{self.connections_prefix}/{conn_id}" +return secret_id + +def get_conn_uri(self, conn_id: str) -> Optional[str]: +""" +Get secret value from Secrets Manager. + +:param conn_id: connection id +:type conn_id: str +""" +secret_id = self.build_secret_id(conn_id=conn_id) +# always return the latest version of the secret +secret_version = "latest" Review comment: Hello. I'm Kamil from Polidea. We are a Google partner and we are building many integrations for Google services at the request of [Cloud Composer](https://cloud.google.com/composer). We have an abstraction layer that does not provide for this possibility, but I think we can introduce. special parameter syntax that lets you reference a specific version. But I will explain the small context of how this change works so that you can make better design decisions. In previous versions of Airflow, all data was stored in a database (th
[jira] [Commented] (AIRFLOW-7104) Add Secret backend for GCP Secrets Manager
[ https://issues.apache.org/jira/browse/AIRFLOW-7104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17102054#comment-17102054 ] ASF GitHub Bot commented on AIRFLOW-7104: - mik-laj commented on a change in pull request #7795: URL: https://github.com/apache/airflow/pull/7795#discussion_r421816660 ## File path: airflow/providers/google/cloud/secrets/secrets_manager.py ## @@ -0,0 +1,132 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Objects relating to sourcing connections from GCP Secrets Manager +""" +from typing import List, Optional + +from cached_property import cached_property +from google.api_core.exceptions import NotFound +from google.api_core.gapic_v1.client_info import ClientInfo +from google.cloud.secretmanager_v1 import SecretManagerServiceClient + +from airflow import version +from airflow.models import Connection +from airflow.providers.google.cloud.utils.credentials_provider import ( +_get_scopes, get_credentials_and_project_id, +) +from airflow.secrets import BaseSecretsBackend +from airflow.utils.log.logging_mixin import LoggingMixin + + +class CloudSecretsManagerSecretsBackend(BaseSecretsBackend, LoggingMixin): +""" +Retrieves Connection object from GCP Secrets Manager + +Configurable via ``airflow.cfg`` as follows: + +.. code-block:: ini + +[secrets] +backend = airflow.providers.google.cloud.secrets.secrets_manager.CloudSecretsManagerSecretsBackend +backend_kwargs = {"connections_prefix": "airflow/connections"} + +For example, if secret id is ``airflow/connections/smtp_default``, this would be accessible +if you provide ``{"connections_prefix": "airflow/connections"}`` and request conn_id ``smtp_default``. + +:param connections_prefix: Specifies the prefix of the secret to read to get Connections. +:type connections_prefix: str +:param gcp_key_path: Path to GCP Credential JSON file; +use default credentials in the current environment if not provided.
+:type gcp_key_path: str +:param gcp_scopes: Comma-separated string containing GCP scopes +:type gcp_scopes: str +""" +def __init__( +self, +connections_prefix: str = "airflow/connections", +gcp_key_path: Optional[str] = None, +gcp_scopes: Optional[str] = None, +**kwargs +): +self.connections_prefix = connections_prefix.rstrip("/") +self.gcp_key_path = gcp_key_path +self.gcp_scopes = gcp_scopes +self.credentials: Optional[str] = None +self.project_id: Optional[str] = None +super().__init__(**kwargs) + +@cached_property +def client(self) -> SecretManagerServiceClient: +""" +Create an authenticated KMS client +""" +scopes = _get_scopes(self.gcp_scopes) +self.credentials, self.project_id = get_credentials_and_project_id( +key_path=self.gcp_key_path, +scopes=scopes +) +_client = SecretManagerServiceClient( +credentials=self.credentials, +client_info=ClientInfo(client_library_version='airflow_v' + version.version) +) +return _client + +def build_secret_id(self, conn_id: str) -> str: +""" +Given conn_id, build path for Secrets Manager + +:param conn_id: connection id +:type conn_id: str +""" +secret_id = f"{self.connections_prefix}/{conn_id}" +return secret_id + +def get_conn_uri(self, conn_id: str) -> Optional[str]: +""" +Get secret value from Secrets Manager. + +:param conn_id: connection id +:type conn_id: str +""" +secret_id = self.build_secret_id(conn_id=conn_id) +# always return the latest version of the secret +secret_version = "latest" Review comment: Hello. I'm Kamil from Polidea. We are a Google partner and we are building many integrations for Google services at the request of [Cloud Composer](https://cloud.google.com/composer). We have an abstraction layer that does not provide for this possibility, but I think we can introduc
[GitHub] [airflow] mik-laj commented on a change in pull request #7795: [AIRFLOW-7104] Add Secret backend for GCP Secrets Manager
mik-laj commented on a change in pull request #7795: URL: https://github.com/apache/airflow/pull/7795#discussion_r421816660 ## File path: airflow/providers/google/cloud/secrets/secrets_manager.py ## @@ -0,0 +1,132 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Objects relating to sourcing connections from GCP Secrets Manager +""" +from typing import List, Optional + +from cached_property import cached_property +from google.api_core.exceptions import NotFound +from google.api_core.gapic_v1.client_info import ClientInfo +from google.cloud.secretmanager_v1 import SecretManagerServiceClient + +from airflow import version +from airflow.models import Connection +from airflow.providers.google.cloud.utils.credentials_provider import ( +_get_scopes, get_credentials_and_project_id, +) +from airflow.secrets import BaseSecretsBackend +from airflow.utils.log.logging_mixin import LoggingMixin + + +class CloudSecretsManagerSecretsBackend(BaseSecretsBackend, LoggingMixin): +""" +Retrieves Connection object from GCP Secrets Manager + +Configurable via ``airflow.cfg`` as follows: + +.. code-block:: ini + +[secrets] +backend = airflow.providers.google.cloud.secrets.secrets_manager.CloudSecretsManagerSecretsBackend +backend_kwargs = {"connections_prefix": "airflow/connections"} + +For example, if secret id is ``airflow/connections/smtp_default``, this would be accessible +if you provide ``{"connections_prefix": "airflow/connections"}`` and request conn_id ``smtp_default``. + +:param connections_prefix: Specifies the prefix of the secret to read to get Connections. +:type connections_prefix: str +:param gcp_key_path: Path to GCP Credential JSON file; +use default credentials in the current environment if not provided. 
+:type gcp_key_path: str +:param gcp_scopes: Comma-separated string containing GCP scopes +:type gcp_scopes: str +""" +def __init__( +self, +connections_prefix: str = "airflow/connections", +gcp_key_path: Optional[str] = None, +gcp_scopes: Optional[str] = None, +**kwargs +): +self.connections_prefix = connections_prefix.rstrip("/") +self.gcp_key_path = gcp_key_path +self.gcp_scopes = gcp_scopes +self.credentials: Optional[str] = None +self.project_id: Optional[str] = None +super().__init__(**kwargs) + +@cached_property +def client(self) -> SecretManagerServiceClient: +""" +Create an authenticated KMS client +""" +scopes = _get_scopes(self.gcp_scopes) +self.credentials, self.project_id = get_credentials_and_project_id( +key_path=self.gcp_key_path, +scopes=scopes +) +_client = SecretManagerServiceClient( +credentials=self.credentials, +client_info=ClientInfo(client_library_version='airflow_v' + version.version) +) +return _client + +def build_secret_id(self, conn_id: str) -> str: +""" +Given conn_id, build path for Secrets Manager + +:param conn_id: connection id +:type conn_id: str +""" +secret_id = f"{self.connections_prefix}/{conn_id}" +return secret_id + +def get_conn_uri(self, conn_id: str) -> Optional[str]: +""" +Get secret value from Secrets Manager. + +:param conn_id: connection id +:type conn_id: str +""" +secret_id = self.build_secret_id(conn_id=conn_id) +# always return the latest version of the secret +secret_version = "latest" Review comment: Hello. I'm Kamil from Polidea. We are a Google partner and we are building many integrations for Google services at the request of [Cloud Composer](https://cloud.google.com/composer). We have an abstraction layer that does not provide for this possibility, but I think we can introduce. special parameter syntax that lets you reference a specific version. But I will explain the small context of how this change works so that you can make better design decisions. In previous versions of Airflow, all data was stored in a database (th
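To make the "special parameter syntax" idea above concrete, here is a minimal sketch, assuming a version suffix could be encoded in the conn_id itself (e.g. `smtp_default@2`). The helper name and the `@` syntax are hypothetical, not part of this PR:

```python
# Hypothetical sketch, not the PR's code: peel an optional "@version"
# suffix off the conn_id, falling back to the "latest" alias.
from typing import Tuple


def split_conn_id(conn_id: str) -> Tuple[str, str]:
    """'smtp_default@2' -> ('smtp_default', '2'); bare ids get 'latest'."""
    name, sep, version = conn_id.partition("@")
    return (name, version) if sep else (conn_id, "latest")


assert split_conn_id("smtp_default@2") == ("smtp_default", "2")
assert split_conn_id("smtp_default") == ("smtp_default", "latest")
```

`get_conn_uri` could then pass the parsed version through to the client instead of hard-coding `"latest"`.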
[GitHub] [airflow] Acehaidrey commented on pull request #8680: Add metric for start/end task run
Acehaidrey commented on pull request #8680: URL: https://github.com/apache/airflow/pull/8680#issuecomment-625514762 @jhtimmins when you get a chance This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] houqp opened a new pull request #8771: add metric for monitoring email notification failure
houqp opened a new pull request #8771: URL: https://github.com/apache/airflow/pull/8771 This should help catch unexpected email notification failures. We need to monitor the monitoring system :) --- Make sure to mark the boxes below before creating PR: [x] - [x] Description above provides context of the change - [x] Unit tests coverage for changes (not needed for documentation changes) - [x] Target Github ISSUE in description if exists - [x] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)" - [x] Relevant documentation is updated including usage instructions. - [x] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example). --- In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed. In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x). In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md). Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
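The pattern behind a change like this, reduced to a hedged sketch: the metric name and wrapper function are illustrative, not necessarily the PR's actual diff, and it assumes the master-branch `airflow.stats.Stats` helper.

```python
# Illustrative only: count failed notification emails so the metrics
# backend notices even though the email itself never arrives.
from airflow.stats import Stats
from airflow.utils.email import send_email


def send_notification(to, subject, html_content):
    try:
        send_email(to, subject, html_content)
    except Exception:
        Stats.incr("notification_email_failure")  # hypothetical metric name
        raise  # re-raise so the existing failure handling still runs
```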
[jira] [Commented] (AIRFLOW-7104) Add Secret backend for GCP Secrets Manager
[ https://issues.apache.org/jira/browse/AIRFLOW-7104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17102034#comment-17102034 ] ASF GitHub Bot commented on AIRFLOW-7104: - sethvargo commented on a change in pull request #7795: URL: https://github.com/apache/airflow/pull/7795#discussion_r421795771 ## File path: airflow/providers/google/cloud/secrets/secrets_manager.py ## @@ -0,0 +1,132 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Objects relating to sourcing connections from GCP Secrets Manager +""" +from typing import List, Optional + +from cached_property import cached_property +from google.api_core.exceptions import NotFound +from google.api_core.gapic_v1.client_info import ClientInfo +from google.cloud.secretmanager_v1 import SecretManagerServiceClient + +from airflow import version +from airflow.models import Connection +from airflow.providers.google.cloud.utils.credentials_provider import ( +_get_scopes, get_credentials_and_project_id, +) +from airflow.secrets import BaseSecretsBackend +from airflow.utils.log.logging_mixin import LoggingMixin + + +class CloudSecretsManagerSecretsBackend(BaseSecretsBackend, LoggingMixin): +""" +Retrieves Connection object from GCP Secrets Manager + +Configurable via ``airflow.cfg`` as follows: + +.. code-block:: ini + +[secrets] +backend = airflow.providers.google.cloud.secrets.secrets_manager.CloudSecretsManagerSecretsBackend +backend_kwargs = {"connections_prefix": "airflow/connections"} + +For example, if secret id is ``airflow/connections/smtp_default``, this would be accessible +if you provide ``{"connections_prefix": "airflow/connections"}`` and request conn_id ``smtp_default``. + +:param connections_prefix: Specifies the prefix of the secret to read to get Connections. +:type connections_prefix: str +:param gcp_key_path: Path to GCP Credential JSON file; +use default credentials in the current environment if not provided. 
+:type gcp_key_path: str +:param gcp_scopes: Comma-separated string containing GCP scopes +:type gcp_scopes: str +""" +def __init__( +self, +connections_prefix: str = "airflow/connections", +gcp_key_path: Optional[str] = None, +gcp_scopes: Optional[str] = None, +**kwargs +): +self.connections_prefix = connections_prefix.rstrip("/") +self.gcp_key_path = gcp_key_path +self.gcp_scopes = gcp_scopes +self.credentials: Optional[str] = None +self.project_id: Optional[str] = None +super().__init__(**kwargs) + +@cached_property +def client(self) -> SecretManagerServiceClient: +""" +Create an authenticated KMS client +""" +scopes = _get_scopes(self.gcp_scopes) +self.credentials, self.project_id = get_credentials_and_project_id( +key_path=self.gcp_key_path, +scopes=scopes +) +_client = SecretManagerServiceClient( +credentials=self.credentials, +client_info=ClientInfo(client_library_version='airflow_v' + version.version) +) +return _client + +def build_secret_id(self, conn_id: str) -> str: +""" +Given conn_id, build path for Secrets Manager + +:param conn_id: connection id +:type conn_id: str +""" +secret_id = f"{self.connections_prefix}/{conn_id}" +return secret_id + +def get_conn_uri(self, conn_id: str) -> Optional[str]: +""" +Get secret value from Secrets Manager. + +:param conn_id: connection id +:type conn_id: str +""" +secret_id = self.build_secret_id(conn_id=conn_id) +# always return the latest version of the secret +secret_version = "latest" Review comment: Hey there. I'm Seth from the Secret Manager team. Thanks for building this integration. Feel free to reach out if you have future questions. We strongly discourage users from pinning to "latest" in production workloads. Users should specifically pin to a version (e.g. 1, 2, 3). Is t
[GitHub] [airflow] sethvargo commented on a change in pull request #7795: [AIRFLOW-7104] Add Secret backend for GCP Secrets Manager
sethvargo commented on a change in pull request #7795: URL: https://github.com/apache/airflow/pull/7795#discussion_r421795771 ## File path: airflow/providers/google/cloud/secrets/secrets_manager.py ## @@ -0,0 +1,132 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Objects relating to sourcing connections from GCP Secrets Manager +""" +from typing import List, Optional + +from cached_property import cached_property +from google.api_core.exceptions import NotFound +from google.api_core.gapic_v1.client_info import ClientInfo +from google.cloud.secretmanager_v1 import SecretManagerServiceClient + +from airflow import version +from airflow.models import Connection +from airflow.providers.google.cloud.utils.credentials_provider import ( +_get_scopes, get_credentials_and_project_id, +) +from airflow.secrets import BaseSecretsBackend +from airflow.utils.log.logging_mixin import LoggingMixin + + +class CloudSecretsManagerSecretsBackend(BaseSecretsBackend, LoggingMixin): +""" +Retrieves Connection object from GCP Secrets Manager + +Configurable via ``airflow.cfg`` as follows: + +.. code-block:: ini + +[secrets] +backend = airflow.providers.google.cloud.secrets.secrets_manager.CloudSecretsManagerSecretsBackend +backend_kwargs = {"connections_prefix": "airflow/connections"} + +For example, if secret id is ``airflow/connections/smtp_default``, this would be accessible +if you provide ``{"connections_prefix": "airflow/connections"}`` and request conn_id ``smtp_default``. + +:param connections_prefix: Specifies the prefix of the secret to read to get Connections. +:type connections_prefix: str +:param gcp_key_path: Path to GCP Credential JSON file; +use default credentials in the current environment if not provided. 
+:type gcp_key_path: str +:param gcp_scopes: Comma-separated string containing GCP scopes +:type gcp_scopes: str +""" +def __init__( +self, +connections_prefix: str = "airflow/connections", +gcp_key_path: Optional[str] = None, +gcp_scopes: Optional[str] = None, +**kwargs +): +self.connections_prefix = connections_prefix.rstrip("/") +self.gcp_key_path = gcp_key_path +self.gcp_scopes = gcp_scopes +self.credentials: Optional[str] = None +self.project_id: Optional[str] = None +super().__init__(**kwargs) + +@cached_property +def client(self) -> SecretManagerServiceClient: +""" +Create an authenticated KMS client +""" +scopes = _get_scopes(self.gcp_scopes) +self.credentials, self.project_id = get_credentials_and_project_id( +key_path=self.gcp_key_path, +scopes=scopes +) +_client = SecretManagerServiceClient( +credentials=self.credentials, +client_info=ClientInfo(client_library_version='airflow_v' + version.version) +) +return _client + +def build_secret_id(self, conn_id: str) -> str: +""" +Given conn_id, build path for Secrets Manager + +:param conn_id: connection id +:type conn_id: str +""" +secret_id = f"{self.connections_prefix}/{conn_id}" +return secret_id + +def get_conn_uri(self, conn_id: str) -> Optional[str]: +""" +Get secret value from Secrets Manager. + +:param conn_id: connection id +:type conn_id: str +""" +secret_id = self.build_secret_id(conn_id=conn_id) +# always return the latest version of the secret +secret_version = "latest" Review comment: Hey there. I'm Seth from the Secret Manager team. Thanks for building this integration. Feel free to reach out if you have future questions. We strongly discourage users from pinning to "latest" in production workloads. Users should specifically pin to a version (e.g. 1, 2, 3). Is there a way we can update this to allow the user to specify a version id? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use t
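For concreteness, reading a pinned version instead of `latest` could look roughly like this. This is a sketch against the `secretmanager_v1` client imported in the diff; the project, secret id, and version number are placeholders:

```python
from google.cloud.secretmanager_v1 import SecretManagerServiceClient

client = SecretManagerServiceClient()
# Pin to version "2" explicitly rather than the mutable "latest" alias.
name = client.secret_version_path(
    "my-project", "airflow-connections-smtp-default", "2"
)
response = client.access_secret_version(name=name)
secret_value = response.payload.data.decode("UTF-8")
```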
[GitHub] [airflow] casassg commented on a change in pull request #8652: [AIP-31] Implement XComArg model to functionally pass output from one operator to the next
casassg commented on a change in pull request #8652: URL: https://github.com/apache/airflow/pull/8652#discussion_r421782519 ## File path: airflow/models/xcom_arg.py ## @@ -0,0 +1,147 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from typing import Any, Dict, List, Union + +from airflow.exceptions import AirflowException +from airflow.models.baseoperator import BaseOperator # pylint: disable=R0401 +from airflow.models.xcom import XCOM_RETURN_KEY + + +class XComArg: +""" +Class that represents a XCom push from a previous operator. +Defaults to "return_value" as only key. + +Current implementations supports +xcomarg >> op +xcomarg << op +op >> xcomarg (by BaseOperator code) +op << xcomarg (by BaseOperator code) + +**Example**: The moment you get a result from any operator (functional or regular) you can :: + +any_op = AnyOperator() +xcomarg = XComArg(any_op) OR xcomarg = any_op.output +my_op = MyOperator() +my_op >> xcomarg + +This object can be used in legacy Operators via Jinja. + +**Example**: You can make this result to be part of any generated string :: + +any_op = AnyOperator() +xcomarg = any_op.output +op1 = MyOperator(my_text_message=f"the value is {xcomarg}") +op2 = MyOperator(my_text_message=f"the value is {xcomarg['topic']}") + +:param operator: operator to which the XComArg belongs to +:type operator: airflow.models.baseoperator.BaseOperator +:param key: key value which is used for xcom_pull (key in the XCom table) +:type key: str +""" + +def __init__(self, operator: BaseOperator, key: str = XCOM_RETURN_KEY): +self._operator = operator +self._key = key + +def __eq__(self, other): +return (self.operator == other.operator +and self.key == other.key) + +def __lshift__(self, other): +""" +Implements xcomresult << op +""" +self.set_upstream(other) +return self + +def __rshift__(self, other): +""" +Implements xcomresult >> op +""" +self.set_downstream(other) +return self + +def __getitem__(self, item): +""" +Implements xcomresult['some_result_key'] +""" +return XComArg(operator=self.operator, key=item) + +def __str__(self): +""" +Backward compatibility for old-style jinja used in Airflow Operators + +**Example**: to use XArg at BashOperator:: + +BashOperator(cmd=f"... { xcomarg } ...") + +:return: +""" +xcom_pull_kwargs = [f"task_ids='{self.operator.task_id}'", +f"dag_id='{self.operator.dag.dag_id}'", +] +if self.key is not None: +xcom_pull_kwargs.append(f"key='{self.key}'") Review comment: sgtm. Maybe we should still print that key value is None. But not blocking for me. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
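For readers following the thread, the point under discussion is the Jinja string that `__str__` emits. A hedged sketch of the observable behavior, assuming the module layout shown in this PR's diff:

```python
from datetime import datetime

from airflow import DAG
from airflow.models.xcom_arg import XComArg
from airflow.operators.dummy_operator import DummyOperator

with DAG("my_dag", start_date=datetime(2020, 1, 1)) as dag:
    producer = DummyOperator(task_id="producer")
    # Interpolating the XComArg yields a Jinja expression that performs the
    # xcom_pull at template-render time (e.g. inside a BashOperator command).
    message = f"the value is {XComArg(producer)}"
    # message now reads roughly:
    # "the value is {{ task_instance.xcom_pull(task_ids='producer',
    #     dag_id='my_dag', key='return_value') }}"
```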
[GitHub] [airflow] kaxil commented on issue #8760: Webserver becomes unusable after 100,000 tasks were completed
kaxil commented on issue #8760: URL: https://github.com/apache/airflow/issues/8760#issuecomment-625474431 Is there a particular endpoint that is slow in the Webserver? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] mik-laj commented on pull request #8754: Add SQL query tracking for pytest
mik-laj commented on pull request #8754: URL: https://github.com/apache/airflow/pull/8754#issuecomment-625468574 @kaxil @ashb and our pytest expert - @turbaszek Can I ask for a look? All non-quarantined checks are green. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] mik-laj commented on a change in pull request #8754: Add SQL query tracking for pytest
mik-laj commented on a change in pull request #8754: URL: https://github.com/apache/airflow/pull/8754#discussion_r421760872 ## File path: TESTING.rst ## @@ -902,6 +902,36 @@ You should also consider running it with ``restart`` command when you change the This will clean-up the database so that you start with a clean DB and not DB installed in a previous version. So typically you'd run it like ``breeze --install-airflow-version=1.10.9 restart``. +Tracking SQL statements +=== + +You can run tests with SQL statements tracking. To do this, use the ``--trace-sql`` option and pass the +columns to be displayed as an argument. Each query will be displayed on a separate line. +Supported values: + +* ``no`` - displays the query number; +* ``time`` - displays the query execution time; +* ``trace`` - displays the simplified (one-line) stack trace; +* ``sql`` - displays the SQL statements; +* ``parameters`` - display SQL statement parameters. + +If you only provide ``no``, then only the final number of queries in the test will be displayed. + +By default, pytest does not display output for successful tests, if you still want to see them, you must +pass the ``--capture=no`` option. + +If you run the following command: + +.. code-block:: bash + +pytest --debug-sql=no,sql,parameters --capture=no \ Review comment: Changed. Thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] mik-laj commented on a change in pull request #8754: Add SQL query tracking for pytest
mik-laj commented on a change in pull request #8754: URL: https://github.com/apache/airflow/pull/8754#discussion_r421760814 ## File path: scripts/perf/perf_kit/sqlalchemy.py ## @@ -52,28 +80,40 @@ def after_cursor_execute(conn, cursor, statement, parameters, context, executema conn.info.setdefault("query_start_time", []).append(time.monotonic()) output_parts = [] -if display_time: +if self.display_no: +output_parts.append(f"{self.query_count:>3}") + +if self.display_time: output_parts.append(f"{total:.5f}") -if display_trace: +if self.display_time: +output_parts.append(f"{total:.5f}") + +if self.display_trace: output_parts.extend([f"{file_name}", f"{stack_info}"]) -if display_sql: +if self.display_sql: sql_oneline = statement.replace("\n", " ") -output_parts.append(f"{sql_oneline}") +output_parts.append(f"{_pretty_format_sql(sql_oneline)}") -if display_parameters: +if self.display_parameters : Review comment: Fixed. Thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
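For context on what this fixture is built on: SQLAlchemy's cursor-execute events, the same hooks used in the diff above. A minimal, self-contained sketch of the timing technique, independent of perf_kit and using only public SQLAlchemy APIs:

```python
import time

from sqlalchemy import create_engine, event, text

engine = create_engine("sqlite://")


@event.listens_for(engine, "before_cursor_execute")
def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    # Push the start time; popped by the matching after-execute hook.
    conn.info.setdefault("query_start_time", []).append(time.monotonic())


@event.listens_for(engine, "after_cursor_execute")
def after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    total = time.monotonic() - conn.info["query_start_time"].pop()
    print(f"{total:.5f}  {statement}")


with engine.connect() as connection:
    connection.execute(text("SELECT 1"))  # prints one timing line per query
```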
[jira] [Commented] (AIRFLOW-249) Refactor the SLA mechanism
[ https://issues.apache.org/jira/browse/AIRFLOW-249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17101971#comment-17101971 ] ASF GitHub Bot commented on AIRFLOW-249: houqp commented on pull request #8545: URL: https://github.com/apache/airflow/pull/8545#issuecomment-625434121 > My conclusion is that option 1 is a better trade-off, because one has to go through all TIs in a DagRun to determine if a DR can be free from further checking (e.g., if a DR has 10 TIs, then each TI has to checked for all possible SLA violations before the DR is sla_checked). This is not a cheap operation since a single TI could have 3 SLAs, hence the additional computation and IO could easily outweigh the benefit of filtering out sla_checked DRs. Option 1 doesn't guarantee correctness right? i.e. if there are more dagruns that need to be checked than the preset limit, some of them will be ignored? With regards to performance comparison between option 1 and option 2, aren't we already checking all the TIs for the 100 fetched dag runs in option 1? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Refactor the SLA mechanism > -- > > Key: AIRFLOW-249 > URL: https://issues.apache.org/jira/browse/AIRFLOW-249 > Project: Apache Airflow > Issue Type: Improvement >Reporter: dud >Priority: Major > > Hello > I've noticed the SLA feature is currently behaving as follow : > - it doesn't work on DAG scheduled @once or None because they have no > dag.followwing_schedule property > - it keeps endlessly checking for SLA misses without ever worrying about any > end_date. Worse I noticed that emails are still being sent for runs that are > never happening because of end_date > - it keeps checking for recent TIs even if SLA notification has been already > been sent for them > - the SLA logic is only being fired after following_schedule + sla has > elapsed, in other words one has to wait for the next TI before having a > chance of getting any email. Also the email reports dag.following_schedule > time (I guess because it is close of TI.start_date), but unfortunately that > doesn't match what the task instances shows nor the log filename > - the SLA logic is based on max(TI.execution_date) for the starting point of > its checks, that means that for a DAG whose SLA is longer than its schedule > period if half of the TIs are running longer than expected it will go > unnoticed. 
This could be demonstrated with a DAG like this one : > {code} > from airflow import DAG > from airflow.operators import * > from datetime import datetime, timedelta > from time import sleep > default_args = { > 'owner': 'airflow', > 'depends_on_past': False, > 'start_date': datetime(2016, 6, 16, 12, 20), > 'email': my_email > 'sla': timedelta(minutes=2), > } > dag = DAG('unnoticed_sla', default_args=default_args, > schedule_interval=timedelta(minutes=1)) > def alternating_sleep(**kwargs): > minute = kwargs['execution_date'].strftime("%M") > is_odd = int(minute) % 2 > if is_odd: > sleep(300) > else: > sleep(10) > return True > PythonOperator( > task_id='sla_miss', > python_callable=alternating_sleep, > provide_context=True, > dag=dag) > {code} > I've tried to rework the SLA triggering mechanism by addressing the above > points., please [have a look on > it|https://github.com/dud225/incubator-airflow/commit/972260354075683a8d55a1c960d839c37e629e7d] > I made some tests with this patch : > - the fluctuent DAG shown above no longer make Airflow skip any SLA event : > {code} > task_id |dag_id | execution_date| email_sent | > timestamp | description | notification_sent > --+---+-+++-+--- > sla_miss | dag_sla_miss1 | 2016-06-16 15:05:00 | t | 2016-06-16 > 15:08:26.058631 | | t > sla_miss | dag_sla_miss1 | 2016-06-16 15:07:00 | t | 2016-06-16 > 15:10:06.093253 | | t > sla_miss | dag_sla_miss1 | 2016-06-16 15:09:00 | t | 2016-06-16 > 15:12:06.241773 | | t > {code} > - on a normal DAG, the SLA is being triggred more quickly : > {code} > // start_date = 2016-06-16 15:55:00 > // end_date = 2016-06-16 16:00:00 > // schedule_interval = timedelta(minutes=1) > // sla = timedelta(minutes=2) > task_id |dag_id | execution_date| email_sent | > timestamp | description | notification_sent > --
[GitHub] [airflow] houqp commented on pull request #8545: [AIRFLOW-249] Refactor the SLA mechanism (Continuation from #3584 )
houqp commented on pull request #8545: URL: https://github.com/apache/airflow/pull/8545#issuecomment-625434121 > My conclusion is that option 1 is a better trade-off, because one has to go through all TIs in a DagRun to determine if a DR can be free from further checking (e.g., if a DR has 10 TIs, then each TI has to checked for all possible SLA violations before the DR is sla_checked). This is not a cheap operation since a single TI could have 3 SLAs, hence the additional computation and IO could easily outweigh the benefit of filtering out sla_checked DRs. Option 1 doesn't guarantee correctness right? i.e. if there are more dagruns that need to be checked than the preset limit, some of them will be ignored? With regards to performance comparison between option 1 and option 2, aren't we already checking all the TIs for the 100 fetched dag runs in option 1? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
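To ground the comparison, a purely hypothetical sketch of the two query shapes being debated (`sla_checked` is an imagined column, and the limit and filters are illustrative, not the PR's code):

```python
from airflow.models import DagRun
from airflow.settings import Session
from airflow.utils.state import State

session = Session()

# Option 1: cap the work per scheduler pass. Bounded cost, but any runs
# beyond the cap are skipped this pass, so completeness is not guaranteed.
candidate_runs = (
    session.query(DagRun)
    .filter(DagRun.state == State.RUNNING)
    .limit(100)
    .all()
)

# Option 2: a per-run flag lets fully-checked runs drop out of the scan, at
# the cost of checking every TI (and every SLA on it) before setting it:
# session.query(DagRun).filter(DagRun.sla_checked.is_(False)).all()
```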
[GitHub] [airflow] anitakar commented on pull request #8718: Set store_serialized_dags from config in UI
anitakar commented on pull request #8718: URL: https://github.com/apache/airflow/pull/8718#issuecomment-625428708 Closing the issue, as the only difference seems to be the call to set_paused, whose implementation has changed. Sorry for taking your time without checking everything first. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (AIRFLOW-249) Refactor the SLA mechanism
[ https://issues.apache.org/jira/browse/AIRFLOW-249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17101952#comment-17101952 ] ASF GitHub Bot commented on AIRFLOW-249: houqp commented on a change in pull request #8545: URL: https://github.com/apache/airflow/pull/8545#discussion_r421708159 ## File path: airflow/models/baseoperator.py ## @@ -392,10 +442,79 @@ def __init__( % (self.task_id, dag.dag_id)) self.sla = sla self.execution_timeout = execution_timeout + +# Warn about use of the deprecated SLA parameter +if sla and expected_finish: +warnings.warn( +"Both sla and expected_finish provided as task " +"parameters to {}; using expected_finish and ignoring " +"deprecated sla parameter.".format(self), +category=PendingDeprecationWarning +) +elif sla: +warnings.warn( +"sla is deprecated as a task parameter for {}; converting to " +"expected_finish instead.".format(self), +category=PendingDeprecationWarning +) +expected_finish = sla + +# Set SLA parameters, batching invalid type messages into a +# single exception. +sla_param_errs: List = [] +if expected_duration and not isinstance(expected_duration, timedelta): Review comment: it should be enforced in the CI pipeline at build time. If you tested mypy and it's not complaining about it, then it's because mypy is still an evolving project, so it can't catch all the type errors yet, but it's safe to assume that it will eventually catch up. i wouldn't worry too much about these edge-cases to be honest. we are not doing runtime type check anywhere else where type hint is defined in the code base, so it's a little bit odd to leave this one as a special case. On top of that, this runtime type check will result in an exception. The end result is the same as without this check because they will all lead to unrecoverable crashes. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Refactor the SLA mechanism > -- > > Key: AIRFLOW-249 > URL: https://issues.apache.org/jira/browse/AIRFLOW-249 > Project: Apache Airflow > Issue Type: Improvement >Reporter: dud >Priority: Major > > Hello > I've noticed the SLA feature is currently behaving as follow : > - it doesn't work on DAG scheduled @once or None because they have no > dag.followwing_schedule property > - it keeps endlessly checking for SLA misses without ever worrying about any > end_date. Worse I noticed that emails are still being sent for runs that are > never happening because of end_date > - it keeps checking for recent TIs even if SLA notification has been already > been sent for them > - the SLA logic is only being fired after following_schedule + sla has > elapsed, in other words one has to wait for the next TI before having a > chance of getting any email. Also the email reports dag.following_schedule > time (I guess because it is close of TI.start_date), but unfortunately that > doesn't match what the task instances shows nor the log filename > - the SLA logic is based on max(TI.execution_date) for the starting point of > its checks, that means that for a DAG whose SLA is longer than its schedule > period if half of the TIs are running longer than expected it will go > unnoticed. 
This could be demonstrated with a DAG like this one : > {code} > from airflow import DAG > from airflow.operators import * > from datetime import datetime, timedelta > from time import sleep > default_args = { > 'owner': 'airflow', > 'depends_on_past': False, > 'start_date': datetime(2016, 6, 16, 12, 20), > 'email': my_email > 'sla': timedelta(minutes=2), > } > dag = DAG('unnoticed_sla', default_args=default_args, > schedule_interval=timedelta(minutes=1)) > def alternating_sleep(**kwargs): > minute = kwargs['execution_date'].strftime("%M") > is_odd = int(minute) % 2 > if is_odd: > sleep(300) > else: > sleep(10) > return True > PythonOperator( > task_id='sla_miss', > python_callable=alternating_sleep, > provide_context=True, > dag=dag) > {code} > I've tried to rework the SLA triggering mechanism by addressing the above > points., please [have a look on > it|https://github.com/dud225/incubator-airflow/commit/972260354075683a8d55a1c960d839c37e629e7d] > I made some tests with this
[GitHub] [airflow] houqp commented on a change in pull request #8545: [AIRFLOW-249] Refactor the SLA mechanism (Continuation from #3584 )
houqp commented on a change in pull request #8545: URL: https://github.com/apache/airflow/pull/8545#discussion_r421708159 ## File path: airflow/models/baseoperator.py ## @@ -392,10 +442,79 @@ def __init__( % (self.task_id, dag.dag_id)) self.sla = sla self.execution_timeout = execution_timeout + +# Warn about use of the deprecated SLA parameter +if sla and expected_finish: +warnings.warn( +"Both sla and expected_finish provided as task " +"parameters to {}; using expected_finish and ignoring " +"deprecated sla parameter.".format(self), +category=PendingDeprecationWarning +) +elif sla: +warnings.warn( +"sla is deprecated as a task parameter for {}; converting to " +"expected_finish instead.".format(self), +category=PendingDeprecationWarning +) +expected_finish = sla + +# Set SLA parameters, batching invalid type messages into a +# single exception. +sla_param_errs: List = [] +if expected_duration and not isinstance(expected_duration, timedelta): Review comment: it should be enforced in the CI pipeline at build time. If you tested mypy and it's not complaining about it, then it's because mypy is still an evolving project, so it can't catch all the type errors yet, but it's safe to assume that it will eventually catch up. i wouldn't worry too much about these edge-cases to be honest. we are not doing runtime type check anywhere else where type hint is defined in the code base, so it's a little bit odd to leave this one as a special case. On top of that, this runtime type check will result in an exception. The end result is the same as without this check because they will all lead to unrecoverable crashes. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
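The "batching invalid type messages into a single exception" pattern referenced in the diff, as a standalone sketch. Parameter names follow the diff; `TypeError` stands in for whatever Airflow exception the PR settles on:

```python
from datetime import timedelta
from typing import List


def validate_sla_params(expected_duration=None, expected_finish=None):
    """Collect all type errors first, then raise once with every message."""
    errs: List[str] = []
    if expected_duration is not None and not isinstance(expected_duration, timedelta):
        errs.append(f"expected_duration must be a timedelta, got {type(expected_duration)}")
    if expected_finish is not None and not isinstance(expected_finish, timedelta):
        errs.append(f"expected_finish must be a timedelta, got {type(expected_finish)}")
    if errs:
        raise TypeError("; ".join(errs))
```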
[GitHub] [airflow] boring-cyborg[bot] commented on issue #8770: Airflow 1.10.7 logs from S3 won't load, just hanging
boring-cyborg[bot] commented on issue #8770: URL: https://github.com/apache/airflow/issues/8770#issuecomment-625422513 Thanks for opening your first issue here! Be sure to follow the issue template! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] aarrtteemmuuss opened a new issue #8770: Airflow 1.10.7 logs from S3 won't load, just hanging
aarrtteemmuuss opened a new issue #8770: URL: https://github.com/apache/airflow/issues/8770 **Apache Airflow version**: 1.10.7 **Environment**: python 3.7, running locally **What happened**: I have the following configuration in airflow.cfg: `remote_logging = True` `remote_log_conn_id = "S3Connection"` `remote_base_log_folder = s3://bucket/logs` `encrypt_s3_logs = False` I have set up a connection in the UI with type S3 and the following settings: `{"aws_access_key_id":"xxx", "aws_secret_access_key": "xxx"}` Executor is LocalExecutor. The scheduler is able to write logs to S3, but when I open the UI to look at a task's logs, it just hangs and nothing happens. The spinner spins forever and I can't even see what the error is. `http://127.0.0.1:8080/admin/airflow/get_logs_with_metadata?dag_id=example_bash_operator&task_id=run_after_loop&execution_date=2020-05-07T18%3A08%3A51.232255%2B00%3A00&try_number=1&metadata=null` - returns an Empty Response Error and nothing works. Am I doing something wrong? The S3 configuration is not well documented and I see a bunch of reports that it is not working. **What you expected to happen**: I expected logs to be pulled from S3 and shown in the admin UI. **How to reproduce it**: Install airflow 1.10.7 locally and run any example dag with remote logging enabled for an S3 bucket. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
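One detail worth checking in the report above: airflow.cfg is parsed by Python's ConfigParser, which keeps quote characters as part of the value, so `remote_log_conn_id = "S3Connection"` would look up a connection literally named `"S3Connection"`, quotes included. A plausible working configuration (unquoted values; the bucket name is a placeholder):

```ini
[core]
remote_logging = True
remote_log_conn_id = S3Connection
remote_base_log_folder = s3://bucket/logs
encrypt_s3_logs = False
```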
[GitHub] [airflow] Sinsin1367 commented on pull request #8734: Added optional logging for pod container statuses once a pod fails. T…
Sinsin1367 commented on pull request #8734: URL: https://github.com/apache/airflow/pull/8734#issuecomment-625412537 @ashb @chrismclennon I would appreciate it if you could take a look and share your thoughts. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] anitakar opened a new pull request #8769: Set store_serialized_dags from config instead of defaulting it to false
anitakar opened a new pull request #8769: URL: https://github.com/apache/airflow/pull/8769 Fix running dag from non-RBAC (old, deprecated) UI This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] anitakar commented on a change in pull request #8764: Store dags cleanup 1 10
anitakar commented on a change in pull request #8764: URL: https://github.com/apache/airflow/pull/8764#discussion_r421691736 ## File path: airflow/www_rbac/views.py ## @@ -564,7 +568,8 @@ def dag_details(self, session=None): dag_id = request.args.get('dag_id') dag_orm = DagModel.get_dagmodel(dag_id, session=session) # FIXME: items needed for this view should move to the database -dag = dag_orm.get_dag(STORE_SERIALIZED_DAGS) +dag = dag_orm.get_dag( +conf.getboolean('core', 'store_serialized_dags', fallback=False)) Review comment: Sure. Airflow does not support dynamic settings. And even if it did, it should not be one of them. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] anitakar commented on a change in pull request #8764: Store dags cleanup 1 10
anitakar commented on a change in pull request #8764: URL: https://github.com/apache/airflow/pull/8764#discussion_r421691284 ## File path: airflow/www/views.py ## @@ -94,7 +94,16 @@ UTF8_READER = codecs.getreader('utf-8') -dagbag = models.DagBag(settings.DAGS_FOLDER, store_serialized_dags=STORE_SERIALIZED_DAGS) +try: +async_dagbag_loader = conf.getboolean('webserver', 'async_dagbag_loader') Review comment: Sorry for that. I thought it got lost or was removed. It is not necessary with dag serialization, and such big functionalities should definitely be discussed first via an AIP. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (AIRFLOW-2310) Enable AWS Glue Job Integration
[ https://issues.apache.org/jira/browse/AIRFLOW-2310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17101913#comment-17101913 ] ASF GitHub Bot commented on AIRFLOW-2310: - paulsyl commented on pull request #6007: URL: https://github.com/apache/airflow/pull/6007#issuecomment-625403415 > @paulsyl could you possibly share your local Glue implementation (custom operator, hook, etc) as a Gist or otherwise that we could run in the meantime? If so I'd be happy to test it out and bring some of your improvements into this PR as well. Sure, here you go. I don't claim these to be perfectly aligned to airflow principles, so please use them and please add to this PR once you've had chance to correct my errors, I would be more than happy to replace my own with contributions from the wider community. Thanks to the excellent work by @abdulbasitds that enabled me to get this far with Airflow. [Glue Hook](https://gist.github.com/paulsyl/34ee457518689d120d64173f44f89b45) [Glue Operator](https://gist.github.com/paulsyl/bbed05dc0cc6146c36cc6b26ba9bd886) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Enable AWS Glue Job Integration > --- > > Key: AIRFLOW-2310 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2310 > Project: Apache Airflow > Issue Type: Improvement > Components: contrib >Reporter: Olalekan Elesin >Assignee: Olalekan Elesin >Priority: Major > Labels: AWS > > Would it be possible to integrate AWS Glue into Airflow, such that Glue jobs > and ETL pipelines can be orchestrated with Airflow -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [airflow] paulsyl commented on pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration
paulsyl commented on pull request #6007: URL: https://github.com/apache/airflow/pull/6007#issuecomment-625403415 > @paulsyl could you possibly share your local Glue implementation (custom operator, hook, etc) as a Gist or otherwise that we could run in the meantime? If so I'd be happy to test it out and bring some of your improvements into this PR as well. Sure, here you go. I don't claim these to be perfectly aligned with Airflow principles, so please use them, and please add to this PR once you've had a chance to correct my errors; I would be more than happy to replace my own with contributions from the wider community. Thanks to the excellent work by @abdulbasitds, which enabled me to get this far with Airflow. [Glue Hook](https://gist.github.com/paulsyl/34ee457518689d120d64173f44f89b45) [Glue Operator](https://gist.github.com/paulsyl/bbed05dc0cc6146c36cc6b26ba9bd886) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] XD-DENG commented on pull request #8742: Avoid color info in response of /dag_stats & /task_stats
XD-DENG commented on pull request #8742: URL: https://github.com/apache/airflow/pull/8742#issuecomment-625392001 Rebased to the latest master This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (AIRFLOW-2310) Enable AWS Glue Job Integration
[ https://issues.apache.org/jira/browse/AIRFLOW-2310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17101899#comment-17101899 ] ASF GitHub Bot commented on AIRFLOW-2310: - pdeardorff-r7 commented on pull request #6007: URL: https://github.com/apache/airflow/pull/6007#issuecomment-625389437 @paulsyl could you possibly share your local Glue implementation (custom operator, hook, etc) as a Gist or otherwise that we could run in the meantime? If so I'd be happy to test it out and bring some of your improvements into this PR as well. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Enable AWS Glue Job Integration > --- > > Key: AIRFLOW-2310 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2310 > Project: Apache Airflow > Issue Type: Improvement > Components: contrib >Reporter: Olalekan Elesin >Assignee: Olalekan Elesin >Priority: Major > Labels: AWS > > Would it be possible to integrate AWS Glue into Airflow, such that Glue jobs > and ETL pipelines can be orchestrated with Airflow? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [airflow] pdeardorff-r7 commented on pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration
pdeardorff-r7 commented on pull request #6007: URL: https://github.com/apache/airflow/pull/6007#issuecomment-625389437 @paulsyl could you possibly share your local Glue implementation (custom operator, hook, etc) as a Gist or otherwise that we could run in the meantime? If so I'd be happy to test it out and bring some of your improvements into this PR as well. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[airflow] branch v1-10-test updated: Show Deprecation warning on duplicate Task ids (#8728)
This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch v1-10-test in repository https://gitbox.apache.org/repos/asf/airflow.git

The following commit(s) were added to refs/heads/v1-10-test by this push:
     new 1ef5f13  Show Deprecation warning on duplicate Task ids (#8728)
1ef5f13 is described below

commit 1ef5f13377802eec91179d62630426b10b60f1ba
Author: Kaxil Naik
AuthorDate: Thu May 7 18:18:52 2020 +0100

    Show Deprecation warning on duplicate Task ids (#8728)
---
 airflow/models/baseoperator.py |  3 ++-
 airflow/models/dag.py          |  2 +-
 tests/models/test_dag.py       | 40
 3 files changed, 43 insertions(+), 2 deletions(-)

```diff
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 828fdb1..0f76770 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -546,7 +546,8 @@ class BaseOperator(LoggingMixin):
                 "The DAG assigned to {} can not be changed.".format(self))
         elif self.task_id not in dag.task_dict:
             dag.add_task(self)
-
+        elif self.task_id in dag.task_dict and dag.task_dict[self.task_id] is not self:
+            dag.add_task(self)
         self._dag = dag

     def has_dag(self):
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index c6b9171..4d7eef8 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -1329,7 +1329,7 @@ class DAG(BaseDag, LoggingMixin):
         elif task.end_date and self.end_date:
             task.end_date = min(task.end_date, self.end_date)

-        if task.task_id in self.task_dict:
+        if task.task_id in self.task_dict and self.task_dict[task.task_id] is not task:
             # TODO: raise an error in Airflow 2.0
             warnings.warn(
                 'The requested task could not be added to the DAG because a '
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 580a3c7..cdbe1ee 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -25,6 +25,7 @@ import re
 import unittest
 from tempfile import NamedTemporaryFile

+import pytest
 from parameterized import parameterized
 from tests.compat import mock
@@ -36,6 +37,7 @@ from airflow import models, settings
 from airflow.configuration import conf
 from airflow.exceptions import AirflowException, AirflowDagCycleException
 from airflow.models import DAG, DagModel, TaskInstance as TI
+from airflow.operators.bash_operator import BashOperator
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.operators.subdag_operator import SubDagOperator
 from airflow.utils import timezone
@@ -901,3 +903,41 @@ class DagTest(unittest.TestCase):
         self.assertEqual(dag.normalized_schedule_interval, expected_n_schedule_interval)
         self.assertEqual(dag.schedule_interval, schedule_interval)
+
+    def test_duplicate_task_ids_raise_warning_with_dag_context_manager(self):
+        """Verify tasks with Duplicate task_id show warning"""
+        deprecation_msg = "The requested task could not be added to the DAG because a task with " \
+                          "task_id t1 is already in the DAG. Starting in Airflow 2.0, trying to " \
+                          "overwrite a task will raise an exception."
+
+        with pytest.warns(PendingDeprecationWarning) as record:
+            with DAG("test_dag", start_date=DEFAULT_DATE) as dag:
+                t1 = DummyOperator(task_id="t1")
+                t2 = BashOperator(task_id="t1", bash_command="sleep 1")
+                t1 >> t2
+
+        warning = record[0]
+        assert str(warning.message) == deprecation_msg
+        assert issubclass(PendingDeprecationWarning, warning.category)
+
+        self.assertEqual(dag.task_dict, {t1.task_id: t1})
+
+    def test_duplicate_task_ids_raise_warning(self):
+        """Verify tasks with Duplicate task_id show warning"""
+        deprecation_msg = "The requested task could not be added to the DAG because a task with " \
+                          "task_id t1 is already in the DAG. Starting in Airflow 2.0, trying to " \
+                          "overwrite a task will raise an exception."
+
+        with pytest.warns(PendingDeprecationWarning) as record:
+            dag = DAG("test_dag", start_date=DEFAULT_DATE)
+            t1 = DummyOperator(task_id="t1", dag=dag)
+            t2 = BashOperator(task_id="t1", bash_command="sleep 1", dag=dag)
+            t1 >> t2
+
+        warning = record[0]
+        assert str(warning.message) == deprecation_msg
+        assert issubclass(PendingDeprecationWarning, warning.category)
+
+        self.assertEqual(dag.task_dict, {t1.task_id: t1})
```
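In practical terms, the commit above makes re-use of a task_id within a single DAG emit a PendingDeprecationWarning and keep the first task, rather than silently overwriting it. A minimal sketch of the resulting behavior (dag id and dates are illustrative):

```python
import datetime
import warnings

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    with DAG("example_dag", start_date=datetime.datetime(2020, 1, 1)) as dag:
        first = DummyOperator(task_id="t1")
        second = DummyOperator(task_id="t1")  # duplicate id: warns, is not added

print(caught[0].category)   # <class 'PendingDeprecationWarning'>
print(list(dag.task_dict))  # ['t1'] -- only the first operator is registered
```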
[GitHub] [airflow] mik-laj opened a new pull request #8768: [POC] Mark keywords-only arguments in method signatures
mik-laj opened a new pull request #8768: URL: https://github.com/apache/airflow/pull/8768 Hello, I am not sure if this feature is supported by all our tools, especially flake8, so I am running the CI task to check it. Best regards, Kamil --- Make sure to mark the boxes below before creating PR: [x] - [X] Description above provides context of the change - [X] Unit tests coverage for changes (not needed for documentation changes) - [X] Target Github ISSUE in description if exists - [X] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)" - [X] Relevant documentation is updated including usage instructions. - [X] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example). --- In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed. In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x). In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md). Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
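For readers unfamiliar with the syntax being trialled in this PR: PEP 3102 keyword-only arguments are marked with a bare `*` in the signature, after which every parameter must be passed by keyword. A tiny illustration (the function name is hypothetical):

```python
def run_job(job_name, *, retries=0, timeout=None):
    """Parameters after the bare * are keyword-only."""
    return job_name, retries, timeout


run_job("example", retries=2)  # OK
# run_job("example", 2)        # would raise TypeError: too many positional arguments
```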
[GitHub] [airflow] ipeluffo edited a comment on issue #8311: Task Duration Missing from the Graph View Tool-tip for Running Tasks
ipeluffo edited a comment on issue #8311: URL: https://github.com/apache/airflow/issues/8311#issuecomment-625382511 I noticed this issue also in (some) tasks on finished DAGs. Has anyone else noticed this? ![Screenshot 2020-05-07 at 18 10 31](https://user-images.githubusercontent.com/889705/81324130-69065580-908e-11ea-8f74-d424b3709eb1.png) ![Screenshot 2020-05-07 at 18 09 54](https://user-images.githubusercontent.com/889705/81324138-6b68af80-908e-11ea-88a7-ca28aa7ba643.png) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] ipeluffo commented on issue #8311: Task Duration Missing from the Graph View Tool-tip for Running Tasks
ipeluffo commented on issue #8311: URL: https://github.com/apache/airflow/issues/8311#issuecomment-625382511 I noticed this issue also in (some) tasks on finished DAGs. Has anyone else noticed this? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] anitakar commented on pull request #8718: Set store_serialized_dags from config in UI
anitakar commented on pull request #8718: URL: https://github.com/apache/airflow/pull/8718#issuecomment-625364951 > > Hi, > > I will collectively answer the questions here. > > Many of the changes that I have made have been a result of setting store_serialized_dags from conf in the DagBag init method in one of the squashed-together commits. > > Before I decided that it is better to leave it with a False default, I started to change it to False in all places where it broke things. > > And it broke things in CLI, dag parsing code and this migration: airflow/migrations/versions/cc1e65623dc7_add_max_tries_column_to_task_instance.py > > What error exactly? And is that error reproducible in Airflow 1.10.10, or is it only a problem in the Cloud Composer version? > No, it is not a problem in Cloud Composer. At some point I made the change in DagBag to read store_serialized_dags instead of setting it to false, but it was an intermediate state. It was never released like that without a fix. > > This change is in fact a no-op, so maybe we should drop it. > > It is actually about not reading core.store_serialized_dags on server startup. I also had problems with tests because of that. But if that operation is expensive and people will probably not be turning dag serialization on and off, then maybe it is better to drop it. > > Airflow does not support changing config at runtime -- it is not possible. It is only the tests that are able to change settings. > Then I shall drop the change of removing STORE_SERIALIZED_DAGS from settings. But the change is not a no-op in the end. It had different code for accessing dag code in the UI that you fixed later on. Composer for a moment in Airflow 1.10.6 used storing, deleting and saving one by one, but the community version uses batch operations that I saw you had to fix. But there is an actual fix for non-RBAC www for setting paused on a DAG. I shall create a test for it. I can also remember that we had problems with calling run from the non-RBAC UI, but I am sure that if it were true for the open-source version, somebody would have noticed it by now. > > Here is a similar change to Airflow 1.10.10: > > #8764 > > This one actually fixes starting a dag from the UI. > > In this change I can also see that async dag bag loading was dropped altogether in Airflow 1.10.10. Am I correct? > > No, async dag bag loading was _never in_ Airflow. Either dag serialization is enabled, in which case the webserver loads it from the serialized tables in the DB, or it's not and the existing mechanism is in place. Having a _third_ method of loading dags when dag serialization exists and fixes the performance overhead seems like extra code to maintain for little benefit. > > Also: there is a reasonable chance that in Airflow 2.0 serializing dags may be the _only_ option. Oh, so it was a Composer-internal feature for clients with lots of DAGs. Anyway, our intention was always to contribute back to the community, so I guess it is not a problem. But as the serialized-dags solution is fully working, this workaround can be dropped. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] kaxil commented on a change in pull request #8746: Remove old airflow logger causing side effects in tests
kaxil commented on a change in pull request #8746: URL: https://github.com/apache/airflow/pull/8746#discussion_r421625121 ## File path: tests/test_logging_config.py ## @@ -97,6 +93,33 @@ SETTINGS_DEFAULT_NAME = 'custom_airflow_local_settings' +def reset_logging(): +"""Reset Logging""" +manager = logging.root.manager +manager.disabled = logging.NOTSET +airflow_loggers = [ +logger for logger_name, logger in manager.loggerDict.items() if logger_name.startswith('airflow') +] +for logger in airflow_loggers: # pylint: disable=too-many-nested-blocks +if isinstance(logger, logging.Logger): Review comment: We already do that in `settings_context`, but it doesn't seem to clear/reset everything: https://github.com/apache/airflow/blob/7285e61c42c89799e967ce9f0d391707ee02664d/tests/test_logging_config.py#L186-L187 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] ashb commented on a change in pull request #8746: Remove old airflow logger causing side effects in tests
ashb commented on a change in pull request #8746: URL: https://github.com/apache/airflow/pull/8746#discussion_r421621414 ## File path: tests/test_logging_config.py ## @@ -97,6 +93,33 @@ SETTINGS_DEFAULT_NAME = 'custom_airflow_local_settings' +def reset_logging(): +"""Reset Logging""" +manager = logging.root.manager +manager.disabled = logging.NOTSET +airflow_loggers = [ +logger for logger_name, logger in manager.loggerDict.items() if logger_name.startswith('airflow') +] +for logger in airflow_loggers: # pylint: disable=too-many-nested-blocks +if isinstance(logger, logging.Logger): Review comment: could we not just reload the logging config? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
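For reference, one shape the "just reload the logging config" suggestion could take is re-applying Airflow's shipped dictConfig instead of resetting loggers by hand. A sketch, assuming the test does not rely on a custom logging class:

```python
import logging.config

from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG


def reload_logging_config():
    """Re-apply the default logging config, replacing any handlers tests added."""
    logging.config.dictConfig(DEFAULT_LOGGING_CONFIG)
```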
[GitHub] [airflow] turbaszek commented on a change in pull request #8651: [AIP-31] [AIRFLOW-8058] Retrieve current execution context without altering function's signature
turbaszek commented on a change in pull request #8651: URL: https://github.com/apache/airflow/pull/8651#discussion_r421604020 ## File path: docs/howto/use-additional-execute-contextmanager.rst ## @@ -0,0 +1,47 @@ + .. Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. + + + +Defining Additional Execute Context Manager +=== + +Creating new context manager + + +Users can create their own execution context manager to allow context management on a higher level. +To do so, one must define a new context manager in one of their files: + +.. code-block:: python + + import contextlib + + + @contextlib.contextmanager + def example_user_context_manager(task_instance, execution_context): + """ + An example of a context manager that a user may provide. + """ + in_context = True + try: + yield in_context + finally: + in_context = False Review comment: Can we make this example more detailed? For example with authenticating or checking something before/after execution? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
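In the spirit of the request above, a more detailed doc example might authenticate before execution and always clean up afterwards. A sketch -- the `acquire_session`/`release_session` helpers are hypothetical, shown only to suggest a shape:

```python
import contextlib


@contextlib.contextmanager
def example_user_context_manager(task_instance, execution_context):
    """Open a session before execute() runs and always close it afterwards."""
    session = acquire_session(execution_context)  # hypothetical auth helper
    task_instance.log.info("Session opened for task %s", task_instance.task_id)
    try:
        yield session
    finally:
        release_session(session)  # hypothetical cleanup helper
        task_instance.log.info("Session closed for task %s", task_instance.task_id)
```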
[GitHub] [airflow] turbaszek commented on a change in pull request #8651: [AIP-31] [AIRFLOW-8058] Retrieve current execution context without altering function's signature
turbaszek commented on a change in pull request #8651: URL: https://github.com/apache/airflow/pull/8651#discussion_r421602145 ## File path: airflow/models/taskinstance.py ## @@ -1110,6 +1116,27 @@ def signal_handler(signum, frame): session.merge(self) session.commit() +def get_additional_execution_contextmanager(self, execution_context): +""" +Retrieves the user defined execution context callback from the configuration, +and validates that it is indeed a context manager + +:param execution_context: the current execution context to be passed to user ctx +""" +additional_execution_contextmanager = conf.getimport("core", "additional_execute_contextmanager") +if additional_execution_contextmanager: +try: +user_ctx_obj = additional_execution_contextmanager(self, execution_context) +if hasattr(user_ctx_obj, "__enter__") and hasattr(user_ctx_obj, "__exit__"): +return user_ctx_obj +else: +raise AirflowException(f"Loaded function {additional_execution_contextmanager} " + f"as additional execution contextmanager, but it does not have " + f"__enter__ or __exit__ method!") Review comment: What about `@contextlib.contextmanager`? I think this check works only for context managers defined as a class This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
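For what it's worth, the `hasattr` check should also pass for generator-based managers: calling a function decorated with `@contextlib.contextmanager` returns a `_GeneratorContextManager` object, which implements both methods. A quick check:

```python
import contextlib


@contextlib.contextmanager
def demo_ctx(task_instance, execution_context):
    yield


obj = demo_ctx(None, {})
print(hasattr(obj, "__enter__"), hasattr(obj, "__exit__"))  # True True
```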
[GitHub] [airflow] potiuk opened a new pull request #8767: Backport packages are renamed to include backport in their name
potiuk opened a new pull request #8767: URL: https://github.com/apache/airflow/pull/8767 --- Make sure to mark the boxes below before creating PR: [x] - [x] Description above provides context of the change - [x] Unit tests coverage for changes (not needed for documentation changes) - [x] Target Github ISSUE in description if exists - [x] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)" - [x] Relevant documentation is updated including usage instructions. - [x] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example). --- In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed. In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x). In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md). Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] jonathanshir commented on a change in pull request #8651: [AIP-31] [AIRFLOW-8058] Retrieve current execution context without altering function's signature
jonathanshir commented on a change in pull request #8651: URL: https://github.com/apache/airflow/pull/8651#discussion_r421592500 ## File path: airflow/config_templates/config.yml ## @@ -366,6 +366,17 @@ type: string example: "path.to.CustomXCom" default: "airflow.models.xcom.BaseXCom" +- name: additional_execute_contextmanager + description: | +Custom user function that returns a context manager. Syntax is "package.method". +Context is entered when operator starts executing task. __enter__() will be called +before the operator's execute method, and __exit__() shortly after. +Function's signature should accept two positional parameters - task instance +and execution context + version_added: 2.0.0 + type: string + example: ~ Review comment: Sorry, I was wrong - example field instead of default field :) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
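To make the "package.method" syntax described above concrete, a sketch of how the option would be set and loaded -- the module path is invented purely for illustration:

```python
from airflow.configuration import conf

# With airflow.cfg containing (hypothetical path):
#   [core]
#   additional_execute_contextmanager = my_company.airflow_plugins.example_user_context_manager
ctx_factory = conf.getimport("core", "additional_execute_contextmanager")
```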
[jira] [Commented] (AIRFLOW-4543) Update slack operator to support slackclient v2
[ https://issues.apache.org/jira/browse/AIRFLOW-4543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17101727#comment-17101727 ] ASF GitHub Bot commented on AIRFLOW-4543: - serkef commented on a change in pull request #5519: URL: https://github.com/apache/airflow/pull/5519#discussion_r421559066 ## File path: tests/providers/slack/hooks/test_slack.py ## @@ -19,82 +19,154 @@ import unittest import mock +from slack.errors import SlackApiError from airflow.exceptions import AirflowException from airflow.providers.slack.hooks.slack import SlackHook class TestSlackHook(unittest.TestCase): -def test_init_with_token_only(self): + +def test___get_token_with_token_only(self): +""" tests `__get_token` method when only token is provided """ +# Given test_token = 'test_token' -slack_hook = SlackHook(token=test_token, slack_conn_id=None) +test_conn_id = None + +# Creating a dummy subclass is the easiest way to avoid running init +# which is actually using the method we are testing +class DummySlackHook(SlackHook): +def __init__(self, *args, **kwargs): # pylint: disable=W0231 +pass -self.assertEqual(slack_hook.token, test_token) +# Run +hook = DummySlackHook() + +# Assert +output = hook._SlackHook__get_token(test_token, test_conn_id) # pylint: disable=E1101 +expected = test_token +self.assertEqual(output, expected) @mock.patch('airflow.providers.slack.hooks.slack.SlackHook.get_connection') -def test_init_with_valid_slack_conn_id_only(self, get_connection_mock): +def test___get_token_with_valid_slack_conn_id_only(self, get_connection_mock): +""" tests `__get_token` method when only connection is provided """ +# Given +test_token = None +test_conn_id = 'x' test_password = 'test_password' + +# Mock get_connection_mock.return_value = mock.Mock(password=test_password) -test_slack_conn_id = 'test_slack_conn_id' -slack_hook = SlackHook(token=None, slack_conn_id=test_slack_conn_id) +# Creating a dummy subclass is the easiest way to avoid running init +# which is actually using the method we are testing +class DummySlackHook(SlackHook): +def __init__(self, *args, **kwargs): # pylint: disable=W0231 +pass Review comment: I want to test the `get_token` which is called in the `__init__` so this is why I'm creating a dummy subclass with empty init. Is there a better way to do it? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Update slack operator to support slackclient v2 > --- > > Key: AIRFLOW-4543 > URL: https://issues.apache.org/jira/browse/AIRFLOW-4543 > Project: Apache Airflow > Issue Type: Improvement > Components: hooks, operators >Reporter: Sergio Kef >Assignee: Sergio Kef >Priority: Major > > Official [Slack API for python|https://pypi.org/project/slackclient/] has > recently released > [v.2|https://github.com/slackapi/python-slackclient/wiki/Migrating-to-2.x0] > Among others, some important points: > * Async IO > * SSL and Proxy > * Dropping 2.7 support > Opening this ticket to work on the upgrade. Current functionality will be > migrated, and I will try to extend it where possible. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [airflow] serkef commented on a change in pull request #5519: [AIRFLOW-4543] Update slack operator to support slackclient v2
serkef commented on a change in pull request #5519: URL: https://github.com/apache/airflow/pull/5519#discussion_r421559066 ## File path: tests/providers/slack/hooks/test_slack.py ## @@ -19,82 +19,154 @@ import unittest import mock +from slack.errors import SlackApiError from airflow.exceptions import AirflowException from airflow.providers.slack.hooks.slack import SlackHook class TestSlackHook(unittest.TestCase): -def test_init_with_token_only(self): + +def test___get_token_with_token_only(self): +""" tests `__get_token` method when only token is provided """ +# Given test_token = 'test_token' -slack_hook = SlackHook(token=test_token, slack_conn_id=None) +test_conn_id = None + +# Creating a dummy subclass is the easiest way to avoid running init +# which is actually using the method we are testing +class DummySlackHook(SlackHook): +def __init__(self, *args, **kwargs): # pylint: disable=W0231 +pass -self.assertEqual(slack_hook.token, test_token) +# Run +hook = DummySlackHook() + +# Assert +output = hook._SlackHook__get_token(test_token, test_conn_id) # pylint: disable=E1101 +expected = test_token +self.assertEqual(output, expected) @mock.patch('airflow.providers.slack.hooks.slack.SlackHook.get_connection') -def test_init_with_valid_slack_conn_id_only(self, get_connection_mock): +def test___get_token_with_valid_slack_conn_id_only(self, get_connection_mock): +""" tests `__get_token` method when only connection is provided """ +# Given +test_token = None +test_conn_id = 'x' test_password = 'test_password' + +# Mock get_connection_mock.return_value = mock.Mock(password=test_password) -test_slack_conn_id = 'test_slack_conn_id' -slack_hook = SlackHook(token=None, slack_conn_id=test_slack_conn_id) +# Creating a dummy subclass is the easiest way to avoid running init +# which is actually using the method we are testing +class DummySlackHook(SlackHook): +def __init__(self, *args, **kwargs): # pylint: disable=W0231 +pass Review comment: I want to test the `get_token` which is called in the `__init__` so this is why I'm creating a dummy subclass with empty init. Is there a better way to do it? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
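One lighter-weight alternative to the dummy subclass (a sketch, not necessarily better): allocate the instance with `__new__` so that `__init__` never runs, then call the name-mangled private method directly:

```python
from airflow.providers.slack.hooks.slack import SlackHook

# __new__ allocates the object without running __init__ (which calls __get_token)
hook = SlackHook.__new__(SlackHook)
output = hook._SlackHook__get_token('test_token', None)  # pylint: disable=no-member
assert output == 'test_token'
```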