[FLINK-3057] Bidirectional plan connection

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fc4d9469
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fc4d9469
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fc4d9469

Branch: refs/heads/master
Commit: fc4d9469d9d3cf17f877b2c49f345ed0001322c1
Parents: 0ac5d40
Author: zentol <ches...@apache.org>
Authored: Sun Nov 22 16:23:29 2015 +0100
Committer: zentol <s.mo...@web.de>
Committed: Wed Jan 20 06:03:38 2016 +0100

----------------------------------------------------------------------
 .../flink/python/api/PythonOperationInfo.java   | 126 +++---
 .../flink/python/api/PythonPlanBinder.java      |  69 +--
 .../python/api/functions/PythonCoGroup.java     |   2 +-
 .../api/functions/PythonCombineIdentity.java    |   2 +-
 .../api/functions/PythonMapPartition.java       |   2 +-
 .../python/api/streaming/PythonStreamer.java    | 374 ----------------
 .../flink/python/api/streaming/Receiver.java    | 384 -----------------
 .../flink/python/api/streaming/Sender.java      | 427 -------------------
 .../python/api/streaming/StreamPrinter.java     |  55 ---
 .../api/streaming/data/PythonReceiver.java      | 267 ++++++++++++
 .../python/api/streaming/data/PythonSender.java | 420 ++++++++++++++++++
 .../api/streaming/data/PythonStreamer.java      | 375 ++++++++++++++++
 .../api/streaming/plan/PythonPlanReceiver.java  | 107 +++++
 .../api/streaming/plan/PythonPlanSender.java    | 116 +++++
 .../api/streaming/plan/PythonPlanStreamer.java  |  98 +++++
 .../api/streaming/util/StreamPrinter.java       |  55 +++
 .../python/api/flink/connection/Connection.py   |  30 +-
 .../python/api/flink/connection/Iterator.py     |  40 ++
 .../flink/python/api/flink/plan/Environment.py  |  22 +-
 19 files changed, 1588 insertions(+), 1383 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fc4d9469/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
index 0ccf568..5cf3621 100644
--- 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
+++ 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
@@ -21,7 +21,7 @@ import org.apache.flink.api.java.tuple.Tuple;
 import static org.apache.flink.api.java.typeutils.TypeExtractor.getForObject;
 import org.apache.flink.core.fs.FileSystem.WriteMode;
 import org.apache.flink.python.api.PythonPlanBinder.Operation;
