Repository: flink
Updated Branches:
  refs/heads/master 54b52c9be -> ab8470714


http://git-wip-us.apache.org/repos/asf/flink/blob/ab847071/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
index a2279c3..4f2c5e3 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
@@ -159,8 +159,8 @@ class Environment(object):
         if plan_mode:
             port = int(sys.stdin.readline().rstrip('\n'))
             self._connection = Connection.PureTCPConnection(port)
-            self._iterator = Iterator.TypedIterator(self._connection, self)
-            self._collector = Collector.TypedCollector(self._connection, self)
+            self._iterator = Iterator.PlanIterator(self._connection, self)
+            self._collector = Collector.PlanCollector(self._connection, self)
             self._send_plan()
             result = self._receive_result()
             self._connection.close()
@@ -175,13 +175,13 @@ class Environment(object):
                 input_path = sys.stdin.readline().rstrip('\n')
                 output_path = sys.stdin.readline().rstrip('\n')
 
+                used_set = None
                 operator = None
                 for set in self._sets:
                     if set.id == id:
+                        used_set = set
                         operator = set.operator
-                    if set.id == -id:
-                        operator = set.combineop
-                operator._configure(input_path, output_path, port, self)
+                operator._configure(input_path, output_path, port, self, used_set)
                 operator._go()
                 operator._close()
                 sys.stdout.flush()
@@ -211,7 +211,7 @@ class Environment(object):
             if child_type in chainable:
                 parent = child.parent
                 if parent.operator is not None and len(parent.children) == 1 and len(parent.sinks) == 0:
-                    parent.operator._chain(child.operator)
+                    parent.chained_info = child
                     parent.name += " -> " + child.name
                     parent.types = child.types
                     for grand_child in child.children:
@@ -297,11 +297,8 @@ class Environment(object):
                     break
                 if case(_Identifier.CROSS, _Identifier.CROSSH, _Identifier.CROSST):
                     collect(set.other.id)
+                    collect(set.uses_udf)
                     collect(set.types)
-                    collect(len(set.projections))
-                    for p in set.projections:
-                        collect(p[0])
-                        collect(p[1])
                     collect(set.name)
                     break
                 if case(_Identifier.REDUCE, _Identifier.GROUPREDUCE):
@@ -312,11 +309,8 @@ class Environment(object):
                     collect(set.key1)
                     collect(set.key2)
                     collect(set.other.id)
+                    collect(set.uses_udf)
                     collect(set.types)
-                    collect(len(set.projections))
-                    for p in set.projections:
-                        collect(p[0])
-                        collect(p[1])
                     collect(set.name)
                     break
                 if case(_Identifier.MAP, _Identifier.MAPPARTITION, _Identifier.FLATMAP, _Identifier.FILTER):

http://git-wip-us.apache.org/repos/asf/flink/blob/ab847071/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py
index c47fab5..3605d7f 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py
@@ -23,6 +23,8 @@ class OperationInfo():
         if info is None:
             self.parent = None
             self.other = None
+            self.parent_set = None
+            self.other_set = None
             self.identifier = None
             self.field = None
             self.order = None
@@ -31,6 +33,7 @@ class OperationInfo():
             self.key2 = None
             self.types = None
             self.operator = None
+            self.uses_udf = False
             self.name = None
             self.delimiter_line = "\n"
             self.delimiter_field = ","
@@ -43,6 +46,7 @@ class OperationInfo():
             self.bcvars = []
             self.id = None
             self.to_err = False
+            self.chained_info = None
         else:
             self.__dict__.update(info.__dict__)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ab847071/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py
index c9bc404..3e718d3 100644
--- a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py
+++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py
@@ -22,7 +22,8 @@ from flink.functions.FilterFunction import FilterFunction
 from flink.functions.MapPartitionFunction import MapPartitionFunction
 from flink.functions.ReduceFunction import ReduceFunction
 from flink.functions.GroupReduceFunction import GroupReduceFunction
-from flink.plan.Constants import INT, STRING, FLOAT, BOOL, BYTES, CUSTOM, Order, WriteMode
+from flink.plan.Constants import Order, WriteMode
+from flink.plan.Constants import INT, STRING
 import struct
 
 #Utilities
