Repository: flink
Updated Branches:
  refs/heads/master 018f1fee9 -> 0ac5d4020


[FLINK-3165] [py] Windows OS support

This closes #1454


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0ac5d402
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0ac5d402
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0ac5d402

Branch: refs/heads/master
Commit: 0ac5d4020d9aa441d7c8d439280d4ea2d01bcade
Parents: 018f1fe
Author: zentol <ches...@apache.org>
Authored: Tue Dec 15 09:38:04 2015 +0100
Committer: zentol <s.mo...@web.de>
Committed: Tue Jan 19 18:09:36 2016 +0100

----------------------------------------------------------------------
 flink-dist/src/main/flink-bin/bin/pyflink2.bat  | 25 +++++++++
 flink-dist/src/main/flink-bin/bin/pyflink3.bat  | 25 +++++++++
 .../flink/python/api/PythonPlanBinder.java      | 58 +++++++++++---------
 .../python/api/flink/connection/Connection.py   | 40 ++++++++++++--
 .../python/api/flink/functions/Function.py      |  1 +
 .../flink/python/api/flink/plan/Environment.py  |  8 +++
 6 files changed, 125 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0ac5d402/flink-dist/src/main/flink-bin/bin/pyflink2.bat
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/pyflink2.bat b/flink-dist/src/main/flink-bin/bin/pyflink2.bat
new file mode 100644
index 0000000..9d94a69
--- /dev/null
+++ b/flink-dist/src/main/flink-bin/bin/pyflink2.bat
@@ -0,0 +1,25 @@
+::###############################################################################
+::  Licensed to the Apache Software Foundation (ASF) under one
+::  or more contributor license agreements.  See the NOTICE file
+::  distributed with this work for additional information
+::  regarding copyright ownership.  The ASF licenses this file
+::  to you under the Apache License, Version 2.0 (the
+::  "License"); you may not use this file except in compliance
+::  with the License.  You may obtain a copy of the License at
+::
+::      http://www.apache.org/licenses/LICENSE-2.0
+::
+::  Unless required by applicable law or agreed to in writing, software
+::  distributed under the License is distributed on an "AS IS" BASIS,
+::  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+::  See the License for the specific language governing permissions and
+:: limitations under the License.
+::###############################################################################
+
+@echo off
+setlocal EnableDelayedExpansion
+
+SET bin=%~dp0
+SET FLINK_ROOT_DIR=%bin%..
+
+%FLINK_ROOT_DIR%\bin\flink run -v %FLINK_ROOT_DIR%\lib\flink-python*.jar 2 %*
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/0ac5d402/flink-dist/src/main/flink-bin/bin/pyflink3.bat
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/pyflink3.bat b/flink-dist/src/main/flink-bin/bin/pyflink3.bat
new file mode 100644
index 0000000..43c9384
--- /dev/null
+++ b/flink-dist/src/main/flink-bin/bin/pyflink3.bat
@@ -0,0 +1,25 @@
+::###############################################################################
+::  Licensed to the Apache Software Foundation (ASF) under one
+::  or more contributor license agreements.  See the NOTICE file
+::  distributed with this work for additional information
+::  regarding copyright ownership.  The ASF licenses this file
+::  to you under the Apache License, Version 2.0 (the
+::  "License"); you may not use this file except in compliance
+::  with the License.  You may obtain a copy of the License at
+::
+::      http://www.apache.org/licenses/LICENSE-2.0
+::
+::  Unless required by applicable law or agreed to in writing, software
+::  distributed under the License is distributed on an "AS IS" BASIS,
+::  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+::  See the License for the specific language governing permissions and
+:: limitations under the License.
+::###############################################################################
+
+@echo off
+setlocal EnableDelayedExpansion
+
+SET bin=%~dp0
+SET FLINK_ROOT_DIR=%bin%..
+
+%FLINK_ROOT_DIR%\bin\flink run -v %FLINK_ROOT_DIR%\lib\flink-python*.jar 3 %*
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/0ac5d402/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
index a27a589..7c74054 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
@@ -12,11 +12,13 @@
  */
 package org.apache.flink.python.api;
 
