http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/plan/DataSet.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/plan/DataSet.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/plan/DataSet.py
new file mode 100644
index 0000000..a906fb2
--- /dev/null
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/plan/DataSet.py
@@ -0,0 +1,907 @@
+# ###############################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import inspect
+import copy
+import types as TYPES
+
+from flink.plan.Constants import _Fields, _Identifier, WriteMode, STRING
+from flink.functions.CoGroupFunction import CoGroupFunction
+from flink.functions.FilterFunction import FilterFunction
+from flink.functions.FlatMapFunction import FlatMapFunction
+from flink.functions.CrossFunction import CrossFunction
+from flink.functions.GroupReduceFunction import GroupReduceFunction
+from flink.functions.JoinFunction import JoinFunction
+from flink.functions.MapFunction import MapFunction
+from flink.functions.MapPartitionFunction import MapPartitionFunction
+from flink.functions.ReduceFunction import ReduceFunction
+
+
+def deduct_output_type(dataset):
+    skip = set([_Identifier.GROUP, _Identifier.SORT, _Identifier.UNION])
+    source = set([_Identifier.SOURCE_CSV, _Identifier.SOURCE_TEXT, _Identifier.SOURCE_VALUE])
+    default = set([_Identifier.CROSS, _Identifier.CROSSH, _Identifier.CROSST, _Identifier.JOINT, _Identifier.JOINH, _Identifier.JOIN])
+
+    while True:
+        dataset_type = dataset[_Fields.IDENTIFIER]
+        if dataset_type in skip:
+            dataset = dataset[_Fields.PARENT]
+            continue
+        if dataset_type in source:
+            if dataset_type == _Identifier.SOURCE_TEXT:
+                return STRING
+            if dataset_type == _Identifier.SOURCE_VALUE:
+                return dataset[_Fields.VALUES][0]
+            if dataset_type == _Identifier.SOURCE_CSV:
+                return dataset[_Fields.TYPES]
+        if dataset_type == _Identifier.PROJECTION:
+            return tuple([deduct_output_type(dataset[_Fields.PARENT])[k] for k in dataset[_Fields.KEYS]])
+        if dataset_type in default:
+            if dataset[_Fields.OPERATOR] is not None: #udf-join/cross
+                return dataset[_Fields.TYPES]
+            if len(dataset[_Fields.PROJECTIONS]) == 0: #defaultjoin/-cross
+                return (deduct_output_type(dataset[_Fields.PARENT]), deduct_output_type(dataset[_Fields.OTHER]))
+            else: #projectjoin/-cross
+                t1 = deduct_output_type(dataset[_Fields.PARENT])
+                t2 = deduct_output_type(dataset[_Fields.OTHER])
+                out_type = []
+                for prj in dataset[_Fields.PROJECTIONS]:
+                    if len(prj[1]) == 0: #projection on non-tuple dataset
+                        if prj[0] == "first":
+                            out_type.append(t1)
+                        else:
+                            out_type.append(t2)
+                    else: #projection on tuple dataset
+                        for key in prj[1]:
+                            if prj[0] == "first":
+                                out_type.append(t1[key])
+                            else:
+                                out_type.append(t2[key])
+                return tuple(out_type)
+        return dataset[_Fields.TYPES]
+
+
+class Set(object):
+    def __init__(self, env, info, copy_set=False):
+        self._env = env
+        self._info = info
+        if not copy_set:
+            self._info[_Fields.ID] = env._counter
+            self._info[_Fields.BCVARS] = []
+            self._info[_Fields.CHILDREN] = []
+            self._info[_Fields.SINKS] = []
+            self._info[_Fields.NAME] = None
+            env._counter += 1
+
+    def output(self, to_error=False):
+        """
+        Writes a DataSet to the standard output stream (stdout).
+        """
+        child = dict()
+        child[_Fields.IDENTIFIER] = _Identifier.SINK_PRINT
+        child[_Fields.PARENT] = self._info
+        child[_Fields.TO_ERR] = to_error
+        self._info[_Fields.SINKS].append(child)
+        self._env._sinks.append(child)
+
+    def write_text(self, path, write_mode=WriteMode.NO_OVERWRITE):
+        """
+        Writes a DataSet as a text file to the specified location.
+
+        :param path: The path pointing to the location the text file is written to.
+        :param write_mode: OutputFormat.WriteMode value, indicating whether files should be overwritten
+        """
+        child = dict()
+        child[_Fields.IDENTIFIER] = _Identifier.SINK_TEXT
+        child[_Fields.PARENT] = self._info
+        child[_Fields.PATH] = path
+        child[_Fields.WRITE_MODE] = write_mode
+        self._info[_Fields.SINKS].append(child)
+        self._env._sinks.append(child)
+
+    def write_csv(self, path, line_delimiter="\n", field_delimiter=',', write_mode=WriteMode.NO_OVERWRITE):
+        """
+        Writes a Tuple DataSet as a CSV file to the specified location.
+
+        Note: Only a Tuple DataSet can be written as a CSV file.
+        :param path: The path pointing to the location the CSV file is written to.
+        :param write_mode: OutputFormat.WriteMode value, indicating whether files should be overwritten
+        """
+        child = dict()
+        child[_Fields.IDENTIFIER] = _Identifier.SINK_CSV
+        child[_Fields.PATH] = path
+        child[_Fields.PARENT] = self._info
+        child[_Fields.DELIMITER_FIELD] = field_delimiter
+        child[_Fields.DELIMITER_LINE] = line_delimiter
+        child[_Fields.WRITE_MODE] = write_mode
+        self._info[_Fields.SINKS].append(child)
+        self._env._sinks.append(child)
+
+    def reduce_group(self, operator, types, combinable=False):
+        """
+        Applies a GroupReduce transformation.
+
+        The transformation calls a GroupReduceFunction once for each group of the DataSet, or once when applied on a
+        non-grouped DataSet.
+        The GroupReduceFunction can iterate over all elements of the DataSet and
+        emit any number of output elements including none.
+
+        :param operator: The GroupReduceFunction that is applied on the DataSet.
+        :param types: The type of the resulting DataSet.
+        :return:A GroupReduceOperator that represents the reduced DataSet.
+        """
+        if isinstance(operator, TYPES.FunctionType):
+            f = operator
+            operator = GroupReduceFunction()
+            operator.reduce = f
+        child = dict()
+        child_set = OperatorSet(self._env, child)
+        child[_Fields.IDENTIFIER] = _Identifier.GROUPREDUCE
+        child[_Fields.PARENT] = self._info
+        child[_Fields.OPERATOR] = copy.deepcopy(operator)
+        child[_Fields.OPERATOR]._combine = False
+        child[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
+        child[_Fields.TYPES] = types
+        child[_Fields.COMBINE] = combinable
+        child[_Fields.COMBINEOP] = operator
+        child[_Fields.COMBINEOP]._combine = True
+        child[_Fields.NAME] = "PythonGroupReduce"
+        self._info[_Fields.CHILDREN].append(child)
+        self._env._sets.append(child)
+        return child_set
+
+
+class ReduceSet(Set):
+    def __init__(self, env, info, copy_set=False):
+        super(ReduceSet, self).__init__(env, info, copy_set)
+        if not copy_set:
+            self._is_chained = False
+
+    def reduce(self, operator):
+        """
+        Applies a Reduce transformation on a non-grouped DataSet.
+
+        The transformation consecutively calls a ReduceFunction until only a single element remains which is the result
+        of the transformation. A ReduceFunction combines two elements into one new element of the same type.
+
+        :param operator:The ReduceFunction that is applied on the DataSet.
+        :return:A ReduceOperator that represents the reduced DataSet.
+        """
+        if isinstance(operator, TYPES.FunctionType):
+            f = operator
+            operator = ReduceFunction()
+            operator.reduce = f
+        child = dict()
+        child_set = OperatorSet(self._env, child)
+        child[_Fields.IDENTIFIER] = _Identifier.REDUCE
+        child[_Fields.PARENT] = self._info
+        child[_Fields.OPERATOR] = operator
+        child[_Fields.COMBINEOP] = operator
+        child[_Fields.COMBINE] = False
+        child[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
+        child[_Fields.NAME] = "PythonReduce"
+        child[_Fields.TYPES] = deduct_output_type(self._info)
+        self._info[_Fields.CHILDREN].append(child)
+        self._env._sets.append(child)
+        return child_set
+
+
+class DataSet(ReduceSet):
+    def __init__(self, env, info, copy_set=False):
+        super(DataSet, self).__init__(env, info, copy_set)
+
+    def project(self, *fields):
+        """
+        Applies a Project transformation on a Tuple DataSet.
+
+        Note: Only Tuple DataSets can be projected. The transformation projects each Tuple of the DataSet onto a
+        (sub)set of fields.
+
+        :param fields: The field indexes of the input tuples that are retained.
+                        The order of fields in the output tuple corresponds to the order of field indexes.
+        :return: The projected DataSet.
+
+        """
+        child = dict()
+        child_set = DataSet(self._env, child)
+        child[_Fields.IDENTIFIER] = _Identifier.PROJECTION
+        child[_Fields.PARENT] = self._info
+        child[_Fields.KEYS] = fields
+        self._info[_Fields.CHILDREN].append(child)
+        self._env._sets.append(child)
+        return child_set
+
+    def group_by(self, *keys):
+        """
+        Groups a Tuple DataSet using field position keys.
+        Note: Field position keys can only be specified for Tuple DataSets.
+        The field position keys specify the fields of Tuples on which the DataSet is grouped.
+        This method returns an UnsortedGrouping on which one of the following grouping transformations can be applied.
+        sort_group() to get a SortedGrouping.
+        reduce() to apply a Reduce transformation.
+        group_reduce() to apply a GroupReduce transformation.
+
+        :param keys: One or more field positions on which the DataSet will be grouped.
+        :return:A Grouping on which a transformation needs to be applied to obtain a transformed DataSet.
+        """
+        child = dict()
+        child_chain = []
+        child_set = UnsortedGrouping(self._env, child, child_chain)
+        child[_Fields.IDENTIFIER] = _Identifier.GROUP
+        child[_Fields.PARENT] = self._info
+        child[_Fields.KEYS] = keys
+        child_chain.append(child)
+        self._info[_Fields.CHILDREN].append(child)
+        self._env._sets.append(child)
+        return child_set
+
+    def co_group(self, other_set):
+        """
+        Initiates a CoGroup transformation which combines the elements of two DataSets into one DataSet.
+
+        It groups each DataSet individually on a key and gives groups of both DataSets with equal keys together into a
+        CoGroupFunction. If a DataSet has a group with no matching key in the other DataSet,
+        the CoGroupFunction is called with an empty group for the non-existing group.
+        The CoGroupFunction can iterate over the elements of both groups and return any number of elements
+        including none.
+
+        :param other_set: The other DataSet of the CoGroup transformation.
+        :return:A CoGroupOperator to continue the definition of the CoGroup transformation.
+        """
+        child = dict()
+        other_set._info[_Fields.CHILDREN].append(child)
+        child_set = CoGroupOperatorWhere(self._env, child)
+        child[_Fields.IDENTIFIER] = _Identifier.COGROUP
+        child[_Fields.PARENT] = self._info
+        child[_Fields.OTHER] = other_set._info
+        self._info[_Fields.CHILDREN].append(child)
+        return child_set
+
+    def cross(self, other_set):
+        """
+        Initiates a Cross transformation which combines the elements of two DataSets into one DataSet.
+
+        It builds all pair combinations of elements of both DataSets, i.e., it builds a Cartesian product.
+
+        :param other_set: The other DataSet with which this DataSet is crossed.
+        :return:A CrossOperator to continue the definition of the Cross transformation.
+        """
+        return self._cross(other_set, _Identifier.CROSS)
+
+    def cross_with_huge(self, other_set):
+        """
+        Initiates a Cross transformation which combines the elements of two DataSets into one DataSet.
+
+        It builds all pair combinations of elements of both DataSets, i.e., it builds a Cartesian product.
+        This method also gives the hint to the optimizer that
+        the second DataSet to cross is much larger than the first one.
+
+        :param other_set: The other DataSet with which this DataSet is crossed.
+        :return:A CrossOperator to continue the definition of the Cross transformation.
+        """
+        return self._cross(other_set, _Identifier.CROSSH)
+
+    def cross_with_tiny(self, other_set):
+        """
+        Initiates a Cross transformation which combines the elements of two DataSets into one DataSet.
+
+        It builds all pair combinations of elements of both DataSets, i.e., it builds a Cartesian product.
+        This method also gives the hint to the optimizer that
+        the second DataSet to cross is much smaller than the first one.
+
+        :param other_set: The other DataSet with which this DataSet is crossed.
+        :return:A CrossOperator to continue the definition of the Cross transformation.
+        """
+        return self._cross(other_set, _Identifier.CROSST)
+
+    def _cross(self, other_set, identifier):
+        child = dict()
+        child_set = CrossOperator(self._env, child)
+        child[_Fields.IDENTIFIER] = identifier
+        child[_Fields.PARENT] = self._info
+        child[_Fields.OTHER] = other_set._info
+        child[_Fields.PROJECTIONS] = []
+        child[_Fields.OPERATOR] = None
+        child[_Fields.META] = None
+        self._info[_Fields.CHILDREN].append(child)
+        other_set._info[_Fields.CHILDREN].append(child)
+        self._env._sets.append(child)
+        return child_set
+
+    def filter(self, operator):
+        """
+        Applies a Filter transformation on a DataSet.
+
+        The transformation calls a FilterFunction for each element of the DataSet and retains only those elements
+        for which the function returns true. Elements for which the function returns false are filtered.
+
+        :param operator: The FilterFunction that is called for each element of the DataSet.
+        :return:A FilterOperator that represents the filtered DataSet.
+        """
+        if isinstance(operator, TYPES.FunctionType):
+            f = operator
+            operator = FilterFunction()
+            operator.filter = f
+        child = dict()
+        child_set = OperatorSet(self._env, child)
+        child[_Fields.IDENTIFIER] = _Identifier.FILTER
+        child[_Fields.PARENT] = self._info
+        child[_Fields.OPERATOR] = operator
+        child[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
+        child[_Fields.NAME] = "PythonFilter"
+        child[_Fields.TYPES] = deduct_output_type(self._info)
+        self._info[_Fields.CHILDREN].append(child)
+        self._env._sets.append(child)
+        return child_set
+
+    def flat_map(self, operator, types):
+        """
+        Applies a FlatMap transformation on a DataSet.
+
+        The transformation calls a FlatMapFunction for each element of the DataSet.
+        Each FlatMapFunction call can return any number of elements including none.
+
+        :param operator: The FlatMapFunction that is called for each element of the DataSet.
+        :param types: The type of the resulting DataSet.
+        :return:A FlatMapOperator that represents the transformed DataSet.
+        """
+        if isinstance(operator, TYPES.FunctionType):
+            f = operator
+            operator = FlatMapFunction()
+            operator.flat_map = f
+        child = dict()
+        child_set = OperatorSet(self._env, child)
+        child[_Fields.IDENTIFIER] = _Identifier.FLATMAP
+        child[_Fields.PARENT] = self._info
+        child[_Fields.OPERATOR] = operator
+        child[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
+        child[_Fields.TYPES] = types
+        child[_Fields.NAME] = "PythonFlatMap"
+        self._info[_Fields.CHILDREN].append(child)
+        self._env._sets.append(child)
+        return child_set
+
+    def join(self, other_set):
+        """
+        Initiates a Join transformation.
+
+        A Join transformation joins the elements of two DataSets on key equality.
+
+        :param other_set: The other DataSet with which this DataSet is joined
+        :return:A JoinOperator to continue the definition of the Join transformation.
+        """
+        return self._join(other_set, _Identifier.JOIN)
+
+    def join_with_huge(self, other_set):
+        """
+        Initiates a Join transformation.
+
+        A Join transformation joins the elements of two DataSets on key equality.
+        This method also gives the hint to the optimizer that
+        the second DataSet to join is much larger than the first one.
+
+        :param other_set: The other DataSet with which this DataSet is joined
+        :return:A JoinOperator to continue the definition of the Join transformation.
+        """
+        return self._join(other_set, _Identifier.JOINH)
+
+    def join_with_tiny(self, other_set):
+        """
+        Initiates a Join transformation.
+
+        A Join transformation joins the elements of two DataSets on key equality.
+        This method also gives the hint to the optimizer that
+        the second DataSet to join is much smaller than the first one.
+
+        :param other_set: The other DataSet with which this DataSet is joined
+        :return:A JoinOperator to continue the definition of the Join transformation.
+        """
+        return self._join(other_set, _Identifier.JOINT)
+
+    def _join(self, other_set, identifier):
+        child = dict()
+        child_set = JoinOperatorWhere(self._env, child)
+        child[_Fields.IDENTIFIER] = identifier
+        child[_Fields.PARENT] = self._info
+        child[_Fields.OTHER] = other_set._info
+        child[_Fields.OPERATOR] = None
+        child[_Fields.META] = None
+        child[_Fields.PROJECTIONS] = []
+        self._info[_Fields.CHILDREN].append(child)
+        other_set._info[_Fields.CHILDREN].append(child)
+        self._env._sets.append(child)
+        return child_set
+
+    def map(self, operator, types):
+        """
+        Applies a Map transformation on a DataSet.
+
+        The transformation calls a MapFunction for each element of the DataSet.
+        Each MapFunction call returns exactly one element.
+
+        :param operator: The MapFunction that is called for each element of the DataSet.
+        :param types: The type of the resulting DataSet
+        :return:A MapOperator that represents the transformed DataSet
+        """
+        if isinstance(operator, TYPES.FunctionType):
+            f = operator
+            operator = MapFunction()
+            operator.map = f
+        child = dict()
+        child_set = OperatorSet(self._env, child)
+        child[_Fields.IDENTIFIER] = _Identifier.MAP
+        child[_Fields.PARENT] = self._info
+        child[_Fields.OPERATOR] = operator
+        child[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
+        child[_Fields.TYPES] = types
+        child[_Fields.NAME] = "PythonMap"
+        self._info[_Fields.CHILDREN].append(child)
+        self._env._sets.append(child)
+        return child_set
+
+    def map_partition(self, operator, types):
+        """
+        Applies a MapPartition transformation on a DataSet.
+
+        The transformation calls a MapPartitionFunction once per parallel partition of the DataSet.
+        The entire partition is available through the given Iterator.
+        Each MapPartitionFunction may return an arbitrary number of results.
+
+        The number of elements that each instance of the MapPartition function
+        sees is non-deterministic and depends on the degree of parallelism of the operation.
+
+        :param operator: The MapPartitionFunction that is called for each partition of the DataSet.
+        :param types: The type of the resulting DataSet
+        :return:A MapOperator that represents the transformed DataSet
+        """
+        if isinstance(operator, TYPES.FunctionType):
+            f = operator
+            operator = MapPartitionFunction()
+            operator.map_partition = f
+        child = dict()
+        child_set = OperatorSet(self._env, child)
+        child[_Fields.IDENTIFIER] = _Identifier.MAPPARTITION
+        child[_Fields.PARENT] = self._info
+        child[_Fields.OPERATOR] = operator
+        child[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
+        child[_Fields.TYPES] = types
+        child[_Fields.NAME] = "PythonMapPartition"
+        self._info[_Fields.CHILDREN].append(child)
+        self._env._sets.append(child)
+        return child_set
+
+    def union(self, other_set):
+        """
+        Creates a union of this DataSet with another DataSet.
+
+        The other DataSet must be of the same data type.
+
+        :param other_set: The other DataSet which is unioned with the current DataSet.
+        :return:The resulting DataSet.
+        """
+        child = dict()
+        child_set = DataSet(self._env, child)
+        child[_Fields.IDENTIFIER] = _Identifier.UNION
+        child[_Fields.PARENT] = self._info
+        child[_Fields.OTHER] = other_set._info
+        self._info[_Fields.CHILDREN].append(child)
+        other_set._info[_Fields.CHILDREN].append(child)
+        self._env._sets.append(child)
+        return child_set
+
+
+class OperatorSet(DataSet):
+    def __init__(self, env, info, copy_set=False):
+        super(OperatorSet, self).__init__(env, info, copy_set)
+
+    def with_broadcast_set(self, name, set):
+        child = dict()
+        child[_Fields.PARENT] = self._info
+        child[_Fields.OTHER] = set._info
+        child[_Fields.NAME] = name
+        self._info[_Fields.BCVARS].append(child)
+        set._info[_Fields.CHILDREN].append(child)
+        self._env._broadcast.append(child)
+        return self
+
+
+class Grouping(object):
+    def __init__(self, env, info, child_chain):
+        self._env = env
+        self._child_chain = child_chain
+        self._info = info
+        info[_Fields.ID] = env._counter
+        info[_Fields.CHILDREN] = []
+        info[_Fields.SINKS] = []
+        env._counter += 1
+
+    def reduce_group(self, operator, types, combinable=False):
+        """
+        Applies a GroupReduce transformation.
+
+        The transformation calls a GroupReduceFunction once for each group of the DataSet, or once when applied on a
+        non-grouped DataSet.
+        The GroupReduceFunction can iterate over all elements of the DataSet and
+        emit any number of output elements including none.
+
+        :param operator: The GroupReduceFunction that is applied on the DataSet.
+        :param types: The type of the resulting DataSet.
+        :return:A GroupReduceOperator that represents the reduced DataSet.
+        """
+        if isinstance(operator, TYPES.FunctionType):
+            f = operator
+            operator = GroupReduceFunction()
+            operator.reduce = f
+        operator._set_grouping_keys(self._child_chain[0][_Fields.KEYS])
+        operator._set_sort_ops([(x[_Fields.FIELD], x[_Fields.ORDER]) for x in self._child_chain[1:]])
+        child = dict()
+        child_set = OperatorSet(self._env, child)
+        child[_Fields.IDENTIFIER] = _Identifier.GROUPREDUCE
+        child[_Fields.PARENT] = self._info
+        child[_Fields.OPERATOR] = copy.deepcopy(operator)
+        child[_Fields.OPERATOR]._combine = False
+        child[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
+        child[_Fields.TYPES] = types
+        child[_Fields.COMBINE] = combinable
+        child[_Fields.COMBINEOP] = operator
+        child[_Fields.COMBINEOP]._combine = True
+        child[_Fields.NAME] = "PythonGroupReduce"
+        self._info[_Fields.CHILDREN].append(child)
+        self._env._sets.append(child)
+
+        return child_set
+
+    def sort_group(self, field, order):
+        """
+        Sorts Tuple elements within a group on the specified field in the specified Order.
+
+        Note: Only groups of Tuple elements can be sorted.
+        Groups can be sorted by multiple fields by chaining sort_group() calls.
+
+        :param field:The Tuple field on which the group is sorted.
+        :param order: The Order in which the specified Tuple field is sorted. See DataSet.Order.
+        :return:A SortedGrouping with specified order of group element.
+        """
+        child = dict()
+        child_set = SortedGrouping(self._env, child, self._child_chain)
+        child[_Fields.IDENTIFIER] = _Identifier.SORT
+        child[_Fields.PARENT] = self._info
+        child[_Fields.FIELD] = field
+        child[_Fields.ORDER] = order
+        self._info[_Fields.CHILDREN].append(child)
+        self._child_chain.append(child)
+        self._env._sets.append(child)
+        return child_set
+
+
+class UnsortedGrouping(Grouping):
+    def __init__(self, env, info, child_chain):
+        super(UnsortedGrouping, self).__init__(env, info, child_chain)
+
+    def reduce(self, operator):
+        """
+        Applies a Reduce transformation on a grouped DataSet.
+
+        The transformation consecutively calls a ReduceFunction until only a single element remains which is the result
+        of the transformation. A ReduceFunction combines two elements into one new element of the same type.
+
+        :param operator:The ReduceFunction that is applied on the DataSet.
+        :return:A ReduceOperator that represents the reduced DataSet.
+        """
+        operator._set_grouping_keys(self._child_chain[0][_Fields.KEYS])
+        for i in self._child_chain:
+            self._env._sets.append(i)
+        child = dict()
+        child_set = OperatorSet(self._env, child)
+        child[_Fields.IDENTIFIER] = _Identifier.REDUCE
+        child[_Fields.PARENT] = self._info
+        child[_Fields.OPERATOR] = copy.deepcopy(operator)
+        child[_Fields.OPERATOR]._combine = False
+        child[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
+        child[_Fields.COMBINE] = True
+        child[_Fields.COMBINEOP] = operator
+        child[_Fields.COMBINEOP]._combine = True
+        child[_Fields.NAME] = "PythonReduce"
+        child[_Fields.TYPES] = deduct_output_type(self._info)
+        self._info[_Fields.CHILDREN].append(child)
+        self._env._sets.append(child)
+
+        return child_set
+
+
+class SortedGrouping(Grouping):
+    def __init__(self, env, info, child_chain):
+        super(SortedGrouping, self).__init__(env, info, child_chain)
+
+
+class CoGroupOperatorWhere(object):
+    def __init__(self, env, info):
+        self._env = env
+        self._info = info
+
+    def where(self, *fields):
+        """
+        Continues a CoGroup transformation.
+
+        Defines the Tuple fields of the first co-grouped DataSet that should be used as grouping keys.
+        Note: Fields can only be selected as grouping keys on Tuple DataSets.
+
+        :param fields: The indexes of the Tuple fields of the first co-grouped DataSet that should be used as keys.
+        :return: An incomplete CoGroup transformation.
+        """
+        self._info[_Fields.KEY1] = fields
+        return CoGroupOperatorTo(self._env, self._info)
+
+
+class CoGroupOperatorTo(object):
+    def __init__(self, env, info):
+        self._env = env
+        self._info = info
+
+    def equal_to(self, *fields):
+        """
+        Continues a CoGroup transformation.
+
+        Defines the Tuple fields of the second co-grouped DataSet that should be used as grouping keys.
+        Note: Fields can only be selected as grouping keys on Tuple DataSets.
+
+        :param fields: The indexes of the Tuple fields of the second co-grouped DataSet that should be used as keys.
+        :return: An incomplete CoGroup transformation.
+        """
+        self._info[_Fields.KEY2] = fields
+        return CoGroupOperatorUsing(self._env, self._info)
+
+
+class CoGroupOperatorUsing(object):
+    def __init__(self, env, info):
+        self._env = env
+        self._info = info
+
+    def using(self, operator, types):
+        """
+        Finalizes a CoGroup transformation.
+
+        Applies a CoGroupFunction to groups of elements with identical keys.
+        Each CoGroupFunction call returns an arbitrary number of elements.
+
+        :param operator: The CoGroupFunction that is called for all groups of elements with identical keys.
+        :param types: The type of the resulting DataSet.
+        :return:A CoGroupOperator that represents the co-grouped result DataSet.
+        """
+        if isinstance(operator, TYPES.FunctionType):
+            f = operator
+            operator = CoGroupFunction()
+            operator.co_group = f
+        new_set = OperatorSet(self._env, self._info)
+        operator._keys1 = self._info[_Fields.KEY1]
+        operator._keys2 = self._info[_Fields.KEY2]
+        self._info[_Fields.OPERATOR] = operator
+        self._info[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
+        self._info[_Fields.TYPES] = types
+        self._info[_Fields.NAME] = "PythonCoGroup"
+        self._env._sets.append(self._info)
+        return new_set
+
+
+class JoinOperatorWhere(object):
+    def __init__(self, env, info):
+        self._env = env
+        self._info = info
+
+    def where(self, *fields):
+        """
+        Continues a Join transformation.
+
+        Defines the Tuple fields of the first join DataSet that should be used as join keys.
+        Note: Fields can only be selected as join keys on Tuple DataSets.
+
+        :param fields: The indexes of the Tuple fields of the first join DataSet that should be used as keys.
+        :return:An incomplete Join transformation.
+
+        """
+        self._info[_Fields.KEY1] = fields
+        return JoinOperatorTo(self._env, self._info)
+
+
+class JoinOperatorTo(object):
+    def __init__(self, env, info):
+        self._env = env
+        self._info = info
+
+    def equal_to(self, *fields):
+        """
+        Continues a Join transformation.
+
+        Defines the Tuple fields of the second join DataSet that should be used as join keys.
+        Note: Fields can only be selected as join keys on Tuple DataSets.
+
+        :param fields:The indexes of the Tuple fields of the second join DataSet that should be used as keys.
+        :return:An incomplete Join Transformation.
+        """
+        self._info[_Fields.KEY2] = fields
+        return JoinOperator(self._env, self._info)
+
+
+class JoinOperatorProjection(DataSet):
+    def __init__(self, env, info):
+        super(JoinOperatorProjection, self).__init__(env, info)
+
+    def project_first(self, *fields):
+        """
+        Initiates a ProjectJoin transformation.
+
+        Projects the first join input.
+        If the first join input is a Tuple DataSet, fields can be selected by their index.
+        If the first join input is not a Tuple DataSet, no parameters should be passed.
+
+        :param fields: The indexes of the selected fields.
+        :return: An incomplete JoinProjection.
+        """
+        self._info[_Fields.PROJECTIONS].append(("first", fields))
+        return self
+
+    def project_second(self, *fields):
+        """
+        Initiates a ProjectJoin transformation.
+
+        Projects the second join input.
+        If the second join input is a Tuple DataSet, fields can be selected by their index.
+        If the second join input is not a Tuple DataSet, no parameters should be passed.
+
+        :param fields: The indexes of the selected fields.
+        :return: An incomplete JoinProjection.
+        """
+        self._info[_Fields.PROJECTIONS].append(("second", fields))
+        return self
+
+
+class JoinOperator(DataSet):
+    def __init__(self, env, info):
+        super(JoinOperator, self).__init__(env, info)
+        self._info[_Fields.TYPES] = None
+
+    def project_first(self, *fields):
+        """
+        Initiates a ProjectJoin transformation.
+
+        Projects the first join input.
+        If the first join input is a Tuple DataSet, fields can be selected by their index.
+        If the first join input is not a Tuple DataSet, no parameters should be passed.
+
+        :param fields: The indexes of the selected fields.
+        :return: An incomplete JoinProjection.
+        """
+        return JoinOperatorProjection(self._env, self._info).project_first(*fields)
+
+    def project_second(self, *fields):
+        """
+        Initiates a ProjectJoin transformation.
+
+        Projects the second join input.
+        If the second join input is a Tuple DataSet, fields can be selected by their index.
+        If the second join input is not a Tuple DataSet, no parameters should be passed.
+
+        :param fields: The indexes of the selected fields.
+        :return: An incomplete JoinProjection.
+        """
+        return JoinOperatorProjection(self._env, self._info).project_second(*fields)
+
+    def using(self, operator, types):
+        """
+        Finalizes a Join transformation.
+
+        Applies a JoinFunction to each pair of joined elements. Each JoinFunction call returns exactly one element.
+
+        :param operator:The JoinFunction that is called for each pair of joined elements.
+        :param types: The type of the resulting DataSet.
+        :return:A Set that represents the joined result DataSet.
+        """
+        if isinstance(operator, TYPES.FunctionType):
+            f = operator
+            operator = JoinFunction()
+            operator.join = f
+        self._info[_Fields.OPERATOR] = operator
+        self._info[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
+        self._info[_Fields.TYPES] = types
+        self._info[_Fields.NAME] = "PythonJoin"
+        self._env._sets.append(self._info)
+        return OperatorSet(self._env, self._info, copy_set=True)
+
+
+class CrossOperatorProjection(DataSet):
+    def __init__(self, env, info):
+        super(CrossOperatorProjection, self).__init__(env, info)
+
+    def project_first(self, *fields):
+        """
+        Initiates a ProjectCross transformation.
+
+        Projects the first cross input.
+        If the first cross input is a Tuple DataSet, fields can be selected by their index.
+        If the first cross input is not a Tuple DataSet, no parameters should be passed.
+
+        :param fields: The indexes of the selected fields.
+        :return: An incomplete CrossProjection.
+        """
+        self._info[_Fields.PROJECTIONS].append(("first", fields))
+        return self
+
+    def project_second(self, *fields):
+        """
+        Initiates a ProjectCross transformation.
+
+        Projects the second cross input.
+        If the second cross input is a Tuple DataSet, fields can be selected by their index.
+        If the second cross input is not a Tuple DataSet, no parameters should be passed.
+
+        :param fields: The indexes of the selected fields.
+        :return: An incomplete CrossProjection.
+        """
+        self._info[_Fields.PROJECTIONS].append(("second", fields))
+        return self
+
+
+class CrossOperator(DataSet):
+    def __init__(self, env, info):
+        super(CrossOperator, self).__init__(env, info)
+        info[_Fields.TYPES] = None
+
+    def project_first(self, *fields):
+        """
+        Initiates a ProjectCross transformation.
+
+        Projects the first cross input.
+        If the first cross input is a Tuple DataSet, fields can be selected by their index.
+        If the first cross input is not a Tuple DataSet, no parameters should be passed.
+
+        :param fields: The indexes of the selected fields.
+        :return: An incomplete CrossProjection.
+        """
+        return CrossOperatorProjection(self._env, self._info).project_first(*fields)
+
+    def project_second(self, *fields):
+        """
+        Initiates a ProjectCross transformation.
+
+        Projects the second cross input.
+        If the second cross input is a Tuple DataSet, fields can be selected by their index.
+        If the second cross input is not a Tuple DataSet, no parameters should be passed.
+
+        :param fields: The indexes of the selected fields.
+        :return: An incomplete CrossProjection.
+        """
+        return CrossOperatorProjection(self._env, self._info).project_second(*fields)
+
+    def using(self, operator, types):
+        """
+        Finalizes a Cross transformation.
+
+        Applies a CrossFunction to each pair of crossed elements. Each CrossFunction call returns exactly one element.
+
+        :param operator:The CrossFunction that is called for each pair of crossed elements.
+        :param types: The type of the resulting DataSet.
+        :return:A Set that represents the crossed result DataSet.
+        """
+        if isinstance(operator, TYPES.FunctionType):
+            f = operator
+            operator = CrossFunction()
+            operator.cross = f
+        self._info[_Fields.OPERATOR] = operator
+        self._info[_Fields.META] = str(inspect.getmodule(operator)) + "|" + str(operator.__class__.__name__)
+        self._info[_Fields.TYPES] = types
+        self._info[_Fields.NAME] = "PythonCross"
+        return OperatorSet(self._env, self._info, copy_set=True)

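For context, a minimal usage sketch of the fluent join chain defined in DataSet.py above (JoinOperatorWhere -> JoinOperatorTo -> JoinOperator). It is illustrative only and not part of this commit; it assumes the flink package introduced here is importable, that Constants defines an INT constant next to the STRING constant imported by DataSet.py, and that JoinFunction.join takes (value1, value2). The sample data is made up.

    from flink.plan.Environment import get_environment
    from flink.plan.Constants import INT, STRING
    from flink.functions.JoinFunction import JoinFunction


    class NameAgeJoin(JoinFunction):
        # assumed signature: join(self, value1, value2) returns one output element
        def join(self, value1, value2):
            return (value1[1], value2[1])


    env = get_environment()
    names = env.from_elements((1, "alice"), (2, "bob"))
    ages = env.from_elements((1, 30), (2, 25))

    result = (
        names.join(ages)               # DataSet._join() returns a JoinOperatorWhere
             .where(0)                 # stores KEY1, returns a JoinOperatorTo
             .equal_to(0)              # stores KEY2, returns a JoinOperator
             .using(NameAgeJoin(), (STRING, INT)))  # finalizes the plan node
    result.output()
    env.execute(local=True)
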
http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/plan/Environment.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/plan/Environment.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/plan/Environment.py
new file mode 100644
index 0000000..61c077f
--- /dev/null
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/plan/Environment.py
@@ -0,0 +1,339 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import inspect
+from flink.connection import Connection
+from flink.connection import Collector
+from flink.plan.DataSet import DataSet
+from flink.plan.Constants import _Fields, _Identifier
+from flink.utilities import Switch
+import dill
+import copy
+
+
+def get_environment():
+    """
+    Creates an execution environment that represents the context in which the program is currently executed.
+    
+    :return:The execution environment of the context in which the program is executed.
+    """
+    return Environment()
+
+
+def _dump(function):
+    return dill.dumps(function, protocol=0, byref=True)
+
+
+class Environment(object):
+    def __init__(self):
+        # util
+        self._connection = Connection.OneWayBusyBufferingMappedFileConnection()
+        self._collector = Collector.TypedCollector(self._connection)
+        self._counter = 0
+
+        #parameters
+        self._parameters = []
+
+        #sets
+        self._sources = []
+        self._sets = []
+        self._sinks = []
+
+        #specials
+        self._broadcast = []
+
+    def read_csv(self, path, types, line_delimiter="\n", field_delimiter=','):
+        """
+        Creates a DataSet that represents the tuples produced by reading the given CSV file.
+
+        :param path: The path of the CSV file.
+        :param types: Specifies the types for the CSV fields.
+        :return:A DataSet that represents the tuples read from the given CSV file.
+        """
+        child = dict()
+        child_set = DataSet(self, child)
+        child[_Fields.IDENTIFIER] = _Identifier.SOURCE_CSV
+        child[_Fields.DELIMITER_LINE] = line_delimiter
+        child[_Fields.DELIMITER_FIELD] = field_delimiter
+        child[_Fields.PATH] = path
+        child[_Fields.TYPES] = types
+        self._sources.append(child)
+        return child_set
+
+    def read_text(self, path):
+        """
+        Creates a DataSet that represents the Strings produced by reading the given file line wise.
+
+        The file will be read with the system's default character set.
+
+        :param path: The path of the file, as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path").
+        :return: A DataSet that represents the data read from the given file as text lines.
+        """
+        child = dict()
+        child_set = DataSet(self, child)
+        child[_Fields.IDENTIFIER] = _Identifier.SOURCE_TEXT
+        child[_Fields.PATH] = path
+        self._sources.append(child)
+        return child_set
+
+    def from_elements(self, *elements):
+        """
+        Creates a new data set that contains the given elements.
+
+        The elements must all be of the same type, for example, all Strings or all Integers.
+        The sequence of elements must not be empty.
+
+        :param elements: The elements to make up the data set.
+        :return: A DataSet representing the given list of elements.
+        """
+        child = dict()
+        child_set = DataSet(self, child)
+        child[_Fields.IDENTIFIER] = _Identifier.SOURCE_VALUE
+        child[_Fields.VALUES] = elements
+        self._sources.append(child)
+        return child_set
+
+    def set_degree_of_parallelism(self, degree):
+        """
+        Sets the degree of parallelism (DOP) for operations executed through this environment.
+
+        Setting a DOP of x here will cause all operators (such as join, map, reduce) to run with x parallel instances.
+
+        :param degree: The degree of parallelism
+        """
+        self._parameters.append(("dop", degree))
+
+    def execute(self, local=False, debug=False):
+        """
+        Triggers the program execution.
+
+        The environment will execute all parts of the program that have resulted in a "sink" operation.
+        """
+        if debug:
+            local = True
+        self._parameters.append(("mode", local))
+        self._parameters.append(("debug", debug))
+        self._optimize_plan()
+        self._send_plan()
+        self._connection._write_buffer()
+
+    def _optimize_plan(self):
+        self._find_chains()
+
+    def _find_chains(self):
+        udf = set([_Identifier.MAP, _Identifier.FLATMAP, _Identifier.FILTER, _Identifier.MAPPARTITION,
+                   _Identifier.GROUPREDUCE, _Identifier.REDUCE, _Identifier.COGROUP,
+                   _Identifier.CROSS, _Identifier.CROSSH, _Identifier.CROSST,
+                   _Identifier.JOIN, _Identifier.JOINH, _Identifier.JOINT])
+        chainable = set([_Identifier.MAP, _Identifier.FILTER, _Identifier.FLATMAP, _Identifier.GROUPREDUCE, _Identifier.REDUCE])
+        multi_input = set([_Identifier.JOIN, _Identifier.JOINH, _Identifier.JOINT, _Identifier.CROSS, _Identifier.CROSSH, _Identifier.CROSST, _Identifier.COGROUP, _Identifier.UNION])
+        x = len(self._sets) - 1
+        while x > -1:
+            child = self._sets[x]
+            child_type = child[_Fields.IDENTIFIER]
+            if child_type in chainable:
+                parent = child[_Fields.PARENT]
+                parent_type = parent[_Fields.IDENTIFIER]
+                if len(parent[_Fields.SINKS]) == 0:
+                    if child_type == _Identifier.GROUPREDUCE or child_type == _Identifier.REDUCE:
+                        if child[_Fields.COMBINE]:
+                            while parent_type == _Identifier.GROUP or parent_type == _Identifier.SORT:
+                                parent = parent[_Fields.PARENT]
+                                parent_type = parent[_Fields.IDENTIFIER]
+                            if parent_type in udf and len(parent[_Fields.CHILDREN]) == 1:
+                                if parent[_Fields.OPERATOR] is not None:
+                                    function = child[_Fields.COMBINEOP]
+                                    meta = str(inspect.getmodule(function)) + "|" + str(function.__class__.__name__)
+                                    parent[_Fields.OPERATOR]._chain(_dump(function), meta)
+                                    child[_Fields.COMBINE] = False
+                                    parent[_Fields.NAME] += " -> PythonCombine"
+                                    for bcvar in child[_Fields.BCVARS]:
+                                        bcvar_copy = copy.deepcopy(bcvar)
+                                        bcvar_copy[_Fields.PARENT] = parent
+                                        self._broadcast.append(bcvar_copy)
+                    else:
+                        if parent_type in udf and len(parent[_Fields.CHILDREN]) == 1:
+                            parent_op = parent[_Fields.OPERATOR]
+                            if parent_op is not None:
+                                function = child[_Fields.OPERATOR]
+                                meta = str(inspect.getmodule(function)) + "|" + str(function.__class__.__name__)
+                                parent_op._chain(_dump(function), meta)
+                                parent[_Fields.NAME] += " -> " + child[_Fields.NAME]
+                                parent[_Fields.TYPES] = child[_Fields.TYPES]
+                                for grand_child in child[_Fields.CHILDREN]:
+                                    if grand_child[_Fields.IDENTIFIER] in multi_input:
+                                        if grand_child[_Fields.PARENT][_Fields.ID] == child[_Fields.ID]:
+                                            grand_child[_Fields.PARENT] = parent
+                                        else:
+                                            grand_child[_Fields.OTHER] = parent
+                                    else:
+                                        grand_child[_Fields.PARENT] = parent
+                                        parent[_Fields.CHILDREN].append(grand_child)
+                                parent[_Fields.CHILDREN].remove(child)
+                                for sink in child[_Fields.SINKS]:
+                                    sink[_Fields.PARENT] = parent
+                                    parent[_Fields.SINKS].append(sink)
+                                for bcvar in child[_Fields.BCVARS]:
+                                    bcvar[_Fields.PARENT] = parent
+                                    parent[_Fields.BCVARS].append(bcvar)
+                                self._remove_set((child))
+            x -= 1
+
+    def _remove_set(self, set):
+        self._sets[:] = [s for s in self._sets if s[_Fields.ID] != set[_Fields.ID]]
+
+    def _send_plan(self):
+        self._send_parameters()
+        self._collector.collect(len(self._sources) + len(self._sets) + len(self._sinks) + len(self._broadcast))
+        self._send_sources()
+        self._send_operations()
+        self._send_sinks()
+        self._send_broadcast()
+
+    def _send_parameters(self):
+        self._collector.collect(len(self._parameters))
+        for parameter in self._parameters:
+            self._collector.collect(parameter)
+
+    def _send_sources(self):
+        for source in self._sources:
+            identifier = source[_Fields.IDENTIFIER]
+            collect = self._collector.collect
+            collect(identifier)
+            collect(source[_Fields.ID])
+            for case in Switch(identifier):
+                if case(_Identifier.SOURCE_CSV):
+                    collect(source[_Fields.PATH])
+                    collect(source[_Fields.DELIMITER_FIELD])
+                    collect(source[_Fields.DELIMITER_LINE])
+                    collect(source[_Fields.TYPES])
+                    break
+                if case(_Identifier.SOURCE_TEXT):
+                    collect(source[_Fields.PATH])
+                    break
+                if case(_Identifier.SOURCE_VALUE):
+                    collect(len(source[_Fields.VALUES]))
+                    for value in source[_Fields.VALUES]:
+                        collect(value)
+                    break
+
+    def _send_operations(self):
+        collect = self._collector.collect
+        collectBytes = self._collector.collectBytes
+        for set in self._sets:
+            identifier = set.get(_Fields.IDENTIFIER)
+            collect(set[_Fields.IDENTIFIER])
+            collect(set[_Fields.ID])
+            collect(set[_Fields.PARENT][_Fields.ID])
+            for case in Switch(identifier):
+                if case(_Identifier.SORT):
+                    collect(set[_Fields.FIELD])
+                    collect(set[_Fields.ORDER])
+                    break
+                if case(_Identifier.GROUP):
+                    collect(set[_Fields.KEYS])
+                    break
+                if case(_Identifier.COGROUP):
+                    collect(set[_Fields.OTHER][_Fields.ID])
+                    collect(set[_Fields.KEY1])
+                    collect(set[_Fields.KEY2])
+                    collectBytes(_dump(set[_Fields.OPERATOR]))
+                    collect(set[_Fields.META])
+                    collect(set[_Fields.TYPES])
+                    collect(set[_Fields.NAME])
+                    break
+                if case(_Identifier.CROSS, _Identifier.CROSSH, _Identifier.CROSST):
+                    collect(set[_Fields.OTHER][_Fields.ID])
+                    if set[_Fields.OPERATOR] is None:
+                        collect(set[_Fields.OPERATOR])
+                    else:
+                        collectBytes(_dump(set[_Fields.OPERATOR]))
+                    collect(set[_Fields.META])
+                    collect(set[_Fields.TYPES])
+                    collect(len(set[_Fields.PROJECTIONS]))
+                    for p in set[_Fields.PROJECTIONS]:
+                        collect(p[0])
+                        collect(p[1])
+                    collect(set[_Fields.NAME])
+                    break
+                if case(_Identifier.REDUCE, _Identifier.GROUPREDUCE):
+                    collectBytes(_dump(set[_Fields.OPERATOR]))
+                    collectBytes(_dump(set[_Fields.COMBINEOP]))
+                    collect(set[_Fields.META])
+                    collect(set[_Fields.TYPES])
+                    collect(set[_Fields.COMBINE])
+                    collect(set[_Fields.NAME])
+                    break
+                if case(_Identifier.JOIN, _Identifier.JOINH, _Identifier.JOINT):
+                    collect(set[_Fields.KEY1])
+                    collect(set[_Fields.KEY2])
+                    collect(set[_Fields.OTHER][_Fields.ID])
+                    if set[_Fields.OPERATOR] is None:
+                        collect(set[_Fields.OPERATOR])
+                    else:
+                        collectBytes(_dump(set[_Fields.OPERATOR]))
+                    collect(set[_Fields.META])
+                    collect(set[_Fields.TYPES])
+                    collect(len(set[_Fields.PROJECTIONS]))
+                    for p in set[_Fields.PROJECTIONS]:
+                        collect(p[0])
+                        collect(p[1])
+                    collect(set[_Fields.NAME])
+                    break
+                if case(_Identifier.MAP, _Identifier.MAPPARTITION, _Identifier.FLATMAP, _Identifier.FILTER):
+                    collectBytes(_dump(set[_Fields.OPERATOR]))
+                    collect(set[_Fields.META])
+                    collect(set[_Fields.TYPES])
+                    collect(set[_Fields.NAME])
+                    break
+                if case(_Identifier.UNION):
+                    collect(set[_Fields.OTHER][_Fields.ID])
+                    break
+                if case(_Identifier.PROJECTION):
+                    collect(set[_Fields.KEYS])
+                    break
+                if case():
+                    raise KeyError("Environment._send_child_sets(): Invalid operation identifier: " + str(identifier))
+
+    def _send_sinks(self):
+        for sink in self._sinks:
+            identifier = sink[_Fields.IDENTIFIER]
+            collect = self._collector.collect
+            collect(identifier)
+            collect(sink[_Fields.PARENT][_Fields.ID])
+            for case in Switch(identifier):
+                if case(_Identifier.SINK_CSV):
+                    collect(sink[_Fields.PATH])
+                    collect(sink[_Fields.DELIMITER_FIELD])
+                    collect(sink[_Fields.DELIMITER_LINE])
+                    collect(sink[_Fields.WRITE_MODE])
+                    break;
+                if case(_Identifier.SINK_TEXT):
+                    collect(sink[_Fields.PATH])
+                    collect(sink[_Fields.WRITE_MODE])
+                    break
+                if case(_Identifier.SINK_PRINT):
+                    collect(sink[_Fields.TO_ERR])
+                    break
+
+    def _send_broadcast(self):
+        collect = self._collector.collect
+        for entry in self._broadcast:
+            collect(_Identifier.BROADCAST)
+            collect(entry[_Fields.PARENT][_Fields.ID])
+            collect(entry[_Fields.OTHER][_Fields.ID])
+            collect(entry[_Fields.NAME])
\ No newline at end of file

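To tie the plan classes together, a minimal end-to-end sketch of how Environment.py above is expected to drive a job, in the spirit of a word count. It is illustrative only and not part of this commit; it assumes Constants defines INT alongside STRING, and that FlatMapFunction.flat_map and GroupReduceFunction.reduce take (value, collector) and (iterator, collector) respectively. Everything else uses only methods added in this change (get_environment, from_elements, flat_map, group_by, reduce_group, output, set_degree_of_parallelism, execute).

    from flink.plan.Environment import get_environment
    from flink.plan.Constants import INT, STRING
    from flink.functions.FlatMapFunction import FlatMapFunction
    from flink.functions.GroupReduceFunction import GroupReduceFunction


    class Tokenizer(FlatMapFunction):
        def flat_map(self, value, collector):
            # emit a (word, 1) pair for every whitespace-separated token
            for word in value.lower().split():
                collector.collect((word, 1))


    class Summer(GroupReduceFunction):
        def reduce(self, iterator, collector):
            # sum the counts of a single word group
            pairs = list(iterator)
            collector.collect((pairs[0][0], sum(count for _, count in pairs)))


    env = get_environment()
    text = env.from_elements("to be or not to be")

    counts = text \
        .flat_map(Tokenizer(), (STRING, INT)) \
        .group_by(0) \
        .reduce_group(Summer(), (STRING, INT), combinable=False)
    counts.output()

    env.set_degree_of_parallelism(1)
    env.execute(local=True)
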
http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/plan/__init__.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/plan/__init__.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/plan/__init__.py
new file mode 100644
index 0000000..d35bf39
--- /dev/null
+++ 
b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/plan/__init__.py
@@ -0,0 +1,17 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/utilities/__init__.py
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/utilities/__init__.py
 
b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/utilities/__init__.py
new file mode 100644
index 0000000..faae78a
--- /dev/null
+++ 
b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/utilities/__init__.py
@@ -0,0 +1,36 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+
+class Switch(object):
+    def __init__(self, value):
+        self.value = value
+        self.fall = False
+
+    def __iter__(self):
+        yield self.match
+        return  # generator ends after the single yield; raising StopIteration here would become a RuntimeError under PEP 479 (Python 3.7+)
+
+    def match(self, *args):
+        if self.fall or not args:
+            return True
+        elif self.value in args:
+            self.fall = True
+            return True
+        else:
+            return False
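The Switch helper above emulates a switch/case block: iterating over Switch(value) yields a single match callable, a call whose arguments contain the value returns True (and enables fall-through), and an argument-less call serves as the default branch. This is the pattern the plan-serialization code relies on; a minimal standalone sketch (the operator names here are illustrative strings, not real plan identifiers):

    op = "join"
    for case in Switch(op):
        if case("cross", "join"):
            print("binary operator")
            break
        if case("map"):
            print("unary operator")
            break
        if case():  # default branch, reached when nothing matched earlier
            raise KeyError("unknown operator: " + op)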

http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/setup.py
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/setup.py
 
b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/setup.py
new file mode 100644
index 0000000..82447e9
--- /dev/null
+++ 
b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/setup.py
@@ -0,0 +1,33 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from distutils.core import setup
+
+setup(
+    name='flink',
+    version='1.0',
+    packages=['flink',
+              'flink.connection',
+              'flink.functions',
+              'flink.plan',
+              'flink.utilities'],
+    url='http://flink.apache.org',
+    license='Licensed under the Apache License, Version 2.0',
+    author='',
+    author_email='',
+    description='Flink Python API'
+)
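As a quick sanity check after installing the module with distutils, the declared packages can be imported programmatically. This is a hypothetical smoke test, not part of this commit; the package names are taken from the packages list in setup.py above:

    # Hypothetical smoke test: verify the declared packages are importable after installation.
    import importlib

    for pkg in ("flink", "flink.connection", "flink.functions", "flink.plan", "flink.utilities"):
        importlib.import_module(pkg)  # raises ImportError if the package is missing
    print("flink Python API packages import cleanly")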

http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/test/java/org/apache/flink/languagebinding/api/java/python/PythonPlanBinderTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-language-binding/flink-python/src/test/java/org/apache/flink/languagebinding/api/java/python/PythonPlanBinderTest.java
 
b/flink-staging/flink-language-binding/flink-python/src/test/java/org/apache/flink/languagebinding/api/java/python/PythonPlanBinderTest.java
new file mode 100644
index 0000000..4295e93
--- /dev/null
+++ 
b/flink-staging/flink-language-binding/flink-python/src/test/java/org/apache/flink/languagebinding/api/java/python/PythonPlanBinderTest.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.languagebinding.api.java.python;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import static org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.ARGUMENT_PYTHON_2;
+import static org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.ARGUMENT_PYTHON_3;
+import org.junit.Test;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PythonPlanBinderTest {
+       private static final Logger LOG = LoggerFactory.getLogger(PythonPlanBinder.class);
+       
+       private static boolean python2Supported = true;
+       private static boolean python3Supported = true;
+       private static List<String> TEST_FILES;
+       
+       @BeforeClass
+       public static void setup() throws Exception {
+               findTestFiles();
+               checkPythonSupport();
+       }
+       
+       private static void findTestFiles() throws Exception {
+               TEST_FILES = new ArrayList<>();
+               FileSystem fs = FileSystem.getLocalFileSystem();
+               FileStatus[] status = fs.listStatus(
+                               new Path(fs.getWorkingDirectory().toString()
+                                               + "/src/test/python/org/apache/flink/languagebinding/api/python/flink/test"));
+               for (FileStatus f : status) {
+                       String file = f.getPath().toString();
+                       if (file.endsWith(".py")) {
+                               TEST_FILES.add(file);
+                       }
+               }
+       }
+       
+       private static void checkPythonSupport() {      
+               try {
+                       Runtime.getRuntime().exec("python");
+               } catch (IOException ex) {
+                       python2Supported = false;
+                       LOG.info("No Python 2 runtime detected.");
+               }
+               try {
+                       Runtime.getRuntime().exec("python3");
+               } catch (IOException ex) {
+                       python3Supported = false;
+                       LOG.info("No Python 3 runtime detected.");
+               }
+       }
+       
+       @Test
+       public void testPython2() throws Exception {
+               if (python2Supported) {
+                       for (String file : TEST_FILES) {
+                               LOG.info("testing " + file);
+                               PythonPlanBinder.main(new String[]{ARGUMENT_PYTHON_2, file});
+                       }
+               }
+       }
+       
+       @Test
+       public void testPython3() throws Exception {
+               if (python3Supported) {
+                       for (String file : TEST_FILES) {
+                               LOG.info("testing " + file);
+                               PythonPlanBinder.main(new String[]{ARGUMENT_PYTHON_3, file});
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/data_csv
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/data_csv
 
b/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/data_csv
new file mode 100644
index 0000000..a103a5c
--- /dev/null
+++ 
b/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/data_csv
@@ -0,0 +1,2 @@
+4,2,hello
+3,2,world

http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/data_text
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/data_text
 
b/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/data_text
new file mode 100644
index 0000000..e7be084
--- /dev/null
+++ 
b/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/data_text
@@ -0,0 +1,2 @@
+sup guys
+i am the world

http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_csv.py
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_csv.py
 
b/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_csv.py
new file mode 100644
index 0000000..c48179b
--- /dev/null
+++ 
b/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_csv.py
@@ -0,0 +1,30 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from flink.plan.Environment import get_environment
+from flink.plan.Constants import INT, STRING
+
+if __name__ == "__main__":
+    env = get_environment()
+
+    d1 = env.read_csv("src/test/python/org/apache/flink/languagebinding/api/python/flink/test/data_csv", (INT, INT, STRING))
+
+    d1.write_csv("/tmp/flink/result")
+
+    env.set_degree_of_parallelism(1)
+
+    env.execute(local=True)

http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_main.py
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_main.py
 
b/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_main.py
new file mode 100644
index 0000000..29113f3
--- /dev/null
+++ 
b/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_main.py
@@ -0,0 +1,264 @@
+# ###############################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+from flink.plan.Environment import get_environment
+from flink.functions.MapFunction import MapFunction
+from flink.functions.FlatMapFunction import FlatMapFunction
+from flink.functions.FilterFunction import FilterFunction
+from flink.functions.MapPartitionFunction import MapPartitionFunction
+from flink.functions.ReduceFunction import ReduceFunction
+from flink.functions.CrossFunction import CrossFunction
+from flink.functions.JoinFunction import JoinFunction
+from flink.functions.GroupReduceFunction import GroupReduceFunction
+from flink.functions.CoGroupFunction import CoGroupFunction
+from flink.plan.Constants import INT, STRING, FLOAT, BOOL, Order
+
+
+class Mapper(MapFunction):
+    def map(self, value):
+        return value * value
+
+
+class Filter(FilterFunction):
+    def __init__(self, limit):
+        super(Filter, self).__init__()
+        self.limit = limit
+
+    def filter(self, value):
+        return value > self.limit
+
+
+class FlatMap(FlatMapFunction):
+    def flat_map(self, value, collector):
+        collector.collect(value)
+        collector.collect(value * 2)
+
+
+class MapPartition(MapPartitionFunction):
+    def map_partition(self, iterator, collector):
+        for value in iterator:
+            collector.collect(value * 2)
+
+
+class Reduce(ReduceFunction):
+    def reduce(self, value1, value2):
+        return value1 + value2
+
+
+class Reduce2(ReduceFunction):
+    def reduce(self, value1, value2):
+        return (value1[0] + value2[0], value1[1] + value2[1], value1[2], value1[3] or value2[3])
+
+
+class Cross(CrossFunction):
+    def cross(self, value1, value2):
+        return (value1, value2[3])
+
+
+class MapperBcv(MapFunction):
+    def map(self, value):
+        factor = self.context.get_broadcast_variable("test")[0][0]
+        return value * factor
+
+
+class Join(JoinFunction):
+    def join(self, value1, value2):
+        if value1[3]:
+            return value2[0] + str(value1[0])
+        else:
+            return value2[0] + str(value1[1])
+
+
+class GroupReduce(GroupReduceFunction):
+    def reduce(self, iterator, collector):
+        if iterator.has_next():
+            i, f, s, b = iterator.next()
+            for value in iterator:
+                i += value[0]
+                f += value[1]
+                b |= value[3]
+            collector.collect((i, f, s, b))
+
+
+class GroupReduce2(GroupReduceFunction):
+    def reduce(self, iterator, collector):
+        for value in iterator:
+            collector.collect(value)
+
+
+class GroupReduce3(GroupReduceFunction):
+    def reduce(self, iterator, collector):
+        collector.collect(iterator.next())
+
+    def combine(self, iterator, collector):
+        if iterator.has_next():
+            v1 = iterator.next()
+        if iterator.has_next():
+            v2 = iterator.next()
+        if v1[0] < v2[0]:
+            collector.collect(v1)
+        else:
+            collector.collect(v2)
+
+
+class CoGroup(CoGroupFunction):
+    def co_group(self, iterator1, iterator2, collector):
+        while iterator1.has_next() and iterator2.has_next():
+            collector.collect((iterator1.next(), iterator2.next()))
+
+
+class Id(MapFunction):
+    def map(self, value):
+        return value
+
+
+class Verify(MapPartitionFunction):
+    def __init__(self, expected, name):
+        super(Verify, self).__init__()
+        self.expected = expected
+        self.name = name
+
+    def map_partition(self, iterator, collector):
+        index = 0
+        for value in iterator:
+            if value != self.expected[index]:
+                print(self.name + " Test failed. Expected: " + str(self.expected[index]) + " Actual: " + str(value))
+                raise Exception(self.name + " failed!")
+            index += 1
+        collector.collect(self.name + " successful!")
+
+
+class Verify2(MapPartitionFunction):
+    def __init__(self, expected, name):
+        super(Verify2, self).__init__()
+        self.expected = expected
+        self.name = name
+
+    def map_partition(self, iterator, collector):
+        for value in iterator:
+            if value in self.expected:
+                try:
+                    self.expected.remove(value)
+                except Exception:
+                    raise Exception(self.name + " failed!")
+        collector.collect(self.name + " successful!")
+
+
+if __name__ == "__main__":
+    env = get_environment()
+
+    d1 = env.from_elements(1, 6, 12)
+
+    d2 = env.from_elements((1, 0.5, "hello", True), (2, 0.4, "world", False))
+
+    d3 = env.from_elements(("hello",), ("world",))
+
+    d4 = env.from_elements((1, 0.5, "hello", True), (1, 0.4, "hello", False), (1, 0.5, "hello", True), (2, 0.4, "world", False))
+
+    d5 = env.from_elements((4.4, 4.3, 1), (4.3, 4.4, 1), (4.2, 4.1, 3), (4.1, 4.1, 3))
+
+    d1 \
+        .map((lambda x: x * x), INT).map(Mapper(), INT) \
+        .map_partition(Verify([1, 1296, 20736], "Map"), STRING).output()
+
+    d1 \
+        .map(Mapper(), INT).map((lambda x: x * x), INT) \
+        .map_partition(Verify([1, 1296, 20736], "Chained Lambda"), STRING).output()
+
+    d1 \
+        .filter(Filter(5)).filter(Filter(8)) \
+        .map_partition(Verify([12], "Filter"), STRING).output()
+
+    d1 \
+        .flat_map(FlatMap(), INT).flat_map(FlatMap(), INT) \
+        .map_partition(Verify([1, 2, 2, 4, 6, 12, 12, 24, 12, 24, 24, 48], "FlatMap"), STRING).output()
+
+    d1 \
+        .map_partition(MapPartition(), INT) \
+        .map_partition(Verify([2, 12, 24], "MapPartition"), STRING).output()
+
+    d1 \
+        .reduce(Reduce()) \
+        .map_partition(Verify([19], "AllReduce"), STRING).output()
+
+    d4 \
+        .group_by(2).reduce(Reduce2()) \
+        .map_partition(Verify([(3, 1.4, "hello", True), (2, 0.4, "world", False)], "CombineReduce"), STRING).output()
+
+    d4 \
+        .map(Id(), (INT, FLOAT, STRING, BOOL)).group_by(2).reduce(Reduce2()) \
+        .map_partition(Verify([(3, 1.4, "hello", True), (2, 0.4, "world", False)], "ChainedReduce"), STRING).output()
+
+    d1 \
+        .map(MapperBcv(), INT).with_broadcast_set("test", d2) \
+        .map_partition(Verify([1, 6, 12], "Broadcast"), STRING).output()
+
+    d1 \
+        .cross(d2).using(Cross(), (INT, BOOL)) \
+        .map_partition(Verify([(1, True), (1, False), (6, True), (6, False), (12, True), (12, False)], "Cross"), STRING).output()
+
+    d1 \
+        .cross(d3) \
+        .map_partition(Verify([(1, ("hello",)), (1, ("world",)), (6, ("hello",)), (6, ("world",)), (12, ("hello",)), (12, ("world",))], "Default Cross"), STRING).output()
+
+    d2 \
+        .cross(d3).project_second(0).project_first(0, 1) \
+        .map_partition(Verify([("hello", 1, 0.5), ("world", 1, 0.5), ("hello", 2, 0.4), ("world", 2, 0.4)], "Project Cross"), STRING).output()
+
+    d2 \
+        .join(d3).where(2).equal_to(0).using(Join(), STRING) \
+        .map_partition(Verify(["hello1", "world0.4"], "Join"), STRING).output()
+
+    d2 \
+        .join(d3).where(2).equal_to(0).project_first(0, 3).project_second(0) \
+        .map_partition(Verify([(1, True, "hello"), (2, False, "world")], "Project Join"), STRING).output()
+
+    d2 \
+        .join(d3).where(2).equal_to(0) \
+        .map_partition(Verify([((1, 0.5, "hello", True), ("hello",)), ((2, 0.4, "world", False), ("world",))], "Default Join"), STRING).output()
+
+    d2 \
+        .project(0, 1).project(2) \
+        .map_partition(Verify([(1, 0.5, "hello"), (2, 0.4, "world")], "Project"), STRING).output()
+
+    d2 \
+        .union(d4) \
+        .map_partition(Verify2([(1, 0.5, "hello", True), (2, 0.4, "world", False), (1, 0.5, "hello", True), (1, 0.4, "hello", False), (1, 0.5, "hello", True), (2, 0.4, "world", False)], "Union"), STRING).output()
+
+    d4 \
+        .group_by(2).reduce_group(GroupReduce(), (INT, FLOAT, STRING, BOOL), combinable=False) \
+        .map_partition(Verify([(3, 1.4, "hello", True), (2, 0.4, "world", False)], "AllGroupReduce"), STRING).output()
+
+    d4 \
+        .map(Id(), (INT, FLOAT, STRING, BOOL)).group_by(2).reduce_group(GroupReduce(), (INT, FLOAT, STRING, BOOL), combinable=True) \
+        .map_partition(Verify([(3, 1.4, "hello", True), (2, 0.4, "world", False)], "ChainedGroupReduce"), STRING).output()
+
+    d4 \
+        .group_by(2).reduce_group(GroupReduce(), (INT, FLOAT, STRING, BOOL), combinable=True) \
+        .map_partition(Verify([(3, 1.4, "hello", True), (2, 0.4, "world", False)], "CombineGroupReduce"), STRING).output()
+
+    d5 \
+        .group_by(2).sort_group(0, Order.DESCENDING).sort_group(1, Order.ASCENDING).reduce_group(GroupReduce3(), (FLOAT, FLOAT, INT), combinable=True) \
+        .map_partition(Verify([(4.3, 4.4, 1), (4.1, 4.1, 3)], "ChainedSortedGroupReduce"), STRING).output()
+
+    d4 \
+        .co_group(d5).where(0).equal_to(2).using(CoGroup(), ((INT, FLOAT, STRING, BOOL), (FLOAT, FLOAT, INT))) \
+        .map_partition(Verify([((1, 0.5, "hello", True), (4.4, 4.3, 1)), ((1, 0.4, "hello", False), (4.3, 4.4, 1))], "CoGroup"), STRING).output()
+
+    env.set_degree_of_parallelism(1)
+
+    env.execute(local=True)

http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_text.py
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_text.py
 
b/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_text.py
new file mode 100644
index 0000000..83b78fa
--- /dev/null
+++ 
b/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_text.py
@@ -0,0 +1,30 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from flink.plan.Environment import get_environment
+from flink.plan.Constants import WriteMode
+
+if __name__ == "__main__":
+    env = get_environment()
+
+    d1 = env.read_text("src/test/python/org/apache/flink/languagebinding/api/python/flink/test/data_text")
+
+    d1.write_text("/tmp/flink/result", WriteMode.OVERWRITE)
+
+    env.set_degree_of_parallelism(1)
+
+    env.execute(local=True)

http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_type_deduction.py
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_type_deduction.py
 
b/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_type_deduction.py
new file mode 100644
index 0000000..a3b8d07
--- /dev/null
+++ 
b/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_type_deduction.py
@@ -0,0 +1,63 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from flink.plan.Environment import get_environment
+from flink.plan.Constants import _Fields
+from flink.plan.Constants import INT, STRING, BOOL, FLOAT
+import sys
+
+if __name__ == "__main__":
+    env = get_environment()
+
+    d1 = env.from_elements(("hello", 4, 3.2, True))
+
+    d2 = env.from_elements("world")
+
+    direct_from_source = d1.filter(lambda x:True)
+
+    if direct_from_source._info[_Fields.TYPES] != ("hello", 4, 3.2, True):
+        sys.exit("Error deducting type directly from source.")
+
+    from_common_udf = d1.map(lambda x: x[3], BOOL).filter(lambda x:True)
+
+    if from_common_udf._info[_Fields.TYPES] != BOOL:
+        sys.exit("Error deducting type from common udf.")
+
+    through_projection = d1.project(3, 2).filter(lambda x:True)
+
+    if through_projection._info[_Fields.TYPES] != (True, 3.2):
+        sys.exit("Error deducting type through projection.")
+
+    through_default_op = d1.cross(d2).filter(lambda x:True)
+
+    if through_default_op._info[_Fields.TYPES] != (("hello", 4, 3.2, True), "world"):
+        sys.exit("Error deducting type through default J/C." + str(through_default_op._info[_Fields.TYPES]))
+
+    through_prj_op = d1.cross(d2).project_first(1, 0).project_second().project_first(3, 2).filter(lambda x:True)
+
+    if through_prj_op._info[_Fields.TYPES] != (4, "hello", "world", True, 3.2):
+        sys.exit("Error deducting type through projection J/C. " + str(through_prj_op._info[_Fields.TYPES]))
+
+
+    env = get_environment()
+
+    msg = env.from_elements("Type deduction test successful.")
+
+    msg.output()
+
+    env.execute()
+

http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_types.py
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_types.py
 
b/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_types.py
new file mode 100644
index 0000000..f5f3ee4
--- /dev/null
+++ 
b/flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/test_types.py
@@ -0,0 +1,70 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from flink.plan.Environment import get_environment
+from flink.functions.MapFunction import MapFunction
+from flink.functions.MapPartitionFunction import MapPartitionFunction
+from flink.plan.Constants import BOOL, INT, FLOAT, STRING, BYTES
+
+
+class Verify(MapPartitionFunction):
+    def __init__(self, expected, name):
+        super(Verify, self).__init__()
+        self.expected = expected
+        self.name = name
+
+    def map_partition(self, iterator, collector):
+        index = 0
+        for value in iterator:
+            if value != self.expected[index]:
+                print(self.name + " Test failed. Expected: " + str(self.expected[index]) + " Actual: " + str(value))
+                raise Exception(self.name + " failed!")
+            index += 1
+        collector.collect(self.name + " successful!")
+
+
+class Id(MapFunction):
+    def map(self, value):
+        return value
+
+
+if __name__ == "__main__":
+    env = get_environment()
+
+    d1 = env.from_elements(bytearray(b"hello"), bytearray(b"world"))
+
+    d1.map(Id(), BYTES).map_partition(Verify([bytearray(b"hello"), bytearray(b"world")], "Byte"), STRING).output()
+
+    d2 = env.from_elements(1,2,3,4,5)
+
+    d2.map(Id(), INT).map_partition(Verify([1,2,3,4,5], "Int"), STRING).output()
+
+    d3 = env.from_elements(True, True, False)
+
+    d3.map(Id(), BOOL).map_partition(Verify([True, True, False], "Bool"), STRING).output()
+
+    d4 = env.from_elements(1.4, 1.7, 12312.23)
+
+    d4.map(Id(), FLOAT).map_partition(Verify([1.4, 1.7, 12312.23], "Float"), STRING).output()
+
+    d5 = env.from_elements("hello", "world")
+
+    d5.map(Id(), STRING).map_partition(Verify(["hello", "world"], "String"), STRING).output()
+
+    env.set_degree_of_parallelism(1)
+
+    env.execute(local=True)
\ No newline at end of file