@@ -73,7 +74,7 @@ if __name__ == "__main__":
 
     d4 = env.from_elements((1, 0.5, "hello", True), (1, 0.4, "hello", False), (1, 0.5, "hello", True), (2, 0.4, "world", False))
 
-    d5 = env.from_elements((4.4, 4.3, 1), (4.3, 4.4, 1), (4.2, 4.1, 3), (4.1, 4.1, 3))
+    d5 = env.from_elements((1, 2.4), (1, 3.7), (1, 0.4), (1, 5.4))
 
     d6 = env.from_elements(1, 1, 12)
 
@@ -89,19 +90,19 @@ if __name__ == "__main__":
 
     #Types
     env.from_elements(bytearray(b"hello"), bytearray(b"world"))\
-        .map(Id(), BYTES).map_partition(Verify([bytearray(b"hello"), bytearray(b"world")], "Byte"), STRING).output()
+        .map(Id()).map_partition(Verify([bytearray(b"hello"), bytearray(b"world")], "Byte")).output()
 
     env.from_elements(1, 2, 3, 4, 5)\
-        .map(Id(), INT).map_partition(Verify([1,2,3,4,5], "Int"), STRING).output()
+        .map(Id()).map_partition(Verify([1,2,3,4,5], "Int")).output()
 
     env.from_elements(True, True, False)\
-        .map(Id(), BOOL).map_partition(Verify([True, True, False], "Bool"), STRING).output()
+        .map(Id()).map_partition(Verify([True, True, False], "Bool")).output()
 
     env.from_elements(1.4, 1.7, 12312.23)\
-        .map(Id(), FLOAT).map_partition(Verify([1.4, 1.7, 12312.23], "Float"), STRING).output()
+        .map(Id()).map_partition(Verify([1.4, 1.7, 12312.23], "Float")).output()
 
     env.from_elements("hello", "world")\
-        .map(Id(), STRING).map_partition(Verify(["hello", "world"], "String"), STRING).output()
+        .map(Id()).map_partition(Verify(["hello", "world"], "String")).output()
 
     #Custom Serialization
     class Ext(MapPartitionFunction):
@@ -125,16 +126,16 @@ if __name__ == "__main__":
     env.register_type(MyObj, MySerializer(), MyDeserializer())
 
     env.from_elements(MyObj(2), MyObj(4)) \
-        .map(Id(), CUSTOM).map_partition(Ext(), INT) \
-        .map_partition(Verify([2, 4], "CustomTypeSerialization"), STRING).output()
+        .map(Id()).map_partition(Ext()) \
+        .map_partition(Verify([2, 4], "CustomTypeSerialization")).output()
 
     #Map
     class Mapper(MapFunction):
         def map(self, value):
             return value * value
     d1 \
-        .map((lambda x: x * x), INT).map(Mapper(), INT) \
-        .map_partition(Verify([1, 1296, 20736], "Map"), STRING).output()
+        .map((lambda x: x * x)).map(Mapper()) \
+        .map_partition(Verify([1, 1296, 20736], "Map")).output()
 
     #FlatMap
     class FlatMap(FlatMapFunction):
@@ -142,8 +143,8 @@ if __name__ == "__main__":
             collector.collect(value)
             collector.collect(value * 2)
     d1 \
-        .flat_map(FlatMap(), INT).flat_map(FlatMap(), INT) \
-        .map_partition(Verify([1, 2, 2, 4, 6, 12, 12, 24, 12, 24, 24, 48], "FlatMap"), STRING).output()
+        .flat_map(FlatMap()).flat_map(FlatMap()) \
+        .map_partition(Verify([1, 2, 2, 4, 6, 12, 12, 24, 12, 24, 24, 48], "FlatMap")).output()
 
     #MapPartition
     class MapPartition(MapPartitionFunction):
@@ -151,8 +152,8 @@ if __name__ == "__main__":
             for value in iterator:
                 collector.collect(value * 2)
     d1 \
-        .map_partition(MapPartition(), INT) \
-        .map_partition(Verify([2, 12, 24], "MapPartition"), STRING).output()
+        .map_partition(MapPartition()) \
+        .map_partition(Verify([2, 12, 24], "MapPartition")).output()
 
     #Filter
     class Filter(FilterFunction):
