[FLINK-3235] Remove Flink on Tez code
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8b8dfc16 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8b8dfc16 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8b8dfc16 Branch: refs/heads/master Commit: 8b8dfc1671f21a2e381376e78184c7b46adf26b0 Parents: 16cf8b1 Author: Stephan Ewen <se...@apache.org> Authored: Thu Jan 14 21:45:32 2016 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Fri Jan 15 11:44:20 2016 +0100 ---------------------------------------------------------------------- .travis.yml | 2 +- docs/_includes/navbar.html | 1 - docs/index.md | 1 - docs/internals/general_arch.md | 1 - docs/setup/flink_on_tez.md | 291 ---------- flink-contrib/flink-tez/pom.xml | 224 ------- .../flink-tez/src/assembly/flink-fat-jar.xml | 42 -- .../flink/tez/client/LocalTezEnvironment.java | 76 --- .../flink/tez/client/RemoteTezEnvironment.java | 83 --- .../apache/flink/tez/client/TezExecutor.java | 219 ------- .../flink/tez/client/TezExecutorTool.java | 80 --- .../flink/tez/dag/FlinkBroadcastEdge.java | 70 --- .../flink/tez/dag/FlinkDataSinkVertex.java | 61 -- .../flink/tez/dag/FlinkDataSourceVertex.java | 82 --- .../org/apache/flink/tez/dag/FlinkEdge.java | 45 -- .../apache/flink/tez/dag/FlinkForwardEdge.java | 71 --- .../flink/tez/dag/FlinkPartitionEdge.java | 71 --- .../flink/tez/dag/FlinkProcessorVertex.java | 61 -- .../apache/flink/tez/dag/FlinkUnionVertex.java | 61 -- .../org/apache/flink/tez/dag/FlinkVertex.java | 114 ---- .../apache/flink/tez/dag/TezDAGGenerator.java | 460 --------------- .../tez/examples/ConnectedComponentsStep.java | 203 ------- .../flink/tez/examples/ExampleDriver.java | 119 ---- .../flink/tez/examples/PageRankBasicStep.java | 241 -------- .../apache/flink/tez/examples/TPCHQuery3.java | 224 ------- .../examples/TransitiveClosureNaiveStep.java | 135 ----- .../apache/flink/tez/examples/WordCount.java | 129 ----- .../flink/tez/runtime/DataSinkProcessor.java 
| 228 -------- .../flink/tez/runtime/DataSourceProcessor.java | 190 ------ .../flink/tez/runtime/RegularProcessor.java | 138 ----- .../tez/runtime/TezRuntimeEnvironment.java | 44 -- .../org/apache/flink/tez/runtime/TezTask.java | 578 ------------------- .../apache/flink/tez/runtime/TezTaskConfig.java | 163 ------ .../flink/tez/runtime/UnionProcessor.java | 106 ---- .../flink/tez/runtime/input/FlinkInput.java | 139 ----- .../runtime/input/FlinkInputSplitGenerator.java | 94 --- .../tez/runtime/input/TezReaderIterator.java | 66 --- .../tez/runtime/output/SimplePartitioner.java | 35 -- .../tez/runtime/output/TezChannelSelector.java | 36 -- .../tez/runtime/output/TezOutputCollector.java | 72 --- .../tez/runtime/output/TezOutputEmitter.java | 190 ------ .../apache/flink/tez/util/DummyInvokable.java | 51 -- .../apache/flink/tez/util/EncodingUtils.java | 64 -- .../flink/tez/util/FlinkSerialization.java | 310 ---------- .../src/main/resources/log4j.properties | 30 - .../tez/test/ConnectedComponentsStepITCase.java | 83 --- .../flink/tez/test/PageRankBasicStepITCase.java | 54 -- .../flink/tez/test/TezProgramTestBase.java | 108 ---- .../flink/tez/test/WebLogAnalysisITCase.java | 48 -- .../apache/flink/tez/test/WordCountITCase.java | 47 -- .../src/test/resources/log4j-test.properties | 30 - .../src/test/resources/logback-test.xml | 37 -- flink-contrib/pom.xml | 11 - flink-quickstart/flink-tez-quickstart/pom.xml | 37 -- .../src/main/java/Dummy.java | 28 - .../META-INF/maven/archetype-metadata.xml | 36 -- .../main/resources/archetype-resources/pom.xml | 186 ------ .../src/assembly/flink-fat-jar.xml | 40 -- .../src/main/java/Driver.java | 113 ---- .../src/main/java/LocalJob.java | 72 --- .../src/main/java/LocalWordCount.java | 96 --- .../src/main/java/YarnJob.java | 75 --- .../src/main/java/YarnWordCount.java | 124 ---- .../projects/testArtifact/archetype.properties | 21 - .../resources/projects/testArtifact/goal.txt | 1 - flink-quickstart/pom.xml | 10 - pom.xml | 9 - 67 files 
changed, 1 insertion(+), 6966 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/.travis.yml ---------------------------------------------------------------------- diff --git a/.travis.yml b/.travis.yml index de27987..50b5fab 100644 --- a/.travis.yml +++ b/.travis.yml @@ -17,7 +17,7 @@ language: java matrix: include: - jdk: "oraclejdk8" - env: PROFILE="-Dhadoop.version=2.7.1 -Dscala-2.11 -Pinclude-tez -Pinclude-yarn-tests" + env: PROFILE="-Dhadoop.version=2.7.1 -Dscala-2.11 -Pinclude-yarn-tests" - jdk: "oraclejdk8" env: PROFILE="-Dhadoop.version=2.5.0 -Pinclude-yarn-tests" - jdk: "openjdk7" http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/docs/_includes/navbar.html ---------------------------------------------------------------------- diff --git a/docs/_includes/navbar.html b/docs/_includes/navbar.html index b5a04d9..362558d 100644 --- a/docs/_includes/navbar.html +++ b/docs/_includes/navbar.html @@ -64,7 +64,6 @@ under the License. <li><a href="{{ setup }}/cluster_setup.html">Cluster (Standalone)</a></li> <li><a href="{{ setup }}/yarn_setup.html">YARN</a></li> <li><a href="{{ setup }}/gce_setup.html">GCloud</a></li> - <li><a href="{{ setup }}/flink_on_tez.html">Flink on Tez <span class="badge">Beta</span></a></li> <li><a href="{{ setup }}/jobmanager_high_availability.html">JobManager High Availability</a></li> <li class="divider"></li> http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/docs/index.md ---------------------------------------------------------------------- diff --git a/docs/index.md b/docs/index.md index 6168934..9a7809c 100644 --- a/docs/index.md +++ b/docs/index.md @@ -54,5 +54,4 @@ This is an overview of Flink's stack. 
Click on any component to go to the respec <area shape="rect" coords="333,405,473,455" alt="Remote" href="apis/cluster_execution.html"> <area shape="rect" coords="478,405,638,455" alt="Embedded" href="apis/local_execution.html"> <area shape="rect" coords="643,405,765,455" alt="YARN" href="setup/yarn_setup.html"> - <area shape="rect" coords="770,405,893,455" alt="Tez" href="setup/flink_on_tez.html"> </map> http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/docs/internals/general_arch.md ---------------------------------------------------------------------- diff --git a/docs/internals/general_arch.md b/docs/internals/general_arch.md index a81ae85..b49d9ef 100644 --- a/docs/internals/general_arch.md +++ b/docs/internals/general_arch.md @@ -77,7 +77,6 @@ You can click on the components in the figure to learn more. <area shape="rect" coords="333,405,473,455" alt="Remote" href="../apis/cluster_execution.html"> <area shape="rect" coords="478,405,638,455" alt="Embedded" href="../apis/local_execution.html"> <area shape="rect" coords="643,405,765,455" alt="YARN" href="../setup/yarn_setup.html"> - <area shape="rect" coords="770,405,893,455" alt="Tez" href="../setup/flink_on_tez.html"> </map> ## Projects and Dependencies http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/docs/setup/flink_on_tez.md ---------------------------------------------------------------------- diff --git a/docs/setup/flink_on_tez.md b/docs/setup/flink_on_tez.md deleted file mode 100644 index daa0d7d..0000000 --- a/docs/setup/flink_on_tez.md +++ /dev/null @@ -1,291 +0,0 @@ ---- -title: "Running Flink on YARN leveraging Tez" ---- -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. 
The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> - -<a href="#top"></a> - -You can run Flink using Tez as an execution environment. Flink on Tez -is currently included in the *flink-contrib* module, which means it is -in alpha stability. All classes are located in the *org.apache.flink.tez* -package. - -* This will be replaced by the TOC -{:toc} - -## Why Flink on Tez - -[Apache Tez](http://tez.apache.org) is a scalable data processing -platform. Tez provides an API for specifying a directed acyclic -graph (DAG), and functionality for placing the DAG vertices in YARN -containers, as well as data shuffling. In Flink's architecture, -Tez is at about the same level as Flink's network stack. While Flink's -network stack focuses heavily on low latency in order to support -pipelining, data streaming, and iterative algorithms, Tez -focuses on scalability and elastic resource usage. - -Thus, by replacing Flink's network stack with Tez, users can get scalability -and elastic resource usage in shared clusters while retaining Flink's -APIs, optimizer, and runtime algorithms (local sorts, hash tables, etc). - -Flink programs can run almost unmodified using Tez as an execution -environment. Tez supports local execution (e.g., for debugging), and -remote execution on YARN. - - -## Local execution - -The `LocalTezEnvironment` can be used run programs using the local -mode provided by Tez. This example shows how WordCount can be run using the Tez local mode. 
-It is identical to a normal Flink WordCount, except that the `LocalTezEnvironment` is used. -To run in local Tez mode, you can simply run a Flink on Tez program -from your IDE (e.g., right click and run). - -{% highlight java %} -public class WordCountExample { - public static void main(String[] args) throws Exception { - final LocalTezEnvironment env = LocalTezEnvironment.create(); - - DataSet<String> text = env.fromElements( - "Who's there?", - "I think I hear them. Stand, ho! Who's there?"); - - DataSet<Tuple2<String, Integer>> wordCounts = text - .flatMap(new LineSplitter()) - .groupBy(0) - .sum(1); - - wordCounts.print(); - - env.execute("Word Count Example"); - } - - public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> { - @Override - public void flatMap(String line, Collector<Tuple2<String, Integer>> out) { - for (String word : line.split(" ")) { - out.collect(new Tuple2<String, Integer>(word, 1)); - } - } - } -} -{% endhighlight %} - -## YARN execution - -### Setup - -- Install Tez on your Hadoop 2 cluster following the instructions from the - [Apache Tez website](http://tez.apache.org/install.html). If you are able to run - the examples that ship with Tez, then Tez has been successfully installed. - -- Currently, you need to build Flink yourself to obtain Flink on Tez - (the reason is a Hadoop version compatibility: Tez releases artifacts - on Maven central with a Hadoop 2.6.0 dependency). Build Flink - using `mvn -DskipTests clean package -Pinclude-tez -Dhadoop.version=X.X.X -Dtez.version=X.X.X`. - Make sure that the Hadoop version matches the version that Tez uses. - Obtain the jar file contained in the Flink distribution under - `flink-contrib/flink-tez/target/flink-tez-x.y.z-flink-fat-jar.jar` - and upload it to some directory in HDFS. 
E.g., to upload the file - to the directory `/apps`, execute - {% highlight bash %} - $ hadoop fs -put /path/to/flink-tez-x.y.z-flink-fat-jar.jar /apps - {% endhighlight %} - -- Edit the tez-site.xml configuration file, adding an entry that points to the - location of the file. E.g., assuming that the file is in the directory `/apps/`, - add the following entry to tez-site.xml: - {% highlight xml %} -<property> - <name>tez.aux.uris</name> - <value>${fs.default.name}/apps/flink-tez-x.y.z-flink-fat-jar.jar</value> -</property> - {% endhighlight %} - -- At this point, you should be able to run the pre-packaged examples, e.g., run WordCount: - {% highlight bash %} - $ hadoop jar /path/to/flink-tez-x.y.z-flink-fat-jar.jar wc hdfs:/path/to/text hdfs:/path/to/output - {% endhighlight %} - - -### Packaging your program - -Application packaging is currently a bit different than in Flink standalone mode. - Flink programs that run on Tez need to be packaged in a "fat jar" - file that contain the Flink client. This jar can then be executed via the `hadoop jar` command. - An easy way to do that is to use the provided `flink-tez-quickstart` maven archetype. - Create a new project as - - {% highlight bash %} - $ mvn archetype:generate \ - -DarchetypeGroupId=org.apache.flink \ - -DarchetypeArtifactId=flink-tez-quickstart \ - -DarchetypeVersion={{site.version}} - {% endhighlight %} - - and specify the group id, artifact id, version, and package of your project. For example, - let us assume the following options: `org.myorganization`, `flink-on-tez`, `0.1`, and `org.myorganization`. - You should see the following output on your terminal: - - {% highlight bash %} - $ mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-tez-quickstart - [INFO] Scanning for projects... 
- [INFO] - [INFO] ------------------------------------------------------------------------ - [INFO] Building Maven Stub Project (No POM) 1 - [INFO] ------------------------------------------------------------------------ - [INFO] - [INFO] >>> maven-archetype-plugin:2.2:generate (default-cli) > generate-sources @ standalone-pom >>> - [INFO] - [INFO] <<< maven-archetype-plugin:2.2:generate (default-cli) < generate-sources @ standalone-pom <<< - [INFO] - [INFO] --- maven-archetype-plugin:2.2:generate (default-cli) @ standalone-pom --- - [INFO] Generating project in Interactive mode - [INFO] Archetype [org.apache.flink:flink-tez-quickstart:0.9-SNAPSHOT] found in catalog local - Define value for property 'groupId': : org.myorganization - Define value for property 'artifactId': : flink-on-tez - Define value for property 'version': 1.0-SNAPSHOT: : 0.1 - Define value for property 'package': org.myorganization: : - Confirm properties configuration: - groupId: org.myorganization - artifactId: flink-on-tez - version: 0.1 - package: org.myorganization - Y: : Y - [INFO] ---------------------------------------------------------------------------- - [INFO] Using following parameters for creating project from Archetype: flink-tez-quickstart:0.9-SNAPSHOT - [INFO] ---------------------------------------------------------------------------- - [INFO] Parameter: groupId, Value: org.myorganization - [INFO] Parameter: artifactId, Value: flink-on-tez - [INFO] Parameter: version, Value: 0.1 - [INFO] Parameter: package, Value: org.myorganization - [INFO] Parameter: packageInPathFormat, Value: org/myorganization - [INFO] Parameter: package, Value: org.myorganization - [INFO] Parameter: version, Value: 0.1 - [INFO] Parameter: groupId, Value: org.myorganization - [INFO] Parameter: artifactId, Value: flink-on-tez - [INFO] project created from Archetype in dir: /Users/kostas/Dropbox/flink-tez-quickstart-test/flink-on-tez - [INFO] 
------------------------------------------------------------------------ - [INFO] BUILD SUCCESS - [INFO] ------------------------------------------------------------------------ - [INFO] Total time: 44.130 s - [INFO] Finished at: 2015-02-26T17:59:45+01:00 - [INFO] Final Memory: 15M/309M - [INFO] ------------------------------------------------------------------------ - {% endhighlight %} - - The project contains an example called `YarnJob.java` that provides the skeleton - for a Flink-on-Tez job. Program execution is currently done using Hadoop's `ProgramDriver`, - see the `Driver.java` class for an example. Create the fat jar using - `mvn -DskipTests clean package`. The resulting jar will be located in the `target/` directory. - You can now execute a job as follows: - - {% highlight bash %} -$ mvn -DskipTests clean package -$ hadoop jar flink-on-tez/target/flink-on-tez-0.1-flink-fat-jar.jar yarnjob [command-line parameters] - {% endhighlight %} - - Flink programs that run on YARN using Tez as an execution engine need to use the `RemoteTezEnvironment` and - register the class that contains the `main` method with that environment: - {% highlight java %} - public class WordCountExample { - public static void main(String[] args) throws Exception { - final RemoteTezEnvironment env = RemoteTezEnvironment.create(); - - DataSet<String> text = env.fromElements( - "Who's there?", - "I think I hear them. Stand, ho! 
Who's there?"); - - DataSet<Tuple2<String, Integer>> wordCounts = text - .flatMap(new LineSplitter()) - .groupBy(0) - .sum(1); - - wordCounts.print(); - - env.registerMainClass(WordCountExample.class); - env.execute("Word Count Example"); - } - - public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> { - @Override - public void flatMap(String line, Collector<Tuple2<String, Integer>> out) { - for (String word : line.split(" ")) { - out.collect(new Tuple2<String, Integer>(word, 1)); - } - } - } - } - {% endhighlight %} - - -## How it works - -Flink on Tez reuses the Flink APIs, the Flink optimizer, -and the Flink local runtime, including Flink's hash table and sort implementations. Tez -replaces Flink's network stack and control plan, and is responsible for scheduling and -network shuffles. - -The figure below shows how a Flink program passes through the Flink stack and generates -a Tez DAG (instead of a JobGraph that would be created using normal Flink execution). - -<div style="text-align: center;"> -<img src="fig/flink_on_tez_translation.png" alt="Translation of a Flink program to a Tez DAG." height="600px" vspace="20px" style="text-align: center;"/> -</div> - -All local processing, including memory management, sorting, and hashing is performed by -Flink as usual. Local processing is encapsulated in Tez vertices, as seen in the figure -below. Tez vertices are connected by edges. Tez is currently based on a key-value data -model. In the current implementation, the elements that are processed by Flink operators -are wrapped inside Tez values, and the Tez key field is used to indicate the index of the target task -that the elements are destined to. - -<div style="text-align: center;"> -<img src="fig/flink_tez_vertex.png" alt="Encapsulation of Flink runtime inside Tez vertices." 
height="200px" vspace="20px" style="text-align: center;"/> -</div> - -## Limitations - -Currently, Flink on Tez does not support all features of the Flink API. We are working -to enable all of the missing features listed below. In the meantime, if your project depends on these features, we suggest -to use [Flink on YARN]({{site.baseurl}}/setup/yarn_setup.html) or [Flink standalone]({{site.baseurl}}/quickstart/setup_quickstart.html). - -The following features are currently missing. - -- Dedicated client: jobs need to be submitted via Hadoop's command-line client - -- Self-joins: currently binary operators that receive the same input are not supported due to - [TEZ-1190](https://issues.apache.org/jira/browse/TEZ-1190). - -- Iterative programs are currently not supported. - -- Broadcast variables are currently not supported. - -- Accummulators and counters are currently not supported. - -- Performance: The current implementation has not been heavily tested for performance, and misses several optimizations, - including task chaining. - -- Streaming API: Streaming programs will not currently compile to Tez DAGs. - -- Scala API: The current implementation has only been tested with the Java API. - - - http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/pom.xml ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/pom.xml b/flink-contrib/flink-tez/pom.xml deleted file mode 100644 index 412640a..0000000 --- a/flink-contrib/flink-tez/pom.xml +++ /dev/null @@ -1,224 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. 
You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.flink</groupId> - <artifactId>flink-contrib-parent</artifactId> - <version>1.0-SNAPSHOT</version> - <relativePath>..</relativePath> - </parent> - - <artifactId>flink-tez</artifactId> - <name>flink-tez</name> - - <packaging>jar</packaging> - - <dependencies> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-java</artifactId> - <version>${project.version}</version> - <exclusions> - <exclusion> - <groupId>org.apache.flink</groupId> - <artifactId>${shading-artifact.name}</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-core</artifactId> - <version>${project.version}</version> - <exclusions> - <exclusion> - <groupId>org.apache.flink</groupId> - <artifactId>${shading-artifact.name}</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-runtime</artifactId> - <version>${project.version}</version> - <exclusions> - <exclusion> - <groupId>org.apache.flink</groupId> - <artifactId>${shading-artifact.name}</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-optimizer</artifactId> - <version>${project.version}</version> - <exclusions> - 
<exclusion> - <groupId>org.apache.flink</groupId> - <artifactId>${shading-artifact.name}</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-examples-batch</artifactId> - <version>${project.version}</version> - <exclusions> - <exclusion> - <groupId>org.apache.flink</groupId> - <artifactId>${shading-artifact.name}</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-test-utils</artifactId> - <version>${project.version}</version> - <exclusions> - <exclusion> - <groupId>org.apache.flink</groupId> - <artifactId>${shading-artifact.name}</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-tests</artifactId> - <version>${project.version}</version> - <exclusions> - <exclusion> - <groupId>org.apache.flink</groupId> - <artifactId>${shading-artifact.name}</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.apache.tez</groupId> - <artifactId>tez-api</artifactId> - <version>${tez.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.tez</groupId> - <artifactId>tez-common</artifactId> - <version>${tez.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.tez</groupId> - <artifactId>tez-dag</artifactId> - <version>${tez.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.tez</groupId> - <artifactId>tez-runtime-library</artifactId> - <version>${tez.version}</version> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - <version>${hadoop.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-hdfs</artifactId> - <version>${hadoop.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - 
<artifactId>hadoop-mapreduce-client-core</artifactId> - <version>${hadoop.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-client</artifactId> - <version>${hadoop.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-common</artifactId> - <version>${hadoop.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-api</artifactId> - <version>${hadoop.version}</version> - </dependency> - - <dependency> - <groupId>commons-codec</groupId> - <artifactId>commons-codec</artifactId> - <version>1.4</version> - </dependency> - - - <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - <version>${guava.version}</version> - </dependency> - - </dependencies> - - <build> - <plugins> - <plugin> - <artifactId>maven-assembly-plugin</artifactId> - <version>2.4.1</version> - <configuration> - <descriptors> - <descriptor>${basedir}/src/assembly/flink-fat-jar.xml</descriptor> - </descriptors> - <archive> - <manifest> - <mainClass>org.apache.flink.tez.examples.ExampleDriver</mainClass> - </manifest> - </archive> - </configuration> - <executions> - <execution> - <!--<id>assemble-all</id>--> - <id>make-assembly</id> - <phase>package</phase> - <goals> - <goal>single</goal> - </goals> - </execution> - </executions> - </plugin> - </plugins> - </build> -</project> http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/assembly/flink-fat-jar.xml ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/assembly/flink-fat-jar.xml b/flink-contrib/flink-tez/src/assembly/flink-fat-jar.xml deleted file mode 100644 index 504761a..0000000 --- a/flink-contrib/flink-tez/src/assembly/flink-fat-jar.xml +++ /dev/null @@ -1,42 +0,0 @@ -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one - 
~ or more contributor license agreements. See the NOTICE file - ~ distributed with this work for additional information - ~ regarding copyright ownership. The ASF licenses this file - ~ to you under the Apache License, Version 2.0 (the - ~ "License"); you may not use this file except in compliance - ~ with the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. - --> - -<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd"> - <id>flink-fat-jar</id> - <formats> - <format>jar</format> - </formats> - <includeBaseDirectory>false</includeBaseDirectory> - <dependencySets> - <dependencySet> - <outputDirectory>/</outputDirectory> - <useProjectArtifact>true</useProjectArtifact> - <!--<excludes> - <exclude>org.apache.flink:*</exclude> - </excludes>--> - <useTransitiveFiltering>true</useTransitiveFiltering> - <unpack>true</unpack> - <scope>runtime</scope> - <excludes> - <exclude>com.google.guava:guava</exclude> - </excludes> - </dependencySet> - </dependencySets> -</assembly> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/client/LocalTezEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/client/LocalTezEnvironment.java 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/client/LocalTezEnvironment.java deleted file mode 100644 index 4c091e5..0000000 --- a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/client/LocalTezEnvironment.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.tez.client; - -import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.api.common.Plan; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.ExecutionEnvironmentFactory; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.optimizer.DataStatistics; -import org.apache.flink.optimizer.Optimizer; -import org.apache.flink.optimizer.costs.DefaultCostEstimator; -import org.apache.tez.dag.api.TezConfiguration; -import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; - -public class LocalTezEnvironment extends ExecutionEnvironment { - - TezExecutor executor; - Optimizer compiler; - - private LocalTezEnvironment() { - compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), new Configuration()); - executor = new TezExecutor(compiler, this.getParallelism()); - } - - public static LocalTezEnvironment create() { - return new LocalTezEnvironment(); - } - - @Override - public JobExecutionResult execute(String jobName) throws Exception { - TezConfiguration tezConf = new TezConfiguration(); - tezConf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true); - tezConf.set("fs.defaultFS", "file:///"); - tezConf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true); - executor.setConfiguration(tezConf); - return executor.executePlan(createProgramPlan(jobName)); - } - - @Override - public String getExecutionPlan() throws Exception { - Plan p = createProgramPlan(null, false); - return executor.getOptimizerPlanAsJSON(p); - } - - public void setAsContext() { - ExecutionEnvironmentFactory factory = new ExecutionEnvironmentFactory() { - @Override - public ExecutionEnvironment createExecutionEnvironment() { - return LocalTezEnvironment.this; - } - }; - initializeContextEnvironment(factory); - } - - @Override - public void startNewSession() throws Exception { - throw new UnsupportedOperationException("Session management is not 
implemented in Flink on Tez."); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/client/RemoteTezEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/client/RemoteTezEnvironment.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/client/RemoteTezEnvironment.java deleted file mode 100644 index 131937e..0000000 --- a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/client/RemoteTezEnvironment.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.tez.client; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.api.common.Plan; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.optimizer.DataStatistics; -import org.apache.flink.optimizer.Optimizer; -import org.apache.flink.optimizer.costs.DefaultCostEstimator; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.util.ClassUtil; -import org.apache.hadoop.util.ToolRunner; - - -public class RemoteTezEnvironment extends ExecutionEnvironment { - - private static final Log LOG = LogFactory.getLog(RemoteTezEnvironment.class); - - private Optimizer compiler; - private TezExecutor executor; - private Path jarPath = null; - - - public void registerMainClass (Class mainClass) { - jarPath = new Path(ClassUtil.findContainingJar(mainClass)); - LOG.info ("Registering main class " + mainClass.getName() + " contained in " + jarPath.toString()); - } - - @Override - public JobExecutionResult execute(String jobName) throws Exception { - TezExecutorTool tool = new TezExecutorTool(executor, createProgramPlan()); - if (jarPath != null) { - tool.setJobJar(jarPath); - } - try { - int executionResult = ToolRunner.run(new Configuration(), tool, new String[]{jobName}); - } - finally { - return new JobExecutionResult(null, -1, null); - } - - } - - @Override - public String getExecutionPlan() throws Exception { - Plan p = createProgramPlan(null, false); - return executor.getOptimizerPlanAsJSON(p); - } - - public static RemoteTezEnvironment create () { - return new RemoteTezEnvironment(); - } - - public RemoteTezEnvironment() { - compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), new org.apache.flink.configuration.Configuration()); - executor = new TezExecutor(compiler, getParallelism()); - } - - @Override - public void 
startNewSession() throws Exception { - throw new UnsupportedOperationException("Session management is not implemented in Flink on Tez."); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutor.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutor.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutor.java deleted file mode 100644 index 60449db..0000000 --- a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutor.java +++ /dev/null @@ -1,219 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.tez.client; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.Plan; -import org.apache.flink.api.common.PlanExecutor; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.optimizer.CompilerException; -import org.apache.flink.optimizer.Optimizer; -import org.apache.flink.optimizer.plan.OptimizedPlan; -import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator; -import org.apache.flink.tez.dag.TezDAGGenerator; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.security.TokenCache; -import org.apache.hadoop.security.Credentials; -import org.apache.hadoop.yarn.api.records.LocalResource; -import org.apache.hadoop.yarn.api.records.LocalResourceType; -import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; -import org.apache.hadoop.yarn.util.ConverterUtils; -import org.apache.tez.client.TezClient; -import org.apache.tez.client.TezClientUtils; -import org.apache.tez.common.TezCommonUtils; -import org.apache.tez.dag.api.DAG; -import org.apache.tez.dag.api.TezConfiguration; -import org.apache.tez.dag.api.client.DAGClient; -import org.apache.tez.dag.api.client.DAGStatus; - -import java.util.Map; -import java.util.TreeMap; - -public class TezExecutor extends PlanExecutor { - - private static final Log LOG = LogFactory.getLog(TezExecutor.class); - - private TezConfiguration tezConf; - private Optimizer compiler; - - private Path jarPath; - - private long runTime = -1; //TODO get DAG execution time from Tez - private int parallelism; - - public TezExecutor(TezConfiguration tezConf, Optimizer compiler, int parallelism) { - this.tezConf = tezConf; - this.compiler = compiler; - this.parallelism = parallelism; - } - - public TezExecutor(Optimizer compiler, int parallelism) { - 
this.tezConf = null; - this.compiler = compiler; - this.parallelism = parallelism; - } - - public void setConfiguration (TezConfiguration tezConf) { - this.tezConf = tezConf; - } - - private JobExecutionResult executePlanWithConf (TezConfiguration tezConf, Plan plan) throws Exception { - - String jobName = plan.getJobName(); - - TezClient tezClient = TezClient.create(jobName, tezConf); - tezClient.start(); - try { - OptimizedPlan optPlan = getOptimizedPlan(plan, parallelism); - TezDAGGenerator dagGenerator = new TezDAGGenerator(tezConf, new Configuration()); - DAG dag = dagGenerator.createDAG(optPlan); - - if (jarPath != null) { - addLocalResource(tezConf, jarPath, dag); - } - - tezClient.waitTillReady(); - LOG.info("Submitting DAG to Tez Client"); - DAGClient dagClient = tezClient.submitDAG(dag); - - LOG.info("Submitted DAG to Tez Client"); - - // monitoring - DAGStatus dagStatus = dagClient.waitForCompletion(); - - if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) { - LOG.error (jobName + " failed with diagnostics: " + dagStatus.getDiagnostics()); - throw new RuntimeException(jobName + " failed with diagnostics: " + dagStatus.getDiagnostics()); - } - LOG.info(jobName + " finished successfully"); - - return new JobExecutionResult(null, runTime, null); - - } - finally { - tezClient.stop(); - } - } - - @Override - public void start() throws Exception { - throw new IllegalStateException("Session management is not supported in the TezExecutor."); - } - - @Override - public void stop() throws Exception { - throw new IllegalStateException("Session management is not supported in the TezExecutor."); - } - - @Override - public void endSession(JobID jobID) throws Exception { - throw new IllegalStateException("Session management is not supported in the TezExecutor."); - } - - @Override - public boolean isRunning() { - return false; - } - - @Override - public JobExecutionResult executePlan(Plan plan) throws Exception { - return executePlanWithConf(tezConf, plan); - } - - 
private static void addLocalResource (TezConfiguration tezConf, Path jarPath, DAG dag) { - - try { - org.apache.hadoop.fs.FileSystem fs = org.apache.hadoop.fs.FileSystem.get(tezConf); - - LOG.info("Jar path received is " + jarPath.toString()); - - String jarFile = jarPath.getName(); - - Path remoteJarPath = null; - - /* - if (tezConf.get(TezConfiguration.TEZ_AM_STAGING_DIR) == null) { - LOG.info("Tez staging directory is null, setting it."); - Path stagingDir = new Path(fs.getWorkingDirectory(), UUID.randomUUID().toString()); - LOG.info("Setting Tez staging directory to " + stagingDir.toString()); - tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDir.toString()); - LOG.info("Set Tez staging directory to " + stagingDir.toString()); - } - Path stagingDir = new Path(tezConf.get(TezConfiguration.TEZ_AM_STAGING_DIR)); - LOG.info("Ensuring that Tez staging directory exists"); - TezClientUtils.ensureStagingDirExists(tezConf, stagingDir); - LOG.info("Tez staging directory exists and is " + stagingDir.toString()); - */ - - - Path stagingDir = TezCommonUtils.getTezBaseStagingPath(tezConf); - LOG.info("Tez staging path is " + stagingDir); - TezClientUtils.ensureStagingDirExists(tezConf, stagingDir); - LOG.info("Tez staging dir exists"); - - remoteJarPath = fs.makeQualified(new Path(stagingDir, jarFile)); - LOG.info("Copying " + jarPath.toString() + " to " + remoteJarPath.toString()); - fs.copyFromLocalFile(jarPath, remoteJarPath); - - - FileStatus remoteJarStatus = fs.getFileStatus(remoteJarPath); - Credentials credentials = new Credentials(); - TokenCache.obtainTokensForNamenodes(credentials, new Path[]{remoteJarPath}, tezConf); - - Map<String, LocalResource> localResources = new TreeMap<String, LocalResource>(); - LocalResource jobJar = LocalResource.newInstance( - ConverterUtils.getYarnUrlFromPath(remoteJarPath), - LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, - remoteJarStatus.getLen(), remoteJarStatus.getModificationTime()); - 
localResources.put(jarFile.toString(), jobJar); - - dag.addTaskLocalFiles(localResources); - - LOG.info("Added job jar as local resource."); - } - catch (Exception e) { - System.out.println(e.getMessage()); - e.printStackTrace(); - System.exit(-1); - } - } - - public void setJobJar (Path jarPath) { - this.jarPath = jarPath; - } - - - @Override - public String getOptimizerPlanAsJSON(Plan plan) throws Exception { - OptimizedPlan optPlan = getOptimizedPlan(plan, parallelism); - PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator(); - return jsonGen.getOptimizerPlanAsJSON(optPlan); - } - - public OptimizedPlan getOptimizedPlan(Plan p, int parallelism) throws CompilerException { - if (parallelism > 0 && p.getDefaultParallelism() <= 0) { - p.setDefaultParallelism(parallelism); - } - return this.compiler.compile(p); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutorTool.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutorTool.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutorTool.java deleted file mode 100644 index 09289fb..0000000 --- a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutorTool.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.tez.client; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.flink.api.common.JobExecutionResult; -import org.apache.flink.api.common.Plan; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.util.Tool; -import org.apache.tez.dag.api.TezConfiguration; - - -public class TezExecutorTool extends Configured implements Tool { - - private static final Log LOG = LogFactory.getLog(TezExecutorTool.class); - - private TezExecutor executor; - Plan plan; - private Path jarPath = null; - - public TezExecutorTool(TezExecutor executor, Plan plan) { - this.executor = executor; - this.plan = plan; - } - - public void setJobJar (Path jarPath) { - this.jarPath = jarPath; - } - - @Override - public int run(String[] args) throws Exception { - - Configuration conf = getConf(); - - TezConfiguration tezConf; - if (conf != null) { - tezConf = new TezConfiguration(conf); - } else { - tezConf = new TezConfiguration(); - } - - UserGroupInformation.setConfiguration(tezConf); - - executor.setConfiguration(tezConf); - - try { - if (jarPath != null) { - executor.setJobJar(jarPath); - } - JobExecutionResult result = executor.executePlan(plan); - } - catch (Exception e) { - LOG.error("Job execution failed due to: " + e.getMessage()); - throw new RuntimeException(e.getMessage()); - } - return 0; - } - - -} 
http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkBroadcastEdge.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkBroadcastEdge.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkBroadcastEdge.java deleted file mode 100644 index 6597733..0000000 --- a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkBroadcastEdge.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.tez.dag; - - -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.optimizer.CompilerException; -import org.apache.flink.tez.util.EncodingUtils; -import org.apache.flink.tez.util.FlinkSerialization; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.serializer.WritableSerialization; -import org.apache.tez.dag.api.Edge; -import org.apache.tez.dag.api.EdgeProperty; -import org.apache.tez.dag.api.TezConfiguration; -import org.apache.tez.runtime.library.conf.UnorderedKVEdgeConfig; - -import java.util.HashMap; -import java.util.Map; - -public class FlinkBroadcastEdge extends FlinkEdge { - - public FlinkBroadcastEdge(FlinkVertex source, FlinkVertex target, TypeSerializer<?> typeSerializer) { - super(source, target, typeSerializer); - } - - @Override - public Edge createEdge(TezConfiguration tezConf) { - - Map<String,String> serializerMap = new HashMap<String,String>(); - serializerMap.put("io.flink.typeserializer", EncodingUtils.encodeObjectToString(this.typeSerializer)); - - try { - UnorderedKVEdgeConfig edgeConfig = - (UnorderedKVEdgeConfig - .newBuilder(IntWritable.class.getName(), typeSerializer.createInstance().getClass().getName()) - .setFromConfiguration(tezConf) - .setKeySerializationClass(WritableSerialization.class.getName(), null) - .setValueSerializationClass(FlinkSerialization.class.getName(), serializerMap) - .configureInput() - .setAdditionalConfiguration("io.flink.typeserializer", EncodingUtils.encodeObjectToString(this.typeSerializer)) - ) - .done() - .build(); - - EdgeProperty property = edgeConfig.createDefaultBroadcastEdgeProperty(); - this.cached = Edge.create(source.getVertex(), target.getVertex(), property); - return cached; - - } catch (Exception e) { - throw new CompilerException( - "An error occurred while creating a Tez Forward Edge: " + e.getMessage(), e); - } - } -} 
http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkDataSinkVertex.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkDataSinkVertex.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkDataSinkVertex.java deleted file mode 100644 index e3ddb9e..0000000 --- a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkDataSinkVertex.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.tez.dag; - - -import org.apache.flink.optimizer.CompilerException; -import org.apache.flink.tez.runtime.DataSinkProcessor; -import org.apache.flink.tez.runtime.TezTaskConfig; -import org.apache.flink.tez.util.EncodingUtils; -import org.apache.tez.common.TezUtils; -import org.apache.tez.dag.api.ProcessorDescriptor; -import org.apache.tez.dag.api.TezConfiguration; -import org.apache.tez.dag.api.Vertex; - -import java.io.IOException; - -public class FlinkDataSinkVertex extends FlinkVertex { - - public FlinkDataSinkVertex(String taskName, int parallelism, TezTaskConfig taskConfig) { - super(taskName, parallelism, taskConfig); - } - - @Override - public Vertex createVertex(TezConfiguration conf) { - try { - this.writeInputPositionsToConfig(); - this.writeSubTasksInOutputToConfig(); - - conf.set(TezTaskConfig.TEZ_TASK_CONFIG, EncodingUtils.encodeObjectToString(taskConfig)); - - ProcessorDescriptor descriptor = ProcessorDescriptor.create( - DataSinkProcessor.class.getName()); - - descriptor.setUserPayload(TezUtils.createUserPayloadFromConf(conf)); - - cached = Vertex.create(this.getUniqueName(), descriptor, getParallelism()); - - return cached; - } - catch (IOException e) { - throw new CompilerException( - "An error occurred while creating a Tez Vertex: " + e.getMessage(), e); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkDataSourceVertex.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkDataSourceVertex.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkDataSourceVertex.java deleted file mode 100644 index 913b854..0000000 --- a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkDataSourceVertex.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or 
more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.tez.dag; - - -import org.apache.flink.optimizer.CompilerException; -import org.apache.flink.tez.runtime.DataSourceProcessor; -import org.apache.flink.tez.runtime.TezTaskConfig; -import org.apache.flink.tez.runtime.input.FlinkInput; -import org.apache.flink.tez.runtime.input.FlinkInputSplitGenerator; -import org.apache.flink.tez.util.EncodingUtils; -import org.apache.hadoop.security.Credentials; -import org.apache.tez.common.TezUtils; -import org.apache.tez.dag.api.DataSourceDescriptor; -import org.apache.tez.dag.api.InputDescriptor; -import org.apache.tez.dag.api.InputInitializerDescriptor; -import org.apache.tez.dag.api.ProcessorDescriptor; -import org.apache.tez.dag.api.TezConfiguration; -import org.apache.tez.dag.api.Vertex; - -import java.io.IOException; - -public class FlinkDataSourceVertex extends FlinkVertex { - - public FlinkDataSourceVertex(String taskName, int parallelism, TezTaskConfig taskConfig) { - super(taskName, parallelism, taskConfig); - } - - - @Override - public Vertex createVertex (TezConfiguration conf) { - try { - this.writeInputPositionsToConfig(); - this.writeSubTasksInOutputToConfig(); - - taskConfig.setDatasourceProcessorName(this.getUniqueName()); - 
conf.set(TezTaskConfig.TEZ_TASK_CONFIG, EncodingUtils.encodeObjectToString(taskConfig)); - - ProcessorDescriptor descriptor = ProcessorDescriptor.create( - DataSourceProcessor.class.getName()); - - descriptor.setUserPayload(TezUtils.createUserPayloadFromConf(conf)); - - InputDescriptor inputDescriptor = InputDescriptor.create(FlinkInput.class.getName()); - - InputInitializerDescriptor inputInitializerDescriptor = - InputInitializerDescriptor.create(FlinkInputSplitGenerator.class.getName()).setUserPayload(TezUtils.createUserPayloadFromConf(conf)); - - DataSourceDescriptor dataSourceDescriptor = DataSourceDescriptor.create( - inputDescriptor, - inputInitializerDescriptor, - new Credentials() - ); - - cached = Vertex.create(this.getUniqueName(), descriptor, getParallelism()); - - cached.addDataSource("Input " + this.getUniqueName(), dataSourceDescriptor); - - return cached; - } - catch (IOException e) { - throw new CompilerException( - "An error occurred while creating a Tez Vertex: " + e.getMessage(), e); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkEdge.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkEdge.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkEdge.java deleted file mode 100644 index 181e675..0000000 --- a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkEdge.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.tez.dag; - - -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.tez.dag.api.Edge; -import org.apache.tez.dag.api.TezConfiguration; - -public abstract class FlinkEdge { - - protected FlinkVertex source; - protected FlinkVertex target; - protected TypeSerializer<?> typeSerializer; - protected Edge cached; - - protected FlinkEdge(FlinkVertex source, FlinkVertex target, TypeSerializer<?> typeSerializer) { - this.source = source; - this.target = target; - this.typeSerializer = typeSerializer; - } - - public abstract Edge createEdge(TezConfiguration tezConf); - - public Edge getEdge () { - return cached; - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkForwardEdge.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkForwardEdge.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkForwardEdge.java deleted file mode 100644 index 4602e96..0000000 --- a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkForwardEdge.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.tez.dag; - - -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.optimizer.CompilerException; -import org.apache.flink.tez.util.EncodingUtils; -import org.apache.flink.tez.util.FlinkSerialization; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.serializer.WritableSerialization; -import org.apache.tez.dag.api.Edge; -import org.apache.tez.dag.api.EdgeProperty; -import org.apache.tez.dag.api.TezConfiguration; -import org.apache.tez.runtime.library.conf.UnorderedKVEdgeConfig; - -import java.util.HashMap; -import java.util.Map; - -public class FlinkForwardEdge extends FlinkEdge { - - public FlinkForwardEdge(FlinkVertex source, FlinkVertex target, TypeSerializer<?> typeSerializer) { - super(source, target, typeSerializer); - } - - @Override - public Edge createEdge(TezConfiguration tezConf) { - - Map<String,String> serializerMap = new HashMap<String,String>(); - serializerMap.put("io.flink.typeserializer", EncodingUtils.encodeObjectToString(this.typeSerializer)); - - try { - UnorderedKVEdgeConfig edgeConfig = - (UnorderedKVEdgeConfig - .newBuilder(IntWritable.class.getName(), typeSerializer.createInstance().getClass().getName()) - .setFromConfiguration(tezConf) - .setKeySerializationClass(WritableSerialization.class.getName(), null) - .setValueSerializationClass(FlinkSerialization.class.getName(), 
serializerMap) - .configureInput() - .setAdditionalConfiguration("io.flink.typeserializer", EncodingUtils.encodeObjectToString( - this.typeSerializer - ))) - .done() - .build(); - - EdgeProperty property = edgeConfig.createDefaultOneToOneEdgeProperty(); - this.cached = Edge.create(source.getVertex(), target.getVertex(), property); - return cached; - - } catch (Exception e) { - throw new CompilerException( - "An error occurred while creating a Tez Forward Edge: " + e.getMessage(), e); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkPartitionEdge.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkPartitionEdge.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkPartitionEdge.java deleted file mode 100644 index b5f8c2e..0000000 --- a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkPartitionEdge.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.tez.dag; - - -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.optimizer.CompilerException; -import org.apache.flink.tez.util.EncodingUtils; -import org.apache.flink.tez.util.FlinkSerialization; -import org.apache.flink.tez.runtime.output.SimplePartitioner; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.serializer.WritableSerialization; -import org.apache.tez.dag.api.Edge; -import org.apache.tez.dag.api.EdgeProperty; -import org.apache.tez.dag.api.TezConfiguration; -import org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfig; - -import java.util.HashMap; -import java.util.Map; - -public class FlinkPartitionEdge extends FlinkEdge { - - public FlinkPartitionEdge(FlinkVertex source, FlinkVertex target, TypeSerializer<?> typeSerializer) { - super(source, target, typeSerializer); - } - - @Override - public Edge createEdge(TezConfiguration tezConf) { - - Map<String,String> serializerMap = new HashMap<String,String>(); - serializerMap.put("io.flink.typeserializer", EncodingUtils.encodeObjectToString(this.typeSerializer)); - - try { - UnorderedPartitionedKVEdgeConfig edgeConfig = - (UnorderedPartitionedKVEdgeConfig - .newBuilder(IntWritable.class.getName(), typeSerializer.createInstance().getClass().getName(), SimplePartitioner.class.getName()) - .setFromConfiguration(tezConf) - .setKeySerializationClass(WritableSerialization.class.getName(), null) - .setValueSerializationClass(FlinkSerialization.class.getName(), serializerMap) - .configureInput() - .setAdditionalConfiguration("io.flink.typeserializer", EncodingUtils.encodeObjectToString(this.typeSerializer))) - .done() - .build(); - - - EdgeProperty property = edgeConfig.createDefaultEdgeProperty(); - this.cached = Edge.create(source.getVertex(), target.getVertex(), property); - return cached; - - } catch (Exception e) { - throw new CompilerException( - "An error occurred while creating a Tez Shuffle Edge: " + 
e.getMessage(), e); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkProcessorVertex.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkProcessorVertex.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkProcessorVertex.java deleted file mode 100644 index 2fbba36..0000000 --- a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkProcessorVertex.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
package org.apache.flink.tez.dag;

import org.apache.flink.optimizer.CompilerException;
import org.apache.flink.tez.runtime.RegularProcessor;
import org.apache.flink.tez.runtime.TezTaskConfig;
import org.apache.flink.tez.util.EncodingUtils;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.Vertex;

import java.io.IOException;


/**
 * A DAG vertex that executes a regular Flink driver task via the
 * {@link RegularProcessor}.
 */
public class FlinkProcessorVertex extends FlinkVertex {

	public FlinkProcessorVertex(String taskName, int parallelism, TezTaskConfig taskConfig) {
		super(taskName, parallelism, taskConfig);
	}

	/**
	 * Creates the Tez vertex for this processor. The task configuration is
	 * serialized into the Tez configuration, which in turn becomes the
	 * processor's user payload.
	 *
	 * @throws CompilerException if the user payload cannot be created.
	 */
	@Override
	public Vertex createVertex(TezConfiguration conf) {
		try {
			// Record input ordering and per-output sub-task counts; this must
			// happen before the task config is serialized below.
			writeInputPositionsToConfig();
			writeSubTasksInOutputToConfig();

			conf.set(TezTaskConfig.TEZ_TASK_CONFIG, EncodingUtils.encodeObjectToString(taskConfig));

			ProcessorDescriptor processor = ProcessorDescriptor.create(RegularProcessor.class.getName());
			processor.setUserPayload(TezUtils.createUserPayloadFromConf(conf));

			cached = Vertex.create(getUniqueName(), processor, getParallelism());
			return cached;
		}
		catch (IOException e) {
			throw new CompilerException(
					"An error occurred while creating a Tez Vertex: " + e.getMessage(), e);
		}
	}

}
package org.apache.flink.tez.dag;


import org.apache.flink.optimizer.CompilerException;
import org.apache.flink.tez.runtime.UnionProcessor;
import org.apache.flink.tez.runtime.TezTaskConfig;
import org.apache.flink.tez.util.EncodingUtils;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.Vertex;

import java.io.IOException;

/**
 * A DAG vertex backed by the {@link UnionProcessor}, which merges several
 * inputs into a single output stream.
 */
public class FlinkUnionVertex extends FlinkVertex {

	public FlinkUnionVertex(String taskName, int parallelism, TezTaskConfig taskConfig) {
		super(taskName, parallelism, taskConfig);
	}

	/**
	 * Creates the Tez vertex for the union. The serialized task configuration
	 * travels to the tasks as the processor's user payload.
	 *
	 * @throws CompilerException if the user payload cannot be created.
	 */
	@Override
	public Vertex createVertex(TezConfiguration conf) {
		try {
			// Bookkeeping must be flushed into taskConfig before it is encoded.
			writeInputPositionsToConfig();
			writeSubTasksInOutputToConfig();

			conf.set(TezTaskConfig.TEZ_TASK_CONFIG, EncodingUtils.encodeObjectToString(taskConfig));

			ProcessorDescriptor union = ProcessorDescriptor.create(UnionProcessor.class.getName());
			union.setUserPayload(TezUtils.createUserPayloadFromConf(conf));

			cached = Vertex.create(getUniqueName(), union, getParallelism());
			return cached;
		}
		catch (IOException e) {
			throw new CompilerException(
					"An error occurred while creating a Tez Vertex: " + e.getMessage(), e);
		}
	}
}
package org.apache.flink.tez.dag;


import org.apache.flink.tez.runtime.TezTaskConfig;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.Vertex;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

/**
 * Base class for the vertices of the Flink DAG that are translated into Tez
 * {@link Vertex} instances. Keeps Tez-specific bookkeeping (unique names,
 * input positions, sub-task counts per output) that subclasses flush into the
 * {@link TezTaskConfig} before creating the actual Tez vertex.
 */
public abstract class FlinkVertex {

	/** The Tez vertex created by {@link #createVertex(TezConfiguration)}; null until created. */
	protected Vertex cached;
	private final String taskName;
	private int parallelism;
	protected TezTaskConfig taskConfig;

	// Tez-specific bookkeeping
	protected String uniqueName; // Unique name in DAG

	/** For each producing vertex, the input positions of this vertex it feeds. */
	private final Map<FlinkVertex, ArrayList<Integer>> inputPositions;

	/** Per output position, the number of sub-tasks of the consumer; -1 means "not set". */
	private final ArrayList<Integer> numberOfSubTasksInOutputs;

	public TezTaskConfig getConfig() {
		return taskConfig;
	}

	public FlinkVertex(String taskName, int parallelism, TezTaskConfig taskConfig) {
		this.cached = null;
		this.taskName = taskName;
		this.parallelism = parallelism;
		this.taskConfig = taskConfig;
		// Suffix with a UUID so two operators with the same task name do not clash in the DAG.
		this.uniqueName = taskName + UUID.randomUUID().toString();
		this.inputPositions = new HashMap<FlinkVertex, ArrayList<Integer>>();
		this.numberOfSubTasksInOutputs = new ArrayList<Integer>();
	}

	public int getParallelism () {
		return parallelism;
	}

	public void setParallelism (int parallelism) {
		this.parallelism = parallelism;
	}

	/** Creates (and caches) the Tez vertex for this DAG node. */
	public abstract Vertex createVertex (TezConfiguration conf);

	public Vertex getVertex () {
		return cached;
	}

	protected String getUniqueName () {
		return uniqueName;
	}

	/**
	 * Registers that {@code vertex} feeds this vertex at logical input {@code position}.
	 * A producer may feed several positions, so positions accumulate per producer.
	 */
	public void addInput (FlinkVertex vertex, int position) {
		// Single lookup instead of containsKey + get.
		ArrayList<Integer> positions = inputPositions.get(vertex);
		if (positions == null) {
			positions = new ArrayList<Integer>();
			inputPositions.put(vertex, positions);
		}
		positions.add(position);
	}

	/**
	 * Records the number of consumer sub-tasks for output {@code position},
	 * growing the list as needed. Unset positions hold -1.
	 */
	public void addNumberOfSubTasksInOutput (int subTasks, int position) {
		// Pad with -1 ("unknown") until the list can hold the position; this
		// subsumes the former special case for an empty list.
		while (numberOfSubTasksInOutputs.size() <= position) {
			numberOfSubTasksInOutputs.add(-1);
		}
		numberOfSubTasksInOutputs.set(position, subTasks);
	}

	// Must be called before taskConfig is written to Tez configuration
	protected void writeInputPositionsToConfig () {
		HashMap<String, ArrayList<Integer>> toWrite = new HashMap<String, ArrayList<Integer>>();
		// entrySet avoids a second lookup per key.
		for (Map.Entry<FlinkVertex, ArrayList<Integer>> entry : inputPositions.entrySet()) {
			List<Integer> positions = entry.getValue();
			toWrite.put(entry.getKey().getUniqueName(), new ArrayList<Integer>(positions));
		}
		this.taskConfig.setInputPositions(toWrite);
	}

	// Must be called before taskConfig is written to Tez configuration
	protected void writeSubTasksInOutputToConfig () {
		this.taskConfig.setNumberSubtasksInOutput(this.numberOfSubTasksInOutputs);
	}

}