Repository: flink Updated Branches: refs/heads/master a0c3b879b -> 2218cb4c1
[FLINK-4017] [py] Add Aggregation support to Python API Assembles and applies a GroupReduceFunction using pre-defined AggregationOperations in Python. References to aggregations in PythonOperationInfo and other Java classes in the Python API removed since aggregations are now handled by Python. This closes #2115 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2218cb4c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2218cb4c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2218cb4c Branch: refs/heads/master Commit: 2218cb4c1f21479704ff51b1841d8f1ffed67add Parents: a0c3b87 Author: Geoffrey Mon <geof...@gmail.com> Authored: Thu Jun 2 12:10:59 2016 -0400 Committer: zentol <ches...@apache.org> Committed: Fri Jul 15 12:21:57 2016 +0200 ---------------------------------------------------------------------- docs/apis/batch/dataset_transformations.md | 10 +- docs/apis/batch/python.md | 16 +++ .../flink/python/api/PythonOperationInfo.java | 37 ------ .../flink/python/api/PythonPlanBinder.java | 17 +-- .../python/api/flink/functions/Aggregation.py | 100 ++++++++++++++++ .../flink/python/api/flink/plan/DataSet.py | 120 +++++++++++++++++-- .../org/apache/flink/python/api/test_main2.py | 19 +++ 7 files changed, 254 insertions(+), 65 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/2218cb4c/docs/apis/batch/dataset_transformations.md ---------------------------------------------------------------------- diff --git a/docs/apis/batch/dataset_transformations.md b/docs/apis/batch/dataset_transformations.md index 9be9bc0..fe85b31 100644 --- a/docs/apis/batch/dataset_transformations.md +++ b/docs/apis/batch/dataset_transformations.md @@ -901,7 +901,10 @@ val output = input.groupBy(1).aggregate(SUM, 0).and(MIN, 2) <div data-lang="python" markdown="1"> ~~~python -Not supported. +from flink.functions.Aggregation import Sum, Min + +input = # [...] +output = input.group_by(1).aggregate(Sum, 0).and_agg(Min, 2) ~~~ </div> @@ -1074,7 +1077,10 @@ val output = input.aggregate(SUM, 0).and(MIN, 2) <div data-lang="python" markdown="1"> ~~~python -Not supported. +from flink.functions.Aggregation import Sum, Min + +input = # [...] +output = input.aggregate(Sum, 0).and_agg(Min, 2) ~~~ </div> http://git-wip-us.apache.org/repos/asf/flink/blob/2218cb4c/docs/apis/batch/python.md ---------------------------------------------------------------------- diff --git a/docs/apis/batch/python.md b/docs/apis/batch/python.md index 9158dfb..b5e81c5 100644 --- a/docs/apis/batch/python.md +++ b/docs/apis/batch/python.md @@ -265,6 +265,22 @@ data.reduce_group(Adder()) </td> </tr> + <tr> + <td><strong>Aggregate</strong></td> + <td> + <p>Performs a built-in operation (sum, min, max) on one field of all the Tuples in a + data set or in each group of a data set. Aggregation can be applied on a full dataset + or on a grouped data set.</p> +{% highlight python %} +# This code finds the sum of all of the values in the first field and the maximum of all of the values in the second field +data.aggregate(Aggregation.Sum, 0).and_agg(Aggregation.Max, 1) + +# min(), max(), and sum() syntactic sugar functions are also available +data.sum(0).and_agg(Aggregation.Max, 1) +{% endhighlight %} + </td> + </tr> + </tr> <td><strong>Join</strong></td> <td> http://git-wip-us.apache.org/repos/asf/flink/blob/2218cb4c/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java index 89aad22..5f3f9f1 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java @@ -16,7 +16,6 @@ import java.io.IOException; import java.util.Arrays; import org.apache.flink.api.common.operators.Order; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.aggregation.Aggregations; import org.apache.flink.api.java.tuple.Tuple; import static org.apache.flink.api.java.typeutils.TypeExtractor.getForObject; import org.apache.flink.core.fs.FileSystem.WriteMode; @@ -31,7 +30,6 @@ public class PythonOperationInfo { public String[] keys1; //join/cogroup keys public String[] keys2; //join/cogroup keys public TypeInformation<?> types; //typeinformation about output type - public AggregationEntry[] aggregates; public Object[] values; public int count; public String field; @@ -94,15 +92,6 @@ public class PythonOperationInfo { values[x] = streamer.getRecord(); } parallelism = (Integer) streamer.getRecord(true); - - /* - aggregates = new AggregationEntry[count]; - for (int x = 0; x < count; x++) { - int encodedAgg = (Integer) streamer.getRecord(true); - int field = (Integer) streamer.getRecord(true); - aggregates[x] = new AggregationEntry(encodedAgg, field); - } - */ } @Override @@ -116,7 +105,6 @@ public class PythonOperationInfo { sb.append("Keys1: ").append(Arrays.toString(keys1)).append("\n"); sb.append("Keys2: ").append(Arrays.toString(keys2)).append("\n"); sb.append("Keys: ").append(Arrays.toString(keys)).append("\n"); - sb.append("Aggregates: ").append(Arrays.toString(aggregates)).append("\n"); sb.append("Count: ").append(count).append("\n"); sb.append("Field: ").append(field).append("\n"); sb.append("Order: ").append(order.toString()).append("\n"); @@ -130,31 +118,6 @@ public class PythonOperationInfo { return sb.toString(); } - public static class AggregationEntry { - public Aggregations agg; - public int field; - - public AggregationEntry(int encodedAgg, int field) { - switch (encodedAgg) { - case 0: - agg = Aggregations.MAX; - break; - case 1: - agg = Aggregations.MIN; - break; - case 2: - agg = Aggregations.SUM; - break; - } - this.field = field; - } - - @Override - public String toString() { - return agg.toString() + " - " + field; - } - } - public enum DatasizeHint { NONE, TINY, http://git-wip-us.apache.org/repos/asf/flink/blob/2218cb4c/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java index 0c1781a..a6cbfa8 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java @@ -27,7 +27,6 @@ import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.LocalEnvironment; import org.apache.flink.api.java.io.PrintingOutputFormat; import org.apache.flink.api.java.io.TupleCsvInputFormat; -import org.apache.flink.api.java.operators.AggregateOperator; import org.apache.flink.api.java.operators.CoGroupRawOperator; import org.apache.flink.api.java.operators.CrossOperator.DefaultCross; import org.apache.flink.api.java.operators.Grouping; @@ -274,7 +273,7 @@ public class PythonPlanBinder { */ protected enum Operation { SOURCE_CSV, SOURCE_TEXT, SOURCE_VALUE, SOURCE_SEQ, SINK_CSV, SINK_TEXT, SINK_PRINT, - SORT, UNION, FIRST, DISTINCT, GROUPBY, AGGREGATE, + SORT, UNION, FIRST, DISTINCT, GROUPBY, REBALANCE, PARTITION_HASH, BROADCAST, COGROUP, CROSS, CROSS_H, CROSS_T, FILTER, FLATMAP, GROUPREDUCE, JOIN, JOIN_H, JOIN_T, MAP, REDUCE, MAPPARTITION @@ -315,9 +314,6 @@ public class PythonPlanBinder { case BROADCAST: createBroadcastVariable(info); break; - case AGGREGATE: - createAggregationOperation(info); - break; case DISTINCT: createDistinctOperation(info); break; @@ -453,17 +449,6 @@ public class PythonPlanBinder { op1.withParameters(c); } - private void createAggregationOperation(PythonOperationInfo info) throws IOException { - DataSet op = (DataSet) sets.get(info.parentID); - AggregateOperator ao = op.aggregate(info.aggregates[0].agg, info.aggregates[0].field); - - for (int x = 1; x < info.count; x++) { - ao = ao.and(info.aggregates[x].agg, info.aggregates[x].field); - } - - sets.put(info.setID, ao.setParallelism(getParallelism(info)).name("Aggregation")); - } - @SuppressWarnings("unchecked") private void createDistinctOperation(PythonOperationInfo info) throws IOException { DataSet op = (DataSet) sets.get(info.parentID); http://git-wip-us.apache.org/repos/asf/flink/blob/2218cb4c/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/Aggregation.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/Aggregation.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/Aggregation.py new file mode 100644 index 0000000..0ff1771 --- /dev/null +++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/Aggregation.py @@ -0,0 +1,100 @@ +# ############################################################################### +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +from flink.functions.GroupReduceFunction import GroupReduceFunction + +class AggregationFunction(GroupReduceFunction): + def __init__(self, aggregation, field): + super(AggregationFunction, self).__init__() + self.aggregations = [aggregation(field)] + + def add_aggregation(self, aggregation, field): + """ + Add an additional aggregation operator + :param aggregation: Built-in aggregation operator to apply + :param field: Field on which to apply the specified aggregation + """ + self.aggregations.append(aggregation(field)) + + def reduce(self, iterator, collector): + # Reset each aggregator + for aggregator in self.aggregations: + aggregator.initialize_aggregation() + + # Tuple that will be filled in with aggregated values + item = None + + # Run each value through the aggregator + for x in iterator: + if item is None: + # Get first value that will be filled in + item = list(x) + + for aggregator in self.aggregations: + aggregator.aggregate(x[aggregator.field]) + + # Get results + for aggregator in self.aggregations: + item[aggregator.field] = aggregator.get_aggregate() + + collector.collect(tuple(item)) + + +class AggregationOperator(object): + def __init__(self, field): + self.field = field + + def initialize_aggregation(self): + """Set up or reset the aggregator operator.""" + self.agg = None + + def aggregate(self, value): + """Incorporate a value into the aggregation.""" + pass + + def get_aggregate(self): + """Return the result of the aggregation.""" + return self.agg + + +class Sum(AggregationOperator): + def initialize_aggregation(self): + self.agg = 0 + + def aggregate(self, value): + self.agg += value + + +class Min(AggregationOperator): + def aggregate(self, value): + if self.agg != None: + if value < self.agg: + self.agg = value + else: + self.agg = value + + +class Max(AggregationOperator): + def aggregate(self, value): + if self.agg != None: + if value > self.agg: + self.agg = value + else: + self.agg = value + + + http://git-wip-us.apache.org/repos/asf/flink/blob/2218cb4c/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py index fa83259..caa4ae7 100644 --- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py +++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py @@ -18,6 +18,7 @@ import collections import types as TYPES +from flink.functions.Aggregation import AggregationFunction, Min, Max, Sum from flink.plan.Constants import _Identifier, WriteMode, _createKeyValueTypeInfo, _createArrayTypeInfo from flink.plan.OperationInfo import OperationInfo from flink.functions.CoGroupFunction import CoGroupFunction @@ -152,20 +153,24 @@ class DataSet(object): :param operator: The GroupReduceFunction that is applied on the DataSet. :return:A GroupReduceOperator that represents the reduced DataSet. """ + child = self._reduce_group(operator, combinable) + child_set = OperatorSet(self._env, child) + self._info.children.append(child) + self._env._sets.append(child) + return child_set + + def _reduce_group(self, operator, combinable=False): if isinstance(operator, TYPES.FunctionType): f = operator operator = GroupReduceFunction() operator.reduce = f child = OperationInfo() - child_set = OperatorSet(self._env, child) child.identifier = _Identifier.GROUPREDUCE child.parent = self._info child.operator = operator child.types = _createArrayTypeInfo() child.name = "PythonGroupReduce" - self._info.children.append(child) - self._env._sets.append(child) - return child_set + return child def reduce(self, operator): """ @@ -192,6 +197,44 @@ class DataSet(object): self._env._sets.append(child) return child_set + def aggregate(self, aggregation, field): + """ + Applies an Aggregate transformation (using a GroupReduceFunction) on a non-grouped Tuple DataSet. + :param aggregation: The built-in aggregation function to apply on the DataSet. + :param field: The index of the Tuple field on which to perform the function. + :return: An AggregateOperator that represents the aggregated DataSet. + """ + child = self._reduce_group(AggregationFunction(aggregation, field), combinable=True) + child.name = "PythonAggregate" + aggregation.__name__ # include aggregation type in name + child_set = AggregateOperator(self._env, child) + self._info.children.append(child) + self._env._sets.append(child) + return child_set + + def min(self, field): + """ + Syntactic sugar for the minimum aggregation. + :param field: The index of the Tuple field on which to perform the function. + :return: An AggregateOperator that represents the aggregated DataSet. + """ + return self.aggregate(Min, field) + + def max(self, field): + """ + Syntactic sugar for the maximum aggregation. + :param field: The index of the Tuple field on which to perform the function. + :return: An AggregateOperator that represents the aggregated DataSet. + """ + return self.aggregate(Max, field) + + def sum(self, field): + """ + Syntactic sugar for the sum aggregation. + :param field: The index of the Tuple field on which to perform the function. + :return: An AggregateOperator that represents the aggregated DataSet. + """ + return self.aggregate(Sum, field) + def project(self, *fields): """ Applies a Project transformation on a Tuple DataSet. @@ -671,24 +714,28 @@ class Grouping(object): :param operator: The GroupReduceFunction that is applied on the DataSet. :return:A GroupReduceOperator that represents the reduced DataSet. """ + child = self._reduce_group(operator, combinable) + child_set = OperatorSet(self._env, child) + self._info.parallelism = child.parallelism + self._info.children.append(child) + self._env._sets.append(child) + + return child_set + + def _reduce_group(self, operator, combinable=False): self._finalize() if isinstance(operator, TYPES.FunctionType): f = operator operator = GroupReduceFunction() operator.reduce = f child = OperationInfo() - child_set = OperatorSet(self._env, child) child.identifier = _Identifier.GROUPREDUCE child.parent = self._info child.operator = operator child.types = _createArrayTypeInfo() child.name = "PythonGroupReduce" child.key1 = self._child_chain[0].keys - self._info.parallelism = child.parallelism - self._info.children.append(child) - self._env._sets.append(child) - - return child_set + return child def sort_group(self, field, order): """ @@ -746,6 +793,44 @@ class UnsortedGrouping(Grouping): return child_set + def aggregate(self, aggregation, field): + """ + Applies an Aggregate transformation (using a GroupReduceFunction) on a Tuple UnsortedGrouping. + :param aggregation: The built-in aggregation function to apply on the UnsortedGrouping. + :param field: The index of the Tuple field on which to perform the function. + :return: An AggregateOperator that represents the aggregated UnsortedGrouping. + """ + child = self._reduce_group(AggregationFunction(aggregation, field), combinable=True) + child.name = "PythonAggregate" + aggregation.__name__ # include aggregation type in name + child_set = AggregateOperator(self._env, child) + self._env._sets.append(child) + self._info.children.append(child) + return child_set + + def min(self, field): + """ + Syntactic sugar for the minimum aggregation. + :param field: The index of the Tuple field on which to perform the function. + :return: An AggregateOperator that represents the aggregated UnsortedGrouping. + """ + return self.aggregate(Min, field) + + def max(self, field): + """ + Syntactic sugar for the maximum aggregation. + :param field: The index of the Tuple field on which to perform the function. + :return: An AggregateOperator that represents the aggregated UnsortedGrouping. + """ + return self.aggregate(Max, field) + + def sum(self, field): + """ + Syntactic sugar for the sum aggregation. + :param field: The index of the Tuple field on which to perform the function. + :return: An AggregateOperator that represents the aggregated UnsortedGrouping. + """ + return self.aggregate(Sum, field) + def _finalize(self): grouping = self._child_chain[0] keys = grouping.keys @@ -1108,3 +1193,18 @@ class CrossOperator(DataSet, Projectable): self._info.name = "PythonCross" self._info.uses_udf = True return OperatorSet(self._env, self._info) + + +class AggregateOperator(OperatorSet): + def __init__(self, env, info): + super(AggregateOperator, self).__init__(env, info) + + def and_agg(self, aggregation, field): + """ + Applies an additional Aggregate transformation. + :param aggregation: The built-in aggregation operation to apply on the DataSet. + :param field: The index of the Tuple field on which to perform the function. + :return: An AggregateOperator that represents the aggregated DataSet. + """ + self._info.operator.add_aggregation(aggregation, field) + return self http://git-wip-us.apache.org/repos/asf/flink/blob/2218cb4c/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py index 2ea6f91..ceb26d0 100644 --- a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py +++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py @@ -21,6 +21,7 @@ from flink.functions.MapFunction import MapFunction from flink.functions.CrossFunction import CrossFunction from flink.functions.JoinFunction import JoinFunction from flink.functions.CoGroupFunction import CoGroupFunction +from flink.functions.Aggregation import Max, Min, Sum from utils import Verify, Verify2 if __name__ == "__main__": @@ -38,6 +39,24 @@ if __name__ == "__main__": d6 = env.from_elements(1, 1, 12) + #Aggregate + d4 \ + .group_by(2).aggregate(Sum, 0).and_agg(Max, 1).and_agg(Min, 3) \ + .map_partition(Verify([(3, 0.5, "hello", False), (2, 0.4, "world", False)], "Grouped Aggregate")).output() + + d5 \ + .aggregate(Sum, 0).and_agg(Min, 1).and_agg(Max, 2) \ + .map_partition(Verify([(4.4 + 4.3 + 4.2 + 4.1, 4.1, 3)], "Ungrouped Aggregate")).output() + + #Aggregate syntactic sugar functions + d4 \ + .group_by(2).sum(0).and_agg(Max, 1).and_agg(Min, 3) \ + .map_partition(Verify([(3, 0.5, "hello", False), (2, 0.4, "world", False)], "Grouped Aggregate")).output() + + d5 \ + .sum(0).and_agg(Min, 1).and_agg(Max, 2) \ + .map_partition(Verify([(4.4 + 4.3 + 4.2 + 4.1, 4.1, 3)], "Ungrouped Aggregate")).output() + #Join class Join(JoinFunction): def join(self, value1, value2):