[FLINK-3235] Remove Flink on Tez code

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8b8dfc16
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8b8dfc16
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8b8dfc16

Branch: refs/heads/master
Commit: 8b8dfc1671f21a2e381376e78184c7b46adf26b0
Parents: 16cf8b1
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Jan 14 21:45:32 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Jan 15 11:44:20 2016 +0100

----------------------------------------------------------------------
 .travis.yml                                     |   2 +-
 docs/_includes/navbar.html                      |   1 -
 docs/index.md                                   |   1 -
 docs/internals/general_arch.md                  |   1 -
 docs/setup/flink_on_tez.md                      | 291 ----------
 flink-contrib/flink-tez/pom.xml                 | 224 -------
 .../flink-tez/src/assembly/flink-fat-jar.xml    |  42 --
 .../flink/tez/client/LocalTezEnvironment.java   |  76 ---
 .../flink/tez/client/RemoteTezEnvironment.java  |  83 ---
 .../apache/flink/tez/client/TezExecutor.java    | 219 -------
 .../flink/tez/client/TezExecutorTool.java       |  80 ---
 .../flink/tez/dag/FlinkBroadcastEdge.java       |  70 ---
 .../flink/tez/dag/FlinkDataSinkVertex.java      |  61 --
 .../flink/tez/dag/FlinkDataSourceVertex.java    |  82 ---
 .../org/apache/flink/tez/dag/FlinkEdge.java     |  45 --
 .../apache/flink/tez/dag/FlinkForwardEdge.java  |  71 ---
 .../flink/tez/dag/FlinkPartitionEdge.java       |  71 ---
 .../flink/tez/dag/FlinkProcessorVertex.java     |  61 --
 .../apache/flink/tez/dag/FlinkUnionVertex.java  |  61 --
 .../org/apache/flink/tez/dag/FlinkVertex.java   | 114 ----
 .../apache/flink/tez/dag/TezDAGGenerator.java   | 460 ---------------
 .../tez/examples/ConnectedComponentsStep.java   | 203 -------
 .../flink/tez/examples/ExampleDriver.java       | 119 ----
 .../flink/tez/examples/PageRankBasicStep.java   | 241 --------
 .../apache/flink/tez/examples/TPCHQuery3.java   | 224 -------
 .../examples/TransitiveClosureNaiveStep.java    | 135 -----
 .../apache/flink/tez/examples/WordCount.java    | 129 -----
 .../flink/tez/runtime/DataSinkProcessor.java    | 228 --------
 .../flink/tez/runtime/DataSourceProcessor.java  | 190 ------
 .../flink/tez/runtime/RegularProcessor.java     | 138 -----
 .../tez/runtime/TezRuntimeEnvironment.java      |  44 --
 .../org/apache/flink/tez/runtime/TezTask.java   | 578 -------------------
 .../apache/flink/tez/runtime/TezTaskConfig.java | 163 ------
 .../flink/tez/runtime/UnionProcessor.java       | 106 ----
 .../flink/tez/runtime/input/FlinkInput.java     | 139 -----
 .../runtime/input/FlinkInputSplitGenerator.java |  94 ---
 .../tez/runtime/input/TezReaderIterator.java    |  66 ---
 .../tez/runtime/output/SimplePartitioner.java   |  35 --
 .../tez/runtime/output/TezChannelSelector.java  |  36 --
 .../tez/runtime/output/TezOutputCollector.java  |  72 ---
 .../tez/runtime/output/TezOutputEmitter.java    | 190 ------
 .../apache/flink/tez/util/DummyInvokable.java   |  51 --
 .../apache/flink/tez/util/EncodingUtils.java    |  64 --
 .../flink/tez/util/FlinkSerialization.java      | 310 ----------
 .../src/main/resources/log4j.properties         |  30 -
 .../tez/test/ConnectedComponentsStepITCase.java |  83 ---
 .../flink/tez/test/PageRankBasicStepITCase.java |  54 --
 .../flink/tez/test/TezProgramTestBase.java      | 108 ----
 .../flink/tez/test/WebLogAnalysisITCase.java    |  48 --
 .../apache/flink/tez/test/WordCountITCase.java  |  47 --
 .../src/test/resources/log4j-test.properties    |  30 -
 .../src/test/resources/logback-test.xml         |  37 --
 flink-contrib/pom.xml                           |  11 -
 flink-quickstart/flink-tez-quickstart/pom.xml   |  37 --
 .../src/main/java/Dummy.java                    |  28 -
 .../META-INF/maven/archetype-metadata.xml       |  36 --
 .../main/resources/archetype-resources/pom.xml  | 186 ------
 .../src/assembly/flink-fat-jar.xml              |  40 --
 .../src/main/java/Driver.java                   | 113 ----
 .../src/main/java/LocalJob.java                 |  72 ---
 .../src/main/java/LocalWordCount.java           |  96 ---
 .../src/main/java/YarnJob.java                  |  75 ---
 .../src/main/java/YarnWordCount.java            | 124 ----
 .../projects/testArtifact/archetype.properties  |  21 -
 .../resources/projects/testArtifact/goal.txt    |   1 -
 flink-quickstart/pom.xml                        |  10 -
 pom.xml                                         |   9 -
 67 files changed, 1 insertion(+), 6966 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index de27987..50b5fab 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -17,7 +17,7 @@ language: java
 matrix:
   include:
     - jdk: "oraclejdk8"
-      env: PROFILE="-Dhadoop.version=2.7.1 -Dscala-2.11 -Pinclude-tez 
-Pinclude-yarn-tests"
+      env: PROFILE="-Dhadoop.version=2.7.1 -Dscala-2.11 -Pinclude-yarn-tests"
     - jdk: "oraclejdk8"
       env: PROFILE="-Dhadoop.version=2.5.0 -Pinclude-yarn-tests"
     - jdk: "openjdk7"

http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/docs/_includes/navbar.html
----------------------------------------------------------------------
diff --git a/docs/_includes/navbar.html b/docs/_includes/navbar.html
index b5a04d9..362558d 100644
--- a/docs/_includes/navbar.html
+++ b/docs/_includes/navbar.html
@@ -64,7 +64,6 @@ under the License.
                 <li><a href="{{ setup }}/cluster_setup.html">Cluster 
(Standalone)</a></li>
                 <li><a href="{{ setup }}/yarn_setup.html">YARN</a></li>
                 <li><a href="{{ setup }}/gce_setup.html">GCloud</a></li>
-                <li><a href="{{ setup }}/flink_on_tez.html">Flink on Tez <span 
class="badge">Beta</span></a></li>
                 <li><a href="{{ setup 
}}/jobmanager_high_availability.html">JobManager High Availability</a></li>
 
                 <li class="divider"></li>

http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/docs/index.md
----------------------------------------------------------------------
diff --git a/docs/index.md b/docs/index.md
index 6168934..9a7809c 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -54,5 +54,4 @@ This is an overview of Flink's stack. Click on any component 
to go to the respec
   <area shape="rect" coords="333,405,473,455" alt="Remote" 
