[jira] [Assigned] (AIRFLOW-2508) Handle non string types in render_template_from_field

2018-12-06 Thread Galak (JIRA)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-2508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Galak reassigned AIRFLOW-2508:
--

Assignee: Galak  (was: Eugene Brown)

> Handle non string types in render_template_from_field
> -
>
> Key: AIRFLOW-2508
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2508
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: models
>Affects Versions: 2.0.0
>Reporter: Eugene Brown
>Assignee: Galak
>Priority: Minor
>  Labels: easyfix, newbie
>   Original Estimate: 2h
>  Remaining Estimate: 2h
>
> The render_template_from_field method of the BaseOperator class raises an
> exception when it encounters content that is not a string type, list, tuple,
> or dict.
> Example exception:
> {noformat}
> airflow.exceptions.AirflowException: Type '' used for parameter 
> 'job_flow_overrides[Instances][InstanceGroups][InstanceCount]' is not 
> supported for templating{noformat}
> I propose instead that when it encounters content of other types it returns 
> the content unchanged, rather than raising an exception.
> Consider this case: I extended the EmrCreateJobFlowOperator to make the 
> job_flow_overrides argument a templatable field. job_flow_overrides is a 
> dictionary with a mix of strings, integers and booleans for values.
> When I extended the class as such:
> {code:java}
> class EmrCreateJobFlowOperatorTemplateOverrides(EmrCreateJobFlowOperator):
>     template_fields = ['job_flow_overrides']{code}
> And added a task to my dag with this format:
> {code:java}
> step_create_cluster = EmrCreateJobFlowOperatorTemplateOverrides(
>     task_id="create_cluster",
>     job_flow_overrides={
>         "Name": "my-cluster {{ dag_run.conf['run_date'] }}",
>         "Instances": {
>             "InstanceGroups": [
>                 {
>                     "Name": "Master nodes",
>                     "InstanceType": "c3.4xlarge",
>                     "InstanceCount": 1
>                 },
>                 {
>                     "Name": "Slave nodes",
>                     "InstanceType": "c3.4xlarge",
>                     "InstanceCount": 4
>                 }
>             ],
>             "TerminationProtected": False
>         },
>         "BootstrapActions": [{
>             "Name": "Custom action",
>             "ScriptBootstrapAction": {
>                 "Path": "s3://repo/{{ dag_run.conf['branch'] }}/requirements.txt"
>             }
>         }],
>     },
>     aws_conn_id='aws_default',
>     emr_conn_id='aws_default',
>     dag=dag
> )
> {code}
> The exception I gave above was raised and the step failed. I think it would
> be preferable for the method to instead pass over numeric and boolean values,
> as users may want to use template_fields the way I have: to template string
> values in dictionaries or lists of mixed types.
> Here is the render_template_from_field method from the BaseOperator:
> {code:java}
> def render_template_from_field(self, attr, content, context, jinja_env):
>     """
>     Renders a template from a field. If the field is a string, it will
>     simply render the string and return the result. If it is a collection or
>     nested set of collections, it will traverse the structure and render
>     all strings in it.
>     """
>     rt = self.render_template
>     if isinstance(content, six.string_types):
>         result = jinja_env.from_string(content).render(**context)
>     elif isinstance(content, (list, tuple)):
>         result = [rt(attr, e, context) for e in content]
>     elif isinstance(content, dict):
>         result = {
>             k: rt("{}[{}]".format(attr, k), v, context)
>             for k, v in list(content.items())}
>     else:
>         param_type = type(content)
>         msg = (
>             "Type '{param_type}' used for parameter '{attr}' is "
>             "not supported for templating").format(**locals())
>         raise AirflowException(msg)
>     return result{code}
>  I propose that the method return content unchanged if the content is of one
> of the (int, float, complex, bool) types. So my solution would include an
> extra elif of the form:
> {code}
> elif isinstance(content, (int, float, complex, bool)):
>     result = content
> {code}
>  Are there any reasons this would be a bad idea?
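
To see the proposal end to end, here is a standalone sketch (my illustration,
using plain jinja2 rather than Airflow's BaseOperator) of the same traversal
with the extra branch added:
{code}
import jinja2
import six

_ENV = jinja2.Environment()


def render(attr, content, context):
    # Sketch of render_template_from_field with the proposed pass-through.
    if isinstance(content, six.string_types):
        return _ENV.from_string(content).render(**context)
    if isinstance(content, (list, tuple)):
        return [render(attr, e, context) for e in content]
    if isinstance(content, dict):
        return {k: render("{}[{}]".format(attr, k), v, context)
                for k, v in content.items()}
    if isinstance(content, (int, float, complex, bool)):
        # Proposed: non-string scalars pass through unchanged.
        return content
    raise TypeError("Type '{}' used for parameter '{}' is not supported "
                    "for templating".format(type(content), attr))


print(render("job_flow_overrides",
             {"Name": "cluster {{ run_date }}", "InstanceCount": 1},
             {"run_date": "2018-12-06"}))
# -> {'Name': 'cluster 2018-12-06', 'InstanceCount': 1}
{code}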



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (AIRFLOW-2508) Handle non string types in render_template_from_field

2018-12-06 Thread Galak (JIRA)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-2508?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711502#comment-16711502
 ] 

Galak commented on AIRFLOW-2508:


This issue is partially resolved by AIRFLOW-2415, but only for numeric types...
IMHO, it should be extended to any other type (date, datetime, UUID, even
custom classes...).

I would be glad to work on this issue and submit a PR.

Any objections?
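
A minimal sketch of that catch-all (the warning text and the use of the stdlib
logging module are my assumptions, not the project's implementation); the final
else branch of render_template_from_field would call this instead of raising:
{code}
import logging


def keep_as_is(attr, content):
    # Sketch: any non-templatable type (date, datetime, UUID, custom
    # classes...) is logged and returned unchanged.
    logging.warning(
        "Parameter '%s' of type %s is not templatable; leaving it unchanged",
        attr, type(content))
    return content
{code}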




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (AIRFLOW-2010) Make HttpHook inner connection pool configurable

2018-01-18 Thread Galak (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-2010?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Galak updated AIRFLOW-2010:
---
Description: 
HttpHook uses the requests module to perform HTTP/HTTPS calls, but this is
hidden inside the implementation. Therefore, it is not possible to choose a
value for the _pool_connections_ or _pool_maxsize_ parameters, which default to
10 (see the [requests module
documentation|http://docs.python-requests.org/en/latest/api/#lower-lower-level-classes]).

_{{requests.adapters.HTTPAdapter}}_ parameters could probably be passed through
Airflow Connection extra parameters?

As a consequence, calling a REST API concurrently (using
[ThreadPoolExecutor|https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor])
is limited to 10 workers maximum. Each additional worker is stopped with the
following warning:
{quote}
{{\{connectionpool.py\} WARNING - Connection pool is full, discarding
connection: my.api.example.org}}
{quote}
See [this question on Stack
Overflow|https://stackoverflow.com/questions/23632794/in-requests-library-how-can-i-avoid-httpconnectionpool-is-full-discarding-con]
about HTTP connection pool configuration.

  was:
HttpHook uses the requests module to perform HTTP/HTTPS calls, but this is
hidden inside the implementation. Therefore, it is not possible to choose a
value for the _pool_connections_ or _pool_maxsize_ parameters, which default to
10 (see the [requests module
documentation|http://docs.python-requests.org/en/latest/api/#lower-lower-level-classes]).

_{{requests.adapters.HTTPAdapter}}_ parameters could probably be passed through
Airflow Connection extra parameters?

As a consequence, calling a REST API concurrently (using
[ThreadPoolExecutor|https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor])
is limited to 10 workers maximum. Each additional worker is stopped with the
following warning:
{quote}
{{ \{connectionpool.py\} WARNING - Connection pool is full, discarding
connection: my.api.example.org }}
{quote}
See [this question on Stack
Overflow|https://stackoverflow.com/questions/23632794/in-requests-library-how-can-i-avoid-httpconnectionpool-is-full-discarding-con]
about HTTP connection pool configuration.


> Make HttpHook inner connection pool configurable
> 
>
> Key: AIRFLOW-2010
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2010
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: hooks
>Affects Versions: 1.8.0
>Reporter: Galak
>Priority: Major
>
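
A possible workaround sketch (mine, not from the ticket), assuming
HttpHook.get_conn() returns a requests.Session as it does in the hook's
implementation: subclass the hook and re-mount an HTTPAdapter with a larger
pool.
{code}
import requests

from airflow.hooks.http_hook import HttpHook


class PooledHttpHook(HttpHook):
    """HttpHook variant with a configurable urllib3 connection pool."""

    def __init__(self, pool_connections=10, pool_maxsize=50, *args, **kwargs):
        super(PooledHttpHook, self).__init__(*args, **kwargs)
        self.pool_connections = pool_connections
        self.pool_maxsize = pool_maxsize

    def get_conn(self, headers=None):
        session = super(PooledHttpHook, self).get_conn(headers)
        adapter = requests.adapters.HTTPAdapter(
            pool_connections=self.pool_connections,
            pool_maxsize=self.pool_maxsize)
        # Re-mount both schemes so every request reuses the bigger pool.
        session.mount('http://', adapter)
        session.mount('https://', adapter)
        return session
{code}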



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (AIRFLOW-2010) Make HttpHook inner connection pool configurable

2018-01-18 Thread Galak (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-2010?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Galak updated AIRFLOW-2010:
---
Description: 
HttpHook uses the requests module to perform HTTP/HTTPS calls, but this is
hidden inside the implementation. Therefore, it is not possible to choose a
value for the _pool_connections_ or _pool_maxsize_ parameters, which default to
10 (see the [requests module
documentation|http://docs.python-requests.org/en/latest/api/#lower-lower-level-classes]).

_{{requests.adapters.HTTPAdapter}}_ parameters could probably be passed through
Airflow Connection extra parameters?

As a consequence, calling a REST API concurrently (using
[ThreadPoolExecutor|https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor])
is limited to 10 workers maximum. Each additional worker is stopped with the
following warning:
{quote}
{{ \{connectionpool.py\} WARNING - Connection pool is full, discarding
connection: my.api.example.org }}
{quote}
See [this question on Stack
Overflow|https://stackoverflow.com/questions/23632794/in-requests-library-how-can-i-avoid-httpconnectionpool-is-full-discarding-con]
about HTTP connection pool configuration.

  was:
HttpHook uses the requests module to perform HTTP/HTTPS calls, but this is
hidden inside the implementation. Therefore, it is not possible to choose a
value for the _pool_connections_ or _pool_maxsize_ parameters, which default to
10 (see the [requests module
documentation|http://docs.python-requests.org/en/latest/api/#lower-lower-level-classes]).

_{{requests.adapters.HTTPAdapter}}_ parameters could probably be passed through
Airflow Connection extra parameters?

As a consequence, calling a REST API concurrently (using
[ThreadPoolExecutor|https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor])
is limited to 10 workers maximum. Each additional worker is stopped with the
following warning:
{quote}{{{connectionpool.py} WARNING - Connection pool is full, discarding
connection: my.api.example.org}}{quote}
See [this question on Stack
Overflow|https://stackoverflow.com/questions/23632794/in-requests-library-how-can-i-avoid-httpconnectionpool-is-full-discarding-con]
about HTTP connection pool configuration.





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (AIRFLOW-2010) Make HttpHook inner connection pool configurable

2018-01-18 Thread Galak (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-2010?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Galak updated AIRFLOW-2010:
---
Description: 
HttpHook uses the requests module to perform HTTP/HTTPS calls, but this is
hidden inside the implementation. Therefore, it is not possible to choose a
value for the _pool_connections_ or _pool_maxsize_ parameters, which default to
10 (see the [requests module
documentation|http://docs.python-requests.org/en/latest/api/#lower-lower-level-classes]).

_{{requests.adapters.HTTPAdapter}}_ parameters could probably be passed through
Airflow Connection extra parameters?

As a consequence, calling a REST API concurrently (using
[ThreadPoolExecutor|https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor])
is limited to 10 workers maximum. Each additional worker is stopped with the
following warning:
{quote}{{{connectionpool.py} WARNING - Connection pool is full, discarding
connection: my.api.example.org}}{quote}
See [this question on Stack
Overflow|https://stackoverflow.com/questions/23632794/in-requests-library-how-can-i-avoid-httpconnectionpool-is-full-discarding-con]
about HTTP connection pool configuration.

  was:
HttpHook uses the requests module to perform HTTP/HTTPS calls, but this is
hidden inside the implementation. Therefore, it is not possible to choose a
value for the _pool_connections_ or _pool_maxsize_ parameters, which default to
10 (see the [requests module
documentation|http://docs.python-requests.org/en/latest/api/#lower-lower-level-classes]).

_{{requests.adapters.HTTPAdapter}}_ parameters could probably be passed through
Airflow Connection extra parameters?

As a consequence, calling a REST API concurrently (using
[ThreadPoolExecutor|https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor])
is limited to 10 workers maximum. Each additional worker is stopped with the
following warning:
{quote}{{{connectionpool.py} WARNING - Connection pool is full, discarding
connection: my.api.example.org}}
{quote}
See [this question on Stack
Overflow|https://stackoverflow.com/questions/23632794/in-requests-library-how-can-i-avoid-httpconnectionpool-is-full-discarding-con]
about HTTP connection pool configuration.





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (AIRFLOW-2010) Make HttpHook inner connection pool configurable

2018-01-18 Thread Galak (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-2010?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Galak updated AIRFLOW-2010:
---
Description: 
HttpHook uses the requests module to perform HTTP/HTTPS calls, but this is
hidden inside the implementation. Therefore, it is not possible to choose a
value for the _pool_connections_ or _pool_maxsize_ parameters, which default to
10 (see the [requests module
documentation|http://docs.python-requests.org/en/latest/api/#lower-lower-level-classes]).

_{{requests.adapters.HTTPAdapter}}_ parameters could probably be passed through
Airflow Connection extra parameters?

As a consequence, calling a REST API concurrently (using
[ThreadPoolExecutor|https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor])
is limited to 10 workers maximum. Each additional worker is stopped with the
following warning:
{quote}{{{connectionpool.py} WARNING - Connection pool is full, discarding
connection: my.api.example.org}}
{quote}
See [this question on Stack
Overflow|https://stackoverflow.com/questions/23632794/in-requests-library-how-can-i-avoid-httpconnectionpool-is-full-discarding-con]
about HTTP connection pool configuration.

  was:
HttpHook uses the requests module to perform HTTP/HTTPS calls, but this is
hidden inside the implementation. Therefore, it is not possible to choose a
value for the _pool_connections_ or _pool_maxsize_ parameters, which default to
10 (see the [requests module
documentation|http://docs.python-requests.org/en/latest/api/#lower-lower-level-classes]).

_{{requests.adapters.HTTPAdapter}}_ parameters could probably be passed through
Airflow Connection extra parameters?

As a consequence, calling a REST API concurrently (using
[ThreadPoolExecutor|https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor])
is limited to 10 workers maximum. Each additional worker is stopped with the
following warning:
{quote}
{{\{connectionpool.py\} WARNING - Connection pool is full, discarding
connection: my.api.example.org}}
{quote}
See [this question on Stack
Overflow|https://stackoverflow.com/questions/23632794/in-requests-library-how-can-i-avoid-httpconnectionpool-is-full-discarding-con]
about HTTP connection pool configuration.





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (AIRFLOW-2010) Make HttpHook inner connection pool configurable

2018-01-17 Thread Galak (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-2010?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Galak updated AIRFLOW-2010:
---
Description: 
HttpHook uses the requests module to perform HTTP/HTTPS calls, but this is
hidden inside the implementation. Therefore, it is not possible to choose a
value for the _pool_connections_ or _pool_maxsize_ parameters, which default to
10 (see the [requests module
documentation|http://docs.python-requests.org/en/latest/api/#lower-lower-level-classes]).

_{{requests.adapters.HTTPAdapter}}_ parameters could probably be passed through
Airflow Connection extra parameters?

As a consequence, calling a REST API concurrently (using
[ThreadPoolExecutor|https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor])
is limited to 10 workers maximum. Each additional worker is stopped with the
following warning:
{quote}
{{\{connectionpool.py\} WARNING - Connection pool is full, discarding
connection: my.api.example.org}}
{quote}
See [this question on Stack
Overflow|https://stackoverflow.com/questions/23632794/in-requests-library-how-can-i-avoid-httpconnectionpool-is-full-discarding-con]
about HTTP connection pool configuration.

  was:
HttpHook uses the requests module to perform HTTP/HTTPS calls, but this is
hidden inside the implementation. Therefore, it is not possible to choose a
value for the _pool_connections_ or _pool_maxsize_ parameters, which default to
10 (see the [requests module
documentation|http://docs.python-requests.org/en/latest/api/#lower-lower-level-classes]).

_{{requests.adapters.HTTPAdapter}}_ parameters could probably be passed through
Airflow Connection extra parameters?

As a consequence, calling a REST API concurrently (using
[ThreadPoolExecutor|https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor])
is limited to 10 workers maximum. Each additional worker is stopped with the
following warning:
{quote}{{
Unknown macro: \{connectionpool.py}
WARNING - Connection pool is full, discarding connection: my.api.example.org}}
{quote}
See [this question on Stack
Overflow|https://stackoverflow.com/questions/23632794/in-requests-library-how-can-i-avoid-httpconnectionpool-is-full-discarding-con]
about HTTP connection pool configuration.





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (AIRFLOW-2010) Make HttpHook inner connection pool configurable

2018-01-17 Thread Galak (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-2010?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Galak updated AIRFLOW-2010:
---
Description: 
HttpHook uses the requests module to perform HTTP/HTTPS calls, but this is
hidden inside the implementation. Therefore, it is not possible to choose a
value for the _pool_connections_ or _pool_maxsize_ parameters, which default to
10 (see the [requests module
documentation|http://docs.python-requests.org/en/latest/api/#lower-lower-level-classes]).

_{{requests.adapters.HTTPAdapter}}_ parameters could probably be passed through
Airflow Connection extra parameters?

As a consequence, calling a REST API concurrently (using
[ThreadPoolExecutor|https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor])
is limited to 10 workers maximum. Each additional worker is stopped with the
following warning:
{quote}{{
Unknown macro: \{connectionpool.py}
WARNING - Connection pool is full, discarding connection: my.api.example.org}}
{quote}
See [this question on Stack
Overflow|https://stackoverflow.com/questions/23632794/in-requests-library-how-can-i-avoid-httpconnectionpool-is-full-discarding-con]
about HTTP connection pool configuration.

  was:
HttpHook uses the requests module to perform HTTP/HTTPS calls, but this is
hidden inside the implementation. Therefore, it is not possible to choose a
value for the _pool_connections_ or _pool_maxsize_ parameters, which default to
10 (see the [requests module
documentation|http://docs.python-requests.org/en/latest/api/#lower-lower-level-classes]).

_{{requests.adapters.HTTPAdapter}}_ parameters could probably be passed through
Connection extra parameters?

As a consequence, calling a REST API concurrently (using
[ThreadPoolExecutor|https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor])
is limited to 10 workers maximum. Each additional worker is stopped with the
following warning:
{quote}{{{connectionpool.py} WARNING - Connection pool is full, discarding
connection: my.api.example.org}}{quote}
See [this question on Stack
Overflow|https://stackoverflow.com/questions/23632794/in-requests-library-how-can-i-avoid-httpconnectionpool-is-full-discarding-con]
about HTTP connection pool configuration.





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (AIRFLOW-2010) Make HttpHook inner connection pool configurable

2018-01-17 Thread Galak (JIRA)
Galak created AIRFLOW-2010:
--

 Summary: Make HttpHook inner connection pool configurable
 Key: AIRFLOW-2010
 URL: https://issues.apache.org/jira/browse/AIRFLOW-2010
 Project: Apache Airflow
  Issue Type: Improvement
  Components: hooks
Affects Versions: 1.8.0
Reporter: Galak


HttpHook uses the requests module to perform HTTP/HTTPS calls, but this is
hidden inside the implementation. Therefore, it is not possible to choose a
value for the _pool_connections_ or _pool_maxsize_ parameters, which default to
10 (see the [requests module
documentation|http://docs.python-requests.org/en/latest/api/#lower-lower-level-classes]).

_{{requests.adapters.HTTPAdapter}}_ parameters could probably be passed through
Connection extra parameters?

As a consequence, calling a REST API concurrently (using
[ThreadPoolExecutor|https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor])
is limited to 10 workers maximum. Each additional worker is stopped with the
following warning:
{quote}{{{connectionpool.py} WARNING - Connection pool is full, discarding
connection: my.api.example.org}}{quote}
See [this question on Stack
Overflow|https://stackoverflow.com/questions/23632794/in-requests-library-how-can-i-avoid-httpconnectionpool-is-full-discarding-con]
about HTTP connection pool configuration.
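
To make the failure mode concrete, a small sketch of the concurrent pattern
described above (the connection id and endpoint are invented): with requests'
default pool_maxsize of 10, a 20-worker pool makes urllib3 log the "pool is
full" warning as surplus connections are discarded.
{code}
from concurrent.futures import ThreadPoolExecutor

from airflow.hooks.http_hook import HttpHook

hook = HttpHook(method='GET', http_conn_id='my_api')  # hypothetical conn id


def call_api(i):
    # Sessions created by the hook use requests' default HTTPAdapter,
    # whose per-host pool is capped at 10 connections.
    return hook.run('/things/%d' % i)  # hypothetical endpoint


with ThreadPoolExecutor(max_workers=20) as executor:
    responses = list(executor.map(call_api, range(100)))
{code}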



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (AIRFLOW-1814) Add op_args and op_kwargs in PythonOperator templated fields

2017-11-14 Thread Galak (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-1814?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16252547#comment-16252547
 ] 

Galak commented on AIRFLOW-1814:


I discovered a blocker issue for this change while using the workaround class:
{code}
class MyPythonOperator(PythonOperator):
    template_fields = PythonOperator.template_fields + ('op_args', 'op_kwargs')
{code}

All non-string arguments passed to this operator will make the task fail with
an {{AirflowException}} raised by {{BaseOperator.render_template_from_field}}...

Example:
{code}
value_consumer_task = MyPythonOperator(
    task_id='value_consumer_task',
    provide_context=True,
    python_callable=consume_value,
    op_args=[
        "{{ task_instance.xcom_pull(task_ids=None, key='my_xcom_key') }}",
        # << this argument makes the task fail with:
        # airflow.exceptions.AirflowException: Type '' used for parameter
        # 'op_args' is not supported for templating
        3
    ],
    dag=dag
)
{code}

Would it be possible for {{BaseOperator.render_template_from_field}} to log a
warning and simply return the value itself when it is not a string (nor a
collection, nor a dictionary)?

At this point, I should probably work on the change and send a pull request,
shouldn't I? But I don't have a lot of free time, so I would like to be sure
I'm not going in the wrong direction...
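
In the meantime, a stop-gap consistent with the example above (a sketch, not a
fix): stringify the literal so templating accepts it, and cast it back inside
the callable:
{code}
value_consumer_task = MyPythonOperator(
    task_id='value_consumer_task',
    provide_context=True,
    python_callable=consume_value,
    op_args=[
        "{{ task_instance.xcom_pull(task_ids=None, key='my_xcom_key') }}",
        "3"  # stringified; consume_value must cast it back with int()
    ],
    dag=dag
)
{code}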



> Add op_args and op_kwargs in PythonOperator templated fields
> 
>
> Key: AIRFLOW-1814
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1814
> Project: Apache Airflow
>  Issue Type: Wish
>  Components: operators
>Affects Versions: Airflow 1.8, 1.8.0
>Reporter: Galak
>Priority: Minor
>
> *I'm wondering if "_op_args_" and "_op_kwargs_" PythonOperator parameters 
> could be templated.*
> I have 2 different use cases where this change could help a lot:
> +1/ Provide some job execution information as a python callable argument:+
> Let's explain it through a simple example:
> {code}
> simple_task = PythonOperator(
>     task_id='simple_task',
>     provide_context=True,
>     python_callable=extract_data,
>     op_args=[
>         "my_db_connection_id",
>         "select * from my_table",
>         "/data/{{ dag.dag_id }}/{{ ts }}/my_export.csv"
>     ],
>     dag=dag
> )
> {code}
> The "extract_data" Python function is simple here, but it could be anything
> reusable across multiple DAGs...
> +2/ Provide some XCom value as a python callable argument:+
> Let's say I have a task that retrieves or calculates a value and stores it in
> an XCom for further use by other tasks:
> {code}
> value_producer_task = PythonOperator(
>     task_id='value_producer_task',
>     provide_context=True,
>     python_callable=produce_value,
>     op_args=[
>         "my_db_connection_id",
>         "some_other_static_parameter",
>         "my_xcom_key"
>     ],
>     dag=dag
> )
> {code}
> Then I can just configure a PythonOperator task to use the produced value:
> {code}
> value_consumer_task = PythonOperator(
>     task_id='value_consumer_task',
>     provide_context=True,
>     python_callable=consume_value,
>     op_args=[
>         "{{ task_instance.xcom_pull(task_ids=None, key='my_xcom_key') }}"
>     ],
>     dag=dag
> )
> {code}
> I quickly tried the following class:
> {code}
> from airflow.operators.python_operator import PythonOperator
> class MyPythonOperator(PythonOperator):
>     template_fields = PythonOperator.template_fields + ('op_args', 'op_kwargs')
> {code}
> and it worked like a charm.
> So could these two arguments be added to template_fields? Or did I miss some
> major drawback to this change?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (AIRFLOW-1814) Add op_args and op_kwargs in PythonOperator templated fields

2017-11-14 Thread Galak (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1814?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Galak updated AIRFLOW-1814:
---
Description: 
*I'm wondering if "_op_args_" and "_op_kwargs_" PythonOperator parameters could 
be templated.*

I have 2 different use cases where this change could help a lot:

+1/ Provide some job execution information as a python callable argument:+
Let's explain it through a simple example:
{code}
simple_task = PythonOperator(
    task_id='simple_task',
    provide_context=True,
    python_callable=extract_data,
    op_args=[
        "my_db_connection_id",
        "select * from my_table",
        "/data/{{ dag.dag_id }}/{{ ts }}/my_export.csv"
    ],
    dag=dag
)
{code}
The "extract_data" Python function is simple here, but it could be anything
reusable across multiple DAGs...


+2/ Provide some XCom value as a python callable argument:+
Let's say I have a task that retrieves or calculates a value and stores it in
an XCom for further use by other tasks:
{code}
value_producer_task = PythonOperator(
    task_id='value_producer_task',
    provide_context=True,
    python_callable=produce_value,
    op_args=[
        "my_db_connection_id",
        "some_other_static_parameter",
        "my_xcom_key"
    ],
    dag=dag
)
{code}

Then I can just configure a PythonOperator task to use the produced value:
{code}
value_consumer_task = PythonOperator(
    task_id='value_consumer_task',
    provide_context=True,
    python_callable=consume_value,
    op_args=[
        "{{ task_instance.xcom_pull(task_ids=None, key='my_xcom_key') }}"
    ],
    dag=dag
)
{code}


I quickly tried the following class:

{code}
from airflow.operators.python_operator import PythonOperator

class MyPythonOperator(PythonOperator):
    template_fields = PythonOperator.template_fields + ('op_args', 'op_kwargs')
{code}

and it worked like a charm.

So could these two arguments be added to template_fields? Or did I miss some
major drawback to this change?


  was:
*I'm wondering if "_op_args_" and "_op_kwargs_" PythonOperator parameters could 
be templated.*

I have 2 different use cases where this change could help a lot:

+1/ Provide some job execution information as a python callable argument:+
Let's explain it through a simple example:
{code}
simple_task = PythonOperator(
    task_id='simple_task',
    provide_context=True,
    python_callable=extract_data,
    op_args=[
        "my_db_connection_id",
        "select * from my_table",
        "/data/{dag.dag_id}/{ts}/my_export.csv"
    ],
    dag=dag
)
{code}
The "extract_data" Python function is simple here, but it could be anything
reusable across multiple DAGs...


+2/ Provide some XCom value as a python callable argument:+
Let's say I have a task that retrieves or calculates a value and stores it in
an XCom for further use by other tasks:
{code}
value_producer_task = PythonOperator(
    task_id='value_producer_task',
    provide_context=True,
    python_callable=produce_value,
    op_args=[
        "my_db_connection_id",
        "some_other_static_parameter",
        "my_xcom_key"
    ],
    dag=dag
)
{code}

Then I can just configure a PythonOperator task to use the produced value:
{code}
value_consumer_task = PythonOperator(
    task_id='value_consumer_task',
    provide_context=True,
    python_callable=consume_value,
    op_args=[
        "{{ task_instance.xcom_pull(task_ids=None, key='my_xcom_key') }}"
    ],
    dag=dag
)
{code}


I quickly tried the following class:

{code}
from airflow.operators.python_operator import PythonOperator

class MyPythonOperator(PythonOperator):
    template_fields = PythonOperator.template_fields + ('op_args', 'op_kwargs')
{code}

and it worked like a charm.

So could these two arguments be added to template_fields? Or did I miss some
major drawback to this change?




[jira] [Created] (AIRFLOW-1814) Add op_args and op_kwargs in PythonOperator templated fields

2017-11-13 Thread Galak (JIRA)
Galak created AIRFLOW-1814:
--

 Summary: Add op_args and op_kwargs in PythonOperator templated 
fields
 Key: AIRFLOW-1814
 URL: https://issues.apache.org/jira/browse/AIRFLOW-1814
 Project: Apache Airflow
  Issue Type: Wish
  Components: operators
Affects Versions: Airflow 1.8, 1.8.0
Reporter: Galak
Priority: Minor


*I'm wondering if "_op_args_" and "_op_kwargs_" PythonOperator parameters could 
be templated.*

I have 2 different use cases where this change could help a lot:

+1/ Provide some job execution information as a python callable argument:+
Let's explain it through a simple example:
{code}
simple_task = PythonOperator(
    task_id='simple_task',
    provide_context=True,
    python_callable=extract_data,
    op_args=[
        "my_db_connection_id",
        "select * from my_table",
        "/data/{dag.dag_id}/{ts}/my_export.csv"
    ],
    dag=dag
)
{code}
The "extract_data" Python function is simple here, but it could be anything
reusable across multiple DAGs...


+2/ Provide some XCom value as a python callable argument:+
Let's say I have a task that retrieves or calculates a value and stores it in
an XCom for further use by other tasks:
{code}
value_producer_task = PythonOperator(
    task_id='value_producer_task',
    provide_context=True,
    python_callable=produce_value,
    op_args=[
        "my_db_connection_id",
        "some_other_static_parameter",
        "my_xcom_key"
    ],
    dag=dag
)
{code}

Then I can just configure a PythonOperator task to use the produced value:
{code}
value_consumer_task = PythonOperator(
    task_id='value_consumer_task',
    provide_context=True,
    python_callable=consume_value,
    op_args=[
        "{{ task_instance.xcom_pull(task_ids=None, key='my_xcom_key') }}"
    ],
    dag=dag
)
{code}


I quickly tried the following class:

{code}
from airflow.operators.python_operator import PythonOperator

class MyPythonOperator(PythonOperator):
    template_fields = PythonOperator.template_fields + ('op_args', 'op_kwargs')
{code}

and it worked like a charm.

So could these two arguments be added to template_fields? Or did I miss some
major drawback to this change?
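
For completeness, a sketch of what the callables above might look like (the
bodies are hypothetical; only the op_args shapes come from the examples). With
provide_context=True the context arrives as keyword arguments, and op_args are
already rendered by then:
{code}
def produce_value(conn_id, static_param, xcom_key, **context):
    # Hypothetical body: compute or fetch something, then store it under
    # the given XCom key for downstream tasks.
    value = "computed-from-%s-%s" % (conn_id, static_param)
    context['task_instance'].xcom_push(key=xcom_key, value=value)


def consume_value(value, **context):
    # By now the Jinja expression in op_args has been rendered, so `value`
    # is the plain string pulled from XCom.
    print("received: %s" % value)
{code}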




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)