[GitHub] [airflow] abdulbasitds commented on pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-05-07 Thread GitBox


abdulbasitds commented on pull request #6007:
URL: https://github.com/apache/airflow/pull/6007#issuecomment-625666407


   > I ran some end-to-end tests using this code in one of my dag files. It's 
working ok if you make those changes following my comments.
   > 
   > Right now I'm trying to retrieve the logs generated by Glue jobs so that 
we could print out the logs in Airflow. Without the logs in Airflow, debugging 
failed jobs is a hassle.
   > 
   > Update:
   > Obtaining logs is easy, just add this in the operator class:
   > 
   > ```python
   > GLUE_LOGS_GROUP = "/aws-glue/jobs/output"
   > GLUE_ERRS_GROUP = "/aws-glue/jobs/error"
   > ```
   > 
   > as class attributes
   > 
   > ```python
   > def get_glue_logs(self, log_group_name, log_stream_name):
   >     """Glue logs are too chatty, only get the ones that have errors"""
   >     self.log.info('Glue job logs output from group %s:', log_group_name)
   >     for event in self.get_logs_hook().get_log_events(
   >         log_group_name,
   >         log_stream_name,
   >     ):
   >         event_dt = datetime.fromtimestamp(event['timestamp'] / 1000.0)
   >         event_msg = event['message']
   >         # Glue logs are extremely chatty, we only get log entries that have "error"
   >         if "error" in event_msg:
   >             self.log.info("[%s] %s", event_dt.isoformat(), event_msg)
   > 
   > def get_logs_hook(self):
   >     """Create and return an AwsLogsHook."""
   >     return AwsLogsHook(
   >         aws_conn_id=self.aws_conn_id,
   >         region_name=self.awslogs_region
   >     )
   > ```
   > 
   > as class methods, and call it in `execute()`
   > 
   > ```python
   > glue_job_run_id = glue_job_run['JobRunId']
   > 
   > self.get_glue_logs(self.GLUE_LOGS_GROUP, glue_job_run_id)
   > ```
   
   
   
   @zachliu maybe we can extend logging later. For now it would be difficult for me to provide a description for the argument and make sure the code is correct, as I haven't used logging much.
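   The quoted approach can also be exercised outside an operator: `AwsLogsHook.get_log_events()` (from `airflow.providers.amazon.aws.hooks.logs`) streams CloudWatch events for a log group and stream, and the Glue `JobRunId` doubles as the stream name, so filtering for error lines takes only a few lines. A minimal sketch mirroring the snippet above; the helper name `print_glue_errors` and the default region are illustrative only, not part of the PR:

```python
from datetime import datetime

from airflow.providers.amazon.aws.hooks.logs import AwsLogsHook

GLUE_LOGS_GROUP = "/aws-glue/jobs/output"


def print_glue_errors(job_run_id, aws_conn_id="aws_default", region_name="us-east-1"):
    """Print only the error lines a Glue job run wrote to CloudWatch."""
    hook = AwsLogsHook(aws_conn_id=aws_conn_id, region_name=region_name)
    # The Glue job run id is also the name of the CloudWatch log stream.
    for event in hook.get_log_events(GLUE_LOGS_GROUP, job_run_id):
        if "error" in event["message"]:
            event_dt = datetime.fromtimestamp(event["timestamp"] / 1000.0)
            print("[{}] {}".format(event_dt.isoformat(), event["message"]))
```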



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (AIRFLOW-2310) Enable AWS Glue Job Integration

2020-05-07 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-2310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17102317#comment-17102317
 ] 

ASF GitHub Bot commented on AIRFLOW-2310:
-

abdulbasitds commented on pull request #6007:
URL: https://github.com/apache/airflow/pull/6007#issuecomment-625666407


   > I ran some end-to-end tests using this code in one of my dag files. It's 
working ok if you make those changes following my comments.
   > 
   > Right now I'm trying to retrieve the logs generated by Glue jobs so that 
we could print out the logs in Airflow. Without the logs in Airflow, debugging 
failed jobs is a hassle.
   > 
   > Update:
   > Obtaining logs is easy, just add this in the operator class:
   > 
   > ```python
   > GLUE_LOGS_GROUP = "/aws-glue/jobs/output"
   > GLUE_ERRS_GROUP = "/aws-glue/jobs/error"
   > ```
   > 
   > as class attributes
   > 
   > ```python
   > def get_glue_logs(self, log_group_name, log_stream_name):
   >     """Glue logs are too chatty, only get the ones that have errors"""
   >     self.log.info('Glue job logs output from group %s:', log_group_name)
   >     for event in self.get_logs_hook().get_log_events(
   >         log_group_name,
   >         log_stream_name,
   >     ):
   >         event_dt = datetime.fromtimestamp(event['timestamp'] / 1000.0)
   >         event_msg = event['message']
   >         # Glue logs are extremely chatty, we only get log entries that have "error"
   >         if "error" in event_msg:
   >             self.log.info("[%s] %s", event_dt.isoformat(), event_msg)
   > 
   > def get_logs_hook(self):
   >     """Create and return an AwsLogsHook."""
   >     return AwsLogsHook(
   >         aws_conn_id=self.aws_conn_id,
   >         region_name=self.awslogs_region
   >     )
   > ```
   > 
   > as class methods, and call it in `execute()`
   > 
   > ```python
   > glue_job_run_id = glue_job_run['JobRunId']
   > 
   > self.get_glue_logs(self.GLUE_LOGS_GROUP, glue_job_run_id)
   > ```
   
   
   
   @zachliu maybe we can extend logging later. For now it would be difficult for me to provide a description for the argument and make sure the code is correct, as I haven't used logging much.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Enable AWS Glue Job Integration
> ---
>
> Key: AIRFLOW-2310
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2310
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: contrib
>Reporter: Olalekan Elesin
>Assignee: Olalekan Elesin
>Priority: Major
>  Labels: AWS
>
> Would it be possible to integrate AWS Glue into Airflow, such that Glue jobs 
> and ETL pipelines can be orchestrated with Airflow




[jira] [Commented] (AIRFLOW-2310) Enable AWS Glue Job Integration

2020-05-07 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-2310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17102313#comment-17102313
 ] 

ASF GitHub Bot commented on AIRFLOW-2310:
-

abdulbasitds commented on a change in pull request #6007:
URL: https://github.com/apache/airflow/pull/6007#discussion_r421971459



##
File path: airflow/providers/amazon/aws/operators/glue.py
##
@@ -0,0 +1,117 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import unicode_literals
+
+from airflow.providers.amazon.aws.hooks.glue import AwsGlueJobHook
+from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+import os.path
+
+
+class AWSGlueJobOperator(BaseOperator):
+    """
+    Creates an AWS Glue Job. AWS Glue is a serverless Spark
+    ETL service for running Spark Jobs on the AWS cloud.
+    Language support: Python and Scala
+
+    :param job_name: unique job name per AWS Account
+    :type job_name: Optional[str]
+    :param script_location: location of ETL script. Must be a local or S3 path
+    :type script_location: Optional[str]
+    :param job_desc: job description details
+    :type job_desc: Optional[str]
+    :param concurrent_run_limit: The maximum number of concurrent runs allowed for a job
+    :type concurrent_run_limit: Optional[int]
+    :param script_args: etl script arguments and AWS Glue arguments
+    :type script_args: dict
+    :param connections: AWS Glue connections to be used by the job.
+    :type connections: list
+    :param retry_limit: The maximum number of times to retry this job if it fails
+    :type retry_limit: Optional[int]
+    :param num_of_dpus: Number of AWS Glue DPUs to allocate to this Job.
+    :type num_of_dpus: int
+    :param region_name: aws region name (example: us-east-1)
+    :type region_name: str
+    :param s3_bucket: S3 bucket where logs and local etl script will be uploaded
+    :type s3_bucket: Optional[str]
+    :param iam_role_name: AWS IAM Role for Glue Job Execution
+    :type iam_role_name: Optional[str]
+    """
+    template_fields = ()
+    template_ext = ()
+    ui_color = '#ededed'
+
+    @apply_defaults
+    def __init__(self,
+                 job_name='aws_glue_default_job',
+                 job_desc='AWS Glue Job with Airflow',
+                 script_location=None,
+                 concurrent_run_limit=None,
+                 script_args=None,
+                 connections=None,
+                 retry_limit=None,
+                 num_of_dpus=6,
+                 aws_conn_id='aws_default',
+                 region_name=None,
+                 s3_bucket=None,
+                 iam_role_name=None,
+                 *args, **kwargs
+                 ):
+        super(AWSGlueJobOperator, self).__init__(*args, **kwargs)
+        self.job_name = job_name
+        self.job_desc = job_desc
+        self.script_location = script_location
+        self.concurrent_run_limit = concurrent_run_limit
+        self.script_args = script_args or {}
+        self.connections = connections or []
+        self.retry_limit = retry_limit
+        self.num_of_dpus = num_of_dpus
+        self.aws_conn_id = aws_conn_id,
+        self.region_name = region_name
+        self.s3_bucket = s3_bucket
+        self.iam_role_name = iam_role_name
+        self.S3_PROTOCOL = "s3://"
+        self.S3_ARTIFACTS_PREFIX = 'artifacts/glue-scripts/'
+
+    def execute(self, context):
+        """
+        Executes AWS Glue Job from Airflow
+
+        :return: the id of the current glue job.
+        """
+        if not self.script_location.startsWith(self.S3_PROTOCOL):

Review comment:
   @zachliu
   I have made this change.
   @feluelle `script_location` is already Optional[str]
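   For context, the line this comment is attached to calls `startsWith`, which does not exist on Python strings; the built-in method is `str.startswith`. A minimal sketch of what that prologue presumably does — upload a local script to S3 before handing the job definition to the hook — using the attribute values shown in the diff; the helper name `_upload_script` and the key layout are illustrative only, not the PR's actual code:

```python
import os.path

from airflow.providers.amazon.aws.hooks.s3 import S3Hook

S3_PROTOCOL = "s3://"
S3_ARTIFACTS_PREFIX = "artifacts/glue-scripts/"


def _upload_script(script_location, s3_bucket, aws_conn_id="aws_default"):
    """Return an s3:// URI for the script, uploading it first if it is a local path."""
    if script_location.startswith(S3_PROTOCOL):  # note: startswith, not startsWith
        return script_location
    s3_key = S3_ARTIFACTS_PREFIX + os.path.basename(script_location)
    S3Hook(aws_conn_id=aws_conn_id).load_file(
        filename=script_location, key=s3_key, bucket_name=s3_bucket, replace=True
    )
    return "{}{}/{}".format(S3_PROTOCOL, s3_bucket, s3_key)
```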





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] abdulbasitds commented on a change in pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-05-07 Thread GitBox


abdulbasitds commented on a change in pull request #6007:
URL: https://github.com/apache/airflow/pull/6007#discussion_r421971459



##
File path: airflow/providers/amazon/aws/operators/glue.py
##
@@ -0,0 +1,117 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import unicode_literals
+
+from airflow.providers.amazon.aws.hooks.glue import AwsGlueJobHook
+from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+import os.path
+
+
+class AWSGlueJobOperator(BaseOperator):
+    """
+    Creates an AWS Glue Job. AWS Glue is a serverless Spark
+    ETL service for running Spark Jobs on the AWS cloud.
+    Language support: Python and Scala
+
+    :param job_name: unique job name per AWS Account
+    :type job_name: Optional[str]
+    :param script_location: location of ETL script. Must be a local or S3 path
+    :type script_location: Optional[str]
+    :param job_desc: job description details
+    :type job_desc: Optional[str]
+    :param concurrent_run_limit: The maximum number of concurrent runs allowed for a job
+    :type concurrent_run_limit: Optional[int]
+    :param script_args: etl script arguments and AWS Glue arguments
+    :type script_args: dict
+    :param connections: AWS Glue connections to be used by the job.
+    :type connections: list
+    :param retry_limit: The maximum number of times to retry this job if it fails
+    :type retry_limit: Optional[int]
+    :param num_of_dpus: Number of AWS Glue DPUs to allocate to this Job.
+    :type num_of_dpus: int
+    :param region_name: aws region name (example: us-east-1)
+    :type region_name: str
+    :param s3_bucket: S3 bucket where logs and local etl script will be uploaded
+    :type s3_bucket: Optional[str]
+    :param iam_role_name: AWS IAM Role for Glue Job Execution
+    :type iam_role_name: Optional[str]
+    """
+    template_fields = ()
+    template_ext = ()
+    ui_color = '#ededed'
+
+    @apply_defaults
+    def __init__(self,
+                 job_name='aws_glue_default_job',
+                 job_desc='AWS Glue Job with Airflow',
+                 script_location=None,
+                 concurrent_run_limit=None,
+                 script_args=None,
+                 connections=None,
+                 retry_limit=None,
+                 num_of_dpus=6,
+                 aws_conn_id='aws_default',
+                 region_name=None,
+                 s3_bucket=None,
+                 iam_role_name=None,
+                 *args, **kwargs
+                 ):
+        super(AWSGlueJobOperator, self).__init__(*args, **kwargs)
+        self.job_name = job_name
+        self.job_desc = job_desc
+        self.script_location = script_location
+        self.concurrent_run_limit = concurrent_run_limit
+        self.script_args = script_args or {}
+        self.connections = connections or []
+        self.retry_limit = retry_limit
+        self.num_of_dpus = num_of_dpus
+        self.aws_conn_id = aws_conn_id,
+        self.region_name = region_name
+        self.s3_bucket = s3_bucket
+        self.iam_role_name = iam_role_name
+        self.S3_PROTOCOL = "s3://"
+        self.S3_ARTIFACTS_PREFIX = 'artifacts/glue-scripts/'
+
+    def execute(self, context):
+        """
+        Executes AWS Glue Job from Airflow
+
+        :return: the id of the current glue job.
+        """
+        if not self.script_location.startsWith(self.S3_PROTOCOL):

Review comment:
   @zachliu
   I have made this change.
   @feluelle `script_location` is already Optional[str]





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] boring-cyborg[bot] commented on issue #8779: Webserver OOM killed by Kubernetes when using KubernetesExecutor

2020-05-07 Thread GitBox


boring-cyborg[bot] commented on issue #8779:
URL: https://github.com/apache/airflow/issues/8779#issuecomment-625652250


   Thanks for opening your first issue here! Be sure to follow the issue 
template!
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] LarsAlmgren opened a new issue #8779: Webserver OOM killed by Kubernetes when using KubernetesExecutor

2020-05-07 Thread GitBox


LarsAlmgren opened a new issue #8779:
URL: https://github.com/apache/airflow/issues/8779


   
   
   
   
   **Apache Airflow version**:
   
   
   **Kubernetes version (if you are using kubernetes)** (use `kubectl 
version`): 1.17.5
   
   **Environment**:
   
   - **Cloud provider or hardware configuration**: AWS
   - **OS** (e.g. from /etc/os-release): Ubuntu 16.04.6 LTS
   - **Kernel** (e.g. `uname -a`): `Linux salt 4.4.0-1106-aws #117-Ubuntu SMP 
Wed Apr 8 09:52:02 UTC 2020 x86_64 x86_64 x86_64 GNU/Linux`
   
   **What happened**:
   
   We switched from `LocalExecutor` to `KubernetesExecutor` and noticed that 
the webserver started to get OOM killed by Kubernetes. We solved it by 
increasing the memory resources / limit in Kubernetes.
   
   
   
   **What you expected to happen**:
   
   I did not expect the webserver to be dependent on what executor is used.
   
   
   
   **How to reproduce it**:
   
   We ran the webserver before using `KubernetesExecutor` with:
   ```
   resources:
   requests:
 memory: "1Gi"
 cpu: "500m"
   limits:
 memory: "1Gi"
 cpu: "500m"
   ```
   
   Run the webserver with minimal resources and then switch to 
`KubernetesExecutor`.
   
   
   
   
   **Anything else we need to know**:
   
   We have noticed this in both environments where we started using 
`KubernetesExecutor`.
   
   
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] ephraimbuddy opened a new pull request #8778: Add separate example DAGs and system tests for google cloud speech

2020-05-07 Thread GitBox


ephraimbuddy opened a new pull request #8778:
URL: https://github.com/apache/airflow/pull/8778


   ---
   This PR fixes some of the issues in #8280 
   
   - Separated the speech system tests into different modules
   - Separated the guides into different files
   - Added different examples for the speech operators
   
   Make sure to mark the boxes below before creating PR: [x]
   
   - [x] Description above provides context of the change
   - [x] Unit tests coverage for changes (not needed for documentation changes)
   - [x] Target Github ISSUE in description if exists
   - [x] Commits follow "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)"
   - [x] Relevant documentation is updated including usage instructions.
   - [x] I will engage committers as explained in [Contribution Workflow 
Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal 
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals))
 is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party 
License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in 
[UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request 
Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines)
 for more information.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[airflow] branch master updated (6e4f5fa -> b7566e1)

2020-05-07 Thread kamilbregula
This is an automated email from the ASF dual-hosted git repository.

kamilbregula pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git.


from 6e4f5fa  [AIRFLOW-4568]The ExternalTaskSensor should be configurable 
to raise an Airflow Exception in case the poked external task reaches a 
disallowed state, such as f.i. failed (#8509)
 add b7566e1  Add SQL query tracking for pytest (#8754)

No new revisions were added by this update.

Summary of changes:
 TESTING.rst | 30 
 scripts/perf/perf_kit/__init__.py   |  2 +-
 scripts/perf/perf_kit/sqlalchemy.py | 91 +++--
 tests/conftest.py   | 70 
 4 files changed, 168 insertions(+), 25 deletions(-)



[GitHub] [airflow] TeddyHartanto commented on a change in pull request #7735: [AIRFLOW-4549] Allow skipped tasks to satisfy wait_for_downstream

2020-05-07 Thread GitBox


TeddyHartanto commented on a change in pull request #7735:
URL: https://github.com/apache/airflow/pull/7735#discussion_r421923733



##
File path: tests/dags/test_issue_1225.py
##
@@ -47,12 +47,7 @@ def fail():
 dag=dag1,
 pool='test_backfill_pooled_task_pool',)
 
-# DAG tests depends_on_past dependencies
-dag2 = DAG(dag_id='test_depends_on_past', default_args=default_args)
-dag2_task1 = DummyOperator(
-task_id='test_dop_task',
-dag=dag2,
-depends_on_past=True,)
+# dag2 has been moved to test_past_dagrun_deps.py

Review comment:
   Sharp eyes!





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (AIRFLOW-4549) wait_for_downstream does not respect skipped tasks

2020-05-07 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-4549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17102228#comment-17102228
 ] 

ASF GitHub Bot commented on AIRFLOW-4549:
-

TeddyHartanto commented on a change in pull request #7735:
URL: https://github.com/apache/airflow/pull/7735#discussion_r421923733



##
File path: tests/dags/test_issue_1225.py
##
@@ -47,12 +47,7 @@ def fail():
 dag=dag1,
 pool='test_backfill_pooled_task_pool',)
 
-# DAG tests depends_on_past dependencies
-dag2 = DAG(dag_id='test_depends_on_past', default_args=default_args)
-dag2_task1 = DummyOperator(
-task_id='test_dop_task',
-dag=dag2,
-depends_on_past=True,)
+# dag2 has been moved to test_past_dagrun_deps.py

Review comment:
   Sharp eyes!





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> wait_for_downstream does not respect skipped tasks
> --
>
> Key: AIRFLOW-4549
> URL: https://issues.apache.org/jira/browse/AIRFLOW-4549
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: scheduler
>Reporter: Dima Kamalov
>Assignee: Teddy Hartanto
>Priority: Major
>
> See 
> [http://mail-archives.apache.org/mod_mbox/airflow-dev/201609.mbox/%3ccaheep7utgpjvkgww9_9n5fupnu+pskf3rmbvxugk5dxb6bh...@mail.gmail.com%3E]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (AIRFLOW-4549) wait_for_downstream does not respect skipped tasks

2020-05-07 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-4549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17102227#comment-17102227
 ] 

ASF GitHub Bot commented on AIRFLOW-4549:
-

TeddyHartanto commented on a change in pull request #7735:
URL: https://github.com/apache/airflow/pull/7735#discussion_r421923733



##
File path: tests/dags/test_issue_1225.py
##
@@ -47,12 +47,7 @@ def fail():
 dag=dag1,
 pool='test_backfill_pooled_task_pool',)
 
-# DAG tests depends_on_past dependencies
-dag2 = DAG(dag_id='test_depends_on_past', default_args=default_args)
-dag2_task1 = DummyOperator(
-task_id='test_dop_task',
-dag=dag2,
-depends_on_past=True,)
+# dag2 has been moved to test_past_dagrun_deps.py

Review comment:
   Sharp eyes!





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> wait_for_downstream does not respect skipped tasks
> --
>
> Key: AIRFLOW-4549
> URL: https://issues.apache.org/jira/browse/AIRFLOW-4549
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: scheduler
>Reporter: Dima Kamalov
>Assignee: Teddy Hartanto
>Priority: Major
>
> See 
> [http://mail-archives.apache.org/mod_mbox/airflow-dev/201609.mbox/%3ccaheep7utgpjvkgww9_9n5fupnu+pskf3rmbvxugk5dxb6bh...@mail.gmail.com%3E]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (AIRFLOW-4549) wait_for_downstream does not respect skipped tasks

2020-05-07 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-4549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17102226#comment-17102226
 ] 

ASF GitHub Bot commented on AIRFLOW-4549:
-

TeddyHartanto commented on a change in pull request #7735:
URL: https://github.com/apache/airflow/pull/7735#discussion_r421923688



##
File path: tests/models/test_dagrun.py
##
@@ -552,3 +561,131 @@ def with_all_tasks_removed(dag):
 dagrun.verify_integrity()
 flaky_ti.refresh_from_db()
 self.assertEqual(State.NONE, flaky_ti.state)
+
+def test_depends_on_past(self):
+# dag_id = 'test_depends_on_past'

Review comment:
   OK got it. Since the tests pass anyway without any scheduler deadlock, there shouldn't be any problem. I will remove the comments.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> wait_for_downstream does not respect skipped tasks
> --
>
> Key: AIRFLOW-4549
> URL: https://issues.apache.org/jira/browse/AIRFLOW-4549
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: scheduler
>Reporter: Dima Kamalov
>Assignee: Teddy Hartanto
>Priority: Major
>
> See 
> [http://mail-archives.apache.org/mod_mbox/airflow-dev/201609.mbox/%3ccaheep7utgpjvkgww9_9n5fupnu+pskf3rmbvxugk5dxb6bh...@mail.gmail.com%3E]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [airflow] TeddyHartanto commented on a change in pull request #7735: [AIRFLOW-4549] Allow skipped tasks to satisfy wait_for_downstream

2020-05-07 Thread GitBox


TeddyHartanto commented on a change in pull request #7735:
URL: https://github.com/apache/airflow/pull/7735#discussion_r421923733



##
File path: tests/dags/test_issue_1225.py
##
@@ -47,12 +47,7 @@ def fail():
 dag=dag1,
 pool='test_backfill_pooled_task_pool',)
 
-# DAG tests depends_on_past dependencies
-dag2 = DAG(dag_id='test_depends_on_past', default_args=default_args)
-dag2_task1 = DummyOperator(
-task_id='test_dop_task',
-dag=dag2,
-depends_on_past=True,)
+# dag2 has been moved to test_past_dagrun_deps.py

Review comment:
   Sharp eyes!





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] TeddyHartanto commented on a change in pull request #7735: [AIRFLOW-4549] Allow skipped tasks to satisfy wait_for_downstream

2020-05-07 Thread GitBox


TeddyHartanto commented on a change in pull request #7735:
URL: https://github.com/apache/airflow/pull/7735#discussion_r421923688



##
File path: tests/models/test_dagrun.py
##
@@ -552,3 +561,131 @@ def with_all_tasks_removed(dag):
 dagrun.verify_integrity()
 flaky_ti.refresh_from_db()
 self.assertEqual(State.NONE, flaky_ti.state)
+
+def test_depends_on_past(self):
+# dag_id = 'test_depends_on_past'

Review comment:
   OK got it. Since the tests pass anyway without any scheduler deadlock, there shouldn't be any problem. I will remove the comments.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] schnie opened a new pull request #8777: Add helm chart

2020-05-07 Thread GitBox


schnie opened a new pull request #8777:
URL: https://github.com/apache/airflow/pull/8777


   This PR adds a default helm chart for Airflow. This chart is based on the 
one we use at Astronomer to manage hundreds of production deployments.
   
   The chart has been cleaned up to remove any references to the Astronomer 
platform or anything specific to the surrounding infrastructure.
   
   The `bin` directory contains some scripts to help run this chart locally in 
[kind](https://kind.sigs.k8s.io/docs/user/quick-start/), as well as packaging 
and releasing it to a helm repository. The scripts reference the Astronomer 
helm repo right now, but I figured we could use it as a starting point and 
tweak these scripts, or remove them completely to fit into how we want to 
manage this going forward. Opening this as a draft PR for now to start some 
discussion.
   
   ---
   Make sure to mark the boxes below before creating PR: [x]
   
   - [x] Description above provides context of the change
   - [ ] Unit tests coverage for changes (not needed for documentation changes)
   - [ ] Target Github ISSUE in description if exists
   - [x] Commits follow "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)"
   - [ ] Relevant documentation is updated including usage instructions.
   - [x] I will engage committers as explained in [Contribution Workflow 
Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal 
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals))
 is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party 
License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in 
[UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request 
Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines)
 for more information.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[airflow] tag nightly-master updated (bd29ee3 -> 6e4f5fa)

2020-05-07 Thread github-bot
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a change to tag nightly-master
in repository https://gitbox.apache.org/repos/asf/airflow.git.


*** WARNING: tag nightly-master was modified! ***

from bd29ee3  (commit)
  to 6e4f5fa  (commit)
from bd29ee3  Ensure test_logging_config.test_reload_module works in spawn 
mode. (#8741)
 add d15839d  Latest debian-buster release broke image build (#8758)
 add ff5b701  Add google_api_to_s3_transfer example dags and system tests 
(#8581)
 add 7c04604  Add google_api_to_s3_transfer docs howto link (#8761)
 add 723c52c  Add documentation for SpannerDeployInstanceOperator (#8750)
 add 6e4f5fa  [AIRFLOW-4568]The ExternalTaskSensor should be configurable 
to raise an Airflow Exception in case the poked external task reaches a 
disallowed state, such as f.i. failed (#8509)

No new revisions were added by this update.

Summary of changes:
 Dockerfile.ci  |   1 +
 TESTING.rst|  16 +--
 .../example_external_task_marker_dag.py|  14 ++
 .../example_google_api_to_s3_transfer_advanced.py  | 132 +++
 .../example_google_api_to_s3_transfer_basic.py |  56 
 .../providers/google/cloud/operators/spanner.py|   4 +
 airflow/sensors/external_task_sensor.py|  68 --
 backport_packages/setup_backport_packages.py   |   7 +
 .../amazon/aws/google_api_to_s3_transfer.rst   | 141 +
 docs/howto/operator/external_task_sensor.rst   |   5 +
 docs/howto/operator/gcp/spanner.rst|  38 ++
 docs/operators-and-hooks-ref.rst   |   4 +-
 .../test_google_api_to_s3_transfer_system.py   |  65 ++
 tests/sensors/test_external_task_sensor.py |  58 -
 tests/test_utils/amazon_system_helpers.py  |  83 
 15 files changed, 667 insertions(+), 25 deletions(-)
 create mode 100644 
airflow/providers/amazon/aws/example_dags/example_google_api_to_s3_transfer_advanced.py
 create mode 100644 
airflow/providers/amazon/aws/example_dags/example_google_api_to_s3_transfer_basic.py
 create mode 100644 docs/howto/operator/amazon/aws/google_api_to_s3_transfer.rst
 create mode 100644 
tests/providers/amazon/aws/operators/test_google_api_to_s3_transfer_system.py
 create mode 100644 tests/test_utils/amazon_system_helpers.py




[jira] [Commented] (AIRFLOW-4549) wait_for_downstream does not respect skipped tasks

2020-05-07 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-4549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17102169#comment-17102169
 ] 

ASF GitHub Bot commented on AIRFLOW-4549:
-

kaxil commented on a change in pull request #7735:
URL: https://github.com/apache/airflow/pull/7735#discussion_r421894092



##
File path: tests/models/test_dagrun.py
##
@@ -552,3 +561,131 @@ def with_all_tasks_removed(dag):
 dagrun.verify_integrity()
 flaky_ti.refresh_from_db()
 self.assertEqual(State.NONE, flaky_ti.state)
+
+def test_depends_on_past(self):
+# dag_id = 'test_depends_on_past'

Review comment:
   yes, remove the comments in the PR, should be fine





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> wait_for_downstream does not respect skipped tasks
> --
>
> Key: AIRFLOW-4549
> URL: https://issues.apache.org/jira/browse/AIRFLOW-4549
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: scheduler
>Reporter: Dima Kamalov
>Assignee: Teddy Hartanto
>Priority: Major
>
> See 
> [http://mail-archives.apache.org/mod_mbox/airflow-dev/201609.mbox/%3ccaheep7utgpjvkgww9_9n5fupnu+pskf3rmbvxugk5dxb6bh...@mail.gmail.com%3E]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (AIRFLOW-4549) wait_for_downstream does not respect skipped tasks

2020-05-07 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-4549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17102165#comment-17102165
 ] 

ASF GitHub Bot commented on AIRFLOW-4549:
-

kaxil commented on a change in pull request #7735:
URL: https://github.com/apache/airflow/pull/7735#discussion_r421893773



##
File path: tests/models/test_dagrun.py
##
@@ -552,3 +561,131 @@ def with_all_tasks_removed(dag):
 dagrun.verify_integrity()
 flaky_ti.refresh_from_db()
 self.assertEqual(State.NONE, flaky_ti.state)
+
+def test_depends_on_past(self):
+# dag_id = 'test_depends_on_past'

Review comment:
   Scheduler deadlock in which case? There are known issues of scheduler deadlock with MySQL; other than that, I am not sure there are any.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> wait_for_downstream does not respect skipped tasks
> --
>
> Key: AIRFLOW-4549
> URL: https://issues.apache.org/jira/browse/AIRFLOW-4549
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: scheduler
>Reporter: Dima Kamalov
>Assignee: Teddy Hartanto
>Priority: Major
>
> See 
> [http://mail-archives.apache.org/mod_mbox/airflow-dev/201609.mbox/%3ccaheep7utgpjvkgww9_9n5fupnu+pskf3rmbvxugk5dxb6bh...@mail.gmail.com%3E]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [airflow] kaxil commented on a change in pull request #7735: [AIRFLOW-4549] Allow skipped tasks to satisfy wait_for_downstream

2020-05-07 Thread GitBox


kaxil commented on a change in pull request #7735:
URL: https://github.com/apache/airflow/pull/7735#discussion_r421894092



##
File path: tests/models/test_dagrun.py
##
@@ -552,3 +561,131 @@ def with_all_tasks_removed(dag):
 dagrun.verify_integrity()
 flaky_ti.refresh_from_db()
 self.assertEqual(State.NONE, flaky_ti.state)
+
+def test_depends_on_past(self):
+# dag_id = 'test_depends_on_past'

Review comment:
   yes, remove the comments in the PR, should be fine





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] kaxil commented on a change in pull request #7735: [AIRFLOW-4549] Allow skipped tasks to satisfy wait_for_downstream

2020-05-07 Thread GitBox


kaxil commented on a change in pull request #7735:
URL: https://github.com/apache/airflow/pull/7735#discussion_r421893773



##
File path: tests/models/test_dagrun.py
##
@@ -552,3 +561,131 @@ def with_all_tasks_removed(dag):
 dagrun.verify_integrity()
 flaky_ti.refresh_from_db()
 self.assertEqual(State.NONE, flaky_ti.state)
+
+def test_depends_on_past(self):
+# dag_id = 'test_depends_on_past'

Review comment:
   Scheduler deadlock in which case? There are known issues of scheduler deadlock with MySQL; other than that, I am not sure there are any.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (AIRFLOW-4549) wait_for_downstream does not respect skipped tasks

2020-05-07 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-4549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17102164#comment-17102164
 ] 

ASF GitHub Bot commented on AIRFLOW-4549:
-

kaxil commented on a change in pull request #7735:
URL: https://github.com/apache/airflow/pull/7735#discussion_r421893536



##
File path: tests/dags/test_issue_1225.py
##
@@ -47,12 +47,7 @@ def fail():
 dag=dag1,
 pool='test_backfill_pooled_task_pool',)
 
-# DAG tests depends_on_past dependencies
-dag2 = DAG(dag_id='test_depends_on_past', default_args=default_args)
-dag2_task1 = DummyOperator(
-task_id='test_dop_task',
-dag=dag2,
-depends_on_past=True,)
+# dag2 has been moved to test_past_dagrun_deps.py

Review comment:
   ```suggestion
   # dag2 has been moved to test_prev_dagrun_deps.py
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> wait_for_downstream does not respect skipped tasks
> --
>
> Key: AIRFLOW-4549
> URL: https://issues.apache.org/jira/browse/AIRFLOW-4549
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: scheduler
>Reporter: Dima Kamalov
>Assignee: Teddy Hartanto
>Priority: Major
>
> See 
> [http://mail-archives.apache.org/mod_mbox/airflow-dev/201609.mbox/%3ccaheep7utgpjvkgww9_9n5fupnu+pskf3rmbvxugk5dxb6bh...@mail.gmail.com%3E]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [airflow] kaxil commented on a change in pull request #7735: [AIRFLOW-4549] Allow skipped tasks to satisfy wait_for_downstream

2020-05-07 Thread GitBox


kaxil commented on a change in pull request #7735:
URL: https://github.com/apache/airflow/pull/7735#discussion_r421893536



##
File path: tests/dags/test_issue_1225.py
##
@@ -47,12 +47,7 @@ def fail():
 dag=dag1,
 pool='test_backfill_pooled_task_pool',)
 
-# DAG tests depends_on_past dependencies
-dag2 = DAG(dag_id='test_depends_on_past', default_args=default_args)
-dag2_task1 = DummyOperator(
-task_id='test_dop_task',
-dag=dag2,
-depends_on_past=True,)
+# dag2 has been moved to test_past_dagrun_deps.py

Review comment:
   ```suggestion
   # dag2 has been moved to test_prev_dagrun_deps.py
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (AIRFLOW-4543) Update slack operator to support slackclient v2

2020-05-07 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-4543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17102162#comment-17102162
 ] 

ASF GitHub Bot commented on AIRFLOW-4543:
-

kaxil commented on a change in pull request #5519:
URL: https://github.com/apache/airflow/pull/5519#discussion_r421892844



##
File path: tests/providers/slack/hooks/test_slack.py
##
@@ -19,82 +19,154 @@
 import unittest
 
 import mock
+from slack.errors import SlackApiError
 
 from airflow.exceptions import AirflowException
 from airflow.providers.slack.hooks.slack import SlackHook
 
 
 class TestSlackHook(unittest.TestCase):
-def test_init_with_token_only(self):
+
+def test___get_token_with_token_only(self):
+""" tests `__get_token` method when only token is provided """
+# Given
 test_token = 'test_token'
-slack_hook = SlackHook(token=test_token, slack_conn_id=None)
+test_conn_id = None
+
+# Creating a dummy subclass is the easiest way to avoid running init
+#  which is actually using the method we are testing
+class DummySlackHook(SlackHook):
+def __init__(self, *args, **kwargs):  # pylint: disable=W0231
+pass
 
-self.assertEqual(slack_hook.token, test_token)
+# Run
+hook = DummySlackHook()
+
+# Assert
+output = hook._SlackHook__get_token(test_token, test_conn_id)  # 
pylint: disable=E1101
+expected = test_token
+self.assertEqual(output, expected)
 
 @mock.patch('airflow.providers.slack.hooks.slack.SlackHook.get_connection')
-def test_init_with_valid_slack_conn_id_only(self, get_connection_mock):
+def test___get_token_with_valid_slack_conn_id_only(self, 
get_connection_mock):
+""" tests `__get_token` method when only connection is provided """
+# Given
+test_token = None
+test_conn_id = 'x'
 test_password = 'test_password'
+
+# Mock
 get_connection_mock.return_value = mock.Mock(password=test_password)
 
-test_slack_conn_id = 'test_slack_conn_id'
-slack_hook = SlackHook(token=None, slack_conn_id=test_slack_conn_id)
+# Creating a dummy subclass is the easiest way to avoid running init
+#  which is actually using the method we are testing
+class DummySlackHook(SlackHook):
+def __init__(self, *args, **kwargs):  # pylint: disable=W0231
+pass

Review comment:
   Pushed a commit to PR: 
https://github.com/apache/airflow/pull/5519/commits/d81d6e947ba364b0dd9e730507e51ff2a0d3d3e2
   
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Update slack operator to support slackclient v2
> ---
>
> Key: AIRFLOW-4543
> URL: https://issues.apache.org/jira/browse/AIRFLOW-4543
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: hooks, operators
>Reporter: Sergio Kef
>Assignee: Sergio Kef
>Priority: Major
>
> Official [Slack API for python|https://pypi.org/project/slackclient/] has 
> recently released 
> [v.2|https://github.com/slackapi/python-slackclient/wiki/Migrating-to-2.x0]
> Among others some important points:
>  * Async IO
>  * SSL and Proxy
>  * Dropping 2.7 support
> Opening this ticket to work on the upgrade. Current functionalities will be 
> migrated and will try to extend functionalities, if possible.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [airflow] kaxil commented on a change in pull request #5519: [AIRFLOW-4543] Update slack operator to support slackclient v2

2020-05-07 Thread GitBox


kaxil commented on a change in pull request #5519:
URL: https://github.com/apache/airflow/pull/5519#discussion_r421892844



##
File path: tests/providers/slack/hooks/test_slack.py
##
@@ -19,82 +19,154 @@
 import unittest
 
 import mock
+from slack.errors import SlackApiError
 
 from airflow.exceptions import AirflowException
 from airflow.providers.slack.hooks.slack import SlackHook
 
 
 class TestSlackHook(unittest.TestCase):
-def test_init_with_token_only(self):
+
+def test___get_token_with_token_only(self):
+""" tests `__get_token` method when only token is provided """
+# Given
 test_token = 'test_token'
-slack_hook = SlackHook(token=test_token, slack_conn_id=None)
+test_conn_id = None
+
+# Creating a dummy subclass is the easiest way to avoid running init
+#  which is actually using the method we are testing
+class DummySlackHook(SlackHook):
+def __init__(self, *args, **kwargs):  # pylint: disable=W0231
+pass
 
-self.assertEqual(slack_hook.token, test_token)
+# Run
+hook = DummySlackHook()
+
+# Assert
+output = hook._SlackHook__get_token(test_token, test_conn_id)  # 
pylint: disable=E1101
+expected = test_token
+self.assertEqual(output, expected)
 
 @mock.patch('airflow.providers.slack.hooks.slack.SlackHook.get_connection')
-def test_init_with_valid_slack_conn_id_only(self, get_connection_mock):
+def test___get_token_with_valid_slack_conn_id_only(self, 
get_connection_mock):
+""" tests `__get_token` method when only connection is provided """
+# Given
+test_token = None
+test_conn_id = 'x'
 test_password = 'test_password'
+
+# Mock
 get_connection_mock.return_value = mock.Mock(password=test_password)
 
-test_slack_conn_id = 'test_slack_conn_id'
-slack_hook = SlackHook(token=None, slack_conn_id=test_slack_conn_id)
+# Creating a dummy subclass is the easiest way to avoid running init
+#  which is actually using the method we are testing
+class DummySlackHook(SlackHook):
+def __init__(self, *args, **kwargs):  # pylint: disable=W0231
+pass

Review comment:
   Pushed a commit to PR: 
https://github.com/apache/airflow/pull/5519/commits/d81d6e947ba364b0dd9e730507e51ff2a0d3d3e2
   
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] kaxil commented on a change in pull request #5519: [AIRFLOW-4543] Update slack operator to support slackclient v2

2020-05-07 Thread GitBox


kaxil commented on a change in pull request #5519:
URL: https://github.com/apache/airflow/pull/5519#discussion_r421892088



##
File path: airflow/providers/slack/hooks/slack.py
##
@@ -27,40 +28,88 @@
 # noinspection PyAbstractClass
 class SlackHook(BaseHook):
 """
+Creates a Slack connection, to be used for calls.
 Takes both Slack API token directly and connection that has Slack API 
token.
-
 If both supplied, Slack API token will be used.
+Exposes also the rest of slack.WebClient args.
+
+Examples:
+
+.. code-block:: python
+
+# Create hook
+slack_hook = SlackHook(token="xxx")  # or slack_hook = 
SlackHook(slack_conn_id="slack")
+
+# Call generic API with parameters (errors are handled by hook)
+#  For more details check 
https://api.slack.com/methods/chat.postMessage
+slack_hook.call("chat.postMessage", json={"channel": "#random", 
"text": "Hello world!"})
+
+# Call method from Slack SDK (you have to handle errors yourself)
+#  For more details check 
https://slack.dev/python-slackclient/basic_usage.html#sending-a-message
+slack_hook.client.chat_postMessage(channel="#random", text="Hello 
world!")
 
 :param token: Slack API token
+:type token: str
 :param slack_conn_id: connection that has Slack API token in the password 
field
+:type slack_conn_id: str
+:param use_session: A boolean specifying if the client should take 
advantage of
+connection pooling. Default is True.
+:type use_session: bool
+:param base_url: A string representing the Slack API base URL. Default is
+``https://www.slack.com/api/``
+:type base_url: str
+:param timeout: The maximum number of seconds the client will wait
+to connect and receive a response from Slack. Default is 30 seconds.
+:type timeout: int
 """
-def __init__(self, token: Optional[str] = None, slack_conn_id: 
Optional[str] = None) -> None:
+
+def __init__(
+self,
+token: Optional[str] = None,
+slack_conn_id: Optional[str] = None,
+**client_args: Any,
+) -> None:
 super().__init__()
-self.token = self.__get_token(token, slack_conn_id)
+token = self.__get_token(token, slack_conn_id)

Review comment:
   ```suggestion
   self.token = self.__get_token(token, slack_conn_id)
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (AIRFLOW-4543) Update slack operator to support slackclient v2

2020-05-07 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-4543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17102157#comment-17102157
 ] 

ASF GitHub Bot commented on AIRFLOW-4543:
-

kaxil commented on a change in pull request #5519:
URL: https://github.com/apache/airflow/pull/5519#discussion_r421892088



##
File path: airflow/providers/slack/hooks/slack.py
##
@@ -27,40 +28,88 @@
 # noinspection PyAbstractClass
 class SlackHook(BaseHook):
 """
+Creates a Slack connection, to be used for calls.
 Takes both Slack API token directly and connection that has Slack API 
token.
-
 If both supplied, Slack API token will be used.
+Exposes also the rest of slack.WebClient args.
+
+Examples:
+
+.. code-block:: python
+
+# Create hook
+slack_hook = SlackHook(token="xxx")  # or slack_hook = 
SlackHook(slack_conn_id="slack")
+
+# Call generic API with parameters (errors are handled by hook)
+#  For more details check 
https://api.slack.com/methods/chat.postMessage
+slack_hook.call("chat.postMessage", json={"channel": "#random", 
"text": "Hello world!"})
+
+# Call method from Slack SDK (you have to handle errors yourself)
+#  For more details check 
https://slack.dev/python-slackclient/basic_usage.html#sending-a-message
+slack_hook.client.chat_postMessage(channel="#random", text="Hello 
world!")
 
 :param token: Slack API token
+:type token: str
 :param slack_conn_id: connection that has Slack API token in the password 
field
+:type slack_conn_id: str
+:param use_session: A boolean specifying if the client should take 
advantage of
+connection pooling. Default is True.
+:type use_session: bool
+:param base_url: A string representing the Slack API base URL. Default is
+``https://www.slack.com/api/``
+:type base_url: str
+:param timeout: The maximum number of seconds the client will wait
+to connect and receive a response from Slack. Default is 30 seconds.
+:type timeout: int
 """
-def __init__(self, token: Optional[str] = None, slack_conn_id: 
Optional[str] = None) -> None:
+
+def __init__(
+self,
+token: Optional[str] = None,
+slack_conn_id: Optional[str] = None,
+**client_args: Any,
+) -> None:
 super().__init__()
-self.token = self.__get_token(token, slack_conn_id)
+token = self.__get_token(token, slack_conn_id)

Review comment:
   ```suggestion
   self.token = self.__get_token(token, slack_conn_id)
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Update slack operator to support slackclient v2
> ---
>
> Key: AIRFLOW-4543
> URL: https://issues.apache.org/jira/browse/AIRFLOW-4543
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: hooks, operators
>Reporter: Sergio Kef
>Assignee: Sergio Kef
>Priority: Major
>
> Official [Slack API for python|https://pypi.org/project/slackclient/] has 
> recently released 
> [v.2|https://github.com/slackapi/python-slackclient/wiki/Migrating-to-2.x0]
> Among others some important points:
>  * Async IO
>  * SSL and Proxy
>  * Dropping 2.7 support
> Opening this ticket to work on the upgrade. Current functionalities will be 
> migrated and will try to extend functionalities, if possible.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (AIRFLOW-4543) Update slack operator to support slackclient v2

2020-05-07 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-4543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17102156#comment-17102156
 ] 

ASF GitHub Bot commented on AIRFLOW-4543:
-

kaxil commented on a change in pull request #5519:
URL: https://github.com/apache/airflow/pull/5519#discussion_r421892012



##
File path: tests/providers/slack/hooks/test_slack.py
##
@@ -19,82 +19,154 @@
 import unittest
 
 import mock
+from slack.errors import SlackApiError
 
 from airflow.exceptions import AirflowException
 from airflow.providers.slack.hooks.slack import SlackHook
 
 
 class TestSlackHook(unittest.TestCase):
-def test_init_with_token_only(self):
+
+def test___get_token_with_token_only(self):
+""" tests `__get_token` method when only token is provided """
+# Given
 test_token = 'test_token'
-slack_hook = SlackHook(token=test_token, slack_conn_id=None)
+test_conn_id = None
+
+# Creating a dummy subclass is the easiest way to avoid running init
+#  which is actually using the method we are testing
+class DummySlackHook(SlackHook):
+def __init__(self, *args, **kwargs):  # pylint: disable=W0231
+pass
 
-self.assertEqual(slack_hook.token, test_token)
+# Run
+hook = DummySlackHook()
+
+# Assert
+output = hook._SlackHook__get_token(test_token, test_conn_id)  # pylint: disable=E1101
+expected = test_token
+self.assertEqual(output, expected)
 
 @mock.patch('airflow.providers.slack.hooks.slack.SlackHook.get_connection')
-def test_init_with_valid_slack_conn_id_only(self, get_connection_mock):
+def test___get_token_with_valid_slack_conn_id_only(self, get_connection_mock):
+""" tests `__get_token` method when only connection is provided """
+# Given
+test_token = None
+test_conn_id = 'x'
 test_password = 'test_password'
+
+# Mock
 get_connection_mock.return_value = mock.Mock(password=test_password)
 
-test_slack_conn_id = 'test_slack_conn_id'
-slack_hook = SlackHook(token=None, slack_conn_id=test_slack_conn_id)
+# Creating a dummy subclass is the easiest way to avoid running init
+#  which is actually using the method we are testing
+class DummySlackHook(SlackHook):
+def __init__(self, *args, **kwargs):  # pylint: disable=W0231
+pass

Review comment:
   There is no need for dummy subclasses; you can initialize the SlackHook directly. It is not calling the Slack API in `__init__`, is it?
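
A minimal sketch of the test shape being suggested, assuming ``__init__`` indeed stays API-free; the name-mangled private method is reachable as ``_SlackHook__get_token``:

```python
import unittest

from airflow.providers.slack.hooks.slack import SlackHook


class TestGetToken(unittest.TestCase):
    def test_get_token_with_token_only(self):
        # Instantiate the real hook; constructing it only resolves the token,
        # it does not make a Slack API call.
        hook = SlackHook(token="test_token", slack_conn_id=None)
        # Name mangling: SlackHook.__get_token is reachable as _SlackHook__get_token.
        self.assertEqual(hook._SlackHook__get_token("test_token", None), "test_token")
```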





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Update slack operator to support slackclient v2
> ---
>
> Key: AIRFLOW-4543
> URL: https://issues.apache.org/jira/browse/AIRFLOW-4543
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: hooks, operators
>Reporter: Sergio Kef
>Assignee: Sergio Kef
>Priority: Major
>
> Official [Slack API for python|https://pypi.org/project/slackclient/] has 
> recently released 
> [v.2|https://github.com/slackapi/python-slackclient/wiki/Migrating-to-2.x0]
> Among others some important points:
>  * Async IO
>  * SSL and Proxy
>  * Dropping 2.7 support
> Opening this ticket to work on the upgrade. Current functionalities will be 
> migrated and will try to extend functionalities, if possible.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [airflow] kaxil commented on a change in pull request #5519: [AIRFLOW-4543] Update slack operator to support slackclient v2

2020-05-07 Thread GitBox


kaxil commented on a change in pull request #5519:
URL: https://github.com/apache/airflow/pull/5519#discussion_r421892012



##
File path: tests/providers/slack/hooks/test_slack.py
##
@@ -19,82 +19,154 @@
 import unittest
 
 import mock
+from slack.errors import SlackApiError
 
 from airflow.exceptions import AirflowException
 from airflow.providers.slack.hooks.slack import SlackHook
 
 
 class TestSlackHook(unittest.TestCase):
-def test_init_with_token_only(self):
+
+def test___get_token_with_token_only(self):
+""" tests `__get_token` method when only token is provided """
+# Given
 test_token = 'test_token'
-slack_hook = SlackHook(token=test_token, slack_conn_id=None)
+test_conn_id = None
+
+# Creating a dummy subclass is the easiest way to avoid running init
+#  which is actually using the method we are testing
+class DummySlackHook(SlackHook):
+def __init__(self, *args, **kwargs):  # pylint: disable=W0231
+pass
 
-self.assertEqual(slack_hook.token, test_token)
+# Run
+hook = DummySlackHook()
+
+# Assert
+output = hook._SlackHook__get_token(test_token, test_conn_id)  # pylint: disable=E1101
+expected = test_token
+self.assertEqual(output, expected)
 
 @mock.patch('airflow.providers.slack.hooks.slack.SlackHook.get_connection')
-def test_init_with_valid_slack_conn_id_only(self, get_connection_mock):
+def test___get_token_with_valid_slack_conn_id_only(self, get_connection_mock):
+""" tests `__get_token` method when only connection is provided """
+# Given
+test_token = None
+test_conn_id = 'x'
 test_password = 'test_password'
+
+# Mock
 get_connection_mock.return_value = mock.Mock(password=test_password)
 
-test_slack_conn_id = 'test_slack_conn_id'
-slack_hook = SlackHook(token=None, slack_conn_id=test_slack_conn_id)
+# Creating a dummy subclass is the easiest way to avoid running init
+#  which is actually using the method we are testing
+class DummySlackHook(SlackHook):
+def __init__(self, *args, **kwargs):  # pylint: disable=W0231
+pass

Review comment:
   There is no need for dummy subclasses; you can initialize the SlackHook directly. It is not calling the Slack API in `__init__`, is it?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] jaketf commented on issue #8673: Data Fusion Hook Start pipeline will succeed before pipeline is in RUNNING state

2020-05-07 Thread GitBox


jaketf commented on issue #8673:
URL: https://github.com/apache/airflow/issues/8673#issuecomment-625581362


   I will reach out to my team and see if there are folks interested in joining the Airflow community by patching this.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on issue #8673: Data Fusion Hook Start pipeline will succeed before pipeline is in RUNNING state

2020-05-07 Thread GitBox


mik-laj commented on issue #8673:
URL: https://github.com/apache/airflow/issues/8673#issuecomment-625579505


   We also currently have priorities set for other tasks. If it is very important, we can change our plans, but we would prefer to avoid it. Every new contributor is very welcome in this project, and I will find time to review the patch and verify it using system tests.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Work started] (AIRFLOW-3369) Un-pausing a DAG with catchup =False creates an extra DAG run (1.10)

2020-05-07 Thread Kaxil Naik (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-3369?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on AIRFLOW-3369 started by Kaxil Naik.
---
> Un-pausing a DAG with catchup =False creates an extra DAG run (1.10)
> 
>
> Key: AIRFLOW-3369
> URL: https://issues.apache.org/jira/browse/AIRFLOW-3369
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: scheduler
>Affects Versions: 1.10.0
>Reporter: Andrew Harmon
>Assignee: Kaxil Naik
>Priority: Major
> Attachments: image.png
>
>
> If you create a DAG with catchup=False, when it is un-paused, it creates 2 
> dag runs. One for the most recent scheduled interval (expected) and one for 
> the interval before that (unexpected).
> *Sample DAG*
> {code:java}
> from airflow import DAG
> from datetime import datetime
> from airflow.operators.dummy_operator import DummyOperator
> dag = DAG(
> dag_id='DummyTest',
> start_date=datetime(2018,1,1),
> catchup=False
> )
> do = DummyOperator(
> task_id='dummy_task',
> dag=dag
> )
> {code}
> *Result:*
> 2 DAG runs are created: 2018-11-18 and 2018-11-17
> *Expected Result:*
> Only 1 DAG run should have been created (2018-11-18)
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (AIRFLOW-3369) Un-pausing a DAG with catchup =False creates an extra DAG run (1.10)

2020-05-07 Thread Kaxil Naik (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-3369?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kaxil Naik reassigned AIRFLOW-3369:
---

Assignee: Kaxil Naik

> Un-pausing a DAG with catchup =False creates an extra DAG run (1.10)
> 
>
> Key: AIRFLOW-3369
> URL: https://issues.apache.org/jira/browse/AIRFLOW-3369
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: scheduler
>Affects Versions: 1.10.0
>Reporter: Andrew Harmon
>Assignee: Kaxil Naik
>Priority: Major
> Attachments: image.png
>
>
> If you create a DAG with catchup=False, when it is un-paused, it creates 2 
> dag runs. One for the most recent scheduled interval (expected) and one for 
> the interval before that (unexpected).
> *Sample DAG*
> {code:java}
> from airflow import DAG
> from datetime import datetime
> from airflow.operators.dummy_operator import DummyOperator
> dag = DAG(
> dag_id='DummyTest',
> start_date=datetime(2018,1,1),
> catchup=False
> )
> do = DummyOperator(
> task_id='dummy_task',
> dag=dag
> )
> {code}
> *Result:*
> 2 DAG runs are created: 2018-11-18 and 2018-11-17
> *Expected Result:*
> Only 1 DAG run should have been created (2018-11-18)
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (AIRFLOW-3369) Un-pausing a DAG with catchup =False creates an extra DAG run (1.10)

2020-05-07 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-3369?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17102146#comment-17102146
 ] 

ASF GitHub Bot commented on AIRFLOW-3369:
-

kaxil opened a new pull request #8776:
URL: https://github.com/apache/airflow/pull/8776


   https://issues.apache.org/jira/browse/AIRFLOW-3369
   
   ---
   Make sure to mark the boxes below before creating PR: [x]
   
   - [x] Description above provides context of the change
   - [x] Unit tests coverage for changes (not needed for documentation changes)
   - [x] Target Github ISSUE in description if exists
   - [x] Commits follow "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)"
   - [x] Relevant documentation is updated including usage instructions.
   - [x] I will engage committers as explained in [Contribution Workflow 
Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal 
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals))
 is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party 
License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in 
[UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request 
Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines)
 for more information.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Un-pausing a DAG with catchup =False creates an extra DAG run (1.10)
> 
>
> Key: AIRFLOW-3369
> URL: https://issues.apache.org/jira/browse/AIRFLOW-3369
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: scheduler
>Affects Versions: 1.10.0
>Reporter: Andrew Harmon
>Priority: Major
> Attachments: image.png
>
>
> If you create a DAG with catchup=False, when it is un-paused, it creates 2 
> dag runs. One for the most recent scheduled interval (expected) and one for 
> the interval before that (unexpected).
> *Sample DAG*
> {code:java}
> from airflow import DAG
> from datetime import datetime
> from airflow.operators.dummy_operator import DummyOperator
> dag = DAG(
> dag_id='DummyTest',
> start_date=datetime(2018,1,1),
> catchup=False
> )
> do = DummyOperator(
> task_id='dummy_task',
> dag=dag
> )
> {code}
> *Result:*
> 2 DAG runs are created: 2018-11-18 and 2018-11-17
> *Expected Result:*
> Only 1 DAG run should have been created (2018-11-18)
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [airflow] kaxil opened a new pull request #8776: WIP: [AIRFLOW-3369] BugFix: Unpausing a DAG with catchup=False creates an extra DAG run

2020-05-07 Thread GitBox


kaxil opened a new pull request #8776:
URL: https://github.com/apache/airflow/pull/8776


   https://issues.apache.org/jira/browse/AIRFLOW-3369
   
   ---
   Make sure to mark the boxes below before creating PR: [x]
   
   - [x] Description above provides context of the change
   - [x] Unit tests coverage for changes (not needed for documentation changes)
   - [x] Target Github ISSUE in description if exists
   - [x] Commits follow "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)"
   - [x] Relevant documentation is updated including usage instructions.
   - [x] I will engage committers as explained in [Contribution Workflow 
Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal 
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals))
 is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party 
License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in 
[UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request 
Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines)
 for more information.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] jaketf commented on issue #8673: Data Fusion Hook Start pipeline will succeed before pipeline is in RUNNING state

2020-05-07 Thread GitBox


jaketf commented on issue #8673:
URL: https://github.com/apache/airflow/issues/8673#issuecomment-625578250


   CDAP / Data Fusion is not a priority for me right now. I will not be able to 
find cycles for this in the next few weeks.
   
   Perhaps you could find a user who cares about this integration, suggest it as a task for a first-time contributor, or reach out to Data Fusion engineering if your team does not have cycles for it.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (AIRFLOW-7104) Add Secret backend for GCP Secrets Manager

2020-05-07 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-7104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17102098#comment-17102098
 ] 

ASF GitHub Bot commented on AIRFLOW-7104:
-

sethvargo commented on a change in pull request #7795:
URL: https://github.com/apache/airflow/pull/7795#discussion_r421850039



##
File path: airflow/providers/google/cloud/secrets/secrets_manager.py
##
@@ -0,0 +1,132 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Objects relating to sourcing connections from GCP Secrets Manager
+"""
+from typing import List, Optional
+
+from cached_property import cached_property
+from google.api_core.exceptions import NotFound
+from google.api_core.gapic_v1.client_info import ClientInfo
+from google.cloud.secretmanager_v1 import SecretManagerServiceClient
+
+from airflow import version
+from airflow.models import Connection
+from airflow.providers.google.cloud.utils.credentials_provider import (
+_get_scopes, get_credentials_and_project_id,
+)
+from airflow.secrets import BaseSecretsBackend
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+
+class CloudSecretsManagerSecretsBackend(BaseSecretsBackend, LoggingMixin):
+"""
+Retrieves Connection object from GCP Secrets Manager
+
+Configurable via ``airflow.cfg`` as follows:
+
+.. code-block:: ini
+
+[secrets]
+backend = 
airflow.providers.google.cloud.secrets.secrets_manager.CloudSecretsManagerSecretsBackend
+backend_kwargs = {"connections_prefix": "airflow/connections"}
+
+For example, if secret id is ``airflow/connections/smtp_default``, this 
would be accessible
+if you provide ``{"connections_prefix": "airflow/connections"}`` and 
request conn_id ``smtp_default``.
+
+:param connections_prefix: Specifies the prefix of the secret to read to 
get Connections.
+:type connections_prefix: str
+:param gcp_key_path: Path to GCP Credential JSON file;
+use default credentials in the current environment if not provided.
+:type gcp_key_path: str
+:param gcp_scopes: Comma-separated string containing GCP scopes
+:type gcp_scopes: str
+"""
+def __init__(
+self,
+connections_prefix: str = "airflow/connections",
+gcp_key_path: Optional[str] = None,
+gcp_scopes: Optional[str] = None,
+**kwargs
+):
+self.connections_prefix = connections_prefix.rstrip("/")
+self.gcp_key_path = gcp_key_path
+self.gcp_scopes = gcp_scopes
+self.credentials: Optional[str] = None
+self.project_id: Optional[str] = None
+super().__init__(**kwargs)
+
+@cached_property
+def client(self) -> SecretManagerServiceClient:
+"""
+Create an authenticated KMS client
+"""
+scopes = _get_scopes(self.gcp_scopes)
+self.credentials, self.project_id = get_credentials_and_project_id(
+key_path=self.gcp_key_path,
+scopes=scopes
+)
+_client = SecretManagerServiceClient(
+credentials=self.credentials,
+client_info=ClientInfo(client_library_version='airflow_v' + 
version.version)
+)
+return _client
+
+def build_secret_id(self, conn_id: str) -> str:
+"""
+Given conn_id, build path for Secrets Manager
+
+:param conn_id: connection id
+:type conn_id: str
+"""
+secret_id = f"{self.connections_prefix}/{conn_id}"
+return secret_id
+
+def get_conn_uri(self, conn_id: str) -> Optional[str]:
+"""
+Get secret value from Secrets Manager.
+
+:param conn_id: connection id
+:type conn_id: str
+"""
+secret_id = self.build_secret_id(conn_id=conn_id)
+# always return the latest version of the secret
+secret_version = "latest"

Review comment:
   Hey @kaxil - [here's the 
protocol](https://github.com/spring-cloud/spring-cloud-gcp/pull/2302) we'd like 
most integrators to try and align on.
   
   Latest is a floating alias that resolves to "the highest numbered secret 
version" for the secret. Often times, the team creating the secret

[GitHub] [airflow] sethvargo commented on a change in pull request #7795: [AIRFLOW-7104] Add Secret backend for GCP Secrets Manager

2020-05-07 Thread GitBox


sethvargo commented on a change in pull request #7795:
URL: https://github.com/apache/airflow/pull/7795#discussion_r421850039



##
File path: airflow/providers/google/cloud/secrets/secrets_manager.py
##
@@ -0,0 +1,132 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Objects relating to sourcing connections from GCP Secrets Manager
+"""
+from typing import List, Optional
+
+from cached_property import cached_property
+from google.api_core.exceptions import NotFound
+from google.api_core.gapic_v1.client_info import ClientInfo
+from google.cloud.secretmanager_v1 import SecretManagerServiceClient
+
+from airflow import version
+from airflow.models import Connection
+from airflow.providers.google.cloud.utils.credentials_provider import (
+_get_scopes, get_credentials_and_project_id,
+)
+from airflow.secrets import BaseSecretsBackend
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+
+class CloudSecretsManagerSecretsBackend(BaseSecretsBackend, LoggingMixin):
+"""
+Retrieves Connection object from GCP Secrets Manager
+
+Configurable via ``airflow.cfg`` as follows:
+
+.. code-block:: ini
+
+[secrets]
+backend = 
airflow.providers.google.cloud.secrets.secrets_manager.CloudSecretsManagerSecretsBackend
+backend_kwargs = {"connections_prefix": "airflow/connections"}
+
+For example, if secret id is ``airflow/connections/smtp_default``, this 
would be accessible
+if you provide ``{"connections_prefix": "airflow/connections"}`` and 
request conn_id ``smtp_default``.
+
+:param connections_prefix: Specifies the prefix of the secret to read to 
get Connections.
+:type connections_prefix: str
+:param gcp_key_path: Path to GCP Credential JSON file;
+use default credentials in the current environment if not provided.
+:type gcp_key_path: str
+:param gcp_scopes: Comma-separated string containing GCP scopes
+:type gcp_scopes: str
+"""
+def __init__(
+self,
+connections_prefix: str = "airflow/connections",
+gcp_key_path: Optional[str] = None,
+gcp_scopes: Optional[str] = None,
+**kwargs
+):
+self.connections_prefix = connections_prefix.rstrip("/")
+self.gcp_key_path = gcp_key_path
+self.gcp_scopes = gcp_scopes
+self.credentials: Optional[str] = None
+self.project_id: Optional[str] = None
+super().__init__(**kwargs)
+
+@cached_property
+def client(self) -> SecretManagerServiceClient:
+"""
+Create an authenticated KMS client
+"""
+scopes = _get_scopes(self.gcp_scopes)
+self.credentials, self.project_id = get_credentials_and_project_id(
+key_path=self.gcp_key_path,
+scopes=scopes
+)
+_client = SecretManagerServiceClient(
+credentials=self.credentials,
+client_info=ClientInfo(client_library_version='airflow_v' + 
version.version)
+)
+return _client
+
+def build_secret_id(self, conn_id: str) -> str:
+"""
+Given conn_id, build path for Secrets Manager
+
+:param conn_id: connection id
+:type conn_id: str
+"""
+secret_id = f"{self.connections_prefix}/{conn_id}"
+return secret_id
+
+def get_conn_uri(self, conn_id: str) -> Optional[str]:
+"""
+Get secret value from Secrets Manager.
+
+:param conn_id: connection id
+:type conn_id: str
+"""
+secret_id = self.build_secret_id(conn_id=conn_id)
+# always return the latest version of the secret
+secret_version = "latest"

Review comment:
   Hey @kaxil - [here's the 
protocol](https://github.com/spring-cloud/spring-cloud-gcp/pull/2302) we'd like 
most integrators to try and align on.
   
   Latest is a floating alias that resolves to "the highest numbered secret version" for the secret. Oftentimes, the team creating the secret is separate from the team consuming the secret. In these cases, the latest version may not be active or usable yet.
   
   Generally, you want a workload to be coupled to a specific secret so you can roll forward and roll backward in the event something breaks.
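
To make the trade-off concrete, here is a hypothetical sketch (not part of the PR) of letting callers pin a version instead of always reading ``latest``, using the v1 ``SecretManagerServiceClient`` helpers already shown in the diff:

```python
from google.api_core.exceptions import NotFound


def get_conn_uri(self, conn_id: str, secret_version: str = "latest"):
    """Hypothetical variant: secret_version could be pinned (e.g. "5") instead of "latest"."""
    secret_id = self.build_secret_id(conn_id=conn_id)
    # Build projects/<project>/secrets/<secret>/versions/<version>
    name = self.client.secret_version_path(self.project_id, secret_id, secret_version)
    try:
        response = self.client.access_secret_version(name)
    except NotFound:
        return None
    return response.payload.data.decode("UTF-8")
```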

[GitHub] [airflow] ashb commented on a change in pull request #8739: Test that DagFileProcessor can operator against on a Serialized DAG

2020-05-07 Thread GitBox


ashb commented on a change in pull request #8739:
URL: https://github.com/apache/airflow/pull/8739#discussion_r421844181



##
File path: tests/jobs/test_scheduler_job.py
##
@@ -2253,7 +2307,17 @@ def evaluate_dagrun(
 self.null_exec.mock_task_fail(dag_id, tid, ex_date)
 
 try:
-dag.run(start_date=ex_date, end_date=ex_date, executor=self.null_exec, **run_kwargs)
+# This needs a _REAL_ dag, not the serialized version
+if not isinstance(dag, SerializedDAG):
+real_dag = dag
+else:
+# It may not be loaded. This "could" live in DagBag, but it's
+# only really needed here in tests, not in normal code.
+if dag_id not in self.non_serialized_dagbag.dag_ids:
+self.non_serialized_dagbag.process_file(dag.fileloc)
+
+real_dag = self.non_serialized_dagbag.get_dag(dag_id)

Review comment:
   Probably fixed by https://github.com/apache/airflow/pull/8775





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] ashb commented on a change in pull request #8775: Correctly restore upstream_task_ids when deserializing Operators

2020-05-07 Thread GitBox


ashb commented on a change in pull request #8775:
URL: https://github.com/apache/airflow/pull/8775#discussion_r421843524



##
File path: airflow/providers/google/cloud/example_dags/example_gcs.py
##
@@ -126,8 +126,8 @@
 )
 
 # [START howto_operator_gcs_delete_bucket]
-delete_bucket_1 = GCSDeleteBucketOperator(task_id="delete_bucket", 
bucket_name=BUCKET_1)
-delete_bucket_2 = GCSDeleteBucketOperator(task_id="delete_bucket", 
bucket_name=BUCKET_2)
+delete_bucket_1 = GCSDeleteBucketOperator(task_id="delete_bucket_1", 
bucket_name=BUCKET_1)
+delete_bucket_2 = GCSDeleteBucketOperator(task_id="delete_bucket_2", 
bucket_name=BUCKET_2)

Review comment:
   @potiuk @mik-laj This change should be okay, right? (I can't think of 
any reason why it wouldn't be)
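
For context on why the duplicated ``task_id`` needed renaming, a hedged sketch; it assumes the ``DuplicateTaskIdFound`` exception discussed around #8728, while older releases may only emit a warning:

```python
from datetime import datetime

from airflow import DAG
from airflow.exceptions import DuplicateTaskIdFound
from airflow.operators.dummy_operator import DummyOperator

with DAG(dag_id="dup_demo", start_date=datetime(2020, 1, 1)) as dag:
    DummyOperator(task_id="delete_bucket")
    try:
        # Re-using the same task_id inside one DAG is rejected.
        DummyOperator(task_id="delete_bucket")
    except DuplicateTaskIdFound as err:
        print(err)
```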





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] ashb opened a new pull request #8775: Correctly restore upstream_task_ids when deserializing Operators

2020-05-07 Thread GitBox


ashb opened a new pull request #8775:
URL: https://github.com/apache/airflow/pull/8775


   This test exposed a bug in one of the example dags that wasn't caught by #6549. That will be fixed in a separate issue, but it caused the round-trip tests to fail here.
   
   Should fix #8720 (but I haven't tested this; I was actually fixing another bug caused by the same place).
   
   (Depends on #8774 for the test rework only. Ignore the first commit in this 
PR when reviewing for now.)
   ---
   Make sure to mark the boxes below before creating PR: [x]
   
   - [x] Description above provides context of the change
   - [x] Unit tests coverage for changes (not needed for documentation changes)
   - [x] Target Github ISSUE in description if exists
   - [x] Commits follow "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)"
   - [x] Relevant documentation is updated including usage instructions.
   - [x] I will engage committers as explained in [Contribution Workflow 
Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal 
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals))
 is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party 
License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in 
[UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request 
Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines)
 for more information.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] ashb commented on issue #8703: Support for set in XCom serialization

2020-05-07 Thread GitBox


ashb commented on issue #8703:
URL: https://github.com/apache/airflow/issues/8703#issuecomment-625537935


   XCom used to be pickled by default (and still is on 1.10?); that'll be why it has gone unnoticed for so long.
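
A quick illustration of why the default matters: pickle round-trips a ``set`` without complaint, while the JSON path otherwise used for XCom rejects it:

```python
import json
import pickle

value = {"a", "b"}

# Pickling a set works fine, which hides the problem when pickling is enabled.
assert pickle.loads(pickle.dumps(value)) == value

try:
    json.dumps(value)
except TypeError as err:
    print(err)  # "Object of type set is not JSON serializable"
```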



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] ashb commented on a change in pull request #8754: Add SQL query tracking for pytest

2020-05-07 Thread GitBox


ashb commented on a change in pull request #8754:
URL: https://github.com/apache/airflow/pull/8754#discussion_r421835369



##
File path: scripts/perf/perf_kit/sqlalchemy.py
##
@@ -15,31 +15,59 @@
 # specific language governing permissions and limitations
 # under the License.
 
-import contextlib
 import os
 import time
 import traceback
+from typing import Callable
 
 from sqlalchemy import event
 
 
-@contextlib.contextmanager
-def trace_queries(display_time=True, display_trace=True, display_sql=False, display_parameters=True):
+def _pretty_format_sql(text: str):
+import pygments
+
+from pygments.formatters.terminal import TerminalFormatter
+from pygments.lexers.sql import SqlLexer
+text = pygments.highlight(
+code=text, formatter=TerminalFormatter(), lexer=SqlLexer()
+).rstrip()
+return text
+
+
+class TraceQueries:
 """
-Tracking SQL queries in a code block. The result is displayed directly on the screen - ``print``
+Tracking SQL queries in a code block.
 
+:param display_no: If True, displays the query number.
 :param display_time: If True, displays the query execution time.
 :param display_trace: If True, displays the simplified (one-line) stack trace
 :param display_sql: If True, displays the SQL statements
 :param display_parameters: If True, display SQL statement parameters
-:return:
+:param print_fn: The function used to display the text. By default, ``builtins.print``
 """
-import airflow.settings
-
-def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
+def __init__(
+self,
+*,

Review comment:
   Yeah, very useful. Py3 only of course.
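
For readers less familiar with the pattern, a tiny sketch of the keyword-only-argument style under discussion (illustrative names, Python 3 only):

```python
class TraceQueriesSketch:
    def __init__(self, *, display_num=True, display_time=True, print_fn=print):
        # Everything after the bare ``*`` must be passed by keyword.
        self.display_num = display_num
        self.display_time = display_time
        self.print_fn = print_fn


TraceQueriesSketch(display_time=False)   # OK: options are named explicitly
# TraceQueriesSketch(True)               # TypeError: positional arguments are rejected
```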





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] turbaszek edited a comment on pull request #8652: [AIP-31] Implement XComArg model to functionally pass output from one operator to the next

2020-05-07 Thread GitBox


turbaszek edited a comment on pull request #8652:
URL: https://github.com/apache/airflow/pull/8652#issuecomment-625532608


   @jonathanshir it seems that some of the tests are flaky or something:
   ```
   File "/opt/airflow/tests/models/test_xcom_arg.py", line 130 in 
test_xcom_pass_to_op
   ```
   In general, we are not running DAGs on CI, so I would suggest either marking `TestXComArgRuntime` with the system marker or removing/mocking this test.
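
A hedged sketch of the first option; the marker follows the project's ``system`` marker convention, and the exact argument is an assumption here:

```python
import pytest


@pytest.mark.system("core")  # skipped on regular CI, run only as a system test
class TestXComArgRuntime:
    def test_xcom_pass_to_op(self):
        ...
```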



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] turbaszek commented on pull request #8652: [AIP-31] Implement XComArg model to functionally pass output from one operator to the next

2020-05-07 Thread GitBox


turbaszek commented on pull request #8652:
URL: https://github.com/apache/airflow/pull/8652#issuecomment-625532608


   It seems that some of the tests are flaky or something:
   ```
   File "/opt/airflow/tests/models/test_xcom_arg.py", line 130 in 
test_xcom_pass_to_op
   ```
   In general, we are not running DAGs on CI, so I would suggest either marking `TestXComArgRuntime` with the system marker or removing/mocking this test.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] ashb commented on issue #8774: Move singularity out of main CI tests and into separate docker image/system test

2020-05-07 Thread GitBox


ashb commented on issue #8774:
URL: https://github.com/apache/airflow/issues/8774#issuecomment-625532382


   Wait, I've just looked at the singularity operator tests - it already seems 
to mock it, so does anyone know why we include this in our CI image? @potiuk ?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] ashb opened a new issue #8774: Move singularity out of main CI tests and into separate docker image/system test

2020-05-07 Thread GitBox


ashb opened a new issue #8774:
URL: https://github.com/apache/airflow/issues/8774


   A CI build failed because we couldn't download a release from 
https://github.com/sylabs/singularity -- which while it was a random network 
blib, there is no _need_ for the tests for the singularity operators to live in 
our CI image.
   
   We should instead create "system tests" for singularity, and ensure that the 
unit tests use mocking.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] ashb commented on pull request #8728: Show Deprecation warning on duplicate Task ids

2020-05-07 Thread GitBox


ashb commented on pull request #8728:
URL: https://github.com/apache/airflow/pull/8728#issuecomment-625528936


   Only cos it's related to this PR, not that it's a bug with this PR
   
   And I think I've just found a problem in our example dags caused by us using 
`!=` in master:
   
   
https://github.com/apache/airflow/blob/6e4f5fa66ebe2d8252829c67e79f895fa5029b5a/airflow/providers/google/cloud/example_dags/example_gcs.py#L129-L130



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] ashb commented on pull request #8718: Set store_serialized_dags from config in UI

2020-05-07 Thread GitBox


ashb commented on pull request #8718:
URL: https://github.com/apache/airflow/pull/8718#issuecomment-625526747


   No problem @anitakar, it's good to check these things, and it's often easier 
to talk in code! (And sorry for being a bit curt!)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] houqp opened a new pull request #8773: fix typing errors reported by dmypy

2020-05-07 Thread GitBox


houqp opened a new pull request #8773:
URL: https://github.com/apache/airflow/pull/8773


   fixes 2 new errors reported by dmypy.
   
   ---
   Make sure to mark the boxes below before creating PR: [x]
   
   - [x] Description above provides context of the change
   - [x] Unit tests coverage for changes (not needed for documentation changes)
   - [x] Target Github ISSUE in description if exists
   - [x] Commits follow "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)"
   - [x] Relevant documentation is updated including usage instructions.
   - [x] I will engage committers as explained in [Contribution Workflow 
Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal 
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals))
 is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party 
License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in 
[UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request 
Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines)
 for more information.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] ashb commented on a change in pull request #8772: Correctly store non-default Nones in serialized tasks/dags

2020-05-07 Thread GitBox


ashb commented on a change in pull request #8772:
URL: https://github.com/apache/airflow/pull/8772#discussion_r421826962



##
File path: airflow/serialization/serialized_objects.py
##
@@ -395,6 +411,59 @@ def deserialize_operator(cls, encoded_op: Dict[str, Any]) 
-> BaseOperator:
 
 return op
 
+@classmethod
+def _is_constructor_param(cls, attrname: str, instance: Any) -> bool:
+# Check all super classes too
+return any(
+attrname in cls.__constructor_params_for_subclass(typ)
+for typ in type(instance).mro()
+)
+
+@classmethod
+def _value_is_hardcoded_default(cls, attrname: str, value: Any, instance: Any) -> bool:
+"""
+Check if ``value`` is the default value for ``attrname`` as set by the
+constructor of ``instance``, or any of its parent classes up
+to-and-including BaseOperator.
+
+.. seealso::
+
+:py:meth:`BaseSerialization._value_is_hardcoded_default`
+"""
+
+def _is_default():
+nonlocal ctor_params, attrname, value

Review comment:
   I went through a number of iterations of ways of doing this; it may be clearer just to pass them in anyway on Py3. WDYT?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] ashb commented on a change in pull request #8772: Correctly store non-default Nones in serialized tasks/dags

2020-05-07 Thread GitBox


ashb commented on a change in pull request #8772:
URL: https://github.com/apache/airflow/pull/8772#discussion_r421826729



##
File path: airflow/serialization/serialized_objects.py
##
@@ -395,6 +411,59 @@ def deserialize_operator(cls, encoded_op: Dict[str, Any]) 
-> BaseOperator:
 
 return op
 
+@classmethod
+def _is_constructor_param(cls, attrname: str, instance: Any) -> bool:
+# Check all super classes too
+return any(
+attrname in cls.__constructor_params_for_subclass(typ)
+for typ in type(instance).mro()
+)
+
+@classmethod
+def _value_is_hardcoded_default(cls, attrname: str, value: Any, instance: Any) -> bool:
+"""
+Check if ``value`` is the default value for ``attrname`` as set by the
+constructor of ``instance``, or any of its parent classes up
+to-and-including BaseOperator.
+
+.. seealso::
+
+:py:meth:`BaseSerialization._value_is_hardcoded_default`
+"""
+
+def _is_default():
+nonlocal ctor_params, attrname, value

Review comment:
   This won't work on python2, instead we'd have to do
   
   ```
   def _is_default(ctor_params, attrname, value):
   ```
   
   and pass em in at the call site
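
A hedged sketch of that Python-2-friendly shape, assuming ``ctor_params`` is a mapping of ``inspect.Parameter`` objects as in the surrounding code:

```python
import inspect


def _is_default(ctor_params, attrname, value):
    # Explicit arguments instead of ``nonlocal``, so the helper also works on Python 2.
    param = ctor_params.get(attrname)
    if param is None or param.default is inspect.Parameter.empty:
        return False
    return param.default == value


# Illustrative call site:
# if _is_default(ctor_params, attrname, value):
#     continue  # skip storing values that match the hard-coded default
```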





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] ashb opened a new pull request #8772: Correctly store non-default Nones in serialized tasks/dags

2020-05-07 Thread GitBox


ashb opened a new pull request #8772:
URL: https://github.com/apache/airflow/pull/8772


   The default schedule_interval for a DAG is `@daily`, so
   `schedule_interval=None` is actually not the default, but we were not
   storing _any_ null attributes previously.
   
   This meant that upon re-inflating the DAG the schedule_interval would
   become @daily.
   
   This fixes that problem, and extends the test to look at _all_ the
   serialized attributes in our round-trip tests, rather than just the few
   that the webserver cared about.
   
   It doesn't change the serialization format, it just changes what/when
   values were stored.
   
   This solution was more complex than I hoped for, but the test case in
   test_operator_subclass_changing_base_defaults is a real one that the
   round trip tests discovered from the DatabricksSubmitRunOperator -- I
   have just captured it in this test in case that specific operator
   changes in future.
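
To make the symptom concrete, a hedged round-trip sketch; the helper names follow master at the time and should be treated as illustrative:

```python
from datetime import datetime

from airflow import DAG
from airflow.serialization.serialized_objects import SerializedDAG

dag = DAG(dag_id="no_schedule", start_date=datetime(2020, 1, 1), schedule_interval=None)
roundtripped = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))

# Before this change, the explicit None was dropped during serialization and the
# deserialized DAG fell back to the default "@daily" schedule_interval.
print(dag.schedule_interval, roundtripped.schedule_interval)
```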
   
   ---
   Make sure to mark the boxes below before creating PR: [x]
   
   - [x] Description above provides context of the change
   - [x] Unit tests coverage for changes (not needed for documentation changes)
   - [x] Target Github ISSUE in description if exists
   - [x] Commits follow "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)"
   - [x] Relevant documentation is updated including usage instructions.
   - [x] I will engage committers as explained in [Contribution Workflow 
Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal 
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals))
 is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party 
License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in 
[UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request 
Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines)
 for more information.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (AIRFLOW-7104) Add Secret backend for GCP Secrets Manager

2020-05-07 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-7104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17102064#comment-17102064
 ] 

ASF GitHub Bot commented on AIRFLOW-7104:
-

kaxil commented on a change in pull request #7795:
URL: https://github.com/apache/airflow/pull/7795#discussion_r421825954



##
File path: airflow/providers/google/cloud/secrets/secrets_manager.py
##
@@ -0,0 +1,132 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Objects relating to sourcing connections from GCP Secrets Manager
+"""
+from typing import List, Optional
+
+from cached_property import cached_property
+from google.api_core.exceptions import NotFound
+from google.api_core.gapic_v1.client_info import ClientInfo
+from google.cloud.secretmanager_v1 import SecretManagerServiceClient
+
+from airflow import version
+from airflow.models import Connection
+from airflow.providers.google.cloud.utils.credentials_provider import (
+_get_scopes, get_credentials_and_project_id,
+)
+from airflow.secrets import BaseSecretsBackend
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+
+class CloudSecretsManagerSecretsBackend(BaseSecretsBackend, LoggingMixin):
+"""
+Retrieves Connection object from GCP Secrets Manager
+
+Configurable via ``airflow.cfg`` as follows:
+
+.. code-block:: ini
+
+[secrets]
+backend = 
airflow.providers.google.cloud.secrets.secrets_manager.CloudSecretsManagerSecretsBackend
+backend_kwargs = {"connections_prefix": "airflow/connections"}
+
+For example, if secret id is ``airflow/connections/smtp_default``, this 
would be accessible
+if you provide ``{"connections_prefix": "airflow/connections"}`` and 
request conn_id ``smtp_default``.
+
+:param connections_prefix: Specifies the prefix of the secret to read to 
get Connections.
+:type connections_prefix: str
+:param gcp_key_path: Path to GCP Credential JSON file;
+use default credentials in the current environment if not provided.
+:type gcp_key_path: str
+:param gcp_scopes: Comma-separated string containing GCP scopes
+:type gcp_scopes: str
+"""
+def __init__(
+self,
+connections_prefix: str = "airflow/connections",
+gcp_key_path: Optional[str] = None,
+gcp_scopes: Optional[str] = None,
+**kwargs
+):
+self.connections_prefix = connections_prefix.rstrip("/")
+self.gcp_key_path = gcp_key_path
+self.gcp_scopes = gcp_scopes
+self.credentials: Optional[str] = None
+self.project_id: Optional[str] = None
+super().__init__(**kwargs)
+
+@cached_property
+def client(self) -> SecretManagerServiceClient:
+"""
+Create an authenticated KMS client
+"""
+scopes = _get_scopes(self.gcp_scopes)
+self.credentials, self.project_id = get_credentials_and_project_id(
+key_path=self.gcp_key_path,
+scopes=scopes
+)
+_client = SecretManagerServiceClient(
+credentials=self.credentials,
+client_info=ClientInfo(client_library_version='airflow_v' + 
version.version)
+)
+return _client
+
+def build_secret_id(self, conn_id: str) -> str:
+"""
+Given conn_id, build path for Secrets Manager
+
+:param conn_id: connection id
+:type conn_id: str
+"""
+secret_id = f"{self.connections_prefix}/{conn_id}"
+return secret_id
+
+def get_conn_uri(self, conn_id: str) -> Optional[str]:
+"""
+Get secret value from Secrets Manager.
+
+:param conn_id: connection id
+:type conn_id: str
+"""
+secret_id = self.build_secret_id(conn_id=conn_id)
+# always return the latest version of the secret
+secret_version = "latest"

Review comment:
   @sethvargo Thanks for letting us know. Why is it not recommended? At least from Airflow's perspective, our users would want to pull the most recent version of a secret (hence 'latest').
   
   Can you give us some context on how a user who does not know how many versions of a secret exist would pull it?

[GitHub] [airflow] kaxil commented on a change in pull request #7795: [AIRFLOW-7104] Add Secret backend for GCP Secrets Manager

2020-05-07 Thread GitBox


kaxil commented on a change in pull request #7795:
URL: https://github.com/apache/airflow/pull/7795#discussion_r421825954



##
File path: airflow/providers/google/cloud/secrets/secrets_manager.py
##
@@ -0,0 +1,132 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Objects relating to sourcing connections from GCP Secrets Manager
+"""
+from typing import List, Optional
+
+from cached_property import cached_property
+from google.api_core.exceptions import NotFound
+from google.api_core.gapic_v1.client_info import ClientInfo
+from google.cloud.secretmanager_v1 import SecretManagerServiceClient
+
+from airflow import version
+from airflow.models import Connection
+from airflow.providers.google.cloud.utils.credentials_provider import (
+_get_scopes, get_credentials_and_project_id,
+)
+from airflow.secrets import BaseSecretsBackend
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+
+class CloudSecretsManagerSecretsBackend(BaseSecretsBackend, LoggingMixin):
+"""
+Retrieves Connection object from GCP Secrets Manager
+
+Configurable via ``airflow.cfg`` as follows:
+
+.. code-block:: ini
+
+[secrets]
+backend = 
airflow.providers.google.cloud.secrets.secrets_manager.CloudSecretsManagerSecretsBackend
+backend_kwargs = {"connections_prefix": "airflow/connections"}
+
+For example, if secret id is ``airflow/connections/smtp_default``, this 
would be accessible
+if you provide ``{"connections_prefix": "airflow/connections"}`` and 
request conn_id ``smtp_default``.
+
+:param connections_prefix: Specifies the prefix of the secret to read to 
get Connections.
+:type connections_prefix: str
+:param gcp_key_path: Path to GCP Credential JSON file;
+use default credentials in the current environment if not provided.
+:type gcp_key_path: str
+:param gcp_scopes: Comma-separated string containing GCP scopes
+:type gcp_scopes: str
+"""
+def __init__(
+self,
+connections_prefix: str = "airflow/connections",
+gcp_key_path: Optional[str] = None,
+gcp_scopes: Optional[str] = None,
+**kwargs
+):
+self.connections_prefix = connections_prefix.rstrip("/")
+self.gcp_key_path = gcp_key_path
+self.gcp_scopes = gcp_scopes
+self.credentials: Optional[str] = None
+self.project_id: Optional[str] = None
+super().__init__(**kwargs)
+
+@cached_property
+def client(self) -> SecretManagerServiceClient:
+"""
+Create an authenticated KMS client
+"""
+scopes = _get_scopes(self.gcp_scopes)
+self.credentials, self.project_id = get_credentials_and_project_id(
+key_path=self.gcp_key_path,
+scopes=scopes
+)
+_client = SecretManagerServiceClient(
+credentials=self.credentials,
+client_info=ClientInfo(client_library_version='airflow_v' + 
version.version)
+)
+return _client
+
+def build_secret_id(self, conn_id: str) -> str:
+"""
+Given conn_id, build path for Secrets Manager
+
+:param conn_id: connection id
+:type conn_id: str
+"""
+secret_id = f"{self.connections_prefix}/{conn_id}"
+return secret_id
+
+def get_conn_uri(self, conn_id: str) -> Optional[str]:
+"""
+Get secret value from Secrets Manager.
+
+:param conn_id: connection id
+:type conn_id: str
+"""
+secret_id = self.build_secret_id(conn_id=conn_id)
+# always return the latest version of the secret
+secret_version = "latest"

Review comment:
   @sethvargo Thanks for letting us know. Why is it not recommended? At least from Airflow's perspective, our users would want to pull the most recent version of a secret (hence 'latest').
   
   Can you give us some context on how a user who does not know how many versions of a secret exist would pull it?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For q

[GitHub] [airflow] Acehaidrey commented on pull request #8702: Add context to execution_date_fn in ExternalTaskSensor

2020-05-07 Thread GitBox


Acehaidrey commented on pull request #8702:
URL: https://github.com/apache/airflow/pull/8702#issuecomment-625516604


   @jhtimmins @dimberman can you please comment when you have a minute?
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on a change in pull request #7795: [AIRFLOW-7104] Add Secret backend for GCP Secrets Manager

2020-05-07 Thread GitBox


mik-laj commented on a change in pull request #7795:
URL: https://github.com/apache/airflow/pull/7795#discussion_r421816660



##
File path: airflow/providers/google/cloud/secrets/secrets_manager.py
##
@@ -0,0 +1,132 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Objects relating to sourcing connections from GCP Secrets Manager
+"""
+from typing import List, Optional
+
+from cached_property import cached_property
+from google.api_core.exceptions import NotFound
+from google.api_core.gapic_v1.client_info import ClientInfo
+from google.cloud.secretmanager_v1 import SecretManagerServiceClient
+
+from airflow import version
+from airflow.models import Connection
+from airflow.providers.google.cloud.utils.credentials_provider import (
+_get_scopes, get_credentials_and_project_id,
+)
+from airflow.secrets import BaseSecretsBackend
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+
+class CloudSecretsManagerSecretsBackend(BaseSecretsBackend, LoggingMixin):
+"""
+Retrieves Connection object from GCP Secrets Manager
+
+Configurable via ``airflow.cfg`` as follows:
+
+.. code-block:: ini
+
+[secrets]
+backend = 
airflow.providers.google.cloud.secrets.secrets_manager.CloudSecretsManagerSecretsBackend
+backend_kwargs = {"connections_prefix": "airflow/connections"}
+
+For example, if secret id is ``airflow/connections/smtp_default``, this 
would be accessible
+if you provide ``{"connections_prefix": "airflow/connections"}`` and 
request conn_id ``smtp_default``.
+
+:param connections_prefix: Specifies the prefix of the secret to read to 
get Connections.
+:type connections_prefix: str
+:param gcp_key_path: Path to GCP Credential JSON file;
+use default credentials in the current environment if not provided.
+:type gcp_key_path: str
+:param gcp_scopes: Comma-separated string containing GCP scopes
+:type gcp_scopes: str
+"""
+def __init__(
+self,
+connections_prefix: str = "airflow/connections",
+gcp_key_path: Optional[str] = None,
+gcp_scopes: Optional[str] = None,
+**kwargs
+):
+self.connections_prefix = connections_prefix.rstrip("/")
+self.gcp_key_path = gcp_key_path
+self.gcp_scopes = gcp_scopes
+self.credentials: Optional[str] = None
+self.project_id: Optional[str] = None
+super().__init__(**kwargs)
+
+@cached_property
+def client(self) -> SecretManagerServiceClient:
+"""
+Create an authenticated KMS client
+"""
+scopes = _get_scopes(self.gcp_scopes)
+self.credentials, self.project_id = get_credentials_and_project_id(
+key_path=self.gcp_key_path,
+scopes=scopes
+)
+_client = SecretManagerServiceClient(
+credentials=self.credentials,
+client_info=ClientInfo(client_library_version='airflow_v' + 
version.version)
+)
+return _client
+
+def build_secret_id(self, conn_id: str) -> str:
+"""
+Given conn_id, build path for Secrets Manager
+
+:param conn_id: connection id
+:type conn_id: str
+"""
+secret_id = f"{self.connections_prefix}/{conn_id}"
+return secret_id
+
+def get_conn_uri(self, conn_id: str) -> Optional[str]:
+"""
+Get secret value from Secrets Manager.
+
+:param conn_id: connection id
+:type conn_id: str
+"""
+secret_id = self.build_secret_id(conn_id=conn_id)
+# always return the latest version of the secret
+secret_version = "latest"

Review comment:
   Hello. I'm Kamil from Polidea. We are a Google partner and we build many integrations for Google services at the request of [Cloud Composer](https://cloud.google.com/composer).
   
   We have an abstraction layer that does not currently provide for this possibility, but I think we can introduce a special parameter syntax that lets you reference a specific version.
   
   Let me explain a little of the context of how this change works so that you can make better design decisions. In previous versions of Airflow, all data was stored in a database (th
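
   For readers following this thread, a minimal sketch of how the `get_conn_uri` body shown in the diff could resolve the chosen version. It assumes the pre-2.0 `google-cloud-secret-manager` client API matching the imports above, and it is an illustration rather than the PR's final code; the optional `secret_version` parameter is hypothetical:

```python
# Sketch only: `self.client`, `self.project_id`, `build_secret_id`, and `self.log`
# refer to the attributes defined earlier in this diff; the version handling is
# illustrative and not part of the PR as posted.
from typing import Optional

from google.api_core.exceptions import NotFound


def get_conn_uri(self, conn_id: str, secret_version: str = "latest") -> Optional[str]:
    """Return the secret payload for ``conn_id``, or None if it does not exist."""
    secret_id = self.build_secret_id(conn_id=conn_id)
    name = self.client.secret_version_path(self.project_id, secret_id, secret_version)
    try:
        response = self.client.access_secret_version(name=name)
    except NotFound:
        self.log.debug("Secret %s (version %s) not found in Secrets Manager", secret_id, secret_version)
        return None
    return response.payload.data.decode("UTF-8")
```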


[GitHub] [airflow] Acehaidrey commented on pull request #8680: Add metric for start/end task run

2020-05-07 Thread GitBox


Acehaidrey commented on pull request #8680:
URL: https://github.com/apache/airflow/pull/8680#issuecomment-625514762


   @jhtimmins when you get a chance



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] houqp opened a new pull request #8771: add metric for monitoring email notification failure

2020-05-07 Thread GitBox


houqp opened a new pull request #8771:
URL: https://github.com/apache/airflow/pull/8771


   This should help catch unexpected email notification failures. We need to 
monitor the monitoring system :)
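   
   As a rough illustration of the idea, a hedged sketch; the metric name and the wrapper function below are hypothetical and not taken from this PR:

```python
# Hypothetical sketch: count email notification failures as a StatsD metric.
from airflow.stats import Stats
from airflow.utils.email import send_email


def notify_by_email(to, subject, html_content):
    """Send a notification email, counting failures so they can be alerted on."""
    try:
        send_email(to, subject, html_content)
    except Exception:
        Stats.incr("email_notification_failure")  # hypothetical metric name
        raise
```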
   
   ---
   Make sure to mark the boxes below before creating PR: [x]
   
   - [x] Description above provides context of the change
   - [x] Unit tests coverage for changes (not needed for documentation changes)
   - [x] Target Github ISSUE in description if exists
   - [x] Commits follow "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)"
   - [x] Relevant documentation is updated including usage instructions.
   - [x] I will engage committers as explained in [Contribution Workflow 
Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal 
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals))
 is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party 
License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in 
[UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request 
Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines)
 for more information.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] sethvargo commented on a change in pull request #7795: [AIRFLOW-7104] Add Secret backend for GCP Secrets Manager

2020-05-07 Thread GitBox


sethvargo commented on a change in pull request #7795:
URL: https://github.com/apache/airflow/pull/7795#discussion_r421795771



##
File path: airflow/providers/google/cloud/secrets/secrets_manager.py
##
@@ -0,0 +1,132 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Objects relating to sourcing connections from GCP Secrets Manager
+"""
+from typing import List, Optional
+
+from cached_property import cached_property
+from google.api_core.exceptions import NotFound
+from google.api_core.gapic_v1.client_info import ClientInfo
+from google.cloud.secretmanager_v1 import SecretManagerServiceClient
+
+from airflow import version
+from airflow.models import Connection
+from airflow.providers.google.cloud.utils.credentials_provider import (
+_get_scopes, get_credentials_and_project_id,
+)
+from airflow.secrets import BaseSecretsBackend
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+
+class CloudSecretsManagerSecretsBackend(BaseSecretsBackend, LoggingMixin):
+"""
+Retrieves Connection object from GCP Secrets Manager
+
+Configurable via ``airflow.cfg`` as follows:
+
+.. code-block:: ini
+
+[secrets]
+backend = 
airflow.providers.google.cloud.secrets.secrets_manager.CloudSecretsManagerSecretsBackend
+backend_kwargs = {"connections_prefix": "airflow/connections"}
+
+For example, if secret id is ``airflow/connections/smtp_default``, this 
would be accessible
+if you provide ``{"connections_prefix": "airflow/connections"}`` and 
request conn_id ``smtp_default``.
+
+:param connections_prefix: Specifies the prefix of the secret to read to 
get Connections.
+:type connections_prefix: str
+:param gcp_key_path: Path to GCP Credential JSON file;
+use default credentials in the current environment if not provided.
+:type gcp_key_path: str
+:param gcp_scopes: Comma-separated string containing GCP scopes
+:type gcp_scopes: str
+"""
+def __init__(
+self,
+connections_prefix: str = "airflow/connections",
+gcp_key_path: Optional[str] = None,
+gcp_scopes: Optional[str] = None,
+**kwargs
+):
+self.connections_prefix = connections_prefix.rstrip("/")
+self.gcp_key_path = gcp_key_path
+self.gcp_scopes = gcp_scopes
+self.credentials: Optional[str] = None
+self.project_id: Optional[str] = None
+super().__init__(**kwargs)
+
+@cached_property
+def client(self) -> SecretManagerServiceClient:
+"""
+Create an authenticated KMS client
+"""
+scopes = _get_scopes(self.gcp_scopes)
+self.credentials, self.project_id = get_credentials_and_project_id(
+key_path=self.gcp_key_path,
+scopes=scopes
+)
+_client = SecretManagerServiceClient(
+credentials=self.credentials,
+client_info=ClientInfo(client_library_version='airflow_v' + 
version.version)
+)
+return _client
+
+def build_secret_id(self, conn_id: str) -> str:
+"""
+Given conn_id, build path for Secrets Manager
+
+:param conn_id: connection id
+:type conn_id: str
+"""
+secret_id = f"{self.connections_prefix}/{conn_id}"
+return secret_id
+
+def get_conn_uri(self, conn_id: str) -> Optional[str]:
+"""
+Get secret value from Secrets Manager.
+
+:param conn_id: connection id
+:type conn_id: str
+"""
+secret_id = self.build_secret_id(conn_id=conn_id)
+# always return the latest version of the secret
+secret_version = "latest"

Review comment:
   Hey there. I'm Seth from the Secret Manager team. Thanks for building this integration. Feel free to reach out if you have any questions in the future.
   
   We strongly discourage users from pinning to "latest" in production 
workloads. Users should specifically pin to a version (e.g. 1, 2, 3). Is there 
a way we can update this to allow the user to specify a version id?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

[GitHub] [airflow] casassg commented on a change in pull request #8652: [AIP-31] Implement XComArg model to functionally pass output from one operator to the next

2020-05-07 Thread GitBox


casassg commented on a change in pull request #8652:
URL: https://github.com/apache/airflow/pull/8652#discussion_r421782519



##
File path: airflow/models/xcom_arg.py
##
@@ -0,0 +1,147 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any, Dict, List, Union
+
+from airflow.exceptions import AirflowException
+from airflow.models.baseoperator import BaseOperator  # pylint: disable=R0401
+from airflow.models.xcom import XCOM_RETURN_KEY
+
+
+class XComArg:
+"""
+Class that represents a XCom push from a previous operator.
+Defaults to "return_value" as only key.
+
+Current implementations supports
+xcomarg >> op
+xcomarg << op
+op >> xcomarg   (by BaseOperator code)
+op << xcomarg   (by BaseOperator code)
+
+**Example**: The moment you get a result from any operator (functional or 
regular) you can ::
+
+any_op = AnyOperator()
+xcomarg = XComArg(any_op) OR xcomarg = any_op.output
+my_op = MyOperator()
+my_op >> xcomarg
+
+This object can be used in legacy Operators via Jinja.
+
+**Example**: You can make this result to be part of any generated string ::
+
+any_op = AnyOperator()
+xcomarg = any_op.output
+op1 = MyOperator(my_text_message=f"the value is {xcomarg}")
+op2 = MyOperator(my_text_message=f"the value is {xcomarg['topic']}")
+
+:param operator: operator to which the XComArg belongs to
+:type operator: airflow.models.baseoperator.BaseOperator
+:param key: key value which is used for xcom_pull (key in the XCom table)
+:type key: str
+"""
+
+def __init__(self, operator: BaseOperator, key: str = XCOM_RETURN_KEY):
+self._operator = operator
+self._key = key
+
+def __eq__(self, other):
+return (self.operator == other.operator
+and self.key == other.key)
+
+def __lshift__(self, other):
+"""
+Implements xcomresult << op
+"""
+self.set_upstream(other)
+return self
+
+def __rshift__(self, other):
+"""
+Implements xcomresult >> op
+"""
+self.set_downstream(other)
+return self
+
+def __getitem__(self, item):
+"""
+Implements xcomresult['some_result_key']
+"""
+return XComArg(operator=self.operator, key=item)
+
+def __str__(self):
+"""
+Backward compatibility for old-style jinja used in Airflow Operators
+
+**Example**: to use XArg at BashOperator::
+
+BashOperator(cmd=f"... { xcomarg } ...")
+
+:return:
+"""
+xcom_pull_kwargs = [f"task_ids='{self.operator.task_id}'",
+f"dag_id='{self.operator.dag.dag_id}'",
+]
+if self.key is not None:
+xcom_pull_kwargs.append(f"key='{self.key}'")

Review comment:
   sgtm. Maybe we should still print that the key value is None, but that's not blocking for me.
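
   To make the rendered string concrete, a hedged usage sketch assuming this PR's `XComArg` lands as shown in the diff; the import paths follow Airflow master at the time, and the dag/task ids are placeholders:

```python
from datetime import datetime

from airflow import DAG
from airflow.models.xcom_arg import XComArg
from airflow.operators.bash import BashOperator

with DAG("xcomarg_demo", start_date=datetime(2020, 5, 1), schedule_interval=None) as dag:
    push = BashOperator(task_id="push", bash_command="echo hello")
    # str(XComArg(push)) renders a Jinja xcom_pull expression, roughly:
    #   {{ task_instance.xcom_pull(task_ids='push', dag_id='xcomarg_demo', key='return_value') }}
    pull = BashOperator(task_id="pull", bash_command=f"echo {XComArg(push)}")
```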





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] kaxil commented on issue #8760: Webserver becomes unusable after 100,000 tasks were completed

2020-05-07 Thread GitBox


kaxil commented on issue #8760:
URL: https://github.com/apache/airflow/issues/8760#issuecomment-625474431


   Is there a particular endpoint that is slow in the Webserver?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on pull request #8754: Add SQL query tracking for pytest

2020-05-07 Thread GitBox


mik-laj commented on pull request #8754:
URL: https://github.com/apache/airflow/pull/8754#issuecomment-625468574


   @kaxil @ashb and our pytest expert @turbaszek - can I ask you to take a look? All non-quarantined checks are green.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on a change in pull request #8754: Add SQL query tracking for pytest

2020-05-07 Thread GitBox


mik-laj commented on a change in pull request #8754:
URL: https://github.com/apache/airflow/pull/8754#discussion_r421760872



##
File path: TESTING.rst
##
@@ -902,6 +902,36 @@ You should also consider running it with ``restart`` 
command when you change the
 This will clean-up the database so that you start with a clean DB and not DB 
installed in a previous version.
 So typically you'd run it like ``breeze --install-airflow-version=1.10.9 
restart``.
 
+Tracking SQL statements
+===
+
+You can run tests with SQL statements tracking. To do this, use the 
``--trace-sql`` option and pass the
+columns to be displayed as an argument. Each query will be displayed on a 
separate line.
+Supported values:
+
+* ``no`` -  displays the query number;
+* ``time`` - displays the query execution time;
+* ``trace`` - displays the simplified (one-line) stack trace;
+* ``sql`` - displays the SQL statements;
+* ``parameters`` - display SQL statement parameters.
+
+If you only provide ``no``, then only the final number of queries in the test 
will be displayed.
+
+By default, pytest does not display output for successful tests, if you still 
want to see them, you must
+pass the ``--capture=no`` option.
+
+If you run the following command:
+
+.. code-block:: bash
+
+pytest --debug-sql=no,sql,parameters --capture=no \

Review comment:
   Changed. Thanks.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on a change in pull request #8754: Add SQL query tracking for pytest

2020-05-07 Thread GitBox


mik-laj commented on a change in pull request #8754:
URL: https://github.com/apache/airflow/pull/8754#discussion_r421760814



##
File path: scripts/perf/perf_kit/sqlalchemy.py
##
@@ -52,28 +80,40 @@ def after_cursor_execute(conn, cursor, statement, 
parameters, context, executema
 conn.info.setdefault("query_start_time", []).append(time.monotonic())
 
 output_parts = []
-if display_time:
+if self.display_no:
+output_parts.append(f"{self.query_count:>3}")
+
+if self.display_time:
 output_parts.append(f"{total:.5f}")
 
-if display_trace:
+if self.display_time:
+output_parts.append(f"{total:.5f}")
+
+if self.display_trace:
 output_parts.extend([f"{file_name}", f"{stack_info}"])
 
-if display_sql:
+if self.display_sql:
 sql_oneline = statement.replace("\n", " ")
-output_parts.append(f"{sql_oneline}")
+output_parts.append(f"{_pretty_format_sql(sql_oneline)}")
 
-if display_parameters:
+if self.display_parameters :

Review comment:
   Fixed. Thanks. 
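
   For readers who have not opened the file under review, a minimal self-contained sketch of the underlying technique (SQLAlchemy cursor-execute event hooks); the printed columns and the in-memory SQLite engine are illustrative only, not the perf_kit code itself:

```python
import time

from sqlalchemy import create_engine, event, text

engine = create_engine("sqlite://")
query_count = 0


@event.listens_for(engine, "before_cursor_execute")
def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    # Remember when the statement started so the after-hook can compute a duration.
    conn.info.setdefault("query_start_time", []).append(time.monotonic())


@event.listens_for(engine, "after_cursor_execute")
def after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    global query_count
    query_count += 1
    total = time.monotonic() - conn.info["query_start_time"].pop()
    sql_oneline = statement.replace("\n", " ")
    print(f"{query_count:>3} | {total:.5f} | {sql_oneline} | {parameters}")


with engine.connect() as conn:
    conn.execute(text("SELECT 1"))
```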





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (AIRFLOW-249) Refactor the SLA mechanism

2020-05-07 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17101971#comment-17101971
 ] 

ASF GitHub Bot commented on AIRFLOW-249:


houqp commented on pull request #8545:
URL: https://github.com/apache/airflow/pull/8545#issuecomment-625434121


   > My conclusion is that option 1 is a better trade-off, because one has to 
go through all TIs in a DagRun to determine if a DR can be free from further 
checking (e.g., if a DR has 10 TIs, then each TI has to be checked for all 
possible SLA violations before the DR is sla_checked). This is not a cheap 
operation since a single TI could have 3 SLAs, hence the additional computation 
and IO could easily outweigh the benefit of filtering out sla_checked DRs.
   
   Option 1 doesn't guarantee correctness right? i.e. if there are more dagruns 
that need to be checked than the preset limit, some of them will be ignored?
   
   With regards to performance comparison between option 1 and option 2, aren't 
we already checking all the TIs for the 100 fetched dag runs in option 1?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Refactor the SLA mechanism
> --
>
> Key: AIRFLOW-249
> URL: https://issues.apache.org/jira/browse/AIRFLOW-249
> Project: Apache Airflow
>  Issue Type: Improvement
>Reporter: dud
>Priority: Major
>
> Hello
> I've noticed the SLA feature is currently behaving as follows:
> - it doesn't work on DAGs scheduled @once or None because they have no 
> dag.following_schedule property
> - it keeps endlessly checking for SLA misses without ever worrying about any 
> end_date. Worse I noticed that emails are still being sent for runs that are 
> never happening because of end_date
> - it keeps checking for recent TIs even if SLA notification has been already 
> been sent for them
> - the SLA logic is only being fired after following_schedule + sla has 
> elapsed, in other words one has to wait for the next TI before having a 
> chance of getting any email. Also the email reports dag.following_schedule 
> time (I guess because it is close to TI.start_date), but unfortunately that 
> doesn't match what the task instances shows nor the log filename
> - the SLA logic is based on max(TI.execution_date) for the starting point of 
> its checks, that means that for a DAG whose SLA is longer than its schedule 
> period if half of the TIs are running longer than expected it will go 
> unnoticed. This could be demonstrated with a DAG like this one :
> {code}
> from airflow import DAG
> from airflow.operators import *
> from datetime import datetime, timedelta
> from time import sleep
> default_args = {
> 'owner': 'airflow',
> 'depends_on_past': False,
> 'start_date': datetime(2016, 6, 16, 12, 20),
> 'email': my_email,
> 'sla': timedelta(minutes=2),
> }
> dag = DAG('unnoticed_sla', default_args=default_args, 
> schedule_interval=timedelta(minutes=1))
> def alternating_sleep(**kwargs):
> minute = kwargs['execution_date'].strftime("%M")
> is_odd = int(minute) % 2
> if is_odd:
> sleep(300)
> else:
> sleep(10)
> return True
> PythonOperator(
> task_id='sla_miss',
> python_callable=alternating_sleep,
> provide_context=True,
> dag=dag)
> {code}
> I've tried to rework the SLA triggering mechanism by addressing the above 
> points., please [have a look on 
> it|https://github.com/dud225/incubator-airflow/commit/972260354075683a8d55a1c960d839c37e629e7d]
> I made some tests with this patch :
> - the fluctuating DAG shown above no longer makes Airflow skip any SLA events:
> {code}
>  task_id  |    dag_id     |   execution_date    | email_sent |         timestamp          | description | notification_sent
> ----------+---------------+---------------------+------------+----------------------------+-------------+-------------------
>  sla_miss | dag_sla_miss1 | 2016-06-16 15:05:00 | t          | 2016-06-16 15:08:26.058631 |             | t
>  sla_miss | dag_sla_miss1 | 2016-06-16 15:07:00 | t          | 2016-06-16 15:10:06.093253 |             | t
>  sla_miss | dag_sla_miss1 | 2016-06-16 15:09:00 | t          | 2016-06-16 15:12:06.241773 |             | t
> {code}
> - on a normal DAG, the SLA is being triggered more quickly:
> {code}
> // start_date = 2016-06-16 15:55:00
> // end_date = 2016-06-16 16:00:00
> // schedule_interval =  timedelta(minutes=1)
> // sla = timedelta(minutes=2)
>  task_id  |dag_id |   execution_date| email_sent | 
> timestamp  | description | notification_sent 
> --


[GitHub] [airflow] anitakar commented on pull request #8718: Set store_serialized_dags from config in UI

2020-05-07 Thread GitBox


anitakar commented on pull request #8718:
URL: https://github.com/apache/airflow/pull/8718#issuecomment-625428708


   Closing the issue, as the only difference seems to be the call to set_paused, and its implementation has changed.
   Sorry for taking your time without checking everything first.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (AIRFLOW-249) Refactor the SLA mechanism

2020-05-07 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17101952#comment-17101952
 ] 

ASF GitHub Bot commented on AIRFLOW-249:


houqp commented on a change in pull request #8545:
URL: https://github.com/apache/airflow/pull/8545#discussion_r421708159



##
File path: airflow/models/baseoperator.py
##
@@ -392,10 +442,79 @@ def __init__(
% (self.task_id, dag.dag_id))
 self.sla = sla
 self.execution_timeout = execution_timeout
+
+# Warn about use of the deprecated SLA parameter
+if sla and expected_finish:
+warnings.warn(
+"Both sla and expected_finish provided as task "
+"parameters to {}; using expected_finish and ignoring "
+"deprecated sla parameter.".format(self),
+category=PendingDeprecationWarning
+)
+elif sla:
+warnings.warn(
+"sla is deprecated as a task parameter for {}; converting to "
+"expected_finish instead.".format(self),
+category=PendingDeprecationWarning
+)
+expected_finish = sla
+
+# Set SLA parameters, batching invalid type messages into a
+# single exception.
+sla_param_errs: List = []
+if expected_duration and not isinstance(expected_duration, timedelta):

Review comment:
   It should be enforced in the CI pipeline at build time. If you tested mypy and it's not complaining about it, that's because mypy is still an evolving project, so it can't catch all type errors yet, but it's safe to assume that it will eventually catch up.
   
   I wouldn't worry too much about these edge cases, to be honest. We are not doing runtime type checks anywhere else where a type hint is defined in the code base, so it's a little odd to leave this one as a special case. On top of that, this runtime type check will result in an exception, so the end result is the same as without the check: they all lead to unrecoverable crashes.
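
   As a hedged illustration of the build-time check referred to above (the function and names below are illustrative, not Airflow's API):

```python
# With a type hint on the parameter, mypy flags a wrong argument type at check
# time, so an extra runtime isinstance() check largely duplicates what CI catches.
from datetime import timedelta
from typing import Optional


def set_sla(expected_duration: Optional[timedelta] = None) -> None:
    """Accepts only a timedelta (or None), according to its annotation."""
    print(expected_duration)


set_sla(expected_duration=timedelta(minutes=5))  # ok
set_sla(expected_duration=5)  # mypy: incompatible type "int"; expected "Optional[timedelta]"
```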





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Refactor the SLA mechanism
> --
>
> Key: AIRFLOW-249
> URL: https://issues.apache.org/jira/browse/AIRFLOW-249
> Project: Apache Airflow
>  Issue Type: Improvement
>Reporter: dud
>Priority: Major
>
> Hello
> I've noticed the SLA feature is currently behaving as follow :
> - it doesn't work on DAG scheduled @once or None because they have no 
> dag.followwing_schedule property
> - it keeps endlessly checking for SLA misses without ever worrying about any 
> end_date. Worse I noticed that emails are still being sent for runs that are 
> never happening because of end_date
> - it keeps checking for recent TIs even if SLA notification has been already 
> been sent for them
> - the SLA logic is only being fired after following_schedule + sla has 
> elapsed, in other words one has to wait for the next TI before having a 
> chance of getting any email. Also the email reports dag.following_schedule 
> time (I guess because it is close of TI.start_date), but unfortunately that 
> doesn't match what the task instances shows nor the log filename
> - the SLA logic is based on max(TI.execution_date) for the starting point of 
> its checks, that means that for a DAG whose SLA is longer than its schedule 
> period if half of the TIs are running longer than expected it will go 
> unnoticed. This could be demonstrated with a DAG like this one :
> {code}
> from airflow import DAG
> from airflow.operators import *
> from datetime import datetime, timedelta
> from time import sleep
> default_args = {
> 'owner': 'airflow',
> 'depends_on_past': False,
> 'start_date': datetime(2016, 6, 16, 12, 20),
> 'email': my_email
> 'sla': timedelta(minutes=2),
> }
> dag = DAG('unnoticed_sla', default_args=default_args, 
> schedule_interval=timedelta(minutes=1))
> def alternating_sleep(**kwargs):
> minute = kwargs['execution_date'].strftime("%M")
> is_odd = int(minute) % 2
> if is_odd:
> sleep(300)
> else:
> sleep(10)
> return True
> PythonOperator(
> task_id='sla_miss',
> python_callable=alternating_sleep,
> provide_context=True,
> dag=dag)
> {code}
> I've tried to rework the SLA triggering mechanism by addressing the above 
> points., please [have a look on 
> it|https://github.com/dud225/incubator-airflow/commit/972260354075683a8d55a1c960d839c37e629e7d]
> I made some tests with this

[GitHub] [airflow] houqp commented on a change in pull request #8545: [AIRFLOW-249] Refactor the SLA mechanism (Continuation from #3584 )

2020-05-07 Thread GitBox


houqp commented on a change in pull request #8545:
URL: https://github.com/apache/airflow/pull/8545#discussion_r421708159



##
File path: airflow/models/baseoperator.py
##
@@ -392,10 +442,79 @@ def __init__(
% (self.task_id, dag.dag_id))
 self.sla = sla
 self.execution_timeout = execution_timeout
+
+# Warn about use of the deprecated SLA parameter
+if sla and expected_finish:
+warnings.warn(
+"Both sla and expected_finish provided as task "
+"parameters to {}; using expected_finish and ignoring "
+"deprecated sla parameter.".format(self),
+category=PendingDeprecationWarning
+)
+elif sla:
+warnings.warn(
+"sla is deprecated as a task parameter for {}; converting to "
+"expected_finish instead.".format(self),
+category=PendingDeprecationWarning
+)
+expected_finish = sla
+
+# Set SLA parameters, batching invalid type messages into a
+# single exception.
+sla_param_errs: List = []
+if expected_duration and not isinstance(expected_duration, timedelta):

Review comment:
   It should be enforced in the CI pipeline at build time. If you tested with 
mypy and it's not complaining about it, that's because mypy is still an 
evolving project and can't catch all type errors yet, but it's safe to assume 
it will eventually catch up.
   
   I wouldn't worry too much about these edge cases, to be honest. We are not 
doing runtime type checks anywhere else where type hints are defined in the 
code base, so it's a little odd to leave this one as a special case. On top of 
that, this runtime type check will result in an exception, so the end result 
is the same as without the check: both lead to unrecoverable crashes.
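
   For illustration, a toy example of the kind of mistake a CI mypy run would 
flag without any runtime isinstance check (the names here are hypothetical, 
not the PR's actual code):

```python
from datetime import timedelta
from typing import Optional


def set_sla(expected_duration: Optional[timedelta] = None) -> None:
    """Toy stand-in for the annotated operator argument discussed above."""
    if expected_duration is not None:
        print(f"SLA window: {expected_duration.total_seconds()} seconds")


set_sla(expected_duration=timedelta(minutes=5))   # fine at runtime and for mypy
# set_sla(expected_duration=300)                  # mypy: incompatible type "int"; expected "Optional[timedelta]"
```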





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] boring-cyborg[bot] commented on issue #8770: Airflow 1.10.7 logs from S3 won't load, just hanging

2020-05-07 Thread GitBox


boring-cyborg[bot] commented on issue #8770:
URL: https://github.com/apache/airflow/issues/8770#issuecomment-625422513


   Thanks for opening your first issue here! Be sure to follow the issue 
template!
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] aarrtteemmuuss opened a new issue #8770: Airflow 1.10.7 logs from S3 won't load, just hanging

2020-05-07 Thread GitBox


aarrtteemmuuss opened a new issue #8770:
URL: https://github.com/apache/airflow/issues/8770


   **Apache Airflow version**: 1.10.7
   
   **Environment**: python 3.7, running locally
   
   **What happened**:
   
   I have the following configuration in airflow.cfg: 
   
   `remote_logging = True`
   `remote_log_conn_id = "S3Connection"`
   `remote_base_log_folder = s3://bucket/logs`
   `encrypt_s3_logs = False`
   
   I have set up a connection in the UI with type S3 and the following settings: 
   `{"aws_access_key_id":"xxx", "aws_secret_access_key": "xxx"}`
   
   Executor is LocalExecutor. The scheduler is able to write logs to S3, but when I 
open the UI to look at a task's logs, it just hangs and nothing happens. The 
spinner spins forever and I can't even see what the error is. 
   
   
`http://127.0.0.1:8080/admin/airflow/get_logs_with_metadata?dag_id=example_bash_operator&task_id=run_after_loop&execution_date=2020-05-07T18%3A08%3A51.232255%2B00%3A00&try_number=1&metadata=null`
 - returns an Empty Response error and nothing works.
   
   Am I doing something wrong? The S3 configuration is not well documented and 
I see a bunch of reports that it is not working. 
   
   **What you expected to happen**:
   
   I expected logs to be pulled from S3 and shown in the admin UI.
   
   **How to reproduce it**:
   
   Install Airflow 1.10.7 locally and run any example DAG with remote logging 
enabled for an S3 bucket. 
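
   As a side note for anyone debugging a similar setup, a quick way to confirm 
that the connection itself can list the remote logs (a sketch; the bucket name 
and prefix below are placeholders, not taken from this report):

```python
from airflow.hooks.S3_hook import S3Hook

# Uses the same connection id as remote_log_conn_id; if this fails or returns
# nothing, the webserver will not be able to fetch the task logs either.
hook = S3Hook(aws_conn_id="S3Connection")
keys = hook.list_keys(bucket_name="bucket", prefix="logs/")
print(keys[:5] if keys else "no log keys found - check connection and bucket path")
```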
   
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] Sinsin1367 commented on pull request #8734: Added optional logging for pod container statuses once a pod fails. T…

2020-05-07 Thread GitBox


Sinsin1367 commented on pull request #8734:
URL: https://github.com/apache/airflow/pull/8734#issuecomment-625412537


   @ashb  @chrismclennon I would appreciate if you guys take a look and share 
your thoughts.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] anitakar opened a new pull request #8769: Set store_serialized_dags from config instead of defaulting it to false

2020-05-07 Thread GitBox


anitakar opened a new pull request #8769:
URL: https://github.com/apache/airflow/pull/8769


   Fix running dag from non-RBAC (old, deprecated) UI



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] anitakar commented on a change in pull request #8764: Store dags cleanup 1 10

2020-05-07 Thread GitBox


anitakar commented on a change in pull request #8764:
URL: https://github.com/apache/airflow/pull/8764#discussion_r421691736



##
File path: airflow/www_rbac/views.py
##
@@ -564,7 +568,8 @@ def dag_details(self, session=None):
 dag_id = request.args.get('dag_id')
 dag_orm = DagModel.get_dagmodel(dag_id, session=session)
 # FIXME: items needed for this view should move to the database
-dag = dag_orm.get_dag(STORE_SERIALIZED_DAGS)
+dag = dag_orm.get_dag(
+conf.getboolean('core', 'store_serialized_dags', fallback=False))

Review comment:
   Sure. Airflow does not support dynamic settings. And even if it did, it 
should not be one of them.
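
   As a sketch of the reviewer's point (a hypothetical helper, not the actual 
view code): since the config cannot change at runtime, the flag can be read 
once at import time and reused, rather than on every request:

```python
from airflow.configuration import conf

# Config is fixed for the lifetime of the process, so read the flag once.
STORE_SERIALIZED_DAGS = conf.getboolean('core', 'store_serialized_dags', fallback=False)


def get_dag_for_view(dag_orm):
    """Hypothetical helper mirroring the dag_details view above."""
    return dag_orm.get_dag(STORE_SERIALIZED_DAGS)
```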





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] anitakar commented on a change in pull request #8764: Store dags cleanup 1 10

2020-05-07 Thread GitBox


anitakar commented on a change in pull request #8764:
URL: https://github.com/apache/airflow/pull/8764#discussion_r421691284



##
File path: airflow/www/views.py
##
@@ -94,7 +94,16 @@
 
 UTF8_READER = codecs.getreader('utf-8')
 
-dagbag = models.DagBag(settings.DAGS_FOLDER, 
store_serialized_dags=STORE_SERIALIZED_DAGS)
+try:
+async_dagbag_loader = conf.getboolean('webserver', 'async_dagbag_loader')

Review comment:
   Sorry for that. I thought it got lost or was removed. It is not 
necessary with DAG serialization, and such big functionality should definitely 
be discussed first via an AIP.
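
   If the goal of the try/except around conf.getboolean above is just "default 
to False when the option is absent", the fallback argument (already used 
elsewhere in this thread) is usually enough; a sketch, assuming the option 
stays in the [webserver] section:

```python
from airflow.configuration import conf

# Missing option -> False; an explicit setting in airflow.cfg still wins.
async_dagbag_loader = conf.getboolean('webserver', 'async_dagbag_loader', fallback=False)
```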





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (AIRFLOW-2310) Enable AWS Glue Job Integration

2020-05-07 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-2310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17101913#comment-17101913
 ] 

ASF GitHub Bot commented on AIRFLOW-2310:
-

paulsyl commented on pull request #6007:
URL: https://github.com/apache/airflow/pull/6007#issuecomment-625403415


   > @paulsyl could you possibly share your local Glue implementation (custom 
operator, hook, etc) as a Gist or otherwise that we could run in the meantime? 
If so I'd be happy to test it out and bring some of your improvements into this 
PR as well.
   
   Sure, here you go. I don't claim these to be perfectly aligned with Airflow 
principles, so please use them and add to this PR once you've had a chance to 
correct my errors; I would be more than happy to replace my own with 
contributions from the wider community.   
   
   Thanks to the excellent work by @abdulbasitds that enabled me to get this 
far with Airflow.
   
   [Glue Hook](https://gist.github.com/paulsyl/34ee457518689d120d64173f44f89b45)
   
   [Glue 
Operator](https://gist.github.com/paulsyl/bbed05dc0cc6146c36cc6b26ba9bd886)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Enable AWS Glue Job Integration
> ---
>
> Key: AIRFLOW-2310
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2310
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: contrib
>Reporter: Olalekan Elesin
>Assignee: Olalekan Elesin
>Priority: Major
>  Labels: AWS
>
> Would it be possible to integrate AWS Glue into Airflow, such that Glue jobs 
> and ETL pipelines can be orchestrated with Airflow



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [airflow] paulsyl commented on pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-05-07 Thread GitBox


paulsyl commented on pull request #6007:
URL: https://github.com/apache/airflow/pull/6007#issuecomment-625403415


   > @paulsyl could you possibly share your local Glue implementation (custom 
operator, hook, etc) as a Gist or otherwise that we could run in the meantime? 
If so I'd be happy to test it out and bring some of your improvements into this 
PR as well.
   
   Sure, here you go. I don't claim these to be perfectly aligned with Airflow 
principles, so please use them and add to this PR once you've had a chance to 
correct my errors; I would be more than happy to replace my own with 
contributions from the wider community.   
   
   Thanks to the excellent work by @abdulbasitds that enabled me to get this 
far with Airflow.
   
   [Glue Hook](https://gist.github.com/paulsyl/34ee457518689d120d64173f44f89b45)
   
   [Glue 
Operator](https://gist.github.com/paulsyl/bbed05dc0cc6146c36cc6b26ba9bd886)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] XD-DENG commented on pull request #8742: Avoid color info in response of /dag_stats & /task_stats

2020-05-07 Thread GitBox


XD-DENG commented on pull request #8742:
URL: https://github.com/apache/airflow/pull/8742#issuecomment-625392001


   Rebased to the latest master



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (AIRFLOW-2310) Enable AWS Glue Job Integration

2020-05-07 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-2310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17101899#comment-17101899
 ] 

ASF GitHub Bot commented on AIRFLOW-2310:
-

pdeardorff-r7 commented on pull request #6007:
URL: https://github.com/apache/airflow/pull/6007#issuecomment-625389437


   @paulsyl could you possibly share your local Glue implementation (custom 
operator, hook, etc) as a Gist or otherwise that we could run in the meantime?  
If so I'd be happy to test it out and bring some of your improvements into this 
PR as well.  



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Enable AWS Glue Job Integration
> ---
>
> Key: AIRFLOW-2310
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2310
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: contrib
>Reporter: Olalekan Elesin
>Assignee: Olalekan Elesin
>Priority: Major
>  Labels: AWS
>
> Would it be possible to integrate AWS Glue into Airflow, such that Glue jobs 
> and ETL pipelines can be orchestrated with Airflow



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [airflow] pdeardorff-r7 commented on pull request #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-05-07 Thread GitBox


pdeardorff-r7 commented on pull request #6007:
URL: https://github.com/apache/airflow/pull/6007#issuecomment-625389437


   @paulsyl could you possibly share your local Glue implementation (custom 
operator, hook, etc) as a Gist or otherwise that we could run in the meantime?  
If so I'd be happy to test it out and bring some of your improvements into this 
PR as well.  



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[airflow] branch v1-10-test updated: Show Deprecation warning on duplicate Task ids (#8728)

2020-05-07 Thread kaxilnaik
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v1-10-test by this push:
 new 1ef5f13  Show Deprecation warning on duplicate Task ids (#8728)
1ef5f13 is described below

commit 1ef5f13377802eec91179d62630426b10b60f1ba
Author: Kaxil Naik 
AuthorDate: Thu May 7 18:18:52 2020 +0100

Show Deprecation warning on duplicate Task ids (#8728)
---
 airflow/models/baseoperator.py |  3 ++-
 airflow/models/dag.py  |  2 +-
 tests/models/test_dag.py   | 40 
 3 files changed, 43 insertions(+), 2 deletions(-)

diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 828fdb1..0f76770 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -546,7 +546,8 @@ class BaseOperator(LoggingMixin):
 "The DAG assigned to {} can not be changed.".format(self))
 elif self.task_id not in dag.task_dict:
 dag.add_task(self)
-
+elif self.task_id in dag.task_dict and dag.task_dict[self.task_id] is 
not self:
+dag.add_task(self)
 self._dag = dag
 
 def has_dag(self):
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index c6b9171..4d7eef8 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -1329,7 +1329,7 @@ class DAG(BaseDag, LoggingMixin):
 elif task.end_date and self.end_date:
 task.end_date = min(task.end_date, self.end_date)
 
-if task.task_id in self.task_dict:
+if task.task_id in self.task_dict and self.task_dict[task.task_id] is 
not task:
 # TODO: raise an error in Airflow 2.0
 warnings.warn(
 'The requested task could not be added to the DAG because a '
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 580a3c7..cdbe1ee 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -25,6 +25,7 @@ import re
 import unittest
 from tempfile import NamedTemporaryFile
 
+import pytest
 from parameterized import parameterized
 from tests.compat import mock
 
@@ -36,6 +37,7 @@ from airflow import models, settings
 from airflow.configuration import conf
 from airflow.exceptions import AirflowException, AirflowDagCycleException
 from airflow.models import DAG, DagModel, TaskInstance as TI
+from airflow.operators.bash_operator import BashOperator
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.operators.subdag_operator import SubDagOperator
 from airflow.utils import timezone
@@ -901,3 +903,41 @@ class DagTest(unittest.TestCase):
 
 self.assertEqual(dag.normalized_schedule_interval, 
expected_n_schedule_interval)
 self.assertEqual(dag.schedule_interval, schedule_interval)
+
+def test_duplicate_task_ids_raise_warning_with_dag_context_manager(self):
+"""Verify tasks with Duplicate task_id show warning"""
+
+deprecation_msg = "The requested task could not be added to the DAG 
because a task with " \
+  "task_id t1 is already in the DAG. Starting in 
Airflow 2.0, trying to " \
+  "overwrite a task will raise an exception."
+
+with pytest.warns(PendingDeprecationWarning) as record:
+with DAG("test_dag", start_date=DEFAULT_DATE) as dag:
+t1 = DummyOperator(task_id="t1")
+t2 = BashOperator(task_id="t1", bash_command="sleep 1")
+t1 >> t2
+
+warning = record[0]
+assert str(warning.message) == deprecation_msg
+assert issubclass(PendingDeprecationWarning, warning.category)
+
+self.assertEqual(dag.task_dict, {t1.task_id: t1})
+
+def test_duplicate_task_ids_raise_warning(self):
+"""Verify tasks with Duplicate task_id show warning"""
+
+deprecation_msg = "The requested task could not be added to the DAG 
because a task with " \
+  "task_id t1 is already in the DAG. Starting in 
Airflow 2.0, trying to " \
+  "overwrite a task will raise an exception."
+
+with pytest.warns(PendingDeprecationWarning) as record:
+dag = DAG("test_dag", start_date=DEFAULT_DATE)
+t1 = DummyOperator(task_id="t1", dag=dag)
+t2 = BashOperator(task_id="t1", bash_command="sleep 1", dag=dag)
+t1 >> t2
+
+warning = record[0]
+assert str(warning.message) == deprecation_msg
+assert issubclass(PendingDeprecationWarning, warning.category)
+
+self.assertEqual(dag.task_dict, {t1.task_id: t1})
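
For DAG authors, the practical effect of the commit above is that a duplicate 
task_id now raises a PendingDeprecationWarning at parse time instead of 
silently overwriting the task; a minimal sketch (hypothetical DAG, warnings 
enabled explicitly):

```python
import warnings
from datetime import datetime

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator

warnings.simplefilter("always", PendingDeprecationWarning)

with DAG("dup_task_demo", start_date=datetime(2020, 1, 1), schedule_interval=None) as dag:
    first = DummyOperator(task_id="t1")
    second = DummyOperator(task_id="t1")  # duplicate id -> warning; only `first` stays in dag.task_dict

print(list(dag.task_dict))  # ['t1']
```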



[GitHub] [airflow] mik-laj opened a new pull request #8768: [POC] Mark keywords-only arguments in method signatures

2020-05-07 Thread GitBox


mik-laj opened a new pull request #8768:
URL: https://github.com/apache/airflow/pull/8768


   Hello,
   
   I am not sure if this feature is supported by all our tools, especially 
flake8, so I am running the CI tasks to check it.
   
   Best regards,
   Kamil
   
   ---
   Make sure to mark the boxes below before creating PR: [x]
   
   - [X] Description above provides context of the change
   - [X] Unit tests coverage for changes (not needed for documentation changes)
   - [X] Target Github ISSUE in description if exists
   - [X] Commits follow "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)"
   - [X] Relevant documentation is updated including usage instructions.
   - [X] I will engage committers as explained in [Contribution Workflow 
Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal 
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals))
 is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party 
License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in 
[UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request 
Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines)
 for more information.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] ipeluffo edited a comment on issue #8311: Task Duration Missing from the Graph View Tool-tip for Running Tasks

2020-05-07 Thread GitBox


ipeluffo edited a comment on issue #8311:
URL: https://github.com/apache/airflow/issues/8311#issuecomment-625382511


   I noticed this issue also in (some) tasks on finished DAGs. Has anyone else 
noticed this?
   ![Screenshot 2020-05-07 at 18 10 
31](https://user-images.githubusercontent.com/889705/81324130-69065580-908e-11ea-8f74-d424b3709eb1.png)
   ![Screenshot 2020-05-07 at 18 09 
54](https://user-images.githubusercontent.com/889705/81324138-6b68af80-908e-11ea-88a7-ca28aa7ba643.png)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] ipeluffo commented on issue #8311: Task Duration Missing from the Graph View Tool-tip for Running Tasks

2020-05-07 Thread GitBox


ipeluffo commented on issue #8311:
URL: https://github.com/apache/airflow/issues/8311#issuecomment-625382511


   I noticed this issue also in (some) tasks on finished DAGs. Has anyone else 
noticed this?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] anitakar commented on pull request #8718: Set store_serialized_dags from config in UI

2020-05-07 Thread GitBox


anitakar commented on pull request #8718:
URL: https://github.com/apache/airflow/pull/8718#issuecomment-625364951


   > > Hi,
   > > I will collectively answer the questions here.
   > > Many of the changes that I have made were a result of setting 
store_serialized_dags from conf in the DagBag init method in one of the 
squashed-together commits.
   > > Before I decided that it is better to leave it with a False default, I 
started to change it to False in all the places where it broke things.
   > > And it broke things in CLI, dag parsing code and this migration: 
airflow/migrations/versions/cc1e65623dc7_add_max_tries_column_to_task_instance.py
   > 
   > What error exactly? And is that error reproducible in Airflow 1.10.10, or 
is it only a problem in the Cloud Composer version?
   > 
   No, it is not a problem in Cloud Composer. At some point I made the change 
in DagBag to read store_serialized_dags instead of setting it to False, but it 
was an intermediate state. It was never released like that without a fix.
   > > This change is in fact a no-op, so maybe we should drop it.
   > > It is actually about not reading core.store_serialized_dags on server 
startup. I also had problems with tests because of that. But if that operation 
is expensive and people will probably not be turning dag serialization on and 
off, then maybe it is better to drop it.
   > 
   > Airflow does not support changing config at runtime -- it is not possible. 
It is only the tests that are able to change settings.
   > 
   Then I shall drop the change of removing STORE_SERIALIZED_DAGS from settings.
   But the change is not a no-op in the end. It had different code for 
accessing DAG code in the UI that you fixed later on.
   Composer, for a moment in Airflow 1.10.6, stored, deleted and saved DAGs one 
by one, but the community version uses batch operations, which I saw you had to 
fix.
   But there is an actual fix for the non-RBAC www for setting paused on a DAG. 
I shall create a test for it.
   I can also remember that we had problems with calling run from the non-RBAC 
UI, but I am sure that if it were true for the open-source version somebody 
would have noticed it by now.
   > > Here is a similar change to Airflow 1.10.10:
   > > #8764
   > > This one actually fixes starting dag from UI.
   > > In this change I can also see that async dag bag loading was dropped 
altogether in Airflow 1.10.10. Am I correct?
   > 
   > No, async dag bag loading was _never in_ Airflow. Either dag serialization 
is enabled, in which case the webserver loads it from the serialized tables in 
the DB, or it's not and the existing mechanism is in place. Having a _third_ 
method of loading dags when dag serialization exists and fixes the performance 
overhead seems like extra code to maintain for little benefit.
   > 
   > Also: there is a reasonable chance that in Airflow 2.0 serializing dags 
may be the _only_ option.
   Oh, so it was a Composer-internal feature for clients with lots of DAGs. 
Anyway, our intention was always to contribute back to the community, so I guess 
it is not a problem. But as the serialized DAGs solution is fully working, 
this workaround can be dropped.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] kaxil commented on a change in pull request #8746: Remove old airflow logger causing side effects in tests

2020-05-07 Thread GitBox


kaxil commented on a change in pull request #8746:
URL: https://github.com/apache/airflow/pull/8746#discussion_r421625121



##
File path: tests/test_logging_config.py
##
@@ -97,6 +93,33 @@
 SETTINGS_DEFAULT_NAME = 'custom_airflow_local_settings'
 
 
+def reset_logging():
+"""Reset Logging"""
+manager = logging.root.manager
+manager.disabled = logging.NOTSET
+airflow_loggers = [
+logger for logger_name, logger in manager.loggerDict.items() if 
logger_name.startswith('airflow')
+]
+for logger in airflow_loggers:  # pylint: disable=too-many-nested-blocks
+if isinstance(logger, logging.Logger):

Review comment:
   We already do that in `settings_context`, but it doesn't seem to clear/reset 
everything:
   
   
https://github.com/apache/airflow/blob/7285e61c42c89799e967ce9f0d391707ee02664d/tests/test_logging_config.py#L186-L187





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] ashb commented on a change in pull request #8746: Remove old airflow logger causing side effects in tests

2020-05-07 Thread GitBox


ashb commented on a change in pull request #8746:
URL: https://github.com/apache/airflow/pull/8746#discussion_r421621414



##
File path: tests/test_logging_config.py
##
@@ -97,6 +93,33 @@
 SETTINGS_DEFAULT_NAME = 'custom_airflow_local_settings'
 
 
+def reset_logging():
+"""Reset Logging"""
+manager = logging.root.manager
+manager.disabled = logging.NOTSET
+airflow_loggers = [
+logger for logger_name, logger in manager.loggerDict.items() if 
logger_name.startswith('airflow')
+]
+for logger in airflow_loggers:  # pylint: disable=too-many-nested-blocks
+if isinstance(logger, logging.Logger):

Review comment:
   could we not just reload the logging config?
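
   A sketch of what "just reload the logging config" could look like (assuming 
`airflow.logging_config.configure_logging()` is the same entry point used at 
startup; not verified against every test scenario):

```python
from airflow import logging_config


def reload_airflow_logging():
    """Re-apply Airflow's logging dict config instead of resetting loggers by hand."""
    # configure_logging() re-imports the configured logging class and passes it
    # to logging.config.dictConfig, which reconfigures the airflow loggers in one step.
    logging_config.configure_logging()
```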





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] turbaszek commented on a change in pull request #8651: [AIP-31] [AIRFLOW-8058] Retrieve current execution context without altering function's signature

2020-05-07 Thread GitBox


turbaszek commented on a change in pull request #8651:
URL: https://github.com/apache/airflow/pull/8651#discussion_r421604020



##
File path: docs/howto/use-additional-execute-contextmanager.rst
##
@@ -0,0 +1,47 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+
+
+Defining Additional Execute Context Manager
+===
+
+Creating new context manager
+
+
+Users can create their own execution context manager to allow context 
management on a higher level.
+To do so, one must define a new context manager in one of their files:
+
+.. code-block:: python
+
+  import contextlib
+
+
+  @contextlib.contextmanager
+  def example_user_context_manager(task_instance, execution_context):
+  """
+  An example of a context manager that a user may provide.
+  """
+  in_context = True
+  try:
+  yield in_context
+  finally:
+  in_context = False

Review comment:
   Can we make this example more detailed? For example, with authentication 
or checking something before/after execution? 
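
   One direction such a richer example could take (purely illustrative; the 
logging and timing behaviour is an assumption, not part of the PR's docs):

```python
import contextlib
import logging
import time


@contextlib.contextmanager
def timed_audited_context(task_instance, execution_context):
    """Log start/end and wall-clock duration around the operator's execute()."""
    log = logging.getLogger(__name__)
    log.info("Starting %s (try %s)", task_instance.task_id, task_instance.try_number)
    start = time.monotonic()
    try:
        yield
    finally:
        log.info("Finished %s in %.2fs", task_instance.task_id, time.monotonic() - start)
```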





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] turbaszek commented on a change in pull request #8651: [AIP-31] [AIRFLOW-8058] Retrieve current execution context without altering function's signature

2020-05-07 Thread GitBox


turbaszek commented on a change in pull request #8651:
URL: https://github.com/apache/airflow/pull/8651#discussion_r421602145



##
File path: airflow/models/taskinstance.py
##
@@ -1110,6 +1116,27 @@ def signal_handler(signum, frame):
 session.merge(self)
 session.commit()
 
+def get_additional_execution_contextmanager(self, execution_context):
+"""
+Retrieves the user defined execution context callback from the 
configuration,
+and validates that it is indeed a context manager
+
+:param execution_context: the current execution context to be passed 
to user ctx
+"""
+additional_execution_contextmanager = conf.getimport("core", 
"additional_execute_contextmanager")
+if additional_execution_contextmanager:
+try:
+user_ctx_obj = additional_execution_contextmanager(self, 
execution_context)
+if hasattr(user_ctx_obj, "__enter__") and 
hasattr(user_ctx_obj, "__exit__"):
+return user_ctx_obj
+else:
+raise AirflowException(f"Loaded function 
{additional_execution_contextmanager} "
+   f"as additional execution 
contextmanager, but it does not have "
+   f"__enter__ or __exit__ method!")

Review comment:
   What about `@contextlib.contextmanager`? I think this check works only 
for context managers defined as a class. 
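
   For reference, a quick standalone check of what the hasattr test sees for a 
generator-based context manager (independent of the PR code, just the standard 
library behaviour):

```python
import contextlib


@contextlib.contextmanager
def generator_based(task_instance, execution_context):
    # nothing before/after; this only demonstrates the returned object's interface
    yield


ctx_obj = generator_based(None, {})  # calling the decorated function returns a _GeneratorContextManager
print(hasattr(ctx_obj, "__enter__"), hasattr(ctx_obj, "__exit__"))  # True True
```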





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] potiuk opened a new pull request #8767: Backport packages are renamed to include backport in their name

2020-05-07 Thread GitBox


potiuk opened a new pull request #8767:
URL: https://github.com/apache/airflow/pull/8767


   ---
   Make sure to mark the boxes below before creating PR: [x]
   
   - [x] Description above provides context of the change
   - [x] Unit tests coverage for changes (not needed for documentation changes)
   - [x] Target Github ISSUE in description if exists
   - [x] Commits follow "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)"
   - [x] Relevant documentation is updated including usage instructions.
   - [x] I will engage committers as explained in [Contribution Workflow 
Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal 
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals))
 is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party 
License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in 
[UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request 
Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines)
 for more information.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] jonathanshir commented on a change in pull request #8651: [AIP-31] [AIRFLOW-8058] Retrieve current execution context without altering function's signature

2020-05-07 Thread GitBox


jonathanshir commented on a change in pull request #8651:
URL: https://github.com/apache/airflow/pull/8651#discussion_r421592500



##
File path: airflow/config_templates/config.yml
##
@@ -366,6 +366,17 @@
   type: string
   example: "path.to.CustomXCom"
   default: "airflow.models.xcom.BaseXCom"
+- name: additional_execute_contextmanager
+  description: |
+Custom user function that returns a context manager. Syntax is 
"package.method".
+Context is entered when operator starts executing task. __enter__() 
will be called
+before the operator's execute method, and __exit__() shortly after.
+Function's signature should accept two positional parameters - task 
instance
+and execution context
+  version_added: 2.0.0
+  type: string
+  example: ~

Review comment:
   Sorry, I was wrong - example field instead of default field :)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (AIRFLOW-4543) Update slack operator to support slackclient v2

2020-05-07 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-4543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17101727#comment-17101727
 ] 

ASF GitHub Bot commented on AIRFLOW-4543:
-

serkef commented on a change in pull request #5519:
URL: https://github.com/apache/airflow/pull/5519#discussion_r421559066



##
File path: tests/providers/slack/hooks/test_slack.py
##
@@ -19,82 +19,154 @@
 import unittest
 
 import mock
+from slack.errors import SlackApiError
 
 from airflow.exceptions import AirflowException
 from airflow.providers.slack.hooks.slack import SlackHook
 
 
 class TestSlackHook(unittest.TestCase):
-def test_init_with_token_only(self):
+
+def test___get_token_with_token_only(self):
+""" tests `__get_token` method when only token is provided """
+# Given
 test_token = 'test_token'
-slack_hook = SlackHook(token=test_token, slack_conn_id=None)
+test_conn_id = None
+
+# Creating a dummy subclass is the easiest way to avoid running init
+#  which is actually using the method we are testing
+class DummySlackHook(SlackHook):
+def __init__(self, *args, **kwargs):  # pylint: disable=W0231
+pass
 
-self.assertEqual(slack_hook.token, test_token)
+# Run
+hook = DummySlackHook()
+
+# Assert
+output = hook._SlackHook__get_token(test_token, test_conn_id)  # 
pylint: disable=E1101
+expected = test_token
+self.assertEqual(output, expected)
 
 @mock.patch('airflow.providers.slack.hooks.slack.SlackHook.get_connection')
-def test_init_with_valid_slack_conn_id_only(self, get_connection_mock):
+def test___get_token_with_valid_slack_conn_id_only(self, 
get_connection_mock):
+""" tests `__get_token` method when only connection is provided """
+# Given
+test_token = None
+test_conn_id = 'x'
 test_password = 'test_password'
+
+# Mock
 get_connection_mock.return_value = mock.Mock(password=test_password)
 
-test_slack_conn_id = 'test_slack_conn_id'
-slack_hook = SlackHook(token=None, slack_conn_id=test_slack_conn_id)
+# Creating a dummy subclass is the easiest way to avoid running init
+#  which is actually using the method we are testing
+class DummySlackHook(SlackHook):
+def __init__(self, *args, **kwargs):  # pylint: disable=W0231
+pass

Review comment:
   I want to test `__get_token`, which is called in `__init__`; that is why 
I'm creating a dummy subclass with an empty init. Is there a better way to 
do it?
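
   One commonly used alternative, sketched here (untested against this exact 
hook), is to stub `__init__` with mock rather than defining a subclass:

```python
import mock

from airflow.providers.slack.hooks.slack import SlackHook


def make_uninitialised_hook():
    """Create a SlackHook instance without running its real __init__."""
    with mock.patch.object(SlackHook, "__init__", return_value=None):
        return SlackHook()


hook = make_uninitialised_hook()
token = hook._SlackHook__get_token("test_token", None)  # name-mangled private method
assert token == "test_token"
```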





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Update slack operator to support slackclient v2
> ---
>
> Key: AIRFLOW-4543
> URL: https://issues.apache.org/jira/browse/AIRFLOW-4543
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: hooks, operators
>Reporter: Sergio Kef
>Assignee: Sergio Kef
>Priority: Major
>
> Official [Slack API for python|https://pypi.org/project/slackclient/] has 
> recently released 
> [v.2|https://github.com/slackapi/python-slackclient/wiki/Migrating-to-2.x0]
> Among others some important points:
>  * Async IO
>  * SSL and Proxy
>  * Dropping 2.7 support
> Opening this ticket to work on the upgrade. Current functionality will be 
> migrated, and I will try to extend it if possible.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [airflow] serkef commented on a change in pull request #5519: [AIRFLOW-4543] Update slack operator to support slackclient v2

2020-05-07 Thread GitBox


serkef commented on a change in pull request #5519:
URL: https://github.com/apache/airflow/pull/5519#discussion_r421559066



##
File path: tests/providers/slack/hooks/test_slack.py
##
@@ -19,82 +19,154 @@
 import unittest
 
 import mock
+from slack.errors import SlackApiError
 
 from airflow.exceptions import AirflowException
 from airflow.providers.slack.hooks.slack import SlackHook
 
 
 class TestSlackHook(unittest.TestCase):
-def test_init_with_token_only(self):
+
+def test___get_token_with_token_only(self):
+""" tests `__get_token` method when only token is provided """
+# Given
 test_token = 'test_token'
-slack_hook = SlackHook(token=test_token, slack_conn_id=None)
+test_conn_id = None
+
+# Creating a dummy subclass is the easiest way to avoid running init
+#  which is actually using the method we are testing
+class DummySlackHook(SlackHook):
+def __init__(self, *args, **kwargs):  # pylint: disable=W0231
+pass
 
-self.assertEqual(slack_hook.token, test_token)
+# Run
+hook = DummySlackHook()
+
+# Assert
+output = hook._SlackHook__get_token(test_token, test_conn_id)  # 
pylint: disable=E1101
+expected = test_token
+self.assertEqual(output, expected)
 
 @mock.patch('airflow.providers.slack.hooks.slack.SlackHook.get_connection')
-def test_init_with_valid_slack_conn_id_only(self, get_connection_mock):
+def test___get_token_with_valid_slack_conn_id_only(self, 
get_connection_mock):
+""" tests `__get_token` method when only connection is provided """
+# Given
+test_token = None
+test_conn_id = 'x'
 test_password = 'test_password'
+
+# Mock
 get_connection_mock.return_value = mock.Mock(password=test_password)
 
-test_slack_conn_id = 'test_slack_conn_id'
-slack_hook = SlackHook(token=None, slack_conn_id=test_slack_conn_id)
+# Creating a dummy subclass is the easiest way to avoid running init
+#  which is actually using the method we are testing
+class DummySlackHook(SlackHook):
+def __init__(self, *args, **kwargs):  # pylint: disable=W0231
+pass

Review comment:
   I want to test `__get_token`, which is called in `__init__`; that is why 
I'm creating a dummy subclass with an empty init. Is there a better way to 
do it?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



