[jira] [Assigned] (AIRFLOW-2508) Handle non string types in render_template_from_field
[ https://issues.apache.org/jira/browse/AIRFLOW-2508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Galak reassigned AIRFLOW-2508:
------------------------------

    Assignee: Galak  (was: Eugene Brown)

> Handle non string types in render_template_from_field
> ------------------------------------------------------
>
> Key: AIRFLOW-2508
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2508
> Project: Apache Airflow
> Issue Type: Bug
> Components: models
> Affects Versions: 2.0.0
> Reporter: Eugene Brown
> Assignee: Galak
> Priority: Minor
> Labels: easyfix, newbie
> Original Estimate: 2h
> Remaining Estimate: 2h
>
> The render_template_from_field method of the BaseOperator class raises an
> exception when it encounters content that is not a string type, list, tuple
> or dict.
> Example exception:
> {noformat}
> airflow.exceptions.AirflowException: Type '<type 'int'>' used for parameter
> 'job_flow_overrides[Instances][InstanceGroups][InstanceCount]' is not
> supported for templating{noformat}
> I propose that when it encounters content of other types it should instead
> return the content unchanged, rather than raise an exception.
> Consider this case: I extended the EmrCreateJobFlowOperator to make the
> job_flow_overrides argument a templatable field. job_flow_overrides is a
> dictionary with a mix of strings, integers and booleans for values.
> I extended the class like this:
> {code:java}
> class EmrCreateJobFlowOperatorTemplateOverrides(EmrCreateJobFlowOperator):
>     template_fields = ['job_flow_overrides']{code}
> and added a task to my dag in this form:
> {code:java}
> step_create_cluster = EmrCreateJobFlowOperatorTemplateOverrides(
>     task_id="create_cluster",
>     job_flow_overrides={
>         "Name": "my-cluster {{ dag_run.conf['run_date'] }}",
>         "Instances": {
>             "InstanceGroups": [
>                 {
>                     "Name": "Master nodes",
>                     "InstanceType": "c3.4xlarge",
>                     "InstanceCount": 1
>                 },
>                 {
>                     "Name": "Slave nodes",
>                     "InstanceType": "c3.4xlarge",
>                     "InstanceCount": 4
>                 }
>             ],
>             "TerminationProtected": False
>         },
>         "BootstrapActions": [{
>             "Name": "Custom action",
>             "ScriptBootstrapAction": {
>                 "Path": "s3://repo/{{ dag_run.conf['branch'] }}/requirements.txt"
>             }
>         }],
>     },
>     aws_conn_id='aws_default',
>     emr_conn_id='aws_default',
>     dag=dag
> )
> {code}
> The exception I gave above was raised and the step failed. I think it would
> be preferable for the method to pass over numeric and boolean values, as
> users may want to use template_fields the way I have: to template string
> values inside dictionaries or lists of mixed types.
> Here is the render_template_from_field method from the BaseOperator:
> {code:java}
> def render_template_from_field(self, attr, content, context, jinja_env):
>     """
>     Renders a template from a field. If the field is a string, it will
>     simply render the string and return the result. If it is a collection or
>     nested set of collections, it will traverse the structure and render
>     all strings in it.
>     """
>     rt = self.render_template
>     if isinstance(content, six.string_types):
>         result = jinja_env.from_string(content).render(**context)
>     elif isinstance(content, (list, tuple)):
>         result = [rt(attr, e, context) for e in content]
>     elif isinstance(content, dict):
>         result = {
>             k: rt("{}[{}]".format(attr, k), v, context)
>             for k, v in list(content.items())}
>     else:
>         param_type = type(content)
>         msg = (
>             "Type '{param_type}' used for parameter '{attr}' is "
>             "not supported for templating").format(**locals())
>         raise AirflowException(msg)
>     return result{code}
> I propose that the method return the content unchanged if it is of one of
> the (int, float, complex, bool) types, so my solution would add an extra
> elif of the form:
> {code}
> elif isinstance(content, (int, float, complex, bool)):
>     result = content
> {code}
> Are there any reasons this would be a bad idea?

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (AIRFLOW-2508) Handle non string types in render_template_from_field
[ https://issues.apache.org/jira/browse/AIRFLOW-2508?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16711502#comment-16711502 ]

Galak commented on AIRFLOW-2508:
--------------------------------

This issue is partially resolved by AIRFLOW-2415, but only for numeric types... IMHO, it should be extended to any other type (date, datetime, UUID, even custom classes...). I would be glad to work on this issue and to submit a PR. Any objections?

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Updated] (AIRFLOW-2010) Make HttpHook inner connection pool configurable
[ https://issues.apache.org/jira/browse/AIRFLOW-2010?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Galak updated AIRFLOW-2010:
---------------------------
    Description: (reworded for clarity)

> Make HttpHook inner connection pool configurable
> -------------------------------------------------
>
> Key: AIRFLOW-2010
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2010
> Project: Apache Airflow
> Issue Type: Improvement
> Components: hooks
> Affects Versions: 1.8.0
> Reporter: Galak
> Priority: Major
>
> HttpHook uses the requests module to perform http/https calls, but the
> module is hidden inside the implementation. Therefore it is not possible to
> choose a value for the _pool_connections_ or _pool_maxsize_ parameters,
> which default to 10 (see the [requests module
> documentation|http://docs.python-requests.org/en/latest/api/#lower-lower-level-classes]).
> Could _{{requests.adapters.HTTPAdapter}}_ parameters be passed through
> Airflow Connection extra parameters?
> As a consequence, calling a REST API concurrently (using
> [ThreadPoolExecutor|https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor])
> is limited to 10 workers maximum.
> Each additional worker is stopped with the following warning:
> {quote}
> {{\{connectionpool.py\} WARNING - Connection pool is full, discarding
> connection: my.api.example.org}}
> {quote}
> See [this question on
> stackoverflow|https://stackoverflow.com/questions/23632794/in-requests-library-how-can-i-avoid-httpconnectionpool-is-full-discarding-con]
> about HTTP connection pool configuration.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
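As a concrete illustration of the proposal, here is a hedged sketch (hypothetical as far as HttpHook is concerned; only the requests API shown is real) that reads pool sizes from a Connection's extra JSON and mounts a suitably sized HTTPAdapter on the session the hook would use.
{code}
import json

import requests
from requests.adapters import HTTPAdapter

def session_for_connection(extra_json):
    # Hypothetical helper: none of this exists in Airflow's HttpHook today.
    extra = json.loads(extra_json or "{}")
    session = requests.Session()
    adapter = HTTPAdapter(
        pool_connections=extra.get("pool_connections", 10),  # requests default
        pool_maxsize=extra.get("pool_maxsize", 10),          # requests default
    )
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    return session

# e.g. an Airflow Connection whose extra field is: {"pool_maxsize": 50}
session = session_for_connection('{"pool_maxsize": 50}')
{code}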
[jira] [Created] (AIRFLOW-2010) Make HttpHook inner connection pool configurable
Galak created AIRFLOW-2010:
-------------------------------

Summary: Make HttpHook inner connection pool configurable
Key: AIRFLOW-2010
URL: https://issues.apache.org/jira/browse/AIRFLOW-2010
Project: Apache Airflow
Issue Type: Improvement
Components: hooks
Affects Versions: 1.8.0
Reporter: Galak

HttpHook uses the requests module to perform http/https calls, but the module is hidden inside the implementation. Therefore it is not possible to choose a value for the _pool_connections_ or _pool_maxsize_ parameters, which default to 10 (see the [requests module documentation|http://docs.python-requests.org/en/latest/api/#lower-lower-level-classes]). Could _{{requests.adapters.HTTPAdapter}}_ parameters be passed through Connection extra parameters?

As a consequence, calling a REST API concurrently (using [ThreadPoolExecutor|https://docs.python.org/3/library/concurrent.futures.html#threadpoolexecutor]) is limited to 10 workers maximum. Each additional worker is stopped with the following warning:

{quote}{{\{connectionpool.py\} WARNING - Connection pool is full, discarding connection: my.api.example.org}}{quote}

See [this question on stackoverflow|https://stackoverflow.com/questions/23632794/in-requests-library-how-can-i-avoid-httpconnectionpool-is-full-discarding-con] about HTTP connection pool configuration.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
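The warning above is easy to reproduce outside Airflow. The sketch below (assumed setup, not Airflow code; httpbin.org is just a public test endpoint standing in for the REST API) shows why the default pool of 10 throttles 20 concurrent workers, and how a larger pool_maxsize avoids it.
{code}
from concurrent.futures import ThreadPoolExecutor

import requests
from requests.adapters import HTTPAdapter

session = requests.Session()
# With the default HTTPAdapter (pool_maxsize=10), 20 workers hitting one host
# log "Connection pool is full, discarding connection"; sizing the pool to
# match the worker count silences the warning and lets connections be reused.
session.mount("https://", HTTPAdapter(pool_connections=10, pool_maxsize=20))

def call(_):
    return session.get("https://httpbin.org/get", timeout=10).status_code

with ThreadPoolExecutor(max_workers=20) as pool:
    print(list(pool.map(call, range(20))))
{code}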
[jira] [Commented] (AIRFLOW-1814) Add op_args and op_kwargs in PythonOperator templated fields
[ https://issues.apache.org/jira/browse/AIRFLOW-1814?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16252547#comment-16252547 ]

Galak commented on AIRFLOW-1814:
--------------------------------

I discovered a blocker issue for this change while using the work-around class:
{code}
class MyPythonOperator(PythonOperator):
    template_fields = PythonOperator.template_fields + ('op_args', 'op_kwargs')
{code}
All non-string arguments passed to this operator make the task fail with an {{AirflowException}} raised by {{BaseOperator.render_template_from_field}}. Example:
{code}
value_consumer_task = MyPythonOperator(
    task_id='value_consumer_task',
    provide_context=True,
    python_callable=consume_value,
    op_args=[
        "{{ task_instance.xcom_pull(task_ids=None, key='my_xcom_key') }}",
        3  # << this argument makes the task fail with message:
           # airflow.exceptions.AirflowException: Type '<type 'int'>' used
           # for parameter 'op_args' is not supported for templating
    ],
    dag=dag
)
{code}
Would it be possible for {{BaseOperator.render_template_from_field}} to log a warning and simply return the value itself when it is not a string (nor a collection, nor a dictionary)? At this point I should probably work on the change and send a pull request, shouldn't I? But I don't have a lot of free time, so I would like to be sure I'm not going in the wrong direction...

> Add op_args and op_kwargs in PythonOperator templated fields
> -------------------------------------------------------------
>
> Key: AIRFLOW-1814
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1814
> Project: Apache Airflow
> Issue Type: Wish
> Components: operators
> Affects Versions: Airflow 1.8, 1.8.0
> Reporter: Galak
> Priority: Minor
>
> *I'm wondering if the "_op_args_" and "_op_kwargs_" PythonOperator
> parameters could be templated.*
> I have 2 different use cases where this change could help a lot:
> +1/ Provide some job execution information as a python callable argument:+
> Let's explain it through a simple example:
> {code}
> simple_task = PythonOperator(
>     task_id='simple_task',
>     provide_context=True,
>     python_callable=extract_data,
>     op_args=[
>         "my_db_connection_id",
>         "select * from my_table",
>         "/data/{{ dag.dag_id }}/{{ ts }}/my_export.csv"
>     ],
>     dag=dag
> )
> {code}
> The "extract_data" python function seems simple here, but it could be
> anything re-usable in multiple dags...
> +2/ Provide some XCom value as a python callable argument:+
> Let's say I have a task which retrieves or calculates a value, then stores
> it in an XCom for further use by other tasks:
> {code}
> value_producer_task = PythonOperator(
>     task_id='value_producer_task',
>     provide_context=True,
>     python_callable=produce_value,
>     op_args=[
>         "my_db_connection_id",
>         "some_other_static_parameter",
>         "my_xcom_key"
>     ],
>     dag=dag
> )
> {code}
> Then I can just configure a PythonOperator task to use the produced value:
> {code}
> value_consumer_task = PythonOperator(
>     task_id='value_consumer_task',
>     provide_context=True,
>     python_callable=consume_value,
>     op_args=[
>         "{{ task_instance.xcom_pull(task_ids=None, key='my_xcom_key') }}"
>     ],
>     dag=dag
> )
> {code}
> I quickly tried the following class:
> {code}
> from airflow.operators.python_operator import PythonOperator
>
> class MyPythonOperator(PythonOperator):
>     template_fields = PythonOperator.template_fields + ('op_args', 'op_kwargs')
> {code}
> and it worked like a charm.
> So could these 2 arguments be added to template_fields? Or did I miss some
> major drawback to this change?

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
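Until render_template_from_field tolerates non-strings, one hedged workaround (my suggestion, not from the issue; it assumes the MyPythonOperator subclass above and a dag object as in the earlier examples) is to keep every op_args entry a string and cast inside the callable:
{code}
from airflow.operators.python_operator import PythonOperator

class MyPythonOperator(PythonOperator):
    # work-around subclass from the comment above
    template_fields = PythonOperator.template_fields + ('op_args', 'op_kwargs')

def consume_value(value, threshold, **context):
    # Every op_args entry stays a string, so templating never sees an int;
    # the callable casts back to the type it needs.
    print(value, int(threshold))

value_consumer_task = MyPythonOperator(
    task_id='value_consumer_task',
    provide_context=True,
    python_callable=consume_value,
    op_args=[
        "{{ task_instance.xcom_pull(task_ids=None, key='my_xcom_key') }}",
        "3",  # the integer 3, passed through templating as a string
    ],
    dag=dag,
)
{code}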
[jira] [Created] (AIRFLOW-1814) Add op_args and op_kwargs in PythonOperator templated fields
Galak created AIRFLOW-1814:
-------------------------------

Summary: Add op_args and op_kwargs in PythonOperator templated fields
Key: AIRFLOW-1814
URL: https://issues.apache.org/jira/browse/AIRFLOW-1814
Project: Apache Airflow
Issue Type: Wish
Components: operators
Affects Versions: Airflow 1.8, 1.8.0
Reporter: Galak
Priority: Minor

*I'm wondering if the "_op_args_" and "_op_kwargs_" PythonOperator parameters could be templated.*

I have 2 different use cases where this change could help a lot:

+1/ Provide some job execution information as a python callable argument:+
Let's explain it through a simple example:
{code}
simple_task = PythonOperator(
    task_id='simple_task',
    provide_context=True,
    python_callable=extract_data,
    op_args=[
        "my_db_connection_id",
        "select * from my_table",
        "/data/{{ dag.dag_id }}/{{ ts }}/my_export.csv"
    ],
    dag=dag
)
{code}
The "extract_data" python function seems simple here, but it could be anything re-usable in multiple dags...

+2/ Provide some XCom value as a python callable argument:+
Let's say I have a task which retrieves or calculates a value, then stores it in an XCom for further use by other tasks:
{code}
value_producer_task = PythonOperator(
    task_id='value_producer_task',
    provide_context=True,
    python_callable=produce_value,
    op_args=[
        "my_db_connection_id",
        "some_other_static_parameter",
        "my_xcom_key"
    ],
    dag=dag
)
{code}
Then I can just configure a PythonOperator task to use the produced value:
{code}
value_consumer_task = PythonOperator(
    task_id='value_consumer_task',
    provide_context=True,
    python_callable=consume_value,
    op_args=[
        "{{ task_instance.xcom_pull(task_ids=None, key='my_xcom_key') }}"
    ],
    dag=dag
)
{code}
I quickly tried the following class:
{code}
from airflow.operators.python_operator import PythonOperator

class MyPythonOperator(PythonOperator):
    template_fields = PythonOperator.template_fields + ('op_args', 'op_kwargs')
{code}
and it worked like a charm.

So could these 2 arguments be added to template_fields? Or did I miss some major drawback to this change?

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
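One subtlety worth flagging for use case 2 (inferred from how Jinja templating works, not stated in the issue): a templated xcom_pull always hands the callable a string, so non-string XCom values have to be parsed back by the consumer, as this small sketch shows.
{code}
from jinja2 import Template

# Whatever type the producer pushed, rendering "{{ value }}" yields a str.
rendered = Template("{{ value }}").render(value=42)
print(type(rendered), rendered)  # <class 'str'> 42
print(int(rendered) + 1)         # 43 -- cast back before using it as a number
{code}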