Repository: flink Updated Branches: refs/heads/master 018f1fee9 -> 0ac5d4020
[FLINK-3165] [py] Windows OS support This closes #1454 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0ac5d402 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0ac5d402 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0ac5d402 Branch: refs/heads/master Commit: 0ac5d4020d9aa441d7c8d439280d4ea2d01bcade Parents: 018f1fe Author: zentol <ches...@apache.org> Authored: Tue Dec 15 09:38:04 2015 +0100 Committer: zentol <s.mo...@web.de> Committed: Tue Jan 19 18:09:36 2016 +0100 ---------------------------------------------------------------------- flink-dist/src/main/flink-bin/bin/pyflink2.bat | 25 +++++++++ flink-dist/src/main/flink-bin/bin/pyflink3.bat | 25 +++++++++ .../flink/python/api/PythonPlanBinder.java | 58 +++++++++++--------- .../python/api/flink/connection/Connection.py | 40 ++++++++++++-- .../python/api/flink/functions/Function.py | 1 + .../flink/python/api/flink/plan/Environment.py | 8 +++ 6 files changed, 125 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/0ac5d402/flink-dist/src/main/flink-bin/bin/pyflink2.bat ---------------------------------------------------------------------- diff --git a/flink-dist/src/main/flink-bin/bin/pyflink2.bat b/flink-dist/src/main/flink-bin/bin/pyflink2.bat new file mode 100644 index 0000000..9d94a69 --- /dev/null +++ b/flink-dist/src/main/flink-bin/bin/pyflink2.bat @@ -0,0 +1,25 @@ +::############################################################################### +:: Licensed to the Apache Software Foundation (ASF) under one +:: or more contributor license agreements. See the NOTICE file +:: distributed with this work for additional information +:: regarding copyright ownership. The ASF licenses this file +:: to you under the Apache License, Version 2.0 (the +:: "License"); you may not use this file except in compliance +:: with the License. You may obtain a copy of the License at +:: +:: http://www.apache.org/licenses/LICENSE-2.0 +:: +:: Unless required by applicable law or agreed to in writing, software +:: distributed under the License is distributed on an "AS IS" BASIS, +:: WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +:: See the License for the specific language governing permissions and +:: limitations under the License. +::############################################################################### + +@echo off +setlocal EnableDelayedExpansion + +SET bin=%~dp0 +SET FLINK_ROOT_DIR=%bin%.. + +%FLINK_ROOT_DIR%\bin\flink run -v %FLINK_ROOT_DIR%\lib\flink-python*.jar 2 %* \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/0ac5d402/flink-dist/src/main/flink-bin/bin/pyflink3.bat ---------------------------------------------------------------------- diff --git a/flink-dist/src/main/flink-bin/bin/pyflink3.bat b/flink-dist/src/main/flink-bin/bin/pyflink3.bat new file mode 100644 index 0000000..43c9384 --- /dev/null +++ b/flink-dist/src/main/flink-bin/bin/pyflink3.bat @@ -0,0 +1,25 @@ +::############################################################################### +:: Licensed to the Apache Software Foundation (ASF) under one +:: or more contributor license agreements. See the NOTICE file +:: distributed with this work for additional information +:: regarding copyright ownership. The ASF licenses this file +:: to you under the Apache License, Version 2.0 (the +:: "License"); you may not use this file except in compliance +:: with the License. You may obtain a copy of the License at +:: +:: http://www.apache.org/licenses/LICENSE-2.0 +:: +:: Unless required by applicable law or agreed to in writing, software +:: distributed under the License is distributed on an "AS IS" BASIS, +:: WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +:: See the License for the specific language governing permissions and +:: limitations under the License. +::############################################################################### + +@echo off +setlocal EnableDelayedExpansion + +SET bin=%~dp0 +SET FLINK_ROOT_DIR=%bin%.. + +%FLINK_ROOT_DIR%\bin\flink run -v %FLINK_ROOT_DIR%\lib\flink-python*.jar 3 %* \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/0ac5d402/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java index a27a589..7c74054 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java @@ -12,11 +12,13 @@ */ package org.apache.flink.python.api; +import java.io.File; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.util.Arrays; import java.util.HashMap; +import java.util.Random; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.LocalEnvironment; @@ -63,7 +65,7 @@ public class PythonPlanBinder { public static final String ARGUMENT_PYTHON_3 = "3"; public static final String FLINK_PYTHON_DC_ID = "flink"; - public static final String FLINK_PYTHON_PLAN_NAME = "/plan.py"; + public static final String FLINK_PYTHON_PLAN_NAME = File.separator + "plan.py"; public static final String FLINK_PYTHON2_BINARY_KEY = "python.binary.python2"; public static final String FLINK_PYTHON3_BINARY_KEY = "python.binary.python3"; @@ -72,8 +74,10 @@ public class PythonPlanBinder { public static String FLINK_PYTHON2_BINARY_PATH = GlobalConfiguration.getString(FLINK_PYTHON2_BINARY_KEY, "python"); public static String FLINK_PYTHON3_BINARY_PATH = GlobalConfiguration.getString(FLINK_PYTHON3_BINARY_KEY, "python3"); - private static final String FLINK_PYTHON_FILE_PATH = System.getProperty("java.io.tmpdir") + "/flink_plan"; - private static final String FLINK_PYTHON_REL_LOCAL_PATH = "/resources/python"; + private static final Random r = new Random(); + + private static final String FLINK_PYTHON_FILE_PATH = System.getProperty("java.io.tmpdir") + File.separator + "flink_plan"; + private static final String FLINK_PYTHON_REL_LOCAL_PATH = File.separator + "resources" + File.separator + "python"; private static final String FLINK_DIR = System.getenv("FLINK_ROOT_DIR"); private static String FULL_PATH; @@ -84,7 +88,7 @@ public class PythonPlanBinder { public static boolean usePython3 = false; private static String FLINK_HDFS_PATH = "hdfs:/tmp"; - public static final String FLINK_TMP_DATA_DIR = System.getProperty("java.io.tmpdir") + "/flink_data"; + public static final String FLINK_TMP_DATA_DIR = System.getProperty("java.io.tmpdir") + File.separator + "flink_data"; public static boolean DEBUG = false; @@ -102,7 +106,7 @@ public class PythonPlanBinder { */ public static void main(String[] args) throws Exception { if (args.length < 2) { - System.out.println("Usage: ./bin/pyflink<2/3>.sh <pathToScript>[ <pathToPackage1>[ <pathToPackageX]][ - <parameter1>[ <parameterX>]]"); + System.out.println("Usage: ./bin/pyflink<2/3>.[sh/bat] <pathToScript>[ <pathToPackage1>[ <pathToPackageX]][ - <parameter1>[ <parameterX>]]"); return; } usePython3 = args[0].equals(ARGUMENT_PYTHON_3); @@ -114,9 +118,10 @@ public class PythonPlanBinder { FLINK_PYTHON2_BINARY_PATH = GlobalConfiguration.getString(FLINK_PYTHON2_BINARY_KEY, "python"); FLINK_PYTHON3_BINARY_PATH = GlobalConfiguration.getString(FLINK_PYTHON3_BINARY_KEY, "python3"); FULL_PATH = FLINK_DIR != null - ? FLINK_DIR + FLINK_PYTHON_REL_LOCAL_PATH //command-line - : FileSystem.getLocalFileSystem().getWorkingDirectory().toString() //testing - + "/src/main/python/org/apache/flink/python/api"; + //command-line + ? FLINK_DIR + FLINK_PYTHON_REL_LOCAL_PATH + //testing + : new Path(FileSystem.getLocalFileSystem().getWorkingDirectory(), "src/main/python/org/apache/flink/python/api").toString(); } private void runPlan(String[] args) throws Exception { @@ -130,15 +135,16 @@ public class PythonPlanBinder { } try { - prepareFiles(Arrays.copyOfRange(args, 0, split == 0 ? 1 : split)); - startPython(Arrays.copyOfRange(args, split == 0 ? args.length : split + 1, args.length)); + String tmpPath = FLINK_PYTHON_FILE_PATH + r.nextInt(); + prepareFiles(tmpPath, Arrays.copyOfRange(args, 0, split == 0 ? 1 : split)); + startPython(tmpPath, Arrays.copyOfRange(args, split == 0 ? args.length : split + 1, args.length)); receivePlan(); if (env instanceof LocalEnvironment) { - FLINK_HDFS_PATH = "file:" + System.getProperty("java.io.tmpdir") + "/flink"; + FLINK_HDFS_PATH = "file:" + System.getProperty("java.io.tmpdir") + File.separator + "flink"; } - distributeFiles(env); + distributeFiles(tmpPath, env); env.execute(); close(); } catch (Exception e) { @@ -156,52 +162,52 @@ public class PythonPlanBinder { * @throws IOException * @throws URISyntaxException */ - private void prepareFiles(String... filePaths) throws IOException, URISyntaxException { + private void prepareFiles(String tempFilePath, String... filePaths) throws IOException, URISyntaxException { //Flink python package - String tempFilePath = FLINK_PYTHON_FILE_PATH; clearPath(tempFilePath); FileCache.copy(new Path(FULL_PATH), new Path(tempFilePath), false); //plan file - copyFile(filePaths[0], FLINK_PYTHON_PLAN_NAME); + copyFile(filePaths[0], tempFilePath, FLINK_PYTHON_PLAN_NAME); //additional files/folders for (int x = 1; x < filePaths.length; x++) { - copyFile(filePaths[x], null); + copyFile(filePaths[x], tempFilePath, null); } } private static void clearPath(String path) throws IOException, URISyntaxException { - FileSystem fs = FileSystem.get(new URI(path)); + FileSystem fs = FileSystem.get(new Path(path).toUri()); if (fs.exists(new Path(path))) { fs.delete(new Path(path), true); } } - private static void copyFile(String path, String name) throws IOException, URISyntaxException { + private static void copyFile(String path, String target, String name) throws IOException, URISyntaxException { if (path.endsWith("/")) { path = path.substring(0, path.length() - 1); } String identifier = name == null ? path.substring(path.lastIndexOf("/")) : name; - String tmpFilePath = FLINK_PYTHON_FILE_PATH + "/" + identifier; + String tmpFilePath = target + "/" + identifier; clearPath(tmpFilePath); Path p = new Path(path); FileCache.copy(p.makeQualified(FileSystem.get(p.toUri())), new Path(tmpFilePath), true); } - private static void distributeFiles(ExecutionEnvironment env) throws IOException, URISyntaxException { + private static void distributeFiles(String tmpPath, ExecutionEnvironment env) throws IOException, URISyntaxException { clearPath(FLINK_HDFS_PATH); - FileCache.copy(new Path(FLINK_PYTHON_FILE_PATH), new Path(FLINK_HDFS_PATH), true); + FileCache.copy(new Path(tmpPath), new Path(FLINK_HDFS_PATH), true); env.registerCachedFile(FLINK_HDFS_PATH, FLINK_PYTHON_DC_ID); - clearPath(FLINK_PYTHON_FILE_PATH); + clearPath(tmpPath); } - private void startPython(String[] args) throws IOException { + private void startPython(String tempPath, String[] args) throws IOException { for (String arg : args) { arguments.append(" ").append(arg); } + String mappedFilePath = FLINK_TMP_DATA_DIR + "/output" + r.nextInt(); receiver = new Receiver(null); - receiver.open(FLINK_TMP_DATA_DIR + "/output"); + receiver.open(mappedFilePath); String pythonBinaryPath = usePython3 ? FLINK_PYTHON3_BINARY_PATH : FLINK_PYTHON2_BINARY_PATH; @@ -210,7 +216,7 @@ public class PythonPlanBinder { } catch (IOException ex) { throw new RuntimeException(pythonBinaryPath + " does not point to a valid python binary."); } - process = Runtime.getRuntime().exec(pythonBinaryPath + " -B " + FLINK_PYTHON_FILE_PATH + FLINK_PYTHON_PLAN_NAME + arguments.toString()); + process = Runtime.getRuntime().exec(pythonBinaryPath + " -B " + tempPath + FLINK_PYTHON_PLAN_NAME + arguments.toString()); new StreamPrinter(process.getInputStream()).start(); new StreamPrinter(process.getErrorStream()).start(); @@ -232,7 +238,7 @@ public class PythonPlanBinder { } process.getOutputStream().write("plan\n".getBytes()); - process.getOutputStream().write((FLINK_TMP_DATA_DIR + "/output\n").getBytes()); + process.getOutputStream().write((mappedFilePath + "\n").getBytes()); process.getOutputStream().flush(); } http://git-wip-us.apache.org/repos/asf/flink/blob/0ac5d402/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Connection.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Connection.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Connection.py index 988bf25..143cea7 100644 --- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Connection.py +++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Connection.py @@ -36,10 +36,28 @@ else: SIGNAL_WAS_LAST = 32 +def recv_all(socket, toread): + initial = socket.recv(toread) + bytes_read = len(initial) + if bytes_read == toread: + return initial + else: + bits = [initial] + toread = toread - bytes_read + while toread: + bit = socket.recv(toread) + bits.append(bit) + toread = toread - len(bit) + return b"".join(bits) + + class OneWayBusyBufferingMappedFileConnection(object): def __init__(self, output_path): self._output_file = open(output_path, "rb+") - self._file_output_buffer = mmap.mmap(self._output_file.fileno(), MAPPED_FILE_SIZE, mmap.MAP_SHARED, mmap.ACCESS_WRITE) + if hasattr(mmap, 'MAP_SHARED'): + self._file_output_buffer = mmap.mmap(self._output_file.fileno(), MAPPED_FILE_SIZE, mmap.MAP_SHARED, mmap.ACCESS_WRITE) + else: + self._file_output_buffer = mmap.mmap(self._output_file.fileno(), MAPPED_FILE_SIZE, None, mmap.ACCESS_WRITE) self._out = deque() self._out_size = 0 @@ -58,13 +76,20 @@ class OneWayBusyBufferingMappedFileConnection(object): self._file_output_buffer.seek(0, 0) self._file_output_buffer.write(b'\x01') + def close(self): + self._file_output_buffer.close() + class BufferingTCPMappedFileConnection(object): def __init__(self, input_file, output_file, port): self._input_file = open(input_file, "rb+") self._output_file = open(output_file, "rb+") - self._file_input_buffer = mmap.mmap(self._input_file.fileno(), MAPPED_FILE_SIZE, mmap.MAP_SHARED, mmap.ACCESS_READ) - self._file_output_buffer = mmap.mmap(self._output_file.fileno(), MAPPED_FILE_SIZE, mmap.MAP_SHARED, mmap.ACCESS_WRITE) + if hasattr(mmap, 'MAP_SHARED'): + self._file_input_buffer = mmap.mmap(self._input_file.fileno(), MAPPED_FILE_SIZE, mmap.MAP_SHARED, mmap.ACCESS_READ) + self._file_output_buffer = mmap.mmap(self._output_file.fileno(), MAPPED_FILE_SIZE, mmap.MAP_SHARED, mmap.ACCESS_WRITE) + else: + self._file_input_buffer = mmap.mmap(self._input_file.fileno(), MAPPED_FILE_SIZE, None, mmap.ACCESS_READ) + self._file_output_buffer = mmap.mmap(self._output_file.fileno(), MAPPED_FILE_SIZE, None, mmap.ACCESS_WRITE) self._socket = SOCKET.socket(family=SOCKET.AF_INET, type=SOCKET.SOCK_STREAM) self._socket.connect((SOCKET.gethostbyname("localhost"), port)) @@ -76,6 +101,9 @@ class BufferingTCPMappedFileConnection(object): self._input_size = 0 self._was_last = False + def close(self): + self._socket.close() + def write(self, msg): length = len(msg) if length > MAPPED_FILE_SIZE: @@ -94,7 +122,7 @@ class BufferingTCPMappedFileConnection(object): self._socket.send(pack(">i", self._out_size)) self._out.clear() self._out_size = 0 - self._socket.recv(1, SOCKET.MSG_WAITALL) + recv_all(self._socket, 1) def read(self, des_size, ignored=None): if self._input_size == self._input_offset: @@ -107,7 +135,7 @@ class BufferingTCPMappedFileConnection(object): self._socket.send(SIGNAL_REQUEST_BUFFER) self._file_input_buffer.seek(0, 0) self._input_offset = 0 - meta_size = self._socket.recv(5, SOCKET.MSG_WAITALL) + meta_size = recv_all(self._socket, 5) self._input_size = unpack(">I", meta_size[:4])[0] self._was_last = meta_size[4] == SIGNAL_WAS_LAST self._input = self._file_input_buffer.read(self._input_size) @@ -149,7 +177,7 @@ class TwinBufferingTCPMappedFileConnection(BufferingTCPMappedFileConnection): self._socket.send(SIGNAL_REQUEST_BUFFER_G0) self._file_input_buffer.seek(0, 0) self._input_offset[group] = 0 - meta_size = self._socket.recv(5, SOCKET.MSG_WAITALL) + meta_size = recv_all(self._socket, 5) self._input_size[group] = unpack(">I", meta_size[:4])[0] self._was_last[group] = meta_size[4] == SIGNAL_WAS_LAST self._input[group] = self._file_input_buffer.read(self._input_size[group]) http://git-wip-us.apache.org/repos/asf/flink/blob/0ac5d402/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/Function.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/Function.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/Function.py index 4bf8b3a..f874a25 100644 --- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/Function.py +++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/Function.py @@ -59,6 +59,7 @@ class Function(object): def _close(self): self._collector._close() + self._connection.close() def _go(self): self._receive_broadcast_variables() http://git-wip-us.apache.org/repos/asf/flink/blob/0ac5d402/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py index 169e31b..865487d 100644 --- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py +++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py @@ -21,6 +21,7 @@ from flink.plan.DataSet import DataSet from flink.plan.Constants import _Identifier from flink.plan.OperationInfo import OperationInfo from flink.utilities import Switch +import socket as SOCKET import copy import sys from struct import pack @@ -160,6 +161,7 @@ class Environment(object): self._collector = Collector.TypedCollector(self._connection, self) self._send_plan() self._connection._write_buffer() + self._connection.close() else: import struct operator = None @@ -178,6 +180,7 @@ class Environment(object): operator = set.combineop operator._configure(input_path, output_path, port, self) operator._go() + operator._close() sys.stdout.flush() sys.stderr.flush() except: @@ -185,6 +188,11 @@ class Environment(object): sys.stderr.flush() if operator is not None: operator._connection._socket.send(struct.pack(">i", -2)) + else: + socket = SOCKET.socket(family=SOCKET.AF_INET, type=SOCKET.SOCK_STREAM) + socket.connect((SOCKET.gethostbyname("localhost"), port)) + socket.send(struct.pack(">i", -2)) + socket.close() raise def _optimize_plan(self):