Repository: flink
Updated Branches:
  refs/heads/master 54b52c9be -> ab8470714
http://git-wip-us.apache.org/repos/asf/flink/blob/ab847071/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
index a2279c3..4f2c5e3 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
@@ -159,8 +159,8 @@ class Environment(object):
         if plan_mode:
             port = int(sys.stdin.readline().rstrip('\n'))
             self._connection = Connection.PureTCPConnection(port)
-            self._iterator = Iterator.TypedIterator(self._connection, self)
-            self._collector = Collector.TypedCollector(self._connection, self)
+            self._iterator = Iterator.PlanIterator(self._connection, self)
+            self._collector = Collector.PlanCollector(self._connection, self)
             self._send_plan()
             result = self._receive_result()
             self._connection.close()
@@ -175,13 +175,13 @@ class Environment(object):
             input_path = sys.stdin.readline().rstrip('\n')
             output_path = sys.stdin.readline().rstrip('\n')

+            used_set = None
             operator = None
             for set in self._sets:
                 if set.id == id:
+                    used_set = set
                     operator = set.operator
-                if set.id == -id:
-                    operator = set.combineop
-            operator._configure(input_path, output_path, port, self)
+            operator._configure(input_path, output_path, port, self, used_set)
             operator._go()
             operator._close()
             sys.stdout.flush()
@@ -211,7 +211,7 @@ class Environment(object):
             if child_type in chainable:
                 parent = child.parent
                 if parent.operator is not None and len(parent.children) == 1 and len(parent.sinks) == 0:
-                    parent.operator._chain(child.operator)
+                    parent.chained_info = child
                     parent.name += " -> " + child.name
                     parent.types = child.types
                     for grand_child in child.children:
@@ -297,11 +297,8 @@ class Environment(object):
                     break
                 if case(_Identifier.CROSS, _Identifier.CROSSH, _Identifier.CROSST):
                     collect(set.other.id)
+                    collect(set.uses_udf)
                     collect(set.types)
-                    collect(len(set.projections))
-                    for p in set.projections:
-                        collect(p[0])
-                        collect(p[1])
                     collect(set.name)
                     break
                 if case(_Identifier.REDUCE, _Identifier.GROUPREDUCE):
@@ -312,11 +309,8 @@ class Environment(object):
                     collect(set.key1)
                     collect(set.key2)
                     collect(set.other.id)
+                    collect(set.uses_udf)
                     collect(set.types)
-                    collect(len(set.projections))
-                    for p in set.projections:
-                        collect(p[0])
-                        collect(p[1])
                     collect(set.name)
                     break
                 if case(_Identifier.MAP, _Identifier.MAPPARTITION, _Identifier.FLATMAP, _Identifier.FILTER):

http://git-wip-us.apache.org/repos/asf/flink/blob/ab847071/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py
index c47fab5..3605d7f 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/OperationInfo.py
@@ -23,6 +23,8 @@ class OperationInfo():
         if info is None:
             self.parent = None
             self.other = None
+            self.parent_set = None
+            self.other_set = None
             self.identifier = None
             self.field = None
             self.order = None
@@ -31,6 +33,7 @@ class OperationInfo():
             self.key2 = None
             self.types = None
             self.operator = None
+            self.uses_udf = False
             self.name = None
             self.delimiter_line = "\n"
             self.delimiter_field = ","
@@ -43,6 +46,7 @@ class OperationInfo():
             self.bcvars = []
             self.id = None
             self.to_err = False
+            self.chained_info = None
         else:
             self.__dict__.update(info.__dict__)

http://git-wip-us.apache.org/repos/asf/flink/blob/ab847071/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py
index c9bc404..3e718d3 100644
--- a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py
+++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py
@@ -22,7 +22,8 @@ from flink.functions.FilterFunction import FilterFunction
 from flink.functions.MapPartitionFunction import MapPartitionFunction
 from flink.functions.ReduceFunction import ReduceFunction
 from flink.functions.GroupReduceFunction import GroupReduceFunction
-from flink.plan.Constants import INT, STRING, FLOAT, BOOL, BYTES, CUSTOM, Order, WriteMode
+from flink.plan.Constants import Order, WriteMode
+from flink.plan.Constants import INT, STRING
 import struct

 #Utilities
@@ -73,7 +74,7 @@ if __name__ == "__main__":

     d4 = env.from_elements((1, 0.5, "hello", True), (1, 0.4, "hello", False), (1, 0.5, "hello", True), (2, 0.4, "world", False))

-    d5 = env.from_elements((4.4, 4.3, 1), (4.3, 4.4, 1), (4.2, 4.1, 3), (4.1, 4.1, 3))
+    d5 = env.from_elements((1, 2.4), (1, 3.7), (1, 0.4), (1, 5.4))

     d6 = env.from_elements(1, 1, 12)

@@ -89,19 +90,19 @@ if __name__ == "__main__":

     #Types
     env.from_elements(bytearray(b"hello"), bytearray(b"world"))\
-        .map(Id(), BYTES).map_partition(Verify([bytearray(b"hello"), bytearray(b"world")], "Byte"), STRING).output()
+        .map(Id()).map_partition(Verify([bytearray(b"hello"), bytearray(b"world")], "Byte")).output()

     env.from_elements(1, 2, 3, 4, 5)\
-        .map(Id(), INT).map_partition(Verify([1,2,3,4,5], "Int"), STRING).output()
+        .map(Id()).map_partition(Verify([1,2,3,4,5], "Int")).output()

     env.from_elements(True, True, False)\
-        .map(Id(), BOOL).map_partition(Verify([True, True, False], "Bool"), STRING).output()
+        .map(Id()).map_partition(Verify([True, True, False], "Bool")).output()

     env.from_elements(1.4, 1.7, 12312.23)\
-        .map(Id(), FLOAT).map_partition(Verify([1.4, 1.7, 12312.23], "Float"), STRING).output()
+        .map(Id()).map_partition(Verify([1.4, 1.7, 12312.23], "Float")).output()

     env.from_elements("hello", "world")\
-        .map(Id(), STRING).map_partition(Verify(["hello", "world"], "String"), STRING).output()
+        .map(Id()).map_partition(Verify(["hello", "world"], "String")).output()

     #Custom Serialization
     class Ext(MapPartitionFunction):
@@ -125,16 +126,16 @@ if __name__ == "__main__":
     env.register_type(MyObj, MySerializer(), MyDeserializer())

     env.from_elements(MyObj(2), MyObj(4)) \
-        .map(Id(), CUSTOM).map_partition(Ext(), INT) \
-        .map_partition(Verify([2, 4], "CustomTypeSerialization"), STRING).output()
+        .map(Id()).map_partition(Ext()) \
+        .map_partition(Verify([2, 4], "CustomTypeSerialization")).output()

     #Map
     class Mapper(MapFunction):
         def map(self, value):
             return value * value
     d1 \
-        .map((lambda x: x * x), INT).map(Mapper(), INT) \
-        .map_partition(Verify([1, 1296, 20736], "Map"), STRING).output()
+        .map((lambda x: x * x)).map(Mapper()) \
+        .map_partition(Verify([1, 1296, 20736], "Map")).output()

     #FlatMap
     class FlatMap(FlatMapFunction):
@@ -142,8 +143,8 @@ if __name__ == "__main__":
             collector.collect(value)
             collector.collect(value * 2)
     d1 \
-        .flat_map(FlatMap(), INT).flat_map(FlatMap(), INT) \
-        .map_partition(Verify([1, 2, 2, 4, 6, 12, 12, 24, 12, 24, 24, 48], "FlatMap"), STRING).output()
+        .flat_map(FlatMap()).flat_map(FlatMap()) \
+        .map_partition(Verify([1, 2, 2, 4, 6, 12, 12, 24, 12, 24, 24, 48], "FlatMap")).output()

     #MapPartition
     class MapPartition(MapPartitionFunction):
@@ -151,8 +152,8 @@ if __name__ == "__main__":
         def map_partition(self, iterator, collector):
             for value in iterator:
                 collector.collect(value * 2)
     d1 \
-        .map_partition(MapPartition(), INT) \
-        .map_partition(Verify([2, 12, 24], "MapPartition"), STRING).output()
+        .map_partition(MapPartition()) \
+        .map_partition(Verify([2, 12, 24], "MapPartition")).output()
     #Filter
     class Filter(FilterFunction):
@@ -164,7 +165,7 @@ if __name__ == "__main__":
             return value > self.limit
     d1 \
         .filter(Filter(5)).filter(Filter(8)) \
-        .map_partition(Verify([12], "Filter"), STRING).output()
+        .map_partition(Verify([12], "Filter")).output()

     #Reduce
     class Reduce(ReduceFunction):
@@ -176,7 +177,11 @@ if __name__ == "__main__":
             return (value1[0] + value2[0], value1[1] + value2[1], value1[2], value1[3] or value2[3])
     d1 \
         .reduce(Reduce()) \
-        .map_partition(Verify([19], "AllReduce"), STRING).output()
+        .map_partition(Verify([19], "AllReduce")).output()
+
+    d4 \
+        .group_by(2).reduce(Reduce2()) \
+        .map_partition(Verify([(3, 1.4, "hello", True), (2, 0.4, "world", False)], "GroupedReduce")).output()

     #GroupReduce
     class GroupReduce(GroupReduceFunction):
@@ -193,9 +198,31 @@ if __name__ == "__main__":
         def reduce(self, iterator, collector):
             for value in iterator:
                 collector.collect(value)
+
+    d4 \
+        .reduce_group(GroupReduce2()) \
+        .map_partition(Verify([(1, 0.5, "hello", True), (1, 0.4, "hello", False), (1, 0.5, "hello", True), (2, 0.4, "world", False)], "AllGroupReduce")).output()
+    d4 \
+        .group_by(lambda x: x[2]).reduce_group(GroupReduce(), combinable=False) \
+        .map_partition(Verify([(3, 1.4, "hello", True), (2, 0.4, "world", False)], "GroupReduceWithKeySelector")).output()
     d4 \
-        .group_by(2).reduce_group(GroupReduce(), (INT, FLOAT, STRING, BOOL), combinable=False) \
-        .map_partition(Verify([(3, 1.4, "hello", True), (2, 0.4, "world", False)], "AllGroupReduce"), STRING).output()
+        .group_by(2).reduce_group(GroupReduce()) \
+        .map_partition(Verify([(3, 1.4, "hello", True), (2, 0.4, "world", False)], "GroupReduce")).output()
+    d5 \
+        .group_by(0).sort_group(1, Order.ASCENDING).reduce_group(GroupReduce2(), combinable=True) \
+        .map_partition(Verify([(1, 0.4), (1, 2.4), (1, 3.7), (1, 5.4)], "SortedGroupReduceAsc")).output()
+    d5 \
+        .group_by(0).sort_group(1, Order.DESCENDING).reduce_group(GroupReduce2(), combinable=True) \
+        .map_partition(Verify([(1, 5.4), (1, 3.7), (1, 2.4), (1, 0.4)], "SortedGroupReduceDes")).output()
+    d5 \
+        .group_by(lambda x: x[0]).sort_group(1, Order.DESCENDING).reduce_group(GroupReduce2(), combinable=True) \
+        .map_partition(Verify([(1, 5.4), (1, 3.7), (1, 2.4), (1, 0.4)], "SortedGroupReduceKeySelG")).output()
+    d5 \
+        .group_by(0).sort_group(lambda x: x[1], Order.DESCENDING).reduce_group(GroupReduce2(), combinable=True) \
+        .map_partition(Verify([(1, 5.4), (1, 3.7), (1, 2.4), (1, 0.4)], "SortedGroupReduceKeySelS")).output()
+    d5 \
+        .group_by(lambda x: x[0]).sort_group(lambda x: x[1], Order.DESCENDING).reduce_group(GroupReduce2(), combinable=True) \
+        .map_partition(Verify([(1, 5.4), (1, 3.7), (1, 2.4), (1, 0.4)], "SortedGroupReduceKeySelGS")).output()

     #Execution
     env.set_parallelism(1)

http://git-wip-us.apache.org/repos/asf/flink/blob/ab847071/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py
index 56e3250..6bf1fab 100644
--- a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py
+++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py
@@ -22,7 +22,6 @@ from flink.functions.MapPartitionFunction import MapPartitionFunction
 from flink.functions.CrossFunction import CrossFunction
 from flink.functions.JoinFunction import JoinFunction
 from flink.functions.CoGroupFunction import CoGroupFunction
-from flink.plan.Constants import BOOL, INT, FLOAT, STRING


 #Utilities
@@ -85,28 +84,32 @@ if __name__ == "__main__":
         else:
             return value2[0] + str(value1[1])
     d2 \
-        .join(d3).where(2).equal_to(0).using(Join(), STRING) \
-        .map_partition(Verify(["hello1", "world0.4"], "Join"), STRING).output()
+        .join(d3).where(2).equal_to(0).using(Join()) \
+        .map_partition(Verify(["hello1", "world0.4"], "Join")).output()
+    d2 \
+        .join(d3).where(lambda x: x[2]).equal_to(0).using(Join()) \
+        .map_partition(Verify(["hello1", "world0.4"], "JoinWithKeySelector")).output()
     d2 \
         .join(d3).where(2).equal_to(0).project_first(0, 3).project_second(0) \
-        .map_partition(Verify([(1, True, "hello"), (2, False, "world")], "Project Join"), STRING).output()
+        .map_partition(Verify([(1, True, "hello"), (2, False, "world")], "Project Join")).output()
     d2 \
         .join(d3).where(2).equal_to(0) \
-        .map_partition(Verify([((1, 0.5, "hello", True), ("hello",)), ((2, 0.4, "world", False), ("world",))], "Default Join"), STRING).output()
+        .map_partition(Verify([((1, 0.5, "hello", True), ("hello",)), ((2, 0.4, "world", False), ("world",))], "Default Join")).output()

     #Cross
     class Cross(CrossFunction):
         def cross(self, value1, value2):
             return (value1, value2[3])
     d1 \
-        .cross(d2).using(Cross(), (INT, BOOL)) \
-        .map_partition(Verify([(1, True), (1, False), (6, True), (6, False), (12, True), (12, False)], "Cross"), STRING).output()
+        .cross(d2).using(Cross()) \
+        .map_partition(Verify([(1, True), (1, False), (6, True), (6, False), (12, True), (12, False)], "Cross")).output()
     d1 \
         .cross(d3) \
-        .map_partition(Verify([(1, ("hello",)), (1, ("world",)), (6, ("hello",)), (6, ("world",)), (12, ("hello",)), (12, ("world",))], "Default Cross"), STRING).output()
+        .map_partition(Verify([(1, ("hello",)), (1, ("world",)), (6, ("hello",)), (6, ("world",)), (12, ("hello",)), (12, ("world",))], "Default Cross")).output()
+
     d2 \
         .cross(d3).project_second(0).project_first(0, 1) \
-        .map_partition(Verify([("hello", 1, 0.5), ("world", 1, 0.5), ("hello", 2, 0.4), ("world", 2, 0.4)], "Project Cross"), STRING).output()
+        .map_partition(Verify([("hello", 1, 0.5), ("world", 1, 0.5), ("hello", 2, 0.4), ("world", 2, 0.4)], "Project Cross")).output()

     #CoGroup
     class CoGroup(CoGroupFunction):
@@ -114,8 +117,8 @@ if __name__ == "__main__":
         def co_group(self, iterator1, iterator2, collector):
             while iterator1.has_next() and iterator2.has_next():
                 collector.collect((iterator1.next(), iterator2.next()))
     d4 \
-        .co_group(d5).where(0).equal_to(2).using(CoGroup(), ((INT, FLOAT, STRING, BOOL), (FLOAT, FLOAT, INT))) \
-        .map_partition(Verify([((1, 0.5, "hello", True), (4.4, 4.3, 1)), ((1, 0.4, "hello", False), (4.3, 4.4, 1))], "CoGroup"), STRING).output()
+        .co_group(d5).where(0).equal_to(2).using(CoGroup()) \
+        .map_partition(Verify([((1, 0.5, "hello", True), (4.4, 4.3, 1)), ((1, 0.4, "hello", False), (4.3, 4.4, 1))], "CoGroup")).output()
     #Broadcast
     class MapperBcv(MapFunction):
@@ -123,22 +126,23 @@ if __name__ == "__main__":
         def map(self, value):
             factor = self.context.get_broadcast_variable("test")[0][0]
             return value * factor
     d1 \
-        .map(MapperBcv(), INT).with_broadcast_set("test", d2) \
-        .map_partition(Verify([1, 6, 12], "Broadcast"), STRING).output()
+        .map(MapperBcv()).with_broadcast_set("test", d2) \
+        .map_partition(Verify([1, 6, 12], "Broadcast")).output()
     #Misc
     class Mapper(MapFunction):
         def map(self, value):
             return value * value
     d1 \
-        .map(Mapper(), INT).map((lambda x: x * x), INT) \
-        .map_partition(Verify([1, 1296, 20736], "Chained Lambda"), STRING).output()
+        .map(Mapper()).map((lambda x: x * x)) \
+        .map_partition(Verify([1, 1296, 20736], "Chained Lambda")).output()
     d2 \
         .project(0, 1, 2) \
-        .map_partition(Verify([(1, 0.5, "hello"), (2, 0.4, "world")], "Project"), STRING).output()
+        .map_partition(Verify([(1, 0.5, "hello"), (2, 0.4, "world")], "Project")).output()
     d2 \
         .union(d4) \
-        .map_partition(Verify2([(1, 0.5, "hello", True), (2, 0.4, "world", False), (1, 0.5, "hello", True), (1, 0.4, "hello", False), (1, 0.5, "hello", True), (2, 0.4, "world", False)], "Union"), STRING).output()
+        .map_partition(Verify2([(1, 0.5, "hello", True), (2, 0.4, "world", False), (1, 0.5, "hello", True), (1, 0.4, "hello", False), (1, 0.5, "hello", True), (2, 0.4, "world", False)], "Union")).output()
+

     #Execution
     env.set_parallelism(1)

http://git-wip-us.apache.org/repos/asf/flink/blob/ab847071/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_type_deduction.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_type_deduction.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_type_deduction.py
deleted file mode 100644
index 1ff3f92..0000000
--- a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_type_deduction.py
+++ /dev/null
@@ -1,73 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-from flink.plan.Environment import get_environment
-from flink.plan.Constants import BOOL, STRING
-from flink.functions.MapPartitionFunction import MapPartitionFunction
-
-
-class Verify(MapPartitionFunction):
-    def __init__(self, msg):
-        super(Verify, self).__init__()
-        self.msg = msg
-
-    def map_partition(self, iterator, collector):
-        if self.msg is None:
-            return
-        else:
-            raise Exception("Type Deduction failed: " + self.msg)
-
-if __name__ == "__main__":
-    env = get_environment()
-
-    d1 = env.from_elements(("hello", 4, 3.2, True))
-
-    d2 = env.from_elements("world")
-
-    direct_from_source = d1.filter(lambda x:True)
-
-    msg = None
-
-    if direct_from_source._info.types != ("hello", 4, 3.2, True):
-        msg = "Error deducting type directly from source."
-
-    from_common_udf = d1.map(lambda x: x[3], BOOL).filter(lambda x:True)
-
-    if from_common_udf._info.types != BOOL:
-        msg = "Error deducting type from common udf."
-
-    through_projection = d1.project(3, 2).filter(lambda x:True)
-
-    if through_projection._info.types != (True, 3.2):
-        msg = "Error deducting type through projection."
-
-    through_default_op = d1.cross(d2).filter(lambda x:True)
-
-    if through_default_op._info.types != (("hello", 4, 3.2, True), "world"):
-        msg = "Error deducting type through default J/C." +str(through_default_op._info.types)
-
-    through_prj_op = d1.cross(d2).project_first(1, 0).project_second().project_first(3, 2).filter(lambda x:True)
-
-    if through_prj_op._info.types != (4, "hello", "world", True, 3.2):
-        msg = "Error deducting type through projection J/C. "+str(through_prj_op._info.types)
-
-
-    env = get_environment()
-
-    env.from_elements("dummy").map_partition(Verify(msg), STRING).output()
-
-    env.execute(local=True)
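
For readers skimming the diff, a minimal before/after sketch of the user-facing effect of this change (a hypothetical snippet, not part of the commit): with type deduction now performed at runtime, operators no longer take the explicit output-type hints (INT, STRING, ...) that every call previously required, and key selector functions may be used in place of field indices, as in the group_by/sort_group/where lambdas added to the tests above. The Square class below is an assumed user-defined function, modeled on the Mapper classes in the tests:

    from flink.plan.Environment import get_environment
    from flink.functions.MapFunction import MapFunction

    class Square(MapFunction):
        def map(self, value):
            return value * value

    if __name__ == "__main__":
        env = get_environment()

        # before this commit:  .map(Square(), INT)  -- the type hint was mandatory
        # after this commit:   .map(Square())       -- the type is deduced at runtime
        env.from_elements(1, 2, 3) \
            .map(Square()) \
            .output()

        env.set_parallelism(1)
        env.execute(local=True)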