[GitHub] [airflow] mik-laj edited a comment on pull request #7197: [AIRFLOW-6586] Improvements to gcs sensor

2020-05-03 Thread GitBox


mik-laj edited a comment on pull request #7197:
URL: https://github.com/apache/airflow/pull/7197#issuecomment-623267591


   This decorator should be in the same module as BaseSensorOperator. If you 
want to write some documentation, you can also add information about it to the 
guide on writing operators.
   https://airflow.readthedocs.io/en/latest/howto/custom-operator.html



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (AIRFLOW-6586) GCSUploadSessionCompleteSensor breaks in reschedule mode.

2020-05-03 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-6586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17098692#comment-17098692
 ] 

ASF GitHub Bot commented on AIRFLOW-6586:
-

mik-laj commented on pull request #7197:
URL: https://github.com/apache/airflow/pull/7197#issuecomment-623267591


   This decorator should be in the same module as BaseSensorOperator
   
   If you want to write some documentation, you can add information about it to 
the guide on writing operators.
   https://airflow.readthedocs.io/en/latest/howto/custom-operator.html



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> GCSUploadSessionCompleteSensor breaks in reschedule mode.
> -
>
> Key: AIRFLOW-6586
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6586
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: operators
>Affects Versions: 1.10.3
>Reporter: Jacob Ferriero
>Priority: Minor
>
> This sensor is stateful and loses state between reschedules. 
> We should: 
>  # Warn about this in docstring
>  # Add a `poke_mode_only` class decorator for sensors that aren't safe in 
> reschedule mode.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [airflow] mik-laj commented on pull request #7197: [AIRFLOW-6586] Improvements to gcs sensor

2020-05-03 Thread GitBox


mik-laj commented on pull request #7197:
URL: https://github.com/apache/airflow/pull/7197#issuecomment-623267591


   This decorator should be in the same module as BaseSensorOperator
   
   If you want to write some documentation, you can add information about it to 
the guide on writing operators.
   https://airflow.readthedocs.io/en/latest/howto/custom-operator.html



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (AIRFLOW-6586) GCSUploadSessionCompleteSensor breaks in reschedule mode.

2020-05-03 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-6586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17098693#comment-17098693
 ] 

ASF GitHub Bot commented on AIRFLOW-6586:
-

mik-laj edited a comment on pull request #7197:
URL: https://github.com/apache/airflow/pull/7197#issuecomment-623267591


   This decorator should be in the same module as BaseSensorOperator. If you 
want to write some documentation, you can also add information about it to the 
guide on writing operators.
   https://airflow.readthedocs.io/en/latest/howto/custom-operator.html



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> GCSUploadSessionCompleteSensor breaks in reschedule mode.
> -
>
> Key: AIRFLOW-6586
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6586
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: operators
>Affects Versions: 1.10.3
>Reporter: Jacob Ferriero
>Priority: Minor
>
> This sensor is stateful and loses state between reschedules. 
> We should: 
>  # Warn about this in docstring
>  # Add a `poke_mode_only` class decorator for sensors that aren't safe in 
> reschedule mode.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [airflow] mik-laj commented on a change in pull request #7197: [AIRFLOW-6586] Improvements to gcs sensor

2020-05-03 Thread GitBox


mik-laj commented on a change in pull request #7197:
URL: https://github.com/apache/airflow/pull/7197#discussion_r419218559



##
File path: airflow/sensors/decorators/poke_mode_only.py
##
@@ -0,0 +1,80 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import inspect
+import os
+from functools import wraps
+
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+
+
+def poke_mode_only(cls):
+    """
+    Class Decorator for child classes of BaseSensorOperator to indicate
+    that instances of this class are only safe to use poke mode.
+
+    Will decorate all methods in the class to assert they did not change
+    the mode from 'poke'.
+
+    :param cls: BaseSensor class to enforce methods only use 'poke' mode.
+    :type cls: type
+    """
+    def decorate(cls):
+        if not issubclass(cls, BaseSensorOperator):
+            raise ValueError("poke_mode_only decorator should only be "
+                             + "applied to subclasses of BaseSensorOperator,"
+                             + f" got:{cls}.")
+        for method in inspect.getmembers(cls, inspect.isfunction):

Review comment:
Is it really necessary to decorate all methods? Is it not enough to 
create a read-only `mode` attribute?
   
   Something similar to the code below:
   ```python
   @property
   def mode(self):
       return "poke"

   @mode.setter
   def mode(self, value):
       if value != "poke":
           raise ValueError()
   ```

##
File path: airflow/sensors/decorators/poke_mode_only.py
##
@@ -0,0 +1,80 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import inspect
+import os
+from functools import wraps
+
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+
+
+def poke_mode_only(cls):
+    """
+    Class Decorator for child classes of BaseSensorOperator to indicate
+    that instances of this class are only safe to use poke mode.
+
+    Will decorate all methods in the class to assert they did not change
+    the mode from 'poke'.
+
+    :param cls: BaseSensor class to enforce methods only use 'poke' mode.
+    :type cls: type
+    """
+    def decorate(cls):
+        if not issubclass(cls, BaseSensorOperator):
+            raise ValueError("poke_mode_only decorator should only be "
+                             + "applied to subclasses of BaseSensorOperator,"
+                             + f" got:{cls}.")
+        for method in inspect.getmembers(cls, inspect.isfunction):

Review comment:
   or as a decorator
   ```python
   def poke_only(cls_type):
       def mode_getter(_):
           return "poke"

       cls_type.mode = property(mode_getter)

       def mode_setter(_, value):
           if value != "poke":
               raise ValueError()

       cls_type.mode = cls_type.mode.setter(mode_setter)
       return cls_type
   ```
   Both examples are not tested but should work.
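
   A quick standalone check of the property-based suggestion (with 
`BaseSensorOperator` swapped for a local stub so the sketch runs on its own; 
illustrative only, not Airflow code):
   ```python
   class FakeBaseSensor:
       """Stub standing in for BaseSensorOperator."""


   def poke_only(cls_type):
       def mode_getter(_):
           return "poke"

       def mode_setter(_, value):
           if value != "poke":
               raise ValueError("mode is fixed to 'poke'")

       # Attach a read-only-ish `mode` property to the decorated class.
       cls_type.mode = property(mode_getter, mode_setter)
       return cls_type


   @poke_only
   class MySensor(FakeBaseSensor):
       pass


   sensor = MySensor()
   assert sensor.mode == "poke"
   try:
       sensor.mode = "reschedule"
   except ValueError:
       print("mode change rejected, as intended")
   ```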





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (AIRFLOW-6586) GCSUploadSessionCompleteSensor breaks in reschedule mode.

2020-05-03 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-6586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17098689#comment-17098689
 ] 

ASF GitHub Bot commented on AIRFLOW-6586:
-

mik-laj commented on a change in pull request #7197:
URL: https://github.com/apache/airflow/pull/7197#discussion_r419218559



##
File path: airflow/sensors/decorators/poke_mode_only.py
##
@@ -0,0 +1,80 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import inspect
+import os
+from functools import wraps
+
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+
+
+def poke_mode_only(cls):
+    """
+    Class Decorator for child classes of BaseSensorOperator to indicate
+    that instances of this class are only safe to use poke mode.
+
+    Will decorate all methods in the class to assert they did not change
+    the mode from 'poke'.
+
+    :param cls: BaseSensor class to enforce methods only use 'poke' mode.
+    :type cls: type
+    """
+    def decorate(cls):
+        if not issubclass(cls, BaseSensorOperator):
+            raise ValueError("poke_mode_only decorator should only be "
+                             + "applied to subclasses of BaseSensorOperator,"
+                             + f" got:{cls}.")
+        for method in inspect.getmembers(cls, inspect.isfunction):

Review comment:
Is it really necessary to decorate all methods? Is it not enough to 
create a read-only `mode` attribute?
   
   Something similar to the code below:
   ```python
   @property
   def mode(self):
       return "poke"

   @mode.setter
   def mode(self, value):
       if value != "poke":
           raise ValueError()
   ```

##
File path: airflow/sensors/decorators/poke_mode_only.py
##
@@ -0,0 +1,80 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import inspect
+import os
+from functools import wraps
+
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+
+
+def poke_mode_only(cls):
+    """
+    Class Decorator for child classes of BaseSensorOperator to indicate
+    that instances of this class are only safe to use poke mode.
+
+    Will decorate all methods in the class to assert they did not change
+    the mode from 'poke'.
+
+    :param cls: BaseSensor class to enforce methods only use 'poke' mode.
+    :type cls: type
+    """
+    def decorate(cls):
+        if not issubclass(cls, BaseSensorOperator):
+            raise ValueError("poke_mode_only decorator should only be "
+                             + "applied to subclasses of BaseSensorOperator,"
+                             + f" got:{cls}.")
+        for method in inspect.getmembers(cls, inspect.isfunction):

Review comment:
   or as a decorator
   ```python
   def poke_only(cls_type):
       def mode_getter(_):
           return "poke"

       cls_type.mode = property(mode_getter)

       def mode_setter(_, value):
           if value != "poke":
               raise ValueError()

       cls_type.mode = cls_type.mode.setter(mode_setter)
       return cls_type
   ```
   Both examples are not tested but should work.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:

[GitHub] [airflow] mik-laj commented on a change in pull request #7197: [AIRFLOW-6586] Improvements to gcs sensor

2020-05-03 Thread GitBox


mik-laj commented on a change in pull request #7197:
URL: https://github.com/apache/airflow/pull/7197#discussion_r419220670



##
File path: airflow/sensors/decorators/poke_mode_only.py
##
@@ -0,0 +1,80 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import inspect
+import os
+from functools import wraps
+
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+
+
+def poke_mode_only(cls):
+    """
+    Class Decorator for child classes of BaseSensorOperator to indicate
+    that instances of this class are only safe to use poke mode.
+
+    Will decorate all methods in the class to assert they did not change
+    the mode from 'poke'.
+
+    :param cls: BaseSensor class to enforce methods only use 'poke' mode.
+    :type cls: type
+    """
+    def decorate(cls):
+        if not issubclass(cls, BaseSensorOperator):
+            raise ValueError("poke_mode_only decorator should only be "
+                             + "applied to subclasses of BaseSensorOperator,"
+                             + f" got:{cls}.")
+        for method in inspect.getmembers(cls, inspect.isfunction):

Review comment:
   or as a decorator
   ```python
   def poke_only(cls_type):
       def mode_getter(_):
           return "poke"

       cls_type.mode = property(mode_getter)

       def mode_setter(_, value):
           if value != "poke":
               raise AirflowException

       cls_type.mode = cls_type.mode.setter(mode_setter)
       return cls_type
   ```
   Both examples are not tested but should work.

##
File path: airflow/sensors/decorators/poke_mode_only.py
##
@@ -0,0 +1,80 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import inspect
+import os
+from functools import wraps
+
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+
+
+def poke_mode_only(cls):
+    """
+    Class Decorator for child classes of BaseSensorOperator to indicate
+    that instances of this class are only safe to use poke mode.
+
+    Will decorate all methods in the class to assert they did not change
+    the mode from 'poke'.
+
+    :param cls: BaseSensor class to enforce methods only use 'poke' mode.
+    :type cls: type
+    """
+    def decorate(cls):
+        if not issubclass(cls, BaseSensorOperator):
+            raise ValueError("poke_mode_only decorator should only be "
+                             + "applied to subclasses of BaseSensorOperator,"
+                             + f" got:{cls}.")
+        for method in inspect.getmembers(cls, inspect.isfunction):

Review comment:
Is it really necessary to decorate all methods? Is it not enough to 
create a read-only `mode` attribute?
   
   Something similar to the code below:
   ```python
   @property
   def mode(self):
       return "poke"

   @mode.setter
   def mode(self, value):
       if value != "poke":
           raise AirflowException()
   ```

##
File path: airflow/sensors/decorators/poke_mode_only.py
##
@@ -0,0 +1,80 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   

[jira] [Commented] (AIRFLOW-6586) GCSUploadSessionCompleteSensor breaks in reschedule mode.

2020-05-03 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-6586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17098688#comment-17098688
 ] 

ASF GitHub Bot commented on AIRFLOW-6586:
-

mik-laj commented on a change in pull request #7197:
URL: https://github.com/apache/airflow/pull/7197#discussion_r419220670



##
File path: airflow/sensors/decorators/poke_mode_only.py
##
@@ -0,0 +1,80 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import inspect
+import os
+from functools import wraps
+
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+
+
+def poke_mode_only(cls):
+    """
+    Class Decorator for child classes of BaseSensorOperator to indicate
+    that instances of this class are only safe to use poke mode.
+
+    Will decorate all methods in the class to assert they did not change
+    the mode from 'poke'.
+
+    :param cls: BaseSensor class to enforce methods only use 'poke' mode.
+    :type cls: type
+    """
+    def decorate(cls):
+        if not issubclass(cls, BaseSensorOperator):
+            raise ValueError("poke_mode_only decorator should only be "
+                             + "applied to subclasses of BaseSensorOperator,"
+                             + f" got:{cls}.")
+        for method in inspect.getmembers(cls, inspect.isfunction):

Review comment:
   or as a decorator
   ```python
   def poke_only(cls_type):
       def mode_getter(_):
           return "poke"

       cls_type.mode = property(mode_getter)

       def mode_setter(_, value):
           if value != "poke":
               raise AirflowException

       cls_type.mode = cls_type.mode.setter(mode_setter)
       return cls_type
   ```
   Both examples are not tested but should work.

##
File path: airflow/sensors/decorators/poke_mode_only.py
##
@@ -0,0 +1,80 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import inspect
+import os
+from functools import wraps
+
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+
+
+def poke_mode_only(cls):
+    """
+    Class Decorator for child classes of BaseSensorOperator to indicate
+    that instances of this class are only safe to use poke mode.
+
+    Will decorate all methods in the class to assert they did not change
+    the mode from 'poke'.
+
+    :param cls: BaseSensor class to enforce methods only use 'poke' mode.
+    :type cls: type
+    """
+    def decorate(cls):
+        if not issubclass(cls, BaseSensorOperator):
+            raise ValueError("poke_mode_only decorator should only be "
+                             + "applied to subclasses of BaseSensorOperator,"
+                             + f" got:{cls}.")
+        for method in inspect.getmembers(cls, inspect.isfunction):

Review comment:
Is it really necessary to decorate all methods? Is it not enough to 
create a read-only `mode` attribute?
   
   Something similar to the code below:
   ```python
   @property
   def mode(self):
       return "poke"

   @mode.setter
   def mode(self, value):
       if value != "poke":
           raise AirflowException()
   ```

##
File path: airflow/sensors/decorators/poke_mode_only.py
##
@@ -0,0 +1,80 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright 

[jira] [Commented] (AIRFLOW-6586) GCSUploadSessionCompleteSensor breaks in reschedule mode.

2020-05-03 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-6586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17098684#comment-17098684
 ] 

ASF GitHub Bot commented on AIRFLOW-6586:
-

mik-laj commented on a change in pull request #7197:
URL: https://github.com/apache/airflow/pull/7197#discussion_r419218559



##
File path: airflow/sensors/decorators/poke_mode_only.py
##
@@ -0,0 +1,80 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import inspect
+import os
+from functools import wraps
+
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+
+
+def poke_mode_only(cls):
+    """
+    Class Decorator for child classes of BaseSensorOperator to indicate
+    that instances of this class are only safe to use poke mode.
+
+    Will decorate all methods in the class to assert they did not change
+    the mode from 'poke'.
+
+    :param cls: BaseSensor class to enforce methods only use 'poke' mode.
+    :type cls: type
+    """
+    def decorate(cls):
+        if not issubclass(cls, BaseSensorOperator):
+            raise ValueError("poke_mode_only decorator should only be "
+                             + "applied to subclasses of BaseSensorOperator,"
+                             + f" got:{cls}.")
+        for method in inspect.getmembers(cls, inspect.isfunction):

Review comment:
Is it really necessary to decorate all methods? Is it not enough to 
create a read-only `mode` attribute?
   
   Something similar to the code below:
   ```python
   @property
   def mode(self):
       return "poke"

   @mode.setter
   def mode(self, value):
       if value != "poke":
           raise AirflowException()
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> GCSUploadSessionCompleteSensor breaks in reschedule mode.
> -
>
> Key: AIRFLOW-6586
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6586
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: operators
>Affects Versions: 1.10.3
>Reporter: Jacob Ferriero
>Priority: Minor
>
> This sensor is stateful and loses state between reschedules. 
> We should: 
>  # Warn about this in docstring
>  # Add a `poke_mode_only` class decorator for sensors that aren't safe in 
> reschedule mode.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [airflow] mik-laj commented on a change in pull request #7197: [AIRFLOW-6586] Improvements to gcs sensor

2020-05-03 Thread GitBox


mik-laj commented on a change in pull request #7197:
URL: https://github.com/apache/airflow/pull/7197#discussion_r419218559



##
File path: airflow/sensors/decorators/poke_mode_only.py
##
@@ -0,0 +1,80 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import inspect
+import os
+from functools import wraps
+
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+
+
+def poke_mode_only(cls):
+    """
+    Class Decorator for child classes of BaseSensorOperator to indicate
+    that instances of this class are only safe to use poke mode.
+
+    Will decorate all methods in the class to assert they did not change
+    the mode from 'poke'.
+
+    :param cls: BaseSensor class to enforce methods only use 'poke' mode.
+    :type cls: type
+    """
+    def decorate(cls):
+        if not issubclass(cls, BaseSensorOperator):
+            raise ValueError("poke_mode_only decorator should only be "
+                             + "applied to subclasses of BaseSensorOperator,"
+                             + f" got:{cls}.")
+        for method in inspect.getmembers(cls, inspect.isfunction):

Review comment:
Is it really necessary to decorate all methods? Is it not enough to 
create a read-only `mode` attribute?
   
   Something similar to the code below:
   ```python
   @property
   def mode(self):
       return "poke"

   @mode.setter
   def mode(self, value):
       if value != "poke":
           raise AirflowException()
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on a change in pull request #8535: [Airflow-8439] Refactor test_variable_command.py

2020-05-03 Thread GitBox


mik-laj commented on a change in pull request #8535:
URL: https://github.com/apache/airflow/pull/8535#discussion_r419217277



##
File path: tests/cli/commands/test_variable_command.py
##
@@ -31,58 +34,38 @@ def setUpClass(cls):
         cls.dagbag = models.DagBag(include_examples=True)
         cls.parser = cli_parser.get_parser()
 
-    def test_variables(self):
-        # Checks if all subcommands are properly received
+    def setUp(self):
+        clear_db_variables()
+
+    def tearDown(self):
+        clear_db_variables()
+
+    def test_variables_set(self):
+        """Test variable_set command"""
+        variable_command.variables_set(self.parser.parse_args([
+            'variables', 'set', 'foo', 'bar']))
+        self.assertIsNotNone(Variable.get("foo"))
+        self.assertRaises(KeyError, Variable.get, "foo1")
+
+    @unittest.mock.patch('sys.stdout', new_callable=io.StringIO)
+    def test_variables_get(self, mock_stdout):
+        """Test variable_get command"""
+        # Test conventional get call
         variable_command.variables_set(self.parser.parse_args([
             'variables', 'set', 'foo', '{"foo":"bar"}']))
         variable_command.variables_get(self.parser.parse_args([
             'variables', 'get', 'foo']))
         variable_command.variables_get(self.parser.parse_args([
             'variables', 'get', 'baz', '--default', 'bar']))
-        variable_command.variables_list(self.parser.parse_args([
-            'variables', 'list']))
-        variable_command.variables_delete(self.parser.parse_args([
-            'variables', 'delete', 'bar']))
-        variable_command.variables_import(self.parser.parse_args([
-            'variables', 'import', os.devnull]))
-        variable_command.variables_export(self.parser.parse_args([
-            'variables', 'export', os.devnull]))
-
-        variable_command.variables_set(self.parser.parse_args([
-            'variables', 'set', 'bar', 'original']))
-        # First export
-        variable_command.variables_export(self.parser.parse_args([
-            'variables', 'export', 'variables1.json']))
-
-        first_exp = open('variables1.json', 'r')
-
-        variable_command.variables_set(self.parser.parse_args([
-            'variables', 'set', 'bar', 'updated']))
-        variable_command.variables_set(self.parser.parse_args([
-            'variables', 'set', 'foo', '{"foo":"oops"}']))
-        variable_command.variables_delete(self.parser.parse_args([
-            'variables', 'delete', 'foo']))
-        # First import
-        variable_command.variables_import(self.parser.parse_args([
-            'variables', 'import', 'variables1.json']))
-
-        self.assertEqual('original', Variable.get('bar'))
-        self.assertEqual('{\n  "foo": "bar"\n}', Variable.get('foo'))
-        # Second export
-        variable_command.variables_export(self.parser.parse_args([
-            'variables', 'export', 'variables2.json']))
+        self.assertEqual(mock_stdout.getvalue(), 'bar\n')

Review comment:
   This assertion is failing on master because another PR introduced an unrelated 
change: https://github.com/apache/airflow/pull/8640/files
   
   Here is my fix: https://github.com/apache/airflow/pull/8698
   Could you do the review?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] seanxwzhang commented on a change in pull request #8535: [Airflow-8439] Refactor test_variable_command.py

2020-05-03 Thread GitBox


seanxwzhang commented on a change in pull request #8535:
URL: https://github.com/apache/airflow/pull/8535#discussion_r419210541



##
File path: tests/cli/commands/test_variable_command.py
##
@@ -31,58 +34,38 @@ def setUpClass(cls):
         cls.dagbag = models.DagBag(include_examples=True)
         cls.parser = cli_parser.get_parser()
 
-    def test_variables(self):
-        # Checks if all subcommands are properly received
+    def setUp(self):
+        clear_db_variables()
+
+    def tearDown(self):
+        clear_db_variables()
+
+    def test_variables_set(self):
+        """Test variable_set command"""
+        variable_command.variables_set(self.parser.parse_args([
+            'variables', 'set', 'foo', 'bar']))
+        self.assertIsNotNone(Variable.get("foo"))
+        self.assertRaises(KeyError, Variable.get, "foo1")
+
+    @unittest.mock.patch('sys.stdout', new_callable=io.StringIO)
+    def test_variables_get(self, mock_stdout):
+        """Test variable_get command"""
+        # Test conventional get call
         variable_command.variables_set(self.parser.parse_args([
             'variables', 'set', 'foo', '{"foo":"bar"}']))
         variable_command.variables_get(self.parser.parse_args([
             'variables', 'get', 'foo']))
         variable_command.variables_get(self.parser.parse_args([
             'variables', 'get', 'baz', '--default', 'bar']))
-        variable_command.variables_list(self.parser.parse_args([
-            'variables', 'list']))
-        variable_command.variables_delete(self.parser.parse_args([
-            'variables', 'delete', 'bar']))
-        variable_command.variables_import(self.parser.parse_args([
-            'variables', 'import', os.devnull]))
-        variable_command.variables_export(self.parser.parse_args([
-            'variables', 'export', os.devnull]))
-
-        variable_command.variables_set(self.parser.parse_args([
-            'variables', 'set', 'bar', 'original']))
-        # First export
-        variable_command.variables_export(self.parser.parse_args([
-            'variables', 'export', 'variables1.json']))
-
-        first_exp = open('variables1.json', 'r')
-
-        variable_command.variables_set(self.parser.parse_args([
-            'variables', 'set', 'bar', 'updated']))
-        variable_command.variables_set(self.parser.parse_args([
-            'variables', 'set', 'foo', '{"foo":"oops"}']))
-        variable_command.variables_delete(self.parser.parse_args([
-            'variables', 'delete', 'foo']))
-        # First import
-        variable_command.variables_import(self.parser.parse_args([
-            'variables', 'import', 'variables1.json']))
-
-        self.assertEqual('original', Variable.get('bar'))
-        self.assertEqual('{\n  "foo": "bar"\n}', Variable.get('foo'))
-        # Second export
-        variable_command.variables_export(self.parser.parse_args([
-            'variables', 'export', 'variables2.json']))
+        self.assertEqual(mock_stdout.getvalue(), 'bar\n')

Review comment:
   It seems that this assertion is failing on master. Perhaps you meant
   ```
   self.assertEqual(mock_stdout.getvalue(), '{"foo":"bar"}\nbar\n')
   ```
   ?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (AIRFLOW-4761) Airflow Task Clear function throws error

2020-05-03 Thread Kamil Choudhury (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-4761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17098649#comment-17098649
 ] 

Kamil Choudhury commented on AIRFLOW-4761:
--

I'm still running into this on 1.10.10/python 2.7.10, and can reproduce it 
programmatically.

 

I'm not too familiar with airflow internals, but am happy to walk through it 
with people who are more experienced. Thanks!

> Airflow Task Clear function throws error
> 
>
> Key: AIRFLOW-4761
> URL: https://issues.apache.org/jira/browse/AIRFLOW-4761
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: DAG, DagRun
>Affects Versions: 1.10.3
> Environment: CentOS 7, Python 2.7.10
>Reporter: Ben Storrie
>Priority: Major
>
> When using the airflow webserver to clear a task inside a dagrun, an error is 
> thrown on certain types of tasks:
>  
> {code:java}
> Traceback (most recent call last):
> File 
> "/opt/my-miniconda/miniconda/envs/my-hadoop-airflow/lib/python2.7/site-packages/flask/app.py",
>  line 2311, in wsgi_app
> response = self.full_dispatch_request()
> File 
> "/opt/my-miniconda/miniconda/envs/my-hadoop-airflow/lib/python2.7/site-packages/flask/app.py",
>  line 1834, in full_dispatch_request
> rv = self.handle_user_exception(e)
> File 
> "/opt/my-miniconda/miniconda/envs/my-hadoop-airflow/lib/python2.7/site-packages/flask/app.py",
>  line 1737, in handle_user_exception
> reraise(exc_type, exc_value, tb)
> File 
> "/opt/my-miniconda/miniconda/envs/my-hadoop-airflow/lib/python2.7/site-packages/flask/app.py",
>  line 1832, in full_dispatch_request
> rv = self.dispatch_request()
> File 
> "/opt/my-miniconda/miniconda/envs/my-hadoop-airflow/lib/python2.7/site-packages/flask/app.py",
>  line 1818, in dispatch_request
> return self.view_functions[rule.endpoint](**req.view_args)
> File 
> "/opt/my-miniconda/miniconda/envs/my-hadoop-airflow/lib/python2.7/site-packages/flask_admin/base.py",
>  line 69, in inner
> return self._run_view(f, *args, **kwargs)
> File 
> "/opt/my-miniconda/miniconda/envs/my-hadoop-airflow/lib/python2.7/site-packages/flask_admin/base.py",
>  line 368, in _run_view
> return fn(self, *args, **kwargs)
> File 
> "/opt/my-miniconda/miniconda/envs/my-hadoop-airflow/lib/python2.7/site-packages/flask_login/utils.py",
>  line 261, in decorated_view
> return func(*args, **kwargs)
> File 
> "/opt/my-miniconda/miniconda/envs/my-hadoop-airflow/lib/python2.7/site-packages/airflow/www/utils.py",
>  line 275, in wrapper
> return f(*args, **kwargs)
> File 
> "/opt/my-miniconda/miniconda/envs/my-hadoop-airflow/lib/python2.7/site-packages/airflow/www/utils.py",
>  line 322, in wrapper
> return f(*args, **kwargs)
> File 
> "/opt/my-miniconda/miniconda/envs/my-hadoop-airflow/lib/python2.7/site-packages/airflow/www/views.py",
>  line 1202, in clear
> include_upstream=upstream)
> File 
> "/opt/my-miniconda/miniconda/envs/my-hadoop-airflow/lib/python2.7/site-packages/airflow/models/__init__.py",
>  line 3830, in sub_dag
> dag = copy.deepcopy(self)
> File 
> "/opt/my-miniconda/miniconda/envs/my-hadoop-airflow/lib/python2.7/copy.py", 
> line 174, in deepcopy
> y = copier(memo)
> File 
> "/opt/my-miniconda/miniconda/envs/my-hadoop-airflow/lib/python2.7/site-packages/airflow/models/__init__.py",
>  line 3815, in __deepcopy__
> setattr(result, k, copy.deepcopy(v, memo))
> File 
> "/opt/my-miniconda/miniconda/envs/my-hadoop-airflow/lib/python2.7/copy.py", 
> line 163, in deepcopy
> y = copier(x, memo)
> File 
> "/opt/my-miniconda/miniconda/envs/my-hadoop-airflow/lib/python2.7/copy.py", 
> line 257, in _deepcopy_dict
> y[deepcopy(key, memo)] = deepcopy(value, memo)
> File 
> "/opt/my-miniconda/miniconda/envs/my-hadoop-airflow/lib/python2.7/copy.py", 
> line 174, in deepcopy
> y = copier(memo)
> File 
> "/opt/my-miniconda/miniconda/envs/my-hadoop-airflow/lib/python2.7/site-packages/airflow/models/__init__.py",
>  line 2492, in __deepcopy__
> setattr(result, k, copy.copy(v))
> File 
> "/opt/my-miniconda/miniconda/envs/my-hadoop-airflow/lib/python2.7/copy.py", 
> line 96, in copy
> return _reconstruct(x, rv, 0)
> File 
> "/opt/my-miniconda/miniconda/envs/my-hadoop-airflow/lib/python2.7/copy.py", 
> line 329, in _reconstruct
> y = callable(*args)
> File 
> "/opt/my-miniconda/miniconda/envs/my-hadoop-airflow/lib/python2.7/copy_reg.py",
>  line 93, in __newobj__
> return cls.__new__(cls, *args)
> TypeError: instancemethod expected at least 2 arguments, got 0{code}
>  
> I had expected AIRFLOW-2060 being resolved to resolve this on upgrade to 
> 1.10.3:
> {code:java}
> (my-hadoop-airflow) [user@hostname ~]$ pip freeze | grep pendulum
> pendulum==1.4.4{code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [airflow] RosterIn commented on issue #8696: Skip task itself instead of all downstream tasks

2020-05-03 Thread GitBox


RosterIn commented on issue #8696:
URL: https://github.com/apache/airflow/issues/8696#issuecomment-623248308


   seems related to https://github.com/apache/airflow/issues/7858



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on a change in pull request #8651: [AIP-31] [AIRFLOW-8058] Retrieve current execution context without altering function's signature

2020-05-03 Thread GitBox


mik-laj commented on a change in pull request #8651:
URL: https://github.com/apache/airflow/pull/8651#discussion_r419128640



##
File path: tests/task/context/test_current_context.py
##
@@ -0,0 +1,100 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datetime import datetime, timedelta
+
+import pytest
+
+from airflow import DAG
+from airflow.exceptions import AirflowException
+from airflow.models.baseoperator import BaseOperator
+from airflow.operators.python import PythonOperator
+from airflow.task.context.current import get_current_context, set_current_context
+
+DEFAULT_ARGS = {
+    "owner": "test",
+    "depends_on_past": True,
+    "start_date": datetime(2020, 4, 22),
+    "retries": 1,
+    "retry_delay": timedelta(minutes=1),
+}
+
+
+class TestCurrentContext:
+    def test_current_context_no_context_raise(self):
+        with pytest.raises(AirflowException):
+            get_current_context()
+
+    def test_current_context_roundtrip(self):
+        example_context = {"Hello": "World"}
+
+        with set_current_context(example_context):
+            assert get_current_context() == example_context
+
+    def test_context_removed_after_exit(self):
+        example_context = {"Hello": "World"}
+
+        with set_current_context(example_context):
+            pass
+        with pytest.raises(AirflowException):
+            get_current_context()
+
+    def test_nested_context(self):
+        """
+        Nested execution context can occur in a few cases:
+        1. User uses nested ctx managers.
+        2. User is executing subdags and subruns

Review comment:
   This is not true for Airflow 2.0. In Airflow 2.0, each task is executed 
by a separate local task job.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on pull request #8694: Add airflow config command (v1-10-test)

2020-05-03 Thread GitBox


mik-laj commented on pull request #8694:
URL: https://github.com/apache/airflow/pull/8694#issuecomment-623248081


   Checks are green :-) 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] RosterIn commented on pull request #8668: Bump supported mysqlclient to <1.5

2020-05-03 Thread GitBox


RosterIn commented on pull request #8668:
URL: https://github.com/apache/airflow/pull/8668#issuecomment-623247580


   You have errors:
   ```
   ImportError while importing test module 
'/opt/airflow/tests/providers/google/cloud/operators/test_mysql_to_gcs.py'.
   Hint: make sure your test modules/packages have valid Python names.
   Traceback:
   tests/providers/google/cloud/operators/test_mysql_to_gcs.py:24: in 
   from _mysql_exceptions import ProgrammingError
   E   ModuleNotFoundError: No module named '_mysql_exceptions'
   ```
   
   You need to make modifications to the code to get it working, as there were 
breaking changes in `mysqlclient`:
   https://github.com/PyMySQL/mysqlclient-python/blob/master/HISTORY.rst
   
   The change related to the error that you get is: the `_mysql` and 
`_mysql_exceptions` modules were moved under the `MySQLdb` package 
(https://github.com/PyMySQL/mysqlclient-python/pull/293).
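
   A hedged sketch of the kind of import fix this implies (module paths taken 
from the linked release notes; verify against the mysqlclient version you pin):
   ```python
   # mysqlclient >= 1.4 moved _mysql_exceptions under the MySQLdb package;
   # fall back to the old top-level module for older versions.
   try:
       from MySQLdb._exceptions import ProgrammingError
   except ImportError:
       from _mysql_exceptions import ProgrammingError
   ```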
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] RosterIn commented on pull request #8154: Delete cast(DR.execution_date, DateTime)

2020-05-03 Thread GitBox


RosterIn commented on pull request #8154:
URL: https://github.com/apache/airflow/pull/8154#issuecomment-623245883


   > The cast was added by #1871, but it prevents sqlalchemy from automatically 
converting execution_date to UTC and fails 
test_core.CoreTest.test_schedule_dag_fake_scheduled_previous when your 
default_timezone is not set to UTC.
   
   shouldn't there be a unit test to catch that?
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] RosterIn commented on pull request #8226: Update ldap_auth.py

2020-05-03 Thread GitBox


RosterIn commented on pull request #8226:
URL: https://github.com/apache/airflow/pull/8226#issuecomment-623243647


   @ShanGor why is this PR against the v1.10 branch?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] gdevanla commented on pull request #8699: Scheduler just checks for task instances in 'running' state in execution. #8691

2020-05-03 Thread GitBox


gdevanla commented on pull request #8699:
URL: https://github.com/apache/airflow/pull/8699#issuecomment-623242313


   I have added this as a proof-of-concept change. Any review of this approach 
is appreciated. I will add tests around this scenario if this is the change we 
want to make.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] gdevanla opened a new pull request #8699: Scheduler just checks for task instances in 'running' state in execution. #8691

2020-05-03 Thread GitBox


gdevanla opened a new pull request #8699:
URL: https://github.com/apache/airflow/pull/8699


   The scheduler needs to only check whether the executor has started the task, 
by checking the `running` dictionary of the executor instance. Otherwise the 
entry in `Executor.queued_tasks` will be overwritten with a new task instance. 
   
   This change assumes the `running` instance attribute of `Executor` signals 
the scheduler not to requeue the task. 
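
   A toy sketch of that guard (a minimal illustration using the `queued_tasks` 
and `running` attributes named above, not the actual scheduler code):
   ```python
   class ToyExecutor:
       """Minimal stand-in for an executor with queued and running task sets."""

       def __init__(self):
           self.queued_tasks = {}  # task key -> payload waiting to be started
           self.running = set()    # task keys the executor has already started

       def queue_task(self, key, payload):
           if key in self.running:
               # Already started: skip, so we don't overwrite queued_tasks.
               return False
           self.queued_tasks[key] = payload
           return True


   executor = ToyExecutor()
   executor.queue_task(("dag", "task", "2020-05-03"), "try-1")
   executor.running.add(("dag", "task", "2020-05-03"))
   # A second attempt to queue the same running task is now rejected.
   assert executor.queue_task(("dag", "task", "2020-05-03"), "try-2") is False
   ```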
   
   ---
   Make sure to mark the boxes below before creating PR: [x]
   
   - [x] Description above provides context of the change
   - [x] Unit tests coverage for changes (not needed for documentation changes)
   - [x] Target Github ISSUE in description if exists
   - [x] Commits follow "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)"
   - [x] Relevant documentation is updated including usage instructions.
   - [x] I will engage committers as explained in [Contribution Workflow 
Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal 
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals))
 is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party 
License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in 
[UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request 
Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines)
 for more information.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] lokeshlal commented on pull request #8509: [AIRFLOW-4568]The ExternalTaskSensor should be configurable to raise an Airflow Exception in case the poked external task reaches a disall

2020-05-03 Thread GitBox


lokeshlal commented on pull request #8509:
URL: https://github.com/apache/airflow/pull/8509#issuecomment-623240562


   Hello @potiuk, could you please help review and merge this PR? Thanks.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (AIRFLOW-4568) The ExternalTaskSensor should be configurable to raise an Airflow Exception in case the poked external task reaches a disallowed state, such as f.i. failed

2020-05-03 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-4568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17098640#comment-17098640
 ] 

ASF GitHub Bot commented on AIRFLOW-4568:
-

lokeshlal commented on pull request #8509:
URL: https://github.com/apache/airflow/pull/8509#issuecomment-623240562


   Hello @potiuk, could you please help review and merge this PR? Thanks.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> The ExternalTaskSensor should be configurable to raise an Airflow Exception 
> in case the poked external task reaches a disallowed state, such as f.i. 
> failed
> ---
>
> Key: AIRFLOW-4568
> URL: https://issues.apache.org/jira/browse/AIRFLOW-4568
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: operators
>Affects Versions: 1.10.3
>Reporter: ddluke
>Priority: Minor
>
> _As an engineer, I would like to have the behavior of the ExternalTaskSensor 
> changed_
> _So that it fails in case the poked external_task_id fails_
> *Therefore*
>  * I suggest extending the behavior of the sensor to optionally also query 
> the TaskInstance for disallowed states and raise an AirflowException if 
> found. Currently, if the poked external task reaches a failed state, the 
> sensor continues to poke and does not terminate
> *Acceptance Criteria (from my pov)*
>  * The class interface for ExternalTaskSensor is extended with an additional 
> parameter, disallowed_states, which is an Optional List of 
> airflow.utils.state.State
>  * The poke method is expanded to count the number of rows from TaskInstance 
> which met the filter criteria dag_id, task_id, disallowed_states and 
> dttm_filter if disallowed_states is not None
>  * If disallowed_states is not None and the above query returns a counter > 
> 0, an Airflow Exception is thrown



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [airflow] mik-laj opened a new pull request #8698: Fix failed master - Invalid output in test_variable assertion

2020-05-03 Thread GitBox


mik-laj opened a new pull request #8698:
URL: https://github.com/apache/airflow/pull/8698


   ---
   Make sure to mark the boxes below before creating PR: [x]
   
   - [X] Description above provides context of the change
   - [X] Unit tests coverage for changes (not needed for documentation changes)
   - [X] Target Github ISSUE in description if exists
   - [X] Commits follow "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)"
   - [X] Relevant documentation is updated including usage instructions.
   - [X] I will engage committers as explained in [Contribution Workflow 
Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal 
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals))
 is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party 
License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in 
[UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request 
Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines)
 for more information.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] jaketf commented on pull request #7197: [AIRFLOW-6586] Improvements to gcs sensor

2020-05-03 Thread GitBox


jaketf commented on pull request #7197:
URL: https://github.com/apache/airflow/pull/7197#issuecomment-623235491


   @turbaszek this is the first decorator in this dir. How do you think we 
should document this? 
   In the same place the docs CI check is complaining about, 
operators-and-hooks-ref.rst? 
   Should I create a new docs file for sensor decorators?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (AIRFLOW-6586) GCSUploadSessionCompleteSensor breaks in reschedule mode.

2020-05-03 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-6586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17098635#comment-17098635
 ] 

ASF GitHub Bot commented on AIRFLOW-6586:
-

jaketf commented on pull request #7197:
URL: https://github.com/apache/airflow/pull/7197#issuecomment-623235491


   @turbaszek this is the first decorator in this dir. How do you think we 
should document this? 
   In the same place the docs CI check is complaining about, 
operators-and-hooks-ref.rst? 
   Should I create a new docs file for sensor decorators?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> GCSUploadSessionCompleteSensor breaks in reschedule mode.
> -
>
> Key: AIRFLOW-6586
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6586
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: operators
>Affects Versions: 1.10.3
>Reporter: Jacob Ferriero
>Priority: Minor
>
> This sensor is stateful and loses state between reschedules. 
> We should: 
>  # Warn about this in docstring
>  # Add a `poke_mode_only` class decorator for sensors that aren't safe in 
> reschedule mode.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [airflow] maganaluis commented on a change in pull request #8256: updated _write_args on PythonVirtualenvOperator

2020-05-03 Thread GitBox


maganaluis commented on a change in pull request #8256:
URL: https://github.com/apache/airflow/pull/8256#discussion_r419189015



##
File path: airflow/operators/python_operator.py
##
@@ -330,13 +330,28 @@ def _write_string_args(self, filename):
 
     def _write_args(self, input_filename):
         # serialize args to file
+        if self.use_dill:
+            serializer = dill
+        else:
+            serializer = pickle
+        # some args from context can't be loaded in virtual env
+        invalid_args = set(['dag', 'task', 'ti'])
         if self._pass_op_args():
+            kwargs = {}
+            for key, value in self.op_kwargs.items():

Review comment:
   @Fokko Thank you for taking the time to review this PR; I've updated it 
based on your suggestions. 
   
   ```python
   def _write_args(self, input_filename):
   # serialize args to file
   if self.use_dill:
   serializer = dill
   else:
   serializer = pickle
   # some items from context can't be loaded in virtual env
   # see pr https://github.com/apache/airflow/pull/8256
   not_serializable = {'dag', 'task', 'ti', 'macros', 'task_instance', 
'var'}
   if self._pass_op_args():
   kwargs = {key: value for key, value in self.op_kwargs.items()
 if key not in not_serializable}
   with open(input_filename, 'wb') as f:
   arg_dict = ({'args': self.op_args, 'kwargs': kwargs})
   serializer.dump(arg_dict, f)
   
   
   The invalid arguments here are SQLAlchemy DB models; some fail to serialize, 
and others (dag, task, and ti) fail to deserialize at run time due to 
references to objects which are no longer present. See the log.
   
   [log_1.txt](https://github.com/apache/airflow/files/4572523/log_1.txt)
   
   To solve these issues we would need to implement `__setstate__` and 
`__getstate__` methods in those objects so they are properly 
serialized. That is outside the scope of this PR and represents considerable 
work and testing. 
   
   I've also added a pytest to ensure this gets tested in the future. 
   
   ```python
   def test_config_context(self):
   """
   This test ensures we can use dag_run from the context
   to access the configuration at run time that's being
   passed from the UI, CLI, and REST API.
   """
   self.dag.create_dagrun(
   run_id='manual__' + DEFAULT_DATE.isoformat(),
   execution_date=DEFAULT_DATE,
   start_date=DEFAULT_DATE,
   state=State.RUNNING,
   external_trigger=False,
   )
   
   def pass_function(**kwargs):
   kwargs['dag_run'].conf
   
   t = PythonVirtualenvOperator(task_id='config_dag_run', dag=self.dag,
provide_context=True,
python_callable=pass_function)
   t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
   ```
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] resdevd edited a comment on pull request #8621: Docker Compose CeleryExecutor example

2020-05-03 Thread GitBox


resdevd edited a comment on pull request #8621:
URL: https://github.com/apache/airflow/pull/8621#issuecomment-623221335


   entrypoint.sh is maintained in the Dockerfile itself; this PR is a 
docker-compose setup using the official Airflow Docker image.
   If you explore the code, you can see the one-time setup of initdb and inituser 
is done by docker-exec into the main airflow container via a shell script.
   The idea is that users who need the initdb and inituser setup can run that 
script once after deploying docker-compose, or they can just use docker-compose 
by replacing the env variables with an existing db connection string.
   In this PR the containers are in restart mode, so users can run that script 
while docker-compose is active. IMO we shouldn't waste resources by having a 
container run just for a one-time action. In this PR, users won't have to 
rebuild images by changing the entrypoint.
   
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] maganaluis commented on a change in pull request #8256: updated _write_args on PythonVirtualenvOperator

2020-05-03 Thread GitBox


maganaluis commented on a change in pull request #8256:
URL: https://github.com/apache/airflow/pull/8256#discussion_r419189015



##
File path: airflow/operators/python_operator.py
##
@@ -330,13 +330,28 @@ def _write_string_args(self, filename):
 
 def _write_args(self, input_filename):
 # serialize args to file
+if self.use_dill:
+serializer = dill
+else:
+serializer = pickle
+# some args from context can't be loaded in virtual env
+invalid_args = set(['dag', 'task', 'ti'])
 if self._pass_op_args():
+kwargs = {}
+for key, value in self.op_kwargs.items():

Review comment:
   @Fokko Thank you for taking time to review this PR, I've updated it 
based on your suggestions. 
   
   ```python
   def _write_args(self, input_filename):
   # serialize args to file
   if self.use_dill:
   serializer = dill
   else:
   serializer = pickle
   # some items from context can't be loaded in virtual env
   # see pr https://github.com/apache/airflow/pull/8256
   not_serializable = {'dag', 'task', 'ti', 'macros', 'task_instance', 
'var'}
   if self._pass_op_args():
   kwargs = {key: value for key, value in self.op_kwargs.items()
 if key not in not_serializable}
   with open(input_filename, 'wb') as f:
   arg_dict = ({'args': self.op_args, 'kwargs': kwargs})
   serializer.dump(arg_dict, f)
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] resdevd edited a comment on pull request #8621: Docker Compose CeleryExecutor example

2020-05-03 Thread GitBox


resdevd edited a comment on pull request #8621:
URL: https://github.com/apache/airflow/pull/8621#issuecomment-623221335


   entrypoint.sh is maintained in the Dockerfile itself; this PR is a 
docker-compose setup using the official Airflow Docker image.
   If you explore the code, you can see the one-time setup of initdb and inituser 
is done by docker-exec into the main airflow container via a shell script.
   The idea is that users who need the initdb and inituser setup can run that 
script once after deploying docker-compose, or they can just use docker-compose 
by replacing the env variables for the db connection string.
   In this PR the containers are in restart mode, so users can run that script 
while docker-compose is active. IMO we shouldn't waste resources by having a 
container run just for a one-time action. In this PR, users won't have to 
rebuild images by changing the entrypoint.
   
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] resdevd commented on pull request #8621: Docker Compose CeleryExecutor example

2020-05-03 Thread GitBox


resdevd commented on pull request #8621:
URL: https://github.com/apache/airflow/pull/8621#issuecomment-623221335


   entrypoint.sh is maintained in the Dockerfile itself; this PR is a 
docker-compose setup using the official Airflow Docker image.
   If you explore the code, you can see the one-time setup of initdb and inituser 
is done by docker-exec into the main airflow container via a shell script.
   The idea is that users who need the initdb and inituser setup can run that 
script once after deploying docker-compose, or they can just use docker-compose 
by replacing the env variables for the db connection string.
   In this PR the containers are in restart mode, so users can run that script 
while docker-compose is active. IMO we shouldn't waste resources by having a 
container run just for a one-time action.
   
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj opened a new pull request #8697: Change provider:GCP to provider:Google

2020-05-03 Thread GitBox


mik-laj opened a new pull request #8697:
URL: https://github.com/apache/airflow/pull/8697


   It seems to me that these have a lot in common. I am also sad that the 
changes in the base hook have no label.
   
   We need to change the name of the label on GitHub before merging this change.
   
   ---
   Make sure to mark the boxes below before creating PR: [x]
   
   - [X] Description above provides context of the change
   - [X] Unit tests coverage for changes (not needed for documentation changes)
   - [X] Target Github ISSUE in description if exists
   - [X] Commits follow "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)"
   - [X] Relevant documentation is updated including usage instructions.
   - [X] I will engage committers as explained in [Contribution Workflow 
Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal 
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals))
 is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party 
License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in 
[UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request 
Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines)
 for more information.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (AIRFLOW-7008) Add perf kit with common used decorators/contexts

2020-05-03 Thread ASF subversion and git services (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-7008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17098617#comment-17098617
 ] 

ASF subversion and git services commented on AIRFLOW-7008:
--

Commit aec768b5d258e3cfd535cb501468514938152263 in airflow's branch 
refs/heads/master from Kamil Breguła
[ https://gitbox.apache.org/repos/asf?p=airflow.git;h=aec768b ]

[AIRFLOW-7008] Add perf kit with common used decorators/contexts (#7650)



> Add perf kit with common used decorators/contexts
> -
>
> Key: AIRFLOW-7008
> URL: https://issues.apache.org/jira/browse/AIRFLOW-7008
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 1.10.9
>Reporter: Kamil Bregula
>Priority: Minor
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (AIRFLOW-7008) Add perf kit with common used decorators/contexts

2020-05-03 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-7008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17098616#comment-17098616
 ] 

ASF GitHub Bot commented on AIRFLOW-7008:
-

mik-laj commented on a change in pull request #7650:
URL: https://github.com/apache/airflow/pull/7650#discussion_r419183415



##
File path: scripts/perf/perf_kit/python.py
##
@@ -0,0 +1,108 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import contextlib
+import cProfile
+import datetime
+import io
+import os
+import pstats
+import signal
+
+PYSPY_OUTPUT = os.environ.get("PYSPY_OUTPUT", "/files/pyspy/")
+
+
+@contextlib.contextmanager
+def pyspy():
+"""
+This context manager provides profiling: it generates a flame graph and 
saves it to a file. It uses ``py-spy`` internally.
+
+Running py-spy inside of a docker container will also usually bring up a 
permissions denied error
+even when running as root.
+
+This error is caused by docker restricting the process_vm_readv system 
call we are using. This can be

Review comment:
   It will be difficult because this error happens in the subprocess. It 
would be necessary to check whether this process started correctly, which will 
affect the result. When we run the process, we immediately need to start 
executing the observed code. Otherwise, we will have junk data in the diagram.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add perf kit with common used decorators/contexts
> -
>
> Key: AIRFLOW-7008
> URL: https://issues.apache.org/jira/browse/AIRFLOW-7008
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 1.10.9
>Reporter: Kamil Bregula
>Priority: Minor
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [airflow] mik-laj commented on a change in pull request #7650: [AIRFLOW-7008] Add perf kit with common used decorators/contexts

2020-05-03 Thread GitBox


mik-laj commented on a change in pull request #7650:
URL: https://github.com/apache/airflow/pull/7650#discussion_r419183415



##
File path: scripts/perf/perf_kit/python.py
##
@@ -0,0 +1,108 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import contextlib
+import cProfile
+import datetime
+import io
+import os
+import pstats
+import signal
+
+PYSPY_OUTPUT = os.environ.get("PYSPY_OUTPUT", "/files/pyspy/")
+
+
+@contextlib.contextmanager
+def pyspy():
+"""
+This context manager provides profiling: it generates a flame graph and 
saves it to a file. It uses ``py-spy`` internally.
+
+Running py-spy inside of a docker container will also usually bring up a 
permissions denied error
+even when running as root.
+
+This error is caused by docker restricting the process_vm_readv system 
call we are using. This can be

Review comment:
   It will be difficult because this error happens in the subprocess. It 
would be necessary to check whether this process started correctly, which will 
affect the result. When we run the process, we immediately need to start 
executing the observed code. Otherwise, we will have junk data in the diagram.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on issue #8659: Setting num_retries for a google-cloud-platform using env variables breaks google tasks

2020-05-03 Thread GitBox


mik-laj commented on issue #8659:
URL: https://github.com/apache/airflow/issues/8659#issuecomment-623213202


   @michalslowikowski00 Can you help with it?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] boring-cyborg[bot] commented on pull request #8556: Add system test for gcs_to_bigquery

2020-05-03 Thread GitBox


boring-cyborg[bot] commented on pull request #8556:
URL: https://github.com/apache/airflow/pull/8556#issuecomment-623212977


   Awesome work, congrats on your first merged pull request!
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on a change in pull request #6277: [AIRFLOW-2971] Add health check CLI for scheduler

2020-05-03 Thread GitBox


mik-laj commented on a change in pull request #6277:
URL: https://github.com/apache/airflow/pull/6277#discussion_r419176878



##
File path: tests/cli/test_cli.py
##
@@ -488,3 +490,26 @@ def test_run_naive_taskinstance(self, mock_local_job):
 pickle_id=None,
 pool=None,
 )
+
+@mock.patch("airflow.bin.cli.jobs.SchedulerJob.most_recent_job")
+def test_health_scheduler(self, mock_most_recent):
+"""
+Test that the 'scheduler' subcommand for health works
+"""
+job = SchedulerJob(
+state='running',
+latest_heartbeat=timezone.utcnow()
+)
+mock_most_recent.return_value = job
+
+with conf_vars({("scheduler", "scheduler_health_check_threshold"): 
'60'}):
+with mock.patch('sys.exit') as mock_exit:

Review comment:
   The mock on this method does not sound good because the pytest will not 
be able to use this method either. Can you catch the SystemExit exception 
instead?
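   A minimal sketch of the suggested pattern (the function under test here is a 
stand-in, not the PR's actual CLI handler):
   
   ```python
   import pytest
   
   def test_health_scheduler_unhealthy_exits():
       def cli_health_check():
           # Stand-in for the real CLI handler, which exits non-zero when unhealthy.
           raise SystemExit(1)
   
       # Catch SystemExit directly instead of mocking sys.exit.
       with pytest.raises(SystemExit) as exc_info:
           cli_health_check()
       assert exc_info.value.code == 1
   ```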





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (AIRFLOW-2971) Health check command for scheduler

2020-05-03 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-2971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17098613#comment-17098613
 ] 

ASF GitHub Bot commented on AIRFLOW-2971:
-

mik-laj commented on a change in pull request #6277:
URL: https://github.com/apache/airflow/pull/6277#discussion_r419176878



##
File path: tests/cli/test_cli.py
##
@@ -488,3 +490,26 @@ def test_run_naive_taskinstance(self, mock_local_job):
 pickle_id=None,
 pool=None,
 )
+
+@mock.patch("airflow.bin.cli.jobs.SchedulerJob.most_recent_job")
+def test_health_scheduler(self, mock_most_recent):
+"""
+Test that the 'scheduler' subcommand for health works
+"""
+job = SchedulerJob(
+state='running',
+latest_heartbeat=timezone.utcnow()
+)
+mock_most_recent.return_value = job
+
+with conf_vars({("scheduler", "scheduler_health_check_threshold"): 
'60'}):
+with mock.patch('sys.exit') as mock_exit:

Review comment:
   The mock on this method does not sound good because the pytest will not 
be able to use this method either. Can you catch the SystemExit exception 
instead?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Health check command for scheduler
> --
>
> Key: AIRFLOW-2971
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2971
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: scheduler
>Reporter: Jon Davies
>Priority: Major
>
> As part of a Kubernetes deployment of Airflow, I would like to define an exec 
> command based health check for the Airflow scheduler:
> - 
> https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-probes/
> ...the webserver is simple as all that needs is checking that the HTTP port 
> is available. For the scheduler, it would be neat to have a command such as:
> airflow scheduler health
> That returned OK and exit 0/NOT OK and a non-zero value when it cannot reach 
> the database for instance.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [airflow] mik-laj commented on pull request #8653: [AIRFLOW-8645] BQ: integer range partitioning support for BQ tables

2020-05-03 Thread GitBox


mik-laj commented on pull request #8653:
URL: https://github.com/apache/airflow/pull/8653#issuecomment-623208822


   Hi. The BigQuery integration is undergoing an extensive refactor by @turbaszek. 
Can you wait until we finish this work? It is possible that this change 
will then not be necessary, because these operators will accept all parameters 
available through the API.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on issue #8647: Import fails when dags subfolder and plugins subfolder have the same name

2020-05-03 Thread GitBox


mik-laj commented on issue #8647:
URL: https://github.com/apache/airflow/issues/8647#issuecomment-623208206


   This is the expected behavior. 
   
   Airflow adds **an additional 3 directories** to the variable `sys.path`.
   * ``conf.get('core', 'dags_folder')``
   * ``conf.get('core', 'airflow_home')``
   * ``conf.get('core', 'plugins_folder')``
   
   This means that plugins and DAGs can use imports relative to their own 
directories.   Otherwise, a plugin or DAG would not be able to 
load another file that sits in the directory right next to it.
   
   In Airflow master you can check the content of the sys.path variable using 
the airflow info command.
   https://github.com/apache/airflow/pull/8290
   
   The problem is only the lack of description of this behavior in the 
documentation. However, I am already working on it in my free time.  The first 
step is to add the airflow info command because I need it to describe this 
behavior accurately.
   
   I recommend not using plugins to add new operators. This is problematic 
and causes various issues, among others increased RAM consumption and 
slower task startup. It's best to load operators as if they were normal Python 
modules. Ideally, they would live in a separate package installed by pip or added to 
PYTHONPATH.
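   For reference, a quick sketch (assuming Airflow 1.10.x) that prints the three 
options listed above:
   
   ```python
   from airflow.configuration import conf
   
   # Print the three config options whose values Airflow appends to sys.path.
   for option in ("dags_folder", "airflow_home", "plugins_folder"):
       print(option, "->", conf.get("core", option))
   ```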
   
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] boring-cyborg[bot] commented on issue #8696: Skip task itself instead of all downstream tasks

2020-05-03 Thread GitBox


boring-cyborg[bot] commented on issue #8696:
URL: https://github.com/apache/airflow/issues/8696#issuecomment-623207419


   Thanks for opening your first issue here! Be sure to follow the issue 
template!
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] yuqian90 opened a new issue #8696: Skip task itself instead of all downstream tasks

2020-05-03 Thread GitBox


yuqian90 opened a new issue #8696:
URL: https://github.com/apache/airflow/issues/8696


   **Apache Airflow version**: 1.10.10
   
   
   **Kubernetes version (if you are using kubernetes)** (use `kubectl version`):
   
   **Environment**:
   
   - **Cloud provider or hardware configuration**: any
   - **OS** (e.g. from /etc/os-release): any
   - **Kernel** (e.g. `uname -a`): any
   - **Install tools**: any
   - **Others**: any
   
   **What happened**:
   
   - When a sensor is set to `soft_fail=True`, it becomes skipped when it 
fails. However, it also calls `BaseSensorOperator._do_skip_downstream_tasks()` 
and skips all of its downstream tasks unconditionally, including those with 
`trigger_rule` `none_failed`, `one_success` etc.
   - `ShortCircuitOperator` is similar. When it is skipped, it skips all its 
downstream tasks unconditionally.
   
   **What you expected to happen**:
   - When a soft_fail sensor fails, it should skip itself. Downstream tasks 
with `trigger_rule` `all_success` (i.e. the default) should be skipped because 
of `TriggerRuleDep`. Tasks that are `none_failed` or `one_success` etc should 
not be skipped unconditionally by the soft_fail sensor.
   - Same applies for `ShortCircuitOperator`
   
   **How to reproduce it**:
   Any DAG with `soft_fail` or `ShortCircuitOperator` and downstream tasks 
having `trigger_rule` `none_failed`, `one_success` will have this problem.
   
   **Anything else we need to know**:
   
   An old issue addressed this in general and made it possible for operators to 
skip themselves and not their downstream tasks. The same principle should be 
applied to `soft_fail` and `ShortCircuitOperator`:
   https://github.com/apache/airflow/pull/1292
   
   The fix to this issue is rather simple: just raise 
`AirflowSkipException` and do not skip downstream tasks, leaving it to 
`TriggerRuleDep` to do the skipping based on each task's trigger_rule.
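   A rough sketch of the proposed handling (a simplified illustration, not the 
actual patch):
   
   ```python
   from airflow.exceptions import AirflowSensorTimeout, AirflowSkipException
   
   class SoftFailMixin:
       """Hypothetical mixin demonstrating the proposed behavior."""
       soft_fail = True
   
       def handle_timeout(self, error: AirflowSensorTimeout):
           # Skip only this task; TriggerRuleDep then decides each downstream
           # task's fate from its own trigger_rule.
           if self.soft_fail:
               raise AirflowSkipException("Sensor timed out; skipping this task only") from error
           raise error
   ```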



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on pull request #8679: Fix displaying Executor Class Name in "Base Job" table

2020-05-03 Thread GitBox


mik-laj commented on pull request #8679:
URL: https://github.com/apache/airflow/pull/8679#issuecomment-623205033


   Why does every job need an executor? I think LocalTaskJob doesn't need it.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on a change in pull request #6277: [AIRFLOW-2971] Add health check CLI for scheduler

2020-05-03 Thread GitBox


mik-laj commented on a change in pull request #6277:
URL: https://github.com/apache/airflow/pull/6277#discussion_r419176878



##
File path: tests/cli/test_cli.py
##
@@ -488,3 +490,26 @@ def test_run_naive_taskinstance(self, mock_local_job):
 pickle_id=None,
 pool=None,
 )
+
+@mock.patch("airflow.bin.cli.jobs.SchedulerJob.most_recent_job")
+def test_health_scheduler(self, mock_most_recent):
+"""
+Test that the 'scheduler' subcommand for health works
+"""
+job = SchedulerJob(
+state='running',
+latest_heartbeat=timezone.utcnow()
+)
+mock_most_recent.return_value = job
+
+with conf_vars({("scheduler", "scheduler_health_check_threshold"): 
'60'}):
+with mock.patch('sys.exit') as mock_exit:

Review comment:
   The mock on this method does not sound good because the pytest will not 
be able to use this method either. Can you catch the SystemExit exception 
instead?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (AIRFLOW-2971) Health check command for scheduler

2020-05-03 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-2971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17098601#comment-17098601
 ] 

ASF GitHub Bot commented on AIRFLOW-2971:
-

mik-laj commented on a change in pull request #6277:
URL: https://github.com/apache/airflow/pull/6277#discussion_r419176878



##
File path: tests/cli/test_cli.py
##
@@ -488,3 +490,26 @@ def test_run_naive_taskinstance(self, mock_local_job):
 pickle_id=None,
 pool=None,
 )
+
+@mock.patch("airflow.bin.cli.jobs.SchedulerJob.most_recent_job")
+def test_health_scheduler(self, mock_most_recent):
+"""
+Test that the 'scheduler' subcommand for health works
+"""
+job = SchedulerJob(
+state='running',
+latest_heartbeat=timezone.utcnow()
+)
+mock_most_recent.return_value = job
+
+with conf_vars({("scheduler", "scheduler_health_check_threshold"): 
'60'}):
+with mock.patch('sys.exit') as mock_exit:

Review comment:
   The mock on this method does not sound good because the pytest will not 
be able to use this method either. Can you catch the SystemExit exception 
instead?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Health check command for scheduler
> --
>
> Key: AIRFLOW-2971
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2971
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: scheduler
>Reporter: Jon Davies
>Priority: Major
>
> As part of a Kubernetes deployment of Airflow, I would like to define an exec 
> command based health check for the Airflow scheduler:
> - 
> https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-probes/
> ...the webserver is simple as all that needs is checking that the HTTP port 
> is available. For the scheduler, it would be neat to have a command such as:
> airflow scheduler health
> That returned OK and exit 0/NOT OK and a non-zero value when it cannot reach 
> the database for instance.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [airflow] boring-cyborg[bot] commented on pull request #8640: Support k8s auth method in vault secrets provider

2020-05-03 Thread GitBox


boring-cyborg[bot] commented on pull request #8640:
URL: https://github.com/apache/airflow/pull/8640#issuecomment-623198724


   Awesome work, congrats on your first merged pull request!
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] kaxil commented on pull request #8505: [POC] Add more precise tests (tests for jinja params) for AirflowVersion

2020-05-03 Thread GitBox


kaxil commented on pull request #8505:
URL: https://github.com/apache/airflow/pull/8505#issuecomment-623195226


   For future reference: 
https://flask.palletsprojects.com/en/1.1.x/signals/#subscribing-to-signals
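   Subscribing to a Flask signal looks roughly like this (adapted from the 
linked docs; requires the `blinker` package):
   
   ```python
   from flask import Flask, template_rendered
   
   app = Flask(__name__)
   
   def log_template_render(sender, template, context, **extra):
       # Called every time Jinja renders a template for this app.
       print("rendered:", template.name)
   
   template_rendered.connect(log_template_render, app)
   ```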



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] jsmodic edited a comment on issue #8480: Celery autoscaling overrides normal worker_concurrency setting

2020-05-03 Thread GitBox


jsmodic edited a comment on issue #8480:
URL: https://github.com/apache/airflow/issues/8480#issuecomment-623192154


   As far as I tested it should be enough to remove 'autoscale' from the 
dictionary to be unpacked into Celery if autoscaling isn't configured in the 
airflow.cfg.  But I haven't confirmed that works on other versions of Celery or 
how it behaves if you actually desire autoscaling.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] jsmodic commented on issue #8480: Celery autoscaling overrides normal worker_concurrency setting

2020-05-03 Thread GitBox


jsmodic commented on issue #8480:
URL: https://github.com/apache/airflow/issues/8480#issuecomment-623192154


   I can't speak to how Celery's various versions work, but as far as I tested 
it should be enough to remove 'autoscale' from the dictionary to be unpacked 
into Celery if autoscaling isn't configured in the airflow.cfg.  But I haven't 
confirmed that works on other versions of Celery or how it behaves.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on pull request #8505: [POC] Add more precise tests (tests for jinja params) for AirflowVersion

2020-05-03 Thread GitBox


mik-laj commented on pull request #8505:
URL: https://github.com/apache/airflow/pull/8505#issuecomment-623191262


   @potiuk @ashb @kaxil @turbaszek I have updated the PR. Now I use flask.signals 
instead of mocks.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on a change in pull request #8505: [POC] Add more precise tests (tests for jinja params) for AirflowVersion

2020-05-03 Thread GitBox


mik-laj commented on a change in pull request #8505:
URL: https://github.com/apache/airflow/pull/8505#discussion_r419167999



##
File path: setup.py
##
@@ -427,6 +427,7 @@ def write_version(filename: str = os.path.join(*[my_dir, 
"airflow", "git_version
 

 devel = [
 'beautifulsoup4~=4.7.1',
+'blinker',

Review comment:
   Required by flask: 
https://github.com/pallets/flask/blob/master/src/flask/signals.py#L13





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on pull request #8694: Add airflow config command (v1-10-test)

2020-05-03 Thread GitBox


mik-laj commented on pull request #8694:
URL: https://github.com/apache/airflow/pull/8694#issuecomment-623188397


   I'm working on it. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] nadflinn commented on issue #8480: Celery autoscaling overrides normal worker_concurrency setting

2020-05-03 Thread GitBox


nadflinn commented on issue #8480:
URL: https://github.com/apache/airflow/issues/8480#issuecomment-623185999


   The autoscale config has been commented out since 1.10.10, which is a change from 
1.10.9 where it isn't, and I think this accounts for the difference in the 
behavior that people are seeing.
   
https://github.com/apache/airflow/blob/1.10.9/airflow/config_templates/default_airflow.cfg#L480
   
https://github.com/apache/airflow/blob/1.10.10/airflow/config_templates/default_airflow.cfg#L516
   
   Based on the 
[comment](https://github.com/apache/airflow/blob/1.10.9/airflow/config_templates/default_airflow.cfg#L477)
 above the setting, it makes sense that you wouldn't be able to set worker 
concurrency in 1.10.9: 
   ` If autoscale option is available, worker_concurrency will be ignored`



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] gpongracz commented on pull request #8621: Docker Compose CeleryExecutor example

2020-05-03 Thread GitBox


gpongracz commented on pull request #8621:
URL: https://github.com/apache/airflow/pull/8621#issuecomment-623185001


   Looks like the entrypoint.sh is not doing an initdb or upgradedb prior to 
running the command, as is done in puckel:
   
https://github.com/puckel/docker-airflow/blob/master/script/entrypoint.sh#L110
   I think that the puckel entrypoint script is very elegant and may help 
inform the design of this one.
   In the meantime I have written a docker-compose file for a local executor 
definition that overcomes this by setting restart to on-failure and starting a 
container which runs an initdb and then stops after successfully completing - 
a bit of a kludge, but it gets the current image working.
   This is certainly not a production script, but it works with the current image.
   I've commented out a couple of optional mounts, for DAGs and, in my case, AWS 
keys.
   If a feature similar to the puckel entrypoint above is developed, then the 
webserver and scheduler definitions in the following docker-compose could be 
collapsed into a single container.
   
   **_docker-compose.yml (working example mitigating for entrypoint.sh lacking 
initdb)_**
   ```yaml
   version: '3.7'
   services:
     postgres:
       image: postgres:10
       restart: on-failure
       environment:
         - POSTGRES_USER=airflow
         - POSTGRES_PASSWORD=airflow
         - POSTGRES_DB=airflow
       logging:
         options:
           max-size: 10m
           max-file: "3"
       ports:
         - "5432:5432"
     initdb:
       image: apache/airflow:latest
       restart: on-failure
       depends_on:
         - postgres
       environment:
         - AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres:5432/airflow
         - AIRFLOW__CORE__EXECUTOR=LocalExecutor
       logging:
         options:
           max-size: 10m
           max-file: "3"
       command: initdb
     webserver:
       image: apache/airflow:latest
       restart: on-failure
       depends_on:
         - postgres
       environment:
         - AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres:5432/airflow
         - AIRFLOW__CORE__LOAD_EXAMPLES=True
         - AIRFLOW__CORE__EXECUTOR=LocalExecutor
         - AIRFLOW__WEBSERVER__BASE_URL=http://localhost:8080
       logging:
         options:
           max-size: 10m
           max-file: "3"
       #volumes:
       #  - ./dags:/opt/airflow/dags
       #  - ~/.aws:/home/airflow/.aws
       ports:
         - "8080:8080"
       command: webserver
     scheduler:
       image: apache/airflow:latest
       restart: on-failure
       depends_on:
         - postgres
       environment:
         - AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres:5432/airflow
         - AIRFLOW__CORE__LOAD_EXAMPLES=True
         - AIRFLOW__CORE__EXECUTOR=LocalExecutor
         - AIRFLOW__WEBSERVER__BASE_URL=http://localhost:8080
       logging:
         options:
           max-size: 10m
           max-file: "3"
       #volumes:
       #  - ./dags:/opt/airflow/dags
       #  - ~/.aws:/home/airflow/.aws
       command: scheduler
   ```



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] nadflinn opened a new pull request #8695: Celery worker prefetch multiplier configurable

2020-05-03 Thread GitBox


nadflinn opened a new pull request #8695:
URL: https://github.com/apache/airflow/pull/8695


   There is some debate about whether Celery autoscale actually works, as 
discussed in the comments to this PR:
   https://github.com/apache/airflow/pull/3989#issuecomment-535882666
   
   This is also related to the discussion in this issue:
   https://github.com/apache/airflow/issues/8480
   
   I ran into this issue as well with Airflow (autoscale not working) and had a 
look at the celery code and I think the issue is that for the worker process 
count to grow this is dependent on the number of tasks the worker has claimed, 
known as the prefetch_count.  If the prefetch_count isn't above the worker 
process count, then the number of worker processes won't budge. It seems like a 
catch-22. Airflow runs into this problem because `worker_prefetch_multiplier` 
is set to 1 (and `task_acks_late` is set to True...setting this to False also 
bumps the prefetch_count).
   
   This issue can be worked around by setting the `worker_prefetch_multiplier` 
setting to an int greater than 1.  In this PR I included a note about the 
implications of this in the config and a link to relevant documentation.   
Currently in airflow `worker_prefetch_multiplier` is set to 1 so a worker can't 
prefetch and lay claim to more tasks than it has process workers.  So in theory 
setting this to 2 can get you into trouble if you have worker A that has 6 
processes and has grabbed 10 tasks and the 6 tasks it is working on are long 
running causing the other 4 tasks to be blocked.  Meanwhile worker B just 
finished up processing its own 6 tasks and is available to work on the 4 that 
are backed up on worker A but A has already claimed those tasks.  If you are 
running one worker, though, then this shouldn't be a problem.
   
   This PR makes `worker_prefetch_multiplier` configurable so that the user can 
get autoscale working if they feel that for their use case 
`worker_prefetch_multiplier` of greater than 1 won't be an issue.
   
   I also [opened up a Celery PR](https://github.com/celery/celery/pull/6069) 
with a suggested fix for this issue.
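   In raw Celery terms, the knob this PR exposes looks like the following (a 
hedged sketch using Celery's own setting names, not Airflow's config option):
   
   ```python
   from celery import Celery
   
   # Example broker URL; any Celery broker behaves the same way here.
   app = Celery("example", broker="redis://localhost:6379/0")
   
   # Airflow currently pins this to 1; raising it gives autoscaling the prefetch
   # headroom described above, at the cost of one worker possibly hoarding tasks.
   app.conf.worker_prefetch_multiplier = 2
   app.conf.task_acks_late = True
   ```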
   
   ---
   Make sure to mark the boxes below before creating PR: [x]
   
   - [X] Description above provides context of the change
   - [X] Unit tests coverage for changes (not needed for documentation changes)
   - [X] Target Github ISSUE in description if exists
   - [X] Commits follow "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)"
   - [X] Relevant documentation is updated including usage instructions.
   - [X] I will engage committers as explained in [Contribution Workflow 
Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal 
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals))
 is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party 
License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in 
[UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request 
Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines)
 for more information.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] kaxil commented on pull request #8694: Add airflow config command (v1-10-test)

2020-05-03 Thread GitBox


kaxil commented on pull request #8694:
URL: https://github.com/apache/airflow/pull/8694#issuecomment-623184476


   Thanks for backporting the fixes, but it looks like the tests are failing.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] habibdhif commented on issue #8605: Add Production-ready docker compose for the production image

2020-05-03 Thread GitBox


habibdhif commented on issue #8605:
URL: https://github.com/apache/airflow/issues/8605#issuecomment-623182960


   Here is another example of a Docker Compose setup that I've been working on. The 
Compose file defines multiple services to run Airflow. 
   There is an init service, an ephemeral container that initializes the 
database and creates a user if necessary. 
   The init service command tries to run `airflow list_users`, and if that fails 
it initializes the database and creates a user. Different approaches were 
considered, but this one is simple enough and only involves airflow commands (no 
database-specific commands).
   
   Extension fields are used for airflow environment variables to reduce code 
duplication.
   
   I added a Makefile along the docker-compose.yml in my 
[repo](https://github.com/habibdhif/docker-compose-airflow) so all you have to 
do to run the docker-compose is run `make run`.
   
   ```yaml
   version: "3.7"
   x-airflow-environment: &airflow-environment
 AIRFLOW__CORE__EXECUTOR: CeleryExecutor
 AIRFLOW__WEBSERVER__RBAC: "True"
 AIRFLOW__CORE__LOAD_EXAMPLES: "False"
 AIRFLOW__CELERY__BROKER_URL: "redis://:@redis:6379/0"
 AIRFLOW__CORE__SQL_ALCHEMY_CONN: 
postgresql+psycopg2://airflow:airflow@postgres:5432/airflow
   
   services:
 postgres:
   image: postgres:11.5
   environment:
 POSTGRES_USER: airflow
 POSTGRES_DB: airflow
 POSTGRES_PASSWORD: airflow
 redis:
   image: redis:5
   environment:
 REDIS_HOST: redis
 REDIS_PORT: 6379
   ports:
 - 6379:6379
 init:
   image: apache/airflow:1.10.10
   environment:
 <<: *airflow-environment
   depends_on:
 - redis
 - postgres
   volumes:
 - ./dags:/opt/airflow/dags
   entrypoint: /bin/bash
   command: >
 -c "airflow list_users || (airflow initdb
 && airflow create_user --role Admin --username airflow --password 
airflow -e airf...@airflow.com -f airflow -l airflow)"
   restart: on-failure
 webserver:
   image: apache/airflow:1.10.10
   ports:
 - 8080:8080
   environment:
 <<: *airflow-environment
   depends_on:
 - init
   volumes:
 - ./dags:/opt/airflow/dags
   command: "webserver"
   restart: always
 flower:
   image: apache/airflow:1.10.10
   ports:
 - 5555:5555
   environment:
 <<: *airflow-environment
   depends_on:
 - redis
   command: flower
   restart: always
 scheduler:
   image: apache/airflow:1.10.10
   environment:
 <<: *airflow-environment
   depends_on:
 - webserver
   volumes:
 - ./dags:/opt/airflow/dags
   command: scheduler
   restart: always
 worker:
   image: apache/airflow:1.10.10
   environment:
 <<: *airflow-environment
   depends_on:
 - scheduler
   volumes:
 - ./dags:/opt/airflow/dags
   command: worker
   restart: always
   ```



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj edited a comment on pull request #8305: Guide for Apache Spark operators

2020-05-03 Thread GitBox


mik-laj edited a comment on pull request #8305:
URL: https://github.com/apache/airflow/pull/8305#issuecomment-623137703


   One more thing. Can you add this guide to `operators-and-hooks-ref.rst`? 
   https://github.com/apache/airflow/pull/8690
   I am working on automatically checking that the list has been updated, but for 
now we must remember to do it. Your PR was an inspiration for me to add this 
check, so your next contribution will be easier.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (AIRFLOW-6440) AWS Fargate Executor (AIP-29) (WIP)

2020-05-03 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-6440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17098551#comment-17098551
 ] 

ASF GitHub Bot commented on AIRFLOW-6440:
-

stale[bot] commented on pull request #7030:
URL: https://github.com/apache/airflow/pull/7030#issuecomment-623172002


   This issue has been automatically marked as stale because it has not had 
recent activity. It will be closed if no further activity occurs. Thank you for 
your contributions.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> AWS Fargate Executor (AIP-29) (WIP)
> ---
>
> Key: AIRFLOW-6440
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6440
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: aws, executors
>Affects Versions: 1.10.8
> Environment: AWS Cloud
>Reporter: Ahmed Elzeiny
>Assignee: Ahmed Elzeiny
>Priority: Minor
>  Labels: AWS, Executor, autoscaling
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> h1. Links
> AIP - 
> [https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-29%3A+AWS+Fargate+Executor]
> PR - [https://github.com/apache/airflow/pull/7030]
> h1. Airflow on AWS Fargate
> We propose the creation of a new Airflow Executor, called the 
> FargateExecutor, that runs tasks asynchronously on AWS Fargate. The Airflow 
> Scheduler comes up with a command that needs to be executed in some shell. A 
> Docker container parameterized with the command is passed in as an ARG, and 
> AWS Fargate provisions a new instance with . The container then completes or 
> fails the job, causing the container to die along with the Fargate instance. 
> The executor is responsible for keeping track of what happened to the task, 
> using an airflow task id and AWS ARN number, and based off of the instance 
> exit code we either say that the task succeeded or failed.
> h1. Proposed Implementation
> As you could probably deduce, the underlying mechanism to launch, track, and 
> stop Fargate instances is AWS' Boto3 library.
> To accomplish this we create a FargateExecutor under the "airflow.executors" 
> module. This class will extend from BaseExecutor and override 5 methods: 
> start(), sync(), execute_async(), end(), and terminate(). Internally, the 
> FargateExecutor uses boto3 for monitoring and deployment purposes.
> The three major Boto3 API calls are:
>  * The execute_async() function calls boto3's run_task() function.
>  * The sync() function calls boto3's describe_tasks() function.
>  * The terminate() function calls boto3's stop_task() function.
> h1. Maintenance
> The executor itself is nothing special since it mostly relies on overriding 
> the proper methods from BaseExecutor.
> In general, AWS is fairly committed to keeping their APIs in service. Fargate 
> is rather new and I've personally perceived a lot more features added as 
> optional parameters over the course of the past year. However, the required 
> parameters for the three Boto3 calls that are used have remained the same. 
> I've also written test-cases that ensure that the Boto3 calls made are 
> compliant with the most current version of their APIs.
> We've also introduced a callback hook (very similar to the Celery Executor) 
> that allows users to launch tasks with their own parameters. Therefore if a 
> user doesn't like the default parameter options used in Boto3's run_task(), 
> then they can call it themselves with whatever parameters they want. This 
> means that Airflow doesn't have to add a new configuration every time AWS 
> makes an addition to AWS Fargate. It's just one configuration to cover them 
> all.
> h1. Proposed Configuration
>  
> {code:java}
> [fargate]
> # For more information on any of these execution parameters, see the link 
> below:
> # 
> https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.run_task
> # For boto3 credential management, see
> # 
> https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html
> ### MANDATORY CONFIGS:
> # Name of region
> region = us-west-2
> # Name of cluster
> cluster = test-airflow
> ### EITHER POPULATE THESE:
> # Name 

[GitHub] [airflow] stale[bot] commented on pull request #7030: [AIRFLOW-6440][AIP-29] Add AWS Fargate Executor

2020-05-03 Thread GitBox


stale[bot] commented on pull request #7030:
URL: https://github.com/apache/airflow/pull/7030#issuecomment-623172002


   This issue has been automatically marked as stale because it has not had 
recent activity. It will be closed if no further activity occurs. Thank you for 
your contributions.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (AIRFLOW-7065) Give StreamLogHandler ability to tee to log and console

2020-05-03 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-7065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17098545#comment-17098545
 ] 

ASF GitHub Bot commented on AIRFLOW-7065:
-

c-wilson commented on a change in pull request #7740:
URL: https://github.com/apache/airflow/pull/7740#discussion_r419148387



##
File path: airflow/utils/log/logging_mixin.py
##
@@ -118,6 +129,53 @@ def isatty(self):
 """
 return False
 
+def add_stream_target(self, target: IOBase):
+"""
+Adds a stream target to propagate messages to in addition to the 
provided logger.
+:param target: File like to write to.
+:return:
+"""
+if not hasattr(target, 'write'):
+raise TypeError('Stream target must be writeable.')
+
+self._additional_stream_targets.append(target)
+
+
+class _StreamToLogRedirector(AbstractContextManager):
+"""Context manager to redirect console stream to a StreamLogWriter"""
+stream_to_replace: str
+_existing_stream_target: List[IOBase]
+
+def __init__(self, logger: logging.Logger, level: int, 
propagate_to_existing_stream: bool = False):
+self.propagate_to_existing_stream = propagate_to_existing_stream
+self._existing_stream_target = []
+self._replacement_stream = StreamLogWriter(logger, level)
+
+def __enter__(self):
+"""Saves existing stream target and replaces it will this instance."""
+existing_stream = getattr(sys, self.stream_to_replace)
+self._existing_stream_target.append(existing_stream)
+
+if self.propagate_to_existing_stream:
+self._replacement_stream.add_stream_target(existing_stream)
+
+setattr(sys, self.stream_to_replace, self._replacement_stream)
+
+def __exit__(self, exc_type, exc_val, exc_tb):
+"""Puts back existing stream target"""
+self._replacement_stream.flush()
+setattr(sys, self.stream_to_replace, 
self._existing_stream_target.pop())
+
+
+class StdoutToLog(_StreamToLogRedirector):

Review comment:
   It's possible, but I don't know if the result would be cleaner. Maybe 
you're right. This just serves as a compact way to pass the right stream to the 
StreamLogHandler when propagation is desired. This _could_ live in the 
task_command.py as if statements if we want.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Give StreamLogHandler ability to tee to log and console
> ---
>
> Key: AIRFLOW-7065
> URL: https://issues.apache.org/jira/browse/AIRFLOW-7065
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: logging
>Affects Versions: 1.10.9
>Reporter: Christopher
>Assignee: Christopher
>Priority: Minor
>   Original Estimate: 2h
>  Remaining Estimate: 2h
>
> It would be helpful in some use cases (i.e. log aggregation) to have Airflow 
> task output stream to the console as well as go through the Airflow logger. 
> Currently stdout and stderr are captured and sent only to the logger - 
> putting a StreamLogHandler in the logging chain results in an infinite 
> recursion.
> I propose adding a configuration flag [core][task_console_output] that would 
> allow safe display of messages in the console as well as propagation to the 
> Airflow logging chain.
> I will put in a PR for this shortly and see how it goes.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [airflow] c-wilson commented on a change in pull request #7740: [AIRFLOW-7065] Add optional propagation of task std streams to console

2020-05-03 Thread GitBox


c-wilson commented on a change in pull request #7740:
URL: https://github.com/apache/airflow/pull/7740#discussion_r419148387



##
File path: airflow/utils/log/logging_mixin.py
##
@@ -118,6 +129,53 @@ def isatty(self):
 """
 return False
 
+def add_stream_target(self, target: IOBase):
+"""
+Adds a stream target to propagate messages to in addition to the 
provided logger.
+:param target: File like to write to.
+:return:
+"""
+if not hasattr(target, 'write'):
+raise TypeError('Stream target must be writeable.')
+
+self._additional_stream_targets.append(target)
+
+
+class _StreamToLogRedirector(AbstractContextManager):
+"""Context manager to redirect console stream to a StreamLogWriter"""
+stream_to_replace: str
+_existing_stream_target: List[IOBase]
+
+def __init__(self, logger: logging.Logger, level: int, 
propagate_to_existing_stream: bool = False):
+self.propagate_to_existing_stream = propagate_to_existing_stream
+self._existing_stream_target = []
+self._replacement_stream = StreamLogWriter(logger, level)
+
+def __enter__(self):
+"""Saves existing stream target and replaces it will this instance."""
+existing_stream = getattr(sys, self.stream_to_replace)
+self._existing_stream_target.append(existing_stream)
+
+if self.propagate_to_existing_stream:
+self._replacement_stream.add_stream_target(existing_stream)
+
+setattr(sys, self.stream_to_replace, self._replacement_stream)
+
+def __exit__(self, exc_type, exc_val, exc_tb):
+"""Puts back existing stream target"""
+self._replacement_stream.flush()
+setattr(sys, self.stream_to_replace, 
self._existing_stream_target.pop())
+
+
+class StdoutToLog(_StreamToLogRedirector):

Review comment:
   It's possible, but I don't know if the result would be cleaner. Maybe you're right. This just serves as a compact way to pass the right stream to the StreamLogHandler when propagation is desired. This _could_ live in task_command.py as if-statements if we want.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] habibdhif commented on pull request #8689: [AIRFLOW-8605] Added docker compose

2020-05-03 Thread GitBox


habibdhif commented on pull request #8689:
URL: https://github.com/apache/airflow/pull/8689#issuecomment-623166794


   > Note there's another PR for a docker-compose file: #8621
   
   Thanks for notifying me; closing the PR and moving this example under the #8605 issue.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] BasPH commented on pull request #8689: [AIRFLOW-8605] Added docker compose

2020-05-03 Thread GitBox


BasPH commented on pull request #8689:
URL: https://github.com/apache/airflow/pull/8689#issuecomment-623166409


   Note there's another PR for a docker-compose file: 
https://github.com/apache/airflow/pull/8621



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] c-wilson commented on pull request #7740: [AIRFLOW-7065] Add optional propagation of task std streams to console

2020-05-03 Thread GitBox


c-wilson commented on pull request #7740:
URL: https://github.com/apache/airflow/pull/7740#issuecomment-623164289


   Hi @ashb I made some trivial cleanup changes to the ESTaskHandler, but in reality I cannot find a better way to implement its functionality with the current (slightly deep) logging hierarchy:
   
   ```airflow.LogStreamHandler > logging.logger > airflow.TaskLogHandler > logger.LogHandler > stream```
   
   In this flow, the ESTaskHandler really is just injecting a JSON formatter before the standard-out stream, as far as I can see. It seems that the sin of the ESTaskHandler is overloading the stdout and file-writing paths in the same class, which is a bit surprising.
   
   It is maybe a thankless and non-urgent task, but it may be worth reevaluating how the logging environment is composed at a broader level. When you enter a task context, factory functions could compose real stdlib `logging.Handler` instances instead of the current subclassing pattern. These could be composed so that they point at the desired streams (file or console), have the desired formatters, and are attached to the logger via `logger.addHandler`. I'm not sure that this is _actually_ cleaner/better; the current implementation obviously works, and I'm not sure how important it is that it be maximally readable/extensible.
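   
   For illustration, a minimal sketch of that composition idea (the factory name, formatter strings, and parameters are hypothetical, not Airflow's actual API):
   
   ```python
   import logging
   import sys
   
   def build_task_logger(name: str, log_path: str, to_console: bool = False) -> logging.Logger:
       """Hypothetical factory: compose plain stdlib handlers instead of subclassing."""
       logger = logging.getLogger(name)
       logger.setLevel(logging.INFO)
   
       # File handler for the task log file.
       file_handler = logging.FileHandler(log_path)
       file_handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
       logger.addHandler(file_handler)
   
       # Optional console handler, attached the same way via logger.addHandler.
       if to_console:
           console_handler = logging.StreamHandler(sys.stdout)
           console_handler.setFormatter(logging.Formatter("%(message)s"))
           logger.addHandler(console_handler)
   
       return logger
   ```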



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (AIRFLOW-7065) Give StreamLogHandler ability to tee to log and console

2020-05-03 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-7065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17098542#comment-17098542
 ] 

ASF GitHub Bot commented on AIRFLOW-7065:
-

c-wilson commented on pull request #7740:
URL: https://github.com/apache/airflow/pull/7740#issuecomment-623164289


   Hi @ashb I made some trivial cleanup changes to the ESTaskHandler, but in reality I cannot find a better way to implement its functionality with the current (slightly deep) logging hierarchy:
   
   ```airflow.LogStreamHandler > logging.logger > airflow.TaskLogHandler > logger.LogHandler > stream```
   
   In this flow, the ESTaskHandler really is just injecting a JSON formatter before the standard-out stream, as far as I can see. It seems that the sin of the ESTaskHandler is overloading the stdout and file-writing paths in the same class, which is a bit surprising.
   
   It is maybe a thankless and non-urgent task, but it may be worth reevaluating how the logging environment is composed at a broader level. When you enter a task context, factory functions could compose real stdlib `logging.Handler` instances instead of the current subclassing pattern. These could be composed so that they point at the desired streams (file or console), have the desired formatters, and are attached to the logger via `logger.addHandler`. I'm not sure that this is _actually_ cleaner/better; the current implementation obviously works, and I'm not sure how important it is that it be maximally readable/extensible.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Give StreamLogHandler ability to tee to log and console
> ---
>
> Key: AIRFLOW-7065
> URL: https://issues.apache.org/jira/browse/AIRFLOW-7065
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: logging
>Affects Versions: 1.10.9
>Reporter: Christopher
>Assignee: Christopher
>Priority: Minor
>   Original Estimate: 2h
>  Remaining Estimate: 2h
>
> It would be helpful in some use cases (i.e. log aggregation) to have Airflow 
> task output stream to the console as well as go through the Airflow logger. 
> Currently stdout and stderr are captured and sent only to the logger - 
> putting a StreamLogHandler in the logging chain results in an infinite 
> recursion.
> I propose adding a configuration flag [core][task_console_output] that would 
> allow safe display of messages in the console as well as propagation to the 
> Airflow logging chain.
> I will put in a PR for this shortly and see how it goes.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [airflow] mik-laj commented on a change in pull request #7740: [AIRFLOW-7065] Add optional propagation of task std streams to console

2020-05-03 Thread GitBox


mik-laj commented on a change in pull request #7740:
URL: https://github.com/apache/airflow/pull/7740#discussion_r419145400



##
File path: airflow/utils/log/logging_mixin.py
##
@@ -118,6 +129,53 @@ def isatty(self):
 """
 return False
 
+def add_stream_target(self, target: IOBase):
+"""
+Adds a stream target to propagate messages to in addition to the 
provided logger.
+:param target: File like to write to.
+:return:
+"""
+if not hasattr(target, 'write'):
+raise TypeError('Stream target must be writeable.')
+
+self._additional_stream_targets.append(target)
+
+
+class _StreamToLogRedirector(AbstractContextManager):
+"""Context manager to redirect console stream to a StreamLogWriter"""
+stream_to_replace: str
+_existing_stream_target: List[IOBase]
+
+def __init__(self, logger: logging.Logger, level: int, 
propagate_to_existing_stream: bool = False):
+self.propagate_to_existing_stream = propagate_to_existing_stream
+self._existing_stream_target = []
+self._replacement_stream = StreamLogWriter(logger, level)
+
+def __enter__(self):
+"""Saves existing stream target and replaces it will this instance."""
+existing_stream = getattr(sys, self.stream_to_replace)
+self._existing_stream_target.append(existing_stream)
+
+if self.propagate_to_existing_stream:
+self._replacement_stream.add_stream_target(existing_stream)
+
+setattr(sys, self.stream_to_replace, self._replacement_stream)
+
+def __exit__(self, exc_type, exc_val, exc_tb):
+"""Puts back existing stream target"""
+self._replacement_stream.flush()
+setattr(sys, self.stream_to_replace, 
self._existing_stream_target.pop())
+
+
+class StdoutToLog(_StreamToLogRedirector):

Review comment:
   ```suggestion
   class stdout_to_log(_StreamToLogRedirector):
   ```
   The Python code convention suggests that context managers should be in snake_case. CamelCase is used when we want to emphasize that something is a class, but that is not important here.
   https://github.com/python/cpython/blob/3.8/Lib/contextlib.py#L336





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (AIRFLOW-7065) Give StreamLogHandler ability to tee to log and console

2020-05-03 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-7065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17098539#comment-17098539
 ] 

ASF GitHub Bot commented on AIRFLOW-7065:
-

mik-laj commented on a change in pull request #7740:
URL: https://github.com/apache/airflow/pull/7740#discussion_r419145400



##
File path: airflow/utils/log/logging_mixin.py
##
@@ -118,6 +129,53 @@ def isatty(self):
 """
 return False
 
+def add_stream_target(self, target: IOBase):
+"""
+Adds a stream target to propagate messages to in addition to the 
provided logger.
+:param target: File like to write to.
+:return:
+"""
+if not hasattr(target, 'write'):
+raise TypeError('Stream target must be writeable.')
+
+self._additional_stream_targets.append(target)
+
+
+class _StreamToLogRedirector(AbstractContextManager):
+"""Context manager to redirect console stream to a StreamLogWriter"""
+stream_to_replace: str
+_existing_stream_target: List[IOBase]
+
+def __init__(self, logger: logging.Logger, level: int, 
propagate_to_existing_stream: bool = False):
+self.propagate_to_existing_stream = propagate_to_existing_stream
+self._existing_stream_target = []
+self._replacement_stream = StreamLogWriter(logger, level)
+
+def __enter__(self):
+"""Saves existing stream target and replaces it will this instance."""
+existing_stream = getattr(sys, self.stream_to_replace)
+self._existing_stream_target.append(existing_stream)
+
+if self.propagate_to_existing_stream:
+self._replacement_stream.add_stream_target(existing_stream)
+
+setattr(sys, self.stream_to_replace, self._replacement_stream)
+
+def __exit__(self, exc_type, exc_val, exc_tb):
+"""Puts back existing stream target"""
+self._replacement_stream.flush()
+setattr(sys, self.stream_to_replace, 
self._existing_stream_target.pop())
+
+
+class StdoutToLog(_StreamToLogRedirector):

Review comment:
   ```suggestion
   class stdout_to_log(_StreamToLogRedirector):
   ```
   The Python code convention suggests that context managers should be in snake_case. CamelCase is used when we want to emphasize that something is a class, but that is not important here.
   https://github.com/python/cpython/blob/3.8/Lib/contextlib.py#L336





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Give StreamLogHandler ability to tee to log and console
> ---
>
> Key: AIRFLOW-7065
> URL: https://issues.apache.org/jira/browse/AIRFLOW-7065
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: logging
>Affects Versions: 1.10.9
>Reporter: Christopher
>Assignee: Christopher
>Priority: Minor
>   Original Estimate: 2h
>  Remaining Estimate: 2h
>
> It would be helpful in some use cases (i.e. log aggregation) to have Airflow 
> task output stream to the console as well as go through the Airflow logger. 
> Currently stdout and stderr are captured and sent only to the logger - 
> putting a StreamLogHandler in the logging chain results in an infinite 
> recursion.
> I propose adding a configuration flag [core][task_console_output] that would 
> allow safe display of messages in the console as well as propagation to the 
> Airflow logging chain.
> I will put in a PR for this shortly and see how it goes.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (AIRFLOW-7065) Give StreamLogHandler ability to tee to log and console

2020-05-03 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-7065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17098538#comment-17098538
 ] 

ASF GitHub Bot commented on AIRFLOW-7065:
-

mik-laj commented on a change in pull request #7740:
URL: https://github.com/apache/airflow/pull/7740#discussion_r419145161



##
File path: airflow/utils/log/logging_mixin.py
##
@@ -118,6 +129,53 @@ def isatty(self):
 """
 return False
 
+def add_stream_target(self, target: IOBase):
+"""
+Adds a stream target to propagate messages to in addition to the 
provided logger.
+:param target: File like to write to.
+:return:
+"""
+if not hasattr(target, 'write'):
+raise TypeError('Stream target must be writeable.')
+
+self._additional_stream_targets.append(target)
+
+
+class _StreamToLogRedirector(AbstractContextManager):
+"""Context manager to redirect console stream to a StreamLogWriter"""
+stream_to_replace: str
+_existing_stream_target: List[IOBase]
+
+def __init__(self, logger: logging.Logger, level: int, 
propagate_to_existing_stream: bool = False):
+self.propagate_to_existing_stream = propagate_to_existing_stream
+self._existing_stream_target = []
+self._replacement_stream = StreamLogWriter(logger, level)
+
+def __enter__(self):
+"""Saves existing stream target and replaces it will this instance."""
+existing_stream = getattr(sys, self.stream_to_replace)
+self._existing_stream_target.append(existing_stream)
+
+if self.propagate_to_existing_stream:
+self._replacement_stream.add_stream_target(existing_stream)
+
+setattr(sys, self.stream_to_replace, self._replacement_stream)
+
+def __exit__(self, exc_type, exc_val, exc_tb):
+"""Puts back existing stream target"""
+self._replacement_stream.flush()
+setattr(sys, self.stream_to_replace, 
self._existing_stream_target.pop())
+
+
+class StdoutToLog(_StreamToLogRedirector):

Review comment:
   Can we use ``contextlib.redirect_stdout`` here?
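   
   For reference, a minimal sketch of what that could look like. The `_LogWriter` class below is an illustrative stand-in for `StreamLogWriter` (which is already file-like), not the PR's actual code:
   
   ```python
   import contextlib
   import logging
   
   logging.basicConfig(level=logging.INFO)
   logger = logging.getLogger("airflow.task")
   
   class _LogWriter:
       """Stand-in for StreamLogWriter: a file-like object that writes to a logger."""
       def __init__(self, logger: logging.Logger, level: int):
           self._logger, self._level = logger, level
   
       def write(self, message: str):
           if message.strip():
               self._logger.log(self._level, message.rstrip())
   
       def flush(self):
           pass
   
   # redirect_stdout swaps sys.stdout for the writer and restores it on exit,
   # which is essentially what the hand-rolled __enter__/__exit__ above does.
   with contextlib.redirect_stdout(_LogWriter(logger, logging.INFO)):
       print("this line goes to the logger instead of the console")
   ```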





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Give StreamLogHandler ability to tee to log and console
> ---
>
> Key: AIRFLOW-7065
> URL: https://issues.apache.org/jira/browse/AIRFLOW-7065
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: logging
>Affects Versions: 1.10.9
>Reporter: Christopher
>Assignee: Christopher
>Priority: Minor
>   Original Estimate: 2h
>  Remaining Estimate: 2h
>
> It would be helpful in some use cases (i.e. log aggregation) to have Airflow 
> task output stream to the console as well as go through the Airflow logger. 
> Currently stdout and stderr are captured and sent only to the logger - 
> putting a StreamLogHandler in the logging chain results in an infinite 
> recursion.
> I propose adding a configuration flag [core][task_console_output] that would 
> allow safe display of messages in the console as well as propagation to the 
> Airflow logging chain.
> I will put in a PR for this shortly and see how it goes.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [airflow] mik-laj commented on a change in pull request #7740: [AIRFLOW-7065] Add optional propagation of task std streams to console

2020-05-03 Thread GitBox


mik-laj commented on a change in pull request #7740:
URL: https://github.com/apache/airflow/pull/7740#discussion_r419145161



##
File path: airflow/utils/log/logging_mixin.py
##
@@ -118,6 +129,53 @@ def isatty(self):
 """
 return False
 
+def add_stream_target(self, target: IOBase):
+"""
+Adds a stream target to propagate messages to in addition to the 
provided logger.
+:param target: File like to write to.
+:return:
+"""
+if not hasattr(target, 'write'):
+raise TypeError('Stream target must be writeable.')
+
+self._additional_stream_targets.append(target)
+
+
+class _StreamToLogRedirector(AbstractContextManager):
+"""Context manager to redirect console stream to a StreamLogWriter"""
+stream_to_replace: str
+_existing_stream_target: List[IOBase]
+
+def __init__(self, logger: logging.Logger, level: int, 
propagate_to_existing_stream: bool = False):
+self.propagate_to_existing_stream = propagate_to_existing_stream
+self._existing_stream_target = []
+self._replacement_stream = StreamLogWriter(logger, level)
+
+def __enter__(self):
+"""Saves existing stream target and replaces it will this instance."""
+existing_stream = getattr(sys, self.stream_to_replace)
+self._existing_stream_target.append(existing_stream)
+
+if self.propagate_to_existing_stream:
+self._replacement_stream.add_stream_target(existing_stream)
+
+setattr(sys, self.stream_to_replace, self._replacement_stream)
+
+def __exit__(self, exc_type, exc_val, exc_tb):
+"""Puts back existing stream target"""
+self._replacement_stream.flush()
+setattr(sys, self.stream_to_replace, 
self._existing_stream_target.pop())
+
+
+class StdoutToLog(_StreamToLogRedirector):

Review comment:
   Can we use ``contextlib.redirect_stdout`` here?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] XD-DENG commented on pull request #8692: Sort connection type list in add/edit page alphabetically

2020-05-03 Thread GitBox


XD-DENG commented on pull request #8692:
URL: https://github.com/apache/airflow/pull/8692#issuecomment-623156345


   Thanks @zhongjiajie  and @mik-laj 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mustafagok commented on pull request #8581: Add google_api_to_s3_transfer example dags and system tests

2020-05-03 Thread GitBox


mustafagok commented on pull request #8581:
URL: https://github.com/apache/airflow/pull/8581#issuecomment-623156223


   I see that you have much bigger problems, but I would like to suggest a small change: renaming "aws_system_helpers.py" to "amazon_system_helpers.py", since it contains the `AmazonSystemTest` class and `@pytest.mark.system("amazon")`.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj opened a new pull request #8694: Add airflow config command (v1-10-test)

2020-05-03 Thread GitBox


mik-laj opened a new pull request #8694:
URL: https://github.com/apache/airflow/pull/8694


   This PR is based on the following PRs:
   https://github.com/apache/airflow/pull/6840
   https://github.com/apache/airflow/pull/7117
   https://github.com/apache/airflow/pull/8404
   https://github.com/apache/airflow/pull/7413
   
   ---
   Make sure to mark the boxes below before creating PR: [x]
   
   - [X] Description above provides context of the change
   - [X] Unit tests coverage for changes (not needed for documentation changes)
   - [X] Target Github ISSUE in description if exists
   - [X] Commits follow "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)"
   - [X] Relevant documentation is updated including usage instructions.
   - [X] I will engage committers as explained in [Contribution Workflow 
Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal 
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals))
 is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party 
License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in 
[UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request 
Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines)
 for more information.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mustafagok commented on a change in pull request #8618: Add AWS EMR System tests

2020-05-03 Thread GitBox


mustafagok commented on a change in pull request #8618:
URL: https://github.com/apache/airflow/pull/8618#discussion_r419136967



##
File path: tests/providers/amazon/aws/operators/test_emr_system.py
##
@@ -0,0 +1,47 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+
+from tests.test_utils.aws_system_helpers import AWS_DAG_FOLDER
+from tests.test_utils.system_tests_class import SystemTest
+from tests.utils.logging_command_executor import get_executor
+
+
+@pytest.fixture
+def create_emr_default_roles():
+"""Create EMR Default roles for running system test
+
+This will create the default IAM roles:
+- `EMR_EC2_DefaultRole`
+- `EMR_DefaultRole`
+"""
+executor = get_executor()
+executor.execute_cmd(["aws", "emr", "create-default-roles"])
+
+
+@pytest.mark.system("amazon.aws")

Review comment:
   As @potiuk specified on Slack, it should refer to backport packages.
   
   Also, I think "aws_system_helpers.py" should be "amazon_system_helpers.py"
   
   ```suggestion
   @pytest.mark.system("amazon")
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mustafagok commented on a change in pull request #8618: Add AWS EMR System tests

2020-05-03 Thread GitBox


mustafagok commented on a change in pull request #8618:
URL: https://github.com/apache/airflow/pull/8618#discussion_r419136343



##
File path: tests/providers/amazon/aws/operators/test_emr_system.py
##
@@ -0,0 +1,47 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+
+from tests.test_utils.aws_system_helpers import AWS_DAG_FOLDER
+from tests.test_utils.system_tests_class import SystemTest
+from tests.utils.logging_command_executor import get_executor
+
+
+@pytest.fixture
+def create_emr_default_roles():
+"""Create EMR Default roles for running system test
+
+This will create the default IAM roles:
+- `EMR_EC2_DefaultRole`
+- `EMR_DefaultRole`
+"""
+executor = get_executor()
+executor.execute_cmd(["aws", "emr", "create-default-roles"])
+
+
+@pytest.mark.system("amazon.aws")
+@pytest.mark.usefixtures("create_emr_default_roles")
+class TestSystemAwsEmr(SystemTest):

Review comment:
   A naming convention suggestion: I named mine "ECSSystemTest" after checking the Google system test class names.
   
   ```suggestion
   class EmrSystemTest(SystemTest):
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on pull request #6075: [AIRFLOW-5266] Allow aws_athena_hook to get all query results

2020-05-03 Thread GitBox


mik-laj commented on pull request #6075:
URL: https://github.com/apache/airflow/pull/6075#issuecomment-623149528


   @lindsable Do you need any help?
   
   If you have applied all the reviewer's comments, then it is worth asking for another review, either in a comment or via a PM on Slack. The committers have a lot of work and are unable to check the status of every change.
   
  > Ping @ #development slack, comment @people. Be annoying. Be considerate.
   
  https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#id33



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (AIRFLOW-5266) AWS Athena Hook only returns first 1000 results

2020-05-03 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-5266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17098514#comment-17098514
 ] 

ASF GitHub Bot commented on AIRFLOW-5266:
-

mik-laj commented on pull request #6075:
URL: https://github.com/apache/airflow/pull/6075#issuecomment-623149528


   @lindsable Do you need any help?
   
   If you have applied all the reviewer's comments, then it is worth asking for another review, either in a comment or via a PM on Slack. The committers have a lot of work and are unable to check the status of every change.
   
  > Ping @ #development slack, comment @people. Be annoying. Be considerate.
   
  https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#id33



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> AWS Athena Hook only returns first 1000 results
> ---
>
> Key: AIRFLOW-5266
> URL: https://issues.apache.org/jira/browse/AIRFLOW-5266
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: aws, contrib, hooks
>Affects Versions: 1.10.4
>Reporter: Lindsay Portelli
>Assignee: Lindsay Portelli
>Priority: Minor
>  Labels: easyfix, newbie, pull-request-available
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> When using the get_query_results function in the AWSAthenaHook, you will only 
> get the first 1000 results for a given query execution id. See the [boto3 
> documentation|https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/athena.html?highlight=start_query_execution#Athena.Client.get_query_results]
> I'll be creating a pr in the next few days.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [airflow] mik-laj commented on pull request #7777: [AIRFLOW-Date_range compatible with schedule_interval]

2020-05-03 Thread GitBox


mik-laj commented on pull request #:
URL: https://github.com/apache/airflow/pull/#issuecomment-623148339


   @Rcharriol Have you finished working on this change? I see that Travis is sad, but in the meantime we have migrated to GitHub Actions. Can you rebase?
   
   If you have applied all the reviewer's comments, then it is worth asking for another review, either in a comment or through Slack. The committers have a lot of work and are unable to remember every change.
   
   > Ping @ #development slack, comment @people. Be annoying. Be considerate.
   
   https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#id33



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj edited a comment on pull request #7777: [AIRFLOW-Date_range compatible with schedule_interval]

2020-05-03 Thread GitBox


mik-laj edited a comment on pull request #:
URL: https://github.com/apache/airflow/pull/#issuecomment-623148339


   @Rcharriol Have you finished working on this change? I see that Travis is sad, but in the meantime we have migrated to GitHub Actions. Can you rebase?
   
   If you have applied all the reviewer's comments, then it is worth asking for another review, either in a comment or via a PM on Slack. The committers have a lot of work and are unable to remember every change.
   
   > Ping @ #development slack, comment @people. Be annoying. Be considerate.
   
   https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#id33



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (AIRFLOW-7068) Create EC2 Hook, Operator and Sensor

2020-05-03 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17098508#comment-17098508
 ] 

ASF GitHub Bot commented on AIRFLOW-7068:
-

mik-laj commented on pull request #7731:
URL: https://github.com/apache/airflow/pull/7731#issuecomment-623147774


   @mustafagok What is the status of this PR? Do you need any help? If you have applied all the reviewer's comments, then it is worth asking for another review, either in a comment or through Slack. The committers have a lot of work and are unable to remember every PR. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Create EC2 Hook, Operator and Sensor
> 
>
> Key: AIRFLOW-7068
> URL: https://issues.apache.org/jira/browse/AIRFLOW-7068
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: aws
>Affects Versions: 1.10.9
>Reporter: Mustafa Gök
>Assignee: Mustafa Gök
>Priority: Minor
>
> * New hook to interact with AWS EC2 Service.
>  * New operator to manage AWS EC2 instance and to change instance state by 
> applying given operation using boto3.
>  * New sensor to check the state of the AWS EC2 instance until state of the 
> instance become equal to the target state.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [airflow] mik-laj commented on pull request #7731: [AIRFLOW-7068] Create EC2 Hook, Operator and Sensor

2020-05-03 Thread GitBox


mik-laj commented on pull request #7731:
URL: https://github.com/apache/airflow/pull/7731#issuecomment-623147774


   @mustafagok What is the status of this PR? Do you need any help? If you have applied all the reviewer's comments, then it is worth asking for another review, either in a comment or through Slack. The committers have a lot of work and are unable to remember every PR. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] boring-cyborg[bot] commented on pull request #8535: [Airflow-8439] Refactor test_variable_command.py

2020-05-03 Thread GitBox


boring-cyborg[bot] commented on pull request #8535:
URL: https://github.com/apache/airflow/pull/8535#issuecomment-623146753


   Awesome work, congrats on your first merged pull request!
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj opened a new pull request #8693: Carefully parse warning messages when building documentation

2020-05-03 Thread GitBox


mik-laj opened a new pull request #8693:
URL: https://github.com/apache/airflow/pull/8693


   ---
   Make sure to mark the boxes below before creating PR: [x]
   
   - [X] Description above provides context of the change
   - [X] Unit tests coverage for changes (not needed for documentation changes)
   - [X] Target Github ISSUE in description if exists
   - [X] Commits follow "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)"
   - [X] Relevant documentation is updated including usage instructions.
   - [X] I will engage committers as explained in [Contribution Workflow 
Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal 
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals))
 is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party 
License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in 
[UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request 
Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines)
 for more information.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] XD-DENG commented on pull request #8685: Fix Spark Conn Add/Edit Pages

2020-05-03 Thread GitBox


XD-DENG commented on pull request #8685:
URL: https://github.com/apache/airflow/pull/8685#issuecomment-623144773


   Hi @zhongjiajie , I have split the PR into two. The change to order 
connection types alphabetically is in PR 
https://github.com/apache/airflow/pull/8692
   
   The description, title, etc. of this PR are updated as well.
   
   Let me know if everything looks good to you.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] XD-DENG opened a new pull request #8692: Sort connection type list in add/edit page alphabetically

2020-05-03 Thread GitBox


XD-DENG opened a new pull request #8692:
URL: https://github.com/apache/airflow/pull/8692


   Currently the connection type list in the UI is sorted in the order of the original `Connection._types`, which may be a bit inconvenient for users.
   
   It would be better if it can be sorted alphabetically.
   
   ## Pre-fix
   
![80909345-d0877280-8d27-11ea-8f5d-2ebfa6dbb1c9](https://user-images.githubusercontent.com/11539188/80920481-7d85dd80-8d70-11ea-9ca6-f11ab2cf558f.png)
   
   
   ## After Fix
   
   
![80909357-ded58e80-8d27-11ea-91f5-b9e82ac682fd](https://user-images.githubusercontent.com/11539188/80920485-81b1fb00-8d70-11ea-97f1-76e80abd46a7.png)
   
   
   ---
   Make sure to mark the boxes below before creating PR: [x]
   
   - [ ] Description above provides context of the change
   - [ ] Unit tests coverage for changes (not needed for documentation changes)
   - [ ] Target Github ISSUE in description if exists
   - [ ] Commits follow "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)"
   - [ ] Relevant documentation is updated including usage instructions.
   - [ ] I will engage committers as explained in [Contribution Workflow 
Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal 
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals))
 is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party 
License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in 
[UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request 
Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines)
 for more information.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] XD-DENG commented on pull request #8685: Fix Spark Conn Add/Edit Pages + Sort Connection Type Options

2020-05-03 Thread GitBox


XD-DENG commented on pull request #8685:
URL: https://github.com/apache/airflow/pull/8685#issuecomment-623142738


   > Code looking good, but should we separate PR? Seem sorting and adding miss 
type unrelated.
   
   @zhongjiajie the two changes are both small, so I appended the 2nd change here. But I don't mind splitting them into two.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on a change in pull request #8651: [AIP-31] [AIRFLOW-8058] Retrieve current execution context without altering function's signature

2020-05-03 Thread GitBox


mik-laj commented on a change in pull request #8651:
URL: https://github.com/apache/airflow/pull/8651#discussion_r419129507



##
File path: airflow/config_templates/config.yml
##
@@ -366,6 +366,17 @@
   type: string
   example: "path.to.CustomXCom"
   default: "airflow.models.xcom.BaseXCom"
+- name: user_defined_execute_context

Review comment:
   I am not sure if the user should be able to overwrite this context manager. Can you provide use cases where this is necessary? 
   
   I recently thought a lot about a similar mechanism to be able to define a 
context that will allow authorization to 
[gcloud](https://github.com/apache/airflow/pull/8432), or [forward ports via 
SSH](https://github.com/apache/airflow/issues/8615). However, I am not 
convinced that this should be defined at the global level.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on a change in pull request #8651: [AIP-31] [AIRFLOW-8058] Retrieve current execution context without altering function's signature

2020-05-03 Thread GitBox


mik-laj commented on a change in pull request #8651:
URL: https://github.com/apache/airflow/pull/8651#discussion_r419128640



##
File path: tests/task/context/test_current_context.py
##
@@ -0,0 +1,100 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datetime import datetime, timedelta
+
+import pytest
+
+from airflow import DAG
+from airflow.exceptions import AirflowException
+from airflow.models.baseoperator import BaseOperator
+from airflow.operators.python import PythonOperator
+from airflow.task.context.current import get_current_context, 
set_current_context
+
+DEFAULT_ARGS = {
+"owner": "test",
+"depends_on_past": True,
+"start_date": datetime(2020, 4, 22),
+"retries": 1,
+"retry_delay": timedelta(minutes=1),
+}
+
+
+class TestCurrentContext:
+def test_current_context_no_context_raise(self):
+with pytest.raises(AirflowException):
+get_current_context()
+
+def test_current_context_roundtrip(self):
+example_context = {"Hello": "World"}
+
+with set_current_context(example_context):
+assert get_current_context() == example_context
+
+def test_context_removed_after_exit(self):
+example_context = {"Hello": "World"}
+
+with set_current_context(example_context):
+pass
+with pytest.raises(AirflowException):
+get_current_context()
+
+def test_nested_context(self):
+"""
+Nested execution context can occur in a few cases:
+1. User uses nested ctx managers.
+2. User is executing subdags and subruns

Review comment:
   This is not true for Airflow 2.0. In Airflow 2.0, each task is executed 
by a separate worker.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] boring-cyborg[bot] commented on issue #8691: Task Instance will not be queued if `celery_executor` does not drop queue tasks on error.

2020-05-03 Thread GitBox


boring-cyborg[bot] commented on issue #8691:
URL: https://github.com/apache/airflow/issues/8691#issuecomment-623140181


   Thanks for opening your first issue here! Be sure to follow the issue 
template!
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] gdevanla opened a new issue #8691: Task Instance will not be queued if `celery_executor` does not drop queue tasks on error.

2020-05-03 Thread GitBox


gdevanla opened a new issue #8691:
URL: https://github.com/apache/airflow/issues/8691


   **Apache Airflow version**:
   1.10.3 
   
   **Kubernetes version (if you are using kubernetes)** (use `kubectl version`):
   
   **Environment**:
   Python 3.7.4
   
   - **Cloud provider or hardware configuration**:
   - **OS** (e.g. from /etc/os-release):
   Ubuntu xenial/bionic
   - **Kernel** (e.g. `uname -a`): 
   Linux 4.15.0-45-generic #48~16.04.1-Ubuntu SMP Tue Jan 29 18:03:48 UTC 2019 
x86_64 x86_64 x86_64 GNU/Linux
   
   - **Install tools**:
   - **Others**:
   
   **What happened**:
   
   The task instance gets stuck in the `scheduled` state because of an inconsistency in expectations of how `queued_tasks` entries (those that failed to be queued successfully in the CeleryExecutor) are to be handled.
   
   Given a TaskInstance, `TI`, whose state is `None`, the following pseudocode is executed inside the scheduler loop. In this process, the `TI` in some situations gets stuck in the `scheduled` state.
   
   (The indentation below depicts the call stack.)
   ```
   Given, a task_instance `TI`, in `state == None`,
   
   execute_helper (scheduler loop)
   (first iteration of the scheduler loop)
- calls `_execute_task_instances` 
   - calls `_find_executable_task_instances()`, which returns `TI` since it has state == `None`
   - calls `_change_state_for_executable_task_instances`, which updates `TI`'s state to `queued`
   - calls `_enqueue_task_instances_with_queued_state`. This function adds `TI` to the `Executor.queued_tasks` dictionary.
   - calls `CeleryExecutor.heartbeat`
   Tries to `send_task` to the worker. If this succeeds, the `TI` is popped from `CeleryExecutor.queued_tasks`. But in our scenario, the `CeleryExecutor` just leaves the entry in `queued_tasks` intact because either an `Exception` was raised or `result` was `None`. The `CeleryExecutor` assumes the scheduler will handle this scenario. This is where the problem starts (see second iteration below). (The link to this code is provided below.)
   - calls `_change_state_for_tasks_failed_to_execute`.
This function notices that the `TI` entry is still in `CeleryExecutor.queued_tasks`, assumes something went wrong, and therefore correctly updates the status of `TI` back to `scheduled`. Note that the `TI` entry is still in `queued_tasks`, and that causes the current issue (see second iteration below).
   
   - other maintenance activities happen in the scheduler loop  (not 
relevant to this issue)
   
   (second iteration of the scheduler loop)
   - calls `_execute_task_instances`
   - calls `_find_executable_task_instances()`.
   Now, this function is supposed to return `TI` since it is in the `scheduled` state. But it finds that an entry for `TI` already exists in `CeleryExecutor.queued_tasks` and therefore does not return `TI` (refer to the link provided below, which points to this case). This means `TI` will never be `queued` and is stuck in the `scheduled` state. 
(https://github.com/apache/airflow/blob/a943d6beab473b8ec87bb8c6e43f93cc64fa1d23/airflow/jobs/scheduler_job.py#L1033)
   ```
   The only workaround for this currently is to restart the scheduler. When the scheduler is restarted, `CeleryExecutor.queued_tasks` is reset and therefore the `TI` instance is `queued` again.
   
   The code where the `queued_tasks` entry is updated by popping the `TI` is here:
   
https://github.com/apache/airflow/blob/a943d6beab473b8ec87bb8c6e43f93cc64fa1d23/airflow/executors/celery_executor.py#L223
   
   The code due to which `TI` gets stuck in the `scheduled` state is here:
   
https://github.com/apache/airflow/blob/a943d6beab473b8ec87bb8c6e43f93cc64fa1d23/airflow/jobs/scheduler_job.py#L1033
   
   I think the code here should only check whether the `CeleryExecutor.running` dictionary has `TI` in its entries. But I am not sure how it affects other schedulers.
   
   **What you expected to happen**:
   
   The `_find_executable_task_instances()` function should only check whether `CeleryExecutor.running` contains an entry for `TI`, and return `TI` as part of its list of tasks to be queued.
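   
   A toy model of that proposed behavior (a standalone illustration with made-up names, not the actual scheduler code):
   
   ```python
   from typing import Dict, List, Set
   
   def find_executable(scheduled: List[str], running: Set[str],
                       queued_tasks: Dict[str, object]) -> List[str]:
       """Only the executor's `running` set blocks a scheduled TI;
       a stale entry in `queued_tasks` does not."""
       return [ti for ti in scheduled if ti not in running]
   
   # A TI stuck with a stale queued_tasks entry would still be returned:
   print(find_executable(["ti_1"], running=set(),
                         queued_tasks={"ti_1": object()}))  # -> ['ti_1']
   ```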
   
   **How to reproduce it**:
   
   It can be reproduced by forcing the `result` value in `CeleryExecutor.heartbeat` to return an `ExceptionTraceback` object or `None`. 
   
   (Note: links point to the `master` branch, but the problem applies to 1.10.3 and higher versions.)
   
   **Anything else we need to know**:
   
   I am not able to see a scenario where, in `CeleryExecutor.heartbeat`, the `result` is `None`. Looking at the `Celery.app` module, it feels like the `result` can never be `None`. But I suspect there are scenarios where the `result` is `None` and therefore the `CeleryExecutor` does not pop the `TI` from the queue. I am not able to prove this concretely.
   
   This also happens with later versions of Airflow. In the later versions of Airflow, the `CeleryExecutor.trigger_dags` function is 

[GitHub] [airflow] mik-laj commented on a change in pull request #8651: [AIP-31] [AIRFLOW-8058] Retrieve current execution context without altering function's signature

2020-05-03 Thread GitBox


mik-laj commented on a change in pull request #8651:
URL: https://github.com/apache/airflow/pull/8651#discussion_r419128190



##
File path: airflow/models/taskinstance.py
##
@@ -1110,6 +1116,27 @@ def signal_handler(signum, frame):
 session.merge(self)
 session.commit()
 
+def get_user_defined_execute_context(self, execution_context):
+"""
+Retrieves the user defined execution context callback from the 
configuration,
+and validates that it is indeed a context manager
+:param execution_context: the current execution context to be passed 
to user ctx
+"""
+path_to_user_context = conf.get("core", "user_defined_execute_context")
+if path_to_user_context:
+try:
+imported = conf.getimport("core", 
"user_defined_execute_context")
+user_ctx_obj = imported(self, execution_context)

Review comment:
   Can you add a check here for the case where the result of `getimport` is `None`?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on a change in pull request #8651: [AIP-31] [AIRFLOW-8058] Retrieve current execution context without altering function's signature

2020-05-03 Thread GitBox


mik-laj commented on a change in pull request #8651:
URL: https://github.com/apache/airflow/pull/8651#discussion_r419128100



##
File path: airflow/models/taskinstance.py
##
@@ -1110,6 +1116,27 @@ def signal_handler(signum, frame):
 session.merge(self)
 session.commit()
 
+def get_user_defined_execute_context(self, execution_context):
+"""
+Retrieves the user defined execution context callback from the 
configuration,
+and validates that it is indeed a context manager
+:param execution_context: the current execution context to be passed 
to user ctx
+"""
+path_to_user_context = conf.get("core", "user_defined_execute_context")

Review comment:
   This condition is not needed. `getimport` returns `None` when the option value is empty. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on pull request #8305: Guide for Apache Spark operators

2020-05-03 Thread GitBox


mik-laj commented on pull request #8305:
URL: https://github.com/apache/airflow/pull/8305#issuecomment-623137703


   One more thing. Can you add this tutorial to `operators-and-hooks-ref.rst`? 
   https://github.com/apache/airflow/pull/8690/files
   I am working on automatically checking that the list has been updated, but for now we must remember to do it manually. Your PR was an inspiration for me to build this check, so your next contribution will be easier.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on a change in pull request #8305: Guide for Apache Spark operators

2020-05-03 Thread GitBox


mik-laj commented on a change in pull request #8305:
URL: https://github.com/apache/airflow/pull/8305#discussion_r419126976



##
File path: docs/howto/operator/apache/spark.rst
##
@@ -0,0 +1,102 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+
+Apache Spark Operators
+======================
+
+.. contents::
+  :depth: 1
+  :local:
+
+Prerequisite
+------------
+
+To use ``SparkJDBCOperator`` and ``SparkSubmitOperator``, you must configure a 
:doc:`Spark Connection <../../connection/spark>`. For ``SparkJDBCOperator``, 
you must also configure a :doc:`JDBC connection <../../connection/jdbc>`.
+
+``SparkSqlOperator`` gets all the configurations from operator parameters.
+
+.. _howto/operator:SparkJDBCOperator:
+
+SparkJDBCOperator
+-----------------
+
+Launches applications on an Apache Spark server; it uses ``SparkSubmitOperator`` to perform data transfers to/from JDBC-based databases.
+
+For parameter definition take a look at 
:class:`~airflow.providers.apache.spark.operators.spark_jdbc.SparkJDBCOperator`.
+
+Using the operator
+""
+
+Using the ``cmd_type`` parameter, it is possible to transfer data from Spark to a database (``spark_to_jdbc``) or from a database to Spark (``jdbc_to_spark``), which will write the table using the Spark command ``saveAsTable``.
+
+.. exampleinclude:: 
../../../../airflow/providers/apache/spark/example_dags/example_spark_dag.py
+:language: python
+:dedent: 4
+:start-after: [START howto_operator_spark_jdbc]
+:end-before: [END howto_operator_spark_jdbc]
+
+
+Reference
+"
+
+For further information, look at `Apache Spark DataFrameWriter documentation 
`_.
+
+.. _howto/operator:SparkSqlOperator:
+
+SparkSqlOperator
+----------------
+
+Launches applications on an Apache Spark server; it requires that the ``spark-sql`` script is in the PATH.
+The operator will run the SQL query on the Spark Hive metastore service; the ``sql`` parameter can be templated and may be a ``.sql`` or ``.hql`` file.

Review comment:
   Can you add a link to the API reference?
   ```
   :class:`~airflow.providers.apache.spark.operators.spark_sql.SparkSqlOperator`
   ```
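   
   For context, a hedged usage sketch of the operator discussed above. The query, task_id, and master value are made up; check the operator's signature before relying on this:
   
   ```python
   from airflow.providers.apache.spark.operators.spark_sql import SparkSqlOperator
   
   # Illustrative only: runs a templatable SQL query via the `spark-sql`
   # script, which must be on the PATH.
   spark_sql_job = SparkSqlOperator(
       task_id="run_spark_sql",
       sql="SELECT COUNT(*) FROM my_table",
       master="local",
   )
   ```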





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj opened a new pull request #8690: Check consistency between reference file and howto directory

2020-05-03 Thread GitBox


mik-laj opened a new pull request #8690:
URL: https://github.com/apache/airflow/pull/8690


   Depends on: https://github.com/apache/airflow/pull/8623
   ---
   Make sure to mark the boxes below before creating PR: [x]
   
   - [X] Description above provides context of the change
   - [X] Unit tests coverage for changes (not needed for documentation changes)
   - [X] Target Github ISSUE in description if exists
   - [X] Commits follow "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)"
   - [X] Relevant documentation is updated including usage instructions.
   - [X] I will engage committers as explained in [Contribution Workflow 
Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal 
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals))
 is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party 
License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in 
[UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request 
Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines)
 for more information.
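   
   For context, the check itself is not shown in this message. As a rough 
sketch only (the directory layout, file names, and link format below are 
assumptions, not the PR's actual code), such a consistency check could look 
like this:
   
   ```python
   from pathlib import Path

   # Assumed layout: this script runs from the repository root.
   DOCS_DIR = Path("docs")
   HOWTO_DIR = DOCS_DIR / "howto" / "operator"
   REF_FILE = DOCS_DIR / "operators-and-hooks-ref.rst"


   def find_unreferenced_guides() -> list:
       """Return howto guides that the reference file does not mention."""
       ref_content = REF_FILE.read_text()
       missing = []
       for guide in sorted(HOWTO_DIR.rglob("*.rst")):
           # Guides are assumed to be linked by their docs-relative path
           # without the .rst suffix, e.g. "howto/operator/apache/spark".
           doc_path = guide.relative_to(DOCS_DIR).with_suffix("").as_posix()
           if doc_path not in ref_content:
               missing.append(doc_path)
       return missing


   if __name__ == "__main__":
       unreferenced = find_unreferenced_guides()
       if unreferenced:
           print("Guides missing from operators-and-hooks-ref.rst:")
           for path in unreferenced:
               print(" - " + path)
           raise SystemExit(1)
   ```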
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] boring-cyborg[bot] commented on pull request #8670: Include all imports in `import_module` block

2020-05-03 Thread GitBox


boring-cyborg[bot] commented on pull request #8670:
URL: https://github.com/apache/airflow/pull/8670#issuecomment-623135851


   Awesome work, congrats on your first merged pull request!
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on a change in pull request #8689: [AIRFLOW-8605] Added docker compose

2020-05-03 Thread GitBox


mik-laj commented on a change in pull request #8689:
URL: https://github.com/apache/airflow/pull/8689#discussion_r419126176



##
File path: IMAGES.rst
##
@@ -416,3 +416,22 @@ signals). This entrypoint works as follows:
 
 * If first argument is equal to "bash" - you are dropped in bash shell.
 * If there are any arguments they are passed to "airflow" command
+
+Docker Compose

Review comment:
   Should we add this documentation to docs/*.rst? Otherwise it will not be 
available to end users.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] boring-cyborg[bot] commented on pull request #8677: Remove _get_pretty_exception_message in PrestoHook

2020-05-03 Thread GitBox


boring-cyborg[bot] commented on pull request #8677:
URL: https://github.com/apache/airflow/pull/8677#issuecomment-623134852


   Awesome work, congrats on your first merged pull request!
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] pujaji commented on issue #8122: Guide about authorization and permission

2020-05-03 Thread GitBox


pujaji commented on issue #8122:
URL: https://github.com/apache/airflow/issues/8122#issuecomment-623128120


   Got it, totally!  
   
   Thanks



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



