http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py
new file mode 100644
index 0000000..2116d1f
--- /dev/null
+++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py
@@ -0,0 +1,264 @@
+# ###############################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+from flink.plan.Environment import get_environment
+from flink.functions.MapFunction import MapFunction
+from flink.functions.FlatMapFunction import FlatMapFunction
+from flink.functions.FilterFunction import FilterFunction
+from flink.functions.MapPartitionFunction import MapPartitionFunction
+from flink.functions.ReduceFunction import ReduceFunction
+from flink.functions.CrossFunction import CrossFunction
+from flink.functions.JoinFunction import JoinFunction
+from flink.functions.GroupReduceFunction import GroupReduceFunction
+from flink.functions.CoGroupFunction import CoGroupFunction
+from flink.plan.Constants import INT, STRING, FLOAT, BOOL, Order
+
+
+class Mapper(MapFunction):
+    def map(self, value):
+        return value * value
+
+
+class Filter(FilterFunction):
+    def __init__(self, limit):
+        super(Filter, self).__init__()
+        self.limit = limit
+
+    def filter(self, value):
+        return value > self.limit
+
+
+class FlatMap(FlatMapFunction):
+    def flat_map(self, value, collector):
+        collector.collect(value)
+        collector.collect(value * 2)
+
+
+class MapPartition(MapPartitionFunction):
+    def map_partition(self, iterator, collector):
+        for value in iterator:
+            collector.collect(value * 2)
+
+
+class Reduce(ReduceFunction):
+    def reduce(self, value1, value2):
+        return value1 + value2
+
+
+class Reduce2(ReduceFunction):
+    def reduce(self, value1, value2):
+        return (value1[0] + value2[0], value1[1] + value2[1], value1[2], value1[3] or value2[3])
+
+
+class Cross(CrossFunction):
+    def cross(self, value1, value2):
+        return (value1, value2[3])
+
+
+class MapperBcv(MapFunction):
+    def map(self, value):
+        factor = self.context.get_broadcast_variable("test")[0][0]
+        return value * factor
+
+
+class Join(JoinFunction):
+    def join(self, value1, value2):
+        if value1[3]:
+            return value2[0] + str(value1[0])
+        else:
+            return value2[0] + str(value1[1])
+
+
+class GroupReduce(GroupReduceFunction):
+    def reduce(self, iterator, collector):
+        if iterator.has_next():
+            i, f, s, b = iterator.next()
+            for value in iterator:
+                i += value[0]
+                f += value[1]
+                b |= value[3]
+            collector.collect((i, f, s, b))
+
+
+class GroupReduce2(GroupReduceFunction):
+    def reduce(self, iterator, collector):
+        for value in iterator:
+            collector.collect(value)
+
+
+class GroupReduce3(GroupReduceFunction):
+    def reduce(self, iterator, collector):
+        collector.collect(iterator.next())
+
+    def combine(self, iterator, collector):
+        if iterator.has_next():
+            v1 = iterator.next()
+        if iterator.has_next():
+            v2 = iterator.next()
+        if v1[0] < v2[0]:
+            collector.collect(v1)
+        else:
+            collector.collect(v2)
+
+
+class CoGroup(CoGroupFunction):
+    def co_group(self, iterator1, iterator2, collector):
+        while iterator1.has_next() and iterator2.has_next():
+            collector.collect((iterator1.next(), iterator2.next()))
+
+
+class Id(MapFunction):
+    def map(self, value):
+        return value
+
+
+class Verify(MapPartitionFunction):
+    def __init__(self, expected, name):
+        super(Verify, self).__init__()
+        self.expected = expected
+        self.name = name
+
+    def map_partition(self, iterator, collector):
+        index = 0
+        for value in iterator:
+            if value != self.expected[index]:
+                print(self.name + " Test failed. Expected: " + str(self.expected[index]) + " Actual: " + str(value))
+                raise Exception(self.name + " failed!")
+            index += 1
+        collector.collect(self.name + " successful!")
+
+
+class Verify2(MapPartitionFunction):
+    def __init__(self, expected, name):
+        super(Verify2, self).__init__()
+        self.expected = expected
+        self.name = name
+
+    def map_partition(self, iterator, collector):
+        for value in iterator:
+            if value in self.expected:
+                try:
+                    self.expected.remove(value)
+                except Exception:
+                    raise Exception(self.name + " failed!")
+        collector.collect(self.name + " successful!")
+
+
+if __name__ == "__main__":
+    env = get_environment()
+
+    d1 = env.from_elements(1, 6, 12)
+
+    d2 = env.from_elements((1, 0.5, "hello", True), (2, 0.4, "world", False))
+
+    d3 = env.from_elements(("hello",), ("world",))
+
+    d4 = env.from_elements((1, 0.5, "hello", True), (1, 0.4, "hello", False), (1, 0.5, "hello", True), (2, 0.4, "world", False))
+
+    d5 = env.from_elements((4.4, 4.3, 1), (4.3, 4.4, 1), (4.2, 4.1, 3), (4.1, 4.1, 3))
+
+    d1 \
+        .map((lambda x: x * x), INT).map(Mapper(), INT) \
+        .map_partition(Verify([1, 1296, 20736], "Map"), STRING).output()
+
+    d1 \
+        .map(Mapper(), INT).map((lambda x: x * x), INT) \
+        .map_partition(Verify([1, 1296, 20736], "Chained Lambda"), STRING).output()
+
+    d1 \
+        .filter(Filter(5)).filter(Filter(8)) \
+        .map_partition(Verify([12], "Filter"), STRING).output()
+
+    d1 \
+        .flat_map(FlatMap(), INT).flat_map(FlatMap(), INT) \
+        .map_partition(Verify([1, 2, 2, 4, 6, 12, 12, 24, 12, 24, 24, 48], "FlatMap"), STRING).output()
+
+    d1 \
+        .map_partition(MapPartition(), INT) \
+        .map_partition(Verify([2, 12, 24], "MapPartition"), STRING).output()
+
+    d1 \
+        .reduce(Reduce()) \
+        .map_partition(Verify([19], "AllReduce"), STRING).output()
+
+    d4 \
+        .group_by(2).reduce(Reduce2()) \
+        .map_partition(Verify([(3, 1.4, "hello", True), (2, 0.4, "world", False)], "CombineReduce"), STRING).output()
+
+    d4 \
+        .map(Id(), (INT, FLOAT, STRING, BOOL)).group_by(2).reduce(Reduce2()) \
+        .map_partition(Verify([(3, 1.4, "hello", True), (2, 0.4, "world", False)], "ChainedReduce"), STRING).output()
+
+    d1 \
+        .map(MapperBcv(), INT).with_broadcast_set("test", d2) \
+        .map_partition(Verify([1, 6, 12], "Broadcast"), STRING).output()
+
+    d1 \
+        .cross(d2).using(Cross(), (INT, BOOL)) \
+        .map_partition(Verify([(1, True), (1, False), (6, True), (6, False), (12, True), (12, False)], "Cross"), STRING).output()
+
+    d1 \
+        .cross(d3) \
+        .map_partition(Verify([(1, ("hello",)), (1, ("world",)), (6, 
("hello",)), (6, ("world",)), (12, ("hello",)), (12, ("world",))], "Default 
Cross"), STRING).output()
+
+    d2 \
+        .cross(d3).project_second(0).project_first(0, 1) \
+        .map_partition(Verify([("hello", 1, 0.5), ("world", 1, 0.5), ("hello", 
2, 0.4), ("world", 2, 0.4)], "Project Cross"), STRING).output()
+
+    d2 \
+        .join(d3).where(2).equal_to(0).using(Join(), STRING) \
+        .map_partition(Verify(["hello1", "world0.4"], "Join"), STRING).output()
+
+    d2 \
+        .join(d3).where(2).equal_to(0).project_first(0, 3).project_second(0) \
+        .map_partition(Verify([(1, True, "hello"), (2, False, "world")], 
"Project Join"), STRING).output()
+
+    d2 \
+        .join(d3).where(2).equal_to(0) \
+        .map_partition(Verify([((1, 0.5, "hello", True), ("hello",)), ((2, 
0.4, "world", False), ("world",))], "Default Join"), STRING).output()
+
+    d2 \
+        .project(0, 1, 2) \
+        .map_partition(Verify([(1, 0.5, "hello"), (2, 0.4, "world")], 
"Project"), STRING).output()
+
+    d2 \
+        .union(d4) \
+        .map_partition(Verify2([(1, 0.5, "hello", True), (2, 0.4, "world", False), (1, 0.5, "hello", True), (1, 0.4, "hello", False), (1, 0.5, "hello", True), (2, 0.4, "world", False)], "Union"), STRING).output()
+
+    d4 \
+        .group_by(2).reduce_group(GroupReduce(), (INT, FLOAT, STRING, BOOL), combinable=False) \
+        .map_partition(Verify([(3, 1.4, "hello", True), (2, 0.4, "world", False)], "AllGroupReduce"), STRING).output()
+
+    d4 \
+        .map(Id(), (INT, FLOAT, STRING, BOOL)).group_by(2).reduce_group(GroupReduce(), (INT, FLOAT, STRING, BOOL), combinable=True) \
+        .map_partition(Verify([(3, 1.4, "hello", True), (2, 0.4, "world", False)], "ChainedGroupReduce"), STRING).output()
+
+    d4 \
+        .group_by(2).reduce_group(GroupReduce(), (INT, FLOAT, STRING, BOOL), combinable=True) \
+        .map_partition(Verify([(3, 1.4, "hello", True), (2, 0.4, "world", False)], "CombineGroupReduce"), STRING).output()
+
+    d5 \
+        .group_by(2).sort_group(0, Order.DESCENDING).sort_group(1, Order.ASCENDING).reduce_group(GroupReduce3(), (FLOAT, FLOAT, INT), combinable=True) \
+        .map_partition(Verify([(4.3, 4.4, 1), (4.1, 4.1, 3)], "ChainedSortedGroupReduce"), STRING).output()
+
+    d4 \
+        .co_group(d5).where(0).equal_to(2).using(CoGroup(), ((INT, FLOAT, STRING, BOOL), (FLOAT, FLOAT, INT))) \
+        .map_partition(Verify([((1, 0.5, "hello", True), (4.4, 4.3, 1)), ((1, 0.4, "hello", False), (4.3, 4.4, 1))], "CoGroup"), STRING).output()
+
+    env.set_degree_of_parallelism(1)
+
+    env.execute(local=True)
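
For context, every plan file added in this commit follows the same skeleton: build the plan against the environment, then submit it. A minimal sketch using only calls that appear in the test above (the data and the doubling map are illustrative, not part of the commit):

    from flink.plan.Environment import get_environment
    from flink.plan.Constants import INT

    if __name__ == "__main__":
        env = get_environment()
        data = env.from_elements(1, 6, 12)
        # UDF operators take the result type as an extra argument (INT here),
        # since it cannot be inferred from a Python lambda.
        data.map(lambda x: x * 2, INT).filter(lambda x: x > 5).output()
        env.set_degree_of_parallelism(1)
        env.execute(local=True)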

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_text.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_text.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_text.py
new file mode 100644
index 0000000..1f90587
--- /dev/null
+++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_text.py
@@ -0,0 +1,30 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from flink.plan.Environment import get_environment
+from flink.plan.Constants import WriteMode
+
+if __name__ == "__main__":
+    env = get_environment()
+
+    d1 = env.read_text("src/test/python/org/apache/flink/python/api/data_text")
+
+    d1.write_text("/tmp/flink/result", WriteMode.OVERWRITE)
+
+    env.set_degree_of_parallelism(1)
+
+    env.execute(local=True)
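
The WriteMode argument above controls what happens when the sink path already exists. A hedged sketch of the alternative (that a NO_OVERWRITE constant exists and is the stricter option is an assumption, inferred from the Java decoder removed later in this diff, which maps everything but 1 to WriteMode.NO_OVERWRITE):

    from flink.plan.Constants import WriteMode

    # Assumption: NO_OVERWRITE makes the job fail if the result path already
    # exists, instead of replacing it as OVERWRITE does above.
    d1.write_text("/tmp/flink/result", WriteMode.NO_OVERWRITE)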

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_type_deduction.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_type_deduction.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_type_deduction.py
new file mode 100644
index 0000000..a3b8d07
--- /dev/null
+++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_type_deduction.py
@@ -0,0 +1,63 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from flink.plan.Environment import get_environment
+from flink.plan.Constants import _Fields
+from flink.plan.Constants import INT, STRING, BOOL, FLOAT
+import sys
+
+if __name__ == "__main__":
+    env = get_environment()
+
+    d1 = env.from_elements(("hello", 4, 3.2, True))
+
+    d2 = env.from_elements("world")
+
+    direct_from_source = d1.filter(lambda x:True)
+
+    if direct_from_source._info[_Fields.TYPES] != ("hello", 4, 3.2, True):
+        sys.exit("Error deducting type directly from source.")
+
+    from_common_udf = d1.map(lambda x: x[3], BOOL).filter(lambda x:True)
+
+    if from_common_udf._info[_Fields.TYPES] != BOOL:
+        sys.exit("Error deducting type from common udf.")
+
+    through_projection = d1.project(3, 2).filter(lambda x:True)
+
+    if through_projection._info[_Fields.TYPES] != (True, 3.2):
+        sys.exit("Error deducting type through projection.")
+
+    through_default_op = d1.cross(d2).filter(lambda x:True)
+
+    if through_default_op._info[_Fields.TYPES] != (("hello", 4, 3.2, True), "world"):
+        sys.exit("Error deducting type through default J/C." + str(through_default_op._info[_Fields.TYPES]))
+
+    through_prj_op = d1.cross(d2).project_first(1, 0).project_second().project_first(3, 2).filter(lambda x:True)
+
+    if through_prj_op._info[_Fields.TYPES] != (4, "hello", "world", True, 3.2):
+        sys.exit("Error deducting type through projection J/C. 
"+str(through_prj_op._info[_Fields.TYPES]))
+
+
+    env = get_environment()
+
+    msg = env.from_elements("Type deduction test successful.")
+
+    msg.output()
+
+    env.execute()
+
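
The checks above all rest on one rule: operators that cannot change the element type (filter, projections) pass type information through unchanged, while UDF operators report whatever type their caller declared. A small sketch restating that rule with made-up data (_info and _Fields are the same internal names the test itself uses):

    from flink.plan.Environment import get_environment
    from flink.plan.Constants import _Fields, BOOL

    env = get_environment()
    d = env.from_elements(("a", 1, True))

    # filter() keeps the element type, so the deduced type is the example tuple.
    assert d.filter(lambda x: x[2])._info[_Fields.TYPES] == ("a", 1, True)

    # A UDF operator carries the type its caller declared (BOOL here).
    assert d.map(lambda x: x[2], BOOL)._info[_Fields.TYPES] == BOOL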

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_types.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_types.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_types.py
new file mode 100644
index 0000000..f5f3ee4
--- /dev/null
+++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_types.py
@@ -0,0 +1,70 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from flink.plan.Environment import get_environment
+from flink.functions.MapFunction import MapFunction
+from flink.functions.MapPartitionFunction import MapPartitionFunction
+from flink.plan.Constants import BOOL, INT, FLOAT, STRING, BYTES
+
+
+class Verify(MapPartitionFunction):
+    def __init__(self, expected, name):
+        super(Verify, self).__init__()
+        self.expected = expected
+        self.name = name
+
+    def map_partition(self, iterator, collector):
+        index = 0
+        for value in iterator:
+            if value != self.expected[index]:
+                print(self.name + " Test failed. Expected: " + str(self.expected[index]) + " Actual: " + str(value))
+                raise Exception(self.name + " failed!")
+            index += 1
+        collector.collect(self.name + " successful!")
+
+
+class Id(MapFunction):
+    def map(self, value):
+        return value
+
+
+if __name__ == "__main__":
+    env = get_environment()
+
+    d1 = env.from_elements(bytearray(b"hello"), bytearray(b"world"))
+
+    d1.map(Id(), BYTES).map_partition(Verify([bytearray(b"hello"), bytearray(b"world")], "Byte"), STRING).output()
+
+    d2 = env.from_elements(1,2,3,4,5)
+
+    d2.map(Id(), INT).map_partition(Verify([1,2,3,4,5], "Int"), STRING).output()
+
+    d3 = env.from_elements(True, True, False)
+
+    d3.map(Id(), BOOL).map_partition(Verify([True, True, False], "Bool"), STRING).output()
+
+    d4 = env.from_elements(1.4, 1.7, 12312.23)
+
+    d4.map(Id(), FLOAT).map_partition(Verify([1.4, 1.7, 12312.23], "Float"), STRING).output()
+
+    d5 = env.from_elements("hello", "world")
+
+    d5.map(Id(), STRING).map_partition(Verify(["hello", "world"], "String"), STRING).output()
+
+    env.set_degree_of_parallelism(1)
+
+    env.execute(local=True)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-libraries/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/pom.xml b/flink-libraries/pom.xml
index 065127b..e1dc2d9 100644
--- a/flink-libraries/pom.xml
+++ b/flink-libraries/pom.xml
@@ -36,5 +36,6 @@ under the License.
        <modules>
                <module>flink-gelly</module>
                <module>flink-gelly-scala</module>
+               <module>flink-python</module>
        </modules>
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-language-binding-generic/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-language-binding-generic/pom.xml b/flink-staging/flink-language-binding/flink-language-binding-generic/pom.xml
deleted file mode 100644
index 7a8f8af..0000000
--- a/flink-staging/flink-language-binding/flink-language-binding-generic/pom.xml
+++ /dev/null
@@ -1,61 +0,0 @@
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
-    <modelVersion>4.0.0</modelVersion>
-
-    <parent>
-        <groupId>org.apache.flink</groupId>
-        <artifactId>flink-language-binding-parent</artifactId>
-        <version>1.0-SNAPSHOT</version>
-        <relativePath>..</relativePath>
-    </parent>
-       
-    <artifactId>flink-language-binding-generic</artifactId>
-    <name>flink-language-binding-generic</name>
-    <packaging>jar</packaging>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-core</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-java</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-optimizer</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-runtime</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-clients</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-    </dependencies>
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/OperationInfo.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/OperationInfo.java b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/OperationInfo.java
deleted file mode 100644
index bdb0444..0000000
--- a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/OperationInfo.java
+++ /dev/null
@@ -1,242 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.languagebinding.api.java.common;
-
-import java.io.IOException;
-import java.util.Arrays;
-import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.tuple.Tuple;
-import static org.apache.flink.api.java.typeutils.TypeExtractor.getForObject;
-import org.apache.flink.core.fs.FileSystem.WriteMode;
-import org.apache.flink.languagebinding.api.java.common.PlanBinder.Operation;
-import static org.apache.flink.languagebinding.api.java.common.PlanBinder.normalizeKeys;
-import static org.apache.flink.languagebinding.api.java.common.PlanBinder.toIntArray;
-import org.apache.flink.languagebinding.api.java.common.streaming.Receiver;
-
-/**
- * Container for all generic information related to operations. This class contains the absolute minimum fields that are
- * required for all operations. This class should be extended to contain any additional fields required on a
- * per-language basis.
- */
-public class OperationInfo {
-       public int parentID; //DataSet that an operation is applied on
-       public int otherID; //secondary DataSet
-       public int setID; //ID for new DataSet
-       public String[] keys;
-       public String[] keys1; //join/cogroup keys
-       public String[] keys2; //join/cogroup keys
-       public TypeInformation<?> types; //typeinformation about output type
-       public AggregationEntry[] aggregates;
-       public ProjectionEntry[] projections; //projectFirst/projectSecond
-       public Object[] values;
-       public int count;
-       public int field;
-       public int[] fields;
-       public Order order;
-       public String path;
-       public String fieldDelimiter;
-       public String lineDelimiter;
-       public long from;
-       public long to;
-       public WriteMode writeMode;
-       public boolean toError;
-       public String name;
-
-       public OperationInfo() {
-       }
-
-       public OperationInfo(Receiver receiver, Operation identifier) throws IOException {
-               Object tmpType;
-               switch (identifier) {
-                       case SOURCE_CSV:
-                               setID = (Integer) receiver.getRecord(true);
-                               path = (String) receiver.getRecord();
-                               fieldDelimiter = (String) receiver.getRecord();
-                               lineDelimiter = (String) receiver.getRecord();
-                               tmpType = (Tuple) receiver.getRecord();
-                               types = tmpType == null ? null : getForObject(tmpType);
-                               return;
-                       case SOURCE_TEXT:
-                               setID = (Integer) receiver.getRecord(true);
-                               path = (String) receiver.getRecord();
-                               return;
-                       case SOURCE_VALUE:
-                               setID = (Integer) receiver.getRecord(true);
-                               int valueCount = (Integer) receiver.getRecord(true);
-                               values = new Object[valueCount];
-                               for (int x = 0; x < valueCount; x++) {
-                                       values[x] = receiver.getRecord();
-                               }
-                               return;
-                       case SOURCE_SEQ:
-                               setID = (Integer) receiver.getRecord(true);
-                               from = (Long) receiver.getRecord();
-                               to = (Long) receiver.getRecord();
-                               return;
-                       case SINK_CSV:
-                               parentID = (Integer) receiver.getRecord(true);
-                               path = (String) receiver.getRecord();
-                               fieldDelimiter = (String) receiver.getRecord();
-                               lineDelimiter = (String) receiver.getRecord();
-                               writeMode = ((Integer) receiver.getRecord(true)) == 1
-                                               ? WriteMode.OVERWRITE
-                                               : WriteMode.NO_OVERWRITE;
-                               return;
-                       case SINK_TEXT:
-                               parentID = (Integer) receiver.getRecord(true);
-                               path = (String) receiver.getRecord();
-                               writeMode = ((Integer) receiver.getRecord(true)) == 1
-                                               ? WriteMode.OVERWRITE
-                                               : WriteMode.NO_OVERWRITE;
-                               return;
-                       case SINK_PRINT:
-                               parentID = (Integer) receiver.getRecord(true);
-                               toError = (Boolean) receiver.getRecord();
-                               return;
-                       case BROADCAST:
-                               parentID = (Integer) receiver.getRecord(true);
-                               otherID = (Integer) receiver.getRecord(true);
-                               name = (String) receiver.getRecord();
-                               return;
-               }
-               setID = (Integer) receiver.getRecord(true);
-               parentID = (Integer) receiver.getRecord(true);
-               switch (identifier) {
-                       case AGGREGATE:
-                               count = (Integer) receiver.getRecord(true);
-                               aggregates = new AggregationEntry[count];
-                               for (int x = 0; x < count; x++) {
-                                       int encodedAgg = (Integer) receiver.getRecord(true);
-                                       int field = (Integer) receiver.getRecord(true);
-                                       aggregates[x] = new AggregationEntry(encodedAgg, field);
-                               }
-                               return;
-                       case FIRST:
-                               count = (Integer) receiver.getRecord(true);
-                               return;
-                       case DISTINCT:
-                       case GROUPBY:
-                       case PARTITION_HASH:
-                               keys = normalizeKeys(receiver.getRecord(true));
-                               return;
-                       case PROJECTION:
-                               fields = toIntArray(receiver.getRecord(true));
-                               return;
-                       case REBALANCE:
-                               return;
-                       case SORT:
-                               field = (Integer) receiver.getRecord(true);
-                               int encodedOrder = (Integer) receiver.getRecord(true);
-                               switch (encodedOrder) {
-                                       case 0:
-                                               order = Order.NONE;
-                                               break;
-                                       case 1:
-                                               order = Order.ASCENDING;
-                                               break;
-                                       case 2:
-                                               order = Order.DESCENDING;
-                                               break;
-                                       case 3:
-                                               order = Order.ANY;
-                                               break;
-                                       default:
-                                               order = Order.NONE;
-                                               break;
-                               }
-                               return;
-                       case UNION:
-                               otherID = (Integer) receiver.getRecord(true);
-                               return;
-               }
-       }
-
-       @Override
-       public String toString() {
-               StringBuilder sb = new StringBuilder();
-               sb.append("SetID: ").append(setID).append("\n");
-               sb.append("ParentID: ").append(parentID).append("\n");
-               sb.append("OtherID: ").append(otherID).append("\n");
-               sb.append("Name: ").append(name).append("\n");
-               sb.append("Types: ").append(types).append("\n");
-               sb.append("Keys1: 
").append(Arrays.toString(keys1)).append("\n");
-               sb.append("Keys2: 
").append(Arrays.toString(keys2)).append("\n");
-               sb.append("Keys: ").append(Arrays.toString(keys)).append("\n");
-               sb.append("Aggregates: 
").append(Arrays.toString(aggregates)).append("\n");
-               sb.append("Projections: 
").append(Arrays.toString(projections)).append("\n");
-               sb.append("Count: ").append(count).append("\n");
-               sb.append("Field: ").append(field).append("\n");
-               sb.append("Order: ").append(order.toString()).append("\n");
-               sb.append("Path: ").append(path).append("\n");
-               sb.append("FieldDelimiter: 
").append(fieldDelimiter).append("\n");
-               sb.append("LineDelimiter: ").append(lineDelimiter).append("\n");
-               sb.append("From: ").append(from).append("\n");
-               sb.append("To: ").append(to).append("\n");
-               sb.append("WriteMode: ").append(writeMode).append("\n");
-               sb.append("toError: ").append(toError).append("\n");
-               return sb.toString();
-       }
-
-       public static class AggregationEntry {
-               public Aggregations agg;
-               public int field;
-
-               public AggregationEntry(int encodedAgg, int field) {
-                       switch (encodedAgg) {
-                               case 0:
-                                       agg = Aggregations.MAX;
-                                       break;
-                               case 1:
-                                       agg = Aggregations.MIN;
-                                       break;
-                               case 2:
-                                       agg = Aggregations.SUM;
-                                       break;
-                       }
-                       this.field = field;
-               }
-
-               @Override
-               public String toString() {
-                       return agg.toString() + " - " + field;
-               }
-       }
-
-       public static class ProjectionEntry {
-               public ProjectionSide side;
-               public int[] keys;
-
-               public ProjectionEntry(ProjectionSide side, int[] keys) {
-                       this.side = side;
-                       this.keys = keys;
-               }
-
-               @Override
-               public String toString() {
-                       return side + " - " + Arrays.toString(keys);
-               }
-       }
-
-       public enum ProjectionSide {
-               FIRST,
-               SECOND
-       }
-
-       public enum DatasizeHint {
-               NONE,
-               TINY,
-               HUGE
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java
deleted file mode 100644
index ca252f8..0000000
--- a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/PlanBinder.java
+++ /dev/null
@@ -1,582 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.languagebinding.api.java.common;
-
-import java.io.IOException;
-import java.util.HashMap;
-
-import org.apache.flink.api.common.typeutils.CompositeType;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.CsvInputFormat;
-import org.apache.flink.api.java.io.PrintingOutputFormat;
-import org.apache.flink.api.java.operators.AggregateOperator;
-import org.apache.flink.api.java.operators.CrossOperator.DefaultCross;
-import org.apache.flink.api.java.operators.CrossOperator.ProjectCross;
-import org.apache.flink.api.java.operators.Grouping;
-import org.apache.flink.api.java.operators.JoinOperator.DefaultJoin;
-import org.apache.flink.api.java.operators.JoinOperator.ProjectJoin;
-import org.apache.flink.api.java.operators.SortedGrouping;
-import org.apache.flink.api.java.operators.UdfOperator;
-import org.apache.flink.api.java.operators.UnsortedGrouping;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.languagebinding.api.java.common.OperationInfo.DatasizeHint;
-import static org.apache.flink.languagebinding.api.java.common.OperationInfo.DatasizeHint.HUGE;
-import static org.apache.flink.languagebinding.api.java.common.OperationInfo.DatasizeHint.NONE;
-import static org.apache.flink.languagebinding.api.java.common.OperationInfo.DatasizeHint.TINY;
-import org.apache.flink.languagebinding.api.java.common.OperationInfo.ProjectionEntry;
-import org.apache.flink.languagebinding.api.java.common.streaming.Receiver;
-
-/**
- * Generic class to construct a Flink plan based on external data.
- *
- * @param <INFO>
- */
-public abstract class PlanBinder<INFO extends OperationInfo> {
-       public static final String PLANBINDER_CONFIG_BCVAR_COUNT = "PLANBINDER_BCVAR_COUNT";
-       public static final String PLANBINDER_CONFIG_BCVAR_NAME_PREFIX = "PLANBINDER_BCVAR_";
-
-       protected static String FLINK_HDFS_PATH = "hdfs:/tmp";
-       public static final String FLINK_TMP_DATA_DIR = System.getProperty("java.io.tmpdir") + "/flink_data";
-
-       public static boolean DEBUG = false;
-
-       protected HashMap<Integer, Object> sets = new HashMap();
-       public static ExecutionEnvironment env;
-       protected Receiver receiver;
-
-       public static final int MAPPED_FILE_SIZE = 1024 * 1024 * 64;
-
-       //====Plan==========================================================================================================
-       protected void receivePlan() throws IOException {
-               receiveParameters();
-               receiveOperations();
-       }
-
-       //====Environment===================================================================================================
-       /**
-        * This enum contains the identifiers for all supported environment parameters.
-        */
-       private enum Parameters {
-               DOP,
-               MODE,
-               RETRY,
-               DEBUG
-       }
-
-       private void receiveParameters() throws IOException {
-               Integer parameterCount = (Integer) receiver.getRecord(true);
-
-               for (int x = 0; x < parameterCount; x++) {
-                       Tuple value = (Tuple) receiver.getRecord(true);
-                       switch (Parameters.valueOf(((String) value.getField(0)).toUpperCase())) {
-                               case DOP:
-                                       Integer dop = (Integer) value.getField(1);
-                                       env.setParallelism(dop);
-                                       break;
-                               case MODE:
-                                       FLINK_HDFS_PATH = (Boolean) value.getField(1) ? "file:/tmp/flink" : "hdfs:/tmp/flink";
-                                       break;
-                               case RETRY:
-                                       int retry = (Integer) value.getField(1);
-                                       env.setNumberOfExecutionRetries(retry);
-                                       break;
-                               case DEBUG:
-                                       DEBUG = (Boolean) value.getField(1);
-                                       break;
-                       }
-               }
-               if (env.getParallelism() < 0) {
-                       env.setParallelism(1);
-               }
-       }
-
-       //====Operations====================================================================================================
-       /**
-        * This enum contains the identifiers for all supported non-UDF DataSet operations.
-        */
-       protected enum Operation {
-               SOURCE_CSV, SOURCE_TEXT, SOURCE_VALUE, SOURCE_SEQ, SINK_CSV, SINK_TEXT, SINK_PRINT,
-               PROJECTION, SORT, UNION, FIRST, DISTINCT, GROUPBY, AGGREGATE,
-               REBALANCE, PARTITION_HASH,
-               BROADCAST
-       }
-
-       /**
-        * This enum contains the identifiers for all supported UDF DataSet operations.
-        */
-       protected enum AbstractOperation {
-               COGROUP, CROSS, CROSS_H, CROSS_T, FILTER, FLATMAP, GROUPREDUCE, JOIN, JOIN_H, JOIN_T, MAP, REDUCE, MAPPARTITION,
-       }
-
-       protected void receiveOperations() throws IOException {
-               Integer operationCount = (Integer) receiver.getRecord(true);
-               for (int x = 0; x < operationCount; x++) {
-                       String identifier = (String) receiver.getRecord();
-                       Operation op = null;
-                       AbstractOperation aop = null;
-                       try {
-                               op = Operation.valueOf(identifier.toUpperCase());
-                       } catch (IllegalArgumentException iae) {
-                               try {
-                                       aop = AbstractOperation.valueOf(identifier.toUpperCase());
-                               } catch (IllegalArgumentException iae2) {
-                                       throw new IllegalArgumentException("Invalid operation specified: " + identifier);
-                               }
-                       }
-                       if (op != null) {
-                               switch (op) {
-                                       case SOURCE_CSV:
-                                               createCsvSource(createOperationInfo(op));
-                                               break;
-                                       case SOURCE_TEXT:
-                                               createTextSource(createOperationInfo(op));
-                                               break;
-                                       case SOURCE_VALUE:
-                                               createValueSource(createOperationInfo(op));
-                                               break;
-                                       case SOURCE_SEQ:
-                                               createSequenceSource(createOperationInfo(op));
-                                               break;
-                                       case SINK_CSV:
-                                               createCsvSink(createOperationInfo(op));
-                                               break;
-                                       case SINK_TEXT:
-                                               createTextSink(createOperationInfo(op));
-                                               break;
-                                       case SINK_PRINT:
-                                               createPrintSink(createOperationInfo(op));
-                                               break;
-                                       case BROADCAST:
-                                               createBroadcastVariable(createOperationInfo(op));
-                                               break;
-                                       case AGGREGATE:
-                                               createAggregationOperation(createOperationInfo(op));
-                                               break;
-                                       case DISTINCT:
-                                               createDistinctOperation(createOperationInfo(op));
-                                               break;
-                                       case FIRST:
-                                               createFirstOperation(createOperationInfo(op));
-                                               break;
-                                       case PARTITION_HASH:
-                                               createHashPartitionOperation(createOperationInfo(op));
-                                               break;
-                                       case PROJECTION:
-                                               createProjectOperation(createOperationInfo(op));
-                                               break;
-                                       case REBALANCE:
-                                               createRebalanceOperation(createOperationInfo(op));
-                                               break;
-                                       case GROUPBY:
-                                               createGroupOperation(createOperationInfo(op));
-                                               break;
-                                       case SORT:
-                                               createSortOperation(createOperationInfo(op));
-                                               break;
-                                       case UNION:
-                                               createUnionOperation(createOperationInfo(op));
-                                               break;
-                               }
-                       }
-                       if (aop != null) {
-                               switch (aop) {
-                                       case COGROUP:
-                                               createCoGroupOperation(createOperationInfo(aop));
-                                               break;
-                                       case CROSS:
-                                               createCrossOperation(NONE, createOperationInfo(aop));
-                                               break;
-                                       case CROSS_H:
-                                               createCrossOperation(HUGE, createOperationInfo(aop));
-                                               break;
-                                       case CROSS_T:
-                                               createCrossOperation(TINY, createOperationInfo(aop));
-                                               break;
-                                       case FILTER:
-                                               createFilterOperation(createOperationInfo(aop));
-                                               break;
-                                       case FLATMAP:
-                                               createFlatMapOperation(createOperationInfo(aop));
-                                               break;
-                                       case GROUPREDUCE:
-                                               createGroupReduceOperation(createOperationInfo(aop));
-                                               break;
-                                       case JOIN:
-                                               createJoinOperation(NONE, createOperationInfo(aop));
-                                               break;
-                                       case JOIN_H:
-                                               createJoinOperation(HUGE, createOperationInfo(aop));
-                                               break;
-                                       case JOIN_T:
-                                               createJoinOperation(TINY, createOperationInfo(aop));
-                                               break;
-                                       case MAP:
-                                               createMapOperation(createOperationInfo(aop));
-                                               break;
-                                       case MAPPARTITION:
-                                               createMapPartitionOperation(createOperationInfo(aop));
-                                               break;
-                                       case REDUCE:
-                                               createReduceOperation(createOperationInfo(aop));
-                                               break;
-                               }
-                       }
-               }
-       }
-
-       /**
-        * This method creates an OperationInfo object based on the operation-identifier passed.
-        *
-        * @param operationIdentifier
-        * @return
-        * @throws IOException
-        */
-       protected OperationInfo createOperationInfo(Operation operationIdentifier) throws IOException {
-               return new OperationInfo(receiver, operationIdentifier);
-       }
-
-       /**
-        * This method creates an OperationInfo object based on the operation-identifier passed.
-        *
-        * @param operationIdentifier
-        * @return
-        * @throws IOException
-        */
-       protected abstract INFO createOperationInfo(AbstractOperation operationIdentifier) throws IOException;
-
-       private void createCsvSource(OperationInfo info) throws IOException {
-               if (!(info.types instanceof CompositeType)) {
-                       throw new RuntimeException("The output type of a csv 
source has to be a tuple or a " +
-                               "pojo type. The derived type is " + info);
-               }
-
-               sets.put(info.setID, env.createInput(new CsvInputFormat(new 
Path(info.path),
-                       info.lineDelimiter, info.fieldDelimiter, 
(CompositeType)info.types), info.types)
-                       .name("CsvSource"));
-       }
-
-       private void createTextSource(OperationInfo info) throws IOException {
-               sets.put(info.setID, env.readTextFile(info.path).name("TextSource"));
-       }
-
-       private void createValueSource(OperationInfo info) throws IOException {
-               sets.put(info.setID, env.fromElements(info.values).name("ValueSource"));
-       }
-
-       private void createSequenceSource(OperationInfo info) throws IOException {
-               sets.put(info.setID, env.generateSequence(info.from, info.to).name("SequenceSource"));
-       }
-
-       private void createCsvSink(OperationInfo info) throws IOException {
-               DataSet parent = (DataSet) sets.get(info.parentID);
-               parent.writeAsCsv(info.path, info.lineDelimiter, info.fieldDelimiter, info.writeMode).name("CsvSink");
-       }
-
-       private void createTextSink(OperationInfo info) throws IOException {
-               DataSet parent = (DataSet) sets.get(info.parentID);
-               parent.writeAsText(info.path, info.writeMode).name("TextSink");
-       }
-
-       private void createPrintSink(OperationInfo info) throws IOException {
-               DataSet parent = (DataSet) sets.get(info.parentID);
-               parent.output(new PrintingOutputFormat(info.toError));
-       }
-
-       private void createBroadcastVariable(OperationInfo info) throws IOException {
-               UdfOperator op1 = (UdfOperator) sets.get(info.parentID);
-               DataSet op2 = (DataSet) sets.get(info.otherID);
-
-               op1.withBroadcastSet(op2, info.name);
-               Configuration c = ((UdfOperator) op1).getParameters();
-
-               if (c == null) {
-                       c = new Configuration();
-               }
-
-               int count = c.getInteger(PLANBINDER_CONFIG_BCVAR_COUNT, 0);
-               c.setInteger(PLANBINDER_CONFIG_BCVAR_COUNT, count + 1);
-               c.setString(PLANBINDER_CONFIG_BCVAR_NAME_PREFIX + count, info.name);
-
-               op1.withParameters(c);
-       }
-
-       private void createAggregationOperation(OperationInfo info) throws IOException {
-               DataSet op = (DataSet) sets.get(info.parentID);
-               AggregateOperator ao = op.aggregate(info.aggregates[0].agg, info.aggregates[0].field);
-
-               for (int x = 1; x < info.count; x++) {
-                       ao = ao.and(info.aggregates[x].agg, info.aggregates[x].field);
-               }
-
-               sets.put(info.setID, ao.name("Aggregation"));
-       }
-
-       private void createDistinctOperation(OperationInfo info) throws IOException {
-               DataSet op = (DataSet) sets.get(info.parentID);
-               sets.put(info.setID, info.keys.length == 0 ? op.distinct() : op.distinct(info.keys).name("Distinct"));
-       }
-
-       private void createFirstOperation(OperationInfo info) throws IOException {
-               DataSet op = (DataSet) sets.get(info.parentID);
-               sets.put(info.setID, op.first(info.count).name("First"));
-       }
-
-       private void createGroupOperation(OperationInfo info) throws IOException {
-               DataSet op1 = (DataSet) sets.get(info.parentID);
-               sets.put(info.setID, op1.groupBy(info.keys));
-       }
-
-       private void createHashPartitionOperation(OperationInfo info) throws IOException {
-               DataSet op1 = (DataSet) sets.get(info.parentID);
-               sets.put(info.setID, op1.partitionByHash(info.keys));
-       }
-
-       private void createProjectOperation(OperationInfo info) throws IOException {
-               DataSet op1 = (DataSet) sets.get(info.parentID);
-               sets.put(info.setID, op1.project(info.fields).name("Projection"));
-       }
-
-       private void createRebalanceOperation(OperationInfo info) throws IOException {
-               DataSet op = (DataSet) sets.get(info.parentID);
-               sets.put(info.setID, op.rebalance().name("Rebalance"));
-       }
-
-       private void createSortOperation(OperationInfo info) throws IOException {
-               Grouping op1 = (Grouping) sets.get(info.parentID);
-               if (op1 instanceof UnsortedGrouping) {
-                       sets.put(info.setID, ((UnsortedGrouping) op1).sortGroup(info.field, info.order));
-                       return;
-               }
-               if (op1 instanceof SortedGrouping) {
-                       sets.put(info.setID, ((SortedGrouping) op1).sortGroup(info.field, info.order));
-               }
-       }
-
-       private void createUnionOperation(OperationInfo info) throws IOException {
-               DataSet op1 = (DataSet) sets.get(info.parentID);
-               DataSet op2 = (DataSet) sets.get(info.otherID);
-               sets.put(info.setID, op1.union(op2).name("Union"));
-       }
-
-       private void createCoGroupOperation(INFO info) {
-               DataSet op1 = (DataSet) sets.get(info.parentID);
-               DataSet op2 = (DataSet) sets.get(info.otherID);
-               sets.put(info.setID, applyCoGroupOperation(op1, op2, info.keys1, info.keys2, info));
-       }
-
-       private void createCrossOperation(DatasizeHint mode, INFO info) {
-               DataSet op1 = (DataSet) sets.get(info.parentID);
-               DataSet op2 = (DataSet) sets.get(info.otherID);
-
-               if (info.types != null && (info.projections == null || info.projections.length == 0)) {
-                       sets.put(info.setID, applyCrossOperation(op1, op2, mode, info));
-               } else {
-                       DefaultCross defaultResult;
-                       switch (mode) {
-                               case NONE:
-                                       defaultResult = op1.cross(op2);
-                                       break;
-                               case HUGE:
-                                       defaultResult = op1.crossWithHuge(op2);
-                                       break;
-                               case TINY:
-                                       defaultResult = op1.crossWithTiny(op2);
-                                       break;
-                               default:
-                                       throw new IllegalArgumentException("Invalid Cross mode specified: " + mode);
-                       }
-                       if (info.projections.length == 0) {
-                               sets.put(info.setID, defaultResult.name("DefaultCross"));
-                       } else {
-                               ProjectCross project = null;
-                               for (ProjectionEntry pe : info.projections) {
-                                       switch (pe.side) {
-                                               case FIRST:
-                                                       project = project == null ? defaultResult.projectFirst(pe.keys) : project.projectFirst(pe.keys);
-                                                       break;
-                                               case SECOND:
-                                                       project = project == null ? defaultResult.projectSecond(pe.keys) : project.projectSecond(pe.keys);
-                                                       break;
-                                       }
-                               }
-                               sets.put(info.setID, project.name("ProjectCross"));
-                       }
-               }
-       }
-
-       private void createFilterOperation(INFO info) {
-               DataSet op1 = (DataSet) sets.get(info.parentID);
-               sets.put(info.setID, applyFilterOperation(op1, info));
-       }
-
-       private void createFlatMapOperation(INFO info) {
-               DataSet op1 = (DataSet) sets.get(info.parentID);
-               sets.put(info.setID, applyFlatMapOperation(op1, info));
-       }
-
-       private void createGroupReduceOperation(INFO info) {
-               Object op1 = sets.get(info.parentID);
-               if (op1 instanceof DataSet) {
-                       sets.put(info.setID, applyGroupReduceOperation((DataSet) op1, info));
-                       return;
-               }
-               if (op1 instanceof UnsortedGrouping) {
-                       sets.put(info.setID, applyGroupReduceOperation((UnsortedGrouping) op1, info));
-                       return;
-               }
-               if (op1 instanceof SortedGrouping) {
-                       sets.put(info.setID, applyGroupReduceOperation((SortedGrouping) op1, info));
-               }
-       }
-
-       private void createJoinOperation(DatasizeHint mode, INFO info) {
-               DataSet op1 = (DataSet) sets.get(info.parentID);
-               DataSet op2 = (DataSet) sets.get(info.otherID);
-
-               if (info.types != null && (info.projections == null || info.projections.length == 0)) {
-                       sets.put(info.setID, applyJoinOperation(op1, op2, info.keys1, info.keys2, mode, info));
-               } else {
-                       DefaultJoin defaultResult = createDefaultJoin(op1, op2, info.keys1, info.keys2, mode);
-                       if (info.projections.length == 0) {
-                               sets.put(info.setID, defaultResult.name("DefaultJoin"));
-                       } else {
-                               ProjectJoin project = null;
-                               for (ProjectionEntry pe : info.projections) {
-                                       switch (pe.side) {
-                                               case FIRST:
-                                                       project = project == null ? defaultResult.projectFirst(pe.keys) : project.projectFirst(pe.keys);
-                                                       break;
-                                               case SECOND:
-                                                       project = project == null ? defaultResult.projectSecond(pe.keys) : project.projectSecond(pe.keys);
-                                                       break;
-                                       }
-                               }
-                               sets.put(info.setID, project.name("ProjectJoin"));
-                       }
-               }
-       }
-
-       protected DefaultJoin createDefaultJoin(DataSet op1, DataSet op2, String[] firstKeys, String[] secondKeys, DatasizeHint mode) {
-               switch (mode) {
-                       case NONE:
-                               return op1.join(op2).where(firstKeys).equalTo(secondKeys);
-                       case HUGE:
-                               return op1.joinWithHuge(op2).where(firstKeys).equalTo(secondKeys);
-                       case TINY:
-                               return op1.joinWithTiny(op2).where(firstKeys).equalTo(secondKeys);
-                       default:
-                               throw new IllegalArgumentException("Invalid join mode specified.");
-               }
-       }
-
-       private void createMapOperation(INFO info) {
-               DataSet op1 = (DataSet) sets.get(info.parentID);
-               sets.put(info.setID, applyMapOperation(op1, info));
-       }
-
-       private void createMapPartitionOperation(INFO info) {
-               DataSet op1 = (DataSet) sets.get(info.parentID);
-               sets.put(info.setID, applyMapPartitionOperation(op1, info));
-       }
-
-       private void createReduceOperation(INFO info) {
-               Object op1 = sets.get(info.parentID);
-               if (op1 instanceof DataSet) {
-                       sets.put(info.setID, applyReduceOperation((DataSet) op1, info));
-                       return;
-               }
-               if (op1 instanceof UnsortedGrouping) {
-                       sets.put(info.setID, applyReduceOperation((UnsortedGrouping) op1, info));
-               }
-       }
-
-       protected abstract DataSet applyCoGroupOperation(DataSet op1, DataSet op2, String[] firstKeys, String[] secondKeys, INFO info);
-
-       protected abstract DataSet applyCrossOperation(DataSet op1, DataSet op2, DatasizeHint mode, INFO info);
-
-       protected abstract DataSet applyFilterOperation(DataSet op1, INFO info);
-
-       protected abstract DataSet applyFlatMapOperation(DataSet op1, INFO info);
-
-       protected abstract DataSet applyGroupReduceOperation(DataSet op1, INFO info);
-
-       protected abstract DataSet applyGroupReduceOperation(UnsortedGrouping op1, INFO info);
-
-       protected abstract DataSet applyGroupReduceOperation(SortedGrouping op1, INFO info);
-
-       protected abstract DataSet applyJoinOperation(DataSet op1, DataSet op2, String[] firstKeys, String[] secondKeys, DatasizeHint mode, INFO info);
-
-       protected abstract DataSet applyMapOperation(DataSet op1, INFO info);
-
-       protected abstract DataSet applyMapPartitionOperation(DataSet op1, INFO info);
-
-       protected abstract DataSet applyReduceOperation(DataSet op1, INFO info);
-
-       protected abstract DataSet applyReduceOperation(UnsortedGrouping op1, INFO info);
-
-       //====Utility=======================================================================================================
-       protected static String[] normalizeKeys(Object keys) {
-               if (keys instanceof Tuple) {
-                       Tuple tupleKeys = (Tuple) keys;
-                       if (tupleKeys.getArity() == 0) {
-                               return new String[0];
-                       }
-                       if (tupleKeys.getField(0) instanceof Integer) {
-                               String[] stringKeys = new String[tupleKeys.getArity()];
-                               for (int x = 0; x < stringKeys.length; x++) {
-                                       stringKeys[x] = "f" + (Integer) tupleKeys.getField(x);
-                               }
-                               return stringKeys;
-                       }
-                       if (tupleKeys.getField(0) instanceof String) {
-                               return tupleToStringArray(tupleKeys);
-                       }
-                       throw new RuntimeException("Key argument contains field that is neither an int nor a String.");
-               }
-               if (keys instanceof int[]) {
-                       int[] intKeys = (int[]) keys;
-                       String[] stringKeys = new String[intKeys.length];
-                       for (int x = 0; x < stringKeys.length; x++) {
-                               stringKeys[x] = "f" + intKeys[x];
-                       }
-                       return stringKeys;
-               }
-               throw new RuntimeException("Key argument is neither an int[] nor a Tuple.");
-       }
-
-       protected static int[] toIntArray(Object key) {
-               if (key instanceof Tuple) {
-                       Tuple tuple = (Tuple) key;
-                       int[] keys = new int[tuple.getArity()];
-                       for (int y = 0; y < tuple.getArity(); y++) {
-                               keys[y] = (Integer) tuple.getField(y);
-                       }
-                       return keys;
-               }
-               if (key instanceof int[]) {
-                       return (int[]) key;
-               }
-               throw new RuntimeException("Key argument is neither an int[] nor a Tuple.");
-       }
-
-       protected static String[] tupleToStringArray(Tuple tuple) {
-               String[] keys = new String[tuple.getArity()];
-               for (int y = 0; y < tuple.getArity(); y++) {
-                       keys[y] = (String) tuple.getField(y);
-               }
-               return keys;
-       }
-}
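
The class above ends with key-normalization utilities: they translate the key specifiers sent
by the language-binding side (ints or strings, possibly packed in a Tuple) into Flink's
string-based tuple field expressions ("f0", "f1", ...). For readers who want to try that
mapping in isolation, here is a minimal standalone sketch, assuming a hypothetical class name
and no Flink dependency:

    // Sketch of the int-to-field-expression mapping from normalizeKeys() above.
    // KeyNormalization is a hypothetical name; Flink addresses tuple fields as f0, f1, ...
    public class KeyNormalization {
        static String[] toFieldExpressions(int[] keys) {
            String[] out = new String[keys.length];
            for (int i = 0; i < keys.length; i++) {
                out[i] = "f" + keys[i];
            }
            return out;
        }

        public static void main(String[] args) {
            // Key indices 0 and 2 become the field expressions "f0" and "f2".
            System.out.println(String.join(" ", toFieldExpressions(new int[]{0, 2})));
        }
    }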

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Receiver.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Receiver.java b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Receiver.java
deleted file mode 100644
index 59ed20c..0000000
--- a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Receiver.java
+++ /dev/null
@@ -1,360 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.languagebinding.api.java.common.streaming;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.io.Serializable;
-import java.nio.MappedByteBuffer;
-import java.nio.channels.FileChannel;
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-//CHECKSTYLE.OFF: AvoidStarImport - tuple imports
-import org.apache.flink.api.java.tuple.*;
-import static org.apache.flink.languagebinding.api.java.common.streaming.Sender.*;
-//CHECKSTYLE.ON: AvoidStarImport
-import static org.apache.flink.languagebinding.api.java.common.PlanBinder.FLINK_TMP_DATA_DIR;
-import static org.apache.flink.languagebinding.api.java.common.PlanBinder.MAPPED_FILE_SIZE;
-import org.apache.flink.util.Collector;
-
-/**
- * General-purpose class to read data from memory-mapped files.
- */
-public class Receiver implements Serializable {
-       private static final long serialVersionUID = -2474088929850009968L;
-
-       private final AbstractRichFunction function;
-
-       private File inputFile;
-       private RandomAccessFile inputRAF;
-       private FileChannel inputChannel;
-       private MappedByteBuffer fileBuffer;
-
-       private Deserializer<?> deserializer = null;
-
-       public Receiver(AbstractRichFunction function) {
-               this.function = function;
-       }
-
-       //=====Setup========================================================================================================
-       public void open(String path) throws IOException {
-               setupMappedFile(path);
-       }
-
-       private void setupMappedFile(String inputFilePath) throws FileNotFoundException, IOException {
-               File x = new File(FLINK_TMP_DATA_DIR);
-               x.mkdirs();
-
-               inputFile = new File(inputFilePath);
-               if (inputFile.exists()) {
-                       inputFile.delete();
-               }
-               inputFile.createNewFile();
-               inputRAF = new RandomAccessFile(inputFilePath, "rw");
-               inputRAF.setLength(MAPPED_FILE_SIZE);
-               inputRAF.seek(MAPPED_FILE_SIZE - 1);
-               inputRAF.writeByte(0);
-               inputRAF.seek(0);
-               inputChannel = inputRAF.getChannel();
-               fileBuffer = inputChannel.map(FileChannel.MapMode.READ_WRITE, 0, MAPPED_FILE_SIZE);
-       }
-
-       public void close() throws IOException {
-               closeMappedFile();
-       }
-
-       private void closeMappedFile() throws IOException {
-               inputChannel.close();
-               inputRAF.close();
-       }
-
-       //=====Record-API===================================================================================================
-       /**
-        * Loads a buffer from the memory-mapped file. The records contained within the buffer can be accessed using
-        * getRecord(). These records do not necessarily have to be of the same type. This method requires external
-        * synchronization.
-        *
-        * @throws IOException
-        */
-       private void loadBuffer() throws IOException {
-               int count = 0;
-               while (fileBuffer.get(0) == 0 && count < 10) {
-                       try {
-                               Thread.sleep(1000);
-                       } catch (InterruptedException ie) {
-                       }
-                       fileBuffer.load();
-                       count++;
-               }
-               if (fileBuffer.get(0) == 0) {
-                       throw new RuntimeException("External process not responding.");
-               }
-               fileBuffer.position(1);
-       }
-
-       /**
-        * Returns a record from the buffer. Note: This method cannot be replaced with specific methods like readInt() or
-        * similar. The PlanBinder requires a method that can return any kind of object.
-        *
-        * @return read record
-        * @throws IOException
-        */
-       public Object getRecord() throws IOException {
-               return getRecord(false);
-       }
-
-       /**
-        * Returns a record from the buffer. Note: This method cannot be replaced with specific methods like readInt() or
-        * similar. The PlanBinder requires a method that can return any kind of object.
-        *
-        * @param normalized flag indicating whether certain types should be normalized
-        * @return read record
-        * @throws IOException
-        */
-       public Object getRecord(boolean normalized) throws IOException {
-               if (fileBuffer.position() == 0) {
-                       loadBuffer();
-               }
-               return receiveField(normalized);
-       }
-
-       /**
-        * Reads a single primitive value or tuple from the buffer.
-        *
-        * @return primitive value or tuple
-        * @throws IOException
-        */
-       private Object receiveField(boolean normalized) throws IOException {
-               byte type = fileBuffer.get();
-               switch (type) {
-                       case TYPE_TUPLE:
-                               int tupleSize = fileBuffer.get();
-                               Tuple tuple = createTuple(tupleSize);
-                               for (int x = 0; x < tupleSize; x++) {
-                                       tuple.setField(receiveField(normalized), x);
-                               }
-                               return tuple;
-                       case TYPE_BOOLEAN:
-                               return fileBuffer.get() == 1;
-                       case TYPE_BYTE:
-                               return fileBuffer.get();
-                       case TYPE_SHORT:
-                               if (normalized) {
-                                       return (int) fileBuffer.getShort();
-                               } else {
-                                       return fileBuffer.getShort();
-                               }
-                       case TYPE_INTEGER:
-                               return fileBuffer.getInt();
-                       case TYPE_LONG:
-                               if (normalized) {
-                                       return new Long(fileBuffer.getLong()).intValue();
-                               } else {
-                                       return fileBuffer.getLong();
-                               }
-                       case TYPE_FLOAT:
-                               if (normalized) {
-                                       return (double) fileBuffer.getFloat();
-                               } else {
-                                       return fileBuffer.getFloat();
-                               }
-                       case TYPE_DOUBLE:
-                               return fileBuffer.getDouble();
-                       case TYPE_STRING:
-                               int stringSize = fileBuffer.getInt();
-                               byte[] buffer = new byte[stringSize];
-                               fileBuffer.get(buffer);
-                               return new String(buffer);
-                       case TYPE_BYTES:
-                               int bytessize = fileBuffer.getInt();
-                               byte[] bytebuffer = new byte[bytessize];
-                               fileBuffer.get(bytebuffer);
-                               return bytebuffer;
-                       case TYPE_NULL:
-                               return null;
-                       default:
-                               throw new IllegalArgumentException("Unknown TypeID encountered: " + type);
-               }
-       }
-
-       //=====Buffered-API=================================================================================================
-       /**
-        * Reads a buffer of the given size from the memory-mapped file, and collects all records contained. This method
-        * assumes that all values in the buffer are of the same type. This method does NOT take care of synchronization.
-        * The user must guarantee that the buffer was completely written before calling this method.
-        *
-        * @param c Collector to collect records
-        * @param bufferSize size of the buffer
-        * @throws IOException
-        */
-       @SuppressWarnings({ "rawtypes", "unchecked" })
-       public void collectBuffer(Collector c, int bufferSize) throws IOException {
-               fileBuffer.position(0);
-
-               if (deserializer == null) {
-                       byte type = fileBuffer.get();
-                       deserializer = getDeserializer(type);
-               }
-               while (fileBuffer.position() < bufferSize) {
-                       c.collect(deserializer.deserialize());
-               }
-       }
-
-       //=====Deserializer=================================================================================================
-       private Deserializer<?> getDeserializer(byte type) {
-               switch (type) {
-                       case TYPE_TUPLE:
-                               return new TupleDeserializer();
-                       case TYPE_BOOLEAN:
-                               return new BooleanDeserializer();
-                       case TYPE_BYTE:
-                               return new ByteDeserializer();
-                       case TYPE_BYTES:
-                               return new BytesDeserializer();
-                       case TYPE_SHORT:
-                               return new ShortDeserializer();
-                       case TYPE_INTEGER:
-                               return new IntDeserializer();
-                       case TYPE_LONG:
-                               return new LongDeserializer();
-                       case TYPE_STRING:
-                               return new StringDeserializer();
-                       case TYPE_FLOAT:
-                               return new FloatDeserializer();
-                       case TYPE_DOUBLE:
-                               return new DoubleDeserializer();
-                       case TYPE_NULL:
-                               return new NullDeserializer();
-                       default:
-                               throw new IllegalArgumentException("Unknown TypeID encountered: " + type);
-
-               }
-       }
-
-       private interface Deserializer<T> {
-               public T deserialize();
-
-       }
-
-       private class BooleanDeserializer implements Deserializer<Boolean> {
-               @Override
-               public Boolean deserialize() {
-                       return fileBuffer.get() == 1;
-               }
-       }
-
-       private class ByteDeserializer implements Deserializer<Byte> {
-               @Override
-               public Byte deserialize() {
-                       return fileBuffer.get();
-               }
-       }
-
-       private class ShortDeserializer implements Deserializer<Short> {
-               @Override
-               public Short deserialize() {
-                       return fileBuffer.getShort();
-               }
-       }
-
-       private class IntDeserializer implements Deserializer<Integer> {
-               @Override
-               public Integer deserialize() {
-                       return fileBuffer.getInt();
-               }
-       }
-
-       private class LongDeserializer implements Deserializer<Long> {
-               @Override
-               public Long deserialize() {
-                       return fileBuffer.getLong();
-               }
-       }
-
-       private class FloatDeserializer implements Deserializer<Float> {
-               @Override
-               public Float deserialize() {
-                       return fileBuffer.getFloat();
-               }
-       }
-
-       private class DoubleDeserializer implements Deserializer<Double> {
-               @Override
-               public Double deserialize() {
-                       return fileBuffer.getDouble();
-               }
-       }
-
-       private class StringDeserializer implements Deserializer<String> {
-               private int size;
-
-               @Override
-               public String deserialize() {
-                       size = fileBuffer.getInt();
-                       byte[] buffer = new byte[size];
-                       fileBuffer.get(buffer);
-                       return new String(buffer);
-               }
-       }
-
-       private class NullDeserializer implements Deserializer<Object> {
-               @Override
-               public Object deserialize() {
-                       return null;
-               }
-       }
-
-       private class BytesDeserializer implements Deserializer<byte[]> {
-               @Override
-               public byte[] deserialize() {
-                       int length = fileBuffer.getInt();
-                       byte[] result = new byte[length];
-                       fileBuffer.get(result);
-                       return result;
-               }
-
-       }
-
-       private class TupleDeserializer implements Deserializer<Tuple> {
-               Deserializer<?>[] deserializer = null;
-               Tuple reuse;
-
-               public TupleDeserializer() {
-                       int size = fileBuffer.getInt();
-                       reuse = createTuple(size);
-                       deserializer = new Deserializer[size];
-                       for (int x = 0; x < deserializer.length; x++) {
-                               deserializer[x] = getDeserializer(fileBuffer.get());
-                       }
-               }
-
-               @Override
-               public Tuple deserialize() {
-                       for (int x = 0; x < deserializer.length; x++) {
-                               reuse.setField(deserializer[x].deserialize(), x);
-                       }
-                       return reuse;
-               }
-       }
-
-       public static Tuple createTuple(int size) {
-               try {
-                       return Tuple.getTupleClass(size).newInstance();
-               } catch (InstantiationException e) {
-                       throw new RuntimeException(e);
-               } catch (IllegalAccessException e) {
-                       throw new RuntimeException(e);
-               }
-       }
-}
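
Receiver decodes each value in the shared memory-mapped buffer behind a one-byte type tag,
matching the TYPE_* constants defined in Sender below. A minimal sketch of that tag-prefixed
framing over a plain ByteBuffer, with only two tags shown; the tag values are copied from the
Sender constants in this commit, everything else (class and method names) is illustrative:

    import java.nio.ByteBuffer;

    // Sketch of the tag-prefixed decoding used by the deleted Receiver.receiveField().
    public class TagDecoder {
        static final byte TYPE_STRING = (byte) 2;
        static final byte TYPE_INTEGER = (byte) 7;

        static Object readField(ByteBuffer buf) {
            byte tag = buf.get();                         // every value starts with a one-byte type tag
            switch (tag) {
                case TYPE_INTEGER:
                    return buf.getInt();                  // fixed-width payload
                case TYPE_STRING:
                    byte[] raw = new byte[buf.getInt()];  // 4-byte length prefix, then raw bytes
                    buf.get(raw);
                    return new String(raw);
                default:
                    throw new IllegalArgumentException("Unknown TypeID encountered: " + tag);
            }
        }
    }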

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Sender.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Sender.java b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Sender.java
deleted file mode 100644
index 3e0c317..0000000
--- a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Sender.java
+++ /dev/null
@@ -1,411 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.languagebinding.api.java.common.streaming;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.nio.MappedByteBuffer;
-import java.nio.channels.FileChannel;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.java.tuple.Tuple;
-import static org.apache.flink.languagebinding.api.java.common.PlanBinder.FLINK_TMP_DATA_DIR;
-import static org.apache.flink.languagebinding.api.java.common.PlanBinder.MAPPED_FILE_SIZE;
-
-/**
- * General-purpose class to write data to memory-mapped files.
- */
-public class Sender implements Serializable {
-       public static final byte TYPE_TUPLE = (byte) 11;
-       public static final byte TYPE_BOOLEAN = (byte) 10;
-       public static final byte TYPE_BYTE = (byte) 9;
-       public static final byte TYPE_SHORT = (byte) 8;
-       public static final byte TYPE_INTEGER = (byte) 7;
-       public static final byte TYPE_LONG = (byte) 6;
-       public static final byte TYPE_DOUBLE = (byte) 4;
-       public static final byte TYPE_FLOAT = (byte) 5;
-       public static final byte TYPE_CHAR = (byte) 3;
-       public static final byte TYPE_STRING = (byte) 2;
-       public static final byte TYPE_BYTES = (byte) 1;
-       public static final byte TYPE_NULL = (byte) 0;
-
-       private final AbstractRichFunction function;
-
-       private File outputFile;
-       private RandomAccessFile outputRAF;
-       private FileChannel outputChannel;
-       private MappedByteBuffer fileBuffer;
-
-       private final ByteBuffer[] saved = new ByteBuffer[2];
-
-       private final Serializer[] serializer = new Serializer[2];
-
-       public Sender(AbstractRichFunction function) {
-               this.function = function;
-       }
-
-       //=====Setup========================================================================================================
-       public void open(String path) throws IOException {
-               setupMappedFile(path);
-       }
-
-       private void setupMappedFile(String outputFilePath) throws FileNotFoundException, IOException {
-               File x = new File(FLINK_TMP_DATA_DIR);
-               x.mkdirs();
-
-               outputFile = new File(outputFilePath);
-               if (outputFile.exists()) {
-                       outputFile.delete();
-               }
-               outputFile.createNewFile();
-               outputRAF = new RandomAccessFile(outputFilePath, "rw");
-               outputRAF.setLength(MAPPED_FILE_SIZE);
-               outputRAF.seek(MAPPED_FILE_SIZE - 1);
-               outputRAF.writeByte(0);
-               outputRAF.seek(0);
-               outputChannel = outputRAF.getChannel();
-               fileBuffer = outputChannel.map(FileChannel.MapMode.READ_WRITE, 0, MAPPED_FILE_SIZE);
-       }
-
-       public void close() throws IOException {
-               closeMappedFile();
-       }
-
-       private void closeMappedFile() throws IOException {
-               outputChannel.close();
-               outputRAF.close();
-       }
-
-       /**
-        * Resets this object to the post-configuration state.
-        */
-       public void reset() {
-               serializer[0] = null;
-               serializer[1] = null;
-               fileBuffer.clear();
-       }
-
-       //=====Serialization================================================================================================
-       /**
-        * Writes a single record to the memory-mapped file. This method does NOT take care of synchronization. The user
-        * must guarantee that the file may be written to before calling this method. This method essentially reserves the
-        * whole buffer for one record. As such it imposes some performance restrictions and should only be used when
-        * absolutely necessary.
-        *
-        * @param value record to send
-        * @return size of the written buffer
-        * @throws IOException
-        */
-       public int sendRecord(Object value) throws IOException {
-               fileBuffer.clear();
-               int group = 0;
-
-               serializer[group] = getSerializer(value);
-               ByteBuffer bb = serializer[group].serialize(value);
-               if (bb.remaining() > MAPPED_FILE_SIZE) {
-                       throw new RuntimeException("Serialized object does not fit into a single buffer.");
-               }
-               fileBuffer.put(bb);
-
-               int size = fileBuffer.position();
-
-               reset();
-               return size;
-       }
-
-       public boolean hasRemaining(int group) {
-               return saved[group] != null;
-       }
-
-       /**
-        * Extracts records from an iterator and writes them to the memory-mapped file. This method assumes that all values
-        * in the iterator are of the same type. This method does NOT take care of synchronization. The caller must
-        * guarantee that the file may be written to before calling this method.
-        *
-        * @param i iterator containing records
-        * @param group group to which the iterator belongs, most notably used by CoGroup-functions.
-        * @return size of the written buffer
-        * @throws IOException
-        */
-       public int sendBuffer(Iterator i, int group) throws IOException {
-               fileBuffer.clear();
-
-               Object value;
-               ByteBuffer bb;
-               if (serializer[group] == null) {
-                       value = i.next();
-                       serializer[group] = getSerializer(value);
-                       bb = serializer[group].serialize(value);
-                       if (bb.remaining() > MAPPED_FILE_SIZE) {
-                               throw new RuntimeException("Serialized object does not fit into a single buffer.");
-                       }
-                       fileBuffer.put(bb);
-
-               }
-               if (saved[group] != null) {
-                       fileBuffer.put(saved[group]);
-                       saved[group] = null;
-               }
-               while (i.hasNext() && saved[group] == null) {
-                       value = i.next();
-                       bb = serializer[group].serialize(value);
-                       if (bb.remaining() > MAPPED_FILE_SIZE) {
-                               throw new RuntimeException("Serialized object does not fit into a single buffer.");
-                       }
-                       if (bb.remaining() <= fileBuffer.remaining()) {
-                               fileBuffer.put(bb);
-                       } else {
-                               saved[group] = bb;
-                       }
-               }
-
-               int size = fileBuffer.position();
-               return size;
-       }
-
-       private enum SupportedTypes {
-               TUPLE, BOOLEAN, BYTE, BYTES, CHARACTER, SHORT, INTEGER, LONG, FLOAT, DOUBLE, STRING, OTHER, NULL
-       }
-
-       //=====Serializer===================================================================================================
-       private Serializer getSerializer(Object value) throws IOException {
-               String className = value.getClass().getSimpleName().toUpperCase();
-               if (className.startsWith("TUPLE")) {
-                       className = "TUPLE";
-               }
-               if (className.startsWith("BYTE[]")) {
-                       className = "BYTES";
-               }
-               SupportedTypes type = SupportedTypes.valueOf(className);
-               switch (type) {
-                       case TUPLE:
-                               fileBuffer.put(TYPE_TUPLE);
-                               fileBuffer.putInt(((Tuple) value).getArity());
-                               return new TupleSerializer((Tuple) value);
-                       case BOOLEAN:
-                               fileBuffer.put(TYPE_BOOLEAN);
-                               return new BooleanSerializer();
-                       case BYTE:
-                               fileBuffer.put(TYPE_BYTE);
-                               return new ByteSerializer();
-                       case BYTES:
-                               fileBuffer.put(TYPE_BYTES);
-                               return new BytesSerializer();
-                       case CHARACTER:
-                               fileBuffer.put(TYPE_CHAR);
-                               return new CharSerializer();
-                       case SHORT:
-                               fileBuffer.put(TYPE_SHORT);
-                               return new ShortSerializer();
-                       case INTEGER:
-                               fileBuffer.put(TYPE_INTEGER);
-                               return new IntSerializer();
-                       case LONG:
-                               fileBuffer.put(TYPE_LONG);
-                               return new LongSerializer();
-                       case STRING:
-                               fileBuffer.put(TYPE_STRING);
-                               return new StringSerializer();
-                       case FLOAT:
-                               fileBuffer.put(TYPE_FLOAT);
-                               return new FloatSerializer();
-                       case DOUBLE:
-                               fileBuffer.put(TYPE_DOUBLE);
-                               return new DoubleSerializer();
-                       case NULL:
-                               fileBuffer.put(TYPE_NULL);
-                               return new NullSerializer();
-                       default:
-                               throw new IllegalArgumentException("Unknown Type encountered: " + type);
-               }
-       }
-
-       private abstract class Serializer<T> {
-               protected ByteBuffer buffer;
-
-               public Serializer(int capacity) {
-                       buffer = ByteBuffer.allocate(capacity);
-               }
-
-               public ByteBuffer serialize(T value) {
-                       buffer.clear();
-                       serializeInternal(value);
-                       buffer.flip();
-                       return buffer;
-               }
-
-               public abstract void serializeInternal(T value);
-       }
-
-       private class ByteSerializer extends Serializer<Byte> {
-               public ByteSerializer() {
-                       super(1);
-               }
-
-               @Override
-               public void serializeInternal(Byte value) {
-                       buffer.put(value);
-               }
-       }
-
-       private class BooleanSerializer extends Serializer<Boolean> {
-               public BooleanSerializer() {
-                       super(1);
-               }
-
-               @Override
-               public void serializeInternal(Boolean value) {
-                       buffer.put(value ? (byte) 1 : (byte) 0);
-               }
-       }
-
-       private class CharSerializer extends Serializer<Character> {
-               public CharSerializer() {
-                       super(4);
-               }
-
-               @Override
-               public void serializeInternal(Character value) {
-                       buffer.put((value + "").getBytes());
-               }
-       }
-
-       private class ShortSerializer extends Serializer<Short> {
-               public ShortSerializer() {
-                       super(2);
-               }
-
-               @Override
-               public void serializeInternal(Short value) {
-                       buffer.putShort(value);
-               }
-       }
-
-       private class IntSerializer extends Serializer<Integer> {
-               public IntSerializer() {
-                       super(4);
-               }
-
-               @Override
-               public void serializeInternal(Integer value) {
-                       buffer.putInt(value);
-               }
-       }
-
-       private class LongSerializer extends Serializer<Long> {
-               public LongSerializer() {
-                       super(8);
-               }
-
-               @Override
-               public void serializeInternal(Long value) {
-                       buffer.putLong(value);
-               }
-       }
-
-       private class StringSerializer extends Serializer<String> {
-               public StringSerializer() {
-                       super(0);
-               }
-
-               @Override
-               public void serializeInternal(String value) {
-                       byte[] bytes = value.getBytes();
-                       buffer = ByteBuffer.allocate(bytes.length + 4);
-                       buffer.putInt(bytes.length);
-                       buffer.put(bytes);
-               }
-       }
-
-       private class FloatSerializer extends Serializer<Float> {
-               public FloatSerializer() {
-                       super(4);
-               }
-
-               @Override
-               public void serializeInternal(Float value) {
-                       buffer.putFloat(value);
-               }
-       }
-
-       private class DoubleSerializer extends Serializer<Double> {
-               public DoubleSerializer() {
-                       super(8);
-               }
-
-               @Override
-               public void serializeInternal(Double value) {
-                       buffer.putDouble(value);
-               }
-       }
-
-       private class NullSerializer extends Serializer<Object> {
-               public NullSerializer() {
-                       super(0);
-               }
-
-               @Override
-               public void serializeInternal(Object value) {
-               }
-       }
-
-       private class BytesSerializer extends Serializer<byte[]> {
-               public BytesSerializer() {
-                       super(0);
-               }
-
-               @Override
-               public void serializeInternal(byte[] value) {
-                       buffer = ByteBuffer.allocate(4 + value.length);
-                       buffer.putInt(value.length);
-                       buffer.put(value);
-               }
-       }
-
-       private class TupleSerializer extends Serializer<Tuple> {
-               private final Serializer[] serializer;
-               private final List<ByteBuffer> buffers;
-
-               public TupleSerializer(Tuple value) throws IOException {
-                       super(0);
-                       serializer = new Serializer[value.getArity()];
-                       buffers = new ArrayList();
-                       for (int x = 0; x < serializer.length; x++) {
-                               serializer[x] = getSerializer(value.getField(x));
-                       }
-               }
-
-               @Override
-               public void serializeInternal(Tuple value) {
-                       int length = 0;
-                       for (int x = 0; x < serializer.length; x++) {
-                               serializer[x].buffer.clear();
-                               serializer[x].serializeInternal(value.getField(x));
-                               length += serializer[x].buffer.position();
-                               buffers.add(serializer[x].buffer);
-                       }
-                       buffer = ByteBuffer.allocate(length);
-                       for (ByteBuffer b : buffers) {
-                               b.flip();
-                               buffer.put(b);
-                       }
-                       buffers.clear();
-               }
-       }
-}
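
The encoder side mirrors the Receiver's framing: write the type tag, then a length prefix for
variable-width types, then the payload. A compact sketch of the string case follows; note that
in the deleted Sender the tag is written once per buffer by getSerializer(), whereas here it is
inlined per value, an assumption made only to keep the example self-contained:

    import java.nio.ByteBuffer;

    // Sketch of the matching encoder for the string case (cf. Sender's StringSerializer).
    public class TagEncoder {
        static final byte TYPE_STRING = (byte) 2;

        static ByteBuffer writeString(String value) {
            byte[] bytes = value.getBytes();
            ByteBuffer buf = ByteBuffer.allocate(1 + 4 + bytes.length);
            buf.put(TYPE_STRING);       // one-byte type tag
            buf.putInt(bytes.length);   // 4-byte length prefix
            buf.put(bytes);             // raw payload
            buf.flip();                 // switch the buffer from writing to reading
            return buf;
        }
    }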

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/StreamPrinter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/StreamPrinter.java b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/StreamPrinter.java
deleted file mode 100644
index 1ad0606..0000000
--- a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/StreamPrinter.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.languagebinding.api.java.common.streaming;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-
-/**
- * Simple utility class to print all contents of an InputStream to stdout.
- */
-public class StreamPrinter extends Thread {
-       private final BufferedReader reader;
-       private final boolean wrapInException;
-       private StringBuilder msg;
-
-       public StreamPrinter(InputStream stream) {
-               this(stream, false, null);
-       }
-
-       public StreamPrinter(InputStream stream, boolean wrapInException, StringBuilder msg) {
-               this.reader = new BufferedReader(new InputStreamReader(stream));
-               this.wrapInException = wrapInException;
-               this.msg = msg;
-       }
-
-       @Override
-       public void run() {
-               String line;
-               try {
-                       if (wrapInException) {
-                               while ((line = reader.readLine()) != null) {
-                                       msg.append("\n" + line);
-                               }
-                       } else {
-                               while ((line = reader.readLine()) != null) {
-                                       System.out.println(line);
-                                       System.out.flush();
-                               }
-                       }
-               } catch (IOException ex) {
-               }
-       }
-}
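
StreamPrinter exists so a spawned plan process cannot deadlock on a full stdout or stderr
pipe: one drainer thread per stream keeps both flowing while the parent waits. A usage sketch,
assuming an illustrative command line (StreamPrinter itself is the class deleted above):

    // Drain both pipes of a child process so it cannot block on full pipe buffers.
    public class StreamPrinterUsage {
        public static void main(String[] args) throws Exception {
            Process p = new ProcessBuilder("python", "plan.py").start();
            new StreamPrinter(p.getInputStream()).start();             // forward stdout
            StringBuilder err = new StringBuilder();
            new StreamPrinter(p.getErrorStream(), true, err).start();  // collect stderr
            if (p.waitFor() != 0) {
                throw new RuntimeException("External process failed:" + err);
            }
        }
    }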
