[GitHub] codecov-io commented on issue #4355: [AIRFLOW-3870] Update log level and return value

2019-02-28 Thread GitBox
codecov-io commented on issue #4355: [AIRFLOW-3870] Update log level and return 
value
URL: https://github.com/apache/airflow/pull/4355#issuecomment-468577074
 
 
   # [Codecov](https://codecov.io/gh/apache/airflow/pull/4355?src=pr=h1) 
Report
   > Merging 
[#4355](https://codecov.io/gh/apache/airflow/pull/4355?src=pr=desc) into 
[master](https://codecov.io/gh/apache/airflow/commit/aed71a794423cfac819dd576d9111dc2f527e463?src=pr=desc)
 will **increase** coverage by `2.37%`.
   > The diff coverage is `100%`.
   
   [![Impacted file tree 
graph](https://codecov.io/gh/apache/airflow/pull/4355/graphs/tree.svg?width=650=WdLKlKHOAU=150=pr)](https://codecov.io/gh/apache/airflow/pull/4355?src=pr=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #4355      +/-   ##
   ==========================================
   + Coverage   74.16%   76.53%   +2.37%
   ==========================================
     Files         421      461      +40
     Lines       27773    35837    +8064
   ==========================================
   + Hits        20598    27429    +6831
   - Misses       7175     8408    +1233
   ```
   
   
   | [Impacted 
Files](https://codecov.io/gh/apache/airflow/pull/4355?src=pr=tree) | 
Coverage Δ | |
   |---|---|---|
   | 
[airflow/contrib/operators/sftp\_operator.py](https://codecov.io/gh/apache/airflow/pull/4355/diff?src=pr=tree#diff-YWlyZmxvdy9jb250cmliL29wZXJhdG9ycy9zZnRwX29wZXJhdG9yLnB5)
 | `87.5% <100%> (ø)` | :arrow_up: |
   | 
[airflow/operators/hive\_to\_mysql.py](https://codecov.io/gh/apache/airflow/pull/4355/diff?src=pr=tree#diff-YWlyZmxvdy9vcGVyYXRvcnMvaGl2ZV90b19teXNxbC5weQ==)
 | `80% <0%> (-20%)` | :arrow_down: |
   | 
[airflow/operators/hive\_stats\_operator.py](https://codecov.io/gh/apache/airflow/pull/4355/diff?src=pr=tree#diff-YWlyZmxvdy9vcGVyYXRvcnMvaGl2ZV9zdGF0c19vcGVyYXRvci5weQ==)
 | `80.85% <0%> (-19.15%)` | :arrow_down: |
   | 
[airflow/operators/hive\_to\_samba\_operator.py](https://codecov.io/gh/apache/airflow/pull/4355/diff?src=pr=tree#diff-YWlyZmxvdy9vcGVyYXRvcnMvaGl2ZV90b19zYW1iYV9vcGVyYXRvci5weQ==)
 | `82.75% <0%> (-17.25%)` | :arrow_down: |
   | 
[airflow/operators/jdbc\_operator.py](https://codecov.io/gh/apache/airflow/pull/4355/diff?src=pr=tree#diff-YWlyZmxvdy9vcGVyYXRvcnMvamRiY19vcGVyYXRvci5weQ==)
 | `85.71% <0%> (-14.29%)` | :arrow_down: |
   | 
[airflow/operators/presto\_to\_mysql.py](https://codecov.io/gh/apache/airflow/pull/4355/diff?src=pr=tree#diff-YWlyZmxvdy9vcGVyYXRvcnMvcHJlc3RvX3RvX215c3FsLnB5)
 | `87.09% <0%> (-12.91%)` | :arrow_down: |
   | 
[airflow/utils/db.py](https://codecov.io/gh/apache/airflow/pull/4355/diff?src=pr=tree#diff-YWlyZmxvdy91dGlscy9kYi5weQ==)
 | `30.33% <0%> (-5.38%)` | :arrow_down: |
   | 
[airflow/utils/sqlalchemy.py](https://codecov.io/gh/apache/airflow/pull/4355/diff?src=pr=tree#diff-YWlyZmxvdy91dGlscy9zcWxhbGNoZW15LnB5)
 | `80.16% <0%> (-1.66%)` | :arrow_down: |
   | 
[airflow/contrib/operators/ssh\_operator.py](https://codecov.io/gh/apache/airflow/pull/4355/diff?src=pr=tree#diff-YWlyZmxvdy9jb250cmliL29wZXJhdG9ycy9zc2hfb3BlcmF0b3IucHk=)
 | `82.27% <0%> (-1.48%)` | :arrow_down: |
   | 
[...rflow/contrib/operators/kubernetes\_pod\_operator.py](https://codecov.io/gh/apache/airflow/pull/4355/diff?src=pr=tree#diff-YWlyZmxvdy9jb250cmliL29wZXJhdG9ycy9rdWJlcm5ldGVzX3BvZF9vcGVyYXRvci5weQ==)
 | `98.66% <0%> (-1.34%)` | :arrow_down: |
   | ... and [141 
more](https://codecov.io/gh/apache/airflow/pull/4355/diff?src=pr=tree-more) 
| |
   
   --
   
   [Continue to review full report at 
Codecov](https://codecov.io/gh/apache/airflow/pull/4355?src=pr=continue).
   > **Legend** - [Click here to learn 
more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by 
[Codecov](https://codecov.io/gh/apache/airflow/pull/4355?src=pr=footer). 
Last update 
[aed71a7...6fef4a2](https://codecov.io/gh/apache/airflow/pull/4355?src=pr=lastupdated).
 Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] iwilltry42 edited a comment on issue #4589: [AIRFLOW-3766] Add support for kubernetes annotations

2019-02-28 Thread GitBox
iwilltry42 edited a comment on issue #4589: [AIRFLOW-3766] Add support for 
kubernetes annotations
URL: https://github.com/apache/airflow/pull/4589#issuecomment-468574296
 
 
   @feng-tao this is definitely a good thing, but it won't help in that case.
   Let me expand on this a little bit:
   I am using the 
[sidecar-injector](https://github.com/tumblr/k8s-sidecar-injector/), which 
requests an injection config using an annotation on pods like this: 
`"injector.tumblr.com/request": "sidecar-test"`.
   That way, I can inject e.g. a kerberos sidecar into each pod that's being 
created by the KubernetesExecutor.
   So I put this into the `airflow.cfg` under the `[kubernetes_annotations]` 
section: `injector.tumblr.com/request = sidecar-test`. This works just fine.
   Now if I want to define this in an environment variable like all the other 
config options, then it will fail, as 
`AIRFLOW__KUBERNETES_ANNOTATIONS__INJECTOR.TUMBLR.COM/REQUEST: sidecar-test` is 
not a valid environment variable (and also not a valid ConfigMap key) because 
of the `/` it contains.
   
   The error message for the Kubernetes deployment (using stable/airflow helm 
chart) when putting it into the ConfigMap: 
   > Invalid value: 
"AIRFLOW__KUBERNETES_ANNOTATIONS__INJECTOR.TUMBLR.COM/REQUEST": a valid config 
key must consist of alphanumeric characters, '-', '_' or '.' (e.g. 'key.name',  
or 'KEY_NAME',  or 'key-name', regex used for validation is '[-._a-zA-Z0-9]+')
   
   And when putting it directly into an env var:
   > Invalid value: 
"AIRFLOW__KUBERNETES_ANNOTATIONS__INJECTOR.TUMBLR.COM/REQUEST": a valid 
environment variable name must consist of alphabetic characters, digits, '_', 
'-', or '.', and must not start with a digit (e.g. 'my.env-name',  or 
'MY_ENV.NAME',  or 'MyEnvName1', regex used for validation is 
'[-._a-zA-Z][-._a-zA-Z0-9]*')
   
   Maybe there's room for a character-sequence substitution, since a `/` in annotation keys is very common?
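
   Purely as an illustration of what such a substitution could look like (the `__SLASH__` token and the helper functions below are hypothetical, not anything Airflow provides):

```python
# Hypothetical sketch: encode '/' in annotation keys so they survive as
# environment-variable / ConfigMap keys, and decode them again when Airflow
# reads the [kubernetes_annotations] section back.
SLASH_TOKEN = "__SLASH__"

def encode_annotation_key(key):
    # 'injector.tumblr.com/request' -> 'injector.tumblr.com__SLASH__request'
    return key.replace("/", SLASH_TOKEN)

def decode_annotation_key(key):
    return key.replace(SLASH_TOKEN, "/")

env_name = "AIRFLOW__KUBERNETES_ANNOTATIONS__" + encode_annotation_key(
    "injector.tumblr.com/request").upper()
print(env_name)  # AIRFLOW__KUBERNETES_ANNOTATIONS__INJECTOR.TUMBLR.COM__SLASH__REQUEST
print(decode_annotation_key("injector.tumblr.com__SLASH__request"))
```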


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] iwilltry42 edited a comment on issue #4589: [AIRFLOW-3766] Add support for kubernetes annotations

2019-02-28 Thread GitBox
iwilltry42 edited a comment on issue #4589: [AIRFLOW-3766] Add support for 
kubernetes annotations
URL: https://github.com/apache/airflow/pull/4589#issuecomment-468574296
 
 
   @feng-tao this is definitely a good thing, but it won't help in that case.
   Let me expand on this a little bit:
   I am using the 
[sidecar-injector](https://github.com/tumblr/k8s-sidecar-injector/), which 
requests an injection config using an annotation on pods like this: 
`"injector.tumblr.com/request": "sidecar-test"`.
   That way, I can inject e.g. a kerberos sidecar into each pod that's being 
created by the KubernetesExecutor.
   So I put this into the `airflow.cfg` under the `[kubernetes_annotations]` 
section: `injector.tumblr.com/request = sidecar-test`. This works just fine.
   Now if I want to define this in an environment variable like all the other 
config options, then it will fail, as 
`AIRFLOW__KUBERNETES_ANNOTATIONS__INJECTOR.TUMBLR.COM/REQUEST: sidecar-test` is 
not a valid environment variable (and also not a valid ConfigMap key) because 
of the `/` it contains.
   
   The error message for the Kubernetes deployment (using stable/airflow helm 
chart): 
   > Invalid value: 
"AIRFLOW__KUBERNETES_ANNOTATIONS__INJECTOR.TUMBLR.COM/REQUEST": a valid config 
key must consist of alphanumeric characters, '-', '_' or '.' (e.g. 'key.name',  
or 'KEY_NAME',  or 'key-name', regex used for validation is '[-._a-zA-Z0-9]+')


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] feng-tao commented on issue #4589: [AIRFLOW-3766] Add support for kubernetes annotations

2019-02-28 Thread GitBox
feng-tao commented on issue #4589: [AIRFLOW-3766] Add support for kubernetes 
annotations
URL: https://github.com/apache/airflow/pull/4589#issuecomment-468576600
 
 
   got you @iwilltry42 , thanks for the explanation. lgtm.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] feng-tao edited a comment on issue #4589: [AIRFLOW-3766] Add support for kubernetes annotations

2019-02-28 Thread GitBox
feng-tao edited a comment on issue #4589: [AIRFLOW-3766] Add support for 
kubernetes annotations
URL: https://github.com/apache/airflow/pull/4589#issuecomment-468576600
 
 
   got you @iwilltry42 , thanks for the explanation. lgtm.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] feng-tao commented on a change in pull request #4772: [AIRFLOW-3937] KubernetesPodOperator support for envFrom configMapRef…

2019-02-28 Thread GitBox
feng-tao commented on a change in pull request #4772: [AIRFLOW-3937] 
KubernetesPodOperator support for envFrom configMapRef…
URL: https://github.com/apache/airflow/pull/4772#discussion_r261506193
 
 

 ##
 File path: 
airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py
 ##
 @@ -191,3 +191,10 @@ def extract_image_pull_secrets(pod, req):
     def extract_tolerations(pod, req):
         if pod.tolerations:
             req['spec']['tolerations'] = pod.tolerations
+
+    @staticmethod
+    def extract_env_from(pod, req):
+        for idx, configmap in enumerate(pod.envs_from_configmaps):
+            req['spec']['containers'][0]['envFrom'][idx]['configMapRef']['name'] = configmap.name
 
 Review comment:
   I am not very familiar with the Airflow k8s code, but this seems to be a long chain of dict lookups. Does it always work? Could it happen that one of the keys doesn't exist in the dict, which would cause an exception?
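
   For illustration only, a minimal sketch (not the PR's code) of how such a nested assignment could be built defensively with `setdefault`, assuming the intermediate keys may be missing from the request dict:

```python
# Hypothetical, defensive version of the same assignment: each intermediate
# level is created on demand, so a missing key cannot raise KeyError.
def set_configmap_ref(req, idx, configmap_name):
    containers = req.setdefault('spec', {}).setdefault('containers', [{}])
    env_from = containers[0].setdefault('envFrom', [])
    while len(env_from) <= idx:
        env_from.append({})
    env_from[idx].setdefault('configMapRef', {})['name'] = configmap_name

req = {'spec': {'containers': [{}]}}
set_configmap_ref(req, 0, 'my-configmap')
print(req)  # {'spec': {'containers': [{'envFrom': [{'configMapRef': {'name': 'my-configmap'}}]}]}}
```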


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (AIRFLOW-3982) DagRun state updater is incorrect when DagRun's tasks don't correspond to DAG's

2019-02-28 Thread Dima Kamalov (JIRA)
Dima Kamalov created AIRFLOW-3982:
-

 Summary: DagRun state updater is incorrect when DagRun's tasks 
don't correspond to DAG's
 Key: AIRFLOW-3982
 URL: https://issues.apache.org/jira/browse/AIRFLOW-3982
 Project: Apache Airflow
  Issue Type: Bug
Reporter: Dima Kamalov
Assignee: Dima Kamalov


To reproduce:

(1) create a DAG with a start date and a task, and run it

(2) add a task to that DAG with a later start date

(3) rerun the past run – the tasks will get processed correctly, but the DagRun state won't get updated
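
A minimal sketch of steps (1) and (2), assuming Airflow 1.10-style imports; the DAG and task names are illustrative only:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

# Step (1): the DAG as it looked for the runs that already executed.
dag = DAG('repro_airflow_3982', start_date=datetime(2019, 1, 1),
          schedule_interval='@daily')
first = DummyOperator(task_id='first', dag=dag)

# Step (2): a task added later, with a later start_date. For past DagRuns this
# task never gets a task instance, so the run's task instances no longer match
# the DAG's current task list when the DagRun state is re-evaluated.
second = DummyOperator(task_id='second', start_date=datetime(2019, 2, 1), dag=dag)
first >> second
```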



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] iwilltry42 commented on issue #4589: [AIRFLOW-3766] Add support for kubernetes annotations

2019-02-28 Thread GitBox
iwilltry42 commented on issue #4589: [AIRFLOW-3766] Add support for kubernetes 
annotations
URL: https://github.com/apache/airflow/pull/4589#issuecomment-468574296
 
 
   @feng-tao this is definitely a good thing, but it won't help in that case.
   Let me expand on this a little bit:
   I am using the 
[sidecar-injector](https://github.com/tumblr/k8s-sidecar-injector/), which 
requests an injection config using an annotation on pods like this: 
`"injector.tumblr.com/request": "sidecar-test"`.
   That way, I can inject e.g. a kerberos sidecar into each pod that's being 
created by the KubernetesExecutor.
   So I put this into the `airflow.cfg` under the `[kubernetes_annotations]` 
section: `injector.tumblr.com/request = sidecar-test`. This works just fine.
   Now if I want to define this in an environment variable like all the other 
config options, then it will fail, as 
`AIRFLOW__KUBERNETES_ANNOTATIONS__INJECTOR.TUMBLR.COM/REQUEST: sidecar-test` is 
not a valid environment variable (and also not a valid ConfigMap key) because 
of the `/` it contains.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] feng-tao commented on issue #4589: [AIRFLOW-3766] Add support for kubernetes annotations

2019-02-28 Thread GitBox
feng-tao commented on issue #4589: [AIRFLOW-3766] Add support for kubernetes 
annotations
URL: https://github.com/apache/airflow/pull/4589#issuecomment-468569337
 
 
   @iwilltry42 will this https://github.com/apache/airflow/pull/4772 help?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (AIRFLOW-3870) SFTPOperator to push filepath into xcom

2019-02-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-3870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16781368#comment-16781368
 ] 

ASF GitHub Bot commented on AIRFLOW-3870:
-

C4 commented on pull request #4355: [AIRFLOW-3870] Update log level and return 
value
URL: https://github.com/apache/airflow/pull/4355
 
 
   ### Jira
   
   https://issues.apache.org/jira/browse/AIRFLOW-3870
   
   ### Description
   
   After using this operator, it's very useful to see which file is being downloaded, so the debug logging was changed to info.
   
   Also, the filepath is now returned via XCom so that downstream tasks can use it.
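
   As a hedged illustration (the task id `download_via_sftp` and the DAG below are made up), a downstream task could then read the pushed path from XCom, assuming the operator's return value lands under the default `return_value` key:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG('sftp_xcom_example', start_date=datetime(2019, 1, 1), schedule_interval=None)

def process_downloaded_file(**context):
    # Pull the path the SFTP task returned; an operator's return value is
    # stored under the default 'return_value' XCom key.
    local_path = context['ti'].xcom_pull(task_ids='download_via_sftp')
    print("Downloaded file is at: %s" % local_path)

process_file = PythonOperator(
    task_id='process_file',
    python_callable=process_downloaded_file,
    provide_context=True,  # Airflow 1.10-style kwarg
    dag=dag,
)
```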
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> SFTPOperator to push filepath into xcom
> ---
>
> Key: AIRFLOW-3870
> URL: https://issues.apache.org/jira/browse/AIRFLOW-3870
> Project: Apache Airflow
>  Issue Type: Improvement
>Reporter: jack
>Priority: Minor
>
> push filepath into xcom and update info messages.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] feng-tao commented on issue #4355: [AIRFLOW-3870] Update log level and return value

2019-02-28 Thread GitBox
feng-tao commented on issue #4355: [AIRFLOW-3870] Update log level and return 
value
URL: https://github.com/apache/airflow/pull/4355#issuecomment-468568210
 
 
   will merge once CI passes.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (AIRFLOW-3870) SFTPOperator to push filepath into xcom

2019-02-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-3870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16781367#comment-16781367
 ] 

ASF GitHub Bot commented on AIRFLOW-3870:
-

feng-tao commented on pull request #4355: [AIRFLOW-3870] Update log level and 
return value
URL: https://github.com/apache/airflow/pull/4355
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> SFTPOperator to push filepath into xcom
> ---
>
> Key: AIRFLOW-3870
> URL: https://issues.apache.org/jira/browse/AIRFLOW-3870
> Project: Apache Airflow
>  Issue Type: Improvement
>Reporter: jack
>Priority: Minor
>
> push filepath into xcom and update info messages.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] C4 opened a new pull request #4355: [AIRFLOW-3870] Update log level and return value

2019-02-28 Thread GitBox
C4 opened a new pull request #4355: [AIRFLOW-3870] Update log level and return 
value
URL: https://github.com/apache/airflow/pull/4355
 
 
   ### Jira
   
   https://issues.apache.org/jira/browse/AIRFLOW-3870
   
   ### Description
   
   After using this operator, it's very useful to see which file is being downloaded, so the debug logging was changed to info.
   
   Also, the filepath is now returned via XCom so that downstream tasks can use it.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] feng-tao closed pull request #4355: [AIRFLOW-3870] Update log level and return value

2019-02-28 Thread GitBox
feng-tao closed pull request #4355: [AIRFLOW-3870] Update log level and return 
value
URL: https://github.com/apache/airflow/pull/4355
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] iwilltry42 commented on issue #4589: [AIRFLOW-3766] Add support for kubernetes annotations

2019-02-28 Thread GitBox
iwilltry42 commented on issue #4589: [AIRFLOW-3766] Add support for kubernetes 
annotations
URL: https://github.com/apache/airflow/pull/4589#issuecomment-468566303
 
 
   I tested this change in my fork based on tag 1.10.2 and it works just fine.
   However, in some environments this only works completely if you put the annotations into the `airflow.cfg`, as many annotations (e.g. for Prometheus or the sidecar-injector) contain a `/`, which is allowed neither in an environment variable name in Docker nor in a ConfigMap key in Kubernetes.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] feng-tao commented on a change in pull request #4799: [AIRFLOW-3975] Handle null inputs in attribute renderers.

2019-02-28 Thread GitBox
feng-tao commented on a change in pull request #4799: [AIRFLOW-3975] Handle 
null inputs in attribute renderers.
URL: https://github.com/apache/airflow/pull/4799#discussion_r261497612
 
 

 ##
 File path: airflow/www/utils.py
 ##
 @@ -351,9 +355,8 @@ def get_attr_renderer():
         'doc_yaml': lambda x: render(x, lexers.YamlLexer),
         'doc_md': wrapped_markdown,
         'python_callable': lambda x: render(
-            inspect.getsource(x), lexers.PythonLexer),
+            inspect.getsource(x) if x is not None else None, lexers.PythonLexer),
 
 Review comment:
   Should this line be `inspect.getsource(x) if x is not None else ''`? Per your Jira description, the issue happens when `None` is passed to the render function, which causes an exception?
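
   A tiny, self-contained sketch of that point (the helper below is hypothetical, not the code in `airflow/www/utils.py`): returning an empty string gives the renderer a valid input, whereas `None` would typically blow up inside the highlighter:

```python
import inspect

def safe_source(python_callable):
    # Return '' rather than None so that whatever renders the value
    # (e.g. a Pygments-based highlighter) receives a valid string.
    if python_callable is None:
        return ''
    return inspect.getsource(python_callable)

print(repr(safe_source(None)))        # '' -> renders as empty, no exception
print(safe_source(safe_source)[:30])  # first characters of this function's source
```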


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] feng-tao commented on issue #4589: [AIRFLOW-3766] Add support for kubernetes annotations

2019-02-28 Thread GitBox
feng-tao commented on issue #4589: [AIRFLOW-3766] Add support for kubernetes 
annotations
URL: https://github.com/apache/airflow/pull/4589#issuecomment-468564593
 
 
   Looks reasonable to me. @dimberman , what do you think?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] feng-tao commented on a change in pull request #4761: [AIRFLOW-3942] Expose FAB args to plugins

2019-02-28 Thread GitBox
feng-tao commented on a change in pull request #4761: [AIRFLOW-3942] Expose FAB 
args to plugins
URL: https://github.com/apache/airflow/pull/4761#discussion_r261494349
 
 

 ##
 File path: airflow/www/app.py
 ##
 @@ -151,15 +152,18 @@ def integrate_plugins():
 
         for v in flask_appbuilder_views:
             log.debug("Adding view %s", v["name"])
-            appbuilder.add_view(v["view"],
-                                v["name"],
-                                category=v["category"])
+            if 'view' in v:
+                v['baseview'] = v.pop('view')
+                warnings.warn(
+                    "AirflowPlugin.flask_appbuilder_views[] dictionaries should use 'baseview' instead of 'view'. "
+                    "Using 'view' has been deprecated and will be removed in future versions.",
+                    DeprecationWarning,
+                )
+
+            appbuilder.add_view(**v)
 
 Review comment:
   `baseview` and `name` are not keyword arguments in https://flask-appbuilder.readthedocs.io/en/latest/api.html#flask_appbuilder.base.AppBuilder.add_view. Should we still change it to
   ```
   appbuilder.add_view(v["baseview"],
                       v["name"],
                       category=v["category"])
   ```
   ?
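
   For what it's worth, positional parameters can still be supplied by keyword when unpacking a dict, so `add_view(**v)` should be equivalent; a minimal stand-in (not FAB's actual implementation) to illustrate:

```python
def add_view(baseview, name, category='', **kwargs):
    # Stand-in for AppBuilder.add_view: 'baseview' and 'name' are positional
    # parameters, but Python still accepts them by keyword.
    print(baseview, name, category)

v = {'baseview': 'MyPluginView', 'name': 'My Plugin View', 'category': 'Plugins'}
add_view(**v)  # same as add_view('MyPluginView', 'My Plugin View', category='Plugins')
```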


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] feng-tao commented on a change in pull request #4761: [AIRFLOW-3942] Expose FAB args to plugins

2019-02-28 Thread GitBox
feng-tao commented on a change in pull request #4761: [AIRFLOW-3942] Expose FAB 
args to plugins
URL: https://github.com/apache/airflow/pull/4761#discussion_r261494390
 
 

 ##
 File path: airflow/www/app.py
 ##
 @@ -151,15 +152,18 @@ def integrate_plugins():
 
         for v in flask_appbuilder_views:
             log.debug("Adding view %s", v["name"])
-            appbuilder.add_view(v["view"],
-                                v["name"],
-                                category=v["category"])
+            if 'view' in v:
+                v['baseview'] = v.pop('view')
+                warnings.warn(
+                    "AirflowPlugin.flask_appbuilder_views[] dictionaries should use 'baseview' instead of 'view'. "
+                    "Using 'view' has been deprecated and will be removed in future versions.",
+                    DeprecationWarning,
+                )
+
+            appbuilder.add_view(**v)
         for ml in sorted(flask_appbuilder_menu_links, key=lambda x: x["name"]):
             log.debug("Adding menu link %s", ml["name"])
-            appbuilder.add_link(ml["name"],
-                                href=ml["href"],
-                                category=ml["category"],
-                                category_icon=ml["category_icon"])
+            appbuilder.add_link(**ml)
 
 Review comment:
   same


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (AIRFLOW-3981) Make Airflow UI timezone aware

2019-02-28 Thread Tao Feng (JIRA)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-3981?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tao Feng updated AIRFLOW-3981:
--
Summary: Make Airflow UI timezone aware  (was: Makr Airflow UI timezone 
aware)

> Make Airflow UI timezone aware
> --
>
> Key: AIRFLOW-3981
> URL: https://issues.apache.org/jira/browse/AIRFLOW-3981
> Project: Apache Airflow
>  Issue Type: Improvement
>Reporter: Tao Feng
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (AIRFLOW-3981) Makr Airflow UI timezone aware

2019-02-28 Thread Tao Feng (JIRA)
Tao Feng created AIRFLOW-3981:
-

 Summary: Makr Airflow UI timezone aware
 Key: AIRFLOW-3981
 URL: https://issues.apache.org/jira/browse/AIRFLOW-3981
 Project: Apache Airflow
  Issue Type: Improvement
Reporter: Tao Feng






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] milton0825 closed pull request #4806: update contributing

2019-02-28 Thread GitBox
milton0825 closed pull request #4806: update contributing
URL: https://github.com/apache/airflow/pull/4806
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] milton0825 opened a new pull request #4806: update contributing

2019-02-28 Thread GitBox
milton0825 opened a new pull request #4806: update contributing
URL: https://github.com/apache/airflow/pull/4806
 
 
   Make sure you have checked _all_ steps below.
   
   ### Jira
   
   - [ ] My PR addresses the following [Airflow 
Jira](https://issues.apache.org/jira/browse/AIRFLOW/) issues and references 
them in the PR title. For example, "\[AIRFLOW-XXX\] My Airflow PR"
 - https://issues.apache.org/jira/browse/AIRFLOW-XXX
 - In case you are fixing a typo in the documentation you can prepend your 
commit with \[AIRFLOW-XXX\], code changes always need a Jira issue.
 - In case you are proposing a fundamental code change, you need to create 
an Airflow Improvement 
Proposal([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)).
   
   ### Description
   
   - [ ] Here are some details about my PR, including screenshots of any UI 
changes:
   
   ### Tests
   
   - [ ] My PR adds the following unit tests __OR__ does not need testing for 
this extremely good reason:
   
   ### Commits
   
   - [ ] My commits all reference Jira issues in their subject lines, and I 
have squashed multiple commits if they address the same issue. In addition, my 
commits follow the guidelines from "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)":
 1. Subject is separated from body by a blank line
 1. Subject is limited to 50 characters (not including Jira issue reference)
 1. Subject does not end with a period
 1. Subject uses the imperative mood ("add", not "adding")
 1. Body wraps at 72 characters
 1. Body explains "what" and "why", not "how"
   
   ### Documentation
   
   - [ ] In case of new functionality, my PR adds documentation that describes 
how to use it.
 - When adding new operators/hooks/sensors, the autoclass documentation 
generation needs to be added.
 - All the public functions and the classes in the PR contain docstrings 
that explain what it does
   
   ### Code Quality
   
   - [ ] Passes `flake8`
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Assigned] (AIRFLOW-2221) Fill up DagBag from remote locations

2019-02-28 Thread Tao Feng (JIRA)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-2221?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tao Feng reassigned AIRFLOW-2221:
-

Assignee: Chao-Han Tsai  (was: Diogo Franco)

> Fill up DagBag from remote locations
> 
>
> Key: AIRFLOW-2221
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2221
> Project: Apache Airflow
>  Issue Type: New Feature
>  Components: configuration, core
>Affects Versions: 2.0.0
>Reporter: Diogo Franco
>Assignee: Chao-Han Tsai
>Priority: Major
> Fix For: 2.0.0
>
>
> The ability to fill up the DagBag from remote locations (HDFS, S3...) seems 
> to be deemed useful, e.g. facilitating deployment processes.
> This JIRA is to propose an implementation of a *DagFetcher* abstraction on 
> the DagBag, where the collect_dags method can delegate the walking to a 
> *FileSystemDagFetcher*, *GitRepoDagFetcher*, *S3DagFetcher*, 
> *HDFSDagFetcher*, *GCSDagFetcher*, *ArtifactoryDagFetcher* or even 
> *TarballInS3DagFetcher*.
> This was briefly discussed in [this mailing list 
> thread|https://lists.apache.org/thread.html/03ddcd3a42b7fd6e3dad9711e8adea37fc00391f6053762f73af5b6a@%3Cdev.airflow.apache.org%3E]
> I'm happy to start work on this and provide an initial implementation for 
> review.
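
For orientation, a rough sketch of what such a DagFetcher abstraction might look like (class and method names below are illustrative, not the AIP-5 design):

```python
import abc
import shutil


class DagFetcher(abc.ABC):
    """Hypothetical base class: materialize DAG files from some location
    into a local folder that DagBag.collect_dags() can then walk."""

    @abc.abstractmethod
    def fetch(self, destination):
        """Copy the DAG files to `destination` and return that path."""


class FileSystemDagFetcher(DagFetcher):
    """The trivial local-filesystem case."""

    def __init__(self, source_folder):
        self.source_folder = source_folder

    def fetch(self, destination):
        shutil.copytree(self.source_folder, destination)
        return destination
```

A remote implementation such as an S3DagFetcher or GCSDagFetcher would follow the same interface, downloading the bucket contents into the destination folder instead.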



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (AIRFLOW-2221) Fill up DagBag from remote locations

2019-02-28 Thread Tao Feng (JIRA)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-2221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16781297#comment-16781297
 ] 

Tao Feng commented on AIRFLOW-2221:
---

[~milton0825], feel free to take it. But since it includes a core change, we now need an AIP (Airflow Improvement Proposal). There is already one for this topic (https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-5+DagFetcher); you could talk to the author and see if you could take it over for discussion.

> Fill up DagBag from remote locations
> 
>
> Key: AIRFLOW-2221
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2221
> Project: Apache Airflow
>  Issue Type: New Feature
>  Components: configuration, core
>Affects Versions: 2.0.0
>Reporter: Diogo Franco
>Assignee: Diogo Franco
>Priority: Major
> Fix For: 2.0.0
>
>
> The ability to fill up the DagBag from remote locations (HDFS, S3...) seems 
> to be deemed useful, e.g. facilitating deployment processes.
> This JIRA is to propose an implementation of a *DagFetcher* abstraction on 
> the DagBag, where the collect_dags method can delegate the walking to a 
> *FileSystemDagFetcher*, *GitRepoDagFetcher*, *S3DagFetcher*, 
> *HDFSDagFetcher*, *GCSDagFetcher*, *ArtifactoryDagFetcher* or even 
> *TarballInS3DagFetcher*.
> This was briefly discussed in [this mailing list 
> thread|https://lists.apache.org/thread.html/03ddcd3a42b7fd6e3dad9711e8adea37fc00391f6053762f73af5b6a@%3Cdev.airflow.apache.org%3E]
> I'm happy to start work on this and provide an initial implementation for 
> review.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (AIRFLOW-2221) Fill up DagBag from remote locations

2019-02-28 Thread Chao-Han Tsai (JIRA)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-2221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16781283#comment-16781283
 ] 

Chao-Han Tsai commented on AIRFLOW-2221:


[~DiogoFranco] are you still working on this ticket? If not, I am interested in 
taking this over.

> Fill up DagBag from remote locations
> 
>
> Key: AIRFLOW-2221
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2221
> Project: Apache Airflow
>  Issue Type: New Feature
>  Components: configuration, core
>Affects Versions: 2.0.0
>Reporter: Diogo Franco
>Assignee: Diogo Franco
>Priority: Major
> Fix For: 2.0.0
>
>
> The ability to fill up the DagBag from remote locations (HDFS, S3...) seems 
> to be deemed useful, e.g. facilitating deployment processes.
> This JIRA is to propose an implementation of a *DagFetcher* abstraction on 
> the DagBag, where the collect_dags method can delegate the walking to a 
> *FileSystemDagFetcher*, *GitRepoDagFetcher*, *S3DagFetcher*, 
> *HDFSDagFetcher*, *GCSDagFetcher*, *ArtifactoryDagFetcher* or even 
> *TarballInS3DagFetcher*.
> This was briefly discussed in [this mailing list 
> thread|https://lists.apache.org/thread.html/03ddcd3a42b7fd6e3dad9711e8adea37fc00391f6053762f73af5b6a@%3Cdev.airflow.apache.org%3E]
> I'm happy to start work on this and provide an initial implementation for 
> review.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] jmcarp commented on a change in pull request #4685: [AIRFLOW-3862] Check types with mypy.

2019-02-28 Thread GitBox
jmcarp commented on a change in pull request #4685: [AIRFLOW-3862] Check types 
with mypy.
URL: https://github.com/apache/airflow/pull/4685#discussion_r261476559
 
 

 ##
 File path: airflow/contrib/operators/azure_container_instances_operator.py
 ##
 @@ -18,6 +18,9 @@
 # under the License.
 
 from time import sleep
+from collections import namedtuple
+
+from typing import Dict, Sequence
 
 Review comment:
   Updated. We might want to consider https://github.com/timothycrosley/isort 
or similar to do this consistently.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (AIRFLOW-3977) Incorrect example about the interaction between skipped tasks and trigger rules in documentation.

2019-02-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-3977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16781236#comment-16781236
 ] 

ASF GitHub Bot commented on AIRFLOW-3977:
-

cixuuz commented on pull request #4805: [AIRFLOW-3977] Fix/Add examples about 
how trigger rules interactive with skipped tasks
URL: https://github.com/apache/airflow/pull/4805
 
 
   ### Jira
   
   - [ ] My PR addresses the following [Airflow 
Jira](https://issues.apache.org/jira/browse/AIRFLOW/) issues and references 
them in the PR title. 
 - https://issues.apache.org/jira/browse/AIRFLOW-3977
 - https://issues.apache.org/jira/browse/AIRFLOW-1784
   
   ### Description
   
   - [ ] Here are some details about my PR, including screenshots of any UI 
changes:
   The current LatestOnlyOperator will skip all downstream tasks blindly. However, the doc shows it respecting trigger rules, which is incorrect behavior.
   It also shows an incorrect example of the interaction between skipped tasks and trigger rules at the schedule level. I replaced it with other examples using BranchingOperator.
   
   ### Tests
   
   - [ ] My PR adds the following unit tests __OR__ does not need testing for 
this extremely good reason:
   No test needed because the changes are documentation-only.
   
   ### Commits
   
   - [ ] My commits all reference Jira issues in their subject lines, and I 
have squashed multiple commits if they address the same issue. In addition, my 
commits follow the guidelines from "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)":
 1. Subject is separated from body by a blank line
 1. Subject is limited to 50 characters (not including Jira issue reference)
 1. Subject does not end with a period
 1. Subject uses the imperative mood ("add", not "adding")
 1. Body wraps at 72 characters
 1. Body explains "what" and "why", not "how"
   
   ### Documentation
   
   - [ ] In case of new functionality, my PR adds documentation that describes 
how to use it.
 - When adding new operators/hooks/sensors, the autoclass documentation 
generation needs to be added.
 - All the public functions and the classes in the PR contain docstrings 
that explain what it does
   
   ### Code Quality
   
   - [ ] Passes `flake8`
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Incorrect example about the interaction between skipped tasks and trigger 
> rules in documentation.
> -
>
> Key: AIRFLOW-3977
> URL: https://issues.apache.org/jira/browse/AIRFLOW-3977
> Project: Apache Airflow
>  Issue Type: Bug
>Affects Versions: 1.8.2, 1.9.0, 1.10.0, 1.10.1, 1.10.2
>Reporter: cixuuz
>Assignee: cixuuz
>Priority: Major
>  Labels: documentaion
> Fix For: 1.10.3
>
>
> Current LatestOnlyOperator will skip all downstream tasks blindly. 
> BranchingOperator could be a better example to show how trigger rules 
> interact with skipped tasks at the schedule level. 
> This fix also resolves this ticket:
> https://issues.apache.org/jira/browse/AIRFLOW-1784 
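
As a rough illustration of the kind of example the documentation could use (Airflow 1.10-style imports; the `none_failed` trigger rule is assumed to be available), a join task after a branch only runs if its trigger rule tolerates skipped upstream tasks:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.utils.trigger_rule import TriggerRule

dag = DAG('branch_trigger_rule_example', start_date=datetime(2019, 1, 1),
          schedule_interval='@daily')

branch = BranchPythonOperator(
    task_id='branch',
    python_callable=lambda: 'path_a',  # always choose path_a, so path_b is skipped
    dag=dag)
path_a = DummyOperator(task_id='path_a', dag=dag)
path_b = DummyOperator(task_id='path_b', dag=dag)

# With the default 'all_success' rule the join would be skipped too, because
# one of its parents (path_b) is skipped; 'none_failed' lets it still run.
join = DummyOperator(task_id='join', trigger_rule=TriggerRule.NONE_FAILED, dag=dag)

branch >> [path_a, path_b]
[path_a, path_b] >> join
```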



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] cixuuz opened a new pull request #4805: [AIRFLOW-3977] Fix/Add examples about how trigger rules interactive with skipped tasks

2019-02-28 Thread GitBox
cixuuz opened a new pull request #4805: [AIRFLOW-3977] Fix/Add examples about 
how trigger rules interactive with skipped tasks
URL: https://github.com/apache/airflow/pull/4805
 
 
   ### Jira
   
   - [ ] My PR addresses the following [Airflow 
Jira](https://issues.apache.org/jira/browse/AIRFLOW/) issues and references 
them in the PR title. 
 - https://issues.apache.org/jira/browse/AIRFLOW-3977
 - https://issues.apache.org/jira/browse/AIRFLOW-1784
   
   ### Description
   
   - [ ] Here are some details about my PR, including screenshots of any UI 
changes:
   The current LatestOnlyOperator will skip all downstream tasks blindly. However, the doc shows it respecting trigger rules, which is incorrect behavior.
   It also shows an incorrect example of the interaction between skipped tasks and trigger rules at the schedule level. I replaced it with other examples using BranchingOperator.
   
   ### Tests
   
   - [ ] My PR adds the following unit tests __OR__ does not need testing for 
this extremely good reason:
   No test needed because the changes are documentation-only.
   
   ### Commits
   
   - [ ] My commits all reference Jira issues in their subject lines, and I 
have squashed multiple commits if they address the same issue. In addition, my 
commits follow the guidelines from "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)":
 1. Subject is separated from body by a blank line
 1. Subject is limited to 50 characters (not including Jira issue reference)
 1. Subject does not end with a period
 1. Subject uses the imperative mood ("add", not "adding")
 1. Body wraps at 72 characters
 1. Body explains "what" and "why", not "how"
   
   ### Documentation
   
   - [ ] In case of new functionality, my PR adds documentation that describes 
how to use it.
 - When adding new operators/hooks/sensors, the autoclass documentation 
generation needs to be added.
 - All the public functions and the classes in the PR contain docstrings 
that explain what it does
   
   ### Code Quality
   
   - [ ] Passes `flake8`
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zhongjiajie edited a comment on issue #4583: [AIRFLOW-3746] Fix to prevent missing container exit

2019-02-28 Thread GitBox
zhongjiajie edited a comment on issue #4583: [AIRFLOW-3746] Fix to prevent 
missing container exit
URL: https://github.com/apache/airflow/pull/4583#issuecomment-468526525
 
 
   @ashb @Fokko @feng-tao PTAL. @ashwiniadiga sent a dev-mail asking for committer review and a merge to master.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zhongjiajie commented on issue #4583: [AIRFLOW-3746] Fix to prevent missing container exit

2019-02-28 Thread GitBox
zhongjiajie commented on issue #4583: [AIRFLOW-3746] Fix to prevent missing 
container exit
URL: https://github.com/apache/airflow/pull/4583#issuecomment-468526525
 
 
   @ashb @Fokko @feng-tao PTAL


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] XD-DENG commented on issue #4804: [AIRFLOW-3980] Unify logger

2019-02-28 Thread GitBox
XD-DENG commented on issue #4804: [AIRFLOW-3980] Unify logger
URL: https://github.com/apache/airflow/pull/4804#issuecomment-468524852
 
 
   
   > ### Description
   > The logger was used incorrectly in several places. We should not format 
the text before passing it to the logger, because it can be ignored in the 
logger.
   
   Hi @mik-laj , does the logger ignore the format method? In which cases?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] XD-DENG edited a comment on issue #4768: [AIRFLOW-3800] run a dag at the beginning of the scheduled interval

2019-02-28 Thread GitBox
XD-DENG edited a comment on issue #4768: [AIRFLOW-3800] run a dag at the 
beginning of the scheduled interval
URL: https://github.com/apache/airflow/pull/4768#issuecomment-468515868
 
 
   Hi @ali5h , as shared earlier, this change may not be well accepted by folks, since it may potentially cause even more confusion.
   
   This is the thread of related discussion in the dev maillist, FYI: 
https://lists.apache.org/thread.html/f6e99f00d64eca9ad0818e87df7974eb1d4dd24614734db2e3adaeea@%3Cdev.airflow.apache.org%3E
 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (AIRFLOW-3980) Unify logger

2019-02-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-3980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16781214#comment-16781214
 ] 

ASF GitHub Bot commented on AIRFLOW-3980:
-

mik-laj commented on pull request #4804: [AIRFLOW-3980] Unify logger
URL: https://github.com/apache/airflow/pull/4804
 
 
   Make sure you have checked _all_ steps below.
   
   ### Jira
   
 - https://issues.apache.org/jira/browse/AIRFLOW-3980
   
   ### Description
   
   The logger was used incorrectly in several places. We should not format the 
text before passing it to the logger, because it can be ignored in the logger.
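
   A small sketch of the difference (standard `logging` semantics, nothing Airflow-specific): with %-style arguments the message is only built if the record is actually emitted, whereas `.format()` always pays the formatting cost up front:

```python
import logging

logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)

conn = "postgres://airflow@localhost:5432/airflow"

# Preferred: the logger interpolates lazily, only when the record is emitted.
log.debug("Using connection to: %s", conn)  # filtered out at INFO level, no formatting done

# Discouraged: the string is always formatted, even though the record is dropped.
log.debug("Using connection to: {}".format(conn))
```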
   
   CC: @BasPH 
   
   ### Tests
   
   - [ ] My PR adds the following unit tests __OR__ does not need testing for 
this extremely good reason:
   
   ### Commits
   
   - [ ] My commits all reference Jira issues in their subject lines, and I 
have squashed multiple commits if they address the same issue. In addition, my 
commits follow the guidelines from "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)":
 1. Subject is separated from body by a blank line
 1. Subject is limited to 50 characters (not including Jira issue reference)
 1. Subject does not end with a period
 1. Subject uses the imperative mood ("add", not "adding")
 1. Body wraps at 72 characters
 1. Body explains "what" and "why", not "how"
   
   ### Documentation
   
   - [ ] In case of new functionality, my PR adds documentation that describes 
how to use it.
 - When adding new operators/hooks/sensors, the autoclass documentation 
generation needs to be added.
 - All the public functions and the classes in the PR contain docstrings 
that explain what it does
   
   ### Code Quality
   
   - [ ] Passes `flake8`
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Unify logger
> 
>
> Key: AIRFLOW-3980
> URL: https://issues.apache.org/jira/browse/AIRFLOW-3980
> Project: Apache Airflow
>  Issue Type: Improvement
>Reporter: Kamil Bregula
>Priority: Trivial
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] XD-DENG commented on issue #4768: [AIRFLOW-3800] run a dag at the beginning of the scheduled interval

2019-02-28 Thread GitBox
XD-DENG commented on issue #4768: [AIRFLOW-3800] run a dag at the beginning of 
the scheduled interval
URL: https://github.com/apache/airflow/pull/4768#issuecomment-468515868
 
 
   Hi @ali5h , as shared earlier, this change may not be well accepted by folks, since it may potentially cause even more confusion.
   
   This is the thread of related discussion in the dev maillist, FYI: 
https://lists.apache.org/thread.html/f6e99f00d64eca9ad0818e87df7974eb1d4dd24614734db2e3adaeea@%3Cdev.airflow.apache.org%3E
 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (AIRFLOW-3980) Unify logger

2019-02-28 Thread Kamil Bregula (JIRA)
Kamil Bregula created AIRFLOW-3980:
--

 Summary: Unify logger
 Key: AIRFLOW-3980
 URL: https://issues.apache.org/jira/browse/AIRFLOW-3980
 Project: Apache Airflow
  Issue Type: Improvement
Reporter: Kamil Bregula






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] mik-laj commented on a change in pull request #4791: [AIRFLOW-3908] Add more Google Cloud Vision operators

2019-02-28 Thread GitBox
mik-laj commented on a change in pull request #4791:  [AIRFLOW-3908] Add more 
Google Cloud Vision operators
URL: https://github.com/apache/airflow/pull/4791#discussion_r261457113
 
 

 ##
 File path: airflow/hooks/base_hook.py
 ##
 @@ -80,7 +80,7 @@ def get_connection(cls, conn_id):
         conn = random.choice(cls.get_connections(conn_id))
         if conn.host:
             log = LoggingMixin().log
-            log.info("Using connection to: %s", conn.debug_info())
+            # log.info("Using connection to: %s", conn.debug_info())
 
 Review comment:
   TODO: Remove it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] mik-laj commented on a change in pull request #4791: [AIRFLOW-3908] Add more Google Cloud Vision operators

2019-02-28 Thread GitBox
mik-laj commented on a change in pull request #4791:  [AIRFLOW-3908] Add more 
Google Cloud Vision operators
URL: https://github.com/apache/airflow/pull/4791#discussion_r261456984
 
 

 ##
 File path: airflow/contrib/operators/gcp_vision_operator.py
 ##
 @@ -671,3 +672,284 @@ def execute(self, context):
             timeout=self.timeout,
             metadata=self.metadata,
         )
+
+
+class CloudVisionAnnotateImageOperator(BaseOperator):
+    """
+    Run image detection and annotation for an image.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:CloudVisionAnnotateImageOperator`
+
+    :param request: (Required) Individual file annotation requests.
+        If a dict is provided, it must be of the same form as the protobuf
+        message :class:`google.cloud.vision_v1.types.AnnotateImageRequest`
+    :type request: dict or google.cloud.vision_v1.types.AnnotateImageRequest
+    :param retry: (Optional) A retry object used to retry requests. If `None` is
+        specified, requests will not be retried.
+    :type retry: google.api_core.retry.Retry
+    :param timeout: (Optional) The amount of time, in seconds, to wait for the request to
+        complete. Note that if retry is specified, the timeout applies to each individual
+        attempt.
+    :type timeout: float
+    :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud Platform.
+    :type gcp_conn_id: str
+    """
+
+    # [START vision_annotate_image_template_fields]
+    template_fields = ('request', 'gcp_conn_id')
+    # [END vision_annotate_image_template_fields]
+
+    @apply_defaults
+    def __init__(
+        self, request, retry=None, timeout=None, gcp_conn_id='google_cloud_default', *args, **kwargs
+    ):
+        super(CloudVisionAnnotateImageOperator, self).__init__(*args, **kwargs)
+        self.request = request
+        self.retry = retry
+        self.timeout = timeout
+        self.gcp_conn_id = gcp_conn_id
+
+    def execute(self, context):
+        hook = CloudVisionHook(gcp_conn_id=self.gcp_conn_id)
 
 Review comment:
   It is better to create the hook in the execute method. Otherwise, templated fields such as gcp_conn_id are not rendered correctly.
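
   A hedged sketch of why this matters (generic BaseOperator mechanics, simplified; the operator and helper below are illustrative): templated fields are rendered on the task instance right before `execute()` runs, so a hook built in `__init__` would capture the raw, unrendered value:

```python
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class ExampleTemplatedConnOperator(BaseOperator):
    # gcp_conn_id may contain Jinja, e.g. "{{ var.value.vision_conn_id }}"
    template_fields = ('gcp_conn_id',)

    @apply_defaults
    def __init__(self, gcp_conn_id='google_cloud_default', *args, **kwargs):
        super(ExampleTemplatedConnOperator, self).__init__(*args, **kwargs)
        # Do NOT build the hook here: self.gcp_conn_id is still the raw template.
        self.gcp_conn_id = gcp_conn_id

    def execute(self, context):
        # By now the templated fields have been rendered, so this is the
        # right place to construct the hook from self.gcp_conn_id.
        return self._make_hook(self.gcp_conn_id)

    def _make_hook(self, conn_id):
        # Placeholder standing in for e.g. CloudVisionHook(gcp_conn_id=conn_id).
        return conn_id
```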


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] mik-laj commented on a change in pull request #4791: [AIRFLOW-3908] Add more Google Cloud Vision operators

2019-02-28 Thread GitBox
mik-laj commented on a change in pull request #4791:  [AIRFLOW-3908] Add more 
Google Cloud Vision operators
URL: https://github.com/apache/airflow/pull/4791#discussion_r261456640
 
 

 ##
 File path: airflow/contrib/hooks/gcp_vision_hook.py
 ##
 @@ -354,3 +307,140 @@ def _get_autogenerated_id(response):
         if '/' not in name:
             raise AirflowException('Unable to get id from name... [{}]'.format(name))
         return name.rsplit('/', 1)[1]
+
+    @GoogleCloudBaseHook.catch_http_exception
+    @GoogleCloudBaseHook.fallback_to_default_project_id
+    def create_reference_image(
 
 Review comment:
   We use an automatic tool to check the style of the code, the same one that is used by Google.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] mik-laj commented on a change in pull request #4791: [AIRFLOW-3908] Add more Google Cloud Vision operators

2019-02-28 Thread GitBox
mik-laj commented on a change in pull request #4791:  [AIRFLOW-3908] Add more 
Google Cloud Vision operators
URL: https://github.com/apache/airflow/pull/4791#discussion_r261456481
 
 

 ##
 File path: airflow/contrib/hooks/gcp_vision_hook.py
 ##
 @@ -157,17 +162,12 @@ def get_product_set(
         client = self.get_conn()
         name = ProductSearchClient.product_set_path(project_id, location, product_set_id)
         self.log.info('Retrieving ProductSet: %s', name)
-        response = self._handle_request(
-            lambda **kwargs: client.get_product_set(**kwargs),
-            name=name,
-            retry=retry,
-            timeout=timeout,
-            metadata=metadata,
-        )
+        response = client.get_product_set(name=name, retry=retry, timeout=timeout, metadata=metadata)
 
 Review comment:
   We use automatic code formatting to maintain a uniform style for all GCP 
operators.  We use black. The same tool is used by the google-cloud-python 
library.
   
   See: 
https://github.com/googleapis/google-cloud-python/blob/b718d2d9bb32b0e7934ae90d57dc80c81ce0fb73/vision/noxfile.py#L44-L55


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] mik-laj commented on a change in pull request #4792: [AIRFLOW-3659] Create Google Cloud Transfer Service Operators

2019-02-28 Thread GitBox
mik-laj commented on a change in pull request #4792:  [AIRFLOW-3659] Create 
Google Cloud Transfer Service Operators 
URL: https://github.com/apache/airflow/pull/4792#discussion_r261455422
 
 

 ##
 File path: airflow/contrib/operators/gcp_transfer_operator.py
 ##
 @@ -0,0 +1,753 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from copy import deepcopy
+from datetime import date, time
+
+from airflow import AirflowException
+from airflow.contrib.hooks.gcp_transfer_hook import GCPTransferServiceHook, 
GcpTransferJobsStatus
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+
+try:
+from airflow.contrib.hooks.aws_hook import AwsHook
+except ImportError:  # pragma: no cover
+AwsHook = None
+
+
+class TransferJobPreprocessor:
+    def __init__(self, body, aws_conn_id='aws_default'):
+        self.body = body
+        self.aws_conn_id = aws_conn_id
+
+    def _inject_aws_credentials(self):
+        if 'transferSpec' not in self.body or 'awsS3DataSource' not in self.body['transferSpec']:
+            return
+
+        aws_hook = AwsHook(self.aws_conn_id)
+        aws_credentials = aws_hook.get_credentials()
+        aws_access_key_id = aws_credentials.access_key
+        aws_secret_access_key = aws_credentials.secret_key
+        self.body['transferSpec']['awsS3DataSource']["awsAccessKey"] = {
+            "accessKeyId": aws_access_key_id,
+            "secretAccessKey": aws_secret_access_key,
+        }
+
+    def _reformat_date(self, field_key):
+        schedule = self.body['schedule']
+        if field_key not in schedule:
+            return
+        if isinstance(schedule[field_key], date):
+            schedule[field_key] = self._convert_date_to_dict(schedule[field_key])
+
+    def _reformat_time(self, field_key):
+        schedule = self.body['schedule']
+        if field_key not in schedule:
+            return
+        if isinstance(schedule[field_key], time):
+            schedule[field_key] = self._convert_time_to_dict(schedule[field_key])
+
+    def _reformat_schedule(self):
+        if 'schedule' not in self.body:
+            return
+        self._reformat_date('scheduleStartDate')
+        self._reformat_date('scheduleEndDate')
+        self._reformat_time('startTimeOfDay')
+
+    def process_body(self):
+        self._inject_aws_credentials()
+        self._reformat_schedule()
+        return self.body
+
+    @staticmethod
+    def _convert_date_to_dict(field_date):
+        """
+        Convert native python ``datetime.date`` object to a format supported by the API
+        """
+        return {'day': field_date.day, 'month': field_date.month, 'year': field_date.year}
+
+    @staticmethod
+    def _convert_time_to_dict(time):
+        """
+        Convert native python ``datetime.time`` object to a format supported by the API
+        """
+        return {"hours": time.hour, "minutes": time.minute, "seconds": time.second}
+
+
+class TransferJobValidator:
+    def __init__(self, body):
+        self.body = body
+
+    def _verify_data_source(self):
+        is_gcs = 'gcsDataSource' in self.body['transferSpec']
+        is_aws_s3 = 'awsS3DataSource' in self.body['transferSpec']
+        is_http = 'httpDataSource' in self.body['transferSpec']
+
+        sources_count = sum([is_gcs, is_aws_s3, is_http])
+        if sources_count != 0 and sources_count != 1:
+            raise AirflowException(
+                "More than one data source detected. Please choose exactly one data source from: "
+                "gcsDataSource, awsS3DataSource and httpDataSource."
+            )
+
+    def _restrict_aws_credentials(self):
+        if 'awsS3DataSource' not in self.body['transferSpec']:
+            return
+
+        if 'awsAccessKey' in self.body['transferSpec']['awsS3DataSource']:
+            raise AirflowException(
+                "AWS credentials detected inside the body parameter (awsAccessKey). This is not allowed, "
+                "please use Airflow connections to store credentials."
+            )
+
+    def _restrict_empty_body(self):
+        if not self.body:
+            raise AirflowException("The required parameter 'body' is empty or 

[GitHub] mik-laj commented on a change in pull request #4792: [AIRFLOW-3659] Create Google Cloud Transfer Service Operators

2019-02-28 Thread GitBox
mik-laj commented on a change in pull request #4792:  [AIRFLOW-3659] Create 
Google Cloud Transfer Service Operators 
URL: https://github.com/apache/airflow/pull/4792#discussion_r261455239
 
 

 ##
 File path: airflow/contrib/example_dags/example_gcp_transfer.py
 ##
 @@ -0,0 +1,254 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG that demonstrates interactions with Google Cloud Transfer.
+
+This DAG relies on the following OS environment variables
+
+* GCP_PROJECT_ID - Google Cloud Project to use for the Google Cloud Transfer 
Service.
+* GCP_DESCRIPTION - Description of transfer job
+* GCP_TRANSFER_SOURCE_AWS_BUCKET - Amazon Web Services Storage bucket from 
which files are copied.
+* GCP_TRANSFER_FIRST_TARGET_BUCKET - Google Cloud Storage bucket to which 
files are copied from AWS.
+  It is also a source bucket in next step
+* GCP_TRANSFER_SECOND_TARGET_BUCKET - Google Cloud Storage bucket bucket to 
which files are copied
+* WAIT_FOR_OPERATION_POKE_INTERVAL - interval of what to check the status of 
the operation
+
+"""
+import os
+from datetime import datetime, timedelta
+
+from airflow import models
+from airflow.contrib.hooks.gcp_transfer_hook import 
GcpTransferOperationStatus, GcpTransferJobsStatus
+from airflow.contrib.operators.gcp_transfer_operator import (
+GcpTransferServiceJobCreateOperator,
+GcpTransferServiceJobDeleteOperator,
+GcpTransferServiceJobUpdateOperator,
+GcpTransferServiceOperationsListOperator,
+GcpTransferServiceOperationGetOperator,
+GcpTransferServiceOperationPauseOperator,
+GcpTransferServiceOperationResumeOperator,
+GcpTransferServiceOperationCancelOperator,
+)
+from airflow.contrib.sensors.gcp_transfer_sensor import 
GCPTransferServiceWaitForJobStatusSensor
+from airflow.utils.dates import days_ago
+
+# [START howto_operator_gct_common_variables]
+GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
+GCP_DESCRIPTION = os.environ.get('GCP_DESCRIPTION', 'description')
+GCP_TRANSFER_TARGET_BUCKET = os.environ.get('GCP_TRANSFER_TARGET_BUCKET')
+WAIT_FOR_OPERATION_POKE_INTERVAL = 
os.environ.get('WAIT_FOR_OPERATION_POKE_INTERVAL', 5)
+
+GCP_TRANSFER_SOURCE_AWS_BUCKET = 
os.environ.get('GCP_TRANSFER_SOURCE_AWS_BUCKET')
+GCP_TRANSFER_FIRST_TARGET_BUCKET = os.environ.get(
+'GCP_TRANSFER_FIRST_TARGET_BUCKET', 'gcp-transfer-first-target'
+)
+GCP_TRANSFER_SECOND_TARGET_BUCKET = os.environ.get(
+'GCP_TRANSFER_SECOND_TARGET_BUCKET', 'gcp-transfer-second-target'
+)
+# [END howto_operator_gct_common_variables]
+
+# [START howto_operator_gct_create_job_body_aws]
+create_body_aws = {
+"description": GCP_DESCRIPTION,
+"status": GcpTransferJobsStatus.ENABLED,
+"projectId": GCP_PROJECT_ID,
+"schedule": {
+"scheduleStartDate": datetime(2015, 1, 1).date(),
+"scheduleEndDate": datetime(2030, 1, 1).date(),
+"startTimeOfDay": datetime.utcnow() + timedelta(minutes=2),
+},
+"transferSpec": {
+'awsS3DataSource': {'bucketName': GCP_TRANSFER_SOURCE_AWS_BUCKET},
+"gcsDataSink": {"bucketName": GCP_TRANSFER_FIRST_TARGET_BUCKET},
+"transferOptions": {"overwriteObjectsAlreadyExistingInSink": True},
+},
+}
+# [END howto_operator_gct_create_job_body_aws]
+
+# [START howto_operator_gct_create_job_body_gcp]
+create_body_gcs = {
+"description": GCP_DESCRIPTION,
+"status": GcpTransferJobsStatus.ENABLED,
+"projectId": GCP_PROJECT_ID,
+"schedule": {
+"scheduleStartDate": datetime(2015, 1, 1).date(),
+"scheduleEndDate": datetime(2030, 1, 1).date(),
+"startTimeOfDay": datetime.utcnow() + timedelta(minutes=2),
+},
+"transferSpec": {
+'gcsDataSource': {'bucketName': GCP_TRANSFER_FIRST_TARGET_BUCKET},
+"gcsDataSink": {"bucketName": GCP_TRANSFER_SECOND_TARGET_BUCKET},
+"transferOptions": {"overwriteObjectsAlreadyExistingInSink": True},
+},
+}
+# [END howto_operator_gct_create_job_body_gcp]
+
+# [START howto_operator_gct_update_job_body]
+update_body = {
+"projectId": GCP_PROJECT_ID,
+"transferJob": {"description": "%s_updated".format(GCP_DESCRIPTION)},
+"updateTransferJobFieldMask": 

[GitHub] mik-laj commented on a change in pull request #4792: [AIRFLOW-3659] Create Google Cloud Transfer Service Operators

2019-02-28 Thread GitBox
mik-laj commented on a change in pull request #4792:  [AIRFLOW-3659] Create 
Google Cloud Transfer Service Operators 
URL: https://github.com/apache/airflow/pull/4792#discussion_r261454998
 
 

 ##
 File path: airflow/contrib/example_dags/example_gcp_transfer.py
 ##
 @@ -0,0 +1,254 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG that demonstrates interactions with Google Cloud Transfer.
+
+This DAG relies on the following OS environment variables
+
+* GCP_PROJECT_ID - Google Cloud Project to use for the Google Cloud Transfer Service.
+* GCP_DESCRIPTION - Description of the transfer job
+* GCP_TRANSFER_SOURCE_AWS_BUCKET - Amazon Web Services Storage bucket from which files are copied.
+* GCP_TRANSFER_FIRST_TARGET_BUCKET - Google Cloud Storage bucket to which files are copied from AWS.
+  It is also a source bucket in the next step
+* GCP_TRANSFER_SECOND_TARGET_BUCKET - Google Cloud Storage bucket to which files are copied
+* WAIT_FOR_OPERATION_POKE_INTERVAL - interval at which to check the status of the operation
 
 Review comment:
   It's a parameter for `BaseSensorOperator`. We have no special 
recommendations for values. We have added it only to be able to set a lower 
value, which will allow the operator to perform properly with a smaller set of 
data.
   
   I will expand the description of this parameter.
   
   Reference: 
https://incubator-airflow.readthedocs.io/en/airflow-1075/code.html#airflow.operators.sensors.BaseSensorOperator
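
For context, a hedged sketch of how a lower `poke_interval` might be passed to the sensor in the quoted example DAG; `poke_interval` and `timeout` come from `BaseSensorOperator`, while the sensor class and the other argument names follow the PR under review and may differ in the final version:

```python
# Hypothetical usage, reusing names defined in the quoted example_gcp_transfer.py.
with models.DAG('example_gcp_transfer_poke', start_date=days_ago(1), schedule_interval=None) as dag:
    wait_for_operation_to_end = GCPTransferServiceWaitForJobStatusSensor(
        task_id='wait_for_operation_to_end',
        job_name="{{ task_instance.xcom_pull('create_transfer_job_from_aws')['name'] }}",
        project_id=GCP_PROJECT_ID,
        expected_statuses={GcpTransferOperationStatus.SUCCESS},
        # A small interval lets the sensor finish quickly on a small test dataset.
        poke_interval=int(WAIT_FOR_OPERATION_POKE_INTERVAL),
    )
```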






[GitHub] ali5h commented on issue #4768: [AIRFLOW-3800] run a dag at the beginning of the scheduled interval

2019-02-28 Thread GitBox
ali5h commented on issue #4768: [AIRFLOW-3800] run a dag at the beginning of 
the scheduled interval
URL: https://github.com/apache/airflow/pull/4768#issuecomment-468509905
 
 
   @Fokko I think @XD-DENG wanted it at the DAG level. I originally had it at the instance level, but the DAG level makes more sense. I don't think I quite understood your question.




[GitHub] ali5h commented on issue #4768: [AIRFLOW-3800] run a dag at the beginning of the scheduled interval

2019-02-28 Thread GitBox
ali5h commented on issue #4768: [AIRFLOW-3800] run a dag at the beginning of 
the scheduled interval
URL: https://github.com/apache/airflow/pull/4768#issuecomment-468509457
 
 
   @XD-DENG moved it to the DAG level.




[GitHub] mik-laj commented on a change in pull request #4792: [AIRFLOW-3659] Create Google Cloud Transfer Service Operators

2019-02-28 Thread GitBox
mik-laj commented on a change in pull request #4792:  [AIRFLOW-3659] Create 
Google Cloud Transfer Service Operators 
URL: https://github.com/apache/airflow/pull/4792#discussion_r261454170
 
 

 ##
 File path: airflow/contrib/example_dags/example_gcp_transfer.py
 ##
 @@ -0,0 +1,254 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG that demonstrates interactions with Google Cloud Transfer.
+
+This DAG relies on the following OS environment variables
+
+* GCP_PROJECT_ID - Google Cloud Project to use for the Google Cloud Transfer Service.
+* GCP_DESCRIPTION - Description of the transfer job
+* GCP_TRANSFER_SOURCE_AWS_BUCKET - Amazon Web Services Storage bucket from which files are copied.
+* GCP_TRANSFER_FIRST_TARGET_BUCKET - Google Cloud Storage bucket to which files are copied from AWS.
+  It is also a source bucket in the next step
+* GCP_TRANSFER_SECOND_TARGET_BUCKET - Google Cloud Storage bucket to which files are copied
+* WAIT_FOR_OPERATION_POKE_INTERVAL - interval at which to check the status of the operation
+
+"""
+import os
+from datetime import datetime, timedelta
+
+from airflow import models
+from airflow.contrib.hooks.gcp_transfer_hook import 
GcpTransferOperationStatus, GcpTransferJobsStatus
+from airflow.contrib.operators.gcp_transfer_operator import (
+GcpTransferServiceJobCreateOperator,
+GcpTransferServiceJobDeleteOperator,
+GcpTransferServiceJobUpdateOperator,
+GcpTransferServiceOperationsListOperator,
+GcpTransferServiceOperationGetOperator,
+GcpTransferServiceOperationPauseOperator,
+GcpTransferServiceOperationResumeOperator,
+GcpTransferServiceOperationCancelOperator,
+)
+from airflow.contrib.sensors.gcp_transfer_sensor import 
GCPTransferServiceWaitForJobStatusSensor
+from airflow.utils.dates import days_ago
+
+# [START howto_operator_gct_common_variables]
+GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
+GCP_DESCRIPTION = os.environ.get('GCP_DESCRIPTION', 'description')
+GCP_TRANSFER_TARGET_BUCKET = os.environ.get('GCP_TRANSFER_TARGET_BUCKET')
+WAIT_FOR_OPERATION_POKE_INTERVAL = 
os.environ.get('WAIT_FOR_OPERATION_POKE_INTERVAL', 5)
+
+GCP_TRANSFER_SOURCE_AWS_BUCKET = 
os.environ.get('GCP_TRANSFER_SOURCE_AWS_BUCKET')
+GCP_TRANSFER_FIRST_TARGET_BUCKET = os.environ.get(
+'GCP_TRANSFER_FIRST_TARGET_BUCKET', 'gcp-transfer-first-target'
+)
+GCP_TRANSFER_SECOND_TARGET_BUCKET = os.environ.get(
+'GCP_TRANSFER_SECOND_TARGET_BUCKET', 'gcp-transfer-second-target'
+)
+# [END howto_operator_gct_common_variables]
+
+# [START howto_operator_gct_create_job_body_aws]
+create_body_aws = {
+"description": GCP_DESCRIPTION,
+"status": GcpTransferJobsStatus.ENABLED,
+"projectId": GCP_PROJECT_ID,
+"schedule": {
+"scheduleStartDate": datetime(2015, 1, 1).date(),
+"scheduleEndDate": datetime(2030, 1, 1).date(),
+"startTimeOfDay": datetime.utcnow() + timedelta(minutes=2),
+},
+"transferSpec": {
+'awsS3DataSource': {'bucketName': GCP_TRANSFER_SOURCE_AWS_BUCKET},
+"gcsDataSink": {"bucketName": GCP_TRANSFER_FIRST_TARGET_BUCKET},
+"transferOptions": {"overwriteObjectsAlreadyExistingInSink": True},
+},
+}
+# [END howto_operator_gct_create_job_body_aws]
+
+# [START howto_operator_gct_create_job_body_gcp]
+create_body_gcs = {
+"description": GCP_DESCRIPTION,
+"status": GcpTransferJobsStatus.ENABLED,
+"projectId": GCP_PROJECT_ID,
+"schedule": {
+"scheduleStartDate": datetime(2015, 1, 1).date(),
+"scheduleEndDate": datetime(2030, 1, 1).date(),
+"startTimeOfDay": datetime.utcnow() + timedelta(minutes=2),
+},
+"transferSpec": {
+'gcsDataSource': {'bucketName': GCP_TRANSFER_FIRST_TARGET_BUCKET},
+"gcsDataSink": {"bucketName": GCP_TRANSFER_SECOND_TARGET_BUCKET},
+"transferOptions": {"overwriteObjectsAlreadyExistingInSink": True},
+},
+}
+# [END howto_operator_gct_create_job_body_gcp]
+
+# [START howto_operator_gct_update_job_body]
+update_body = {
+"projectId": GCP_PROJECT_ID,
+"transferJob": {"description": "%s_updated".format(GCP_DESCRIPTION)},
+"updateTransferJobFieldMask": 

[GitHub] codecov-io edited a comment on issue #4773: [AIRFLOW-3767] Correct bulk insert function

2019-02-28 Thread GitBox
codecov-io edited a comment on issue #4773: [AIRFLOW-3767] Correct bulk insert 
function
URL: https://github.com/apache/airflow/pull/4773#issuecomment-467066997
 
 
   # [Codecov](https://codecov.io/gh/apache/airflow/pull/4773?src=pr=h1) 
Report
   > Merging 
[#4773](https://codecov.io/gh/apache/airflow/pull/4773?src=pr=desc) into 
[master](https://codecov.io/gh/apache/airflow/commit/04e63017611f4f8d612065a4faa1fd31388871ab?src=pr=desc)
 will **increase** coverage by `0.1%`.
   > The diff coverage is `100%`.
   
   [![Impacted file tree 
graph](https://codecov.io/gh/apache/airflow/pull/4773/graphs/tree.svg?width=650=WdLKlKHOAU=150=pr)](https://codecov.io/gh/apache/airflow/pull/4773?src=pr=tree)
   
   ```diff
   @@            Coverage Diff            @@
   ##           master    #4773     +/-   ##
   =========================================
   + Coverage   74.33%   74.44%    +0.1%     
   =========================================
     Files         450      450             
     Lines       28970    28972       +2     
   =========================================
   + Hits        21536    21569      +33     
   + Misses       7434     7403      -31
   ```
   
   
   | [Impacted 
Files](https://codecov.io/gh/apache/airflow/pull/4773?src=pr=tree) | 
Coverage Δ | |
   |---|---|---|
   | 
[airflow/hooks/oracle\_hook.py](https://codecov.io/gh/apache/airflow/pull/4773/diff?src=pr=tree#diff-YWlyZmxvdy9ob29rcy9vcmFjbGVfaG9vay5weQ==)
 | `95.79% <100%> (+0.07%)` | :arrow_up: |
   | 
[airflow/jobs.py](https://codecov.io/gh/apache/airflow/pull/4773/diff?src=pr=tree#diff-YWlyZmxvdy9qb2JzLnB5)
 | `76.46% <0%> (+0.71%)` | :arrow_up: |
   | 
[airflow/utils/dag\_processing.py](https://codecov.io/gh/apache/airflow/pull/4773/diff?src=pr=tree#diff-YWlyZmxvdy91dGlscy9kYWdfcHJvY2Vzc2luZy5weQ==)
 | `59.15% <0%> (+1.05%)` | :arrow_up: |
   | 
[airflow/executors/\_\_init\_\_.py](https://codecov.io/gh/apache/airflow/pull/4773/diff?src=pr=tree#diff-YWlyZmxvdy9leGVjdXRvcnMvX19pbml0X18ucHk=)
 | `63.46% <0%> (+3.84%)` | :arrow_up: |
   | 
[airflow/utils/sqlalchemy.py](https://codecov.io/gh/apache/airflow/pull/4773/diff?src=pr=tree#diff-YWlyZmxvdy91dGlscy9zcWxhbGNoZW15LnB5)
 | `81.81% <0%> (+4.54%)` | :arrow_up: |
   | 
[airflow/executors/sequential\_executor.py](https://codecov.io/gh/apache/airflow/pull/4773/diff?src=pr=tree#diff-YWlyZmxvdy9leGVjdXRvcnMvc2VxdWVudGlhbF9leGVjdXRvci5weQ==)
 | `100% <0%> (+50%)` | :arrow_up: |
   
   --
   
   [Continue to review full report at 
Codecov](https://codecov.io/gh/apache/airflow/pull/4773?src=pr=continue).
   > **Legend** - [Click here to learn 
more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute  (impact)`, `ø = not affected`, `? = missing data`
   > Powered by 
[Codecov](https://codecov.io/gh/apache/airflow/pull/4773?src=pr=footer). 
Last update 
[04e6301...0a3f460](https://codecov.io/gh/apache/airflow/pull/4773?src=pr=lastupdated).
 Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   




[GitHub] mik-laj commented on a change in pull request #4792: [AIRFLOW-3659] Create Google Cloud Transfer Service Operators

2019-02-28 Thread GitBox
mik-laj commented on a change in pull request #4792:  [AIRFLOW-3659] Create 
Google Cloud Transfer Service Operators 
URL: https://github.com/apache/airflow/pull/4792#discussion_r261453350
 
 

 ##
 File path: airflow/contrib/example_dags/example_gcp_transfer.py
 ##
 @@ -0,0 +1,254 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG that demonstrates interactions with Google Cloud Transfer.
+
+This DAG relies on the following OS environment variables
+
+* GCP_PROJECT_ID - Google Cloud Project to use for the Google Cloud Transfer Service.
+* GCP_DESCRIPTION - Description of the transfer job
+* GCP_TRANSFER_SOURCE_AWS_BUCKET - Amazon Web Services Storage bucket from which files are copied.
+* GCP_TRANSFER_FIRST_TARGET_BUCKET - Google Cloud Storage bucket to which files are copied from AWS.
+  It is also a source bucket in the next step
+* GCP_TRANSFER_SECOND_TARGET_BUCKET - Google Cloud Storage bucket to which files are copied
+* WAIT_FOR_OPERATION_POKE_INTERVAL - interval at which to check the status of the operation
+
+"""
+import os
+from datetime import datetime, timedelta
+
+from airflow import models
+from airflow.contrib.hooks.gcp_transfer_hook import 
GcpTransferOperationStatus, GcpTransferJobsStatus
+from airflow.contrib.operators.gcp_transfer_operator import (
+GcpTransferServiceJobCreateOperator,
+GcpTransferServiceJobDeleteOperator,
+GcpTransferServiceJobUpdateOperator,
+GcpTransferServiceOperationsListOperator,
+GcpTransferServiceOperationGetOperator,
+GcpTransferServiceOperationPauseOperator,
+GcpTransferServiceOperationResumeOperator,
+GcpTransferServiceOperationCancelOperator,
+)
+from airflow.contrib.sensors.gcp_transfer_sensor import 
GCPTransferServiceWaitForJobStatusSensor
+from airflow.utils.dates import days_ago
+
+# [START howto_operator_gct_common_variables]
+GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
+GCP_DESCRIPTION = os.environ.get('GCP_DESCRIPTION', 'description')
+GCP_TRANSFER_TARGET_BUCKET = os.environ.get('GCP_TRANSFER_TARGET_BUCKET')
+WAIT_FOR_OPERATION_POKE_INTERVAL = 
os.environ.get('WAIT_FOR_OPERATION_POKE_INTERVAL', 5)
+
+GCP_TRANSFER_SOURCE_AWS_BUCKET = 
os.environ.get('GCP_TRANSFER_SOURCE_AWS_BUCKET')
+GCP_TRANSFER_FIRST_TARGET_BUCKET = os.environ.get(
+'GCP_TRANSFER_FIRST_TARGET_BUCKET', 'gcp-transfer-first-target'
+)
+GCP_TRANSFER_SECOND_TARGET_BUCKET = os.environ.get(
+'GCP_TRANSFER_SECOND_TARGET_BUCKET', 'gcp-transfer-second-target'
+)
+# [END howto_operator_gct_common_variables]
+
+# [START howto_operator_gct_create_job_body_aws]
+create_body_aws = {
+"description": GCP_DESCRIPTION,
+"status": GcpTransferJobsStatus.ENABLED,
+"projectId": GCP_PROJECT_ID,
+"schedule": {
+"scheduleStartDate": datetime(2015, 1, 1).date(),
+"scheduleEndDate": datetime(2030, 1, 1).date(),
+"startTimeOfDay": datetime.utcnow() + timedelta(minutes=2),
+},
+"transferSpec": {
+'awsS3DataSource': {'bucketName': GCP_TRANSFER_SOURCE_AWS_BUCKET},
+"gcsDataSink": {"bucketName": GCP_TRANSFER_FIRST_TARGET_BUCKET},
+"transferOptions": {"overwriteObjectsAlreadyExistingInSink": True},
+},
+}
+# [END howto_operator_gct_create_job_body_aws]
+
+# [START howto_operator_gct_create_job_body_gcp]
+create_body_gcs = {
+"description": GCP_DESCRIPTION,
+"status": GcpTransferJobsStatus.ENABLED,
+"projectId": GCP_PROJECT_ID,
+"schedule": {
+"scheduleStartDate": datetime(2015, 1, 1).date(),
+"scheduleEndDate": datetime(2030, 1, 1).date(),
+"startTimeOfDay": datetime.utcnow() + timedelta(minutes=2),
+},
+"transferSpec": {
+'gcsDataSource': {'bucketName': GCP_TRANSFER_FIRST_TARGET_BUCKET},
+"gcsDataSink": {"bucketName": GCP_TRANSFER_SECOND_TARGET_BUCKET},
+"transferOptions": {"overwriteObjectsAlreadyExistingInSink": True},
+},
+}
+# [END howto_operator_gct_create_job_body_gcp]
+
+# [START howto_operator_gct_update_job_body]
+update_body = {
+"projectId": GCP_PROJECT_ID,
+"transferJob": {"description": "%s_updated".format(GCP_DESCRIPTION)},
+"updateTransferJobFieldMask": 

[GitHub] mik-laj commented on a change in pull request #4792: [AIRFLOW-3659] Create Google Cloud Transfer Service Operators

2019-02-28 Thread GitBox
mik-laj commented on a change in pull request #4792:  [AIRFLOW-3659] Create 
Google Cloud Transfer Service Operators 
URL: https://github.com/apache/airflow/pull/4792#discussion_r261453108
 
 

 ##
 File path: airflow/contrib/hooks/gcp_transfer_hook.py
 ##
 @@ -52,56 +76,277 @@ def get_conn(self):
 """
 if not self._conn:
 http_authorized = self._authorize()
-self._conn = build('storagetransfer', self.api_version,
-   http=http_authorized, cache_discovery=False)
+self._conn = build(
+'storagetransfer', self.api_version, http=http_authorized, 
cache_discovery=False
+)
 return self._conn
 
-def create_transfer_job(self, description, schedule, transfer_spec, 
project_id=None):
-transfer_job = {
-'status': 'ENABLED',
-'projectId': project_id or self.project_id,
-'description': description,
-'transferSpec': transfer_spec,
-'schedule': schedule or self._schedule_once_now(),
-}
-return 
self.get_conn().transferJobs().create(body=transfer_job).execute()
-
-def wait_for_transfer_job(self, job):
+@GoogleCloudBaseHook.catch_http_exception
+def create_transfer_job(self, body):
+"""
+Creates a transfer job that runs periodically.
+
+:param body: (Required) A request body, as described in
+
https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs/patch#request-body
+:type body: dict
+:return: transfer job.
+See:
+
https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs#TransferJob
+:rtype: dict
+"""
+body = self._inject_project_id(body, 'body project_id')
+return 
self.get_conn().transferJobs().create(body=body).execute(num_retries=NUM_RETRIES)
+
+@GoogleCloudBaseHook.fallback_to_default_project_id
+@GoogleCloudBaseHook.catch_http_exception
+def get_transfer_job(self, job_name, project_id=None):
+"""
+Gets the latest state of a long-running operation in Google Storage
+Transfer Service.
+
+:param job_name: (Required) Name of the job to be fetched
+:type job_name: str
+:param project_id: (Optional) the ID of the project that owns the 
Transfer
+Job. If set to None or missing, the default project_id from the GCP
+connection is used.
+:type project_id: str
+:return: Transfer Job
+:rtype: dict
+"""
+return (
+self.get_conn()
+.transferJobs()
+.get(jobName=job_name, projectId=project_id)
+.execute(num_retries=NUM_RETRIES)
+)
+
+def list_transfer_job(self, filter):
+"""
+Lists long-running operations in Google Storage Transfer
+Service that match the specified filter.
+
+:param filter: (Required) A request filter, as described in
+
https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs/list#body.QUERY_PARAMETERS.filter
+:type filter: dict
+:return: List of Transfer Jobs
+:rtype: list[dict]
+"""
+conn = self.get_conn()
+filter = self._inject_project_id(filter, 'filter project_id')
+request = conn.transferJobs().list(filter=json.dumps(filter))
+jobs = []
+
+while request is not None:
+response = request.execute(num_retries=NUM_RETRIES)
+jobs.extend(response['transferJobs'])
+
+request = conn.transferJobs().list_next(previous_request=request, 
previous_response=response)
+
+return jobs
+
+@GoogleCloudBaseHook.catch_http_exception
+def update_transfer_job(self, job_name, body):
+"""
+Updates a transfer job that runs periodically.
+
+:param job_name: (Required) Name of the job to be updated
+:type job_name: str
+:param body: A request body, as described in
+
https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs/patch#request-body
+:type body: dict
+:return: If successful, TransferJob.
+:rtype: dict
+"""
+body = self._inject_project_id(body, 'body project_id')
+return (
+self.get_conn().transferJobs().patch(jobName=job_name, 
body=body).execute(num_retries=NUM_RETRIES)
+)
+
+@GoogleCloudBaseHook.fallback_to_default_project_id
+@GoogleCloudBaseHook.catch_http_exception
+def delete_transfer_job(self, job_name, project_id):
+"""
+Deletes a transfer job. This is a soft delete. After a transfer job is
+deleted, the job and all the transfer executions are subject to garbage
+collection. Transfer jobs become eligible for garbage collection
+30 days after soft delete.
+
+:param job_name: 
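
For reference, a minimal usage sketch of the paginated `list_transfer_job` call quoted above; the filter keys follow the quoted tests and the linked API documentation, and the connection id and project/job names are illustrative only:

```python
# Assumes GCPTransferServiceHook from the quoted gcp_transfer_hook module is importable.
hook = GCPTransferServiceHook(gcp_conn_id='google_cloud_default')

# list_transfer_job serializes the filter with json.dumps() and follows
# list_next() pagination until every page of transferJobs has been collected.
jobs = hook.list_transfer_job(filter={'project_id': 'my-project', 'job_names': ['transferJobs/12345']})
for job in jobs:
    print(job['name'], job.get('status'))
```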

[GitHub] mik-laj commented on a change in pull request #4792: [AIRFLOW-3659] Create Google Cloud Transfer Service Operators

2019-02-28 Thread GitBox
mik-laj commented on a change in pull request #4792:  [AIRFLOW-3659] Create 
Google Cloud Transfer Service Operators 
URL: https://github.com/apache/airflow/pull/4792#discussion_r261450692
 
 

 ##
 File path: airflow/contrib/operators/gcp_transfer_operator.py
 ##
 @@ -0,0 +1,753 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from copy import deepcopy
+from datetime import date, time
+
+from airflow import AirflowException
+from airflow.contrib.hooks.gcp_transfer_hook import GCPTransferServiceHook, 
GcpTransferJobsStatus
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+
+try:
+from airflow.contrib.hooks.aws_hook import AwsHook
+except ImportError:  # pragma: no cover
+AwsHook = None
+
+
+class TransferJobPreprocessor:
+def __init__(self, body, aws_conn_id='aws_default'):
+self.body = body
+self.aws_conn_id = aws_conn_id
+
+def _inject_aws_credentials(self):
+if 'transferSpec' not in self.body or 'awsS3DataSource' not in 
self.body['transferSpec']:
+return
+
+aws_hook = AwsHook(self.aws_conn_id)
+aws_credentials = aws_hook.get_credentials()
+aws_access_key_id = aws_credentials.access_key
+aws_secret_access_key = aws_credentials.secret_key
+self.body['transferSpec']['awsS3DataSource']["awsAccessKey"] = {
+"accessKeyId": aws_access_key_id,
+"secretAccessKey": aws_secret_access_key,
+}
+
+def _reformat_date(self, field_key):
+schedule = self.body['schedule']
+if field_key not in schedule:
+return
+if isinstance(schedule[field_key], date):
+schedule[field_key] = 
self._convert_date_to_dict(schedule[field_key])
+
+def _reformat_time(self, field_key):
+schedule = self.body['schedule']
+if field_key not in schedule:
+return
+if isinstance(schedule[field_key], time):
+schedule[field_key] = 
self._convert_time_to_dict(schedule[field_key])
+
+def _reformat_schedule(self):
+if 'schedule' not in self.body:
+return
+self._reformat_date('scheduleStartDate')
+self._reformat_date('scheduleEndDate')
+self._reformat_time('startTimeOfDay')
+
+def process_body(self):
+self._inject_aws_credentials()
+self._reformat_schedule()
+return self.body
+
+@staticmethod
+def _convert_date_to_dict(field_date):
+"""
+Convert native python ``datetime.date`` object  to a format supported 
by the API
+"""
+return {'day': field_date.day, 'month': field_date.month, 'year': 
field_date.year}
+
+@staticmethod
+def _convert_time_to_dict(time):
+"""
+Convert native python ``datetime.time`` object  to a format supported 
by the API
+"""
+return {"hours": time.hour, "minutes": time.minute, "seconds": 
time.second}
+
+
+class TransferJobValidator:
+def __init__(self, body):
+self.body = body
+
+def _verify_data_source(self):
+is_gcs = 'gcsDataSource' in self.body['transferSpec']
+is_aws_s3 = 'awsS3DataSource' in self.body['transferSpec']
+is_http = 'httpDataSource' in self.body['transferSpec']
+
+sources_count = sum([is_gcs, is_aws_s3, is_http])
+if sources_count != 0 and sources_count != 1:
+raise AirflowException(
+"More than one data source detected. Please choose exactly one 
data source from: "
+"gcsDataSource, awsS3DataSource and httpDataSource."
+)
+
+def _restrict_aws_credentials(self):
+if 'awsS3DataSource' not in self.body['transferSpec']:
 
 Review comment:
   I will incorporate your suggestions.




[GitHub] mik-laj commented on a change in pull request #4792: [AIRFLOW-3659] Create Google Cloud Transfer Service Operators

2019-02-28 Thread GitBox
mik-laj commented on a change in pull request #4792:  [AIRFLOW-3659] Create 
Google Cloud Transfer Service Operators 
URL: https://github.com/apache/airflow/pull/4792#discussion_r261450565
 
 

 ##
 File path: airflow/contrib/operators/gcp_transfer_operator.py
 ##
 @@ -0,0 +1,753 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from copy import deepcopy
+from datetime import date, time
+
+from airflow import AirflowException
+from airflow.contrib.hooks.gcp_transfer_hook import GCPTransferServiceHook, 
GcpTransferJobsStatus
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+
+try:
+from airflow.contrib.hooks.aws_hook import AwsHook
+except ImportError:  # pragma: no cover
+AwsHook = None
+
+
+class TransferJobPreprocessor:
+def __init__(self, body, aws_conn_id='aws_default'):
+self.body = body
+self.aws_conn_id = aws_conn_id
+
+def _inject_aws_credentials(self):
+if 'transferSpec' not in self.body or 'awsS3DataSource' not in 
self.body['transferSpec']:
+return
+
+aws_hook = AwsHook(self.aws_conn_id)
+aws_credentials = aws_hook.get_credentials()
+aws_access_key_id = aws_credentials.access_key
+aws_secret_access_key = aws_credentials.secret_key
+self.body['transferSpec']['awsS3DataSource']["awsAccessKey"] = {
+"accessKeyId": aws_access_key_id,
+"secretAccessKey": aws_secret_access_key,
+}
+
+def _reformat_date(self, field_key):
+schedule = self.body['schedule']
+if field_key not in schedule:
+return
+if isinstance(schedule[field_key], date):
+schedule[field_key] = 
self._convert_date_to_dict(schedule[field_key])
+
+def _reformat_time(self, field_key):
+schedule = self.body['schedule']
+if field_key not in schedule:
+return
+if isinstance(schedule[field_key], time):
+schedule[field_key] = 
self._convert_time_to_dict(schedule[field_key])
+
+def _reformat_schedule(self):
+if 'schedule' not in self.body:
+return
+self._reformat_date('scheduleStartDate')
+self._reformat_date('scheduleEndDate')
+self._reformat_time('startTimeOfDay')
+
+def process_body(self):
+self._inject_aws_credentials()
+self._reformat_schedule()
+return self.body
+
+@staticmethod
+def _convert_date_to_dict(field_date):
+"""
+Convert native python ``datetime.date`` object  to a format supported 
by the API
+"""
+return {'day': field_date.day, 'month': field_date.month, 'year': 
field_date.year}
+
+@staticmethod
+def _convert_time_to_dict(time):
+"""
+Convert native python ``datetime.time`` object  to a format supported 
by the API
+"""
+return {"hours": time.hour, "minutes": time.minute, "seconds": 
time.second}
+
+
+class TransferJobValidator:
+def __init__(self, body):
+self.body = body
+
+def _verify_data_source(self):
+is_gcs = 'gcsDataSource' in self.body['transferSpec']
+is_aws_s3 = 'awsS3DataSource' in self.body['transferSpec']
+is_http = 'httpDataSource' in self.body['transferSpec']
+
+sources_count = sum([is_gcs, is_aws_s3, is_http])
+if sources_count != 0 and sources_count != 1:
+raise AirflowException(
+"More than one data source detected. Please choose exactly one 
data source from: "
+"gcsDataSource, awsS3DataSource and httpDataSource."
+)
+
+def _restrict_aws_credentials(self):
+if 'awsS3DataSource' not in self.body['transferSpec']:
+return
+
+if 'awsAccessKey' in self.body['transferSpec']['awsS3DataSource']:
+raise AirflowException(
+"AWS credentials detected inside the body parameter 
(awsAccessKey). This is not allowed, "
+"please use Airflow connections to store credentials."
+)
+
+def _restrict_empty_body(self):
+if not self.body:
+raise AirflowException("The required parameter 'body' is empty or 

[GitHub] mik-laj commented on a change in pull request #4792: [AIRFLOW-3659] Create Google Cloud Transfer Service Operators

2019-02-28 Thread GitBox
mik-laj commented on a change in pull request #4792:  [AIRFLOW-3659] Create 
Google Cloud Transfer Service Operators 
URL: https://github.com/apache/airflow/pull/4792#discussion_r261449384
 
 

 ##
 File path: airflow/contrib/operators/gcp_transfer_operator.py
 ##
 @@ -0,0 +1,753 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from copy import deepcopy
+from datetime import date, time
+
+from airflow import AirflowException
+from airflow.contrib.hooks.gcp_transfer_hook import GCPTransferServiceHook, 
GcpTransferJobsStatus
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+
+try:
+from airflow.contrib.hooks.aws_hook import AwsHook
+except ImportError:  # pragma: no cover
+AwsHook = None
+
+
+class TransferJobPreprocessor:
+def __init__(self, body, aws_conn_id='aws_default'):
+self.body = body
+self.aws_conn_id = aws_conn_id
+
+def _inject_aws_credentials(self):
+if 'transferSpec' not in self.body or 'awsS3DataSource' not in 
self.body['transferSpec']:
+return
+
+aws_hook = AwsHook(self.aws_conn_id)
+aws_credentials = aws_hook.get_credentials()
+aws_access_key_id = aws_credentials.access_key
+aws_secret_access_key = aws_credentials.secret_key
+self.body['transferSpec']['awsS3DataSource']["awsAccessKey"] = {
+"accessKeyId": aws_access_key_id,
+"secretAccessKey": aws_secret_access_key,
+}
+
+def _reformat_date(self, field_key):
+schedule = self.body['schedule']
+if field_key not in schedule:
+return
+if isinstance(schedule[field_key], date):
+schedule[field_key] = 
self._convert_date_to_dict(schedule[field_key])
+
+def _reformat_time(self, field_key):
+schedule = self.body['schedule']
+if field_key not in schedule:
+return
+if isinstance(schedule[field_key], time):
+schedule[field_key] = 
self._convert_time_to_dict(schedule[field_key])
+
+def _reformat_schedule(self):
+if 'schedule' not in self.body:
+return
+self._reformat_date('scheduleStartDate')
+self._reformat_date('scheduleEndDate')
+self._reformat_time('startTimeOfDay')
+
+def process_body(self):
+self._inject_aws_credentials()
+self._reformat_schedule()
+return self.body
+
+@staticmethod
+def _convert_date_to_dict(field_date):
+"""
+Convert native python ``datetime.date`` object  to a format supported 
by the API
+"""
+return {'day': field_date.day, 'month': field_date.month, 'year': 
field_date.year}
+
+@staticmethod
+def _convert_time_to_dict(time):
+"""
+Convert native python ``datetime.time`` object  to a format supported 
by the API
+"""
+return {"hours": time.hour, "minutes": time.minute, "seconds": 
time.second}
+
+
+class TransferJobValidator:
+def __init__(self, body):
+self.body = body
+
+def _verify_data_source(self):
+is_gcs = 'gcsDataSource' in self.body['transferSpec']
+is_aws_s3 = 'awsS3DataSource' in self.body['transferSpec']
+is_http = 'httpDataSource' in self.body['transferSpec']
+
+sources_count = sum([is_gcs, is_aws_s3, is_http])
+if sources_count != 0 and sources_count != 1:
+raise AirflowException(
+"More than one data source detected. Please choose exactly one 
data source from: "
+"gcsDataSource, awsS3DataSource and httpDataSource."
+)
+
+def _restrict_aws_credentials(self):
+if 'awsS3DataSource' not in self.body['transferSpec']:
+return
+
+if 'awsAccessKey' in self.body['transferSpec']['awsS3DataSource']:
+raise AirflowException(
+"AWS credentials detected inside the body parameter 
(awsAccessKey). This is not allowed, "
+"please use Airflow connections to store credentials."
+)
+
+def _restrict_empty_body(self):
+if not self.body:
+raise AirflowException("The required parameter 'body' is empty or 

[GitHub] mik-laj commented on a change in pull request #4792: [AIRFLOW-3659] Create Google Cloud Transfer Service Operators

2019-02-28 Thread GitBox
mik-laj commented on a change in pull request #4792:  [AIRFLOW-3659] Create 
Google Cloud Transfer Service Operators 
URL: https://github.com/apache/airflow/pull/4792#discussion_r261449053
 
 

 ##
 File path: airflow/contrib/operators/gcs_to_gcs_transfer_operator.py
 ##
 @@ -16,112 +16,13 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import warnings
 
-from airflow.models import BaseOperator
-from airflow.contrib.hooks.gcp_transfer_hook import GCPTransferServiceHook
-from airflow.utils.decorators import apply_defaults
+from airflow.contrib.operators.gcp_transfer_operator import (  # noqa
 
 Review comment:
   This import is not used in this file. It is only for backward compatibility.




[GitHub] mik-laj commented on a change in pull request #4792: [AIRFLOW-3659] Create Google Cloud Transfer Service Operators

2019-02-28 Thread GitBox
mik-laj commented on a change in pull request #4792:  [AIRFLOW-3659] Create 
Google Cloud Transfer Service Operators 
URL: https://github.com/apache/airflow/pull/4792#discussion_r261448744
 
 

 ##
 File path: docs/howto/operator.rst
 ##
 @@ -2327,3 +2327,403 @@ More information
 
 See `Google Cloud Vision Product delete documentation
 
`_.
+
+Google Cloud Transfer Service Operators
 
 Review comment:
   Recently we discussed this topic in the team. This link is also missing in other places, and we wanted to introduce it in a separate change. For now, we prefer to keep the style consistent with the other operators.




[GitHub] mik-laj commented on a change in pull request #4792: [AIRFLOW-3659] Create Google Cloud Transfer Service Operators

2019-02-28 Thread GitBox
mik-laj commented on a change in pull request #4792:  [AIRFLOW-3659] Create 
Google Cloud Transfer Service Operators 
URL: https://github.com/apache/airflow/pull/4792#discussion_r261448256
 
 

 ##
 File path: tests/contrib/hooks/test_gcp_transfer_hook.py
 ##
 @@ -18,125 +18,487 @@
 # under the License.
 #
 import json
-import datetime
 import unittest
+from copy import deepcopy
 
-from airflow.exceptions import AirflowException
-from airflow.contrib.hooks.gcp_transfer_hook import GCPTransferServiceHook
-from airflow.contrib.hooks.gcp_transfer_hook import TIME_TO_SLEEP_IN_SECONDS
+from airflow import AirflowException
+from airflow.contrib.hooks.gcp_transfer_hook import (
+GCPTransferServiceHook,
+TIME_TO_SLEEP_IN_SECONDS,
+GcpTransferOperationStatus,
+)
+from tests.contrib.utils.base_gcp_mock import (
+mock_base_gcp_hook_no_default_project_id,
+mock_base_gcp_hook_default_project_id,
+)
 
 try:
 from unittest import mock
-except ImportError:
+except ImportError:  # pragma: no cover
 try:
 import mock
 except ImportError:
 mock = None
 
+PROJECT_ID = 'project-id'
+BODY = {'description': 'AAA', 'project_id': PROJECT_ID}
 
-class TestGCPTransferServiceHook(unittest.TestCase):
+TRANSFER_JOB_NAME = "transfer-job"
+TRANSFER_OPERATION_NAME = "transfer-operation"
+
+TRANSFER_JOB = {"name": TRANSFER_JOB_NAME}
+TRANSFER_OPERATION = {"name": TRANSFER_OPERATION_NAME}
+
+TRANSFER_JOB_FILTER = {'project_id': 'project-id', 'job_names': 
[TRANSFER_JOB_NAME]}
+TRANSFER_OPERATION_FILTER = {'project_id': 'project-id', 'job_names': 
[TRANSFER_JOB_NAME]}
+UPDATE_TRANSFER_JOB_BODY = {
+"transferJob": {'description': 'description-1'},
+'project_id': PROJECT_ID,
+"update_transfer_job_field_mask": 'description',
+}
+
+
+class TestGCPTransferServiceHookWithPassedProjectId(unittest.TestCase):
 def setUp(self):
-with mock.patch.object(GCPTransferServiceHook, '__init__', 
return_value=None):
-self.conn = mock.Mock()
-self.transfer_hook = GCPTransferServiceHook()
-self.transfer_hook._conn = self.conn
-
-def test_create_transfer_job(self):
-mock_create = self.conn.transferJobs.return_value.create
-mock_execute = mock_create.return_value.execute
-mock_execute.return_value = {
-'projectId': 'test-project',
-'name': 'transferJobs/test-job',
-}
-now = datetime.datetime.utcnow()
-transfer_spec = {
-'awsS3DataSource': {'bucketName': 'test-s3-bucket'},
-'gcsDataSink': {'bucketName': 'test-gcs-bucket'}
-}
-self.transfer_hook.create_transfer_job(
-project_id='test-project',
-description='test-description',
-schedule=None,
-transfer_spec=transfer_spec)
-mock_create.assert_called_once_with(body={
-'status': 'ENABLED',
-'projectId': 'test-project',
-'description': 'test-description',
-'transferSpec': transfer_spec,
-'schedule': {
-'scheduleStartDate': {
-'day': now.day,
-'month': now.month,
-'year': now.year,
-},
-'scheduleEndDate': {
-'day': now.day,
-'month': now.month,
-'year': now.year,
-}
-}
-})
-
-def test_create_transfer_job_custom_schedule(self):
-mock_create = self.conn.transferJobs.return_value.create
-mock_execute = mock_create.return_value.execute
-mock_execute.return_value = {
-'projectId': 'test-project',
-'name': 'transferJobs/test-job',
-}
-schedule = {
-'scheduleStartDate': {'month': 10, 'day': 1, 'year': 2018},
-'scheduleEndDate': {'month': 10, 'day': 31, 'year': 2018},
-}
-transfer_spec = {
-'awsS3DataSource': {'bucketName': 'test-s3-bucket'},
-'gcsDataSink': {'bucketName': 'test-gcs-bucket'}
-}
-self.transfer_hook.create_transfer_job(
-project_id='test-project',
-description='test-description',
-schedule=schedule,
-transfer_spec=transfer_spec)
-mock_create.assert_called_once_with(body={
-'status': 'ENABLED',
-'projectId': 'test-project',
-'description': 'test-description',
-'transferSpec': transfer_spec,
-'schedule': schedule,
-})
+with mock.patch(
+'airflow.contrib.hooks.' 
'gcp_api_base_hook.GoogleCloudBaseHook.__init__',
+new=mock_base_gcp_hook_no_default_project_id,
+):
+self.gct_hook = GCPTransferServiceHook(gcp_conn_id='test')
+
+@mock.patch('airflow.contrib.hooks.gcp_transfer_hook.' 
'GCPTransferServiceHook.get_conn')
+def 

[GitHub] mik-laj commented on a change in pull request #4792: [AIRFLOW-3659] Create Google Cloud Transfer Service Operators

2019-02-28 Thread GitBox
mik-laj commented on a change in pull request #4792:  [AIRFLOW-3659] Create 
Google Cloud Transfer Service Operators 
URL: https://github.com/apache/airflow/pull/4792#discussion_r261447715
 
 

 ##
 File path: tests/contrib/operators/test_gcp_transfer_operator.py
 ##
 @@ -0,0 +1,702 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import itertools
+import unittest
+from copy import deepcopy
+from datetime import date, time
+
+from parameterized import parameterized
+from botocore.credentials import Credentials
+
+from airflow import AirflowException, configuration
+from airflow.contrib.operators.gcp_transfer_operator import (
+GcpTransferServiceOperationCancelOperator,
+GcpTransferServiceOperationResumeOperator,
+GcpTransferServiceOperationsListOperator,
+TransferJobValidator,
+TransferJobPreprocessor,
+GcpTransferServiceJobCreateOperator,
+GcpTransferServiceJobUpdateOperator,
+GcpTransferServiceOperationGetOperator,
+GcpTransferServiceOperationPauseOperator,
+S3ToGoogleCloudStorageTransferOperator,
+GoogleCloudStorageToGoogleCloudStorageTransferOperator,
+GcpTransferServiceJobDeleteOperator,
+)
+from airflow.models import TaskInstance, DAG
+from airflow.utils import timezone
+
+
+try:
+# noinspection PyProtectedMember
+from unittest import mock
+except ImportError:  # pragma: no cover
+try:
+import mock
+except ImportError:
+mock = None
+try:
+import boto3
+except ImportError:  # pragma: no cover
+boto3 = None
+
+GCP_PROJECT_ID = 'project-id'
+TASK_ID = 'task-id'
+
+JOB_NAME = "job-name"
+OPERATION_NAME = "operation-name"
+AWS_BUCKET_NAME = "aws-bucket-name"
+GCS_BUCKET_NAME = "gcp-bucket-name"
+DESCRIPTION = "description"
+
+DEFAULT_DATE = timezone.datetime(2017, 1, 1)
+
+FILTER = {'jobNames': [JOB_NAME]}
+
+AWS_ACCESS_KEY_ID = "test-key-1"
+AWS_ACCESS_SECRET = "test-secret-1"
+AWS_ACCESS_KEY = {'accessKeyId': AWS_ACCESS_KEY_ID, 'secretAccessKey': 
AWS_ACCESS_SECRET}
+
+NATIVE_DATE = date(2018, 10, 15)
+DICT_DATE = {'day': 15, 'month': 10, 'year': 2018}
+NATIVE_TIME = time(hour=11, minute=42, second=43)
+DICT_TIME = {'hours': 11, 'minutes': 42, 'seconds': 43}
+SCHEDULE_NATIVE = {
+'scheduleStartDate': NATIVE_DATE,
+'scheduleEndDate': NATIVE_DATE,
+'startTimeOfDay': NATIVE_TIME,
+}
+
+SCHEDULE_DICT = {
+'scheduleStartDate': {'day': 15, 'month': 10, 'year': 2018},
+'scheduleEndDate': {'day': 15, 'month': 10, 'year': 2018},
+'startTimeOfDay': {'hours': 11, 'minutes': 42, 'seconds': 43},
+}
+
+SOURCE_AWS = {"awsS3DataSource": {"bucketName": AWS_BUCKET_NAME}}
+SOURCE_GCS = {"gcsDataSource": {"bucketName": GCS_BUCKET_NAME}}
+SOURCE_HTTP = {"httpDataSource": {"list_url": "http://example.com"}}
+
+VALID_TRANSFER_JOB_BASE = {
+"name": JOB_NAME,
+'description': DESCRIPTION,
+'status': 'ENABLED',
+'schedule': SCHEDULE_DICT,
+'transferSpec': {'gcsDataSink': {'bucketName': GCS_BUCKET_NAME}},
+}
+VALID_TRANSFER_JOB_GCS = deepcopy(VALID_TRANSFER_JOB_BASE)
+VALID_TRANSFER_JOB_GCS['transferSpec'].update(deepcopy(SOURCE_GCS))
+VALID_TRANSFER_JOB_AWS = deepcopy(VALID_TRANSFER_JOB_BASE)
+VALID_TRANSFER_JOB_AWS['transferSpec'].update(deepcopy(SOURCE_AWS))
+
+VALID_TRANSFER_JOB_GCS = {
+"name": JOB_NAME,
+'description': DESCRIPTION,
+'status': 'ENABLED',
+'schedule': SCHEDULE_NATIVE,
+'transferSpec': {
+"gcsDataSource": {"bucketName": GCS_BUCKET_NAME},
+'gcsDataSink': {'bucketName': GCS_BUCKET_NAME},
+},
+}
+
+VALID_TRANSFER_JOB_RAW = {
+'description': DESCRIPTION,
+'status': 'ENABLED',
+'schedule': SCHEDULE_DICT,
+'transferSpec': {'gcsDataSink': {'bucketName': GCS_BUCKET_NAME}},
+}
+
+VALID_TRANSFER_JOB_GCS_RAW = deepcopy(VALID_TRANSFER_JOB_RAW)
+VALID_TRANSFER_JOB_GCS_RAW['transferSpec'].update(SOURCE_GCS)
+VALID_TRANSFER_JOB_AWS_RAW = deepcopy(VALID_TRANSFER_JOB_RAW)
+VALID_TRANSFER_JOB_AWS_RAW['transferSpec'].update(deepcopy(SOURCE_AWS))
+VALID_TRANSFER_JOB_AWS_RAW['transferSpec']['awsS3DataSource']['awsAccessKey'] 
= AWS_ACCESS_KEY
+
+
+VALID_OPERATION = {"name": "operation-name"}
+
+
+class TransferJobPreprocessorTest(unittest.TestCase):
+def 
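
As a side note, a minimal sketch of how `parameterized`, imported in the quoted test module, is typically used to run one test body against both the date and time conversion cases; the test class and method names are illustrative, and the expected values reuse the constants defined above:

```python
import unittest
from datetime import date, time

from parameterized import parameterized

# Assumes TransferJobPreprocessor from the quoted gcp_transfer_operator module is importable.
from airflow.contrib.operators.gcp_transfer_operator import TransferJobPreprocessor


class TransferJobPreprocessorConversionTest(unittest.TestCase):
    # Each tuple becomes one generated test case; the first element names it.
    @parameterized.expand([
        ('date', date(2018, 10, 15), {'day': 15, 'month': 10, 'year': 2018}),
        ('time', time(11, 42, 43), {'hours': 11, 'minutes': 42, 'seconds': 43}),
    ])
    def test_convert(self, name, native_value, expected_dict):
        if isinstance(native_value, time):
            converted = TransferJobPreprocessor._convert_time_to_dict(native_value)
        else:
            converted = TransferJobPreprocessor._convert_date_to_dict(native_value)
        self.assertEqual(expected_dict, converted)
```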

[GitHub] mik-laj commented on a change in pull request #4792: [AIRFLOW-3659] Create Google Cloud Transfer Service Operators

2019-02-28 Thread GitBox
mik-laj commented on a change in pull request #4792:  [AIRFLOW-3659] Create 
Google Cloud Transfer Service Operators 
URL: https://github.com/apache/airflow/pull/4792#discussion_r261447618
 
 

 ##
 File path: tests/contrib/operators/test_gcp_transfer_operator.py
 ##
 @@ -0,0 +1,702 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import itertools
+import unittest
+from copy import deepcopy
+from datetime import date, time
+
+from parameterized import parameterized
+from botocore.credentials import Credentials
+
+from airflow import AirflowException, configuration
+from airflow.contrib.operators.gcp_transfer_operator import (
+GcpTransferServiceOperationCancelOperator,
+GcpTransferServiceOperationResumeOperator,
+GcpTransferServiceOperationsListOperator,
+TransferJobValidator,
+TransferJobPreprocessor,
+GcpTransferServiceJobCreateOperator,
+GcpTransferServiceJobUpdateOperator,
+GcpTransferServiceOperationGetOperator,
+GcpTransferServiceOperationPauseOperator,
+S3ToGoogleCloudStorageTransferOperator,
+GoogleCloudStorageToGoogleCloudStorageTransferOperator,
+GcpTransferServiceJobDeleteOperator,
+)
+from airflow.models import TaskInstance, DAG
+from airflow.utils import timezone
+
+
+try:
+# noinspection PyProtectedMember
+from unittest import mock
+except ImportError:  # pragma: no cover
+try:
+import mock
+except ImportError:
+mock = None
+try:
+import boto3
+except ImportError:  # pragma: no cover
+boto3 = None
+
+GCP_PROJECT_ID = 'project-id'
+TASK_ID = 'task-id'
+
+JOB_NAME = "job-name"
+OPERATION_NAME = "operation-name"
+AWS_BUCKET_NAME = "aws-bucket-name"
+GCS_BUCKET_NAME = "gcp-bucket-name"
+DESCRIPTION = "description"
+
+DEFAULT_DATE = timezone.datetime(2017, 1, 1)
+
+FILTER = {'jobNames': [JOB_NAME]}
+
+AWS_ACCESS_KEY_ID = "test-key-1"
+AWS_ACCESS_SECRET = "test-secret-1"
+AWS_ACCESS_KEY = {'accessKeyId': AWS_ACCESS_KEY_ID, 'secretAccessKey': 
AWS_ACCESS_SECRET}
+
+NATIVE_DATE = date(2018, 10, 15)
+DICT_DATE = {'day': 15, 'month': 10, 'year': 2018}
+NATIVE_TIME = time(hour=11, minute=42, second=43)
+DICT_TIME = {'hours': 11, 'minutes': 42, 'seconds': 43}
+SCHEDULE_NATIVE = {
+'scheduleStartDate': NATIVE_DATE,
+'scheduleEndDate': NATIVE_DATE,
+'startTimeOfDay': NATIVE_TIME,
+}
+
+SCHEDULE_DICT = {
+'scheduleStartDate': {'day': 15, 'month': 10, 'year': 2018},
+'scheduleEndDate': {'day': 15, 'month': 10, 'year': 2018},
+'startTimeOfDay': {'hours': 11, 'minutes': 42, 'seconds': 43},
+}
+
+SOURCE_AWS = {"awsS3DataSource": {"bucketName": AWS_BUCKET_NAME}}
+SOURCE_GCS = {"gcsDataSource": {"bucketName": GCS_BUCKET_NAME}}
+SOURCE_HTTP = {"httpDataSource": {"list_url": "http://example.com"}}
+
+VALID_TRANSFER_JOB_BASE = {
+"name": JOB_NAME,
+'description': DESCRIPTION,
+'status': 'ENABLED',
+'schedule': SCHEDULE_DICT,
+'transferSpec': {'gcsDataSink': {'bucketName': GCS_BUCKET_NAME}},
+}
+VALID_TRANSFER_JOB_GCS = deepcopy(VALID_TRANSFER_JOB_BASE)
+VALID_TRANSFER_JOB_GCS['transferSpec'].update(deepcopy(SOURCE_GCS))
+VALID_TRANSFER_JOB_AWS = deepcopy(VALID_TRANSFER_JOB_BASE)
+VALID_TRANSFER_JOB_AWS['transferSpec'].update(deepcopy(SOURCE_AWS))
+
+VALID_TRANSFER_JOB_GCS = {
+"name": JOB_NAME,
+'description': DESCRIPTION,
+'status': 'ENABLED',
+'schedule': SCHEDULE_NATIVE,
+'transferSpec': {
+"gcsDataSource": {"bucketName": GCS_BUCKET_NAME},
+'gcsDataSink': {'bucketName': GCS_BUCKET_NAME},
+},
+}
+
+VALID_TRANSFER_JOB_RAW = {
+'description': DESCRIPTION,
+'status': 'ENABLED',
+'schedule': SCHEDULE_DICT,
+'transferSpec': {'gcsDataSink': {'bucketName': GCS_BUCKET_NAME}},
+}
+
+VALID_TRANSFER_JOB_GCS_RAW = deepcopy(VALID_TRANSFER_JOB_RAW)
+VALID_TRANSFER_JOB_GCS_RAW['transferSpec'].update(SOURCE_GCS)
+VALID_TRANSFER_JOB_AWS_RAW = deepcopy(VALID_TRANSFER_JOB_RAW)
+VALID_TRANSFER_JOB_AWS_RAW['transferSpec'].update(deepcopy(SOURCE_AWS))
+VALID_TRANSFER_JOB_AWS_RAW['transferSpec']['awsS3DataSource']['awsAccessKey'] 
= AWS_ACCESS_KEY
+
+
+VALID_OPERATION = {"name": "operation-name"}
+
+
+class TransferJobPreprocessorTest(unittest.TestCase):
+def 

[GitHub] mik-laj commented on a change in pull request #4792: [AIRFLOW-3659] Create Google Cloud Transfer Service Operators

2019-02-28 Thread GitBox
mik-laj commented on a change in pull request #4792:  [AIRFLOW-3659] Create 
Google Cloud Transfer Service Operators 
URL: https://github.com/apache/airflow/pull/4792#discussion_r261446532
 
 

 ##
 File path: tests/contrib/operators/test_gcp_transfer_operator.py
 ##
 @@ -0,0 +1,702 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import itertools
+import unittest
+from copy import deepcopy
+from datetime import date, time
+
+from parameterized import parameterized
+from botocore.credentials import Credentials
+
+from airflow import AirflowException, configuration
+from airflow.contrib.operators.gcp_transfer_operator import (
+GcpTransferServiceOperationCancelOperator,
+GcpTransferServiceOperationResumeOperator,
+GcpTransferServiceOperationsListOperator,
+TransferJobValidator,
+TransferJobPreprocessor,
+GcpTransferServiceJobCreateOperator,
+GcpTransferServiceJobUpdateOperator,
+GcpTransferServiceOperationGetOperator,
+GcpTransferServiceOperationPauseOperator,
+S3ToGoogleCloudStorageTransferOperator,
+GoogleCloudStorageToGoogleCloudStorageTransferOperator,
+GcpTransferServiceJobDeleteOperator,
+)
+from airflow.models import TaskInstance, DAG
+from airflow.utils import timezone
+
+
+try:
+# noinspection PyProtectedMember
+from unittest import mock
+except ImportError:  # pragma: no cover
+try:
+import mock
+except ImportError:
 
 Review comment:
   This is a piece of code copied from other places in the project. For 
consistency, we have not introduced new rules. Airflow 2.0 is planned for 
release and will drop support for Python 2; after that release, we will 
update this code.
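
   For reference, a minimal sketch of what this shim could collapse to once 
Python 2 support is dropped (an illustration only, not part of this PR):

       # Python 3 only: the third-party `mock` backport is no longer needed,
       # so the try/except fallback becomes a plain standard-library import.
       from unittest import mock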


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sdevani commented on a change in pull request #4792: [AIRFLOW-3659] Create Google Cloud Transfer Service Operators

2019-02-28 Thread GitBox
sdevani commented on a change in pull request #4792:  [AIRFLOW-3659] Create 
Google Cloud Transfer Service Operators 
URL: https://github.com/apache/airflow/pull/4792#discussion_r260911817
 
 

 ##
 File path: airflow/contrib/example_dags/example_gcp_transfer.py
 ##
 @@ -0,0 +1,254 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG that demonstrates interactions with Google Cloud Transfer.
+
+This DAG relies on the following OS environment variables
+
+* GCP_PROJECT_ID - Google Cloud Project to use for the Google Cloud Transfer 
Service.
+* GCP_DESCRIPTION - Description of transfer job
+* GCP_TRANSFER_SOURCE_AWS_BUCKET - Amazon Web Services Storage bucket from 
which files are copied.
+* GCP_TRANSFER_FIRST_TARGET_BUCKET - Google Cloud Storage bucket to which 
files are copied from AWS.
+  It is also the source bucket in the next step.
+* GCP_TRANSFER_SECOND_TARGET_BUCKET - Google Cloud Storage bucket to which 
files are copied
+* WAIT_FOR_OPERATION_POKE_INTERVAL - interval at which to check the status of 
the operation
 
 Review comment:
   Is there a recommendation for a value here? What unit is the interval 
specified in?
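
   (An illustrative aside, not taken from the PR: Airflow sensors expose a 
standard poke_interval argument, which BaseSensorOperator interprets in 
seconds, so the variable is presumably consumed along these lines; argument 
names other than poke_interval below are placeholders.)

       # Rough sketch, assuming the env var feeds the sensor's standard
       # poke_interval (seconds). Other kwargs are illustrative placeholders.
       wait_for_operation = GCPTransferServiceWaitForJobStatusSensor(
           task_id='wait_for_transfer_job_status',
           job_name='example-job-name',
           expected_statuses={GcpTransferOperationStatus.SUCCESS},
           poke_interval=int(WAIT_FOR_OPERATION_POKE_INTERVAL),  # seconds between checks
       )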


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sdevani commented on a change in pull request #4792: [AIRFLOW-3659] Create Google Cloud Transfer Service Operators

2019-02-28 Thread GitBox
sdevani commented on a change in pull request #4792:  [AIRFLOW-3659] Create 
Google Cloud Transfer Service Operators 
URL: https://github.com/apache/airflow/pull/4792#discussion_r260966092
 
 

 ##
 File path: airflow/contrib/example_dags/example_gcp_transfer.py
 ##
 @@ -0,0 +1,254 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG that demonstrates interactions with Google Cloud Transfer.
+
+This DAG relies on the following OS environment variables
+
+* GCP_PROJECT_ID - Google Cloud Project to use for the Google Cloud Transfer 
Service.
+* GCP_DESCRIPTION - Description of transfer job
+* GCP_TRANSFER_SOURCE_AWS_BUCKET - Amazon Web Services Storage bucket from 
which files are copied.
+* GCP_TRANSFER_FIRST_TARGET_BUCKET - Google Cloud Storage bucket to which 
files are copied from AWS.
+  It is also the source bucket in the next step.
+* GCP_TRANSFER_SECOND_TARGET_BUCKET - Google Cloud Storage bucket to which 
files are copied
+* WAIT_FOR_OPERATION_POKE_INTERVAL - interval at which to check the status of 
the operation
+
+"""
+import os
+from datetime import datetime, timedelta
+
+from airflow import models
+from airflow.contrib.hooks.gcp_transfer_hook import 
GcpTransferOperationStatus, GcpTransferJobsStatus
+from airflow.contrib.operators.gcp_transfer_operator import (
+GcpTransferServiceJobCreateOperator,
+GcpTransferServiceJobDeleteOperator,
+GcpTransferServiceJobUpdateOperator,
+GcpTransferServiceOperationsListOperator,
+GcpTransferServiceOperationGetOperator,
+GcpTransferServiceOperationPauseOperator,
+GcpTransferServiceOperationResumeOperator,
+GcpTransferServiceOperationCancelOperator,
+)
+from airflow.contrib.sensors.gcp_transfer_sensor import 
GCPTransferServiceWaitForJobStatusSensor
+from airflow.utils.dates import days_ago
+
+# [START howto_operator_gct_common_variables]
+GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
+GCP_DESCRIPTION = os.environ.get('GCP_DESCRIPTION', 'description')
+GCP_TRANSFER_TARGET_BUCKET = os.environ.get('GCP_TRANSFER_TARGET_BUCKET')
+WAIT_FOR_OPERATION_POKE_INTERVAL = 
os.environ.get('WAIT_FOR_OPERATION_POKE_INTERVAL', 5)
+
+GCP_TRANSFER_SOURCE_AWS_BUCKET = 
os.environ.get('GCP_TRANSFER_SOURCE_AWS_BUCKET')
+GCP_TRANSFER_FIRST_TARGET_BUCKET = os.environ.get(
+'GCP_TRANSFER_FIRST_TARGET_BUCKET', 'gcp-transfer-first-target'
+)
+GCP_TRANSFER_SECOND_TARGET_BUCKET = os.environ.get(
+'GCP_TRANSFER_SECOND_TARGET_BUCKET', 'gcp-transfer-second-target'
+)
+# [END howto_operator_gct_common_variables]
+
+# [START howto_operator_gct_create_job_body_aws]
+create_body_aws = {
+"description": GCP_DESCRIPTION,
+"status": GcpTransferJobsStatus.ENABLED,
+"projectId": GCP_PROJECT_ID,
+"schedule": {
+"scheduleStartDate": datetime(2015, 1, 1).date(),
+"scheduleEndDate": datetime(2030, 1, 1).date(),
+"startTimeOfDay": datetime.utcnow() + timedelta(minutes=2),
+},
+"transferSpec": {
+'awsS3DataSource': {'bucketName': GCP_TRANSFER_SOURCE_AWS_BUCKET},
+"gcsDataSink": {"bucketName": GCP_TRANSFER_FIRST_TARGET_BUCKET},
+"transferOptions": {"overwriteObjectsAlreadyExistingInSink": True},
+},
+}
+# [END howto_operator_gct_create_job_body_aws]
+
+# [START howto_operator_gct_create_job_body_gcp]
+create_body_gcs = {
+"description": GCP_DESCRIPTION,
+"status": GcpTransferJobsStatus.ENABLED,
+"projectId": GCP_PROJECT_ID,
+"schedule": {
+"scheduleStartDate": datetime(2015, 1, 1).date(),
+"scheduleEndDate": datetime(2030, 1, 1).date(),
+"startTimeOfDay": datetime.utcnow() + timedelta(minutes=2),
+},
+"transferSpec": {
+'gcsDataSource': {'bucketName': GCP_TRANSFER_FIRST_TARGET_BUCKET},
+"gcsDataSink": {"bucketName": GCP_TRANSFER_SECOND_TARGET_BUCKET},
+"transferOptions": {"overwriteObjectsAlreadyExistingInSink": True},
+},
+}
+# [END howto_operator_gct_create_job_body_gcp]
+
+# [START howto_operator_gct_update_job_body]
+update_body = {
+"projectId": GCP_PROJECT_ID,
+"transferJob": {"description": "%s_updated".format(GCP_DESCRIPTION)},
+"updateTransferJobFieldMask": 

[GitHub] sdevani commented on a change in pull request #4792: [AIRFLOW-3659] Create Google Cloud Transfer Service Operators

2019-02-28 Thread GitBox
sdevani commented on a change in pull request #4792:  [AIRFLOW-3659] Create 
Google Cloud Transfer Service Operators 
URL: https://github.com/apache/airflow/pull/4792#discussion_r261428233
 
 

 ##
 File path: tests/contrib/operators/test_gcp_transfer_operator.py
 ##
 @@ -0,0 +1,702 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import itertools
+import unittest
+from copy import deepcopy
+from datetime import date, time
+
+from parameterized import parameterized
+from botocore.credentials import Credentials
+
+from airflow import AirflowException, configuration
+from airflow.contrib.operators.gcp_transfer_operator import (
+GcpTransferServiceOperationCancelOperator,
+GcpTransferServiceOperationResumeOperator,
+GcpTransferServiceOperationsListOperator,
+TransferJobValidator,
+TransferJobPreprocessor,
+GcpTransferServiceJobCreateOperator,
+GcpTransferServiceJobUpdateOperator,
+GcpTransferServiceOperationGetOperator,
+GcpTransferServiceOperationPauseOperator,
+S3ToGoogleCloudStorageTransferOperator,
+GoogleCloudStorageToGoogleCloudStorageTransferOperator,
+GcpTransferServiceJobDeleteOperator,
+)
+from airflow.models import TaskInstance, DAG
+from airflow.utils import timezone
+
+
+try:
+# noinspection PyProtectedMember
+from unittest import mock
+except ImportError:  # pragma: no cover
+try:
+import mock
+except ImportError:
+mock = None
+try:
+import boto3
+except ImportError:  # pragma: no cover
+boto3 = None
+
+GCP_PROJECT_ID = 'project-id'
+TASK_ID = 'task-id'
+
+JOB_NAME = "job-name"
+OPERATION_NAME = "operation-name"
+AWS_BUCKET_NAME = "aws-bucket-name"
+GCS_BUCKET_NAME = "gcp-bucket-name"
+DESCRIPTION = "description"
+
+DEFAULT_DATE = timezone.datetime(2017, 1, 1)
+
+FILTER = {'jobNames': [JOB_NAME]}
+
+AWS_ACCESS_KEY_ID = "test-key-1"
+AWS_ACCESS_SECRET = "test-secret-1"
+AWS_ACCESS_KEY = {'accessKeyId': AWS_ACCESS_KEY_ID, 'secretAccessKey': 
AWS_ACCESS_SECRET}
+
+NATIVE_DATE = date(2018, 10, 15)
+DICT_DATE = {'day': 15, 'month': 10, 'year': 2018}
+NATIVE_TIME = time(hour=11, minute=42, second=43)
+DICT_TIME = {'hours': 11, 'minutes': 42, 'seconds': 43}
+SCHEDULE_NATIVE = {
+'scheduleStartDate': NATIVE_DATE,
+'scheduleEndDate': NATIVE_DATE,
+'startTimeOfDay': NATIVE_TIME,
+}
+
+SCHEDULE_DICT = {
+'scheduleStartDate': {'day': 15, 'month': 10, 'year': 2018},
+'scheduleEndDate': {'day': 15, 'month': 10, 'year': 2018},
+'startTimeOfDay': {'hours': 11, 'minutes': 42, 'seconds': 43},
+}
+
+SOURCE_AWS = {"awsS3DataSource": {"bucketName": AWS_BUCKET_NAME}}
+SOURCE_GCS = {"gcsDataSource": {"bucketName": GCS_BUCKET_NAME}}
+SOURCE_HTTP = {"httpDataSource": {"list_url": "http://example.com"}}
+
+VALID_TRANSFER_JOB_BASE = {
+"name": JOB_NAME,
+'description': DESCRIPTION,
+'status': 'ENABLED',
+'schedule': SCHEDULE_DICT,
+'transferSpec': {'gcsDataSink': {'bucketName': GCS_BUCKET_NAME}},
+}
+VALID_TRANSFER_JOB_GCS = deepcopy(VALID_TRANSFER_JOB_BASE)
+VALID_TRANSFER_JOB_GCS['transferSpec'].update(deepcopy(SOURCE_GCS))
+VALID_TRANSFER_JOB_AWS = deepcopy(VALID_TRANSFER_JOB_BASE)
+VALID_TRANSFER_JOB_AWS['transferSpec'].update(deepcopy(SOURCE_AWS))
+
+VALID_TRANSFER_JOB_GCS = {
+"name": JOB_NAME,
+'description': DESCRIPTION,
+'status': 'ENABLED',
+'schedule': SCHEDULE_NATIVE,
+'transferSpec': {
+"gcsDataSource": {"bucketName": GCS_BUCKET_NAME},
+'gcsDataSink': {'bucketName': GCS_BUCKET_NAME},
+},
+}
+
+VALID_TRANSFER_JOB_RAW = {
+'description': DESCRIPTION,
+'status': 'ENABLED',
+'schedule': SCHEDULE_DICT,
+'transferSpec': {'gcsDataSink': {'bucketName': GCS_BUCKET_NAME}},
+}
+
+VALID_TRANSFER_JOB_GCS_RAW = deepcopy(VALID_TRANSFER_JOB_RAW)
+VALID_TRANSFER_JOB_GCS_RAW['transferSpec'].update(SOURCE_GCS)
+VALID_TRANSFER_JOB_AWS_RAW = deepcopy(VALID_TRANSFER_JOB_RAW)
+VALID_TRANSFER_JOB_AWS_RAW['transferSpec'].update(deepcopy(SOURCE_AWS))
+VALID_TRANSFER_JOB_AWS_RAW['transferSpec']['awsS3DataSource']['awsAccessKey'] 
= AWS_ACCESS_KEY
+
+
+VALID_OPERATION = {"name": "operation-name"}
+
+
+class TransferJobPreprocessorTest(unittest.TestCase):
+def 

[GitHub] sdevani commented on a change in pull request #4792: [AIRFLOW-3659] Create Google Cloud Transfer Service Operators

2019-02-28 Thread GitBox
sdevani commented on a change in pull request #4792:  [AIRFLOW-3659] Create 
Google Cloud Transfer Service Operators 
URL: https://github.com/apache/airflow/pull/4792#discussion_r261431388
 
 

 ##
 File path: tests/contrib/operators/test_gcp_transfer_operator.py
 ##
 @@ -0,0 +1,702 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import itertools
+import unittest
+from copy import deepcopy
+from datetime import date, time
+
+from parameterized import parameterized
+from botocore.credentials import Credentials
+
+from airflow import AirflowException, configuration
+from airflow.contrib.operators.gcp_transfer_operator import (
+GcpTransferServiceOperationCancelOperator,
+GcpTransferServiceOperationResumeOperator,
+GcpTransferServiceOperationsListOperator,
+TransferJobValidator,
+TransferJobPreprocessor,
+GcpTransferServiceJobCreateOperator,
+GcpTransferServiceJobUpdateOperator,
+GcpTransferServiceOperationGetOperator,
+GcpTransferServiceOperationPauseOperator,
+S3ToGoogleCloudStorageTransferOperator,
+GoogleCloudStorageToGoogleCloudStorageTransferOperator,
+GcpTransferServiceJobDeleteOperator,
+)
+from airflow.models import TaskInstance, DAG
+from airflow.utils import timezone
+
+
+try:
+# noinspection PyProtectedMember
+from unittest import mock
+except ImportError:  # pragma: no cover
+try:
+import mock
+except ImportError:
+mock = None
+try:
+import boto3
+except ImportError:  # pragma: no cover
+boto3 = None
+
+GCP_PROJECT_ID = 'project-id'
+TASK_ID = 'task-id'
+
+JOB_NAME = "job-name"
+OPERATION_NAME = "operation-name"
+AWS_BUCKET_NAME = "aws-bucket-name"
+GCS_BUCKET_NAME = "gcp-bucket-name"
+DESCRIPTION = "description"
+
+DEFAULT_DATE = timezone.datetime(2017, 1, 1)
+
+FILTER = {'jobNames': [JOB_NAME]}
+
+AWS_ACCESS_KEY_ID = "test-key-1"
+AWS_ACCESS_SECRET = "test-secret-1"
+AWS_ACCESS_KEY = {'accessKeyId': AWS_ACCESS_KEY_ID, 'secretAccessKey': 
AWS_ACCESS_SECRET}
+
+NATIVE_DATE = date(2018, 10, 15)
+DICT_DATE = {'day': 15, 'month': 10, 'year': 2018}
+NATIVE_TIME = time(hour=11, minute=42, second=43)
+DICT_TIME = {'hours': 11, 'minutes': 42, 'seconds': 43}
+SCHEDULE_NATIVE = {
+'scheduleStartDate': NATIVE_DATE,
+'scheduleEndDate': NATIVE_DATE,
+'startTimeOfDay': NATIVE_TIME,
+}
+
+SCHEDULE_DICT = {
+'scheduleStartDate': {'day': 15, 'month': 10, 'year': 2018},
+'scheduleEndDate': {'day': 15, 'month': 10, 'year': 2018},
+'startTimeOfDay': {'hours': 11, 'minutes': 42, 'seconds': 43},
+}
+
+SOURCE_AWS = {"awsS3DataSource": {"bucketName": AWS_BUCKET_NAME}}
+SOURCE_GCS = {"gcsDataSource": {"bucketName": GCS_BUCKET_NAME}}
+SOURCE_HTTP = {"httpDataSource": {"list_url": "http://example.com"}}
+
+VALID_TRANSFER_JOB_BASE = {
+"name": JOB_NAME,
+'description': DESCRIPTION,
+'status': 'ENABLED',
+'schedule': SCHEDULE_DICT,
+'transferSpec': {'gcsDataSink': {'bucketName': GCS_BUCKET_NAME}},
+}
+VALID_TRANSFER_JOB_GCS = deepcopy(VALID_TRANSFER_JOB_BASE)
+VALID_TRANSFER_JOB_GCS['transferSpec'].update(deepcopy(SOURCE_GCS))
+VALID_TRANSFER_JOB_AWS = deepcopy(VALID_TRANSFER_JOB_BASE)
+VALID_TRANSFER_JOB_AWS['transferSpec'].update(deepcopy(SOURCE_AWS))
+
+VALID_TRANSFER_JOB_GCS = {
+"name": JOB_NAME,
+'description': DESCRIPTION,
+'status': 'ENABLED',
+'schedule': SCHEDULE_NATIVE,
+'transferSpec': {
+"gcsDataSource": {"bucketName": GCS_BUCKET_NAME},
+'gcsDataSink': {'bucketName': GCS_BUCKET_NAME},
+},
+}
+
+VALID_TRANSFER_JOB_RAW = {
+'description': DESCRIPTION,
+'status': 'ENABLED',
+'schedule': SCHEDULE_DICT,
+'transferSpec': {'gcsDataSink': {'bucketName': GCS_BUCKET_NAME}},
+}
+
+VALID_TRANSFER_JOB_GCS_RAW = deepcopy(VALID_TRANSFER_JOB_RAW)
+VALID_TRANSFER_JOB_GCS_RAW['transferSpec'].update(SOURCE_GCS)
+VALID_TRANSFER_JOB_AWS_RAW = deepcopy(VALID_TRANSFER_JOB_RAW)
+VALID_TRANSFER_JOB_AWS_RAW['transferSpec'].update(deepcopy(SOURCE_AWS))
+VALID_TRANSFER_JOB_AWS_RAW['transferSpec']['awsS3DataSource']['awsAccessKey'] 
= AWS_ACCESS_KEY
+
+
+VALID_OPERATION = {"name": "operation-name"}
+
+
+class TransferJobPreprocessorTest(unittest.TestCase):
+def 

[GitHub] sdevani commented on a change in pull request #4792: [AIRFLOW-3659] Create Google Cloud Transfer Service Operators

2019-02-28 Thread GitBox
sdevani commented on a change in pull request #4792:  [AIRFLOW-3659] Create 
Google Cloud Transfer Service Operators 
URL: https://github.com/apache/airflow/pull/4792#discussion_r261406105
 
 

 ##
 File path: tests/contrib/sensors/test_gcp_transfer_sensor.py
 ##
 @@ -0,0 +1,79 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import unittest
+
+from airflow.contrib.hooks.gcp_transfer_hook import GcpTransferOperationStatus
+from airflow.contrib.sensors.gcp_transfer_sensor import 
GCPTransferServiceWaitForJobStatusSensor
+
+try:
+from unittest import mock
+except ImportError:
+try:
+import mock
+except ImportError:
+mock = None
+
+
+class TestGcpStorageTransferOperationWaitForJobStatusSensor(unittest.TestCase):
 
 Review comment:
   I'd like to see additional test cases: variations on the number of statuses 
provided (0, 1, > 1).
   I'd also like to see a case where the mock initially returns no successful 
operations and then returns a successful one.
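
   A rough sketch of that second case, assuming the sensor polls a mocked hook 
whose successive poke calls are driven with side_effect (the hook method name 
and the shape of the returned operations below are illustrative guesses, not 
the PR's actual API):

       # Illustrative only: the patched hook first reports no successful operation,
       # then a successful one; the sensor's poke() should flip from False to True.
       @mock.patch('airflow.contrib.sensors.gcp_transfer_sensor.GCPTransferServiceHook')
       def test_wait_for_status_eventually_succeeds(self, mock_hook):
           # Hypothetical method name and payload shape -- adjust to the real hook API.
           mock_hook.return_value.list_transfer_operations.side_effect = [
               [{'metadata': {'status': 'IN_PROGRESS'}}],
               [{'metadata': {'status': 'SUCCESS'}}],
           ]
           sensor = GCPTransferServiceWaitForJobStatusSensor(
               task_id='task-id',
               job_name='job-name',
               expected_statuses={GcpTransferOperationStatus.SUCCESS},
           )
           self.assertFalse(sensor.poke(context=None))
           self.assertTrue(sensor.poke(context=None))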


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sdevani commented on a change in pull request #4792: [AIRFLOW-3659] Create Google Cloud Transfer Service Operators

2019-02-28 Thread GitBox
sdevani commented on a change in pull request #4792:  [AIRFLOW-3659] Create 
Google Cloud Transfer Service Operators 
URL: https://github.com/apache/airflow/pull/4792#discussion_r260997512
 
 

 ##
 File path: airflow/contrib/hooks/gcp_transfer_hook.py
 ##
 @@ -52,56 +76,277 @@ def get_conn(self):
 """
 if not self._conn:
 http_authorized = self._authorize()
-self._conn = build('storagetransfer', self.api_version,
-   http=http_authorized, cache_discovery=False)
+self._conn = build(
+'storagetransfer', self.api_version, http=http_authorized, 
cache_discovery=False
+)
 return self._conn
 
-def create_transfer_job(self, description, schedule, transfer_spec, 
project_id=None):
-transfer_job = {
-'status': 'ENABLED',
-'projectId': project_id or self.project_id,
-'description': description,
-'transferSpec': transfer_spec,
-'schedule': schedule or self._schedule_once_now(),
-}
-return 
self.get_conn().transferJobs().create(body=transfer_job).execute()
-
-def wait_for_transfer_job(self, job):
+@GoogleCloudBaseHook.catch_http_exception
+def create_transfer_job(self, body):
+"""
+Creates a transfer job that runs periodically.
+
+:param body: (Required) A request body, as described in
+
https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs/patch#request-body
+:type body: dict
+:return: transfer job.
+See:
+
https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs#TransferJob
+:rtype: dict
+"""
+body = self._inject_project_id(body, 'body project_id')
+return 
self.get_conn().transferJobs().create(body=body).execute(num_retries=NUM_RETRIES)
+
+@GoogleCloudBaseHook.fallback_to_default_project_id
+@GoogleCloudBaseHook.catch_http_exception
+def get_transfer_job(self, job_name, project_id=None):
+"""
+Gets the latest state of a transfer job in Google Storage
+Transfer Service.
+
+:param job_name: (Required) Name of the job to be fetched
+:type job_name: str
+:param project_id: (Optional) the ID of the project that owns the 
Transfer
+Job. If set to None or missing, the default project_id from the GCP
+connection is used.
+:type project_id: str
+:return: Transfer Job
+:rtype: dict
+"""
+return (
+self.get_conn()
+.transferJobs()
+.get(jobName=job_name, projectId=project_id)
+.execute(num_retries=NUM_RETRIES)
+)
+
+def list_transfer_job(self, filter):
+"""
+Lists transfer jobs in Google Storage Transfer
+Service that match the specified filter.
+
+:param filter: (Required) A request filter, as described in
+
https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs/list#body.QUERY_PARAMETERS.filter
+:type filter: dict
+:return: List of Transfer Jobs
+:rtype: list[dict]
+"""
+conn = self.get_conn()
+filter = self._inject_project_id(filter, 'filter project_id')
+request = conn.transferJobs().list(filter=json.dumps(filter))
+jobs = []
+
+while request is not None:
+response = request.execute(num_retries=NUM_RETRIES)
+jobs.extend(response['transferJobs'])
+
+request = conn.transferJobs().list_next(previous_request=request, 
previous_response=response)
+
+return jobs
+
+@GoogleCloudBaseHook.catch_http_exception
+def update_transfer_job(self, job_name, body):
+"""
+Updates a transfer job that runs periodically.
+
+:param job_name: (Required) Name of the job to be updated
+:type job_name: str
+:param body: A request body, as described in
+
https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs/patch#request-body
+:type body: dict
+:return: If successful, TransferJob.
+:rtype: dict
+"""
+body = self._inject_project_id(body, 'body project_id')
+return (
+self.get_conn().transferJobs().patch(jobName=job_name, 
body=body).execute(num_retries=NUM_RETRIES)
+)
+
+@GoogleCloudBaseHook.fallback_to_default_project_id
+@GoogleCloudBaseHook.catch_http_exception
+def delete_transfer_job(self, job_name, project_id):
+"""
+Deletes a transfer job. This is a soft delete. After a transfer job is
+deleted, the job and all the transfer executions are subject to garbage
+collection. Transfer jobs become eligible for garbage collection
+30 days after soft delete.
+
+:param job_name: 

[GitHub] sdevani commented on a change in pull request #4792: [AIRFLOW-3659] Create Google Cloud Transfer Service Operators

2019-02-28 Thread GitBox
sdevani commented on a change in pull request #4792:  [AIRFLOW-3659] Create 
Google Cloud Transfer Service Operators 
URL: https://github.com/apache/airflow/pull/4792#discussion_r261402956
 
 

 ##
 File path: docs/howto/operator.rst
 ##
 @@ -2327,3 +2327,403 @@ More information
 
 See `Google Cloud Vision Product delete documentation
 
`_.
+
+Google Cloud Transfer Service Operators
+---
+
+.. _howto/operator:GcpTransferServiceJobCreateOperator:
+
+GcpTransferServiceJobCreateOperator
+^^^
+
+Create a transfer job.
+
+The function accepts dates in two formats:
+
+- consistent with `Google API 
`_
 ::
+
+{ "year": 2019, "month": 2, "day": 11 }
+
+- as an :class:`~datetime.datetime` object
+
+The function accepts time in two formats:
+
+- consistent with `Google API 
`_
 ::
+
+{ "hours": 12, "minutes": 30, "seconds": 0 }
+
+- as an :class:`~datetime.time` object
+
+If you want to create a transfer job that copies data from AWS S3, you must 
have a connection configured. Information about the AWS connection 
configuration is available at :ref:`connection-type-AWS`.
+The AWS connection to use can be indicated by the parameter 
``aws_conn_id``.
+
+Arguments
+"
+
+Some arguments in the example DAG are taken from the OS environment variables:
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_transfer.py
+  :language: python
+  :start-after: [START howto_operator_gct_common_variables]
+  :end-before: [END howto_operator_gct_common_variables]
+
+Using the operator
+""
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_transfer.py
+  :language: python
+  :start-after: [START howto_operator_gct_create_job_body_gcp]
+  :end-before: [END howto_operator_gct_create_job_body_gcp]
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_transfer.py
+  :language: python
+  :start-after: [START howto_operator_gct_create_job_body_aws]
+  :end-before: [END howto_operator_gct_create_job_body_aws]
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_transfer.py
+  :language: python
+  :dedent: 4
+  :start-after: [START howto_operator_gct_create_job]
+  :end-before: [END howto_operator_gct_create_job]
+
+Templating
+""
+
+.. literalinclude:: ../../airflow/contrib/operators/gcp_transfer_operator.py
+:language: python
+:dedent: 4
+:start-after: [START gcp_transfer_job_create_template_fields]
+:end-before: [END gcp_transfer_job_create_template_fields]
+
+More information
+
+
+See `Google Cloud Transfer Service - Method: transferJobs.create
+`_.
+
+.. _howto/operator:GcpTransferServiceJobDeleteOperator:
+
+GcpTransferServiceJobDeleteOperator
+^^^
+
+Deletes a transfer job.
+
+Arguments
+"
+
+Some arguments in the example DAG are taken from the OS environment variables:
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_transfer.py
+  :language: python
+  :start-after: [START howto_operator_gct_common_variables]
 
 Review comment:
   is this supposed to be gcp?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sdevani commented on a change in pull request #4792: [AIRFLOW-3659] Create Google Cloud Transfer Service Operators

2019-02-28 Thread GitBox
sdevani commented on a change in pull request #4792:  [AIRFLOW-3659] Create 
Google Cloud Transfer Service Operators 
URL: https://github.com/apache/airflow/pull/4792#discussion_r261007224
 
 

 ##
 File path: airflow/contrib/operators/gcp_transfer_operator.py
 ##
 @@ -0,0 +1,753 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from copy import deepcopy
+from datetime import date, time
+
+from airflow import AirflowException
+from airflow.contrib.hooks.gcp_transfer_hook import GCPTransferServiceHook, 
GcpTransferJobsStatus
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+
+try:
+from airflow.contrib.hooks.aws_hook import AwsHook
+except ImportError:  # pragma: no cover
+AwsHook = None
+
+
+class TransferJobPreprocessor:
+def __init__(self, body, aws_conn_id='aws_default'):
+self.body = body
+self.aws_conn_id = aws_conn_id
+
+def _inject_aws_credentials(self):
+if 'transferSpec' not in self.body or 'awsS3DataSource' not in 
self.body['transferSpec']:
+return
+
+aws_hook = AwsHook(self.aws_conn_id)
+aws_credentials = aws_hook.get_credentials()
+aws_access_key_id = aws_credentials.access_key
+aws_secret_access_key = aws_credentials.secret_key
+self.body['transferSpec']['awsS3DataSource']["awsAccessKey"] = {
+"accessKeyId": aws_access_key_id,
+"secretAccessKey": aws_secret_access_key,
+}
+
+def _reformat_date(self, field_key):
+schedule = self.body['schedule']
+if field_key not in schedule:
+return
+if isinstance(schedule[field_key], date):
+schedule[field_key] = 
self._convert_date_to_dict(schedule[field_key])
+
+def _reformat_time(self, field_key):
+schedule = self.body['schedule']
+if field_key not in schedule:
+return
+if isinstance(schedule[field_key], time):
+schedule[field_key] = 
self._convert_time_to_dict(schedule[field_key])
+
+def _reformat_schedule(self):
+if 'schedule' not in self.body:
+return
+self._reformat_date('scheduleStartDate')
+self._reformat_date('scheduleEndDate')
+self._reformat_time('startTimeOfDay')
+
+def process_body(self):
+self._inject_aws_credentials()
+self._reformat_schedule()
+return self.body
+
+@staticmethod
+def _convert_date_to_dict(field_date):
+"""
+Convert native python ``datetime.date`` object  to a format supported 
by the API
+"""
+return {'day': field_date.day, 'month': field_date.month, 'year': 
field_date.year}
+
+@staticmethod
+def _convert_time_to_dict(time):
+"""
+Convert native python ``datetime.time`` object  to a format supported 
by the API
+"""
+return {"hours": time.hour, "minutes": time.minute, "seconds": 
time.second}
+
+
+class TransferJobValidator:
+def __init__(self, body):
+self.body = body
+
+def _verify_data_source(self):
+is_gcs = 'gcsDataSource' in self.body['transferSpec']
+is_aws_s3 = 'awsS3DataSource' in self.body['transferSpec']
+is_http = 'httpDataSource' in self.body['transferSpec']
+
+sources_count = sum([is_gcs, is_aws_s3, is_http])
+if sources_count != 0 and sources_count != 1:
+raise AirflowException(
+"More than one data source detected. Please choose exactly one 
data source from: "
+"gcsDataSource, awsS3DataSource and httpDataSource."
+)
+
+def _restrict_aws_credentials(self):
+if 'awsS3DataSource' not in self.body['transferSpec']:
+return
+
+if 'awsAccessKey' in self.body['transferSpec']['awsS3DataSource']:
+raise AirflowException(
+"AWS credentials detected inside the body parameter 
(awsAccessKey). This is not allowed, "
+"please use Airflow connections to store credentials."
+)
+
+def _restrict_empty_body(self):
+if not self.body:
+raise AirflowException("The required parameter 'body' is empty or 

[GitHub] sdevani commented on a change in pull request #4792: [AIRFLOW-3659] Create Google Cloud Transfer Service Operators

2019-02-28 Thread GitBox
sdevani commented on a change in pull request #4792:  [AIRFLOW-3659] Create 
Google Cloud Transfer Service Operators 
URL: https://github.com/apache/airflow/pull/4792#discussion_r260997416
 
 

 ##
 File path: airflow/contrib/hooks/gcp_transfer_hook.py
 ##
 @@ -52,56 +76,277 @@ def get_conn(self):
 """
 if not self._conn:
 http_authorized = self._authorize()
-self._conn = build('storagetransfer', self.api_version,
-   http=http_authorized, cache_discovery=False)
+self._conn = build(
+'storagetransfer', self.api_version, http=http_authorized, 
cache_discovery=False
+)
 return self._conn
 
-def create_transfer_job(self, description, schedule, transfer_spec, 
project_id=None):
-transfer_job = {
-'status': 'ENABLED',
-'projectId': project_id or self.project_id,
-'description': description,
-'transferSpec': transfer_spec,
-'schedule': schedule or self._schedule_once_now(),
-}
-return 
self.get_conn().transferJobs().create(body=transfer_job).execute()
-
-def wait_for_transfer_job(self, job):
+@GoogleCloudBaseHook.catch_http_exception
+def create_transfer_job(self, body):
+"""
+Creates a transfer job that runs periodically.
+
+:param body: (Required) A request body, as described in
+
https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs/patch#request-body
+:type body: dict
+:return: transfer job.
+See:
+
https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs#TransferJob
+:rtype: dict
+"""
+body = self._inject_project_id(body, 'body project_id')
+return 
self.get_conn().transferJobs().create(body=body).execute(num_retries=NUM_RETRIES)
+
+@GoogleCloudBaseHook.fallback_to_default_project_id
+@GoogleCloudBaseHook.catch_http_exception
+def get_transfer_job(self, job_name, project_id=None):
+"""
+Gets the latest state of a transfer job in Google Storage
+Transfer Service.
+
+:param job_name: (Required) Name of the job to be fetched
+:type job_name: str
+:param project_id: (Optional) the ID of the project that owns the 
Transfer
+Job. If set to None or missing, the default project_id from the GCP
+connection is used.
+:type project_id: str
+:return: Transfer Job
+:rtype: dict
+"""
+return (
+self.get_conn()
+.transferJobs()
+.get(jobName=job_name, projectId=project_id)
+.execute(num_retries=NUM_RETRIES)
+)
+
+def list_transfer_job(self, filter):
+"""
+Lists transfer jobs in Google Storage Transfer
+Service that match the specified filter.
+
+:param filter: (Required) A request filter, as described in
+
https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs/list#body.QUERY_PARAMETERS.filter
+:type filter: dict
+:return: List of Transfer Jobs
+:rtype: list[dict]
+"""
+conn = self.get_conn()
+filter = self._inject_project_id(filter, 'filter project_id')
+request = conn.transferJobs().list(filter=json.dumps(filter))
+jobs = []
+
+while request is not None:
+response = request.execute(num_retries=NUM_RETRIES)
+jobs.extend(response['transferJobs'])
+
+request = conn.transferJobs().list_next(previous_request=request, 
previous_response=response)
+
+return jobs
+
+@GoogleCloudBaseHook.catch_http_exception
+def update_transfer_job(self, job_name, body):
+"""
+Updates a transfer job that runs periodically.
+
+:param job_name: (Required) Name of the job to be updated
+:type job_name: str
+:param body: A request body, as described in
+
https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs/patch#request-body
+:type body: dict
+:return: If successful, TransferJob.
+:rtype: dict
+"""
+body = self._inject_project_id(body, 'body project_id')
+return (
+self.get_conn().transferJobs().patch(jobName=job_name, 
body=body).execute(num_retries=NUM_RETRIES)
+)
+
+@GoogleCloudBaseHook.fallback_to_default_project_id
+@GoogleCloudBaseHook.catch_http_exception
+def delete_transfer_job(self, job_name, project_id):
+"""
+Deletes a transfer job. This is a soft delete. After a transfer job is
+deleted, the job and all the transfer executions are subject to garbage
+collection. Transfer jobs become eligible for garbage collection
+30 days after soft delete.
+
+:param job_name: 

[GitHub] sdevani commented on a change in pull request #4792: [AIRFLOW-3659] Create Google Cloud Transfer Service Operators

2019-02-28 Thread GitBox
sdevani commented on a change in pull request #4792:  [AIRFLOW-3659] Create 
Google Cloud Transfer Service Operators 
URL: https://github.com/apache/airflow/pull/4792#discussion_r261003821
 
 

 ##
 File path: airflow/contrib/operators/gcp_transfer_operator.py
 ##
 @@ -0,0 +1,753 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from copy import deepcopy
+from datetime import date, time
+
+from airflow import AirflowException
+from airflow.contrib.hooks.gcp_transfer_hook import GCPTransferServiceHook, 
GcpTransferJobsStatus
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+
+try:
+from airflow.contrib.hooks.aws_hook import AwsHook
+except ImportError:  # pragma: no cover
+AwsHook = None
+
+
+class TransferJobPreprocessor:
+def __init__(self, body, aws_conn_id='aws_default'):
+self.body = body
+self.aws_conn_id = aws_conn_id
+
+def _inject_aws_credentials(self):
+if 'transferSpec' not in self.body or 'awsS3DataSource' not in 
self.body['transferSpec']:
+return
+
+aws_hook = AwsHook(self.aws_conn_id)
+aws_credentials = aws_hook.get_credentials()
+aws_access_key_id = aws_credentials.access_key
+aws_secret_access_key = aws_credentials.secret_key
+self.body['transferSpec']['awsS3DataSource']["awsAccessKey"] = {
+"accessKeyId": aws_access_key_id,
+"secretAccessKey": aws_secret_access_key,
+}
+
+def _reformat_date(self, field_key):
+schedule = self.body['schedule']
+if field_key not in schedule:
+return
+if isinstance(schedule[field_key], date):
+schedule[field_key] = 
self._convert_date_to_dict(schedule[field_key])
+
+def _reformat_time(self, field_key):
+schedule = self.body['schedule']
+if field_key not in schedule:
+return
+if isinstance(schedule[field_key], time):
+schedule[field_key] = 
self._convert_time_to_dict(schedule[field_key])
+
+def _reformat_schedule(self):
+if 'schedule' not in self.body:
+return
+self._reformat_date('scheduleStartDate')
+self._reformat_date('scheduleEndDate')
+self._reformat_time('startTimeOfDay')
+
+def process_body(self):
+self._inject_aws_credentials()
+self._reformat_schedule()
+return self.body
+
+@staticmethod
+def _convert_date_to_dict(field_date):
+"""
+Convert native python ``datetime.date`` object  to a format supported 
by the API
+"""
+return {'day': field_date.day, 'month': field_date.month, 'year': 
field_date.year}
+
+@staticmethod
+def _convert_time_to_dict(time):
+"""
+Convert native python ``datetime.time`` object  to a format supported 
by the API
+"""
+return {"hours": time.hour, "minutes": time.minute, "seconds": 
time.second}
+
+
+class TransferJobValidator:
+def __init__(self, body):
+self.body = body
+
+def _verify_data_source(self):
+is_gcs = 'gcsDataSource' in self.body['transferSpec']
+is_aws_s3 = 'awsS3DataSource' in self.body['transferSpec']
+is_http = 'httpDataSource' in self.body['transferSpec']
+
+sources_count = sum([is_gcs, is_aws_s3, is_http])
+if sources_count != 0 and sources_count != 1:
+raise AirflowException(
+"More than one data source detected. Please choose exactly one 
data source from: "
+"gcsDataSource, awsS3DataSource and httpDataSource."
+)
+
+def _restrict_aws_credentials(self):
+if 'awsS3DataSource' not in self.body['transferSpec']:
+return
+
+if 'awsAccessKey' in self.body['transferSpec']['awsS3DataSource']:
+raise AirflowException(
+"AWS credentials detected inside the body parameter 
(awsAccessKey). This is not allowed, "
+"please use Airflow connections to store credentials."
+)
+
+def _restrict_empty_body(self):
+if not self.body:
+raise AirflowException("The required parameter 'body' is empty or 

[GitHub] sdevani commented on a change in pull request #4792: [AIRFLOW-3659] Create Google Cloud Transfer Service Operators

2019-02-28 Thread GitBox
sdevani commented on a change in pull request #4792:  [AIRFLOW-3659] Create 
Google Cloud Transfer Service Operators 
URL: https://github.com/apache/airflow/pull/4792#discussion_r261270005
 
 

 ##
 File path: airflow/contrib/operators/gcp_transfer_operator.py
 ##
 @@ -0,0 +1,753 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from copy import deepcopy
+from datetime import date, time
+
+from airflow import AirflowException
+from airflow.contrib.hooks.gcp_transfer_hook import GCPTransferServiceHook, 
GcpTransferJobsStatus
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+
+try:
+from airflow.contrib.hooks.aws_hook import AwsHook
+except ImportError:  # pragma: no cover
+AwsHook = None
+
+
+class TransferJobPreprocessor:
+def __init__(self, body, aws_conn_id='aws_default'):
+self.body = body
+self.aws_conn_id = aws_conn_id
+
+def _inject_aws_credentials(self):
+if 'transferSpec' not in self.body or 'awsS3DataSource' not in 
self.body['transferSpec']:
+return
+
+aws_hook = AwsHook(self.aws_conn_id)
+aws_credentials = aws_hook.get_credentials()
+aws_access_key_id = aws_credentials.access_key
+aws_secret_access_key = aws_credentials.secret_key
+self.body['transferSpec']['awsS3DataSource']["awsAccessKey"] = {
+"accessKeyId": aws_access_key_id,
+"secretAccessKey": aws_secret_access_key,
+}
+
+def _reformat_date(self, field_key):
+schedule = self.body['schedule']
+if field_key not in schedule:
+return
+if isinstance(schedule[field_key], date):
+schedule[field_key] = 
self._convert_date_to_dict(schedule[field_key])
+
+def _reformat_time(self, field_key):
+schedule = self.body['schedule']
+if field_key not in schedule:
+return
+if isinstance(schedule[field_key], time):
+schedule[field_key] = 
self._convert_time_to_dict(schedule[field_key])
+
+def _reformat_schedule(self):
+if 'schedule' not in self.body:
+return
+self._reformat_date('scheduleStartDate')
+self._reformat_date('scheduleEndDate')
+self._reformat_time('startTimeOfDay')
+
+def process_body(self):
+self._inject_aws_credentials()
+self._reformat_schedule()
+return self.body
+
+@staticmethod
+def _convert_date_to_dict(field_date):
+"""
+Convert native python ``datetime.date`` object  to a format supported 
by the API
+"""
+return {'day': field_date.day, 'month': field_date.month, 'year': 
field_date.year}
+
+@staticmethod
+def _convert_time_to_dict(time):
+"""
+Convert native python ``datetime.time`` object  to a format supported 
by the API
+"""
+return {"hours": time.hour, "minutes": time.minute, "seconds": 
time.second}
+
+
+class TransferJobValidator:
+def __init__(self, body):
+self.body = body
+
+def _verify_data_source(self):
+is_gcs = 'gcsDataSource' in self.body['transferSpec']
+is_aws_s3 = 'awsS3DataSource' in self.body['transferSpec']
+is_http = 'httpDataSource' in self.body['transferSpec']
+
+sources_count = sum([is_gcs, is_aws_s3, is_http])
+if sources_count != 0 and sources_count != 1:
+raise AirflowException(
+"More than one data source detected. Please choose exactly one 
data source from: "
+"gcsDataSource, awsS3DataSource and httpDataSource."
+)
+
+def _restrict_aws_credentials(self):
+if 'awsS3DataSource' not in self.body['transferSpec']:
+return
+
+if 'awsAccessKey' in self.body['transferSpec']['awsS3DataSource']:
+raise AirflowException(
+"AWS credentials detected inside the body parameter 
(awsAccessKey). This is not allowed, "
+"please use Airflow connections to store credentials."
+)
+
+def _restrict_empty_body(self):
+if not self.body:
+raise AirflowException("The required parameter 'body' is empty or 

[GitHub] sdevani commented on a change in pull request #4792: [AIRFLOW-3659] Create Google Cloud Transfer Service Operators

2019-02-28 Thread GitBox
sdevani commented on a change in pull request #4792:  [AIRFLOW-3659] Create 
Google Cloud Transfer Service Operators 
URL: https://github.com/apache/airflow/pull/4792#discussion_r261285303
 
 

 ##
 File path: airflow/contrib/operators/gcp_transfer_operator.py
 ##
 @@ -0,0 +1,753 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from copy import deepcopy
+from datetime import date, time
+
+from airflow import AirflowException
+from airflow.contrib.hooks.gcp_transfer_hook import GCPTransferServiceHook, 
GcpTransferJobsStatus
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+
+try:
+from airflow.contrib.hooks.aws_hook import AwsHook
+except ImportError:  # pragma: no cover
+AwsHook = None
+
+
+class TransferJobPreprocessor:
+def __init__(self, body, aws_conn_id='aws_default'):
+self.body = body
+self.aws_conn_id = aws_conn_id
+
+def _inject_aws_credentials(self):
+if 'transferSpec' not in self.body or 'awsS3DataSource' not in 
self.body['transferSpec']:
+return
+
+aws_hook = AwsHook(self.aws_conn_id)
+aws_credentials = aws_hook.get_credentials()
+aws_access_key_id = aws_credentials.access_key
+aws_secret_access_key = aws_credentials.secret_key
+self.body['transferSpec']['awsS3DataSource']["awsAccessKey"] = {
+"accessKeyId": aws_access_key_id,
+"secretAccessKey": aws_secret_access_key,
+}
+
+def _reformat_date(self, field_key):
+schedule = self.body['schedule']
+if field_key not in schedule:
+return
+if isinstance(schedule[field_key], date):
+schedule[field_key] = 
self._convert_date_to_dict(schedule[field_key])
+
+def _reformat_time(self, field_key):
+schedule = self.body['schedule']
+if field_key not in schedule:
+return
+if isinstance(schedule[field_key], time):
+schedule[field_key] = 
self._convert_time_to_dict(schedule[field_key])
+
+def _reformat_schedule(self):
+if 'schedule' not in self.body:
+return
+self._reformat_date('scheduleStartDate')
+self._reformat_date('scheduleEndDate')
+self._reformat_time('startTimeOfDay')
+
+def process_body(self):
+self._inject_aws_credentials()
+self._reformat_schedule()
+return self.body
+
+@staticmethod
+def _convert_date_to_dict(field_date):
+"""
+Convert native python ``datetime.date`` object  to a format supported 
by the API
+"""
+return {'day': field_date.day, 'month': field_date.month, 'year': 
field_date.year}
+
+@staticmethod
+def _convert_time_to_dict(time):
+"""
+Convert native python ``datetime.time`` object  to a format supported 
by the API
+"""
+return {"hours": time.hour, "minutes": time.minute, "seconds": 
time.second}
+
+
+class TransferJobValidator:
+def __init__(self, body):
+self.body = body
+
+def _verify_data_source(self):
+is_gcs = 'gcsDataSource' in self.body['transferSpec']
+is_aws_s3 = 'awsS3DataSource' in self.body['transferSpec']
+is_http = 'httpDataSource' in self.body['transferSpec']
+
+sources_count = sum([is_gcs, is_aws_s3, is_http])
+if sources_count != 0 and sources_count != 1:
+raise AirflowException(
+"More than one data source detected. Please choose exactly one 
data source from: "
+"gcsDataSource, awsS3DataSource and httpDataSource."
+)
+
+def _restrict_aws_credentials(self):
+if 'awsS3DataSource' not in self.body['transferSpec']:
+return
+
+if 'awsAccessKey' in self.body['transferSpec']['awsS3DataSource']:
+raise AirflowException(
+"AWS credentials detected inside the body parameter 
(awsAccessKey). This is not allowed, "
+"please use Airflow connections to store credentials."
+)
+
+def _restrict_empty_body(self):
+if not self.body:
+raise AirflowException("The required parameter 'body' is empty or 

[GitHub] sdevani commented on a change in pull request #4792: [AIRFLOW-3659] Create Google Cloud Transfer Service Operators

2019-02-28 Thread GitBox
sdevani commented on a change in pull request #4792:  [AIRFLOW-3659] Create 
Google Cloud Transfer Service Operators 
URL: https://github.com/apache/airflow/pull/4792#discussion_r261403640
 
 

 ##
 File path: docs/howto/operator.rst
 ##
 @@ -2327,3 +2327,403 @@ More information
 
 See `Google Cloud Vision Product delete documentation
 
`_.
+
+Google Cloud Transfer Service Operators
 
 Review comment:
   Maybe we should add a link to the GCP page: 
https://cloud.google.com/storage-transfer/docs/


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sdevani commented on a change in pull request #4792: [AIRFLOW-3659] Create Google Cloud Transfer Service Operators

2019-02-28 Thread GitBox
sdevani commented on a change in pull request #4792:  [AIRFLOW-3659] Create 
Google Cloud Transfer Service Operators 
URL: https://github.com/apache/airflow/pull/4792#discussion_r260962340
 
 

 ##
 File path: airflow/contrib/example_dags/example_gcp_transfer.py
 ##
 @@ -0,0 +1,254 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG that demonstrates interactions with Google Cloud Transfer.
+
+This DAG relies on the following OS environment variables
+
+* GCP_PROJECT_ID - Google Cloud Project to use for the Google Cloud Transfer 
Service.
+* GCP_DESCRIPTION - Description of transfer job
+* GCP_TRANSFER_SOURCE_AWS_BUCKET - Amazon Web Services Storage bucket from 
which files are copied.
+* GCP_TRANSFER_FIRST_TARGET_BUCKET - Google Cloud Storage bucket to which 
files are copied from AWS.
+  It is also the source bucket in the next step.
+* GCP_TRANSFER_SECOND_TARGET_BUCKET - Google Cloud Storage bucket to which 
files are copied
+* WAIT_FOR_OPERATION_POKE_INTERVAL - interval at which to check the status of 
the operation
+
+"""
+import os
+from datetime import datetime, timedelta
+
+from airflow import models
+from airflow.contrib.hooks.gcp_transfer_hook import GcpTransferOperationStatus, GcpTransferJobsStatus
+from airflow.contrib.operators.gcp_transfer_operator import (
+GcpTransferServiceJobCreateOperator,
+GcpTransferServiceJobDeleteOperator,
+GcpTransferServiceJobUpdateOperator,
+GcpTransferServiceOperationsListOperator,
+GcpTransferServiceOperationGetOperator,
+GcpTransferServiceOperationPauseOperator,
+GcpTransferServiceOperationResumeOperator,
+GcpTransferServiceOperationCancelOperator,
+)
+from airflow.contrib.sensors.gcp_transfer_sensor import GCPTransferServiceWaitForJobStatusSensor
+from airflow.utils.dates import days_ago
+
+# [START howto_operator_gct_common_variables]
+GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
+GCP_DESCRIPTION = os.environ.get('GCP_DESCRIPTION', 'description')
+GCP_TRANSFER_TARGET_BUCKET = os.environ.get('GCP_TRANSFER_TARGET_BUCKET')
+WAIT_FOR_OPERATION_POKE_INTERVAL = os.environ.get('WAIT_FOR_OPERATION_POKE_INTERVAL', 5)
+
+GCP_TRANSFER_SOURCE_AWS_BUCKET = os.environ.get('GCP_TRANSFER_SOURCE_AWS_BUCKET')
+GCP_TRANSFER_FIRST_TARGET_BUCKET = os.environ.get(
+'GCP_TRANSFER_FIRST_TARGET_BUCKET', 'gcp-transfer-first-target'
+)
+GCP_TRANSFER_SECOND_TARGET_BUCKET = os.environ.get(
+'GCP_TRANSFER_SECOND_TARGET_BUCKET', 'gcp-transfer-second-target'
+)
+# [END howto_operator_gct_common_variables]
+
+# [START howto_operator_gct_create_job_body_aws]
+create_body_aws = {
+"description": GCP_DESCRIPTION,
+"status": GcpTransferJobsStatus.ENABLED,
+"projectId": GCP_PROJECT_ID,
+"schedule": {
+"scheduleStartDate": datetime(2015, 1, 1).date(),
+"scheduleEndDate": datetime(2030, 1, 1).date(),
+"startTimeOfDay": datetime.utcnow() + timedelta(minutes=2),
+},
+"transferSpec": {
+'awsS3DataSource': {'bucketName': GCP_TRANSFER_SOURCE_AWS_BUCKET},
+"gcsDataSink": {"bucketName": GCP_TRANSFER_FIRST_TARGET_BUCKET},
+"transferOptions": {"overwriteObjectsAlreadyExistingInSink": True},
+},
+}
+# [END howto_operator_gct_create_job_body_aws]
+
+# [START howto_operator_gct_create_job_body_gcp]
+create_body_gcs = {
+"description": GCP_DESCRIPTION,
+"status": GcpTransferJobsStatus.ENABLED,
+"projectId": GCP_PROJECT_ID,
+"schedule": {
+"scheduleStartDate": datetime(2015, 1, 1).date(),
+"scheduleEndDate": datetime(2030, 1, 1).date(),
+"startTimeOfDay": datetime.utcnow() + timedelta(minutes=2),
+},
+"transferSpec": {
+'gcsDataSource': {'bucketName': GCP_TRANSFER_FIRST_TARGET_BUCKET},
+"gcsDataSink": {"bucketName": GCP_TRANSFER_SECOND_TARGET_BUCKET},
+"transferOptions": {"overwriteObjectsAlreadyExistingInSink": True},
+},
+}
+# [END howto_operator_gct_create_job_body_gcp]
+
+# [START howto_operator_gct_update_job_body]
+update_body = {
+"projectId": GCP_PROJECT_ID,
+"transferJob": {"description": "%s_updated".format(GCP_DESCRIPTION)},
+"updateTransferJobFieldMask": 

[GitHub] sdevani commented on a change in pull request #4792: [AIRFLOW-3659] Create Google Cloud Transfer Service Operators

2019-02-28 Thread GitBox
sdevani commented on a change in pull request #4792:  [AIRFLOW-3659] Create 
Google Cloud Transfer Service Operators 
URL: https://github.com/apache/airflow/pull/4792#discussion_r261009106
 
 

 ##
 File path: airflow/contrib/operators/gcp_transfer_operator.py
 ##
 @@ -0,0 +1,753 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from copy import deepcopy
+from datetime import date, time
+
+from airflow import AirflowException
+from airflow.contrib.hooks.gcp_transfer_hook import GCPTransferServiceHook, GcpTransferJobsStatus
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+
+try:
+from airflow.contrib.hooks.aws_hook import AwsHook
+except ImportError:  # pragma: no cover
+AwsHook = None
+
+
+class TransferJobPreprocessor:
+def __init__(self, body, aws_conn_id='aws_default'):
+self.body = body
+self.aws_conn_id = aws_conn_id
+
+def _inject_aws_credentials(self):
+if 'transferSpec' not in self.body or 'awsS3DataSource' not in self.body['transferSpec']:
+return
+
+aws_hook = AwsHook(self.aws_conn_id)
+aws_credentials = aws_hook.get_credentials()
+aws_access_key_id = aws_credentials.access_key
+aws_secret_access_key = aws_credentials.secret_key
+self.body['transferSpec']['awsS3DataSource']["awsAccessKey"] = {
+"accessKeyId": aws_access_key_id,
+"secretAccessKey": aws_secret_access_key,
+}
+
+def _reformat_date(self, field_key):
+schedule = self.body['schedule']
+if field_key not in schedule:
+return
+if isinstance(schedule[field_key], date):
+schedule[field_key] = self._convert_date_to_dict(schedule[field_key])
+
+def _reformat_time(self, field_key):
+schedule = self.body['schedule']
+if field_key not in schedule:
+return
+if isinstance(schedule[field_key], time):
+schedule[field_key] = self._convert_time_to_dict(schedule[field_key])
+
+def _reformat_schedule(self):
+if 'schedule' not in self.body:
+return
+self._reformat_date('scheduleStartDate')
+self._reformat_date('scheduleEndDate')
+self._reformat_time('startTimeOfDay')
+
+def process_body(self):
+self._inject_aws_credentials()
+self._reformat_schedule()
+return self.body
+
+@staticmethod
+def _convert_date_to_dict(field_date):
+"""
+Convert native python ``datetime.date`` object  to a format supported 
by the API
+"""
+return {'day': field_date.day, 'month': field_date.month, 'year': 
field_date.year}
+
+@staticmethod
+def _convert_time_to_dict(time):
+"""
+Convert native python ``datetime.time`` object  to a format supported 
by the API
+"""
+return {"hours": time.hour, "minutes": time.minute, "seconds": 
time.second}
+
+
+class TransferJobValidator:
+def __init__(self, body):
+self.body = body
+
+def _verify_data_source(self):
+is_gcs = 'gcsDataSource' in self.body['transferSpec']
+is_aws_s3 = 'awsS3DataSource' in self.body['transferSpec']
+is_http = 'httpDataSource' in self.body['transferSpec']
+
+sources_count = sum([is_gcs, is_aws_s3, is_http])
+if sources_count != 0 and sources_count != 1:
+raise AirflowException(
+"More than one data source detected. Please choose exactly one 
data source from: "
+"gcsDataSource, awsS3DataSource and httpDataSource."
+)
+
+def _restrict_aws_credentials(self):
+if 'awsS3DataSource' not in self.body['transferSpec']:
+return
+
+if 'awsAccessKey' in self.body['transferSpec']['awsS3DataSource']:
+raise AirflowException(
+"AWS credentials detected inside the body parameter 
(awsAccessKey). This is not allowed, "
+"please use Airflow connections to store credentials."
+)
+
+def _restrict_empty_body(self):
+if not self.body:
+raise AirflowException("The required parameter 'body' is empty or 

[GitHub] sdevani commented on a change in pull request #4792: [AIRFLOW-3659] Create Google Cloud Transfer Service Operators

2019-02-28 Thread GitBox
sdevani commented on a change in pull request #4792:  [AIRFLOW-3659] Create 
Google Cloud Transfer Service Operators 
URL: https://github.com/apache/airflow/pull/4792#discussion_r261004626
 
 

 ##
 File path: airflow/contrib/operators/gcp_transfer_operator.py
 ##
 @@ -0,0 +1,753 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from copy import deepcopy
+from datetime import date, time
+
+from airflow import AirflowException
+from airflow.contrib.hooks.gcp_transfer_hook import GCPTransferServiceHook, GcpTransferJobsStatus
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+
+try:
+from airflow.contrib.hooks.aws_hook import AwsHook
+except ImportError:  # pragma: no cover
+AwsHook = None
+
+
+class TransferJobPreprocessor:
+def __init__(self, body, aws_conn_id='aws_default'):
+self.body = body
+self.aws_conn_id = aws_conn_id
+
+def _inject_aws_credentials(self):
+if 'transferSpec' not in self.body or 'awsS3DataSource' not in self.body['transferSpec']:
+return
+
+aws_hook = AwsHook(self.aws_conn_id)
+aws_credentials = aws_hook.get_credentials()
+aws_access_key_id = aws_credentials.access_key
+aws_secret_access_key = aws_credentials.secret_key
+self.body['transferSpec']['awsS3DataSource']["awsAccessKey"] = {
+"accessKeyId": aws_access_key_id,
+"secretAccessKey": aws_secret_access_key,
+}
+
+def _reformat_date(self, field_key):
+schedule = self.body['schedule']
+if field_key not in schedule:
+return
+if isinstance(schedule[field_key], date):
+schedule[field_key] = self._convert_date_to_dict(schedule[field_key])
+
+def _reformat_time(self, field_key):
+schedule = self.body['schedule']
+if field_key not in schedule:
+return
+if isinstance(schedule[field_key], time):
+schedule[field_key] = self._convert_time_to_dict(schedule[field_key])
+
+def _reformat_schedule(self):
+if 'schedule' not in self.body:
+return
+self._reformat_date('scheduleStartDate')
+self._reformat_date('scheduleEndDate')
+self._reformat_time('startTimeOfDay')
+
+def process_body(self):
+self._inject_aws_credentials()
+self._reformat_schedule()
+return self.body
+
+@staticmethod
+def _convert_date_to_dict(field_date):
+"""
+Convert native python ``datetime.date`` object  to a format supported 
by the API
+"""
+return {'day': field_date.day, 'month': field_date.month, 'year': 
field_date.year}
+
+@staticmethod
+def _convert_time_to_dict(time):
+"""
+Convert native python ``datetime.time`` object  to a format supported 
by the API
+"""
+return {"hours": time.hour, "minutes": time.minute, "seconds": 
time.second}
+
+
+class TransferJobValidator:
+def __init__(self, body):
+self.body = body
+
+def _verify_data_source(self):
+is_gcs = 'gcsDataSource' in self.body['transferSpec']
+is_aws_s3 = 'awsS3DataSource' in self.body['transferSpec']
+is_http = 'httpDataSource' in self.body['transferSpec']
+
+sources_count = sum([is_gcs, is_aws_s3, is_http])
+if sources_count != 0 and sources_count != 1:
+raise AirflowException(
+"More than one data source detected. Please choose exactly one 
data source from: "
+"gcsDataSource, awsS3DataSource and httpDataSource."
+)
+
+def _restrict_aws_credentials(self):
+if 'awsS3DataSource' not in self.body['transferSpec']:
+return
+
+if 'awsAccessKey' in self.body['transferSpec']['awsS3DataSource']:
+raise AirflowException(
+"AWS credentials detected inside the body parameter 
(awsAccessKey). This is not allowed, "
+"please use Airflow connections to store credentials."
+)
+
+def _restrict_empty_body(self):
+if not self.body:
+raise AirflowException("The required parameter 'body' is empty or 

[GitHub] sdevani commented on a change in pull request #4792: [AIRFLOW-3659] Create Google Cloud Transfer Service Operators

2019-02-28 Thread GitBox
sdevani commented on a change in pull request #4792:  [AIRFLOW-3659] Create 
Google Cloud Transfer Service Operators 
URL: https://github.com/apache/airflow/pull/4792#discussion_r260966418
 
 

 ##
 File path: airflow/contrib/example_dags/example_gcp_transfer.py
 ##
 @@ -0,0 +1,254 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG that demonstrates interactions with Google Cloud Transfer.
+
+This DAG relies on the following OS environment variables:
+
+* GCP_PROJECT_ID - Google Cloud Project to use for the Google Cloud Transfer Service.
+* GCP_DESCRIPTION - Description of the transfer job
+* GCP_TRANSFER_SOURCE_AWS_BUCKET - Amazon Web Services S3 bucket from which files are copied.
+* GCP_TRANSFER_FIRST_TARGET_BUCKET - Google Cloud Storage bucket to which files are copied from AWS.
+  It is also the source bucket in the next step.
+* GCP_TRANSFER_SECOND_TARGET_BUCKET - Google Cloud Storage bucket to which files are copied
+* WAIT_FOR_OPERATION_POKE_INTERVAL - interval at which to check the status of the operation
+
+"""
+import os
+from datetime import datetime, timedelta
+
+from airflow import models
+from airflow.contrib.hooks.gcp_transfer_hook import GcpTransferOperationStatus, GcpTransferJobsStatus
+from airflow.contrib.operators.gcp_transfer_operator import (
+GcpTransferServiceJobCreateOperator,
+GcpTransferServiceJobDeleteOperator,
+GcpTransferServiceJobUpdateOperator,
+GcpTransferServiceOperationsListOperator,
+GcpTransferServiceOperationGetOperator,
+GcpTransferServiceOperationPauseOperator,
+GcpTransferServiceOperationResumeOperator,
+GcpTransferServiceOperationCancelOperator,
+)
+from airflow.contrib.sensors.gcp_transfer_sensor import GCPTransferServiceWaitForJobStatusSensor
+from airflow.utils.dates import days_ago
+
+# [START howto_operator_gct_common_variables]
+GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
+GCP_DESCRIPTION = os.environ.get('GCP_DESCRIPTION', 'description')
+GCP_TRANSFER_TARGET_BUCKET = os.environ.get('GCP_TRANSFER_TARGET_BUCKET')
+WAIT_FOR_OPERATION_POKE_INTERVAL = os.environ.get('WAIT_FOR_OPERATION_POKE_INTERVAL', 5)
+
+GCP_TRANSFER_SOURCE_AWS_BUCKET = os.environ.get('GCP_TRANSFER_SOURCE_AWS_BUCKET')
+GCP_TRANSFER_FIRST_TARGET_BUCKET = os.environ.get(
+'GCP_TRANSFER_FIRST_TARGET_BUCKET', 'gcp-transfer-first-target'
+)
+GCP_TRANSFER_SECOND_TARGET_BUCKET = os.environ.get(
+'GCP_TRANSFER_SECOND_TARGET_BUCKET', 'gcp-transfer-second-target'
+)
+# [END howto_operator_gct_common_variables]
+
+# [START howto_operator_gct_create_job_body_aws]
+create_body_aws = {
+"description": GCP_DESCRIPTION,
+"status": GcpTransferJobsStatus.ENABLED,
+"projectId": GCP_PROJECT_ID,
+"schedule": {
+"scheduleStartDate": datetime(2015, 1, 1).date(),
+"scheduleEndDate": datetime(2030, 1, 1).date(),
+"startTimeOfDay": datetime.utcnow() + timedelta(minutes=2),
+},
+"transferSpec": {
+'awsS3DataSource': {'bucketName': GCP_TRANSFER_SOURCE_AWS_BUCKET},
+"gcsDataSink": {"bucketName": GCP_TRANSFER_FIRST_TARGET_BUCKET},
+"transferOptions": {"overwriteObjectsAlreadyExistingInSink": True},
+},
+}
+# [END howto_operator_gct_create_job_body_aws]
+
+# [START howto_operator_gct_create_job_body_gcp]
+create_body_gcs = {
+"description": GCP_DESCRIPTION,
+"status": GcpTransferJobsStatus.ENABLED,
+"projectId": GCP_PROJECT_ID,
+"schedule": {
+"scheduleStartDate": datetime(2015, 1, 1).date(),
+"scheduleEndDate": datetime(2030, 1, 1).date(),
+"startTimeOfDay": datetime.utcnow() + timedelta(minutes=2),
+},
+"transferSpec": {
+'gcsDataSource': {'bucketName': GCP_TRANSFER_FIRST_TARGET_BUCKET},
+"gcsDataSink": {"bucketName": GCP_TRANSFER_SECOND_TARGET_BUCKET},
+"transferOptions": {"overwriteObjectsAlreadyExistingInSink": True},
+},
+}
+# [END howto_operator_gct_create_job_body_gcp]
+
+# [START howto_operator_gct_update_job_body]
+update_body = {
+"projectId": GCP_PROJECT_ID,
+"transferJob": {"description": "%s_updated".format(GCP_DESCRIPTION)},
+"updateTransferJobFieldMask": 

[GitHub] sdevani commented on a change in pull request #4792: [AIRFLOW-3659] Create Google Cloud Transfer Service Operators

2019-02-28 Thread GitBox
sdevani commented on a change in pull request #4792:  [AIRFLOW-3659] Create 
Google Cloud Transfer Service Operators 
URL: https://github.com/apache/airflow/pull/4792#discussion_r260993058
 
 

 ##
 File path: airflow/contrib/hooks/gcp_transfer_hook.py
 ##
 @@ -52,56 +76,277 @@ def get_conn(self):
 """
 if not self._conn:
 http_authorized = self._authorize()
-self._conn = build('storagetransfer', self.api_version,
-   http=http_authorized, cache_discovery=False)
+self._conn = build(
+'storagetransfer', self.api_version, http=http_authorized, cache_discovery=False
+)
 return self._conn
 
-def create_transfer_job(self, description, schedule, transfer_spec, project_id=None):
-transfer_job = {
-'status': 'ENABLED',
-'projectId': project_id or self.project_id,
-'description': description,
-'transferSpec': transfer_spec,
-'schedule': schedule or self._schedule_once_now(),
-}
-return self.get_conn().transferJobs().create(body=transfer_job).execute()
-
-def wait_for_transfer_job(self, job):
+@GoogleCloudBaseHook.catch_http_exception
+def create_transfer_job(self, body):
+"""
+Creates a transfer job that runs periodically.
+
+:param body: (Required) A request body, as described in
+https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs/patch#request-body
+:type body: dict
+:return: transfer job.
+See:
+https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs#TransferJob
+:rtype: dict
+"""
+body = self._inject_project_id(body, 'body project_id')
+return self.get_conn().transferJobs().create(body=body).execute(num_retries=NUM_RETRIES)
+
+@GoogleCloudBaseHook.fallback_to_default_project_id
+@GoogleCloudBaseHook.catch_http_exception
+def get_transfer_job(self, job_name, project_id=None):
+"""
+Gets the latest state of a long-running operation in Google Storage
+Transfer Service.
+
+:param job_name: (Required) Name of the job to be fetched
+:type job_name: str
+:param project_id: (Optional) the ID of the project that owns the Transfer
+Job. If set to None or missing, the default project_id from the GCP
+connection is used.
+:type project_id: str
+:return: Transfer Job
+:rtype: dict
+"""
+return (
+self.get_conn()
+.transferJobs()
+.get(jobName=job_name, projectId=project_id)
+.execute(num_retries=NUM_RETRIES)
+)
+
+def list_transfer_job(self, filter):
+"""
+Lists long-running operations in Google Storage Transfer
+Service that match the specified filter.
+
+:param filter: (Required) A request filter, as described in
+https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs/list#body.QUERY_PARAMETERS.filter
+:type filter: dict
+:return: List of Transfer Jobs
+:rtype: list[dict]
+"""
+conn = self.get_conn()
+filter = self._inject_project_id(filter, 'filter project_id')
+request = conn.transferJobs().list(filter=json.dumps(filter))
+jobs = []
+
+while request is not None:
+response = request.execute(num_retries=NUM_RETRIES)
+jobs.extend(response['transferJobs'])
+
+request = conn.transferJobs().list_next(previous_request=request, previous_response=response)
+
+return jobs
+
+@GoogleCloudBaseHook.catch_http_exception
+def update_transfer_job(self, job_name, body):
+"""
+Updates a transfer job that runs periodically.
+
+:param job_name: (Required) Name of the job to be updated
+:type job_name: str
+:param body: A request body, as described in
+https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs/patch#request-body
+:type body: dict
+:return: If successful, TransferJob.
+:rtype: dict
+"""
+body = self._inject_project_id(body, 'body project_id')
+return (
+self.get_conn().transferJobs().patch(jobName=job_name, body=body).execute(num_retries=NUM_RETRIES)
+)
+
+@GoogleCloudBaseHook.fallback_to_default_project_id
+@GoogleCloudBaseHook.catch_http_exception
+def delete_transfer_job(self, job_name, project_id):
+"""
+Deletes a transfer job. This is a soft delete. After a transfer job is
+deleted, the job and all the transfer executions are subject to garbage
+collection. Transfer jobs become eligible for garbage collection
+30 days after soft delete.
+
+:param job_name: 

[GitHub] sdevani commented on a change in pull request #4792: [AIRFLOW-3659] Create Google Cloud Transfer Service Operators

2019-02-28 Thread GitBox
sdevani commented on a change in pull request #4792:  [AIRFLOW-3659] Create 
Google Cloud Transfer Service Operators 
URL: https://github.com/apache/airflow/pull/4792#discussion_r260993571
 
 

 ##
 File path: airflow/contrib/hooks/gcp_transfer_hook.py
 ##
 @@ -52,56 +76,277 @@ def get_conn(self):
 """
 if not self._conn:
 http_authorized = self._authorize()
-self._conn = build('storagetransfer', self.api_version,
-   http=http_authorized, cache_discovery=False)
+self._conn = build(
+'storagetransfer', self.api_version, http=http_authorized, cache_discovery=False
+)
 return self._conn
 
-def create_transfer_job(self, description, schedule, transfer_spec, project_id=None):
-transfer_job = {
-'status': 'ENABLED',
-'projectId': project_id or self.project_id,
-'description': description,
-'transferSpec': transfer_spec,
-'schedule': schedule or self._schedule_once_now(),
-}
-return self.get_conn().transferJobs().create(body=transfer_job).execute()
-
-def wait_for_transfer_job(self, job):
+@GoogleCloudBaseHook.catch_http_exception
+def create_transfer_job(self, body):
+"""
+Creates a transfer job that runs periodically.
+
+:param body: (Required) A request body, as described in
+https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs/patch#request-body
+:type body: dict
+:return: transfer job.
+See:
+https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs#TransferJob
+:rtype: dict
+"""
+body = self._inject_project_id(body, 'body project_id')
+return self.get_conn().transferJobs().create(body=body).execute(num_retries=NUM_RETRIES)
+
+@GoogleCloudBaseHook.fallback_to_default_project_id
+@GoogleCloudBaseHook.catch_http_exception
+def get_transfer_job(self, job_name, project_id=None):
+"""
+Gets the latest state of a long-running operation in Google Storage
+Transfer Service.
+
+:param job_name: (Required) Name of the job to be fetched
+:type job_name: str
+:param project_id: (Optional) the ID of the project that owns the Transfer
+Job. If set to None or missing, the default project_id from the GCP
+connection is used.
+:type project_id: str
+:return: Transfer Job
+:rtype: dict
+"""
+return (
+self.get_conn()
+.transferJobs()
+.get(jobName=job_name, projectId=project_id)
+.execute(num_retries=NUM_RETRIES)
+)
+
+def list_transfer_job(self, filter):
+"""
+Lists long-running operations in Google Storage Transfer
+Service that match the specified filter.
+
+:param filter: (Required) A request filter, as described in
+https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs/list#body.QUERY_PARAMETERS.filter
+:type filter: dict
+:return: List of Transfer Jobs
+:rtype: list[dict]
+"""
+conn = self.get_conn()
+filter = self._inject_project_id(filter, 'filter project_id')
+request = conn.transferJobs().list(filter=json.dumps(filter))
+jobs = []
+
+while request is not None:
+response = request.execute(num_retries=NUM_RETRIES)
+jobs.extend(response['transferJobs'])
+
+request = conn.transferJobs().list_next(previous_request=request, previous_response=response)
+
+return jobs
+
+@GoogleCloudBaseHook.catch_http_exception
+def update_transfer_job(self, job_name, body):
+"""
+Updates a transfer job that runs periodically.
+
+:param job_name: (Required) Name of the job to be updated
+:type job_name: str
+:param body: A request body, as described in
+https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs/patch#request-body
+:type body: dict
+:return: If successful, TransferJob.
+:rtype: dict
+"""
+body = self._inject_project_id(body, 'body project_id')
+return (
+self.get_conn().transferJobs().patch(jobName=job_name, body=body).execute(num_retries=NUM_RETRIES)
+)
+
+@GoogleCloudBaseHook.fallback_to_default_project_id
+@GoogleCloudBaseHook.catch_http_exception
+def delete_transfer_job(self, job_name, project_id):
+"""
+Deletes a transfer job. This is a soft delete. After a transfer job is
+deleted, the job and all the transfer executions are subject to garbage
+collection. Transfer jobs become eligible for garbage collection
+30 days after soft delete.
+
+:param job_name: 

[GitHub] sdevani commented on a change in pull request #4792: [AIRFLOW-3659] Create Google Cloud Transfer Service Operators

2019-02-28 Thread GitBox
sdevani commented on a change in pull request #4792:  [AIRFLOW-3659] Create 
Google Cloud Transfer Service Operators 
URL: https://github.com/apache/airflow/pull/4792#discussion_r261407861
 
 

 ##
 File path: tests/contrib/sensors/test_gcp_transfer_sensor.py
 ##
 @@ -0,0 +1,79 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import unittest
+
+from airflow.contrib.hooks.gcp_transfer_hook import GcpTransferOperationStatus
+from airflow.contrib.sensors.gcp_transfer_sensor import GCPTransferServiceWaitForJobStatusSensor
+
+try:
+from unittest import mock
+except ImportError:
+try:
+import mock
+except ImportError:
+mock = None
+
+
+class TestGcpStorageTransferOperationWaitForJobStatusSensor(unittest.TestCase):
+@mock.patch('airflow.contrib.sensors.gcp_transfer_sensor.GCPTransferServiceHook')
+def test_wait_for_status_success(self, mock_tool):
+operations = [{'metadata': {'status': GcpTransferOperationStatus.SUCCESS}}]
+mock_tool.return_value.list_transfer_operations.return_value = operations
+mock_tool.are_operations_contains_expected_statuses.return_value = True
+
+op = GCPTransferServiceWaitForJobStatusSensor(
+task_id='task-id',
+operation_name='operation-name',
+job_name='job-name',
+project_id='project-id',
+expected_statuses=GcpTransferOperationStatus.SUCCESS,
+)
+
+task_instance = mock.Mock(**{'xcom_push.return_value': None})
+context = {'task_instance': task_instance}
+result = op.poke(context)
+
+mock_tool.return_value.list_transfer_operations.assert_called_with(
+filter={'project_id': 'project-id', 'job_names': ['job-name']}
+)
+mock_tool.are_operations_contains_expected_statuses.assert_called_with(
+operations=operations, expected_statuses=[GcpTransferOperationStatus.SUCCESS]
+)
+self.assertTrue(result)
+
+@mock.patch('airflow.contrib.sensors.gcp_transfer_sensor.GCPTransferServiceHook')
+def test_wait_for_status_success_default_expected_status(self, mock_tool):
 
 Review comment:
  Is this testing the case with no operations? If so the test name should reflect that.
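
  If the case being exercised here is the sensor falling back to its default expected status (rather than "no operations"), a more descriptive name could make that explicit. A sketch only; the exact intent is for the author to confirm, and the name below is hypothetical:

```python
@mock.patch('airflow.contrib.sensors.gcp_transfer_sensor.GCPTransferServiceHook')
def test_wait_for_status_uses_default_expected_status_when_none_given(self, mock_tool):
    # Same setup as test_wait_for_status_success, but the sensor would be
    # constructed without expected_statuses so the default handling is tested.
    ...
```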




[GitHub] sdevani commented on a change in pull request #4792: [AIRFLOW-3659] Create Google Cloud Transfer Service Operators

2019-02-28 Thread GitBox
sdevani commented on a change in pull request #4792:  [AIRFLOW-3659] Create 
Google Cloud Transfer Service Operators 
URL: https://github.com/apache/airflow/pull/4792#discussion_r260973882
 
 

 ##
 File path: airflow/contrib/hooks/gcp_transfer_hook.py
 ##
 @@ -52,56 +76,277 @@ def get_conn(self):
 """
 if not self._conn:
 http_authorized = self._authorize()
-self._conn = build('storagetransfer', self.api_version,
-   http=http_authorized, cache_discovery=False)
+self._conn = build(
+'storagetransfer', self.api_version, http=http_authorized, cache_discovery=False
+)
 return self._conn
 
-def create_transfer_job(self, description, schedule, transfer_spec, project_id=None):
-transfer_job = {
-'status': 'ENABLED',
-'projectId': project_id or self.project_id,
-'description': description,
-'transferSpec': transfer_spec,
-'schedule': schedule or self._schedule_once_now(),
-}
-return self.get_conn().transferJobs().create(body=transfer_job).execute()
-
-def wait_for_transfer_job(self, job):
+@GoogleCloudBaseHook.catch_http_exception
+def create_transfer_job(self, body):
+"""
+Creates a transfer job that runs periodically.
+
+:param body: (Required) A request body, as described in
+https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs/patch#request-body
+:type body: dict
+:return: transfer job.
+See:
+https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs#TransferJob
+:rtype: dict
+"""
+body = self._inject_project_id(body, 'body project_id')
 
 Review comment:
   What is "body project_id"?




[GitHub] sdevani commented on a change in pull request #4792: [AIRFLOW-3659] Create Google Cloud Transfer Service Operators

2019-02-28 Thread GitBox
sdevani commented on a change in pull request #4792:  [AIRFLOW-3659] Create 
Google Cloud Transfer Service Operators 
URL: https://github.com/apache/airflow/pull/4792#discussion_r260965148
 
 

 ##
 File path: airflow/contrib/example_dags/example_gcp_transfer.py
 ##
 @@ -0,0 +1,254 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG that demonstrates interactions with Google Cloud Transfer.
+
+This DAG relies on the following OS environment variables:
+
+* GCP_PROJECT_ID - Google Cloud Project to use for the Google Cloud Transfer Service.
+* GCP_DESCRIPTION - Description of the transfer job
+* GCP_TRANSFER_SOURCE_AWS_BUCKET - Amazon Web Services S3 bucket from which files are copied.
+* GCP_TRANSFER_FIRST_TARGET_BUCKET - Google Cloud Storage bucket to which files are copied from AWS.
+  It is also the source bucket in the next step.
+* GCP_TRANSFER_SECOND_TARGET_BUCKET - Google Cloud Storage bucket to which files are copied
+* WAIT_FOR_OPERATION_POKE_INTERVAL - interval at which to check the status of the operation
+
+"""
+import os
+from datetime import datetime, timedelta
+
+from airflow import models
+from airflow.contrib.hooks.gcp_transfer_hook import GcpTransferOperationStatus, GcpTransferJobsStatus
+from airflow.contrib.operators.gcp_transfer_operator import (
+GcpTransferServiceJobCreateOperator,
+GcpTransferServiceJobDeleteOperator,
+GcpTransferServiceJobUpdateOperator,
+GcpTransferServiceOperationsListOperator,
+GcpTransferServiceOperationGetOperator,
+GcpTransferServiceOperationPauseOperator,
+GcpTransferServiceOperationResumeOperator,
+GcpTransferServiceOperationCancelOperator,
+)
+from airflow.contrib.sensors.gcp_transfer_sensor import GCPTransferServiceWaitForJobStatusSensor
+from airflow.utils.dates import days_ago
+
+# [START howto_operator_gct_common_variables]
+GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
+GCP_DESCRIPTION = os.environ.get('GCP_DESCRIPTION', 'description')
+GCP_TRANSFER_TARGET_BUCKET = os.environ.get('GCP_TRANSFER_TARGET_BUCKET')
+WAIT_FOR_OPERATION_POKE_INTERVAL = os.environ.get('WAIT_FOR_OPERATION_POKE_INTERVAL', 5)
+
+GCP_TRANSFER_SOURCE_AWS_BUCKET = os.environ.get('GCP_TRANSFER_SOURCE_AWS_BUCKET')
+GCP_TRANSFER_FIRST_TARGET_BUCKET = os.environ.get(
+'GCP_TRANSFER_FIRST_TARGET_BUCKET', 'gcp-transfer-first-target'
+)
+GCP_TRANSFER_SECOND_TARGET_BUCKET = os.environ.get(
+'GCP_TRANSFER_SECOND_TARGET_BUCKET', 'gcp-transfer-second-target'
+)
+# [END howto_operator_gct_common_variables]
+
+# [START howto_operator_gct_create_job_body_aws]
+create_body_aws = {
+"description": GCP_DESCRIPTION,
+"status": GcpTransferJobsStatus.ENABLED,
+"projectId": GCP_PROJECT_ID,
+"schedule": {
+"scheduleStartDate": datetime(2015, 1, 1).date(),
+"scheduleEndDate": datetime(2030, 1, 1).date(),
+"startTimeOfDay": datetime.utcnow() + timedelta(minutes=2),
+},
+"transferSpec": {
+'awsS3DataSource': {'bucketName': GCP_TRANSFER_SOURCE_AWS_BUCKET},
+"gcsDataSink": {"bucketName": GCP_TRANSFER_FIRST_TARGET_BUCKET},
+"transferOptions": {"overwriteObjectsAlreadyExistingInSink": True},
+},
+}
+# [END howto_operator_gct_create_job_body_aws]
+
+# [START howto_operator_gct_create_job_body_gcp]
+create_body_gcs = {
+"description": GCP_DESCRIPTION,
+"status": GcpTransferJobsStatus.ENABLED,
+"projectId": GCP_PROJECT_ID,
+"schedule": {
+"scheduleStartDate": datetime(2015, 1, 1).date(),
+"scheduleEndDate": datetime(2030, 1, 1).date(),
+"startTimeOfDay": datetime.utcnow() + timedelta(minutes=2),
+},
+"transferSpec": {
+'gcsDataSource': {'bucketName': GCP_TRANSFER_FIRST_TARGET_BUCKET},
+"gcsDataSink": {"bucketName": GCP_TRANSFER_SECOND_TARGET_BUCKET},
+"transferOptions": {"overwriteObjectsAlreadyExistingInSink": True},
+},
+}
+# [END howto_operator_gct_create_job_body_gcp]
+
+# [START howto_operator_gct_update_job_body]
+update_body = {
+"projectId": GCP_PROJECT_ID,
+"transferJob": {"description": "%s_updated".format(GCP_DESCRIPTION)},
+"updateTransferJobFieldMask": 

[GitHub] sdevani commented on a change in pull request #4792: [AIRFLOW-3659] Create Google Cloud Transfer Service Operators

2019-02-28 Thread GitBox
sdevani commented on a change in pull request #4792:  [AIRFLOW-3659] Create 
Google Cloud Transfer Service Operators 
URL: https://github.com/apache/airflow/pull/4792#discussion_r261007212
 
 

 ##
 File path: airflow/contrib/operators/gcp_transfer_operator.py
 ##
 @@ -0,0 +1,753 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from copy import deepcopy
+from datetime import date, time
+
+from airflow import AirflowException
+from airflow.contrib.hooks.gcp_transfer_hook import GCPTransferServiceHook, GcpTransferJobsStatus
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+
+try:
+from airflow.contrib.hooks.aws_hook import AwsHook
+except ImportError:  # pragma: no cover
+AwsHook = None
+
+
+class TransferJobPreprocessor:
+def __init__(self, body, aws_conn_id='aws_default'):
+self.body = body
+self.aws_conn_id = aws_conn_id
+
+def _inject_aws_credentials(self):
+if 'transferSpec' not in self.body or 'awsS3DataSource' not in self.body['transferSpec']:
+return
+
+aws_hook = AwsHook(self.aws_conn_id)
+aws_credentials = aws_hook.get_credentials()
+aws_access_key_id = aws_credentials.access_key
+aws_secret_access_key = aws_credentials.secret_key
+self.body['transferSpec']['awsS3DataSource']["awsAccessKey"] = {
+"accessKeyId": aws_access_key_id,
+"secretAccessKey": aws_secret_access_key,
+}
+
+def _reformat_date(self, field_key):
+schedule = self.body['schedule']
+if field_key not in schedule:
+return
+if isinstance(schedule[field_key], date):
+schedule[field_key] = self._convert_date_to_dict(schedule[field_key])
+
+def _reformat_time(self, field_key):
+schedule = self.body['schedule']
+if field_key not in schedule:
+return
+if isinstance(schedule[field_key], time):
+schedule[field_key] = self._convert_time_to_dict(schedule[field_key])
+
+def _reformat_schedule(self):
+if 'schedule' not in self.body:
+return
+self._reformat_date('scheduleStartDate')
+self._reformat_date('scheduleEndDate')
+self._reformat_time('startTimeOfDay')
+
+def process_body(self):
+self._inject_aws_credentials()
+self._reformat_schedule()
+return self.body
+
+@staticmethod
+def _convert_date_to_dict(field_date):
+"""
+Convert native python ``datetime.date`` object  to a format supported 
by the API
+"""
+return {'day': field_date.day, 'month': field_date.month, 'year': 
field_date.year}
+
+@staticmethod
+def _convert_time_to_dict(time):
+"""
+Convert native python ``datetime.time`` object  to a format supported 
by the API
+"""
+return {"hours": time.hour, "minutes": time.minute, "seconds": 
time.second}
+
+
+class TransferJobValidator:
+def __init__(self, body):
+self.body = body
+
+def _verify_data_source(self):
+is_gcs = 'gcsDataSource' in self.body['transferSpec']
+is_aws_s3 = 'awsS3DataSource' in self.body['transferSpec']
+is_http = 'httpDataSource' in self.body['transferSpec']
+
+sources_count = sum([is_gcs, is_aws_s3, is_http])
+if sources_count != 0 and sources_count != 1:
+raise AirflowException(
+"More than one data source detected. Please choose exactly one 
data source from: "
+"gcsDataSource, awsS3DataSource and httpDataSource."
+)
+
+def _restrict_aws_credentials(self):
+if 'awsS3DataSource' not in self.body['transferSpec']:
+return
+
+if 'awsAccessKey' in self.body['transferSpec']['awsS3DataSource']:
+raise AirflowException(
+"AWS credentials detected inside the body parameter 
(awsAccessKey). This is not allowed, "
+"please use Airflow connections to store credentials."
+)
+
+def _restrict_empty_body(self):
+if not self.body:
+raise AirflowException("The required parameter 'body' is empty or 

[GitHub] sdevani commented on a change in pull request #4792: [AIRFLOW-3659] Create Google Cloud Transfer Service Operators

2019-02-28 Thread GitBox
sdevani commented on a change in pull request #4792:  [AIRFLOW-3659] Create 
Google Cloud Transfer Service Operators 
URL: https://github.com/apache/airflow/pull/4792#discussion_r260996490
 
 

 ##
 File path: airflow/contrib/hooks/gcp_transfer_hook.py
 ##
 @@ -52,56 +76,277 @@ def get_conn(self):
 """
 if not self._conn:
 http_authorized = self._authorize()
-self._conn = build('storagetransfer', self.api_version,
-   http=http_authorized, cache_discovery=False)
+self._conn = build(
+'storagetransfer', self.api_version, http=http_authorized, cache_discovery=False
+)
 return self._conn
 
-def create_transfer_job(self, description, schedule, transfer_spec, project_id=None):
-transfer_job = {
-'status': 'ENABLED',
-'projectId': project_id or self.project_id,
-'description': description,
-'transferSpec': transfer_spec,
-'schedule': schedule or self._schedule_once_now(),
-}
-return self.get_conn().transferJobs().create(body=transfer_job).execute()
-
-def wait_for_transfer_job(self, job):
+@GoogleCloudBaseHook.catch_http_exception
+def create_transfer_job(self, body):
+"""
+Creates a transfer job that runs periodically.
+
+:param body: (Required) A request body, as described in
+https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs/patch#request-body
+:type body: dict
+:return: transfer job.
+See:
+https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs#TransferJob
+:rtype: dict
+"""
+body = self._inject_project_id(body, 'body project_id')
+return self.get_conn().transferJobs().create(body=body).execute(num_retries=NUM_RETRIES)
+
+@GoogleCloudBaseHook.fallback_to_default_project_id
+@GoogleCloudBaseHook.catch_http_exception
+def get_transfer_job(self, job_name, project_id=None):
+"""
+Gets the latest state of a long-running operation in Google Storage
+Transfer Service.
+
+:param job_name: (Required) Name of the job to be fetched
+:type job_name: str
+:param project_id: (Optional) the ID of the project that owns the Transfer
+Job. If set to None or missing, the default project_id from the GCP
+connection is used.
+:type project_id: str
+:return: Transfer Job
+:rtype: dict
+"""
+return (
+self.get_conn()
+.transferJobs()
+.get(jobName=job_name, projectId=project_id)
+.execute(num_retries=NUM_RETRIES)
+)
+
+def list_transfer_job(self, filter):
+"""
+Lists long-running operations in Google Storage Transfer
+Service that match the specified filter.
+
+:param filter: (Required) A request filter, as described in
+https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs/list#body.QUERY_PARAMETERS.filter
+:type filter: dict
+:return: List of Transfer Jobs
+:rtype: list[dict]
+"""
+conn = self.get_conn()
+filter = self._inject_project_id(filter, 'filter project_id')
+request = conn.transferJobs().list(filter=json.dumps(filter))
+jobs = []
+
+while request is not None:
+response = request.execute(num_retries=NUM_RETRIES)
+jobs.extend(response['transferJobs'])
+
+request = conn.transferJobs().list_next(previous_request=request, previous_response=response)
+
+return jobs
+
+@GoogleCloudBaseHook.catch_http_exception
+def update_transfer_job(self, job_name, body):
+"""
+Updates a transfer job that runs periodically.
+
+:param job_name: (Required) Name of the job to be updated
+:type job_name: str
+:param body: A request body, as described in
+https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs/patch#request-body
+:type body: dict
+:return: If successful, TransferJob.
+:rtype: dict
+"""
+body = self._inject_project_id(body, 'body project_id')
+return (
+self.get_conn().transferJobs().patch(jobName=job_name, body=body).execute(num_retries=NUM_RETRIES)
+)
+
+@GoogleCloudBaseHook.fallback_to_default_project_id
+@GoogleCloudBaseHook.catch_http_exception
+def delete_transfer_job(self, job_name, project_id):
+"""
+Deletes a transfer job. This is a soft delete. After a transfer job is
+deleted, the job and all the transfer executions are subject to garbage
+collection. Transfer jobs become eligible for garbage collection
+30 days after soft delete.
+
+:param job_name: 

[GitHub] sdevani commented on a change in pull request #4792: [AIRFLOW-3659] Create Google Cloud Transfer Service Operators

2019-02-28 Thread GitBox
sdevani commented on a change in pull request #4792:  [AIRFLOW-3659] Create 
Google Cloud Transfer Service Operators 
URL: https://github.com/apache/airflow/pull/4792#discussion_r260991871
 
 

 ##
 File path: airflow/contrib/hooks/gcp_transfer_hook.py
 ##
 @@ -52,56 +76,277 @@ def get_conn(self):
 """
 if not self._conn:
 http_authorized = self._authorize()
-self._conn = build('storagetransfer', self.api_version,
-   http=http_authorized, cache_discovery=False)
+self._conn = build(
+'storagetransfer', self.api_version, http=http_authorized, cache_discovery=False
+)
 return self._conn
 
-def create_transfer_job(self, description, schedule, transfer_spec, project_id=None):
-transfer_job = {
-'status': 'ENABLED',
-'projectId': project_id or self.project_id,
-'description': description,
-'transferSpec': transfer_spec,
-'schedule': schedule or self._schedule_once_now(),
-}
-return self.get_conn().transferJobs().create(body=transfer_job).execute()
-
-def wait_for_transfer_job(self, job):
+@GoogleCloudBaseHook.catch_http_exception
+def create_transfer_job(self, body):
+"""
+Creates a transfer job that runs periodically.
+
+:param body: (Required) A request body, as described in
+https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs/patch#request-body
+:type body: dict
+:return: transfer job.
+See:
+https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs#TransferJob
+:rtype: dict
+"""
+body = self._inject_project_id(body, 'body project_id')
+return self.get_conn().transferJobs().create(body=body).execute(num_retries=NUM_RETRIES)
+
+@GoogleCloudBaseHook.fallback_to_default_project_id
+@GoogleCloudBaseHook.catch_http_exception
+def get_transfer_job(self, job_name, project_id=None):
+"""
+Gets the latest state of a long-running operation in Google Storage
+Transfer Service.
+
+:param job_name: (Required) Name of the job to be fetched
+:type job_name: str
+:param project_id: (Optional) the ID of the project that owns the Transfer
+Job. If set to None or missing, the default project_id from the GCP
+connection is used.
+:type project_id: str
+:return: Transfer Job
+:rtype: dict
+"""
+return (
+self.get_conn()
+.transferJobs()
+.get(jobName=job_name, projectId=project_id)
+.execute(num_retries=NUM_RETRIES)
+)
+
+def list_transfer_job(self, filter):
+"""
+Lists long-running operations in Google Storage Transfer
+Service that match the specified filter.
+
+:param filter: (Required) A request filter, as described in
+https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs/list#body.QUERY_PARAMETERS.filter
+:type filter: dict
+:return: List of Transfer Jobs
+:rtype: list[dict]
+"""
+conn = self.get_conn()
+filter = self._inject_project_id(filter, 'filter project_id')
+request = conn.transferJobs().list(filter=json.dumps(filter))
+jobs = []
+
+while request is not None:
+response = request.execute(num_retries=NUM_RETRIES)
+jobs.extend(response['transferJobs'])
+
+request = conn.transferJobs().list_next(previous_request=request, previous_response=response)
+
+return jobs
+
+@GoogleCloudBaseHook.catch_http_exception
+def update_transfer_job(self, job_name, body):
+"""
+Updates a transfer job that runs periodically.
+
+:param job_name: (Required) Name of the job to be updated
+:type job_name: str
+:param body: A request body, as described in
+https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs/patch#request-body
+:type body: dict
+:return: If successful, TransferJob.
+:rtype: dict
+"""
+body = self._inject_project_id(body, 'body project_id')
+return (
+self.get_conn().transferJobs().patch(jobName=job_name, body=body).execute(num_retries=NUM_RETRIES)
+)
+
+@GoogleCloudBaseHook.fallback_to_default_project_id
+@GoogleCloudBaseHook.catch_http_exception
+def delete_transfer_job(self, job_name, project_id):
+"""
+Deletes a transfer job. This is a soft delete. After a transfer job is
+deleted, the job and all the transfer executions are subject to garbage
+collection. Transfer jobs become eligible for garbage collection
+30 days after soft delete.
+
+:param job_name: 

[GitHub] sdevani commented on a change in pull request #4792: [AIRFLOW-3659] Create Google Cloud Transfer Service Operators

2019-02-28 Thread GitBox
sdevani commented on a change in pull request #4792:  [AIRFLOW-3659] Create 
Google Cloud Transfer Service Operators 
URL: https://github.com/apache/airflow/pull/4792#discussion_r261427718
 
 

 ##
 File path: tests/contrib/operators/test_gcp_transfer_operator.py
 ##
 @@ -0,0 +1,702 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import itertools
+import unittest
+from copy import deepcopy
+from datetime import date, time
+
+from parameterized import parameterized
+from botocore.credentials import Credentials
+
+from airflow import AirflowException, configuration
+from airflow.contrib.operators.gcp_transfer_operator import (
+GcpTransferServiceOperationCancelOperator,
+GcpTransferServiceOperationResumeOperator,
+GcpTransferServiceOperationsListOperator,
+TransferJobValidator,
+TransferJobPreprocessor,
+GcpTransferServiceJobCreateOperator,
+GcpTransferServiceJobUpdateOperator,
+GcpTransferServiceOperationGetOperator,
+GcpTransferServiceOperationPauseOperator,
+S3ToGoogleCloudStorageTransferOperator,
+GoogleCloudStorageToGoogleCloudStorageTransferOperator,
+GcpTransferServiceJobDeleteOperator,
+)
+from airflow.models import TaskInstance, DAG
+from airflow.utils import timezone
+
+
+try:
+# noinspection PyProtectedMember
+from unittest import mock
+except ImportError:  # pragma: no cover
+try:
+import mock
+except ImportError:
+mock = None
+try:
+import boto3
+except ImportError:  # pragma: no cover
+boto3 = None
+
+GCP_PROJECT_ID = 'project-id'
+TASK_ID = 'task-id'
+
+JOB_NAME = "job-name"
+OPERATION_NAME = "operation-name"
+AWS_BUCKET_NAME = "aws-bucket-name"
+GCS_BUCKET_NAME = "gcp-bucket-name"
+DESCRIPTION = "description"
+
+DEFAULT_DATE = timezone.datetime(2017, 1, 1)
+
+FILTER = {'jobNames': [JOB_NAME]}
+
+AWS_ACCESS_KEY_ID = "test-key-1"
+AWS_ACCESS_SECRET = "test-secret-1"
+AWS_ACCESS_KEY = {'accessKeyId': AWS_ACCESS_KEY_ID, 'secretAccessKey': AWS_ACCESS_SECRET}
+
+NATIVE_DATE = date(2018, 10, 15)
+DICT_DATE = {'day': 15, 'month': 10, 'year': 2018}
+NATIVE_TIME = time(hour=11, minute=42, second=43)
+DICT_TIME = {'hours': 11, 'minutes': 42, 'seconds': 43}
+SCHEDULE_NATIVE = {
+'scheduleStartDate': NATIVE_DATE,
+'scheduleEndDate': NATIVE_DATE,
+'startTimeOfDay': NATIVE_TIME,
+}
+
+SCHEDULE_DICT = {
+'scheduleStartDate': {'day': 15, 'month': 10, 'year': 2018},
+'scheduleEndDate': {'day': 15, 'month': 10, 'year': 2018},
+'startTimeOfDay': {'hours': 11, 'minutes': 42, 'seconds': 43},
+}
+
+SOURCE_AWS = {"awsS3DataSource": {"bucketName": AWS_BUCKET_NAME}}
+SOURCE_GCS = {"gcsDataSource": {"bucketName": GCS_BUCKET_NAME}}
+SOURCE_HTTP = {"httpDataSource": {"list_url": "http://example.com"}}
+
+VALID_TRANSFER_JOB_BASE = {
+"name": JOB_NAME,
+'description': DESCRIPTION,
+'status': 'ENABLED',
+'schedule': SCHEDULE_DICT,
+'transferSpec': {'gcsDataSink': {'bucketName': GCS_BUCKET_NAME}},
+}
+VALID_TRANSFER_JOB_GCS = deepcopy(VALID_TRANSFER_JOB_BASE)
+VALID_TRANSFER_JOB_GCS['transferSpec'].update(deepcopy(SOURCE_GCS))
+VALID_TRANSFER_JOB_AWS = deepcopy(VALID_TRANSFER_JOB_BASE)
+VALID_TRANSFER_JOB_AWS['transferSpec'].update(deepcopy(SOURCE_AWS))
+
+VALID_TRANSFER_JOB_GCS = {
+"name": JOB_NAME,
+'description': DESCRIPTION,
+'status': 'ENABLED',
+'schedule': SCHEDULE_NATIVE,
+'transferSpec': {
+"gcsDataSource": {"bucketName": GCS_BUCKET_NAME},
+'gcsDataSink': {'bucketName': GCS_BUCKET_NAME},
+},
+}
+
+VALID_TRANSFER_JOB_RAW = {
+'description': DESCRIPTION,
+'status': 'ENABLED',
+'schedule': SCHEDULE_DICT,
+'transferSpec': {'gcsDataSink': {'bucketName': GCS_BUCKET_NAME}},
+}
+
+VALID_TRANSFER_JOB_GCS_RAW = deepcopy(VALID_TRANSFER_JOB_RAW)
+VALID_TRANSFER_JOB_GCS_RAW['transferSpec'].update(SOURCE_GCS)
+VALID_TRANSFER_JOB_AWS_RAW = deepcopy(VALID_TRANSFER_JOB_RAW)
+VALID_TRANSFER_JOB_AWS_RAW['transferSpec'].update(deepcopy(SOURCE_AWS))
+VALID_TRANSFER_JOB_AWS_RAW['transferSpec']['awsS3DataSource']['awsAccessKey'] = AWS_ACCESS_KEY
+
+
+VALID_OPERATION = {"name": "operation-name"}
+
+
+class TransferJobPreprocessorTest(unittest.TestCase):
+def 

[GitHub] sdevani commented on a change in pull request #4792: [AIRFLOW-3659] Create Google Cloud Transfer Service Operators

2019-02-28 Thread GitBox
sdevani commented on a change in pull request #4792:  [AIRFLOW-3659] Create 
Google Cloud Transfer Service Operators 
URL: https://github.com/apache/airflow/pull/4792#discussion_r261282629
 
 

 ##
 File path: airflow/contrib/operators/gcs_to_gcs_transfer_operator.py
 ##
 @@ -16,112 +16,13 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import warnings
 
-from airflow.models import BaseOperator
-from airflow.contrib.hooks.gcp_transfer_hook import GCPTransferServiceHook
-from airflow.utils.decorators import apply_defaults
+from airflow.contrib.operators.gcp_transfer_operator import (  # noqa
 
 Review comment:
   What's # noqa?
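
  `# noqa` is a flake8 directive that suppresses lint warnings for that line; here it keeps flake8 from flagging names that are imported only so they can be re-exported for backwards compatibility. A small illustration (the import mirrors this module; the error code is standard flake8):

```python
# Without the marker, flake8 reports F401 ("imported but unused") because the
# name is never referenced in this module, only re-exported.
from airflow.contrib.operators.gcp_transfer_operator import (  # noqa: F401
    GoogleCloudStorageToGoogleCloudStorageTransferOperator,
)

# A bare "# noqa" silences every check on the line, while "# noqa: F401"
# limits the suppression to that specific error code.
```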




[GitHub] sdevani commented on a change in pull request #4792: [AIRFLOW-3659] Create Google Cloud Transfer Service Operators

2019-02-28 Thread GitBox
sdevani commented on a change in pull request #4792:  [AIRFLOW-3659] Create 
Google Cloud Transfer Service Operators 
URL: https://github.com/apache/airflow/pull/4792#discussion_r261426663
 
 

 ##
 File path: tests/contrib/operators/test_gcp_transfer_operator.py
 ##
 @@ -0,0 +1,702 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import itertools
+import unittest
+from copy import deepcopy
+from datetime import date, time
+
+from parameterized import parameterized
+from botocore.credentials import Credentials
+
+from airflow import AirflowException, configuration
+from airflow.contrib.operators.gcp_transfer_operator import (
+GcpTransferServiceOperationCancelOperator,
+GcpTransferServiceOperationResumeOperator,
+GcpTransferServiceOperationsListOperator,
+TransferJobValidator,
+TransferJobPreprocessor,
+GcpTransferServiceJobCreateOperator,
+GcpTransferServiceJobUpdateOperator,
+GcpTransferServiceOperationGetOperator,
+GcpTransferServiceOperationPauseOperator,
+S3ToGoogleCloudStorageTransferOperator,
+GoogleCloudStorageToGoogleCloudStorageTransferOperator,
+GcpTransferServiceJobDeleteOperator,
+)
+from airflow.models import TaskInstance, DAG
+from airflow.utils import timezone
+
+
+try:
+# noinspection PyProtectedMember
+from unittest import mock
+except ImportError:  # pragma: no cover
+try:
+import mock
+except ImportError:
 
 Review comment:
   Shouldn't we let the unit test fail w/o mock?
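
   For reference, one alternative to silently setting `mock = None` would be to let the ImportError propagate, or to skip the tests explicitly when the backport package is missing. A sketch of the skip-based variant (the test name is illustrative):

```python
import unittest

try:
    from unittest import mock  # Python 3 standard library
except ImportError:  # pragma: no cover
    try:
        import mock  # Python 2 backport package
    except ImportError:
        mock = None


@unittest.skipIf(mock is None, "mock package not available")
class TransferJobPreprocessorTest(unittest.TestCase):
    def test_mock_is_available(self):
        self.assertIsNotNone(mock)
```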


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sdevani commented on a change in pull request #4792: [AIRFLOW-3659] Create Google Cloud Transfer Service Operators

2019-02-28 Thread GitBox
sdevani commented on a change in pull request #4792:  [AIRFLOW-3659] Create 
Google Cloud Transfer Service Operators 
URL: https://github.com/apache/airflow/pull/4792#discussion_r261002151
 
 

 ##
 File path: airflow/contrib/operators/gcp_transfer_operator.py
 ##
 @@ -0,0 +1,753 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from copy import deepcopy
+from datetime import date, time
+
+from airflow import AirflowException
+from airflow.contrib.hooks.gcp_transfer_hook import GCPTransferServiceHook, GcpTransferJobsStatus
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+
+try:
+from airflow.contrib.hooks.aws_hook import AwsHook
+except ImportError:  # pragma: no cover
+AwsHook = None
+
+
+class TransferJobPreprocessor:
+def __init__(self, body, aws_conn_id='aws_default'):
+self.body = body
+self.aws_conn_id = aws_conn_id
+
+def _inject_aws_credentials(self):
+if 'transferSpec' not in self.body or 'awsS3DataSource' not in self.body['transferSpec']:
+return
+
+aws_hook = AwsHook(self.aws_conn_id)
+aws_credentials = aws_hook.get_credentials()
+aws_access_key_id = aws_credentials.access_key
+aws_secret_access_key = aws_credentials.secret_key
+self.body['transferSpec']['awsS3DataSource']["awsAccessKey"] = {
+"accessKeyId": aws_access_key_id,
+"secretAccessKey": aws_secret_access_key,
+}
+
+def _reformat_date(self, field_key):
+schedule = self.body['schedule']
+if field_key not in schedule:
+return
+if isinstance(schedule[field_key], date):
+schedule[field_key] = self._convert_date_to_dict(schedule[field_key])
+
+def _reformat_time(self, field_key):
+schedule = self.body['schedule']
+if field_key not in schedule:
+return
+if isinstance(schedule[field_key], time):
+schedule[field_key] = self._convert_time_to_dict(schedule[field_key])
+
+def _reformat_schedule(self):
+if 'schedule' not in self.body:
+return
+self._reformat_date('scheduleStartDate')
+self._reformat_date('scheduleEndDate')
+self._reformat_time('startTimeOfDay')
+
+def process_body(self):
+self._inject_aws_credentials()
+self._reformat_schedule()
+return self.body
+
+@staticmethod
+def _convert_date_to_dict(field_date):
+"""
+Convert native python ``datetime.date`` object to a format supported by the API
+"""
+return {'day': field_date.day, 'month': field_date.month, 'year': field_date.year}
+
+@staticmethod
+def _convert_time_to_dict(time):
+"""
+Convert native python ``datetime.time`` object to a format supported by the API
+"""
+return {"hours": time.hour, "minutes": time.minute, "seconds": time.second}
+
+
+class TransferJobValidator:
+def __init__(self, body):
+self.body = body
+
+def _verify_data_source(self):
+is_gcs = 'gcsDataSource' in self.body['transferSpec']
+is_aws_s3 = 'awsS3DataSource' in self.body['transferSpec']
+is_http = 'httpDataSource' in self.body['transferSpec']
+
+sources_count = sum([is_gcs, is_aws_s3, is_http])
+if sources_count != 0 and sources_count != 1:
+raise AirflowException(
+"More than one data source detected. Please choose exactly one 
data source from: "
+"gcsDataSource, awsS3DataSource and httpDataSource."
+)
+
+def _restrict_aws_credentials(self):
+if 'awsS3DataSource' not in self.body['transferSpec']:
 
 Review comment:
   Have you considered defining all of these keys as constants rather than string literals? That would prevent typos and allow users to reference the keys directly.
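
   A hedged sketch of what that could look like; the constant names below are invented for illustration and are not part of the PR:

```python
# Hypothetical module-level constants for the Transfer Service body keys.
TRANSFER_SPEC = 'transferSpec'
GCS_DATA_SOURCE = 'gcsDataSource'
AWS_S3_DATA_SOURCE = 'awsS3DataSource'
HTTP_DATA_SOURCE = 'httpDataSource'


def count_data_sources(body):
    """Count how many data sources are present, without repeating string literals."""
    transfer_spec = body.get(TRANSFER_SPEC, {})
    return sum(key in transfer_spec for key in (GCS_DATA_SOURCE, AWS_S3_DATA_SOURCE, HTTP_DATA_SOURCE))
```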


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:

[GitHub] sdevani commented on a change in pull request #4792: [AIRFLOW-3659] Create Google Cloud Transfer Service Operators

2019-02-28 Thread GitBox
sdevani commented on a change in pull request #4792:  [AIRFLOW-3659] Create 
Google Cloud Transfer Service Operators 
URL: https://github.com/apache/airflow/pull/4792#discussion_r261443202
 
 

 ##
 File path: tests/contrib/operators/test_gcp_transfer_operator.py
 ##
 @@ -0,0 +1,702 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import itertools
+import unittest
+from copy import deepcopy
+from datetime import date, time
+
+from parameterized import parameterized
+from botocore.credentials import Credentials
+
+from airflow import AirflowException, configuration
+from airflow.contrib.operators.gcp_transfer_operator import (
+GcpTransferServiceOperationCancelOperator,
+GcpTransferServiceOperationResumeOperator,
+GcpTransferServiceOperationsListOperator,
+TransferJobValidator,
+TransferJobPreprocessor,
+GcpTransferServiceJobCreateOperator,
+GcpTransferServiceJobUpdateOperator,
+GcpTransferServiceOperationGetOperator,
+GcpTransferServiceOperationPauseOperator,
+S3ToGoogleCloudStorageTransferOperator,
+GoogleCloudStorageToGoogleCloudStorageTransferOperator,
+GcpTransferServiceJobDeleteOperator,
+)
+from airflow.models import TaskInstance, DAG
+from airflow.utils import timezone
+
+
+try:
+# noinspection PyProtectedMember
+from unittest import mock
+except ImportError:  # pragma: no cover
+try:
+import mock
+except ImportError:
+mock = None
+try:
+import boto3
+except ImportError:  # pragma: no cover
+boto3 = None
+
+GCP_PROJECT_ID = 'project-id'
+TASK_ID = 'task-id'
+
+JOB_NAME = "job-name"
+OPERATION_NAME = "operation-name"
+AWS_BUCKET_NAME = "aws-bucket-name"
+GCS_BUCKET_NAME = "gcp-bucket-name"
+DESCRIPTION = "description"
+
+DEFAULT_DATE = timezone.datetime(2017, 1, 1)
+
+FILTER = {'jobNames': [JOB_NAME]}
+
+AWS_ACCESS_KEY_ID = "test-key-1"
+AWS_ACCESS_SECRET = "test-secret-1"
+AWS_ACCESS_KEY = {'accessKeyId': AWS_ACCESS_KEY_ID, 'secretAccessKey': AWS_ACCESS_SECRET}
+
+NATIVE_DATE = date(2018, 10, 15)
+DICT_DATE = {'day': 15, 'month': 10, 'year': 2018}
+NATIVE_TIME = time(hour=11, minute=42, second=43)
+DICT_TIME = {'hours': 11, 'minutes': 42, 'seconds': 43}
+SCHEDULE_NATIVE = {
+'scheduleStartDate': NATIVE_DATE,
+'scheduleEndDate': NATIVE_DATE,
+'startTimeOfDay': NATIVE_TIME,
+}
+
+SCHEDULE_DICT = {
+'scheduleStartDate': {'day': 15, 'month': 10, 'year': 2018},
+'scheduleEndDate': {'day': 15, 'month': 10, 'year': 2018},
+'startTimeOfDay': {'hours': 11, 'minutes': 42, 'seconds': 43},
+}
+
+SOURCE_AWS = {"awsS3DataSource": {"bucketName": AWS_BUCKET_NAME}}
+SOURCE_GCS = {"gcsDataSource": {"bucketName": GCS_BUCKET_NAME}}
+SOURCE_HTTP = {"httpDataSource": {"list_url": "http://example.com"}}
+
+VALID_TRANSFER_JOB_BASE = {
+"name": JOB_NAME,
+'description': DESCRIPTION,
+'status': 'ENABLED',
+'schedule': SCHEDULE_DICT,
+'transferSpec': {'gcsDataSink': {'bucketName': GCS_BUCKET_NAME}},
+}
+VALID_TRANSFER_JOB_GCS = deepcopy(VALID_TRANSFER_JOB_BASE)
+VALID_TRANSFER_JOB_GCS['transferSpec'].update(deepcopy(SOURCE_GCS))
+VALID_TRANSFER_JOB_AWS = deepcopy(VALID_TRANSFER_JOB_BASE)
+VALID_TRANSFER_JOB_AWS['transferSpec'].update(deepcopy(SOURCE_AWS))
+
+VALID_TRANSFER_JOB_GCS = {
+"name": JOB_NAME,
+'description': DESCRIPTION,
+'status': 'ENABLED',
+'schedule': SCHEDULE_NATIVE,
+'transferSpec': {
+"gcsDataSource": {"bucketName": GCS_BUCKET_NAME},
+'gcsDataSink': {'bucketName': GCS_BUCKET_NAME},
+},
+}
+
+VALID_TRANSFER_JOB_RAW = {
+'description': DESCRIPTION,
+'status': 'ENABLED',
+'schedule': SCHEDULE_DICT,
+'transferSpec': {'gcsDataSink': {'bucketName': GCS_BUCKET_NAME}},
+}
+
+VALID_TRANSFER_JOB_GCS_RAW = deepcopy(VALID_TRANSFER_JOB_RAW)
+VALID_TRANSFER_JOB_GCS_RAW['transferSpec'].update(SOURCE_GCS)
+VALID_TRANSFER_JOB_AWS_RAW = deepcopy(VALID_TRANSFER_JOB_RAW)
+VALID_TRANSFER_JOB_AWS_RAW['transferSpec'].update(deepcopy(SOURCE_AWS))
+VALID_TRANSFER_JOB_AWS_RAW['transferSpec']['awsS3DataSource']['awsAccessKey'] = AWS_ACCESS_KEY
+
+
+VALID_OPERATION = {"name": "operation-name"}
+
+
+class TransferJobPreprocessorTest(unittest.TestCase):
+def 

[GitHub] sdevani commented on a change in pull request #4792: [AIRFLOW-3659] Create Google Cloud Transfer Service Operators

2019-02-28 Thread GitBox
sdevani commented on a change in pull request #4792:  [AIRFLOW-3659] Create 
Google Cloud Transfer Service Operators 
URL: https://github.com/apache/airflow/pull/4792#discussion_r261414544
 
 

 ##
 File path: tests/contrib/hooks/test_gcp_transfer_hook.py
 ##
 @@ -18,125 +18,487 @@
 # under the License.
 #
 import json
-import datetime
 import unittest
+from copy import deepcopy
 
-from airflow.exceptions import AirflowException
-from airflow.contrib.hooks.gcp_transfer_hook import GCPTransferServiceHook
-from airflow.contrib.hooks.gcp_transfer_hook import TIME_TO_SLEEP_IN_SECONDS
+from airflow import AirflowException
+from airflow.contrib.hooks.gcp_transfer_hook import (
+GCPTransferServiceHook,
+TIME_TO_SLEEP_IN_SECONDS,
+GcpTransferOperationStatus,
+)
+from tests.contrib.utils.base_gcp_mock import (
+mock_base_gcp_hook_no_default_project_id,
+mock_base_gcp_hook_default_project_id,
+)
 
 try:
 from unittest import mock
-except ImportError:
+except ImportError:  # pragma: no cover
 try:
 import mock
 except ImportError:
 mock = None
 
+PROJECT_ID = 'project-id'
+BODY = {'description': 'AAA', 'project_id': PROJECT_ID}
 
-class TestGCPTransferServiceHook(unittest.TestCase):
+TRANSFER_JOB_NAME = "transfer-job"
+TRANSFER_OPERATION_NAME = "transfer-operation"
+
+TRANSFER_JOB = {"name": TRANSFER_JOB_NAME}
+TRANSFER_OPERATION = {"name": TRANSFER_OPERATION_NAME}
+
+TRANSFER_JOB_FILTER = {'project_id': 'project-id', 'job_names': [TRANSFER_JOB_NAME]}
+TRANSFER_OPERATION_FILTER = {'project_id': 'project-id', 'job_names': [TRANSFER_JOB_NAME]}
+UPDATE_TRANSFER_JOB_BODY = {
+"transferJob": {'description': 'description-1'},
+'project_id': PROJECT_ID,
+"update_transfer_job_field_mask": 'description',
+}
+
+
+class TestGCPTransferServiceHookWithPassedProjectId(unittest.TestCase):
 def setUp(self):
-with mock.patch.object(GCPTransferServiceHook, '__init__', return_value=None):
-self.conn = mock.Mock()
-self.transfer_hook = GCPTransferServiceHook()
-self.transfer_hook._conn = self.conn
-
-def test_create_transfer_job(self):
-mock_create = self.conn.transferJobs.return_value.create
-mock_execute = mock_create.return_value.execute
-mock_execute.return_value = {
-'projectId': 'test-project',
-'name': 'transferJobs/test-job',
-}
-now = datetime.datetime.utcnow()
-transfer_spec = {
-'awsS3DataSource': {'bucketName': 'test-s3-bucket'},
-'gcsDataSink': {'bucketName': 'test-gcs-bucket'}
-}
-self.transfer_hook.create_transfer_job(
-project_id='test-project',
-description='test-description',
-schedule=None,
-transfer_spec=transfer_spec)
-mock_create.assert_called_once_with(body={
-'status': 'ENABLED',
-'projectId': 'test-project',
-'description': 'test-description',
-'transferSpec': transfer_spec,
-'schedule': {
-'scheduleStartDate': {
-'day': now.day,
-'month': now.month,
-'year': now.year,
-},
-'scheduleEndDate': {
-'day': now.day,
-'month': now.month,
-'year': now.year,
-}
-}
-})
-
-def test_create_transfer_job_custom_schedule(self):
-mock_create = self.conn.transferJobs.return_value.create
-mock_execute = mock_create.return_value.execute
-mock_execute.return_value = {
-'projectId': 'test-project',
-'name': 'transferJobs/test-job',
-}
-schedule = {
-'scheduleStartDate': {'month': 10, 'day': 1, 'year': 2018},
-'scheduleEndDate': {'month': 10, 'day': 31, 'year': 2018},
-}
-transfer_spec = {
-'awsS3DataSource': {'bucketName': 'test-s3-bucket'},
-'gcsDataSink': {'bucketName': 'test-gcs-bucket'}
-}
-self.transfer_hook.create_transfer_job(
-project_id='test-project',
-description='test-description',
-schedule=schedule,
-transfer_spec=transfer_spec)
-mock_create.assert_called_once_with(body={
-'status': 'ENABLED',
-'projectId': 'test-project',
-'description': 'test-description',
-'transferSpec': transfer_spec,
-'schedule': schedule,
-})
+with mock.patch(
+'airflow.contrib.hooks.' 'gcp_api_base_hook.GoogleCloudBaseHook.__init__',
+new=mock_base_gcp_hook_no_default_project_id,
+):
+self.gct_hook = GCPTransferServiceHook(gcp_conn_id='test')
+
+@mock.patch('airflow.contrib.hooks.gcp_transfer_hook.' 'GCPTransferServiceHook.get_conn')
+def 

[GitHub] sdevani commented on a change in pull request #4792: [AIRFLOW-3659] Create Google Cloud Transfer Service Operators

2019-02-28 Thread GitBox
sdevani commented on a change in pull request #4792:  [AIRFLOW-3659] Create 
Google Cloud Transfer Service Operators 
URL: https://github.com/apache/airflow/pull/4792#discussion_r260960865
 
 

 ##
 File path: airflow/contrib/example_dags/example_gcp_transfer.py
 ##
 @@ -0,0 +1,254 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG that demonstrates interactions with Google Cloud Transfer.
+
+This DAG relies on the following OS environment variables
+
+* GCP_PROJECT_ID - Google Cloud Project to use for the Google Cloud Transfer Service.
+* GCP_DESCRIPTION - Description of transfer job
+* GCP_TRANSFER_SOURCE_AWS_BUCKET - Amazon Web Services Storage bucket from which files are copied.
+* GCP_TRANSFER_FIRST_TARGET_BUCKET - Google Cloud Storage bucket to which files are copied from AWS.
+  It is also the source bucket in the next step.
+* GCP_TRANSFER_SECOND_TARGET_BUCKET - Google Cloud Storage bucket to which files are copied
+* WAIT_FOR_OPERATION_POKE_INTERVAL - interval at which to check the status of the operation
+
+"""
+import os
+from datetime import datetime, timedelta
+
+from airflow import models
+from airflow.contrib.hooks.gcp_transfer_hook import GcpTransferOperationStatus, GcpTransferJobsStatus
+from airflow.contrib.operators.gcp_transfer_operator import (
+GcpTransferServiceJobCreateOperator,
+GcpTransferServiceJobDeleteOperator,
+GcpTransferServiceJobUpdateOperator,
+GcpTransferServiceOperationsListOperator,
+GcpTransferServiceOperationGetOperator,
+GcpTransferServiceOperationPauseOperator,
+GcpTransferServiceOperationResumeOperator,
+GcpTransferServiceOperationCancelOperator,
+)
+from airflow.contrib.sensors.gcp_transfer_sensor import GCPTransferServiceWaitForJobStatusSensor
+from airflow.utils.dates import days_ago
+
+# [START howto_operator_gct_common_variables]
+GCP_PROJECT_ID = os.environ.get('GCP_PROJECT_ID', 'example-project')
+GCP_DESCRIPTION = os.environ.get('GCP_DESCRIPTION', 'description')
+GCP_TRANSFER_TARGET_BUCKET = os.environ.get('GCP_TRANSFER_TARGET_BUCKET')
+WAIT_FOR_OPERATION_POKE_INTERVAL = os.environ.get('WAIT_FOR_OPERATION_POKE_INTERVAL', 5)
+
+GCP_TRANSFER_SOURCE_AWS_BUCKET = os.environ.get('GCP_TRANSFER_SOURCE_AWS_BUCKET')
+GCP_TRANSFER_FIRST_TARGET_BUCKET = os.environ.get(
+'GCP_TRANSFER_FIRST_TARGET_BUCKET', 'gcp-transfer-first-target'
+)
+GCP_TRANSFER_SECOND_TARGET_BUCKET = os.environ.get(
+'GCP_TRANSFER_SECOND_TARGET_BUCKET', 'gcp-transfer-second-target'
+)
+# [END howto_operator_gct_common_variables]
+
+# [START howto_operator_gct_create_job_body_aws]
+create_body_aws = {
 
 Review comment:
   To be a bit more descriptive, maybe a name like aws_to_gcp_transfer_body?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] coufon commented on a change in pull request #4791: [AIRFLOW-3908] Add more Google Cloud Vision operators

2019-02-28 Thread GitBox
coufon commented on a change in pull request #4791:  [AIRFLOW-3908] Add more 
Google Cloud Vision operators
URL: https://github.com/apache/airflow/pull/4791#discussion_r261441099
 
 

 ##
 File path: airflow/contrib/example_dags/example_gcp_vision.py
 ##
 @@ -57,21 +74,41 @@
 GCP_VISION_LOCATION = os.environ.get('GCP_VISION_LOCATION', 'europe-west1')
 # [END howto_operator_vision_args_common]
 
-# [START howto_operator_vision_productset]
-product_set = ProductSet(display_name='My Product Set 1')
-# [END howto_operator_vision_productset]
+# [START howto_operator_vision_product_set_explicit_id]
+GCP_VISION_PRODUCT_SET_ID = os.environ.get('GCP_VISION_PRODUCT_SET_ID', 'product_set_explicit_id')
+# [END howto_operator_vision_product_set_explicit_id]
+
+# [START howto_operator_vision_product_explicit_id]
+GCP_VISION_PRODUCT_ID = os.environ.get('GCP_VISION_PRODUCT_ID', 'product_explicit_id')
+# [END howto_operator_vision_product_explicit_id]
+
+# [START howto_operator_vision_reference_image_args]
+GCP_VISION_REFERENCE_IMAGE_ID = os.environ.get('GCP_VISION_REFERENCE_IMAGE_ID', 'reference_image_explicit_id')
+GCP_VISION_REFERENCE_IMAGE_URL = os.environ.get('GCP_VISION_REFERENCE_IMAGE_URL', 'gs://bucket/image1.jpg')
+# [END howto_operator_vision_reference_image_args]
+
+# [START howto_operator_vision_annotate_image_url]
+GCP_VISION_ANNOTATE_IMAGE_URL = os.environ.get('GCP_VISION_ANNOTATE_IMAGE_URL', 'gs://bucket/image2.jpg')
+# [END howto_operator_vision_annotate_image_url]
+
+# [START howto_operator_vision_product_set]
+product_set = ProductSet(display_name='My Product Set')
+# [END howto_operator_vision_product_set]
 
 # [START howto_operator_vision_product]
 product = Product(display_name='My Product 1', product_category='toys')
 # [END howto_operator_vision_product]
 
-# [START howto_operator_vision_productset_explicit_id]
-GCP_VISION_PRODUCT_SET_ID = os.environ.get('GCP_VISION_PRODUCT_SET_ID', 'product_set_explicit_id')
-# [END howto_operator_vision_productset_explicit_id]
+# [START howto_operator_vision_reference_image]
+reference_image = ReferenceImage(uri=GCP_VISION_REFERENCE_IMAGE_URL)
 
 Review comment:
   Please see the comment on the CloudVisionReferenceImageCreateOperator operator: this reference_image may be created by another task, in which case the image URL is obtained from "task_instance.xcom_pull".
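
   As a sketch of that scenario (the upstream task id `upload_image` and the `enums` import path are assumptions; the nested dict is rendered only because `request` is listed in the operator's `template_fields`):

```python
from airflow.contrib.operators.gcp_vision_operator import CloudVisionAnnotateImageOperator
from google.cloud.vision import enums  # assumed import path, matching the example DAG's usage

# Hypothetical: the image URI is produced by an upstream task and pulled via XCom templating.
annotate_image = CloudVisionAnnotateImageOperator(
    task_id='annotate_image',
    request={
        'image': {'source': {'image_uri': "{{ task_instance.xcom_pull(task_ids='upload_image') }}"}},
        'features': [{'type': enums.Feature.Type.LOGO_DETECTION}],
    },
)
```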


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] coufon commented on a change in pull request #4791: [AIRFLOW-3908] Add more Google Cloud Vision operators

2019-02-28 Thread GitBox
coufon commented on a change in pull request #4791:  [AIRFLOW-3908] Add more 
Google Cloud Vision operators
URL: https://github.com/apache/airflow/pull/4791#discussion_r261428480
 
 

 ##
 File path: airflow/contrib/example_dags/example_gcp_vision.py
 ##
 @@ -57,21 +74,41 @@
 GCP_VISION_LOCATION = os.environ.get('GCP_VISION_LOCATION', 'europe-west1')
 # [END howto_operator_vision_args_common]
 
-# [START howto_operator_vision_productset]
-product_set = ProductSet(display_name='My Product Set 1')
-# [END howto_operator_vision_productset]
+# [START howto_operator_vision_product_set_explicit_id]
+GCP_VISION_PRODUCT_SET_ID = os.environ.get('GCP_VISION_PRODUCT_SET_ID', 'product_set_explicit_id')
+# [END howto_operator_vision_product_set_explicit_id]
+
+# [START howto_operator_vision_product_explicit_id]
+GCP_VISION_PRODUCT_ID = os.environ.get('GCP_VISION_PRODUCT_ID', 'product_explicit_id')
+# [END howto_operator_vision_product_explicit_id]
+
+# [START howto_operator_vision_reference_image_args]
+GCP_VISION_REFERENCE_IMAGE_ID = os.environ.get('GCP_VISION_REFERENCE_IMAGE_ID', 'reference_image_explicit_id')
+GCP_VISION_REFERENCE_IMAGE_URL = os.environ.get('GCP_VISION_REFERENCE_IMAGE_URL', 'gs://bucket/image1.jpg')
+# [END howto_operator_vision_reference_image_args]
+
+# [START howto_operator_vision_annotate_image_url]
+GCP_VISION_ANNOTATE_IMAGE_URL = os.environ.get('GCP_VISION_ANNOTATE_IMAGE_URL', 'gs://bucket/image2.jpg')
+# [END howto_operator_vision_annotate_image_url]
+
+# [START howto_operator_vision_product_set]
+product_set = ProductSet(display_name='My Product Set')
+# [END howto_operator_vision_product_set]
 
 # [START howto_operator_vision_product]
 product = Product(display_name='My Product 1', product_category='toys')
 # [END howto_operator_vision_product]
 
-# [START howto_operator_vision_productset_explicit_id]
-GCP_VISION_PRODUCT_SET_ID = os.environ.get('GCP_VISION_PRODUCT_SET_ID', 'product_set_explicit_id')
-# [END howto_operator_vision_productset_explicit_id]
+# [START howto_operator_vision_reference_image]
+reference_image = ReferenceImage(uri=GCP_VISION_REFERENCE_IMAGE_URL)
+# [END howto_operator_vision_reference_image]
 
-# [START howto_operator_vision_product_explicit_id]
-GCP_VISION_PRODUCT_ID = os.environ.get('GCP_VISION_PRODUCT_ID', 'product_explicit_id')
-# [END howto_operator_vision_product_explicit_id]
+# [START howto_operator_vision_annotate_image_request]
+annotate_image_request = {
+'image': {'source': {'image_uri': GCP_VISION_ANNOTATE_IMAGE_URL}},
+'features': [{'type': enums.Feature.Type.LOGO_DETECTION}],
+}
+# [END howto_operator_vision_annotate_image_request]
 
 with models.DAG(
 
 Review comment:
   This DAG will become giant once all operators are implemented. One option is to split the different examples into separate DAGs, e.g., a product search DAG and an image annotation DAG.
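
   A rough sketch of that option (the DAG ids and `default_args` below are only illustrative):

```python
from airflow import models
from airflow.utils.dates import days_ago

default_args = {'start_date': days_ago(1)}  # assumed; reuse whatever the example already defines

with models.DAG(
    'example_gcp_vision_product_search', default_args=default_args, schedule_interval=None
) as product_search_dag:
    pass  # product set / product / reference image operators would go here

with models.DAG(
    'example_gcp_vision_annotate_image', default_args=default_args, schedule_interval=None
) as annotate_image_dag:
    pass  # image annotation operators would go here
```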


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] coufon commented on a change in pull request #4791: [AIRFLOW-3908] Add more Google Cloud Vision operators

2019-02-28 Thread GitBox
coufon commented on a change in pull request #4791:  [AIRFLOW-3908] Add more 
Google Cloud Vision operators
URL: https://github.com/apache/airflow/pull/4791#discussion_r261438146
 
 

 ##
 File path: airflow/contrib/operators/gcp_vision_operator.py
 ##
 @@ -671,3 +672,284 @@ def execute(self, context):
 timeout=self.timeout,
 metadata=self.metadata,
 )
+
+
+class CloudVisionAnnotateImageOperator(BaseOperator):
+"""
+Run image detection and annotation for an image.
+
+.. seealso::
+For more information on how to use this operator, take a look at the 
guide:
+:ref:`howto/operator:CloudVisionAnnotateImageOperator`
+
+:param request: (Required) Individual file annotation requests.
+If a dict is provided, it must be of the same form as the protobuf
+message class:`google.cloud.vision_v1.types.AnnotateImageRequest`
+:type request: dict or google.cloud.vision_v1.types.AnnotateImageRequest
+:param retry: (Optional) A retry object used to retry requests. If `None` 
is
+specified, requests will not be retried.
+:type retry: google.api_core.retry.Retry
+:param timeout: (Optional) The amount of time, in seconds, to wait for the 
request to
+complete. Note that if retry is specified, the timeout applies to each 
individual
+attempt.
+:type timeout: float
+:param gcp_conn_id: (Optional) The connection ID used to connect to Google 
Cloud Platform.
+:type gcp_conn_id: str
+"""
+
+# [START vision_annotate_image_template_fields]
+template_fields = ('request', 'gcp_conn_id')
+# [END vision_annotate_image_template_fields]
+
+@apply_defaults
+def __init__(
+self, request, retry=None, timeout=None, gcp_conn_id='google_cloud_default', *args, **kwargs
+):
+super(CloudVisionAnnotateImageOperator, self).__init__(*args, **kwargs)
+self.request = request
+self.retry = retry
+self.timeout = timeout
+self.gcp_conn_id = gcp_conn_id
+
+def execute(self, context):
+hook = CloudVisionHook(gcp_conn_id=self.gcp_conn_id)
+return hook.annotate_image(request=self.request, retry=self.retry, timeout=self.timeout)
+
+
+class CloudVisionReferenceImageCreateOperator(BaseOperator):
+"""
+Creates and returns a new ReferenceImage ID resource.
+
+.. seealso::
+For more information on how to use this operator, take a look at the 
guide:
+:ref:`howto/operator:CloudVisionReferenceImageCreateOperator`
+
+:param location: (Required) The region where the Product is located. Valid 
regions (as of 2019-02-05) are:
+us-east1, us-west1, europe-west1, asia-east1
+:type location: str
+:param reference_image: (Required) The reference image to create. If an 
image ID is specified, it is
+ignored.
+If a dict is provided, it must be of the same form as the protobuf 
message
+:class:`google.cloud.vision_v1.types.ReferenceImage`
+:type reference_image: dict or google.cloud.vision_v1.types.ReferenceImage
+:param reference_image_id: (Optional) A user-supplied resource id for the 
ReferenceImage to be added.
+If set, the server will attempt to use this value as the resource id. 
If it is already in use, an
+error is returned with code ALREADY_EXISTS. Must be at most 128 
characters long. It cannot contain
+the character `/`.
+:type reference_image_id: str
+:param product_id: (Optional) The resource id of this Product.
+:type product_id: str
+:param project_id: (Optional) The project in which the Product is located. 
If set to None or
+missing, the default project_id from the GCP connection is used.
+:type project_id: str
+:param retry: (Optional) A retry object used to retry requests. If `None` 
is
+specified, requests will not be retried.
+:type retry: google.api_core.retry.Retry
+:param timeout: (Optional) The amount of time, in seconds, to wait for the 
request to
+complete. Note that if retry is specified, the timeout applies to each 
individual
+attempt.
+:type timeout: float
+:param metadata: (Optional) Additional metadata that is provided to the 
method.
+:type metadata: sequence[tuple[str, str]]
+:param gcp_conn_id: (Optional) The connection ID used to connect to Google 
Cloud Platform.
+:type gcp_conn_id: str
+"""
+
+# [START vision_reference_image_create_template_fields]
+template_fields = ('location', 'product_id', 'reference_image_id', 'project_id', 'gcp_conn_id')
 
 Review comment:
   We may need to render the reference_image's URL with a template, so that "task_instance.xcom_pull(...)" can be used. That would be useful when the reference image is created by a preceding task.
   
   For example:
   reference_image = ReferenceImage(uri=GCP_VISION_REFERENCE_IMAGE_URL)
   The URL "GCP_VISION_REFERENCE_IMAGE_URL" may 

[GitHub] coufon commented on a change in pull request #4791: [AIRFLOW-3908] Add more Google Cloud Vision operators

2019-02-28 Thread GitBox
coufon commented on a change in pull request #4791:  [AIRFLOW-3908] Add more 
Google Cloud Vision operators
URL: https://github.com/apache/airflow/pull/4791#discussion_r261409371
 
 

 ##
 File path: airflow/contrib/hooks/gcp_vision_hook.py
 ##
 @@ -354,3 +307,140 @@ def _get_autogenerated_id(response):
 if '/' not in name:
 raise AirflowException('Unable to get id from name... [{}]'.format(name))
 return name.rsplit('/', 1)[1]
+
+@GoogleCloudBaseHook.catch_http_exception
+@GoogleCloudBaseHook.fallback_to_default_project_id
+def create_reference_image(
+self,
+location,
+product_id,
+reference_image,
+reference_image_id=None,
+project_id=None,
+retry=None,
+timeout=None,
+metadata=None,
+):
+"""
+For the documentation see:
+:py:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionReferenceImageCreateOperator`
+"""
+client = self.get_conn()
+self.log.info('Creating ReferenceImage')
+parent = ProductSearchClient.product_path(project=project_id, location=location, product=product_id)
+
+response = client.create_reference_image(
+parent=parent,
+reference_image=reference_image,
+reference_image_id=reference_image_id,
+retry=retry,
+timeout=timeout,
+metadata=metadata,
+)
+
+self.log.info('ReferenceImage created: %s', response.name if response else '')
+self.log.debug('ReferenceImage created:\n%s', response)
+
+if not reference_image_id:
+# Reference image id was generated by the API
+reference_image_id = self._get_autogenerated_id(response)
+self.log.info('Extracted autogenerated ReferenceImage ID from the response: %s', product_set_id)
+
+return reference_image_id
+
+@GoogleCloudBaseHook.catch_http_exception
+@GoogleCloudBaseHook.fallback_to_default_project_id
+def delete_reference_image(
 
 Review comment:
   Maybe it would be better to be consistent about whether these functions return a response or not? The delete_product and delete_product_set methods return a response.
   
   Please check other methods as well.
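
   In other words, if delete_reference_image were to follow the delete_product / delete_product_set convention, its tail might look roughly like this (a sketch; for delete calls the client's return value may simply be empty):

```python
# Sketch: mirror delete_product / delete_product_set by returning whatever the client gives back.
response = client.delete_reference_image(name=name, retry=retry, timeout=timeout, metadata=metadata)
return response
```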


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] coufon commented on a change in pull request #4791: [AIRFLOW-3908] Add more Google Cloud Vision operators

2019-02-28 Thread GitBox
coufon commented on a change in pull request #4791:  [AIRFLOW-3908] Add more 
Google Cloud Vision operators
URL: https://github.com/apache/airflow/pull/4791#discussion_r261411536
 
 

 ##
 File path: airflow/contrib/hooks/gcp_vision_hook.py
 ##
 @@ -354,3 +307,140 @@ def _get_autogenerated_id(response):
 if '/' not in name:
 raise AirflowException('Unable to get id from name... [{}]'.format(name))
 return name.rsplit('/', 1)[1]
+
+@GoogleCloudBaseHook.catch_http_exception
+@GoogleCloudBaseHook.fallback_to_default_project_id
+def create_reference_image(
+self,
+location,
+product_id,
+reference_image,
+reference_image_id=None,
+project_id=None,
+retry=None,
+timeout=None,
+metadata=None,
+):
+"""
+For the documentation see:
+:py:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionReferenceImageCreateOperator`
+"""
+client = self.get_conn()
+self.log.info('Creating ReferenceImage')
+parent = ProductSearchClient.product_path(project=project_id, location=location, product=product_id)
+
+response = client.create_reference_image(
+parent=parent,
+reference_image=reference_image,
+reference_image_id=reference_image_id,
+retry=retry,
+timeout=timeout,
+metadata=metadata,
+)
+
+self.log.info('ReferenceImage created: %s', response.name if response else '')
+self.log.debug('ReferenceImage created:\n%s', response)
+
+if not reference_image_id:
+# Reference image id was generated by the API
+reference_image_id = self._get_autogenerated_id(response)
+self.log.info('Extracted autogenerated ReferenceImage ID from the response: %s', product_set_id)
+
+return reference_image_id
+
+@GoogleCloudBaseHook.catch_http_exception
+@GoogleCloudBaseHook.fallback_to_default_project_id
+def delete_reference_image(
+self,
+location,
+product_id,
+reference_image_id,
+project_id=None,
+retry=None,
+timeout=None,
+metadata=None,
+):
+"""
+For the documentation see:
+:py:class:`~airflow.contrib.operators.gcp_vision_operator.CloudVisionReferenceImageCreateOperator`
+"""
+client = self.get_conn()
+self.log.info('Deleting ReferenceImage')
+name = ProductSearchClient.reference_image_path(
+project=project_id, location=location, product=product_id, reference_image=reference_image_id
+)
+client.delete_reference_image(name=name, retry=retry, timeout=timeout, metadata=metadata)
 
 Review comment:
   Maybe add a log.info here?
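
   e.g. something along these lines; the exact wording is only a suggestion, mirroring the confirmation logged on the create path:

```python
client.delete_reference_image(name=name, retry=retry, timeout=timeout, metadata=metadata)
self.log.info('ReferenceImage with the name [%s] deleted.', name)
```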


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] coufon commented on a change in pull request #4791: [AIRFLOW-3908] Add more Google Cloud Vision operators

2019-02-28 Thread GitBox
coufon commented on a change in pull request #4791:  [AIRFLOW-3908] Add more 
Google Cloud Vision operators
URL: https://github.com/apache/airflow/pull/4791#discussion_r261360986
 
 

 ##
 File path: airflow/contrib/hooks/gcp_vision_hook.py
 ##
 @@ -354,3 +307,140 @@ def _get_autogenerated_id(response):
 if '/' not in name:
 raise AirflowException('Unable to get id from name... [{}]'.format(name))
 return name.rsplit('/', 1)[1]
+
+@GoogleCloudBaseHook.catch_http_exception
+@GoogleCloudBaseHook.fallback_to_default_project_id
+def create_reference_image(
 
 Review comment:
   Maybe it is nicer to have the same layout for all function argument definitions? i.e., either all in a row or one per line in a column.
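
   i.e. picking one of these two layouts and using it consistently (purely illustrative; the method signatures are shown standalone for brevity):

```python
# All arguments on one row (wrapped only when the line gets too long):
def create_reference_image(self, location, product_id, reference_image, reference_image_id=None,
                           project_id=None, retry=None, timeout=None, metadata=None):
    pass


# Or one argument per line (column layout):
def create_reference_image(
    self,
    location,
    product_id,
    reference_image,
    reference_image_id=None,
    project_id=None,
    retry=None,
    timeout=None,
    metadata=None,
):
    pass
```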


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] coufon commented on a change in pull request #4791: [AIRFLOW-3908] Add more Google Cloud Vision operators

2019-02-28 Thread GitBox
coufon commented on a change in pull request #4791:  [AIRFLOW-3908] Add more 
Google Cloud Vision operators
URL: https://github.com/apache/airflow/pull/4791#discussion_r261420342
 
 

 ##
 File path: airflow/contrib/operators/gcp_vision_operator.py
 ##
 @@ -671,3 +672,284 @@ def execute(self, context):
 timeout=self.timeout,
 metadata=self.metadata,
 )
+
+
+class CloudVisionAnnotateImageOperator(BaseOperator):
+"""
+Run image detection and annotation for an image.
+
+.. seealso::
+For more information on how to use this operator, take a look at the 
guide:
+:ref:`howto/operator:CloudVisionAnnotateImageOperator`
+
+:param request: (Required) Individual file annotation requests.
+If a dict is provided, it must be of the same form as the protobuf
+message class:`google.cloud.vision_v1.types.AnnotateImageRequest`
+:type request: dict or google.cloud.vision_v1.types.AnnotateImageRequest
+:param retry: (Optional) A retry object used to retry requests. If `None` 
is
+specified, requests will not be retried.
+:type retry: google.api_core.retry.Retry
+:param timeout: (Optional) The amount of time, in seconds, to wait for the 
request to
+complete. Note that if retry is specified, the timeout applies to each 
individual
+attempt.
+:type timeout: float
+:param gcp_conn_id: (Optional) The connection ID used to connect to Google 
Cloud Platform.
+:type gcp_conn_id: str
+"""
+
+# [START vision_annotate_image_template_fields]
+template_fields = ('request', 'gcp_conn_id')
+# [END vision_annotate_image_template_fields]
+
+@apply_defaults
+def __init__(
+self, request, retry=None, timeout=None, gcp_conn_id='google_cloud_default', *args, **kwargs
+):
+super(CloudVisionAnnotateImageOperator, self).__init__(*args, **kwargs)
+self.request = request
+self.retry = retry
+self.timeout = timeout
+self.gcp_conn_id = gcp_conn_id
+
+def execute(self, context):
+hook = CloudVisionHook(gcp_conn_id=self.gcp_conn_id)
+return hook.annotate_image(request=self.request, retry=self.retry, timeout=self.timeout)
+
+
+class CloudVisionReferenceImageCreateOperator(BaseOperator):
+"""
+Creates and returns a new ReferenceImage ID resource.
+
+.. seealso::
+For more information on how to use this operator, take a look at the 
guide:
+:ref:`howto/operator:CloudVisionReferenceImageCreateOperator`
+
+:param location: (Required) The region where the Product is located. Valid 
regions (as of 2019-02-05) are:
+us-east1, us-west1, europe-west1, asia-east1
+:type location: str
+:param reference_image: (Required) The reference image to create. If an 
image ID is specified, it is
+ignored.
+If a dict is provided, it must be of the same form as the protobuf 
message
+:class:`google.cloud.vision_v1.types.ReferenceImage`
+:type reference_image: dict or google.cloud.vision_v1.types.ReferenceImage
+:param reference_image_id: (Optional) A user-supplied resource id for the 
ReferenceImage to be added.
+If set, the server will attempt to use this value as the resource id. 
If it is already in use, an
+error is returned with code ALREADY_EXISTS. Must be at most 128 
characters long. It cannot contain
+the character `/`.
+:type reference_image_id: str
+:param product_id: (Optional) The resource id of this Product.
+:type product_id: str
+:param project_id: (Optional) The project in which the Product is located. 
If set to None or
+missing, the default project_id from the GCP connection is used.
+:type project_id: str
+:param retry: (Optional) A retry object used to retry requests. If `None` 
is
+specified, requests will not be retried.
+:type retry: google.api_core.retry.Retry
+:param timeout: (Optional) The amount of time, in seconds, to wait for the 
request to
+complete. Note that if retry is specified, the timeout applies to each 
individual
+attempt.
+:type timeout: float
+:param metadata: (Optional) Additional metadata that is provided to the 
method.
+:type metadata: sequence[tuple[str, str]]
+:param gcp_conn_id: (Optional) The connection ID used to connect to Google 
Cloud Platform.
+:type gcp_conn_id: str
+"""
+
+# [START vision_reference_image_create_template_fields]
+template_fields = ('location', 'product_id', 'reference_image_id', 'project_id', 'gcp_conn_id')
+# [END vision_reference_image_create_template_fields]
+
+@apply_defaults
+def __init__(
+self,
+location,
+reference_image,
+product_id,
+reference_image_id=None,
+project_id=None,
+retry=None,
+timeout=None,
+metadata=None,
+gcp_conn_id='google_cloud_default',
+

[GitHub] coufon commented on a change in pull request #4791: [AIRFLOW-3908] Add more Google Cloud Vision operators

2019-02-28 Thread GitBox
coufon commented on a change in pull request #4791:  [AIRFLOW-3908] Add more 
Google Cloud Vision operators
URL: https://github.com/apache/airflow/pull/4791#discussion_r261423784
 
 

 ##
 File path: airflow/contrib/operators/gcp_vision_operator.py
 ##
 @@ -671,3 +672,284 @@ def execute(self, context):
 timeout=self.timeout,
 metadata=self.metadata,
 )
+
+
+class CloudVisionAnnotateImageOperator(BaseOperator):
+"""
+Run image detection and annotation for an image.
+
+.. seealso::
+For more information on how to use this operator, take a look at the 
guide:
+:ref:`howto/operator:CloudVisionAnnotateImageOperator`
+
+:param request: (Required) Individual file annotation requests.
+If a dict is provided, it must be of the same form as the protobuf
+message class:`google.cloud.vision_v1.types.AnnotateImageRequest`
+:type request: dict or google.cloud.vision_v1.types.AnnotateImageRequest
+:param retry: (Optional) A retry object used to retry requests. If `None` 
is
+specified, requests will not be retried.
+:type retry: google.api_core.retry.Retry
+:param timeout: (Optional) The amount of time, in seconds, to wait for the 
request to
+complete. Note that if retry is specified, the timeout applies to each 
individual
+attempt.
+:type timeout: float
+:param gcp_conn_id: (Optional) The connection ID used to connect to Google 
Cloud Platform.
+:type gcp_conn_id: str
+"""
+
+# [START vision_annotate_image_template_fields]
+template_fields = ('request', 'gcp_conn_id')
+# [END vision_annotate_image_template_fields]
+
+@apply_defaults
+def __init__(
+self, request, retry=None, timeout=None, gcp_conn_id='google_cloud_default', *args, **kwargs
+):
+super(CloudVisionAnnotateImageOperator, self).__init__(*args, **kwargs)
+self.request = request
+self.retry = retry
+self.timeout = timeout
+self.gcp_conn_id = gcp_conn_id
+
+def execute(self, context):
+hook = CloudVisionHook(gcp_conn_id=self.gcp_conn_id)
 
 Review comment:
   The operators implemented previously create and hold the hook as a property, e.g.:
   
   return self._hook.delete_product(
   
   Please check which approach is better and make it consistent.
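
   For reference, a sketch of the "hook held on the operator" style, written here as a lazily created property; the operator name is illustrative and only `CloudVisionHook` and its `annotate_image` call come from the PR:

```python
from airflow.contrib.hooks.gcp_vision_hook import CloudVisionHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class CloudVisionExampleOperator(BaseOperator):
    """Illustrative only: the hook is created once and reused."""

    @apply_defaults
    def __init__(self, request, gcp_conn_id='google_cloud_default', *args, **kwargs):
        super(CloudVisionExampleOperator, self).__init__(*args, **kwargs)
        self.request = request
        self.gcp_conn_id = gcp_conn_id
        self._hook = None

    @property
    def hook(self):
        # Build the hook lazily so it is not created at DAG parse time.
        if self._hook is None:
            self._hook = CloudVisionHook(gcp_conn_id=self.gcp_conn_id)
        return self._hook

    def execute(self, context):
        return self.hook.annotate_image(request=self.request)
```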


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] coufon commented on a change in pull request #4791: [AIRFLOW-3908] Add more Google Cloud Vision operators

2019-02-28 Thread GitBox
coufon commented on a change in pull request #4791:  [AIRFLOW-3908] Add more 
Google Cloud Vision operators
URL: https://github.com/apache/airflow/pull/4791#discussion_r261363540
 
 

 ##
 File path: airflow/contrib/hooks/gcp_vision_hook.py
 ##
 @@ -157,17 +162,12 @@ def get_product_set(
 client = self.get_conn()
 name = ProductSearchClient.product_set_path(project_id, location, product_set_id)
 self.log.info('Retrieving ProductSet: %s', name)
-response = self._handle_request(
-lambda **kwargs: client.get_product_set(**kwargs),
-name=name,
-retry=retry,
-timeout=timeout,
-metadata=metadata,
-)
+response = client.get_product_set(name=name, retry=retry, timeout=timeout, metadata=metadata)
 
 Review comment:
   nit: it might look nicer to keep function arguments in the same layout, either in a row or in a column.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


  1   2   >