[GitHub] [airflow] mik-laj commented on issue #6355: [AIRFLOW-5685] Loading AVRO file from GCS to BQ throwing ValueError

2019-10-16 Thread GitBox
mik-laj commented on issue #6355: [AIRFLOW-5685] Loading AVRO file from GCS to 
BQ throwing ValueError
URL: https://github.com/apache/airflow/pull/6355#issuecomment-543003552
 
 
   CC: @TobKed 




[GitHub] [airflow] danieltahara edited a comment on issue #4660: [AIRFLOW-3819] - Allow the configuration of a global default for work…

2019-10-16 Thread GitBox
danieltahara edited a comment on issue #4660: [AIRFLOW-3819] - Allow the 
configuration of a global default for work…
URL: https://github.com/apache/airflow/pull/4660#issuecomment-542982778
 
 
   I actually don't think this is a problem best solved at the Kubernetes 
level. Since LimitRanges are applied as a namespace default, the more tenants 
you have the less useful those ranges are going to be (because they'll have to 
be uselessly wide). There isn't a good reason that Airflow should be creating 
pods without memory limits or not providing that configuration option out of 
the box. It makes bin packing and cluster autoscaling much harder, since the 
pods will schedule effectively anywhere.
   
   Is there any chance of re-opening this?
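   For context, Airflow's Kubernetes executor already supports per-task resource bounds; what this thread asks for is a global default so that no worker pod is ever created without limits. A hedged sketch of the existing per-task mechanism (`executor_config` field names as documented for Airflow 1.10.x; the `dag` object is assumed to be defined elsewhere):

```python
# Per-task resource bounds via executor_config (Airflow 1.10.x, Kubernetes
# executor). The thread argues these values also need a global default so
# that pods never schedule without memory limits.
from airflow.operators.python_operator import PythonOperator

bounded_task = PythonOperator(
    task_id="bounded_task",
    python_callable=lambda: None,
    executor_config={
        "KubernetesExecutor": {
            "request_memory": "512Mi",
            "limit_memory": "1Gi",
        }
    },
    dag=dag,  # assumes a DAG object `dag` defined elsewhere
)
```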




[GitHub] [airflow] danieltahara commented on issue #4660: [AIRFLOW-3819] - Allow the configuration of a global default for work…

2019-10-16 Thread GitBox
danieltahara commented on issue #4660: [AIRFLOW-3819] - Allow the configuration 
of a global default for work…
URL: https://github.com/apache/airflow/pull/4660#issuecomment-542982778
 
 
   I actually don't think this is a problem best solved at the Kubernetes 
level. Since LimitRanges are applied as a namespace default, the more tenants 
you have the less useful those ranges are going to be (because they'll have to 
be uselessly wide). There isn't a good reason that Airflow should be creating 
pods without memory limits or not providing that configuration option out of 
the box. It makes bin packing and cluster autoscaling much harder, since the 
pods will schedule effectively anywhere.




[jira] [Created] (AIRFLOW-5685) Loading AVRO file from GCS to BQ throwing ValueError

2019-10-16 Thread Ryan Yuan (Jira)
Ryan Yuan created AIRFLOW-5685:
--

 Summary: Loading AVRO file from GCS to BQ throwing ValueError
 Key: AIRFLOW-5685
 URL: https://issues.apache.org/jira/browse/AIRFLOW-5685
 Project: Apache Airflow
  Issue Type: Bug
  Components: gcp
Affects Versions: 2.0.0
Reporter: Ryan Yuan
Assignee: Ryan Yuan


Using {{GoogleCloudStorageToBigQueryOperator}} to load an AVRO file causes the 
following exception:

 

{{Traceback (most recent call last):
  File "/Users/ryanyuan/dev/airflow/airflow/models/taskinstance.py", line 932, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/Users/ryanyuan/dev/airflow/airflow/operators/gcs_to_bq.py", line 293, in execute
    encryption_configuration=self.encryption_configuration)
  File "/Users/ryanyuan/dev/airflow/airflow/gcp/hooks/bigquery.py", line 1290, in run_load
    backward_compatibility_configs)
  File "/Users/ryanyuan/dev/airflow/airflow/gcp/hooks/bigquery.py", line 2374, in _validate_src_fmt_configs
    .format(k, source_format))}}

*ValueError: skipLeadingRows is not a valid src_fmt_configs for type AVRO.*
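A minimal sketch of the validation that raises this error (the function name mirrors the traceback, but the body is illustrative, not Airflow's actual source): each source format has a whitelist of valid src_fmt_configs keys, and a CSV-only backward-compatibility default such as skipLeadingRows fails the check for AVRO.

```python
# Hedged sketch: per-format whitelist validation of src_fmt_configs.
def _validate_src_fmt_configs(source_format, src_fmt_configs, valid_configs):
    for k in src_fmt_configs:
        if k not in valid_configs:
            raise ValueError(
                "{} is not a valid src_fmt_configs for type {}.".format(
                    k, source_format))
    return src_fmt_configs

# Failure mode: a CSV-only default like 'skipLeadingRows' is injected into
# src_fmt_configs before validation, so every AVRO load fails.
_validate_src_fmt_configs("AVRO", {"skipLeadingRows": 0}, ["useAvroLogicalTypes"])
# ValueError: skipLeadingRows is not a valid src_fmt_configs for type AVRO.
```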

 





[jira] [Commented] (AIRFLOW-5685) Loading AVRO file from GCS to BQ throwing ValueError

2019-10-16 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-5685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16953296#comment-16953296
 ] 

ASF GitHub Bot commented on AIRFLOW-5685:
-

ryanyuan commented on pull request #6355: [AIRFLOW-5685] Loading AVRO file from 
GCS to BQ throwing ValueError
URL: https://github.com/apache/airflow/pull/6355
 
 
   Using GoogleCloudStorageToBigQueryOperator to load an AVRO file causes the 
following exception:
   
   Traceback (most recent call last):
     File "/Users/ryanyuan/dev/airflow/airflow/models/taskinstance.py", line 932, in _run_raw_task
       result = task_copy.execute(context=context)
     File "/Users/ryanyuan/dev/airflow/airflow/operators/gcs_to_bq.py", line 293, in execute
       encryption_configuration=self.encryption_configuration)
     File "/Users/ryanyuan/dev/airflow/airflow/gcp/hooks/bigquery.py", line 1290, in run_load
       backward_compatibility_configs)
     File "/Users/ryanyuan/dev/airflow/airflow/gcp/hooks/bigquery.py", line 2374, in _validate_src_fmt_configs
       .format(k, source_format))
   ValueError: skipLeadingRows is not a valid src_fmt_configs for type AVRO.
   
   The lines removed in this PR were originally removed in this 
[commit](https://github.com/apache/airflow/commit/dd36d90cadde765c6860a4a6bdeb6492a294a685#diff-ee06f8fcbc476ea65446a30160c2a2b2L1245).
 However, they were brought back in this 
[commit](https://github.com/apache/airflow/commit/f4a658642917e377d36afd74730182d70178a981#diff-ee06f8fcbc476ea65446a30160c2a2b2R1240).
   
   Make sure you have checked _all_ steps below.
   
   ### Jira
   
   - [ ] My PR addresses the following 
[Airflow-5685](https://issues.apache.org/jira/browse/AIRFLOW-5685) issues and 
references them in the PR title. 
   
   ### Description
   
   - [ ] Here are some details about my PR, including screenshots of any UI 
changes:
   
   ### Tests
   
   - [ ] My PR adds the following unit tests __OR__ does not need testing for 
this extremely good reason:
   
   ### Commits
   
   - [ ] My commits all reference Jira issues in their subject lines, and I 
have squashed multiple commits if they address the same issue. In addition, my 
commits follow the guidelines from "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)":
 1. Subject is separated from body by a blank line
 1. Subject is limited to 50 characters (not including Jira issue reference)
 1. Subject does not end with a period
 1. Subject uses the imperative mood ("add", not "adding")
 1. Body wraps at 72 characters
 1. Body explains "what" and "why", not "how"
   
   ### Documentation
   
   - [ ] In case of new functionality, my PR adds documentation that describes 
how to use it.
 - All the public functions and the classes in the PR contain docstrings 
that explain what they do
 - If you implement backwards incompatible changes, please leave a note in 
the [Updating.md](https://github.com/apache/airflow/blob/master/UPDATING.md) so 
we can assign it to an appropriate release
   
 



> Loading AVRO file from GCS to BQ throwing ValueError
> 
>
> Key: AIRFLOW-5685
> URL: https://issues.apache.org/jira/browse/AIRFLOW-5685
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: gcp
>Affects Versions: 2.0.0
>Reporter: Ryan Yuan
>Assignee: Ryan Yuan
>Priority: Critical
>
> Using {{GoogleCloudStorageToBigQueryOperator}} to load an AVRO file causes 
> the following exception:
>  
> {{Traceback (most recent call last):
>   File "/Users/ryanyuan/dev/airflow/airflow/models/taskinstance.py", line 932, in _run_raw_task
>     result = task_copy.execute(context=context)
>   File "/Users/ryanyuan/dev/airflow/airflow/operators/gcs_to_bq.py", line 293, in execute
>     encryption_configuration=self.encryption_configuration)
>   File "/Users/ryanyuan/dev/airflow/airflow/gcp/hooks/bigquery.py", line 1290, in run_load
>     backward_compatibility_configs)
>   File "/Users/ryanyuan/dev/airflow/airflow/gcp/hooks/bigquery.py", line 2374, in _validate_src_fmt_configs
>     .format(k, source_format))}}
>
> *ValueError: skipLeadingRows is not a valid src_fmt_configs for type AVRO.*
>  





[GitHub] [airflow] ryanyuan opened a new pull request #6355: [AIRFLOW-5685] Loading AVRO file from GCS to BQ throwing ValueError

2019-10-16 Thread GitBox
ryanyuan opened a new pull request #6355: [AIRFLOW-5685] Loading AVRO file from 
GCS to BQ throwing ValueError
URL: https://github.com/apache/airflow/pull/6355
 
 
   Using GoogleCloudStorageToBigQueryOperator to load an AVRO file causes the 
following exception:
   
   Traceback (most recent call last):
     File "/Users/ryanyuan/dev/airflow/airflow/models/taskinstance.py", line 932, in _run_raw_task
       result = task_copy.execute(context=context)
     File "/Users/ryanyuan/dev/airflow/airflow/operators/gcs_to_bq.py", line 293, in execute
       encryption_configuration=self.encryption_configuration)
     File "/Users/ryanyuan/dev/airflow/airflow/gcp/hooks/bigquery.py", line 1290, in run_load
       backward_compatibility_configs)
     File "/Users/ryanyuan/dev/airflow/airflow/gcp/hooks/bigquery.py", line 2374, in _validate_src_fmt_configs
       .format(k, source_format))
   ValueError: skipLeadingRows is not a valid src_fmt_configs for type AVRO.
   
   The lines removed in this PR were originally removed in this 
[commit](https://github.com/apache/airflow/commit/dd36d90cadde765c6860a4a6bdeb6492a294a685#diff-ee06f8fcbc476ea65446a30160c2a2b2L1245).
 However, they were brought back in this 
[commit](https://github.com/apache/airflow/commit/f4a658642917e377d36afd74730182d70178a981#diff-ee06f8fcbc476ea65446a30160c2a2b2R1240).
   
   Make sure you have checked _all_ steps below.
   
   ### Jira
   
   - [ ] My PR addresses the following 
[Airflow-5685](https://issues.apache.org/jira/browse/AIRFLOW-5685) issues and 
references them in the PR title. 
   
   ### Description
   
   - [ ] Here are some details about my PR, including screenshots of any UI 
changes:
   
   ### Tests
   
   - [ ] My PR adds the following unit tests __OR__ does not need testing for 
this extremely good reason:
   
   ### Commits
   
   - [ ] My commits all reference Jira issues in their subject lines, and I 
have squashed multiple commits if they address the same issue. In addition, my 
commits follow the guidelines from "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)":
 1. Subject is separated from body by a blank line
 1. Subject is limited to 50 characters (not including Jira issue reference)
 1. Subject does not end with a period
 1. Subject uses the imperative mood ("add", not "adding")
 1. Body wraps at 72 characters
 1. Body explains "what" and "why", not "how"
   
   ### Documentation
   
   - [ ] In case of new functionality, my PR adds documentation that describes 
how to use it.
 - All the public functions and the classes in the PR contain docstrings 
that explain what they do
 - If you implement backwards incompatible changes, please leave a note in 
the [Updating.md](https://github.com/apache/airflow/blob/master/UPDATING.md) so 
we can assign it to an appropriate release
   




[jira] [Work started] (AIRFLOW-5685) Loading AVRO file from GCS to BQ throwing ValueError

2019-10-16 Thread Ryan Yuan (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-5685?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on AIRFLOW-5685 started by Ryan Yuan.
--
> Loading AVRO file from GCS to BQ throwing ValueError
> 
>
> Key: AIRFLOW-5685
> URL: https://issues.apache.org/jira/browse/AIRFLOW-5685
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: gcp
>Affects Versions: 2.0.0
>Reporter: Ryan Yuan
>Assignee: Ryan Yuan
>Priority: Critical
>
> Using {{GoogleCloudStorageToBigQueryOperator}} to load an AVRO file causes 
> the following exception:
>  
> {{Traceback (most recent call last):
>   File "/Users/ryanyuan/dev/airflow/airflow/models/taskinstance.py", line 932, in _run_raw_task
>     result = task_copy.execute(context=context)
>   File "/Users/ryanyuan/dev/airflow/airflow/operators/gcs_to_bq.py", line 293, in execute
>     encryption_configuration=self.encryption_configuration)
>   File "/Users/ryanyuan/dev/airflow/airflow/gcp/hooks/bigquery.py", line 1290, in run_load
>     backward_compatibility_configs)
>   File "/Users/ryanyuan/dev/airflow/airflow/gcp/hooks/bigquery.py", line 2374, in _validate_src_fmt_configs
>     .format(k, source_format))}}
>
> *ValueError: skipLeadingRows is not a valid src_fmt_configs for type AVRO.*
>  





[GitHub] [airflow] lyallcooper commented on issue #6351: [AIRFLOW-5682] Allow labels in gcs_to_bq operator

2019-10-16 Thread GitBox
lyallcooper commented on issue #6351: [AIRFLOW-5682] Allow labels in gcs_to_bq 
operator
URL: https://github.com/apache/airflow/pull/6351#issuecomment-542937526
 
 
   Alright, updated to add the `airflow-version` label automatically.




[GitHub] [airflow] codecov-io commented on issue #6351: [AIRFLOW-5682] Allow labels in gcs_to_bq operator

2019-10-16 Thread GitBox
codecov-io commented on issue #6351: [AIRFLOW-5682] Allow labels in gcs_to_bq 
operator
URL: https://github.com/apache/airflow/pull/6351#issuecomment-542936295
 
 
   # [Codecov](https://codecov.io/gh/apache/airflow/pull/6351?src=pr=h1) 
Report
   > Merging 
[#6351](https://codecov.io/gh/apache/airflow/pull/6351?src=pr=desc) into 
[master](https://codecov.io/gh/apache/airflow/commit/ac42428bf530c259ab1b0dca08458c1ebf49b04a?src=pr=desc)
 will **decrease** coverage by `0.38%`.
   > The diff coverage is `100%`.
   
   [![Impacted file tree 
graph](https://codecov.io/gh/apache/airflow/pull/6351/graphs/tree.svg?width=650=WdLKlKHOAU=150=pr)](https://codecov.io/gh/apache/airflow/pull/6351?src=pr=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #6351      +/-   ##
   ==========================================
   - Coverage   80.08%   79.69%   -0.39%     
   ==========================================
     Files         616      616              
     Lines       35797    35800       +3     
   ==========================================
   - Hits        28668    28532     -136     
   - Misses       7129     7268     +139     
   ```
   
   
   | [Impacted 
Files](https://codecov.io/gh/apache/airflow/pull/6351?src=pr=tree) | 
Coverage Δ | |
   |---|---|---|
   | 
[airflow/operators/gcs\_to\_bq.py](https://codecov.io/gh/apache/airflow/pull/6351/diff?src=pr=tree#diff-YWlyZmxvdy9vcGVyYXRvcnMvZ2NzX3RvX2JxLnB5)
 | `71.01% <100%> (+0.42%)` | :arrow_up: |
   | 
[airflow/gcp/hooks/bigquery.py](https://codecov.io/gh/apache/airflow/pull/6351/diff?src=pr=tree#diff-YWlyZmxvdy9nY3AvaG9va3MvYmlncXVlcnkucHk=)
 | `70.27% <100%> (+0.08%)` | :arrow_up: |
   | 
[airflow/operators/mysql\_operator.py](https://codecov.io/gh/apache/airflow/pull/6351/diff?src=pr=tree#diff-YWlyZmxvdy9vcGVyYXRvcnMvbXlzcWxfb3BlcmF0b3IucHk=)
 | `0% <0%> (-100%)` | :arrow_down: |
   | 
[airflow/operators/mysql\_to\_hive.py](https://codecov.io/gh/apache/airflow/pull/6351/diff?src=pr=tree#diff-YWlyZmxvdy9vcGVyYXRvcnMvbXlzcWxfdG9faGl2ZS5weQ==)
 | `0% <0%> (-100%)` | :arrow_down: |
   | 
[airflow/executors/sequential\_executor.py](https://codecov.io/gh/apache/airflow/pull/6351/diff?src=pr=tree#diff-YWlyZmxvdy9leGVjdXRvcnMvc2VxdWVudGlhbF9leGVjdXRvci5weQ==)
 | `47.61% <0%> (-52.39%)` | :arrow_down: |
   | 
[airflow/utils/log/colored\_log.py](https://codecov.io/gh/apache/airflow/pull/6351/diff?src=pr=tree#diff-YWlyZmxvdy91dGlscy9sb2cvY29sb3JlZF9sb2cucHk=)
 | `72.72% <0%> (-20.46%)` | :arrow_down: |
   | 
[airflow/utils/sqlalchemy.py](https://codecov.io/gh/apache/airflow/pull/6351/diff?src=pr=tree#diff-YWlyZmxvdy91dGlscy9zcWxhbGNoZW15LnB5)
 | `77.96% <0%> (-15.26%)` | :arrow_down: |
   | 
[airflow/executors/\_\_init\_\_.py](https://codecov.io/gh/apache/airflow/pull/6351/diff?src=pr=tree#diff-YWlyZmxvdy9leGVjdXRvcnMvX19pbml0X18ucHk=)
 | `63.26% <0%> (-4.09%)` | :arrow_down: |
   | 
[airflow/utils/dag\_processing.py](https://codecov.io/gh/apache/airflow/pull/6351/diff?src=pr=tree#diff-YWlyZmxvdy91dGlscy9kYWdfcHJvY2Vzc2luZy5weQ==)
 | `56.23% <0%> (-2.67%)` | :arrow_down: |
   | 
[airflow/hooks/hive\_hooks.py](https://codecov.io/gh/apache/airflow/pull/6351/diff?src=pr=tree#diff-YWlyZmxvdy9ob29rcy9oaXZlX2hvb2tzLnB5)
 | `75.82% <0%> (-1.79%)` | :arrow_down: |
   | ... and [4 
more](https://codecov.io/gh/apache/airflow/pull/6351/diff?src=pr=tree-more) 
| |
   
   --
   
   [Continue to review full report at 
Codecov](https://codecov.io/gh/apache/airflow/pull/6351?src=pr=continue).
   > **Legend** - [Click here to learn 
more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute  (impact)`, `ø = not affected`, `? = missing data`
   > Powered by 
[Codecov](https://codecov.io/gh/apache/airflow/pull/6351?src=pr=footer). 
Last update 
[ac42428...5ee32c5](https://codecov.io/gh/apache/airflow/pull/6351?src=pr=lastupdated).
 Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   




[GitHub] [airflow] JonnyIncognito commented on a change in pull request #6210: [AIRFLOW-5567] BaseAsyncOperator

2019-10-16 Thread GitBox
JonnyIncognito commented on a change in pull request #6210: [AIRFLOW-5567] 
BaseAsyncOperator
URL: https://github.com/apache/airflow/pull/6210#discussion_r335754928
 
 

 ##
 File path: airflow/models/base_async_operator.py
 ##
 @@ -0,0 +1,161 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Base Asynchronous Operator for kicking off long-running
+operations and polling for completion with reschedule mode.
+"""
+
+from abc import abstractmethod
+from typing import Dict, List, Optional, Union
+
+from airflow.models import SkipMixin, TaskReschedule
+from airflow.models.xcom import XCOM_EXTERNAL_RESOURCE_ID_KEY
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
 
 Review comment:
   Ref. `State.UP_FOR_RESCHEDULE`, this was added back in January by @seelmann 
as part of [AIRFLOW-2747](https://issues.apache.org/jira/browse/AIRFLOW-2747). 
Perhaps he has some thoughts on whether or not we can change the contract for 
rescheduled tasks to say that we retain XCom state? It's a fairly recent 
feature, so that might be okay, e.g. operators may not be depending on the 
current behaviour.




[GitHub] [airflow] JonnyIncognito commented on a change in pull request #6210: [AIRFLOW-5567] BaseAsyncOperator

2019-10-16 Thread GitBox
JonnyIncognito commented on a change in pull request #6210: [AIRFLOW-5567] 
BaseAsyncOperator
URL: https://github.com/apache/airflow/pull/6210#discussion_r335754928
 
 

 ##
 File path: airflow/models/base_async_operator.py
 ##
 @@ -0,0 +1,161 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Base Asynchronous Operator for kicking off long-running
+operations and polling for completion with reschedule mode.
+"""
+
+from abc import abstractmethod
+from typing import Dict, List, Optional, Union
+
+from airflow.models import SkipMixin, TaskReschedule
+from airflow.models.xcom import XCOM_EXTERNAL_RESOURCE_ID_KEY
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
 
 Review comment:
   Ref. `State.UP_FOR_RESCHEDULE`, this was added back in January by @seelmann 
as part of [AIRFLOW-2747](https://issues.apache.org/jira/browse/AIRFLOW-2747). 
Perhaps he has some thoughts on whether or not we can tighten the contract for 
rescheduled tasks to say that we retain XCom state? It's a fairly recent 
feature, so that might be okay, e.g. operators may not be depending on the 
current behaviour.




[GitHub] [airflow] JonnyIncognito commented on a change in pull request #6210: [AIRFLOW-5567] BaseAsyncOperator

2019-10-16 Thread GitBox
JonnyIncognito commented on a change in pull request #6210: [AIRFLOW-5567] 
BaseAsyncOperator
URL: https://github.com/apache/airflow/pull/6210#discussion_r335752238
 
 

 ##
 File path: airflow/models/base_async_operator.py
 ##
 @@ -0,0 +1,161 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Base Asynchronous Operator for kicking off long-running
+operations and polling for completion with reschedule mode.
+"""
+
+from abc import abstractmethod
+from typing import Dict, List, Optional, Union
+
+from airflow.models import SkipMixin, TaskReschedule
+from airflow.models.xcom import XCOM_EXTERNAL_RESOURCE_ID_KEY
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
 
 Review comment:
   > XCOM data appears to be cleared at start of each task. [See 
here](https://github.com/apache/airflow/blob/1a4c16432b8718189269e63c3afcd2709eed7379/airflow/models/taskinstance.py#L913).
   > 
   > So when task restarts after reschedule, we lose the resource id.
   
   Ouch, that is quite a limitation for using XCom. Is there anywhere else to 
store task state? If not, some options:
   
   1. Use `mode='poke'`, at the expense of using up a task slot for 
long-running tasks. Not ideal, but gets "correct" behaviour. It'd then be 
"atomic" rather than "async" behaviour.
   
   1. Enhance `TaskInstance` to make clearing XCom data conditional by 
delegating it to the task/operator. There could be a new function like 
`BaseOperator.pre_execute_clear_state()` which can be overridden by 
implementers. `BaseOperator` is already aware of / coupled with `TaskInstance`, 
so I don't think we'd be breaking separation of concerns any more than it 
already is?
   
   1. There might be enough justification to say that for rescheduled tasks 
(i.e. transition from `State.UP_FOR_RESCHEDULE` to `State.RUNNING`) then 
`TaskInstance` shouldn't clear XCom. The call to `run()` -> 
`_check_and_change_state_before_execution()` does know about this state change, 
but I see in the code that there are places which bypass the call to `run()` 
and go directly to `_run_raw_task()` (e.g. the CLI).
   
   Seems that some form of #2 is the least risky way to get the desired outcome; a sketch follows below.
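A hedged sketch of option 2 (the method name `pre_execute_clear_state` is the commenter's hypothetical; none of this exists in Airflow as-is):

```python
# Option 2: TaskInstance delegates the "clear XCom before run?" decision
# to the operator instead of wiping it unconditionally.
class BaseOperator:
    def pre_execute_clear_state(self) -> bool:
        """Return True to wipe this task's XCom before execution;
        True preserves today's unconditional-clear behaviour."""
        return True


class BaseAsyncOperator(BaseOperator):
    def pre_execute_clear_state(self) -> bool:
        # Keep XCom across reschedules so the external resource id survives.
        return False


# Inside TaskInstance._run_raw_task, the unconditional clear would become:
#     if task.pre_execute_clear_state():
#         self.clear_xcom_data()
```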




[GitHub] [airflow] JonnyIncognito commented on a change in pull request #6210: [AIRFLOW-5567] BaseAsyncOperator

2019-10-16 Thread GitBox
JonnyIncognito commented on a change in pull request #6210: [AIRFLOW-5567] 
BaseAsyncOperator
URL: https://github.com/apache/airflow/pull/6210#discussion_r335752238
 
 

 ##
 File path: airflow/models/base_async_operator.py
 ##
 @@ -0,0 +1,161 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Base Asynchronous Operator for kicking off long-running
+operations and polling for completion with reschedule mode.
+"""
+
+from abc import abstractmethod
+from typing import Dict, List, Optional, Union
+
+from airflow.models import SkipMixin, TaskReschedule
+from airflow.models.xcom import XCOM_EXTERNAL_RESOURCE_ID_KEY
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
 
 Review comment:
   > XCOM data appears to be cleared at start of each task. [See 
here](https://github.com/apache/airflow/blob/1a4c16432b8718189269e63c3afcd2709eed7379/airflow/models/taskinstance.py#L913).
   > 
   > So when task restarts after reschedule, we lose the resource id.
   
   Ouch, that is quite a limitation for using XCom. Is there anywhere else to 
store task state? If not, some options:
   
   1. Use `mode='poke'`, at the expense of using up a task slot for 
long-running tasks. Not ideal, but gets "correct" behaviour. It'd then be 
"atomic" rather than "async" behaviour.
   
   1. Enhance `TaskInstance` to make clearing XCom data conditional by 
delegating it to the task/operator. There could be a new function like 
`BaseOperator.pre_execute_clear_state()` which can be overridden by 
implementers. `BaseOperator` is already aware of / coupled with `TaskInstance`, 
so I don't think we'd be breaking separation of concerns any more than it 
already is?
   
   1. There might be enough justification to say that for rescheduled tasks 
(i.e. transition from `State.UP_FOR_RESCHEDULE` to `State.RUNNING`) then 
`TaskInstance` shouldn't clear XCom. The call to `run()` -> 
`_check_and_change_state_before_execution()` does know about this state change, 
but I see in the code that there are places which bypass the call to `run()` 
and go directly to `_run_raw_task()` (e.g. the CLI).
   
   Seems that some form of option 2 is the least risky way to get the desired 
outcome.




[GitHub] [airflow] JonnyIncognito commented on a change in pull request #6210: [AIRFLOW-5567] BaseAsyncOperator

2019-10-16 Thread GitBox
JonnyIncognito commented on a change in pull request #6210: [AIRFLOW-5567] 
BaseAsyncOperator
URL: https://github.com/apache/airflow/pull/6210#discussion_r335752238
 
 

 ##
 File path: airflow/models/base_async_operator.py
 ##
 @@ -0,0 +1,161 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Base Asynchronous Operator for kicking off long-running
+operations and polling for completion with reschedule mode.
+"""
+
+from abc import abstractmethod
+from typing import Dict, List, Optional, Union
+
+from airflow.models import SkipMixin, TaskReschedule
+from airflow.models.xcom import XCOM_EXTERNAL_RESOURCE_ID_KEY
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
 
 Review comment:
   > XCOM data appears to be cleared at start of each task. [See 
here](https://github.com/apache/airflow/blob/1a4c16432b8718189269e63c3afcd2709eed7379/airflow/models/taskinstance.py#L913).
   > 
   > So when task restarts after reschedule, we lose the resource id.
   
   Ouch, that is quite a limitation for using XCom. Is there anywhere else to 
store task state? If not, some options:
   
   1. Use `mode='poke'`, at the expense of using up a task slot for 
long-running tasks. Not ideal, but gets "correct" behaviour. It'd then be 
"atomic" rather than "async" behaviour.
   
   1. Enhance `TaskInstance` to make clearing XCom data conditional by 
delegating it to the task/operator. There could be a new function like 
`BaseOperator.pre_execute_clear_state()` which can be overridden by 
implementers. `BaseOperator` is already aware of / coupled with `TaskInstance`, 
so I don't think we'd be breaking separation of concerns any more than it 
already is?
   
   1. There might be enough justification to say that for rescheduled tasks 
(i.e. transition from `State.UP_FOR_RESCHEDULE` to `State.RUNNING`) then 
`TaskInstance` shouldn't clear XCom. The call to `run()` -> 
`_check_and_change_state_before_execution()` does know about this state change, 
but I see in the code that there are places which bypass the call to `run()` 
and go directly to `_run_raw_task()` (e.g. the CLI).




[GitHub] [airflow] RosterIn commented on a change in pull request #5694: [AIRFLOW-5082] add subject in aws sns hook

2019-10-16 Thread GitBox
RosterIn commented on a change in pull request #5694: [AIRFLOW-5082] add 
subject in aws sns hook
URL: https://github.com/apache/airflow/pull/5694#discussion_r335727498
 
 

 ##
 File path: tests/contrib/hooks/test_aws_sns_hook.py
 ##
 @@ -42,9 +42,10 @@ def test_publish_to_target(self):
 
 message = "Hello world"
 topic_name = "test-topic"
+subject = "test-subject"
 target = hook.get_conn().create_topic(Name=topic_name).get('TopicArn')
 
-response = hook.publish_to_target(target, message)
+response = hook.publish_to_target(target, message, subject)
 
 Review comment:
   @shaikshakeel I'm also interested in this enhancement. Are you proceeding 
with it?




[jira] [Commented] (AIRFLOW-5664) postgres_to_gcs operator drops milliseconds from timestamps

2019-10-16 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-5664?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16953215#comment-16953215
 ] 

ASF GitHub Bot commented on AIRFLOW-5664:
-

osule commented on pull request #6354: [AIRFLOW-5664] Store timestamps with 
microseconds precision
URL: https://github.com/apache/airflow/pull/6354
 
 
   Make sure you have checked _all_ steps below.
   
   ### Jira
   
   - [x] My PR addresses the following [Airflow 
Jira](https://issues.apache.org/jira/browse/AIRFLOW-5664) issues and references 
them in the PR title. ~For example, "\[AIRFLOW-XXX\] My Airflow PR"~
 - https://issues.apache.org/jira/browse/AIRFLOW-5664
 - ~In case you are fixing a typo in the documentation you can prepend your 
commit with \[AIRFLOW-XXX\], code changes always need a Jira issue.~
 - ~In case you are proposing a fundamental code change, you need to create 
an Airflow Improvement Proposal 
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)).~
 - ~In case you are adding a dependency, check if the license complies with 
the [ASF 3rd Party License 
Policy](https://www.apache.org/legal/resolved.html#category-x).~
   
   ### Description
   
   - [x] Here are some details about my PR, including screenshots of any UI 
changes:
   
   The microseconds value is lost when converting to a timestamp with 
time.mktime.
   
   The timestamp is now computed with microsecond precision.
   
   
   ### Tests
   
   - [x] My PR adds the following unit tests __OR__ does not need testing for 
this extremely good reason: Synonymous with testing pendulum fluent helpers. 
   
   ### Commits
   
   - [x] My commits all reference Jira issues in their subject lines, and I 
have squashed multiple commits if they address the same issue. In addition, my 
commits follow the guidelines from "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)":
 1. Subject is separated from body by a blank line
 1. Subject is limited to 50 characters (not including Jira issue reference)
 1. Subject does not end with a period
 1. Subject uses the imperative mood ("add", not "adding")
 1. Body wraps at 72 characters
 1. Body explains "what" and "why", not "how"
   
   ### ~Documentation~
   
   - [ ] ~In case of new functionality, my PR adds documentation that describes 
how to use it.~
 - ~All the public functions and the classes in the PR contain docstrings 
that explain what they do~
 - ~If you implement backwards incompatible changes, please leave a note in 
the [Updating.md](https://github.com/apache/airflow/blob/master/UPDATING.md) so 
we can assign it to an appropriate release~
   
 



> postgres_to_gcs operator drops milliseconds from timestamps
> ---
>
> Key: AIRFLOW-5664
> URL: https://issues.apache.org/jira/browse/AIRFLOW-5664
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: operators
>Affects Versions: 1.10.5
>Reporter: Joseph
>Assignee: Oluwafemi Sule
>Priority: Blocker
>
> Postgres stores timestamps with microsecond resolution. When using the 
> postgres_to_gcs operator, timestamps are converted to epoch/unix time using 
> the datetime.timetuple() method. This method drops the microseconds and so 
> you'll end up with a storage object that looks like this:
> {code:java}
> {"id": 1, "last_modified": 1571038537.0}
> {"id": 2, "last_modified": 1571038537.0}
> {"id": 3, "last_modified": 1571038537.0}
> {code}
> When it should look like this:
> {code:java}
> {"id": 1, "last_modified": 1571038537.123}
> {"id": 2, "last_modified": 1571038537.400}
> {"id": 3, "last_modified": 1571038537.455}
> {code}
> It would be useful to keep the timestamps' full resolution.
> I believe the same issue may occur with airflow.operators.mysql_to_gcs.
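A minimal sketch of the precision loss and one way to keep microseconds (illustrative arithmetic using UTC via calendar.timegm, not the actual patch in PR #6354):

```python
# timetuple() has no microsecond field, so epoch conversion truncates to
# whole seconds; adding dt.microsecond back restores full precision.
import calendar
from datetime import datetime

dt = datetime(2019, 10, 14, 7, 35, 37, 123000)

lossy = calendar.timegm(dt.timetuple())
precise = calendar.timegm(dt.timetuple()) + dt.microsecond / 1e6

print(lossy)    # 1571038537
print(precise)  # 1571038537.123
```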





[GitHub] [airflow] osule opened a new pull request #6354: [AIRFLOW-5664] Store timestamps with microseconds precision

2019-10-16 Thread GitBox
osule opened a new pull request #6354: [AIRFLOW-5664] Store timestamps with 
microseconds precision
URL: https://github.com/apache/airflow/pull/6354
 
 
   Make sure you have checked _all_ steps below.
   
   ### Jira
   
   - [x] My PR addresses the following [Airflow 
Jira](https://issues.apache.org/jira/browse/AIRFLOW-5664) issues and references 
them in the PR title. ~For example, "\[AIRFLOW-XXX\] My Airflow PR"~
 - https://issues.apache.org/jira/browse/AIRFLOW-5664
 - ~In case you are fixing a typo in the documentation you can prepend your 
commit with \[AIRFLOW-XXX\], code changes always need a Jira issue.~
 - ~In case you are proposing a fundamental code change, you need to create 
an Airflow Improvement Proposal 
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)).~
 - ~In case you are adding a dependency, check if the license complies with 
the [ASF 3rd Party License 
Policy](https://www.apache.org/legal/resolved.html#category-x).~
   
   ### Description
   
   - [x] Here are some details about my PR, including screenshots of any UI 
changes:
   
   The microseconds value is lost when converting to a timestamp with 
time.mktime.
   
   The timestamp is now computed with microsecond precision.
   
   
   ### Tests
   
   - [x] My PR adds the following unit tests __OR__ does not need testing for 
this extremely good reason: Synonymous with testing pendulum fluent helpers. 
   
   ### Commits
   
   - [x] My commits all reference Jira issues in their subject lines, and I 
have squashed multiple commits if they address the same issue. In addition, my 
commits follow the guidelines from "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)":
 1. Subject is separated from body by a blank line
 1. Subject is limited to 50 characters (not including Jira issue reference)
 1. Subject does not end with a period
 1. Subject uses the imperative mood ("add", not "adding")
 1. Body wraps at 72 characters
 1. Body explains "what" and "why", not "how"
   
   ### ~Documentation~
   
   - [ ] ~In case of new functionality, my PR adds documentation that describes 
how to use it.~
 - ~All the public functions and the classes in the PR contain docstrings 
that explain what they do~
 - ~If you implement backwards incompatible changes, please leave a note in 
the [Updating.md](https://github.com/apache/airflow/blob/master/UPDATING.md) so 
we can assign it to an appropriate release~
   




[GitHub] [airflow] mik-laj commented on issue #6285: [AIRFLOW-XXX] Updates to Breeze documentation from GSOD

2019-10-16 Thread GitBox
mik-laj commented on issue #6285: [AIRFLOW-XXX] Updates to Breeze documentation 
from GSOD
URL: https://github.com/apache/airflow/pull/6285#issuecomment-542900700
 
 
   @feluelle I have a twin brother ;-D higrys is a nickname. 
https://twitter.com/higrys




[jira] [Assigned] (AIRFLOW-5664) postgres_to_gcs operator drops milliseconds from timestamps

2019-10-16 Thread Oluwafemi Sule (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-5664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oluwafemi Sule reassigned AIRFLOW-5664:
---

Assignee: Oluwafemi Sule

> postgres_to_gcs operator drops milliseconds from timestamps
> ---
>
> Key: AIRFLOW-5664
> URL: https://issues.apache.org/jira/browse/AIRFLOW-5664
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: operators
>Affects Versions: 1.10.5
>Reporter: Joseph
>Assignee: Oluwafemi Sule
>Priority: Blocker
>
> Postgres stores timestamps with microsecond resolution. When using the 
> postgres_to_gcs operator, timestamps are converted to epoch/unix time using 
> the datetime.timetuple() method. This method drops the microseconds and so 
> you'll end up with a storage object that looks like this:
> {code:java}
> {"id": 1, "last_modified": 1571038537.0}
> {"id": 2, "last_modified": 1571038537.0}
> {"id": 3, "last_modified": 1571038537.0}
> {code}
> When it should look like this:
> {code:java}
> {"id": 1, "last_modified": 1571038537.123}
> {"id": 2, "last_modified": 1571038537.400}
> {"id": 3, "last_modified": 1571038537.455}
> {code}
> It would be useful to keep the timestamps' full resolution.
> I believe the same issue may occur with airflow.operators.mysql_to_gcs.





[GitHub] [airflow] feluelle commented on issue #6352: [WIP][AIRFLOW-5683] Add SubDagOperator that propagates skipped state

2019-10-16 Thread GitBox
feluelle commented on issue #6352: [WIP][AIRFLOW-5683] Add SubDagOperator that 
propagates skipped state
URL: https://github.com/apache/airflow/pull/6352#issuecomment-542878228
 
 
   > How about we check whether any of the leaf nodes in the subdag is 
`skipped` and propagate that to the parent DAG so that we don't need to specify 
the `essential tasks`?
   
   That would be better. I agree.  
   
   > I would recommend adding the logic in the `SubDagOperator` and adding a 
flag to enable this feature instead of adding another new operator. I think it 
would be easier for existing DAGs that use `SubDagOperator` to enable this 
feature.
   
   If we don't need the essential-tasks function, we don't need a class anymore, 
so we can go back to a function that returns a DAG - better. (I only added a 
class because I wanted to specify the essential tasks per subdag.)
   
   A flag also makes sense.
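A hedged sketch of the approach agreed above (illustrative only, not the merged implementation): after the subdag run completes, inspect the states of its leaf tasks and mark the SubDagOperator itself as skipped if any leaf was skipped.

```python
# Propagate a leaf task's skipped state from the subdag to the parent DAG
# by skipping the SubDagOperator's own task instance.
from airflow.exceptions import AirflowSkipException
from airflow.utils.state import State


def check_leaves_and_skip(subdag, context):
    execution_date = context['execution_date']
    # Leaf tasks are those with no downstream tasks inside the subdag.
    leaf_ids = {t.task_id for t in subdag.tasks if not t.downstream_list}
    leaf_states = [
        ti.state
        for ti in subdag.get_task_instances(start_date=execution_date,
                                            end_date=execution_date)
        if ti.task_id in leaf_ids
    ]
    if any(state == State.SKIPPED for state in leaf_states):
        # Raising here marks the SubDagOperator's task as skipped, which
        # downstream tasks in the parent DAG can then react to.
        raise AirflowSkipException("A leaf task of the subdag was skipped.")
```

Called from the SubDagOperator's execute() behind the flag discussed above, this avoids any notion of user-declared essential tasks.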




[GitHub] [airflow] KKcorps commented on a change in pull request #6348: [AIRFLOW-XXX] GSoD: Adding 'Create a custom operator' doc

2019-10-16 Thread GitBox
KKcorps commented on a change in pull request #6348: [AIRFLOW-XXX] GSoD: Adding 
'Create a custom operator' doc
URL: https://github.com/apache/airflow/pull/6348#discussion_r335692036
 
 

 ##
 File path: docs/howto/operator/custom-operator.rst
 ##
 @@ -0,0 +1,183 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+
+Create Custom Operator
+======================
+
+
+Airflow allows you to create new operators to suit your own or your team's 
requirements.
+Extensibility is one of the many reasons that make Apache Airflow powerful.
+
+You can create any operator you want by extending the 
:class:`airflow.models.baseoperator.BaseOperator`
+
+There are two methods that you need to override in a derived class:
+
+* Constructor - Define the parameters required for the operator. You only need 
to specify the arguments specific to your operator.
+  Use the ``@apply_defaults`` decorator to fill unspecified arguments with 
``default_args``.
+
+* Execute - The code to execute when the runner calls the operator. The method 
receives the
+  Airflow context as a parameter that can be used to read config values.
+
+Let's implement an example ``HelloOperator``:
+
+.. code:: python
+
+    from airflow.models.baseoperator import BaseOperator
+    from airflow.utils.decorators import apply_defaults
+
+    class HelloOperator(BaseOperator):
+
+        @apply_defaults
+        def __init__(
+                self,
+                name: str,
+                *args, **kwargs) -> None:
+            super().__init__(*args, **kwargs)
+            self.name = name
+
+        def execute(self, context):
+            message = "Hello {}".format(self.name)
+            print(message)
+            return message
+
+You can now use the derived custom operator as follows:
+
+.. code:: python
+
+    hello_task = HelloOperator(task_id='sample-task', dag=dag, name='foo_bar')
+
+Hooks
+^^^^^
+Hooks act as an interface to communicate with the external shared resources in 
a DAG.
+For example, multiple tasks in a DAG can require access to a MySQL database. 
Instead of
+creating a connection per task, you can retrieve a connection from the hook 
and utilize it.
+Hooks also help to avoid storing connection auth parameters in a DAG.
+See :doc:`../connection/index` for how to create and manage connections.
+
+Let's extend our previous example to fetch the name from MySQL:
+
+.. code:: python
+
+    from airflow.hooks.mysql_hook import MySqlHook
+
+    class HelloDBOperator(BaseOperator):
+
+        @apply_defaults
+        def __init__(
+                self,
+                name: str,
+                conn_id: str,
+                database: str,
+                *args, **kwargs) -> None:
+            super().__init__(*args, **kwargs)
+            self.name = name
+            self.conn_id = conn_id
+            self.database = database
+
+        def execute(self, context):
+            hook = MySqlHook(mysql_conn_id=self.conn_id,
+                             schema=self.database)
+            sql = "select name from user"
+            result = hook.get_first(sql)
+            message = "Hello {}".format(result['name'])
+            print(message)
+            return message
+
+When the operator invokes the query on the hook object, a new connection gets 
created if it doesn't exist.
+The hook retrieves the auth parameters, such as username and password, from the 
Airflow
+backend using
+:py:func:`airflow.hooks.base_hook.BaseHook.get_connection`.
+
+
+User interface
+^^^^^^^^^^^^^^
+Airflow also allows the developer to control how the operator shows up in the 
DAG UI.
+Override ``ui_color`` to change the background color of the operator in the UI.
+Override ``ui_fgcolor`` to change the color of the label.
+
+.. code:: python
+
+    class HelloOperator(BaseOperator):
+        ui_color = '#ff0000'
+        ui_fgcolor = '#000000'
+
+
+Templating
+^^^^^^^^^^
+You can use :ref:`Jinja templates <jinja-templating>` to parameterize your 
operator.
+Airflow considers the field names present in ``template_fields`` for 
templating while rendering
+the operator.
+
+.. code:: python
+
+class 
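A hedged illustration of the ``template_fields`` mechanism just described (hypothetical class name; not the original document's example, which is cut off above):

```python
from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults


class HelloTemplatedOperator(BaseOperator):
    # Fields listed here are rendered with Jinja before execute() runs.
    template_fields = ('name',)

    @apply_defaults
    def __init__(self, name: str, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.name = name

    def execute(self, context):
        # With name='{{ ds }}', self.name arrives already rendered,
        # e.g. '2019-10-16'.
        print("Hello {}".format(self.name))
```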

[GitHub] [airflow] milton0825 commented on issue #6352: [WIP][AIRFLOW-5683] Add SubDagOperator that propagates skipped state

2019-10-16 Thread GitBox
milton0825 commented on issue #6352: [WIP][AIRFLOW-5683] Add SubDagOperator 
that propagates skipped state
URL: https://github.com/apache/airflow/pull/6352#issuecomment-542875495
 
 
   How about we check whether any of the leaf nodes in the subdag is `skipped` 
and propagate that to the parent DAG so that we don't need to specify the 
`essential tasks`?
   
   I would recommend adding the logic in the `SubDagOperator` and adding a flag 
to enable this feature instead of adding another new operator. I think it would 
be easier for existing DAGs that use `SubDagOperator` to enable this feature.



[GitHub] [airflow] feluelle edited a comment on issue #6352: [WIP][AIRFLOW-5683] Add SubDagOperator that propagates skipped state

2019-10-16 Thread GitBox
feluelle edited a comment on issue #6352: [WIP][AIRFLOW-5683] Add 
SubDagOperator that propagates skipped state
URL: https://github.com/apache/airflow/pull/6352#issuecomment-542874196
 
 
   Yes (we only need to do it for the `skipped` state, because other states 
like success or failed propagate automatically).
   
   _You can also define a list of essential tasks_




[GitHub] [airflow] feluelle commented on issue #6352: [WIP][AIRFLOW-5683] Add SubDagOperator that propagates skipped state

2019-10-16 Thread GitBox
feluelle commented on issue #6352: [WIP][AIRFLOW-5683] Add SubDagOperator that 
propagates skipped state
URL: https://github.com/apache/airflow/pull/6352#issuecomment-542874196
 
 
   Yes (we only need to do it for the `skipped` state, because other states 
like success or failed propagate automatically).




[GitHub] [airflow] TobKed commented on issue #6351: [AIRFLOW-5682] Allow labels in gcs_to_bq operator

2019-10-16 Thread GitBox
TobKed commented on issue #6351: [AIRFLOW-5682] Allow labels in gcs_to_bq 
operator
URL: https://github.com/apache/airflow/pull/6351#issuecomment-542873791
 
 
   1. @mik-laj is right, it would be a good idea to set the default label to 
something like:
   ```
   from airflow.version import version as airflow_version
   _AIRFLOW_VERSION = 'v' + airflow_version.replace('.', '-').replace('+', '-')
   ```
   2. We have a plan to refactor the BQ hooks and operators.
   The discussion is here: https://github.com/PolideaInternal/airflow/issues/231
   One of the ideas is to accept both the 
`google.cloud.bigquery.job.QueryJobConfig` class and its dict representation. 
(see also: 
https://googleapis.dev/python/bigquery/latest/generated/google.cloud.bigquery.job.QueryJobConfig.html#google.cloud.bigquery.job.QueryJobConfig)
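As a usage note, such a default would presumably be merged into user-supplied labels rather than replacing them; a hedged sketch (the `labels` dict is hypothetical):

```python
from airflow.version import version as airflow_version

_AIRFLOW_VERSION = 'v' + airflow_version.replace('.', '-').replace('+', '-')

labels = {'team': 'data-eng'}  # hypothetical user-supplied labels
# Add the version label without overwriting one the user already set.
labels.setdefault('airflow-version', _AIRFLOW_VERSION)
```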




[GitHub] [airflow] milton0825 commented on issue #6352: [WIP][AIRFLOW-5683] Add SubDagOperator that propagates skipped state

2019-10-16 Thread GitBox
milton0825 commented on issue #6352: [WIP][AIRFLOW-5683] Add SubDagOperator 
that propagates skipped state
URL: https://github.com/apache/airflow/pull/6352#issuecomment-542873507
 
 
   So the PR is that we propagate the state of essential tasks in the subdag to 
the parent DAG?




[GitHub] [airflow] feluelle edited a comment on issue #6352: [WIP][AIRFLOW-5683] Add SubDagOperator that propagates skipped state

2019-10-16 Thread GitBox
feluelle edited a comment on issue #6352: [WIP][AIRFLOW-5683] Add 
SubDagOperator that propagates skipped state
URL: https://github.com/apache/airflow/pull/6352#issuecomment-542871719
 
 
   > 
   > 
   > > Currently there is no way of telling the parent dag of a sub dag that an 
essential task has been skipped.
   > 
   > Currently, when all tasks in a subdag finish (either `success` or 
`skipped`), the parent DAG continues with the tasks that depend on that subdag, 
right? So I am not sure why we need to introduce `essential task` here.
   
   Yes, and that is the point. You cannot distinguish between `success` and 
`skipped`. So the actual issue might be that there is no DAG state that 
indicates skipped - you are "losing" this information when exiting the subdag.




[GitHub] [airflow] mik-laj commented on issue #6341: Updated airflow

2019-10-16 Thread GitBox
mik-laj commented on issue #6341: Updated airflow
URL: https://github.com/apache/airflow/pull/6341#issuecomment-542872654
 
 
   Can you tell us a little more about this change?




[GitHub] [airflow] feluelle edited a comment on issue #6352: [WIP][AIRFLOW-5683] Add SubDagOperator that propagates skipped state

2019-10-16 Thread GitBox
feluelle edited a comment on issue #6352: [WIP][AIRFLOW-5683] Add 
SubDagOperator that propagates skipped state
URL: https://github.com/apache/airflow/pull/6352#issuecomment-542871719
 
 
   > 
   > 
   > > Currently there is no way of telling the parent dag of a sub dag that an 
essential task has been skipped.
   > 
   > Currently, when all tasks in a subdag finish (either `success` or 
`skipped`), the parent DAG continues with the tasks that depend on that subdag, 
right? So I am not sure why we need to introduce `essential task` here.
   
   Yes, and that is the point. You cannot distinguish between `success` and 
`skipped`. So the actual issue might be that there is no DAG state that 
indicates skipped.




[GitHub] [airflow] feluelle edited a comment on issue #6352: [WIP][AIRFLOW-5683] Add SubDagOperator that propagates skipped state

2019-10-16 Thread GitBox
feluelle edited a comment on issue #6352: [WIP][AIRFLOW-5683] Add 
SubDagOperator that propagates skipped state
URL: https://github.com/apache/airflow/pull/6352#issuecomment-542871719
 
 
   > 
   > 
   > > Currently there is no way of telling the parent dag of a sub dag that an 
essential task has been skipped.
   > 
   > Currently, when all tasks in a subdag have finished (either `success` or 
`skipped`), the parent DAG would continue with the tasks that depend on that 
subdag, right? So I am not sure why we need to introduce `essential task` here.
   
   Yes, and that is the point. You cannot differentiate between `success` and 
`skipped`. So the actual issue is that there is no DAG state that indicates skipped.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] feluelle commented on issue #6352: [WIP][AIRFLOW-5683] Add SubDagOperator that propagates skipped state

2019-10-16 Thread GitBox
feluelle commented on issue #6352: [WIP][AIRFLOW-5683] Add SubDagOperator that 
propagates skipped state
URL: https://github.com/apache/airflow/pull/6352#issuecomment-542871719
 
 
   > 
   > 
   > > Currently there is no way of telling the parent dag of a sub dag that an 
essential task has been skipped.
   > 
   > Currently, when all tasks in a subdag have finished (either `success` or 
`skipped`), the parent DAG would continue with the tasks that depend on that 
subdag, right? So I am not sure why we need to introduce `essential task` here.
   
   Yes, and that is the point. You cannot differentiate between `success` and `skipped`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] milton0825 commented on issue #6352: [WIP][AIRFLOW-5683] Add SubDagOperator that propagates skipped state

2019-10-16 Thread GitBox
milton0825 commented on issue #6352: [WIP][AIRFLOW-5683] Add SubDagOperator 
that propagates skipped state
URL: https://github.com/apache/airflow/pull/6352#issuecomment-542870443
 
 
   > Currently there is no way of telling the parent dag of a sub dag that an 
essential task has been skipped.
   
   Currently, when all tasks in a subdag have finished (either `success` or 
`skipped`), the parent DAG would continue with the tasks that depend on that 
subdag, right? So I am not sure why we need to introduce `essential task` here.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (AIRFLOW-5684) Docker-compose-kubernetes still used by breeze

2019-10-16 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-5684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16953148#comment-16953148
 ] 

ASF GitHub Bot commented on AIRFLOW-5684:
-----------------------------------------

potiuk commented on pull request #6353: [AIRFLOW-5684] 
docker-compose-kubernetes still used
URL: https://github.com/apache/airflow/pull/6353
 
 
   Make sure you have checked _all_ steps below.
   
   ### Jira
   
   - [x] My PR addresses the following [Airflow 
Jira](https://issues.apache.org/jira/browse/AIRFLOW/) issues and references 
them in the PR title. For example, "\[AIRFLOW-XXX\] My Airflow PR"
 - https://issues.apache.org/jira/browse/AIRFLOW-5684
   
   ### Description
   
   - [x] Here are some details about my PR, including screenshots of any UI 
changes:
   
   ### Tests
   
   - [x] My PR adds the following unit tests __OR__ does not need testing for 
this extremely good reason:
   
   ### Commits
   
   - [x] My commits all reference Jira issues in their subject lines, and I 
have squashed multiple commits if they address the same issue. In addition, my 
commits follow the guidelines from "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)":
 1. Subject is separated from body by a blank line
 1. Subject is limited to 50 characters (not including Jira issue reference)
 1. Subject does not end with a period
 1. Subject uses the imperative mood ("add", not "adding")
 1. Body wraps at 72 characters
 1. Body explains "what" and "why", not "how"
   
   ### Documentation
   
   - [x] In case of new functionality, my PR adds documentation that describes 
how to use it.
 - All the public functions and the classes in the PR contain docstrings 
that explain what they do
 - If you implement backwards incompatible changes, please leave a note in 
the [Updating.md](https://github.com/apache/airflow/blob/master/UPDATING.md) so 
we can assign it to an appropriate release
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Docker-compose-kubernetes still used by breeze
> ----------------------------------------------
>
> Key: AIRFLOW-5684
> URL: https://issues.apache.org/jira/browse/AIRFLOW-5684
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: ci
>Affects Versions: 2.0.0, 1.10.5
>Reporter: Jarek Potiuk
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [airflow] potiuk opened a new pull request #6353: [AIRFLOW-5684] docker-compose-kubernetes still used

2019-10-16 Thread GitBox
potiuk opened a new pull request #6353: [AIRFLOW-5684] 
docker-compose-kubernetes still used
URL: https://github.com/apache/airflow/pull/6353
 
 
   Make sure you have checked _all_ steps below.
   
   ### Jira
   
   - [x] My PR addresses the following [Airflow 
Jira](https://issues.apache.org/jira/browse/AIRFLOW/) issues and references 
them in the PR title. For example, "\[AIRFLOW-XXX\] My Airflow PR"
 - https://issues.apache.org/jira/browse/AIRFLOW-5684
   
   ### Description
   
   - [x] Here are some details about my PR, including screenshots of any UI 
changes:
   
   ### Tests
   
   - [x] My PR adds the following unit tests __OR__ does not need testing for 
this extremely good reason:
   
   ### Commits
   
   - [x] My commits all reference Jira issues in their subject lines, and I 
have squashed multiple commits if they address the same issue. In addition, my 
commits follow the guidelines from "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)":
 1. Subject is separated from body by a blank line
 1. Subject is limited to 50 characters (not including Jira issue reference)
 1. Subject does not end with a period
 1. Subject uses the imperative mood ("add", not "adding")
 1. Body wraps at 72 characters
 1. Body explains "what" and "why", not "how"
   
   ### Documentation
   
   - [x] In case of new functionality, my PR adds documentation that describes 
how to use it.
 - All the public functions and the classes in the PR contain docstrings 
that explain what they do
 - If you implement backwards incompatible changes, please leave a note in 
the [Updating.md](https://github.com/apache/airflow/blob/master/UPDATING.md) so 
we can assign it to an appropriate release
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (AIRFLOW-5684) Docker-compose-kubernetes still used by breeze

2019-10-16 Thread Jarek Potiuk (Jira)
Jarek Potiuk created AIRFLOW-5684:
---------------------------------

 Summary: Docker-compose-kubernetes still used by breeze
 Key: AIRFLOW-5684
 URL: https://issues.apache.org/jira/browse/AIRFLOW-5684
 Project: Apache Airflow
  Issue Type: Bug
  Components: ci
Affects Versions: 1.10.5, 2.0.0
Reporter: Jarek Potiuk






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [airflow] mik-laj commented on issue #6351: [AIRFLOW-5682] Allow labels in gcs_to_bq operator

2019-10-16 Thread GitBox
mik-laj commented on issue #6351: [AIRFLOW-5682] Allow labels in gcs_to_bq 
operator
URL: https://github.com/apache/airflow/pull/6351#issuecomment-542858230
 
 
   https://user-images.githubusercontent.com/12058428/66952258-904af600-f05c-11e9-9f62-cf7f74992a52.png
   
   Currently, this integration is being analyzed and will be refactored later 
to make it comply with the guide. We want to ensure similar behavior for all 
GCP integrations, but BigQuery just isn't done yet. 
   
   @TobKed Do you want to add something? I have the impression that you are 
responsible for this integration.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] lyallcooper commented on issue #6351: [AIRFLOW-5682] Allow labels in gcs_to_bq operator

2019-10-16 Thread GitBox
lyallcooper commented on issue #6351: [AIRFLOW-5682] Allow labels in gcs_to_bq 
operator
URL: https://github.com/apache/airflow/pull/6351#issuecomment-542855432
 
 
   Also it seems the other BQ-related operators that support labels don't pass 
in the Airflow version as a default label.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] lyallcooper edited a comment on issue #6351: [AIRFLOW-5682] Allow labels in gcs_to_bq operator

2019-10-16 Thread GitBox
lyallcooper edited a comment on issue #6351: [AIRFLOW-5682] Allow labels in 
gcs_to_bq operator
URL: https://github.com/apache/airflow/pull/6351#issuecomment-542853862
 
 
   @mik-laj sorry, I don't seem to have access to that google doc.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] lyallcooper commented on issue #6351: [AIRFLOW-5682] Allow labels in gcs_to_bq operator

2019-10-16 Thread GitBox
lyallcooper commented on issue #6351: [AIRFLOW-5682] Allow labels in gcs_to_bq 
operator
URL: https://github.com/apache/airflow/pull/6351#issuecomment-542853862
 
 
   Sorry, I don't seem to have access to that google doc.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] feluelle commented on issue #6352: [WIP][AIRFLOW-5683] Add SubDagOperator that propagates skipped state

2019-10-16 Thread GitBox
feluelle commented on issue #6352: [WIP][AIRFLOW-5683] Add SubDagOperator that 
propagates skipped state
URL: https://github.com/apache/airflow/pull/6352#issuecomment-542848213
 
 
   @milton0825 I asked you to review because I think you know the most 
about subdags :)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (AIRFLOW-5683) Add SubDagOperator that propagates skipped state

2019-10-16 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-5683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16953115#comment-16953115
 ] 

ASF GitHub Bot commented on AIRFLOW-5683:
-----------------------------------------

feluelle commented on pull request #6352: [WIP][AIRFLOW-5683] Add 
SubDagOperator that propagates skipped state
URL: https://github.com/apache/airflow/pull/6352
 
 
   ### Jira
   
 - https://issues.apache.org/jira/browse/AIRFLOW-5683
   
   ### Description
   
   - [x] Here are some details about my PR, including screenshots of any UI 
changes:
   
   Currently there is no way of telling the parent dag of a sub dag that an 
_essential_ task has been **skipped**. This PR addresses this issue by adding a 
new BaseSubDag model that is used in an _advanced_ SubDagOperator. If you 
think it would make more sense to add it directly to the SubDagOperator class, I 
will do it, but for now I wanted to make it backwards compatible.
   
   I also understand that this adds a new concept when dealing with subdags. So 
I am totally fine with it if you think this does not fit into the official 
Airflow code. It probably can be designed better than this. (And also the 
naming `Advanced` :D)
   
   **Please let me know what you guys think about this.**
   
   _Background story:_ 
https://lists.apache.org/thread.html/0eefd459a502c5100d792416f8ba720302aa49a8906fe6ea4ec8fca4@%3Cdev.airflow.apache.org%3E
   
   ### Tests
   
   - [ ] My PR adds the following unit tests __OR__ does not need testing for 
this extremely good reason:
   
   ### Commits
   
   - [x] My commits all reference Jira issues in their subject lines, and I 
have squashed multiple commits if they address the same issue. In addition, my 
commits follow the guidelines from "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)":
 1. Subject is separated from body by a blank line
 1. Subject is limited to 50 characters (not including Jira issue reference)
 1. Subject does not end with a period
 1. Subject uses the imperative mood ("add", not "adding")
 1. Body wraps at 72 characters
 1. Body explains "what" and "why", not "how"
   
   ### Documentation
   
   - [ ] In case of new functionality, my PR adds documentation that describes 
how to use it.
 - All the public functions and the classes in the PR contain docstrings 
that explain what they do
 - If you implement backwards incompatible changes, please leave a note in 
the [Updating.md](https://github.com/apache/airflow/blob/master/UPDATING.md) so 
we can assign it to an appropriate release
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add SubDagOperator that propagates skipped state
> ------------------------------------------------
>
> Key: AIRFLOW-5683
> URL: https://issues.apache.org/jira/browse/AIRFLOW-5683
> Project: Apache Airflow
>  Issue Type: New Feature
>  Components: operators
>Affects Versions: 1.10.6
>Reporter: Felix Uellendall
>Assignee: Felix Uellendall
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [airflow] feluelle opened a new pull request #6352: [WIP][AIRFLOW-5683] Add SubDagOperator that propagates skipped state

2019-10-16 Thread GitBox
feluelle opened a new pull request #6352: [WIP][AIRFLOW-5683] Add 
SubDagOperator that propagates skipped state
URL: https://github.com/apache/airflow/pull/6352
 
 
   ### Jira
   
 - https://issues.apache.org/jira/browse/AIRFLOW-5683
   
   ### Description
   
   - [x] Here are some details about my PR, including screenshots of any UI 
changes:
   
   Currently there is no way of telling the parent dag of a sub dag that an 
_essential_ task has been **skipped**. This PR addresses this issue by adding a 
new BaseSubDag model that is used in an _advanced_ SubDagOperator. If you 
think it would make more sense to add it directly to the SubDagOperator class, I 
will do it, but for now I wanted to make it backwards compatible.
   
   I also understand that this adds a new concept when dealing with subdags. So 
I am totally fine with it if you think this does not fit into the official 
Airflow code. It probably can be designed better than this. (And also the 
naming `Advanced` :D)
   
   **Please let me know what you guys think about this.**
   
   _Background story:_ 
https://lists.apache.org/thread.html/0eefd459a502c5100d792416f8ba720302aa49a8906fe6ea4ec8fca4@%3Cdev.airflow.apache.org%3E
   
   ### Tests
   
   - [ ] My PR adds the following unit tests __OR__ does not need testing for 
this extremely good reason:
   
   ### Commits
   
   - [x] My commits all reference Jira issues in their subject lines, and I 
have squashed multiple commits if they address the same issue. In addition, my 
commits follow the guidelines from "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)":
 1. Subject is separated from body by a blank line
 1. Subject is limited to 50 characters (not including Jira issue reference)
 1. Subject does not end with a period
 1. Subject uses the imperative mood ("add", not "adding")
 1. Body wraps at 72 characters
 1. Body explains "what" and "why", not "how"
   
   ### Documentation
   
   - [ ] In case of new functionality, my PR adds documentation that describes 
how to use it.
 - All the public functions and the classes in the PR contain docstrings 
that explain what they do
 - If you implement backwards incompatible changes, please leave a note in 
the [Updating.md](https://github.com/apache/airflow/blob/master/UPDATING.md) so 
we can assign it to an appropriate release
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] mik-laj commented on issue #6351: [AIRFLOW-5682] Allow labels in gcs_to_bq operator

2019-10-16 Thread GitBox
mik-laj commented on issue #6351: [AIRFLOW-5682] Allow labels in gcs_to_bq 
operator
URL: https://github.com/apache/airflow/pull/6351#issuecomment-542845749
 
 
   Sample PR: https://github.com/apache/airflow/pull/6296/files


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] mik-laj commented on issue #6351: [AIRFLOW-5682] Allow labels in gcs_to_bq operator

2019-10-16 Thread GitBox
mik-laj commented on issue #6351: [AIRFLOW-5682] Allow labels in gcs_to_bq 
operator
URL: https://github.com/apache/airflow/pull/6351#issuecomment-542845497
 
 
   According to the integration guide, you should also add default labels with 
the Airflow version
   
https://docs.google.com/document/d/1_rTdJSLCt0eyrAylmmgYc3yZr-_h51fVlnvMmWqhCkY/edit?ts=5bb72dfd#
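   
   For reference, a sketch of what such a default label could look like, assuming 
the version-sanitizing convention used elsewhere (the `with_default_labels` 
helper is hypothetical, not the guide's actual code):
   
```python
from airflow import version


def with_default_labels(labels=None):
    # BigQuery label values may only contain lowercase letters, digits,
    # '_' and '-', so dots and '+' in the version string are replaced.
    default_labels = {
        'airflow-version': 'v' + version.version.replace('.', '-').replace('+', '-'),
    }
    # User-supplied labels take precedence over the default one.
    return {**default_labels, **(labels or {})}


print(with_default_labels({'team': 'data-eng'}))
```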


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (AIRFLOW-5682) Add support for labels in gcs_to_bq operator

2019-10-16 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-5682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16953111#comment-16953111
 ] 

ASF GitHub Bot commented on AIRFLOW-5682:
-----------------------------------------

lyallcooper commented on pull request #6351: [AIRFLOW-5682] Allow labels in 
gcs_to_bq operator
URL: https://github.com/apache/airflow/pull/6351
 
 
   
   Make sure you have checked _all_ steps below.
   
   ### Jira
   
   - [X] My PR addresses the following [Airflow 
Jira](https://issues.apache.org/jira/browse/AIRFLOW-5682) issues and references 
them in the PR title.
   
   ### Description
   
   - [X] Adds support for labels when using the GoogleCloudStorageToBigQuery
   operator.
   
   ### Tests
   
   - [X] My PR adds the following unit test: `test_bigquery.TestLabelsInRunLoad`
   
   ### Commits
   
   - [X] My commits all reference Jira issues in their subject lines, and I 
have squashed multiple commits if they address the same issue. In addition, my 
commits follow the guidelines from "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)":
 1. Subject is separated from body by a blank line
 1. Subject is limited to 50 characters (not including Jira issue reference)
 1. Subject does not end with a period
 1. Subject uses the imperative mood ("add", not "adding")
 1. Body wraps at 72 characters
 1. Body explains "what" and "why", not "how"
   
   ### Documentation
   
   - [X] In case of new functionality, my PR adds documentation that describes 
how to use it.
 - All the public functions and the classes in the PR contain docstrings 
that explain what they do
 - If you implement backwards incompatible changes, please leave a note in 
the [Updating.md](https://github.com/apache/airflow/blob/master/UPDATING.md) so 
we can assign it to an appropriate release
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add support for labels in gcs_to_bq operator
> --------------------------------------------
>
> Key: AIRFLOW-5682
> URL: https://issues.apache.org/jira/browse/AIRFLOW-5682
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: gcp
>Affects Versions: 1.10.5
>Reporter: Lyall Cooper
>Assignee: Lyall Cooper
>Priority: Minor
>
> Other BigQuery related operators, such as {{bigquery_to_gcs}} and 
> {{bigquery_to_bigquery}} support BigQuery's "labels" feature, which allows 
> callers to attach arbitrary key-value tags onto the jobs/queries. However, 
> the {{gcs_to_bq}} operator does not currently support this functionality, 
> which would be nice to have.
>  
> Currently a workaround is to use the underlying {{BigQueryHook}} to create a 
> load job from GCS to BQ, but that's not ideal.
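
For reference, with such support the operator call could look roughly like this 
(a sketch only; bucket, object and table names are placeholders, `dag` is assumed 
to be an existing DAG, and `labels` is the proposed new argument):

```python
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator

load_task = GoogleCloudStorageToBigQueryOperator(
    task_id='gcs_to_bq_with_labels',
    bucket='my-bucket',                          # placeholder bucket name
    source_objects=['data/part-*.csv'],          # placeholder source objects
    destination_project_dataset_table='my_project.my_dataset.my_table',
    labels={'team': 'data-eng', 'env': 'prod'},  # the proposed key-value job labels
    dag=dag,
)
```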



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [airflow] lyallcooper opened a new pull request #6351: [AIRFLOW-5682] Allow labels in gcs_to_bq operator

2019-10-16 Thread GitBox
lyallcooper opened a new pull request #6351: [AIRFLOW-5682] Allow labels in 
gcs_to_bq operator
URL: https://github.com/apache/airflow/pull/6351
 
 
   
   Make sure you have checked _all_ steps below.
   
   ### Jira
   
   - [X] My PR addresses the following [Airflow 
Jira](https://issues.apache.org/jira/browse/AIRFLOW-5682) issues and references 
them in the PR title.
   
   ### Description
   
   - [X] Adds support for labels when using the GoogleCloudStorageToBigQuery
   operator.
   
   ### Tests
   
   - [X] My PR adds the following unit test: `test_bigquery.TestLabelsInRunLoad`
   
   ### Commits
   
   - [X] My commits all reference Jira issues in their subject lines, and I 
have squashed multiple commits if they address the same issue. In addition, my 
commits follow the guidelines from "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)":
 1. Subject is separated from body by a blank line
 1. Subject is limited to 50 characters (not including Jira issue reference)
 1. Subject does not end with a period
 1. Subject uses the imperative mood ("add", not "adding")
 1. Body wraps at 72 characters
 1. Body explains "what" and "why", not "how"
   
   ### Documentation
   
   - [X] In case of new functionality, my PR adds documentation that describes 
how to use it.
 - All the public functions and the classes in the PR contain docstrings 
that explain what they do
 - If you implement backwards incompatible changes, please leave a note in 
the [Updating.md](https://github.com/apache/airflow/blob/master/UPDATING.md) so 
we can assign it to an appropriate release
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] mik-laj commented on a change in pull request #6348: [AIRFLOW-XXX] GSoD: Adding 'Create a custom operator' doc

2019-10-16 Thread GitBox
mik-laj commented on a change in pull request #6348: [AIRFLOW-XXX] GSoD: Adding 
'Create a custom operator' doc
URL: https://github.com/apache/airflow/pull/6348#discussion_r335653352
 
 

 ##
 File path: docs/howto/operator/custom-operator.rst
 ##
 @@ -0,0 +1,183 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+
+Create Custom Operator
+======================
+
+
+Airflow allows you to create new operators to suit your or your team's 
requirements. 
+Extensibility is one of the many features that make Apache Airflow 
powerful. 
+
+You can create any operator you want by extending 
:class:`airflow.models.baseoperator.BaseOperator`.
+
+There are two methods that you need to override in a derived class:
+
+* Constructor - Define the parameters required for the operator. You only need 
to specify the arguments specific to your operator.
+  Use the ``@apply_defaults`` decorator to fill unspecified arguments 
with default_args.
+
+* Execute - The code to execute when the runner calls the operator. The method 
receives the 
+  Airflow context as a parameter that can be used to read config values.
+
+Let's implement an example ``HelloOperator``:
+
+.. code:: python
+
+    from airflow.models.baseoperator import BaseOperator
+    from airflow.utils.decorators import apply_defaults
+
+    class HelloOperator(BaseOperator):
+
+        @apply_defaults
+        def __init__(
+                self,
+                name: str,
+                *args, **kwargs) -> None:
+            super().__init__(*args, **kwargs)
+            self.name = name
+
+        def execute(self, context):
+            message = "Hello {}".format(self.name)
+            print(message)
+            return message
+
+You can now use the derived custom operator as follows:
+
+.. code:: python
+
+    hello_task = HelloOperator(task_id='sample-task', dag=dag, name='foo_bar')
+
+Hooks
+^^^^^
+Hooks act as an interface to communicate with external shared resources in 
a DAG.
+For example, multiple tasks in a DAG can require access to a MySQL database. 
Instead of
+creating a connection per task, you can retrieve a connection from the hook 
and utilize it.
+Hooks also help to avoid storing connection auth parameters in a DAG. 
+See :doc:`../connection/index` for how to create and manage connections.
+
+Let's extend our previous example to fetch the name from MySQL:
+
+.. code:: python
+
+    from airflow.hooks.mysql_hook import MySqlHook
+
+    class HelloDBOperator(BaseOperator):
+
+        @apply_defaults
+        def __init__(
+                self,
+                name: str,
+                conn_id: str,
+                database: str,
+                *args, **kwargs) -> None:
+            super().__init__(*args, **kwargs)
+            self.name = name
+            self.conn_id = conn_id
+            self.database = database
+
+        def execute(self, context):
+            hook = MySqlHook(mysql_conn_id=self.conn_id,
+                             schema=self.database)
+            sql = "select name from user"
+            # get_first returns the first result row as a tuple
+            result = hook.get_first(sql)
+            message = "Hello {}".format(result[0])
+            print(message)
+            return message
+
+When the operator invokes the query on the hook object, a new connection gets 
created if it doesn't exist. 
+The hook retrieves the auth parameters, such as username and password, from the 
Airflow
+backend and passes them to 
:py:func:`airflow.hooks.base_hook.BaseHook.get_connection`. 
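
(As an aside, instantiating the operator above could look as follows - a sketch 
in which the connection id and schema are placeholders and `dag` is assumed to 
be an existing DAG:)

```python
hello_db_task = HelloDBOperator(
    task_id='hello_db',
    name='foo_bar',
    conn_id='mysql_default',  # id of an existing Airflow MySQL connection (placeholder)
    database='users',         # placeholder schema name
    dag=dag,
)
```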
+
+
+User interface
+^^^^^^^^^^^^^^
+Airflow also allows the developer to control how the operator shows up in the 
DAG UI.
+Override ``ui_color`` to change the background color of the operator in the UI. 
+Override ``ui_fgcolor`` to change the color of the label.
+
+.. code:: python
+
+    class HelloOperator(BaseOperator):
+        ui_color = '#ff0000'
+        ui_fgcolor = '#000000'
+
+
+Templating
+^^^^^^^^^^
+You can use :ref:`Jinja templates ` to parameterize your 
operator.
+Airflow considers the field names present in ``template_fields`` for 
templating while rendering
+the operator.
+
+.. code:: python
+
+    class HelloOperator(BaseOperator):
+
+        template_fields = ['name']

[GitHub] [airflow] mik-laj commented on a change in pull request #6348: [AIRFLOW-XXX] GSoD: Adding 'Create a custom operator' doc

2019-10-16 Thread GitBox
mik-laj commented on a change in pull request #6348: [AIRFLOW-XXX] GSoD: Adding 
'Create a custom operator' doc
URL: https://github.com/apache/airflow/pull/6348#discussion_r335651696
 
 

 ##
 File path: docs/howto/operator/custom-operator.rst
 ##
 @@ -0,0 +1,183 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+
+Create Custom Operator
+======================
+
+
+Airflow allows you to create new operators to suit your or your team's 
requirements. 
+Extensibility is one of the many features that make Apache Airflow 
powerful. 
+
+You can create any operator you want by extending 
:class:`airflow.models.baseoperator.BaseOperator`.
+
+There are two methods that you need to override in a derived class:
+
+* Constructor - Define the parameters required for the operator. You only need 
to specify the arguments specific to your operator.
+  Use ``@apply_defaults`` decorator function to fill unspecified arguments 
with default_args.
 
 Review comment:
   ```suggestion
 Use ``@apply_defaults`` decorator function to fill unspecified arguments 
with ``default_args``.
   ```
   I think it's worth pointing out where the default_args parameter is 
specified.
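   
   For example, a minimal sketch of where ``default_args`` typically comes from 
(the DAG definition), assuming the ``HelloOperator`` from the document above:
   
```python
from datetime import datetime

from airflow import DAG

default_args = {'owner': 'airflow', 'retries': 2}

dag = DAG('hello_dag', default_args=default_args,
          start_date=datetime(2019, 10, 16), schedule_interval=None)

# `retries` is not passed explicitly here; @apply_defaults fills it in
# from the DAG's default_args.
hello_task = HelloOperator(task_id='hello', name='foo_bar', dag=dag)
```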


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] mik-laj commented on a change in pull request #6348: [AIRFLOW-XXX] GSoD: Adding 'Create a custom operator' doc

2019-10-16 Thread GitBox
mik-laj commented on a change in pull request #6348: [AIRFLOW-XXX] GSoD: Adding 
'Create a custom operator' doc
URL: https://github.com/apache/airflow/pull/6348#discussion_r335651246
 
 

 ##
 File path: docs/howto/operator/custom-operator.rst
 ##
 @@ -0,0 +1,177 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+
+Create Custom Operator
+======================
+
+
+Airflow allows you to create new operators to suit your or your team's 
requirements. 
+Extensibility is one of the many features that make Apache Airflow 
powerful. 
+
+You can create any operator you want by extending 
:class:`airflow.models.baseoperator.BaseOperator`.
+
+There are two methods that you need to override in a derived class:
+
+* Constructor - Define the parameters required for the operator. You only need 
to specify the arguments specific to your operator.
+
+* Execute - The code to execute when the runner calls the operator. The method 
receives the 
+  Airflow context as a parameter that can be used to read config values.
+
+Let's implement an example ``HelloOperator``:
+
+.. code:: python
+
+    class HelloOperator(BaseOperator):
+
+        def __init__(
 
 Review comment:
   I think it's worth adding a second import for BaseOperator.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] KKcorps commented on a change in pull request #6348: [AIRFLOW-XXX] GSoD: Adding 'Create a custom operator' doc

2019-10-16 Thread GitBox
KKcorps commented on a change in pull request #6348: [AIRFLOW-XXX] GSoD: Adding 
'Create a custom operator' doc
URL: https://github.com/apache/airflow/pull/6348#discussion_r335648181
 
 

 ##
 File path: docs/howto/operator/custom-operator.rst
 ##
 @@ -0,0 +1,177 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+
+Create Custom Operator
+======================
+
+
+Airflow allows you to create new operators to suit your or your team's 
requirements. 
+Extensibility is one of the many features that make Apache Airflow 
powerful. 
+
+You can create any operator you want by extending 
:class:`airflow.models.baseoperator.BaseOperator`.
+
+There are two methods that you need to override in a derived class:
+
+* Constructor - Define the parameters required for the operator. You only need 
to specify the arguments specific to your operator.
+
+* Execute - The code to execute when the runner calls the operator. The method 
receives the 
+  Airflow context as a parameter that can be used to read config values.
+
+Let's implement an example ``HelloOperator``:
+
+.. code:: python
+
+    class HelloOperator(BaseOperator):
+
+        def __init__(
 
 Review comment:
   Is this fine or should I remove the import?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Work started] (AIRFLOW-5683) Add SubDagOperator that propagates skipped state

2019-10-16 Thread Felix Uellendall (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-5683?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on AIRFLOW-5683 started by Felix Uellendall.
-------------------------------------------------
> Add SubDagOperator that propagates skipped state
> ------------------------------------------------
>
> Key: AIRFLOW-5683
> URL: https://issues.apache.org/jira/browse/AIRFLOW-5683
> Project: Apache Airflow
>  Issue Type: New Feature
>  Components: operators
>Affects Versions: 1.10.6
>Reporter: Felix Uellendall
>Assignee: Felix Uellendall
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (AIRFLOW-5683) Add SubDagOperator that propagates skipped state

2019-10-16 Thread Felix Uellendall (Jira)
Felix Uellendall created AIRFLOW-5683:
-------------------------------------

 Summary: Add SubDagOperator that propagates skipped state
 Key: AIRFLOW-5683
 URL: https://issues.apache.org/jira/browse/AIRFLOW-5683
 Project: Apache Airflow
  Issue Type: New Feature
  Components: operators
Affects Versions: 1.10.6
Reporter: Felix Uellendall
Assignee: Felix Uellendall






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (AIRFLOW-5682) Add support for labels in gcs_to_bq operator

2019-10-16 Thread Lyall Cooper (Jira)
Lyall Cooper created AIRFLOW-5682:
---------------------------------

 Summary: Add support for labels in gcs_to_bq operator
 Key: AIRFLOW-5682
 URL: https://issues.apache.org/jira/browse/AIRFLOW-5682
 Project: Apache Airflow
  Issue Type: Improvement
  Components: gcp
Affects Versions: 1.10.5
Reporter: Lyall Cooper
Assignee: Lyall Cooper


Other BigQuery related operators, such as {{bigquery_to_gcs}} and 
{{bigquery_to_bigquery}} support BigQuery's "labels" feature, which allows 
callers to attach arbitrary key-value tags onto the jobs/queries. However, the 
{{gcs_to_bq}} operator does not currently support this functionality, which 
would be nice to have.

 

Currently a workaround is to use the underlying {{BigQueryHook}} to create a 
load job from GCS to BQ, but that's not ideal.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [airflow] ToxaZ commented on a change in pull request #6336: [AIRFLOW-5073] Change SQLSensor to optionally treat NULL as keep poking

2019-10-16 Thread GitBox
ToxaZ commented on a change in pull request #6336: [AIRFLOW-5073] Change 
SQLSensor to optionally treat NULL as keep poking
URL: https://github.com/apache/airflow/pull/6336#discussion_r335627167
 
 

 ##
 File path: airflow/sensors/sql_sensor.py
 ##
 @@ -103,5 +103,5 @@ def poke(self, context):
 else:
 raise AirflowException("self.success is present, but not 
callable -> {}".format(self.success))
 if self.allow_null:
-return str(first_cell) not in ('0', '')
-return str(first_cell) not in ('0', '', 'None')
+return not (first_cell is not None or str(first_cell) == '' or 
int(first_cell) == 0)
 
 Review comment:
   Agree. I've found an even more pythonic way of checking.
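   
   For reference, one common way to collapse the None/empty/zero cases (a sketch 
only, not necessarily the variant the author settled on; `cell_passes` is a 
hypothetical helper):
   
```python
def cell_passes(first_cell, allow_null):
    # Handle NULL explicitly: the allow_null flag decides whether a NULL
    # first cell counts as a pass or keeps the sensor poking.
    if first_cell is None:
        return allow_null
    # Otherwise keep the original string-based comparison: an empty
    # string or '0' means "keep poking".
    return str(first_cell) not in ('', '0')
```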


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] feluelle commented on issue #5914: [AIRFLOW-5311] Add an AWS Lambda Operator

2019-10-16 Thread GitBox
feluelle commented on issue #5914: [AIRFLOW-5311] Add an AWS Lambda Operator
URL: https://github.com/apache/airflow/pull/5914#issuecomment-542821090
 
 
   ping @iainjames88 :)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] feluelle commented on issue #5997: [AIRFLOW-5396] - Add button Auto Refresh

2019-10-16 Thread GitBox
feluelle commented on issue #5997: [AIRFLOW-5396] - Add button Auto Refresh
URL: https://github.com/apache/airflow/pull/5997#issuecomment-542819898
 
 
   @LucasMMota are you still working on that change? :)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] feluelle commented on a change in pull request #6285: [AIRFLOW-XXX] Updates to Breeze documentation from GSOD

2019-10-16 Thread GitBox
feluelle commented on a change in pull request #6285: [AIRFLOW-XXX] Updates to 
Breeze documentation from GSOD
URL: https://github.com/apache/airflow/pull/6285#discussion_r335623587
 
 

 ##
 File path: BREEZE.rst
 ##
 @@ -46,409 +46,586 @@ Here is the short 10 minute video about Airflow Breeze
 Prerequisites
 =
 
-Docker
---
+Docker Community Edition
+------------------------
 
-You need latest stable Docker Community Edition installed and on the PATH. It 
should be
-configured to be able to run ``docker`` commands directly and not only via 
root user. Your user
-should be in the ``docker`` group. See `Docker installation guide 
`_
+- **Version**: Install the latest stable Docker Community Edition and add it 
to the PATH.
+- **Permissions**: Configure to run the ``docker`` commands directly and not 
only via root user.
+  Your user should be in the ``docker`` group.
+  See `Docker installation guide `_ for 
details.
+- **Disk space**: On macOS, increase your available disk space before starting 
to work with
+  the environment. At least 128 GB of free disk space is recommended. You can 
also get by with a
+  smaller space but make sure to clean up the Docker disk space periodically.
+  See also `Docker for Mac - Space 
`_ for details
+  on increasing disk space available for Docker on Mac.
+- **Docker problems**: Sometimes it is not obvious that space is an issue when 
you run into
+  a problem with Docker. If you see weird behaviour, try
+  `cleaning up the images <#cleaning-up-the-images>`_. Also see
+  `pruning `_ instructions from 
Docker.
+
+Docker Compose
+--------------
 
-When you develop on Mac OS you usually have not enough disk space for Docker 
if you start using it
-seriously. You should increase disk space available before starting to work 
with the environment.
-Usually you have weird problems of docker containers when you run out of Disk 
space. It might not be
-obvious that space is an issue. At least 128 GB of Disk space is recommended. 
You can also get by with smaller space but you should more
-often clean the docker disk space periodically.
+- **Version**: Install the latest stable Docker Compose and add it to the PATH.
+  See `Docker Compose Installation Guide 
`_ for details.
 
-If you get into weird behaviour try `Cleaning up the images 
<#cleaning-up-the-images>`_.
+- **Permissions**: Configure to run the ``docker-compose`` command.
 
-See also `Docker for Mac - Space 
`_ for details of increasing
-disk space available for Docker on Mac.
+Docker Images Used by Breeze
+----------------------------
 
-Docker compose
---
+For all development tasks, related integration tests and static code checks, 
we use Docker
+images maintained on the Docker Hub in the ``apache/airflow`` repository.
 
-Latest stable Docker Compose installed and on the PATH. It should be
-configured to be able to run ``docker-compose`` command.
-See `Docker compose installation guide 
`_
+There are three images that we are currently managing:
 
-Getopt and gstat
-
+* **Slim CI** image that is used for static code checks (size of ~500MB). Its 
tag follows the pattern
+  of ``<BRANCH>-python<PYTHON_VERSION>-ci-slim`` (for example, 
``apache/airflow:master-python3.6-ci-slim``).
+  The image is built using the ``_ Dockerfile.
+* **Full CI image** that is used for testing. It contains a lot more 
test-related installed software
+  (size of ~1GB). Its tag follows the pattern of 
``<BRANCH>-python<PYTHON_VERSION>-ci``
+  (for example, ``apache/airflow:master-python3.6-ci``). The image is built 
using the
+  ``_ Dockerfile.
+* **Checklicense image** that is used during license check with the Apache RAT 
tool. It does not
+  require any of the dependencies that the two CI images need so it is built 
using a different Dockerfile
+  ``_ and only contains Java + Apache RAT tool. The 
image is
+  labelled with ``checklicence``, for example: 
``apache/airflow:checklicence``. No versioning is used for
+  the Checklicence image.
 
-* If you are on MacOS
+Before you run tests, enter the environment or run local static checks, the 
necessary local images should be
+pulled and built from Docker Hub. This happens automatically for the test 
environment but you need to
+manually trigger it for static checks as described in `Building the images 
<#building-the-images>`_
+and `Pulling the latest images <#pulling-the-latest-images>`_.
+The static checks will fail and inform what to do if the image is not yet 
built.
 
-  * you need gnu ``getopt`` and ``gstat`` to get Airflow Breeze running.
+Building the image first time pulls a pre-built version of images from the 
Docker Hub, which may take some
+time. But for subsequent source code changes, 

[GitHub] [airflow] feluelle commented on a change in pull request #6285: [AIRFLOW-XXX] Updates to Breeze documentation from GSOD

2019-10-16 Thread GitBox
feluelle commented on a change in pull request #6285: [AIRFLOW-XXX] Updates to 
Breeze documentation from GSOD
URL: https://github.com/apache/airflow/pull/6285#discussion_r335623735
 
 

 ##
 File path: BREEZE.rst
 ##
 @@ -474,214 +651,220 @@ Run pylint checks for all files:
  ./breeze --static-check-all-files pylint
 
 
-The ``license`` check is also run via separate script and separate docker 
image containing
-Apache RAT verification tool that checks for Apache-compatibility of licences 
within the codebase.
-It does not take pre-commit parameters as extra args.
+The ``license`` check is run via a separate script and a separate Docker image 
containing the
+Apache RAT verification tool that checks for Apache-compatibility of licenses 
within the codebase.
+It does not take pre-commit parameters as extra arguments.
 
 .. code-block:: bash
 
  ./breeze --static-check-all-files licenses
 
-Building the documentation
---
+Running Static Code Checks from the Host
+----------------------------------------
 
-The documentation is build using ``-O``, ``--build-docs`` command:
+You can trigger the static checks from the host environment, without entering 
the Docker container. To do
+this, run the following scripts (the same is done in Travis CI):
 
-.. code-block:: bash
+* ``_ - checks the licenses.
+* ``_ - checks that documentation can be built without 
warnings.
+* ``_ - runs Flake8 source code style enforcement 
tool.
+* ``_ - runs lint checker for the Dockerfile.
+* ``_ - runs a check for mypy type annotation 
consistency.
+* ``_ - runs pylint static code checker for main 
files.
+* '``_ - runs pylint static code checker for 
tests.
 
- ./breeze --build-docs
+The scripts may ask you to rebuild the images, if needed.
 
-Results of the build can be found in ``docs/_build`` folder. Often errors 
during documentation generation
-come from the docstrings of auto-api generated classes. During the docs 
building auto-api generated
-files are stored in ``docs/_api`` folder - so that in case of problems with 
documentation you can
-find where the problems with documentation originated from.
+You can force rebuilding the images by deleting the [.build](./build) 
directory. This directory keeps cached
+information about the images already built and you can safely delete it if you 
want to start from scratch.
 
-Running tests directly from host
-
+After documentation is built, the HTML results are available in the 
[docs/_build/html](docs/_build/html)
+folder. This folder is mounted from the host so you can access those files on 
your host as well.
 
-If you wish to run tests only and not drop into shell, you can run them by 
providing
--t, --test-target flag. You can add extra nosetest flags after -- in the 
commandline.
+Running Static Code Checks in the Docker
+-----------------------------------------
 
-.. code-block:: bash
+If you are already in the Breeze Docker environment (by running the 
``./breeze`` command),
+you can also run the same static checks from the container:
 
- ./breeze --test-target tests/hooks/test_druid_hook.py -- 
--logging-level=DEBUG
+* Mypy: ``./scripts/ci/in_container/run_mypy.sh airflow tests``
+* Pylint for main files: ``./scripts/ci/in_container/run_pylint_main.sh``
+* Pylint for test files: ``./scripts/ci/in_container/run_pylint_tests.sh``
+* Flake8: ``./scripts/ci/in_container/run_flake8.sh``
+* License check: ``./scripts/ci/in_container/run_check_licence.sh``
+* Documentation: ``./scripts/ci/in_container/run_docs_build.sh``
 
-You can run the whole test suite with special '.' test target:
+Running Static Code Analysis for Selected Files
+-----------------------------------------------
 
-.. code-block:: bash
+In all static check scripts, both in the container and host versions, you can 
also pass a module/file path as
+parameters of the scripts to only check selected modules or files. For example:
 
-./breeze --test-target .
+In the Docker container:
 
-You can also specify individual tests or group of tests:
+.. code-block::
 
-.. code-block:: bash
+  ./scripts/ci/in_container/run_pylint.sh ./airflow/example_dags/
 
-./breeze --test-target tests.core:TestCore
+or
 
-Pulling the latest images
--
+.. code-block::
 
-Sometimes the image on DockerHub is rebuilt from the scratch. This happens for 
example when there is a
-security update of the python version that all the images are based on.
-In this case it is usually faster to pull latest images rather than rebuild 
them
-from the scratch.
+  ./scripts/ci/in_container/run_pylint.sh ./airflow/example_dags/test_utils.py
 
-You can do it via ``--force-pull-images`` flag to force pull latest images 
from DockerHub.
+On the host:
 
-In the future Breeze will warn you when you are advised to force pull images.
+.. code-block::
 
-Running commands inside Docker
---
+  

[GitHub] [airflow] feluelle commented on a change in pull request #6285: [AIRFLOW-XXX] Updates to Breeze documentation from GSOD

2019-10-16 Thread GitBox
feluelle commented on a change in pull request #6285: [AIRFLOW-XXX] Updates to 
Breeze documentation from GSOD
URL: https://github.com/apache/airflow/pull/6285#discussion_r335624250
 
 

 ##
 File path: BREEZE.rst
 ##
 @@ -46,409 +46,586 @@ Here is the short 10 minute video about Airflow Breeze
 Prerequisites
 =
 
-Docker
---
+Docker Community Edition
+------------------------
 
-You need latest stable Docker Community Edition installed and on the PATH. It 
should be
-configured to be able to run ``docker`` commands directly and not only via 
root user. Your user
-should be in the ``docker`` group. See `Docker installation guide 
`_
+- **Version**: Install the latest stable Docker Community Edition and add it 
to the PATH.
+- **Permissions**: Configure to run the ``docker`` commands directly and not 
only via root user.
+  Your user should be in the ``docker`` group.
+  See `Docker installation guide `_ for 
details.
+- **Disk space**: On macOS, increase your available disk space before starting 
to work with
+  the environment. At least 128 GB of free disk space is recommended. You can 
also get by with a
+  smaller space but make sure to clean up the Docker disk space periodically.
+  See also `Docker for Mac - Space 
`_ for details
+  on increasing disk space available for Docker on Mac.
+- **Docker problems**: Sometimes it is not obvious that space is an issue when 
you run into
+  a problem with Docker. If you see weird behaviour, try
+  `cleaning up the images <#cleaning-up-the-images>`_. Also see
+  `pruning `_ instructions from 
Docker.
+
+Docker Compose
+--------------
 
-When you develop on Mac OS you usually have not enough disk space for Docker 
if you start using it
-seriously. You should increase disk space available before starting to work 
with the environment.
-Usually you have weird problems of docker containers when you run out of Disk 
space. It might not be
-obvious that space is an issue. At least 128 GB of Disk space is recommended. 
You can also get by with smaller space but you should more
-often clean the docker disk space periodically.
+- **Version**: Install the latest stable Docker Compose and add it to the PATH.
+  See `Docker Compose Installation Guide 
`_ for details.
 
-If you get into weird behaviour try `Cleaning up the images 
<#cleaning-up-the-images>`_.
+- **Permissions**: Configure to run the ``docker-compose`` command.
 
-See also `Docker for Mac - Space 
`_ for details of increasing
-disk space available for Docker on Mac.
+Docker Images Used by Breeze
+----------------------------
 
-Docker compose
---
+For all development tasks, related integration tests and static code checks, 
we use Docker
+images maintained on the Docker Hub in the ``apache/airflow`` repository.
 
-Latest stable Docker Compose installed and on the PATH. It should be
-configured to be able to run ``docker-compose`` command.
-See `Docker compose installation guide 
`_
+There are three images that we are currently managing:
 
-Getopt and gstat
-
+* **Slim CI** image that is used for static code checks (size of ~500MB). Its 
tag follows the pattern
+  of ``<BRANCH>-python<PYTHON_VERSION>-ci-slim`` (for example, 
``apache/airflow:master-python3.6-ci-slim``).
+  The image is built using the ``_ Dockerfile.
+* **Full CI image** that is used for testing. It contains a lot more 
test-related installed software
+  (size of ~1GB). Its tag follows the pattern of 
``<BRANCH>-python<PYTHON_VERSION>-ci``
+  (for example, ``apache/airflow:master-python3.6-ci``). The image is built 
using the
+  ``_ Dockerfile.
+* **Checklicense image** that is used during license check with the Apache RAT 
tool. It does not
+  require any of the dependencies that the two CI images need so it is built 
using a different Dockerfile
+  ``_ and only contains Java + Apache RAT tool. The 
image is
+  labelled with ``checklicence``, for example: 
``apache/airflow:checklicence``. No versioning is used for
+  the Checklicence image.
 
-* If you are on MacOS
+Before you run tests, enter the environment or run local static checks, the 
necessary local images should be
+pulled and built from Docker Hub. This happens automatically for the test 
environment but you need to
+manually trigger it for static checks as described in `Building the images 
<#building-the-images>`_
+and `Pulling the latest images <#pulling-the-latest-images>`_.
+The static checks will fail and inform what to do if the image is not yet 
built.
 
-  * you need gnu ``getopt`` and ``gstat`` to get Airflow Breeze running.
+Building the image first time pulls a pre-built version of images from the 
Docker Hub, which may take some
+time. But for subsequent source code changes, 

[jira] [Updated] (AIRFLOW-5643) S3Hook logic is duplicated and could be clearer

2019-10-16 Thread Ash Berlin-Taylor (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-5643?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ash Berlin-Taylor updated AIRFLOW-5643:
---------------------------------------
Fix Version/s: (was: 2.0.0)
   1.10.7

> S3Hook logic is duplicated and could be clearer
> -----------------------------------------------
>
> Key: AIRFLOW-5643
> URL: https://issues.apache.org/jira/browse/AIRFLOW-5643
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: aws, hooks
>Affects Versions: 1.10.5
>Reporter: Louis Guitton
>Assignee: Louis Guitton
>Priority: Trivial
> Fix For: 1.10.7
>
>
> S3Hook.load_bytes is duplicating the logic of S3Hook.load_file_obj
> [https://github.com/apache/airflow/blob/master/airflow/hooks/S3_hook.py#L464-L539|https://github.com/apache/airflow/blob/master/airflow/hooks/S3_hook.py#L465-L539]
> Instead, we should stay consistent : S3Hook.load_string is already delegating 
> the logic to S3Hook.load_bytes, so we can use the same approach to delegate 
> to S3Hook.load_file_obj



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [airflow] houqp commented on a change in pull request #6342: [AIRFLOW-5662] fix incorrect naming for scheduler used slot metric

2019-10-16 Thread GitBox
houqp commented on a change in pull request #6342: [AIRFLOW-5662] fix incorrect 
naming for scheduler used slot metric
URL: https://github.com/apache/airflow/pull/6342#discussion_r335617211
 
 

 ##
 File path: airflow/models/pool.py
 ##
 @@ -107,3 +109,33 @@ def open_slots(self, session):
         Returns the number of slots open at the moment
         """
         return self.slots - self.occupied_slots(session)
+
+    @provide_session
+    def slots_stats(self, session) -> Dict:
+        from airflow.models.taskinstance import TaskInstance  # Avoid circular import
+
+        states_filter = STATES_TO_COUNT_AS_RUNNING | frozenset([State.RUNNING, State.QUEUED])
 
 Review comment:
   It is; this is to safeguard against future changes that will add or remove 
states from the `STATES_TO_COUNT_AS_RUNNING` set. The naming of that variable 
doesn't seem to guarantee it will only contain queued and running going 
forward. I can certainly remove this safeguard code if that's not the case.
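   
   To illustrate the safeguard (the set contents below are hypothetical; only the union mirrors the diff above):
   
   ```python
   # Hypothetical contents; the point is that the union keeps RUNNING and
   # QUEUED in the filter even if STATES_TO_COUNT_AS_RUNNING changes later.
   RUNNING, QUEUED = "running", "queued"
   STATES_TO_COUNT_AS_RUNNING = frozenset({RUNNING})  # assumed current value

   states_filter = STATES_TO_COUNT_AS_RUNNING | frozenset([RUNNING, QUEUED])
   assert {RUNNING, QUEUED} <= states_filter
   ```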


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] milton0825 commented on a change in pull request #5743: [AIRFLOW-5088][AIP-24] Persisting serialized DAG in DB for webserver scalability

2019-10-16 Thread GitBox
milton0825 commented on a change in pull request #5743: [AIRFLOW-5088][AIP-24] 
Persisting serialized DAG in DB for webserver scalability
URL: https://github.com/apache/airflow/pull/5743#discussion_r335609229
 
 

 ##
 File path: docs/howto/enable-dag-serialization.rst
 ##
 @@ -0,0 +1,109 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+
+
+
+Enable DAG Serialization
+========================
+
+Add the following settings in ``airflow.cfg``:
+
+.. code-block:: ini
+
+    [core]
+    store_serialized_dags = True
+    min_serialized_dag_update_interval = 30
+
+*   ``store_serialized_dags``: This flag decides whether to serialise DAGs and persist them in the DB.
+    If set to True, the Webserver reads from the DB instead of parsing DAG files.
+*   ``min_serialized_dag_update_interval``: This flag sets the minimum interval (in seconds) after which
+    the serialized DAG in the DB should be updated. This helps in reducing the database write rate.
+
+If you are updating Airflow from <1.10.6, please do not forget to run ``airflow db upgrade``.
+
+
+How it works
+============
+
+In order to make the Airflow Webserver stateless (almost!), Airflow >=1.10.6 supports
+DAG Serialization and DB Persistence.
+
+.. image:: ../img/dag_serialization.png
+
+As shown in the image above, in vanilla Airflow the Webserver and the Scheduler both
+need access to the DAG files. Both the Scheduler and the Webserver parse the DAG files.
+
+With **DAG Serialization** we aim to decouple the Webserver from DAG parsing,
+which would make the Webserver very lightweight.
+
+As shown in the image above, when using the dag_serialization feature,
+the Scheduler parses the DAG files, serializes them in JSON format and saves them in the Metadata DB.
+
+The Webserver, instead of having to parse the DAG file again, now reads the
+serialized DAGs in JSON, de-serializes them, creates the DagBag and uses it
+to show in the UI.
+
+One of the key features implemented as part of DAG Serialization is that,
+instead of loading an entire DagBag when the Webserver starts, we only load each DAG on demand from the
+Serialized Dag table. This helps reduce Webserver startup time and memory. The reduction is notable
+when you have a large number of DAGs.
+
+Below is a screenshot of the ``serialized_dag`` table in the Metadata DB:
+
+.. image:: ../img/serialized_dag_table.png
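
As a self-contained toy model of the write/read flow described above (sqlite3 stands in for the Metadata DB; the column names are assumptions):

```python
# Toy model only: the scheduler writes serialized DAGs as JSON rows, and the
# webserver reads a single DAG on demand instead of building a whole DagBag.
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE serialized_dag (dag_id TEXT PRIMARY KEY, data TEXT)")

# Scheduler side: parse a DAG file, serialize to JSON, persist.
conn.execute(
    "INSERT INTO serialized_dag VALUES (?, ?)",
    ("example_dag", json.dumps({"dag_id": "example_dag", "tasks": ["start", "end"]})),
)

# Webserver side: load one DAG on demand.
row = conn.execute(
    "SELECT data FROM serialized_dag WHERE dag_id = ?", ("example_dag",)
).fetchone()
print(json.loads(row[0])["tasks"])  # ['start', 'end']
```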
+
+Limitations
+-----------
+The Webserver will still need access to DAG files in the following cases,
+which is why we said "almost" stateless.
+
+*   The **Rendered Template** tab will still have to parse the Python file as it needs all the details like
+    the execution date and even the data passed by the upstream task using XCom.
+*   **Code View** will read the DAG file & show it using Pygments.
+    However, it does not need to parse the Python file, so it is still a small operation.
+*   :doc:`Extra Operator Links ` would not work out of
+    the box. They need to be defined in an Airflow Plugin.
+
+**Existing Airflow Operators**:
+To make extra operator links work with existing operators like BigQuery, copy all
+the classes that are defined in the ``operator_extra_links`` property.
 
 Review comment:
   Can you clarify a bit about the "bundled" GCP links? Are you talking about 
the operator extra links registered as a `@property` in the operator like:
   
https://github.com/apache/airflow/blob/master/airflow/gcp/operators/bigquery.py#L460-L470
   ?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] dstandish commented on a change in pull request #6210: [AIRFLOW-5567] BaseAsyncOperator

2019-10-16 Thread GitBox
dstandish commented on a change in pull request #6210: [AIRFLOW-5567] 
BaseAsyncOperator
URL: https://github.com/apache/airflow/pull/6210#discussion_r335593800
 
 

 ##
 File path: airflow/models/base_async_operator.py
 ##
 @@ -0,0 +1,161 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Base Asynchronous Operator for kicking off long-running
+operations and polling for completion with reschedule mode.
+"""
+
+from abc import abstractmethod
+from typing import Dict, List, Optional, Union
+
+from airflow.models import SkipMixin, TaskReschedule
+from airflow.models.xcom import XCOM_EXTERNAL_RESOURCE_ID_KEY
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
 
 Review comment:
   OK I think I may have found an issue.
   
   In live testing it appeared XCOM was not recorded.  In actuality, the 
problem is XCOM is cleared when task restarts after reschedule.  So after first 
rescheduled poke, the xcom is obliterated, and resource id is lost.
   
   XCOM data appears to be cleared at start of each task.  [See 
here](https://github.com/apache/airflow/blob/1a4c16432b8718189269e63c3afcd2709eed7379/airflow/models/taskinstance.py#L913).
   
   So when the task restarts after reschedule, we lose the resource id.
   
   Probably a similar explanation for invisible logs issue i commented on 
earlier.
   
   Here's my sample operator:
   ```
   class Op(BaseAsyncOperator):
   
   def submit_request(self, context: Dict) -> str:
   return '129uh8981h9u80eh'
   
   def poke(self, context):
   ti = context['ti']
   print(f"try_number: {ti.try_number}")
   
   for k, v in context.items():
   print(f"{k}: {v}")
   
   print("\n sleeping")
   import time
   time.sleep(60)
   
   return False
   ```
   
   Not sure what is best way to resolve.
   
   It's also curious that the state goes initially to `up_for_reschedule` 
before later becoming `up_for_retry`. I am not sure why that is, but I have 
not used sensors / rescheduling before...
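   
   A self-contained sketch of that failure mode (plain Python, no Airflow; the dict stands in for the XCom table):
   
   ```python
   # Plain-Python stand-in: state stored at submit time is wiped when the
   # task re-runs after a reschedule, so the resource id is gone.
   xcom = {}

   def run_task(first_try: bool) -> None:
       xcom.clear()  # Airflow clears XCom at the start of each task run
       if first_try:
           xcom["external_resource_id"] = "129uh8981h9u80eh"  # from submit_request
       print("resource id:", xcom.get("external_resource_id"))

   run_task(first_try=True)   # resource id: 129uh8981h9u80eh
   run_task(first_try=False)  # resource id: None (lost after reschedule)
   ```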


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] dstandish commented on a change in pull request #6210: [AIRFLOW-5567] BaseAsyncOperator

2019-10-16 Thread GitBox
dstandish commented on a change in pull request #6210: [AIRFLOW-5567] 
BaseAsyncOperator
URL: https://github.com/apache/airflow/pull/6210#discussion_r335593800
 
 

 ##
 File path: airflow/models/base_async_operator.py
 ##
 @@ -0,0 +1,161 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Base Asynchronous Operator for kicking off long-running
+operations and polling for completion with reschedule mode.
+"""
+
+from abc import abstractmethod
+from typing import Dict, List, Optional, Union
+
+from airflow.models import SkipMixin, TaskReschedule
+from airflow.models.xcom import XCOM_EXTERNAL_RESOURCE_ID_KEY
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
 
 Review comment:
   OK I think I may have found an issue.
   
   In live testing it appeared XCOM was not recorded.  In actuality, the 
problem is XCOM is cleared when task restarts after reschedule.  So after first 
rescheduled poke, the xcom is obliterated, and resource id is lost.
   
   XCOM data appears to be cleared at start of each task.  [See 
here](https://github.com/apache/airflow/blob/1a4c16432b8718189269e63c3afcd2709eed7379/airflow/models/taskinstance.py#L913).
   
   So when the task restarts after reschedule, we lose the resource id.
   
   Probably a similar explanation for invisible logs issue i commented on 
earlier.
   
   Here's my sample operator:
   ```
   class Op(BaseAsyncOperator):
   
   def submit_request(self, context: Dict) -> str:
   return '129uh8981h9u80eh'
   
   def poke(self, context):
   ti = context['ti']
   print(f"try_number: {ti.try_number}")
   
   for k, v in context.items():
   print(f"{k}: {v}")
   
   print("\n sleeping")
   import time
   time.sleep(60)
   
   return False
   ```
   
   Not sure what is best way to resolve.
   
   It's also curious that the state goes initially to `Up for reschedule` 
before later becoming `Waiting for retry`. I am not sure why that is, but I 
have not used sensors / rescheduling before...


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] dstandish commented on a change in pull request #6210: [AIRFLOW-5567] BaseAsyncOperator

2019-10-16 Thread GitBox
dstandish commented on a change in pull request #6210: [AIRFLOW-5567] 
BaseAsyncOperator
URL: https://github.com/apache/airflow/pull/6210#discussion_r335593800
 
 

 ##
 File path: airflow/models/base_async_operator.py
 ##
 @@ -0,0 +1,161 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Base Asynchronous Operator for kicking off long-running
+operations and polling for completion with reschedule mode.
+"""
+
+from abc import abstractmethod
+from typing import Dict, List, Optional, Union
+
+from airflow.models import SkipMixin, TaskReschedule
+from airflow.models.xcom import XCOM_EXTERNAL_RESOURCE_ID_KEY
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
 
 Review comment:
   OK I think I may have found an issue.
   
   In live testing it appeared XCOM was not recorded.  In actuality, the 
problem is XCOM is cleared when task restarts after reschedule.  So after first 
rescheduled poke, the xcom is obliterated, and resource id is lost.
   
   XCOM data appears to be cleared at start of each task.  [See 
here](https://github.com/apache/airflow/blob/1a4c16432b8718189269e63c3afcd2709eed7379/airflow/models/taskinstance.py#L913).
   
   So when the task restarts after reschedule, we lose the resource id.
   
   Probably a similar explanation for invisible logs issue i commented on 
earlier.
   
   Here's my sample operator:
   ```
   class Op(BaseAsyncOperator):
   
   def submit_request(self, context: Dict) -> str:
   return '129uh8981h9u80eh'
   
   def poke(self, context):
   ti = context['ti']
   print(f"try_number: {ti.try_number}")
   
   for k, v in context.items():
   print(f"{k}: {v}")
   
   print("\n sleeping")
   import time
   time.sleep(60)
   
   return False
   ```
   
   Not sure what is best way to resolve.
   
   It's also curious that the state goes initially to `Up for reschedule` 
before later becoming `Up for retry`. I am not sure why that is, but I have 
not used sensors / rescheduling before...


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] dstandish commented on a change in pull request #6210: [AIRFLOW-5567] BaseAsyncOperator

2019-10-16 Thread GitBox
dstandish commented on a change in pull request #6210: [AIRFLOW-5567] 
BaseAsyncOperator
URL: https://github.com/apache/airflow/pull/6210#discussion_r335593800
 
 

 ##
 File path: airflow/models/base_async_operator.py
 ##
 @@ -0,0 +1,161 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Base Asynchronous Operator for kicking off long-running
+operations and polling for completion with reschedule mode.
+"""
+
+from abc import abstractmethod
+from typing import Dict, List, Optional, Union
+
+from airflow.models import SkipMixin, TaskReschedule
+from airflow.models.xcom import XCOM_EXTERNAL_RESOURCE_ID_KEY
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
 
 Review comment:
   OK I think I may have found an issue.
   
   In live testing it appeared XCOM was not recorded.  In actuality, the 
problem is XCOM is cleared when task restarts after reschedule.  So after first 
rescheduled poke, the xcom is obliterated, and resource id is lost.
   
   XCOM data appears to be cleared at start of each task.  [See 
here](https://github.com/apache/airflow/blob/1a4c16432b8718189269e63c3afcd2709eed7379/airflow/models/taskinstance.py#L913).
   
   So when the task restarts after reschedule, we lose the resource id.
   
   Probably a similar explanation for invisible logs issue i commented on 
earlier.
   
   Here's my sample operator:
   ```
   class Op(BaseAsyncOperator):
   
   def submit_request(self, context: Dict) -> str:
   return '129uh8981h9u80eh'
   
   def poke(self, context):
   ti = context['ti']
   print(f"try_number: {ti.try_number}")
   
   for k, v in context.items():
   print(f"{k}: {v}")
   
   print("\n sleeping")
   import time
   time.sleep(60)
   
   return False
   ```
   
   Not sure what is best way to resolve.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] dstandish commented on a change in pull request #6210: [AIRFLOW-5567] BaseAsyncOperator

2019-10-16 Thread GitBox
dstandish commented on a change in pull request #6210: [AIRFLOW-5567] 
BaseAsyncOperator
URL: https://github.com/apache/airflow/pull/6210#discussion_r335593800
 
 

 ##
 File path: airflow/models/base_async_operator.py
 ##
 @@ -0,0 +1,161 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Base Asynchronous Operator for kicking off long-running
+operations and polling for completion with reschedule mode.
+"""
+
+from abc import abstractmethod
+from typing import Dict, List, Optional, Union
+
+from airflow.models import SkipMixin, TaskReschedule
+from airflow.models.xcom import XCOM_EXTERNAL_RESOURCE_ID_KEY
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
 
 Review comment:
   OK I think I may have found an issue.
   
   In live testing it appeared XCOM was not recorded.  In actuality, the 
problem is XCOM is cleared when task restarts after reschedule.  So after first 
rescheduled poke, the xcom is obliterated, and resource id is lost.
   
   XCOM data appears to be cleared at start of each task.  [See 
here](https://github.com/apache/airflow/blob/1a4c16432b8718189269e63c3afcd2709eed7379/airflow/models/taskinstance.py#L913).
   
   So when the task restarts after reschedule, we lose the resource id.
   
   Probably a similar explanation for invisible logs issue i commented on 
earlier.
   
   Here's my sample operator:
   ```
   class Op(BaseAsyncOperator):
   
   def submit_request(self, context: Dict) -> str:
   return '129uh8981h9u80eh'
   
   def poke(self, context):
   ti = context['ti']
   print(f"try_number: {ti.try_number}")
   
   for k, v in context.items():
   print(f"{k}: {v}")
   
   print("\n sleeping")
   import time
   time.sleep(60)
   
   return False
   ```
   
   Not sure what is best way to resolve.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] dstandish commented on a change in pull request #6210: [AIRFLOW-5567] BaseAsyncOperator

2019-10-16 Thread GitBox
dstandish commented on a change in pull request #6210: [AIRFLOW-5567] 
BaseAsyncOperator
URL: https://github.com/apache/airflow/pull/6210#discussion_r335593800
 
 

 ##
 File path: airflow/models/base_async_operator.py
 ##
 @@ -0,0 +1,161 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Base Asynchronous Operator for kicking off long-running
+operations and polling for completion with reschedule mode.
+"""
+
+from abc import abstractmethod
+from typing import Dict, List, Optional, Union
+
+from airflow.models import SkipMixin, TaskReschedule
+from airflow.models.xcom import XCOM_EXTERNAL_RESOURCE_ID_KEY
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
 
 Review comment:
   OK I think I may have found an issue.
   
   In live testing it appeared XCOM was not recorded.  In actuality, the 
problem is XCOM is cleared when task restarts after reschedule.  So after first 
rescheduled poke, the xcom is obliterated.
   
   XCOM data appears to be cleared at start of each task.  [See 
here](https://github.com/apache/airflow/blob/1a4c16432b8718189269e63c3afcd2709eed7379/airflow/models/taskinstance.py#L913).
   
   So when the task restarts after reschedule, we lose the resource id.
   
   Probably a similar explanation for invisible logs issue i commented on 
earlier.
   
   Here's my sample operator:
   ```
   class Op(BaseAsyncOperator):
   
   def submit_request(self, context: Dict) -> str:
   return '129uh8981h9u80eh'
   
   def poke(self, context):
   ti = context['ti']
   print(f"try_number: {ti.try_number}")
   
   for k, v in context.items():
   print(f"{k}: {v}")
   
   print("\n sleeping")
   import time
   time.sleep(60)
   
   return False
   ```
   
   Not sure what is best way to resolve.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] dstandish commented on a change in pull request #6210: [AIRFLOW-5567] BaseAsyncOperator

2019-10-16 Thread GitBox
dstandish commented on a change in pull request #6210: [AIRFLOW-5567] 
BaseAsyncOperator
URL: https://github.com/apache/airflow/pull/6210#discussion_r335593800
 
 

 ##
 File path: airflow/models/base_async_operator.py
 ##
 @@ -0,0 +1,161 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Base Asynchronous Operator for kicking off long-running
+operations and polling for completion with reschedule mode.
+"""
+
+from abc import abstractmethod
+from typing import Dict, List, Optional, Union
+
+from airflow.models import SkipMixin, TaskReschedule
+from airflow.models.xcom import XCOM_EXTERNAL_RESOURCE_ID_KEY
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
 
 Review comment:
   OK I think I may have found an issue.
   
   In live testing it appeared XCOM was not recorded.  In actuality, the 
problem is XCOM is cleared when task restarts after reschedule.  
   
   XCOM data appears to be cleared at start of each task.  [See 
here](https://github.com/apache/airflow/blob/1a4c16432b8718189269e63c3afcd2709eed7379/airflow/models/taskinstance.py#L913).
   
   So when the task restarts after reschedule, we lose the resource id.
   
   Probably a similar explanation for invisible logs issue i commented on 
earlier.
   
   Here's my sample operator:
   ```
   class Op(BaseAsyncOperator):
   
   def submit_request(self, context: Dict) -> str:
   return '129uh8981h9u80eh'
   
   def poke(self, context):
   ti = context['ti']
   print(f"try_number: {ti.try_number}")
   
   for k, v in context.items():
   print(f"{k}: {v}")
   
   print("\n sleeping")
   import time
   time.sleep(60)
   
   return False
   ```
   
   Not sure what is best way to resolve.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] dstandish commented on a change in pull request #6210: [AIRFLOW-5567] BaseAsyncOperator

2019-10-16 Thread GitBox
dstandish commented on a change in pull request #6210: [AIRFLOW-5567] 
BaseAsyncOperator
URL: https://github.com/apache/airflow/pull/6210#discussion_r335593800
 
 

 ##
 File path: airflow/models/base_async_operator.py
 ##
 @@ -0,0 +1,161 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Base Asynchronous Operator for kicking off long-running
+operations and polling for completion with reschedule mode.
+"""
+
+from abc import abstractmethod
+from typing import Dict, List, Optional, Union
+
+from airflow.models import SkipMixin, TaskReschedule
+from airflow.models.xcom import XCOM_EXTERNAL_RESOURCE_ID_KEY
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
 
 Review comment:
   OK I think I may have found an issue.
   
   In live testing it appeared XCOM was not recorded.  In actuality, the 
problem is XCOM is cleared when task restarts after reschedule.  
   
   XCOM data appears to be cleared at start of each task.  [See 
here](https://github.com/apache/airflow/blob/1a4c16432b8718189269e63c3afcd2709eed7379/airflow/models/taskinstance.py#L913).
   
   So when the task restarts after reschedule, we lose the resource id.
   
   Probably a similar explanation for invisible logs issue i commented on 
earlier.
   
   Here's my sample operator:
   ```
   class Op(BaseAsyncOperator):
   
   def submit_request(self, context: Dict) -> str:
   return '129uh8981h9u80eh'
   
   def poke(self, context):
   ti = context['ti']
   print(f"try_number: {ti.try_number}")
   
   for k, v in context.items():
   print(f"{k}: {v}")
   
   print("\n sleeping")
   import time
   time.sleep(60)
   
   return False
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] dstandish commented on a change in pull request #6210: [AIRFLOW-5567] BaseAsyncOperator

2019-10-16 Thread GitBox
dstandish commented on a change in pull request #6210: [AIRFLOW-5567] 
BaseAsyncOperator
URL: https://github.com/apache/airflow/pull/6210#discussion_r335587922
 
 

 ##
 File path: airflow/models/base_async_operator.py
 ##
 @@ -0,0 +1,161 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Base Asynchronous Operator for kicking off long-running
+operations and polling for completion with reschedule mode.
+"""
+
+from abc import abstractmethod
+from typing import Dict, List, Optional, Union
+
+from airflow.models import SkipMixin, TaskReschedule
+from airflow.models.xcom import XCOM_EXTERNAL_RESOURCE_ID_KEY
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class BaseAsyncOperator(BaseSensorOperator, SkipMixin):
 
 Review comment:
   is there a need to reference SkipMixin given that BaseSensor already extends 
this class?
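   
   For reference, a generic MRO illustration of that point (toy classes, not Airflow's): a mixin inherited via the parent is already in the subclass's MRO, so re-listing it is redundant.
   
   ```python
   # Toy classes only; mirrors the inheritance shape being discussed.
   class SkipMixin: ...
   class BaseSensorOperator(SkipMixin): ...
   class BaseAsyncOperator(BaseSensorOperator): ...

   print(SkipMixin in BaseAsyncOperator.__mro__)  # True
   ```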


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] dstandish commented on a change in pull request #6210: [AIRFLOW-5567] BaseAsyncOperator

2019-10-16 Thread GitBox
dstandish commented on a change in pull request #6210: [AIRFLOW-5567] 
BaseAsyncOperator
URL: https://github.com/apache/airflow/pull/6210#discussion_r335589712
 
 

 ##
 File path: airflow/models/base_async_operator.py
 ##
 @@ -0,0 +1,161 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Base Asynchronous Operator for kicking off long-running
+operations and polling for completion with reschedule mode.
+"""
+
+from abc import abstractmethod
+from typing import Dict, List, Optional, Union
+
+from airflow.models import SkipMixin, TaskReschedule
+from airflow.models.xcom import XCOM_EXTERNAL_RESOURCE_ID_KEY
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class BaseAsyncOperator(BaseSensorOperator, SkipMixin):
+"""
+AsyncOperators are derived from this class and inherit these attributes.
+AsyncOperators should be used for long running operations where the task
+can tolerate a longer poke interval. They use the task rescheduling
+mechanism similar to sensors to avoid occupying a worker slot between
+pokes.
+
+Developing concrete operators that provide parameterized flexibility
+for synchronous or asynchronous poking depending on the invocation is
+possible by programing against this `BaseAsyncOperator` interface,
+and overriding the execute method as demonstrated below.
+
+```python3
+class DummyFlexiblePokingOperator(BaseAsyncOperator):
+  def __init__(self, async=False, *args, **kwargs):
+self.async = async
+super().__init(*args, **kwargs)
+
+  def execute(self, context: Dict) -> None:
+if self.async:
+  # use the BaseAsyncOperator's execute
+  super().execute(context)
+else:
+  self.submit_request(context)
+  while not self.poke():
+time.sleep(self.poke_interval)
+  self.process_results(context)
+
+  def sumbit_request(self, context: Dict) -> Optional[str]:
+return None
+
+  def poke(self, context: Dict) -> bool:
+return bool(random.getrandbits(1))
+```
+
+AsyncOperators must override the following methods:
+:py:meth:`submit_request`: fire a request for a long running operation
+:py:meth:`poke`: a method to check if the long running operation is
+complete it should return True when a success criteria is met.
+
+Optionally, AsyncOperators can override:
+:py:meth: `process_result` to perform any operations after the success
+criteria is met in :py:meth: `poke`
+
+:py:meth: `poke` is executed at a time interval and succeed when a
+criteria is met and fail if and when they time out.
+
+:param soft_fail: Set to true to mark the task as SKIPPED on failure
+:type soft_fail: bool
+:param poke_interval: Time in seconds that the job should wait in
+between each tries
+:type poke_interval: int
+:param timeout: Time, in seconds before the task times out and fails.
+:type timeout: int
+
+"""
+ui_color = '#9933ff'  # type: str
+
+@apply_defaults
+def __init__(self,
+ *args,
+ **kwargs) -> None:
+super().__init__(mode='reschedule', *args, **kwargs)
+
+@abstractmethod
 
 Review comment:
   should we extend ABC in this class?  maybe there is a reason we don't do 
this?
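   
   For reference, a generic illustration of what extending ABC would enforce (not the PR's actual class hierarchy):
   
   ```python
   # Generic illustration: with ABC as a base, instantiating a class that
   # still has abstract methods raises TypeError at construction time.
   from abc import ABC, abstractmethod

   class AsyncLike(ABC):
       @abstractmethod
       def submit_request(self, context: dict) -> str:
           raise NotImplementedError

   try:
       AsyncLike()
   except TypeError as err:
       print(err)  # can't instantiate abstract class AsyncLike...
   ```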


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (AIRFLOW-5681) Allow specification of a tag or hash for the git_sync init container

2019-10-16 Thread George Miller (Jira)
George Miller created AIRFLOW-5681:
--

 Summary: Allow specification of a tag or hash for the git_sync 
init container
 Key: AIRFLOW-5681
 URL: https://issues.apache.org/jira/browse/AIRFLOW-5681
 Project: Apache Airflow
  Issue Type: New Feature
  Components: executor-kubernetes
Affects Versions: 1.10.5
Reporter: George Miller
Assignee: George Miller
 Fix For: 1.10.6


We want to deploy dags based on tags on a branch, and want to use the git_sync 
init container in our Kubernetes setup to clone the DAG folder at the start of 
a task run.

 

Here's my PR 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (AIRFLOW-5681) Allow specification of a tag or hash for the git_sync init container

2019-10-16 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-5681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16953008#comment-16953008
 ] 

ASF GitHub Bot commented on AIRFLOW-5681:
-

george-miller commented on pull request #6350: [AIRFLOW-5681] Allow 
specification of a tag or hash for the git_sync init container
URL: https://github.com/apache/airflow/pull/6350
 
 
   
   
   Make sure you have checked _all_ steps below.
   
   ### Jira
   
   - [x] My PR addresses the following [Airflow 
Jira](https://issues.apache.org/jira/browse/AIRFLOW-5681) 
   
   ### Description
   
   - [x] Here are some details about my PR, including screenshots of any UI 
changes:
   
   We want to deploy dags based on tags on a branch, and want to use the 
git_sync init container in our Kubernetes setup to clone the DAG folder at the 
start of a task run.
   
   ### Tests
   
   - [x] My PR adds the following unit tests __OR__ does not need testing for 
this extremely good reason:
   
   ### Commits
   
   - [x] My commits all reference Jira issues in their subject lines, and I 
have squashed multiple commits if they address the same issue. In addition, my 
commits follow the guidelines from "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)":
 1. Subject is separated from body by a blank line
 1. Subject is limited to 50 characters (not including Jira issue reference)
 1. Subject does not end with a period
 1. Subject uses the imperative mood ("add", not "adding")
 1. Body wraps at 72 characters
 1. Body explains "what" and "why", not "how"
   
   ### Documentation
   
   - [x] In case of new functionality, my PR adds documentation that describes 
how to use it.
 - All the public functions and the classes in the PR contain docstrings 
that explain what it does
 - If you implement backwards incompatible changes, please leave a note in 
the [Updating.md](https://github.com/apache/airflow/blob/master/UPDATING.md) so 
we can assign it to an appropriate release
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Allow specification of a tag or hash for the git_sync init container
> 
>
> Key: AIRFLOW-5681
> URL: https://issues.apache.org/jira/browse/AIRFLOW-5681
> Project: Apache Airflow
>  Issue Type: New Feature
>  Components: executor-kubernetes
>Affects Versions: 1.10.5
>Reporter: George Miller
>Assignee: George Miller
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.6
>
>
> We want to deploy dags based on tags on a branch, and want to use the 
> git_sync init container in our Kubernetes setup to clone the DAG folder at 
> the start of a task run.
>  
> Here's my PR 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (AIRFLOW-5681) Allow specification of a tag or hash for the git_sync init container

2019-10-16 Thread George Miller (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-5681?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

George Miller updated AIRFLOW-5681:
---
Description: 
We want to deploy dags based on tags on a branch, and want to use the git_sync 
init container in our Kubernetes setup to clone the DAG folder at the start of 
a task run.

 

Here's my PR [https://github.com/apache/airflow/pull/6350]

  was:
We want to deploy dags based on tags on a branch, and want to use the git_sync 
init container in our Kubernetes setup to clone the DAG folder at the start of 
a task run.

 

Here's my PR 


> Allow specification of a tag or hash for the git_sync init container
> 
>
> Key: AIRFLOW-5681
> URL: https://issues.apache.org/jira/browse/AIRFLOW-5681
> Project: Apache Airflow
>  Issue Type: New Feature
>  Components: executor-kubernetes
>Affects Versions: 1.10.5
>Reporter: George Miller
>Assignee: George Miller
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.6
>
>
> We want to deploy dags based on tags on a branch, and want to use the 
> git_sync init container in our Kubernetes setup to clone the DAG folder at 
> the start of a task run.
>  
> Here's my PR [https://github.com/apache/airflow/pull/6350]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [airflow] george-miller opened a new pull request #6350: [AIRFLOW-5681] Allow specification of a tag or hash for the git_sync init container

2019-10-16 Thread GitBox
george-miller opened a new pull request #6350: [AIRFLOW-5681] Allow 
specification of a tag or hash for the git_sync init container
URL: https://github.com/apache/airflow/pull/6350
 
 
   
   
   Make sure you have checked _all_ steps below.
   
   ### Jira
   
   - [x] My PR addresses the following [Airflow 
Jira](https://issues.apache.org/jira/browse/AIRFLOW-5681) 
   
   ### Description
   
   - [x] Here are some details about my PR, including screenshots of any UI 
changes:
   
   We want to deploy dags based on tags on a branch, and want to use the 
git_sync init container in our Kubernetes setup to clone the DAG folder at the 
start of a task run.
   
   ### Tests
   
   - [x] My PR adds the following unit tests __OR__ does not need testing for 
this extremely good reason:
   
   ### Commits
   
   - [x] My commits all reference Jira issues in their subject lines, and I 
have squashed multiple commits if they address the same issue. In addition, my 
commits follow the guidelines from "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)":
 1. Subject is separated from body by a blank line
 1. Subject is limited to 50 characters (not including Jira issue reference)
 1. Subject does not end with a period
 1. Subject uses the imperative mood ("add", not "adding")
 1. Body wraps at 72 characters
 1. Body explains "what" and "why", not "how"
   
   ### Documentation
   
   - [x] In case of new functionality, my PR adds documentation that describes 
how to use it.
 - All the public functions and the classes in the PR contain docstrings 
that explain what it does
 - If you implement backwards incompatible changes, please leave a note in 
the [Updating.md](https://github.com/apache/airflow/blob/master/UPDATING.md) so 
we can assign it to an appropriate release
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (AIRFLOW-5643) S3Hook logic is duplicated and could be clearer

2019-10-16 Thread Felix Uellendall (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-5643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16953002#comment-16953002
 ] 

Felix Uellendall commented on AIRFLOW-5643:
---

[~ash] I set Fix Version to 2.0 since you said you want to cut the rc tomorrow. 
I suggest we add it to 1.10.6 if we have an rc2 :D

> S3Hook logic is duplicated and could be clearer
> ---
>
> Key: AIRFLOW-5643
> URL: https://issues.apache.org/jira/browse/AIRFLOW-5643
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: aws, hooks
>Affects Versions: 1.10.5
>Reporter: Louis Guitton
>Assignee: Louis Guitton
>Priority: Trivial
> Fix For: 2.0.0
>
>
> S3Hook.load_bytes is duplicating the logic of S3Hook.load_file_obj
> [https://github.com/apache/airflow/blob/master/airflow/hooks/S3_hook.py#L464-L539|https://github.com/apache/airflow/blob/master/airflow/hooks/S3_hook.py#L465-L539]
> Instead, we should stay consistent: S3Hook.load_string is already delegating 
> the logic to S3Hook.load_bytes, so we can use the same approach to delegate 
> to S3Hook.load_file_obj



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (AIRFLOW-5643) S3Hook logic is duplicated and could be clearer

2019-10-16 Thread Felix Uellendall (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-5643?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Felix Uellendall resolved AIRFLOW-5643.
---
Resolution: Fixed

> S3Hook logic is duplicated and could be clearer
> ---
>
> Key: AIRFLOW-5643
> URL: https://issues.apache.org/jira/browse/AIRFLOW-5643
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: aws, hooks
>Affects Versions: 1.10.5
>Reporter: Louis Guitton
>Assignee: Louis Guitton
>Priority: Trivial
> Fix For: 2.0.0
>
>
> S3Hook.load_bytes is duplicating the logic of S3Hook.load_file_obj
> [https://github.com/apache/airflow/blob/master/airflow/hooks/S3_hook.py#L464-L539|https://github.com/apache/airflow/blob/master/airflow/hooks/S3_hook.py#L465-L539]
> Instead, we should stay consistent: S3Hook.load_string is already delegating 
> the logic to S3Hook.load_bytes, so we can use the same approach to delegate 
> to S3Hook.load_file_obj



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (AIRFLOW-5643) S3Hook logic is duplicated and could be clearer

2019-10-16 Thread Felix Uellendall (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-5643?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Felix Uellendall updated AIRFLOW-5643:
--
Fix Version/s: 2.0.0

> S3Hook logic is duplicated and could be clearer
> ---
>
> Key: AIRFLOW-5643
> URL: https://issues.apache.org/jira/browse/AIRFLOW-5643
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: aws, hooks
>Affects Versions: 1.10.5
>Reporter: Louis Guitton
>Assignee: Louis Guitton
>Priority: Trivial
> Fix For: 2.0.0
>
>
> S3Hook.load_bytes is duplicating the logic of S3Hook.load_file_obj
> [https://github.com/apache/airflow/blob/master/airflow/hooks/S3_hook.py#L464-L539|https://github.com/apache/airflow/blob/master/airflow/hooks/S3_hook.py#L465-L539]
> Instead, we should stay consistent: S3Hook.load_string is already delegating 
> the logic to S3Hook.load_bytes, so we can use the same approach to delegate 
> to S3Hook.load_file_obj



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (AIRFLOW-5643) S3Hook logic is duplicated and could be clearer

2019-10-16 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-5643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16952997#comment-16952997
 ] 

ASF GitHub Bot commented on AIRFLOW-5643:
-

feluelle commented on pull request #6313: [AIRFLOW-5643] Reduce duplicated 
logic in S3Hook
URL: https://github.com/apache/airflow/pull/6313
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> S3Hook logic is duplicated and could be clearer
> ---
>
> Key: AIRFLOW-5643
> URL: https://issues.apache.org/jira/browse/AIRFLOW-5643
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: aws, hooks
>Affects Versions: 1.10.5
>Reporter: Louis Guitton
>Assignee: Louis Guitton
>Priority: Trivial
>
> S3Hook.load_bytes is duplicating the logic of S3Hook.load_file_obj
> [https://github.com/apache/airflow/blob/master/airflow/hooks/S3_hook.py#L464-L539|https://github.com/apache/airflow/blob/master/airflow/hooks/S3_hook.py#L465-L539]
> Instead, we should stay consistent: S3Hook.load_string is already delegating 
> the logic to S3Hook.load_bytes, so we can use the same approach to delegate 
> to S3Hook.load_file_obj



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [airflow] feluelle commented on issue #6313: [AIRFLOW-5643] Reduce duplicated logic in S3Hook

2019-10-16 Thread GitBox
feluelle commented on issue #6313: [AIRFLOW-5643] Reduce duplicated logic in 
S3Hook
URL: https://github.com/apache/airflow/pull/6313#issuecomment-542792264
 
 
   Yes, Jarek, I just saw the thread right after I reran the tests. 
   
   I think we can merge it then.  


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] dstandish commented on a change in pull request #6210: [AIRFLOW-5567] BaseAsyncOperator

2019-10-16 Thread GitBox
dstandish commented on a change in pull request #6210: [AIRFLOW-5567] 
BaseAsyncOperator
URL: https://github.com/apache/airflow/pull/6210#discussion_r335581468
 
 

 ##
 File path: airflow/models/base_async_operator.py
 ##
 @@ -0,0 +1,161 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Base Asynchronous Operator for kicking off long-running
+operations and polling for completion with reschedule mode.
+"""
+
+from abc import abstractmethod
+from typing import Dict, List, Optional, Union
+
+from airflow.models import SkipMixin, TaskReschedule
+from airflow.models.xcom import XCOM_EXTERNAL_RESOURCE_ID_KEY
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class BaseAsyncOperator(BaseSensorOperator, SkipMixin):
+"""
+AsyncOperators are derived from this class and inherit these attributes.
+AsyncOperators should be used for long running operations where the task
+can tolerate a longer poke interval. They use the task rescheduling
+mechanism similar to sensors to avoid occupying a worker slot between
+pokes.
+
+Developing concrete operators that provide parameterized flexibility
+for synchronous or asynchronous poking depending on the invocation is
+possible by programing against this `BaseAsyncOperator` interface,
+and overriding the execute method as demonstrated below.
+
+```python3
+class DummyFlexiblePokingOperator(BaseAsyncOperator):
+  def __init__(self, async=False, *args, **kwargs):
+self.async = async
+super().__init(*args, **kwargs)
+
+  def execute(self, context: Dict) -> None:
+if self.async:
+  # use the BaseAsyncOperator's execute
+  super().execute(context)
+else:
+  self.submit_request(context)
+  while not self.poke():
+time.sleep(self.poke_interval)
+  self.process_results(context)
+
+  def sumbit_request(self, context: Dict) -> Optional[str]:
+return None
+
+  def poke(self, context: Dict) -> bool:
+return bool(random.getrandbits(1))
+```
+
+AsyncOperators must override the following methods:
+:py:meth:`submit_request`: fire a request for a long running operation
+:py:meth:`poke`: a method to check if the long running operation is
+complete it should return True when a success criteria is met.
+
+Optionally, AsyncOperators can override:
+:py:meth: `process_result` to perform any operations after the success
+criteria is met in :py:meth: `poke`
+
+:py:meth: `poke` is executed at a time interval and succeed when a
+criteria is met and fail if and when they time out.
+
+:param soft_fail: Set to true to mark the task as SKIPPED on failure
+:type soft_fail: bool
+:param poke_interval: Time in seconds that the job should wait in
+between each tries
+:type poke_interval: int
+:param timeout: Time, in seconds before the task times out and fails.
+:type timeout: int
+
+"""
+ui_color = '#9933ff'  # type: str
+
+@apply_defaults
+def __init__(self,
+ *args,
+ **kwargs) -> None:
+super().__init__(mode='reschedule', *args, **kwargs)
+
+@abstractmethod
+def submit_request(self, context: Dict) -> Optional[Union[str, List, Dict]]:
+"""
+This method should kick off a long running operation.
+It should return the ID for the long running operation if applicable.
+Context is the same dictionary used as when rendering jinja templates.
+
+Refer to get_template_context for more context.
+
+:returns: a resource_id for the long running operation.
+:rtype: Optional[Union[str, List, Dict]]
+"""
+raise NotImplementedError
+
+def process_result(self, context: Dict):
+"""
+This method can optionally be overridden to process the result of a long running operation.
+Context is the same dictionary used as when rendering jinja templates.
+
+Refer to get_template_context for more context.
+"""
+self.log.info('Using 

[GitHub] [airflow] dstandish commented on a change in pull request #6210: [AIRFLOW-5567] BaseAsyncOperator

2019-10-16 Thread GitBox
dstandish commented on a change in pull request #6210: [AIRFLOW-5567] 
BaseAsyncOperator
URL: https://github.com/apache/airflow/pull/6210#discussion_r335581468
 
 

 ##
 File path: airflow/models/base_async_operator.py
 ##
 @@ -0,0 +1,161 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Base Asynchronous Operator for kicking off a long running
+operations and polling for completion with reschedule mode.
+"""
+
+from abc import abstractmethod
+from typing import Dict, List, Optional, Union
+
+from airflow.models import SkipMixin, TaskReschedule
+from airflow.models.xcom import XCOM_EXTERNAL_RESOURCE_ID_KEY
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class BaseAsyncOperator(BaseSensorOperator, SkipMixin):
+"""
+AsyncOperators are derived from this class and inherit these attributes.
+AsyncOperators should be used for long running operations where the task
+can tolerate a longer poke interval. They use the task rescheduling
+mechanism similar to sensors to avoid occupying a worker slot between
+pokes.
+
+Developing concrete operators that provide parameterized flexibility
+for synchronous or asynchronous poking depending on the invocation is
+possible by programming against this `BaseAsyncOperator` interface,
+and overriding the execute method as demonstrated below.
+
+```python3
+class DummyFlexiblePokingOperator(BaseAsyncOperator):
+  # assumes `random` and `time` are imported in the enclosing module
+  def __init__(self, run_async=False, *args, **kwargs):
+    # `async` is a reserved word in Python 3.7+, so use `run_async`
+    self.run_async = run_async
+    super().__init__(*args, **kwargs)
+
+  def execute(self, context: Dict) -> None:
+    if self.run_async:
+      # use the BaseAsyncOperator's execute
+      super().execute(context)
+    else:
+      self.submit_request(context)
+      while not self.poke(context):
+        time.sleep(self.poke_interval)
+      self.process_result(context)
+
+  def submit_request(self, context: Dict) -> Optional[str]:
+    return None
+
+  def poke(self, context: Dict) -> bool:
+    return bool(random.getrandbits(1))
+```
+
+AsyncOperators must override the following methods:
+:py:meth:`submit_request`: fire a request for a long-running operation
+:py:meth:`poke`: check whether the long-running operation is complete;
+it should return True when the success criterion is met.
+
+Optionally, AsyncOperators can override:
+:py:meth:`process_result` to perform any operations after the success
+criterion is met in :py:meth:`poke`.
+
+:py:meth:`poke` is executed at a time interval; it succeeds when the
+criterion is met and fails if and when it times out.
+
+:param soft_fail: Set to True to mark the task as SKIPPED on failure
+:type soft_fail: bool
+:param poke_interval: Time in seconds that the job should wait
+between tries
+:type poke_interval: int
+:param timeout: Time, in seconds, before the task times out and fails.
+:type timeout: int
+
+"""
+ui_color = '#9933ff'  # type: str
+
+@apply_defaults
+def __init__(self,
+ *args,
+ **kwargs) -> None:
+super().__init__(mode='reschedule', *args, **kwargs)
+
+@abstractmethod
+def submit_request(self, context: Dict) -> Optional[Union[str, List, Dict]]:
+"""
+This method should kick off a long-running operation and return the ID
+for that operation, if applicable.
+Context is the same dictionary used when rendering jinja templates.
+
+Refer to get_template_context for more context.
+
+:returns: a resource_id for the long-running operation.
+:rtype: Optional[Union[str, List, Dict]]
+"""
+raise NotImplementedError
+
+def process_result(self, context: Dict):
+"""
+This method can optionally be overridden to process the result of a
+long-running operation.
+Context is the same dictionary used when rendering jinja templates.
+
+Refer to get_template_context for more context.
+"""
+self.log.info('Using 
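
A minimal, self-contained sketch of a concrete subclass, assuming only the
submit_request/poke/process_result contract quoted above (ExampleJobHook and
the XCom pull in poke are hypothetical illustrations, not part of the PR):

```python
import random
from typing import Dict, Optional

from airflow.models.xcom import XCOM_EXTERNAL_RESOURCE_ID_KEY


class ExampleJobHook:
    """Hypothetical stand-in for a real service hook."""

    def submit(self, config: dict) -> str:
        return "job-123"  # pretend the service returned a job ID

    def is_done(self, job_id: Optional[str]) -> bool:
        return bool(random.getrandbits(1))  # pretend to poll job status


class ExampleJobAsyncOperator(BaseAsyncOperator):
    """Submit a job once, then poke for completion in reschedule mode."""

    def __init__(self, job_config: dict, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.job_config = job_config

    def submit_request(self, context: Dict) -> Optional[str]:
        # Kick off the long-running operation and return its resource ID.
        return ExampleJobHook().submit(self.job_config)

    def poke(self, context: Dict) -> bool:
        # Assumes the base class stores submit_request's return value in XCom
        # under XCOM_EXTERNAL_RESOURCE_ID_KEY, as the module's imports suggest.
        job_id = context['ti'].xcom_pull(key=XCOM_EXTERNAL_RESOURCE_ID_KEY)
        return ExampleJobHook().is_done(job_id)
```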

[GitHub] [airflow] dstandish edited a comment on issue #6210: [AIRFLOW-5567] BaseAsyncOperator

2019-10-16 Thread GitBox
dstandish edited a comment on issue #6210: [AIRFLOW-5567] BaseAsyncOperator
URL: https://github.com/apache/airflow/pull/6210#issuecomment-542495934
 
 
   I gave this a try ... one curious thing: while the task is in "waiting 
for retry" status, the log is not visible.
   
   While it runs poke, and shortly after (before the status is updated), the 
log is visible again.
   
   Then, when the status is updated to retry, the log is invisible again. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] KKcorps commented on a change in pull request #6348: [AIRFLOW-XXX] GSoD: Adding 'Create a custom operator' doc

2019-10-16 Thread GitBox
KKcorps commented on a change in pull request #6348: [AIRFLOW-XXX] GSoD: Adding 
'Create a custom operator' doc
URL: https://github.com/apache/airflow/pull/6348#discussion_r335577650
 
 

 ##
 File path: docs/howto/operator/index.rst
 ##
 @@ -36,3 +36,4 @@ information.
 kubernetes
 papermill
 python
+custom-operator
 
 Review comment:
   Then it should be placed in the howto folder rather than howto/operator.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] KKcorps commented on a change in pull request #6348: [AIRFLOW-XXX] GSoD: Adding 'Create a custom operator' doc

2019-10-16 Thread GitBox
KKcorps commented on a change in pull request #6348: [AIRFLOW-XXX] GSoD: Adding 
'Create a custom operator' doc
URL: https://github.com/apache/airflow/pull/6348#discussion_r335576837
 
 

 ##
 File path: docs/howto/operator/custom-operator.rst
 ##
 @@ -0,0 +1,177 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+
+Create a Custom Operator
+=========================
+
+
+Airflow allows you to create new operators to suit the requirements of you
+or your team.
+Extensibility is one of the many features that make Apache Airflow
+powerful.
+
+You can create any operator you want by extending
+:class:`airflow.models.baseoperator.BaseOperator`.
+
+There are two methods that you need to override in a derived class:
+
+* Constructor - Define the parameters required for the operator. You only
+  need to specify the arguments specific to your operator.
+
+* Execute - The code to execute when the runner calls the operator. The
+  method receives the Airflow context as a parameter, which can be used to
+  read config values.
+
+Let's implement an example ``HelloOperator``:
+
+.. code:: python
+
+    class HelloOperator(BaseOperator):
+
+        def __init__(
+                self,
+                name: str,
+                *args, **kwargs) -> None:
+            super().__init__(*args, **kwargs)
+            self.name = name
+
+        def execute(self, context):
+            message = "Hello {}".format(self.name)
+            print(message)
+            return message
+
+The above operator can now be used as follows:
+
+.. code:: python
+
+    hello_task = HelloOperator(task_id='hello_task', dag=dag, name='foo_bar')
+
+Hooks
+^
+Hooks act as an interface to communicate with the external shared resources
+in a DAG.
+For example, multiple tasks in a DAG can require access to a MySQL database.
+Instead of creating a connection per task, you can retrieve a connection
+from the hook and utilize it.
+Hooks also help avoid storing connection auth parameters in a DAG.
+See :doc:`../connection/index` for how to create and manage connections.
+
+Let's extend our previous example to fetch name from MySQL:
+
+.. code:: python
+
+    class HelloDBOperator(BaseOperator):
+
+        def __init__(
+                self,
+                name: str,
+                conn_id: str,
+                database: str,
+                *args, **kwargs) -> None:
+            super().__init__(*args, **kwargs)
+            self.name = name
+            self.conn_id = conn_id
+            self.database = database
+
+        def execute(self, context):
+            hook = MySqlHook(mysql_conn_id=self.conn_id,
+                             schema=self.database)
+            sql = "select name from user"
+            result = hook.get_first(sql)
+            message = "Hello {}".format(result['name'])
+            print(message)
+            return message
+
+When the operator invokes the query on the hook object, a new connection
+gets created if it doesn't exist.
+The hook retrieves the auth parameters, such as username and password, from
+the Airflow backend via the ``get_connection`` method of ``BaseHook``
+(see the usage sketch after this excerpt).
+
+
+User interface
+^^^
+Airflow also allows the developer to control how the operator shows up in
+the DAG UI.
+Override ``ui_color`` to change the background color of the operator in the
+UI.
+Override ``ui_fgcolor`` to change the color of the label.
+
+.. code:: python
+
+    class HelloOperator(BaseOperator):
+        ui_color = '#ffffff'
+        ui_fgcolor = '#000000'
+
+
+Templating
+^^^
+You can use :ref:`Jinja templates <jinja-templating>` to parameterize your
+operator.
+Airflow considers the field names present in ``template_fields`` for
+templating while rendering the operator.
+
+.. code:: python
+
+    class HelloOperator(BaseOperator):
+
+        template_fields = ['name']
+
+        def __init__(
+                self,
+                name: str,
+                *args, **kwargs) -> None:
+            super().__init__(*args, **kwargs)
+            self.name = name
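
A short usage sketch (an editorial illustration under the quoted doc's
assumptions; the task id, `dag` object, and connection id are hypothetical):

```python
from airflow.hooks.base_hook import BaseHook

# Because 'name' is listed in template_fields, Airflow renders the Jinja
# expression per task instance before execute() runs.
hello_task = HelloOperator(
    task_id='templated_hello',
    dag=dag,  # assumes a `dag` object, as in the doc's earlier examples
    name='run for {{ ds }}',  # rendered to the execution date
)

# What a hook does under the hood: resolve a conn_id to stored credentials.
conn = BaseHook.get_connection('mysql_default')
print(conn.host, conn.login)  # auth parameters come from the Airflow backend
```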

[GitHub] [airflow] mik-laj commented on issue #6259: [AIRFLOW-XXX] Example for BigQuery to BigQuery operator

2019-10-16 Thread GitBox
mik-laj commented on issue #6259: [AIRFLOW-XXX] Example for BigQuery to 
BigQuery operator
URL: https://github.com/apache/airflow/pull/6259#issuecomment-542773786
 
 
   This change must also have a ticket
   https://github.com/apache/airflow/pull/6227#issuecomment-537105167


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] ashb commented on a change in pull request #6336: [AIRFLOW-5073] Change SQLSensor to optionally treat NULL as keep poking

2019-10-16 Thread GitBox
ashb commented on a change in pull request #6336: [AIRFLOW-5073] Change 
SQLSensor to optionally treat NULL as keep poking
URL: https://github.com/apache/airflow/pull/6336#discussion_r335561348
 
 

 ##
 File path: airflow/sensors/sql_sensor.py
 ##
 @@ -103,5 +103,5 @@ def poke(self, context):
 else:
 raise AirflowException("self.success is present, but not callable -> {}".format(self.success))
 if self.allow_null:
-return str(first_cell) not in ('0', '')
-return str(first_cell) not in ('0', '', 'None')
+return not (first_cell is not None or str(first_cell) == '' or int(first_cell) == 0)
+return not (first_cell is None or str(first_cell) == '' or int(first_cell) == 0)
 
 Review comment:
   And this one can be
   ```suggestion
   return first_cell is not None and first_cell != '' and first_cell != 0
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] ashb commented on a change in pull request #6336: [AIRFLOW-5073] Change SQLSensor to optionally treat NULL as keep poking

2019-10-16 Thread GitBox
ashb commented on a change in pull request #6336: [AIRFLOW-5073] Change 
SQLSensor to optionally treat NULL as keep poking
URL: https://github.com/apache/airflow/pull/6336#discussion_r335560715
 
 

 ##
 File path: airflow/sensors/sql_sensor.py
 ##
 @@ -103,5 +103,5 @@ def poke(self, context):
 else:
 raise AirflowException("self.success is present, but not callable -> {}".format(self.success))
 if self.allow_null:
-return str(first_cell) not in ('0', '')
-return str(first_cell) not in ('0', '', 'None')
+return not (first_cell is not None or str(first_cell) == '' or int(first_cell) == 0)
 
 Review comment:
   I'm not a fan of double-negatives. I think this is easier to understand 
without them. I think this works and carries the same meaning?
   
   ```suggestion
   return first_cell is None or (first_cell != '' and first_cell != 0)
   ```
   
   We don't want 
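
A quick self-test of the two corrected predicates (an editorial sketch; the
expected values are assumptions based on the PR's description, not tests from
the repository):

```python
def poke_allow_null(first_cell):
    # NULL counts as success; '' and 0 keep poking.
    return first_cell is None or (first_cell != '' and first_cell != 0)


def poke_default(first_cell):
    # NULL, '' and 0 all keep poking.
    return first_cell is not None and first_cell != '' and first_cell != 0


cases = [
    (None, True, True),    # NULL succeeds when allow_null is set
    (None, False, False),  # NULL keeps poking otherwise
    (0, True, False),
    ('', True, False),
    (42, False, True),
]
for cell, allow, expected in cases:
    got = poke_allow_null(cell) if allow else poke_default(cell)
    assert got == expected, (cell, allow, got)
print("all predicate checks passed")
```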


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] mik-laj commented on issue #6349: [AIRFLOW-XXX] Add logo info to readme

2019-10-16 Thread GitBox
mik-laj commented on issue #6349: [AIRFLOW-XXX] Add logo info to readme
URL: https://github.com/apache/airflow/pull/6349#issuecomment-542766326
 
 
   In this PR it is okay with me


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (AIRFLOW-4928) Remove redundant config parsing in DagBag

2019-10-16 Thread Ash Berlin-Taylor (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-4928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ash Berlin-Taylor updated AIRFLOW-4928:
---
Fix Version/s: (was: 2.0.0)
   1.10.6

> Remove redundant config parsing in DagBag 
> --
>
> Key: AIRFLOW-4928
> URL: https://issues.apache.org/jira/browse/AIRFLOW-4928
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: models
>Affects Versions: 1.10.3
>Reporter: Sergio Kef
>Assignee: Sergio Kef
>Priority: Minor
>  Labels: easyfix
> Fix For: 1.10.6
>
>
> In DagBag, the configuration file is parsed multiple times, although this can 
> be avoided. We can save the values we read in the object namespace and refer 
> to them within the instance.
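
A minimal sketch of the idea, reusing a config key that already appears in
this codebase (illustrative only, not the actual patch):

{code:java}
from airflow.configuration import conf


class DagBag:
    """Sketch: parse the config once in __init__ and reuse the cached value."""

    def __init__(self):
        # Read once here; methods below use the cached attribute instead of
        # re-parsing the configuration file on every call.
        self.safe_mode = conf.getboolean(
            'core', 'DAG_DISCOVERY_SAFE_MODE', fallback=True)

    def collect_dags(self, directory):
        # Reuses self.safe_mode rather than calling conf.getboolean() again.
        ...
{code}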



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (AIRFLOW-5497) dag_processing::list_py_file_paths method pydoc needs param added

2019-10-16 Thread Ash Berlin-Taylor (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-5497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ash Berlin-Taylor resolved AIRFLOW-5497.

Fix Version/s: 1.10.6
   Resolution: Done

> dag_processing::list_py_file_paths method pydoc needs param added
> -
>
> Key: AIRFLOW-5497
> URL: https://issues.apache.org/jira/browse/AIRFLOW-5497
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: utils
>Affects Versions: 1.10.6
>Reporter: Jakob Homan
>Priority: Minor
>  Labels: ccoss2019, newbie
> Fix For: 1.10.6
>
>
> Note: This ticket's being created to facilitate a new contributor's workshop 
> for Airflow. After the workshop has completed, I'll mark these all available 
> for anyone that might like to take them on.
> The {{list_py_file_paths}} method pydoc is missing an entry for the 
> {{include_examples}} parameter.  We should add it.
> airflow/utils/dag_processing.py:291
> {code:java}
> def list_py_file_paths(directory, safe_mode=conf.getboolean('core', 
> 'DAG_DISCOVERY_SAFE_MODE', fallback=True),
>include_examples=None):
> """
> Traverse a directory and look for Python files.
> :param directory: the directory to traverse
> :type directory: unicode
> :param safe_mode: whether to use a heuristic to determine whether a file
> contains Airflow DAG definitions. If not provided, use the
> core.DAG_DISCOVERY_SAFE_MODE configuration setting. If not set, 
> default
> to safe.
> :return: a list of paths to Python files in the specified directory
> :rtype: list[unicode] {code}
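
A sketch of the missing entry (editorial wording; the fallback to the
core.LOAD_EXAMPLES setting is an assumption and may differ from the merged
fix):

{code:java}
:param include_examples: whether to include the example DAGs shipped with
    Airflow. If not provided, fall back to the core.LOAD_EXAMPLES
    configuration setting.
:type include_examples: bool
{code}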



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [airflow] mik-laj commented on issue #6259: [AIRFLOW-XXX] Example for BigQuery to BigQuery operator

2019-10-16 Thread GitBox
mik-laj commented on issue #6259: [AIRFLOW-XXX] Example for BigQuery to 
BigQuery operator
URL: https://github.com/apache/airflow/pull/6259#issuecomment-542762709
 
 
   Personally, I would prefer all examples for GCP to have system tests
   https://github.com/apache/airflow/pull/6227#issuecomment-537099595
   
   Apart from that, an example for this operator is already in the repository.
   
https://github.com/apache/airflow/blob/master/airflow/gcp/example_dags/example_bigquery.py#L159-L163
   This example has system tests.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (AIRFLOW-5130) Use GOOGLE_APPLICATION_CREDENTIALS constant from library

2019-10-16 Thread Ash Berlin-Taylor (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-5130?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ash Berlin-Taylor updated AIRFLOW-5130:
---
Fix Version/s: (was: 2.0.0)
   1.10.6

> Use GOOGLE_APPLICATION_CREDENTIALS constant from library
> 
>
> Key: AIRFLOW-5130
> URL: https://issues.apache.org/jira/browse/AIRFLOW-5130
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: gcp
>Affects Versions: 1.10.3
>Reporter: Kamil Bregula
>Priority: Major
> Fix For: 1.10.6
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (AIRFLOW-5126) Read aws_session_token in extra_config of the aws hook

2019-10-16 Thread Ash Berlin-Taylor (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-5126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ash Berlin-Taylor updated AIRFLOW-5126:
---
Fix Version/s: 1.10.6

> Read aws_session_token in extra_config of the aws hook
> --
>
> Key: AIRFLOW-5126
> URL: https://issues.apache.org/jira/browse/AIRFLOW-5126
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: hooks
>Affects Versions: 1.10.3
>Reporter: Alexandre Blanchard
>Assignee: Johannes Günther
>Priority: Minor
> Fix For: 1.10.6
>
>
> Hi,
> Thanks for the great software.
> At my company, we enforce security around our aws account and all accounts 
> must have mfa activated. To use airflow with my account, I generate a session 
> token with an expiration date using the command
> {code:java}
> aws sts assume-role --role-arn  --role-session-name 
> testing --serial-number  --token-code 
> 
>  --duration-seconds 18000{code}
> This way I retrieve all I need to connect to aws: an aws_access_key_id, an 
> aws_secret_access_key and an aws_session_token. 
> Currently I'm using boto3 directly in my dag and it's working great. I would 
> like to use a connection managed by airflow but when I set the parameters 
> this way:
> {code:java}
> airflow connections --add \
>  --conn_id s3_log \
>  --conn_type s3 \
>  --conn_login "" \
>  --conn_password "" \
>  --conn_extra "{ \
>    \"aws_session_token\": \"\" \
> }"
> {code}
> With a hook using this connection, I get the error:
> {code:java}
> [2019-08-06 12:31:28,157] {__init__.py:1580} ERROR - An error occurred (403) 
> when calling the HeadObject operation: Forbidden
> Traceback (most recent call last):
>   File "/usr/local/lib/python3.7/site-packages/airflow/models/__init__.py", 
> line 1441, in _run_raw_task
> result = task_copy.execute(context=context)
>   File 
> "/usr/local/lib/python3.7/site-packages/airflow/operators/python_operator.py",
>  line 112, in execute
> return_value = self.execute_callable()
>   File 
> "/usr/local/lib/python3.7/site-packages/airflow/operators/python_operator.py",
>  line 117, in execute_callable
> return self.python_callable(*self.op_args, **self.op_kwargs)
>   File "/root/airflow/dags/s3Dag.py", line 48, in download_raw_data
> dataObject = s3hook.get_key("poc/raw_data.csv.gz", s3_bucket)
>   File "/usr/local/lib/python3.7/site-packages/airflow/hooks/S3_hook.py", 
> line 217, in get_key
> obj.load()
>   File "/usr/local/lib/python3.7/site-packages/boto3/resources/factory.py", 
> line 505, in do_action
> response = action(self, *args, **kwargs)
>   File "/usr/local/lib/python3.7/site-packages/boto3/resources/action.py", 
> line 83, in __call__
> response = getattr(parent.meta.client, operation_name)(**params)
>   File "/usr/local/lib/python3.7/site-packages/botocore/client.py", line 357, 
> in _api_call
> return self._make_api_call(operation_name, kwargs)
>   File "/usr/local/lib/python3.7/site-packages/botocore/client.py", line 661, 
> in _make_api_call
> raise error_class(parsed_response, operation_name)
> botocore.exceptions.ClientError: An error occurred (403) when calling the 
> HeadObject operation: Forbidden
> {code}
> Reading the code of the hook 
> (https://github.com/apache/airflow/blob/v1-10-stable/airflow/contrib/hooks/aws_hook.py#L90),
>  I understand that the session token is not read from the extra config. The 
> only case where a session token is passed to the boto3 client is when we 
> assume a role. In my case I want to use a role I have already assumed.
> So my suggestion is to read the session token from the extra config and use 
> it to connect to aws.
> Do you think it is the right way to do it? Does this workflow make sense?
> I am ready to contribute if my suggestion is accepted.
> Regards
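
A minimal sketch of the proposed change against the hook linked above
(editorial illustration; the method shape and names are assumptions, not the
merged patch):

{code:java}
import boto3


def _get_credentials(self, region_name):
    # Sketch for AwsHook: also honor a pre-obtained STS token stored in the
    # connection's extra config (e.g. produced by `aws sts assume-role`).
    connection_object = self.get_connection(self.aws_conn_id)
    extra_config = connection_object.extra_dejson

    return boto3.session.Session(
        aws_access_key_id=connection_object.login,
        aws_secret_access_key=connection_object.password,
        aws_session_token=extra_config.get('aws_session_token'),  # new
        region_name=region_name,
    )
{code}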



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (AIRFLOW-5585) Docker-context file check gives only marginal improvements for build

2019-10-16 Thread Ash Berlin-Taylor (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-5585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ash Berlin-Taylor updated AIRFLOW-5585:
---
Fix Version/s: (was: 2.0.0)
   1.10.6

> Docker-context file check gives only marginal improvements for build
> 
>
> Key: AIRFLOW-5585
> URL: https://issues.apache.org/jira/browse/AIRFLOW-5585
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: ci
>Affects Versions: 2.0.0, 1.10.5
>Reporter: Jarek Potiuk
>Priority: Major
> Fix For: 1.10.6
>
>
> After recent merges of Breeze, where we check whether Docker needs a rebuild, 
> we do not need to fix all files in the context.
> Most of the time the build will not be triggered (and setup.py, Dockerfile, 
> package.json etc. will not change). So it is enough just to fix permissions 
> for those sensitive files and not lose time listing and fixing all 
> the files in the context.
> This will save 5 or so seconds for every build and decrease overall build 
> script complexity.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [airflow] leahecole commented on issue #6349: [AIRFLOW-XXX] Add logo info to readme

2019-10-16 Thread GitBox
leahecole commented on issue #6349: [AIRFLOW-XXX] Add logo info to readme
URL: https://github.com/apache/airflow/pull/6349#issuecomment-542762236
 
 
   I can add a link to that in this PR also, or would it be better to open a 
separate PR @mik-laj? 
   
   > We also have brandbook: 
https://cwiki.apache.org/confluence/display/AIRFLOW/Brandbook
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Resolved] (AIRFLOW-5399) Add invoke operator for GCP Functions

2019-10-16 Thread Ash Berlin-Taylor (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-5399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ash Berlin-Taylor resolved AIRFLOW-5399.

Resolution: Done

> Add invoke operator for GCP Functions
> -
>
> Key: AIRFLOW-5399
> URL: https://issues.apache.org/jira/browse/AIRFLOW-5399
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: gcp
>Affects Versions: 2.0.0
>Reporter: Tomasz Urbaszek
>Priority: Minor
> Fix For: 1.10.6
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [airflow] mik-laj commented on issue #6333: [AIRFLOW-5656] Rename provider to providers module

2019-10-16 Thread GitBox
mik-laj commented on issue #6333: [AIRFLOW-5656] Rename provider to providers 
module
URL: https://github.com/apache/airflow/pull/6333#issuecomment-542760786
 
 
   I restarted tests.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] mik-laj commented on issue #6349: [AIRFLOW-XXX] Add logo info to readme

2019-10-16 Thread GitBox
mik-laj commented on issue #6349: [AIRFLOW-XXX] Add logo info to readme
URL: https://github.com/apache/airflow/pull/6349#issuecomment-542760284
 
 
   We also have brandbook: 
https://cwiki.apache.org/confluence/display/AIRFLOW/Brandbook


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

