http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Streamer.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Streamer.java
 
b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Streamer.java
deleted file mode 100644
index 8b19425..0000000
--- 
a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Streamer.java
+++ /dev/null
@@ -1,258 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.languagebinding.api.java.common.streaming;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.net.SocketTimeoutException;
-import java.util.Iterator;
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.configuration.Configuration;
-import static 
org.apache.flink.languagebinding.api.java.common.PlanBinder.PLANBINDER_CONFIG_BCVAR_COUNT;
-import static 
org.apache.flink.languagebinding.api.java.common.PlanBinder.PLANBINDER_CONFIG_BCVAR_NAME_PREFIX;
-import org.apache.flink.util.Collector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class is the basis for using an external process within a Java Flink 
operator. It contains logic to send and
- * receive data, while taking care of synchronization.
- */
-public abstract class Streamer implements Serializable {
-       protected static final Logger LOG = 
LoggerFactory.getLogger(Streamer.class);
-       private static final int SIGNAL_BUFFER_REQUEST = 0;
-       private static final int SIGNAL_BUFFER_REQUEST_G0 = -3;
-       private static final int SIGNAL_BUFFER_REQUEST_G1 = -4;
-       private static final int SIGNAL_FINISHED = -1;
-       private static final int SIGNAL_ERROR = -2;
-       private static final byte SIGNAL_LAST = 32;
-
-       private final byte[] buffer = new byte[4];
-
-       protected ServerSocket server;
-       protected Socket socket;
-       protected InputStream in;
-       protected OutputStream out;
-       protected int port;
-
-       protected Sender sender;
-       protected Receiver receiver;
-
-       protected StringBuilder msg = new StringBuilder();
-
-       protected final AbstractRichFunction function;
-
-       public Streamer(AbstractRichFunction function) {
-               this.function = function;
-               sender = new Sender(function);
-               receiver = new Receiver(function);
-       }
-
-       public void open() throws IOException {
-               server = new ServerSocket(0);
-               setupProcess();
-       }
-
-       /**
-        * This method opens all required resources-
-        *
-        * @throws IOException
-        */
-       public abstract void setupProcess() throws IOException;
-
-       /**
-        * This method closes all previously opened resources.
-        *
-        * @throws IOException
-        */
-       public void close() throws IOException {
-               socket.close();
-               sender.close();
-               receiver.close();
-       }
-
-       private void sendWriteNotification(int size, boolean hasNext) throws 
IOException {
-               byte[] tmp = new byte[5];
-               putInt(tmp, 0, size);
-               tmp[4] = hasNext ? 0 : SIGNAL_LAST;
-               out.write(tmp, 0, 5);
-               out.flush();
-       }
-
-       private void sendReadConfirmation() throws IOException {
-               out.write(new byte[1], 0, 1);
-               out.flush();
-       }
-
-       private void checkForError() {
-               if (getInt(buffer, 0) == -2) {
-                       try { //wait before terminating to ensure that the 
complete error message is printed
-                               Thread.sleep(2000);
-                       } catch (InterruptedException ex) {
-                       }
-                       throw new RuntimeException(
-                                       "External process for task " + 
function.getRuntimeContext().getTaskName() + " terminated prematurely." + msg);
-               }
-       }
-
-       /**
-        * Sends all broadcast-variables encoded in the configuration to the 
external process.
-        *
-        * @param config configuration object containing broadcast-variable 
count and names
-        * @throws IOException
-        */
-       public final void sendBroadCastVariables(Configuration config) throws 
IOException {
-               try {
-                       int broadcastCount = 
config.getInteger(PLANBINDER_CONFIG_BCVAR_COUNT, 0);
-
-                       String[] names = new String[broadcastCount];
-
-                       for (int x = 0; x < names.length; x++) {
-                               names[x] = 
config.getString(PLANBINDER_CONFIG_BCVAR_NAME_PREFIX + x, null);
-                       }
-
-                       in.read(buffer, 0, 4);
-                       checkForError();
-                       int size = sender.sendRecord(broadcastCount);
-                       sendWriteNotification(size, false);
-
-                       for (String name : names) {
-                               Iterator bcv = 
function.getRuntimeContext().getBroadcastVariable(name).iterator();
-
-                               in.read(buffer, 0, 4);
-                               checkForError();
-                               size = sender.sendRecord(name);
-                               sendWriteNotification(size, false);
-
-                               while (bcv.hasNext() || sender.hasRemaining(0)) 
{
-                                       in.read(buffer, 0, 4);
-                                       checkForError();
-                                       size = sender.sendBuffer(bcv, 0);
-                                       sendWriteNotification(size, 
bcv.hasNext() || sender.hasRemaining(0));
-                               }
-                               sender.reset();
-                       }
-               } catch (SocketTimeoutException ste) {
-                       throw new RuntimeException("External process for task " 
+ function.getRuntimeContext().getTaskName() + " stopped responding." + msg);
-               }
-       }
-
-       /**
-        * Sends all values contained in the iterator to the external process 
and collects all results.
-        *
-        * @param i iterator
-        * @param c collector
-        * @throws IOException
-        */
-       public final void streamBufferWithoutGroups(Iterator i, Collector c) 
throws IOException {
-               try {
-                       int size;
-                       if (i.hasNext()) {
-                               while (true) {
-                                       in.read(buffer, 0, 4);
-                                       int sig = getInt(buffer, 0);
-                                       switch (sig) {
-                                               case SIGNAL_BUFFER_REQUEST:
-                                                       if (i.hasNext() || 
sender.hasRemaining(0)) {
-                                                               size = 
sender.sendBuffer(i, 0);
-                                                               
sendWriteNotification(size, sender.hasRemaining(0) || i.hasNext());
-                                                       } else {
-                                                               throw new 
RuntimeException("External process requested data even though none is 
available.");
-                                                       }
-                                                       break;
-                                               case SIGNAL_FINISHED:
-                                                       return;
-                                               case SIGNAL_ERROR:
-                                                       try { //wait before 
terminating to ensure that the complete error message is printed
-                                                               
Thread.sleep(2000);
-                                                       } catch 
(InterruptedException ex) {
-                                                       }
-                                                       throw new 
RuntimeException(
-                                                                       
"External process for task " + function.getRuntimeContext().getTaskName() + " 
terminated prematurely due to an error." + msg);
-                                               default:
-                                                       
receiver.collectBuffer(c, sig);
-                                                       sendReadConfirmation();
-                                                       break;
-                                       }
-                               }
-                       }
-               } catch (SocketTimeoutException ste) {
-                       throw new RuntimeException("External process for task " 
+ function.getRuntimeContext().getTaskName() + " stopped responding." + msg);
-               }
-       }
-
-       /**
-        * Sends all values contained in both iterators to the external process 
and collects all results.
-        *
-        * @param i1 iterator
-        * @param i2 iterator
-        * @param c collector
-        * @throws IOException
-        */
-       public final void streamBufferWithGroups(Iterator i1, Iterator i2, 
Collector c) throws IOException {
-               try {
-                       int size;
-                       if (i1.hasNext() || i2.hasNext()) {
-                               while (true) {
-                                       in.read(buffer, 0, 4);
-                                       int sig = getInt(buffer, 0);
-                                       switch (sig) {
-                                               case SIGNAL_BUFFER_REQUEST_G0:
-                                                       if (i1.hasNext() || 
sender.hasRemaining(0)) {
-                                                               size = 
sender.sendBuffer(i1, 0);
-                                                               
sendWriteNotification(size, sender.hasRemaining(0) || i1.hasNext());
-                                                       }
-                                                       break;
-                                               case SIGNAL_BUFFER_REQUEST_G1:
-                                                       if (i2.hasNext() || 
sender.hasRemaining(1)) {
-                                                               size = 
sender.sendBuffer(i2, 1);
-                                                               
sendWriteNotification(size, sender.hasRemaining(1) || i2.hasNext());
-                                                       }
-                                                       break;
-                                               case SIGNAL_FINISHED:
-                                                       return;
-                                               case SIGNAL_ERROR:
-                                                       try { //wait before 
terminating to ensure that the complete error message is printed
-                                                               
Thread.sleep(2000);
-                                                       } catch 
(InterruptedException ex) {
-                                                       }
-                                                       throw new 
RuntimeException(
-                                                                       
"External process for task " + function.getRuntimeContext().getTaskName() + " 
terminated prematurely due to an error." + msg);
-                                               default:
-                                                       
receiver.collectBuffer(c, sig);
-                                                       sendReadConfirmation();
-                                                       break;
-                                       }
-                               }
-                       }
-               } catch (SocketTimeoutException ste) {
-                       throw new RuntimeException("External process for task " 
+ function.getRuntimeContext().getTaskName() + " stopped responding." + msg);
-               }
-       }
-
-       protected final static int getInt(byte[] array, int offset) {
-               return (array[offset] << 24) | (array[offset + 1] & 0xff) << 16 
| (array[offset + 2] & 0xff) << 8 | (array[offset + 3] & 0xff);
-       }
-
-       protected final static void putInt(byte[] array, int offset, int value) 
{
-               array[offset] = (byte) (value >> 24);
-               array[offset + 1] = (byte) (value >> 16);
-               array[offset + 2] = (byte) (value >> 8);
-               array[offset + 3] = (byte) (value);
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-python/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/pom.xml 
b/flink-staging/flink-language-binding/flink-python/pom.xml
deleted file mode 100644
index 06ef3ca..0000000
--- a/flink-staging/flink-language-binding/flink-python/pom.xml
+++ /dev/null
@@ -1,86 +0,0 @@
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
-    <modelVersion>4.0.0</modelVersion>
-
-    <parent>
-        <groupId>org.apache.flink</groupId>
-        <artifactId>flink-language-binding-parent</artifactId>
-        <version>1.0-SNAPSHOT</version>
-        <relativePath>..</relativePath>
-    </parent>
-       
-    <artifactId>flink-python</artifactId>
-    <name>flink-python</name>
-    <packaging>jar</packaging>
-
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-jar-plugin</artifactId>
-                <configuration>
-                    <descriptorRefs>
-                        <descriptorRef>jar-with-dependencies</descriptorRef>
-                    </descriptorRefs>
-                    <archive>
-                        <manifest>
-                            <addClasspath>true</addClasspath>
-                            
<mainClass>org.apache.flink.languagebinding.api.java.python.PythonPlanBinder</mainClass>
-                        </manifest>
-                    </archive>
-                </configuration>
-            </plugin>
-        </plugins>
-    </build>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-core</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-java</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-optimizer</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-runtime</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-clients</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-language-binding-generic</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-    </dependencies>
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/PythonPlanBinder.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/PythonPlanBinder.java
 
b/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/PythonPlanBinder.java
deleted file mode 100644
index 5aa1480..0000000
--- 
a/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/PythonPlanBinder.java
+++ /dev/null
@@ -1,442 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.languagebinding.api.java.python;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Arrays;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.LocalEnvironment;
-import org.apache.flink.api.java.operators.Keys;
-import org.apache.flink.api.java.operators.CoGroupRawOperator;
-import org.apache.flink.api.java.operators.SortedGrouping;
-import org.apache.flink.api.java.operators.UnsortedGrouping;
-import org.apache.flink.api.java.tuple.Tuple;
-import static org.apache.flink.api.java.typeutils.TypeExtractor.getForObject;
-import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.languagebinding.api.java.common.PlanBinder;
-import org.apache.flink.languagebinding.api.java.common.OperationInfo;
-import 
org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.PythonOperationInfo;
-//CHECKSTYLE.OFF: AvoidStarImport - enum/function import
-import static 
org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.PythonOperationInfo.*;
-import org.apache.flink.languagebinding.api.java.python.functions.*;
-//CHECKSTYLE.ON: AvoidStarImport
-import org.apache.flink.languagebinding.api.java.common.streaming.Receiver;
-import 
org.apache.flink.languagebinding.api.java.common.streaming.StreamPrinter;
-import org.apache.flink.runtime.filecache.FileCache;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class allows the execution of a Flink plan written in python.
- */
-public class PythonPlanBinder extends PlanBinder<PythonOperationInfo> {
-       static final Logger LOG = 
LoggerFactory.getLogger(PythonPlanBinder.class);
-
-       public static final String ARGUMENT_PYTHON_2 = "2";
-       public static final String ARGUMENT_PYTHON_3 = "3";
-
-       public static final String FLINK_PYTHON_DC_ID = "flink";
-       public static final String FLINK_PYTHON_PLAN_NAME = "/plan.py";
-
-       public static final String FLINK_PYTHON2_BINARY_KEY = 
"python.binary.python2";
-       public static final String FLINK_PYTHON3_BINARY_KEY = 
"python.binary.python3";
-       public static String FLINK_PYTHON2_BINARY_PATH = 
GlobalConfiguration.getString(FLINK_PYTHON2_BINARY_KEY, "python");
-       public static String FLINK_PYTHON3_BINARY_PATH = 
GlobalConfiguration.getString(FLINK_PYTHON3_BINARY_KEY, "python3");
-
-       private static final String FLINK_PYTHON_FILE_PATH = 
System.getProperty("java.io.tmpdir") + "/flink_plan";
-       protected static final String FLINK_PYTHON_REL_LOCAL_PATH = 
"/resources/python";
-       protected static final String FLINK_DIR = 
System.getenv("FLINK_ROOT_DIR");
-       protected static String FULL_PATH;
-
-       public static StringBuilder arguments = new StringBuilder();
-
-       private Process process;
-
-       public static boolean usePython3 = false;
-
-       /**
-        * Entry point for the execution of a python plan.
-        *
-        * @param args planPath[ package1[ packageX[ - parameter1[ 
parameterX]]]]
-        * @throws Exception
-        */
-       public static void main(String[] args) throws Exception {
-               if (args.length < 2) {
-                       System.out.println("Usage: ./bin/pyflink<2/3>.sh 
<pathToScript>[ <pathToPackage1>[ <pathToPackageX]][ - <parameter1>[ 
<parameterX>]]");
-                       return;
-               }
-               usePython3 = args[0].equals(ARGUMENT_PYTHON_3);
-               PythonPlanBinder binder = new PythonPlanBinder();
-               binder.runPlan(Arrays.copyOfRange(args, 1, args.length));
-       }
-
-       public PythonPlanBinder() throws IOException {
-               FLINK_PYTHON2_BINARY_PATH = 
GlobalConfiguration.getString(FLINK_PYTHON2_BINARY_KEY, "python");
-               FLINK_PYTHON3_BINARY_PATH = 
GlobalConfiguration.getString(FLINK_PYTHON3_BINARY_KEY, "python3");
-               FULL_PATH = FLINK_DIR != null
-                               ? FLINK_DIR + "/" + FLINK_PYTHON_REL_LOCAL_PATH 
//command-line
-                               : 
FileSystem.getLocalFileSystem().getWorkingDirectory().toString() //testing
-                               + 
"/src/main/python/org/apache/flink/languagebinding/api/python";
-       }
-
-       protected void runPlan(String[] args) throws Exception {
-               env = ExecutionEnvironment.getExecutionEnvironment();
-
-               int split = 0;
-               for (int x = 0; x < args.length; x++) {
-                       if (args[x].compareTo("-") == 0) {
-                               split = x;
-                       }
-               }
-
-               try {
-                       prepareFiles(Arrays.copyOfRange(args, 0, split == 0 ? 1 
: split));
-                       startPython(Arrays.copyOfRange(args, split == 0 ? 
args.length : split + 1, args.length));
-                       receivePlan();
-
-                       if (env instanceof LocalEnvironment) {
-                               FLINK_HDFS_PATH = "file:" + 
System.getProperty("java.io.tmpdir") + "/flink";
-                       }
-
-                       distributeFiles(env);
-                       env.execute();
-                       close();
-               } catch (Exception e) {
-                       close();
-                       throw e;
-               }
-       }
-
-       
//=====Setup========================================================================================================
-       /**
-        * Copies all files to a common directory (FLINK_PYTHON_FILE_PATH). 
This allows us to distribute it as one big
-        * package, and resolves PYTHONPATH issues.
-        *
-        * @param filePaths
-        * @throws IOException
-        * @throws URISyntaxException
-        */
-       private void prepareFiles(String... filePaths) throws IOException, 
URISyntaxException {
-               //Flink python package
-               String tempFilePath = FLINK_PYTHON_FILE_PATH;
-               clearPath(tempFilePath);
-               FileCache.copy(new Path(FULL_PATH), new Path(tempFilePath), 
false);
-
-               //plan file             
-               copyFile(filePaths[0], FLINK_PYTHON_PLAN_NAME);
-
-               //additional files/folders
-               for (int x = 1; x < filePaths.length; x++) {
-                       copyFile(filePaths[x], null);
-               }
-       }
-
-       private static void clearPath(String path) throws IOException, 
URISyntaxException {
-               FileSystem fs = FileSystem.get(new URI(path));
-               if (fs.exists(new Path(path))) {
-                       fs.delete(new Path(path), true);
-               }
-       }
-
-       private static void copyFile(String path, String name) throws 
IOException, URISyntaxException {
-               if (path.endsWith("/")) {
-                       path = path.substring(0, path.length() - 1);
-               }
-               String identifier = name == null ? 
path.substring(path.lastIndexOf("/")) : name;
-               String tmpFilePath = FLINK_PYTHON_FILE_PATH + "/" + identifier;
-               clearPath(tmpFilePath);
-               Path p = new Path(path);
-               FileCache.copy(p.makeQualified(FileSystem.get(p.toUri())), new 
Path(tmpFilePath), true);
-       }
-
-       private static void distributeFiles(ExecutionEnvironment env) throws 
IOException, URISyntaxException {
-               clearPath(FLINK_HDFS_PATH);
-               FileCache.copy(new Path(FLINK_PYTHON_FILE_PATH), new 
Path(FLINK_HDFS_PATH), true);
-               env.registerCachedFile(FLINK_HDFS_PATH, FLINK_PYTHON_DC_ID);
-               clearPath(FLINK_PYTHON_FILE_PATH);
-       }
-
-       private void startPython(String[] args) throws IOException {
-               for (String arg : args) {
-                       arguments.append(" ").append(arg);
-               }
-               receiver = new Receiver(null);
-               receiver.open(FLINK_TMP_DATA_DIR + "/output");
-
-               String pythonBinaryPath = usePython3 ? 
FLINK_PYTHON3_BINARY_PATH : FLINK_PYTHON2_BINARY_PATH;
-
-               try {
-                       Runtime.getRuntime().exec(pythonBinaryPath);
-               } catch (IOException ex) {
-                       throw new RuntimeException(pythonBinaryPath + " does 
not point to a valid python binary.");
-               }
-               process = Runtime.getRuntime().exec(pythonBinaryPath + " -B " + 
FLINK_PYTHON_FILE_PATH + FLINK_PYTHON_PLAN_NAME + arguments.toString());
-
-               new StreamPrinter(process.getInputStream()).start();
-               new StreamPrinter(process.getErrorStream()).start();
-
-               try {
-                       Thread.sleep(2000);
-               } catch (InterruptedException ex) {
-               }
-
-               try {
-                       int value = process.exitValue();
-                       if (value != 0) {
-                               throw new RuntimeException("Plan file caused an 
error. Check log-files for details.");
-                       }
-                       if (value == 0) {
-                               throw new RuntimeException("Plan file exited 
prematurely without an error.");
-                       }
-               } catch (IllegalThreadStateException ise) {//Process still 
running
-               }
-
-               process.getOutputStream().write("plan\n".getBytes());
-               process.getOutputStream().write((FLINK_TMP_DATA_DIR + 
"/output\n").getBytes());
-               process.getOutputStream().flush();
-       }
-
-       private void close() {
-               try { //prevent throwing exception so that previous exceptions 
aren't hidden.
-                       if (!DEBUG) {
-                               FileSystem hdfs = FileSystem.get(new 
URI(FLINK_HDFS_PATH));
-                               hdfs.delete(new Path(FLINK_HDFS_PATH), true);
-                       }
-
-                       FileSystem local = FileSystem.getLocalFileSystem();
-                       local.delete(new Path(FLINK_PYTHON_FILE_PATH), true);
-                       local.delete(new Path(FLINK_TMP_DATA_DIR), true);
-                       receiver.close();
-               } catch (NullPointerException npe) {
-               } catch (IOException ioe) {
-                       LOG.error("PythonAPI file cleanup failed. " + 
ioe.getMessage());
-               } catch (URISyntaxException use) { // can't occur
-               }
-               try {
-                       process.exitValue();
-               } catch (NullPointerException npe) { //exception occurred 
before process was started
-               } catch (IllegalThreadStateException ise) { //process still 
active
-                       process.destroy();
-               }
-       }
-
-       //=====Plan 
Binding=================================================================================================
-       protected class PythonOperationInfo extends OperationInfo {
-               public boolean combine;
-
-               @Override
-               public String toString() {
-                       StringBuilder sb = new StringBuilder();
-                       sb.append(super.toString());
-                       sb.append("Combine: ").append(combine).append("\n");
-                       return sb.toString();
-               }
-
-               protected PythonOperationInfo(Receiver receiver, 
AbstractOperation identifier) throws IOException {
-                       Object tmpType;
-                       setID = (Integer) receiver.getRecord(true);
-                       parentID = (Integer) receiver.getRecord(true);
-                       switch (identifier) {
-                               case COGROUP:
-                                       otherID = (Integer) 
receiver.getRecord(true);
-                                       keys1 = 
normalizeKeys(receiver.getRecord(true));
-                                       keys2 = 
normalizeKeys(receiver.getRecord(true));
-                                       tmpType = receiver.getRecord();
-                                       types = tmpType == null ? null : 
getForObject(tmpType);
-                                       name = (String) receiver.getRecord();
-                                       break;
-                               case CROSS:
-                               case CROSS_H:
-                               case CROSS_T:
-                                       otherID = (Integer) 
receiver.getRecord(true);
-                                       tmpType = receiver.getRecord();
-                                       types = tmpType == null ? null : 
getForObject(tmpType);
-                                       int cProjectCount = (Integer) 
receiver.getRecord(true);
-                                       projections = new 
ProjectionEntry[cProjectCount];
-                                       for (int x = 0; x < cProjectCount; x++) 
{
-                                               String side = (String) 
receiver.getRecord();
-                                               int[] keys = toIntArray((Tuple) 
receiver.getRecord(true));
-                                               projections[x] = new 
ProjectionEntry(ProjectionSide.valueOf(side.toUpperCase()), keys);
-                                       }
-                                       name = (String) receiver.getRecord();
-                                       break;
-                               case REDUCE:
-                               case GROUPREDUCE:
-                                       tmpType = receiver.getRecord();
-                                       types = tmpType == null ? null : 
getForObject(tmpType);
-                                       combine = (Boolean) 
receiver.getRecord();
-                                       name = (String) receiver.getRecord();
-                                       break;
-                               case JOIN:
-                               case JOIN_H:
-                               case JOIN_T:
-                                       keys1 = 
normalizeKeys(receiver.getRecord(true));
-                                       keys2 = 
normalizeKeys(receiver.getRecord(true));
-                                       otherID = (Integer) 
receiver.getRecord(true);
-                                       tmpType = receiver.getRecord();
-                                       types = tmpType == null ? null : 
getForObject(tmpType);
-                                       int jProjectCount = (Integer) 
receiver.getRecord(true);
-                                       projections = new 
ProjectionEntry[jProjectCount];
-                                       for (int x = 0; x < jProjectCount; x++) 
{
-                                               String side = (String) 
receiver.getRecord();
-                                               int[] keys = toIntArray((Tuple) 
receiver.getRecord(true));
-                                               projections[x] = new 
ProjectionEntry(ProjectionSide.valueOf(side.toUpperCase()), keys);
-                                       }
-                                       name = (String) receiver.getRecord();
-                                       break;
-                               case MAPPARTITION:
-                               case FLATMAP:
-                               case MAP:
-                               case FILTER:
-                                       tmpType = receiver.getRecord();
-                                       types = tmpType == null ? null : 
getForObject(tmpType);
-                                       name = (String) receiver.getRecord();
-                                       break;
-                               default:
-                                       throw new 
UnsupportedOperationException("This operation is not implemented in the Python 
API: " + identifier);
-                       }
-               }
-       }
-
-       @Override
-       protected PythonOperationInfo createOperationInfo(AbstractOperation 
identifier) throws IOException {
-               return new PythonOperationInfo(receiver, identifier);
-       }
-
-       @Override
-       protected DataSet applyCoGroupOperation(DataSet op1, DataSet op2, 
String[] firstKeys, String[] secondKeys, PythonOperationInfo info) {
-               return new CoGroupRawOperator(
-                               op1,
-                               op2,
-                               new Keys.ExpressionKeys(firstKeys, 
op1.getType()),
-                               new Keys.ExpressionKeys(secondKeys, 
op2.getType()),
-                               new PythonCoGroup(info.setID, info.types),
-                               info.types, info.name);
-       }
-
-       @Override
-       protected DataSet applyCrossOperation(DataSet op1, DataSet op2, 
DatasizeHint mode, PythonOperationInfo info) {
-               switch (mode) {
-                       case NONE:
-                               return op1.cross(op2).name("PythonCrossPreStep")
-                                               .mapPartition(new 
PythonMapPartition(info.setID, info.types)).name(info.name);
-                       case HUGE:
-                               return 
op1.crossWithHuge(op2).name("PythonCrossPreStep")
-                                               .mapPartition(new 
PythonMapPartition(info.setID, info.types)).name(info.name);
-                       case TINY:
-                               return 
op1.crossWithTiny(op2).name("PythonCrossPreStep")
-                                               .mapPartition(new 
PythonMapPartition(info.setID, info.types)).name(info.name);
-                       default:
-                               throw new IllegalArgumentException("Invalid 
Cross mode specified: " + mode);
-               }
-       }
-
-       @Override
-       protected DataSet applyFilterOperation(DataSet op1, PythonOperationInfo 
info) {
-               return op1.mapPartition(new PythonMapPartition(info.setID, 
info.types)).name(info.name);
-       }
-
-       @Override
-       protected DataSet applyFlatMapOperation(DataSet op1, 
PythonOperationInfo info) {
-               return op1.mapPartition(new PythonMapPartition(info.setID, 
info.types)).name(info.name);
-       }
-
-       @Override
-       protected DataSet applyGroupReduceOperation(DataSet op1, 
PythonOperationInfo info) {
-               if (info.combine) {
-                       return op1.reduceGroup(new 
PythonCombineIdentity(info.setID * -1))
-                                       
.setCombinable(true).name("PythonCombine")
-                                       .mapPartition(new 
PythonMapPartition(info.setID, info.types))
-                                       .name(info.name);
-               } else {
-                       return op1.reduceGroup(new PythonCombineIdentity())
-                                       
.setCombinable(false).name("PythonGroupReducePreStep")
-                                       .mapPartition(new 
PythonMapPartition(info.setID, info.types))
-                                       .name(info.name);
-               }
-       }
-
-       @Override
-       protected DataSet applyGroupReduceOperation(UnsortedGrouping op1, 
PythonOperationInfo info) {
-               if (info.combine) {
-                       return op1.reduceGroup(new 
PythonCombineIdentity(info.setID * -1))
-                                       
.setCombinable(true).name("PythonCombine")
-                                       .mapPartition(new 
PythonMapPartition(info.setID, info.types))
-                                       .name(info.name);
-               } else {
-                       return op1.reduceGroup(new PythonCombineIdentity())
-                                       
.setCombinable(false).name("PythonGroupReducePreStep")
-                                       .mapPartition(new 
PythonMapPartition(info.setID, info.types))
-                                       .name(info.name);
-               }
-       }
-
-       @Override
-       protected DataSet applyGroupReduceOperation(SortedGrouping op1, 
PythonOperationInfo info) {
-               if (info.combine) {
-                       return op1.reduceGroup(new 
PythonCombineIdentity(info.setID * -1))
-                                       
.setCombinable(true).name("PythonCombine")
-                                       .mapPartition(new 
PythonMapPartition(info.setID, info.types))
-                                       .name(info.name);
-               } else {
-                       return op1.reduceGroup(new PythonCombineIdentity())
-                                       
.setCombinable(false).name("PythonGroupReducePreStep")
-                                       .mapPartition(new 
PythonMapPartition(info.setID, info.types))
-                                       .name(info.name);
-               }
-       }
-
-       @Override
-       protected DataSet applyJoinOperation(DataSet op1, DataSet op2, String[] 
firstKeys, String[] secondKeys, DatasizeHint mode, PythonOperationInfo info) {
-               return createDefaultJoin(op1, op2, firstKeys, secondKeys, 
mode).name("PythonJoinPreStep")
-                               .mapPartition(new 
PythonMapPartition(info.setID, info.types)).name(info.name);
-       }
-
-       @Override
-       protected DataSet applyMapOperation(DataSet op1, PythonOperationInfo 
info) {
-               return op1.mapPartition(new PythonMapPartition(info.setID, 
info.types)).name(info.name);
-       }
-
-       @Override
-       protected DataSet applyMapPartitionOperation(DataSet op1, 
PythonOperationInfo info) {
-               return op1.mapPartition(new PythonMapPartition(info.setID, 
info.types)).name(info.name);
-       }
-
-       @Override
-       protected DataSet applyReduceOperation(DataSet op1, PythonOperationInfo 
info) {
-               return op1.reduceGroup(new PythonCombineIdentity())
-                               
.setCombinable(false).name("PythonReducePreStep")
-                               .mapPartition(new 
PythonMapPartition(info.setID, info.types))
-                               .name(info.name);
-       }
-
-       @Override
-       protected DataSet applyReduceOperation(UnsortedGrouping op1, 
PythonOperationInfo info) {
-               if (info.combine) {
-                       return op1.reduceGroup(new 
PythonCombineIdentity(info.setID * -1))
-                                       
.setCombinable(true).name("PythonCombine")
-                                       .mapPartition(new 
PythonMapPartition(info.setID, info.types))
-                                       .name(info.name);
-               } else {
-                       return op1.reduceGroup(new PythonCombineIdentity())
-                                       
.setCombinable(false).name("PythonReducePreStep")
-                                       .mapPartition(new 
PythonMapPartition(info.setID, info.types))
-                                       .name(info.name);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/functions/PythonCoGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/functions/PythonCoGroup.java
 
b/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/functions/PythonCoGroup.java
deleted file mode 100644
index 26d554d..0000000
--- 
a/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/functions/PythonCoGroup.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.languagebinding.api.java.python.functions;
-
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.configuration.Configuration;
-import 
org.apache.flink.languagebinding.api.java.python.streaming.PythonStreamer;
-import org.apache.flink.util.Collector;
-import java.io.IOException;
-import org.apache.flink.api.common.functions.RichCoGroupFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-
-/**
- * CoGroupFunction that uses a python script.
- *
- * @param <IN1>
- * @param <IN2>
- * @param <OUT>
- */
-public class PythonCoGroup<IN1, IN2, OUT> extends RichCoGroupFunction<IN1, 
IN2, OUT> implements ResultTypeQueryable {
-       private final PythonStreamer streamer;
-       private transient final TypeInformation<OUT> typeInformation;
-
-       public PythonCoGroup(int id, TypeInformation<OUT> typeInformation) {
-               this.typeInformation = typeInformation;
-               streamer = new PythonStreamer(this, id);
-       }
-
-       /**
-        * Opens this function.
-        *
-        * @param config configuration
-        * @throws IOException
-        */
-       @Override
-       public void open(Configuration config) throws IOException {
-               streamer.open();
-               streamer.sendBroadCastVariables(config);
-       }
-
-       /**
-        * Calls the external python function.
-        *
-        * @param first
-        * @param second
-        * @param out collector
-        * @throws IOException
-        */
-       @Override
-       public final void coGroup(Iterable<IN1> first, Iterable<IN2> second, 
Collector<OUT> out) throws Exception {
-               streamer.streamBufferWithGroups(first.iterator(), 
second.iterator(), out);
-       }
-
-       /**
-        * Closes this function.
-        *
-        * @throws IOException
-        */
-       @Override
-       public void close() throws IOException {
-               streamer.close();
-       }
-
-       @Override
-       public TypeInformation<OUT> getProducedType() {
-               return typeInformation;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/functions/PythonCombineIdentity.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/functions/PythonCombineIdentity.java
 
b/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/functions/PythonCombineIdentity.java
deleted file mode 100644
index a8ff96c..0000000
--- 
a/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/functions/PythonCombineIdentity.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.languagebinding.api.java.python.functions;
-
-import org.apache.flink.configuration.Configuration;
-import 
org.apache.flink.languagebinding.api.java.python.streaming.PythonStreamer;
-import org.apache.flink.util.Collector;
-import java.io.IOException;
-import org.apache.flink.api.common.functions.RichGroupReduceFunction;
-
-/**
- * Multi-purpose class, used for Combine-operations using a python script, and 
as a preprocess step for
- * GroupReduce-operations.
- *
- * @param <IN>
- */
-public class PythonCombineIdentity<IN> extends RichGroupReduceFunction<IN, IN> 
{
-       private PythonStreamer streamer;
-
-       public PythonCombineIdentity() {
-               streamer = null;
-       }
-
-       public PythonCombineIdentity(int id) {
-               streamer = new PythonStreamer(this, id);
-       }
-
-       @Override
-       public void open(Configuration config) throws IOException {
-               if (streamer != null) {
-                       streamer.open();
-                       streamer.sendBroadCastVariables(config);
-               }
-       }
-
-       /**
-        * Calls the external python function.
-        *
-        * @param values function input
-        * @param out collector
-        * @throws IOException
-        */
-       @Override
-       public final void reduce(Iterable<IN> values, Collector<IN> out) throws 
Exception {
-               for (IN value : values) {
-                       out.collect(value);
-               }
-       }
-
-       /**
-        * Calls the external python function.
-        *
-        * @param values function input
-        * @param out collector
-        * @throws IOException
-        */
-       @Override
-       public final void combine(Iterable<IN> values, Collector<IN> out) 
throws Exception {
-               streamer.streamBufferWithoutGroups(values.iterator(), out);
-       }
-
-       @Override
-       public void close() throws IOException {
-               if (streamer != null) {
-                       streamer.close();
-                       streamer = null;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/functions/PythonMapPartition.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/functions/PythonMapPartition.java
 
b/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/functions/PythonMapPartition.java
deleted file mode 100644
index 1f13e5c..0000000
--- 
a/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/functions/PythonMapPartition.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.languagebinding.api.java.python.functions;
-
-import java.io.IOException;
-import org.apache.flink.api.common.functions.RichMapPartitionFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.configuration.Configuration;
-import 
org.apache.flink.languagebinding.api.java.python.streaming.PythonStreamer;
-import org.apache.flink.util.Collector;
-
-/**
- * Multi-purpose class, usable by all operations using a python script with 
one input source and possibly differing
- * in-/output types.
- *
- * @param <IN>
- * @param <OUT>
- */
-public class PythonMapPartition<IN, OUT> extends RichMapPartitionFunction<IN, 
OUT> implements ResultTypeQueryable {
-       private final PythonStreamer streamer;
-       private transient final TypeInformation<OUT> typeInformation;
-
-       public PythonMapPartition(int id, TypeInformation<OUT> typeInformation) 
{
-               this.typeInformation = typeInformation;
-               streamer = new PythonStreamer(this, id);
-       }
-
-       /**
-        * Opens this function.
-        *
-        * @param config configuration
-        * @throws IOException
-        */
-       @Override
-       public void open(Configuration config) throws IOException {
-               streamer.open();
-               streamer.sendBroadCastVariables(config);
-       }
-
-       @Override
-       public void mapPartition(Iterable<IN> values, Collector<OUT> out) 
throws Exception {
-               streamer.streamBufferWithoutGroups(values.iterator(), out);
-       }
-
-       /**
-        * Closes this function.
-        *
-        * @throws IOException
-        */
-       @Override
-       public void close() throws IOException {
-               streamer.close();
-       }
-
-       @Override
-       public TypeInformation<OUT> getProducedType() {
-               return typeInformation;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/streaming/PythonStreamer.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/streaming/PythonStreamer.java
 
b/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/streaming/PythonStreamer.java
deleted file mode 100644
index 6d21c4c..0000000
--- 
a/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/streaming/PythonStreamer.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.languagebinding.api.java.python.streaming;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.lang.reflect.Field;
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import static 
org.apache.flink.languagebinding.api.java.common.PlanBinder.DEBUG;
-import static 
org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.FLINK_PYTHON_DC_ID;
-import static 
org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.FLINK_PYTHON_PLAN_NAME;
-import static 
org.apache.flink.languagebinding.api.java.common.PlanBinder.FLINK_TMP_DATA_DIR;
-import 
org.apache.flink.languagebinding.api.java.common.streaming.StreamPrinter;
-import org.apache.flink.languagebinding.api.java.common.streaming.Streamer;
-import org.apache.flink.languagebinding.api.java.python.PythonPlanBinder;
-import static 
org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.FLINK_PYTHON2_BINARY_PATH;
-import static 
org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.FLINK_PYTHON3_BINARY_PATH;
-
-/**
- * This streamer is used by functions to send/receive data to/from an external 
python process.
- */
-public class PythonStreamer extends Streamer {
-       private Process process;
-       private final int id;
-       private final boolean usePython3;
-       private final boolean debug;
-       private Thread shutdownThread;
-       private final String planArguments;
-
-       private String inputFilePath;
-       private String outputFilePath;
-
-       public PythonStreamer(AbstractRichFunction function, int id) {
-               super(function);
-               this.id = id;
-               this.usePython3 = PythonPlanBinder.usePython3;
-               this.debug = DEBUG;
-               planArguments = PythonPlanBinder.arguments.toString();
-       }
-
-       /**
-        * Starts the python script.
-        *
-        * @throws IOException
-        */
-       @Override
-       public void setupProcess() throws IOException {
-               startPython();
-       }
-
-       private void startPython() throws IOException {
-               this.outputFilePath = FLINK_TMP_DATA_DIR + "/" + id + 
this.function.getRuntimeContext().getIndexOfThisSubtask() + "output";
-               this.inputFilePath = FLINK_TMP_DATA_DIR + "/" + id + 
this.function.getRuntimeContext().getIndexOfThisSubtask() + "input";
-
-               sender.open(inputFilePath);
-               receiver.open(outputFilePath);
-
-               String path = 
function.getRuntimeContext().getDistributedCache().getFile(FLINK_PYTHON_DC_ID).getAbsolutePath();
-               String planPath = path + FLINK_PYTHON_PLAN_NAME;
-
-               String pythonBinaryPath = usePython3 ? 
FLINK_PYTHON3_BINARY_PATH : FLINK_PYTHON2_BINARY_PATH;
-
-               try {
-                       Runtime.getRuntime().exec(pythonBinaryPath);
-               } catch (IOException ex) {
-                       throw new RuntimeException(pythonBinaryPath + " does 
not point to a valid python binary.");
-               }
-
-               if (debug) {
-                       socket.setSoTimeout(0);
-                       LOG.info("Waiting for Python Process : " + 
function.getRuntimeContext().getTaskName()
-                                       + " Run python " + planPath + 
planArguments);
-               } else {
-                       process = Runtime.getRuntime().exec(pythonBinaryPath + 
" -O -B " + planPath + planArguments);
-                       new StreamPrinter(process.getInputStream()).start();
-                       new StreamPrinter(process.getErrorStream(), true, 
msg).start();
-               }
-
-               shutdownThread = new Thread() {
-                       @Override
-                       public void run() {
-                               try {
-                                       destroyProcess();
-                               } catch (IOException ex) {
-                               }
-                       }
-               };
-
-               Runtime.getRuntime().addShutdownHook(shutdownThread);
-
-               OutputStream processOutput = process.getOutputStream();
-               processOutput.write("operator\n".getBytes());
-               processOutput.write(("" + server.getLocalPort() + 
"\n").getBytes());
-               processOutput.write((id + "\n").getBytes());
-               processOutput.write((inputFilePath + "\n").getBytes());
-               processOutput.write((outputFilePath + "\n").getBytes());
-               processOutput.flush();
-
-               try { // wait a bit to catch syntax errors
-                       Thread.sleep(2000);
-               } catch (InterruptedException ex) {
-               }
-               if (!debug) {
-                       try {
-                               process.exitValue();
-                               throw new RuntimeException("External process 
for task " + function.getRuntimeContext().getTaskName() + " terminated 
prematurely." + msg);
-                       } catch (IllegalThreadStateException ise) { //process 
still active -> start receiving data
-                       }
-               }
-
-               socket = server.accept();
-               in = socket.getInputStream();
-               out = socket.getOutputStream();
-       }
-
-       /**
-        * Closes this streamer.
-        *
-        * @throws IOException
-        */
-       @Override
-       public void close() throws IOException {
-               try {
-                       super.close();
-               } catch (Exception e) {
-                       LOG.error("Exception occurred while closing Streamer. 
:" + e.getMessage());
-               }
-               if (!debug) {
-                       destroyProcess();
-               }
-               if (shutdownThread != null) {
-                       Runtime.getRuntime().removeShutdownHook(shutdownThread);
-               }
-       }
-
-       private void destroyProcess() throws IOException {
-               try {
-                       process.exitValue();
-               } catch (IllegalThreadStateException ise) { //process still 
active
-                       if 
(process.getClass().getName().equals("java.lang.UNIXProcess")) {
-                               int pid;
-                               try {
-                                       Field f = 
process.getClass().getDeclaredField("pid");
-                                       f.setAccessible(true);
-                                       pid = f.getInt(process);
-                               } catch (Throwable e) {
-                                       process.destroy();
-                                       return;
-                               }
-                               String[] args = new String[]{"kill", "-9", "" + 
pid};
-                               Runtime.getRuntime().exec(args);
-                       } else {
-                               process.destroy();
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/__init__.py
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/__init__.py
 
b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/__init__.py
deleted file mode 100644
index d35bf39..0000000
--- 
a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/__init__.py
+++ /dev/null
@@ -1,17 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/__init__.py
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/__init__.py
 
b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/__init__.py
deleted file mode 100644
index d35bf39..0000000
--- 
a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/__init__.py
+++ /dev/null
@@ -1,17 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Collector.py
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Collector.py
 
b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Collector.py
deleted file mode 100644
index bf35756..0000000
--- 
a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Collector.py
+++ /dev/null
@@ -1,157 +0,0 @@
-# 
###############################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-from struct import pack
-import sys
-
-from flink.connection.Constants import Types
-
-PY2 = sys.version_info[0] == 2
-PY3 = sys.version_info[0] == 3
-
-if PY2:
-    stringtype = basestring
-else:
-    stringtype = str
-
-
-class Collector(object):
-    def __init__(self, con):
-        self._connection = con
-        self._serializer = None
-
-    def _close(self):
-        self._connection.send_end_signal()
-
-    def collect(self, value):
-        self._serializer = _get_serializer(self._connection.write, value)
-        self.collect = self._collect
-        self.collect(value)
-
-    def _collect(self, value):
-        self._connection.write(self._serializer.serialize(value))
-
-
-def _get_serializer(write, value):
-    if isinstance(value, (list, tuple)):
-        write(Types.TYPE_TUPLE)
-        write(pack(">I", len(value)))
-        return TupleSerializer(write, value)
-    elif value is None:
-        write(Types.TYPE_NULL)
-        return NullSerializer()
-    elif isinstance(value, stringtype):
-        write(Types.TYPE_STRING)
-        return StringSerializer()
-    elif isinstance(value, bool):
-        write(Types.TYPE_BOOLEAN)
-        return BooleanSerializer()
-    elif isinstance(value, int) or PY2 and isinstance(value, long):
-        write(Types.TYPE_LONG)
-        return LongSerializer()
-    elif isinstance(value, bytearray):
-        write(Types.TYPE_BYTES)
-        return ByteArraySerializer()
-    elif isinstance(value, float):
-        write(Types.TYPE_DOUBLE)
-        return FloatSerializer()
-    else:
-        raise Exception("Unsupported Type encountered.")
-
-
-class TupleSerializer(object):
-    def __init__(self, write, value):
-        self.serializer = [_get_serializer(write, field) for field in value]
-
-    def serialize(self, value):
-        bits = []
-        for i in range(len(value)):
-            bits.append(self.serializer[i].serialize(value[i]))
-        return b"".join(bits)
-
-
-class BooleanSerializer(object):
-    def serialize(self, value):
-        return pack(">?", value)
-
-
-class FloatSerializer(object):
-    def serialize(self, value):
-        return pack(">d", value)
-
-
-class LongSerializer(object):
-    def serialize(self, value):
-        return pack(">q", value)
-
-
-class ByteArraySerializer(object):
-    def serialize(self, value):
-        value = bytes(value)
-        return pack(">I", len(value)) + value
-
-
-class StringSerializer(object):
-    def serialize(self, value):
-        value = value.encode("utf-8")
-        return pack(">I", len(value)) + value
-
-
-class NullSerializer(object):
-    def serialize(self, value):
-        return b""
-
-
-class TypedCollector(object):
-    def __init__(self, con):
-        self._connection = con
-
-    def collect(self, value):
-        if not isinstance(value, (list, tuple)):
-            self._send_field(value)
-        else:
-            self._connection.write(Types.TYPE_TUPLE)
-            meta = pack(">I", len(value))
-            self._connection.write(bytes([meta[3]]) if PY3 else meta[3])
-            for field in value:
-                self.collect(field)
-
-    def _send_field(self, value):
-        if value is None:
-            self._connection.write(Types.TYPE_NULL)
-        elif isinstance(value, stringtype):
-            value = value.encode("utf-8")
-            size = pack(">I", len(value))
-            self._connection.write(b"".join([Types.TYPE_STRING, size, value]))
-        elif isinstance(value, bytes):
-            size = pack(">I", len(value))
-            self._connection.write(b"".join([Types.TYPE_BYTES, size, value]))
-        elif isinstance(value, bool):
-            data = pack(">?", value)
-            self._connection.write(b"".join([Types.TYPE_BOOLEAN, data]))
-        elif isinstance(value, int) or PY2 and isinstance(value, long):
-            data = pack(">q", value)
-            self._connection.write(b"".join([Types.TYPE_LONG, data]))
-        elif isinstance(value, float):
-            data = pack(">d", value)
-            self._connection.write(b"".join([Types.TYPE_DOUBLE, data]))
-        elif isinstance(value, bytearray):
-            value = bytes(value)
-            size = pack(">I", len(value))
-            self._connection.write(b"".join([Types.TYPE_BYTES, size, value]))
-        else:
-            raise Exception("Unsupported Type encountered.")
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Connection.py
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Connection.py
 
b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Connection.py
deleted file mode 100644
index 988bf25..0000000
--- 
a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Connection.py
+++ /dev/null
@@ -1,166 +0,0 @@
-# 
###############################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-import mmap
-import socket as SOCKET
-from struct import pack, unpack
-from collections import deque
-import sys
-PY2 = sys.version_info[0] == 2
-PY3 = sys.version_info[0] == 3
-
-MAPPED_FILE_SIZE = 1024 * 1024 * 64
-
-SIGNAL_REQUEST_BUFFER = b"\x00\x00\x00\x00"
-SIGNAL_REQUEST_BUFFER_G0 = b"\xFF\xFF\xFF\xFD"
-SIGNAL_REQUEST_BUFFER_G1 = b"\xFF\xFF\xFF\xFC"
-SIGNAL_FINISHED = b"\xFF\xFF\xFF\xFF"
-
-if PY2:
-    SIGNAL_WAS_LAST = "\x20"
-else:
-    SIGNAL_WAS_LAST = 32
-
-
-class OneWayBusyBufferingMappedFileConnection(object):
-    def __init__(self, output_path):
-        self._output_file = open(output_path, "rb+")
-        self._file_output_buffer = mmap.mmap(self._output_file.fileno(), 
MAPPED_FILE_SIZE, mmap.MAP_SHARED, mmap.ACCESS_WRITE)
-
-        self._out = deque()
-        self._out_size = 0
-
-        self._offset_limit = MAPPED_FILE_SIZE - 1024 * 1024 * 3
-
-    def write(self, msg):
-        self._out.append(msg)
-        self._out_size += len(msg)
-        if self._out_size > self._offset_limit:
-            self._write_buffer()
-
-    def _write_buffer(self):
-        self._file_output_buffer.seek(1, 0)
-        self._file_output_buffer.write(b"".join(self._out))
-        self._file_output_buffer.seek(0, 0)
-        self._file_output_buffer.write(b'\x01')
-
-
-class BufferingTCPMappedFileConnection(object):
-    def __init__(self, input_file, output_file, port):
-        self._input_file = open(input_file, "rb+")
-        self._output_file = open(output_file, "rb+")
-        self._file_input_buffer = mmap.mmap(self._input_file.fileno(), 
MAPPED_FILE_SIZE, mmap.MAP_SHARED, mmap.ACCESS_READ)
-        self._file_output_buffer = mmap.mmap(self._output_file.fileno(), 
MAPPED_FILE_SIZE, mmap.MAP_SHARED, mmap.ACCESS_WRITE)
-        self._socket = SOCKET.socket(family=SOCKET.AF_INET, 
type=SOCKET.SOCK_STREAM)
-        self._socket.connect((SOCKET.gethostbyname("localhost"), port))
-
-        self._out = deque()
-        self._out_size = 0
-
-        self._input = b""
-        self._input_offset = 0
-        self._input_size = 0
-        self._was_last = False
-
-    def write(self, msg):
-        length = len(msg)
-        if length > MAPPED_FILE_SIZE:
-            raise Exception("Serialized object does not fit into a single 
buffer.")
-        tmp = self._out_size + length
-        if tmp > MAPPED_FILE_SIZE:
-            self._write_buffer()
-            self.write(msg)
-        else:
-            self._out.append(msg)
-            self._out_size = tmp
-
-    def _write_buffer(self):
-        self._file_output_buffer.seek(0, 0)
-        self._file_output_buffer.write(b"".join(self._out))
-        self._socket.send(pack(">i", self._out_size))
-        self._out.clear()
-        self._out_size = 0
-        self._socket.recv(1, SOCKET.MSG_WAITALL)
-
-    def read(self, des_size, ignored=None):
-        if self._input_size == self._input_offset:
-            self._read_buffer()
-        old_offset = self._input_offset
-        self._input_offset += des_size
-        return self._input[old_offset:self._input_offset]
-
-    def _read_buffer(self):
-        self._socket.send(SIGNAL_REQUEST_BUFFER)
-        self._file_input_buffer.seek(0, 0)
-        self._input_offset = 0
-        meta_size = self._socket.recv(5, SOCKET.MSG_WAITALL)
-        self._input_size = unpack(">I", meta_size[:4])[0]
-        self._was_last = meta_size[4] == SIGNAL_WAS_LAST
-        self._input = self._file_input_buffer.read(self._input_size)
-
-    def send_end_signal(self):
-        if self._out_size:
-            self._write_buffer()
-        self._socket.send(SIGNAL_FINISHED)
-
-    def has_next(self, ignored=None):
-        return not self._was_last or not self._input_size == self._input_offset
-
-    def reset(self):
-        self._was_last = False
-        self._input_size = 0
-        self._input_offset = 0
-        self._input = b""
-
-
-class TwinBufferingTCPMappedFileConnection(BufferingTCPMappedFileConnection):
-    def __init__(self, input_file, output_file, port):
-        super(TwinBufferingTCPMappedFileConnection, self).__init__(input_file, 
output_file, port)
-        self._input = [b"", b""]
-        self._input_offset = [0, 0]
-        self._input_size = [0, 0]
-        self._was_last = [False, False]
-
-    def read(self, des_size, group):
-        if self._input_size[group] == self._input_offset[group]:
-            self._read_buffer(group)
-        old_offset = self._input_offset[group]
-        self._input_offset[group] += des_size
-        return self._input[group][old_offset:self._input_offset[group]]
-
-    def _read_buffer(self, group):
-        if group:
-            self._socket.send(SIGNAL_REQUEST_BUFFER_G1)
-        else:
-            self._socket.send(SIGNAL_REQUEST_BUFFER_G0)
-        self._file_input_buffer.seek(0, 0)
-        self._input_offset[group] = 0
-        meta_size = self._socket.recv(5, SOCKET.MSG_WAITALL)
-        self._input_size[group] = unpack(">I", meta_size[:4])[0]
-        self._was_last[group] = meta_size[4] == SIGNAL_WAS_LAST
-        self._input[group] = 
self._file_input_buffer.read(self._input_size[group])
-
-    def has_next(self, group):
-        return not self._was_last[group] or not self._input_size[group] == 
self._input_offset[group]
-
-    def reset(self):
-        self._was_last = [False, False]
-        self._input_size = [0, 0]
-        self._input_offset = [0, 0]
-        self._input = [b"", b""]
-
-

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Constants.py
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Constants.py
 
b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Constants.py
deleted file mode 100644
index 0ca2232..0000000
--- 
a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Constants.py
+++ /dev/null
@@ -1,31 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-
-class Types(object):
-    TYPE_TUPLE = b'\x0B'
-    TYPE_BOOLEAN = b'\x0A'
-    TYPE_BYTE = b'\x09'
-    TYPE_SHORT = b'\x08'
-    TYPE_INTEGER = b'\x07'
-    TYPE_LONG = b'\x06'
-    TYPE_DOUBLE = b'\x04'
-    TYPE_FLOAT = b'\x05'
-    TYPE_STRING = b'\x02'
-    TYPE_NULL = b'\x00'
-    TYPE_BYTES = b'\x01'

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Iterator.py
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Iterator.py
 
b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Iterator.py
deleted file mode 100644
index fb0e26d..0000000
--- 
a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Iterator.py
+++ /dev/null
@@ -1,327 +0,0 @@
-# 
###############################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-from struct import unpack
-from collections import deque
-
-try:
-    import _abcoll as defIter
-except:
-    import _collections_abc as defIter
-
-from flink.connection.Constants import Types
-
-
-class ListIterator(defIter.Iterator):
-    def __init__(self, values):
-        super(ListIterator, self).__init__()
-        self._values = deque(values)
-
-    def __next__(self):
-        return self.next()
-
-    def next(self):
-        if self.has_next():
-            return self._values.popleft()
-        else:
-            raise StopIteration
-
-    def has_next(self):
-        return self._values
-
-
-class GroupIterator(defIter.Iterator):
-    def __init__(self, iterator, keys=None):
-        super(GroupIterator, self).__init__()
-        self.iterator = iterator
-        self.key = None
-        self.keys = keys
-        if self.keys is None:
-            self._extract_keys = self._extract_keys_id
-        self.cur = None
-        self.empty = False
-
-    def _init(self):
-        if self.iterator.has_next():
-            self.empty = False
-            self.cur = self.iterator.next()
-            self.key = self._extract_keys(self.cur)
-        else:
-            self.empty = True
-
-    def __next__(self):
-        return self.next()
-
-    def next(self):
-        if self.has_next():
-            tmp = self.cur
-            if self.iterator.has_next():
-                self.cur = self.iterator.next()
-                if self.key != self._extract_keys(self.cur):
-                    self.empty = True
-            else:
-                self.cur = None
-                self.empty = True
-            return tmp
-        else:
-            raise StopIteration
-
-    def has_next(self):
-        if self.empty:
-            return False
-        return self.key == self._extract_keys(self.cur)
-
-    def has_group(self):
-        return self.cur is not None
-
-    def next_group(self):
-        self.key = self._extract_keys(self.cur)
-        self.empty = False
-
-    def _extract_keys(self, x):
-        return [x[k] for k in self.keys]
-
-    def _extract_keys_id(self, x):
-        return x
-
-
-class CoGroupIterator(object):
-    NONE_REMAINED = 1
-    FIRST_REMAINED = 2
-    SECOND_REMAINED = 3
-    FIRST_EMPTY = 4
-    SECOND_EMPTY = 5
-
-    def __init__(self, c1, c2, k1, k2):
-        self.i1 = GroupIterator(c1, k1)
-        self.i2 = GroupIterator(c2, k2)
-        self.p1 = None
-        self.p2 = None
-        self.match = None
-        self.key = None
-
-    def _init(self):
-        self.i1._init()
-        self.i2._init()
-
-    def next(self):
-        first_empty = True
-        second_empty = True
-
-        if self.match != CoGroupIterator.FIRST_EMPTY:
-            if self.match == CoGroupIterator.FIRST_REMAINED:
-                first_empty = False
-            else:
-                if self.i1.has_group():
-                    self.i1.next_group()
-                    self.key = self.i1.key
-                    first_empty = False
-
-        if self.match != CoGroupIterator.SECOND_EMPTY:
-            if self.match == CoGroupIterator.SECOND_REMAINED:
-                second_empty = False
-            else:
-                if self.i2.has_group():
-                    self.i2.next_group()
-                    second_empty = False
-
-        if first_empty and second_empty:
-            return False
-        elif first_empty and (not second_empty):
-            self.p1 = DummyIterator()
-            self.p2 = self.i2
-            self.match = CoGroupIterator.FIRST_EMPTY
-            return True
-        elif (not first_empty) and second_empty:
-            self.p1 = self.i1
-            self.p2 = DummyIterator()
-            self.match = CoGroupIterator.SECOND_EMPTY
-            return True
-        else:
-            if self.key == self.i2.key:
-                self.p1 = self.i1
-                self.p2 = self.i2
-                self.match = CoGroupIterator.NONE_REMAINED
-            elif self.key < self.i2.key:
-                self.p1 = self.i1
-                self.p2 = DummyIterator()
-                self.match = CoGroupIterator.SECOND_REMAINED
-            else:
-                self.p1 = DummyIterator()
-                self.p2 = self.i2
-                self.match = CoGroupIterator.FIRST_REMAINED
-            return True
-
-
-class Iterator(defIter.Iterator):
-    def __init__(self, con, group=0):
-        super(Iterator, self).__init__()
-        self._connection = con
-        self._init = True
-        self._group = group
-        self._deserializer = None
-
-    def __next__(self):
-        return self.next()
-
-    def next(self):
-        if self.has_next():
-            if self._deserializer is None:
-                self._deserializer = _get_deserializer(self._group, 
self._connection.read)
-            return self._deserializer.deserialize()
-        else:
-            raise StopIteration
-
-    def has_next(self):
-        return self._connection.has_next(self._group)
-
-    def _reset(self):
-        self._deserializer = None
-
-
-class DummyIterator(Iterator):
-    def __init__(self):
-        super(Iterator, self).__init__()
-
-    def __next__(self):
-        raise StopIteration
-
-    def next(self):
-        raise StopIteration
-
-    def has_next(self):
-        return False
-
-
-def _get_deserializer(group, read, type=None):
-    if type is None:
-        type = read(1, group)
-        return _get_deserializer(group, read, type)
-    elif type == Types.TYPE_TUPLE:
-        return TupleDeserializer(read, group)
-    elif type == Types.TYPE_BYTE:
-        return ByteDeserializer(read, group)
-    elif type == Types.TYPE_BYTES:
-        return ByteArrayDeserializer(read, group)
-    elif type == Types.TYPE_BOOLEAN:
-        return BooleanDeserializer(read, group)
-    elif type == Types.TYPE_FLOAT:
-        return FloatDeserializer(read, group)
-    elif type == Types.TYPE_DOUBLE:
-        return DoubleDeserializer(read, group)
-    elif type == Types.TYPE_INTEGER:
-        return IntegerDeserializer(read, group)
-    elif type == Types.TYPE_LONG:
-        return LongDeserializer(read, group)
-    elif type == Types.TYPE_STRING:
-        return StringDeserializer(read, group)
-    elif type == Types.TYPE_NULL:
-        return NullDeserializer(read, group)
-
-
-class TupleDeserializer(object):
-    def __init__(self, read, group):
-        self.read = read
-        self._group = group
-        size = unpack(">I", self.read(4, self._group))[0]
-        self.deserializer = [_get_deserializer(self._group, self.read) for _ 
in range(size)]
-
-    def deserialize(self):
-        return tuple([s.deserialize() for s in self.deserializer])
-
-
-class ByteDeserializer(object):
-    def __init__(self, read, group):
-        self.read = read
-        self._group = group
-
-    def deserialize(self):
-        return unpack(">c", self.read(1, self._group))[0]
-
-
-class ByteArrayDeserializer(object):
-    def __init__(self, read, group):
-        self.read = read
-        self._group = group
-
-    def deserialize(self):
-        size = unpack(">i", self.read(4, self._group))[0]
-        return bytearray(self.read(size, self._group)) if size else 
bytearray(b"")
-
-
-class BooleanDeserializer(object):
-    def __init__(self, read, group):
-        self.read = read
-        self._group = group
-
-    def deserialize(self):
-        return unpack(">?", self.read(1, self._group))[0]
-
-
-class FloatDeserializer(object):
-    def __init__(self, read, group):
-        self.read = read
-        self._group = group
-
-    def deserialize(self):
-        return unpack(">f", self.read(4, self._group))[0]
-
-
-class DoubleDeserializer(object):
-    def __init__(self, read, group):
-        self.read = read
-        self._group = group
-
-    def deserialize(self):
-        return unpack(">d", self.read(8, self._group))[0]
-
-
-class IntegerDeserializer(object):
-    def __init__(self, read, group):
-        self.read = read
-        self._group = group
-
-    def deserialize(self):
-        return unpack(">i", self.read(4, self._group))[0]
-
-
-class LongDeserializer(object):
-    def __init__(self, read, group):
-        self.read = read
-        self._group = group
-
-    def deserialize(self):
-        return unpack(">q", self.read(8, self._group))[0]
-
-
-class StringDeserializer(object):
-    def __init__(self, read, group):
-        self.read = read
-        self._group = group
-
-    def deserialize(self):
-        length = unpack(">i", self.read(4, self._group))[0]
-        return self.read(length, self._group).decode("utf-8") if length else ""
-
-
-class NullDeserializer(object):
-    def __init__(self, read, group):
-        self.read = read
-        self._group = group
-
-    def deserialize(self):
-        return None

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/__init__.py
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/__init__.py
 
b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/__init__.py
deleted file mode 100644
index 65b48d4..0000000
--- 
a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/__init__.py
+++ /dev/null
@@ -1,17 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/example/TPCHQuery10.py
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/example/TPCHQuery10.py
 
b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/example/TPCHQuery10.py
deleted file mode 100644
index 032ef85..0000000
--- 
a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/example/TPCHQuery10.py
+++ /dev/null
@@ -1,115 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-import sys
-
-from datetime import datetime
-from flink.plan.Environment import get_environment
-from flink.plan.Constants import INT, STRING, FLOAT, WriteMode
-from flink.functions.FilterFunction import FilterFunction
-from flink.functions.ReduceFunction import ReduceFunction
-from flink.functions.JoinFunction import JoinFunction
-from flink.functions.MapFunction import MapFunction
-
-class OrderFilter(FilterFunction):
-    def filter(self, value):
-        if (datetime.strptime(value[2], "%Y-%m-%d") > 
datetime.strptime("1990-12-31", "%Y-%m-%d")):
-            return True
-        else:
-            return False
-
-class LineitemFilter(FilterFunction):
-    def filter(self, value):
-       if value[3] == "R":
-               return True
-       else:
-               return False
-
-class ComputeRevenue(MapFunction):
-       def map(self, value):
-               return (value[0], value[1] * (1 - value[2]))
-
-class SumReducer(ReduceFunction):
-    def reduce(self, value1, value2):
-        return (value1[0], value1[1] + value2[1])
-
-class test(JoinFunction):
-       def join(self, value1, value2):
-               return (value1[1], value2[1])
-
-
-if __name__ == "__main__":
-
-       env = get_environment()
-       
-       if len(sys.argv) != 6:
-               sys.exit("Usage: ./bin/pyflink.sh TPCHQuery10 <customer path> 
<order path> <lineitem path> <nation path> <output path>")
-       
-       customer = env \
-        .read_csv(sys.argv[1], 
-            [INT, STRING, STRING, INT, STRING, FLOAT, STRING, STRING], '\n', 
'|') \
-        .project(0,1,2,3,5) 
-
-       order = env \
-        .read_csv(sys.argv[2], 
-            [INT, INT, STRING, FLOAT, STRING, STRING, STRING, INT, STRING], 
'\n', '|') \
-        .project(0,1,4) \
-               .filter(OrderFilter()) \
-               .project(0,1)
-
-       lineitem = env \
-        .read_csv(sys.argv[3], [INT, INT, INT, INT, INT, FLOAT, FLOAT, FLOAT,
-             STRING, STRING, STRING, STRING, STRING, STRING, STRING, STRING], 
'\n', '|') \
-        .project(0,5,6,8) \
-        .filter(LineitemFilter()) \
-        .map(ComputeRevenue(), [INT, FLOAT])
-
-       nation = env \
-       .read_csv(sys.argv[4], [INT, STRING, INT, STRING], '\n', '|') \
-       .project(0,1)
-
-       revenueByCustomer = order \
-       .join_with_huge(lineitem) \
-       .where(0) \
-       .equal_to(0) \
-       .project_first(1) \
-       .project_second(1)
-
-       revenueByCustomer = revenueByCustomer \
-       .group_by(0) \
-       .reduce(SumReducer())
-
-       customerWithNation = customer \
-       .join_with_tiny(nation) \
-       .where(3) \
-       .equal_to(0) \
-       .project_first(0,1,2) \
-       .project_second(1) \
-       .project_first(4)
-
-       result = customerWithNation \
-       .join(revenueByCustomer) \
-       .where(0) \
-       .equal_to(0) \
-       .project_first(0,1,2,3,4) \
-       .project_second(1)
-
-       result.write_csv(sys.argv[5], '\n', '|', WriteMode.OVERWRITE)
-
-       env.set_degree_of_parallelism(1)
-
-       env.execute(local=True)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/example/TPCHQuery3.py
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/example/TPCHQuery3.py
 
b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/example/TPCHQuery3.py
deleted file mode 100644
index 5fafb01..0000000
--- 
a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/example/TPCHQuery3.py
+++ /dev/null
@@ -1,106 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-import sys
-
-from datetime import datetime
-from flink.plan.Environment import get_environment
-from flink.plan.Constants import INT, STRING, FLOAT, WriteMode
-from flink.functions.FilterFunction import FilterFunction
-from flink.functions.ReduceFunction import ReduceFunction
-from flink.functions.JoinFunction import JoinFunction
-
-class CustomerFilter(FilterFunction):
-    def filter(self, value):
-        if value[1] == "AUTOMOBILE":
-            return True
-        else:
-            return False
-
-class LineitemFilter(FilterFunction):
-    def filter(self, value):
-        if (datetime.strptime(value[3], "%Y-%m-%d") > 
datetime.strptime("1995-03-12", "%Y-%m-%d")):
-            return True
-        else:
-            return False
-
-class OrderFilter(FilterFunction):
-    def filter(self, value):
-        if (datetime.strptime(value[2], "%Y-%m-%d") < 
datetime.strptime("1995-03-12", "%Y-%m-%d")):
-            return True
-        else:
-            return False
-
-class SumReducer(ReduceFunction):
-    def reduce(self, value1, value2):
-        return (value1[0], value1[1] + value2[1], value1[2], value1[3])
-
-class CustomerOrderJoin(JoinFunction):
-    def join(self, value1, value2):
-        return (value2[0], 0.0, value2[2], value2[3])
-
-class CustomerOrderLineitemJoin(JoinFunction):
-    def join(self, value1, value2):
-        return (value1[0], value2[1] * (1 - value2[2]), value1[2], value1[3])
-
-
-
-if __name__ == "__main__":
-    env = get_environment()
-
-    if len(sys.argv) != 5:
-        sys.exit("Usage: ./bin/pyflink.sh TPCHQuery3 <lineitem path> <customer 
path> <order path> <output path>")
-    lineitem = env \
-        .read_csv(sys.argv[1], [INT, INT, INT, INT, INT, FLOAT, FLOAT, FLOAT,
-             STRING, STRING, STRING, STRING, STRING, STRING, STRING, STRING], 
'\n', '|') \
-        .project(0,5,6,10) \
-        .filter(LineitemFilter())
-
-
-    customer = env \
-        .read_csv(sys.argv[2], 
-            [INT, STRING, STRING, INT, STRING, FLOAT, STRING, STRING], '\n', 
'|') \
-        .project(0,6) \
-        .filter(CustomerFilter())
-
-    order = env \
-        .read_csv(sys.argv[3], 
-            [INT, INT, STRING, FLOAT, STRING, STRING, STRING, INT, STRING], 
'\n', '|') \
-        .project(0,1,4,7) \
-        .filter(OrderFilter())
-
-    customerWithOrder = customer \
-        .join(order) \
-        .where(0) \
-        .equal_to(1) \
-        .using(CustomerOrderJoin(),[INT, FLOAT, STRING, INT])
-
-    result = customerWithOrder \
-        .join(lineitem) \
-        .where(0) \
-        .equal_to(0) \
-        .using(CustomerOrderLineitemJoin(), [INT, FLOAT, STRING, INT]) \
-        .group_by(0, 2, 3) \
-        .reduce(SumReducer())
-
-    result.write_csv(sys.argv[4], '\n', '|', WriteMode.OVERWRITE)
-
-    env.set_degree_of_parallelism(1)
-
-    env.execute(local=True)
-
-

Reply via email to