[Flink-671] Python API additions

Fixes several minor issues
Requesting non-available data now throws an exception
Python process shutdown is more reliable
Synchronization is done via TCP (replacing UDP)


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e1618e28
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e1618e28
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e1618e28

Branch: refs/heads/master
Commit: e1618e287885390c9ea1d4954ea18eeed5783b4a
Parents: 1d669da
Author: zentol <s.mo...@web.de>
Authored: Wed Mar 4 17:20:00 2015 +0100
Committer: zentol <s.mo...@web.de>
Committed: Tue Apr 21 14:02:54 2015 +0200

----------------------------------------------------------------------
 .../api/java/common/streaming/Streamer.java     |  64 ++++++---------
 .../java/python/streaming/PythonStreamer.java   |  78 +++++++++++++------
 .../languagebinding/api/python/dill/__diff.py   |  18 -----
 .../languagebinding/api/python/dill/__init__.py |  18 -----
 .../languagebinding/api/python/dill/_objects.py |  18 -----
 .../languagebinding/api/python/dill/detect.py   |  18 -----
 .../languagebinding/api/python/dill/dill.py     |  18 -----
 .../languagebinding/api/python/dill/info.py     |  17 ----
 .../languagebinding/api/python/dill/objtypes.py |  18 -----
 .../languagebinding/api/python/dill/pointers.py |  18 -----
 .../languagebinding/api/python/dill/source.py   |  18 -----
 .../languagebinding/api/python/dill/temp.py     |  18 -----
 .../languagebinding/api/python/executor.py      |  30 ++++---
 .../api/python/flink/connection/Connection.py   |  32 ++++----
 .../api/python/flink/connection/Iterator.py     |   4 +-
 .../api/python/flink/connection/__init__.pyc    | Bin 243 -> 0 bytes
 .../flink/functions/GroupReduceFunction.py      |   2 +-
 pom.xml                                         |   2 +
 18 files changed, 108 insertions(+), 283 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e1618e28/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Streamer.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Streamer.java
 
b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Streamer.java
index 1a96e98..8b19425 100644
--- 
a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Streamer.java
+++ 
b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Streamer.java
@@ -13,10 +13,11 @@
 package org.apache.flink.languagebinding.api.java.common.streaming;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.io.Serializable;
-import java.net.DatagramPacket;
-import java.net.DatagramSocket;
-import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
 import java.net.SocketTimeoutException;
 import java.util.Iterator;
 import org.apache.flink.api.common.functions.AbstractRichFunction;
