http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/plan/DataSet.py ---------------------------------------------------------------------- diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/plan/DataSet.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/plan/DataSet.py new file mode 100644 index 0000000..a906fb2 --- /dev/null +++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/plan/DataSet.py @@ -0,0 +1,907 @@ +# ############################################################################### +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +import inspect +import copy +import types as TYPES + +from flink.plan.Constants import _Fields, _Identifier, WriteMode, STRING +from flink.functions.CoGroupFunction import CoGroupFunction +from flink.functions.FilterFunction import FilterFunction +from flink.functions.FlatMapFunction import FlatMapFunction +from flink.functions.CrossFunction import CrossFunction +from flink.functions.GroupReduceFunction import GroupReduceFunction +from flink.functions.JoinFunction import JoinFunction +from flink.functions.MapFunction import MapFunction +from flink.functions.MapPartitionFunction import MapPartitionFunction +from flink.functions.ReduceFunction import ReduceFunction + + +def deduct_output_type(dataset): + skip = set([_Identifier.GROUP, _Identifier.SORT, _Identifier.UNION]) + source = set([_Identifier.SOURCE_CSV, _Identifier.SOURCE_TEXT, _Identifier.SOURCE_VALUE]) + default = set([_Identifier.CROSS, _Identifier.CROSSH, _Identifier.CROSST, _Identifier.JOINT, _Identifier.JOINH, _Identifier.JOIN]) + + while True: + dataset_type = dataset[_Fields.IDENTIFIER] + if dataset_type in skip: + dataset = dataset[_Fields.PARENT] + continue + if dataset_type in source: + if dataset_type == _Identifier.SOURCE_TEXT: + return STRING + if dataset_type == _Identifier.SOURCE_VALUE: + return dataset[_Fields.VALUES][0] + if dataset_type == _Identifier.SOURCE_CSV: + return dataset[_Fields.TYPES] + if dataset_type == _Identifier.PROJECTION: + return tuple([deduct_output_type(dataset[_Fields.PARENT])[k] for k in dataset[_Fields.KEYS]]) + if dataset_type in default: + if dataset[_Fields.OPERATOR] is not None: #udf-join/cross + return dataset[_Fields.TYPES] + if len(dataset[_Fields.PROJECTIONS]) == 0: #defaultjoin/-cross + return (deduct_output_type(dataset[_Fields.PARENT]), deduct_output_type(dataset[_Fields.OTHER])) + else: 
#projectjoin/-cross
+                t1 = deduct_output_type(dataset[_Fields.PARENT])
+                t2 = deduct_output_type(dataset[_Fields.OTHER])
+                out_type = []
+                for prj in dataset[_Fields.PROJECTIONS]:
+                    if len(prj[1]) == 0: #projection on non-tuple dataset
+                        if prj[0] == "first":
+                            out_type.append(t1)
+                        else:
+                            out_type.append(t2)
+                    else: #projection on tuple dataset
+                        for key in prj[1]:
+                            if prj[0] == "first":
+                                out_type.append(t1[key])
+                            else:
+                                out_type.append(t2[key])
+                return tuple(out_type)
+        return dataset[_Fields.TYPES]
+
+
+class Set(object):
+    def __init__(self, env, info, copy_set=False):
+        self._env = env
+        self._info = info
+        if not copy_set:
+            self._info[_Fields.ID] = env._counter
+            self._info[_Fields.BCVARS] = []
+            self._info[_Fields.CHILDREN] = []
+            self._info[_Fields.SINKS] = []
+            self._info[_Fields.NAME] = None
+            env._counter += 1
+
+    def output(self, to_error=False):
+        """
+        Writes a DataSet to the standard output stream (stdout).
+
+        :param to_error: If True, the DataSet is written to the standard error stream (stderr) instead.
+        """
+        child = dict()
+        child[_Fields.IDENTIFIER] = _Identifier.SINK_PRINT
+        child[_Fields.PARENT] = self._info
+        child[_Fields.TO_ERR] = to_error
+        self._info[_Fields.SINKS].append(child)
+        self._env._sinks.append(child)
+
+    def write_text(self, path, write_mode=WriteMode.NO_OVERWRITE):
+        """
+        Writes a DataSet as a text file to the specified location.
+
+        :param path: The path pointing to the location the text file is written to.
+        :param write_mode: OutputFormat.WriteMode value, indicating whether files should be overwritten.
+        """
+        child = dict()
+        child[_Fields.IDENTIFIER] = _Identifier.SINK_TEXT
+        child[_Fields.PARENT] = self._info
+        child[_Fields.PATH] = path
+        child[_Fields.WRITE_MODE] = write_mode
+        self._info[_Fields.SINKS].append(child)
+        self._env._sinks.append(child)
+
+    def write_csv(self, path, line_delimiter="\n", field_delimiter=',', write_mode=WriteMode.NO_OVERWRITE):
+        """
+        Writes a Tuple DataSet as a CSV file to the specified location.
+
+        Note: Only a Tuple DataSet can be written as a CSV file.
+
+        :param path: The path pointing to the location the CSV file is written to.
+        :param line_delimiter: The string separating records, "\n" by default.
+        :param field_delimiter: The string separating fields within a record, "," by default.
+        :param write_mode: OutputFormat.WriteMode value, indicating whether files should be overwritten.
+        """
+        child = dict()
+        child[_Fields.IDENTIFIER] = _Identifier.SINK_CSV
+        child[_Fields.PATH] = path
+        child[_Fields.PARENT] = self._info
+        child[_Fields.DELIMITER_FIELD] = field_delimiter
+        child[_Fields.DELIMITER_LINE] = line_delimiter
+        child[_Fields.WRITE_MODE] = write_mode
+        self._info[_Fields.SINKS].append(child)
+        self._env._sinks.append(child)
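
For orientation, a minimal end-to-end sketch of the sink API above, modeled on this commit's own test plans (the output path is the one used by test_csv.py; any writable path works):

    from flink.plan.Environment import get_environment

    env = get_environment()
    data = env.from_elements((1, "hello"), (2, "world"))
    data.output()                        # print each element to stdout
    data.write_csv("/tmp/flink/result")  # only Tuple DataSets can be written as CSV
    env.set_degree_of_parallelism(1)
    env.execute(local=True)
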
+ """ + if isinstance(operator, TYPES.FunctionType): + f = operator + operator = GroupReduceFunction() + operator.reduce = f + child = dict() + child_set = OperatorSet(self._env, child) + child[_Fields.IDENTIFIER] = _Identifier.GROUPREDUCE + child[_Fields.PARENT] = self._info + child[_Fields.OPERATOR] = copy.deepcopy(operator) + child[_Fields.OPERATOR]._combine = False + child[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__) + child[_Fields.TYPES] = types + child[_Fields.COMBINE] = combinable + child[_Fields.COMBINEOP] = operator + child[_Fields.COMBINEOP]._combine = True + child[_Fields.NAME] = "PythonGroupReduce" + self._info[_Fields.CHILDREN].append(child) + self._env._sets.append(child) + return child_set + + +class ReduceSet(Set): + def __init__(self, env, info, copy_set=False): + super(ReduceSet, self).__init__(env, info, copy_set) + if not copy_set: + self._is_chained = False + + def reduce(self, operator): + """ + Applies a Reduce transformation on a non-grouped DataSet. + + The transformation consecutively calls a ReduceFunction until only a single element remains which is the result + of the transformation. A ReduceFunction combines two elements into one new element of the same type. + + :param operator:The ReduceFunction that is applied on the DataSet. + :return:A ReduceOperator that represents the reduced DataSet. + """ + if isinstance(operator, TYPES.FunctionType): + f = operator + operator = ReduceFunction() + operator.reduce = f + child = dict() + child_set = OperatorSet(self._env, child) + child[_Fields.IDENTIFIER] = _Identifier.REDUCE + child[_Fields.PARENT] = self._info + child[_Fields.OPERATOR] = operator + child[_Fields.COMBINEOP] = operator + child[_Fields.COMBINE] = False + child[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__) + child[_Fields.NAME] = "PythonReduce" + child[_Fields.TYPES] = deduct_output_type(self._info) + self._info[_Fields.CHILDREN].append(child) + self._env._sets.append(child) + return child_set + + +class DataSet(ReduceSet): + def __init__(self, env, info, copy_set=False): + super(DataSet, self).__init__(env, info, copy_set) + + def project(self, *fields): + """ + Applies a Project transformation on a Tuple DataSet. + + Note: Only Tuple DataSets can be projected. The transformation projects each Tuple of the DataSet onto a + (sub)set of fields. + + :param fields: The field indexes of the input tuples that are retained. + The order of fields in the output tuple corresponds to the order of field indexes. + :return: The projected DataSet. + + """ + child = dict() + child_set = DataSet(self._env, child) + child[_Fields.IDENTIFIER] = _Identifier.PROJECTION + child[_Fields.PARENT] = self._info + child[_Fields.KEYS] = fields + self._info[_Fields.CHILDREN].append(child) + self._env._sets.append(child) + return child_set + + def group_by(self, *keys): + """ + Groups a Tuple DataSet using field position keys. + Note: Field position keys only be specified for Tuple DataSets. + The field position keys specify the fields of Tuples on which the DataSet is grouped. + This method returns an UnsortedGrouping on which one of the following grouping transformation can be applied. + sort_group() to get a SortedGrouping. + reduce() to apply a Reduce transformation. + group_reduce() to apply a GroupReduce transformation. + + :param keys: One or more field positions on which the DataSet will be grouped. 
+    def co_group(self, other_set):
+        """
+        Initiates a CoGroup transformation which combines the elements of two DataSets into one DataSet.
+
+        It groups each DataSet individually on a key and gives groups of both DataSets with equal keys together into a
+        CoGroupFunction. If a DataSet has a group with no matching key in the other DataSet,
+        the CoGroupFunction is called with an empty group for the non-existing group.
+        The CoGroupFunction can iterate over the elements of both groups and return any number of elements
+        including none.
+
+        :param other_set: The other DataSet of the CoGroup transformation.
+        :return: A CoGroupOperator to continue the definition of the CoGroup transformation.
+        """
+        child = dict()
+        other_set._info[_Fields.CHILDREN].append(child)
+        child_set = CoGroupOperatorWhere(self._env, child)
+        child[_Fields.IDENTIFIER] = _Identifier.COGROUP
+        child[_Fields.PARENT] = self._info
+        child[_Fields.OTHER] = other_set._info
+        self._info[_Fields.CHILDREN].append(child)
+        return child_set
+
+    def cross(self, other_set):
+        """
+        Initiates a Cross transformation which combines the elements of two DataSets into one DataSet.
+
+        It builds all pair combinations of elements of both DataSets, i.e., it builds a Cartesian product.
+
+        :param other_set: The other DataSet with which this DataSet is crossed.
+        :return: A CrossOperator to continue the definition of the Cross transformation.
+        """
+        return self._cross(other_set, _Identifier.CROSS)
+
+    def cross_with_huge(self, other_set):
+        """
+        Initiates a Cross transformation which combines the elements of two DataSets into one DataSet.
+
+        It builds all pair combinations of elements of both DataSets, i.e., it builds a Cartesian product.
+        This method also gives the hint to the optimizer that
+        the second DataSet to cross is much larger than the first one.
+
+        :param other_set: The other DataSet with which this DataSet is crossed.
+        :return: A CrossOperator to continue the definition of the Cross transformation.
+        """
+        return self._cross(other_set, _Identifier.CROSSH)
+
+    def cross_with_tiny(self, other_set):
+        """
+        Initiates a Cross transformation which combines the elements of two DataSets into one DataSet.
+
+        It builds all pair combinations of elements of both DataSets, i.e., it builds a Cartesian product.
+        This method also gives the hint to the optimizer that
+        the second DataSet to cross is much smaller than the first one.
+
+        :param other_set: The other DataSet with which this DataSet is crossed.
+        :return: A CrossOperator to continue the definition of the Cross transformation.
+        """
+        return self._cross(other_set, _Identifier.CROSST)
+ """ + return self._cross(other_set, _Identifier.CROSST) + + def _cross(self, other_set, identifier): + child = dict() + child_set = CrossOperator(self._env, child) + child[_Fields.IDENTIFIER] = identifier + child[_Fields.PARENT] = self._info + child[_Fields.OTHER] = other_set._info + child[_Fields.PROJECTIONS] = [] + child[_Fields.OPERATOR] = None + child[_Fields.META] = None + self._info[_Fields.CHILDREN].append(child) + other_set._info[_Fields.CHILDREN].append(child) + self._env._sets.append(child) + return child_set + + def filter(self, operator): + """ + Applies a Filter transformation on a DataSet. + + he transformation calls a FilterFunction for each element of the DataSet and retains only those element + for which the function returns true. Elements for which the function returns false are filtered. + + :param operator: The FilterFunction that is called for each element of the DataSet. + :return:A FilterOperator that represents the filtered DataSet. + """ + if isinstance(operator, TYPES.FunctionType): + f = operator + operator = FilterFunction() + operator.filter = f + child = dict() + child_set = OperatorSet(self._env, child) + child[_Fields.IDENTIFIER] = _Identifier.FILTER + child[_Fields.PARENT] = self._info + child[_Fields.OPERATOR] = operator + child[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__) + child[_Fields.NAME] = "PythonFilter" + child[_Fields.TYPES] = deduct_output_type(self._info) + self._info[_Fields.CHILDREN].append(child) + self._env._sets.append(child) + return child_set + + def flat_map(self, operator, types): + """ + Applies a FlatMap transformation on a DataSet. + + The transformation calls a FlatMapFunction for each element of the DataSet. + Each FlatMapFunction call can return any number of elements including none. + + :param operator: The FlatMapFunction that is called for each element of the DataSet. + :param types: The type of the resulting DataSet. + :return:A FlatMapOperator that represents the transformed DataSe + """ + if isinstance(operator, TYPES.FunctionType): + f = operator + operator = FlatMapFunction() + operator.flat_map = f + child = dict() + child_set = OperatorSet(self._env, child) + child[_Fields.IDENTIFIER] = _Identifier.FLATMAP + child[_Fields.PARENT] = self._info + child[_Fields.OPERATOR] = operator + child[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__) + child[_Fields.TYPES] = types + child[_Fields.NAME] = "PythonFlatMap" + self._info[_Fields.CHILDREN].append(child) + self._env._sets.append(child) + return child_set + + def join(self, other_set): + """ + Initiates a Join transformation. + + A Join transformation joins the elements of two DataSets on key equality. + + :param other_set: The other DataSet with which this DataSet is joined + :return:A JoinOperator to continue the definition of the Join transformation. + """ + return self._join(other_set, _Identifier.JOIN) + + def join_with_huge(self, other_set): + """ + Initiates a Join transformation. + + A Join transformation joins the elements of two DataSets on key equality. + This method also gives the hint to the optimizer that + the second DataSet to join is much larger than the first one. + + :param other_set: The other DataSet with which this DataSet is joined + :return:A JoinOperator to continue the definition of the Join transformation. + """ + return self._join(other_set, _Identifier.JOINH) + + def join_with_tiny(self, other_set): + """ + Initiates a Join transformation. 
+    def join(self, other_set):
+        """
+        Initiates a Join transformation.
+
+        A Join transformation joins the elements of two DataSets on key equality.
+
+        :param other_set: The other DataSet with which this DataSet is joined.
+        :return: A JoinOperator to continue the definition of the Join transformation.
+        """
+        return self._join(other_set, _Identifier.JOIN)
+
+    def join_with_huge(self, other_set):
+        """
+        Initiates a Join transformation.
+
+        A Join transformation joins the elements of two DataSets on key equality.
+        This method also gives the hint to the optimizer that
+        the second DataSet to join is much larger than the first one.
+
+        :param other_set: The other DataSet with which this DataSet is joined.
+        :return: A JoinOperator to continue the definition of the Join transformation.
+        """
+        return self._join(other_set, _Identifier.JOINH)
+
+    def join_with_tiny(self, other_set):
+        """
+        Initiates a Join transformation.
+
+        A Join transformation joins the elements of two DataSets on key equality.
+        This method also gives the hint to the optimizer that
+        the second DataSet to join is much smaller than the first one.
+
+        :param other_set: The other DataSet with which this DataSet is joined.
+        :return: A JoinOperator to continue the definition of the Join transformation.
+        """
+        return self._join(other_set, _Identifier.JOINT)
+
+    def _join(self, other_set, identifier):
+        child = dict()
+        child_set = JoinOperatorWhere(self._env, child)
+        child[_Fields.IDENTIFIER] = identifier
+        child[_Fields.PARENT] = self._info
+        child[_Fields.OTHER] = other_set._info
+        child[_Fields.OPERATOR] = None
+        child[_Fields.META] = None
+        child[_Fields.PROJECTIONS] = []
+        self._info[_Fields.CHILDREN].append(child)
+        other_set._info[_Fields.CHILDREN].append(child)
+        self._env._sets.append(child)
+        return child_set
+
+    def map(self, operator, types):
+        """
+        Applies a Map transformation on a DataSet.
+
+        The transformation calls a MapFunction for each element of the DataSet.
+        Each MapFunction call returns exactly one element.
+
+        :param operator: The MapFunction that is called for each element of the DataSet.
+        :param types: The type of the resulting DataSet.
+        :return: A MapOperator that represents the transformed DataSet.
+        """
+        if isinstance(operator, TYPES.FunctionType):
+            f = operator
+            operator = MapFunction()
+            operator.map = f
+        child = dict()
+        child_set = OperatorSet(self._env, child)
+        child[_Fields.IDENTIFIER] = _Identifier.MAP
+        child[_Fields.PARENT] = self._info
+        child[_Fields.OPERATOR] = operator
+        child[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
+        child[_Fields.TYPES] = types
+        child[_Fields.NAME] = "PythonMap"
+        self._info[_Fields.CHILDREN].append(child)
+        self._env._sets.append(child)
+        return child_set
+
+    def map_partition(self, operator, types):
+        """
+        Applies a MapPartition transformation on a DataSet.
+
+        The transformation calls a MapPartitionFunction once per parallel partition of the DataSet.
+        The entire partition is available through the given Iterator.
+        Each MapPartitionFunction may return an arbitrary number of results.
+
+        The number of elements that each instance of the MapPartition function
+        sees is non-deterministic and depends on the degree of parallelism of the operation.
+
+        :param operator: The MapPartitionFunction that is called once per parallel partition of the DataSet.
+        :param types: The type of the resulting DataSet.
+        :return: A MapPartitionOperator that represents the transformed DataSet.
+        """
+        if isinstance(operator, TYPES.FunctionType):
+            f = operator
+            operator = MapPartitionFunction()
+            operator.map_partition = f
+        child = dict()
+        child_set = OperatorSet(self._env, child)
+        child[_Fields.IDENTIFIER] = _Identifier.MAPPARTITION
+        child[_Fields.PARENT] = self._info
+        child[_Fields.OPERATOR] = operator
+        child[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
+        child[_Fields.TYPES] = types
+        child[_Fields.NAME] = "PythonMapPartition"
+        self._info[_Fields.CHILDREN].append(child)
+        self._env._sets.append(child)
+        return child_set
+
+    def union(self, other_set):
+        """
+        Creates a union of this DataSet with another DataSet.
+
+        The other DataSet must be of the same data type.
+
+        :param other_set: The other DataSet which is unioned with the current DataSet.
+        :return: The resulting DataSet.
+        """
+        child = dict()
+        child_set = DataSet(self._env, child)
+        child[_Fields.IDENTIFIER] = _Identifier.UNION
+        child[_Fields.PARENT] = self._info
+        child[_Fields.OTHER] = other_set._info
+        self._info[_Fields.CHILDREN].append(child)
+        other_set._info[_Fields.CHILDREN].append(child)
+        self._env._sets.append(child)
+        return child_set
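
A UDF join, sketched with hypothetical (INT, STRING) inputs d1 and d2; where()/equal_to()/using() are defined further down in this file:

    from flink.functions.JoinFunction import JoinFunction
    from flink.plan.Constants import STRING

    class PairNames(JoinFunction):
        def join(self, value1, value2):
            # exactly one output element per joined pair
            return (value1[1], value2[1])

    joined = d1.join(d2).where(0).equal_to(0).using(PairNames(), (STRING, STRING))
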
+ """ + child = dict() + child_set = DataSet(self._env, child) + child[_Fields.IDENTIFIER] = _Identifier.UNION + child[_Fields.PARENT] = self._info + child[_Fields.OTHER] = other_set._info + self._info[_Fields.CHILDREN].append(child) + other_set._info[_Fields.CHILDREN].append(child) + self._env._sets.append(child) + return child_set + + +class OperatorSet(DataSet): + def __init__(self, env, info, copy_set=False): + super(OperatorSet, self).__init__(env, info, copy_set) + + def with_broadcast_set(self, name, set): + child = dict() + child[_Fields.PARENT] = self._info + child[_Fields.OTHER] = set._info + child[_Fields.NAME] = name + self._info[_Fields.BCVARS].append(child) + set._info[_Fields.CHILDREN].append(child) + self._env._broadcast.append(child) + return self + + +class Grouping(object): + def __init__(self, env, info, child_chain): + self._env = env + self._child_chain = child_chain + self._info = info + info[_Fields.ID] = env._counter + info[_Fields.CHILDREN] = [] + info[_Fields.SINKS] = [] + env._counter += 1 + + def reduce_group(self, operator, types, combinable=False): + """ + Applies a GroupReduce transformation. + + The transformation calls a GroupReduceFunction once for each group of the DataSet, or one when applied on a + non-grouped DataSet. + The GroupReduceFunction can iterate over all elements of the DataSet and + emit any number of output elements including none. + + :param operator: The GroupReduceFunction that is applied on the DataSet. + :param types: The type of the resulting DataSet. + :return:A GroupReduceOperator that represents the reduced DataSet. + """ + if isinstance(operator, TYPES.FunctionType): + f = operator + operator = GroupReduceFunction() + operator.reduce = f + operator._set_grouping_keys(self._child_chain[0][_Fields.KEYS]) + operator._set_sort_ops([(x[_Fields.FIELD], x[_Fields.ORDER]) for x in self._child_chain[1:]]) + child = dict() + child_set = OperatorSet(self._env, child) + child[_Fields.IDENTIFIER] = _Identifier.GROUPREDUCE + child[_Fields.PARENT] = self._info + child[_Fields.OPERATOR] = copy.deepcopy(operator) + child[_Fields.OPERATOR]._combine = False + child[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__) + child[_Fields.TYPES] = types + child[_Fields.COMBINE] = combinable + child[_Fields.COMBINEOP] = operator + child[_Fields.COMBINEOP]._combine = True + child[_Fields.NAME] = "PythonGroupReduce" + self._info[_Fields.CHILDREN].append(child) + self._env._sets.append(child) + + return child_set + + def sort_group(self, field, order): + """ + Sorts Tuple elements within a group on the specified field in the specified Order. + + Note: Only groups of Tuple elements can be sorted. + Groups can be sorted by multiple fields by chaining sort_group() calls. + + :param field:The Tuple field on which the group is sorted. + :param order: The Order in which the specified Tuple field is sorted. See DataSet.Order. + :return:A SortedGrouping with specified order of group element. 
+ """ + child = dict() + child_set = SortedGrouping(self._env, child, self._child_chain) + child[_Fields.IDENTIFIER] = _Identifier.SORT + child[_Fields.PARENT] = self._info + child[_Fields.FIELD] = field + child[_Fields.ORDER] = order + self._info[_Fields.CHILDREN].append(child) + self._child_chain.append(child) + self._env._sets.append(child) + return child_set + + +class UnsortedGrouping(Grouping): + def __init__(self, env, info, child_chain): + super(UnsortedGrouping, self).__init__(env, info, child_chain) + + def reduce(self, operator): + """ + Applies a Reduce transformation on a non-grouped DataSet. + + The transformation consecutively calls a ReduceFunction until only a single element remains which is the result + of the transformation. A ReduceFunction combines two elements into one new element of the same type. + + :param operator:The ReduceFunction that is applied on the DataSet. + :return:A ReduceOperator that represents the reduced DataSet. + """ + operator._set_grouping_keys(self._child_chain[0][_Fields.KEYS]) + for i in self._child_chain: + self._env._sets.append(i) + child = dict() + child_set = OperatorSet(self._env, child) + child[_Fields.IDENTIFIER] = _Identifier.REDUCE + child[_Fields.PARENT] = self._info + child[_Fields.OPERATOR] = copy.deepcopy(operator) + child[_Fields.OPERATOR]._combine = False + child[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__) + child[_Fields.COMBINE] = True + child[_Fields.COMBINEOP] = operator + child[_Fields.COMBINEOP]._combine = True + child[_Fields.NAME] = "PythonReduce" + child[_Fields.TYPES] = deduct_output_type(self._info) + self._info[_Fields.CHILDREN].append(child) + self._env._sets.append(child) + + return child_set + + +class SortedGrouping(Grouping): + def __init__(self, env, info, child_chain): + super(SortedGrouping, self).__init__(env, info, child_chain) + + +class CoGroupOperatorWhere(object): + def __init__(self, env, info): + self._env = env + self._info = info + + def where(self, *fields): + """ + Continues a CoGroup transformation. + + Defines the Tuple fields of the first co-grouped DataSet that should be used as grouping keys. + Note: Fields can only be selected as grouping keys on Tuple DataSets. + + :param fields: The indexes of the Tuple fields of the first co-grouped DataSets that should be used as keys. + :return: An incomplete CoGroup transformation. + """ + self._info[_Fields.KEY1] = fields + return CoGroupOperatorTo(self._env, self._info) + + +class CoGroupOperatorTo(object): + def __init__(self, env, info): + self._env = env + self._info = info + + def equal_to(self, *fields): + """ + Continues a CoGroup transformation. + + Defines the Tuple fields of the second co-grouped DataSet that should be used as grouping keys. + Note: Fields can only be selected as grouping keys on Tuple DataSets. + + :param fields: The indexes of the Tuple fields of the second co-grouped DataSet that should be used as keys. + :return: An incomplete CoGroup transformation. + """ + self._info[_Fields.KEY2] = fields + return CoGroupOperatorUsing(self._env, self._info) + + +class CoGroupOperatorUsing(object): + def __init__(self, env, info): + self._env = env + self._info = info + + def using(self, operator, types): + """ + Finalizes a CoGroup transformation. + + Applies a CoGroupFunction to groups of elements with identical keys. + Each CoGroupFunction call returns an arbitrary number of keys. + + :param operator: The CoGroupFunction that is called for all groups of elements with identical keys. 
+class JoinOperatorWhere(object):
+    def __init__(self, env, info):
+        self._env = env
+        self._info = info
+
+    def where(self, *fields):
+        """
+        Continues a Join transformation.
+
+        Defines the Tuple fields of the first join DataSet that should be used as join keys.
+        Note: Fields can only be selected as join keys on Tuple DataSets.
+
+        :param fields: The indexes of the Tuple fields of the first join DataSet that should be used as keys.
+        :return: An incomplete Join transformation.
+        """
+        self._info[_Fields.KEY1] = fields
+        return JoinOperatorTo(self._env, self._info)
+
+
+class JoinOperatorTo(object):
+    def __init__(self, env, info):
+        self._env = env
+        self._info = info
+
+    def equal_to(self, *fields):
+        """
+        Continues a Join transformation.
+
+        Defines the Tuple fields of the second join DataSet that should be used as join keys.
+        Note: Fields can only be selected as join keys on Tuple DataSets.
+
+        :param fields: The indexes of the Tuple fields of the second join DataSet that should be used as keys.
+        :return: An incomplete Join transformation.
+        """
+        self._info[_Fields.KEY2] = fields
+        return JoinOperator(self._env, self._info)
+
+
+class JoinOperatorProjection(DataSet):
+    def __init__(self, env, info):
+        super(JoinOperatorProjection, self).__init__(env, info)
+
+    def project_first(self, *fields):
+        """
+        Initiates a ProjectJoin transformation.
+
+        Projects the first join input.
+        If the first join input is a Tuple DataSet, fields can be selected by their index.
+        If the first join input is not a Tuple DataSet, no parameters should be passed.
+
+        :param fields: The indexes of the selected fields.
+        :return: An incomplete JoinProjection.
+        """
+        self._info[_Fields.PROJECTIONS].append(("first", fields))
+        return self
+
+    def project_second(self, *fields):
+        """
+        Initiates a ProjectJoin transformation.
+
+        Projects the second join input.
+        If the second join input is a Tuple DataSet, fields can be selected by their index.
+        If the second join input is not a Tuple DataSet, no parameters should be passed.
+
+        :param fields: The indexes of the selected fields.
+        :return: An incomplete JoinProjection.
+        """
+        self._info[_Fields.PROJECTIONS].append(("second", fields))
+        return self
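
A project join needs no JoinFunction; the chain below keeps fields 0 and 1 of the first (hypothetical) tuple input and field 1 of the second:

    projected = d1.join(d2).where(0).equal_to(0).project_first(0, 1).project_second(1)
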
+ """ + return JoinOperatorProjection(self._env, self._info).project_first(*fields) + + def project_second(self, *fields): + """ + Initiates a ProjectJoin transformation. + + Projects the second join input. + If the second join input is a Tuple DataSet, fields can be selected by their index. + If the second join input is not a Tuple DataSet, no parameters should be passed. + + :param fields: The indexes of the selected fields. + :return: An incomplete JoinProjection. + """ + return JoinOperatorProjection(self._env, self._info).project_second(*fields) + + def using(self, operator, types): + """ + Finalizes a Join transformation. + + Applies a JoinFunction to each pair of joined elements. Each JoinFunction call returns exactly one element. + + :param operator:The JoinFunction that is called for each pair of joined elements. + :param types: + :return:An Set that represents the joined result DataSet. + """ + if isinstance(operator, TYPES.FunctionType): + f = operator + operator = JoinFunction() + operator.join = f + self._info[_Fields.OPERATOR] = operator + self._info[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__) + self._info[_Fields.TYPES] = types + self._info[_Fields.NAME] = "PythonJoin" + self._env._sets.append(self._info) + return OperatorSet(self._env, self._info, copy_set=True) + + +class CrossOperatorProjection(DataSet): + def __init__(self, env, info): + super(CrossOperatorProjection, self).__init__(env, info) + + def project_first(self, *fields): + """ + Initiates a ProjectCross transformation. + + Projects the first join input. + If the first join input is a Tuple DataSet, fields can be selected by their index. + If the first join input is not a Tuple DataSet, no parameters should be passed. + + :param fields: The indexes of the selected fields. + :return: An incomplete CrossProjection. + """ + self._info[_Fields.PROJECTIONS].append(("first", fields)) + return self + + def project_second(self, *fields): + """ + Initiates a ProjectCross transformation. + + Projects the second join input. + If the second join input is a Tuple DataSet, fields can be selected by their index. + If the second join input is not a Tuple DataSet, no parameters should be passed. + + :param fields: The indexes of the selected fields. + :return: An incomplete CrossProjection. + """ + self._info[_Fields.PROJECTIONS].append(("second", fields)) + return self + + +class CrossOperator(DataSet): + def __init__(self, env, info): + super(CrossOperator, self).__init__(env, info) + info[_Fields.TYPES] = None + + def project_first(self, *fields): + """ + Initiates a ProjectCross transformation. + + Projects the first join input. + If the first join input is a Tuple DataSet, fields can be selected by their index. + If the first join input is not a Tuple DataSet, no parameters should be passed. + + :param fields: The indexes of the selected fields. + :return: An incomplete CrossProjection. + """ + return CrossOperatorProjection(self._env, self._info).project_first(*fields) + + def project_second(self, *fields): + """ + Initiates a ProjectCross transformation. + + Projects the second join input. + If the second join input is a Tuple DataSet, fields can be selected by their index. + If the second join input is not a Tuple DataSet, no parameters should be passed. + + :param fields: The indexes of the selected fields. + :return: An incomplete CrossProjection. 
+ """ + return CrossOperatorProjection(self._env, self._info).project_second(*fields) + + def using(self, operator, types): + """ + Finalizes a Cross transformation. + + Applies a CrossFunction to each pair of joined elements. Each CrossFunction call returns exactly one element. + + :param operator:The CrossFunction that is called for each pair of joined elements. + :param types: The type of the resulting DataSet. + :return:An Set that represents the joined result DataSet. + """ + if isinstance(operator, TYPES.FunctionType): + f = operator + operator = CrossFunction() + operator.cross = f + self._info[_Fields.OPERATOR] = operator + self._info[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__) + self._info[_Fields.TYPES] = types + self._info[_Fields.NAME] = "PythonCross" + return OperatorSet(self._env, self._info, copy_set=True)
http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/plan/Environment.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/plan/Environment.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/plan/Environment.py
new file mode 100644
index 0000000..61c077f
--- /dev/null
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/plan/Environment.py
@@ -0,0 +1,339 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import inspect
+from flink.connection import Connection
+from flink.connection import Collector
+from flink.plan.DataSet import DataSet
+from flink.plan.Constants import _Fields, _Identifier
+from flink.utilities import Switch
+import dill
+import copy
+
+
+def get_environment():
+    """
+    Creates an execution environment that represents the context in which the program is currently executed.
+
+    :return: The execution environment of the context in which the program is executed.
+    """
+    return Environment()
+
+
+def _dump(function):
+    return dill.dumps(function, protocol=0, byref=True)
+
+
+class Environment(object):
+    def __init__(self):
+        # util
+        self._connection = Connection.OneWayBusyBufferingMappedFileConnection()
+        self._collector = Collector.TypedCollector(self._connection)
+        self._counter = 0
+
+        #parameters
+        self._parameters = []
+
+        #sets
+        self._sources = []
+        self._sets = []
+        self._sinks = []
+
+        #specials
+        self._broadcast = []
+
+    def read_csv(self, path, types, line_delimiter="\n", field_delimiter=','):
+        """
+        Creates a DataSet that represents the tuples produced by reading the given CSV file.
+
+        :param path: The path of the CSV file.
+        :param types: Specifies the types for the CSV fields.
+        :param line_delimiter: The string separating records, "\n" by default.
+        :param field_delimiter: The string separating fields within a record, "," by default.
+        :return: A DataSet that represents the parsed CSV data.
+        """
+        child = dict()
+        child_set = DataSet(self, child)
+        child[_Fields.IDENTIFIER] = _Identifier.SOURCE_CSV
+        child[_Fields.DELIMITER_LINE] = line_delimiter
+        child[_Fields.DELIMITER_FIELD] = field_delimiter
+        child[_Fields.PATH] = path
+        child[_Fields.TYPES] = types
+        self._sources.append(child)
+        return child_set
+
+    def read_text(self, path):
+        """
+        Creates a DataSet that represents the Strings produced by reading the given file line wise.
+
+        The file will be read with the system's default character set.
+
+        :param path: The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path").
+        :return: A DataSet that represents the data read from the given file as text lines.
+        """
+        child = dict()
+        child_set = DataSet(self, child)
+        child[_Fields.IDENTIFIER] = _Identifier.SOURCE_TEXT
+        child[_Fields.PATH] = path
+        self._sources.append(child)
+        return child_set
+
+    def from_elements(self, *elements):
+        """
+        Creates a new data set that contains the given elements.
+
+        The elements must all be of the same type, for example, all Strings or all Integers.
+        The sequence of elements must not be empty.
+
+        :param elements: The elements to make up the data set.
+        :return: A DataSet representing the given list of elements.
+        """
+        child = dict()
+        child_set = DataSet(self, child)
+        child[_Fields.IDENTIFIER] = _Identifier.SOURCE_VALUE
+        child[_Fields.VALUES] = elements
+        self._sources.append(child)
+        return child_set
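
The three source methods above are exercised by this commit's tests; a sketch using the bundled test data:

    from flink.plan.Environment import get_environment
    from flink.plan.Constants import INT, STRING

    env = get_environment()
    csv = env.read_csv("src/test/python/org/apache/flink/languagebinding/api/python/flink/test/data_csv", (INT, INT, STRING))
    lines = env.read_text("src/test/python/org/apache/flink/languagebinding/api/python/flink/test/data_text")
    words = env.from_elements("sup", "guys")
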
+    def set_degree_of_parallelism(self, degree):
+        """
+        Sets the degree of parallelism (DOP) for operations executed through this environment.
+
+        Setting a DOP of x here will cause all operators (such as join, map, reduce) to run with x parallel instances.
+
+        :param degree: The degree of parallelism.
+        """
+        self._parameters.append(("dop", degree))
+
+    def execute(self, local=False, debug=False):
+        """
+        Triggers the program execution.
+
+        The environment will execute all parts of the program that have resulted in a "sink" operation.
+
+        :param local: If True, the program is executed locally.
+        :param debug: If True, additional debug output is produced; implies local execution.
+        """
+        if debug:
+            local = True
+        self._parameters.append(("mode", local))
+        self._parameters.append(("debug", debug))
+        self._optimize_plan()
+        self._send_plan()
+        self._connection._write_buffer()
+
+    def _optimize_plan(self):
+        self._find_chains()
+
+    def _find_chains(self):
+        udf = set([_Identifier.MAP, _Identifier.FLATMAP, _Identifier.FILTER, _Identifier.MAPPARTITION,
+                   _Identifier.GROUPREDUCE, _Identifier.REDUCE, _Identifier.COGROUP,
+                   _Identifier.CROSS, _Identifier.CROSSH, _Identifier.CROSST,
+                   _Identifier.JOIN, _Identifier.JOINH, _Identifier.JOINT])
+        chainable = set([_Identifier.MAP, _Identifier.FILTER, _Identifier.FLATMAP, _Identifier.GROUPREDUCE, _Identifier.REDUCE])
+        multi_input = set([_Identifier.JOIN, _Identifier.JOINH, _Identifier.JOINT, _Identifier.CROSS, _Identifier.CROSSH, _Identifier.CROSST, _Identifier.COGROUP, _Identifier.UNION])
+        x = len(self._sets) - 1
+        while x > -1:
+            child = self._sets[x]
+            child_type = child[_Fields.IDENTIFIER]
+            if child_type in chainable:
+                parent = child[_Fields.PARENT]
+                parent_type = parent[_Fields.IDENTIFIER]
+                if len(parent[_Fields.SINKS]) == 0:
+                    if child_type == _Identifier.GROUPREDUCE or child_type == _Identifier.REDUCE:
+                        if child[_Fields.COMBINE]:
+                            while parent_type == _Identifier.GROUP or parent_type == _Identifier.SORT:
+                                parent = parent[_Fields.PARENT]
+                                parent_type = parent[_Fields.IDENTIFIER]
+                            if parent_type in udf and len(parent[_Fields.CHILDREN]) == 1:
+                                if parent[_Fields.OPERATOR] is not None:
+                                    function = child[_Fields.COMBINEOP]
+                                    meta = str(inspect.getmodule(function)) + "|" + str(function.__class__.__name__)
+                                    parent[_Fields.OPERATOR]._chain(_dump(function), meta)
+                                    child[_Fields.COMBINE] = False
+                                    parent[_Fields.NAME] += " -> PythonCombine"
+                                    for bcvar in child[_Fields.BCVARS]:
+                                        bcvar_copy = copy.deepcopy(bcvar)
+                                        bcvar_copy[_Fields.PARENT] = parent
+                                        self._broadcast.append(bcvar_copy)
+                    else:
+                        if parent_type in udf and len(parent[_Fields.CHILDREN]) == 1:
+                            parent_op = parent[_Fields.OPERATOR]
+                            if parent_op is not None:
+                                function = child[_Fields.OPERATOR]
+                                meta = str(inspect.getmodule(function)) + "|" + str(function.__class__.__name__)
+                                parent_op._chain(_dump(function), meta)
+                                parent[_Fields.NAME] += " -> " + child[_Fields.NAME]
+                                parent[_Fields.TYPES] = child[_Fields.TYPES]
+                                for grand_child in child[_Fields.CHILDREN]:
+                                    if grand_child[_Fields.IDENTIFIER] in multi_input:
+                                        if grand_child[_Fields.PARENT][_Fields.ID] == child[_Fields.ID]:
+                                            grand_child[_Fields.PARENT] = parent
+                                        else:
+                                            grand_child[_Fields.OTHER] = parent
+                                    else:
+                                        grand_child[_Fields.PARENT] = parent
+                                    parent[_Fields.CHILDREN].append(grand_child)
+                                parent[_Fields.CHILDREN].remove(child)
+                                for sink in child[_Fields.SINKS]:
+                                    sink[_Fields.PARENT] = parent
+                                    parent[_Fields.SINKS].append(sink)
+                                for bcvar in child[_Fields.BCVARS]:
+                                    bcvar[_Fields.PARENT] = parent
+                                    parent[_Fields.BCVARS].append(bcvar)
+                                self._remove_set(child)
+            x -= 1
+
+    def _remove_set(self, set):
+        self._sets[:] = [s for s in self._sets if s[_Fields.ID] != set[_Fields.ID]]
+
+    def _send_plan(self):
+        self._send_parameters()
+        self._collector.collect(len(self._sources) + len(self._sets) + len(self._sinks) + len(self._broadcast))
+        self._send_sources()
+        self._send_operations()
+        self._send_sinks()
+        self._send_broadcast()
+
+    def _send_parameters(self):
+        self._collector.collect(len(self._parameters))
+        for parameter in self._parameters:
+            self._collector.collect(parameter)
+
+    def _send_sources(self):
+        for source in self._sources:
+            identifier = source[_Fields.IDENTIFIER]
+            collect = self._collector.collect
+            collect(identifier)
+            collect(source[_Fields.ID])
+            for case in Switch(identifier):
+                if case(_Identifier.SOURCE_CSV):
+                    collect(source[_Fields.PATH])
+                    collect(source[_Fields.DELIMITER_FIELD])
+                    collect(source[_Fields.DELIMITER_LINE])
+                    collect(source[_Fields.TYPES])
+                    break
+                if case(_Identifier.SOURCE_TEXT):
+                    collect(source[_Fields.PATH])
+                    break
+                if case(_Identifier.SOURCE_VALUE):
+                    collect(len(source[_Fields.VALUES]))
+                    for value in source[_Fields.VALUES]:
+                        collect(value)
+                    break
+
+    def _send_operations(self):
+        collect = self._collector.collect
+        collectBytes = self._collector.collectBytes
+        for set in self._sets:
+            identifier = set.get(_Fields.IDENTIFIER)
+            collect(set[_Fields.IDENTIFIER])
+            collect(set[_Fields.ID])
+            collect(set[_Fields.PARENT][_Fields.ID])
+            for case in Switch(identifier):
+                if case(_Identifier.SORT):
+                    collect(set[_Fields.FIELD])
+                    collect(set[_Fields.ORDER])
+                    break
+                if case(_Identifier.GROUP):
+                    collect(set[_Fields.KEYS])
+                    break
+                if case(_Identifier.COGROUP):
+                    collect(set[_Fields.OTHER][_Fields.ID])
+                    collect(set[_Fields.KEY1])
+                    collect(set[_Fields.KEY2])
+                    collectBytes(_dump(set[_Fields.OPERATOR]))
+                    collect(set[_Fields.META])
+                    collect(set[_Fields.TYPES])
+                    collect(set[_Fields.NAME])
+                    break
+                if case(_Identifier.CROSS, _Identifier.CROSSH, _Identifier.CROSST):
+                    collect(set[_Fields.OTHER][_Fields.ID])
+                    if set[_Fields.OPERATOR] is None:
+                        collect(set[_Fields.OPERATOR])
+                    else:
+                        collectBytes(_dump(set[_Fields.OPERATOR]))
+                    collect(set[_Fields.META])
+                    collect(set[_Fields.TYPES])
+                    collect(len(set[_Fields.PROJECTIONS]))
+                    for p in set[_Fields.PROJECTIONS]:
+                        collect(p[0])
+                        collect(p[1])
+                    collect(set[_Fields.NAME])
+                    break
+                if case(_Identifier.REDUCE, _Identifier.GROUPREDUCE):
+                    collectBytes(_dump(set[_Fields.OPERATOR]))
+                    collectBytes(_dump(set[_Fields.COMBINEOP]))
+                    collect(set[_Fields.META])
+                    collect(set[_Fields.TYPES])
+                    collect(set[_Fields.COMBINE])
+                    collect(set[_Fields.NAME])
+                    break
+                if case(_Identifier.JOIN, _Identifier.JOINH, _Identifier.JOINT):
+                    collect(set[_Fields.KEY1])
+                    collect(set[_Fields.KEY2])
+                    collect(set[_Fields.OTHER][_Fields.ID])
+                    if set[_Fields.OPERATOR] is None:
+                        collect(set[_Fields.OPERATOR])
+                    else:
+                        collectBytes(_dump(set[_Fields.OPERATOR]))
+                    collect(set[_Fields.META])
+                    collect(set[_Fields.TYPES])
+                    collect(len(set[_Fields.PROJECTIONS]))
+                    for p in set[_Fields.PROJECTIONS]:
+                        collect(p[0])
+                        collect(p[1])
+                    collect(set[_Fields.NAME])
+                    break
+                if case(_Identifier.MAP, _Identifier.MAPPARTITION, _Identifier.FLATMAP, _Identifier.FILTER):
+                    collectBytes(_dump(set[_Fields.OPERATOR]))
+                    collect(set[_Fields.META])
+                    collect(set[_Fields.TYPES])
+                    collect(set[_Fields.NAME])
+                    break
+                if case(_Identifier.UNION):
+                    collect(set[_Fields.OTHER][_Fields.ID])
+                    break
+                if case(_Identifier.PROJECTION):
+                    collect(set[_Fields.KEYS])
+                    break
+                if case():
+                    raise KeyError("Environment._send_operations(): Invalid operation identifier: " + str(identifier))
+
+    def _send_sinks(self):
+        for sink in self._sinks:
+            identifier = sink[_Fields.IDENTIFIER]
+            collect = self._collector.collect
+            collect(identifier)
+            collect(sink[_Fields.PARENT][_Fields.ID])
+            for case in Switch(identifier):
+                if case(_Identifier.SINK_CSV):
+                    collect(sink[_Fields.PATH])
+                    collect(sink[_Fields.DELIMITER_FIELD])
+                    collect(sink[_Fields.DELIMITER_LINE])
+                    collect(sink[_Fields.WRITE_MODE])
+                    break
+                if case(_Identifier.SINK_TEXT):
+                    collect(sink[_Fields.PATH])
+                    collect(sink[_Fields.WRITE_MODE])
+                    break
+                if case(_Identifier.SINK_PRINT):
+                    collect(sink[_Fields.TO_ERR])
+                    break
+
+    def _send_broadcast(self):
+        collect = self._collector.collect
+        for entry in self._broadcast:
+            collect(_Identifier.BROADCAST)
+            collect(entry[_Fields.PARENT][_Fields.ID])
+            collect(entry[_Fields.OTHER][_Fields.ID])
+            collect(entry[_Fields.NAME])
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/plan/__init__.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/plan/__init__.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/plan/__init__.py
new file mode 100644
index 0000000..d35bf39
--- /dev/null
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/plan/__init__.py
@@ -0,0 +1,17 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/utilities/__init__.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/utilities/__init__.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/utilities/__init__.py
new file mode 100644
index 0000000..faae78a
--- /dev/null
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/utilities/__init__.py
@@ -0,0 +1,36 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+
+class Switch(object):
+    def __init__(self, value):
+        self.value = value
+        self.fall = False
+
+    def __iter__(self):
+        # yield the match method once; returning ends the generator
+        # (an explicit "raise StopIteration" here would break on Python 3.7+, see PEP 479)
+        yield self.match
+
+    def match(self, *args):
+        if self.fall or not args:
+            return True
+        elif self.value in args:
+            self.fall = True
+            return True
+        else:
+            return False

http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/setup.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/setup.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/setup.py
new file mode 100644
index 0000000..82447e9
--- /dev/null
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/setup.py
@@ -0,0 +1,33 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +from distutils.core import setup + +setup( + name='flink', + version='1.0', + packages=['flink', + 'flink.connection', + 'flink.functions', + 'flink.plan', + 'flink.utilities'], + url='http://flink.apache.org', + license='Licensed under the Apache License, Version 2.0', + author='', + author_email='', + description='Flink Python API' +) http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/test/java/org/apache/flink/languagebinding/api/java/python/PythonPlanBinderTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-language-binding/flink-python/src/test/java/org/apache/flink/languagebinding/api/java/python/PythonPlanBinderTest.java b/flink-staging/flink-language-binding/flink-python/src/test/java/org/apache/flink/languagebinding/api/java/python/PythonPlanBinderTest.java new file mode 100644 index 0000000..4295e93 --- /dev/null +++ b/flink-staging/flink-language-binding/flink-python/src/test/java/org/apache/flink/languagebinding/api/java/python/PythonPlanBinderTest.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */
+package org.apache.flink.languagebinding.api.java.python;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import static org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.ARGUMENT_PYTHON_2;
+import static org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.ARGUMENT_PYTHON_3;
+import org.junit.Test;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PythonPlanBinderTest {
+	private static final Logger LOG = LoggerFactory.getLogger(PythonPlanBinderTest.class);
+
+	private static boolean python2Supported = true;
+	private static boolean python3Supported = true;
+	private static List<String> TEST_FILES;
+
+	@BeforeClass
+	public static void setup() throws Exception {
+		findTestFiles();
+		checkPythonSupport();
+	}
+
+	private static void findTestFiles() throws Exception {
+		TEST_FILES = new ArrayList<String>();
+		FileSystem fs = FileSystem.getLocalFileSystem();
+		FileStatus[] status = fs.listStatus(
+				new Path(fs.getWorkingDirectory().toString()
+				+ "/src/test/python/org/apache/flink/languagebinding/api/python/flink/test"));
+		for (FileStatus f : status) {
+			String file = f.getPath().toString();
+			if (file.endsWith(".py")) {
+				TEST_FILES.add(file);
+			}
+		}
+	}
+
+	private static void checkPythonSupport() {
+		// Probe for the interpreters; exec() throws an IOException when the binary is missing.
+		try {
+			Runtime.getRuntime().exec("python");
+		} catch (IOException ex) {
+			python2Supported = false;
+			LOG.info("No Python 2 runtime detected.");
+		}
+		try {
+			Runtime.getRuntime().exec("python3");
+		} catch (IOException ex) {
+			python3Supported = false;
+			LOG.info("No Python 3 runtime detected.");
+		}
+	}
+
+	@Test
+	public void testPython2() throws Exception {
+		// Skipped silently when no Python 2 interpreter is available.
+		if (python2Supported) {
+			for (String file : TEST_FILES) {
+				LOG.info("testing {}", file);
+				PythonPlanBinder.main(new String[]{ARGUMENT_PYTHON_2, file});
+			}
+		}
+	}
+
+	@Test
+	public void testPython3() throws Exception {
+		// Skipped silently when no Python 3 interpreter is available.
+		if (python3Supported) {
+			for (String file : TEST_FILES) {
+				LOG.info("testing {}", file);
+				PythonPlanBinder.main(new String[]{ARGUMENT_PYTHON_3, file});
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/data_csv
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/data_csv b/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/data_csv
new file mode 100644
index 0000000..a103a5c
--- /dev/null
+++ b/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/data_csv
@@ -0,0 +1,2 @@
+4,2,hello
+3,2,world

http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/data_text
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/data_text b/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/data_text
new file mode 100644
index 0000000..e7be084
--- /dev/null
+++ b/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/data_text
@@ -0,0 +1,2 @@
+sup guys
+i am the world

http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_csv.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_csv.py b/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_csv.py
new file mode 100644
index 0000000..c48179b
--- /dev/null
+++ b/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_csv.py
@@ -0,0 +1,30 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from flink.plan.Environment import get_environment
+from flink.plan.Constants import INT, STRING, WriteMode
+
+if __name__ == "__main__":
+    env = get_environment()
+
+    d1 = env.read_csv("src/test/python/org/apache/flink/languagebinding/api/python/flink/test/data_csv", (INT, INT, STRING))
+
+    # Overwrite so that repeated test runs do not fail on an existing result file.
+    d1.write_csv("/tmp/flink/result", write_mode=WriteMode.OVERWRITE)
+
+    env.set_degree_of_parallelism(1)
+
+    env.execute(local=True)
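test_csv.py only checks that the plan executes and writes; the result itself is never inspected. A minimal sketch of a stricter variant that asserts the round-tripped rows in-plan, assuming rows arrive in file order at parallelism 1 (the VerifyCsv class and its expected tuples are illustrative, not part of this patch):

from flink.plan.Environment import get_environment
from flink.plan.Constants import INT, STRING
from flink.functions.MapPartitionFunction import MapPartitionFunction


class VerifyCsv(MapPartitionFunction):
    # Fails the plan when a row does not match the content of data_csv.
    def map_partition(self, iterator, collector):
        expected = [(4, 2, "hello"), (3, 2, "world")]
        for index, value in enumerate(iterator):
            if value != expected[index]:
                raise Exception("CSV check failed!")
        collector.collect("CSV check successful!")


if __name__ == "__main__":
    env = get_environment()
    d1 = env.read_csv("src/test/python/org/apache/flink/languagebinding/api/python/flink/test/data_csv", (INT, INT, STRING))
    d1.map_partition(VerifyCsv(), STRING).output()
    env.set_degree_of_parallelism(1)
    env.execute(local=True)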
http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_main.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_main.py b/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_main.py
new file mode 100644
index 0000000..29113f3
--- /dev/null
+++ b/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_main.py
@@ -0,0 +1,264 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from flink.plan.Environment import get_environment
+from flink.functions.MapFunction import MapFunction
+from flink.functions.FlatMapFunction import FlatMapFunction
+from flink.functions.FilterFunction import FilterFunction
+from flink.functions.MapPartitionFunction import MapPartitionFunction
+from flink.functions.ReduceFunction import ReduceFunction
+from flink.functions.CrossFunction import CrossFunction
+from flink.functions.JoinFunction import JoinFunction
+from flink.functions.GroupReduceFunction import GroupReduceFunction
+from flink.functions.CoGroupFunction import CoGroupFunction
+from flink.plan.Constants import INT, STRING, FLOAT, BOOL, Order
+
+
+class Mapper(MapFunction):
+    def map(self, value):
+        return value * value
+
+
+class Filter(FilterFunction):
+    def __init__(self, limit):
+        super(Filter, self).__init__()
+        self.limit = limit
+
+    def filter(self, value):
+        return value > self.limit
+
+
+class FlatMap(FlatMapFunction):
+    def flat_map(self, value, collector):
+        collector.collect(value)
+        collector.collect(value * 2)
+
+
+class MapPartition(MapPartitionFunction):
+    def map_partition(self, iterator, collector):
+        for value in iterator:
+            collector.collect(value * 2)
+
+
+class Reduce(ReduceFunction):
+    def reduce(self, value1, value2):
+        return value1 + value2
+
+
+class Reduce2(ReduceFunction):
+    def reduce(self, value1, value2):
+        return (value1[0] + value2[0], value1[1] + value2[1], value1[2], value1[3] or value2[3])
+
+
+class Cross(CrossFunction):
+    def cross(self, value1, value2):
+        return (value1, value2[3])
+
+
+class MapperBcv(MapFunction):
+    def map(self, value):
+        # First element of the broadcast DataSet, first tuple field.
+        factor = self.context.get_broadcast_variable("test")[0][0]
+        return value * factor
+
+
+class Join(JoinFunction):
+    def join(self, value1, value2):
+        if value1[3]:
+            return value2[0] + str(value1[0])
+        else:
+            return value2[0] + str(value1[1])
+
+
+class GroupReduce(GroupReduceFunction):
+    def reduce(self, iterator, collector):
+        if iterator.has_next():
+            i, f, s, b = iterator.next()
+            for value in iterator:
+                i += value[0]
+                f += value[1]
+                b |= value[3]
+            collector.collect((i, f, s, b))
+
+
+class GroupReduce2(GroupReduceFunction):
+    def reduce(self, iterator, collector):
+        for value in iterator:
+            collector.collect(value)
+
+
+class GroupReduce3(GroupReduceFunction):
+    def reduce(self, iterator, collector):
+        collector.collect(iterator.next())
+
+    def combine(self, iterator, collector):
+        # Forward the element with the smaller first field; a single-element
+        # group must be forwarded as-is instead of being dropped.
+        if iterator.has_next():
+            v1 = iterator.next()
+            if iterator.has_next():
+                v2 = iterator.next()
+                if v1[0] < v2[0]:
+                    collector.collect(v1)
+                else:
+                    collector.collect(v2)
+            else:
+                collector.collect(v1)
+
+
+class CoGroup(CoGroupFunction):
+    def co_group(self, iterator1, iterator2, collector):
+        while iterator1.has_next() and iterator2.has_next():
+            collector.collect((iterator1.next(), iterator2.next()))
+
+
+class Id(MapFunction):
+    def map(self, value):
+        return value
+
+
+class Verify(MapPartitionFunction):
+    def __init__(self, expected, name):
+        super(Verify, self).__init__()
+        self.expected = expected
+        self.name = name
+
+    def map_partition(self, iterator, collector):
+        index = 0
+        for value in iterator:
+            if value != self.expected[index]:
+                print(self.name + " Test failed. Expected: " + str(self.expected[index]) + " Actual: " + str(value))
+                raise Exception(self.name + " failed!")
+            index += 1
+        collector.collect(self.name + " successful!")
+
+
+class Verify2(MapPartitionFunction):
+    def __init__(self, expected, name):
+        super(Verify2, self).__init__()
+        self.expected = expected
+        self.name = name
+
+    def map_partition(self, iterator, collector):
+        # Order-insensitive check: every value must be removable from the
+        # expected list; a value that is not expected is a failure.
+        for value in iterator:
+            try:
+                self.expected.remove(value)
+            except ValueError:
+                raise Exception(self.name + " failed!")
+        collector.collect(self.name + " successful!")
+
+
+if __name__ == "__main__":
+    env = get_environment()
+
+    d1 = env.from_elements(1, 6, 12)
+
+    d2 = env.from_elements((1, 0.5, "hello", True), (2, 0.4, "world", False))
+
+    d3 = env.from_elements(("hello",), ("world",))
+
+    d4 = env.from_elements((1, 0.5, "hello", True), (1, 0.4, "hello", False), (1, 0.5, "hello", True), (2, 0.4, "world", False))
+
+    d5 = env.from_elements((4.4, 4.3, 1), (4.3, 4.4, 1), (4.2, 4.1, 3), (4.1, 4.1, 3))
+
+    d1 \
+        .map((lambda x: x * x), INT).map(Mapper(), INT) \
+        .map_partition(Verify([1, 1296, 20736], "Map"), STRING).output()
+
+    d1 \
+        .map(Mapper(), INT).map((lambda x: x * x), INT) \
+        .map_partition(Verify([1, 1296, 20736], "Chained Lambda"), STRING).output()
+
+    d1 \
+        .filter(Filter(5)).filter(Filter(8)) \
+        .map_partition(Verify([12], "Filter"), STRING).output()
+
+    d1 \
+        .flat_map(FlatMap(), INT).flat_map(FlatMap(), INT) \
+        .map_partition(Verify([1, 2, 2, 4, 6, 12, 12, 24, 12, 24, 24, 48], "FlatMap"), STRING).output()
+
+    d1 \
+        .map_partition(MapPartition(), INT) \
+        .map_partition(Verify([2, 12, 24], "MapPartition"), STRING).output()
+
+    d1 \
+        .reduce(Reduce()) \
+        .map_partition(Verify([19], "AllReduce"), STRING).output()
+
+    d4 \
+        .group_by(2).reduce(Reduce2()) \
+        .map_partition(Verify([(3, 1.4, "hello", True), (2, 0.4, "world", False)], "CombineReduce"), STRING).output()
+
+    d4 \
+        .map(Id(), (INT, FLOAT, STRING, BOOL)).group_by(2).reduce(Reduce2()) \
+        .map_partition(Verify([(3, 1.4, "hello", True), (2, 0.4, "world", False)], "ChainedReduce"), STRING).output()
+
+    d1 \
+        .map(MapperBcv(), INT).with_broadcast_set("test", d2) \
+        .map_partition(Verify([1, 6, 12], "Broadcast"), STRING).output()
+
+    d1 \
+        .cross(d2).using(Cross(), (INT, BOOL)) \
+        .map_partition(Verify([(1, True), (1, False), (6, True), (6, False), (12, True), (12, False)], "Cross"), STRING).output()
+
+    d1 \
+        .cross(d3) \
+        .map_partition(Verify([(1, ("hello",)), (1, ("world",)), (6, ("hello",)), (6, ("world",)), (12, ("hello",)), (12, ("world",))], "Default Cross"), STRING).output()
+
+    d2 \
+        .cross(d3).project_second(0).project_first(0, 1) \
+        .map_partition(Verify([("hello", 1, 0.5), ("world", 1, 0.5), ("hello", 2, 0.4), ("world", 2, 0.4)], "Project Cross"), STRING).output()
+
+    d2 \
+        .join(d3).where(2).equal_to(0).using(Join(), STRING) \
+        .map_partition(Verify(["hello1", "world0.4"], "Join"), STRING).output()
+
+    d2 \
+        .join(d3).where(2).equal_to(0).project_first(0, 3).project_second(0) \
+        .map_partition(Verify([(1, True, "hello"), (2, False, "world")], "Project Join"), STRING).output()
+
+    d2 \
+        .join(d3).where(2).equal_to(0) \
+        .map_partition(Verify([((1, 0.5, "hello", True), ("hello",)), ((2, 0.4, "world", False), ("world",))], "Default Join"), STRING).output()
+
+    d2 \
+        .project(0, 1).project(2) \
+        .map_partition(Verify([(1, 0.5, "hello"), (2, 0.4, "world")], "Project"), STRING).output()
+
+    d2 \
+        .union(d4) \
+        .map_partition(Verify2([(1, 0.5, "hello", True), (2, 0.4, "world", False), (1, 0.5, "hello", True), (1, 0.4, "hello", False), (1, 0.5, "hello", True), (2, 0.4, "world", False)], "Union"), STRING).output()
+
+    d4 \
+        .group_by(2).reduce_group(GroupReduce(), (INT, FLOAT, STRING, BOOL), combinable=False) \
+        .map_partition(Verify([(3, 1.4, "hello", True), (2, 0.4, "world", False)], "AllGroupReduce"), STRING).output()
+
+    d4 \
+        .map(Id(), (INT, FLOAT, STRING, BOOL)).group_by(2).reduce_group(GroupReduce(), (INT, FLOAT, STRING, BOOL), combinable=True) \
+        .map_partition(Verify([(3, 1.4, "hello", True), (2, 0.4, "world", False)], "ChainedGroupReduce"), STRING).output()
+
+    d4 \
+        .group_by(2).reduce_group(GroupReduce(), (INT, FLOAT, STRING, BOOL), combinable=True) \
+        .map_partition(Verify([(3, 1.4, "hello", True), (2, 0.4, "world", False)], "CombineGroupReduce"), STRING).output()
+
+    d5 \
+        .group_by(2).sort_group(0, Order.DESCENDING).sort_group(1, Order.ASCENDING).reduce_group(GroupReduce3(), (FLOAT, FLOAT, INT), combinable=True) \
+        .map_partition(Verify([(4.3, 4.4, 1), (4.1, 4.1, 3)], "ChainedSortedGroupReduce"), STRING).output()
+
+    d4 \
+        .co_group(d5).where(0).equal_to(2).using(CoGroup(), ((INT, FLOAT, STRING, BOOL), (FLOAT, FLOAT, INT))) \
+        .map_partition(Verify([((1, 0.5, "hello", True), (4.4, 4.3, 1)), ((1, 0.4, "hello", False), (4.3, 4.4, 1))], "CoGroup"), STRING).output()
+
+    env.set_degree_of_parallelism(1)
+
+    env.execute(local=True)
http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_text.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_text.py b/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_text.py
new file mode 100644
index 0000000..83b78fa
--- /dev/null
+++ b/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_text.py
@@ -0,0 +1,30 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from flink.plan.Environment import get_environment
+from flink.plan.Constants import WriteMode
+
+if __name__ == "__main__":
+    env = get_environment()
+
+    d1 = env.read_text("src/test/python/org/apache/flink/languagebinding/api/python/flink/test/data_text")
+
+    d1.write_text("/tmp/flink/result", WriteMode.OVERWRITE)
+
+    env.set_degree_of_parallelism(1)
+
+    env.execute(local=True)

http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_type_deduction.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_type_deduction.py b/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_type_deduction.py
new file mode 100644
index 0000000..a3b8d07
--- /dev/null
+++ b/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_type_deduction.py
@@ -0,0 +1,63 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from flink.plan.Environment import get_environment
+from flink.plan.Constants import _Fields
+from flink.plan.Constants import INT, STRING, BOOL, FLOAT
+import sys
+
+if __name__ == "__main__":
+    env = get_environment()
+
+    d1 = env.from_elements(("hello", 4, 3.2, True))
+
+    d2 = env.from_elements("world")
+
+    direct_from_source = d1.filter(lambda x: True)
+
+    if direct_from_source._info[_Fields.TYPES] != ("hello", 4, 3.2, True):
+        sys.exit("Error deducing type directly from source.")
+
+    from_common_udf = d1.map(lambda x: x[3], BOOL).filter(lambda x: True)
+
+    if from_common_udf._info[_Fields.TYPES] != BOOL:
+        sys.exit("Error deducing type from common udf.")
+
+    through_projection = d1.project(3, 2).filter(lambda x: True)
+
+    if through_projection._info[_Fields.TYPES] != (True, 3.2):
+        sys.exit("Error deducing type through projection.")
+
+    through_default_op = d1.cross(d2).filter(lambda x: True)
+
+    if through_default_op._info[_Fields.TYPES] != (("hello", 4, 3.2, True), "world"):
+        sys.exit("Error deducing type through default J/C. " + str(through_default_op._info[_Fields.TYPES]))
+
+    through_prj_op = d1.cross(d2).project_first(1, 0).project_second().project_first(3, 2).filter(lambda x: True)
+
+    if through_prj_op._info[_Fields.TYPES] != (4, "hello", "world", True, 3.2):
+        sys.exit("Error deducing type through projection J/C. " + str(through_prj_op._info[_Fields.TYPES]))
+
+    env = get_environment()
+
+    msg = env.from_elements("Type deduction test successful.")
+
+    msg.output()
+
+    env.execute()
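The checks above read the deduced type straight out of _info[_Fields.TYPES]. One operator they do not cover is union, which the deduction logic treats as transparent, so the parent's type should pass through unchanged. A hedged sketch of such an extra check in the same style (illustrative, not part of this patch):

    through_union = d1.union(d1).filter(lambda x: True)

    if through_union._info[_Fields.TYPES] != ("hello", 4, 3.2, True):
        sys.exit("Error deducing type through union. " + str(through_union._info[_Fields.TYPES]))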
Expected: " + str(self.expected[index]) + " Actual: " + str(value)) + raise Exception(self.name + " failed!") + index += 1 + collector.collect(self.name + " successful!") + + +class Id(MapFunction): + def map(self, value): + return value + + +if __name__ == "__main__": + env = get_environment() + + d1 = env.from_elements(bytearray(b"hello"), bytearray(b"world")) + + d1.map(Id(), BYTES).map_partition(Verify([bytearray(b"hello"), bytearray(b"world")], "Byte"), STRING).output() + + d2 = env.from_elements(1,2,3,4,5) + + d2.map(Id(), INT).map_partition(Verify([1,2,3,4,5], "Int"), STRING).output() + + d3 = env.from_elements(True, True, False) + + d3.map(Id(), BOOL).map_partition(Verify([True, True, False], "Bool"), STRING).output() + + d4 = env.from_elements(1.4, 1.7, 12312.23) + + d4.map(Id(), FLOAT).map_partition(Verify([1.4, 1.7, 12312.23], "Float"), STRING).output() + + d5 = env.from_elements("hello", "world") + + d5.map(Id(), STRING).map_partition(Verify(["hello", "world"], "String"), STRING).output() + + env.set_degree_of_parallelism(1) + + env.execute(local=True) \ No newline at end of file