Repository: flink
Updated Branches:
  refs/heads/master a0c3b879b -> 2218cb4c1


[FLINK-4017] [py] Add Aggregation support to Python API

Assembles and applies a GroupReduceFunction using pre-defined
AggregationOperations in Python. References to aggregations in
PythonOperationInfo and other Java classes in the Python API
removed since aggregations are now handled by Python.

This closes #2115


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2218cb4c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2218cb4c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2218cb4c

Branch: refs/heads/master
Commit: 2218cb4c1f21479704ff51b1841d8f1ffed67add
Parents: a0c3b87
Author: Geoffrey Mon <geof...@gmail.com>
Authored: Thu Jun 2 12:10:59 2016 -0400
Committer: zentol <ches...@apache.org>
Committed: Fri Jul 15 12:21:57 2016 +0200

----------------------------------------------------------------------
 docs/apis/batch/dataset_transformations.md      |  10 +-
 docs/apis/batch/python.md                       |  16 +++
 .../flink/python/api/PythonOperationInfo.java   |  37 ------
 .../flink/python/api/PythonPlanBinder.java      |  17 +--
 .../python/api/flink/functions/Aggregation.py   | 100 ++++++++++++++++
 .../flink/python/api/flink/plan/DataSet.py      | 120 +++++++++++++++++--
 .../org/apache/flink/python/api/test_main2.py   |  19 +++
 7 files changed, 254 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2218cb4c/docs/apis/batch/dataset_transformations.md
----------------------------------------------------------------------
diff --git a/docs/apis/batch/dataset_transformations.md 
b/docs/apis/batch/dataset_transformations.md
index 9be9bc0..fe85b31 100644
--- a/docs/apis/batch/dataset_transformations.md
+++ b/docs/apis/batch/dataset_transformations.md
@@ -901,7 +901,10 @@ val output = input.groupBy(1).aggregate(SUM, 0).and(MIN, 2)
 <div data-lang="python" markdown="1">
 
 ~~~python
-Not supported.
+from flink.functions.Aggregation import Sum, Min
+
+input = # [...]
+output = input.group_by(1).aggregate(Sum, 0).and_agg(Min, 2)
 ~~~
 
 </div>
@@ -1074,7 +1077,10 @@ val output = input.aggregate(SUM, 0).and(MIN, 2)
 <div data-lang="python" markdown="1">
 
 ~~~python
-Not supported.
+from flink.functions.Aggregation import Sum, Min
+
+input = # [...]
+output = input.aggregate(Sum, 0).and_agg(Min, 2)
 ~~~
 
 </div>

http://git-wip-us.apache.org/repos/asf/flink/blob/2218cb4c/docs/apis/batch/python.md
----------------------------------------------------------------------
diff --git a/docs/apis/batch/python.md b/docs/apis/batch/python.md
index 9158dfb..b5e81c5 100644
--- a/docs/apis/batch/python.md
+++ b/docs/apis/batch/python.md
@@ -265,6 +265,22 @@ data.reduce_group(Adder())
       </td>
     </tr>
 
+    <tr>
+      <td><strong>Aggregate</strong></td>
+      <td>
+        <p>Performs a built-in operation (sum, min, max) on one field of all 
the Tuples in a
+        data set or in each group of a data set. Aggregation can be applied on 
a full dataset
+        or on a grouped data set.</p>
+{% highlight python %}
+# This code finds the sum of all of the values in the first field and the 
maximum of all of the values in the second field
+data.aggregate(Aggregation.Sum, 0).and_agg(Aggregation.Max, 1)
+
+# min(), max(), and sum() syntactic sugar functions are also available
+data.sum(0).and_agg(Aggregation.Max, 1)
+{% endhighlight %}
+      </td>
+    </tr>
+
     </tr>
       <td><strong>Join</strong></td>
       <td>

http://git-wip-us.apache.org/repos/asf/flink/blob/2218cb4c/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
index 89aad22..5f3f9f1 100644
--- 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
+++ 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
@@ -16,7 +16,6 @@ import java.io.IOException;
 import java.util.Arrays;
 import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.aggregation.Aggregations;
 import org.apache.flink.api.java.tuple.Tuple;
 import static org.apache.flink.api.java.typeutils.TypeExtractor.getForObject;
 import org.apache.flink.core.fs.FileSystem.WriteMode;