@@ -41,12 +42,13 @@ public abstract class Streamer implements Serializable {
        private static final byte SIGNAL_LAST = 32;
 
        private final byte[] buffer = new byte[4];
-       private DatagramPacket packet;
-       protected InetAddress host;
 
-       protected DatagramSocket socket;
-       protected int port1;
-       protected int port2;
+       protected ServerSocket server;
+       protected Socket socket;
+       protected InputStream in;
+       protected OutputStream out;
+       protected int port;
+
        protected Sender sender;
        protected Receiver receiver;
 
@@ -61,17 +63,8 @@ public abstract class Streamer implements Serializable {
        }
 
        public void open() throws IOException {
-               host = InetAddress.getByName("localhost");
-               packet = new DatagramPacket(buffer, 0, 4);
-               socket = new DatagramSocket(0, host);
-               socket.setSoTimeout(10000);
-               try {
-                       setupProcess();
-                       setupPorts();
-               } catch (SocketTimeoutException ste) {
-                       throw new RuntimeException("External process for task " 
+ function.getRuntimeContext().getTaskName() + " stopped responding." + msg);
-               }
-               socket.setSoTimeout(300000);
+               server = new ServerSocket(0);
+               setupProcess();
        }
 
        /**
@@ -92,30 +85,17 @@ public abstract class Streamer implements Serializable {
                receiver.close();
        }
 
-       /**
-        * Setups the required UDP-ports.The streamer requires two UDP-ports to 
send control-signals to, one each for
-        * reading/writing operations.
-        *
-        * @throws IOException
-        */
-       private void setupPorts() throws IOException, SocketTimeoutException {
-               socket.receive(new DatagramPacket(buffer, 0, 4));
-               checkForError();
-               port1 = getInt(buffer, 0);
-               socket.receive(new DatagramPacket(buffer, 0, 4));
-               checkForError();
-               port2 = getInt(buffer, 0);
-       }
-
        private void sendWriteNotification(int size, boolean hasNext) throws 
IOException {
                byte[] tmp = new byte[5];
                putInt(tmp, 0, size);
                tmp[4] = hasNext ? 0 : SIGNAL_LAST;
-               socket.send(new DatagramPacket(tmp, 0, 5, host, port1));
+               out.write(tmp, 0, 5);
+               out.flush();
        }
 
        private void sendReadConfirmation() throws IOException {
-               socket.send(new DatagramPacket(new byte[1], 0, 1, host, port2));
+               out.write(new byte[1], 0, 1);
+               out.flush();
        }
 
        private void checkForError() {
@@ -145,7 +125,7 @@ public abstract class Streamer implements Serializable {
                                names[x] = 
config.getString(PLANBINDER_CONFIG_BCVAR_NAME_PREFIX + x, null);
                        }
 
-                       socket.receive(packet);
+                       in.read(buffer, 0, 4);
                        checkForError();
                        int size = sender.sendRecord(broadcastCount);
                        sendWriteNotification(size, false);
@@ -153,13 +133,13 @@ public abstract class Streamer implements Serializable {
                        for (String name : names) {
                                Iterator bcv = 
function.getRuntimeContext().getBroadcastVariable(name).iterator();
 
-                               socket.receive(packet);
+                               in.read(buffer, 0, 4);
                                checkForError();
                                size = sender.sendRecord(name);
                                sendWriteNotification(size, false);
 
                                while (bcv.hasNext() || sender.hasRemaining(0)) 
{
-                                       socket.receive(packet);
+                                       in.read(buffer, 0, 4);
                                        checkForError();
                                        size = sender.sendBuffer(bcv, 0);
                                        sendWriteNotification(size, 
bcv.hasNext() || sender.hasRemaining(0));
@@ -183,13 +163,15 @@ public abstract class Streamer implements Serializable {
                        int size;
                        if (i.hasNext()) {
                                while (true) {
-                                       socket.receive(packet);
+                                       in.read(buffer, 0, 4);
                                        int sig = getInt(buffer, 0);
                                        switch (sig) {
                                                case SIGNAL_BUFFER_REQUEST:
                                                        if (i.hasNext() || 
sender.hasRemaining(0)) {
                                                                size = 
sender.sendBuffer(i, 0);
                                                                
sendWriteNotification(size, sender.hasRemaining(0) || i.hasNext());
+                                                       } else {
+                                                               throw new 
RuntimeException("External process requested data even though none is 
available.");
                                                        }
                                                        break;
                                                case SIGNAL_FINISHED:
@@ -226,7 +208,7 @@ public abstract class Streamer implements Serializable {
                        int size;
                        if (i1.hasNext() || i2.hasNext()) {
                                while (true) {
-                                       socket.receive(packet);
+                                       in.read(buffer, 0, 4);
                                        int sig = getInt(buffer, 0);
                                        switch (sig) {
                                                case SIGNAL_BUFFER_REQUEST_G0:

http://git-wip-us.apache.org/repos/asf/flink/blob/e1618e28/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/streaming/PythonStreamer.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/streaming/PythonStreamer.java
 
b/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/streaming/PythonStreamer.java
index 835d95b..f636caf 100644
--- 
a/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/streaming/PythonStreamer.java
+++ 
b/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/streaming/PythonStreamer.java
@@ -13,7 +13,7 @@
 package org.apache.flink.languagebinding.api.java.python.streaming;
 
 import java.io.IOException;
-import java.net.DatagramPacket;
+import java.lang.reflect.Field;
 import org.apache.flink.api.common.functions.AbstractRichFunction;
 import static 
org.apache.flink.languagebinding.api.java.common.PlanBinder.DEBUG;
 import static 
org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.FLINK_PYTHON_EXECUTOR_NAME;
@@ -38,6 +38,7 @@ public class PythonStreamer extends Streamer {
        private final int id;
        private final boolean usePython3;
        private final boolean debug;
+       private Thread shutdownThread;
 
        private String inputFilePath;
        private String outputFilePath;
@@ -90,54 +91,62 @@ public class PythonStreamer extends Streamer {
                        } catch (IOException ex) {
                                throw new 
RuntimeException(FLINK_PYTHON3_BINARY_KEY + "=" + FLINK_PYTHON3_BINARY_PATH + " 
does not point to a valid python binary.");
                        }
-                       pb.command(FLINK_PYTHON3_BINARY_PATH, "-O", "-B", 
executorPath, "" + socket.getLocalPort());
+                       pb.command(FLINK_PYTHON3_BINARY_PATH, "-O", "-B", 
executorPath, "" + server.getLocalPort());
                } else {
                        try {
                                
Runtime.getRuntime().exec(FLINK_PYTHON2_BINARY_PATH);
                        } catch (IOException ex) {
                                throw new 
RuntimeException(FLINK_PYTHON2_BINARY_KEY + "=" + FLINK_PYTHON2_BINARY_PATH + " 
does not point to a valid python binary.");
                        }
-                       pb.command(FLINK_PYTHON2_BINARY_PATH, "-O", "-B", 
executorPath, "" + socket.getLocalPort());
+                       pb.command(FLINK_PYTHON2_BINARY_PATH, "-O", "-B", 
executorPath, "" + server.getLocalPort());
                }
                if (debug) {
                        socket.setSoTimeout(0);
                        LOG.info("Waiting for Python Process : " + 
function.getRuntimeContext().getTaskName()
-                                       + " Run python /tmp/flink" + 
FLINK_PYTHON_EXECUTOR_NAME + " " + socket.getLocalPort());
+                                       + " Run python /tmp/flink" + 
FLINK_PYTHON_EXECUTOR_NAME + " " + server.getLocalPort());
                } else {
                        process = pb.start();
                        new StreamPrinter(process.getInputStream()).start();
                        new StreamPrinter(process.getErrorStream(), true, 
msg).start();
                }
-               byte[] executorPort = new byte[4];
-               socket.receive(new DatagramPacket(executorPort, 0, 4));
-               int exPort = getInt(executorPort, 0);
-               if (exPort == -2) {
-                       try { //wait before terminating to ensure that the 
complete error message is printed
-                               Thread.sleep(2000);
-                       } catch (InterruptedException ex) {
+
+               shutdownThread = new Thread() {
+                       @Override
+                       public void run() {
+                               try {
+                                       destroyProcess();
+                               } catch (IOException ex) {
+                               }
                        }
-                       throw new RuntimeException("External process for task " 
+ function.getRuntimeContext().getTaskName() + " terminated prematurely." + 
msg);
-               }
+               };
+
+               Runtime.getRuntime().addShutdownHook(shutdownThread);
+
+               socket = server.accept();
+               in = socket.getInputStream();
+               out = socket.getOutputStream();
 
                byte[] opSize = new byte[4];
                putInt(opSize, 0, operator.length);
-               socket.send(new DatagramPacket(opSize, 0, 4, host, exPort));
-               socket.send(new DatagramPacket(operator, 0, operator.length, 
host, exPort));
+               out.write(opSize, 0, 4);
+               out.write(operator, 0, operator.length);
 
                byte[] meta = importString.toString().getBytes("utf-8");
                putInt(opSize, 0, meta.length);
-               socket.send(new DatagramPacket(opSize, 0, 4, host, exPort));
-               socket.send(new DatagramPacket(meta, 0, meta.length, host, 
exPort));
+               out.write(opSize, 0, 4);
+               out.write(meta, 0, meta.length);
 
                byte[] input = inputFilePath.getBytes("utf-8");
                putInt(opSize, 0, input.length);
-               socket.send(new DatagramPacket(opSize, 0, 4, host, exPort));
-               socket.send(new DatagramPacket(input, 0, input.length, host, 
exPort));
+               out.write(opSize, 0, 4);
+               out.write(input, 0, input.length);
 
                byte[] output = outputFilePath.getBytes("utf-8");
                putInt(opSize, 0, output.length);
-               socket.send(new DatagramPacket(opSize, 0, 4, host, exPort));
-               socket.send(new DatagramPacket(output, 0, output.length, host, 
exPort));
+               out.write(opSize, 0, 4);
+               out.write(output, 0, output.length);
+
+               out.flush();
 
                try { // wait a bit to catch syntax errors
                        Thread.sleep(2000);
@@ -165,9 +174,30 @@ public class PythonStreamer extends Streamer {
                        LOG.error("Exception occurred while closing Streamer. 
:" + e.getMessage());
                }
                if (!debug) {
-                       try {
-                               process.exitValue();
-                       } catch (IllegalThreadStateException ise) { //process 
still active
+                       destroyProcess();
+               }
+               if (shutdownThread != null) {
+                       Runtime.getRuntime().removeShutdownHook(shutdownThread);
+               }
+       }
+
+       private void destroyProcess() throws IOException {
+               try {
+                       process.exitValue();
+               } catch (IllegalThreadStateException ise) { //process still 
active
+                       if 
(process.getClass().getName().equals("java.lang.UNIXProcess")) {
+                               int pid;
+                               try {
+                                       Field f = 
process.getClass().getDeclaredField("pid");
+                                       f.setAccessible(true);
+                                       pid = f.getInt(process);
+                               } catch (Throwable e) {
+                                       process.destroy();
+                                       return;
+                               }
+                               String[] args = new String[]{"kill", "-9", "" + 
pid};
+                               Runtime.getRuntime().exec(args);
+                       } else {
                                process.destroy();
                        }
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/e1618e28/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/__diff.py
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/__diff.py
 
b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/__diff.py
index cebdc5d..79301a6 100644
--- 
a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/__diff.py
+++ 
b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/__diff.py
@@ -1,21 +1,3 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
 #!/usr/bin/env python
 #
 # Author: Mike McKerns (mmckerns @caltech and @uqfoundation)

http://git-wip-us.apache.org/repos/asf/flink/blob/e1618e28/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/__init__.py
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/__init__.py
 
b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/__init__.py
index b4cbc34..b03eda9 100644
--- 
a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/__init__.py
+++ 
b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/__init__.py
@@ -1,21 +1,3 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
 #!/usr/bin/env python
 #
 # Author: Mike McKerns (mmckerns @caltech and @uqfoundation)

http://git-wip-us.apache.org/repos/asf/flink/blob/e1618e28/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/_objects.py
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/_objects.py
 
b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/_objects.py
index 457ae1e..b89bc0e 100644
--- 
a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/_objects.py
+++ 
b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/_objects.py
@@ -1,21 +1,3 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
 #!/usr/bin/env python
 #
 # Author: Mike McKerns (mmckerns @caltech and @uqfoundation)

http://git-wip-us.apache.org/repos/asf/flink/blob/e1618e28/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/detect.py
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/detect.py
 
b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/detect.py
index 76357a5..749a573 100644
--- 
a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/detect.py
+++ 
b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/detect.py
@@ -1,21 +1,3 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
 #!/usr/bin/env python
 #
 # Author: Mike McKerns (mmckerns @caltech and @uqfoundation)

http://git-wip-us.apache.org/repos/asf/flink/blob/e1618e28/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/dill.py
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/dill.py
 
b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/dill.py
index 238527c..cddb9ca 100644
--- 
a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/dill.py
+++ 
b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/dill.py
@@ -1,21 +1,3 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
 # -*- coding: utf-8 -*-
 #
 # Author: Mike McKerns (mmckerns @caltech and @uqfoundation)

http://git-wip-us.apache.org/repos/asf/flink/blob/e1618e28/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/info.py
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/info.py
 
b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/info.py
deleted file mode 100644
index 65b48d4..0000000
--- 
a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/info.py
+++ /dev/null
@@ -1,17 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################

http://git-wip-us.apache.org/repos/asf/flink/blob/e1618e28/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/objtypes.py
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/objtypes.py
 
b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/objtypes.py
index a6f159e..bf0b557 100644
--- 
a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/objtypes.py
+++ 
b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/objtypes.py
@@ -1,21 +1,3 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
 #!/usr/bin/env python
 #
 # Author: Mike McKerns (mmckerns @caltech and @uqfoundation)

http://git-wip-us.apache.org/repos/asf/flink/blob/e1618e28/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/pointers.py
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/pointers.py
 
b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/pointers.py
index 2cad3be..25714ea 100644
--- 
a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/pointers.py
+++ 
b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/pointers.py
@@ -1,21 +1,3 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
 #!/usr/bin/env python
 #
 # Author: Mike McKerns (mmckerns @caltech and @uqfoundation)

http://git-wip-us.apache.org/repos/asf/flink/blob/e1618e28/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/source.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/source.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/source.py
index b51c007..b55ca55 100644
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/source.py
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/source.py
@@ -1,21 +1,3 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
 # #!/usr/bin/env python
 #
 # Author: Mike McKerns (mmckerns @caltech and @uqfoundation)

http://git-wip-us.apache.org/repos/asf/flink/blob/e1618e28/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/temp.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/temp.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/temp.py
index 827efda..9dedb41 100644
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/temp.py
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/temp.py
@@ -1,21 +1,3 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
 #!/usr/bin/env python
 #
 # Author: Mike McKerns (mmckerns @caltech and @uqfoundation)

http://git-wip-us.apache.org/repos/asf/flink/blob/e1618e28/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/executor.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/executor.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/executor.py
index 9d80c82..2cfb9d3 100644
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/executor.py
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/executor.py
@@ -20,39 +20,35 @@ import socket
 import struct
 #argv[1] = port
 
-
+s = None
 try:
     import dill
     port = int(sys.argv[1])
 
-    s1 = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
-    s1.bind((socket.gethostbyname("localhost"), 0))
-    s1.sendto(struct.pack(">i", s1.getsockname()[1]), (socket.gethostbyname("localhost"), port))
+    s = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
+    s.connect((socket.gethostbyname("localhost"), port))
 
-    size = struct.unpack(">i", s1.recv(4))[0]
-    serialized_operator = s1.recv(size)
+    size = struct.unpack(">i", s.recv(4, socket.MSG_WAITALL))[0]
+    serialized_operator = s.recv(size, socket.MSG_WAITALL)
 
-    size = struct.unpack(">i", s1.recv(4))[0]
-    import_string = s1.recv(size).decode("utf-8")
+    size = struct.unpack(">i", s.recv(4, socket.MSG_WAITALL))[0]
+    import_string = s.recv(size, socket.MSG_WAITALL).decode("utf-8")
 
-    size = struct.unpack(">i", s1.recv(4))[0]
-    input_file = s1.recv(size).decode("utf-8")
+    size = struct.unpack(">i", s.recv(4, socket.MSG_WAITALL))[0]
+    input_file = s.recv(size, socket.MSG_WAITALL).decode("utf-8")
 
-    size = struct.unpack(">i", s1.recv(4))[0]
-    output_file = s1.recv(size).decode("utf-8")
+    size = struct.unpack(">i", s.recv(4, socket.MSG_WAITALL))[0]
+    output_file = s.recv(size, socket.MSG_WAITALL).decode("utf-8")
 
     exec(import_string)
     
     operator = dill.loads(serialized_operator)
-    operator._configure(input_file, output_file, port)
+    operator._configure(input_file, output_file, s)
     operator._go()
     sys.stdout.flush()
     sys.stderr.flush()
 except:
     sys.stdout.flush()
     sys.stderr.flush()
-    s = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
-    s.bind((socket.gethostbyname("localhost"), 0))
-    destination = (socket.gethostbyname("localhost"), int(sys.argv[1]))
-    s.sendto(struct.pack(">i", -2), destination)
+    s.send(struct.pack(">i", -2))
     raise
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/e1618e28/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Connection.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Connection.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Connection.py
index 7d64352..cffe79e 100644
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Connection.py
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Connection.py
@@ -16,7 +16,7 @@
 # limitations under the License.
 
################################################################################
 import mmap
-import socket
+import socket as SOCKET
 import tempfile
 from struct import pack, unpack
 from collections import deque
@@ -61,18 +61,12 @@ class OneWayBusyBufferingMappedFileConnection(object):
 
 
 class BufferingUDPMappedFileConnection(object):
-    def __init__(self, input_file=tempfile.gettempdir() + "/flink_data/input", output_file=tempfile.gettempdir() + "/flink_data/output", port=25000):
+    def __init__(self, input_file=tempfile.gettempdir() + "/flink_data/input", output_file=tempfile.gettempdir() + "/flink_data/output", socket=None):
         self._input_file = open(input_file, "rb+")
         self._output_file = open(output_file, "rb+")
         self._file_input_buffer = mmap.mmap(self._input_file.fileno(), MAPPED_FILE_SIZE, mmap.MAP_SHARED, mmap.ACCESS_READ)
         self._file_output_buffer = mmap.mmap(self._output_file.fileno(), MAPPED_FILE_SIZE, mmap.MAP_SHARED, mmap.ACCESS_WRITE)
-        self._socket = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
-        self._socket.bind((socket.gethostbyname("localhost"), 0))
-        self._socket.settimeout(300)
-        self._destination = (socket.gethostbyname("localhost"), port)
-
-        self._socket.sendto(pack(">I", self._socket.getsockname()[1]), self._destination)
-        self._socket.sendto(pack(">I", self._socket.getsockname()[1]), self._destination)
+        self._socket = socket
 
         self._out = deque()
         self._out_size = 0
@@ -97,10 +91,10 @@ class BufferingUDPMappedFileConnection(object):
     def _write_buffer(self):
         self._file_output_buffer.seek(0, 0)
         self._file_output_buffer.write(b"".join(self._out))
-        self._socket.sendto(pack(">i", self._out_size), self._destination)
+        self._socket.send(pack(">i", self._out_size))
         self._out.clear()
         self._out_size = 0
-        self._socket.recvfrom(1)
+        self._socket.recv(1, SOCKET.MSG_WAITALL)
 
     def read(self, des_size, ignored=None):
         if self._input_size == self._input_offset:
@@ -110,10 +104,10 @@ class BufferingUDPMappedFileConnection(object):
         return self._input[old_offset:self._input_offset]
 
     def _read_buffer(self):
-        self._socket.sendto(SIGNAL_REQUEST_BUFFER, self._destination)
+        self._socket.send(SIGNAL_REQUEST_BUFFER)
         self._file_input_buffer.seek(0, 0)
         self._input_offset = 0
-        meta_size = self._socket.recvfrom(5)[0]
+        meta_size = self._socket.recv(5, SOCKET.MSG_WAITALL)
         self._input_size = unpack(">I", meta_size[:4])[0]
         self._was_last = meta_size[4] == SIGNAL_WAS_LAST
         self._input = self._file_input_buffer.read(self._input_size)
@@ -121,7 +115,7 @@ class BufferingUDPMappedFileConnection(object):
     def send_end_signal(self):
         if self._out_size:
             self._write_buffer()
-        self._socket.sendto(SIGNAL_FINISHED, self._destination)
+        self._socket.send(SIGNAL_FINISHED)
 
     def has_next(self, ignored=None):
         return not self._was_last or not self._input_size == self._input_offset
@@ -134,8 +128,8 @@ class BufferingUDPMappedFileConnection(object):
 
 
 class TwinBufferingUDPMappedFileConnection(BufferingUDPMappedFileConnection):
-    def __init__(self, input_file=tempfile.gettempdir() + "/flink/data/input", output_file=tempfile.gettempdir() + "/flink/data/output", port=25000):
-        super(TwinBufferingUDPMappedFileConnection, self).__init__(input_file, output_file, port)
+    def __init__(self, input_file=tempfile.gettempdir() + "/flink/data/input", output_file=tempfile.gettempdir() + "/flink/data/output", socket=None):
+        super(TwinBufferingUDPMappedFileConnection, self).__init__(input_file, output_file, socket)
         self._input = ["", ""]
         self._input_offset = [0, 0]
         self._input_size = [0, 0]
@@ -150,12 +144,12 @@ class TwinBufferingUDPMappedFileConnection(BufferingUDPMappedFileConnection):
 
     def _read_buffer(self, group):
         if group:
-            self._socket.sendto(SIGNAL_REQUEST_BUFFER_G1, self._destination)
+            self._socket.send(SIGNAL_REQUEST_BUFFER_G1)
         else:
-            self._socket.sendto(SIGNAL_REQUEST_BUFFER_G0, self._destination)
+            self._socket.send(SIGNAL_REQUEST_BUFFER_G0)
         self._file_input_buffer.seek(0, 0)
         self._input_offset[group] = 0
-        meta_size = self._socket.recvfrom(5)[0]
+        meta_size = self._socket.recv(5, SOCKET.MSG_WAITALL)
         self._input_size[group] = unpack(">I", meta_size[:4])[0]
         self._was_last[group] = meta_size[4] == SIGNAL_WAS_LAST
         self._input[group] = self._file_input_buffer.read(self._input_size[group])

http://git-wip-us.apache.org/repos/asf/flink/blob/e1618e28/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Iterator.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Iterator.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Iterator.py
index 7b1c5c5..fb0e26d 100644
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Iterator.py
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Iterator.py
@@ -260,7 +260,7 @@ class ByteArrayDeserializer(object):
 
     def deserialize(self):
         size = unpack(">i", self.read(4, self._group))[0]
-        return bytearray(self.read(size, self._group))
+        return bytearray(self.read(size, self._group)) if size else bytearray(b"")
 
 
 class BooleanDeserializer(object):
@@ -315,7 +315,7 @@ class StringDeserializer(object):
 
     def deserialize(self):
         length = unpack(">i", self.read(4, self._group))[0]
-        return self.read(length, self._group).decode("utf-8")
+        return self.read(length, self._group).decode("utf-8") if length else ""
 
 
 class NullDeserializer(object):

http://git-wip-us.apache.org/repos/asf/flink/blob/e1618e28/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/__init__.pyc
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/__init__.pyc b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/__init__.pyc
deleted file mode 100644
index 7f3cf94..0000000
Binary files a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/__init__.pyc and /dev/null differ

http://git-wip-us.apache.org/repos/asf/flink/blob/e1618e28/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/GroupReduceFunction.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/GroupReduceFunction.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/GroupReduceFunction.py
index 35359e2..0ce23ff 100644
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/GroupReduceFunction.py
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/GroupReduceFunction.py
@@ -103,7 +103,7 @@ class GroupReduceFunction(Function.Function):
         keys.sort()
         for key in keys:
             values = grouping[key]
-            for op in self._sort_ops:
+            for op in reversed(self._sort_ops):
                 values.sort(key=lambda x:x[op[0]], reverse = op[1] == Order.DESCENDING)
             result = function(Iterator.ListIterator(values), collector)
             if result is not None:

http://git-wip-us.apache.org/repos/asf/flink/blob/e1618e28/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index fef5a9e..4232fb7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -717,6 +717,8 @@ under the License.
                                                
<exclude>flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/*.java</exclude>
                                                
<exclude>flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/data_csv</exclude>
                                                
<exclude>flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/data_text</exclude>
+                                               <!-- Python -->                 
                        
+                                               
<exclude>flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/**</exclude>
                                                <!-- Configuration Files. -->
                                                
<exclude>**/flink-bin/conf/slaves</exclude>
                         
<exclude>flink-contrib/docker-flink/flink/conf/slaves</exclude>

Reply via email to