http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Streamer.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Streamer.java b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Streamer.java deleted file mode 100644 index 8b19425..0000000 --- a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Streamer.java +++ /dev/null @@ -1,258 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package org.apache.flink.languagebinding.api.java.common.streaming; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.Serializable; -import java.net.ServerSocket; -import java.net.Socket; -import java.net.SocketTimeoutException; -import java.util.Iterator; -import org.apache.flink.api.common.functions.AbstractRichFunction; -import org.apache.flink.configuration.Configuration; -import static org.apache.flink.languagebinding.api.java.common.PlanBinder.PLANBINDER_CONFIG_BCVAR_COUNT; -import static org.apache.flink.languagebinding.api.java.common.PlanBinder.PLANBINDER_CONFIG_BCVAR_NAME_PREFIX; -import org.apache.flink.util.Collector; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This class is the basis for using an external process within a Java Flink operator. It contains logic to send and - * receive data, while taking care of synchronization. - */ -public abstract class Streamer implements Serializable { - protected static final Logger LOG = LoggerFactory.getLogger(Streamer.class); - private static final int SIGNAL_BUFFER_REQUEST = 0; - private static final int SIGNAL_BUFFER_REQUEST_G0 = -3; - private static final int SIGNAL_BUFFER_REQUEST_G1 = -4; - private static final int SIGNAL_FINISHED = -1; - private static final int SIGNAL_ERROR = -2; - private static final byte SIGNAL_LAST = 32; - - private final byte[] buffer = new byte[4]; - - protected ServerSocket server; - protected Socket socket; - protected InputStream in; - protected OutputStream out; - protected int port; - - protected Sender sender; - protected Receiver receiver; - - protected StringBuilder msg = new StringBuilder(); - - protected final AbstractRichFunction function; - - public Streamer(AbstractRichFunction function) { - this.function = function; - sender = new Sender(function); - receiver = new Receiver(function); - } - - public void open() throws IOException { - server = new ServerSocket(0); - setupProcess(); - } - - /** - * This method opens all required resources- - * - * @throws IOException - */ - public abstract void setupProcess() throws IOException; - - /** - * This method closes all previously opened resources. - * - * @throws IOException - */ - public void close() throws IOException { - socket.close(); - sender.close(); - receiver.close(); - } - - private void sendWriteNotification(int size, boolean hasNext) throws IOException { - byte[] tmp = new byte[5]; - putInt(tmp, 0, size); - tmp[4] = hasNext ? 0 : SIGNAL_LAST; - out.write(tmp, 0, 5); - out.flush(); - } - - private void sendReadConfirmation() throws IOException { - out.write(new byte[1], 0, 1); - out.flush(); - } - - private void checkForError() { - if (getInt(buffer, 0) == -2) { - try { //wait before terminating to ensure that the complete error message is printed - Thread.sleep(2000); - } catch (InterruptedException ex) { - } - throw new RuntimeException( - "External process for task " + function.getRuntimeContext().getTaskName() + " terminated prematurely." + msg); - } - } - - /** - * Sends all broadcast-variables encoded in the configuration to the external process. - * - * @param config configuration object containing broadcast-variable count and names - * @throws IOException - */ - public final void sendBroadCastVariables(Configuration config) throws IOException { - try { - int broadcastCount = config.getInteger(PLANBINDER_CONFIG_BCVAR_COUNT, 0); - - String[] names = new String[broadcastCount]; - - for (int x = 0; x < names.length; x++) { - names[x] = config.getString(PLANBINDER_CONFIG_BCVAR_NAME_PREFIX + x, null); - } - - in.read(buffer, 0, 4); - checkForError(); - int size = sender.sendRecord(broadcastCount); - sendWriteNotification(size, false); - - for (String name : names) { - Iterator bcv = function.getRuntimeContext().getBroadcastVariable(name).iterator(); - - in.read(buffer, 0, 4); - checkForError(); - size = sender.sendRecord(name); - sendWriteNotification(size, false); - - while (bcv.hasNext() || sender.hasRemaining(0)) { - in.read(buffer, 0, 4); - checkForError(); - size = sender.sendBuffer(bcv, 0); - sendWriteNotification(size, bcv.hasNext() || sender.hasRemaining(0)); - } - sender.reset(); - } - } catch (SocketTimeoutException ste) { - throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " stopped responding." + msg); - } - } - - /** - * Sends all values contained in the iterator to the external process and collects all results. - * - * @param i iterator - * @param c collector - * @throws IOException - */ - public final void streamBufferWithoutGroups(Iterator i, Collector c) throws IOException { - try { - int size; - if (i.hasNext()) { - while (true) { - in.read(buffer, 0, 4); - int sig = getInt(buffer, 0); - switch (sig) { - case SIGNAL_BUFFER_REQUEST: - if (i.hasNext() || sender.hasRemaining(0)) { - size = sender.sendBuffer(i, 0); - sendWriteNotification(size, sender.hasRemaining(0) || i.hasNext()); - } else { - throw new RuntimeException("External process requested data even though none is available."); - } - break; - case SIGNAL_FINISHED: - return; - case SIGNAL_ERROR: - try { //wait before terminating to ensure that the complete error message is printed - Thread.sleep(2000); - } catch (InterruptedException ex) { - } - throw new RuntimeException( - "External process for task " + function.getRuntimeContext().getTaskName() + " terminated prematurely due to an error." + msg); - default: - receiver.collectBuffer(c, sig); - sendReadConfirmation(); - break; - } - } - } - } catch (SocketTimeoutException ste) { - throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " stopped responding." + msg); - } - } - - /** - * Sends all values contained in both iterators to the external process and collects all results. - * - * @param i1 iterator - * @param i2 iterator - * @param c collector - * @throws IOException - */ - public final void streamBufferWithGroups(Iterator i1, Iterator i2, Collector c) throws IOException { - try { - int size; - if (i1.hasNext() || i2.hasNext()) { - while (true) { - in.read(buffer, 0, 4); - int sig = getInt(buffer, 0); - switch (sig) { - case SIGNAL_BUFFER_REQUEST_G0: - if (i1.hasNext() || sender.hasRemaining(0)) { - size = sender.sendBuffer(i1, 0); - sendWriteNotification(size, sender.hasRemaining(0) || i1.hasNext()); - } - break; - case SIGNAL_BUFFER_REQUEST_G1: - if (i2.hasNext() || sender.hasRemaining(1)) { - size = sender.sendBuffer(i2, 1); - sendWriteNotification(size, sender.hasRemaining(1) || i2.hasNext()); - } - break; - case SIGNAL_FINISHED: - return; - case SIGNAL_ERROR: - try { //wait before terminating to ensure that the complete error message is printed - Thread.sleep(2000); - } catch (InterruptedException ex) { - } - throw new RuntimeException( - "External process for task " + function.getRuntimeContext().getTaskName() + " terminated prematurely due to an error." + msg); - default: - receiver.collectBuffer(c, sig); - sendReadConfirmation(); - break; - } - } - } - } catch (SocketTimeoutException ste) { - throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " stopped responding." + msg); - } - } - - protected final static int getInt(byte[] array, int offset) { - return (array[offset] << 24) | (array[offset + 1] & 0xff) << 16 | (array[offset + 2] & 0xff) << 8 | (array[offset + 3] & 0xff); - } - - protected final static void putInt(byte[] array, int offset, int value) { - array[offset] = (byte) (value >> 24); - array[offset + 1] = (byte) (value >> 16); - array[offset + 2] = (byte) (value >> 8); - array[offset + 3] = (byte) (value); - } - -}
http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-python/pom.xml ---------------------------------------------------------------------- diff --git a/flink-staging/flink-language-binding/flink-python/pom.xml b/flink-staging/flink-language-binding/flink-python/pom.xml deleted file mode 100644 index 06ef3ca..0000000 --- a/flink-staging/flink-language-binding/flink-python/pom.xml +++ /dev/null @@ -1,86 +0,0 @@ -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.flink</groupId> - <artifactId>flink-language-binding-parent</artifactId> - <version>1.0-SNAPSHOT</version> - <relativePath>..</relativePath> - </parent> - - <artifactId>flink-python</artifactId> - <name>flink-python</name> - <packaging>jar</packaging> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-jar-plugin</artifactId> - <configuration> - <descriptorRefs> - <descriptorRef>jar-with-dependencies</descriptorRef> - </descriptorRefs> - <archive> - <manifest> - <addClasspath>true</addClasspath> - <mainClass>org.apache.flink.languagebinding.api.java.python.PythonPlanBinder</mainClass> - </manifest> - </archive> - </configuration> - </plugin> - </plugins> - </build> - - <dependencies> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-core</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-java</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-optimizer</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-runtime</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-clients</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-language-binding-generic</artifactId> - <version>${project.version}</version> - </dependency> - </dependencies> -</project> http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/PythonPlanBinder.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/PythonPlanBinder.java b/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/PythonPlanBinder.java deleted file mode 100644 index 5aa1480..0000000 --- a/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/PythonPlanBinder.java +++ /dev/null @@ -1,442 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package org.apache.flink.languagebinding.api.java.python; - -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.Arrays; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.LocalEnvironment; -import org.apache.flink.api.java.operators.Keys; -import org.apache.flink.api.java.operators.CoGroupRawOperator; -import org.apache.flink.api.java.operators.SortedGrouping; -import org.apache.flink.api.java.operators.UnsortedGrouping; -import org.apache.flink.api.java.tuple.Tuple; -import static org.apache.flink.api.java.typeutils.TypeExtractor.getForObject; -import org.apache.flink.configuration.GlobalConfiguration; -import org.apache.flink.core.fs.FileSystem; -import org.apache.flink.core.fs.Path; -import org.apache.flink.languagebinding.api.java.common.PlanBinder; -import org.apache.flink.languagebinding.api.java.common.OperationInfo; -import org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.PythonOperationInfo; -//CHECKSTYLE.OFF: AvoidStarImport - enum/function import -import static org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.PythonOperationInfo.*; -import org.apache.flink.languagebinding.api.java.python.functions.*; -//CHECKSTYLE.ON: AvoidStarImport -import org.apache.flink.languagebinding.api.java.common.streaming.Receiver; -import org.apache.flink.languagebinding.api.java.common.streaming.StreamPrinter; -import org.apache.flink.runtime.filecache.FileCache; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This class allows the execution of a Flink plan written in python. - */ -public class PythonPlanBinder extends PlanBinder<PythonOperationInfo> { - static final Logger LOG = LoggerFactory.getLogger(PythonPlanBinder.class); - - public static final String ARGUMENT_PYTHON_2 = "2"; - public static final String ARGUMENT_PYTHON_3 = "3"; - - public static final String FLINK_PYTHON_DC_ID = "flink"; - public static final String FLINK_PYTHON_PLAN_NAME = "/plan.py"; - - public static final String FLINK_PYTHON2_BINARY_KEY = "python.binary.python2"; - public static final String FLINK_PYTHON3_BINARY_KEY = "python.binary.python3"; - public static String FLINK_PYTHON2_BINARY_PATH = GlobalConfiguration.getString(FLINK_PYTHON2_BINARY_KEY, "python"); - public static String FLINK_PYTHON3_BINARY_PATH = GlobalConfiguration.getString(FLINK_PYTHON3_BINARY_KEY, "python3"); - - private static final String FLINK_PYTHON_FILE_PATH = System.getProperty("java.io.tmpdir") + "/flink_plan"; - protected static final String FLINK_PYTHON_REL_LOCAL_PATH = "/resources/python"; - protected static final String FLINK_DIR = System.getenv("FLINK_ROOT_DIR"); - protected static String FULL_PATH; - - public static StringBuilder arguments = new StringBuilder(); - - private Process process; - - public static boolean usePython3 = false; - - /** - * Entry point for the execution of a python plan. - * - * @param args planPath[ package1[ packageX[ - parameter1[ parameterX]]]] - * @throws Exception - */ - public static void main(String[] args) throws Exception { - if (args.length < 2) { - System.out.println("Usage: ./bin/pyflink<2/3>.sh <pathToScript>[ <pathToPackage1>[ <pathToPackageX]][ - <parameter1>[ <parameterX>]]"); - return; - } - usePython3 = args[0].equals(ARGUMENT_PYTHON_3); - PythonPlanBinder binder = new PythonPlanBinder(); - binder.runPlan(Arrays.copyOfRange(args, 1, args.length)); - } - - public PythonPlanBinder() throws IOException { - FLINK_PYTHON2_BINARY_PATH = GlobalConfiguration.getString(FLINK_PYTHON2_BINARY_KEY, "python"); - FLINK_PYTHON3_BINARY_PATH = GlobalConfiguration.getString(FLINK_PYTHON3_BINARY_KEY, "python3"); - FULL_PATH = FLINK_DIR != null - ? FLINK_DIR + "/" + FLINK_PYTHON_REL_LOCAL_PATH //command-line - : FileSystem.getLocalFileSystem().getWorkingDirectory().toString() //testing - + "/src/main/python/org/apache/flink/languagebinding/api/python"; - } - - protected void runPlan(String[] args) throws Exception { - env = ExecutionEnvironment.getExecutionEnvironment(); - - int split = 0; - for (int x = 0; x < args.length; x++) { - if (args[x].compareTo("-") == 0) { - split = x; - } - } - - try { - prepareFiles(Arrays.copyOfRange(args, 0, split == 0 ? 1 : split)); - startPython(Arrays.copyOfRange(args, split == 0 ? args.length : split + 1, args.length)); - receivePlan(); - - if (env instanceof LocalEnvironment) { - FLINK_HDFS_PATH = "file:" + System.getProperty("java.io.tmpdir") + "/flink"; - } - - distributeFiles(env); - env.execute(); - close(); - } catch (Exception e) { - close(); - throw e; - } - } - - //=====Setup======================================================================================================== - /** - * Copies all files to a common directory (FLINK_PYTHON_FILE_PATH). This allows us to distribute it as one big - * package, and resolves PYTHONPATH issues. - * - * @param filePaths - * @throws IOException - * @throws URISyntaxException - */ - private void prepareFiles(String... filePaths) throws IOException, URISyntaxException { - //Flink python package - String tempFilePath = FLINK_PYTHON_FILE_PATH; - clearPath(tempFilePath); - FileCache.copy(new Path(FULL_PATH), new Path(tempFilePath), false); - - //plan file - copyFile(filePaths[0], FLINK_PYTHON_PLAN_NAME); - - //additional files/folders - for (int x = 1; x < filePaths.length; x++) { - copyFile(filePaths[x], null); - } - } - - private static void clearPath(String path) throws IOException, URISyntaxException { - FileSystem fs = FileSystem.get(new URI(path)); - if (fs.exists(new Path(path))) { - fs.delete(new Path(path), true); - } - } - - private static void copyFile(String path, String name) throws IOException, URISyntaxException { - if (path.endsWith("/")) { - path = path.substring(0, path.length() - 1); - } - String identifier = name == null ? path.substring(path.lastIndexOf("/")) : name; - String tmpFilePath = FLINK_PYTHON_FILE_PATH + "/" + identifier; - clearPath(tmpFilePath); - Path p = new Path(path); - FileCache.copy(p.makeQualified(FileSystem.get(p.toUri())), new Path(tmpFilePath), true); - } - - private static void distributeFiles(ExecutionEnvironment env) throws IOException, URISyntaxException { - clearPath(FLINK_HDFS_PATH); - FileCache.copy(new Path(FLINK_PYTHON_FILE_PATH), new Path(FLINK_HDFS_PATH), true); - env.registerCachedFile(FLINK_HDFS_PATH, FLINK_PYTHON_DC_ID); - clearPath(FLINK_PYTHON_FILE_PATH); - } - - private void startPython(String[] args) throws IOException { - for (String arg : args) { - arguments.append(" ").append(arg); - } - receiver = new Receiver(null); - receiver.open(FLINK_TMP_DATA_DIR + "/output"); - - String pythonBinaryPath = usePython3 ? FLINK_PYTHON3_BINARY_PATH : FLINK_PYTHON2_BINARY_PATH; - - try { - Runtime.getRuntime().exec(pythonBinaryPath); - } catch (IOException ex) { - throw new RuntimeException(pythonBinaryPath + " does not point to a valid python binary."); - } - process = Runtime.getRuntime().exec(pythonBinaryPath + " -B " + FLINK_PYTHON_FILE_PATH + FLINK_PYTHON_PLAN_NAME + arguments.toString()); - - new StreamPrinter(process.getInputStream()).start(); - new StreamPrinter(process.getErrorStream()).start(); - - try { - Thread.sleep(2000); - } catch (InterruptedException ex) { - } - - try { - int value = process.exitValue(); - if (value != 0) { - throw new RuntimeException("Plan file caused an error. Check log-files for details."); - } - if (value == 0) { - throw new RuntimeException("Plan file exited prematurely without an error."); - } - } catch (IllegalThreadStateException ise) {//Process still running - } - - process.getOutputStream().write("plan\n".getBytes()); - process.getOutputStream().write((FLINK_TMP_DATA_DIR + "/output\n").getBytes()); - process.getOutputStream().flush(); - } - - private void close() { - try { //prevent throwing exception so that previous exceptions aren't hidden. - if (!DEBUG) { - FileSystem hdfs = FileSystem.get(new URI(FLINK_HDFS_PATH)); - hdfs.delete(new Path(FLINK_HDFS_PATH), true); - } - - FileSystem local = FileSystem.getLocalFileSystem(); - local.delete(new Path(FLINK_PYTHON_FILE_PATH), true); - local.delete(new Path(FLINK_TMP_DATA_DIR), true); - receiver.close(); - } catch (NullPointerException npe) { - } catch (IOException ioe) { - LOG.error("PythonAPI file cleanup failed. " + ioe.getMessage()); - } catch (URISyntaxException use) { // can't occur - } - try { - process.exitValue(); - } catch (NullPointerException npe) { //exception occurred before process was started - } catch (IllegalThreadStateException ise) { //process still active - process.destroy(); - } - } - - //=====Plan Binding================================================================================================= - protected class PythonOperationInfo extends OperationInfo { - public boolean combine; - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append(super.toString()); - sb.append("Combine: ").append(combine).append("\n"); - return sb.toString(); - } - - protected PythonOperationInfo(Receiver receiver, AbstractOperation identifier) throws IOException { - Object tmpType; - setID = (Integer) receiver.getRecord(true); - parentID = (Integer) receiver.getRecord(true); - switch (identifier) { - case COGROUP: - otherID = (Integer) receiver.getRecord(true); - keys1 = normalizeKeys(receiver.getRecord(true)); - keys2 = normalizeKeys(receiver.getRecord(true)); - tmpType = receiver.getRecord(); - types = tmpType == null ? null : getForObject(tmpType); - name = (String) receiver.getRecord(); - break; - case CROSS: - case CROSS_H: - case CROSS_T: - otherID = (Integer) receiver.getRecord(true); - tmpType = receiver.getRecord(); - types = tmpType == null ? null : getForObject(tmpType); - int cProjectCount = (Integer) receiver.getRecord(true); - projections = new ProjectionEntry[cProjectCount]; - for (int x = 0; x < cProjectCount; x++) { - String side = (String) receiver.getRecord(); - int[] keys = toIntArray((Tuple) receiver.getRecord(true)); - projections[x] = new ProjectionEntry(ProjectionSide.valueOf(side.toUpperCase()), keys); - } - name = (String) receiver.getRecord(); - break; - case REDUCE: - case GROUPREDUCE: - tmpType = receiver.getRecord(); - types = tmpType == null ? null : getForObject(tmpType); - combine = (Boolean) receiver.getRecord(); - name = (String) receiver.getRecord(); - break; - case JOIN: - case JOIN_H: - case JOIN_T: - keys1 = normalizeKeys(receiver.getRecord(true)); - keys2 = normalizeKeys(receiver.getRecord(true)); - otherID = (Integer) receiver.getRecord(true); - tmpType = receiver.getRecord(); - types = tmpType == null ? null : getForObject(tmpType); - int jProjectCount = (Integer) receiver.getRecord(true); - projections = new ProjectionEntry[jProjectCount]; - for (int x = 0; x < jProjectCount; x++) { - String side = (String) receiver.getRecord(); - int[] keys = toIntArray((Tuple) receiver.getRecord(true)); - projections[x] = new ProjectionEntry(ProjectionSide.valueOf(side.toUpperCase()), keys); - } - name = (String) receiver.getRecord(); - break; - case MAPPARTITION: - case FLATMAP: - case MAP: - case FILTER: - tmpType = receiver.getRecord(); - types = tmpType == null ? null : getForObject(tmpType); - name = (String) receiver.getRecord(); - break; - default: - throw new UnsupportedOperationException("This operation is not implemented in the Python API: " + identifier); - } - } - } - - @Override - protected PythonOperationInfo createOperationInfo(AbstractOperation identifier) throws IOException { - return new PythonOperationInfo(receiver, identifier); - } - - @Override - protected DataSet applyCoGroupOperation(DataSet op1, DataSet op2, String[] firstKeys, String[] secondKeys, PythonOperationInfo info) { - return new CoGroupRawOperator( - op1, - op2, - new Keys.ExpressionKeys(firstKeys, op1.getType()), - new Keys.ExpressionKeys(secondKeys, op2.getType()), - new PythonCoGroup(info.setID, info.types), - info.types, info.name); - } - - @Override - protected DataSet applyCrossOperation(DataSet op1, DataSet op2, DatasizeHint mode, PythonOperationInfo info) { - switch (mode) { - case NONE: - return op1.cross(op2).name("PythonCrossPreStep") - .mapPartition(new PythonMapPartition(info.setID, info.types)).name(info.name); - case HUGE: - return op1.crossWithHuge(op2).name("PythonCrossPreStep") - .mapPartition(new PythonMapPartition(info.setID, info.types)).name(info.name); - case TINY: - return op1.crossWithTiny(op2).name("PythonCrossPreStep") - .mapPartition(new PythonMapPartition(info.setID, info.types)).name(info.name); - default: - throw new IllegalArgumentException("Invalid Cross mode specified: " + mode); - } - } - - @Override - protected DataSet applyFilterOperation(DataSet op1, PythonOperationInfo info) { - return op1.mapPartition(new PythonMapPartition(info.setID, info.types)).name(info.name); - } - - @Override - protected DataSet applyFlatMapOperation(DataSet op1, PythonOperationInfo info) { - return op1.mapPartition(new PythonMapPartition(info.setID, info.types)).name(info.name); - } - - @Override - protected DataSet applyGroupReduceOperation(DataSet op1, PythonOperationInfo info) { - if (info.combine) { - return op1.reduceGroup(new PythonCombineIdentity(info.setID * -1)) - .setCombinable(true).name("PythonCombine") - .mapPartition(new PythonMapPartition(info.setID, info.types)) - .name(info.name); - } else { - return op1.reduceGroup(new PythonCombineIdentity()) - .setCombinable(false).name("PythonGroupReducePreStep") - .mapPartition(new PythonMapPartition(info.setID, info.types)) - .name(info.name); - } - } - - @Override - protected DataSet applyGroupReduceOperation(UnsortedGrouping op1, PythonOperationInfo info) { - if (info.combine) { - return op1.reduceGroup(new PythonCombineIdentity(info.setID * -1)) - .setCombinable(true).name("PythonCombine") - .mapPartition(new PythonMapPartition(info.setID, info.types)) - .name(info.name); - } else { - return op1.reduceGroup(new PythonCombineIdentity()) - .setCombinable(false).name("PythonGroupReducePreStep") - .mapPartition(new PythonMapPartition(info.setID, info.types)) - .name(info.name); - } - } - - @Override - protected DataSet applyGroupReduceOperation(SortedGrouping op1, PythonOperationInfo info) { - if (info.combine) { - return op1.reduceGroup(new PythonCombineIdentity(info.setID * -1)) - .setCombinable(true).name("PythonCombine") - .mapPartition(new PythonMapPartition(info.setID, info.types)) - .name(info.name); - } else { - return op1.reduceGroup(new PythonCombineIdentity()) - .setCombinable(false).name("PythonGroupReducePreStep") - .mapPartition(new PythonMapPartition(info.setID, info.types)) - .name(info.name); - } - } - - @Override - protected DataSet applyJoinOperation(DataSet op1, DataSet op2, String[] firstKeys, String[] secondKeys, DatasizeHint mode, PythonOperationInfo info) { - return createDefaultJoin(op1, op2, firstKeys, secondKeys, mode).name("PythonJoinPreStep") - .mapPartition(new PythonMapPartition(info.setID, info.types)).name(info.name); - } - - @Override - protected DataSet applyMapOperation(DataSet op1, PythonOperationInfo info) { - return op1.mapPartition(new PythonMapPartition(info.setID, info.types)).name(info.name); - } - - @Override - protected DataSet applyMapPartitionOperation(DataSet op1, PythonOperationInfo info) { - return op1.mapPartition(new PythonMapPartition(info.setID, info.types)).name(info.name); - } - - @Override - protected DataSet applyReduceOperation(DataSet op1, PythonOperationInfo info) { - return op1.reduceGroup(new PythonCombineIdentity()) - .setCombinable(false).name("PythonReducePreStep") - .mapPartition(new PythonMapPartition(info.setID, info.types)) - .name(info.name); - } - - @Override - protected DataSet applyReduceOperation(UnsortedGrouping op1, PythonOperationInfo info) { - if (info.combine) { - return op1.reduceGroup(new PythonCombineIdentity(info.setID * -1)) - .setCombinable(true).name("PythonCombine") - .mapPartition(new PythonMapPartition(info.setID, info.types)) - .name(info.name); - } else { - return op1.reduceGroup(new PythonCombineIdentity()) - .setCombinable(false).name("PythonReducePreStep") - .mapPartition(new PythonMapPartition(info.setID, info.types)) - .name(info.name); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/functions/PythonCoGroup.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/functions/PythonCoGroup.java b/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/functions/PythonCoGroup.java deleted file mode 100644 index 26d554d..0000000 --- a/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/functions/PythonCoGroup.java +++ /dev/null @@ -1,78 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package org.apache.flink.languagebinding.api.java.python.functions; - -import org.apache.flink.api.java.typeutils.ResultTypeQueryable; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.languagebinding.api.java.python.streaming.PythonStreamer; -import org.apache.flink.util.Collector; -import java.io.IOException; -import org.apache.flink.api.common.functions.RichCoGroupFunction; -import org.apache.flink.api.common.typeinfo.TypeInformation; - -/** - * CoGroupFunction that uses a python script. - * - * @param <IN1> - * @param <IN2> - * @param <OUT> - */ -public class PythonCoGroup<IN1, IN2, OUT> extends RichCoGroupFunction<IN1, IN2, OUT> implements ResultTypeQueryable { - private final PythonStreamer streamer; - private transient final TypeInformation<OUT> typeInformation; - - public PythonCoGroup(int id, TypeInformation<OUT> typeInformation) { - this.typeInformation = typeInformation; - streamer = new PythonStreamer(this, id); - } - - /** - * Opens this function. - * - * @param config configuration - * @throws IOException - */ - @Override - public void open(Configuration config) throws IOException { - streamer.open(); - streamer.sendBroadCastVariables(config); - } - - /** - * Calls the external python function. - * - * @param first - * @param second - * @param out collector - * @throws IOException - */ - @Override - public final void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<OUT> out) throws Exception { - streamer.streamBufferWithGroups(first.iterator(), second.iterator(), out); - } - - /** - * Closes this function. - * - * @throws IOException - */ - @Override - public void close() throws IOException { - streamer.close(); - } - - @Override - public TypeInformation<OUT> getProducedType() { - return typeInformation; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/functions/PythonCombineIdentity.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/functions/PythonCombineIdentity.java b/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/functions/PythonCombineIdentity.java deleted file mode 100644 index a8ff96c..0000000 --- a/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/functions/PythonCombineIdentity.java +++ /dev/null @@ -1,79 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package org.apache.flink.languagebinding.api.java.python.functions; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.languagebinding.api.java.python.streaming.PythonStreamer; -import org.apache.flink.util.Collector; -import java.io.IOException; -import org.apache.flink.api.common.functions.RichGroupReduceFunction; - -/** - * Multi-purpose class, used for Combine-operations using a python script, and as a preprocess step for - * GroupReduce-operations. - * - * @param <IN> - */ -public class PythonCombineIdentity<IN> extends RichGroupReduceFunction<IN, IN> { - private PythonStreamer streamer; - - public PythonCombineIdentity() { - streamer = null; - } - - public PythonCombineIdentity(int id) { - streamer = new PythonStreamer(this, id); - } - - @Override - public void open(Configuration config) throws IOException { - if (streamer != null) { - streamer.open(); - streamer.sendBroadCastVariables(config); - } - } - - /** - * Calls the external python function. - * - * @param values function input - * @param out collector - * @throws IOException - */ - @Override - public final void reduce(Iterable<IN> values, Collector<IN> out) throws Exception { - for (IN value : values) { - out.collect(value); - } - } - - /** - * Calls the external python function. - * - * @param values function input - * @param out collector - * @throws IOException - */ - @Override - public final void combine(Iterable<IN> values, Collector<IN> out) throws Exception { - streamer.streamBufferWithoutGroups(values.iterator(), out); - } - - @Override - public void close() throws IOException { - if (streamer != null) { - streamer.close(); - streamer = null; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/functions/PythonMapPartition.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/functions/PythonMapPartition.java b/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/functions/PythonMapPartition.java deleted file mode 100644 index 1f13e5c..0000000 --- a/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/functions/PythonMapPartition.java +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package org.apache.flink.languagebinding.api.java.python.functions; - -import java.io.IOException; -import org.apache.flink.api.common.functions.RichMapPartitionFunction; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.typeutils.ResultTypeQueryable; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.languagebinding.api.java.python.streaming.PythonStreamer; -import org.apache.flink.util.Collector; - -/** - * Multi-purpose class, usable by all operations using a python script with one input source and possibly differing - * in-/output types. - * - * @param <IN> - * @param <OUT> - */ -public class PythonMapPartition<IN, OUT> extends RichMapPartitionFunction<IN, OUT> implements ResultTypeQueryable { - private final PythonStreamer streamer; - private transient final TypeInformation<OUT> typeInformation; - - public PythonMapPartition(int id, TypeInformation<OUT> typeInformation) { - this.typeInformation = typeInformation; - streamer = new PythonStreamer(this, id); - } - - /** - * Opens this function. - * - * @param config configuration - * @throws IOException - */ - @Override - public void open(Configuration config) throws IOException { - streamer.open(); - streamer.sendBroadCastVariables(config); - } - - @Override - public void mapPartition(Iterable<IN> values, Collector<OUT> out) throws Exception { - streamer.streamBufferWithoutGroups(values.iterator(), out); - } - - /** - * Closes this function. - * - * @throws IOException - */ - @Override - public void close() throws IOException { - streamer.close(); - } - - @Override - public TypeInformation<OUT> getProducedType() { - return typeInformation; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/streaming/PythonStreamer.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/streaming/PythonStreamer.java b/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/streaming/PythonStreamer.java deleted file mode 100644 index 6d21c4c..0000000 --- a/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/streaming/PythonStreamer.java +++ /dev/null @@ -1,167 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package org.apache.flink.languagebinding.api.java.python.streaming; - -import java.io.IOException; -import java.io.OutputStream; -import java.lang.reflect.Field; -import org.apache.flink.api.common.functions.AbstractRichFunction; -import static org.apache.flink.languagebinding.api.java.common.PlanBinder.DEBUG; -import static org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.FLINK_PYTHON_DC_ID; -import static org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.FLINK_PYTHON_PLAN_NAME; -import static org.apache.flink.languagebinding.api.java.common.PlanBinder.FLINK_TMP_DATA_DIR; -import org.apache.flink.languagebinding.api.java.common.streaming.StreamPrinter; -import org.apache.flink.languagebinding.api.java.common.streaming.Streamer; -import org.apache.flink.languagebinding.api.java.python.PythonPlanBinder; -import static org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.FLINK_PYTHON2_BINARY_PATH; -import static org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.FLINK_PYTHON3_BINARY_PATH; - -/** - * This streamer is used by functions to send/receive data to/from an external python process. - */ -public class PythonStreamer extends Streamer { - private Process process; - private final int id; - private final boolean usePython3; - private final boolean debug; - private Thread shutdownThread; - private final String planArguments; - - private String inputFilePath; - private String outputFilePath; - - public PythonStreamer(AbstractRichFunction function, int id) { - super(function); - this.id = id; - this.usePython3 = PythonPlanBinder.usePython3; - this.debug = DEBUG; - planArguments = PythonPlanBinder.arguments.toString(); - } - - /** - * Starts the python script. - * - * @throws IOException - */ - @Override - public void setupProcess() throws IOException { - startPython(); - } - - private void startPython() throws IOException { - this.outputFilePath = FLINK_TMP_DATA_DIR + "/" + id + this.function.getRuntimeContext().getIndexOfThisSubtask() + "output"; - this.inputFilePath = FLINK_TMP_DATA_DIR + "/" + id + this.function.getRuntimeContext().getIndexOfThisSubtask() + "input"; - - sender.open(inputFilePath); - receiver.open(outputFilePath); - - String path = function.getRuntimeContext().getDistributedCache().getFile(FLINK_PYTHON_DC_ID).getAbsolutePath(); - String planPath = path + FLINK_PYTHON_PLAN_NAME; - - String pythonBinaryPath = usePython3 ? FLINK_PYTHON3_BINARY_PATH : FLINK_PYTHON2_BINARY_PATH; - - try { - Runtime.getRuntime().exec(pythonBinaryPath); - } catch (IOException ex) { - throw new RuntimeException(pythonBinaryPath + " does not point to a valid python binary."); - } - - if (debug) { - socket.setSoTimeout(0); - LOG.info("Waiting for Python Process : " + function.getRuntimeContext().getTaskName() - + " Run python " + planPath + planArguments); - } else { - process = Runtime.getRuntime().exec(pythonBinaryPath + " -O -B " + planPath + planArguments); - new StreamPrinter(process.getInputStream()).start(); - new StreamPrinter(process.getErrorStream(), true, msg).start(); - } - - shutdownThread = new Thread() { - @Override - public void run() { - try { - destroyProcess(); - } catch (IOException ex) { - } - } - }; - - Runtime.getRuntime().addShutdownHook(shutdownThread); - - OutputStream processOutput = process.getOutputStream(); - processOutput.write("operator\n".getBytes()); - processOutput.write(("" + server.getLocalPort() + "\n").getBytes()); - processOutput.write((id + "\n").getBytes()); - processOutput.write((inputFilePath + "\n").getBytes()); - processOutput.write((outputFilePath + "\n").getBytes()); - processOutput.flush(); - - try { // wait a bit to catch syntax errors - Thread.sleep(2000); - } catch (InterruptedException ex) { - } - if (!debug) { - try { - process.exitValue(); - throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " terminated prematurely." + msg); - } catch (IllegalThreadStateException ise) { //process still active -> start receiving data - } - } - - socket = server.accept(); - in = socket.getInputStream(); - out = socket.getOutputStream(); - } - - /** - * Closes this streamer. - * - * @throws IOException - */ - @Override - public void close() throws IOException { - try { - super.close(); - } catch (Exception e) { - LOG.error("Exception occurred while closing Streamer. :" + e.getMessage()); - } - if (!debug) { - destroyProcess(); - } - if (shutdownThread != null) { - Runtime.getRuntime().removeShutdownHook(shutdownThread); - } - } - - private void destroyProcess() throws IOException { - try { - process.exitValue(); - } catch (IllegalThreadStateException ise) { //process still active - if (process.getClass().getName().equals("java.lang.UNIXProcess")) { - int pid; - try { - Field f = process.getClass().getDeclaredField("pid"); - f.setAccessible(true); - pid = f.getInt(process); - } catch (Throwable e) { - process.destroy(); - return; - } - String[] args = new String[]{"kill", "-9", "" + pid}; - Runtime.getRuntime().exec(args); - } else { - process.destroy(); - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/__init__.py ---------------------------------------------------------------------- diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/__init__.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/__init__.py deleted file mode 100644 index d35bf39..0000000 --- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/__init__.py +++ /dev/null @@ -1,17 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/__init__.py ---------------------------------------------------------------------- diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/__init__.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/__init__.py deleted file mode 100644 index d35bf39..0000000 --- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/__init__.py +++ /dev/null @@ -1,17 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Collector.py ---------------------------------------------------------------------- diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Collector.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Collector.py deleted file mode 100644 index bf35756..0000000 --- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Collector.py +++ /dev/null @@ -1,157 +0,0 @@ -# ############################################################################### -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ -from struct import pack -import sys - -from flink.connection.Constants import Types - -PY2 = sys.version_info[0] == 2 -PY3 = sys.version_info[0] == 3 - -if PY2: - stringtype = basestring -else: - stringtype = str - - -class Collector(object): - def __init__(self, con): - self._connection = con - self._serializer = None - - def _close(self): - self._connection.send_end_signal() - - def collect(self, value): - self._serializer = _get_serializer(self._connection.write, value) - self.collect = self._collect - self.collect(value) - - def _collect(self, value): - self._connection.write(self._serializer.serialize(value)) - - -def _get_serializer(write, value): - if isinstance(value, (list, tuple)): - write(Types.TYPE_TUPLE) - write(pack(">I", len(value))) - return TupleSerializer(write, value) - elif value is None: - write(Types.TYPE_NULL) - return NullSerializer() - elif isinstance(value, stringtype): - write(Types.TYPE_STRING) - return StringSerializer() - elif isinstance(value, bool): - write(Types.TYPE_BOOLEAN) - return BooleanSerializer() - elif isinstance(value, int) or PY2 and isinstance(value, long): - write(Types.TYPE_LONG) - return LongSerializer() - elif isinstance(value, bytearray): - write(Types.TYPE_BYTES) - return ByteArraySerializer() - elif isinstance(value, float): - write(Types.TYPE_DOUBLE) - return FloatSerializer() - else: - raise Exception("Unsupported Type encountered.") - - -class TupleSerializer(object): - def __init__(self, write, value): - self.serializer = [_get_serializer(write, field) for field in value] - - def serialize(self, value): - bits = [] - for i in range(len(value)): - bits.append(self.serializer[i].serialize(value[i])) - return b"".join(bits) - - -class BooleanSerializer(object): - def serialize(self, value): - return pack(">?", value) - - -class FloatSerializer(object): - def serialize(self, value): - return pack(">d", value) - - -class LongSerializer(object): - def serialize(self, value): - return pack(">q", value) - - -class ByteArraySerializer(object): - def serialize(self, value): - value = bytes(value) - return pack(">I", len(value)) + value - - -class StringSerializer(object): - def serialize(self, value): - value = value.encode("utf-8") - return pack(">I", len(value)) + value - - -class NullSerializer(object): - def serialize(self, value): - return b"" - - -class TypedCollector(object): - def __init__(self, con): - self._connection = con - - def collect(self, value): - if not isinstance(value, (list, tuple)): - self._send_field(value) - else: - self._connection.write(Types.TYPE_TUPLE) - meta = pack(">I", len(value)) - self._connection.write(bytes([meta[3]]) if PY3 else meta[3]) - for field in value: - self.collect(field) - - def _send_field(self, value): - if value is None: - self._connection.write(Types.TYPE_NULL) - elif isinstance(value, stringtype): - value = value.encode("utf-8") - size = pack(">I", len(value)) - self._connection.write(b"".join([Types.TYPE_STRING, size, value])) - elif isinstance(value, bytes): - size = pack(">I", len(value)) - self._connection.write(b"".join([Types.TYPE_BYTES, size, value])) - elif isinstance(value, bool): - data = pack(">?", value) - self._connection.write(b"".join([Types.TYPE_BOOLEAN, data])) - elif isinstance(value, int) or PY2 and isinstance(value, long): - data = pack(">q", value) - self._connection.write(b"".join([Types.TYPE_LONG, data])) - elif isinstance(value, float): - data = pack(">d", value) - self._connection.write(b"".join([Types.TYPE_DOUBLE, data])) - elif isinstance(value, bytearray): - value = bytes(value) - size = pack(">I", len(value)) - self._connection.write(b"".join([Types.TYPE_BYTES, size, value])) - else: - raise Exception("Unsupported Type encountered.") \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Connection.py ---------------------------------------------------------------------- diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Connection.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Connection.py deleted file mode 100644 index 988bf25..0000000 --- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Connection.py +++ /dev/null @@ -1,166 +0,0 @@ -# ############################################################################### -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ -import mmap -import socket as SOCKET -from struct import pack, unpack -from collections import deque -import sys -PY2 = sys.version_info[0] == 2 -PY3 = sys.version_info[0] == 3 - -MAPPED_FILE_SIZE = 1024 * 1024 * 64 - -SIGNAL_REQUEST_BUFFER = b"\x00\x00\x00\x00" -SIGNAL_REQUEST_BUFFER_G0 = b"\xFF\xFF\xFF\xFD" -SIGNAL_REQUEST_BUFFER_G1 = b"\xFF\xFF\xFF\xFC" -SIGNAL_FINISHED = b"\xFF\xFF\xFF\xFF" - -if PY2: - SIGNAL_WAS_LAST = "\x20" -else: - SIGNAL_WAS_LAST = 32 - - -class OneWayBusyBufferingMappedFileConnection(object): - def __init__(self, output_path): - self._output_file = open(output_path, "rb+") - self._file_output_buffer = mmap.mmap(self._output_file.fileno(), MAPPED_FILE_SIZE, mmap.MAP_SHARED, mmap.ACCESS_WRITE) - - self._out = deque() - self._out_size = 0 - - self._offset_limit = MAPPED_FILE_SIZE - 1024 * 1024 * 3 - - def write(self, msg): - self._out.append(msg) - self._out_size += len(msg) - if self._out_size > self._offset_limit: - self._write_buffer() - - def _write_buffer(self): - self._file_output_buffer.seek(1, 0) - self._file_output_buffer.write(b"".join(self._out)) - self._file_output_buffer.seek(0, 0) - self._file_output_buffer.write(b'\x01') - - -class BufferingTCPMappedFileConnection(object): - def __init__(self, input_file, output_file, port): - self._input_file = open(input_file, "rb+") - self._output_file = open(output_file, "rb+") - self._file_input_buffer = mmap.mmap(self._input_file.fileno(), MAPPED_FILE_SIZE, mmap.MAP_SHARED, mmap.ACCESS_READ) - self._file_output_buffer = mmap.mmap(self._output_file.fileno(), MAPPED_FILE_SIZE, mmap.MAP_SHARED, mmap.ACCESS_WRITE) - self._socket = SOCKET.socket(family=SOCKET.AF_INET, type=SOCKET.SOCK_STREAM) - self._socket.connect((SOCKET.gethostbyname("localhost"), port)) - - self._out = deque() - self._out_size = 0 - - self._input = b"" - self._input_offset = 0 - self._input_size = 0 - self._was_last = False - - def write(self, msg): - length = len(msg) - if length > MAPPED_FILE_SIZE: - raise Exception("Serialized object does not fit into a single buffer.") - tmp = self._out_size + length - if tmp > MAPPED_FILE_SIZE: - self._write_buffer() - self.write(msg) - else: - self._out.append(msg) - self._out_size = tmp - - def _write_buffer(self): - self._file_output_buffer.seek(0, 0) - self._file_output_buffer.write(b"".join(self._out)) - self._socket.send(pack(">i", self._out_size)) - self._out.clear() - self._out_size = 0 - self._socket.recv(1, SOCKET.MSG_WAITALL) - - def read(self, des_size, ignored=None): - if self._input_size == self._input_offset: - self._read_buffer() - old_offset = self._input_offset - self._input_offset += des_size - return self._input[old_offset:self._input_offset] - - def _read_buffer(self): - self._socket.send(SIGNAL_REQUEST_BUFFER) - self._file_input_buffer.seek(0, 0) - self._input_offset = 0 - meta_size = self._socket.recv(5, SOCKET.MSG_WAITALL) - self._input_size = unpack(">I", meta_size[:4])[0] - self._was_last = meta_size[4] == SIGNAL_WAS_LAST - self._input = self._file_input_buffer.read(self._input_size) - - def send_end_signal(self): - if self._out_size: - self._write_buffer() - self._socket.send(SIGNAL_FINISHED) - - def has_next(self, ignored=None): - return not self._was_last or not self._input_size == self._input_offset - - def reset(self): - self._was_last = False - self._input_size = 0 - self._input_offset = 0 - self._input = b"" - - -class TwinBufferingTCPMappedFileConnection(BufferingTCPMappedFileConnection): - def __init__(self, input_file, output_file, port): - super(TwinBufferingTCPMappedFileConnection, self).__init__(input_file, output_file, port) - self._input = [b"", b""] - self._input_offset = [0, 0] - self._input_size = [0, 0] - self._was_last = [False, False] - - def read(self, des_size, group): - if self._input_size[group] == self._input_offset[group]: - self._read_buffer(group) - old_offset = self._input_offset[group] - self._input_offset[group] += des_size - return self._input[group][old_offset:self._input_offset[group]] - - def _read_buffer(self, group): - if group: - self._socket.send(SIGNAL_REQUEST_BUFFER_G1) - else: - self._socket.send(SIGNAL_REQUEST_BUFFER_G0) - self._file_input_buffer.seek(0, 0) - self._input_offset[group] = 0 - meta_size = self._socket.recv(5, SOCKET.MSG_WAITALL) - self._input_size[group] = unpack(">I", meta_size[:4])[0] - self._was_last[group] = meta_size[4] == SIGNAL_WAS_LAST - self._input[group] = self._file_input_buffer.read(self._input_size[group]) - - def has_next(self, group): - return not self._was_last[group] or not self._input_size[group] == self._input_offset[group] - - def reset(self): - self._was_last = [False, False] - self._input_size = [0, 0] - self._input_offset = [0, 0] - self._input = [b"", b""] - - http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Constants.py ---------------------------------------------------------------------- diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Constants.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Constants.py deleted file mode 100644 index 0ca2232..0000000 --- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Constants.py +++ /dev/null @@ -1,31 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - - -class Types(object): - TYPE_TUPLE = b'\x0B' - TYPE_BOOLEAN = b'\x0A' - TYPE_BYTE = b'\x09' - TYPE_SHORT = b'\x08' - TYPE_INTEGER = b'\x07' - TYPE_LONG = b'\x06' - TYPE_DOUBLE = b'\x04' - TYPE_FLOAT = b'\x05' - TYPE_STRING = b'\x02' - TYPE_NULL = b'\x00' - TYPE_BYTES = b'\x01' http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Iterator.py ---------------------------------------------------------------------- diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Iterator.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Iterator.py deleted file mode 100644 index fb0e26d..0000000 --- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Iterator.py +++ /dev/null @@ -1,327 +0,0 @@ -# ############################################################################### -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ -from struct import unpack -from collections import deque - -try: - import _abcoll as defIter -except: - import _collections_abc as defIter - -from flink.connection.Constants import Types - - -class ListIterator(defIter.Iterator): - def __init__(self, values): - super(ListIterator, self).__init__() - self._values = deque(values) - - def __next__(self): - return self.next() - - def next(self): - if self.has_next(): - return self._values.popleft() - else: - raise StopIteration - - def has_next(self): - return self._values - - -class GroupIterator(defIter.Iterator): - def __init__(self, iterator, keys=None): - super(GroupIterator, self).__init__() - self.iterator = iterator - self.key = None - self.keys = keys - if self.keys is None: - self._extract_keys = self._extract_keys_id - self.cur = None - self.empty = False - - def _init(self): - if self.iterator.has_next(): - self.empty = False - self.cur = self.iterator.next() - self.key = self._extract_keys(self.cur) - else: - self.empty = True - - def __next__(self): - return self.next() - - def next(self): - if self.has_next(): - tmp = self.cur - if self.iterator.has_next(): - self.cur = self.iterator.next() - if self.key != self._extract_keys(self.cur): - self.empty = True - else: - self.cur = None - self.empty = True - return tmp - else: - raise StopIteration - - def has_next(self): - if self.empty: - return False - return self.key == self._extract_keys(self.cur) - - def has_group(self): - return self.cur is not None - - def next_group(self): - self.key = self._extract_keys(self.cur) - self.empty = False - - def _extract_keys(self, x): - return [x[k] for k in self.keys] - - def _extract_keys_id(self, x): - return x - - -class CoGroupIterator(object): - NONE_REMAINED = 1 - FIRST_REMAINED = 2 - SECOND_REMAINED = 3 - FIRST_EMPTY = 4 - SECOND_EMPTY = 5 - - def __init__(self, c1, c2, k1, k2): - self.i1 = GroupIterator(c1, k1) - self.i2 = GroupIterator(c2, k2) - self.p1 = None - self.p2 = None - self.match = None - self.key = None - - def _init(self): - self.i1._init() - self.i2._init() - - def next(self): - first_empty = True - second_empty = True - - if self.match != CoGroupIterator.FIRST_EMPTY: - if self.match == CoGroupIterator.FIRST_REMAINED: - first_empty = False - else: - if self.i1.has_group(): - self.i1.next_group() - self.key = self.i1.key - first_empty = False - - if self.match != CoGroupIterator.SECOND_EMPTY: - if self.match == CoGroupIterator.SECOND_REMAINED: - second_empty = False - else: - if self.i2.has_group(): - self.i2.next_group() - second_empty = False - - if first_empty and second_empty: - return False - elif first_empty and (not second_empty): - self.p1 = DummyIterator() - self.p2 = self.i2 - self.match = CoGroupIterator.FIRST_EMPTY - return True - elif (not first_empty) and second_empty: - self.p1 = self.i1 - self.p2 = DummyIterator() - self.match = CoGroupIterator.SECOND_EMPTY - return True - else: - if self.key == self.i2.key: - self.p1 = self.i1 - self.p2 = self.i2 - self.match = CoGroupIterator.NONE_REMAINED - elif self.key < self.i2.key: - self.p1 = self.i1 - self.p2 = DummyIterator() - self.match = CoGroupIterator.SECOND_REMAINED - else: - self.p1 = DummyIterator() - self.p2 = self.i2 - self.match = CoGroupIterator.FIRST_REMAINED - return True - - -class Iterator(defIter.Iterator): - def __init__(self, con, group=0): - super(Iterator, self).__init__() - self._connection = con - self._init = True - self._group = group - self._deserializer = None - - def __next__(self): - return self.next() - - def next(self): - if self.has_next(): - if self._deserializer is None: - self._deserializer = _get_deserializer(self._group, self._connection.read) - return self._deserializer.deserialize() - else: - raise StopIteration - - def has_next(self): - return self._connection.has_next(self._group) - - def _reset(self): - self._deserializer = None - - -class DummyIterator(Iterator): - def __init__(self): - super(Iterator, self).__init__() - - def __next__(self): - raise StopIteration - - def next(self): - raise StopIteration - - def has_next(self): - return False - - -def _get_deserializer(group, read, type=None): - if type is None: - type = read(1, group) - return _get_deserializer(group, read, type) - elif type == Types.TYPE_TUPLE: - return TupleDeserializer(read, group) - elif type == Types.TYPE_BYTE: - return ByteDeserializer(read, group) - elif type == Types.TYPE_BYTES: - return ByteArrayDeserializer(read, group) - elif type == Types.TYPE_BOOLEAN: - return BooleanDeserializer(read, group) - elif type == Types.TYPE_FLOAT: - return FloatDeserializer(read, group) - elif type == Types.TYPE_DOUBLE: - return DoubleDeserializer(read, group) - elif type == Types.TYPE_INTEGER: - return IntegerDeserializer(read, group) - elif type == Types.TYPE_LONG: - return LongDeserializer(read, group) - elif type == Types.TYPE_STRING: - return StringDeserializer(read, group) - elif type == Types.TYPE_NULL: - return NullDeserializer(read, group) - - -class TupleDeserializer(object): - def __init__(self, read, group): - self.read = read - self._group = group - size = unpack(">I", self.read(4, self._group))[0] - self.deserializer = [_get_deserializer(self._group, self.read) for _ in range(size)] - - def deserialize(self): - return tuple([s.deserialize() for s in self.deserializer]) - - -class ByteDeserializer(object): - def __init__(self, read, group): - self.read = read - self._group = group - - def deserialize(self): - return unpack(">c", self.read(1, self._group))[0] - - -class ByteArrayDeserializer(object): - def __init__(self, read, group): - self.read = read - self._group = group - - def deserialize(self): - size = unpack(">i", self.read(4, self._group))[0] - return bytearray(self.read(size, self._group)) if size else bytearray(b"") - - -class BooleanDeserializer(object): - def __init__(self, read, group): - self.read = read - self._group = group - - def deserialize(self): - return unpack(">?", self.read(1, self._group))[0] - - -class FloatDeserializer(object): - def __init__(self, read, group): - self.read = read - self._group = group - - def deserialize(self): - return unpack(">f", self.read(4, self._group))[0] - - -class DoubleDeserializer(object): - def __init__(self, read, group): - self.read = read - self._group = group - - def deserialize(self): - return unpack(">d", self.read(8, self._group))[0] - - -class IntegerDeserializer(object): - def __init__(self, read, group): - self.read = read - self._group = group - - def deserialize(self): - return unpack(">i", self.read(4, self._group))[0] - - -class LongDeserializer(object): - def __init__(self, read, group): - self.read = read - self._group = group - - def deserialize(self): - return unpack(">q", self.read(8, self._group))[0] - - -class StringDeserializer(object): - def __init__(self, read, group): - self.read = read - self._group = group - - def deserialize(self): - length = unpack(">i", self.read(4, self._group))[0] - return self.read(length, self._group).decode("utf-8") if length else "" - - -class NullDeserializer(object): - def __init__(self, read, group): - self.read = read - self._group = group - - def deserialize(self): - return None http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/__init__.py ---------------------------------------------------------------------- diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/__init__.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/__init__.py deleted file mode 100644 index 65b48d4..0000000 --- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/__init__.py +++ /dev/null @@ -1,17 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/example/TPCHQuery10.py ---------------------------------------------------------------------- diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/example/TPCHQuery10.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/example/TPCHQuery10.py deleted file mode 100644 index 032ef85..0000000 --- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/example/TPCHQuery10.py +++ /dev/null @@ -1,115 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ -import sys - -from datetime import datetime -from flink.plan.Environment import get_environment -from flink.plan.Constants import INT, STRING, FLOAT, WriteMode -from flink.functions.FilterFunction import FilterFunction -from flink.functions.ReduceFunction import ReduceFunction -from flink.functions.JoinFunction import JoinFunction -from flink.functions.MapFunction import MapFunction - -class OrderFilter(FilterFunction): - def filter(self, value): - if (datetime.strptime(value[2], "%Y-%m-%d") > datetime.strptime("1990-12-31", "%Y-%m-%d")): - return True - else: - return False - -class LineitemFilter(FilterFunction): - def filter(self, value): - if value[3] == "R": - return True - else: - return False - -class ComputeRevenue(MapFunction): - def map(self, value): - return (value[0], value[1] * (1 - value[2])) - -class SumReducer(ReduceFunction): - def reduce(self, value1, value2): - return (value1[0], value1[1] + value2[1]) - -class test(JoinFunction): - def join(self, value1, value2): - return (value1[1], value2[1]) - - -if __name__ == "__main__": - - env = get_environment() - - if len(sys.argv) != 6: - sys.exit("Usage: ./bin/pyflink.sh TPCHQuery10 <customer path> <order path> <lineitem path> <nation path> <output path>") - - customer = env \ - .read_csv(sys.argv[1], - [INT, STRING, STRING, INT, STRING, FLOAT, STRING, STRING], '\n', '|') \ - .project(0,1,2,3,5) - - order = env \ - .read_csv(sys.argv[2], - [INT, INT, STRING, FLOAT, STRING, STRING, STRING, INT, STRING], '\n', '|') \ - .project(0,1,4) \ - .filter(OrderFilter()) \ - .project(0,1) - - lineitem = env \ - .read_csv(sys.argv[3], [INT, INT, INT, INT, INT, FLOAT, FLOAT, FLOAT, - STRING, STRING, STRING, STRING, STRING, STRING, STRING, STRING], '\n', '|') \ - .project(0,5,6,8) \ - .filter(LineitemFilter()) \ - .map(ComputeRevenue(), [INT, FLOAT]) - - nation = env \ - .read_csv(sys.argv[4], [INT, STRING, INT, STRING], '\n', '|') \ - .project(0,1) - - revenueByCustomer = order \ - .join_with_huge(lineitem) \ - .where(0) \ - .equal_to(0) \ - .project_first(1) \ - .project_second(1) - - revenueByCustomer = revenueByCustomer \ - .group_by(0) \ - .reduce(SumReducer()) - - customerWithNation = customer \ - .join_with_tiny(nation) \ - .where(3) \ - .equal_to(0) \ - .project_first(0,1,2) \ - .project_second(1) \ - .project_first(4) - - result = customerWithNation \ - .join(revenueByCustomer) \ - .where(0) \ - .equal_to(0) \ - .project_first(0,1,2,3,4) \ - .project_second(1) - - result.write_csv(sys.argv[5], '\n', '|', WriteMode.OVERWRITE) - - env.set_degree_of_parallelism(1) - - env.execute(local=True) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/824074aa/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/example/TPCHQuery3.py ---------------------------------------------------------------------- diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/example/TPCHQuery3.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/example/TPCHQuery3.py deleted file mode 100644 index 5fafb01..0000000 --- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/example/TPCHQuery3.py +++ /dev/null @@ -1,106 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ -import sys - -from datetime import datetime -from flink.plan.Environment import get_environment -from flink.plan.Constants import INT, STRING, FLOAT, WriteMode -from flink.functions.FilterFunction import FilterFunction -from flink.functions.ReduceFunction import ReduceFunction -from flink.functions.JoinFunction import JoinFunction - -class CustomerFilter(FilterFunction): - def filter(self, value): - if value[1] == "AUTOMOBILE": - return True - else: - return False - -class LineitemFilter(FilterFunction): - def filter(self, value): - if (datetime.strptime(value[3], "%Y-%m-%d") > datetime.strptime("1995-03-12", "%Y-%m-%d")): - return True - else: - return False - -class OrderFilter(FilterFunction): - def filter(self, value): - if (datetime.strptime(value[2], "%Y-%m-%d") < datetime.strptime("1995-03-12", "%Y-%m-%d")): - return True - else: - return False - -class SumReducer(ReduceFunction): - def reduce(self, value1, value2): - return (value1[0], value1[1] + value2[1], value1[2], value1[3]) - -class CustomerOrderJoin(JoinFunction): - def join(self, value1, value2): - return (value2[0], 0.0, value2[2], value2[3]) - -class CustomerOrderLineitemJoin(JoinFunction): - def join(self, value1, value2): - return (value1[0], value2[1] * (1 - value2[2]), value1[2], value1[3]) - - - -if __name__ == "__main__": - env = get_environment() - - if len(sys.argv) != 5: - sys.exit("Usage: ./bin/pyflink.sh TPCHQuery3 <lineitem path> <customer path> <order path> <output path>") - lineitem = env \ - .read_csv(sys.argv[1], [INT, INT, INT, INT, INT, FLOAT, FLOAT, FLOAT, - STRING, STRING, STRING, STRING, STRING, STRING, STRING, STRING], '\n', '|') \ - .project(0,5,6,10) \ - .filter(LineitemFilter()) - - - customer = env \ - .read_csv(sys.argv[2], - [INT, STRING, STRING, INT, STRING, FLOAT, STRING, STRING], '\n', '|') \ - .project(0,6) \ - .filter(CustomerFilter()) - - order = env \ - .read_csv(sys.argv[3], - [INT, INT, STRING, FLOAT, STRING, STRING, STRING, INT, STRING], '\n', '|') \ - .project(0,1,4,7) \ - .filter(OrderFilter()) - - customerWithOrder = customer \ - .join(order) \ - .where(0) \ - .equal_to(1) \ - .using(CustomerOrderJoin(),[INT, FLOAT, STRING, INT]) - - result = customerWithOrder \ - .join(lineitem) \ - .where(0) \ - .equal_to(0) \ - .using(CustomerOrderLineitemJoin(), [INT, FLOAT, STRING, INT]) \ - .group_by(0, 2, 3) \ - .reduce(SumReducer()) - - result.write_csv(sys.argv[4], '\n', '|', WriteMode.OVERWRITE) - - env.set_degree_of_parallelism(1) - - env.execute(local=True) - -