[FLINK-3057] Bidirectional plan connection
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fc4d9469 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fc4d9469 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fc4d9469 Branch: refs/heads/master Commit: fc4d9469d9d3cf17f877b2c49f345ed0001322c1 Parents: 0ac5d40 Author: zentol <ches...@apache.org> Authored: Sun Nov 22 16:23:29 2015 +0100 Committer: zentol <s.mo...@web.de> Committed: Wed Jan 20 06:03:38 2016 +0100 ---------------------------------------------------------------------- .../flink/python/api/PythonOperationInfo.java | 126 +++--- .../flink/python/api/PythonPlanBinder.java | 69 +-- .../python/api/functions/PythonCoGroup.java | 2 +- .../api/functions/PythonCombineIdentity.java | 2 +- .../api/functions/PythonMapPartition.java | 2 +- .../python/api/streaming/PythonStreamer.java | 374 ---------------- .../flink/python/api/streaming/Receiver.java | 384 ----------------- .../flink/python/api/streaming/Sender.java | 427 ------------------- .../python/api/streaming/StreamPrinter.java | 55 --- .../api/streaming/data/PythonReceiver.java | 267 ++++++++++++ .../python/api/streaming/data/PythonSender.java | 420 ++++++++++++++++++ .../api/streaming/data/PythonStreamer.java | 375 ++++++++++++++++ .../api/streaming/plan/PythonPlanReceiver.java | 107 +++++ .../api/streaming/plan/PythonPlanSender.java | 116 +++++ .../api/streaming/plan/PythonPlanStreamer.java | 98 +++++ .../api/streaming/util/StreamPrinter.java | 55 +++ .../python/api/flink/connection/Connection.py | 30 +- .../python/api/flink/connection/Iterator.py | 40 ++ .../flink/python/api/flink/plan/Environment.py | 22 +- 19 files changed, 1588 insertions(+), 1383 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/fc4d9469/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java 
---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java index 0ccf568..5cf3621 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java @@ -21,7 +21,7 @@ import org.apache.flink.api.java.tuple.Tuple; import static org.apache.flink.api.java.typeutils.TypeExtractor.getForObject; import org.apache.flink.core.fs.FileSystem.WriteMode; import org.apache.flink.python.api.PythonPlanBinder.Operation; -import org.apache.flink.python.api.streaming.Receiver; +import org.apache.flink.python.api.streaming.plan.PythonPlanStreamer; public class PythonOperationInfo { public int parentID; //DataSet that an operation is applied on @@ -48,88 +48,88 @@ public class PythonOperationInfo { public boolean toError; public String name; - public PythonOperationInfo(Receiver receiver, Operation identifier) throws IOException { + public PythonOperationInfo(PythonPlanStreamer streamer, Operation identifier) throws IOException { Object tmpType; switch (identifier) { case SOURCE_CSV: - setID = (Integer) receiver.getRecord(true); - path = (String) receiver.getRecord(); - fieldDelimiter = (String) receiver.getRecord(); - lineDelimiter = (String) receiver.getRecord(); - tmpType = (Tuple) receiver.getRecord(); + setID = (Integer) streamer.getRecord(true); + path = (String) streamer.getRecord(); + fieldDelimiter = (String) streamer.getRecord(); + lineDelimiter = (String) streamer.getRecord(); + tmpType = (Tuple) streamer.getRecord(); types = tmpType == null ? 
null : getForObject(tmpType); return; case SOURCE_TEXT: - setID = (Integer) receiver.getRecord(true); - path = (String) receiver.getRecord(); + setID = (Integer) streamer.getRecord(true); + path = (String) streamer.getRecord(); return; case SOURCE_VALUE: - setID = (Integer) receiver.getRecord(true); - int valueCount = (Integer) receiver.getRecord(true); + setID = (Integer) streamer.getRecord(true); + int valueCount = (Integer) streamer.getRecord(true); values = new Object[valueCount]; for (int x = 0; x < valueCount; x++) { - values[x] = receiver.getRecord(); + values[x] = streamer.getRecord(); } return; case SOURCE_SEQ: - setID = (Integer) receiver.getRecord(true); - from = (Long) receiver.getRecord(); - to = (Long) receiver.getRecord(); + setID = (Integer) streamer.getRecord(true); + from = (Long) streamer.getRecord(); + to = (Long) streamer.getRecord(); return; case SINK_CSV: - parentID = (Integer) receiver.getRecord(true); - path = (String) receiver.getRecord(); - fieldDelimiter = (String) receiver.getRecord(); - lineDelimiter = (String) receiver.getRecord(); - writeMode = ((Integer) receiver.getRecord(true)) == 1 + parentID = (Integer) streamer.getRecord(true); + path = (String) streamer.getRecord(); + fieldDelimiter = (String) streamer.getRecord(); + lineDelimiter = (String) streamer.getRecord(); + writeMode = ((Integer) streamer.getRecord(true)) == 1 ? WriteMode.OVERWRITE : WriteMode.NO_OVERWRITE; return; case SINK_TEXT: - parentID = (Integer) receiver.getRecord(true); - path = (String) receiver.getRecord(); - writeMode = ((Integer) receiver.getRecord(true)) == 1 + parentID = (Integer) streamer.getRecord(true); + path = (String) streamer.getRecord(); + writeMode = ((Integer) streamer.getRecord(true)) == 1 ? 
WriteMode.OVERWRITE : WriteMode.NO_OVERWRITE; return; case SINK_PRINT: - parentID = (Integer) receiver.getRecord(true); - toError = (Boolean) receiver.getRecord(); + parentID = (Integer) streamer.getRecord(true); + toError = (Boolean) streamer.getRecord(); return; case BROADCAST: - parentID = (Integer) receiver.getRecord(true); - otherID = (Integer) receiver.getRecord(true); - name = (String) receiver.getRecord(); + parentID = (Integer) streamer.getRecord(true); + otherID = (Integer) streamer.getRecord(true); + name = (String) streamer.getRecord(); return; } - setID = (Integer) receiver.getRecord(true); - parentID = (Integer) receiver.getRecord(true); + setID = (Integer) streamer.getRecord(true); + parentID = (Integer) streamer.getRecord(true); switch (identifier) { case AGGREGATE: - count = (Integer) receiver.getRecord(true); + count = (Integer) streamer.getRecord(true); aggregates = new AggregationEntry[count]; for (int x = 0; x < count; x++) { - int encodedAgg = (Integer) receiver.getRecord(true); - int field = (Integer) receiver.getRecord(true); + int encodedAgg = (Integer) streamer.getRecord(true); + int field = (Integer) streamer.getRecord(true); aggregates[x] = new AggregationEntry(encodedAgg, field); } return; case FIRST: - count = (Integer) receiver.getRecord(true); + count = (Integer) streamer.getRecord(true); return; case DISTINCT: case GROUPBY: case PARTITION_HASH: - keys = normalizeKeys(receiver.getRecord(true)); + keys = normalizeKeys(streamer.getRecord(true)); return; case PROJECTION: - fields = toIntArray(receiver.getRecord(true)); + fields = toIntArray(streamer.getRecord(true)); return; case REBALANCE: return; case SORT: - field = (Integer) receiver.getRecord(true); - int encodedOrder = (Integer) receiver.getRecord(true); + field = (Integer) streamer.getRecord(true); + int encodedOrder = (Integer) streamer.getRecord(true); switch (encodedOrder) { case 0: order = Order.NONE; @@ -149,62 +149,62 @@ public class PythonOperationInfo { } return; case 
UNION: - otherID = (Integer) receiver.getRecord(true); + otherID = (Integer) streamer.getRecord(true); return; case COGROUP: - otherID = (Integer) receiver.getRecord(true); - keys1 = normalizeKeys(receiver.getRecord(true)); - keys2 = normalizeKeys(receiver.getRecord(true)); - tmpType = receiver.getRecord(); + otherID = (Integer) streamer.getRecord(true); + keys1 = normalizeKeys(streamer.getRecord(true)); + keys2 = normalizeKeys(streamer.getRecord(true)); + tmpType = streamer.getRecord(); types = tmpType == null ? null : getForObject(tmpType); - name = (String) receiver.getRecord(); + name = (String) streamer.getRecord(); return; case CROSS: case CROSS_H: case CROSS_T: - otherID = (Integer) receiver.getRecord(true); - tmpType = receiver.getRecord(); + otherID = (Integer) streamer.getRecord(true); + tmpType = streamer.getRecord(); types = tmpType == null ? null : getForObject(tmpType); - int cProjectCount = (Integer) receiver.getRecord(true); + int cProjectCount = (Integer) streamer.getRecord(true); projections = new ProjectionEntry[cProjectCount]; for (int x = 0; x < cProjectCount; x++) { - String side = (String) receiver.getRecord(); - int[] keys = toIntArray((Tuple) receiver.getRecord(true)); + String side = (String) streamer.getRecord(); + int[] keys = toIntArray((Tuple) streamer.getRecord(true)); projections[x] = new ProjectionEntry(ProjectionSide.valueOf(side.toUpperCase()), keys); } - name = (String) receiver.getRecord(); + name = (String) streamer.getRecord(); return; case REDUCE: case GROUPREDUCE: - tmpType = receiver.getRecord(); + tmpType = streamer.getRecord(); types = tmpType == null ? 
null : getForObject(tmpType); - combine = (Boolean) receiver.getRecord(); - name = (String) receiver.getRecord(); + combine = (Boolean) streamer.getRecord(); + name = (String) streamer.getRecord(); return; case JOIN: case JOIN_H: case JOIN_T: - keys1 = normalizeKeys(receiver.getRecord(true)); - keys2 = normalizeKeys(receiver.getRecord(true)); - otherID = (Integer) receiver.getRecord(true); - tmpType = receiver.getRecord(); + keys1 = normalizeKeys(streamer.getRecord(true)); + keys2 = normalizeKeys(streamer.getRecord(true)); + otherID = (Integer) streamer.getRecord(true); + tmpType = streamer.getRecord(); types = tmpType == null ? null : getForObject(tmpType); - int jProjectCount = (Integer) receiver.getRecord(true); + int jProjectCount = (Integer) streamer.getRecord(true); projections = new ProjectionEntry[jProjectCount]; for (int x = 0; x < jProjectCount; x++) { - String side = (String) receiver.getRecord(); - int[] keys = toIntArray((Tuple) receiver.getRecord(true)); + String side = (String) streamer.getRecord(); + int[] keys = toIntArray((Tuple) streamer.getRecord(true)); projections[x] = new ProjectionEntry(ProjectionSide.valueOf(side.toUpperCase()), keys); } - name = (String) receiver.getRecord(); + name = (String) streamer.getRecord(); return; case MAPPARTITION: case FLATMAP: case MAP: case FILTER: - tmpType = receiver.getRecord(); + tmpType = streamer.getRecord(); types = tmpType == null ? 
null : getForObject(tmpType); - name = (String) receiver.getRecord(); + name = (String) streamer.getRecord(); return; default: throw new UnsupportedOperationException("This operation is not implemented in the Python API: " + identifier); http://git-wip-us.apache.org/repos/asf/flink/blob/fc4d9469/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java index 7c74054..d178dcb 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java @@ -19,6 +19,7 @@ import java.net.URISyntaxException; import java.util.Arrays; import java.util.HashMap; import java.util.Random; +import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.LocalEnvironment; @@ -49,8 +50,7 @@ import org.apache.flink.python.api.PythonOperationInfo.ProjectionEntry; import org.apache.flink.python.api.functions.PythonCoGroup; import org.apache.flink.python.api.functions.PythonCombineIdentity; import org.apache.flink.python.api.functions.PythonMapPartition; -import org.apache.flink.python.api.streaming.Receiver; -import org.apache.flink.python.api.streaming.StreamPrinter; +import org.apache.flink.python.api.streaming.plan.PythonPlanStreamer; import org.apache.flink.runtime.filecache.FileCache; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -76,15 +76,13 @@ public class PythonPlanBinder { private static final Random r = new Random(); - private static final String FLINK_PYTHON_FILE_PATH = System.getProperty("java.io.tmpdir") + 
File.separator + "flink_plan"; + public static final String FLINK_PYTHON_FILE_PATH = System.getProperty("java.io.tmpdir") + File.separator + "flink_plan"; private static final String FLINK_PYTHON_REL_LOCAL_PATH = File.separator + "resources" + File.separator + "python"; private static final String FLINK_DIR = System.getenv("FLINK_ROOT_DIR"); private static String FULL_PATH; public static StringBuilder arguments = new StringBuilder(); - private Process process; - public static boolean usePython3 = false; private static String FLINK_HDFS_PATH = "hdfs:/tmp"; @@ -94,7 +92,7 @@ public class PythonPlanBinder { private HashMap<Integer, Object> sets = new HashMap(); public ExecutionEnvironment env; - private Receiver receiver; + private PythonPlanStreamer streamer; public static final int MAPPED_FILE_SIZE = 1024 * 1024 * 64; @@ -145,7 +143,8 @@ public class PythonPlanBinder { } distributeFiles(tmpPath, env); - env.execute(); + JobExecutionResult jer = env.execute(); + sendResult(jer); close(); } catch (Exception e) { close(); @@ -205,41 +204,13 @@ public class PythonPlanBinder { for (String arg : args) { arguments.append(" ").append(arg); } - String mappedFilePath = FLINK_TMP_DATA_DIR + "/output" + r.nextInt(); - receiver = new Receiver(null); - receiver.open(mappedFilePath); - - String pythonBinaryPath = usePython3 ? 
FLINK_PYTHON3_BINARY_PATH : FLINK_PYTHON2_BINARY_PATH; - - try { - Runtime.getRuntime().exec(pythonBinaryPath); - } catch (IOException ex) { - throw new RuntimeException(pythonBinaryPath + " does not point to a valid python binary."); - } - process = Runtime.getRuntime().exec(pythonBinaryPath + " -B " + tempPath + FLINK_PYTHON_PLAN_NAME + arguments.toString()); - - new StreamPrinter(process.getInputStream()).start(); - new StreamPrinter(process.getErrorStream()).start(); - - try { - Thread.sleep(2000); - } catch (InterruptedException ex) { - } - - try { - int value = process.exitValue(); - if (value != 0) { - throw new RuntimeException("Plan file caused an error. Check log-files for details."); - } - if (value == 0) { - throw new RuntimeException("Plan file exited prematurely without an error."); - } - } catch (IllegalThreadStateException ise) {//Process still running - } + streamer = new PythonPlanStreamer(); + streamer.open(tempPath, arguments.toString()); + } - process.getOutputStream().write("plan\n".getBytes()); - process.getOutputStream().write((mappedFilePath + "\n").getBytes()); - process.getOutputStream().flush(); + private void sendResult(JobExecutionResult jer) throws IOException { + long runtime = jer.getNetRuntime(); + streamer.sendRecord(runtime); } private void close() { @@ -252,18 +223,12 @@ public class PythonPlanBinder { FileSystem local = FileSystem.getLocalFileSystem(); local.delete(new Path(FLINK_PYTHON_FILE_PATH), true); local.delete(new Path(FLINK_TMP_DATA_DIR), true); - receiver.close(); + streamer.close(); } catch (NullPointerException npe) { } catch (IOException ioe) { LOG.error("PythonAPI file cleanup failed. 
" + ioe.getMessage()); } catch (URISyntaxException use) { // can't occur } - try { - process.exitValue(); - } catch (NullPointerException npe) { //exception occurred before process was started - } catch (IllegalThreadStateException ise) { //process still active - process.destroy(); - } } //====Plan========================================================================================================== @@ -285,7 +250,7 @@ public class PythonPlanBinder { private void receiveParameters() throws IOException { for (int x = 0; x < 4; x++) { - Tuple value = (Tuple) receiver.getRecord(true); + Tuple value = (Tuple) streamer.getRecord(true); switch (Parameters.valueOf(((String) value.getField(0)).toUpperCase())) { case DOP: Integer dop = (Integer) value.getField(1); @@ -321,9 +286,9 @@ public class PythonPlanBinder { } private void receiveOperations() throws IOException { - Integer operationCount = (Integer) receiver.getRecord(true); + Integer operationCount = (Integer) streamer.getRecord(true); for (int x = 0; x < operationCount; x++) { - String identifier = (String) receiver.getRecord(); + String identifier = (String) streamer.getRecord(); Operation op = null; try { op = Operation.valueOf(identifier.toUpperCase()); @@ -435,7 +400,7 @@ public class PythonPlanBinder { * @throws IOException */ private PythonOperationInfo createOperationInfo(Operation operationIdentifier) throws IOException { - return new PythonOperationInfo(receiver, operationIdentifier); + return new PythonOperationInfo(streamer, operationIdentifier); } private void createCsvSource(PythonOperationInfo info) throws IOException { http://git-wip-us.apache.org/repos/asf/flink/blob/fc4d9469/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java index 4d74878..2349aa9 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java @@ -14,7 +14,7 @@ package org.apache.flink.python.api.functions; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.configuration.Configuration; -import org.apache.flink.python.api.streaming.PythonStreamer; +import org.apache.flink.python.api.streaming.data.PythonStreamer; import org.apache.flink.util.Collector; import java.io.IOException; import org.apache.flink.api.common.functions.RichCoGroupFunction; http://git-wip-us.apache.org/repos/asf/flink/blob/fc4d9469/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCombineIdentity.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCombineIdentity.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCombineIdentity.java index 9b3189b..f80d975 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCombineIdentity.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCombineIdentity.java @@ -13,7 +13,7 @@ package org.apache.flink.python.api.functions; import org.apache.flink.configuration.Configuration; -import org.apache.flink.python.api.streaming.PythonStreamer; +import org.apache.flink.python.api.streaming.data.PythonStreamer; import org.apache.flink.util.Collector; import java.io.IOException; import org.apache.flink.api.common.functions.RichGroupReduceFunction; 
http://git-wip-us.apache.org/repos/asf/flink/blob/fc4d9469/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java index 1578e18..50b2cf4 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java @@ -17,7 +17,7 @@ import org.apache.flink.api.common.functions.RichMapPartitionFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.configuration.Configuration; -import org.apache.flink.python.api.streaming.PythonStreamer; +import org.apache.flink.python.api.streaming.data.PythonStreamer; import org.apache.flink.util.Collector; /** http://git-wip-us.apache.org/repos/asf/flink/blob/fc4d9469/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/PythonStreamer.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/PythonStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/PythonStreamer.java deleted file mode 100644 index 9e4f479..0000000 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/PythonStreamer.java +++ /dev/null @@ -1,374 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. 
See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package org.apache.flink.python.api.streaming; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.Serializable; -import java.lang.reflect.Field; -import java.net.ServerSocket; -import java.net.Socket; -import java.net.SocketTimeoutException; -import java.util.Iterator; -import org.apache.flink.api.common.functions.AbstractRichFunction; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.python.api.PythonPlanBinder; -import static org.apache.flink.python.api.PythonPlanBinder.DEBUG; -import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON2_BINARY_PATH; -import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON3_BINARY_PATH; -import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON_DC_ID; -import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON_PLAN_NAME; -import static org.apache.flink.python.api.PythonPlanBinder.FLINK_TMP_DATA_DIR; -import static org.apache.flink.python.api.PythonPlanBinder.PLANBINDER_CONFIG_BCVAR_COUNT; -import static org.apache.flink.python.api.PythonPlanBinder.PLANBINDER_CONFIG_BCVAR_NAME_PREFIX; -import org.apache.flink.util.Collector; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This streamer is used by 
functions to send/receive data to/from an external python process. - */ -public class PythonStreamer implements Serializable { - protected static final Logger LOG = LoggerFactory.getLogger(PythonStreamer.class); - private static final int SIGNAL_BUFFER_REQUEST = 0; - private static final int SIGNAL_BUFFER_REQUEST_G0 = -3; - private static final int SIGNAL_BUFFER_REQUEST_G1 = -4; - private static final int SIGNAL_FINISHED = -1; - private static final int SIGNAL_ERROR = -2; - private static final byte SIGNAL_LAST = 32; - - private final int id; - private final boolean usePython3; - private final boolean debug; - private final String planArguments; - - private String inputFilePath; - private String outputFilePath; - - private final byte[] buffer = new byte[4]; - - private Process process; - private Thread shutdownThread; - protected ServerSocket server; - protected Socket socket; - protected InputStream in; - protected OutputStream out; - protected int port; - - protected Sender sender; - protected Receiver receiver; - - protected StringBuilder msg = new StringBuilder(); - - protected final AbstractRichFunction function; - - public PythonStreamer(AbstractRichFunction function, int id) { - this.id = id; - this.usePython3 = PythonPlanBinder.usePython3; - this.debug = DEBUG; - planArguments = PythonPlanBinder.arguments.toString(); - sender = new Sender(function); - receiver = new Receiver(function); - this.function = function; - } - - /** - * Starts the python script. 
- * - * @throws IOException - */ - public void open() throws IOException { - server = new ServerSocket(0); - startPython(); - } - - private void startPython() throws IOException { - this.outputFilePath = FLINK_TMP_DATA_DIR + "/" + id + this.function.getRuntimeContext().getIndexOfThisSubtask() + "output"; - this.inputFilePath = FLINK_TMP_DATA_DIR + "/" + id + this.function.getRuntimeContext().getIndexOfThisSubtask() + "input"; - - sender.open(inputFilePath); - receiver.open(outputFilePath); - - String path = function.getRuntimeContext().getDistributedCache().getFile(FLINK_PYTHON_DC_ID).getAbsolutePath(); - String planPath = path + FLINK_PYTHON_PLAN_NAME; - - String pythonBinaryPath = usePython3 ? FLINK_PYTHON3_BINARY_PATH : FLINK_PYTHON2_BINARY_PATH; - - try { - Runtime.getRuntime().exec(pythonBinaryPath); - } catch (IOException ex) { - throw new RuntimeException(pythonBinaryPath + " does not point to a valid python binary."); - } - - if (debug) { - socket.setSoTimeout(0); - LOG.info("Waiting for Python Process : " + function.getRuntimeContext().getTaskName() - + " Run python " + planPath + planArguments); - } else { - process = Runtime.getRuntime().exec(pythonBinaryPath + " -O -B " + planPath + planArguments); - new StreamPrinter(process.getInputStream()).start(); - new StreamPrinter(process.getErrorStream(), true, msg).start(); - } - - shutdownThread = new Thread() { - @Override - public void run() { - try { - destroyProcess(); - } catch (IOException ex) { - } - } - }; - - Runtime.getRuntime().addShutdownHook(shutdownThread); - - OutputStream processOutput = process.getOutputStream(); - processOutput.write("operator\n".getBytes()); - processOutput.write(("" + server.getLocalPort() + "\n").getBytes()); - processOutput.write((id + "\n").getBytes()); - processOutput.write((inputFilePath + "\n").getBytes()); - processOutput.write((outputFilePath + "\n").getBytes()); - processOutput.flush(); - - try { // wait a bit to catch syntax errors - Thread.sleep(2000); - } catch 
(InterruptedException ex) { - } - if (!debug) { - try { - process.exitValue(); - throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " terminated prematurely." + msg); - } catch (IllegalThreadStateException ise) { //process still active -> start receiving data - } - } - - socket = server.accept(); - in = socket.getInputStream(); - out = socket.getOutputStream(); - } - - /** - * Closes this streamer. - * - * @throws IOException - */ - public void close() throws IOException { - try { - socket.close(); - sender.close(); - receiver.close(); - } catch (Exception e) { - LOG.error("Exception occurred while closing Streamer. :" + e.getMessage()); - } - if (!debug) { - destroyProcess(); - } - if (shutdownThread != null) { - Runtime.getRuntime().removeShutdownHook(shutdownThread); - } - } - - private void destroyProcess() throws IOException { - try { - process.exitValue(); - } catch (IllegalThreadStateException ise) { //process still active - if (process.getClass().getName().equals("java.lang.UNIXProcess")) { - int pid; - try { - Field f = process.getClass().getDeclaredField("pid"); - f.setAccessible(true); - pid = f.getInt(process); - } catch (Throwable e) { - process.destroy(); - return; - } - String[] args = new String[]{"kill", "-9", "" + pid}; - Runtime.getRuntime().exec(args); - } else { - process.destroy(); - } - } - } - - private void sendWriteNotification(int size, boolean hasNext) throws IOException { - byte[] tmp = new byte[5]; - putInt(tmp, 0, size); - tmp[4] = hasNext ? 
0 : SIGNAL_LAST; - out.write(tmp, 0, 5); - out.flush(); - } - - private void sendReadConfirmation() throws IOException { - out.write(new byte[1], 0, 1); - out.flush(); - } - - private void checkForError() { - if (getInt(buffer, 0) == -2) { - try { //wait before terminating to ensure that the complete error message is printed - Thread.sleep(2000); - } catch (InterruptedException ex) { - } - throw new RuntimeException( - "External process for task " + function.getRuntimeContext().getTaskName() + " terminated prematurely." + msg); - } - } - - /** - * Sends all broadcast-variables encoded in the configuration to the external process. - * - * @param config configuration object containing broadcast-variable count and names - * @throws IOException - */ - public final void sendBroadCastVariables(Configuration config) throws IOException { - try { - int broadcastCount = config.getInteger(PLANBINDER_CONFIG_BCVAR_COUNT, 0); - - String[] names = new String[broadcastCount]; - - for (int x = 0; x < names.length; x++) { - names[x] = config.getString(PLANBINDER_CONFIG_BCVAR_NAME_PREFIX + x, null); - } - - in.read(buffer, 0, 4); - checkForError(); - int size = sender.sendRecord(broadcastCount); - sendWriteNotification(size, false); - - for (String name : names) { - Iterator bcv = function.getRuntimeContext().getBroadcastVariable(name).iterator(); - - in.read(buffer, 0, 4); - checkForError(); - size = sender.sendRecord(name); - sendWriteNotification(size, false); - - while (bcv.hasNext() || sender.hasRemaining(0)) { - in.read(buffer, 0, 4); - checkForError(); - size = sender.sendBuffer(bcv, 0); - sendWriteNotification(size, bcv.hasNext() || sender.hasRemaining(0)); - } - sender.reset(); - } - } catch (SocketTimeoutException ste) { - throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " stopped responding." + msg); - } - } - - /** - * Sends all values contained in the iterator to the external process and collects all results. 
- * - * @param i iterator - * @param c collector - * @throws IOException - */ - public final void streamBufferWithoutGroups(Iterator i, Collector c) throws IOException { - try { - int size; - if (i.hasNext()) { - while (true) { - in.read(buffer, 0, 4); - int sig = getInt(buffer, 0); - switch (sig) { - case SIGNAL_BUFFER_REQUEST: - if (i.hasNext() || sender.hasRemaining(0)) { - size = sender.sendBuffer(i, 0); - sendWriteNotification(size, sender.hasRemaining(0) || i.hasNext()); - } else { - throw new RuntimeException("External process requested data even though none is available."); - } - break; - case SIGNAL_FINISHED: - return; - case SIGNAL_ERROR: - try { //wait before terminating to ensure that the complete error message is printed - Thread.sleep(2000); - } catch (InterruptedException ex) { - } - throw new RuntimeException( - "External process for task " + function.getRuntimeContext().getTaskName() + " terminated prematurely due to an error." + msg); - default: - receiver.collectBuffer(c, sig); - sendReadConfirmation(); - break; - } - } - } - } catch (SocketTimeoutException ste) { - throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " stopped responding." + msg); - } - } - - /** - * Sends all values contained in both iterators to the external process and collects all results. 
- * - * @param i1 iterator - * @param i2 iterator - * @param c collector - * @throws IOException - */ - public final void streamBufferWithGroups(Iterator i1, Iterator i2, Collector c) throws IOException { - try { - int size; - if (i1.hasNext() || i2.hasNext()) { - while (true) { - in.read(buffer, 0, 4); - int sig = getInt(buffer, 0); - switch (sig) { - case SIGNAL_BUFFER_REQUEST_G0: - if (i1.hasNext() || sender.hasRemaining(0)) { - size = sender.sendBuffer(i1, 0); - sendWriteNotification(size, sender.hasRemaining(0) || i1.hasNext()); - } - break; - case SIGNAL_BUFFER_REQUEST_G1: - if (i2.hasNext() || sender.hasRemaining(1)) { - size = sender.sendBuffer(i2, 1); - sendWriteNotification(size, sender.hasRemaining(1) || i2.hasNext()); - } - break; - case SIGNAL_FINISHED: - return; - case SIGNAL_ERROR: - try { //wait before terminating to ensure that the complete error message is printed - Thread.sleep(2000); - } catch (InterruptedException ex) { - } - throw new RuntimeException( - "External process for task " + function.getRuntimeContext().getTaskName() + " terminated prematurely due to an error." + msg); - default: - receiver.collectBuffer(c, sig); - sendReadConfirmation(); - break; - } - } - } - } catch (SocketTimeoutException ste) { - throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " stopped responding." 
+ msg); - } - } - - protected final static int getInt(byte[] array, int offset) { - return (array[offset] << 24) | (array[offset + 1] & 0xff) << 16 | (array[offset + 2] & 0xff) << 8 | (array[offset + 3] & 0xff); - } - - protected final static void putInt(byte[] array, int offset, int value) { - array[offset] = (byte) (value >> 24); - array[offset + 1] = (byte) (value >> 16); - array[offset + 2] = (byte) (value >> 8); - array[offset + 3] = (byte) (value); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/fc4d9469/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/Receiver.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/Receiver.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/Receiver.java deleted file mode 100644 index a706053..0000000 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/Receiver.java +++ /dev/null @@ -1,384 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. 
- */ -package org.apache.flink.python.api.streaming; - -import java.io.File; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.RandomAccessFile; -import java.io.Serializable; -import java.nio.MappedByteBuffer; -import java.nio.channels.FileChannel; -import org.apache.flink.api.common.functions.AbstractRichFunction; -import org.apache.flink.api.java.tuple.Tuple; -import static org.apache.flink.python.api.PythonPlanBinder.FLINK_TMP_DATA_DIR; -import static org.apache.flink.python.api.PythonPlanBinder.MAPPED_FILE_SIZE; -import static org.apache.flink.python.api.streaming.Sender.TYPE_BOOLEAN; -import static org.apache.flink.python.api.streaming.Sender.TYPE_BYTE; -import static org.apache.flink.python.api.streaming.Sender.TYPE_BYTES; -import static org.apache.flink.python.api.streaming.Sender.TYPE_DOUBLE; -import static org.apache.flink.python.api.streaming.Sender.TYPE_FLOAT; -import static org.apache.flink.python.api.streaming.Sender.TYPE_INTEGER; -import static org.apache.flink.python.api.streaming.Sender.TYPE_LONG; -import static org.apache.flink.python.api.streaming.Sender.TYPE_NULL; -import static org.apache.flink.python.api.streaming.Sender.TYPE_SHORT; -import static org.apache.flink.python.api.streaming.Sender.TYPE_STRING; -import static org.apache.flink.python.api.streaming.Sender.TYPE_TUPLE; -import org.apache.flink.python.api.types.CustomTypeWrapper; -import org.apache.flink.util.Collector; - -/** - * General-purpose class to read data from memory-mapped files. 
- */ -public class Receiver implements Serializable { - private static final long serialVersionUID = -2474088929850009968L; - - private final AbstractRichFunction function; - - private File inputFile; - private RandomAccessFile inputRAF; - private FileChannel inputChannel; - private MappedByteBuffer fileBuffer; - - private Deserializer<?> deserializer = null; - - public Receiver(AbstractRichFunction function) { - this.function = function; - } - - //=====Setup======================================================================================================== - public void open(String path) throws IOException { - setupMappedFile(path); - } - - private void setupMappedFile(String inputFilePath) throws FileNotFoundException, IOException { - File x = new File(FLINK_TMP_DATA_DIR); - x.mkdirs(); - - inputFile = new File(inputFilePath); - if (inputFile.exists()) { - inputFile.delete(); - } - inputFile.createNewFile(); - inputRAF = new RandomAccessFile(inputFilePath, "rw"); - inputRAF.setLength(MAPPED_FILE_SIZE); - inputRAF.seek(MAPPED_FILE_SIZE - 1); - inputRAF.writeByte(0); - inputRAF.seek(0); - inputChannel = inputRAF.getChannel(); - fileBuffer = inputChannel.map(FileChannel.MapMode.READ_WRITE, 0, MAPPED_FILE_SIZE); - } - - public void close() throws IOException { - closeMappedFile(); - } - - private void closeMappedFile() throws IOException { - inputChannel.close(); - inputRAF.close(); - } - - //=====Record-API=================================================================================================== - /** - * Loads a buffer from the memory-mapped file. The records contained within the buffer can be accessed using - * collectRecord(). These records do not necessarily have to be of the same type. This method requires external - * synchronization. 
- * - * @throws IOException - */ - private void loadBuffer() throws IOException { - int count = 0; - while (fileBuffer.get(0) == 0 && count < 10) { - try { - Thread.sleep(1000); - } catch (InterruptedException ie) { - } - fileBuffer.load(); - count++; - } - if (fileBuffer.get(0) == 0) { - throw new RuntimeException("External process not responding."); - } - fileBuffer.position(1); - } - - /** - * Returns a record from the buffer. Note: This method cannot be replaced with specific methods like readInt() or - * similar. The PlanBinder requires a method that can return any kind of object. - * - * @return read record - * @throws IOException - */ - public Object getRecord() throws IOException { - return getRecord(false); - } - - /** - * Returns a record from the buffer. Note: This method cannot be replaced with specific methods like readInt() or - * similar. The PlanBinder requires a method that can return any kind of object. - * - * @param normalized flag indicating whether certain types should be normalized - * @return read record - * @throws IOException - */ - public Object getRecord(boolean normalized) throws IOException { - if (fileBuffer.position() == 0) { - loadBuffer(); - } - return receiveField(normalized); - } - - /** - * Reads a single primitive value or tuple from the buffer. 
- * - * @return primitive value or tuple - * @throws IOException - */ - private Object receiveField(boolean normalized) throws IOException { - byte type = fileBuffer.get(); - switch (type) { - case TYPE_TUPLE: - int tupleSize = fileBuffer.get(); - Tuple tuple = createTuple(tupleSize); - for (int x = 0; x < tupleSize; x++) { - tuple.setField(receiveField(normalized), x); - } - return tuple; - case TYPE_BOOLEAN: - return fileBuffer.get() == 1; - case TYPE_BYTE: - return fileBuffer.get(); - case TYPE_SHORT: - if (normalized) { - return (int) fileBuffer.getShort(); - } else { - return fileBuffer.getShort(); - } - case TYPE_INTEGER: - return fileBuffer.getInt(); - case TYPE_LONG: - if (normalized) { - return new Long(fileBuffer.getLong()).intValue(); - } else { - return fileBuffer.getLong(); - } - case TYPE_FLOAT: - if (normalized) { - return (double) fileBuffer.getFloat(); - } else { - return fileBuffer.getFloat(); - } - case TYPE_DOUBLE: - return fileBuffer.getDouble(); - case TYPE_STRING: - int stringSize = fileBuffer.getInt(); - byte[] buffer = new byte[stringSize]; - fileBuffer.get(buffer); - return new String(buffer); - case TYPE_BYTES: - int bytessize = fileBuffer.getInt(); - byte[] bytebuffer = new byte[bytessize]; - fileBuffer.get(bytebuffer); - return bytebuffer; - case TYPE_NULL: - return null; - default: - return new CustomTypeDeserializer(type).deserialize(); - } - } - - //=====Buffered-API================================================================================================= - /** - * Reads a buffer of the given size from the memory-mapped file, and collects all records contained. This method - * assumes that all values in the buffer are of the same type. This method does NOT take care of synchronization. - * The user must guarantee that the buffer was completely written before calling this method. 
- * - * @param c Collector to collect records - * @param bufferSize size of the buffer - * @throws IOException - */ - @SuppressWarnings({ "rawtypes", "unchecked" }) - public void collectBuffer(Collector c, int bufferSize) throws IOException { - fileBuffer.position(0); - - if (deserializer == null) { - byte type = fileBuffer.get(); - deserializer = getDeserializer(type); - } - while (fileBuffer.position() < bufferSize) { - c.collect(deserializer.deserialize()); - } - } - - //=====Deserializer================================================================================================= - private Deserializer<?> getDeserializer(byte type) { - switch (type) { - case TYPE_TUPLE: - return new TupleDeserializer(); - case TYPE_BOOLEAN: - return new BooleanDeserializer(); - case TYPE_BYTE: - return new ByteDeserializer(); - case TYPE_BYTES: - return new BytesDeserializer(); - case TYPE_SHORT: - return new ShortDeserializer(); - case TYPE_INTEGER: - return new IntDeserializer(); - case TYPE_LONG: - return new LongDeserializer(); - case TYPE_STRING: - return new StringDeserializer(); - case TYPE_FLOAT: - return new FloatDeserializer(); - case TYPE_DOUBLE: - return new DoubleDeserializer(); - case TYPE_NULL: - return new NullDeserializer(); - default: - return new CustomTypeDeserializer(type); - - } - } - - private interface Deserializer<T> { - public T deserialize(); - } - - private class CustomTypeDeserializer implements Deserializer<CustomTypeWrapper> { - private final byte type; - - public CustomTypeDeserializer(byte type) { - this.type = type; - } - - @Override - public CustomTypeWrapper deserialize() { - int size = fileBuffer.getInt(); - byte[] data = new byte[size]; - fileBuffer.get(data); - return new CustomTypeWrapper(type, data); - } - } - - private class BooleanDeserializer implements Deserializer<Boolean> { - @Override - public Boolean deserialize() { - return fileBuffer.get() == 1; - } - } - - private class ByteDeserializer implements Deserializer<Byte> { - 
@Override - public Byte deserialize() { - return fileBuffer.get(); - } - } - - private class ShortDeserializer implements Deserializer<Short> { - @Override - public Short deserialize() { - return fileBuffer.getShort(); - } - } - - private class IntDeserializer implements Deserializer<Integer> { - @Override - public Integer deserialize() { - return fileBuffer.getInt(); - } - } - - private class LongDeserializer implements Deserializer<Long> { - @Override - public Long deserialize() { - return fileBuffer.getLong(); - } - } - - private class FloatDeserializer implements Deserializer<Float> { - @Override - public Float deserialize() { - return fileBuffer.getFloat(); - } - } - - private class DoubleDeserializer implements Deserializer<Double> { - @Override - public Double deserialize() { - return fileBuffer.getDouble(); - } - } - - private class StringDeserializer implements Deserializer<String> { - private int size; - - @Override - public String deserialize() { - size = fileBuffer.getInt(); - byte[] buffer = new byte[size]; - fileBuffer.get(buffer); - return new String(buffer); - } - } - - private class NullDeserializer implements Deserializer<Object> { - @Override - public Object deserialize() { - return null; - } - } - - private class BytesDeserializer implements Deserializer<byte[]> { - @Override - public byte[] deserialize() { - int length = fileBuffer.getInt(); - byte[] result = new byte[length]; - fileBuffer.get(result); - return result; - } - - } - - private class TupleDeserializer implements Deserializer<Tuple> { - Deserializer<?>[] deserializer = null; - Tuple reuse; - - public TupleDeserializer() { - int size = fileBuffer.getInt(); - reuse = createTuple(size); - deserializer = new Deserializer[size]; - for (int x = 0; x < deserializer.length; x++) { - deserializer[x] = getDeserializer(fileBuffer.get()); - } - } - - @Override - public Tuple deserialize() { - for (int x = 0; x < deserializer.length; x++) { - reuse.setField(deserializer[x].deserialize(), x); - } 
- return reuse; - } - } - - public static Tuple createTuple(int size) { - try { - return Tuple.getTupleClass(size).newInstance(); - } catch (InstantiationException e) { - throw new RuntimeException(e); - } catch (IllegalAccessException e) { - throw new RuntimeException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/fc4d9469/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/Sender.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/Sender.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/Sender.java deleted file mode 100644 index 2db1441..0000000 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/Sender.java +++ /dev/null @@ -1,427 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. 
- */ -package org.apache.flink.python.api.streaming; - -import java.io.File; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.RandomAccessFile; -import java.io.Serializable; -import java.nio.ByteBuffer; -import java.nio.MappedByteBuffer; -import java.nio.channels.FileChannel; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import org.apache.flink.api.common.functions.AbstractRichFunction; -import org.apache.flink.api.java.tuple.Tuple; -import static org.apache.flink.python.api.PythonPlanBinder.FLINK_TMP_DATA_DIR; -import static org.apache.flink.python.api.PythonPlanBinder.MAPPED_FILE_SIZE; -import org.apache.flink.python.api.types.CustomTypeWrapper; - -/** - * General-purpose class to write data to memory-mapped files. - */ -public class Sender implements Serializable { - public static final byte TYPE_TUPLE = (byte) 11; - public static final byte TYPE_BOOLEAN = (byte) 10; - public static final byte TYPE_BYTE = (byte) 9; - public static final byte TYPE_SHORT = (byte) 8; - public static final byte TYPE_INTEGER = (byte) 7; - public static final byte TYPE_LONG = (byte) 6; - public static final byte TYPE_DOUBLE = (byte) 4; - public static final byte TYPE_FLOAT = (byte) 5; - public static final byte TYPE_CHAR = (byte) 3; - public static final byte TYPE_STRING = (byte) 2; - public static final byte TYPE_BYTES = (byte) 1; - public static final byte TYPE_NULL = (byte) 0; - - private final AbstractRichFunction function; - - private File outputFile; - private RandomAccessFile outputRAF; - private FileChannel outputChannel; - private MappedByteBuffer fileBuffer; - - private final ByteBuffer[] saved = new ByteBuffer[2]; - - private final Serializer[] serializer = new Serializer[2]; - - public Sender(AbstractRichFunction function) { - this.function = function; - } - - //=====Setup======================================================================================================== - public void open(String 
path) throws IOException { - setupMappedFile(path); - } - - private void setupMappedFile(String outputFilePath) throws FileNotFoundException, IOException { - File x = new File(FLINK_TMP_DATA_DIR); - x.mkdirs(); - - outputFile = new File(outputFilePath); - if (outputFile.exists()) { - outputFile.delete(); - } - outputFile.createNewFile(); - outputRAF = new RandomAccessFile(outputFilePath, "rw"); - outputRAF.setLength(MAPPED_FILE_SIZE); - outputRAF.seek(MAPPED_FILE_SIZE - 1); - outputRAF.writeByte(0); - outputRAF.seek(0); - outputChannel = outputRAF.getChannel(); - fileBuffer = outputChannel.map(FileChannel.MapMode.READ_WRITE, 0, MAPPED_FILE_SIZE); - } - - public void close() throws IOException { - closeMappedFile(); - } - - private void closeMappedFile() throws IOException { - outputChannel.close(); - outputRAF.close(); - } - - /** - * Resets this object to the post-configuration state. - */ - public void reset() { - serializer[0] = null; - serializer[1] = null; - fileBuffer.clear(); - } - - //=====Serialization================================================================================================ - /** - * Writes a single record to the memory-mapped file. This method does NOT take care of synchronization. The user - * must guarantee that the file may be written to before calling this method. This method essentially reserves the - * whole buffer for one record. As such it imposes some performance restrictions and should only be used when - * absolutely necessary. 
- * - * @param value record to send - * @return size of the written buffer - * @throws IOException - */ - public int sendRecord(Object value) throws IOException { - fileBuffer.clear(); - int group = 0; - - serializer[group] = getSerializer(value); - ByteBuffer bb = serializer[group].serialize(value); - if (bb.remaining() > MAPPED_FILE_SIZE) { - throw new RuntimeException("Serialized object does not fit into a single buffer."); - } - fileBuffer.put(bb); - - int size = fileBuffer.position(); - - reset(); - return size; - } - - public boolean hasRemaining(int group) { - return saved[group] != null; - } - - /** - * Extracts records from an iterator and writes them to the memory-mapped file. This method assumes that all values - * in the iterator are of the same type. This method does NOT take care of synchronization. The caller must - * guarantee that the file may be written to before calling this method. - * - * @param i iterator containing records - * @param group group to which the iterator belongs, most notably used by CoGroup-functions. 
- * @return size of the written buffer - * @throws IOException - */ - public int sendBuffer(Iterator i, int group) throws IOException { - fileBuffer.clear(); - - Object value; - ByteBuffer bb; - if (serializer[group] == null) { - value = i.next(); - serializer[group] = getSerializer(value); - bb = serializer[group].serialize(value); - if (bb.remaining() > MAPPED_FILE_SIZE) { - throw new RuntimeException("Serialized object does not fit into a single buffer."); - } - fileBuffer.put(bb); - - } - if (saved[group] != null) { - fileBuffer.put(saved[group]); - saved[group] = null; - } - while (i.hasNext() && saved[group] == null) { - value = i.next(); - bb = serializer[group].serialize(value); - if (bb.remaining() > MAPPED_FILE_SIZE) { - throw new RuntimeException("Serialized object does not fit into a single buffer."); - } - if (bb.remaining() <= fileBuffer.remaining()) { - fileBuffer.put(bb); - } else { - saved[group] = bb; - } - } - - int size = fileBuffer.position(); - return size; - } - - private enum SupportedTypes { - TUPLE, BOOLEAN, BYTE, BYTES, CHARACTER, SHORT, INTEGER, LONG, FLOAT, DOUBLE, STRING, OTHER, NULL, CUSTOMTYPEWRAPPER - } - - //=====Serializer=================================================================================================== - private Serializer getSerializer(Object value) throws IOException { - String className = value.getClass().getSimpleName().toUpperCase(); - if (className.startsWith("TUPLE")) { - className = "TUPLE"; - } - if (className.startsWith("BYTE[]")) { - className = "BYTES"; - } - SupportedTypes type = SupportedTypes.valueOf(className); - switch (type) { - case TUPLE: - fileBuffer.put(TYPE_TUPLE); - fileBuffer.putInt(((Tuple) value).getArity()); - return new TupleSerializer((Tuple) value); - case BOOLEAN: - fileBuffer.put(TYPE_BOOLEAN); - return new BooleanSerializer(); - case BYTE: - fileBuffer.put(TYPE_BYTE); - return new ByteSerializer(); - case BYTES: - fileBuffer.put(TYPE_BYTES); - return new BytesSerializer(); - case 
CHARACTER: - fileBuffer.put(TYPE_CHAR); - return new CharSerializer(); - case SHORT: - fileBuffer.put(TYPE_SHORT); - return new ShortSerializer(); - case INTEGER: - fileBuffer.put(TYPE_INTEGER); - return new IntSerializer(); - case LONG: - fileBuffer.put(TYPE_LONG); - return new LongSerializer(); - case STRING: - fileBuffer.put(TYPE_STRING); - return new StringSerializer(); - case FLOAT: - fileBuffer.put(TYPE_FLOAT); - return new FloatSerializer(); - case DOUBLE: - fileBuffer.put(TYPE_DOUBLE); - return new DoubleSerializer(); - case NULL: - fileBuffer.put(TYPE_NULL); - return new NullSerializer(); - case CUSTOMTYPEWRAPPER: - fileBuffer.put(((CustomTypeWrapper) value).getType()); - return new CustomTypeSerializer(); - default: - throw new IllegalArgumentException("Unknown Type encountered: " + type); - } - } - - private abstract class Serializer<T> { - protected ByteBuffer buffer; - - public Serializer(int capacity) { - buffer = ByteBuffer.allocate(capacity); - } - - public ByteBuffer serialize(T value) { - buffer.clear(); - serializeInternal(value); - buffer.flip(); - return buffer; - } - - public abstract void serializeInternal(T value); - } - - private class CustomTypeSerializer extends Serializer<CustomTypeWrapper> { - public CustomTypeSerializer() { - super(0); - } - @Override - public void serializeInternal(CustomTypeWrapper value) { - byte[] bytes = value.getData(); - buffer = ByteBuffer.wrap(bytes); - buffer.position(bytes.length); - } - } - - private class ByteSerializer extends Serializer<Byte> { - public ByteSerializer() { - super(1); - } - - @Override - public void serializeInternal(Byte value) { - buffer.put(value); - } - } - - private class BooleanSerializer extends Serializer<Boolean> { - public BooleanSerializer() { - super(1); - } - - @Override - public void serializeInternal(Boolean value) { - buffer.put(value ? 
(byte) 1 : (byte) 0); - } - } - - private class CharSerializer extends Serializer<Character> { - public CharSerializer() { - super(4); - } - - @Override - public void serializeInternal(Character value) { - buffer.put((value + "").getBytes()); - } - } - - private class ShortSerializer extends Serializer<Short> { - public ShortSerializer() { - super(2); - } - - @Override - public void serializeInternal(Short value) { - buffer.putShort(value); - } - } - - private class IntSerializer extends Serializer<Integer> { - public IntSerializer() { - super(4); - } - - @Override - public void serializeInternal(Integer value) { - buffer.putInt(value); - } - } - - private class LongSerializer extends Serializer<Long> { - public LongSerializer() { - super(8); - } - - @Override - public void serializeInternal(Long value) { - buffer.putLong(value); - } - } - - private class StringSerializer extends Serializer<String> { - public StringSerializer() { - super(0); - } - - @Override - public void serializeInternal(String value) { - byte[] bytes = value.getBytes(); - buffer = ByteBuffer.allocate(bytes.length + 4); - buffer.putInt(bytes.length); - buffer.put(bytes); - } - } - - private class FloatSerializer extends Serializer<Float> { - public FloatSerializer() { - super(4); - } - - @Override - public void serializeInternal(Float value) { - buffer.putFloat(value); - } - } - - private class DoubleSerializer extends Serializer<Double> { - public DoubleSerializer() { - super(8); - } - - @Override - public void serializeInternal(Double value) { - buffer.putDouble(value); - } - } - - private class NullSerializer extends Serializer<Object> { - public NullSerializer() { - super(0); - } - - @Override - public void serializeInternal(Object value) { - } - } - - private class BytesSerializer extends Serializer<byte[]> { - public BytesSerializer() { - super(0); - } - - @Override - public void serializeInternal(byte[] value) { - buffer = ByteBuffer.allocate(4 + value.length); - 
buffer.putInt(value.length); - buffer.put(value); - } - } - - private class TupleSerializer extends Serializer<Tuple> { - private final Serializer[] serializer; - private final List<ByteBuffer> buffers; - - public TupleSerializer(Tuple value) throws IOException { - super(0); - serializer = new Serializer[value.getArity()]; - buffers = new ArrayList(); - for (int x = 0; x < serializer.length; x++) { - serializer[x] = getSerializer(value.getField(x)); - } - } - - @Override - public void serializeInternal(Tuple value) { - int length = 0; - for (int x = 0; x < serializer.length; x++) { - serializer[x].buffer.clear(); - serializer[x].serializeInternal(value.getField(x)); - length += serializer[x].buffer.position(); - buffers.add(serializer[x].buffer); - } - buffer = ByteBuffer.allocate(length); - for (ByteBuffer b : buffers) { - b.flip(); - buffer.put(b); - } - buffers.clear(); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/fc4d9469/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/StreamPrinter.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/StreamPrinter.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/StreamPrinter.java deleted file mode 100644 index e42364b..0000000 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/StreamPrinter.java +++ /dev/null @@ -1,55 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package org.apache.flink.python.api.streaming; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; - -/** - * Simple utility class to print all contents of an inputstream to stdout. - */ -public class StreamPrinter extends Thread { - private final BufferedReader reader; - private final boolean wrapInException; - private StringBuilder msg; - - public StreamPrinter(InputStream stream) { - this(stream, false, null); - } - - public StreamPrinter(InputStream stream, boolean wrapInException, StringBuilder msg) { - this.reader = new BufferedReader(new InputStreamReader(stream)); - this.wrapInException = wrapInException; - this.msg = msg; - } - - @Override - public void run() { - String line; - try { - if (wrapInException) { - while ((line = reader.readLine()) != null) { - msg.append("\n" + line); - } - } else { - while ((line = reader.readLine()) != null) { - System.out.println(line); - System.out.flush(); - } - } - } catch (IOException ex) { - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/fc4d9469/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonReceiver.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonReceiver.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonReceiver.java new file mode 100644 index 0000000..9ed047c --- /dev/null +++ 
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
 * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
 * License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 */
package org.apache.flink.python.api.streaming.data;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.Serializable;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import org.apache.flink.api.java.tuple.Tuple;
import static org.apache.flink.python.api.PythonPlanBinder.FLINK_TMP_DATA_DIR;
import static org.apache.flink.python.api.PythonPlanBinder.MAPPED_FILE_SIZE;
import static org.apache.flink.python.api.streaming.data.PythonSender.TYPE_BOOLEAN;
import static org.apache.flink.python.api.streaming.data.PythonSender.TYPE_BYTE;
import static org.apache.flink.python.api.streaming.data.PythonSender.TYPE_BYTES;
import static org.apache.flink.python.api.streaming.data.PythonSender.TYPE_DOUBLE;
import static org.apache.flink.python.api.streaming.data.PythonSender.TYPE_FLOAT;
import static org.apache.flink.python.api.streaming.data.PythonSender.TYPE_INTEGER;
import static org.apache.flink.python.api.streaming.data.PythonSender.TYPE_LONG;
import static org.apache.flink.python.api.streaming.data.PythonSender.TYPE_NULL;
import static org.apache.flink.python.api.streaming.data.PythonSender.TYPE_SHORT;
import static org.apache.flink.python.api.streaming.data.PythonSender.TYPE_STRING;
import static org.apache.flink.python.api.streaming.data.PythonSender.TYPE_TUPLE;
import org.apache.flink.python.api.types.CustomTypeWrapper;
import org.apache.flink.util.Collector;

/**
 * General-purpose class to read data from memory-mapped files.
 *
 * <p>The first byte of the first buffer read identifies the record type; the matching deserializer is then cached and
 * reused for all subsequent buffers, i.e. all records passing through one receiver must be of the same type.
 */
public class PythonReceiver implements Serializable {
	private static final long serialVersionUID = -2474088929850009968L;

	private File inputFile;
	private RandomAccessFile inputRAF;
	private FileChannel inputChannel;
	private MappedByteBuffer fileBuffer;

	// latched on the first buffer read; all records share one type
	private Deserializer<?> deserializer = null;

	//=====Setup========================================================================================================
	/**
	 * Opens the memory-mapped file this receiver reads from, creating it if necessary.
	 *
	 * @param path path of the memory-mapped file
	 * @throws IOException
	 */
	public void open(String path) throws IOException {
		setupMappedFile(path);
	}

	private void setupMappedFile(String inputFilePath) throws FileNotFoundException, IOException {
		new File(FLINK_TMP_DATA_DIR).mkdirs();

		inputFile = new File(inputFilePath);
		if (inputFile.exists()) {
			inputFile.delete();
		}
		inputFile.createNewFile();
		inputRAF = new RandomAccessFile(inputFilePath, "rw");
		// pre-size the file so the full mapping range is backed by the file
		inputRAF.setLength(MAPPED_FILE_SIZE);
		inputRAF.seek(MAPPED_FILE_SIZE - 1);
		inputRAF.writeByte(0);
		inputRAF.seek(0);
		inputChannel = inputRAF.getChannel();
		fileBuffer = inputChannel.map(FileChannel.MapMode.READ_WRITE, 0, MAPPED_FILE_SIZE);
	}

	/**
	 * Closes the underlying memory-mapped file.
	 *
	 * @throws IOException
	 */
	public void close() throws IOException {
		closeMappedFile();
	}

	private void closeMappedFile() throws IOException {
		inputChannel.close();
		inputRAF.close();
	}

	/**
	 * Reads a buffer of the given size from the memory-mapped file, and collects all records contained. This method
	 * assumes that all values in the buffer are of the same type. This method does NOT take care of synchronization.
	 * The user must guarantee that the buffer was completely written before calling this method.
	 *
	 * @param c Collector to collect records
	 * @param bufferSize size of the buffer
	 * @throws IOException
	 */
	@SuppressWarnings({ "rawtypes", "unchecked" })
	public void collectBuffer(Collector c, int bufferSize) throws IOException {
		fileBuffer.position(0);

		if (deserializer == null) {
			// only the very first buffer carries a leading type byte
			byte type = fileBuffer.get();
			deserializer = getDeserializer(type);
		}
		while (fileBuffer.position() < bufferSize) {
			c.collect(deserializer.deserialize());
		}
	}

	//=====Deserializer=================================================================================================
	private Deserializer<?> getDeserializer(byte type) {
		switch (type) {
			case TYPE_TUPLE:
				return new TupleDeserializer();
			case TYPE_BOOLEAN:
				return new BooleanDeserializer();
			case TYPE_BYTE:
				return new ByteDeserializer();
			case TYPE_BYTES:
				return new BytesDeserializer();
			case TYPE_SHORT:
				return new ShortDeserializer();
			case TYPE_INTEGER:
				return new IntDeserializer();
			case TYPE_LONG:
				return new LongDeserializer();
			case TYPE_STRING:
				return new StringDeserializer();
			case TYPE_FLOAT:
				return new FloatDeserializer();
			case TYPE_DOUBLE:
				return new DoubleDeserializer();
			case TYPE_NULL:
				return new NullDeserializer();
			default:
				// unknown type bytes identify user-defined types
				return new CustomTypeDeserializer(type);
		}
	}

	private interface Deserializer<T> {
		T deserialize();
	}

	private class CustomTypeDeserializer implements Deserializer<CustomTypeWrapper> {
		private final byte type;

		public CustomTypeDeserializer(byte type) {
			this.type = type;
		}

		@Override
		public CustomTypeWrapper deserialize() {
			int size = fileBuffer.getInt();
			byte[] data = new byte[size];
			fileBuffer.get(data);
			return new CustomTypeWrapper(type, data);
		}
	}

	private class BooleanDeserializer implements Deserializer<Boolean> {
		@Override
		public Boolean deserialize() {
			return fileBuffer.get() == 1;
		}
	}

	private class ByteDeserializer implements Deserializer<Byte> {
		@Override
		public Byte deserialize() {
			return fileBuffer.get();
		}
	}

	private class ShortDeserializer implements Deserializer<Short> {
		@Override
		public Short deserialize() {
			return fileBuffer.getShort();
		}
	}

	private class IntDeserializer implements Deserializer<Integer> {
		@Override
		public Integer deserialize() {
			return fileBuffer.getInt();
		}
	}

	private class LongDeserializer implements Deserializer<Long> {
		@Override
		public Long deserialize() {
			return fileBuffer.getLong();
		}
	}

	private class FloatDeserializer implements Deserializer<Float> {
		@Override
		public Float deserialize() {
			return fileBuffer.getFloat();
		}
	}

	private class DoubleDeserializer implements Deserializer<Double> {
		@Override
		public Double deserialize() {
			return fileBuffer.getDouble();
		}
	}

	private class StringDeserializer implements Deserializer<String> {
		@Override
		public String deserialize() {
			int size = fileBuffer.getInt();
			byte[] buffer = new byte[size];
			fileBuffer.get(buffer);
			// use an explicit charset instead of the platform default; the Python side is
			// assumed to encode strings as UTF-8 — TODO confirm against the Python connection code
			return new String(buffer, StandardCharsets.UTF_8);
		}
	}

	private class NullDeserializer implements Deserializer<Object> {
		@Override
		public Object deserialize() {
			return null;
		}
	}

	private class BytesDeserializer implements Deserializer<byte[]> {
		@Override
		public byte[] deserialize() {
			int length = fileBuffer.getInt();
			byte[] result = new byte[length];
			fileBuffer.get(result);
			return result;
		}
	}

	private class TupleDeserializer implements Deserializer<Tuple> {
		// one deserializer per tuple field; the field types are read once from the buffer header
		Deserializer<?>[] deserializer = null;
		// single reused tuple instance; callers must not hold on to collected records
		Tuple reuse;

		public TupleDeserializer() {
			int size = fileBuffer.getInt();
			reuse = createTuple(size);
			deserializer = new Deserializer[size];
			for (int x = 0; x < deserializer.length; x++) {
				deserializer[x] = getDeserializer(fileBuffer.get());
			}
		}

		@Override
		public Tuple deserialize() {
			for (int x = 0; x < deserializer.length; x++) {
				reuse.setField(deserializer[x].deserialize(), x);
			}
			return reuse;
		}
	}

	/**
	 * Creates an empty {@link Tuple} of the given arity.
	 *
	 * @param size tuple arity
	 * @return empty tuple instance
	 */
	public static Tuple createTuple(int size) {
		try {
			return Tuple.getTupleClass(size).newInstance();
		} catch (InstantiationException | IllegalAccessException e) {
			throw new RuntimeException(e);
		}
	}
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
 * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
 * License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 */
package org.apache.flink.python.api.streaming.data;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.java.tuple.Tuple;
import static org.apache.flink.python.api.PythonPlanBinder.FLINK_TMP_DATA_DIR;
import static org.apache.flink.python.api.PythonPlanBinder.MAPPED_FILE_SIZE;
import org.apache.flink.python.api.types.CustomTypeWrapper;

/**
 * General-purpose class to write data to memory-mapped files.
 *
 * <p>The first record written to a group carries a leading type byte (and, for tuples, the field types); the matching
 * serializer is then cached per group until {@link #reset()} is called.
 */
public class PythonSender implements Serializable {
	private static final long serialVersionUID = -2004095650353962110L;

	public static final byte TYPE_TUPLE = (byte) 11;
	public static final byte TYPE_BOOLEAN = (byte) 10;
	public static final byte TYPE_BYTE = (byte) 9;
	public static final byte TYPE_SHORT = (byte) 8;
	public static final byte TYPE_INTEGER = (byte) 7;
	public static final byte TYPE_LONG = (byte) 6;
	public static final byte TYPE_DOUBLE = (byte) 4;
	public static final byte TYPE_FLOAT = (byte) 5;
	public static final byte TYPE_CHAR = (byte) 3;
	public static final byte TYPE_STRING = (byte) 2;
	public static final byte TYPE_BYTES = (byte) 1;
	public static final byte TYPE_NULL = (byte) 0;

	private File outputFile;
	private RandomAccessFile outputRAF;
	private FileChannel outputChannel;
	private MappedByteBuffer fileBuffer;

	// per-group spill slot for a record that did not fit into the current buffer
	private final ByteBuffer[] saved = new ByteBuffer[2];

	// per-group serializer cache; two groups are needed for CoGroup-style functions
	private final Serializer[] serializer = new Serializer[2];

	//=====Setup========================================================================================================
	/**
	 * Opens the memory-mapped file this sender writes to, creating it if necessary.
	 *
	 * @param path path of the memory-mapped file
	 * @throws IOException
	 */
	public void open(String path) throws IOException {
		setupMappedFile(path);
	}

	private void setupMappedFile(String outputFilePath) throws FileNotFoundException, IOException {
		new File(FLINK_TMP_DATA_DIR).mkdirs();

		outputFile = new File(outputFilePath);
		if (outputFile.exists()) {
			outputFile.delete();
		}
		outputFile.createNewFile();
		outputRAF = new RandomAccessFile(outputFilePath, "rw");
		// pre-size the file so the full mapping range is backed by the file
		outputRAF.setLength(MAPPED_FILE_SIZE);
		outputRAF.seek(MAPPED_FILE_SIZE - 1);
		outputRAF.writeByte(0);
		outputRAF.seek(0);
		outputChannel = outputRAF.getChannel();
		fileBuffer = outputChannel.map(FileChannel.MapMode.READ_WRITE, 0, MAPPED_FILE_SIZE);
	}

	/**
	 * Closes the underlying memory-mapped file.
	 *
	 * @throws IOException
	 */
	public void close() throws IOException {
		closeMappedFile();
	}

	private void closeMappedFile() throws IOException {
		outputChannel.close();
		outputRAF.close();
	}

	/**
	 * Resets this object to the post-configuration state.
	 */
	public void reset() {
		serializer[0] = null;
		serializer[1] = null;
		fileBuffer.clear();
	}

	//=====Serialization================================================================================================
	/**
	 * Writes a single record to the memory-mapped file. This method does NOT take care of synchronization. The user
	 * must guarantee that the file may be written to before calling this method. This method essentially reserves the
	 * whole buffer for one record. As such it imposes some performance restrictions and should only be used when
	 * absolutely necessary.
	 *
	 * @param value record to send
	 * @return size of the written buffer
	 * @throws IOException
	 */
	public int sendRecord(Object value) throws IOException {
		fileBuffer.clear();
		int group = 0;

		serializer[group] = getSerializer(value);
		ByteBuffer bb = serializer[group].serialize(value);
		checkBufferSize(bb);
		fileBuffer.put(bb);

		int size = fileBuffer.position();

		// drop the cached serializer so the next call re-emits the type header
		reset();
		return size;
	}

	/**
	 * Returns whether a record of the given group is still waiting to be written.
	 *
	 * @param group group to check
	 * @return true if a spilled record remains for this group
	 */
	public boolean hasRemaining(int group) {
		return saved[group] != null;
	}

	/**
	 * Extracts records from an iterator and writes them to the memory-mapped file. This method assumes that all values
	 * in the iterator are of the same type. This method does NOT take care of synchronization. The caller must
	 * guarantee that the file may be written to before calling this method.
	 *
	 * @param i iterator containing records
	 * @param group group to which the iterator belongs, most notably used by CoGroup-functions.
	 * @return size of the written buffer
	 * @throws IOException
	 */
	public int sendBuffer(Iterator i, int group) throws IOException {
		fileBuffer.clear();

		Object value;
		ByteBuffer bb;
		if (serializer[group] == null) {
			// first call for this group: latch the serializer and write the type header
			value = i.next();
			serializer[group] = getSerializer(value);
			bb = serializer[group].serialize(value);
			checkBufferSize(bb);
			fileBuffer.put(bb);
		}
		if (saved[group] != null) {
			// flush the record spilled over from the previous buffer first
			fileBuffer.put(saved[group]);
			saved[group] = null;
		}
		while (i.hasNext() && saved[group] == null) {
			value = i.next();
			bb = serializer[group].serialize(value);
			checkBufferSize(bb);
			if (bb.remaining() <= fileBuffer.remaining()) {
				fileBuffer.put(bb);
			} else {
				// record does not fit; spill it for the next buffer
				saved[group] = bb;
			}
		}

		return fileBuffer.position();
	}

	// rejects records that can never fit into a buffer, even an empty one
	private static void checkBufferSize(ByteBuffer bb) {
		if (bb.remaining() > MAPPED_FILE_SIZE) {
			throw new RuntimeException("Serialized object does not fit into a single buffer.");
		}
	}

	private enum SupportedTypes {
		TUPLE, BOOLEAN, BYTE, BYTES, CHARACTER, SHORT, INTEGER, LONG, FLOAT, DOUBLE, STRING, OTHER, NULL, CUSTOMTYPEWRAPPER
	}

	//=====Serializer===================================================================================================
	private Serializer getSerializer(Object value) throws IOException {
		String className = value.getClass().getSimpleName().toUpperCase();
		if (className.startsWith("TUPLE")) {
			className = "TUPLE";
		}
		if (className.startsWith("BYTE[]")) {
			className = "BYTES";
		}
		SupportedTypes type = SupportedTypes.valueOf(className);
		switch (type) {
			case TUPLE:
				fileBuffer.put(TYPE_TUPLE);
				fileBuffer.putInt(((Tuple) value).getArity());
				return new TupleSerializer((Tuple) value);
			case BOOLEAN:
				fileBuffer.put(TYPE_BOOLEAN);
				return new BooleanSerializer();
			case BYTE:
				fileBuffer.put(TYPE_BYTE);
				return new ByteSerializer();
			case BYTES:
				fileBuffer.put(TYPE_BYTES);
				return new BytesSerializer();
			case CHARACTER:
				fileBuffer.put(TYPE_CHAR);
				return new CharSerializer();
			case SHORT:
				fileBuffer.put(TYPE_SHORT);
				return new ShortSerializer();
			case INTEGER:
				fileBuffer.put(TYPE_INTEGER);
				return new IntSerializer();
			case LONG:
				fileBuffer.put(TYPE_LONG);
				return new LongSerializer();
			case STRING:
				fileBuffer.put(TYPE_STRING);
				return new StringSerializer();
			case FLOAT:
				fileBuffer.put(TYPE_FLOAT);
				return new FloatSerializer();
			case DOUBLE:
				fileBuffer.put(TYPE_DOUBLE);
				return new DoubleSerializer();
			case NULL:
				fileBuffer.put(TYPE_NULL);
				return new NullSerializer();
			case CUSTOMTYPEWRAPPER:
				fileBuffer.put(((CustomTypeWrapper) value).getType());
				return new CustomTypeSerializer();
			default:
				throw new IllegalArgumentException("Unknown Type encountered: " + type);
		}
	}

	private abstract class Serializer<T> {
		protected ByteBuffer buffer;

		public Serializer(int capacity) {
			buffer = ByteBuffer.allocate(capacity);
		}

		public ByteBuffer serialize(T value) {
			buffer.clear();
			serializeInternal(value);
			buffer.flip();
			return buffer;
		}

		public abstract void serializeInternal(T value);
	}

	private class CustomTypeSerializer extends Serializer<CustomTypeWrapper> {
		public CustomTypeSerializer() {
			super(0);
		}

		@Override
		public void serializeInternal(CustomTypeWrapper value) {
			byte[] bytes = value.getData();
			buffer = ByteBuffer.wrap(bytes);
			buffer.position(bytes.length);
		}
	}

	private class ByteSerializer extends Serializer<Byte> {
		public ByteSerializer() {
			super(1);
		}

		@Override
		public void serializeInternal(Byte value) {
			buffer.put(value);
		}
	}

	private class BooleanSerializer extends Serializer<Boolean> {
		public BooleanSerializer() {
			super(1);
		}

		@Override
		public void serializeInternal(Boolean value) {
			buffer.put(value ? (byte) 1 : (byte) 0);
		}
	}

	private class CharSerializer extends Serializer<Character> {
		public CharSerializer() {
			super(4);
		}

		@Override
		public void serializeInternal(Character value) {
			// explicit charset instead of the platform default; the Python peer is assumed
			// to decode UTF-8 — TODO confirm against the Python connection code
			buffer.put((value + "").getBytes(StandardCharsets.UTF_8));
		}
	}

	private class ShortSerializer extends Serializer<Short> {
		public ShortSerializer() {
			super(2);
		}

		@Override
		public void serializeInternal(Short value) {
			buffer.putShort(value);
		}
	}

	private class IntSerializer extends Serializer<Integer> {
		public IntSerializer() {
			super(4);
		}

		@Override
		public void serializeInternal(Integer value) {
			buffer.putInt(value);
		}
	}

	private class LongSerializer extends Serializer<Long> {
		public LongSerializer() {
			super(8);
		}

		@Override
		public void serializeInternal(Long value) {
			buffer.putLong(value);
		}
	}

	private class StringSerializer extends Serializer<String> {
		public StringSerializer() {
			super(0);
		}

		@Override
		public void serializeInternal(String value) {
			// explicit charset instead of the platform default; the Python peer is assumed
			// to decode UTF-8 — TODO confirm against the Python connection code
			byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
			buffer = ByteBuffer.allocate(bytes.length + 4);
			buffer.putInt(bytes.length);
			buffer.put(bytes);
		}
	}

	private class FloatSerializer extends Serializer<Float> {
		public FloatSerializer() {
			super(4);
		}

		@Override
		public void serializeInternal(Float value) {
			buffer.putFloat(value);
		}
	}

	private class DoubleSerializer extends Serializer<Double> {
		public DoubleSerializer() {
			super(8);
		}

		@Override
		public void serializeInternal(Double value) {
			buffer.putDouble(value);
		}
	}

	private class NullSerializer extends Serializer<Object> {
		public NullSerializer() {
			super(0);
		}

		@Override
		public void serializeInternal(Object value) {
		}
	}

	private class BytesSerializer extends Serializer<byte[]> {
		public BytesSerializer() {
			super(0);
		}

		@Override
		public void serializeInternal(byte[] value) {
			buffer = ByteBuffer.allocate(4 + value.length);
			buffer.putInt(value.length);
			buffer.put(value);
		}
	}

	private class TupleSerializer extends Serializer<Tuple> {
		// one serializer per tuple field; field type headers are written once in the constructor
		private final Serializer[] serializer;
		private final List<ByteBuffer> buffers;

		public TupleSerializer(Tuple value) throws IOException {
			super(0);
			serializer = new Serializer[value.getArity()];
			buffers = new ArrayList<ByteBuffer>();
			for (int x = 0; x < serializer.length; x++) {
				serializer[x] = getSerializer(value.getField(x));
			}
		}

		@Override
		public void serializeInternal(Tuple value) {
			int length = 0;
			for (int x = 0; x < serializer.length; x++) {
				serializer[x].buffer.clear();
				serializer[x].serializeInternal(value.getField(x));
				length += serializer[x].buffer.position();
				buffers.add(serializer[x].buffer);
			}
			buffer = ByteBuffer.allocate(length);
			for (ByteBuffer b : buffers) {
				b.flip();
				buffer.put(b);
			}
			buffers.clear();
		}
	}
}