Repository: flink
Updated Branches:
  refs/heads/master 0ac5d4020 -> c7ada8d78


http://git-wip-us.apache.org/repos/asf/flink/blob/fc4d9469/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
new file mode 100644
index 0000000..4f5fdae
--- /dev/null
+++ 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
@@ -0,0 +1,375 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.python.api.streaming.data;
+
+import org.apache.flink.python.api.streaming.util.StreamPrinter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.lang.reflect.Field;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+import java.util.Iterator;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.python.api.PythonPlanBinder;
+import static org.apache.flink.python.api.PythonPlanBinder.DEBUG;
+import static 
org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON2_BINARY_PATH;
+import static 
org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON3_BINARY_PATH;
+import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON_DC_ID;
+import static 
org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON_PLAN_NAME;
+import static org.apache.flink.python.api.PythonPlanBinder.FLINK_TMP_DATA_DIR;
+import static 
org.apache.flink.python.api.PythonPlanBinder.PLANBINDER_CONFIG_BCVAR_COUNT;
+import static 
org.apache.flink.python.api.PythonPlanBinder.PLANBINDER_CONFIG_BCVAR_NAME_PREFIX;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This streamer is used by functions to send/receive data to/from an external 
python process.
+ */
+public class PythonStreamer implements Serializable {
+       protected static final Logger LOG = 
LoggerFactory.getLogger(PythonStreamer.class);
+       private static final int SIGNAL_BUFFER_REQUEST = 0;
+       private static final int SIGNAL_BUFFER_REQUEST_G0 = -3;
+       private static final int SIGNAL_BUFFER_REQUEST_G1 = -4;
+       private static final int SIGNAL_FINISHED = -1;
+       private static final int SIGNAL_ERROR = -2;
+       private static final byte SIGNAL_LAST = 32;
+
+       private final int id;
+       private final boolean usePython3;
+       private final boolean debug;
+       private final String planArguments;
+
+       private String inputFilePath;
+       private String outputFilePath;
+
+       private final byte[] buffer = new byte[4];
+
+       private Process process;
+       private Thread shutdownThread;
+       protected ServerSocket server;
+       protected Socket socket;
+       protected InputStream in;
+       protected OutputStream out;
+       protected int port;
+
+       protected PythonSender sender;
+       protected PythonReceiver receiver;
+
+       protected StringBuilder msg = new StringBuilder();
+
+       protected final AbstractRichFunction function;
+
+       public PythonStreamer(AbstractRichFunction function, int id) {
+               this.id = id;
+               this.usePython3 = PythonPlanBinder.usePython3;
+               this.debug = DEBUG;
+               planArguments = PythonPlanBinder.arguments.toString();
+               sender = new PythonSender();
+               receiver = new PythonReceiver();
+               this.function = function;
+       }
+
+       /**
+        * Starts the python script.
+        *
+        * @throws IOException
+        */
+       public void open() throws IOException {
+               server = new ServerSocket(0);
+               startPython();
+       }
+
+       private void startPython() throws IOException {
+               this.outputFilePath = FLINK_TMP_DATA_DIR + "/" + id + 
this.function.getRuntimeContext().getIndexOfThisSubtask() + "output";
+               this.inputFilePath = FLINK_TMP_DATA_DIR + "/" + id + 
this.function.getRuntimeContext().getIndexOfThisSubtask() + "input";
+
+               sender.open(inputFilePath);
+               receiver.open(outputFilePath);
+
+               String path = 
function.getRuntimeContext().getDistributedCache().getFile(FLINK_PYTHON_DC_ID).getAbsolutePath();
+               String planPath = path + FLINK_PYTHON_PLAN_NAME;
+
+               String pythonBinaryPath = usePython3 ? 
FLINK_PYTHON3_BINARY_PATH : FLINK_PYTHON2_BINARY_PATH;
+
+               try {
+                       Runtime.getRuntime().exec(pythonBinaryPath);
+               } catch (IOException ex) {
+                       throw new RuntimeException(pythonBinaryPath + " does 
not point to a valid python binary.");
+               }
+
+               if (debug) {
+                       socket.setSoTimeout(0);
+                       LOG.info("Waiting for Python Process : " + 
function.getRuntimeContext().getTaskName()
+                                       + " Run python " + planPath + 
planArguments);
+               } else {
+                       process = Runtime.getRuntime().exec(pythonBinaryPath + 
" -O -B " + planPath + planArguments);
+                       new StreamPrinter(process.getInputStream()).start();
+                       new StreamPrinter(process.getErrorStream(), true, 
msg).start();
+               }
+
+               shutdownThread = new Thread() {
+                       @Override
+                       public void run() {
+                               try {
+                                       destroyProcess();
+                               } catch (IOException ex) {
+                               }
+                       }
+               };
+
+               Runtime.getRuntime().addShutdownHook(shutdownThread);
+
+               OutputStream processOutput = process.getOutputStream();
+               processOutput.write("operator\n".getBytes());
+               processOutput.write(("" + server.getLocalPort() + 
"\n").getBytes());
+               processOutput.write((id + "\n").getBytes());
+               processOutput.write((inputFilePath + "\n").getBytes());
+               processOutput.write((outputFilePath + "\n").getBytes());
+               processOutput.flush();
+
+               try { // wait a bit to catch syntax errors
+                       Thread.sleep(2000);
+               } catch (InterruptedException ex) {
+               }
+               if (!debug) {
+                       try {
+                               process.exitValue();
+                               throw new RuntimeException("External process 
for task " + function.getRuntimeContext().getTaskName() + " terminated 
prematurely." + msg);
+                       } catch (IllegalThreadStateException ise) { //process 
still active -> start receiving data
+                       }
+               }
+
+               socket = server.accept();
+               in = socket.getInputStream();
+               out = socket.getOutputStream();
+       }
+
+       /**
+        * Closes this streamer.
+        *
+        * @throws IOException
+        */
+       public void close() throws IOException {
+               try {
+               socket.close();
+               sender.close();
+               receiver.close();
+               } catch (Exception e) {
+                       LOG.error("Exception occurred while closing Streamer. 
:" + e.getMessage());
+               }
+               if (!debug) {
+                       destroyProcess();
+               }
+               if (shutdownThread != null) {
+                       Runtime.getRuntime().removeShutdownHook(shutdownThread);
+               }
+       }
+
+       private void destroyProcess() throws IOException {
+               try {
+                       process.exitValue();
+               } catch (IllegalThreadStateException ise) { //process still 
active
+                       if 
(process.getClass().getName().equals("java.lang.UNIXProcess")) {
+                               int pid;
+                               try {
+                                       Field f = 
process.getClass().getDeclaredField("pid");
+                                       f.setAccessible(true);
+                                       pid = f.getInt(process);
+                               } catch (Throwable e) {
+                                       process.destroy();
+                                       return;
+                               }
+                               String[] args = new String[]{"kill", "-9", "" + 
pid};
+                               Runtime.getRuntime().exec(args);
+                       } else {
+                               process.destroy();
+                       }
+               }
+       }
+       
+               private void sendWriteNotification(int size, boolean hasNext) 
throws IOException {
+               byte[] tmp = new byte[5];
+               putInt(tmp, 0, size);
+               tmp[4] = hasNext ? 0 : SIGNAL_LAST;
+               out.write(tmp, 0, 5);
+               out.flush();
+       }
+
+       private void sendReadConfirmation() throws IOException {
+               out.write(new byte[1], 0, 1);
+               out.flush();
+       }
+
+       private void checkForError() {
+               if (getInt(buffer, 0) == -2) {
+                       try { //wait before terminating to ensure that the 
complete error message is printed
+                               Thread.sleep(2000);
+                       } catch (InterruptedException ex) {
+                       }
+                       throw new RuntimeException(
+                                       "External process for task " + 
function.getRuntimeContext().getTaskName() + " terminated prematurely." + msg);
+               }
+       }
+
+       /**
+        * Sends all broadcast-variables encoded in the configuration to the 
external process.
+        *
+        * @param config configuration object containing broadcast-variable 
count and names
+        * @throws IOException
+        */
+       public final void sendBroadCastVariables(Configuration config) throws 
IOException {
+               try {
+                       int broadcastCount = 
config.getInteger(PLANBINDER_CONFIG_BCVAR_COUNT, 0);
+
+                       String[] names = new String[broadcastCount];
+
+                       for (int x = 0; x < names.length; x++) {
+                               names[x] = 
config.getString(PLANBINDER_CONFIG_BCVAR_NAME_PREFIX + x, null);
+                       }
+
+                       in.read(buffer, 0, 4);
+                       checkForError();
+                       int size = sender.sendRecord(broadcastCount);
+                       sendWriteNotification(size, false);
+
+                       for (String name : names) {
+                               Iterator bcv = 
function.getRuntimeContext().getBroadcastVariable(name).iterator();
+
+                               in.read(buffer, 0, 4);
+                               checkForError();
+                               size = sender.sendRecord(name);
+                               sendWriteNotification(size, false);
+
+                               while (bcv.hasNext() || sender.hasRemaining(0)) 
{
+                                       in.read(buffer, 0, 4);
+                                       checkForError();
+                                       size = sender.sendBuffer(bcv, 0);
+                                       sendWriteNotification(size, 
bcv.hasNext() || sender.hasRemaining(0));
+                               }
+                               sender.reset();
+                       }
+               } catch (SocketTimeoutException ste) {
+                       throw new RuntimeException("External process for task " 
+ function.getRuntimeContext().getTaskName() + " stopped responding." + msg);
+               }
+       }
+
+       /**
+        * Sends all values contained in the iterator to the external process 
and collects all results.
+        *
+        * @param i iterator
+        * @param c collector
+        * @throws IOException
+        */
+       public final void streamBufferWithoutGroups(Iterator i, Collector c) 
throws IOException {
+               try {
+                       int size;
+                       if (i.hasNext()) {
+                               while (true) {
+                                       in.read(buffer, 0, 4);
+                                       int sig = getInt(buffer, 0);
+                                       switch (sig) {
+                                               case SIGNAL_BUFFER_REQUEST:
+                                                       if (i.hasNext() || 
sender.hasRemaining(0)) {
+                                                               size = 
sender.sendBuffer(i, 0);
+                                                               
sendWriteNotification(size, sender.hasRemaining(0) || i.hasNext());
+                                                       } else {
+                                                               throw new 
RuntimeException("External process requested data even though none is 
available.");
+                                                       }
+                                                       break;
+                                               case SIGNAL_FINISHED:
+                                                       return;
+                                               case SIGNAL_ERROR:
+                                                       try { //wait before 
terminating to ensure that the complete error message is printed
+                                                               
Thread.sleep(2000);
+                                                       } catch 
(InterruptedException ex) {
+                                                       }
+                                                       throw new 
RuntimeException(
+                                                                       
"External process for task " + function.getRuntimeContext().getTaskName() + " 
terminated prematurely due to an error." + msg);
+                                               default:
+                                                       
receiver.collectBuffer(c, sig);
+                                                       sendReadConfirmation();
+                                                       break;
+                                       }
+                               }
+                       }
+               } catch (SocketTimeoutException ste) {
+                       throw new RuntimeException("External process for task " 
+ function.getRuntimeContext().getTaskName() + " stopped responding." + msg);
+               }
+       }
+
+       /**
+        * Sends all values contained in both iterators to the external process 
and collects all results.
+        *
+        * @param i1 iterator
+        * @param i2 iterator
+        * @param c collector
+        * @throws IOException
+        */
+       public final void streamBufferWithGroups(Iterator i1, Iterator i2, 
Collector c) throws IOException {
+               try {
+                       int size;
+                       if (i1.hasNext() || i2.hasNext()) {
+                               while (true) {
+                                       in.read(buffer, 0, 4);
+                                       int sig = getInt(buffer, 0);
+                                       switch (sig) {
+                                               case SIGNAL_BUFFER_REQUEST_G0:
+                                                       if (i1.hasNext() || 
sender.hasRemaining(0)) {
+                                                               size = 
sender.sendBuffer(i1, 0);
+                                                               
sendWriteNotification(size, sender.hasRemaining(0) || i1.hasNext());
+                                                       }
+                                                       break;
+                                               case SIGNAL_BUFFER_REQUEST_G1:
+                                                       if (i2.hasNext() || 
sender.hasRemaining(1)) {
+                                                               size = 
sender.sendBuffer(i2, 1);
+                                                               
sendWriteNotification(size, sender.hasRemaining(1) || i2.hasNext());
+                                                       }
+                                                       break;
+                                               case SIGNAL_FINISHED:
+                                                       return;
+                                               case SIGNAL_ERROR:
+                                                       try { //wait before 
terminating to ensure that the complete error message is printed
+                                                               
Thread.sleep(2000);
+                                                       } catch 
(InterruptedException ex) {
+                                                       }
+                                                       throw new 
RuntimeException(
+                                                                       
"External process for task " + function.getRuntimeContext().getTaskName() + " 
terminated prematurely due to an error." + msg);
+                                               default:
+                                                       
receiver.collectBuffer(c, sig);
+                                                       sendReadConfirmation();
+                                                       break;
+                                       }
+                               }
+                       }
+               } catch (SocketTimeoutException ste) {
+                       throw new RuntimeException("External process for task " 
+ function.getRuntimeContext().getTaskName() + " stopped responding." + msg);
+               }
+       }
+
+       protected final static int getInt(byte[] array, int offset) {
+               return (array[offset] << 24) | (array[offset + 1] & 0xff) << 16 
| (array[offset + 2] & 0xff) << 8 | (array[offset + 3] & 0xff);
+       }
+
+       protected final static void putInt(byte[] array, int offset, int value) 
{
+               array[offset] = (byte) (value >> 24);
+               array[offset + 1] = (byte) (value >> 16);
+               array[offset + 2] = (byte) (value >> 8);
+               array[offset + 3] = (byte) (value);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fc4d9469/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanReceiver.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanReceiver.java
 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanReceiver.java
new file mode 100644
index 0000000..ed02ce4
--- /dev/null
+++ 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanReceiver.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.python.api.streaming.plan;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Serializable;
+import org.apache.flink.api.java.tuple.Tuple;
+import static 
org.apache.flink.python.api.streaming.data.PythonReceiver.createTuple;
+import static 
org.apache.flink.python.api.streaming.data.PythonSender.TYPE_BOOLEAN;
+import static 
org.apache.flink.python.api.streaming.data.PythonSender.TYPE_BYTE;
+import static 
org.apache.flink.python.api.streaming.data.PythonSender.TYPE_BYTES;
+import static 
org.apache.flink.python.api.streaming.data.PythonSender.TYPE_DOUBLE;
+import static 
org.apache.flink.python.api.streaming.data.PythonSender.TYPE_FLOAT;
+import static 
org.apache.flink.python.api.streaming.data.PythonSender.TYPE_INTEGER;
+import static 
org.apache.flink.python.api.streaming.data.PythonSender.TYPE_LONG;
+import static 
org.apache.flink.python.api.streaming.data.PythonSender.TYPE_NULL;
+import static 
org.apache.flink.python.api.streaming.data.PythonSender.TYPE_SHORT;
+import static 
org.apache.flink.python.api.streaming.data.PythonSender.TYPE_STRING;
+import static 
org.apache.flink.python.api.streaming.data.PythonSender.TYPE_TUPLE;
+import org.apache.flink.python.api.types.CustomTypeWrapper;
+
+/**
+ * Instances of this class can be used to receive data from the plan process.
+ */
+public class PythonPlanReceiver implements Serializable {
+       private final DataInputStream input;
+
+       public PythonPlanReceiver(InputStream input) {
+               this.input = new DataInputStream(input);
+       }
+
+       public Object getRecord() throws IOException {
+               return getRecord(false);
+       }
+
+       public Object getRecord(boolean normalized) throws IOException {
+               return receiveField(normalized);
+       }
+
+       private Object receiveField(boolean normalized) throws IOException {
+               byte type = (byte) input.readByte();
+               switch (type) {
+                       case TYPE_TUPLE:
+                               int tupleSize = input.readByte();
+                               Tuple tuple = createTuple(tupleSize);
+                               for (int x = 0; x < tupleSize; x++) {
+                                       
tuple.setField(receiveField(normalized), x);
+                               }
+                               return tuple;
+                       case TYPE_BOOLEAN:
+                               return input.readByte() == 1;
+                       case TYPE_BYTE:
+                               return (byte) input.readByte();
+                       case TYPE_SHORT:
+                               if (normalized) {
+                                       return (int) input.readShort();
+                               } else {
+                                       return input.readShort();
+                               }
+                       case TYPE_INTEGER:
+                               return input.readInt();
+                       case TYPE_LONG:
+                               if (normalized) {
+                                       return new 
Long(input.readLong()).intValue();
+                               } else {
+                                       return input.readLong();
+                               }
+                       case TYPE_FLOAT:
+                               if (normalized) {
+                                       return (double) input.readFloat();
+                               } else {
+                                       return input.readFloat();
+                               }
+                       case TYPE_DOUBLE:
+                               return input.readDouble();
+                       case TYPE_STRING:
+                               int stringSize = input.readInt();
+                               byte[] string = new byte[stringSize];
+                               input.readFully(string);
+                               return new String(string);
+                       case TYPE_BYTES:
+                               int bytessize = input.readInt();
+                               byte[] bytes = new byte[bytessize];
+                               input.readFully(bytes);
+                               return bytes;
+                       case TYPE_NULL:
+                               return null;
+                       default:
+                               int size = input.readInt();
+                               byte[] data = new byte[size];
+                               input.readFully(data);
+                               return new CustomTypeWrapper(type, data);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fc4d9469/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanSender.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanSender.java
 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanSender.java
new file mode 100644
index 0000000..16a1eba
--- /dev/null
+++ 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanSender.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.python.api.streaming.plan;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Serializable;
+import org.apache.flink.api.java.tuple.Tuple;
+import static 
org.apache.flink.python.api.streaming.data.PythonSender.TYPE_BOOLEAN;
+import static 
org.apache.flink.python.api.streaming.data.PythonSender.TYPE_BYTE;
+import static 
org.apache.flink.python.api.streaming.data.PythonSender.TYPE_BYTES;
+import static 
org.apache.flink.python.api.streaming.data.PythonSender.TYPE_DOUBLE;
+import static 
org.apache.flink.python.api.streaming.data.PythonSender.TYPE_FLOAT;
+import static 
org.apache.flink.python.api.streaming.data.PythonSender.TYPE_INTEGER;
+import static 
org.apache.flink.python.api.streaming.data.PythonSender.TYPE_LONG;
+import static 
org.apache.flink.python.api.streaming.data.PythonSender.TYPE_NULL;
+import static 
org.apache.flink.python.api.streaming.data.PythonSender.TYPE_SHORT;
+import static 
org.apache.flink.python.api.streaming.data.PythonSender.TYPE_STRING;
+import static 
org.apache.flink.python.api.streaming.data.PythonSender.TYPE_TUPLE;
+import org.apache.flink.python.api.types.CustomTypeWrapper;
+
+/**
+ * Instances of this class can be used to send data to the plan process.
+ */
+public class PythonPlanSender implements Serializable {
+       private final DataOutputStream output;
+
+       public PythonPlanSender(OutputStream output) {
+               this.output = new DataOutputStream(output);
+       }
+
+       public void sendRecord(Object record) throws IOException {
+               String className = 
record.getClass().getSimpleName().toUpperCase();
+               if (className.startsWith("TUPLE")) {
+                       className = "TUPLE";
+               }
+               if (className.startsWith("BYTE[]")) {
+                       className = "BYTES";
+               }
+               SupportedTypes type = SupportedTypes.valueOf(className);
+               switch (type) {
+                       case TUPLE:
+                               output.write(TYPE_TUPLE);
+                               int arity = ((Tuple) record).getArity();
+                               output.writeInt(arity);
+                               for (int x = 0; x < arity; x++) {
+                                       sendRecord(((Tuple) 
record).getField(x));
+                               }
+                               return;
+                       case BOOLEAN:
+                               output.write(TYPE_BOOLEAN);
+                               output.write(((Boolean) record) ? (byte) 1 : 
(byte) 0);
+                               return;
+                       case BYTE:
+                               output.write(TYPE_BYTE);
+                               output.write((Byte) record);
+                               return;
+                       case BYTES:
+                               output.write(TYPE_BYTES);
+                               output.write((byte[]) record, 0, ((byte[]) 
record).length);
+                               return;
+                       case CHARACTER:
+                               output.write(TYPE_STRING);
+                               output.writeChars(((Character) record) + "");
+                               return;
+                       case SHORT:
+                               output.write(TYPE_SHORT);
+                               output.writeShort((Short) record);
+                               return;
+                       case INTEGER:
+                               output.write(TYPE_INTEGER);
+                               output.writeInt((Integer) record);
+                               return;
+                       case LONG:
+                               output.write(TYPE_LONG);
+                               output.writeLong((Long) record);
+                               return;
+                       case STRING:
+                               output.write(TYPE_STRING);
+                               output.writeBytes((String) record);
+                               return;
+                       case FLOAT:
+                               output.write(TYPE_FLOAT);
+                               output.writeFloat((Float) record);
+                               return;
+                       case DOUBLE:
+                               output.write(TYPE_DOUBLE);
+                               output.writeDouble((Double) record);
+                               return;
+                       case NULL:
+                               output.write(TYPE_NULL);
+                               return;
+                       case CUSTOMTYPEWRAPPER:
+                               output.write(((CustomTypeWrapper) 
record).getType());
+                               output.write(((CustomTypeWrapper) 
record).getData());
+                               return;
+                       default:
+                               throw new IllegalArgumentException("Unknown 
Type encountered: " + type);
+               }
+       }
+       
+       private enum SupportedTypes {
+               TUPLE, BOOLEAN, BYTE, BYTES, CHARACTER, SHORT, INTEGER, LONG, 
FLOAT, DOUBLE, STRING, OTHER, NULL, CUSTOMTYPEWRAPPER
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/fc4d9469/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
new file mode 100644
index 0000000..ecbc7f4
--- /dev/null
+++ 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.python.api.streaming.plan;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.ServerSocket;
+import java.net.Socket;
+import org.apache.flink.python.api.streaming.util.StreamPrinter;
+import static 
org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON2_BINARY_PATH;
+import static 
org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON3_BINARY_PATH;
+import static 
org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON_PLAN_NAME;
+import static org.apache.flink.python.api.PythonPlanBinder.usePython3;
+
+/**
+ * Generic class to exchange data during the plan phase.
+ */
+public class PythonPlanStreamer implements Serializable {
+       protected PythonPlanSender sender;
+       protected PythonPlanReceiver receiver;
+
+       private Process process;
+       private ServerSocket server;
+       private Socket socket;
+
+       public Object getRecord() throws IOException {
+               return getRecord(false);
+       }
+
+       public Object getRecord(boolean normalize) throws IOException {
+               return receiver.getRecord(normalize);
+       }
+
+       public void sendRecord(Object record) throws IOException {
+               sender.sendRecord(record);
+       }
+
+       public void open(String tmpPath, String args) throws IOException {
+               server = new ServerSocket(0);
+               startPython(tmpPath, args);
+               socket = server.accept();
+               sender = new PythonPlanSender(socket.getOutputStream());
+               receiver = new PythonPlanReceiver(socket.getInputStream());
+       }
+
+       private void startPython(String tmpPath, String args) throws 
IOException {
+               String pythonBinaryPath = usePython3 ? 
FLINK_PYTHON3_BINARY_PATH : FLINK_PYTHON2_BINARY_PATH;
+
+               try {
+                       Runtime.getRuntime().exec(pythonBinaryPath);
+               } catch (IOException ex) {
+                       throw new RuntimeException(pythonBinaryPath + " does 
not point to a valid python binary.");
+               }
+               process = Runtime.getRuntime().exec(pythonBinaryPath + " -B " + 
tmpPath + FLINK_PYTHON_PLAN_NAME + args);
+
+               new StreamPrinter(process.getInputStream()).start();
+               new StreamPrinter(process.getErrorStream()).start();
+
+               try {
+                       Thread.sleep(2000);
+               } catch (InterruptedException ex) {
+               }
+
+               try {
+                       int value = process.exitValue();
+                       if (value != 0) {
+                               throw new RuntimeException("Plan file caused an 
error. Check log-files for details.");
+                       }
+                       if (value == 0) {
+                               throw new RuntimeException("Plan file exited 
prematurely without an error.");
+                       }
+               } catch (IllegalThreadStateException ise) {//Process still 
running
+               }
+
+               process.getOutputStream().write("plan\n".getBytes());
+               process.getOutputStream().write((server.getLocalPort() + 
"\n").getBytes());
+               process.getOutputStream().flush();
+       }
+
+       public void close() {
+               try {
+                       process.exitValue();
+               } catch (NullPointerException npe) { //exception occurred 
before process was started
+               } catch (IllegalThreadStateException ise) { //process still 
active
+                       process.destroy();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fc4d9469/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/StreamPrinter.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/StreamPrinter.java
 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/StreamPrinter.java
new file mode 100644
index 0000000..5ff3572
--- /dev/null
+++ 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/util/StreamPrinter.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.python.api.streaming.util;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
/**
 * Simple utility thread that drains an {@link InputStream} line by line. Depending on the constructor
 * arguments, it either echoes every line to {@code System.out} or appends it to a caller-supplied
 * {@link StringBuilder}.
 */
public class StreamPrinter extends Thread {
	private final BufferedReader reader;
	private final boolean wrapInException;
	private final StringBuilder msg;

	/**
	 * Creates a printer that echoes every line of {@code stream} to {@code System.out}.
	 *
	 * @param stream the stream to drain
	 */
	public StreamPrinter(InputStream stream) {
		this(stream, false, null);
	}

	/**
	 * Creates a printer for {@code stream}.
	 *
	 * @param stream          the stream to drain
	 * @param wrapInException if {@code true}, lines are appended to {@code msg} (each preceded by a newline)
	 *                        instead of being printed
	 * @param msg             buffer receiving the lines; only used, and then required, when
	 *                        {@code wrapInException} is {@code true}
	 */
	public StreamPrinter(InputStream stream, boolean wrapInException, StringBuilder msg) {
		this.reader = new BufferedReader(new InputStreamReader(stream));
		this.wrapInException = wrapInException;
		this.msg = msg;
	}

	/**
	 * Reads the stream until EOF, forwarding each line, and then closes the stream.
	 */
	@Override
	public void run() {
		try (BufferedReader in = reader) {
			String line;
			while ((line = in.readLine()) != null) {
				if (wrapInException) {
					msg.append('\n').append(line);
				} else {
					System.out.println(line);
					System.out.flush();
				}
			}
		} catch (IOException ignored) {
			// Best effort: the source (e.g. a child process's output) may be closed abruptly; just stop draining.
		}
	}
}

http://git-wip-us.apache.org/repos/asf/flink/blob/fc4d9469/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Connection.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Connection.py
 
b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Connection.py
index 143cea7..680f495 100644
--- 
a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Connection.py
+++ 
b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Connection.py
@@ -51,33 +51,19 @@ def recv_all(socket, toread):
         return b"".join(bits)
 
 
-class OneWayBusyBufferingMappedFileConnection(object):
-    def __init__(self, output_path):
-        self._output_file = open(output_path, "rb+")
-        if hasattr(mmap, 'MAP_SHARED'):
-            self._file_output_buffer = mmap.mmap(self._output_file.fileno(), 
MAPPED_FILE_SIZE, mmap.MAP_SHARED, mmap.ACCESS_WRITE)
-        else:
-            self._file_output_buffer = mmap.mmap(self._output_file.fileno(), 
MAPPED_FILE_SIZE, None, mmap.ACCESS_WRITE)
-
-        self._out = deque()
-        self._out_size = 0
-
-        self._offset_limit = MAPPED_FILE_SIZE - 1024 * 1024 * 3
class PureTCPConnection(object):
    """Blocking TCP connection to a port on localhost.

    Used in plan mode, where the port is read from stdin and handed to this class.
    """

    def __init__(self, port):
        self._socket = SOCKET.socket(family=SOCKET.AF_INET, type=SOCKET.SOCK_STREAM)
        self._socket.connect((SOCKET.gethostbyname("localhost"), port))

    def write(self, msg):
        """Write all bytes of msg to the socket."""
        # socket.send() may transmit only part of msg; sendall() keeps sending until everything is written.
        self._socket.sendall(msg)

    def read(self, size):
        """Read exactly `size` bytes via recv_all."""
        return recv_all(self._socket, size)

    def close(self):
        self._socket.close()
 
 
 class BufferingTCPMappedFileConnection(object):

http://git-wip-us.apache.org/repos/asf/flink/blob/fc4d9469/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Iterator.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Iterator.py
 
b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Iterator.py
index 0e740cf..3425cfa 100644
--- 
a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Iterator.py
+++ 
b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Iterator.py
@@ -296,3 +296,43 @@ class StringDeserializer(object):
 class NullDeserializer(object):
     def deserialize(self):
         return None
+
+
class TypedIterator(object):
    """Decodes type-tagged values read from a connection.

    Every value starts with a one-byte tag that is compared against the
    Types constants. Any unrecognised tag is looked up in the environment's
    `_types` registry.
    """

    def __init__(self, con, env):
        self._connection = con
        self._env = env

    def next(self):
        """Read and return the next value from the connection.

        Tuples are decoded recursively, element by element. Raises Exception
        if the tag is neither a built-in type nor found in `env._types`.
        """
        read = self._connection.read
        tag = read(1)

        if tag == Types.TYPE_TUPLE:
            arity = unpack(">i", read(4))[0]
            elements = [self.next() for _ in range(arity)]
            return tuple(elements)
        if tag == Types.TYPE_BYTE:
            return unpack(">c", read(1))[0]
        if tag == Types.TYPE_BYTES:
            length = unpack(">i", read(4))[0]
            if not length:
                return bytearray(b"")
            return bytearray(read(length))
        if tag == Types.TYPE_BOOLEAN:
            return unpack(">?", read(1))[0]
        if tag == Types.TYPE_FLOAT:
            return unpack(">f", read(4))[0]
        if tag == Types.TYPE_DOUBLE:
            return unpack(">d", read(8))[0]
        if tag == Types.TYPE_INTEGER:
            return unpack(">i", read(4))[0]
        if tag == Types.TYPE_LONG:
            return unpack(">q", read(8))[0]
        if tag == Types.TYPE_STRING:
            length = unpack(">i", read(4))[0]
            if not length:
                return ""
            return read(length).decode("utf-8")
        if tag == Types.TYPE_NULL:
            return None

        # Not a built-in tag: scan the registered custom types. Index 0 holds the
        # tag, and index 3 is called with no arguments (presumably the
        # deserializer -- verify against Environment._types).
        for custom in self._env._types:
            if tag == custom[0]:
                return custom[3]()
        raise Exception("Unable to find deserializer for type ID " + str(tag))
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/fc4d9469/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
 
b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
index 865487d..472592a 100644
--- 
a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
+++ 
b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
@@ -17,6 +17,7 @@
 
################################################################################
 from flink.connection import Connection
 from flink.connection import Collector
+from flink.connection import Iterator
 from flink.plan.DataSet import DataSet
 from flink.plan.Constants import _Identifier
 from flink.plan.OperationInfo import OperationInfo
@@ -156,12 +157,14 @@ class Environment(object):
         plan_mode = sys.stdin.readline().rstrip('\n') == "plan"
 
         if plan_mode:
-            output_path = sys.stdin.readline().rstrip('\n')
-            self._connection = 
Connection.OneWayBusyBufferingMappedFileConnection(output_path)
+            port = int(sys.stdin.readline().rstrip('\n'))
+            self._connection = Connection.PureTCPConnection(port)
+            self._iterator = Iterator.TypedIterator(self._connection, self)
             self._collector = Collector.TypedCollector(self._connection, self)
             self._send_plan()
-            self._connection._write_buffer()
+            result = self._receive_result()
             self._connection.close()
+            return result
         else:
             import struct
             operator = None
@@ -383,3 +386,16 @@ class Environment(object):
             collect(entry.parent.id)
             collect(entry.other.id)
             collect(entry.name)
+
+    def _receive_result(self):
+        jer = JobExecutionResult()
+        jer._net_runtime = self._iterator.next()
+        return jer
+
+
class JobExecutionResult(object):
    """Result of a job execution, as handed back to the Python program.

    Currently only carries the job's net runtime, which is filled in by
    Environment._receive_result from the value received over the connection.
    NOTE(review): the unit of the runtime is defined by the Java side -- confirm there.
    """

    def __init__(self, net_runtime=0):
        # Declared as a new-style class via (object), consistent with the rest of the module.
        self._net_runtime = net_runtime

    def get_net_runtime(self):
        """Return the stored net runtime (0 unless set)."""
        return self._net_runtime

Reply via email to