@@ -164,7 +165,7 @@ if __name__ == "__main__":
             return value > self.limit
     d1 \
         .filter(Filter(5)).filter(Filter(8)) \
-        .map_partition(Verify([12], "Filter"), STRING).output()
+        .map_partition(Verify([12], "Filter")).output()
 
     #Reduce
     class Reduce(ReduceFunction):
@@ -176,7 +177,11 @@ if __name__ == "__main__":
             return (value1[0] + value2[0], value1[1] + value2[1], value1[2], value1[3] or value2[3])
     d1 \
         .reduce(Reduce()) \
-        .map_partition(Verify([19], "AllReduce"), STRING).output()
+        .map_partition(Verify([19], "AllReduce")).output()
+
+    d4 \
+        .group_by(2).reduce(Reduce2()) \
+        .map_partition(Verify([(3, 1.4, "hello", True), (2, 0.4, "world", False)], "GroupedReduce")).output()
 
     #GroupReduce
     class GroupReduce(GroupReduceFunction):
@@ -193,9 +198,31 @@ if __name__ == "__main__":
         def reduce(self, iterator, collector):
             for value in iterator:
                 collector.collect(value)
+
+    d4 \
+        .reduce_group(GroupReduce2()) \
+        .map_partition(Verify([(1, 0.5, "hello", True), (1, 0.4, "hello", False), (1, 0.5, "hello", True), (2, 0.4, "world", False)], "AllGroupReduce")).output()
+    d4 \
+        .group_by(lambda x: x[2]).reduce_group(GroupReduce(), combinable=False) \
+        .map_partition(Verify([(3, 1.4, "hello", True), (2, 0.4, "world", False)], "GroupReduceWithKeySelector")).output()
     d4 \
-        .group_by(2).reduce_group(GroupReduce(), (INT, FLOAT, STRING, BOOL), combinable=False) \
-        .map_partition(Verify([(3, 1.4, "hello", True), (2, 0.4, "world", False)], "AllGroupReduce"), STRING).output()
+        .group_by(2).reduce_group(GroupReduce()) \
+        .map_partition(Verify([(3, 1.4, "hello", True), (2, 0.4, "world", False)], "GroupReduce")).output()
+    d5 \
+        .group_by(0).sort_group(1, Order.ASCENDING).reduce_group(GroupReduce2(), combinable=True) \
+        .map_partition(Verify([(1, 0.4), (1, 2.4), (1, 3.7), (1, 5.4)], "SortedGroupReduceAsc")).output()
+    d5 \
+        .group_by(0).sort_group(1, Order.DESCENDING).reduce_group(GroupReduce2(), combinable=True) \
+        .map_partition(Verify([(1, 5.4), (1, 3.7), (1, 2.4), (1, 0.4)], "SortedGroupReduceDes")).output()
+    d5 \
+        .group_by(lambda x: x[0]).sort_group(1, Order.DESCENDING).reduce_group(GroupReduce2(), combinable=True) \
+        .map_partition(Verify([(1, 5.4), (1, 3.7), (1, 2.4), (1, 0.4)], "SortedGroupReduceKeySelG")).output()
+    d5 \
+        .group_by(0).sort_group(lambda x: x[1], Order.DESCENDING).reduce_group(GroupReduce2(), combinable=True) \
+        .map_partition(Verify([(1, 5.4), (1, 3.7), (1, 2.4), (1, 0.4)], "SortedGroupReduceKeySelS")).output()
+    d5 \
+        .group_by(lambda x: x[0]).sort_group(lambda x: x[1], Order.DESCENDING).reduce_group(GroupReduce2(), combinable=True) \
+        .map_partition(Verify([(1, 5.4), (1, 3.7), (1, 2.4), (1, 0.4)], "SortedGroupReduceKeySelGS")).output()
 
     #Execution
     env.set_parallelism(1)

