[jira] [Commented] (FLINK-4017) [py] Add Aggregation support to Python API
[ https://issues.apache.org/jira/browse/FLINK-4017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15379197#comment-15379197 ]
ASF GitHub Bot commented on FLINK-4017:
---
Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/2115

> [py] Add Aggregation support to Python API
> --
>
> Key: FLINK-4017
> URL: https://issues.apache.org/jira/browse/FLINK-4017
> Project: Flink
> Issue Type: Improvement
> Components: Python API
> Reporter: Geoffrey Mon
> Priority: Minor
> Fix For: 1.1.0
>
>
> Aggregations are not currently supported in the Python API.
> I was getting started with setting up and working with Flink and figured this
> would be a relatively simple task for me to get started with. Currently
> working on this at https://github.com/geofbot/flink

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (FLINK-4017) [py] Add Aggregation support to Python API
[ https://issues.apache.org/jira/browse/FLINK-4017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15379186#comment-15379186 ]
ASF GitHub Bot commented on FLINK-4017:
---
Github user zentol commented on the issue:
https://github.com/apache/flink/pull/2115

merging
[jira] [Commented] (FLINK-4017) [py] Add Aggregation support to Python API
[ https://issues.apache.org/jira/browse/FLINK-4017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15362623#comment-15362623 ]
ASF GitHub Bot commented on FLINK-4017:
---
Github user GEOFBOT commented on the issue:
https://github.com/apache/flink/pull/2115

I've addressed the documentation issue, thanks.
[jira] [Commented] (FLINK-4017) [py] Add Aggregation support to Python API
[ https://issues.apache.org/jira/browse/FLINK-4017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15362316#comment-15362316 ]
ASF GitHub Bot commented on FLINK-4017:
---
Github user zentol commented on the issue:
https://github.com/apache/flink/pull/2115

Found a small issue in the documentation, otherwise +1 to merge.
[jira] [Commented] (FLINK-4017) [py] Add Aggregation support to Python API
[ https://issues.apache.org/jira/browse/FLINK-4017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15362300#comment-15362300 ]
ASF GitHub Bot commented on FLINK-4017:
---
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/2115#discussion_r69535980

--- Diff: docs/apis/batch/dataset_transformations.md ---
@@ -1010,7 +1013,10 @@ val output = input.aggregate(SUM, 0).and(MIN, 2)
 ~~~python
-Not supported.
+from flink.functions.Aggregation import Sum, Min
+
+input = # [...]
+output = input.aggregate(Sum, 0).agg_and(Min, 2)
--- End diff --

The documentation wasn't updated for the rename to `and_agg`. The same applies to `python.md`.
[jira] [Commented] (FLINK-4017) [py] Add Aggregation support to Python API
[ https://issues.apache.org/jira/browse/FLINK-4017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15359054#comment-15359054 ]
ASF GitHub Bot commented on FLINK-4017:
---
Github user zentol commented on the issue:
https://github.com/apache/flink/pull/2115

I'll have to try it out to be sure, but I can't see a problem looking through the code.
[jira] [Commented] (FLINK-4017) [py] Add Aggregation support to Python API
[ https://issues.apache.org/jira/browse/FLINK-4017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15359045#comment-15359045 ]
ASF GitHub Bot commented on FLINK-4017:
---
Github user zentol commented on the issue:
https://github.com/apache/flink/pull/2115

Please write a comment when you update the PR, we don't get any notifications for pushed commits :)
[jira] [Commented] (FLINK-4017) [py] Add Aggregation support to Python API
[ https://issues.apache.org/jira/browse/FLINK-4017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15342677#comment-15342677 ]
ASF GitHub Bot commented on FLINK-4017:
---
Github user zentol commented on the issue:
https://github.com/apache/flink/pull/2115

Looks good overall. I found one big issue; the rest are minor things.
[jira] [Commented] (FLINK-4017) [py] Add Aggregation support to Python API
[ https://issues.apache.org/jira/browse/FLINK-4017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15342669#comment-15342669 ]
ASF GitHub Bot commented on FLINK-4017:
---
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/2115#discussion_r67950293

--- Diff: flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py ---
@@ -192,6 +193,30 @@ def reduce(self, operator):
         self._env._sets.append(child)
         return child_set
+    def aggregate(self, aggregation, field):
+        """
+        Applies an Aggregate transformation (using a GroupReduceFunction) on a non-grouped Tuple DataSet.
+        :param aggregation: The built-in aggregation function to apply on the DataSet.
+        :param field: The index of the Tuple field on which to perform the function.
+        :return: A GroupReduceOperator that represents the aggregated DataSet.
+        """
+        child_set = self.reduce_group(aggregation(field), combinable=True)
+        child_set._info.name = "PythonAggregate"
+        return child_set
+    def agg_and(self, aggregation, field):
--- End diff --

I would name this method and_agg.
[jira] [Commented] (FLINK-4017) [py] Add Aggregation support to Python API
[ https://issues.apache.org/jira/browse/FLINK-4017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15342550#comment-15342550 ]
ASF GitHub Bot commented on FLINK-4017:
---
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/2115#discussion_r67940988

--- Diff: flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py ---
@@ -192,6 +193,30 @@ def reduce(self, operator):
         self._env._sets.append(child)
         return child_set
+    def aggregate(self, aggregation, field):
+        """
+        Applies an Aggregate transformation (using a GroupReduceFunction) on a non-grouped Tuple DataSet.
+        :param aggregation: The built-in aggregation function to apply on the DataSet.
+        :param field: The index of the Tuple field on which to perform the function.
+        :return: A GroupReduceOperator that represents the aggregated DataSet.
+        """
+        child_set = self.reduce_group(aggregation(field), combinable=True)
+        child_set._info.name = "PythonAggregate"
+        return child_set
+    def agg_and(self, aggregation, field):
+        """
+        Applies an additional Aggregate transformation.
+        :param aggregation: The built-in aggregation operation to apply on the DataSet.
+        :param field: The index of the Tuple field on which to perform the function.
+        :return: A GroupReduceOperator that represents the aggregated DataSet.
+        """
+        if self._info.name == "PythonAggregate":
--- End diff --

This is a bit icky. It would be better to create a separate AggregateOperator class that inherits from DataSet and contains the agg_and method (similar to how cross/join work).
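The reviewer's suggestion can be sketched in plain Python. `DataSet` and `AggregateOperator` below are hypothetical, heavily simplified stand-ins (they evaluate eagerly on in-memory rows instead of building a plan), Python's built-in `sum`/`min` stand in for the Flink `Sum`/`Min` aggregations, and the way non-aggregated fields are filled is an assumption. The sketch only illustrates the design point: `and_agg()` lives on the operator returned by `aggregate()`, so no check of the operator name is needed, and the aggregation types can go into the name.

```python
class DataSet:
    """Hypothetical stand-in for a Flink DataSet: it only holds rows and a name."""

    def __init__(self, rows, name="DataSet"):
        self.rows = list(rows)
        self.name = name

    def aggregate(self, aggregation, field):
        # aggregate() returns a dedicated operator, so and_agg() is only
        # reachable after an aggregation; no runtime check on self.name is needed.
        return AggregateOperator(self, [(aggregation, field)])


class AggregateOperator(DataSet):
    """Hypothetical operator that accumulates (aggregation, field) pairs."""

    def __init__(self, parent, aggregations):
        self.parent = parent
        self.aggregations = list(aggregations)
        # Put the aggregation types into the operator name, e.g. "Aggregate(sum(0))".
        parts = ", ".join(f"{agg.__name__}({field})" for agg, field in self.aggregations)
        super().__init__(self._evaluate(), name=f"Aggregate({parts})")

    def and_agg(self, aggregation, field):
        # Chain another aggregation onto the same input instead of nesting operators.
        return AggregateOperator(self.parent, self.aggregations + [(aggregation, field)])

    def _evaluate(self):
        # Assumption: fields that are not aggregated keep the first row's value.
        result = list(self.parent.rows[0])
        for agg, field in self.aggregations:
            result[field] = agg(row[field] for row in self.parent.rows)
        return [tuple(result)]


data = DataSet([(1, "a", 5.0), (2, "b", 3.0), (3, "c", 4.0)])
out = data.aggregate(sum, 0).and_agg(min, 2)
print(out.name)                  # Aggregate(sum(0), min(2))
print(out.rows)                  # [(6, 'a', 3.0)]
print(hasattr(data, "and_agg"))  # False
```

Because `and_agg()` is defined only on the subclass, calling it on a plain data set raises `AttributeError` instead of depending on a name check.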
[jira] [Commented] (FLINK-4017) [py] Add Aggregation support to Python API
[ https://issues.apache.org/jira/browse/FLINK-4017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15342552#comment-15342552 ]
ASF GitHub Bot commented on FLINK-4017:
---
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/2115#discussion_r67941010

--- Diff: flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py ---
@@ -192,6 +193,30 @@ def reduce(self, operator):
         self._env._sets.append(child)
         return child_set
+    def aggregate(self, aggregation, field):
+        """
+        Applies an Aggregate transformation (using a GroupReduceFunction) on a non-grouped Tuple DataSet.
+        :param aggregation: The built-in aggregation function to apply on the DataSet.
+        :param field: The index of the Tuple field on which to perform the function.
+        :return: A GroupReduceOperator that represents the aggregated DataSet.
+        """
+        child_set = self.reduce_group(aggregation(field), combinable=True)
+        child_set._info.name = "PythonAggregate"
--- End diff --

Let's include the aggregation type in the name.
[jira] [Commented] (FLINK-4017) [py] Add Aggregation support to Python API
[ https://issues.apache.org/jira/browse/FLINK-4017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15342547#comment-15342547 ]
ASF GitHub Bot commented on FLINK-4017:
---
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/2115#discussion_r67940565

--- Diff: flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py ---
@@ -192,6 +193,30 @@ def reduce(self, operator):
         self._env._sets.append(child)
         return child_set
+    def aggregate(self, aggregation, field):
+        """
+        Applies an Aggregate transformation (using a GroupReduceFunction) on a non-grouped Tuple DataSet.
+        :param aggregation: The built-in aggregation function to apply on the DataSet.
+        :param field: The index of the Tuple field on which to perform the function.
+        :return: A GroupReduceOperator that represents the aggregated DataSet.
+        """
+        child_set = self.reduce_group(aggregation(field), combinable=True)
--- End diff --

There should also be a test case for a non-grouped aggregation.
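A non-grouped test mainly needs expected values for the case where the whole data set forms one group. The sketch below states those expectations with a hypothetical pure-Python reference helper, `aggregate_rows`, and a `unittest` case next to the grouped variant. It does not start a Flink job, and the way non-aggregated fields are filled is an assumption of the sketch.

```python
import unittest
from itertools import groupby
from operator import itemgetter


def aggregate_rows(rows, aggregations, key_field=None):
    """Hypothetical reference implementation used only to state expected results.

    Non-aggregated fields take their value from the last row of each group
    (an assumption of this sketch, not a documented Flink guarantee).
    """
    def apply(group):
        result = list(group[-1])
        for agg, field in aggregations:
            result[field] = agg([row[field] for row in group])
        return tuple(result)

    if key_field is None:
        # Non-grouped aggregation: the whole data set forms a single group.
        return [apply(list(rows))]
    ordered = sorted(rows, key=itemgetter(key_field))
    return [apply(list(group)) for _, group in groupby(ordered, key=itemgetter(key_field))]


class AggregationTest(unittest.TestCase):
    ROWS = [(1, "a", 0.5), (2, "a", 1.5), (3, "b", 2.0)]

    def test_non_grouped_aggregation(self):
        result = aggregate_rows(self.ROWS, [(sum, 0), (min, 2)])
        self.assertEqual(result, [(6, "b", 0.5)])

    def test_grouped_aggregation(self):
        result = aggregate_rows(self.ROWS, [(sum, 0)], key_field=1)
        self.assertEqual(result, [(3, "a", 1.5), (3, "b", 2.0)])
```

Saved as a module, this runs under `python -m unittest`. A test against the actual Python API would instead compare these expected rows with the output of the aggregated job.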
[jira] [Commented] (FLINK-4017) [py] Add Aggregation support to Python API
[ https://issues.apache.org/jira/browse/FLINK-4017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15342543#comment-15342543 ]
ASF GitHub Bot commented on FLINK-4017:
---
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/2115#discussion_r67940433

--- Diff: flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py ---
@@ -192,6 +193,30 @@ def reduce(self, operator):
         self._env._sets.append(child)
         return child_set
+    def aggregate(self, aggregation, field):
+        """
+        Applies an Aggregate transformation (using a GroupReduceFunction) on a non-grouped Tuple DataSet.
+        :param aggregation: The built-in aggregation function to apply on the DataSet.
+        :param field: The index of the Tuple field on which to perform the function.
+        :return: A GroupReduceOperator that represents the aggregated DataSet.
+        """
+        child_set = self.reduce_group(aggregation(field), combinable=True)
--- End diff --

self.reduce_group(AggregationFunction(aggregation, field), combinable=True)
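One way to read the suggested `AggregationFunction(aggregation, field)` is as a single group-reduce function that collects every chained aggregation and applies them together. The class below is a hypothetical sketch: the method names `add_aggregation`, `reduce`, and `combine` are illustrative, not the Flink Python API. It also shows why `combinable=True` is reasonable for aggregations such as sum, min, and max.

```python
from typing import Callable, List, Sequence, Tuple

Aggregation = Callable[[Sequence], object]


class AggregationFunction:
    """Hypothetical group-reduce function built from (aggregation, field) pairs."""

    def __init__(self, aggregation: Aggregation, field: int):
        self.aggregations: List[Tuple[Aggregation, int]] = [(aggregation, field)]

    def add_aggregation(self, aggregation: Aggregation, field: int) -> None:
        # In this sketch, called once per chained and_agg(...).
        self.aggregations.append((aggregation, field))

    def reduce(self, rows: List[tuple]) -> tuple:
        # Assumption: non-aggregated fields take their value from the last row.
        result = list(rows[-1])
        for aggregation, field in self.aggregations:
            result[field] = aggregation([row[field] for row in rows])
        return tuple(result)

    # With combinable=True a combiner pre-aggregates each partition. Reusing
    # reduce() is only safe because sum/min/max give the same result when
    # applied to partial results as when applied to the whole group.
    combine = reduce


fn = AggregationFunction(sum, 0)
fn.add_aggregation(max, 1)
part_a = [(1, 7), (2, 3)]
part_b = [(4, 9)]
partials = [fn.combine(part_a), fn.combine(part_b)]
print(partials)                    # [(3, 7), (4, 9)]
print(fn.reduce(partials))         # (7, 9)
print(fn.reduce(part_a + part_b))  # (7, 9)
```

An average would not survive this shortcut: the mean of per-partition means differs from the overall mean when partitions have different sizes, so it would need a (sum, count) intermediate instead.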
[jira] [Commented] (FLINK-4017) [py] Add Aggregation support to Python API
[ https://issues.apache.org/jira/browse/FLINK-4017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15340302#comment-15340302 ]
ASF GitHub Bot commented on FLINK-4017:
---
Github user zentol commented on the issue:
https://github.com/apache/flink/pull/2115

Thank you for your contribution, I will review this tomorrow :)
[jira] [Commented] (FLINK-4017) [py] Add Aggregation support to Python API
[ https://issues.apache.org/jira/browse/FLINK-4017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15333746#comment-15333746 ]
ASF GitHub Bot commented on FLINK-4017:
---
GitHub user GEOFBOT opened a pull request:
https://github.com/apache/flink/pull/2115

[FLINK-4017] [py] Add Aggregation support to Python API

Adds Aggregation support in the Python API, accessible through `.aggregate()` and `.agg_and()`. (I was unable to use `.and()` in Python because 'and' is a keyword.)

You can merge this pull request into a Git repository by running:
$ git pull https://github.com/GEOFBOT/flink FLINK-4017

Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/2115.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
This closes #2115

commit f3836f1e3919ca450baa3f633d9ece538008b106
Author: Geoffrey Mon
Date: 2016-06-02T16:10:59Z

[FLINK-4017] [py] Add Aggregation support to Python API

Assembles and applies a GroupReduceFunction using pre-defined AggregationOperations.
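The keyword restriction mentioned in the pull request can be checked with Python's standard `keyword` module. The helpers `usable_as_method_name` and `compiles` are hypothetical names used only for this illustration.

```python
import keyword


def usable_as_method_name(name: str) -> bool:
    """True if `name` can be written as `obj.name(...)` in Python source."""
    return name.isidentifier() and not keyword.iskeyword(name)


def compiles(source: str) -> bool:
    """True if `source` is syntactically valid Python (it is never executed)."""
    try:
        compile(source, "<sketch>", "exec")
        return True
    except SyntaxError:
        return False


for candidate in ("and", "agg_and", "and_agg"):
    print(candidate, usable_as_method_name(candidate))
# and False
# agg_and True
# and_agg True

print(compiles("class DataSet:\n    def and(self):\n        pass\n"))      # False
print(compiles("class DataSet:\n    def and_agg(self):\n        pass\n"))  # True
```

Any identifier other than the bare keyword, such as `agg_and` or `and_agg`, is a legal method name; only the exact word `and` is rejected by the parser.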
[jira] [Commented] (FLINK-4017) [py] Add Aggregation support to Python API
[ https://issues.apache.org/jira/browse/FLINK-4017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15318713#comment-15318713 ]
Geoffrey Mon commented on FLINK-4017:
-

Thanks for the advice. The issue that was hindering progress was that the Aggregation functions rejected the type of the data, and I wasn't sure how to deal with that. After poking around with a debugger, I assumed it had to do with the nature of Python lambdas.
[jira] [Commented] (FLINK-4017) [py] Add Aggregation support to Python API
[ https://issues.apache.org/jira/browse/FLINK-4017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15318695#comment-15318695 ]
Chesnay Schepler commented on FLINK-4017:
-

When using the Python API, data always exists as a byte array on the Java side. You will have to either:
* implement the aggregation on the Python side, or
* apply a deserializationMap, groupBy, aggregate, and serializationMap on the Java side.
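The second option can be pictured as decoding each record, grouping, aggregating, and re-encoding the result. The sketch below is written in Python for readability even though the option describes Java-side operators; the `struct`-based record layout and the helper names `serialize`, `deserialize`, and `sum_counts` are hypothetical and do not reflect Flink's actual serialization format.

```python
import struct
from collections import defaultdict

# Hypothetical record layout for a (word, count) tuple:
# 4-byte length, UTF-8 word bytes, 8-byte signed count (big-endian).
# Flink's real Python serialization format is different; struct is a stand-in.


def serialize(word: str, count: int) -> bytes:
    encoded = word.encode("utf-8")
    return struct.pack(f">I{len(encoded)}sq", len(encoded), encoded, count)


def deserialize(record: bytes) -> tuple:
    (length,) = struct.unpack_from(">I", record, 0)
    word, count = struct.unpack_from(f">{length}sq", record, 4)
    return word.decode("utf-8"), count


def sum_counts(records):
    """deserializationMap -> groupBy(0) -> aggregate(SUM, 1) -> serializationMap."""
    totals = defaultdict(int)
    for word, count in map(deserialize, records):  # decode, then group by word
        totals[word] += count                        # SUM over field 1
    return [serialize(word, total) for word, total in sorted(totals.items())]


records = [serialize("flink", 1), serialize("python", 2), serialize("flink", 3)]
print([deserialize(r) for r in sum_counts(records)])  # [('flink', 4), ('python', 2)]
```

The first option avoids this round trip by running the aggregation in the Python process, where the values are already ordinary Python objects.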
[jira] [Commented] (FLINK-4017) [py] Add Aggregation support to Python API
[ https://issues.apache.org/jira/browse/FLINK-4017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15318696#comment-15318696 ]
Chesnay Schepler commented on FLINK-4017:
-

What other issues did you come across?
[jira] [Commented] (FLINK-4017) [py] Add Aggregation support to Python API
[ https://issues.apache.org/jira/browse/FLINK-4017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15318676#comment-15318676 ]
Geoffrey Mon commented on FLINK-4017:
-

At least one of the issues I have come across is that aggregations are picky about the types of the data they operate on, and those types are not explicit for Python lambdas. For example, in the Python WordCount example, Flink assumes that the return type of the lambda used in flat_map is raw bytes, which cannot be aggregated.