@@ -31,7 +30,6 @@ public class PythonOperationInfo {
        public String[] keys1; //join/cogroup keys
        public String[] keys2; //join/cogroup keys
        public TypeInformation<?> types; //typeinformation about output type
-       public AggregationEntry[] aggregates;
        public Object[] values;
        public int count;
        public String field;
@@ -94,15 +92,6 @@ public class PythonOperationInfo {
                        values[x] = streamer.getRecord();
                }
                parallelism = (Integer) streamer.getRecord(true);
-
-               /*
-               aggregates = new AggregationEntry[count];
-               for (int x = 0; x < count; x++) {
-                       int encodedAgg = (Integer) streamer.getRecord(true);
-                       int field = (Integer) streamer.getRecord(true);
-                       aggregates[x] = new AggregationEntry(encodedAgg, field);
-               }
-               */
        }
 
        @Override
@@ -116,7 +105,6 @@ public class PythonOperationInfo {
                sb.append("Keys1: 
").append(Arrays.toString(keys1)).append("\n");
                sb.append("Keys2: 
").append(Arrays.toString(keys2)).append("\n");
                sb.append("Keys: ").append(Arrays.toString(keys)).append("\n");
-               sb.append("Aggregates: 
").append(Arrays.toString(aggregates)).append("\n");
                sb.append("Count: ").append(count).append("\n");
                sb.append("Field: ").append(field).append("\n");
                sb.append("Order: ").append(order.toString()).append("\n");
@@ -130,31 +118,6 @@ public class PythonOperationInfo {
                return sb.toString();
        }
 
-       public static class AggregationEntry {
-               public Aggregations agg;
-               public int field;
-
-               public AggregationEntry(int encodedAgg, int field) {
-                       switch (encodedAgg) {
-                               case 0:
-                                       agg = Aggregations.MAX;
-                                       break;
-                               case 1:
-                                       agg = Aggregations.MIN;
-                                       break;
-                               case 2:
-                                       agg = Aggregations.SUM;
-                                       break;
-                       }
-                       this.field = field;
-               }
-
-               @Override
-               public String toString() {
-                       return agg.toString() + " - " + field;
-               }
-       }
-
        public enum DatasizeHint {
                NONE,
                TINY,

http://git-wip-us.apache.org/repos/asf/flink/blob/2218cb4c/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
index 0c1781a..a6cbfa8 100644
--- 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
+++ 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
@@ -27,7 +27,6 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.LocalEnvironment;
 import org.apache.flink.api.java.io.PrintingOutputFormat;
 import org.apache.flink.api.java.io.TupleCsvInputFormat;
-import org.apache.flink.api.java.operators.AggregateOperator;
 import org.apache.flink.api.java.operators.CoGroupRawOperator;
 import org.apache.flink.api.java.operators.CrossOperator.DefaultCross;
 import org.apache.flink.api.java.operators.Grouping;
@@ -274,7 +273,7 @@ public class PythonPlanBinder {
         */
        protected enum Operation {
                SOURCE_CSV, SOURCE_TEXT, SOURCE_VALUE, SOURCE_SEQ, SINK_CSV, 
SINK_TEXT, SINK_PRINT,
-               SORT, UNION, FIRST, DISTINCT, GROUPBY, AGGREGATE,
+               SORT, UNION, FIRST, DISTINCT, GROUPBY,
                REBALANCE, PARTITION_HASH,
                BROADCAST,
                COGROUP, CROSS, CROSS_H, CROSS_T, FILTER, FLATMAP, GROUPREDUCE, 
JOIN, JOIN_H, JOIN_T, MAP, REDUCE, MAPPARTITION
@@ -315,9 +314,6 @@ public class PythonPlanBinder {
                                case BROADCAST:
                                        createBroadcastVariable(info);
                                        break;
-                               case AGGREGATE:
-                                       createAggregationOperation(info);
-                                       break;
                                case DISTINCT:
                                        createDistinctOperation(info);
                                        break;
@@ -453,17 +449,6 @@ public class PythonPlanBinder {
                op1.withParameters(c);
        }
 
-       private void createAggregationOperation(PythonOperationInfo info) 
throws IOException {
-               DataSet op = (DataSet) sets.get(info.parentID);
-               AggregateOperator ao = op.aggregate(info.aggregates[0].agg, 
info.aggregates[0].field);
-
-               for (int x = 1; x < info.count; x++) {
-                       ao = ao.and(info.aggregates[x].agg, 
info.aggregates[x].field);
-               }
-
-               sets.put(info.setID, 
ao.setParallelism(getParallelism(info)).name("Aggregation"));
-       }
-
        @SuppressWarnings("unchecked")
        private void createDistinctOperation(PythonOperationInfo info) throws 
IOException {
                DataSet op = (DataSet) sets.get(info.parentID);

http://git-wip-us.apache.org/repos/asf/flink/blob/2218cb4c/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/Aggregation.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/Aggregation.py
 
b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/Aggregation.py
new file mode 100644
index 0000000..0ff1771
--- /dev/null
+++ 
b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/Aggregation.py
@@ -0,0 +1,100 @@
+# 
###############################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from flink.functions.GroupReduceFunction import GroupReduceFunction
+
+class AggregationFunction(GroupReduceFunction):
+    def __init__(self, aggregation, field):
+        super(AggregationFunction, self).__init__()
+        self.aggregations = [aggregation(field)]
+
+    def add_aggregation(self, aggregation, field):
+        """
+        Add an additional aggregation operator
+        :param aggregation: Built-in aggregation operator to apply
+        :param field: Field on which to apply the specified aggregation
+        """
+        self.aggregations.append(aggregation(field))
+
+    def reduce(self, iterator, collector):
+        # Reset each aggregator
+        for aggregator in self.aggregations:
+            aggregator.initialize_aggregation()
+
+        # Tuple that will be filled in with aggregated values
+        item = None
+
+        # Run each value through the aggregator
+        for x in iterator:
+            if item is None:
+                # Get first value that will be filled in
+                item = list(x)
+
+            for aggregator in self.aggregations:
+                aggregator.aggregate(x[aggregator.field])
+
+        # Get results
+        for aggregator in self.aggregations:
+            item[aggregator.field] = aggregator.get_aggregate()
+
+        collector.collect(tuple(item))
+
+
+class AggregationOperator(object):
+    def __init__(self, field):
+        self.field = field
+
+    def initialize_aggregation(self):
+        """Set up or reset the aggregator operator."""
+        self.agg = None
+
+    def aggregate(self, value):
+        """Incorporate a value into the aggregation."""
+        pass
+
+    def get_aggregate(self):
+        """Return the result of the aggregation."""
+        return self.agg
+
+
+class Sum(AggregationOperator):
+    def initialize_aggregation(self):
+        self.agg = 0
+
+    def aggregate(self, value):
+        self.agg += value
+
+
+class Min(AggregationOperator):
+    def aggregate(self, value):
+        if self.agg != None:
+            if value < self.agg:
+                self.agg = value
+        else:
+            self.agg = value
+
+
+class Max(AggregationOperator):
+    def aggregate(self, value):
+        if self.agg != None:
+            if value > self.agg:
+                self.agg = value
+        else:
+            self.agg = value
+
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/2218cb4c/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
 
b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
index fa83259..caa4ae7 100644
--- 
a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
+++ 
b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
@@ -18,6 +18,7 @@
 import collections
 import types as TYPES
 
+from flink.functions.Aggregation import AggregationFunction, Min, Max, Sum
 from flink.plan.Constants import _Identifier, WriteMode, 
_createKeyValueTypeInfo, _createArrayTypeInfo
 from flink.plan.OperationInfo import OperationInfo
 from flink.functions.CoGroupFunction import CoGroupFunction
@@ -152,20 +153,24 @@ class DataSet(object):
         :param operator: The GroupReduceFunction that is applied on the 
DataSet.
         :return:A GroupReduceOperator that represents the reduced DataSet.
         """
+        child = self._reduce_group(operator, combinable)
+        child_set = OperatorSet(self._env, child)
+        self._info.children.append(child)
+        self._env._sets.append(child)
+        return child_set
+
+    def _reduce_group(self, operator, combinable=False):
         if isinstance(operator, TYPES.FunctionType):
             f = operator
             operator = GroupReduceFunction()
             operator.reduce = f
         child = OperationInfo()
-        child_set = OperatorSet(self._env, child)
         child.identifier = _Identifier.GROUPREDUCE
         child.parent = self._info
         child.operator = operator
         child.types = _createArrayTypeInfo()
         child.name = "PythonGroupReduce"
-        self._info.children.append(child)
-        self._env._sets.append(child)
-        return child_set
+        return child
 
     def reduce(self, operator):
         """
@@ -192,6 +197,44 @@ class DataSet(object):
         self._env._sets.append(child)
         return child_set
 
+    def aggregate(self, aggregation, field):
+        """
+        Applies an Aggregate transformation (using a GroupReduceFunction) on a 
non-grouped Tuple DataSet.
+        :param aggregation: The built-in aggregation function to apply on the 
DataSet.
+        :param field: The index of the Tuple field on which to perform the 
function.
+        :return: An AggregateOperator that represents the aggregated DataSet.
+        """
+        child = self._reduce_group(AggregationFunction(aggregation, field), 
combinable=True)
+        child.name = "PythonAggregate" + aggregation.__name__  # include 
aggregation type in name
+        child_set = AggregateOperator(self._env, child)
+        self._info.children.append(child)
+        self._env._sets.append(child)
+        return child_set
+
+    def min(self, field):
+        """
+        Syntactic sugar for the minimum aggregation.
+        :param field: The index of the Tuple field on which to perform the 
function.
+        :return: An AggregateOperator that represents the aggregated DataSet.
+        """
+        return self.aggregate(Min, field)
+
+    def max(self, field):
+        """
+        Syntactic sugar for the maximum aggregation.
+        :param field: The index of the Tuple field on which to perform the 
function.
+        :return: An AggregateOperator that represents the aggregated DataSet.
+        """
+        return self.aggregate(Max, field)
+
+    def sum(self, field):
+        """
+        Syntactic sugar for the sum aggregation.
+        :param field: The index of the Tuple field on which to perform the 
function.
+        :return: An AggregateOperator that represents the aggregated DataSet.
+        """
+        return self.aggregate(Sum, field)
+
     def project(self, *fields):
         """
         Applies a Project transformation on a Tuple DataSet.
@@ -671,24 +714,28 @@ class Grouping(object):
         :param operator: The GroupReduceFunction that is applied on the 
DataSet.
         :return:A GroupReduceOperator that represents the reduced DataSet.
         """
+        child = self._reduce_group(operator, combinable)
+        child_set = OperatorSet(self._env, child)
+        self._info.parallelism = child.parallelism
+        self._info.children.append(child)
+        self._env._sets.append(child)
+
+        return child_set
+
+    def _reduce_group(self, operator, combinable=False):
         self._finalize()
         if isinstance(operator, TYPES.FunctionType):
             f = operator
             operator = GroupReduceFunction()
             operator.reduce = f
         child = OperationInfo()
-        child_set = OperatorSet(self._env, child)
         child.identifier = _Identifier.GROUPREDUCE
         child.parent = self._info
         child.operator = operator
         child.types = _createArrayTypeInfo()
         child.name = "PythonGroupReduce"
         child.key1 = self._child_chain[0].keys
-        self._info.parallelism = child.parallelism
-        self._info.children.append(child)
-        self._env._sets.append(child)
-
-        return child_set
+        return child
 
     def sort_group(self, field, order):
         """
@@ -746,6 +793,44 @@ class UnsortedGrouping(Grouping):
 
         return child_set
 
+    def aggregate(self, aggregation, field):
+        """
+        Applies an Aggregate transformation (using a GroupReduceFunction) on a 
Tuple UnsortedGrouping.
+        :param aggregation: The built-in aggregation function to apply on the 
UnsortedGrouping.
+        :param field: The index of the Tuple field on which to perform the 
function.
+        :return: An AggregateOperator that represents the aggregated 
UnsortedGrouping.
+        """
+        child = self._reduce_group(AggregationFunction(aggregation, field), 
combinable=True)
+        child.name = "PythonAggregate" + aggregation.__name__  # include 
aggregation type in name
+        child_set = AggregateOperator(self._env, child)
+        self._env._sets.append(child)
+        self._info.children.append(child)
+        return child_set
+
+    def min(self, field):
+        """
+        Syntactic sugar for the minimum aggregation.
+        :param field: The index of the Tuple field on which to perform the 
function.
+        :return: An AggregateOperator that represents the aggregated 
UnsortedGrouping.
+        """
+        return self.aggregate(Min, field)
+
+    def max(self, field):
+        """
+        Syntactic sugar for the maximum aggregation.
+        :param field: The index of the Tuple field on which to perform the 
function.
+        :return: An AggregateOperator that represents the aggregated 
UnsortedGrouping.
+        """
+        return self.aggregate(Max, field)
+
+    def sum(self, field):
+        """
+        Syntactic sugar for the sum aggregation.
+        :param field: The index of the Tuple field on which to perform the 
function.
+        :return: An AggregateOperator that represents the aggregated 
UnsortedGrouping.
+        """
+        return self.aggregate(Sum, field)
+
     def _finalize(self):
         grouping = self._child_chain[0]
         keys = grouping.keys
@@ -1108,3 +1193,18 @@ class CrossOperator(DataSet, Projectable):
         self._info.name = "PythonCross"
         self._info.uses_udf = True
         return OperatorSet(self._env, self._info)
+
+
+class AggregateOperator(OperatorSet):
+    def __init__(self, env, info):
+        super(AggregateOperator, self).__init__(env, info)
+
+    def and_agg(self, aggregation, field):
+        """
+        Applies an additional Aggregate transformation.
+        :param aggregation: The built-in aggregation operation to apply on the 
DataSet.
+        :param field: The index of the Tuple field on which to perform the 
function.
+        :return: An AggregateOperator that represents the aggregated DataSet.
+        """
+        self._info.operator.add_aggregation(aggregation, field)
+        return self

http://git-wip-us.apache.org/repos/asf/flink/blob/2218cb4c/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py
 
b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py
index 2ea6f91..ceb26d0 100644
--- 
a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py
+++ 
b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py
@@ -21,6 +21,7 @@ from flink.functions.MapFunction import MapFunction
 from flink.functions.CrossFunction import CrossFunction
 from flink.functions.JoinFunction import JoinFunction
 from flink.functions.CoGroupFunction import CoGroupFunction
+from flink.functions.Aggregation import Max, Min, Sum
 from utils import Verify, Verify2
 
 if __name__ == "__main__":
@@ -38,6 +39,24 @@ if __name__ == "__main__":
 
     d6 = env.from_elements(1, 1, 12)
 
+    #Aggregate
+    d4 \
+        .group_by(2).aggregate(Sum, 0).and_agg(Max, 1).and_agg(Min, 3) \
+        .map_partition(Verify([(3, 0.5, "hello", False), (2, 0.4, "world", 
False)], "Grouped Aggregate")).output()
+
+    d5 \
+        .aggregate(Sum, 0).and_agg(Min, 1).and_agg(Max, 2) \
+        .map_partition(Verify([(4.4 + 4.3 + 4.2 + 4.1, 4.1, 3)], "Ungrouped 
Aggregate")).output()
+
+    #Aggregate syntactic sugar functions
+    d4 \
+        .group_by(2).sum(0).and_agg(Max, 1).and_agg(Min, 3) \
+        .map_partition(Verify([(3, 0.5, "hello", False), (2, 0.4, "world", 
False)], "Grouped Aggregate")).output()
+
+    d5 \
+        .sum(0).and_agg(Min, 1).and_agg(Max, 2) \
+        .map_partition(Verify([(4.4 + 4.3 + 4.2 + 4.1, 4.1, 3)], "Ungrouped 
Aggregate")).output()
+
     #Join
     class Join(JoinFunction):
         def join(self, value1, value2):

Reply via email to