http://git-wip-us.apache.org/repos/asf/flink/blob/ab847071/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py
index 56e3250..6bf1fab 100644
--- a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py
+++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py
@@ -22,7 +22,6 @@ from flink.functions.MapPartitionFunction import MapPartitionFunction
 from flink.functions.CrossFunction import CrossFunction
 from flink.functions.JoinFunction import JoinFunction
 from flink.functions.CoGroupFunction import CoGroupFunction
-from flink.plan.Constants import BOOL, INT, FLOAT, STRING
 
 
 #Utilities
@@ -85,28 +84,32 @@ if __name__ == "__main__":
             else:
                 return value2[0] + str(value1[1])
     d2 \
-        .join(d3).where(2).equal_to(0).using(Join(), STRING) \
-        .map_partition(Verify(["hello1", "world0.4"], "Join"), STRING).output()
+        .join(d3).where(2).equal_to(0).using(Join()) \
+        .map_partition(Verify(["hello1", "world0.4"], "Join")).output()
+    d2 \
+        .join(d3).where(lambda x: x[2]).equal_to(0).using(Join()) \
+        .map_partition(Verify(["hello1", "world0.4"], 
"JoinWithKeySelector")).output()
     d2 \
         .join(d3).where(2).equal_to(0).project_first(0, 3).project_second(0) \
-        .map_partition(Verify([(1, True, "hello"), (2, False, "world")], "Project Join"), STRING).output()
+        .map_partition(Verify([(1, True, "hello"), (2, False, "world")], "Project Join")).output()
     d2 \
         .join(d3).where(2).equal_to(0) \
-        .map_partition(Verify([((1, 0.5, "hello", True), ("hello",)), ((2, 0.4, "world", False), ("world",))], "Default Join"), STRING).output()
+        .map_partition(Verify([((1, 0.5, "hello", True), ("hello",)), ((2, 0.4, "world", False), ("world",))], "Default Join")).output()
 
     #Cross
     class Cross(CrossFunction):
         def cross(self, value1, value2):
             return (value1, value2[3])
     d1 \
-        .cross(d2).using(Cross(), (INT, BOOL)) \
-        .map_partition(Verify([(1, True), (1, False), (6, True), (6, False), (12, True), (12, False)], "Cross"), STRING).output()
+        .cross(d2).using(Cross()) \
+        .map_partition(Verify([(1, True), (1, False), (6, True), (6, False), (12, True), (12, False)], "Cross")).output()
     d1 \
         .cross(d3) \