+import java.io.File;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.Random;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.LocalEnvironment;
@@ -63,7 +65,7 @@ public class PythonPlanBinder {
        public static final String ARGUMENT_PYTHON_3 = "3";
 
        public static final String FLINK_PYTHON_DC_ID = "flink";
-       public static final String FLINK_PYTHON_PLAN_NAME = "/plan.py";
+       public static final String FLINK_PYTHON_PLAN_NAME = File.separator + "plan.py";
 
        public static final String FLINK_PYTHON2_BINARY_KEY = "python.binary.python2";
        public static final String FLINK_PYTHON3_BINARY_KEY = "python.binary.python3";
@@ -72,8 +74,10 @@ public class PythonPlanBinder {
        public static String FLINK_PYTHON2_BINARY_PATH = GlobalConfiguration.getString(FLINK_PYTHON2_BINARY_KEY, "python");
        public static String FLINK_PYTHON3_BINARY_PATH = GlobalConfiguration.getString(FLINK_PYTHON3_BINARY_KEY, "python3");
 
-       private static final String FLINK_PYTHON_FILE_PATH = System.getProperty("java.io.tmpdir") + "/flink_plan";
-       private static final String FLINK_PYTHON_REL_LOCAL_PATH = "/resources/python";
+       private static final Random r = new Random();
+
+       private static final String FLINK_PYTHON_FILE_PATH = System.getProperty("java.io.tmpdir") + File.separator + "flink_plan";
+       private static final String FLINK_PYTHON_REL_LOCAL_PATH = File.separator + "resources" + File.separator + "python";
        private static final String FLINK_DIR = System.getenv("FLINK_ROOT_DIR");
        private static String FULL_PATH;
 
@@ -84,7 +88,7 @@ public class PythonPlanBinder {
        public static boolean usePython3 = false;
 
        private static String FLINK_HDFS_PATH = "hdfs:/tmp";
-       public static final String FLINK_TMP_DATA_DIR = System.getProperty("java.io.tmpdir") + "/flink_data";
+       public static final String FLINK_TMP_DATA_DIR = System.getProperty("java.io.tmpdir") + File.separator + "flink_data";
 
        public static boolean DEBUG = false;
 
@@ -102,7 +106,7 @@ public class PythonPlanBinder {
         */
        public static void main(String[] args) throws Exception {
                if (args.length < 2) {
-                       System.out.println("Usage: ./bin/pyflink<2/3>.sh <pathToScript>[ <pathToPackage1>[ <pathToPackageX]][ - <parameter1>[ <parameterX>]]");
+                       System.out.println("Usage: ./bin/pyflink<2/3>.[sh/bat] <pathToScript>[ <pathToPackage1>[ <pathToPackageX]][ - <parameter1>[ <parameterX>]]");
                        return;
                }
                usePython3 = args[0].equals(ARGUMENT_PYTHON_3);
@@ -114,9 +118,10 @@ public class PythonPlanBinder {
                FLINK_PYTHON2_BINARY_PATH = GlobalConfiguration.getString(FLINK_PYTHON2_BINARY_KEY, "python");
                FLINK_PYTHON3_BINARY_PATH = GlobalConfiguration.getString(FLINK_PYTHON3_BINARY_KEY, "python3");
                FULL_PATH = FLINK_DIR != null
-                               ? FLINK_DIR + FLINK_PYTHON_REL_LOCAL_PATH //command-line
-                               : FileSystem.getLocalFileSystem().getWorkingDirectory().toString() //testing
-                               + "/src/main/python/org/apache/flink/python/api";
+                               //command-line
+                               ? FLINK_DIR + FLINK_PYTHON_REL_LOCAL_PATH
+                               //testing
+                               : new Path(FileSystem.getLocalFileSystem().getWorkingDirectory(), "src/main/python/org/apache/flink/python/api").toString();
        }
 
        private void runPlan(String[] args) throws Exception {
@@ -130,15 +135,16 @@ public class PythonPlanBinder {
                }
 
                try {
-                       prepareFiles(Arrays.copyOfRange(args, 0, split == 0 ? 1 : split));
-                       startPython(Arrays.copyOfRange(args, split == 0 ? args.length : split + 1, args.length));
+                       String tmpPath = FLINK_PYTHON_FILE_PATH + r.nextInt();
+                       prepareFiles(tmpPath, Arrays.copyOfRange(args, 0, split == 0 ? 1 : split));
+                       startPython(tmpPath, Arrays.copyOfRange(args, split == 0 ? args.length : split + 1, args.length));
                        receivePlan();
 
                        if (env instanceof LocalEnvironment) {
-                               FLINK_HDFS_PATH = "file:" + System.getProperty("java.io.tmpdir") + "/flink";
+                               FLINK_HDFS_PATH = "file:" + System.getProperty("java.io.tmpdir") + File.separator + "flink";
                        }
 
-                       distributeFiles(env);
+                       distributeFiles(tmpPath, env);
                        env.execute();
                        close();
                } catch (Exception e) {
@@ -156,52 +162,52 @@ public class PythonPlanBinder {
         * @throws IOException
         * @throws URISyntaxException
         */
-       private void prepareFiles(String... filePaths) throws IOException, URISyntaxException {
+       private void prepareFiles(String tempFilePath, String... filePaths) throws IOException, URISyntaxException {
                //Flink python package
-               String tempFilePath = FLINK_PYTHON_FILE_PATH;
                clearPath(tempFilePath);
                FileCache.copy(new Path(FULL_PATH), new Path(tempFilePath), false);
 
                //plan file             
-               copyFile(filePaths[0], FLINK_PYTHON_PLAN_NAME);
+               copyFile(filePaths[0], tempFilePath, FLINK_PYTHON_PLAN_NAME);
 
                //additional files/folders
                for (int x = 1; x < filePaths.length; x++) {
-                       copyFile(filePaths[x], null);
+                       copyFile(filePaths[x], tempFilePath, null);
                }
        }
 
        private static void clearPath(String path) throws IOException, URISyntaxException {
-               FileSystem fs = FileSystem.get(new URI(path));
+               FileSystem fs = FileSystem.get(new Path(path).toUri());
                if (fs.exists(new Path(path))) {
                        fs.delete(new Path(path), true);
                }
        }
 
-       private static void copyFile(String path, String name) throws IOException, URISyntaxException {
+       private static void copyFile(String path, String target, String name) throws IOException, URISyntaxException {
                if (path.endsWith("/")) {
                        path = path.substring(0, path.length() - 1);
                }
                String identifier = name == null ? path.substring(path.lastIndexOf("/")) : name;
-               String tmpFilePath = FLINK_PYTHON_FILE_PATH + "/" + identifier;
+               String tmpFilePath = target + "/" + identifier;
                clearPath(tmpFilePath);
                Path p = new Path(path);
                FileCache.copy(p.makeQualified(FileSystem.get(p.toUri())), new Path(tmpFilePath), true);
        }
 
-       private static void distributeFiles(ExecutionEnvironment env) throws IOException, URISyntaxException {
+       private static void distributeFiles(String tmpPath, ExecutionEnvironment env) throws IOException, URISyntaxException {
                clearPath(FLINK_HDFS_PATH);
-               FileCache.copy(new Path(FLINK_PYTHON_FILE_PATH), new Path(FLINK_HDFS_PATH), true);
+               FileCache.copy(new Path(tmpPath), new Path(FLINK_HDFS_PATH), true);
                env.registerCachedFile(FLINK_HDFS_PATH, FLINK_PYTHON_DC_ID);
-               clearPath(FLINK_PYTHON_FILE_PATH);
+               clearPath(tmpPath);
        }
 
-       private void startPython(String[] args) throws IOException {
+       private void startPython(String tempPath, String[] args) throws IOException {
                for (String arg : args) {
                        arguments.append(" ").append(arg);
                }
+               String mappedFilePath = FLINK_TMP_DATA_DIR + "/output" + r.nextInt();
                receiver = new Receiver(null);
-               receiver.open(FLINK_TMP_DATA_DIR + "/output");
+               receiver.open(mappedFilePath);
 
                String pythonBinaryPath = usePython3 ? FLINK_PYTHON3_BINARY_PATH : FLINK_PYTHON2_BINARY_PATH;
 
@@ -210,7 +216,7 @@ public class PythonPlanBinder {
                } catch (IOException ex) {
                        throw new RuntimeException(pythonBinaryPath + " does not point to a valid python binary.");
                }
-               process = Runtime.getRuntime().exec(pythonBinaryPath + " -B " + FLINK_PYTHON_FILE_PATH + FLINK_PYTHON_PLAN_NAME + arguments.toString());
+               process = Runtime.getRuntime().exec(pythonBinaryPath + " -B " + tempPath + FLINK_PYTHON_PLAN_NAME + arguments.toString());
 
                new StreamPrinter(process.getInputStream()).start();
                new StreamPrinter(process.getErrorStream()).start();
@@ -232,7 +238,7 @@ public class PythonPlanBinder {
                }
 
                process.getOutputStream().write("plan\n".getBytes());
-               process.getOutputStream().write((FLINK_TMP_DATA_DIR + "/output\n").getBytes());
+               process.getOutputStream().write((mappedFilePath + "\n").getBytes());
                process.getOutputStream().flush();
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0ac5d402/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Connection.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Connection.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Connection.py
index 988bf25..143cea7 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Connection.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Connection.py
@@ -36,10 +36,28 @@ else:
     SIGNAL_WAS_LAST = 32
 
 
+def recv_all(socket, toread):
+    initial = socket.recv(toread)
+    bytes_read = len(initial)
+    if bytes_read == toread:
+        return initial
+    else:
+        bits = [initial]
+        toread = toread - bytes_read
+        while toread:
+            bit = socket.recv(toread)
+            bits.append(bit)
+            toread = toread - len(bit)
+        return b"".join(bits)
+
+
 class OneWayBusyBufferingMappedFileConnection(object):
     def __init__(self, output_path):
         self._output_file = open(output_path, "rb+")
-        self._file_output_buffer = mmap.mmap(self._output_file.fileno(), MAPPED_FILE_SIZE, mmap.MAP_SHARED, mmap.ACCESS_WRITE)
+        if hasattr(mmap, 'MAP_SHARED'):
+            self._file_output_buffer = mmap.mmap(self._output_file.fileno(), MAPPED_FILE_SIZE, mmap.MAP_SHARED, mmap.ACCESS_WRITE)
+        else:
+            self._file_output_buffer = mmap.mmap(self._output_file.fileno(), MAPPED_FILE_SIZE, None, mmap.ACCESS_WRITE)
 
         self._out = deque()
         self._out_size = 0
@@ -58,13 +76,20 @@ class OneWayBusyBufferingMappedFileConnection(object):
         self._file_output_buffer.seek(0, 0)
         self._file_output_buffer.write(b'\x01')
 
+    def close(self):
+        self._file_output_buffer.close()
+
 
 class BufferingTCPMappedFileConnection(object):
     def __init__(self, input_file, output_file, port):
         self._input_file = open(input_file, "rb+")
         self._output_file = open(output_file, "rb+")
-        self._file_input_buffer = mmap.mmap(self._input_file.fileno(), MAPPED_FILE_SIZE, mmap.MAP_SHARED, mmap.ACCESS_READ)
-        self._file_output_buffer = mmap.mmap(self._output_file.fileno(), MAPPED_FILE_SIZE, mmap.MAP_SHARED, mmap.ACCESS_WRITE)
+        if hasattr(mmap, 'MAP_SHARED'):
+            self._file_input_buffer = mmap.mmap(self._input_file.fileno(), MAPPED_FILE_SIZE, mmap.MAP_SHARED, mmap.ACCESS_READ)
+            self._file_output_buffer = mmap.mmap(self._output_file.fileno(), MAPPED_FILE_SIZE, mmap.MAP_SHARED, mmap.ACCESS_WRITE)
+        else:
+            self._file_input_buffer = mmap.mmap(self._input_file.fileno(), MAPPED_FILE_SIZE, None, mmap.ACCESS_READ)
+            self._file_output_buffer = mmap.mmap(self._output_file.fileno(), MAPPED_FILE_SIZE, None, mmap.ACCESS_WRITE)
         self._socket = SOCKET.socket(family=SOCKET.AF_INET, type=SOCKET.SOCK_STREAM)
         self._socket.connect((SOCKET.gethostbyname("localhost"), port))
 
@@ -76,6 +101,9 @@ class BufferingTCPMappedFileConnection(object):
         self._input_size = 0
         self._was_last = False
 
+    def close(self):
+        self._socket.close()
+
     def write(self, msg):
         length = len(msg)
         if length > MAPPED_FILE_SIZE:
@@ -94,7 +122,7 @@ class BufferingTCPMappedFileConnection(object):
         self._socket.send(pack(">i", self._out_size))
         self._out.clear()
         self._out_size = 0
-        self._socket.recv(1, SOCKET.MSG_WAITALL)
+        recv_all(self._socket, 1)
 
     def read(self, des_size, ignored=None):
         if self._input_size == self._input_offset:
@@ -107,7 +135,7 @@ class BufferingTCPMappedFileConnection(object):
         self._socket.send(SIGNAL_REQUEST_BUFFER)
         self._file_input_buffer.seek(0, 0)
         self._input_offset = 0
-        meta_size = self._socket.recv(5, SOCKET.MSG_WAITALL)
+        meta_size = recv_all(self._socket, 5)
         self._input_size = unpack(">I", meta_size[:4])[0]
         self._was_last = meta_size[4] == SIGNAL_WAS_LAST
         self._input = self._file_input_buffer.read(self._input_size)
@@ -149,7 +177,7 @@ class TwinBufferingTCPMappedFileConnection(BufferingTCPMappedFileConnection):
             self._socket.send(SIGNAL_REQUEST_BUFFER_G0)
         self._file_input_buffer.seek(0, 0)
         self._input_offset[group] = 0
-        meta_size = self._socket.recv(5, SOCKET.MSG_WAITALL)
+        meta_size = recv_all(self._socket, 5)
         self._input_size[group] = unpack(">I", meta_size[:4])[0]
         self._was_last[group] = meta_size[4] == SIGNAL_WAS_LAST
         self._input[group] = self._file_input_buffer.read(self._input_size[group])

http://git-wip-us.apache.org/repos/asf/flink/blob/0ac5d402/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/Function.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/Function.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/Function.py
index 4bf8b3a..f874a25 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/Function.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/Function.py
@@ -59,6 +59,7 @@ class Function(object):
 
     def _close(self):
         self._collector._close()
+        self._connection.close()
 
     def _go(self):
         self._receive_broadcast_variables()

http://git-wip-us.apache.org/repos/asf/flink/blob/0ac5d402/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
index 169e31b..865487d 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
@@ -21,6 +21,7 @@ from flink.plan.DataSet import DataSet
 from flink.plan.Constants import _Identifier
 from flink.plan.OperationInfo import OperationInfo
 from flink.utilities import Switch
+import socket as SOCKET
 import copy
 import sys
 from struct import pack
@@ -160,6 +161,7 @@ class Environment(object):
             self._collector = Collector.TypedCollector(self._connection, self)
             self._send_plan()
             self._connection._write_buffer()
+            self._connection.close()
         else:
             import struct
             operator = None
@@ -178,6 +180,7 @@ class Environment(object):
                         operator = set.combineop
                 operator._configure(input_path, output_path, port, self)
                 operator._go()
+                operator._close()
                 sys.stdout.flush()
                 sys.stderr.flush()
             except:
@@ -185,6 +188,11 @@ class Environment(object):
                 sys.stderr.flush()
                 if operator is not None:
                     operator._connection._socket.send(struct.pack(">i", -2))
+                else:
+                    socket = SOCKET.socket(family=SOCKET.AF_INET, type=SOCKET.SOCK_STREAM)
+                    socket.connect((SOCKET.gethostbyname("localhost"), port))
+                    socket.send(struct.pack(">i", -2))
+                    socket.close()
                 raise
 
     def _optimize_plan(self):

Reply via email to