-import org.apache.flink.python.api.streaming.Receiver;
+import org.apache.flink.python.api.streaming.plan.PythonPlanStreamer;
 
 public class PythonOperationInfo {
        public int parentID; //DataSet that an operation is applied on
@@ -48,88 +48,88 @@ public class PythonOperationInfo {
        public boolean toError;
        public String name;
 
-       public PythonOperationInfo(Receiver receiver, Operation identifier) 
throws IOException {
+       public PythonOperationInfo(PythonPlanStreamer streamer, Operation 
identifier) throws IOException {
                Object tmpType;
                switch (identifier) {
                        case SOURCE_CSV:
-                               setID = (Integer) receiver.getRecord(true);
-                               path = (String) receiver.getRecord();
-                               fieldDelimiter = (String) receiver.getRecord();
-                               lineDelimiter = (String) receiver.getRecord();
-                               tmpType = (Tuple) receiver.getRecord();
+                               setID = (Integer) streamer.getRecord(true);
+                               path = (String) streamer.getRecord();
+                               fieldDelimiter = (String) streamer.getRecord();
+                               lineDelimiter = (String) streamer.getRecord();
+                               tmpType = (Tuple) streamer.getRecord();
                                types = tmpType == null ? null : 
getForObject(tmpType);
                                return;
                        case SOURCE_TEXT:
-                               setID = (Integer) receiver.getRecord(true);
-                               path = (String) receiver.getRecord();
+                               setID = (Integer) streamer.getRecord(true);
+                               path = (String) streamer.getRecord();
                                return;
                        case SOURCE_VALUE:
-                               setID = (Integer) receiver.getRecord(true);
-                               int valueCount = (Integer) 
receiver.getRecord(true);
+                               setID = (Integer) streamer.getRecord(true);
+                               int valueCount = (Integer) 
streamer.getRecord(true);
                                values = new Object[valueCount];
                                for (int x = 0; x < valueCount; x++) {
-                                       values[x] = receiver.getRecord();
+                                       values[x] = streamer.getRecord();
                                }
                                return;
                        case SOURCE_SEQ:
-                               setID = (Integer) receiver.getRecord(true);
-                               from = (Long) receiver.getRecord();
-                               to = (Long) receiver.getRecord();
+                               setID = (Integer) streamer.getRecord(true);
+                               from = (Long) streamer.getRecord();
+                               to = (Long) streamer.getRecord();
                                return;
                        case SINK_CSV:
-                               parentID = (Integer) receiver.getRecord(true);
-                               path = (String) receiver.getRecord();
-                               fieldDelimiter = (String) receiver.getRecord();
-                               lineDelimiter = (String) receiver.getRecord();
-                               writeMode = ((Integer) 
receiver.getRecord(true)) == 1
+                               parentID = (Integer) streamer.getRecord(true);
+                               path = (String) streamer.getRecord();
+                               fieldDelimiter = (String) streamer.getRecord();
+                               lineDelimiter = (String) streamer.getRecord();
+                               writeMode = ((Integer) 
streamer.getRecord(true)) == 1
                                                ? WriteMode.OVERWRITE
                                                : WriteMode.NO_OVERWRITE;
                                return;
                        case SINK_TEXT:
-                               parentID = (Integer) receiver.getRecord(true);
-                               path = (String) receiver.getRecord();
-                               writeMode = ((Integer) 
receiver.getRecord(true)) == 1
+                               parentID = (Integer) streamer.getRecord(true);
+                               path = (String) streamer.getRecord();
+                               writeMode = ((Integer) 
streamer.getRecord(true)) == 1
                                                ? WriteMode.OVERWRITE
                                                : WriteMode.NO_OVERWRITE;
                                return;
                        case SINK_PRINT:
-                               parentID = (Integer) receiver.getRecord(true);
-                               toError = (Boolean) receiver.getRecord();
+                               parentID = (Integer) streamer.getRecord(true);
+                               toError = (Boolean) streamer.getRecord();
                                return;
                        case BROADCAST:
-                               parentID = (Integer) receiver.getRecord(true);
-                               otherID = (Integer) receiver.getRecord(true);
-                               name = (String) receiver.getRecord();
+                               parentID = (Integer) streamer.getRecord(true);
+                               otherID = (Integer) streamer.getRecord(true);
+                               name = (String) streamer.getRecord();
                                return;
                }
-               setID = (Integer) receiver.getRecord(true);
-               parentID = (Integer) receiver.getRecord(true);
+               setID = (Integer) streamer.getRecord(true);
+               parentID = (Integer) streamer.getRecord(true);
                switch (identifier) {
                        case AGGREGATE:
-                               count = (Integer) receiver.getRecord(true);
+                               count = (Integer) streamer.getRecord(true);
                                aggregates = new AggregationEntry[count];
                                for (int x = 0; x < count; x++) {
-                                       int encodedAgg = (Integer) 
receiver.getRecord(true);
-                                       int field = (Integer) 
receiver.getRecord(true);
+                                       int encodedAgg = (Integer) 
streamer.getRecord(true);
+                                       int field = (Integer) 
streamer.getRecord(true);
                                        aggregates[x] = new 
AggregationEntry(encodedAgg, field);
                                }
                                return;
                        case FIRST:
-                               count = (Integer) receiver.getRecord(true);
+                               count = (Integer) streamer.getRecord(true);
                                return;
                        case DISTINCT:
                        case GROUPBY:
                        case PARTITION_HASH:
-                               keys = normalizeKeys(receiver.getRecord(true));
+                               keys = normalizeKeys(streamer.getRecord(true));
                                return;
                        case PROJECTION:
-                               fields = toIntArray(receiver.getRecord(true));
+                               fields = toIntArray(streamer.getRecord(true));
                                return;
                        case REBALANCE:
                                return;
                        case SORT:
-                               field = (Integer) receiver.getRecord(true);
-                               int encodedOrder = (Integer) 
receiver.getRecord(true);
+                               field = (Integer) streamer.getRecord(true);
+                               int encodedOrder = (Integer) 
streamer.getRecord(true);
                                switch (encodedOrder) {
                                        case 0:
                                                order = Order.NONE;
@@ -149,62 +149,62 @@ public class PythonOperationInfo {
                                }
                                return;
                        case UNION:
-                               otherID = (Integer) receiver.getRecord(true);
+                               otherID = (Integer) streamer.getRecord(true);
                                return;
                        case COGROUP:
-                               otherID = (Integer) receiver.getRecord(true);
-                               keys1 = normalizeKeys(receiver.getRecord(true));
-                               keys2 = normalizeKeys(receiver.getRecord(true));
-                               tmpType = receiver.getRecord();
+                               otherID = (Integer) streamer.getRecord(true);
+                               keys1 = normalizeKeys(streamer.getRecord(true));
+                               keys2 = normalizeKeys(streamer.getRecord(true));
+                               tmpType = streamer.getRecord();
                                types = tmpType == null ? null : 
getForObject(tmpType);
-                               name = (String) receiver.getRecord();
+                               name = (String) streamer.getRecord();
                                return;
                        case CROSS:
                        case CROSS_H:
                        case CROSS_T:
-                               otherID = (Integer) receiver.getRecord(true);
-                               tmpType = receiver.getRecord();
+                               otherID = (Integer) streamer.getRecord(true);
+                               tmpType = streamer.getRecord();
                                types = tmpType == null ? null : 
getForObject(tmpType);
-                               int cProjectCount = (Integer) 
receiver.getRecord(true);
+                               int cProjectCount = (Integer) 
streamer.getRecord(true);
                                projections = new 
ProjectionEntry[cProjectCount];
                                for (int x = 0; x < cProjectCount; x++) {
-                                       String side = (String) 
receiver.getRecord();
-                                       int[] keys = toIntArray((Tuple) 
receiver.getRecord(true));
+                                       String side = (String) 
streamer.getRecord();
+                                       int[] keys = toIntArray((Tuple) 
streamer.getRecord(true));
                                        projections[x] = new 
ProjectionEntry(ProjectionSide.valueOf(side.toUpperCase()), keys);
                                }
-                               name = (String) receiver.getRecord();
+                               name = (String) streamer.getRecord();
                                return;
                        case REDUCE:
                        case GROUPREDUCE:
-                               tmpType = receiver.getRecord();
+                               tmpType = streamer.getRecord();
                                types = tmpType == null ? null : 
getForObject(tmpType);
-                               combine = (Boolean) receiver.getRecord();
-                               name = (String) receiver.getRecord();
+                               combine = (Boolean) streamer.getRecord();
+                               name = (String) streamer.getRecord();
                                return;
                        case JOIN:
                        case JOIN_H:
                        case JOIN_T:
-                               keys1 = normalizeKeys(receiver.getRecord(true));
-                               keys2 = normalizeKeys(receiver.getRecord(true));
-                               otherID = (Integer) receiver.getRecord(true);
-                               tmpType = receiver.getRecord();
+                               keys1 = normalizeKeys(streamer.getRecord(true));
+                               keys2 = normalizeKeys(streamer.getRecord(true));
+                               otherID = (Integer) streamer.getRecord(true);
+                               tmpType = streamer.getRecord();
                                types = tmpType == null ? null : 
getForObject(tmpType);
-                               int jProjectCount = (Integer) 
receiver.getRecord(true);
+                               int jProjectCount = (Integer) 
streamer.getRecord(true);
                                projections = new 
ProjectionEntry[jProjectCount];
                                for (int x = 0; x < jProjectCount; x++) {
-                                       String side = (String) 
receiver.getRecord();
-                                       int[] keys = toIntArray((Tuple) 
receiver.getRecord(true));
+                                       String side = (String) 
streamer.getRecord();
+                                       int[] keys = toIntArray((Tuple) 
streamer.getRecord(true));
                                        projections[x] = new 
ProjectionEntry(ProjectionSide.valueOf(side.toUpperCase()), keys);
                                }
-                               name = (String) receiver.getRecord();
+                               name = (String) streamer.getRecord();
                                return;
                        case MAPPARTITION:
                        case FLATMAP:
                        case MAP:
                        case FILTER:
-                               tmpType = receiver.getRecord();
+                               tmpType = streamer.getRecord();
                                types = tmpType == null ? null : 
getForObject(tmpType);
-                               name = (String) receiver.getRecord();
+                               name = (String) streamer.getRecord();
                                return;
                        default:
                                throw new UnsupportedOperationException("This 
operation is not implemented in the Python API: " + identifier);

http://git-wip-us.apache.org/repos/asf/flink/blob/fc4d9469/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
index 7c74054..d178dcb 100644
--- 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
+++ 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
@@ -19,6 +19,7 @@ import java.net.URISyntaxException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Random;
+import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.LocalEnvironment;
@@ -49,8 +50,7 @@ import 
org.apache.flink.python.api.PythonOperationInfo.ProjectionEntry;
 import org.apache.flink.python.api.functions.PythonCoGroup;
 import org.apache.flink.python.api.functions.PythonCombineIdentity;
 import org.apache.flink.python.api.functions.PythonMapPartition;
-import org.apache.flink.python.api.streaming.Receiver;
-import org.apache.flink.python.api.streaming.StreamPrinter;
+import org.apache.flink.python.api.streaming.plan.PythonPlanStreamer;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -76,15 +76,13 @@ public class PythonPlanBinder {
 
        private static final Random r = new Random();
 
-       private static final String FLINK_PYTHON_FILE_PATH = 
System.getProperty("java.io.tmpdir") + File.separator + "flink_plan";
+       public static final String FLINK_PYTHON_FILE_PATH = 
System.getProperty("java.io.tmpdir") + File.separator + "flink_plan";
        private static final String FLINK_PYTHON_REL_LOCAL_PATH = 
File.separator + "resources" + File.separator + "python";
        private static final String FLINK_DIR = System.getenv("FLINK_ROOT_DIR");
        private static String FULL_PATH;
 
        public static StringBuilder arguments = new StringBuilder();
 
-       private Process process;
-
        public static boolean usePython3 = false;
 
        private static String FLINK_HDFS_PATH = "hdfs:/tmp";
@@ -94,7 +92,7 @@ public class PythonPlanBinder {
 
        private HashMap<Integer, Object> sets = new HashMap();
        public ExecutionEnvironment env;
-       private Receiver receiver;
+       private PythonPlanStreamer streamer;
 
        public static final int MAPPED_FILE_SIZE = 1024 * 1024 * 64;
 
@@ -145,7 +143,8 @@ public class PythonPlanBinder {
                        }
 
                        distributeFiles(tmpPath, env);
-                       env.execute();
+                       JobExecutionResult jer = env.execute();
+                       sendResult(jer);
                        close();
                } catch (Exception e) {
                        close();
@@ -205,41 +204,13 @@ public class PythonPlanBinder {
                for (String arg : args) {
                        arguments.append(" ").append(arg);
                }
-               String mappedFilePath = FLINK_TMP_DATA_DIR + "/output" + 
r.nextInt();
-               receiver = new Receiver(null);
-               receiver.open(mappedFilePath);
-
-               String pythonBinaryPath = usePython3 ? 
FLINK_PYTHON3_BINARY_PATH : FLINK_PYTHON2_BINARY_PATH;
-
-               try {
-                       Runtime.getRuntime().exec(pythonBinaryPath);
-               } catch (IOException ex) {
-                       throw new RuntimeException(pythonBinaryPath + " does 
not point to a valid python binary.");
-               }
-               process = Runtime.getRuntime().exec(pythonBinaryPath + " -B " + 
tempPath + FLINK_PYTHON_PLAN_NAME + arguments.toString());
-
-               new StreamPrinter(process.getInputStream()).start();
-               new StreamPrinter(process.getErrorStream()).start();
-
-               try {
-                       Thread.sleep(2000);
-               } catch (InterruptedException ex) {
-               }
-
-               try {
-                       int value = process.exitValue();
-                       if (value != 0) {
-                               throw new RuntimeException("Plan file caused an 
error. Check log-files for details.");
-                       }
-                       if (value == 0) {
-                               throw new RuntimeException("Plan file exited 
prematurely without an error.");
-                       }
-               } catch (IllegalThreadStateException ise) {//Process still 
running
-               }
+               streamer = new PythonPlanStreamer();
+               streamer.open(tempPath, arguments.toString());
+       }
 
-               process.getOutputStream().write("plan\n".getBytes());
-               process.getOutputStream().write((mappedFilePath + 
"\n").getBytes());
-               process.getOutputStream().flush();
+       private void sendResult(JobExecutionResult jer) throws IOException {
+               long runtime = jer.getNetRuntime();
+               streamer.sendRecord(runtime);
        }
 
        private void close() {
@@ -252,18 +223,12 @@ public class PythonPlanBinder {
                        FileSystem local = FileSystem.getLocalFileSystem();
                        local.delete(new Path(FLINK_PYTHON_FILE_PATH), true);
                        local.delete(new Path(FLINK_TMP_DATA_DIR), true);
-                       receiver.close();
+                       streamer.close();
                } catch (NullPointerException npe) {
                } catch (IOException ioe) {
                        LOG.error("PythonAPI file cleanup failed. " + 
ioe.getMessage());
                } catch (URISyntaxException use) { // can't occur
                }
-               try {
-                       process.exitValue();
-               } catch (NullPointerException npe) { //exception occurred 
before process was started
-               } catch (IllegalThreadStateException ise) { //process still 
active
-                       process.destroy();
-               }
        }
 
        
//====Plan==========================================================================================================
@@ -285,7 +250,7 @@ public class PythonPlanBinder {
 
        private void receiveParameters() throws IOException {
                for (int x = 0; x < 4; x++) {
-                       Tuple value = (Tuple) receiver.getRecord(true);
+                       Tuple value = (Tuple) streamer.getRecord(true);
                        switch (Parameters.valueOf(((String) 
value.getField(0)).toUpperCase())) {
                                case DOP:
                                        Integer dop = (Integer) 
value.getField(1);
@@ -321,9 +286,9 @@ public class PythonPlanBinder {
        }
 
        private void receiveOperations() throws IOException {
-               Integer operationCount = (Integer) receiver.getRecord(true);
+               Integer operationCount = (Integer) streamer.getRecord(true);
                for (int x = 0; x < operationCount; x++) {
-                       String identifier = (String) receiver.getRecord();
+                       String identifier = (String) streamer.getRecord();
                        Operation op = null;
                        try {
                                op = 
Operation.valueOf(identifier.toUpperCase());
@@ -435,7 +400,7 @@ public class PythonPlanBinder {
         * @throws IOException
         */
        private PythonOperationInfo createOperationInfo(Operation 
operationIdentifier) throws IOException {
-               return new PythonOperationInfo(receiver, operationIdentifier);
+               return new PythonOperationInfo(streamer, operationIdentifier);
        }
 
        private void createCsvSource(PythonOperationInfo info) throws 
IOException {

http://git-wip-us.apache.org/repos/asf/flink/blob/fc4d9469/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java
 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java
index 4d74878..2349aa9 100644
--- 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java
+++ 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java
@@ -14,7 +14,7 @@ package org.apache.flink.python.api.functions;
 
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.python.api.streaming.PythonStreamer;
+import org.apache.flink.python.api.streaming.data.PythonStreamer;
 import org.apache.flink.util.Collector;
 import java.io.IOException;
 import org.apache.flink.api.common.functions.RichCoGroupFunction;

http://git-wip-us.apache.org/repos/asf/flink/blob/fc4d9469/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCombineIdentity.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCombineIdentity.java
 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCombineIdentity.java
index 9b3189b..f80d975 100644
--- 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCombineIdentity.java
+++ 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCombineIdentity.java
@@ -13,7 +13,7 @@
 package org.apache.flink.python.api.functions;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.python.api.streaming.PythonStreamer;
+import org.apache.flink.python.api.streaming.data.PythonStreamer;
 import org.apache.flink.util.Collector;
 import java.io.IOException;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;

http://git-wip-us.apache.org/repos/asf/flink/blob/fc4d9469/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java
 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java
index 1578e18..50b2cf4 100644
--- 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java
+++ 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java
@@ -17,7 +17,7 @@ import 
org.apache.flink.api.common.functions.RichMapPartitionFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.python.api.streaming.PythonStreamer;
+import org.apache.flink.python.api.streaming.data.PythonStreamer;
 import org.apache.flink.util.Collector;
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/fc4d9469/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/PythonStreamer.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/PythonStreamer.java
 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/PythonStreamer.java
deleted file mode 100644
index 9e4f479..0000000
--- 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/PythonStreamer.java
+++ /dev/null
@@ -1,374 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.python.api.streaming;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.lang.reflect.Field;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.net.SocketTimeoutException;
-import java.util.Iterator;
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.python.api.PythonPlanBinder;
-import static org.apache.flink.python.api.PythonPlanBinder.DEBUG;
-import static 
org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON2_BINARY_PATH;
-import static 
org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON3_BINARY_PATH;
-import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON_DC_ID;
-import static 
org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON_PLAN_NAME;
-import static org.apache.flink.python.api.PythonPlanBinder.FLINK_TMP_DATA_DIR;
-import static 
org.apache.flink.python.api.PythonPlanBinder.PLANBINDER_CONFIG_BCVAR_COUNT;
-import static 
org.apache.flink.python.api.PythonPlanBinder.PLANBINDER_CONFIG_BCVAR_NAME_PREFIX;
-import org.apache.flink.util.Collector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This streamer is used by functions to send/receive data to/from an external 
python process.
- */
-public class PythonStreamer implements Serializable {
-       protected static final Logger LOG = 
LoggerFactory.getLogger(PythonStreamer.class);
-       private static final int SIGNAL_BUFFER_REQUEST = 0;
-       private static final int SIGNAL_BUFFER_REQUEST_G0 = -3;
-       private static final int SIGNAL_BUFFER_REQUEST_G1 = -4;
-       private static final int SIGNAL_FINISHED = -1;
-       private static final int SIGNAL_ERROR = -2;
-       private static final byte SIGNAL_LAST = 32;
-
-       private final int id;
-       private final boolean usePython3;
-       private final boolean debug;
-       private final String planArguments;
-
-       private String inputFilePath;
-       private String outputFilePath;
-
-       private final byte[] buffer = new byte[4];
-
-       private Process process;
-       private Thread shutdownThread;
-       protected ServerSocket server;
-       protected Socket socket;
-       protected InputStream in;
-       protected OutputStream out;
-       protected int port;
-
-       protected Sender sender;
-       protected Receiver receiver;
-
-       protected StringBuilder msg = new StringBuilder();
-
-       protected final AbstractRichFunction function;
-
-       public PythonStreamer(AbstractRichFunction function, int id) {
-               this.id = id;
-               this.usePython3 = PythonPlanBinder.usePython3;
-               this.debug = DEBUG;
-               planArguments = PythonPlanBinder.arguments.toString();
-               sender = new Sender(function);
-               receiver = new Receiver(function);
-               this.function = function;
-       }
-
-       /**
-        * Starts the python script.
-        *
-        * @throws IOException
-        */
-       public void open() throws IOException {
-               server = new ServerSocket(0);
-               startPython();
-       }
-
-       private void startPython() throws IOException {
-               this.outputFilePath = FLINK_TMP_DATA_DIR + "/" + id + 
this.function.getRuntimeContext().getIndexOfThisSubtask() + "output";
-               this.inputFilePath = FLINK_TMP_DATA_DIR + "/" + id + 
this.function.getRuntimeContext().getIndexOfThisSubtask() + "input";
-
-               sender.open(inputFilePath);
-               receiver.open(outputFilePath);
-
-               String path = 
function.getRuntimeContext().getDistributedCache().getFile(FLINK_PYTHON_DC_ID).getAbsolutePath();
-               String planPath = path + FLINK_PYTHON_PLAN_NAME;
-
-               String pythonBinaryPath = usePython3 ? 
FLINK_PYTHON3_BINARY_PATH : FLINK_PYTHON2_BINARY_PATH;
-
-               try {
-                       Runtime.getRuntime().exec(pythonBinaryPath);
-               } catch (IOException ex) {
-                       throw new RuntimeException(pythonBinaryPath + " does 
not point to a valid python binary.");
-               }
-
-               if (debug) {
-                       socket.setSoTimeout(0);
-                       LOG.info("Waiting for Python Process : " + 
function.getRuntimeContext().getTaskName()
-                                       + " Run python " + planPath + 
planArguments);
-               } else {
-                       process = Runtime.getRuntime().exec(pythonBinaryPath + 
" -O -B " + planPath + planArguments);
-                       new StreamPrinter(process.getInputStream()).start();
-                       new StreamPrinter(process.getErrorStream(), true, 
msg).start();
-               }
-
-               shutdownThread = new Thread() {
-                       @Override
-                       public void run() {
-                               try {
-                                       destroyProcess();
-                               } catch (IOException ex) {
-                               }
-                       }
-               };
-
-               Runtime.getRuntime().addShutdownHook(shutdownThread);
-
-               OutputStream processOutput = process.getOutputStream();
-               processOutput.write("operator\n".getBytes());
-               processOutput.write(("" + server.getLocalPort() + 
"\n").getBytes());
-               processOutput.write((id + "\n").getBytes());
-               processOutput.write((inputFilePath + "\n").getBytes());
-               processOutput.write((outputFilePath + "\n").getBytes());
-               processOutput.flush();
-
-               try { // wait a bit to catch syntax errors
-                       Thread.sleep(2000);
-               } catch (InterruptedException ex) {
-               }
-               if (!debug) {
-                       try {
-                               process.exitValue();
-                               throw new RuntimeException("External process 
for task " + function.getRuntimeContext().getTaskName() + " terminated 
prematurely." + msg);
-                       } catch (IllegalThreadStateException ise) { //process 
still active -> start receiving data
-                       }
-               }
-
-               socket = server.accept();
-               in = socket.getInputStream();
-               out = socket.getOutputStream();
-       }
-
-       /**
-        * Closes this streamer.
-        *
-        * @throws IOException
-        */
-       public void close() throws IOException {
-               try {
-               socket.close();
-               sender.close();
-               receiver.close();
-               } catch (Exception e) {
-                       LOG.error("Exception occurred while closing Streamer. 
:" + e.getMessage());
-               }
-               if (!debug) {
-                       destroyProcess();
-               }
-               if (shutdownThread != null) {
-                       Runtime.getRuntime().removeShutdownHook(shutdownThread);
-               }
-       }
-
-       private void destroyProcess() throws IOException {
-               try {
-                       process.exitValue();
-               } catch (IllegalThreadStateException ise) { //process still 
active
-                       if 
(process.getClass().getName().equals("java.lang.UNIXProcess")) {
-                               int pid;
-                               try {
-                                       Field f = 
process.getClass().getDeclaredField("pid");
-                                       f.setAccessible(true);
-                                       pid = f.getInt(process);
-                               } catch (Throwable e) {
-                                       process.destroy();
-                                       return;
-                               }
-                               String[] args = new String[]{"kill", "-9", "" + 
pid};
-                               Runtime.getRuntime().exec(args);
-                       } else {
-                               process.destroy();
-                       }
-               }
-       }
-       
-               private void sendWriteNotification(int size, boolean hasNext) 
throws IOException {
-               byte[] tmp = new byte[5];
-               putInt(tmp, 0, size);
-               tmp[4] = hasNext ? 0 : SIGNAL_LAST;
-               out.write(tmp, 0, 5);
-               out.flush();
-       }
-
-       private void sendReadConfirmation() throws IOException {
-               out.write(new byte[1], 0, 1);
-               out.flush();
-       }
-
-       private void checkForError() {
-               if (getInt(buffer, 0) == -2) {
-                       try { //wait before terminating to ensure that the 
complete error message is printed
-                               Thread.sleep(2000);
-                       } catch (InterruptedException ex) {
-                       }
-                       throw new RuntimeException(
-                                       "External process for task " + 
function.getRuntimeContext().getTaskName() + " terminated prematurely." + msg);
-               }
-       }
-
-       /**
-        * Sends all broadcast-variables encoded in the configuration to the 
external process.
-        *
-        * @param config configuration object containing broadcast-variable 
count and names
-        * @throws IOException
-        */
-       public final void sendBroadCastVariables(Configuration config) throws 
IOException {
-               try {
-                       int broadcastCount = 
config.getInteger(PLANBINDER_CONFIG_BCVAR_COUNT, 0);
-
-                       String[] names = new String[broadcastCount];
-
-                       for (int x = 0; x < names.length; x++) {
-                               names[x] = 
config.getString(PLANBINDER_CONFIG_BCVAR_NAME_PREFIX + x, null);
-                       }
-
-                       in.read(buffer, 0, 4);
-                       checkForError();
-                       int size = sender.sendRecord(broadcastCount);
-                       sendWriteNotification(size, false);
-
-                       for (String name : names) {
-                               Iterator bcv = 
function.getRuntimeContext().getBroadcastVariable(name).iterator();
-
-                               in.read(buffer, 0, 4);
-                               checkForError();
-                               size = sender.sendRecord(name);
-                               sendWriteNotification(size, false);
-
-                               while (bcv.hasNext() || sender.hasRemaining(0)) 
{
-                                       in.read(buffer, 0, 4);
-                                       checkForError();
-                                       size = sender.sendBuffer(bcv, 0);
-                                       sendWriteNotification(size, 
bcv.hasNext() || sender.hasRemaining(0));
-                               }
-                               sender.reset();
-                       }
-               } catch (SocketTimeoutException ste) {
-                       throw new RuntimeException("External process for task " 
+ function.getRuntimeContext().getTaskName() + " stopped responding." + msg);
-               }
-       }
-
-       /**
-        * Sends all values contained in the iterator to the external process 
and collects all results.
-        *
-        * @param i iterator
-        * @param c collector
-        * @throws IOException
-        */
-       public final void streamBufferWithoutGroups(Iterator i, Collector c) 
throws IOException {
-               try {
-                       int size;
-                       if (i.hasNext()) {
-                               while (true) {
-                                       in.read(buffer, 0, 4);
-                                       int sig = getInt(buffer, 0);
-                                       switch (sig) {
-                                               case SIGNAL_BUFFER_REQUEST:
-                                                       if (i.hasNext() || 
sender.hasRemaining(0)) {
-                                                               size = 
sender.sendBuffer(i, 0);
-                                                               
sendWriteNotification(size, sender.hasRemaining(0) || i.hasNext());
-                                                       } else {
-                                                               throw new 
RuntimeException("External process requested data even though none is 
available.");
-                                                       }
-                                                       break;
-                                               case SIGNAL_FINISHED:
-                                                       return;
-                                               case SIGNAL_ERROR:
-                                                       try { //wait before 
terminating to ensure that the complete error message is printed
-                                                               
Thread.sleep(2000);
-                                                       } catch 
(InterruptedException ex) {
-                                                       }
-                                                       throw new 
RuntimeException(
-                                                                       
"External process for task " + function.getRuntimeContext().getTaskName() + " 
terminated prematurely due to an error." + msg);
-                                               default:
-                                                       
receiver.collectBuffer(c, sig);
-                                                       sendReadConfirmation();
-                                                       break;
-                                       }
-                               }
-                       }
-               } catch (SocketTimeoutException ste) {
-                       throw new RuntimeException("External process for task " 
+ function.getRuntimeContext().getTaskName() + " stopped responding." + msg);
-               }
-       }
-
-       /**
-        * Sends all values contained in both iterators to the external process 
and collects all results.
-        *
-        * @param i1 iterator
-        * @param i2 iterator
-        * @param c collector
-        * @throws IOException
-        */
-       public final void streamBufferWithGroups(Iterator i1, Iterator i2, 
Collector c) throws IOException {
-               try {
-                       int size;
-                       if (i1.hasNext() || i2.hasNext()) {
-                               while (true) {
-                                       in.read(buffer, 0, 4);
-                                       int sig = getInt(buffer, 0);
-                                       switch (sig) {
-                                               case SIGNAL_BUFFER_REQUEST_G0:
-                                                       if (i1.hasNext() || 
sender.hasRemaining(0)) {
-                                                               size = 
sender.sendBuffer(i1, 0);
-                                                               
sendWriteNotification(size, sender.hasRemaining(0) || i1.hasNext());
-                                                       }
-                                                       break;
-                                               case SIGNAL_BUFFER_REQUEST_G1:
-                                                       if (i2.hasNext() || 
sender.hasRemaining(1)) {
-                                                               size = 
sender.sendBuffer(i2, 1);
-                                                               
sendWriteNotification(size, sender.hasRemaining(1) || i2.hasNext());
-                                                       }
-                                                       break;
-                                               case SIGNAL_FINISHED:
-                                                       return;
-                                               case SIGNAL_ERROR:
-                                                       try { //wait before 
terminating to ensure that the complete error message is printed
-                                                               
Thread.sleep(2000);
-                                                       } catch 
(InterruptedException ex) {
-                                                       }
-                                                       throw new 
RuntimeException(
-                                                                       
"External process for task " + function.getRuntimeContext().getTaskName() + " 
terminated prematurely due to an error." + msg);
-                                               default:
-                                                       
receiver.collectBuffer(c, sig);
-                                                       sendReadConfirmation();
-                                                       break;
-                                       }
-                               }
-                       }
-               } catch (SocketTimeoutException ste) {
-                       throw new RuntimeException("External process for task " 
+ function.getRuntimeContext().getTaskName() + " stopped responding." + msg);
-               }
-       }
-
-       protected final static int getInt(byte[] array, int offset) {
-               return (array[offset] << 24) | (array[offset + 1] & 0xff) << 16 
| (array[offset + 2] & 0xff) << 8 | (array[offset + 3] & 0xff);
-       }
-
-       protected final static void putInt(byte[] array, int offset, int value) 
{
-               array[offset] = (byte) (value >> 24);
-               array[offset + 1] = (byte) (value >> 16);
-               array[offset + 2] = (byte) (value >> 8);
-               array[offset + 3] = (byte) (value);
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fc4d9469/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/Receiver.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/Receiver.java
 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/Receiver.java
deleted file mode 100644
index a706053..0000000
--- 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/Receiver.java
+++ /dev/null
@@ -1,384 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.python.api.streaming;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.io.Serializable;
-import java.nio.MappedByteBuffer;
-import java.nio.channels.FileChannel;
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.java.tuple.Tuple;
-import static org.apache.flink.python.api.PythonPlanBinder.FLINK_TMP_DATA_DIR;
-import static org.apache.flink.python.api.PythonPlanBinder.MAPPED_FILE_SIZE;
-import static org.apache.flink.python.api.streaming.Sender.TYPE_BOOLEAN;
-import static org.apache.flink.python.api.streaming.Sender.TYPE_BYTE;
-import static org.apache.flink.python.api.streaming.Sender.TYPE_BYTES;
-import static org.apache.flink.python.api.streaming.Sender.TYPE_DOUBLE;
-import static org.apache.flink.python.api.streaming.Sender.TYPE_FLOAT;
-import static org.apache.flink.python.api.streaming.Sender.TYPE_INTEGER;
-import static org.apache.flink.python.api.streaming.Sender.TYPE_LONG;
-import static org.apache.flink.python.api.streaming.Sender.TYPE_NULL;
-import static org.apache.flink.python.api.streaming.Sender.TYPE_SHORT;
-import static org.apache.flink.python.api.streaming.Sender.TYPE_STRING;
-import static org.apache.flink.python.api.streaming.Sender.TYPE_TUPLE;
-import org.apache.flink.python.api.types.CustomTypeWrapper;
-import org.apache.flink.util.Collector;
-
-/**
- * General-purpose class to read data from memory-mapped files.
- */
-public class Receiver implements Serializable {
-       private static final long serialVersionUID = -2474088929850009968L;
-
-       private final AbstractRichFunction function;
-
-       private File inputFile;
-       private RandomAccessFile inputRAF;
-       private FileChannel inputChannel;
-       private MappedByteBuffer fileBuffer;
-
-       private Deserializer<?> deserializer = null;
-
-       public Receiver(AbstractRichFunction function) {
-               this.function = function;
-       }
-
-       
//=====Setup========================================================================================================
-       public void open(String path) throws IOException {
-               setupMappedFile(path);
-       }
-
-       private void setupMappedFile(String inputFilePath) throws 
FileNotFoundException, IOException {
-               File x = new File(FLINK_TMP_DATA_DIR);
-               x.mkdirs();
-
-               inputFile = new File(inputFilePath);
-               if (inputFile.exists()) {
-                       inputFile.delete();
-               }
-               inputFile.createNewFile();
-               inputRAF = new RandomAccessFile(inputFilePath, "rw");
-               inputRAF.setLength(MAPPED_FILE_SIZE);
-               inputRAF.seek(MAPPED_FILE_SIZE - 1);
-               inputRAF.writeByte(0);
-               inputRAF.seek(0);
-               inputChannel = inputRAF.getChannel();
-               fileBuffer = inputChannel.map(FileChannel.MapMode.READ_WRITE, 
0, MAPPED_FILE_SIZE);
-       }
-
-       public void close() throws IOException {
-               closeMappedFile();
-       }
-
-       private void closeMappedFile() throws IOException {
-               inputChannel.close();
-               inputRAF.close();
-       }
-
-       
//=====Record-API===================================================================================================
-       /**
-        * Loads a buffer from the memory-mapped file. The records contained 
within the buffer can be accessed using
-        * collectRecord(). These records do not necessarily have to be of the 
same type. This method requires external
-        * synchronization.
-        *
-        * @throws IOException
-        */
-       private void loadBuffer() throws IOException {
-               int count = 0;
-               while (fileBuffer.get(0) == 0 && count < 10) {
-                       try {
-                               Thread.sleep(1000);
-                       } catch (InterruptedException ie) {
-                       }
-                       fileBuffer.load();
-                       count++;
-               }
-               if (fileBuffer.get(0) == 0) {
-                       throw new RuntimeException("External process not 
responding.");
-               }
-               fileBuffer.position(1);
-       }
-
-       /**
-        * Returns a record from the buffer. Note: This method cannot be 
replaced with specific methods like readInt() or
-        * similar. The PlanBinder requires a method that can return any kind 
of object.
-        *
-        * @return read record
-        * @throws IOException
-        */
-       public Object getRecord() throws IOException {
-               return getRecord(false);
-       }
-
-       /**
-        * Returns a record from the buffer. Note: This method cannot be 
replaced with specific methods like readInt() or
-        * similar. The PlanBinder requires a method that can return any kind 
of object.
-        *
-        * @param normalized flag indicating whether certain types should be 
normalized
-        * @return read record
-        * @throws IOException
-        */
-       public Object getRecord(boolean normalized) throws IOException {
-               if (fileBuffer.position() == 0) {
-                       loadBuffer();
-               }
-               return receiveField(normalized);
-       }
-
-       /**
-        * Reads a single primitive value or tuple from the buffer.
-        *
-        * @return primitive value or tuple
-        * @throws IOException
-        */
-       private Object receiveField(boolean normalized) throws IOException {
-               byte type = fileBuffer.get();
-               switch (type) {
-                       case TYPE_TUPLE:
-                               int tupleSize = fileBuffer.get();
-                               Tuple tuple = createTuple(tupleSize);
-                               for (int x = 0; x < tupleSize; x++) {
-                                       
tuple.setField(receiveField(normalized), x);
-                               }
-                               return tuple;
-                       case TYPE_BOOLEAN:
-                               return fileBuffer.get() == 1;
-                       case TYPE_BYTE:
-                               return fileBuffer.get();
-                       case TYPE_SHORT:
-                               if (normalized) {
-                                       return (int) fileBuffer.getShort();
-                               } else {
-                                       return fileBuffer.getShort();
-                               }
-                       case TYPE_INTEGER:
-                               return fileBuffer.getInt();
-                       case TYPE_LONG:
-                               if (normalized) {
-                                       return new 
Long(fileBuffer.getLong()).intValue();
-                               } else {
-                                       return fileBuffer.getLong();
-                               }
-                       case TYPE_FLOAT:
-                               if (normalized) {
-                                       return (double) fileBuffer.getFloat();
-                               } else {
-                                       return fileBuffer.getFloat();
-                               }
-                       case TYPE_DOUBLE:
-                               return fileBuffer.getDouble();
-                       case TYPE_STRING:
-                               int stringSize = fileBuffer.getInt();
-                               byte[] buffer = new byte[stringSize];
-                               fileBuffer.get(buffer);
-                               return new String(buffer);
-                       case TYPE_BYTES:
-                               int bytessize = fileBuffer.getInt();
-                               byte[] bytebuffer = new byte[bytessize];
-                               fileBuffer.get(bytebuffer);
-                               return bytebuffer;
-                       case TYPE_NULL:
-                               return null;
-                       default:
-                               return new 
CustomTypeDeserializer(type).deserialize();
-               }
-       }
-
-       
//=====Buffered-API=================================================================================================
-       /**
-        * Reads a buffer of the given size from the memory-mapped file, and 
collects all records contained. This method
-        * assumes that all values in the buffer are of the same type. This 
method does NOT take care of synchronization.
-        * The user must guarantee that the buffer was completely written 
before calling this method.
-        *
-        * @param c Collector to collect records
-        * @param bufferSize size of the buffer
-        * @throws IOException
-        */
-       @SuppressWarnings({ "rawtypes", "unchecked" })
-       public void collectBuffer(Collector c, int bufferSize) throws 
IOException {
-               fileBuffer.position(0);
-
-               if (deserializer == null) {
-                       byte type = fileBuffer.get();
-                       deserializer = getDeserializer(type);
-               }
-               while (fileBuffer.position() < bufferSize) {
-                       c.collect(deserializer.deserialize());
-               }
-       }
-
-       //=====Deserializer=================================================================================================
-       private Deserializer<?> getDeserializer(byte type) {
-               switch (type) {
-                       case TYPE_TUPLE:
-                               return new TupleDeserializer();
-                       case TYPE_BOOLEAN:
-                               return new BooleanDeserializer();
-                       case TYPE_BYTE:
-                               return new ByteDeserializer();
-                       case TYPE_BYTES:
-                               return new BytesDeserializer();
-                       case TYPE_SHORT:
-                               return new ShortDeserializer();
-                       case TYPE_INTEGER:
-                               return new IntDeserializer();
-                       case TYPE_LONG:
-                               return new LongDeserializer();
-                       case TYPE_STRING:
-                               return new StringDeserializer();
-                       case TYPE_FLOAT:
-                               return new FloatDeserializer();
-                       case TYPE_DOUBLE:
-                               return new DoubleDeserializer();
-                       case TYPE_NULL:
-                               return new NullDeserializer();
-                       default:
-                               return new CustomTypeDeserializer(type);
-
-               }
-       }
-
-       private interface Deserializer<T> {
-               public T deserialize();
-       }
-
-       private class CustomTypeDeserializer implements 
Deserializer<CustomTypeWrapper> {
-               private final byte type;
-
-               public CustomTypeDeserializer(byte type) {
-                       this.type = type;
-               }
-
-               @Override
-               public CustomTypeWrapper deserialize() {
-                       int size = fileBuffer.getInt();
-                       byte[] data = new byte[size];
-                       fileBuffer.get(data);
-                       return new CustomTypeWrapper(type, data);
-               }
-       }
-
-       private class BooleanDeserializer implements Deserializer<Boolean> {
-               @Override
-               public Boolean deserialize() {
-                       return fileBuffer.get() == 1;
-               }
-       }
-
-       private class ByteDeserializer implements Deserializer<Byte> {
-               @Override
-               public Byte deserialize() {
-                       return fileBuffer.get();
-               }
-       }
-
-       private class ShortDeserializer implements Deserializer<Short> {
-               @Override
-               public Short deserialize() {
-                       return fileBuffer.getShort();
-               }
-       }
-
-       private class IntDeserializer implements Deserializer<Integer> {
-               @Override
-               public Integer deserialize() {
-                       return fileBuffer.getInt();
-               }
-       }
-
-       private class LongDeserializer implements Deserializer<Long> {
-               @Override
-               public Long deserialize() {
-                       return fileBuffer.getLong();
-               }
-       }
-
-       private class FloatDeserializer implements Deserializer<Float> {
-               @Override
-               public Float deserialize() {
-                       return fileBuffer.getFloat();
-               }
-       }
-
-       private class DoubleDeserializer implements Deserializer<Double> {
-               @Override
-               public Double deserialize() {
-                       return fileBuffer.getDouble();
-               }
-       }
-
-       private class StringDeserializer implements Deserializer<String> {
-               private int size;
-
-               @Override
-               public String deserialize() {
-                       size = fileBuffer.getInt();
-                       byte[] buffer = new byte[size];
-                       fileBuffer.get(buffer);
-                       return new String(buffer);
-               }
-       }
-
-       private class NullDeserializer implements Deserializer<Object> {
-               @Override
-               public Object deserialize() {
-                       return null;
-               }
-       }
-
-       private class BytesDeserializer implements Deserializer<byte[]> {
-               @Override
-               public byte[] deserialize() {
-                       int length = fileBuffer.getInt();
-                       byte[] result = new byte[length];
-                       fileBuffer.get(result);
-                       return result;
-               }
-
-       }
-
-       private class TupleDeserializer implements Deserializer<Tuple> {
-               Deserializer<?>[] deserializer = null;
-               Tuple reuse;
-
-               public TupleDeserializer() {
-                       int size = fileBuffer.getInt();
-                       reuse = createTuple(size);
-                       deserializer = new Deserializer[size];
-                       for (int x = 0; x < deserializer.length; x++) {
-                               deserializer[x] = 
getDeserializer(fileBuffer.get());
-                       }
-               }
-
-               @Override
-               public Tuple deserialize() {
-                       for (int x = 0; x < deserializer.length; x++) {
-                               reuse.setField(deserializer[x].deserialize(), 
x);
-                       }
-                       return reuse;
-               }
-       }
-
-       public static Tuple createTuple(int size) {
-               try {
-                       return Tuple.getTupleClass(size).newInstance();
-               } catch (InstantiationException e) {
-                       throw new RuntimeException(e);
-               } catch (IllegalAccessException e) {
-                       throw new RuntimeException(e);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fc4d9469/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/Sender.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/Sender.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/Sender.java
deleted file mode 100644
index 2db1441..0000000
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/Sender.java
+++ /dev/null
@@ -1,427 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.python.api.streaming;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.nio.MappedByteBuffer;
-import java.nio.channels.FileChannel;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.java.tuple.Tuple;
-import static org.apache.flink.python.api.PythonPlanBinder.FLINK_TMP_DATA_DIR;
-import static org.apache.flink.python.api.PythonPlanBinder.MAPPED_FILE_SIZE;
-import org.apache.flink.python.api.types.CustomTypeWrapper;
-
-/**
- * General-purpose class to write data to memory-mapped files.
- */
-public class Sender implements Serializable {
-       public static final byte TYPE_TUPLE = (byte) 11;
-       public static final byte TYPE_BOOLEAN = (byte) 10;
-       public static final byte TYPE_BYTE = (byte) 9;
-       public static final byte TYPE_SHORT = (byte) 8;
-       public static final byte TYPE_INTEGER = (byte) 7;
-       public static final byte TYPE_LONG = (byte) 6;
-       public static final byte TYPE_DOUBLE = (byte) 4;
-       public static final byte TYPE_FLOAT = (byte) 5;
-       public static final byte TYPE_CHAR = (byte) 3;
-       public static final byte TYPE_STRING = (byte) 2;
-       public static final byte TYPE_BYTES = (byte) 1;
-       public static final byte TYPE_NULL = (byte) 0;
-
-       private final AbstractRichFunction function;
-
-       private File outputFile;
-       private RandomAccessFile outputRAF;
-       private FileChannel outputChannel;
-       private MappedByteBuffer fileBuffer;
-
-       private final ByteBuffer[] saved = new ByteBuffer[2];
-
-       private final Serializer[] serializer = new Serializer[2];
-
-       public Sender(AbstractRichFunction function) {
-               this.function = function;
-       }
-
-       //=====Setup========================================================================================================
-       public void open(String path) throws IOException {
-               setupMappedFile(path);
-       }
-
-       private void setupMappedFile(String outputFilePath) throws 
FileNotFoundException, IOException {
-               File x = new File(FLINK_TMP_DATA_DIR);
-               x.mkdirs();
-
-               outputFile = new File(outputFilePath);
-               if (outputFile.exists()) {
-                       outputFile.delete();
-               }
-               outputFile.createNewFile();
-               outputRAF = new RandomAccessFile(outputFilePath, "rw");
-               outputRAF.setLength(MAPPED_FILE_SIZE);
-               outputRAF.seek(MAPPED_FILE_SIZE - 1);
-               outputRAF.writeByte(0);
-               outputRAF.seek(0);
-               outputChannel = outputRAF.getChannel();
-               fileBuffer = outputChannel.map(FileChannel.MapMode.READ_WRITE, 
0, MAPPED_FILE_SIZE);
-       }
-
-       public void close() throws IOException {
-               closeMappedFile();
-       }
-
-       private void closeMappedFile() throws IOException {
-               outputChannel.close();
-               outputRAF.close();
-       }
-
-       /**
-        * Resets this object to the post-configuration state.
-        */
-       public void reset() {
-               serializer[0] = null;
-               serializer[1] = null;
-               fileBuffer.clear();
-       }
-
-       //=====Serialization================================================================================================
-       /**
-        * Writes a single record to the memory-mapped file. This method does NOT take care of synchronization. The user
-        * must guarantee that the file may be written to before calling this method. This method essentially reserves the
-        * whole buffer for one record. As such it imposes some performance restrictions and should only be used when
-        * absolutely necessary.
-        *
-        * @param value record to send
-        * @return size of the written buffer
-        * @throws IOException
-        */
-       public int sendRecord(Object value) throws IOException {
-               fileBuffer.clear();
-               int group = 0;
-
-               serializer[group] = getSerializer(value);
-               ByteBuffer bb = serializer[group].serialize(value);
-               if (bb.remaining() > MAPPED_FILE_SIZE) {
-                       throw new RuntimeException("Serialized object does not 
fit into a single buffer.");
-               }
-               fileBuffer.put(bb);
-
-               int size = fileBuffer.position();
-
-               reset();
-               return size;
-       }
-
-       public boolean hasRemaining(int group) {
-               return saved[group] != null;
-       }
-
-       /**
-        * Extracts records from an iterator and writes them to the memory-mapped file. This method assumes that all values
-        * in the iterator are of the same type. This method does NOT take care of synchronization. The caller must
-        * guarantee that the file may be written to before calling this method.
-        *
-        * @param i iterator containing records
-        * @param group group to which the iterator belongs, most notably used by CoGroup-functions.
-        * @return size of the written buffer
-        * @throws IOException
-        */
-       public int sendBuffer(Iterator i, int group) throws IOException {
-               fileBuffer.clear();
-
-               Object value;
-               ByteBuffer bb;
-               if (serializer[group] == null) {
-                       value = i.next();
-                       serializer[group] = getSerializer(value);
-                       bb = serializer[group].serialize(value);
-                       if (bb.remaining() > MAPPED_FILE_SIZE) {
-                               throw new RuntimeException("Serialized object 
does not fit into a single buffer.");
-                       }
-                       fileBuffer.put(bb);
-
-               }
-               if (saved[group] != null) {
-                       fileBuffer.put(saved[group]);
-                       saved[group] = null;
-               }
-               while (i.hasNext() && saved[group] == null) {
-                       value = i.next();
-                       bb = serializer[group].serialize(value);
-                       if (bb.remaining() > MAPPED_FILE_SIZE) {
-                               throw new RuntimeException("Serialized object 
does not fit into a single buffer.");
-                       }
-                       if (bb.remaining() <= fileBuffer.remaining()) {
-                               fileBuffer.put(bb);
-                       } else {
-                               saved[group] = bb;
-                       }
-               }
-
-               int size = fileBuffer.position();
-               return size;
-       }
-
-       private enum SupportedTypes {
-               TUPLE, BOOLEAN, BYTE, BYTES, CHARACTER, SHORT, INTEGER, LONG, 
FLOAT, DOUBLE, STRING, OTHER, NULL, CUSTOMTYPEWRAPPER
-       }
-
-       //=====Serializer===================================================================================================
-       private Serializer getSerializer(Object value) throws IOException {
-               String className = 
value.getClass().getSimpleName().toUpperCase();
-               if (className.startsWith("TUPLE")) {
-                       className = "TUPLE";
-               }
-               if (className.startsWith("BYTE[]")) {
-                       className = "BYTES";
-               }
-               SupportedTypes type = SupportedTypes.valueOf(className);
-               switch (type) {
-                       case TUPLE:
-                               fileBuffer.put(TYPE_TUPLE);
-                               fileBuffer.putInt(((Tuple) value).getArity());
-                               return new TupleSerializer((Tuple) value);
-                       case BOOLEAN:
-                               fileBuffer.put(TYPE_BOOLEAN);
-                               return new BooleanSerializer();
-                       case BYTE:
-                               fileBuffer.put(TYPE_BYTE);
-                               return new ByteSerializer();
-                       case BYTES:
-                               fileBuffer.put(TYPE_BYTES);
-                               return new BytesSerializer();
-                       case CHARACTER:
-                               fileBuffer.put(TYPE_CHAR);
-                               return new CharSerializer();
-                       case SHORT:
-                               fileBuffer.put(TYPE_SHORT);
-                               return new ShortSerializer();
-                       case INTEGER:
-                               fileBuffer.put(TYPE_INTEGER);
-                               return new IntSerializer();
-                       case LONG:
-                               fileBuffer.put(TYPE_LONG);
-                               return new LongSerializer();
-                       case STRING:
-                               fileBuffer.put(TYPE_STRING);
-                               return new StringSerializer();
-                       case FLOAT:
-                               fileBuffer.put(TYPE_FLOAT);
-                               return new FloatSerializer();
-                       case DOUBLE:
-                               fileBuffer.put(TYPE_DOUBLE);
-                               return new DoubleSerializer();
-                       case NULL:
-                               fileBuffer.put(TYPE_NULL);
-                               return new NullSerializer();
-                       case CUSTOMTYPEWRAPPER:
-                               fileBuffer.put(((CustomTypeWrapper) 
value).getType());
-                               return new CustomTypeSerializer();
-                       default:
-                               throw new IllegalArgumentException("Unknown 
Type encountered: " + type);
-               }
-       }
-
-       private abstract class Serializer<T> {
-               protected ByteBuffer buffer;
-
-               public Serializer(int capacity) {
-                       buffer = ByteBuffer.allocate(capacity);
-               }
-
-               public ByteBuffer serialize(T value) {
-                       buffer.clear();
-                       serializeInternal(value);
-                       buffer.flip();
-                       return buffer;
-               }
-
-               public abstract void serializeInternal(T value);
-       }
-
-       private class CustomTypeSerializer extends 
Serializer<CustomTypeWrapper> {
-               public CustomTypeSerializer() {
-                       super(0);
-               }
-               @Override
-               public void serializeInternal(CustomTypeWrapper value) {
-                       byte[] bytes = value.getData();
-                       buffer = ByteBuffer.wrap(bytes);
-                       buffer.position(bytes.length);
-               }
-       }
-
-       private class ByteSerializer extends Serializer<Byte> {
-               public ByteSerializer() {
-                       super(1);
-               }
-
-               @Override
-               public void serializeInternal(Byte value) {
-                       buffer.put(value);
-               }
-       }
-
-       private class BooleanSerializer extends Serializer<Boolean> {
-               public BooleanSerializer() {
-                       super(1);
-               }
-
-               @Override
-               public void serializeInternal(Boolean value) {
-                       buffer.put(value ? (byte) 1 : (byte) 0);
-               }
-       }
-
-       private class CharSerializer extends Serializer<Character> {
-               public CharSerializer() {
-                       super(4);
-               }
-
-               @Override
-               public void serializeInternal(Character value) {
-                       buffer.put((value + "").getBytes());
-               }
-       }
-
-       private class ShortSerializer extends Serializer<Short> {
-               public ShortSerializer() {
-                       super(2);
-               }
-
-               @Override
-               public void serializeInternal(Short value) {
-                       buffer.putShort(value);
-               }
-       }
-
-       private class IntSerializer extends Serializer<Integer> {
-               public IntSerializer() {
-                       super(4);
-               }
-
-               @Override
-               public void serializeInternal(Integer value) {
-                       buffer.putInt(value);
-               }
-       }
-
-       private class LongSerializer extends Serializer<Long> {
-               public LongSerializer() {
-                       super(8);
-               }
-
-               @Override
-               public void serializeInternal(Long value) {
-                       buffer.putLong(value);
-               }
-       }
-
-       private class StringSerializer extends Serializer<String> {
-               public StringSerializer() {
-                       super(0);
-               }
-
-               @Override
-               public void serializeInternal(String value) {
-                       byte[] bytes = value.getBytes();
-                       buffer = ByteBuffer.allocate(bytes.length + 4);
-                       buffer.putInt(bytes.length);
-                       buffer.put(bytes);
-               }
-       }
-
-       private class FloatSerializer extends Serializer<Float> {
-               public FloatSerializer() {
-                       super(4);
-               }
-
-               @Override
-               public void serializeInternal(Float value) {
-                       buffer.putFloat(value);
-               }
-       }
-
-       private class DoubleSerializer extends Serializer<Double> {
-               public DoubleSerializer() {
-                       super(8);
-               }
-
-               @Override
-               public void serializeInternal(Double value) {
-                       buffer.putDouble(value);
-               }
-       }
-
-       private class NullSerializer extends Serializer<Object> {
-               public NullSerializer() {
-                       super(0);
-               }
-
-               @Override
-               public void serializeInternal(Object value) {
-               }
-       }
-
-       private class BytesSerializer extends Serializer<byte[]> {
-               public BytesSerializer() {
-                       super(0);
-               }
-
-               @Override
-               public void serializeInternal(byte[] value) {
-                       buffer = ByteBuffer.allocate(4 + value.length);
-                       buffer.putInt(value.length);
-                       buffer.put(value);
-               }
-       }
-
-       private class TupleSerializer extends Serializer<Tuple> {
-               private final Serializer[] serializer;
-               private final List<ByteBuffer> buffers;
-
-               public TupleSerializer(Tuple value) throws IOException {
-                       super(0);
-                       serializer = new Serializer[value.getArity()];
-                       buffers = new ArrayList();
-                       for (int x = 0; x < serializer.length; x++) {
-                               serializer[x] = 
getSerializer(value.getField(x));
-                       }
-               }
-
-               @Override
-               public void serializeInternal(Tuple value) {
-                       int length = 0;
-                       for (int x = 0; x < serializer.length; x++) {
-                               serializer[x].buffer.clear();
-                               
serializer[x].serializeInternal(value.getField(x));
-                               length += serializer[x].buffer.position();
-                               buffers.add(serializer[x].buffer);
-                       }
-                       buffer = ByteBuffer.allocate(length);
-                       for (ByteBuffer b : buffers) {
-                               b.flip();
-                               buffer.put(b);
-                       }
-                       buffers.clear();
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fc4d9469/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/StreamPrinter.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/StreamPrinter.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/StreamPrinter.java
deleted file mode 100644
index e42364b..0000000
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/StreamPrinter.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.python.api.streaming;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-
-/**
- * Simple utility class to print all contents of an inputstream to stdout.
- */
-public class StreamPrinter extends Thread {
-       private final BufferedReader reader;
-       private final boolean wrapInException;
-       private StringBuilder msg;
-
-       public StreamPrinter(InputStream stream) {
-               this(stream, false, null);
-       }
-
-       public StreamPrinter(InputStream stream, boolean wrapInException, 
StringBuilder msg) {
-               this.reader = new BufferedReader(new InputStreamReader(stream));
-               this.wrapInException = wrapInException;
-               this.msg = msg;
-       }
-
-       @Override
-       public void run() {
-               String line;
-               try {
-                       if (wrapInException) {
-                               while ((line = reader.readLine()) != null) {
-                                       msg.append("\n" + line);
-                               }
-                       } else {
-                               while ((line = reader.readLine()) != null) {
-                                       System.out.println(line);
-                                       System.out.flush();
-                               }
-                       }
-               } catch (IOException ex) {
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fc4d9469/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonReceiver.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonReceiver.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonReceiver.java
new file mode 100644
index 0000000..9ed047c
--- /dev/null
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonReceiver.java
@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.python.api.streaming.data;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.io.Serializable;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import org.apache.flink.api.java.tuple.Tuple;
+import static org.apache.flink.python.api.PythonPlanBinder.FLINK_TMP_DATA_DIR;
+import static org.apache.flink.python.api.PythonPlanBinder.MAPPED_FILE_SIZE;
+import static 
org.apache.flink.python.api.streaming.data.PythonSender.TYPE_BOOLEAN;
+import static 
org.apache.flink.python.api.streaming.data.PythonSender.TYPE_BYTE;
+import static 
org.apache.flink.python.api.streaming.data.PythonSender.TYPE_BYTES;
+import static 
org.apache.flink.python.api.streaming.data.PythonSender.TYPE_DOUBLE;
+import static 
org.apache.flink.python.api.streaming.data.PythonSender.TYPE_FLOAT;
+import static 
org.apache.flink.python.api.streaming.data.PythonSender.TYPE_INTEGER;
+import static 
org.apache.flink.python.api.streaming.data.PythonSender.TYPE_LONG;
+import static 
org.apache.flink.python.api.streaming.data.PythonSender.TYPE_NULL;
+import static 
org.apache.flink.python.api.streaming.data.PythonSender.TYPE_SHORT;
+import static 
org.apache.flink.python.api.streaming.data.PythonSender.TYPE_STRING;
+import static 
org.apache.flink.python.api.streaming.data.PythonSender.TYPE_TUPLE;
+import org.apache.flink.python.api.types.CustomTypeWrapper;
+import org.apache.flink.util.Collector;
+
+/**
+ * General-purpose class to read data from memory-mapped files.
+ */
+public class PythonReceiver implements Serializable {
+       private static final long serialVersionUID = -2474088929850009968L;
+
+       private File inputFile;
+       private RandomAccessFile inputRAF;
+       private FileChannel inputChannel;
+       private MappedByteBuffer fileBuffer;
+
+       private Deserializer<?> deserializer = null;
+
+       //=====Setup========================================================================================================
+       public void open(String path) throws IOException {
+               setupMappedFile(path);
+       }
+
+       private void setupMappedFile(String inputFilePath) throws 
FileNotFoundException, IOException {
+               File x = new File(FLINK_TMP_DATA_DIR);
+               x.mkdirs();
+
+               inputFile = new File(inputFilePath);
+               if (inputFile.exists()) {
+                       inputFile.delete();
+               }
+               inputFile.createNewFile();
+               inputRAF = new RandomAccessFile(inputFilePath, "rw");
+               inputRAF.setLength(MAPPED_FILE_SIZE);
+               inputRAF.seek(MAPPED_FILE_SIZE - 1);
+               inputRAF.writeByte(0);
+               inputRAF.seek(0);
+               inputChannel = inputRAF.getChannel();
+               fileBuffer = inputChannel.map(FileChannel.MapMode.READ_WRITE, 
0, MAPPED_FILE_SIZE);
+       }
+
+       public void close() throws IOException {
+               closeMappedFile();
+       }
+
+       private void closeMappedFile() throws IOException {
+               inputChannel.close();
+               inputRAF.close();
+       }
+
+       /**
+        * Reads a buffer of the given size from the memory-mapped file, and collects all records contained. This method
+        * assumes that all values in the buffer are of the same type. This method does NOT take care of synchronization.
+        * The user must guarantee that the buffer was completely written before calling this method.
+        *
+        * @param c Collector to collect records
+        * @param bufferSize size of the buffer
+        * @throws IOException
+        */
+       @SuppressWarnings({ "rawtypes", "unchecked" })
+       public void collectBuffer(Collector c, int bufferSize) throws 
IOException {
+               fileBuffer.position(0);
+
+               if (deserializer == null) {
+                       byte type = fileBuffer.get();
+                       deserializer = getDeserializer(type);
+               }
+               while (fileBuffer.position() < bufferSize) {
+                       c.collect(deserializer.deserialize());
+               }
+       }
+
+       //=====Deserializer=================================================================================================
+       /**
+        * Maps a type id to the deserializer responsible for values of that type. Ids that match none of
+        * the built-in type constants are treated as custom types. Creating a tuple deserializer reads
+        * further header bytes from the mapped file.
+        *
+        * @param type type id read from the mapped file
+        * @return deserializer for the given type id
+        */
+       private Deserializer<?> getDeserializer(byte type) {
+               switch (type) {
+                       case TYPE_NULL:
+                               return new NullDeserializer();
+                       case TYPE_BYTES:
+                               return new BytesDeserializer();
+                       case TYPE_STRING:
+                               return new StringDeserializer();
+                       case TYPE_DOUBLE:
+                               return new DoubleDeserializer();
+                       case TYPE_FLOAT:
+                               return new FloatDeserializer();
+                       case TYPE_LONG:
+                               return new LongDeserializer();
+                       case TYPE_INTEGER:
+                               return new IntDeserializer();
+                       case TYPE_SHORT:
+                               return new ShortDeserializer();
+                       case TYPE_BYTE:
+                               return new ByteDeserializer();
+                       case TYPE_BOOLEAN:
+                               return new BooleanDeserializer();
+                       case TYPE_TUPLE:
+                               return new TupleDeserializer();
+                       default:
+                               return new CustomTypeDeserializer(type);
+               }
+       }
+
+       /**
+        * Reads one value of type {@code T}.
+        */
+       private interface Deserializer<T> {
+               T deserialize();
+       }
+
+       /**
+        * Reads a length-prefixed byte payload from the mapped file and wraps it, together with the type id
+        * given at construction, in a {@link CustomTypeWrapper}.
+        */
+       private class CustomTypeDeserializer implements Deserializer<CustomTypeWrapper> {
+               private final byte type;
+
+               public CustomTypeDeserializer(byte type) {
+                       this.type = type;
+               }
+
+               @Override
+               public CustomTypeWrapper deserialize() {
+                       // The payload is preceded by its length as an int.
+                       byte[] payload = new byte[fileBuffer.getInt()];
+                       fileBuffer.get(payload);
+                       return new CustomTypeWrapper(type, payload);
+               }
+       }
+
+       private class BooleanDeserializer implements Deserializer<Boolean> {
+               @Override
+               public Boolean deserialize() {
+                       return fileBuffer.get() == 1;
+               }
+       }
+
+       /**
+        * Reads a single byte from the mapped file.
+        */
+       private class ByteDeserializer implements Deserializer<Byte> {
+               @Override
+               public Byte deserialize() {
+                       return Byte.valueOf(fileBuffer.get());
+               }
+       }
+
+       /**
+        * Reads a short from the mapped file.
+        */
+       private class ShortDeserializer implements Deserializer<Short> {
+               @Override
+               public Short deserialize() {
+                       return Short.valueOf(fileBuffer.getShort());
+               }
+       }
+
+       /**
+        * Reads an int from the mapped file.
+        */
+       private class IntDeserializer implements Deserializer<Integer> {
+               @Override
+               public Integer deserialize() {
+                       return Integer.valueOf(fileBuffer.getInt());
+               }
+       }
+
+       /**
+        * Reads a long from the mapped file.
+        */
+       private class LongDeserializer implements Deserializer<Long> {
+               @Override
+               public Long deserialize() {
+                       return Long.valueOf(fileBuffer.getLong());
+               }
+       }
+
+       /**
+        * Reads a float from the mapped file.
+        */
+       private class FloatDeserializer implements Deserializer<Float> {
+               @Override
+               public Float deserialize() {
+                       return Float.valueOf(fileBuffer.getFloat());
+               }
+       }
+
+       /**
+        * Reads a double from the mapped file.
+        */
+       private class DoubleDeserializer implements Deserializer<Double> {
+               @Override
+               public Double deserialize() {
+                       return Double.valueOf(fileBuffer.getDouble());
+               }
+       }
+
+       /**
+        * Reads a length-prefixed string from the mapped file: an int holding the number of bytes, followed
+        * by the encoded characters.
+        */
+       private class StringDeserializer implements Deserializer<String> {
+               @Override
+               public String deserialize() {
+                       int size = fileBuffer.getInt();
+                       byte[] buffer = new byte[size];
+                       fileBuffer.get(buffer);
+                       // Decode with a fixed charset so the result does not depend on the JVM's default charset.
+                       // NOTE(review): assumes the writing side encodes strings as UTF-8 - confirm.
+                       return new String(buffer, java.nio.charset.StandardCharsets.UTF_8);
+               }
+       }
+
+       /**
+        * Produces {@code null} without reading anything from the mapped file.
+        */
+       private class NullDeserializer implements Deserializer<Object> {
+               @Override
+               public Object deserialize() {
+                       final Object nothing = null;
+                       return nothing;
+               }
+       }
+
+       /**
+        * Reads a length-prefixed byte array from the mapped file.
+        */
+       private class BytesDeserializer implements Deserializer<byte[]> {
+               @Override
+               public byte[] deserialize() {
+                       // The array is preceded by its length as an int.
+                       byte[] bytes = new byte[fileBuffer.getInt()];
+                       fileBuffer.get(bytes);
+                       return bytes;
+               }
+       }
+
+       /**
+        * Deserializer for tuples. The constructor reads the tuple header from the mapped file: the arity
+        * as an int, followed by one type id per field, for which a field deserializer is created.
+        * Every call to {@link #deserialize()} fills and returns the same tuple instance.
+        */
+       private class TupleDeserializer implements Deserializer<Tuple> {
+               Deserializer<?>[] deserializer = null;
+               Tuple reuse;
+
+               public TupleDeserializer() {
+                       final int arity = fileBuffer.getInt();
+                       reuse = createTuple(arity);
+                       deserializer = new Deserializer[arity];
+                       int field = 0;
+                       while (field < arity) {
+                               deserializer[field] = getDeserializer(fileBuffer.get());
+                               field++;
+                       }
+               }
+
+               @Override
+               public Tuple deserialize() {
+                       int field = 0;
+                       for (Deserializer<?> fieldDeserializer : deserializer) {
+                               reuse.setField(fieldDeserializer.deserialize(), field++);
+                       }
+                       return reuse;
+               }
+       }
+
+       /**
+        * Creates an empty tuple with the given number of fields.
+        *
+        * @param size arity of the tuple
+        * @return new tuple instance of the matching tuple class
+        * @throws RuntimeException if the tuple class cannot be instantiated
+        */
+       public static Tuple createTuple(int size) {
+               final Class<? extends Tuple> tupleClass = Tuple.getTupleClass(size);
+               try {
+                       return tupleClass.newInstance();
+               } catch (IllegalAccessException e) {
+                       throw new RuntimeException(e);
+               } catch (InstantiationException e) {
+                       throw new RuntimeException(e);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fc4d9469/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSender.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSender.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSender.java
new file mode 100644
index 0000000..1d17243
--- /dev/null
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSender.java
@@ -0,0 +1,420 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.python.api.streaming.data;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.flink.api.java.tuple.Tuple;
+import static org.apache.flink.python.api.PythonPlanBinder.FLINK_TMP_DATA_DIR;
+import static org.apache.flink.python.api.PythonPlanBinder.MAPPED_FILE_SIZE;
+import org.apache.flink.python.api.types.CustomTypeWrapper;
+
+/**
+ * General-purpose class to write data to memory-mapped files.
+ */
+public class PythonSender implements Serializable {
+       // Type ids written into the mapped file as headers so the reading side can pick a deserializer.
+       public static final byte TYPE_NULL = (byte) 0;
+       public static final byte TYPE_BYTES = (byte) 1;
+       public static final byte TYPE_STRING = (byte) 2;
+       public static final byte TYPE_CHAR = (byte) 3;
+       public static final byte TYPE_DOUBLE = (byte) 4;
+       public static final byte TYPE_FLOAT = (byte) 5;
+       public static final byte TYPE_LONG = (byte) 6;
+       public static final byte TYPE_INTEGER = (byte) 7;
+       public static final byte TYPE_SHORT = (byte) 8;
+       public static final byte TYPE_BYTE = (byte) 9;
+       public static final byte TYPE_BOOLEAN = (byte) 10;
+       public static final byte TYPE_TUPLE = (byte) 11;
+
+       // Handles of the memory-mapped output file; assigned when the file is set up.
+       private File outputFile;
+       private RandomAccessFile outputRAF;
+       private FileChannel outputChannel;
+       private MappedByteBuffer fileBuffer;
+
+       // Per group: a serialized record that did not fit into the previous buffer and still has to be written.
+       private final ByteBuffer[] saved = new ByteBuffer[2];
+
+       // Per group: the serializer chosen for the records of that group; null until the first record is seen.
+       private final Serializer[] serializer = new Serializer[2];
+
+       //=====Setup========================================================================================================
+       /**
+        * Prepares this sender by creating and mapping the output file at the given path.
+        *
+        * @param path path of the file to map
+        * @throws IOException if the file cannot be created or mapped
+        */
+       public void open(String path) throws IOException {
+               this.setupMappedFile(path);
+       }
+
+       /**
+        * Creates a fresh file of {@code MAPPED_FILE_SIZE} bytes at the given path and maps it into memory
+        * for reading and writing. An already existing file at that path is deleted first. If sizing or
+        * mapping the file fails, the opened file handle is closed again before the exception propagates.
+        *
+        * @param outputFilePath path of the file to create and map
+        * @throws FileNotFoundException if the file cannot be opened
+        * @throws IOException if the file cannot be created, resized or mapped
+        */
+       private void setupMappedFile(String outputFilePath) throws FileNotFoundException, IOException {
+               File x = new File(FLINK_TMP_DATA_DIR);
+               x.mkdirs();
+
+               outputFile = new File(outputFilePath);
+               if (outputFile.exists()) {
+                       outputFile.delete();
+               }
+               outputFile.createNewFile();
+
+               outputRAF = new RandomAccessFile(outputFilePath, "rw");
+               boolean mapped = false;
+               try {
+                       outputRAF.setLength(MAPPED_FILE_SIZE);
+                       // Touch the last byte of the file, then rewind to the start.
+                       outputRAF.seek(MAPPED_FILE_SIZE - 1);
+                       outputRAF.writeByte(0);
+                       outputRAF.seek(0);
+                       outputChannel = outputRAF.getChannel();
+                       fileBuffer = outputChannel.map(FileChannel.MapMode.READ_WRITE, 0, MAPPED_FILE_SIZE);
+                       mapped = true;
+               } finally {
+                       if (!mapped) {
+                               // Do not leak the file descriptor when the setup failed half-way.
+                               outputRAF.close();
+                       }
+               }
+       }
+
+       /**
+        * Releases the resources that back the memory-mapped output file.
+        *
+        * @throws IOException if closing the underlying channel or file fails
+        */
+       public void close() throws IOException {
+               this.closeMappedFile();
+       }
+
+       /**
+        * Closes the file channel and the random-access file behind the mapped buffer.
+        * The file is closed even if closing the channel fails, and handles that were never
+        * opened are skipped, so calling this before the file was set up is a no-op.
+        *
+        * @throws IOException if closing the channel or the file fails
+        */
+       private void closeMappedFile() throws IOException {
+               try {
+                       if (outputChannel != null) {
+                               outputChannel.close();
+                       }
+               } finally {
+                       // Runs even when the channel close threw, so the file descriptor is not leaked.
+                       if (outputRAF != null) {
+                               outputRAF.close();
+                       }
+               }
+       }
+
+       /**
+        * Returns this object to the state it had right after configuration: the serializers of all groups
+        * are discarded and the mapped buffer is rewound.
+        */
+       public void reset() {
+               for (int group = 0; group < serializer.length; group++) {
+                       serializer[group] = null;
+               }
+               fileBuffer.clear();
+       }
+
+       //=====Serialization================================================================================================
+       /**
+        * Writes a single record to the memory-mapped file. This method does NOT take care of synchronization.
+        * The user must guarantee that the file may be written to before calling this method. This method
+        * essentially reserves the whole buffer for one record. As such it imposes some performance
+        * restrictions and should only be used when absolutely necessary.
+        *
+        * @param value record to send
+        * @return size of the written buffer
+        * @throws IOException
+        * @throws RuntimeException if the type header plus the serialized record exceed the mapped buffer
+        */
+       public int sendRecord(Object value) throws IOException {
+               fileBuffer.clear();
+               int group = 0;
+
+               // getSerializer() already writes the type header into fileBuffer, so the record has to fit into
+               // what is left of the buffer, not into the full MAPPED_FILE_SIZE.
+               serializer[group] = getSerializer(value);
+               ByteBuffer bb = serializer[group].serialize(value);
+               if (bb.remaining() > fileBuffer.remaining()) {
+                       throw new RuntimeException("Serialized object does not fit into a single buffer.");
+               }
+               fileBuffer.put(bb);
+
+               // Capture the size before reset() rewinds the buffer.
+               int size = fileBuffer.position();
+
+               reset();
+               return size;
+       }
+
+       public boolean hasRemaining(int group) {
+               return saved[group] != null;
+       }
+
+       /**
+        * Extracts records from an iterator and writes them to the memory-mapped file. This method assumes
+        * that all values in the iterator are of the same type. This method does NOT take care of
+        * synchronization. The caller must guarantee that the file may be written to before calling this method.
+        *
+        * A record that no longer fits into the current buffer is kept back and written first by the next
+        * call for the same group; {@link #hasRemaining(int)} reports whether such a record is pending.
+        *
+        * @param i iterator containing records
+        * @param group group to which the iterator belongs, most notably used by CoGroup-functions.
+        * @return size of the written buffer
+        * @throws IOException
+        * @throws RuntimeException if a single record cannot fit into one buffer
+        */
+       public int sendBuffer(Iterator i, int group) throws IOException {
+               fileBuffer.clear();
+
+               Object value;
+               ByteBuffer bb;
+               if (serializer[group] == null) {
+                       value = i.next();
+                       // getSerializer() writes the type header into fileBuffer, so the first record has to fit
+                       // into what is left of the buffer, not into the full MAPPED_FILE_SIZE.
+                       serializer[group] = getSerializer(value);
+                       bb = serializer[group].serialize(value);
+                       if (bb.remaining() > fileBuffer.remaining()) {
+                               throw new RuntimeException("Serialized object does not fit into a single buffer.");
+                       }
+                       fileBuffer.put(bb);
+
+               }
+               // Flush the record that was kept back by the previous call, if any.
+               if (saved[group] != null) {
+                       fileBuffer.put(saved[group]);
+                       saved[group] = null;
+               }
+               // Fill the buffer until the iterator is drained or a record has to be kept back.
+               while (i.hasNext() && saved[group] == null) {
+                       value = i.next();
+                       bb = serializer[group].serialize(value);
+                       if (bb.remaining() > MAPPED_FILE_SIZE) {
+                               throw new RuntimeException("Serialized object does not fit into a single buffer.");
+                       }
+                       if (bb.remaining() <= fileBuffer.remaining()) {
+                               fileBuffer.put(bb);
+                       } else {
+                               saved[group] = bb;
+                       }
+               }
+
+               int size = fileBuffer.position();
+               return size;
+       }
+
+       /**
+        * Type categories used to select a serializer; the constants are looked up by the upper-cased
+        * simple class name of a record.
+        */
+       private enum SupportedTypes {
+               TUPLE,
+               BOOLEAN,
+               BYTE,
+               BYTES,
+               CHARACTER,
+               SHORT,
+               INTEGER,
+               LONG,
+               FLOAT,
+               DOUBLE,
+               STRING,
+               OTHER,
+               NULL,
+               CUSTOMTYPEWRAPPER
+       }
+
+       //=====Serializer===================================================================================================
+       /**
+        * Selects the serializer for the runtime type of the given value and writes the matching type header
+        * (the type id, plus the arity and the field headers for tuples) into the mapped file buffer.
+        *
+        * @param value value for which a serializer is needed; {@code null} is mapped to the null type
+        * @return serializer for values of the same type as {@code value}
+        * @throws IOException
+        * @throws IllegalArgumentException if the type of the value is not supported
+        */
+       private Serializer getSerializer(Object value) throws IOException {
+               if (value == null) {
+                       // value.getClass() below would fail with a NullPointerException; emit the null type id instead.
+                       // NOTE(review): assumes the reading side understands TYPE_NULL - confirm.
+                       fileBuffer.put(TYPE_NULL);
+                       return new NullSerializer();
+               }
+
+               // Locale.ROOT keeps the name mapping independent of the default locale; with e.g. a Turkish
+               // locale "Integer" would otherwise be upper-cased with a dotted capital I and not be found.
+               String className = value.getClass().getSimpleName().toUpperCase(java.util.Locale.ROOT);
+               if (className.startsWith("TUPLE")) {
+                       className = "TUPLE";
+               }
+               if (className.startsWith("BYTE[]")) {
+                       className = "BYTES";
+               }
+
+               SupportedTypes type;
+               try {
+                       type = SupportedTypes.valueOf(className);
+               } catch (IllegalArgumentException e) {
+                       // valueOf() only reports "No enum constant ..."; name the offending class instead.
+                       throw new IllegalArgumentException("Unknown Type encountered: " + value.getClass().getName(), e);
+               }
+               switch (type) {
+                       case TUPLE:
+                               fileBuffer.put(TYPE_TUPLE);
+                               fileBuffer.putInt(((Tuple) value).getArity());
+                               return new TupleSerializer((Tuple) value);
+                       case BOOLEAN:
+                               fileBuffer.put(TYPE_BOOLEAN);
+                               return new BooleanSerializer();
+                       case BYTE:
+                               fileBuffer.put(TYPE_BYTE);
+                               return new ByteSerializer();
+                       case BYTES:
+                               fileBuffer.put(TYPE_BYTES);
+                               return new BytesSerializer();
+                       case CHARACTER:
+                               fileBuffer.put(TYPE_CHAR);
+                               return new CharSerializer();
+                       case SHORT:
+                               fileBuffer.put(TYPE_SHORT);
+                               return new ShortSerializer();
+                       case INTEGER:
+                               fileBuffer.put(TYPE_INTEGER);
+                               return new IntSerializer();
+                       case LONG:
+                               fileBuffer.put(TYPE_LONG);
+                               return new LongSerializer();
+                       case STRING:
+                               fileBuffer.put(TYPE_STRING);
+                               return new StringSerializer();
+                       case FLOAT:
+                               fileBuffer.put(TYPE_FLOAT);
+                               return new FloatSerializer();
+                       case DOUBLE:
+                               fileBuffer.put(TYPE_DOUBLE);
+                               return new DoubleSerializer();
+                       case NULL:
+                               fileBuffer.put(TYPE_NULL);
+                               return new NullSerializer();
+                       case CUSTOMTYPEWRAPPER:
+                               fileBuffer.put(((CustomTypeWrapper) value).getType());
+                               return new CustomTypeSerializer();
+                       default:
+                               throw new IllegalArgumentException("Unknown Type encountered: " + type);
+               }
+       }
+
+       /**
+        * Base class of all serializers. A serializer owns a byte buffer into which
+        * {@link #serializeInternal(Object)} writes a value; {@link #serialize(Object)} returns that buffer
+        * flipped, i.e. ready to be read from.
+        */
+       private abstract class Serializer<T> {
+               protected ByteBuffer buffer;
+
+               public Serializer(int capacity) {
+                       this.buffer = ByteBuffer.allocate(capacity);
+               }
+
+               /**
+                * Serializes the value and returns the buffer holding the bytes, positioned for reading.
+                * The buffer is read from the field only after serializeInternal() ran, since
+                * implementations may replace it.
+                */
+               public ByteBuffer serialize(T value) {
+                       this.buffer.clear();
+                       this.serializeInternal(value);
+                       this.buffer.flip();
+                       return this.buffer;
+               }
+
+               public abstract void serializeInternal(T value);
+       }
+
+       /**
+        * Serializer for custom types: the data of the wrapper is passed on as-is.
+        */
+       private class CustomTypeSerializer extends Serializer<CustomTypeWrapper> {
+               public CustomTypeSerializer() {
+                       super(0);
+               }
+
+               @Override
+               public void serializeInternal(CustomTypeWrapper value) {
+                       byte[] payload = value.getData();
+                       // Wrap instead of copy; the position is moved to the end so that the flip() done by
+                       // serialize() exposes the whole array.
+                       buffer = ByteBuffer.wrap(payload);
+                       buffer.position(payload.length);
+               }
+       }
+
+       /**
+        * Writes a byte value as a single byte.
+        */
+       private class ByteSerializer extends Serializer<Byte> {
+               public ByteSerializer() {
+                       super(1);
+               }
+
+               @Override
+               public void serializeInternal(Byte value) {
+                       buffer.put(value.byteValue());
+               }
+       }
+
+       /**
+        * Writes a boolean as a single byte: 1 for {@code true}, 0 for {@code false}.
+        */
+       private class BooleanSerializer extends Serializer<Boolean> {
+               public BooleanSerializer() {
+                       super(1);
+               }
+
+               @Override
+               public void serializeInternal(Boolean value) {
+                       if (value) {
+                               buffer.put((byte) 1);
+                       } else {
+                               buffer.put((byte) 0);
+                       }
+               }
+       }
+
+       private class CharSerializer extends Serializer<Character> {
+               public CharSerializer() {
+                       super(4);
+               }
+
+               @Override
+               public void serializeInternal(Character value) {
+                       buffer.put((value + "").getBytes());
+               }
+       }
+
+       /**
+        * Writes a short value as two bytes.
+        */
+       private class ShortSerializer extends Serializer<Short> {
+               public ShortSerializer() {
+                       super(2);
+               }
+
+               @Override
+               public void serializeInternal(Short value) {
+                       buffer.putShort(value.shortValue());
+               }
+       }
+
+       /**
+        * Writes an int value as four bytes.
+        */
+       private class IntSerializer extends Serializer<Integer> {
+               public IntSerializer() {
+                       super(4);
+               }
+
+               @Override
+               public void serializeInternal(Integer value) {
+                       buffer.putInt(value.intValue());
+               }
+       }
+
+       /**
+        * Writes a long value as eight bytes.
+        */
+       private class LongSerializer extends Serializer<Long> {
+               public LongSerializer() {
+                       super(8);
+               }
+
+               @Override
+               public void serializeInternal(Long value) {
+                       buffer.putLong(value.longValue());
+               }
+       }
+
+       /**
+        * Writes a string as a length-prefixed byte sequence: an int holding the number of bytes, followed
+        * by the encoded characters. A new buffer of the exact size is allocated for every value.
+        */
+       private class StringSerializer extends Serializer<String> {
+               public StringSerializer() {
+                       super(0);
+               }
+
+               @Override
+               public void serializeInternal(String value) {
+                       // Encode with a fixed charset so the bytes do not depend on the JVM's default charset.
+                       // NOTE(review): assumes the reading side decodes strings as UTF-8 - confirm.
+                       byte[] bytes = value.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+                       buffer = ByteBuffer.allocate(bytes.length + 4);
+                       buffer.putInt(bytes.length);
+                       buffer.put(bytes);
+               }
+       }
+
+       /**
+        * Writes a float value as four bytes.
+        */
+       private class FloatSerializer extends Serializer<Float> {
+               public FloatSerializer() {
+                       super(4);
+               }
+
+               @Override
+               public void serializeInternal(Float value) {
+                       buffer.putFloat(value.floatValue());
+               }
+       }
+
+       /**
+        * Writes a double value as eight bytes.
+        */
+       private class DoubleSerializer extends Serializer<Double> {
+               public DoubleSerializer() {
+                       super(8);
+               }
+
+               @Override
+               public void serializeInternal(Double value) {
+                       buffer.putDouble(value.doubleValue());
+               }
+       }
+
+       /**
+        * Serializer for null values; it writes no payload at all.
+        */
+       private class NullSerializer extends Serializer<Object> {
+               public NullSerializer() {
+                       super(0);
+               }
+
+               @Override
+               public void serializeInternal(Object value) {
+                       // Intentionally empty: there is nothing to write for a null value.
+               }
+       }
+
+       /**
+        * Writes a byte array as a length-prefixed sequence: an int holding the length, followed by the
+        * bytes. A new buffer of the exact size is allocated for every value.
+        */
+       private class BytesSerializer extends Serializer<byte[]> {
+               public BytesSerializer() {
+                       super(0);
+               }
+
+               @Override
+               public void serializeInternal(byte[] value) {
+                       final int length = value.length;
+                       buffer = ByteBuffer.allocate(4 + length);
+                       buffer.putInt(length);
+                       buffer.put(value);
+               }
+       }
+
+       private class TupleSerializer extends Serializer<Tuple> {
+               private final Serializer[] serializer;
+               private final List<ByteBuffer> buffers;
+
+               public TupleSerializer(Tuple value) throws IOException {
+                       super(0);
+                       serializer = new Serializer[value.getArity()];
+                       buffers = new ArrayList();
+                       for (int x = 0; x < serializer.length; x++) {
+                               serializer[x] = 
getSerializer(value.getField(x));
+                       }
+               }
+
+               @Override
+               public void serializeInternal(Tuple value) {
+                       int length = 0;
+                       for (int x = 0; x < serializer.length; x++) {
+                               serializer[x].buffer.clear();
+                               
serializer[x].serializeInternal(value.getField(x));
+                               length += serializer[x].buffer.position();
+                               buffers.add(serializer[x].buffer);
+                       }
+                       buffer = ByteBuffer.allocate(length);
+                       for (ByteBuffer b : buffers) {
+                               b.flip();
+                               buffer.put(b);
+                       }
+                       buffers.clear();
+               }
+       }
+}

Reply via email to