-        .map_partition(Verify([(1, ("hello",)), (1, ("world",)), (6, 
("hello",)), (6, ("world",)), (12, ("hello",)), (12, ("world",))], "Default 
Cross"), STRING).output()
+        .map_partition(Verify([(1, ("hello",)), (1, ("world",)), (6, 
("hello",)), (6, ("world",)), (12, ("hello",)), (12, ("world",))], "Default 
Cross")).output()
+
     d2 \
         .cross(d3).project_second(0).project_first(0, 1) \
-        .map_partition(Verify([("hello", 1, 0.5), ("world", 1, 0.5), ("hello", 
2, 0.4), ("world", 2, 0.4)], "Project Cross"), STRING).output()
+        .map_partition(Verify([("hello", 1, 0.5), ("world", 1, 0.5), ("hello", 
2, 0.4), ("world", 2, 0.4)], "Project Cross")).output()
 
     #CoGroup
     class CoGroup(CoGroupFunction):
@@ -114,8 +117,8 @@ if __name__ == "__main__":
             while iterator1.has_next() and iterator2.has_next():
                 collector.collect((iterator1.next(), iterator2.next()))
     d4 \
-        .co_group(d5).where(0).equal_to(2).using(CoGroup(), ((INT, FLOAT, STRING, BOOL), (FLOAT, FLOAT, INT))) \
-        .map_partition(Verify([((1, 0.5, "hello", True), (4.4, 4.3, 1)), ((1, 0.4, "hello", False), (4.3, 4.4, 1))], "CoGroup"), STRING).output()
+        .co_group(d5).where(0).equal_to(2).using(CoGroup()) \
+        .map_partition(Verify([((1, 0.5, "hello", True), (4.4, 4.3, 1)), ((1, 0.4, "hello", False), (4.3, 4.4, 1))], "CoGroup")).output()
 
     #Broadcast
     class MapperBcv(MapFunction):
@@ -123,22 +126,23 @@ if __name__ == "__main__":
             factor = self.context.get_broadcast_variable("test")[0][0]
             return value * factor
     d1 \
-        .map(MapperBcv(), INT).with_broadcast_set("test", d2) \
-        .map_partition(Verify([1, 6, 12], "Broadcast"), STRING).output()
+        .map(MapperBcv()).with_broadcast_set("test", d2) \
+        .map_partition(Verify([1, 6, 12], "Broadcast")).output()
 
     #Misc
     class Mapper(MapFunction):
         def map(self, value):
             return value * value
     d1 \
-        .map(Mapper(), INT).map((lambda x: x * x), INT) \
-        .map_partition(Verify([1, 1296, 20736], "Chained Lambda"), STRING).output()
+        .map(Mapper()).map((lambda x: x * x)) \
+        .map_partition(Verify([1, 1296, 20736], "Chained Lambda")).output()
     d2 \
         .project(0, 1, 2) \
-        .map_partition(Verify([(1, 0.5, "hello"), (2, 0.4, "world")], 
"Project"), STRING).output()
+        .map_partition(Verify([(1, 0.5, "hello"), (2, 0.4, "world")], 
"Project")).output()
     d2 \
         .union(d4) \
-        .map_partition(Verify2([(1, 0.5, "hello", True), (2, 0.4, "world", False), (1, 0.5, "hello", True), (1, 0.4, "hello", False), (1, 0.5, "hello", True), (2, 0.4, "world", False)], "Union"), STRING).output()
+        .map_partition(Verify2([(1, 0.5, "hello", True), (2, 0.4, "world", False), (1, 0.5, "hello", True), (1, 0.4, "hello", False), (1, 0.5, "hello", True), (2, 0.4, "world", False)], "Union")).output()
+
 
     #Execution
     env.set_parallelism(1)

http://git-wip-us.apache.org/repos/asf/flink/blob/ab847071/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_type_deduction.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_type_deduction.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_type_deduction.py
deleted file mode 100644
index 1ff3f92..0000000
--- a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_type_deduction.py
+++ /dev/null
@@ -1,73 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-from flink.plan.Environment import get_environment
-from flink.plan.Constants import BOOL, STRING
-from flink.functions.MapPartitionFunction import MapPartitionFunction
-
-
-class Verify(MapPartitionFunction):
-    def __init__(self, msg):
-        super(Verify, self).__init__()
-        self.msg = msg
-
-    def map_partition(self, iterator, collector):
-        if self.msg is None:
-            return
-        else:
-            raise Exception("Type Deduction failed: " + self.msg)
-
-if __name__ == "__main__":
-    env = get_environment()
-
-    d1 = env.from_elements(("hello", 4, 3.2, True))
-
-    d2 = env.from_elements("world")
-
-    direct_from_source = d1.filter(lambda x:True)
-
-    msg = None
-
-    if direct_from_source._info.types != ("hello", 4, 3.2, True):
-        msg = "Error deducting type directly from source."
-
-    from_common_udf = d1.map(lambda x: x[3], BOOL).filter(lambda x:True)
-
-    if from_common_udf._info.types != BOOL:
-        msg = "Error deducting type from common udf."
-
-    through_projection = d1.project(3, 2).filter(lambda x:True)
-
-    if through_projection._info.types != (True, 3.2):
-        msg = "Error deducting type through projection."
-
-    through_default_op = d1.cross(d2).filter(lambda x:True)
-
-    if through_default_op._info.types != (("hello", 4, 3.2, True), "world"):
-        msg = "Error deducting type through default J/C." 
+str(through_default_op._info.types)
-
-    through_prj_op = d1.cross(d2).project_first(1, 0).project_second().project_first(3, 2).filter(lambda x:True)
-
-    if through_prj_op._info.types != (4, "hello", "world", True, 3.2):
-        msg = "Error deducting type through projection J/C. 
"+str(through_prj_op._info.types)
-
-
-    env = get_environment()
-
-    env.from_elements("dummy").map_partition(Verify(msg), STRING).output()
-
-    env.execute(local=True)