href="apis/cluster_execution.html">
   <area shape="rect" coords="478,405,638,455" alt="Embedded" 
href="apis/local_execution.html">
   <area shape="rect" coords="643,405,765,455" alt="YARN" 
href="setup/yarn_setup.html">
-  <area shape="rect" coords="770,405,893,455" alt="Tez" 
href="setup/flink_on_tez.html">
 </map>

http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/docs/internals/general_arch.md
----------------------------------------------------------------------
diff --git a/docs/internals/general_arch.md b/docs/internals/general_arch.md
index a81ae85..b49d9ef 100644
--- a/docs/internals/general_arch.md
+++ b/docs/internals/general_arch.md
@@ -77,7 +77,6 @@ You can click on the components in the figure to learn more.
   <area shape="rect" coords="333,405,473,455" alt="Remote" 
href="../apis/cluster_execution.html">
   <area shape="rect" coords="478,405,638,455" alt="Embedded" 
href="../apis/local_execution.html">
   <area shape="rect" coords="643,405,765,455" alt="YARN" 
href="../setup/yarn_setup.html">
-  <area shape="rect" coords="770,405,893,455" alt="Tez" 
href="../setup/flink_on_tez.html">
 </map>
 
 ## Projects and Dependencies

http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/docs/setup/flink_on_tez.md
----------------------------------------------------------------------
diff --git a/docs/setup/flink_on_tez.md b/docs/setup/flink_on_tez.md
deleted file mode 100644
index daa0d7d..0000000
--- a/docs/setup/flink_on_tez.md
+++ /dev/null
@@ -1,291 +0,0 @@
----
-title: "Running Flink on YARN leveraging Tez"
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-<a href="#top"></a>
-
-You can run Flink using Tez as an execution environment. Flink on Tez 
-is currently included in the *flink-contrib* module, which means it is 
-in alpha stability. All classes are located in the *org.apache.flink.tez* 
-package.
-
-* This will be replaced by the TOC
-{:toc}
-
-## Why Flink on Tez
-
-[Apache Tez](http://tez.apache.org) is a scalable data processing
-platform. Tez provides an API for specifying a directed acyclic
-graph (DAG), and functionality for placing the DAG vertices in YARN
-containers, as well as data shuffling.  In Flink's architecture,
-Tez is at about the same level as Flink's network stack. While Flink's
-network stack focuses heavily on low latency in order to support 
-pipelining, data streaming, and iterative algorithms, Tez
-focuses on scalability and elastic resource usage.
-
-Thus, by replacing Flink's network stack with Tez, users can get scalability
-and elastic resource usage in shared clusters while retaining Flink's 
-APIs, optimizer, and runtime algorithms (local sorts, hash tables, etc).
-
-Flink programs can run almost unmodified using Tez as an execution
-environment. Tez supports local execution (e.g., for debugging), and 
-remote execution on YARN.
-
-
-## Local execution
-
-The `LocalTezEnvironment` can be used run programs using the local
-mode provided by Tez. This example shows how WordCount can be run using the 
Tez local mode.
-It is identical to a normal Flink WordCount, except that the 
`LocalTezEnvironment` is used.
-To run in local Tez mode, you can simply run a Flink on Tez program
-from your IDE (e.g., right click and run).
-  
-{% highlight java %}
-public class WordCountExample {
-    public static void main(String[] args) throws Exception {
-        final LocalTezEnvironment env = LocalTezEnvironment.create();
-
-        DataSet<String> text = env.fromElements(
-            "Who's there?",
-            "I think I hear them. Stand, ho! Who's there?");
-
-        DataSet<Tuple2<String, Integer>> wordCounts = text
-            .flatMap(new LineSplitter())
-            .groupBy(0)
-            .sum(1);
-
-        wordCounts.print();
-
-        env.execute("Word Count Example");
-    }
-
-    public static class LineSplitter implements FlatMapFunction<String, 
Tuple2<String, Integer>> {
-        @Override
-        public void flatMap(String line, Collector<Tuple2<String, Integer>> 
out) {
-            for (String word : line.split(" ")) {
-                out.collect(new Tuple2<String, Integer>(word, 1));
-            }
-        }
-    }
-}
-{% endhighlight %}
-
-## YARN execution
-
-### Setup
-
-- Install Tez on your Hadoop 2 cluster following the instructions from the
-  [Apache Tez website](http://tez.apache.org/install.html). If you are able to 
run 
-  the examples that ship with Tez, then Tez has been successfully installed.
-  
-- Currently, you need to build Flink yourself to obtain Flink on Tez
-  (the reason is a Hadoop version compatibility: Tez releases artifacts
-  on Maven central with a Hadoop 2.6.0 dependency). Build Flink
-  using `mvn -DskipTests clean package -Pinclude-tez -Dhadoop.version=X.X.X 
-Dtez.version=X.X.X`.
-  Make sure that the Hadoop version matches the version that Tez uses.
-  Obtain the jar file contained in the Flink distribution under
-  `flink-contrib/flink-tez/target/flink-tez-x.y.z-flink-fat-jar.jar` 
-  and upload it to some directory in HDFS. E.g., to upload the file
-  to the directory `/apps`, execute
-  {% highlight bash %}
-  $ hadoop fs -put /path/to/flink-tez-x.y.z-flink-fat-jar.jar /apps
-  {% endhighlight %}  
- 
-- Edit the tez-site.xml configuration file, adding an entry that points to the
-  location of the file. E.g., assuming that the file is in the directory 
`/apps/`, 
-  add the following entry to tez-site.xml:
-    {% highlight xml %}
-<property>
-  <name>tez.aux.uris</name>
-  <value>${fs.default.name}/apps/flink-tez-x.y.z-flink-fat-jar.jar</value>
-</property>
-    {% endhighlight %}  
-    
-- At this point, you should be able to run the pre-packaged examples, e.g., 
run WordCount:
-  {% highlight bash %}
-  $ hadoop jar /path/to/flink-tez-x.y.z-flink-fat-jar.jar wc 
hdfs:/path/to/text hdfs:/path/to/output
-  {% endhighlight %}  
-
-
-### Packaging your program
-
-Application packaging is currently a bit different than in Flink standalone 
mode.
-  Flink programs that run on Tez need to be packaged in a "fat jar"
-  file that contain the Flink client. This jar can then be executed via the 
`hadoop jar` command.
-  An easy way to do that is to use the provided `flink-tez-quickstart` maven 
archetype.
-  Create a new project as
-  
-  {% highlight bash %}
-  $ mvn archetype:generate                             \
-    -DarchetypeGroupId=org.apache.flink              \
-    -DarchetypeArtifactId=flink-tez-quickstart           \
-    -DarchetypeVersion={{site.version}}
-  {% endhighlight %}
-  
-  and specify the group id, artifact id, version, and package of your project. 
For example,
-  let us assume the following options: `org.myorganization`, `flink-on-tez`, 
`0.1`, and `org.myorganization`.
-  You should see the following output on your terminal:
-  
-  {% highlight bash %}
-  $ mvn archetype:generate -DarchetypeGroupId=org.apache.flink 
-DarchetypeArtifactId=flink-tez-quickstart
-  [INFO] Scanning for projects...
-  [INFO]
-  [INFO] 
------------------------------------------------------------------------
-  [INFO] Building Maven Stub Project (No POM) 1
-  [INFO] 
------------------------------------------------------------------------
-  [INFO]
-  [INFO] >>> maven-archetype-plugin:2.2:generate (default-cli) > 
generate-sources @ standalone-pom >>>
-  [INFO]
-  [INFO] <<< maven-archetype-plugin:2.2:generate (default-cli) < 
generate-sources @ standalone-pom <<<
-  [INFO]
-  [INFO] --- maven-archetype-plugin:2.2:generate (default-cli) @ 
standalone-pom ---
-  [INFO] Generating project in Interactive mode
-  [INFO] Archetype [org.apache.flink:flink-tez-quickstart:0.9-SNAPSHOT] found 
in catalog local
-  Define value for property 'groupId': : org.myorganization
-  Define value for property 'artifactId': : flink-on-tez
-  Define value for property 'version':  1.0-SNAPSHOT: : 0.1
-  Define value for property 'package':  org.myorganization: :
-  Confirm properties configuration:
-  groupId: org.myorganization
-  artifactId: flink-on-tez
-  version: 0.1
-  package: org.myorganization
-   Y: : Y
-  [INFO] 
----------------------------------------------------------------------------
-  [INFO] Using following parameters for creating project from Archetype: 
flink-tez-quickstart:0.9-SNAPSHOT
-  [INFO] 
----------------------------------------------------------------------------
-  [INFO] Parameter: groupId, Value: org.myorganization
-  [INFO] Parameter: artifactId, Value: flink-on-tez
-  [INFO] Parameter: version, Value: 0.1
-  [INFO] Parameter: package, Value: org.myorganization
-  [INFO] Parameter: packageInPathFormat, Value: org/myorganization
-  [INFO] Parameter: package, Value: org.myorganization
-  [INFO] Parameter: version, Value: 0.1
-  [INFO] Parameter: groupId, Value: org.myorganization
-  [INFO] Parameter: artifactId, Value: flink-on-tez
-  [INFO] project created from Archetype in dir: 
/Users/kostas/Dropbox/flink-tez-quickstart-test/flink-on-tez
-  [INFO] 
------------------------------------------------------------------------
-  [INFO] BUILD SUCCESS
-  [INFO] 
------------------------------------------------------------------------
-  [INFO] Total time: 44.130 s
-  [INFO] Finished at: 2015-02-26T17:59:45+01:00
-  [INFO] Final Memory: 15M/309M
-  [INFO] 
------------------------------------------------------------------------
-  {% endhighlight %}
-  
-  The project contains an example called `YarnJob.java` that provides the 
skeleton 
-  for a Flink-on-Tez job. Program execution is currently done using Hadoop's 
`ProgramDriver`, 
-  see the `Driver.java` class for an example. Create the fat jar using 
-  `mvn -DskipTests clean package`. The resulting jar will be located in the 
`target/` directory. 
-  You can now execute a job as follows:
-  
-  {% highlight bash %}
-$ mvn -DskipTests clean package
-$ hadoop jar flink-on-tez/target/flink-on-tez-0.1-flink-fat-jar.jar yarnjob 
[command-line parameters]
-  {% endhighlight %}
-  
-  Flink programs that run on YARN using Tez as an execution engine need to use 
the `RemoteTezEnvironment` and 
-  register the class that contains the `main` method with that environment:
-  {% highlight java %}
-  public class WordCountExample {
-      public static void main(String[] args) throws Exception {
-          final RemoteTezEnvironment env = RemoteTezEnvironment.create();
-  
-          DataSet<String> text = env.fromElements(
-              "Who's there?",
-              "I think I hear them. Stand, ho! Who's there?");
-  
-          DataSet<Tuple2<String, Integer>> wordCounts = text
-              .flatMap(new LineSplitter())
-              .groupBy(0)
-              .sum(1);
-  
-          wordCounts.print();
-      
-          env.registerMainClass(WordCountExample.class);
-          env.execute("Word Count Example");
-      }
-  
-      public static class LineSplitter implements FlatMapFunction<String, 
Tuple2<String, Integer>> {
-          @Override
-          public void flatMap(String line, Collector<Tuple2<String, Integer>> 
out) {
-              for (String word : line.split(" ")) {
-                  out.collect(new Tuple2<String, Integer>(word, 1));
-              }
-          }
-      }
-  }
-  {% endhighlight %}
-
-
-## How it works
-
-Flink on Tez reuses the Flink APIs, the Flink optimizer,
-and the Flink local runtime, including Flink's hash table and sort 
implementations. Tez
-replaces Flink's network stack and control plan, and is responsible for 
scheduling and
-network shuffles.
-
-The figure below shows how a Flink program passes through the Flink stack and 
generates
-a Tez DAG (instead of a JobGraph that would be created using normal Flink 
execution).
-
-<div style="text-align: center;">
-<img src="fig/flink_on_tez_translation.png" alt="Translation of a Flink 
program to a Tez DAG." height="600px" vspace="20px" style="text-align: 
center;"/>
-</div>
-
-All local processing, including memory management, sorting, and hashing is 
performed by
-Flink as usual. Local processing is encapsulated in Tez vertices, as seen in 
the figure
-below. Tez vertices are connected by edges. Tez is currently based on a 
key-value data
-model. In the current implementation, the elements that are processed by Flink 
operators
-are wrapped inside Tez values, and the Tez key field is used to indicate the 
index of the target task
-that the elements are destined to.
-
-<div style="text-align: center;">
-<img src="fig/flink_tez_vertex.png" alt="Encapsulation of Flink runtime inside 
Tez vertices." height="200px" vspace="20px" style="text-align: center;"/>
-</div>
-
-## Limitations
-
-Currently, Flink on Tez does not support all features of the Flink API. We are 
working
-to enable all of the missing features listed below. In the meantime, if your 
project depends on these features, we suggest
-to use [Flink on YARN]({{site.baseurl}}/setup/yarn_setup.html) or [Flink 
standalone]({{site.baseurl}}/quickstart/setup_quickstart.html).
-
-The following features are currently missing.
-
-- Dedicated client: jobs need to be submitted via Hadoop's command-line client
-
-- Self-joins: currently binary operators that receive the same input are not 
supported due to 
-  [TEZ-1190](https://issues.apache.org/jira/browse/TEZ-1190).
-
-- Iterative programs are currently not supported.
-
-- Broadcast variables are currently not supported.
-
-- Accummulators and counters are currently not supported.
-
-- Performance: The current implementation has not been heavily tested for 
performance, and misses several optimizations,
-  including task chaining.
-
-- Streaming API: Streaming programs will not currently compile to Tez DAGs.
-
-- Scala API: The current implementation has only been tested with the Java API.
-
-
-

http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tez/pom.xml b/flink-contrib/flink-tez/pom.xml
deleted file mode 100644
index 412640a..0000000
--- a/flink-contrib/flink-tez/pom.xml
+++ /dev/null
@@ -1,224 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
-
-    <modelVersion>4.0.0</modelVersion>
-
-    <parent>
-        <groupId>org.apache.flink</groupId>
-        <artifactId>flink-contrib-parent</artifactId>
-        <version>1.0-SNAPSHOT</version>
-        <relativePath>..</relativePath>
-    </parent>
-
-    <artifactId>flink-tez</artifactId>
-    <name>flink-tez</name>
-
-    <packaging>jar</packaging>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-java</artifactId>
-            <version>${project.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.apache.flink</groupId>
-                    <artifactId>${shading-artifact.name}</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-core</artifactId>
-            <version>${project.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.apache.flink</groupId>
-                    <artifactId>${shading-artifact.name}</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-runtime</artifactId>
-            <version>${project.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.apache.flink</groupId>
-                    <artifactId>${shading-artifact.name}</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-optimizer</artifactId>
-            <version>${project.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.apache.flink</groupId>
-                    <artifactId>${shading-artifact.name}</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-examples-batch</artifactId>
-            <version>${project.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.apache.flink</groupId>
-                    <artifactId>${shading-artifact.name}</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-test-utils</artifactId>
-            <version>${project.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.apache.flink</groupId>
-                    <artifactId>${shading-artifact.name}</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-tests</artifactId>
-            <version>${project.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.apache.flink</groupId>
-                    <artifactId>${shading-artifact.name}</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.tez</groupId>
-            <artifactId>tez-api</artifactId>
-            <version>${tez.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.tez</groupId>
-            <artifactId>tez-common</artifactId>
-            <version>${tez.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.tez</groupId>
-            <artifactId>tez-dag</artifactId>
-            <version>${tez.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.tez</groupId>
-            <artifactId>tez-runtime-library</artifactId>
-            <version>${tez.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-common</artifactId>
-            <version>${hadoop.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-hdfs</artifactId>
-            <version>${hadoop.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-mapreduce-client-core</artifactId>
-            <version>${hadoop.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-yarn-client</artifactId>
-            <version>${hadoop.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-yarn-common</artifactId>
-            <version>${hadoop.version}</version>
-        </dependency>
-        
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-yarn-api</artifactId>
-            <version>${hadoop.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>commons-codec</groupId>
-            <artifactId>commons-codec</artifactId>
-            <version>1.4</version>
-        </dependency>
-
-
-        <dependency>
-            <groupId>com.google.guava</groupId>
-            <artifactId>guava</artifactId>
-            <version>${guava.version}</version>
-        </dependency>
-
-    </dependencies>
-
-    <build>
-        <plugins>
-            <plugin>
-                <artifactId>maven-assembly-plugin</artifactId>
-                <version>2.4.1</version>
-                <configuration>
-                    <descriptors>
-                        
<descriptor>${basedir}/src/assembly/flink-fat-jar.xml</descriptor>
-                    </descriptors>
-                    <archive>
-                        <manifest>
-                            
<mainClass>org.apache.flink.tez.examples.ExampleDriver</mainClass>
-                        </manifest>
-                    </archive>
-                </configuration>
-                <executions>
-                    <execution>
-                        <!--<id>assemble-all</id>-->
-                        <id>make-assembly</id>
-                        <phase>package</phase>
-                        <goals>
-                            <goal>single</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-        </plugins>
-    </build>
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/assembly/flink-fat-jar.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tez/src/assembly/flink-fat-jar.xml 
b/flink-contrib/flink-tez/src/assembly/flink-fat-jar.xml
deleted file mode 100644
index 504761a..0000000
--- a/flink-contrib/flink-tez/src/assembly/flink-fat-jar.xml
+++ /dev/null
@@ -1,42 +0,0 @@
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one
-  ~ or more contributor license agreements.  See the NOTICE file
-  ~ distributed with this work for additional information
-  ~ regarding copyright ownership.  The ASF licenses this file
-  ~ to you under the Apache License, Version 2.0 (the
-  ~ "License"); you may not use this file except in compliance
-  ~ with the License.  You may obtain a copy of the License at
-  ~
-  ~ http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<assembly 
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2";
-          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
-          
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2
 http://maven.apache.org/xsd/assembly-1.1.2.xsd";>
-       <id>flink-fat-jar</id>
-       <formats>
-               <format>jar</format>
-       </formats>
-       <includeBaseDirectory>false</includeBaseDirectory>
-       <dependencySets>
-               <dependencySet>
-                       <outputDirectory>/</outputDirectory>
-                       <useProjectArtifact>true</useProjectArtifact>
-                       <!--<excludes>
-                               <exclude>org.apache.flink:*</exclude>
-                       </excludes>-->
-                       <useTransitiveFiltering>true</useTransitiveFiltering>
-                       <unpack>true</unpack>
-                       <scope>runtime</scope>
-            <excludes>
-                <exclude>com.google.guava:guava</exclude>
-            </excludes>
-               </dependencySet>
-       </dependencySets>
-</assembly>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/client/LocalTezEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/client/LocalTezEnvironment.java
 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/client/LocalTezEnvironment.java
deleted file mode 100644
index 4c091e5..0000000
--- 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/client/LocalTezEnvironment.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.client;
-
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.ExecutionEnvironmentFactory;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.optimizer.costs.DefaultCostEstimator;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
-
-public class LocalTezEnvironment extends ExecutionEnvironment {
-
-       TezExecutor executor;
-       Optimizer compiler;
-
-       private LocalTezEnvironment() {
-               compiler = new Optimizer(new DataStatistics(), new 
DefaultCostEstimator(), new Configuration());
-               executor = new TezExecutor(compiler, this.getParallelism());
-       }
-
-       public static LocalTezEnvironment create() {
-               return new LocalTezEnvironment();
-       }
-
-       @Override
-       public JobExecutionResult execute(String jobName) throws Exception {
-               TezConfiguration tezConf = new TezConfiguration();
-               tezConf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
-               tezConf.set("fs.defaultFS", "file:///");
-               
tezConf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, 
true);
-               executor.setConfiguration(tezConf);
-               return executor.executePlan(createProgramPlan(jobName));
-       }
-
-       @Override
-       public String getExecutionPlan() throws Exception {
-               Plan p = createProgramPlan(null, false);
-               return executor.getOptimizerPlanAsJSON(p);
-       }
-
-       public void setAsContext() {
-               ExecutionEnvironmentFactory factory = new 
ExecutionEnvironmentFactory() {
-                       @Override
-                       public ExecutionEnvironment 
createExecutionEnvironment() {
-                               return LocalTezEnvironment.this;
-                       }
-               };
-               initializeContextEnvironment(factory);
-       }
-
-       @Override
-       public void startNewSession() throws Exception {
-               throw new UnsupportedOperationException("Session management is 
not implemented in Flink on Tez.");
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/client/RemoteTezEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/client/RemoteTezEnvironment.java
 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/client/RemoteTezEnvironment.java
deleted file mode 100644
index 131937e..0000000
--- 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/client/RemoteTezEnvironment.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.client;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.optimizer.DataStatistics;
-import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.optimizer.costs.DefaultCostEstimator;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.util.ClassUtil;
-import org.apache.hadoop.util.ToolRunner;
-
-
-public class RemoteTezEnvironment extends ExecutionEnvironment {
-
-       private static final Log LOG = 
LogFactory.getLog(RemoteTezEnvironment.class);
-       
-       private Optimizer compiler;
-       private TezExecutor executor;
-       private Path jarPath = null;
-       
-
-       public void registerMainClass (Class mainClass) {
-               jarPath = new Path(ClassUtil.findContainingJar(mainClass));
-               LOG.info ("Registering main class " + mainClass.getName() + " 
contained in " + jarPath.toString());
-       }
-
-       @Override
-       public JobExecutionResult execute(String jobName) throws Exception {
-               TezExecutorTool tool = new TezExecutorTool(executor, 
createProgramPlan());
-               if (jarPath != null) {
-                       tool.setJobJar(jarPath);
-               }
-               try {
-                       int executionResult = ToolRunner.run(new 
Configuration(), tool, new String[]{jobName});
-               }
-               finally {
-                       return new JobExecutionResult(null, -1, null);
-               }
-
-       }
-
-       @Override
-       public String getExecutionPlan() throws Exception {
-               Plan p = createProgramPlan(null, false);
-               return executor.getOptimizerPlanAsJSON(p);
-       }
-
-       public static RemoteTezEnvironment create () {
-               return new RemoteTezEnvironment();
-       }
-
-       public RemoteTezEnvironment() {
-               compiler = new Optimizer(new DataStatistics(), new 
DefaultCostEstimator(), new org.apache.flink.configuration.Configuration());
-               executor = new TezExecutor(compiler, getParallelism());
-       }
-
-       @Override
-       public void startNewSession() throws Exception {
-               throw new UnsupportedOperationException("Session management is 
not implemented in Flink on Tez.");
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutor.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutor.java
 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutor.java
deleted file mode 100644
index 60449db..0000000
--- 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutor.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.client;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.PlanExecutor;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
-import org.apache.flink.tez.dag.TezDAGGenerator;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.tez.client.TezClient;
-import org.apache.tez.client.TezClientUtils;
-import org.apache.tez.common.TezCommonUtils;
-import org.apache.tez.dag.api.DAG;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.client.DAGClient;
-import org.apache.tez.dag.api.client.DAGStatus;
-
-import java.util.Map;
-import java.util.TreeMap;
-
-public class TezExecutor extends PlanExecutor {
-
-       private static final Log LOG = LogFactory.getLog(TezExecutor.class);
-
-       private TezConfiguration tezConf;
-       private Optimizer compiler;
-       
-       private Path jarPath;
-
-       private long runTime = -1; //TODO get DAG execution time from Tez
-       private int parallelism;
-
-       public TezExecutor(TezConfiguration tezConf, Optimizer compiler, int 
parallelism) {
-               this.tezConf = tezConf;
-               this.compiler = compiler;
-               this.parallelism = parallelism;
-       }
-
-       public TezExecutor(Optimizer compiler, int parallelism) {
-               this.tezConf = null;
-               this.compiler = compiler;
-               this.parallelism = parallelism;
-       }
-
-       public void setConfiguration (TezConfiguration tezConf) {
-               this.tezConf = tezConf;
-       }
-
-       private JobExecutionResult executePlanWithConf (TezConfiguration 
tezConf, Plan plan) throws Exception {
-
-               String jobName = plan.getJobName();
-
-               TezClient tezClient = TezClient.create(jobName, tezConf);
-               tezClient.start();
-               try {
-                       OptimizedPlan optPlan = getOptimizedPlan(plan, 
parallelism);
-                       TezDAGGenerator dagGenerator = new 
TezDAGGenerator(tezConf, new Configuration());
-                       DAG dag = dagGenerator.createDAG(optPlan);
-
-                       if (jarPath != null) {
-                               addLocalResource(tezConf, jarPath, dag);
-                       }
-
-                       tezClient.waitTillReady();
-                       LOG.info("Submitting DAG to Tez Client");
-                       DAGClient dagClient = tezClient.submitDAG(dag);
-
-                       LOG.info("Submitted DAG to Tez Client");
-
-                       // monitoring
-                       DAGStatus dagStatus = dagClient.waitForCompletion();
-
-                       if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
-                               LOG.error (jobName + " failed with diagnostics: 
" + dagStatus.getDiagnostics());
-                               throw new RuntimeException(jobName + " failed 
with diagnostics: " + dagStatus.getDiagnostics());
-                       }
-                       LOG.info(jobName + " finished successfully");
-
-                       return new JobExecutionResult(null, runTime, null);
-
-               }
-               finally {
-                       tezClient.stop();
-               }
-       }
-
-       @Override
-       public void start() throws Exception {
-               throw new IllegalStateException("Session management is not 
supported in the TezExecutor.");
-       }
-
-       @Override
-       public void stop() throws Exception {
-               throw new IllegalStateException("Session management is not 
supported in the TezExecutor.");
-       }
-
-       @Override
-       public void endSession(JobID jobID) throws Exception {
-               throw new IllegalStateException("Session management is not 
supported in the TezExecutor.");
-       }
-
-       @Override
-       public boolean isRunning() {
-               return false;
-       }
-
-       @Override
-       public JobExecutionResult executePlan(Plan plan) throws Exception {
-               return executePlanWithConf(tezConf, plan);
-       }
-       
-       private static void addLocalResource (TezConfiguration tezConf, Path 
jarPath, DAG dag) {
-               
-               try {
-                       org.apache.hadoop.fs.FileSystem fs = 
org.apache.hadoop.fs.FileSystem.get(tezConf);
-
-                       LOG.info("Jar path received is " + jarPath.toString());
-
-                       String jarFile = jarPath.getName();
-
-                       Path remoteJarPath = null;
-                       
-                       /*
-                       if (tezConf.get(TezConfiguration.TEZ_AM_STAGING_DIR) == 
null) {
-                               LOG.info("Tez staging directory is null, 
setting it.");
-                               Path stagingDir = new 
Path(fs.getWorkingDirectory(), UUID.randomUUID().toString());
-                               LOG.info("Setting Tez staging directory to " + 
stagingDir.toString());
-                               
tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDir.toString());
-                               LOG.info("Set Tez staging directory to " + 
stagingDir.toString());
-                       }
-                       Path stagingDir = new 
Path(tezConf.get(TezConfiguration.TEZ_AM_STAGING_DIR));
-                       LOG.info("Ensuring that Tez staging directory exists");
-                       TezClientUtils.ensureStagingDirExists(tezConf, 
stagingDir);
-                       LOG.info("Tez staging directory exists and is " + 
stagingDir.toString());
-                       */
-
-
-                       Path stagingDir = 
TezCommonUtils.getTezBaseStagingPath(tezConf);
-                       LOG.info("Tez staging path is " + stagingDir);
-                       TezClientUtils.ensureStagingDirExists(tezConf, 
stagingDir);
-                       LOG.info("Tez staging dir exists");
-                       
-                       remoteJarPath = fs.makeQualified(new Path(stagingDir, 
jarFile));
-                       LOG.info("Copying " + jarPath.toString() + " to " + 
remoteJarPath.toString());
-                       fs.copyFromLocalFile(jarPath, remoteJarPath);
-
-
-                       FileStatus remoteJarStatus = 
fs.getFileStatus(remoteJarPath);
-                       Credentials credentials = new Credentials();
-                       TokenCache.obtainTokensForNamenodes(credentials, new 
Path[]{remoteJarPath}, tezConf);
-
-                       Map<String, LocalResource> localResources = new 
TreeMap<String, LocalResource>();
-                       LocalResource jobJar = LocalResource.newInstance(
-                                       
ConverterUtils.getYarnUrlFromPath(remoteJarPath),
-                                       LocalResourceType.FILE, 
LocalResourceVisibility.APPLICATION,
-                                       remoteJarStatus.getLen(), 
remoteJarStatus.getModificationTime());
-                       localResources.put(jarFile.toString(), jobJar);
-
-                       dag.addTaskLocalFiles(localResources);
-
-                       LOG.info("Added job jar as local resource.");
-               }
-               catch (Exception e) {
-                       System.out.println(e.getMessage());
-                       e.printStackTrace();
-                       System.exit(-1);
-               }
-       }
-       
-       public void setJobJar (Path jarPath) {
-               this.jarPath = jarPath;
-       }
-
-
-       @Override
-       public String getOptimizerPlanAsJSON(Plan plan) throws Exception {
-               OptimizedPlan optPlan = getOptimizedPlan(plan, parallelism);
-               PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();
-               return jsonGen.getOptimizerPlanAsJSON(optPlan);
-       }
-
-       public OptimizedPlan getOptimizedPlan(Plan p, int parallelism) throws 
CompilerException {
-               if (parallelism > 0 && p.getDefaultParallelism() <= 0) {
-                       p.setDefaultParallelism(parallelism);
-               }
-               return this.compiler.compile(p);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutorTool.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutorTool.java
 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutorTool.java
deleted file mode 100644
index 09289fb..0000000
--- 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutorTool.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.client;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.Plan;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.Tool;
-import org.apache.tez.dag.api.TezConfiguration;
-
-
-public class TezExecutorTool extends Configured implements Tool {
-
-       private static final Log LOG = LogFactory.getLog(TezExecutorTool.class);
-
-       private TezExecutor executor;
-       Plan plan;
-       private Path jarPath = null;
-
-       public TezExecutorTool(TezExecutor executor, Plan plan) {
-               this.executor = executor;
-               this.plan = plan;
-       }
-
-       public void setJobJar (Path jarPath) {
-               this.jarPath = jarPath;
-       }
-
-       @Override
-       public int run(String[] args) throws Exception {
-               
-               Configuration conf = getConf();
-               
-               TezConfiguration tezConf;
-               if (conf != null) {
-                       tezConf = new TezConfiguration(conf);
-               } else {
-                       tezConf = new TezConfiguration();
-               }
-
-               UserGroupInformation.setConfiguration(tezConf);
-
-               executor.setConfiguration(tezConf);
-
-               try {
-                       if (jarPath != null) {
-                               executor.setJobJar(jarPath);
-                       }
-                       JobExecutionResult result = executor.executePlan(plan);
-               }
-               catch (Exception e) {
-                       LOG.error("Job execution failed due to: " + 
e.getMessage());
-                       throw new RuntimeException(e.getMessage());
-               }
-               return 0;
-       }
-
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkBroadcastEdge.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkBroadcastEdge.java
 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkBroadcastEdge.java
deleted file mode 100644
index 6597733..0000000
--- 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkBroadcastEdge.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.dag;
-
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.tez.util.EncodingUtils;
-import org.apache.flink.tez.util.FlinkSerialization;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.serializer.WritableSerialization;
-import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.EdgeProperty;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.runtime.library.conf.UnorderedKVEdgeConfig;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class FlinkBroadcastEdge extends FlinkEdge {
-
-       public FlinkBroadcastEdge(FlinkVertex source, FlinkVertex target, 
TypeSerializer<?> typeSerializer) {
-               super(source, target, typeSerializer);
-       }
-
-       @Override
-       public Edge createEdge(TezConfiguration tezConf) {
-
-               Map<String,String> serializerMap = new HashMap<String,String>();
-               serializerMap.put("io.flink.typeserializer", 
EncodingUtils.encodeObjectToString(this.typeSerializer));
-
-               try {
-                       UnorderedKVEdgeConfig edgeConfig =
-                                       (UnorderedKVEdgeConfig
-                                                       
.newBuilder(IntWritable.class.getName(), 
typeSerializer.createInstance().getClass().getName())
-                                                       
.setFromConfiguration(tezConf)
-                                                       
.setKeySerializationClass(WritableSerialization.class.getName(), null)
-                                                       
.setValueSerializationClass(FlinkSerialization.class.getName(), serializerMap)
-                                                       .configureInput()
-                                                       
.setAdditionalConfiguration("io.flink.typeserializer", 
EncodingUtils.encodeObjectToString(this.typeSerializer))
-                                                       )
-                                                       .done()
-                                                       .build();
-
-                       EdgeProperty property = 
edgeConfig.createDefaultBroadcastEdgeProperty();
-                       this.cached = Edge.create(source.getVertex(), 
target.getVertex(), property);
-                       return cached;
-
-               } catch (Exception e) {
-                       throw new CompilerException(
-                                       "An error occurred while creating a Tez 
Forward Edge: " + e.getMessage(), e);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkDataSinkVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkDataSinkVertex.java
 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkDataSinkVertex.java
deleted file mode 100644
index e3ddb9e..0000000
--- 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkDataSinkVertex.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.dag;
-
-
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.tez.runtime.DataSinkProcessor;
-import org.apache.flink.tez.runtime.TezTaskConfig;
-import org.apache.flink.tez.util.EncodingUtils;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.Vertex;
-
-import java.io.IOException;
-
-public class FlinkDataSinkVertex extends FlinkVertex {
-
-       public FlinkDataSinkVertex(String taskName, int parallelism, 
TezTaskConfig taskConfig) {
-               super(taskName, parallelism, taskConfig);
-       }
-
-       @Override
-       public Vertex createVertex(TezConfiguration conf) {
-               try {
-                       this.writeInputPositionsToConfig();
-                       this.writeSubTasksInOutputToConfig();
-
-                       conf.set(TezTaskConfig.TEZ_TASK_CONFIG, 
EncodingUtils.encodeObjectToString(taskConfig));
-
-                       ProcessorDescriptor descriptor = 
ProcessorDescriptor.create(
-                                       DataSinkProcessor.class.getName());
-
-                       
descriptor.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
-
-                       cached = Vertex.create(this.getUniqueName(), 
descriptor, getParallelism());
-
-                       return cached;
-               }
-               catch (IOException e) {
-                       throw new CompilerException(
-                                       "An error occurred while creating a Tez 
Vertex: " + e.getMessage(), e);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkDataSourceVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkDataSourceVertex.java
 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkDataSourceVertex.java
deleted file mode 100644
index 913b854..0000000
--- 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkDataSourceVertex.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.dag;
-
-
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.tez.runtime.DataSourceProcessor;
-import org.apache.flink.tez.runtime.TezTaskConfig;
-import org.apache.flink.tez.runtime.input.FlinkInput;
-import org.apache.flink.tez.runtime.input.FlinkInputSplitGenerator;
-import org.apache.flink.tez.util.EncodingUtils;
-import org.apache.hadoop.security.Credentials;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.dag.api.DataSourceDescriptor;
-import org.apache.tez.dag.api.InputDescriptor;
-import org.apache.tez.dag.api.InputInitializerDescriptor;
-import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.Vertex;
-
-import java.io.IOException;
-
-public class FlinkDataSourceVertex extends FlinkVertex {
-
-       public FlinkDataSourceVertex(String taskName, int parallelism, 
TezTaskConfig taskConfig) {
-               super(taskName, parallelism, taskConfig);
-       }
-
-
-       @Override
-       public Vertex createVertex (TezConfiguration conf) {
-               try {
-                       this.writeInputPositionsToConfig();
-                       this.writeSubTasksInOutputToConfig();
-
-                       
taskConfig.setDatasourceProcessorName(this.getUniqueName());
-                       conf.set(TezTaskConfig.TEZ_TASK_CONFIG, 
EncodingUtils.encodeObjectToString(taskConfig));
-
-                       ProcessorDescriptor descriptor = 
ProcessorDescriptor.create(
-                                       DataSourceProcessor.class.getName());
-
-                       
descriptor.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
-
-                       InputDescriptor inputDescriptor = 
InputDescriptor.create(FlinkInput.class.getName());
-
-                       InputInitializerDescriptor inputInitializerDescriptor =
-                                       
InputInitializerDescriptor.create(FlinkInputSplitGenerator.class.getName()).setUserPayload(TezUtils.createUserPayloadFromConf(conf));
-
-                       DataSourceDescriptor dataSourceDescriptor = 
DataSourceDescriptor.create(
-                                       inputDescriptor,
-                                       inputInitializerDescriptor,
-                                       new Credentials()
-                       );
-
-                       cached = Vertex.create(this.getUniqueName(), 
descriptor, getParallelism());
-
-                       cached.addDataSource("Input " + this.getUniqueName(), 
dataSourceDescriptor);
-
-                       return cached;
-               }
-               catch (IOException e) {
-                       throw new CompilerException(
-                                       "An error occurred while creating a Tez 
Vertex: " + e.getMessage(), e);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkEdge.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkEdge.java 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkEdge.java
deleted file mode 100644
index 181e675..0000000
--- 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkEdge.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.dag;
-
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.TezConfiguration;
-
-public abstract class FlinkEdge {
-
-       protected FlinkVertex source;
-       protected FlinkVertex target;
-       protected TypeSerializer<?> typeSerializer;
-       protected Edge cached;
-
-       protected FlinkEdge(FlinkVertex source, FlinkVertex target, 
TypeSerializer<?> typeSerializer) {
-               this.source = source;
-               this.target = target;
-               this.typeSerializer = typeSerializer;
-       }
-
-       public abstract Edge createEdge(TezConfiguration tezConf);
-
-       public Edge getEdge () {
-               return cached;
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkForwardEdge.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkForwardEdge.java
 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkForwardEdge.java
deleted file mode 100644
index 4602e96..0000000
--- 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkForwardEdge.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.dag;
-
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.tez.util.EncodingUtils;
-import org.apache.flink.tez.util.FlinkSerialization;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.serializer.WritableSerialization;
-import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.EdgeProperty;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.runtime.library.conf.UnorderedKVEdgeConfig;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class FlinkForwardEdge extends FlinkEdge {
-
-       public FlinkForwardEdge(FlinkVertex source, FlinkVertex target, 
TypeSerializer<?> typeSerializer) {
-               super(source, target, typeSerializer);
-       }
-
-       @Override
-       public Edge createEdge(TezConfiguration tezConf) {
-
-               Map<String,String> serializerMap = new HashMap<String,String>();
-               serializerMap.put("io.flink.typeserializer", 
EncodingUtils.encodeObjectToString(this.typeSerializer));
-
-               try {
-                       UnorderedKVEdgeConfig edgeConfig =
-                                       (UnorderedKVEdgeConfig
-                                                       
.newBuilder(IntWritable.class.getName(), 
typeSerializer.createInstance().getClass().getName())
-                                                       
.setFromConfiguration(tezConf)
-                                                       
.setKeySerializationClass(WritableSerialization.class.getName(), null)
-                                                       
.setValueSerializationClass(FlinkSerialization.class.getName(), serializerMap)
-                                                       .configureInput()
-                                                       
.setAdditionalConfiguration("io.flink.typeserializer", 
EncodingUtils.encodeObjectToString(
-                                                                       
this.typeSerializer
-                                                       )))
-                                                       .done()
-                                                       .build();
-
-                       EdgeProperty property = 
edgeConfig.createDefaultOneToOneEdgeProperty();
-                       this.cached = Edge.create(source.getVertex(), 
target.getVertex(), property);
-                       return cached;
-
-               } catch (Exception e) {
-                       throw new CompilerException(
-                                       "An error occurred while creating a Tez 
Forward Edge: " + e.getMessage(), e);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkPartitionEdge.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkPartitionEdge.java
 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkPartitionEdge.java
deleted file mode 100644
index b5f8c2e..0000000
--- 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkPartitionEdge.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.dag;
-
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.tez.util.EncodingUtils;
-import org.apache.flink.tez.util.FlinkSerialization;
-import org.apache.flink.tez.runtime.output.SimplePartitioner;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.serializer.WritableSerialization;
-import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.EdgeProperty;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfig;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class FlinkPartitionEdge extends FlinkEdge {
-
-       public FlinkPartitionEdge(FlinkVertex source, FlinkVertex target, 
TypeSerializer<?> typeSerializer) {
-               super(source, target, typeSerializer);
-       }
-
-       @Override
-       public Edge createEdge(TezConfiguration tezConf) {
-
-               Map<String,String> serializerMap = new HashMap<String,String>();
-               serializerMap.put("io.flink.typeserializer", 
EncodingUtils.encodeObjectToString(this.typeSerializer));
-
-               try {
-                       UnorderedPartitionedKVEdgeConfig edgeConfig =
-                                       (UnorderedPartitionedKVEdgeConfig
-                                               
.newBuilder(IntWritable.class.getName(), 
typeSerializer.createInstance().getClass().getName(), 
SimplePartitioner.class.getName())
-                                       .setFromConfiguration(tezConf)
-                                       
.setKeySerializationClass(WritableSerialization.class.getName(), null)
-                                       
.setValueSerializationClass(FlinkSerialization.class.getName(), serializerMap)
-                                       .configureInput()
-                                       
.setAdditionalConfiguration("io.flink.typeserializer", 
EncodingUtils.encodeObjectToString(this.typeSerializer)))
-                                       .done()
-                                       .build();
-
-
-                       EdgeProperty property = 
edgeConfig.createDefaultEdgeProperty();
-                       this.cached = Edge.create(source.getVertex(), 
target.getVertex(), property);
-                       return cached;
-
-               } catch (Exception e) {
-                       throw new CompilerException(
-                                       "An error occurred while creating a Tez 
Shuffle Edge: " + e.getMessage(), e);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkProcessorVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkProcessorVertex.java
 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkProcessorVertex.java
deleted file mode 100644
index 2fbba36..0000000
--- 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkProcessorVertex.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.dag;
-
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.tez.runtime.RegularProcessor;
-import org.apache.flink.tez.runtime.TezTaskConfig;
-import org.apache.flink.tez.util.EncodingUtils;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.Vertex;
-
-import java.io.IOException;
-
-
-public class FlinkProcessorVertex extends FlinkVertex {
-
-       public FlinkProcessorVertex(String taskName, int parallelism, 
TezTaskConfig taskConfig) {
-               super(taskName, parallelism, taskConfig);
-       }
-
-       @Override
-       public Vertex createVertex(TezConfiguration conf) {
-               try {
-                       this.writeInputPositionsToConfig();
-                       this.writeSubTasksInOutputToConfig();
-
-                       conf.set(TezTaskConfig.TEZ_TASK_CONFIG, 
EncodingUtils.encodeObjectToString(taskConfig));
-
-                       ProcessorDescriptor descriptor = 
ProcessorDescriptor.create(
-                                       RegularProcessor.class.getName());
-
-                       
descriptor.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
-
-                       cached = Vertex.create(this.getUniqueName(), 
descriptor, getParallelism());
-
-                       return cached;
-               } catch (IOException e) {
-                       throw new CompilerException(
-                                       "An error occurred while creating a Tez 
Vertex: " + e.getMessage(), e);
-               }
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkUnionVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkUnionVertex.java
 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkUnionVertex.java
deleted file mode 100644
index 0cf9990..0000000
--- 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkUnionVertex.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.dag;
-
-
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.tez.runtime.UnionProcessor;
-import org.apache.flink.tez.runtime.TezTaskConfig;
-import org.apache.flink.tez.util.EncodingUtils;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.Vertex;
-
-import java.io.IOException;
-
-public class FlinkUnionVertex extends FlinkVertex {
-
-       public FlinkUnionVertex(String taskName, int parallelism, TezTaskConfig 
taskConfig) {
-               super(taskName, parallelism, taskConfig);
-       }
-
-       @Override
-       public Vertex createVertex(TezConfiguration conf) {
-               try {
-                       this.writeInputPositionsToConfig();
-                       this.writeSubTasksInOutputToConfig();
-
-                       conf.set(TezTaskConfig.TEZ_TASK_CONFIG, 
EncodingUtils.encodeObjectToString(taskConfig));
-
-                       ProcessorDescriptor descriptor = 
ProcessorDescriptor.create(
-                                       UnionProcessor.class.getName());
-
-                       
descriptor.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
-
-                       cached = Vertex.create(this.getUniqueName(), 
descriptor, getParallelism());
-
-                       return cached;
-               }
-               catch (IOException e) {
-                       throw new CompilerException(
-                                       "An error occurred while creating a Tez 
Vertex: " + e.getMessage(), e);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkVertex.java
 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkVertex.java
deleted file mode 100644
index 883acc6..0000000
--- 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkVertex.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.dag;
-
-
-import org.apache.flink.tez.runtime.TezTaskConfig;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.Vertex;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-public abstract class FlinkVertex {
-
-       protected Vertex cached;
-       private String taskName;
-       private int parallelism;
-       protected TezTaskConfig taskConfig;
-
-       // Tez-specific bookkeeping
-       protected String uniqueName; //Unique name in DAG
-       private Map<FlinkVertex,ArrayList<Integer>> inputPositions;
-       private ArrayList<Integer> numberOfSubTasksInOutputs;
-
-       public TezTaskConfig getConfig() {
-               return taskConfig;
-       }
-
-       public FlinkVertex(String taskName, int parallelism, TezTaskConfig 
taskConfig) {
-               this.cached = null;
-               this.taskName = taskName;
-               this.parallelism = parallelism;
-               this.taskConfig = taskConfig;
-               this.uniqueName = taskName + UUID.randomUUID().toString();
-               this.inputPositions = new HashMap<FlinkVertex, 
ArrayList<Integer>>();
-               this.numberOfSubTasksInOutputs = new ArrayList<Integer>();
-       }
-
-       public int getParallelism () {
-               return parallelism;
-       }
-
-       public void setParallelism (int parallelism) {
-               this.parallelism = parallelism;
-       }
-
-       public abstract Vertex createVertex (TezConfiguration conf);
-
-       public Vertex getVertex () {
-               return cached;
-       }
-
-       protected String getUniqueName () {
-               return uniqueName;
-       }
-
-       public void addInput (FlinkVertex vertex, int position) {
-               if (inputPositions.containsKey(vertex)) {
-                       inputPositions.get(vertex).add(position);
-               }
-               else {
-                       ArrayList<Integer> lst = new ArrayList<Integer>();
-                       lst.add(position);
-                       inputPositions.put(vertex,lst);
-               }
-       }
-
-       public void addNumberOfSubTasksInOutput (int subTasks, int position) {
-               if (numberOfSubTasksInOutputs.isEmpty()) {
-                       numberOfSubTasksInOutputs.add(-1);
-               }
-               int currSize = numberOfSubTasksInOutputs.size();
-               for (int i = currSize; i <= position; i++) {
-                       numberOfSubTasksInOutputs.add(i, -1);
-               }
-               numberOfSubTasksInOutputs.set(position, subTasks);
-       }
-
-       // Must be called before taskConfig is written to Tez configuration
-       protected void writeInputPositionsToConfig () {
-               HashMap<String,ArrayList<Integer>> toWrite = new 
HashMap<String, ArrayList<Integer>>();
-               for (FlinkVertex v: inputPositions.keySet()) {
-                       String name = v.getUniqueName();
-                       List<Integer> positions = inputPositions.get(v);
-                       toWrite.put(name, new ArrayList<Integer>(positions));
-               }
-               this.taskConfig.setInputPositions(toWrite);
-       }
-
-       // Must be called before taskConfig is written to Tez configuration
-       protected void writeSubTasksInOutputToConfig () {
-               
this.taskConfig.setNumberSubtasksInOutput(this.numberOfSubTasksInOutputs);
-       }
-
-}

Reply via email to